当前位置: 移动技术网 > IT编程>开发语言>Java > zookeeper【3】服务发现

zookeeper【3】服务发现

2018年11月28日  | 移动技术网IT编程  | 我要评论

服务发现:指对集群中的服务上下线做统一管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息改变时,如服务上下线、服务器的角色或服务范围变更,那么监控服务器可以得到通知并响应这些变化。

实现代码如下:

import com.alibaba.fastjson.json;
import org.i0itec.zkclient.izkdatalistener;
import org.i0itec.zkclient.zkclient;
import org.i0itec.zkclient.exception.zknonodeexception;

/**
 * 代表工作服务器
 */
public class workserver {

    private zkclient zkclient;
    // zookeeper
    private string configpath;
    // zookeeper集群中servers节点的路径
    private string serverspath;
    // 当前工作服务器的基本信息
    private serverdata serverdata;
    // 当前工作服务器的配置信息
    private serverconfig serverconfig;
    private izkdatalistener datalistener;

    public workserver(string configpath, string serverspath,
                      serverdata serverdata, zkclient zkclient, serverconfig initconfig) {
        this.zkclient = zkclient;
        this.serverspath = serverspath;
        this.configpath = configpath;
        this.serverconfig = initconfig;
        this.serverdata = serverdata;

        this.datalistener = new izkdatalistener() {

            public void handledatadeleted(string datapath) throws exception {

            }

            public void handledatachange(string datapath, object data)
                    throws exception {
                string retjson = new string((byte[])data);
                serverconfig serverconfiglocal = (serverconfig) json.parseobject(retjson,serverconfig.class);
                updateconfig(serverconfiglocal);
                system.out.println("new work server config is:"+serverconfig.tostring());

            }
        };

    }

    // 启动服务器
    public void start() {
        system.out.println("work server start...");
        initrunning();
    }

    // 停止服务器
    public void stop() {
        system.out.println("work server stop...");
        zkclient.unsubscribedatachanges(configpath, datalistener); // 取消监听config节点
    }

    // 服务器初始化
    private void initrunning() {
        registme(); // 注册自己
        zkclient.subscribedatachanges(configpath, datalistener); // 订阅config节点的改变事件
    }

    // 启动时向zookeeper注册自己的注册函数
    private void registme() {
        string mepath = serverspath.concat("/").concat(serverdata.getaddress());

        try {
            zkclient.createephemeral(mepath, json.tojsonstring(serverdata)
                    .getbytes());
        } catch (zknonodeexception e) {
            zkclient.createpersistent(serverspath, true);
            registme();
        }
    }

    // 更新自己的配置信息
    private void updateconfig(serverconfig serverconfig) {
        this.serverconfig = serverconfig;
    }

}
/**
 * 调度类
 */
public class subscribezkclient {

    private static final int  client_qty = 5; // work server数量

    private static final string  zookeeper_server = "192.168.1.105:2181";

    private static final string  config_path = "/config";
    private static final string  command_path = "/command";
    private static final string  servers_path = "/servers";

    public static void main(string[] args) throws exception {

        list<zkclient> clients = new arraylist<zkclient>();
        list<workserver>  workservers = new arraylist<workserver>();
        manageserver manageserver = null;

        try {

            // 创建一个默认的配置
            serverconfig initconfig = new serverconfig();
            initconfig.setdbpwd("123456");
            initconfig.setdburl("jdbc:mysql://localhost:3306/mydb");
            initconfig.setdbuser("root");

            // 实例化一个manage server
            zkclient clientmanage = new zkclient(zookeeper_server, 5000, 5000, new bytespushthroughserializer());
            manageserver = new manageserver(servers_path, command_path,config_path,clientmanage,initconfig);
            manageserver.start(); // 启动manage server

            // 创建指定个数的工作服务器
            for ( int i = 0; i < client_qty; ++i ) {
                zkclient client = new zkclient(zookeeper_server, 5000, 5000, new bytespushthroughserializer());
                clients.add(client);
                serverdata serverdata = new serverdata();
                serverdata.setid(i);
                serverdata.setname("workserver#"+i);
                serverdata.setaddress("192.168.1."+i);

                workserver  workserver = new workserver(config_path, servers_path, serverdata, client, initconfig);
                workservers.add(workserver);
                workserver.start();    // 启动工作服务器

            }

            system.out.println("敲回车键退出!\n");
            new bufferedreader(new inputstreamreader(system.in)).readline();

        } finally {
            system.out.println("shutting down...");

            for ( workserver workserver : workservers ) {
                try {
                    workserver.stop();
                } catch (exception e) {
                    e.printstacktrace();
                }
            }

            for ( zkclient client : clients ) {
                try {
                    client.close();
                } catch (exception e) {
                    e.printstacktrace();
                }

            }
        }
    }

}
/**
 * 服务器基本信息
 */
public class serverdata {

    private string address;
    private integer id;
    private string name;

    public string getaddress() {
        return address;
    }
    public void setaddress(string address) {
        this.address = address;
    }
    public integer getid() {
        return id;
    }
    public void setid(integer id) {
        this.id = id;
    }
    public string getname() {
        return name;
    }
    public void setname(string name) {
        this.name = name;
    }

    @override
    public string tostring() {
        return "serverdata [address=" + address + ", id=" + id + ", name="
                + name + "]";
    }

}
/**
 * 配置信息
 */
public class serverconfig {

    private string dburl;
    private string dbpwd;
    private string dbuser;
    public string getdburl() {
        return dburl;
    }
    public void setdburl(string dburl) {
        this.dburl = dburl;
    }
    public string getdbpwd() {
        return dbpwd;
    }
    public void setdbpwd(string dbpwd) {
        this.dbpwd = dbpwd;
    }
    public string getdbuser() {
        return dbuser;
    }
    public void setdbuser(string dbuser) {
        this.dbuser = dbuser;
    }

    @override
    public string tostring() {
        return "serverconfig [dburl=" + dburl + ", dbpwd=" + dbpwd
                + ", dbuser=" + dbuser + "]";
    }

}
import com.alibaba.fastjson.json;
import org.i0itec.zkclient.izkchildlistener;
import org.i0itec.zkclient.izkdatalistener;
import org.i0itec.zkclient.zkclient;
import org.i0itec.zkclient.exception.zknonodeexception;
import org.i0itec.zkclient.exception.zknodeexistsexception;

import java.util.list;

public class manageserver {

    // zookeeper的servers节点路径
    private string serverspath;
    // zookeeper的command节点路径
    private string commandpath;
    // zookeeper的config节点路径
    private string configpath;
    private zkclient zkclient;
    private serverconfig config;
    // 用于监听servers节点的子节点列表的变化
    private izkchildlistener childlistener;
    // 用于监听command节点数据内容的变化
    private izkdatalistener datalistener;
    // 工作服务器的列表
    private list<string> workserverlist;

    public manageserver(string serverspath, string commandpath,
                        string configpath, zkclient zkclient, serverconfig config) {
        this.serverspath = serverspath;
        this.commandpath = commandpath;
        this.zkclient = zkclient;
        this.config = config;
        this.configpath = configpath;
        this.childlistener = new izkchildlistener() {

            public void handlechildchange(string parentpath,
                                          list<string> currentchilds) throws exception {
                // todo auto-generated method stub
                workserverlist = currentchilds; // 更新内存中工作服务器列表

                system.out.println("work server list changed, new list is ");
                execlist();

            }
        };
        this.datalistener = new izkdatalistener() {

            public void handledatadeleted(string datapath) throws exception {
                // todo auto-generated method stub
                // ignore;
            }

            public void handledatachange(string datapath, object data)
                    throws exception {
                // todo auto-generated method stub
                string cmd = new string((byte[]) data);
                system.out.println("cmd:"+cmd);
                execmd(cmd); // 执行命令

            }
        };

    }

    private void initrunning() {
        zkclient.subscribedatachanges(commandpath, datalistener);
        zkclient.subscribechildchanges(serverspath, childlistener);
    }

    /*
     * 1: list 2: create 3: modify
     */
    private void execmd(string cmdtype) {
        if ("list".equals(cmdtype)) {
            execlist();

        } else if ("create".equals(cmdtype)) {
            execcreate();
        } else if ("modify".equals(cmdtype)) {
            execmodify();
        } else {
            system.out.println("error command!" + cmdtype);
        }

    }

    // 列出工作服务器列表
    private void execlist() {
        system.out.println(workserverlist.tostring());
    }

    // 创建config节点
    private void execcreate() {
        if (!zkclient.exists(configpath)) {
            try {
                zkclient.createpersistent(configpath, json.tojsonstring(config)
                        .getbytes());
            } catch (zknodeexistsexception e) {
                zkclient.writedata(configpath, json.tojsonstring(config)
                        .getbytes()); // config节点已经存在,则写入内容就可以了
            } catch (zknonodeexception e) {
                string parentdir = configpath.substring(0,
                        configpath.lastindexof('/'));
                zkclient.createpersistent(parentdir, true);
                execcreate();
            }
        }
    }

    // 修改config节点内容
    private void execmodify() {
        // 我们随意修改config的一个属性就可以了
        config.setdbuser(config.getdbuser() + "_modify");

        try {
            zkclient.writedata(configpath, json.tojsonstring(config).getbytes());
        } catch (zknonodeexception e) {
            execcreate(); // 写入时config节点还未存在,则创建它
        }
    }

    // 启动工作服务器
    public void start() {
        initrunning();
    }

    // 停止工作服务器
    public void stop() {
        zkclient.unsubscribechildchanges(serverspath, childlistener);
        zkclient.unsubscribedatachanges(commandpath, datalistener);
    }

}

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网