魁拔之十万火急全集,井底之蛙是什么意思,保定小姐
zookeeper的作用是存储kafka的服务器信息,topic信息,和cunsumer信息。如下图:
而zookeeper是个什么东西呢?简单来说就是一个具有通知机制的文件系统,引用网路上的一张图
可以看出来zookeeper是一个树形的文件结构,我们可以自定义node与node的值,并对node进行监视,当node的结构或者值变化时,我们可以收到通知。
连接zookeeper
[root@iz2zei2y693gtrgwlibzlwz ~]# zkcli.sh -server ip:2181
查看zookeeper的所有节点
ls /
查看某个节点的子节点
ls /brokers
创建节点
create /testaa dataaaa
获取节点的值
get /testaa
设置节点值
set /testaa aaabbb
删除节点
delete /testaa
这么看来实际上zookeeper跟数据库类似也是curd操作,我们再来看看zookeeper的安全控制acl
[zk: 3:2181(connected) 11] create /cys cys
访问一一下
[zk: 3:2181(connected) 13] get /cys
查看一下acl
[zk: 3:2181(connected) 15] getacl /cys
下面我们设置一下他的用户
命令为:
具体操作如下:
addauth digest cys:123456 setacl /cys auth:cys:123456:crwda
我们ctrl+c退出zkcli,重新连接一下,然后查询
get /cys
结果如下:
提示我们认证失败,我们登陆一下
addauth digest cys:123456
结果如下:
我们在查看一下/cys节点的acl
可以看出来用户cys对应的密码(加密后的)和权限cdrwa
server端代码
using org.apache.zookeeper; using org.apache.zookeeper.data; using system; using system.collections.generic; using system.text; using system.threading.tasks; namespace consoleapp3 { class program { static void main(string[] args) { while (true) { console.writeline("输入path"); var path = console.readline(); string address = "39.**.**.**:2181"; zookeeper _zookeeper = new zookeeper(address, 1000 * 1000, null); zookeeper.states states = _zookeeper.getstate(); //是否存在 task<org.apache.zookeeper.data.stat> stat = _zookeeper.existsasync(path); stat.wait(); if (stat.result != null && stat.status.tostring().tolower() == "rantocompletion".tolower()) { //已存在 console.writeline($"{path}已存在"); } else { console.writeline("输入data"); var data = console.readline(); //创建 task<string> task = _zookeeper.createasync(path, system.text.encoding.utf8.getbytes(data), zoodefs.ids.open_acl_unsafe, createmode.persistent); task.wait(); if (!string.isnullorempty(task.result) && task.status.tostring().tolower() == "rantocompletion".tolower()) { console.writeline($"{path}创建成功"); } } console.writeline("输入set data"); var dataa = console.readline(); //set值 task<org.apache.zookeeper.data.stat> stata = _zookeeper.setdataasync(path, system.text.encoding.utf8.getbytes(dataa)); stata.wait(); if (stata.result != null && stata.status.tostring().tolower() == "rantocompletion".tolower()) { console.writeline("set 成功"); } console.writeline("输入子path"); var childpath = console.readline(); console.writeline("输入子data"); var childdata = console.readline(); task<string> childtask = _zookeeper.createasync(childpath, system.text.encoding.utf8.getbytes(childdata), zoodefs.ids.open_acl_unsafe, createmode.persistent); childtask.wait(); if (!string.isnullorempty(childtask.result) && childtask.status.tostring().tolower() == "rantocompletion".tolower()) { console.writeline($"{childpath}创建成功"); } console.readline(); _zookeeper.closeasync().wait(); } ////删除 //console.writeline("输入delete path"); //var pathb = console.readline(); //task taska = _zookeeper.deleteasync(pathb); //taska.wait(); //if (taska.status.tostring().tolower() == "rantocompletion".tolower()) //{ // console.writeline("delete 成功"); //} ////获取数据 //task<dataresult> dataresult = _zookeeper.getdataasync(path, new nodewatcher()); //dataresult.wait(); //if (dataresult.result != null && dataresult.status.tostring().tolower() == "rantocompletion".tolower()) //{ // console.writeline(encoding.utf8.getstring(dataresult.result.data)); //} } } }
客户端代码
using org.apache.zookeeper; using org.apache.zookeeper.data; using system; using system.collections.generic; using system.threading; using system.threading.tasks; namespace client { class program { static void main(string[] args) { console.writeline("输入path"); var path = console.readline(); string address = "39.**.**.**:2181"; zookeeper _zookeeper = new zookeeper(address, 10 * 1000, new defaultwatcher());
//用户登陆 _zookeeper.addauthinfo("digest", system.text.encoding.default.getbytes("cys:123456")); //获取child var getresult = _zookeeper.getchildrenasync(path, true); getresult.wait(); //获取数据 task<dataresult> dataresult = _zookeeper.getdataasync(path,true); dataresult.wait(); thread.sleep(100000); } } public class defaultwatcher : watcher { internal static readonly task completedtask = task.fromresult(1); /// <summary> /// 接收通知 /// </summary> /// <param name="event"></param> /// <returns></returns> public override task process(watchedevent @event) { console.writeline(string.format("接收到zookeeper服务端的通知,state是:{0},eventtype是:{1},path是:{2}", @event.getstate(), @event.get_type(), @event.getpath() ?? string.empty)); return completedtask; } } }
这样当server端操作的时候,client端会通过watcher收到通知
如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复
Net Core Web Api项目与在NginX下发布的方法
asp.net core3.1 引用的元包dll版本兼容性问题解决方案
IdentityServer4实现.Net Core API接口权限认证(快速入门)
ASP.NET Core MVC通过IViewLocationExpander扩展视图搜索路径的实现
网友评论