当前位置: 移动技术网 > IT编程>开发语言>Java > 求你了,别再问我Zookeeper如何实现分布式锁了!!!

求你了,别再问我Zookeeper如何实现分布式锁了!!!

2020年04月09日  | 移动技术网IT编程  | 我要评论

导读

  • 真是有人()的地方就有江湖(事务),今天不谈江湖,来撩撩人。

  • 分布式锁的概念、为什么使用分布式锁,想必大家已经很清楚了。前段时间作者写过redis是如何实现分布式锁,今天这篇文章来谈谈zookeeper是如何实现分布式锁的。

  • 陈某今天分别从如下几个方面来详细讲讲zk如何实现分布式锁:

    1. zk的四种节点

    2. 排它锁的实现

    3. 读写锁的实现

    4. curator实现分步式锁

zk的四种节点

  • 持久性节点:节点创建后将会一直存在

  • 临时节点:临时节点的生命周期和当前会话绑定,一旦当前会话断开临时节点也会删除,当然可以主动删除。

  • 持久有序节点:节点创建一直存在,并且zk会自动为节点加上一个自增的后缀作为新的节点名称。

  • 临时有序节点:保留临时节点的特性,并且zk会自动为节点加上一个自增的后缀作为新的节点名称。

 

排它锁的实现

  • 排他锁的实现相对简单一点,利用了zk的创建节点不能重名的特性。如下图:

  • 根据上图分析大致分为如下步骤:

    1. 尝试获取锁:创建临时节点,zk会保证只有一个客户端创建成功。

    2. 创建临时节点成功,获取锁成功,执行业务逻辑,业务执行完成后删除锁。

    3. 创建临时节点失败,阻塞等待。

    4. 监听删除事件,一旦临时节点删除了,表示互斥操作完成了,可以再次尝试获取锁。

    5. 递归:获取锁的过程是一个递归的操作,获取锁->监听->获取锁

  • 如何避免死锁:创建的是临时节点,当服务宕机会话关闭后临时节点将会被删除,锁自动释放。

代码实现

  • 作者参照jdk锁的实现方式加上模板方法模式的封装,封装接口如下:

/**
 * @description zk分布式锁的接口
 * @author 陈某
 * @date 2020/4/7 22:52
 */
public interface zklock {
    /**
     * 获取锁
     */
    void lock() throws exception;
​
    /**
     * 解锁
     */
    void unlock() throws exception;
}

 

  • 模板抽象类如下:

/**
 * @description 排他锁,模板类
 * @author 陈某
 * @date 2020/4/7 22:55
 */
public abstract class abstractzklockmutex implements zklock {
​
    /**
     * 节点路径
     */
    protected string lockpath;
​
    /**
     * zk客户端
     */
    protected curatorframework zkclient;
​
    private abstractzklockmutex(){}
​
    public abstractzklockmutex(string lockpath,curatorframework client){
        this.lockpath=lockpath;
        this.zkclient=client;
    }
​
    /**
     * 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现
     * @throws exception
     */
    @override
    public final void lock() throws exception {
        //获取锁成功
        if (trylock()){
            system.out.println(thread.currentthread().getname()+"获取锁成功");
        }else{  //获取锁失败
            //阻塞一直等待
            waitlock();
            //递归,再次获取锁
            lock();
        }
    }
​
    /**
     * 尝试获取锁,子类实现
     */
    protected abstract boolean trylock() ;
​
​
    /**
     * 等待获取锁,子类实现
     */
    protected abstract void waitlock() throws exception;
​
​
    /**
     * 解锁:删除节点或者直接断开连接
     */
    @override
    public  abstract void unlock() throws exception;
}

 

  • 排他锁的具体实现类如下:

/**
 * @description 排他锁的实现类,继承模板类 abstractzklockmutex
 * @author 陈某
 * @date 2020/4/7 23:23
 */
@data
public class zklockmutex extends abstractzklockmutex {
​
    /**
     * 用于实现线程阻塞
     */
    private countdownlatch countdownlatch;
​
    public zklockmutex(string lockpath,curatorframework zkclient){
        super(lockpath,zkclient);
    }
​
    /**
     * 尝试获取锁:直接创建一个临时节点,如果这个节点存在创建失败抛出异常,表示已经互斥了,
     * 反之创建成功
     * @throws exception
     */
    @override
    protected boolean trylock()  {
        try {
            zkclient.create()
                    //临时节点
                    .withmode(createmode.ephemeral)
                    //权限列表 world:anyone:crdwa
                    .withacl(zoodefs.ids.open_acl_unsafe)
                    .forpath(lockpath,"lock".getbytes());
            return true;
        }catch (exception ex){
            return false;
        }
    }
​
​
    /**
     * 等待锁,一直阻塞监听
     * @return  成功获取锁返回true,反之返回false
     */
    @override
    protected void waitlock() throws exception {
        //监听节点的新增、更新、删除
        final nodecache nodecache = new nodecache(zkclient, lockpath);
        //启动监听
        nodecache.start();
        listenercontainer<nodecachelistener> listenable = nodecache.getlistenable();
​
        //监听器
        nodecachelistener listener=()-> {
            //节点被删除,此时获取锁
            if (nodecache.getcurrentdata() == null) {
                //countdownlatch不为null,表示节点存在,此时监听到节点删除了,因此-1
                if (countdownlatch != null)
                    countdownlatch.countdown();
            }
        };
        //添加监听器
        listenable.addlistener(listener);
​
        //判断节点是否存在
        stat stat = zkclient.checkexists().forpath(lockpath);
        //节点存在
        if (stat!=null){
            countdownlatch=new countdownlatch(1);
            //阻塞主线程,监听
            countdownlatch.await();
        }
        //移除监听器
        listenable.removelistener(listener);
    }
​
    /**
     * 解锁,直接删除节点
     * @throws exception
     */
    @override
    public void unlock() throws exception {
        zkclient.delete().forpath(lockpath);
    }
}

 

可重入性排他锁如何设计

  • 可重入的逻辑很简单,在本地保存一个concurrentmapkey是当前线程,value是定义的数据,结构如下:

 private final concurrentmap<thread, lockdata> threaddata = maps.newconcurrentmap();

 

  • 重入的伪代码如下:

public boolean trylock(){
    //判断当前线程是否在threaddata保存过
    //存在,直接return true
    //不存在执行获取锁的逻辑
    //获取成功保存在threaddata中
}

 

 

读写锁的实现

  • 读写锁分为读锁和写锁,区别如下:

    • 读锁允许多个线程同时读数据,但是在读的同时不允许写线程修改。

    • 写锁在获取后,不允许多个线程同时写或者读。

  • 如何实现读写锁?zk中有一类节点叫临时有序节点,上文有介绍。下面我们来利用临时有序节点来实现读写锁的功能。

 

读锁的设计

  • 读锁允许多个线程同时进行读,并且在读的同时不允许线程进行写操作,实现原理如下图:

  • 根据上图,获取一个读锁分为以下步骤:

    1. 创建临时有序节点(当前线程拥有的读锁或称作读节点)。

    2. 获取路径下所有的子节点,并进行从小到大排序

    3. 获取当前节点前的临近写节点(写锁)。

    4. 如果不存在的临近写节点,则成功获取读锁。

    5. 如果存在临近写节点,对其监听删除事件。

    6. 一旦监听到删除事件,重复2,3,4,5的步骤(递归)

 

写锁的设计

  • 线程一旦获取了写锁,不允许其他线程读和写。实现原理如下:

 

  • 从上图可以看出唯一和写锁不同的就是监听的节点,这里是监听临近节点(读节点或者写节点),读锁只需要监听写节点,步骤如下:

    1. 创建临时有序节点(当前线程拥有的写锁或称作写节点)。

    2. 获取路径下的所有子节点,并进行从小到大排序。

    3. 获取当前节点的临近节点(读节点和写节点)。

    4. 如果不存在临近节点,则成功获取锁。

    5. 如果存在临近节点,对其进行监听删除事件。

    6. 一旦监听到删除事件,重复2,3,4,5的步骤(递归)

 

如何监听

  • 无论是写锁还是读锁都需要监听前面的节点,不同的是读锁只监听临近的写节点,写锁是监听临近的所有节点,抽象出来看其实是一种链式的监听,如下图:

  • 每一个节点都在监听前面的临近节点,一旦前面一个节点删除了,再从新排序后监听前面的节点,这样递归下去。

 

代码实现

  • 作者简单的写了读写锁的实现,先造出来再优化,不建议用在生产环境。代码如下:

public class zklockrw  {
​
    /**
     * 节点路径
     */
    protected string lockpath;
​
    /**
     * zk客户端
     */
    protected curatorframework zkclient;
​
    /**
     * 用于阻塞线程
     */
    private countdownlatch countdownlatch=new countdownlatch(1);
​
​
    private final static string write_name="_w_lock";
​
    private final static string read_name="_r_lock";
​
​
    public zklockrw(string lockpath, curatorframework client) {
        this.lockpath=lockpath;
        this.zkclient=client;
    }
​
    /**
     * 获取锁,如果获取失败一直阻塞
     * @throws exception
     */
    public void lock() throws exception {
        //创建节点
        string node = createnode();
        //阻塞等待获取锁
        trylock(node);
        countdownlatch.await();
    }
​
    /**
     * 创建临时有序节点
     * @return
     * @throws exception
     */
    private string createnode() throws exception {
        //创建临时有序节点
       return zkclient.create()
                .withmode(createmode.ephemeral_sequential)
                .withacl(zoodefs.ids.open_acl_unsafe)
                .forpath(lockpath);
    }
​
    /**
     * 获取写锁
     * @return
     */
    public  zklockrw writelock(){
        return new zklockrw(lockpath+write_name,zkclient);
    }
​
    /**
     * 获取读锁
     * @return
     */
    public  zklockrw readlock(){
        return new zklockrw(lockpath+read_name,zkclient);
    }
​
    private void trylock(string nodepath) throws exception {
        //获取所有的子节点
        list<string> childpaths = zkclient.getchildren()
                .forpath("/")
                .stream().sorted().map(o->"/"+o).collect(collectors.tolist());
​
​
        //第一个节点就是当前的锁,直接获取锁。递归结束的条件
        if (nodepath.equals(childpaths.get(0))){
            countdownlatch.countdown();
            return;
        }
​
        //1. 读锁:监听最前面的写锁,写锁释放了,自然能够读了
        if (nodepath.contains(read_name)){
            //查找临近的写锁
            string prenode = getnearwritenode(childpaths, childpaths.indexof(nodepath));
            if (prenode==null){
                countdownlatch.countdown();
                return;
            }
            nodecache nodecache=new nodecache(zkclient,prenode);
            nodecache.start();
            listenercontainer<nodecachelistener> listenable = nodecache.getlistenable();
            listenable.addlistener(() -> {
                //节点删除事件
                if (nodecache.getcurrentdata()==null){
                    //继续监听前一个节点
                    string nearwritenode = getnearwritenode(childpaths, childpaths.indexof(prenode));
                    if (nearwritenode==null){
                        countdownlatch.countdown();
                        return;
                    }
                    trylock(nearwritenode);
                }
            });
        }
​
        //如果是写锁,前面无论是什么锁都不能读,直接循环监听上一个节点即可,直到前面无锁
        if (nodepath.contains(write_name)){
            string prenode = childpaths.get(childpaths.indexof(nodepath) - 1);
            nodecache nodecache=new nodecache(zkclient,prenode);
            nodecache.start();
            listenercontainer<nodecachelistener> listenable = nodecache.getlistenable();
            listenable.addlistener(() -> {
                //节点删除事件
                if (nodecache.getcurrentdata()==null){
                    //继续监听前一个节点
                    trylock(childpaths.get(childpaths.indexof(prenode) - 1<0?0:childpaths.indexof(prenode) - 1));
                }
            });
        }
    }
​
    /**
     * 查找临近的写节点
     * @param childpath 全部的子节点
     * @param index 右边界
     * @return
     */
    private string  getnearwritenode(list<string> childpath,integer index){
        for (int i = 0; i < index; i++) {
            string node = childpath.get(i);
            if (node.contains(write_name))
                return node;
​
        }
        return null;
    }
​
}

 

curator实现分步式锁

  • curator是netflix公司开源的一个zookeeper客户端,与zookeeper提供的原生客户端相比,curator的抽象层次更高,简化了zookeeper客户端的开发量。

  • curator在分布式锁方面已经为我们封装好了,大致实现的思路就是按照作者上述的思路实现的。中小型互联网公司还是建议直接使用框架封装好的,毕竟稳定,有些大型的互联公司都是手写的,牛逼啊。

  • 创建一个排他锁很简单,如下:

//arg1:curatorframework连接对象,arg2:节点路径
lock=new interprocessmutex(client,path);
//获取锁
lock.acquire();
//释放锁
lock.release();

 

  • 更多的api请参照官方文档,不是此篇文章重点。

  • 至此zk实现分布式锁就介绍完了,如有想要源码的朋友,老规矩,关注微信公众号【码猿技术专栏】,回复关键词分布式锁获取。

一点小福利

  • 对于zookeeper不太熟悉的朋友,陈某特地花费两天时间总结了zk的常用知识点,包括zk常用shell命令、zk权限控制、curator的基本操作api。目录如下:

  • 需要上面pdf文件的朋友,老规矩,关注微信公众号【码猿技术专栏】回复关键词zk总结

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网