当前位置: 移动技术网 > IT编程>开发语言>Java > Netty + ZooKeeper 实现简单的服务注册与发现

Netty + ZooKeeper 实现简单的服务注册与发现

2019年07月19日  | 移动技术网IT编程  | 我要评论

一. 背景

最近的一个项目:我们的系统接收到上游系统的派单任务后,会推送到指定的门店的相关设备,并进行相应的业务处理。

二. netty 的使用

在接收到派单任务之后,通过 netty 推送到指定门店相关的设备。在我们的系统中 netty 实现了消息推送、长连接以及心跳机制。

2.1 netty server 端:

每个 netty 服务端通过 concurrenthashmap 保存了客户端的 clientid 以及它连接的 socketchannel。

服务器端向客户端发送消息时,只要获取 clientid 对应的 socketchannel,往 socketchannel 里写入相应的 message 即可。

eventloopgroup boss = new nioeventloopgroup(1);
  eventloopgroup worker = new nioeventloopgroup();
  serverbootstrap bootstrap = new serverbootstrap();
  bootstrap.group(boss, worker)
    .channel(nioserversocketchannel.class)
    .option(channeloption.so_backlog, 128)
    .option(channeloption.tcp_nodelay, true)
    .childoption(channeloption.so_keepalive, true)
    .childhandler(new channelinitializer() {
     @override
     protected void initchannel(channel channel) throws exception {
      channelpipeline p = channel.pipeline();
      p.addlast(new messageencoder());
      p.addlast(new messagedecoder());
      p.addlast(new pushserverhandler());
     }
    });
  channelfuture future = bootstrap.bind(host,port).sync();
  if (future.issuccess()) {
   logger.info("server start...");
  }

2.2 netty client 端:

客户端用于接收服务端的消息,随即进行业务处理。客户端还有心跳机制,它通过 idleevent 事件定时向服务端放送 ping 消息以此来检测 socketchannel 是否中断。

public pushclientbootstrap(string host, int port) throws interruptedexception {
  this.host = host;
  this.port = port;
  start(host,port);
 }
 private void start(string host, int port) throws interruptedexception {
  bootstrap = new bootstrap();
  bootstrap.channel(niosocketchannel.class)
    .option(channeloption.so_keepalive, true)
    .group(workgroup)
    .remoteaddress(host, port)
    .handler(new channelinitializer(){
     @override
     protected void initchannel(channel channel) throws exception {
      channelpipeline p = channel.pipeline();
      p.addlast(new idlestatehandler(20, 10, 0)); // idlestatehandler 用于检测心跳
      p.addlast(new messagedecoder());
      p.addlast(new messageencoder());
      p.addlast(new pushclienthandler());
     }
    });
  doconnect(port, host);
 }
 /**
  * 建立连接,并且可以实现自动重连.
  * @param port port.
  * @param host host.
  * @throws interruptedexception interruptedexception.
  */
 private void doconnect(int port, string host) throws interruptedexception {
  if (socketchannel != null && socketchannel.isactive()) {
   return;
  }
  final int portconnect = port;
  final string hostconnect = host;
  channelfuture future = bootstrap.connect(host, port);
  future.addlistener(new channelfuturelistener() {
   @override
   public void operationcomplete(channelfuture futurelistener) throws exception {
    if (futurelistener.issuccess()) {
     socketchannel = (socketchannel) futurelistener.channel();
     logger.info("connect to server successfully!");
    } else {
     logger.info("failed to connect to server, try connect after 10s");
     futurelistener.channel().eventloop().schedule(new runnable() {
      @override
      public void run() {
       try {
        doconnect(portconnect, hostconnect);
       } catch (interruptedexception e) {
        e.printstacktrace();
       }
      }
     }, 10, timeunit.seconds);
    }
   }
  }).sync();
 }

三. 借助 zookeeper 实现简单的服务注册与发现

3.1 服务注册

服务注册本质上是为了解耦服务提供者和服务消费者。服务注册是一个高可用强一致性的服务发现存储仓库,主要用来存储服务的api和地址对应关系。为了高可用,服务注册中心一般为一个集群,并且能够保证分布式一致性。目前常用的有 zookeeper、etcd 等等。

在我们项目中采用了 zookeeper 实现服务注册。

public class serviceregistry {
 private static final logger logger = loggerfactory.getlogger(serviceregistry.class);
 private countdownlatch latch = new countdownlatch(1);
 private string registryaddress;
 public serviceregistry(string registryaddress) {
  this.registryaddress = registryaddress;
 }
 public void register(string data) {
  if (data != null) {
   zookeeper zk = connectserver();
   if (zk != null) {
    createnode(zk, data);
   }
  }
 }
 /**
  * 连接 zookeeper 服务器
  * @return
  */
 private zookeeper connectserver() {
  zookeeper zk = null;
  try {
   zk = new zookeeper(registryaddress, constants.zk_session_timeout, new watcher() {
    @override
    public void process(watchedevent event) {
     if (event.getstate() == event.keeperstate.syncconnected) {
      latch.countdown();
     }
    }
   });
   latch.await();
  } catch (ioexception | interruptedexception e) {
   logger.error("", e);
  }
  return zk;
 }
 /**
  * 创建节点
  * @param zk
  * @param data
  */
 private void createnode(zookeeper zk, string data) {
  try {
   byte[] bytes = data.getbytes();
   string path = zk.create(constants.zk_data_path, bytes, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
   logger.debug("create zookeeper node ({} => {})", path, data);
  } catch (keeperexception | interruptedexception e) {
   logger.error("", e);
  }
 }
}

有了服务注册,在 netty 服务端启动之后,将 netty 服务端的 ip 和 port 注册到 zookeeper。

eventloopgroup boss = new nioeventloopgroup(1);
  eventloopgroup worker = new nioeventloopgroup();
  serverbootstrap bootstrap = new serverbootstrap();
  bootstrap.group(boss, worker)
    .channel(nioserversocketchannel.class)
    .option(channeloption.so_backlog, 128)
    .option(channeloption.tcp_nodelay, true)
    .childoption(channeloption.so_keepalive, true)
    .childhandler(new channelinitializer() {
     @override
     protected void initchannel(channel channel) throws exception {
      channelpipeline p = channel.pipeline();
      p.addlast(new messageencoder());
      p.addlast(new messagedecoder());
      p.addlast(new pushserverhandler());
     }
    });
  channelfuture future = bootstrap.bind(host,port).sync();
  if (future.issuccess()) {
   logger.info("server start...");
  }
  if (serviceregistry != null) {
   serviceregistry.register(host + ":" + port);
  }

3.2 服务发现

这里我们采用的是客户端的服务发现,即服务发现机制由客户端实现。

客户端在和服务端建立连接之前,通过查询注册中心的方式来获取服务端的地址。如果存在有多个 netty 服务端的话,可以做服务的负载均衡。在我们的项目中只采用了简单的随机法进行负载。

public class servicediscovery {
 private static final logger logger = loggerfactory.getlogger(servicediscovery.class);
 private countdownlatch latch = new countdownlatch(1);
 private volatile list<string> serviceaddresslist = new arraylist<>();
 private string registryaddress; // 注册中心的地址
 public servicediscovery(string registryaddress) {
  this.registryaddress = registryaddress;
  zookeeper zk = connectserver();
  if (zk != null) {
   watchnode(zk);
  }
 }
 /**
  * 通过服务发现,获取服务提供方的地址
  * @return
  */
 public string discover() {
  string data = null;
  int size = serviceaddresslist.size();
  if (size > 0) {
   if (size == 1) { //只有一个服务提供方
    data = serviceaddresslist.get(0);
    logger.info("unique service address : {}", data);
   } else {   //使用随机分配法。简单的负载均衡法
    data = serviceaddresslist.get(threadlocalrandom.current().nextint(size));
    logger.info("choose an address : {}", data);
   }
  }
  return data;
 }
 /**
  * 连接 zookeeper
  * @return
  */
 private zookeeper connectserver() {
  zookeeper zk = null;
  try {
   zk = new zookeeper(registryaddress, constants.zk_session_timeout, new watcher() {
    @override
    public void process(watchedevent event) {
     if (event.getstate() == watcher.event.keeperstate.syncconnected) {
      latch.countdown();
     }
    }
   });
   latch.await();
  } catch (ioexception | interruptedexception e) {
   logger.error("", e);
  }
  return zk;
 }
 /**
  * 获取服务地址列表
  * @param zk
  */
 private void watchnode(final zookeeper zk) {
  try {
   //获取子节点列表
   list<string> nodelist = zk.getchildren(constants.zk_registry_path, new watcher() {
    @override
    public void process(watchedevent event) {
     if (event.gettype() == event.eventtype.nodechildrenchanged) {
      //发生子节点变化时再次调用此方法更新服务地址
      watchnode(zk);
     }
    }
   });
   list<string> datalist = new arraylist<>();
   for (string node : nodelist) {
    byte[] bytes = zk.getdata(constants.zk_registry_path + "/" + node, false, null);
    datalist.add(new string(bytes));
   }
   logger.debug("node data: {}", datalist);
   this.serviceaddresslist = datalist;
  } catch (keeperexception | interruptedexception e) {
   logger.error("", e);
  }
 }
}

netty 客户端启动之后,通过服务发现获取 netty 服务端的 ip 和 port。

/**
  * 支持通过服务发现来获取 socket 服务端的 host、port
  * @param discoveryaddress
  * @throws interruptedexception
  */
 public pushclientbootstrap(string discoveryaddress) throws interruptedexception {

  servicediscovery = new servicediscovery(discoveryaddress);
  serveraddress = servicediscovery.discover();

  if (serveraddress!=null) {
   string[] array = serveraddress.split(":");
   if (array!=null && array.length==2) {

    string host = array[0];
    int port = integer.parseint(array[1]);

    start(host,port);
   }
  }
 }

四. 总结

服务注册和发现一直是分布式的核心组件。本文介绍了借助 zookeeper 做注册中心,如何实现一个简单的服务注册和发现。其实,注册中心的选择有很多,例如 etcd、eureka 等等。选择符合我们业务需求的才是最重要的。

以上所述是小编给大家介绍的netty + zookeeper 实现简单的服务注册与发现,希望对大家有所帮助

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网