当前位置: 移动技术网 > IT编程>开发语言>Java > 浅谈Zookeeper开源客户端框架Curator

浅谈Zookeeper开源客户端框架Curator

2019年07月19日  | 移动技术网IT编程  | 我要评论

zookeepercurator

curator是netflix开源的一套zookeeper客户端框架. netflix在使用zookeeper的过程中发现zookeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. netflix在用zookeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及netflix的技术blog入手.

看完官方的文档之后, 发现curator主要解决了三类问题:

1.封装zookeeper client与zookeeper server之间的连接处理;
2.提供了一套fluent风格的操作api;
3.提供zookeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装.

curator列举的zookeeper使用过程中的几个问题

1.初始化连接的问题: 在client与server之间握手建立连接的过程中, 如果握手失败, 执行所有的同步方法(比如create, getdata等)将抛出异常
2.自动恢复(failover)的问题: 当client与一台server的连接丢失,并试图去连接另外一台server时, client将回到初始连接模式
session过期的问题: 在极端情况下, 出现zookeeper session过期, 客户端需要自己去监听该状态并重新创建zookeeper实例 .
3.对可恢复异常的处理:当在server端创建一个有序znode, 而在将节点名返回给客户端时崩溃, 此时client端抛出可恢复的异常, 用户需要自己捕获这些异常并进行重试
4.使用场景的问题:zookeeper提供了一些标准的使用场景支持, 但是zookeeper对这些功能的使用说明文档很少, 而且很容易用错. 在一些极端场景下如何处理, zk并没有给出详细的文档说明. 比如共享锁服务, 当服务器端创建临时顺序节点成功, 但是在客户端接收到节点名之前挂掉了, 如果不能很好的处理这种情况, 将导致死锁.

curator主要从以下几个方面降低了zk使用的复杂性:

1.重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略, 并且内部也提供了几种标准的重试策略(比如指数补偿).
2.连接状态监控: curator初始化之后会一直的对zk连接进行监听, 一旦发现连接状态发生变化, 将作出相应的处理.
3.zk客户端实例管理:curator对zk客户端到server集群连接进行管理. 并在需要的情况, 重建zk实例, 保证与zk集群的可靠连接
各种使用场景支持:curator实现zk支持的大部分使用场景支持(甚至包括zk自身不支持的场景), 这些实现都遵循了zk的较佳实践, 并考虑了各种极端情况.

curator通过以上的处理, 让用户专注于自身的业务本身, 而无需花费更多的精力在zk本身.

curator声称的一些亮点:

日志工具

内部采用slf4j 来输出日志
采用驱动器(driver)机制, 允许扩展和定制日志和跟踪处理
提供了一个tracerdriver接口, 通过实现addtrace()和addcount()接口来集成用户自己的跟踪框架

和curator相比, 另一个zookeeper客户端——zkclient

文档几乎没有
异常处理弱爆了(简单的抛出runtimeexception)
重试处理太难用了
没有提供各种使用场景的实现
对zookeeper自带客户端(zookeeper类)的"抱怨":
只是一个底层实现
要用需要自己写大量的代码
很容易误用
需要自己处理连接丢失, 重试等

curator几个组成部分

1.client: 是zookeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法.
2.framework: 用来简化zookeeper高级功能的使用, 并增加了一些新的功能, 比如管理到zookeeper集群的连接, 重试处理
3.recipes: 实现了通用zookeeper的recipe, 该组件建立在framework的基础之上
4.utilities:各种zookeeper的工具类
5.errors: 异常处理, 连接, 恢复等.
6.extensions: recipe扩展

client

这是一个底层的api, 应用方基本对这个可以无视, 较好直接从curator framework入手,主要包括三部分:

不间断连接管理
连接重试处理
retry loop(循环重试)

一种典型的用法:

retryloop retryloop = client.newretryloop();
while ( retryloop.shouldcontinue() )
{
  try
  {
    // perform your work
    ...
    // it's important to re-get the zk instance as there may have been an error and the instance was re-created
    zookeeper   zk = client.getzookeeper();
    retryloop.markcomplete();
  }
  catch ( exception e )
  {
    retryloop.takeexception(e);
  }
}

如果在操作过程中失败, 且这种失败是可重试的, 而且在允许的次数内, curator将保证操作的最终完成.
另一种使用callable接口的重试做法:

retryloop.callwithretry(client, new callable()
{
   @override
   public void call() throws exception
   {
     // do your work here - it will get retried if needed
     return null;
   }
});

重试策略

retrypolicy接口只有一个方法(以前版本有两个方法):

public boolean allowretry(int retrycount, long elapsedtimems); 

在开始重试之前, allowretry方法被调用, 其参数将指定当前重试次数, 和操作已消耗时间. 如果允许, 将继续重试, 否则抛出异常.

curator内部实现的几种重试策略:

1.exponentialbackoffretry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加.
2.retryntimes:指定较大重试次数的重试策略
3.retryonetime:仅重试一次
4.retryuntilelapsed:一直重试直到达到规定的时间

framework

是zookeeper client更高的抽象api

自动连接管理: 当zookeeper客户端内部出现异常, 将自动进行重连或重试, 该过程对外几乎完全透明

更清晰的api: 简化了zookeeper原生的方法, 事件等, 提供流程的接口

curatorframeworkfactory类提供了两个方法, 一个工厂方法newclient, 一个构建方法build. 使用工厂方法newclient可以创建一个默认的实例, 而build构建方法可以对实例进行定制. 当curatorframework实例构建完成, 紧接着调用start()方法, 在应用结束的时候, 需要调用close()方法.  curatorframework是线程安全的. 在一个应用中可以共享同一个zk集群的curatorframework.

curatorframework api采用了连贯风格的接口(fluent interface). 所有的操作一律返回构建器, 当所有元素加在一起之后, 整个方法看起来就像一个完整的句子. 比如下面的操作:

client.create().forpath("/head", new byte[0]);
client.delete().inbackground().forpath("/head");
client.create().withmode(createmode.ephemeral_sequential).forpath("/head/child", new byte[0]);
client.getdata().watched().inbackground().forpath("/test");

方法说明:

1.create(): 发起一个create操作. 可以组合其他方法 (比如mode 或background) 最后以forpath()方法结尾
2.delete(): 发起一个删除操作. 可以组合其他方法(version 或background) 最后以forpath()方法结尾
3.checkexists(): 发起一个检查znode 是否存在的操作. 可以组合其他方法(watch 或background) 最后以forpath()方法结尾
4.getdata(): 发起一个获取znode数据的操作. 可以组合其他方法(watch, background 或get stat) 最后以forpath()方法结尾
5.setdata(): 发起一个设置znode数据的操作. 可以组合其他方法(version 或background) 最后以forpath()方法结尾
6.getchildren(): 发起一个获取znode子节点的操作. 可以组合其他方法(watch, background 或get stat) 最后以forpath()方法结尾
7.intransaction(): 发起一个zookeeper事务. 可以组合create, setdata, check, 和/或delete 为一个操作, 然后commit() 提交
.

通知(notification)

curator的相关代码已经更新了, 里面的接口已经由clientlistener改成curatorlistener了, 而且接口中去掉了clientcloseduetoerror方法. 只有一个方法:
eventreceived()            当一个后台操作完成或者指定的watch被触发时该方法被调用

unhandlederrorlistener接口用来对异常进行处理.

curatorevent(在以前版本为clientevent)是对各种操作触发相关事件对象(pojo)的一个完整封装, 而事件对象的内容跟事件类型相关, 下面是对应关系:

create getresultcode() and getpath()
delete getresultcode() and getpath()
exists getresultcode(), getpath() and getstat()
get_data getresultcode(), getpath(), getstat() and getdata()
set_data getresultcode(), getpath() and getstat()
children getresultcode(), getpath(), getstat(), getchildren()
watched getwatchedevent()

名称空间(namespace)

因为一个zk集群会被多个应用共享, 为了避免各个应用的zk patch冲突, curator framework内部会给每一个curator framework实例分配一个namespace(可选). 这样你在create znode的时候都会自动加上这个namespace作为这个node path的root. 使用代码如下:

curatorframework  client = curatorframeworkfactory.builder().namespace("myapp") ... build();
…
client.create().forpath("/test", data);
// node was actually written to: "/myapp/test"

recipe

curator实现zookeeper的所有recipe(除了两段提交)

选举

集群领导选举(leader election)

锁服务

共享锁: 全局同步分布式锁, 同一时间两台机器只有一台能获得同一把锁.
共享读写锁: 用于分布式的读写互斥处理, 同时生成两个锁:一个读锁, 一个写锁, 读锁能被多个应用持有, 而写锁只能一个独占, 当写锁未被持有时, 多个读锁持有者可以同时进行读操作
共享信号量: 在分布式系统中的各个jvm使用同一个zk lock path, 该path将跟一个给定数量的租约(lease)相关联, 然后各个应用根据请求顺序获得对应的lease, 相对来说, 这是最公平的锁服务使用方式.
多共享锁:内部构件多个共享锁(会跟一个znode path关联), 在acquire()过程中, 执行所有共享锁的acquire()方法, 如果中间出现一个失败, 则将释放所有已require的共享锁; 执行release()方法时, 则执行内部多个共享锁的release方法(如果出现失败将忽略)

队列(queue)

分布式队列:采用持久顺序zk node来实现fifo队列, 如果有多个消费者, 可以使用leaderselector来保证队列的消费者顺序
分布式优先队列: 优先队列的分布式版本

blockingqueueconsumer: jdk阻塞队列的分布式版本

关卡(barrier)

分布式关卡:一堆客户端去处理一堆任务, 只有所有的客户端都执行完, 所有客户端才能继续往下处理

双分布式关卡:同时开始, 同时结束

计数器(counter)

共享计数器:所有客户端监听同一个znode path, 并共享一个的integer计数值
分布式atomiclong(atomicinteger): atomicxxx的分布式版本, 先采用乐观锁更新, 若失败再采用互斥锁更新, 可以配置重试策略来处理重试

工具类

path cache

path cache用于监听znode的子节点的变化, 当add, update, remove子节点时将改变path cache state, 同时返回所有子节点的data和state.

curator中采用了pathchildrencache类来处理path cache, 状态的变化则采用pathchildrencachelistener来监听.

相关用法参见testpathchildrencache测试类

注意: 当zk server的数据发生变化, zk client会出现不一致, 这个需要通过版本号来识别这种状态的变化

test server

用来在测试中模拟一个本地进程内zookeeper server.

test cluster

用来在测试中模拟一个zookeeper server集群

zkpaths工具类

提供了和znode相关的path处理工具方法:

1.getnodefrompath: 根据给定path获取node name. i.e. "/one/two/three" -> "three"

2.mkdirs: 根据给定路径递归创建所有node

3.getsortedchildren: 根据给定路径, 返回一个按序列号排序的子节点列表

4.makepath: 根据给定的path和子节点名, 创建一个完整path

ensurepath工具类

直接看例子, 具体的说就是调用多次, 只会执行一次创建节点操作.

ensurepath    ensurepath = new ensurepath(afullpathtoensure);
...
string      nodepath = afullpathtoensure + "/foo";
ensurepath.ensure(zk);  // first time syncs and creates if needed
zk.create(nodepath, ...);
...
ensurepath.ensure(zk);  // subsequent times are nops
zk.create(nodepath, ...);

notification事件处理

curator对zookeeper的事件watcher进行了封装处理, 然后实现了一套监听机制. 提供了几个监听接口用来处理zookeeper连接状态的变化

当连接出现异常, 将通过connectionstatelistener接口进行监听, 并进行相应的处理, 这些状态变化包括:

1.暂停(suspended): 当连接丢失, 将暂停所有操作, 直到连接重新建立, 如果在规定时间内无法建立连接, 将触发lost通知
2.重连(reconnected): 连接丢失, 执行重连时, 将触发该通知
3.丢失(lost): 连接超时时, 将触发该通知

从com.netflix.curator.framework.imps.curatorframeworkimpl.validateconnection(curatorevent)方法中我们可以知道, curator分别将zookeeper的disconnected, expired, syncconnected三种状态转换成上面三种状态.

总结

以上就是本文关于浅谈zookeeper开源客户端框架curator的全部内容,感兴趣的朋友可以参阅:、、等,希望对大家有所帮助。如有不足之处,欢迎留言指正。感谢朋友们对本站的支持!

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网