当前位置: 移动技术网 > IT编程>开发语言>.net > Kafka与.net core(二)zookeeper

Kafka与.net core(二)zookeeper

2019年01月12日  | 移动技术网IT编程  | 我要评论

魁拔之十万火急全集,井底之蛙是什么意思,保定小姐

1.zookeeper简单介绍

1.1作用

zookeeper的作用是存储kafka的服务器信息,topic信息,和cunsumer信息。如下图:

而zookeeper是个什么东西呢?简单来说就是一个具有通知机制的文件系统,引用网路上的一张图

可以看出来zookeeper是一个树形的文件结构,我们可以自定义node与node的值,并对node进行监视,当node的结构或者值变化时,我们可以收到通知。

1.2node类型

1)persistent-持久化目录节点
客户端与zookeeper断开连接后,该节点依旧存在
2)persistent_sequential-持久化顺序编号目录节点
客户端与zookeeper断开连接后,该节点依旧存在,只是zookeeper给该节点名称进行顺序编号
3)ephemeral-临时目录节点
客户端与zookeeper断开连接后,该节点被删除
4)ephemeral_sequential-临时顺序编号目录节点
客户端与zookeeper断开连接后,该节点被删除,只是zookeeper给该节点名称进行顺序编号

2.zookeeper命令操作

连接zookeeper

[root@iz2zei2y693gtrgwlibzlwz ~]# zkcli.sh -server ip:2181

查看zookeeper的所有节点

ls /

查看某个节点的子节点

ls /brokers

创建节点

 create /testaa dataaaa

获取节点的值

get /testaa

设置节点值

 set /testaa aaabbb

删除节点

 delete /testaa

这么看来实际上zookeeper跟数据库类似也是curd操作,我们再来看看zookeeper的安全控制acl

3.zookeeper的acl

3.1zk的节点有5种操作权限:

create、read、write、delete、admin 也就是 增、删、改、查、管理权限,这5种权限简写为crwda(即:每个单词的首字符缩写)

3.2身份的认证有4种方式:

world:默认方式,相当于全世界都能访问
auth:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
digest:即用户名:密码这种方式认证,这也是业务系统中最常用的
ip:使用ip地址认证

3.3acl实例

默认创建的时world方式,任何人都能访问,我们新建一个测试node节点
[zk: 3:2181(connected) 11] create /cys cys

访问一一下

[zk: 3:2181(connected) 13] get /cys

查看一下acl

[zk: 3:2181(connected) 15] getacl /cys

下面我们设置一下他的用户

 命令为:

1)增加一个认证用户
addauth digest 用户名:密码明文
eg. addauth digest user1:password1
2)设置权限
setacl /path auth:用户名:密码明文:权限
eg. setacl /test auth:user1:password1:cdrwa
3)查看acl设置
getacl /path

 具体操作如下:

addauth digest cys:123456
setacl /cys auth:cys:123456:crwda

我们ctrl+c退出zkcli,重新连接一下,然后查询

get /cys

结果如下:

提示我们认证失败,我们登陆一下

addauth digest cys:123456

结果如下:

我们在查看一下/cys节点的acl

 

可以看出来用户cys对应的密码(加密后的)和权限cdrwa

4..net core 操作

4.1新建server项目,引入zookeepernetex这个nuget包

server端代码

using org.apache.zookeeper;
using org.apache.zookeeper.data;

using system;
using system.collections.generic;
using system.text;
using system.threading.tasks;

namespace consoleapp3
{
    class program
    {
        static void main(string[] args)
        {
            while (true)
            {
                console.writeline("输入path");
                var path = console.readline();
                string address = "39.**.**.**:2181";
                zookeeper _zookeeper = new zookeeper(address, 1000 * 1000, null);
                zookeeper.states states = _zookeeper.getstate();
                //是否存在
                task<org.apache.zookeeper.data.stat> stat = _zookeeper.existsasync(path);
                stat.wait();
                if (stat.result != null && stat.status.tostring().tolower() == "rantocompletion".tolower())
                {
                    //已存在
                    console.writeline($"{path}已存在");
                }
                else
                {
                    console.writeline("输入data");
                    var data = console.readline();
                    //创建
                    task<string> task = _zookeeper.createasync(path, system.text.encoding.utf8.getbytes(data), zoodefs.ids.open_acl_unsafe, createmode.persistent);
                    task.wait();
                    if (!string.isnullorempty(task.result) && task.status.tostring().tolower() == "rantocompletion".tolower())
                    {
                        console.writeline($"{path}创建成功");
                    }
                }
                console.writeline("输入set data");
                var dataa = console.readline();
                //set值
                task<org.apache.zookeeper.data.stat> stata = _zookeeper.setdataasync(path, system.text.encoding.utf8.getbytes(dataa));
                stata.wait();
                if (stata.result != null && stata.status.tostring().tolower() == "rantocompletion".tolower())
                {
                    console.writeline("set 成功");
                }

                console.writeline("输入子path");
                var childpath = console.readline();
     
                console.writeline("输入子data");
                var childdata = console.readline();
                task<string> childtask = _zookeeper.createasync(childpath, system.text.encoding.utf8.getbytes(childdata), zoodefs.ids.open_acl_unsafe, createmode.persistent);
                childtask.wait();
                if (!string.isnullorempty(childtask.result) && childtask.status.tostring().tolower() == "rantocompletion".tolower())
                {
                    console.writeline($"{childpath}创建成功");
                }

                console.readline();
                _zookeeper.closeasync().wait();
            }
            ////删除
            //console.writeline("输入delete path");
            //var pathb = console.readline();
            //task taska = _zookeeper.deleteasync(pathb);
            //taska.wait();
            //if (taska.status.tostring().tolower() == "rantocompletion".tolower())
            //{
            //    console.writeline("delete 成功");
            //}

            ////获取数据
            //task<dataresult> dataresult = _zookeeper.getdataasync(path, new nodewatcher());
            //dataresult.wait();
            //if (dataresult.result != null && dataresult.status.tostring().tolower() == "rantocompletion".tolower())
            //{
            //    console.writeline(encoding.utf8.getstring(dataresult.result.data));
            //}
        }
    }

}

4.2新建client项目,引入zookeepernetex这个nuget包

客户端代码

using org.apache.zookeeper;
using org.apache.zookeeper.data;
using system;
using system.collections.generic;
using system.threading;
using system.threading.tasks;

namespace client
{
    class program
    {
        static void main(string[] args)
        {
            console.writeline("输入path");
            var path = console.readline();
            string address = "39.**.**.**:2181";
            zookeeper _zookeeper = new zookeeper(address, 10 * 1000, new defaultwatcher());
       //用户登陆 _zookeeper.addauthinfo("digest", system.text.encoding.default.getbytes("cys:123456")); //获取child var getresult = _zookeeper.getchildrenasync(path, true); getresult.wait(); //获取数据 task<dataresult> dataresult = _zookeeper.getdataasync(path,true); dataresult.wait(); thread.sleep(100000); } } public class defaultwatcher : watcher { internal static readonly task completedtask = task.fromresult(1);         /// <summary>         /// 接收通知         /// </summary>         /// <param name="event"></param>         /// <returns></returns>         public override task process(watchedevent @event) { console.writeline(string.format("接收到zookeeper服务端的通知,state是:{0},eventtype是:{1},path是:{2}", @event.getstate(), @event.get_type(), @event.getpath() ?? string.empty)); return completedtask; } } }

这样当server端操作的时候,client端会通过watcher收到通知

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网