当前位置: 移动技术网 > IT编程>开发语言>Java > java使用zookeeper实现的分布式锁示例

java使用zookeeper实现的分布式锁示例

2019年07月22日  | 移动技术网IT编程  | 我要评论

口袋西游外挂,动漫美女被虐游戏,泉州公交查询

使用zookeeper实现的分布式锁

分布式锁,实现了lock接口

复制代码 代码如下:

package com.concurrent;

import java.io.ioexception;
import java.util.arraylist;
import java.util.collections;
import java.util.list;
import java.util.concurrent.countdownlatch;
import java.util.concurrent.timeunit;
import java.util.concurrent.locks.condition;
import java.util.concurrent.locks.lock;

import org.apache.zookeeper.createmode;
import org.apache.zookeeper.keeperexception;
import org.apache.zookeeper.watchedevent;
import org.apache.zookeeper.watcher;
import org.apache.zookeeper.zoodefs;
import org.apache.zookeeper.zookeeper;
import org.apache.zookeeper.data.stat;

/**
   distributedlock lock = null;
 try {
  lock = new distributedlock("127.0.0.1:2182","test");
  lock.lock();
  //do something...
 } catch (exception e) {
  e.printstacktrace();
 }
 finally {
  if(lock != null)
   lock.unlock();
 }
 * @author xueliang
 *
 */
public class distributedlock implements lock, watcher{
 private zookeeper zk;
 private string root = "/locks";//根
 private string lockname;//竞争资源的标志
 private string waitnode;//等待前一个锁
 private string myznode;//当前锁
 private countdownlatch latch;//计数器
 private int sessiontimeout = 30000;
 private list<exception> exception = new arraylist<exception>();

 /**
  * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
  * @param config 127.0.0.1:2181
  * @param lockname 竞争资源标志,lockname中不能包含单词lock
  */
 public distributedlock(string config, string lockname){
  this.lockname = lockname;
  // 创建一个与服务器的连接
   try {
   zk = new zookeeper(config, sessiontimeout, this);
   stat stat = zk.exists(root, false);
   if(stat == null){
    // 创建根节点
    zk.create(root, new byte[0], zoodefs.ids.open_acl_unsafe,createmode.persistent);
   }
  } catch (ioexception e) {
   exception.add(e);
  } catch (keeperexception e) {
   exception.add(e);
  } catch (interruptedexception e) {
   exception.add(e);
  }
 }

 /**
  * zookeeper节点的监视器
  */
 public void process(watchedevent event) {
  if(this.latch != null) { 
            this.latch.countdown(); 
        }
 }

 public void lock() {
  if(exception.size() > 0){
   throw new lockexception(exception.get(0));
  }
  try {
   if(this.trylock()){
    system.out.println("thread " + thread.currentthread().getid() + " " +myznode + " get lock true");
    return;
   }
   else{
    waitforlock(waitnode, sessiontimeout);//等待锁
   }
  } catch (keeperexception e) {
   throw new lockexception(e);
  } catch (interruptedexception e) {
   throw new lockexception(e);
  }
 }

 public boolean trylock() {
  try {
   string splitstr = "_lock_";
   if(lockname.contains(splitstr))
    throw new lockexception("lockname can not contains \\u000b");
   //创建临时子节点
   myznode = zk.create(root + "/" + lockname + splitstr, new byte[0], zoodefs.ids.open_acl_unsafe,createmode.ephemeral_sequential);
   system.out.println(myznode + " is created ");
   //取出所有子节点
   list<string> subnodes = zk.getchildren(root, false);
   //取出所有lockname的锁
   list<string> lockobjnodes = new arraylist<string>();
   for (string node : subnodes) {
    string _node = node.split(splitstr)[0];
    if(_node.equals(lockname)){
     lockobjnodes.add(node);
    }
   }
   collections.sort(lockobjnodes);
   system.out.println(myznode + "==" + lockobjnodes.get(0));
   if(myznode.equals(root+"/"+lockobjnodes.get(0))){
    //如果是最小的节点,则表示取得锁
             return true;
         }
   //如果不是最小的节点,找到比自己小1的节点
   string submyznode = myznode.substring(myznode.lastindexof("/") + 1);
   waitnode = lockobjnodes.get(collections.binarysearch(lockobjnodes, submyznode) - 1);
  } catch (keeperexception e) {
   throw new lockexception(e);
  } catch (interruptedexception e) {
   throw new lockexception(e);
  }
  return false;
 }

 public boolean trylock(long time, timeunit unit) {
  try {
   if(this.trylock()){
    return true;
   }
         return waitforlock(waitnode,time);
  } catch (exception e) {
   e.printstacktrace();
  }
  return false;
 }

 private boolean waitforlock(string lower, long waittime) throws interruptedexception, keeperexception {
        stat stat = zk.exists(root + "/" + lower,true);
        //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
        if(stat != null){
         system.out.println("thread " + thread.currentthread().getid() + " waiting for " + root + "/" + lower);
         this.latch = new countdownlatch(1);
         this.latch.await(waittime, timeunit.milliseconds);
         this.latch = null;
        }
        return true;
    }

 public void unlock() {
  try {
   system.out.println("unlock " + myznode);
   zk.delete(myznode,-1);
   myznode = null;
   zk.close();
  } catch (interruptedexception e) {
   e.printstacktrace();
  } catch (keeperexception e) {
   e.printstacktrace();
  }
 }

 public void lockinterruptibly() throws interruptedexception {
  this.lock();
 }

 public condition newcondition() {
  return null;
 }

 public class lockexception extends runtimeexception {
  private static final long serialversionuid = 1l;
  public lockexception(string e){
   super(e);
  }
  public lockexception(exception e){
   super(e);
  }
 }

}

并发测试工具

复制代码 代码如下:

package com.concurrent;

import java.util.arraylist;
import java.util.collections;
import java.util.list;
import java.util.concurrent.copyonwritearraylist;
import java.util.concurrent.countdownlatch;
import java.util.concurrent.atomic.atomicinteger;

/**
  concurrenttask[] task = new concurrenttask[5];
  for(int i=0;i<task.length;i++){
      task[i] = new concurrenttask(){
    public void run() {
     system.out.println("==============");

    }};
  }
  new concurrenttest(task);
 * @author xueliang
 *
 */
public class concurrenttest {
 private countdownlatch startsignal = new countdownlatch(1);//开始阀门
 private countdownlatch donesignal = null;//结束阀门
 private copyonwritearraylist<long> list = new copyonwritearraylist<long>();
 private atomicinteger err = new atomicinteger();//原子递增
 private concurrenttask[] task = null;

 public concurrenttest(concurrenttask... task){
  this.task = task;
  if(task == null){
   system.out.println("task can not null");
   system.exit(1);
  }
  donesignal = new countdownlatch(task.length);
  start();
 }
 /**
  * @param args
  * @throws classnotfoundexception
  */
 private void start(){
  //创建线程,并将所有线程等待在阀门处
  createthread();
  //打开阀门
  startsignal.countdown();//递减锁存器的计数,如果计数到达零,则释放所有等待的线程
  try {
   donesignal.await();//等待所有线程都执行完毕
  } catch (interruptedexception e) {
   e.printstacktrace();
  }
  //计算执行时间
  getexetime();
 }
 /**
  * 初始化所有线程,并在阀门处等待
  */
 private void createthread() {
  long len = donesignal.getcount();
  for (int i = 0; i < len; i++) {
   final int j = i;
   new thread(new runnable(){
    public void run() {
     try {
      startsignal.await();//使当前线程在锁存器倒计数至零之前一直等待
      long start = system.currenttimemillis();
      task[j].run();
      long end = (system.currenttimemillis() - start);
      list.add(end);
     } catch (exception e) {
      err.getandincrement();//相当于err++
     }
     donesignal.countdown();
    }
   }).start();
  }
 }
 /**
  * 计算平均响应时间
  */
 private void getexetime() {
  int size = list.size();
  list<long> _list = new arraylist<long>(size);
  _list.addall(list);
  collections.sort(_list);
  long min = _list.get(0);
  long max = _list.get(size-1);
  long sum = 0l;
  for (long t : _list) {
   sum += t;
  }
  long avg = sum/size;
  system.out.println("min: " + min);
  system.out.println("max: " + max);
  system.out.println("avg: " + avg);
  system.out.println("err: " + err.get());
 }

 public interface concurrenttask {
  void run();
 }

}

测试

复制代码 代码如下:

package com.concurrent;

import com.concurrent.concurrenttest.concurrenttask;

public class zktest {
 public static void main(string[] args) {
  runnable task1 = new runnable(){
   public void run() {
    distributedlock lock = null;
    try {
     lock = new distributedlock("127.0.0.1:2182","test1");
     //lock = new distributedlock("127.0.0.1:2182","test2");
     lock.lock();
     thread.sleep(3000);
     system.out.println("===thread " + thread.currentthread().getid() + " running");
    } catch (exception e) {
     e.printstacktrace();
    }
    finally {
     if(lock != null)
      lock.unlock();
    }

   }

  };
  new thread(task1).start();
  try {
   thread.sleep(1000);
  } catch (interruptedexception e1) {
   e1.printstacktrace();
  }
  concurrenttask[] tasks = new concurrenttask[60];
  for(int i=0;i<tasks.length;i++){
   concurrenttask task3 = new concurrenttask(){
    public void run() {
     distributedlock lock = null;
     try {
      lock = new distributedlock("127.0.0.1:2183","test2");
      lock.lock();
      system.out.println("thread " + thread.currentthread().getid() + " running");
     } catch (exception e) {
      e.printstacktrace();
     }
     finally {
      lock.unlock();
     }

    }
   };
   tasks[i] = task3;
  }
  new concurrenttest(tasks);
 }
}

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网