当前位置: 移动技术网 > IT编程>开发语言>.net > (九)分布式服务----Zookeeper注册中心

(九)分布式服务----Zookeeper注册中心

2019年10月06日  | 移动技术网IT编程  | 我要评论

宝宝什么时候吃dha,好梦学车,爹爹卫珊儿在书房嗯啊

 

 

首先看一下几种注册中心:

最老的就是zookeeper了, 比较新的有eureka,consul 都可以做注册中心。可以自行搜索对比三者的优缺点。

zookeeper 最开始就是hadoop大家族中的一员,用于做协调的框架,后来已经是apache的子项目了。

几年前大数据很火的时候,只要学hadoop必学zookeeper,当然还有其他成员。

大数据简单说就是分布式,比如分布式文件存储hdfs,分布式数据库hbase,分布式协调zookeeper,还有kafka,flume等等都是hadoop大家族。

zookeeper,现在更多被用来做注册中心,比如阿里的开源soa框架dubbo就经常搭配zookeeper做注册中心。

eureka:java的微服务框架spring cloud中内部已经集成了eureka注册中心。

我选择zookeeper,不是因为他比另外两个强,而是因为我几年前就已经学习过一些zookeeper的原理,上手更容易。网络上学习书籍、资料、视频教程也特别多,学习资料完善。

 

注册中心的基本功能:

1. 注册服务,有点类似dns,所有的服务注册到注册中心,包含服务的地址等信息。

2. 服务订阅,客户端请求服务,注册中心就要把那些能用的服务器地址告诉客户端,服务端有变动时,注册中心也能及时通知到客户端。

3. 性能好且高可用,注册中心自身也是一个集群,如果只有一个注册中心机器的话那岂不是把注册中心累死啊,而且他一旦坏了以后,那客户端都找不到服务器了。所有注册中心就有很多台,其中只有一个老大(leader),老大用来写,小弟用来读。就是说老大来决定一台服务器能不能注册进来,小弟负责帮助客户端查找服务器。因为注册服务的次数是很少的,通常有新服务器加入才需要注册,但是客户端订阅那就很多了,所以注册中心只有一个leader。leader万一坏掉的话,会从小弟中选举出一个来当老大接替工作。

 

上面提到说zookeeper集群,就是说有很多台机器做zookeeper机器,但是这些机器里存储的东西基本上都是一样的,就是说客户端不管连到哪个zookeeper 都是一样的,能做服务订阅。

每一个zookeeper 中都有很多节点(znode)。

接下来说的zookeeper节点和集群完全不是一回事。 有些人喜欢吧集群中的每一台zookeeper机器称为一个节点,但是这个节点(zookeeper机器)和我说的节点(znode)完全不是一回事。

如下图:

 

 本例的图中可以看到,一共有5台机器,每台机器都有5个znode,znode下面的子节点就更多了。

先看5台机器:

一台leader,老大,上文已经介绍,服务都从这些注册写入。

两台follower,小弟,平时用于服务订阅,老大挂掉以后,follower内部就会自行选出老大。

两台observer,观察者,就是属于无业游民,只能看,没有选老大的资格,不能参与竞选也不能投票,唯一的功能就是服务订阅。

  observer模式需要手动开启,为什么会出现observer呢,是因为机器太多的话,每个机器都有选举权的话特别影响性能。全中国14亿人口,每个人都参与国家竞选的话,效率极低。所以呢,选举的工作就交给follower完成就行了,只需要确保一直都有leader接班人就好。

 

再看看zookeeper有什么基本功能:

基本功能很简单,组合以后却可以完成各种复杂工作。

1. 可以创建:临时节点(断开连接时便删除节点) 和 持久化节点(必须手动删除节点)。

2. 可以创建:无序节点 和 有序节点。

3. 节点上可以添加watcher监听功能,监听该节点的增删改,然后触发自定义的事件。

 

看看这些功能怎么用:

1. 节点: 每次注册一个服务就创建一个节点,节点的名称(key)就是服务的名称,服务的详细信息存储在节点value中,客户端通过key找到对应的节点,再找打节点中的value。

2. 临时节点:服务端注册一个服务时创建一个临时节点,服务断开时,临时节点自动销毁,自动完成服务注销。

3. watcher监听: 客户端在注册中心订阅了一个服务的时候,同时在这个服务所在的节点上加一个监听事件,每当服务节点信息有变化的时候,注册中心会自动回调通知客户端。

4. 有序临时节点:分布式锁或者分布式队列(这里与服务注册无关),客户端1想要操作一条数据的时候,在a节点下创建一个有序临时节点,自动分配编号001;客户端1也要操作该数据的时候,在a节点下也创建一个有序临时节点,自动分配编号002。只有编号最小的子节点才会被执行,因此001节点会被执行,客户端1执行完毕后,自动删除001节点,此时002编号为最小子节点。即锁的概念,不能同时操作同一数据;也可以做队列,按照先后顺序依次执行。

5. 有序临时节点+watcher监听: 上面第4条中说到每次执行编号最小的节点,因此需要有一个程序,每次都需要遍历全部节点,然后找出最小的节点,假如是002节点,这时客户端2开始执行。但是添加监听机制以后就不一样了,002监听001,003监听比他小一号的002,这样001销毁的同时通知002开始执行,002销毁的时候通知003开始执行,不需要遍历最小节点,也能有序依次执行。

6. 临时节点+watcher监听: 集群master选举以及高可用。比如hadoop集群,也有一个resourcemanager资源管理器,负责调度其它节点机器,相当于hadoop集群的leader节点。这个leader就可以交由zookeeper管理,所有的hadoop机器同时在zookeeper中创建一个同名的临时节点,由于是同名互斥的节点,因此只有一个节点能被创建,成功创建这个节点的hadoop机器就是leader。同时添加watcher监听,这个leader只要断开连接,临时节点自动销毁,触发监听,其它hadoop开始新一轮的master选举。这也是zookeeper最初在hadoop家族中的重要使命。

7....... 还要很多地方都能用zookeeper,简直无所不能,而且自身也是高可用,高性能,牛x

 

zookeeper本身的操作还是很简单的,无非就是节点的增删改查,可以选择要创建节点的类型,还有就是在节点上添加watcher监听器。就这些。

 

文件结构:

 

上代码:

zookeeper客户端管理类:

public class zookeeperclientprovider
    {
        private configinfo _config;
        private readonly ilogger<zookeeperclientprovider> _logger;
        private readonly dictionary<string, zookeeper> _zookeeperclients = new dictionary<string, zookeeper>();

        public zookeeperclientprovider(configinfo config, ilogger<zookeeperclientprovider> logger)
        {
            _config = config;
            _logger = logger;
        }

        public async task<zookeeper> getzookeeper()
        {
            return await createzookeeper(_config.addresses.firstordefault());
        }
        public async task<zookeeper> createzookeeper(string address)
        {
            if (!_zookeeperclients.trygetvalue(address, out zookeeper result))
            {
                await task.run(() =>
                {
                    result = new zookeeper(address, (int)_config.sessiontimeout.totalmilliseconds,
                        new reconnectionwatcher(
                            async () =>
                            {
                                if (_zookeeperclients.remove(address, out zookeeper value))
                                {
                                    await value.closeasync();
                                }
                                await createzookeeper(address);
                            }));
                    _zookeeperclients.tryadd(address, result);
                });
            }
            return result;
        }

        public async task<ienumerable<zookeeper>> getzookeepers()
        {
            var result = new list<zookeeper>();
            foreach (var address in _config.addresses)
            {
                result.add(await createzookeeper(address));
            }
            return result;
        }
    }

zookeeper服务注册类:

/// <summary>
    /// 一个抽象的服务路由发现者。
    /// </summary>
    public interface iserviceroutemanager
    {

        /// <summary>
        /// 服务路由被创建。
        /// </summary>
        event eventhandler<servicerouteeventargs> created;

        /// <summary>
        /// 服务路由被删除。
        /// </summary>
        event eventhandler<servicerouteeventargs> removed;

        /// <summary>
        /// 服务路由被修改。
        /// </summary>
        event eventhandler<serviceroutechangedeventargs> changed;

        /// <summary>
        /// 获取所有可用的服务路由信息。
        /// </summary>
        /// <returns>服务路由集合。</returns>
        task<ienumerable<serviceroute>> getroutesasync();

        /// <summary>
        /// 设置服务路由。
        /// </summary>
        /// <param name="routes">服务路由集合。</param>
        /// <returns>一个任务。</returns>
        task setroutesasync(ienumerable<serviceroute> routes);

        /// <summary>
        /// 移除地址列表
        /// </summary>
        /// <param name="routes">地址列表。</param>
        /// <returns>一个任务。</returns>
        task remveaddressasync(ienumerable<string> address);
        /// <summary>
        /// 清空所有的服务路由。
        /// </summary>
        /// <returns>一个任务。</returns>
        task clearasync();
    }

    /// <summary>
    /// 服务路由事件参数。
    /// </summary>
    public class servicerouteeventargs
    {
        public servicerouteeventargs(serviceroute route)
        {
            route = route;
        }

        /// <summary>
        /// 服务路由信息。
        /// </summary>
        public serviceroute route { get; private set; }
    }

    /// <summary>
    /// 服务路由变更事件参数。
    /// </summary>
    public class serviceroutechangedeventargs : servicerouteeventargs
    {
        public serviceroutechangedeventargs(serviceroute route, serviceroute oldroute) : base(route)
        {
            oldroute = oldroute;
        }

        /// <summary>
        /// 旧的服务路由信息。
        /// </summary>
        public serviceroute oldroute { get; set; }
    }
public class zookeeperserviceroutemanager : iserviceroutemanager, idisposable
    {
        private readonly configinfo _configinfo;
        private readonly iserializer<byte[]> _serializer;
        private readonly ilogger<zookeeperserviceroutemanager> _logger;
        private serviceroute[] _routes;
        private readonly zookeeperclientprovider _zookeeperclientprovider;

        public zookeeperserviceroutemanager(configinfo configinfo, iserializer<byte[]> serializer,
            iserializer<string> stringserializer,
            ilogger<zookeeperserviceroutemanager> logger,
            zookeeperclientprovider zookeeperclientprovider)
        {
            _configinfo = configinfo;
            _serializer = serializer;
            _logger = logger;
            _zookeeperclientprovider = zookeeperclientprovider;
            enterroutes().wait();
        }

        private eventhandler<servicerouteeventargs> _created;
        private eventhandler<servicerouteeventargs> _removed;
        private eventhandler<serviceroutechangedeventargs> _changed;

        /// <summary>
        /// 服务路由被创建。
        /// </summary>
        public event eventhandler<servicerouteeventargs> created
        {
            add { _created += value; }
            remove { _created -= value; }
        }

        /// <summary>
        /// 服务路由被删除。
        /// </summary>
        public event eventhandler<servicerouteeventargs> removed
        {
            add { _removed += value; }
            remove { _removed -= value; }
        }

        /// <summary>
        /// 服务路由被修改。
        /// </summary>
        public event eventhandler<serviceroutechangedeventargs> changed
        {
            add { _changed += value; }
            remove { _changed -= value; }
        }



        protected void oncreated(params servicerouteeventargs[] args)
        {
            if (_created == null)
                return;

            foreach (var arg in args)
                _created(this, arg);
        }

        protected void onchanged(params serviceroutechangedeventargs[] args)
        {
            if (_changed == null)
                return;

            foreach (var arg in args)
                _changed(this, arg);
        }

        protected void onremoved(params servicerouteeventargs[] args)
        {
            if (_removed == null)
                return;

            foreach (var arg in args)
                _removed(this, arg);
        }


        /// <summary>
        /// 获取所有可用的服务路由信息。
        /// </summary>
        /// <returns>服务路由集合。</returns>
        public async task<ienumerable<serviceroute>> getroutesasync()
        {
            await enterroutes();
            return _routes;
        }

        /// <summary>
        /// 清空所有的服务路由。
        /// </summary>
        /// <returns>一个任务。</returns>
        public async task clearasync()
        {
            if (_logger.isenabled(loglevel.information))
                _logger.loginformation("准备清空所有路由配置。");
            var zookeepers = await _zookeeperclientprovider.getzookeepers();
            foreach (var zookeeper in zookeepers)
            {
                var path = _configinfo.routepath;
                var childrens = path.split(new[] { '/' }, stringsplitoptions.removeemptyentries);

                var index = 0;
                while (childrens.count() > 1)
                {
                    var nodepath = "/" + string.join("/", childrens);

                    if (await zookeeper.existsasync(nodepath) != null)
                    {
                        var result = await zookeeper.getchildrenasync(nodepath);
                        if (result?.children != null)
                        {
                            foreach (var child in result.children)
                            {
                                var childpath = $"{nodepath}/{child}";
                                if (_logger.isenabled(loglevel.debug))
                                    _logger.logdebug($"准备删除:{childpath}。");
                                await zookeeper.deleteasync(childpath);
                            }
                        }
                        if (_logger.isenabled(loglevel.debug))
                            _logger.logdebug($"准备删除:{nodepath}。");
                        await zookeeper.deleteasync(nodepath);
                    }
                    index++;
                    childrens = childrens.take(childrens.length - index).toarray();
                }
                if (_logger.isenabled(loglevel.information))
                    _logger.loginformation("路由配置清空完成。");
            }
        }

        /// <summary>
        /// 设置服务路由。
        /// </summary>
        /// <param name="routes">服务路由集合。</param>
        /// <returns>一个任务。</returns>
        public async task setroutesasync(ienumerable<serviceroute> routes)
        {
            var hostaddr = netutils.gethostaddress();
            var serviceroutes = await getroutes(routes.select(p => p.serviceroutedescriptor.id));
            if (serviceroutes.count() > 0)
            {
                foreach (var route in routes)
                {
                    var serviceroute = serviceroutes.where(p => p.serviceroutedescriptor.id == route.serviceroutedescriptor.id).firstordefault();
                    if (serviceroute != null)
                    {
                        var addresses = serviceroute.address.concat(
                          route.address.except(serviceroute.address)).tolist();

                        foreach (var address in route.address)
                        {
                            addresses.remove(addresses.where(p => p.tostring() == address.tostring()).firstordefault());
                            addresses.add(address);
                        }
                        route.address = addresses;
                    }
                }
            }
            await removeexceptroutesasync(routes, hostaddr);

            if (_logger.isenabled(loglevel.information))
                _logger.loginformation("准备添加服务路由。");
            var zookeepers = await _zookeeperclientprovider.getzookeepers();
            foreach (var zookeeper in zookeepers)
            {
                await createsubdirectory(zookeeper, _configinfo.routepath);

                var path = _configinfo.routepath;
                if (!path.endswith("/"))
                    path += "/";

                routes = routes.toarray();

                foreach (var serviceroute in routes)
                {
                    var nodepath = $"{path}{serviceroute.serviceroutedescriptor.id}";
                    var nodedata = _serializer.serialize(serviceroute);
                    if (await zookeeper.existsasync(nodepath) == null)
                    {
                        if (_logger.isenabled(loglevel.debug))
                            _logger.logdebug($"节点:{nodepath}不存在将进行创建。");

                        await zookeeper.createasync(nodepath, nodedata, zoodefs.ids.open_acl_unsafe, createmode.persistent);
                    }
                    else
                    {
                        if (_logger.isenabled(loglevel.debug))
                            _logger.logdebug($"将更新节点:{nodepath}的数据。");

                        var onlinedata = (await zookeeper.getdataasync(nodepath)).data;
                        if (!dataequals(nodedata, onlinedata))
                            await zookeeper.setdataasync(nodepath, nodedata);
                    }
                }
                if (_logger.isenabled(loglevel.information))
                    _logger.loginformation("服务路由添加成功。");
            }
        }

        public async task remveaddressasync(ienumerable<string> address)
        {
            var routes = await getroutesasync();
            foreach (var route in routes)
            {
                route.address = route.address.except(address);
            }
            await setroutesasync(routes);
        }

        private async task removeexceptroutesasync(ienumerable<serviceroute> routes, string hostaddr)
        {
            var path = _configinfo.routepath;
            if (!path.endswith("/"))
                path += "/";
            routes = routes.toarray();
            var zookeepers = await _zookeeperclientprovider.getzookeepers();
            foreach (var zookeeper in zookeepers)
            {
                if (_routes != null)
                {
                    var oldrouteids = _routes.select(i => i.serviceroutedescriptor.id).toarray();
                    var newrouteids = routes.select(i => i.serviceroutedescriptor.id).toarray();
                    var deletedrouteids = oldrouteids.except(newrouteids).toarray();
                    foreach (var deletedrouteid in deletedrouteids)
                    {
                        var addresses = _routes.where(p => p.serviceroutedescriptor.id == deletedrouteid).select(p => p.address).firstordefault();
                        if (addresses.contains(hostaddr))
                        {
                            var nodepath = $"{path}{deletedrouteid}";
                            await zookeeper.deleteasync(nodepath);
                        }
                    }
                }
            }
        }

        private async task createsubdirectory(zookeeper zookeeper, string path)
        {
            if (await zookeeper.existsasync(path) != null)
                return;

            if (_logger.isenabled(loglevel.information))
                _logger.loginformation($"节点{path}不存在,将进行创建。");

            var childrens = path.split(new[] { '/' }, stringsplitoptions.removeemptyentries);
            var nodepath = "/";

            foreach (var children in childrens)
            {
                nodepath += children;
                if (await zookeeper.existsasync(nodepath) == null)
                {
                    await zookeeper.createasync(nodepath, null, zoodefs.ids.open_acl_unsafe, createmode.persistent);
                }
                nodepath += "/";
            }
        }

        private async task<serviceroute> getroute(byte[] data)
        {
            if (_logger.isenabled(loglevel.debug))
                _logger.logdebug($"准备转换服务路由,配置内容:{encoding.utf8.getstring(data)}。");

            if (data == null)
                return null;

            return await task.run(() =>
            {
                return _serializer.deserialize<serviceroute>(data);
            });
        }

        private async task<serviceroute> getroute(string path)
        {
            serviceroute result = null;
            var zookeeper = await getzookeeper();
            var watcher = new nodemonitorwatcher(getzookeeper(), path,
                 async (olddata, newdata) => await nodechange(olddata, newdata));
            if (await zookeeper.existsasync(path) != null)
            {
                var data = (await zookeeper.getdataasync(path, watcher)).data;
                watcher.setcurrentdata(data);
                result = await getroute(data);
            }
            return result;
        }

        private async task<serviceroute[]> getroutes(ienumerable<string> childrens)
        {
            var rootpath = _configinfo.routepath;
            if (!rootpath.endswith("/"))
                rootpath += "/";

            childrens = childrens.toarray();
            var routes = new list<serviceroute>(childrens.count());

            foreach (var children in childrens)
            {
                if (_logger.isenabled(loglevel.debug))
                    _logger.logdebug($"准备从节点:{children}中获取路由信息。");

                var nodepath = $"{rootpath}{children}";
                var route = await getroute(nodepath);
                if (route != null)
                    routes.add(route);
            }

            return routes.toarray();
        }

        private async task enterroutes()
        {
            if (_routes != null)
                return;
            var zookeeper = await getzookeeper();
            var watcher = new childrenmonitorwatcher(getzookeeper(), _configinfo.routepath,
             async (oldchildrens, newchildrens) => await childrenchange(oldchildrens, newchildrens));
            if (await zookeeper.existsasync(_configinfo.routepath, watcher) != null)
            {
                var result = await zookeeper.getchildrenasync(_configinfo.routepath, watcher);
                var childrens = result.children.toarray();
                watcher.setcurrentdata(childrens);
                _routes = await getroutes(childrens);
            }
            else
            {
                if (_logger.isenabled(loglevel.warning))
                    _logger.logwarning($"无法获取路由信息,因为节点:{_configinfo.routepath},不存在。");
                _routes = new serviceroute[0];
            }
        }

        private static bool dataequals(ireadonlylist<byte> data1, ireadonlylist<byte> data2)
        {
            if (data1.count != data2.count)
                return false;
            for (var i = 0; i < data1.count; i++)
            {
                var b1 = data1[i];
                var b2 = data2[i];
                if (b1 != b2)
                    return false;
            }
            return true;
        }

        public async task nodechange(byte[] olddata, byte[] newdata)
        {
            if (dataequals(olddata, newdata))
                return;

            var newroute = await getroute(newdata);
            //得到旧的路由。
            var oldroute = _routes.firstordefault(i => i.serviceroutedescriptor.id == newroute.serviceroutedescriptor.id);

            lock (_routes)
            {
                //删除旧路由,并添加上新的路由。
                _routes =
                    _routes
                        .where(i => i.serviceroutedescriptor.id != newroute.serviceroutedescriptor.id)
                        .concat(new[] { newroute }).toarray();
            }

            //触发路由变更事件。
            onchanged(new serviceroutechangedeventargs(newroute, oldroute));
        }

        public async task childrenchange(string[] oldchildrens, string[] newchildrens)
        {
            if (_logger.isenabled(loglevel.debug))
                _logger.logdebug($"最新的节点信息:{string.join(",", newchildrens)}");

            if (_logger.isenabled(loglevel.debug))
                _logger.logdebug($"旧的节点信息:{string.join(",", oldchildrens)}");

            //计算出已被删除的节点。
            var deletedchildrens = oldchildrens.except(newchildrens).toarray();
            //计算出新增的节点。
            var createdchildrens = newchildrens.except(oldchildrens).toarray();

            if (_logger.isenabled(loglevel.debug))
                _logger.logdebug($"需要被删除的路由节点:{string.join(",", deletedchildrens)}");
            if (_logger.isenabled(loglevel.debug))
                _logger.logdebug($"需要被添加的路由节点:{string.join(",", createdchildrens)}");

            //获取新增的路由信息。
            var newroutes = (await getroutes(createdchildrens)).toarray();

            var routes = _routes.toarray();
            lock (_routes)
            {
                _routes = _routes
                    //删除无效的节点路由。
                    .where(i => !deletedchildrens.contains(i.serviceroutedescriptor.id))
                    //连接上新的路由。
                    .concat(newroutes)
                    .toarray();
            }
            //需要删除的路由集合。
            var deletedroutes = routes.where(i => deletedchildrens.contains(i.serviceroutedescriptor.id)).toarray();
            //触发删除事件。
            onremoved(deletedroutes.select(route => new servicerouteeventargs(route)).toarray());

            //触发路由被创建事件。
            oncreated(newroutes.select(route => new servicerouteeventargs(route)).toarray());

            if (_logger.isenabled(loglevel.information))
                _logger.loginformation("路由数据更新成功。");
        }


        /// <summary>performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void dispose()
        {
        }

        private async task<zookeeper> getzookeeper()
        {
            return await _zookeeperclientprovider.getzookeeper();
        }

    }

zookeeper连接配置类:

public class configinfo
    {
        /// <summary>
        /// 初始化会话超时为20秒的zookeeper配置信息。
        /// </summary>
        /// <param name="connectionstring">连接字符串。</param>
        /// <param name="routepath">路由配置路径。</param>
        /// <param name="subscriberpath">订阅者配置路径</param>
        /// <param name="commandpath">服务命令配置路径</param>
        /// <param name="cachepath">缓存中心配置路径</param>
        /// <param name="mqttroutepath">mqtt路由配置路径</param>
        /// <param name="chroot">根节点。</param>
        public configinfo(string connectionstring, string routepath = "/services/serviceroutes",
            string subscriberpath = "/services/servicesubscribers",
            string commandpath = "/services/servicecommands",
            string cachepath = "/services/servicecaches",
            string mqttroutepath = "/services/mqttserviceroutes",
            string chroot = null,
            bool reloadonchange = false, bool enablechildrenmonitor = false) : this(connectionstring,
                timespan.fromseconds(20),
                routepath,
                subscriberpath,
                commandpath,
                cachepath,
                mqttroutepath,
                chroot,
                reloadonchange, enablechildrenmonitor)
        {
        }

        /// <summary>
        /// 初始化zookeeper配置信息。
        /// </summary>
        /// <param name="connectionstring">连接字符串。</param>
        /// <param name="routepath">路由配置路径。</param>
        /// <param name="commandpath">服务命令配置路径</param>
        /// <param name="subscriberpath">订阅者配置路径</param>
        /// <param name="sessiontimeout">会话超时时间。</param>
        /// <param name="cachepath">缓存中心配置路径</param>
        /// <param name="mqttroutepath">mqtt路由配置路径</param>
        /// <param name="chroot">根节点。</param>
        public configinfo(string connectionstring, timespan sessiontimeout, string routepath = "/services/serviceroutes",
            string subscriberpath = "/services/servicesubscribers",
            string commandpath = "/services/servicecommands",
            string cachepath = "/services/servicecaches",
            string mqttroutepath = "/services/mqttserviceroutes",
            string chroot = null,
            bool reloadonchange = false, bool enablechildrenmonitor = false)
        {
            cachepath = cachepath;
            reloadonchange = reloadonchange;
            chroot = chroot;
            commandpath = commandpath;
            subscriberpath = subscriberpath;
            connectionstring = connectionstring;
            routepath = routepath;
            sessiontimeout = sessiontimeout;
            mqttroutepath = mqttroutepath;
            enablechildrenmonitor = enablechildrenmonitor;
            addresses = connectionstring?.split(",");
        }

        public bool enablechildrenmonitor { get; set; }

        public bool reloadonchange { get; set; }

        /// <summary>
        /// 连接字符串。
        /// </summary>
        public string connectionstring { get; set; }

        /// <summary>
        /// 命令配置路径
        /// </summary>
        public string commandpath { get; set; }

        /// <summary>
        /// 路由配置路径。
        /// </summary>
        public string routepath { get; set; }

        /// <summary>
        /// 订阅者配置路径
        /// </summary>
        public string subscriberpath { get; set; }

        /// <summary>
        /// 会话超时时间。
        /// </summary>
        public timespan sessiontimeout { get; set; }

        /// <summary>
        /// 根节点。
        /// </summary>
        public string chroot { get; set; }


        public ienumerable<string> addresses { get; set; }

        /// <summary>
        /// 缓存中心配置中心
        /// </summary>
        public string cachepath { get; set; }


        /// <summary>
        /// mqtt路由配置路径。
        /// </summary>
        public string mqttroutepath { get; set; }
    }

路由和路由描述:

public class serviceroute
    {
        /// <summary>
        /// 服务可用地址。
        /// </summary>
        public ienumerable<string> address { get; set; }
        /// <summary>
        /// 服务描述符。
        /// </summary>
        public serviceroutedescriptor serviceroutedescriptor { get; set; }

        #region equality members

        /// <summary>determines whether the specified object is equal to the current object.</summary>
        /// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
        /// <param name="obj">the object to compare with the current object. </param>
        public override bool equals(object obj)
        {
            var model = obj as serviceroute;
            if (model == null)
                return false;

            if (obj.gettype() != gettype())
                return false;

            if (model.serviceroutedescriptor != serviceroutedescriptor)
                return false;

            return model.address.count() == address.count() && model.address.all(addressmodel => address.contains(addressmodel));
        }

        /// <summary>serves as the default hash function. </summary>
        /// <returns>a hash code for the current object.</returns>
        public override int gethashcode()
        {
            return tostring().gethashcode();
        }

        public static bool operator ==(serviceroute model1, serviceroute model2)
        {
            return equals(model1, model2);
        }

        public static bool operator !=(serviceroute model1, serviceroute model2)
        {
            return !equals(model1, model2);
        }

        #endregion equality members
    }
/// <summary>
    /// 服务描述符。
    /// </summary>
    [serializable]
    public class serviceroutedescriptor
    {
        /// <summary>
        /// 初始化一个新的服务描述符。
        /// </summary>
        public serviceroutedescriptor()
        {
            metadatas = new dictionary<string, object>(stringcomparer.ordinalignorecase);
        }

        /// <summary>
        /// 服务id。
        /// </summary>
        public string id { get; set; }

        /// <summary>
        /// 访问的令牌
        /// </summary>
        public string token { get; set; }

        /// <summary>
        /// 路由
        /// </summary>
        public string routepath { get; set; }

        /// <summary>
        /// 元数据。
        /// </summary> 
        public idictionary<string, object> metadatas { get; set; }

        /// <summary>
        /// 获取一个元数据。
        /// </summary>
        /// <typeparam name="t">元数据类型。</typeparam>
        /// <param name="name">元数据名称。</param>
        /// <param name="def">如果指定名称的元数据不存在则返回这个参数。</param>
        /// <returns>元数据值。</returns>
        public t getmetadata<t>(string name, t def = default(t))
        {
            if (!metadatas.containskey(name))
                return def;

            return (t)metadatas[name];
        }

        #region equality members

        /// <summary>determines whether the specified object is equal to the current object.</summary>
        /// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
        /// <param name="obj">the object to compare with the current object. </param>
        public override bool equals(object obj)
        {
            var model = obj as serviceroutedescriptor;
            if (model == null)
                return false;

            if (obj.gettype() != gettype())
                return false;

            if (model.id != id)
                return false;

            return model.metadatas.count == metadatas.count && model.metadatas.all(metadata =>
            {
                object value;
                if (!metadatas.trygetvalue(metadata.key, out value))
                    return false;

                if (metadata.value == null && value == null)
                    return true;
                if (metadata.value == null || value == null)
                    return false;

                return metadata.value.equals(value);
            });
        }

        /// <summary>serves as the default hash function. </summary>
        /// <returns>a hash code for the current object.</returns>
        public override int gethashcode()
        {
            return tostring().gethashcode();
        }

        public static bool operator ==(serviceroutedescriptor model1, serviceroutedescriptor model2)
        {
            return equals(model1, model2);
        }

        public static bool operator !=(serviceroutedescriptor model1, serviceroutedescriptor model2)
        {
            return !equals(model1, model2);
        }

        #endregion equality members
    }

watcher监听器:

子节点监听器:

internal class childrenmonitorwatcher : watcher
    {
        private readonly task<zookeeper> _zookeepercall;
        private readonly string _path;
        private readonly action<string[], string[]> _action;
        private string[] _currentdata = new string[0];

        public childrenmonitorwatcher(task<zookeeper> zookeepercall, string path, action<string[], string[]> action)
        {
            _zookeepercall = zookeepercall;
            _path = path;
            _action = action;
        }

        public childrenmonitorwatcher setcurrentdata(string[] currentdata)
        {
            _currentdata = currentdata ?? new string[0];

            return this;
        }

        #region overrides of watcherbase

        public override async task process(watchedevent watchedevent)
        {
            if (watchedevent.getstate() != event.keeperstate.syncconnected || watchedevent.getpath() != _path)
                return;
            var zookeeper = await _zookeepercall;
            //func<childrenmonitorwatcher> getwatcher = () => new childrenmonitorwatcher(_zookeepercall, path, _action);
            task<childrenmonitorwatcher> getwatcher =  task.run(() => {return new childrenmonitorwatcher(_zookeepercall, _path, _action); });
            switch (watchedevent.get_type())
            {
                //创建之后开始监视下面的子节点情况。
                case event.eventtype.nodecreated:
                    await zookeeper.getchildrenasync(_path, await getwatcher);
                    break;

                //子节点修改则继续监控子节点信息并通知客户端数据变更。
                case event.eventtype.nodechildrenchanged:
                    try
                    {
                        var watcher = await getwatcher;
                        var result = await zookeeper.getchildrenasync(_path, watcher);
                        var childrens = result.children.toarray();
                        _action(_currentdata, childrens);
                        watcher.setcurrentdata(childrens);
                    }
                    catch (keeperexception.nonodeexception)
                    {
                        _action(_currentdata, new string[0]);
                    }
                    break;

                //删除之后开始监控自身节点,并通知客户端数据被清空。
                case event.eventtype.nodedeleted:
                    {
                        var watcher = await getwatcher;
                        await zookeeper.existsasync(_path, watcher);
                        _action(_currentdata, new string[0]);
                        watcher.setcurrentdata(new string[0]);
                    }
                    break;
            }
        }
        #endregion overrides of watcherbase
    }

当前节点监听器:

internal class nodemonitorwatcher : watcher
    {
        private readonly task<zookeeper> _zookeepercall;
        private readonly string _path;
        private readonly action<byte[], byte[]> _action;
        private byte[] _currentdata;

        public nodemonitorwatcher(task<zookeeper> zookeepercall, string path, action<byte[], byte[]> action)
        {
            _zookeepercall = zookeepercall;
            _path = path;
            _action = action;
        }

        public nodemonitorwatcher setcurrentdata(byte[] currentdata)
        {
            _currentdata = currentdata;

            return this;
        }

        #region overrides of watcherbase

        public override async task process(watchedevent watchedevent)
        {
            switch (watchedevent.get_type())
            {
                case event.eventtype.nodedatachanged:
                    var zookeeper = await _zookeepercall;
                    var watcher = new nodemonitorwatcher(_zookeepercall, _path, _action);
                    var data = await zookeeper.getdataasync(_path, watcher);
                    var newdata = data.data;
                    _action(_currentdata, newdata);
                    watcher.setcurrentdata(newdata);
                    break;
            }
        }

        #endregion overrides of watcherbase
    }

连接断开监听器:

internal class reconnectionwatcher : watcher
    {
        private readonly action _reconnection;

        public reconnectionwatcher(action reconnection)
        {
            _reconnection = reconnection;
        }

        #region overrides of watcher

        /// <summary>processes the specified event.</summary>
        /// <param name="watchedevent">the event.</param>
        /// <returns></returns>
        public override async task process(watchedevent watchedevent)
        {
            var state = watchedevent.getstate();
            switch (state)
            {
                case event.keeperstate.expired:
                case event.keeperstate.disconnected:
                    {
                        _reconnection();
                        break;
                    }
            }
            await task.completedtask;
        }

        #endregion overrides of watcher
    }

 

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网