当前位置: 移动技术网 > IT编程>开发语言>Java > Java实现ZooKeeper的zNode监控

Java实现ZooKeeper的zNode监控

2019年08月15日  | 移动技术网IT编程  | 我要评论
上一篇文章已经完成了ZooKeeper的基本搭建和使用的介绍,现在开始用代码说话。这个例子只实现基本的Watcher功能:当创建或修改数据时,控制台打印当前的数据内容和版本号;当节点被删除时,程序退出。 ...

上一篇文章已经完成了zookeeper的基本搭建和使用的介绍,现在开始用代码说话。参考 https://zookeeper.apache.org/doc/current/javaexample.html ,但对场景和代码都做了简化,只实现基本的watcher功能。

1   场景设计

目的是体验zookeeper的watcher功能。程序监控zookeeper的/watcher节点数据变化,当创建或修改数据时,控制台打印当前的数据内容和版本号;当/watcher被删除时,程序退出。

/watcher的创建、修改和删除操作,使用控制台或zkui操作。

2   搭建maven项目

代码相对比较简单,就不用springboot这个大杀器了,使用一个普通的maven项目即可。

zookeeper客户端使用官方提供的java库,org.apache.zookeeper: zookeeper: 3.5.5。日志框架使用习惯的slf4j+log4j2,zookeeper缺省使用的是log4j v1,因此在引入的时候需要excludes。另外,使用了lombok来简化一些代码。

以下是pom.xml的内容

<?xml version="1.0" encoding="utf-8"?>

<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"

     xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelversion>4.0.0</modelversion>

  <groupid>tech.codestory.research</groupid>

  <artifactid>zookeeper</artifactid>

  <version>1.0.0-snapshot</version>

  <dependencies>

    <dependency>

      <groupid>org.apache.zookeeper</groupid>

      <artifactid>zookeeper</artifactid>

      <version>3.5.5</version>

      <exclusions>

        <exclusion>

          <groupid>log4j</groupid>

          <artifactid>log4j</artifactid>

        </exclusion>

        <exclusion>

          <groupid>org.slf4j</groupid>

          <artifactid>slf4j-log4j12</artifactid>

        </exclusion>

      </exclusions>

    </dependency>

    <dependency>

      <groupid>org.apache.logging.log4j</groupid>

      <artifactid>log4j-core</artifactid>

      <version>2.12.1</version>

    </dependency>

    <dependency>

      <groupid>org.apache.logging.log4j</groupid>

      <artifactid>log4j-api</artifactid>

      <version>2.12.1</version>

    </dependency>

    <dependency>

      <groupid>org.apache.logging.log4j</groupid>

      <artifactid>log4j-web</artifactid>

      <version>2.12.1</version>

    </dependency>

    <dependency>

      <groupid>org.apache.logging.log4j</groupid>

      <artifactid>log4j-slf4j-impl</artifactid>

      <version>2.12.1</version>

    </dependency>

    <dependency>

      <groupid>org.slf4j</groupid>

      <artifactid>slf4j-api</artifactid>

      <version>1.7.28</version>

    </dependency>

    <dependency>

      <groupid>org.slf4j</groupid>

      <artifactid>slf4j-ext</artifactid>

      <version>1.7.28</version>

    </dependency>

    <dependency>

      <groupid>org.projectlombok</groupid>

      <artifactid>lombok</artifactid>

      <version>1.18.8</version>

      <scope>provided</scope>

    </dependency>

  </dependencies>

</project>

 

3   log4j2.xml

在项目的 src/main/resources 下创建一个文件 log4j2.xml,内容为

<?xml version="1.0" encoding="utf-8"?>

<configuration status="debug" name="codestorylogger">

  <appenders>

    <console name="stdout">

      <thresholdfilter level="trace" onmatch="accept" onmismatch="deny"/>

      <patternlayout pattern="%d{hh:mm:ss:sss} [%p] - %c{1}.%m(%l) - %m%n"/>

    </console>

  </appenders>

  <thresholdfilter level="trace"/>

  <loggers>

    <logger name="org.apache.zookeeper.clientcnxn" level="error" additivity="false">

      <appender-ref ref="stdout" />

    </logger>

    <logger name="org.apache.zookeeper" level="trace" additivity="false">

      <appender-ref ref="stdout"/>

    </logger>

    <logger name="tech.codestory" level="trace" additivity="false">

      <appender-ref ref="stdout"/>

    </logger>

    <root level="warn">

      <appender-ref ref="stdout"/>

    </root>

  </loggers>

</configuration>

 

4   创建zookeeper连接

创建连接代码比较简单,只需要创建 zookeeper对象就行,

zookeeper构造函数的定义

/**

 * 创建一个 zookeeper 客户端对象

 * @param connectstring 逗号分隔的 host:port 字符串,

 *    单点如 127.0.0.1:2181,

 *    集群如 192.168.5.128:2181,192.168.5.129:2181,192.168.5.130:2181,

 *    还可以指定根节点,如 127.0.0.1:2181/foo/bar

 * @param sessiontimeout 毫秒为单位的超时时间

 * @param watcher watcher对象,用于接收 matcherevent

 * @throws ioexception 网络错误时抛出异常

 * @throws illegalargumentexception 如果root路径设置错误

 */

public zookeeper(string connectstring, int sessiontimeout, watcher watcher)

  throws ioexception;

写一段测试代码,创建zk对象后判断某一个znode是否存在。

public class zookeeperwatcher implements watcher {

  /** zookeeper的客户端连接 */

  zookeeper zk;

 

  public zookeeperwatcher(string hostport, string znode) throws keeperexception, ioexception {

    zk = new zookeeper(hostport, 3000, this);

    try {

      stat exists = zk.exists(znode, true);

      if(exists == null){

        log.info(“{} 不存在”, znode)

      }

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

  }

}

运行这段代码,发现会抛异常

java.net.socketexception: socket is not connected

  at sun.nio.ch.net.translatetosocketexception(net.java:162) ~[?:?]

  at sun.nio.ch.net.translateexception(net.java:196) ~[?:?]

  at sun.nio.ch.net.translateexception(net.java:202) ~[?:?]

  at sun.nio.ch.socketadaptor.shutdowninput(socketadaptor.java:400) ~[?:?]

  at org.apache.zookeeper.clientcnxnsocketnio.cleanup(clientcnxnsocketnio.java:198) [zookeeper-3.5.5.jar:3.5.5]

  at org.apache.zookeeper.clientcnxn$sendthread.cleanup(clientcnxn.java:1338) [zookeeper-3.5.5.jar:3.5.5]

  at org.apache.zookeeper.clientcnxn$sendthread.cleanandnotifystate(clientcnxn.java:1276) [zookeeper-3.5.5.jar:3.5.5]

  at org.apache.zookeeper.clientcnxn$sendthread.run(clientcnxn.java:1254) [zookeeper-3.5.5.jar:3.5.5]

caused by: java.nio.channels.notyetconnectedexception

  at sun.nio.ch.socketchannelimpl.shutdowninput(socketchannelimpl.java:917) ~[?:?]

  at sun.nio.ch.socketadaptor.shutdowninput(socketadaptor.java:398) ~[?:?]

  ... 4 more

notyetconnectedexception的字面意思是连接还没有创建好,网络搜索了一下,建立连接需要一些时间,创建zk对象后马上调用exists命令,这时候连接还没有创建好,就会抛异常。zookeeper在连接建立成功之后,会发送一个watchedevent事件,我们可以利用这个事件完成建立连接的过程。修改后的代码如下,顺便添加了slf4j-ext中的profiler,用于记录所消耗的时间。

public class zookeeperwatcher implements watcher {

  /** 等待连接建立成功的信号 */

  private countdownlatch connectedsemaphore = new countdownlatch(1);

 

  /** zookeeper的客户端连接 */

  zookeeper zk;

 

  public zookeeperwatcher(string hostport, string znode) throws keeperexception, ioexception {

    profiler profiler = new profiler("连接到zookeeper");

    profiler.start("开始链接");

    zk = new zookeeper(hostport, 3000, this);

    try {

          profiler.start("等待连接成功的event");

      connectedsemaphore.await();

      stat exists = zk.exists(znode, true);

      if(exists == null){

        log.info(“{} 不存在”, znode)

      }

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

    profiler.stop();

    profiler.setlogger(log);

    profiler.log();

  }

 

  /** 收到zookeeper的watchedevent */

  @override

  public void process(watchedevent event) {

    log.info("event = {}", event);

    if (event.eventtype.none.equals(event.gettype())) {

      // 连接状态发生变化

      if (event.keeperstate.syncconnected.equals(event.getstate())) {

        // 连接建立成功

        connectedsemaphore.countdown();

      }

    }

  }

}

修改代码之后的执行记录日志如下,可以看到等待连接成功的event耗时9秒多。网络上有文章说关闭防火墙可以秒连,但我测试过,没发现有什么变化,使用systemctl stop firewalld之后重新执行程序,仍然需要9秒多。

[info] - zookeeperwatcher.process(61) - event = watchedevent state:syncconnected type:none path:null

[debug] - zookeeperwatcher.log(201) -

+ profiler [连接到zookeeper]

|-- elapsed time                   [开始链接]    78.912 milliseconds.

|-- elapsed time           [等待连接成功的event]  9330.606 milliseconds.

|-- total                  [连接到zookeeper]  9409.926 milliseconds.

 

[info] - zookeeperwatcher.readnodedata(95) - /watcher 不存在

 

5   读取watchedevent

前面的代码,只是处理了建立连接成功时的event,下面再来看看读取数据的过程。关键代码如下:

if (event.eventtype.nodedatachanged.equals(event.gettype())

    || event.eventtype.nodecreated.equals(event.gettype())) {

  string path = event.getpath();

  if (path != null && path.equals(znode)) {

    // 节点数据被修改

    readnodedata();

  }

}

 

/** 读节点数据 */

private void readnodedata() {

  try {

    stat stat = new stat();

    byte[] data = zk.getdata(znode, true, stat);

    if (data != null) {

      log.info("{}, value={}, version={}", znode, new string(data), stat.getversion());

    }

  } catch (keeperexception e) {

    log.info("{} 不存在", znode);   

  } catch (interruptedexception e) {

    log.error("interruptedexception", e);

  }

}

当接收到创建节点和修改节点的watchedevent,都会将数据读出并打印在控制台。

6   调整后的完整程序清单

对前面的代码做了部分调整,同时添加了退出系统的机制:节点被删除。

package tech.codestory.zookeeper.watcher;

 

import java.io.ioexception;

import java.util.concurrent.countdownlatch;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.stat;

import org.slf4j.profiler.profiler;

import lombok.extern.slf4j.slf4j;

 

/**

 * 用于测试 zookeeper的 watchedevent用法

 * @author code story

 * @date 2019/8/13

 */

@slf4j

public class zookeeperwatcher implements watcher, runnable {

  /** 等待连接建立成功的信号 */

  private countdownlatch connectedsemaphore = new countdownlatch(1);

  /** 退出系统的信号 */

  static integer quitsemaphore = integer.valueof(-1);

 

  string znode;

  zookeeper zk;

 

  public zookeeperwatcher(string hostport, string znode) throws keeperexception, ioexception {

    this.znode = znode;

 

    profiler profiler = new profiler("连接到zookeeper");

    profiler.start("开始链接");

    zk = new zookeeper(hostport, 3000, this);

    try {

      profiler.start("等待连接成功的event");

      connectedsemaphore.await();

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

    profiler.stop();

    profiler.setlogger(log);

    profiler.log();

 

    // 先读当前的数据

    readnodedata();

  }

 

  /** 收到zookeeper的watchedevent */

  @override

  public void process(watchedevent event) {

    log.info("event = {}", event);

    if (event.eventtype.none.equals(event.gettype())) {

      // 连接状态发生变化

      if (event.keeperstate.syncconnected.equals(event.getstate())) {

        // 连接建立成功

        connectedsemaphore.countdown();

      }

    } else if (event.eventtype.nodedatachanged.equals(event.gettype())

        || event.eventtype.nodecreated.equals(event.gettype())) {

      string path = event.getpath();

      if (path != null && path.equals(znode)) {

        // 节点数据被修改

        readnodedata();

      }

    } else if (event.eventtype.nodedeleted.equals(event.gettype())) {

      string path = event.getpath();

      if (path != null && path.equals(znode)) {

        synchronized (quitsemaphore) {

          // 节点被删除,通知退出线程

          quitsemaphore.notify();

        }

      }

    }

  }

 

  /** 读节点数据 */

  private void readnodedata() {

    try {

      stat stat = new stat();

      byte[] data = zk.getdata(znode, true, stat);

      if (data != null) {

        log.info("{}, value={}, version={}", znode, new string(data), stat.getversion());

      }

    } catch (keeperexception e) {

      log.info("{} 不存在", znode);

      try {

        // 目的是添加watcher

        zk.exists(znode, true);

      } catch (keeperexception ex) {

      } catch (interruptedexception ex) {

      }

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

  }

 

  @override

  public void run() {

    try {

      synchronized (quitsemaphore) {

        quitsemaphore.wait();

        log.info("{} 被删除,退出", znode);

      }

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

  }

 

  /** 主程序,代码中写死了server地址和znode名,也可以改成从args中读取 */

  public static void main(string[] args) {

    string hostport = "192.168.5.128:2181";

    string znode = "/watcher";

    try {

      new zookeeperwatcher(hostport, znode).run();

    } catch (exception e) {

      log.error("new zookeeperexecutor()", e);

    }

  }

}

做一个测试,应用启动后创建节点,修改多次节点,最后删除节点,日志输出如下:

10:13:31:979 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:none path:null

10:13:31:982 [debug] - zookeeperwatcher.log(201) -

+ profiler [连接到zookeeper]

|-- elapsed time                   [开始链接]   210.193 milliseconds.

|-- elapsed time           [等待连接成功的event]  9385.467 milliseconds.

|-- total                  [连接到zookeeper]  9596.196 milliseconds.

 

10:13:31:996 [info] - zookeeperwatcher.readnodedata(84) - /watcher 不存在

10:15:43:451 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodecreated path:/watcher

10:15:43:463 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 00, version=0

10:15:50:906 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:15:50:910 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 01, version=1

10:15:56:004 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:15:56:007 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 02, version=2

10:16:00:246 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:16:00:249 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 03, version=3

10:16:06:399 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:16:06:402 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 10, version=4

10:16:10:217 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:16:10:220 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 11, version=5

10:16:14:444 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:16:14:447 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 12, version=6

10:16:20:118 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedeleted path:/watcher

10:16:20:118 [info] - zookeeperwatcher.run(101) - /watcher 被删除,退出

如您对本文有疑问或者有任何想说的,请 点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网