当前位置: 移动技术网 > IT编程>开发语言>Java > 详解Java如何实现基于Redis的分布式锁

详解Java如何实现基于Redis的分布式锁

2019年07月22日  | 移动技术网IT编程  | 我要评论
前言 单jvm内同步好办, 直接用jdk提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用redis,当然还有很多其他的实现方

前言

单jvm内同步好办, 直接用jdk提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用redis,当然还有很多其他的实现方式。其实基于redis实现的原理还算比较简单的,在看代码之前建议大家先去看看原理,看懂了之后看代码应该就容易理解了。

我这里不实现jdk的java.util.concurrent.locks.lock接口,而是自定义一个,因为jdk的有个newcondition方法我这里暂时没实现。这个lock提供了5个lock方法的变体,可以自行选择使用哪一个来获取锁,我的想法是最好用带超时返回的那几个方法,因为不这样的话,假如redis挂了,线程永远都在那死循环了(关于这里,应该还可以进一步优化,如果redis挂了,jedis的操作肯定会抛异常之类的,可以定义个机制让redis挂了的时候通知使用这个lock的用户,或者说是线程)

package cc.lixiaohui.lock;

import java.util.concurrent.timeunit;

public interface lock {

 /**
 * 阻塞性的获取锁, 不响应中断
 */
 void lock;
 
 /**
 * 阻塞性的获取锁, 响应中断
 * 
 * @throws interruptedexception
 */
 void lockinterruptibly throws interruptedexception;
 
 /**
 * 尝试获取锁, 获取不到立即返回, 不阻塞
 */
 boolean trylock;
 
 /**
 * 超时自动返回的阻塞性的获取锁, 不响应中断
 * 
 * @param time
 * @param unit
 * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未���取到锁
  * 
 */
 boolean trylock(long time, timeunit unit);
 
 /**
 * 超时自动返回的阻塞性的获取锁, 响应中断
 * 
 * @param time
 * @param unit
 * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁
 * @throws interruptedexception 在尝试获取锁的当前线程被中断
 */
 boolean trylockinterruptibly(long time, timeunit unit) throws interruptedexception;
 
 /**
 * 释放锁
 */
 void unlock;
 
}

看其抽象实现:

package cc.lixiaohui.lock;

import java.util.concurrent.timeunit;

/**
 * 锁的骨架实现, 真正的获取锁的步骤由子类去实现.
 * 
 * @author lixiaohui
 *
 */
public abstract class abstractlock implements lock {

 /**
 * <pre>
 * 这里需不需要保证可见性值得讨论, 因为是分布式的锁, 
 * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性 
 * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了.
 * </pre>
 */
 protected volatile boolean locked;

 /**
 * 当前jvm内持有该锁的线程(if have one)
 */
 private thread exclusiveownerthread;

 public void lock {
 try {
 lock(false, 0, null, false);
 } catch (interruptedexception e) {
 // todo ignore
 }
 }

 public void lockinterruptibly throws interruptedexception {
 lock(false, 0, null, true);
 }

 public boolean trylock(long time, timeunit unit) {
 try {
 return lock(true, time, unit, false);
 } catch (interruptedexception e) {
 // todo ignore
 }
 return false;
 }

 public boolean trylockinterruptibly(long time, timeunit unit) throws interruptedexception {
 return lock(true, time, unit, true);
 }

 public void unlock {
 // todo 检查当前线程是否持有锁
 if (thread.currentthread != getexclusiveownerthread) {
 throw new illegalmonitorstateexception("current thread does not hold the lock");
 }
 
 unlock0;
 setexclusiveownerthread(null);
 }

 protected void setexclusiveownerthread(thread thread) {
 exclusiveownerthread = thread;
 }

 protected final thread getexclusiveownerthread {
 return exclusiveownerthread;
 }

 protected abstract void unlock0;
 
 /**
 * 阻塞式获取锁的实现
 * 
 * @param usetimeout 
 * @param time
 * @param unit
 * @param interrupt 是否响应中断
 * @return
 * @throws interruptedexception
 */
 protected abstract boolean lock(boolean usetimeout, long time, timeunit unit, boolean interrupt) throws interruptedexception;

}

基于redis的最终实现,关键的获取锁,释放锁的代码在这个类的lock方法和unlock0方法里,大家可以只看这两个方法然后完全自己写一个:

package cc.lixiaohui.lock;

import java.util.concurrent.timeunit;

import redis.clients.jedis.jedis;

/**
 * <pre>
 * 基于redis的setnx操作实现的分布式锁
 * 
 * 获取锁时最好用lock(long time, timeunit unit), 以免网路问题而导致线程一直阻塞
 * 
 * <a href="http://redis.io/commands/setnx">setnc操作参考资料</a>
 * </pre>
 * 
 * @author lixiaohui
 *
 */
public class redisbaseddistributedlock extends abstractlock {
 
 private jedis jedis;
 
 // 锁的名字
 protected string lockkey;
 
 // 锁的有效时长(毫秒)
 protected long lockexpires;
 
 public redisbaseddistributedlock(jedis jedis, string lockkey, long lockexpires) {
 this.jedis = jedis;
 this.lockkey = lockkey;
 this.lockexpires = lockexpires;
 }

 // 阻塞式获取锁的实现
 protected boolean lock(boolean usetimeout, long time, timeunit unit, boolean interrupt) throws interruptedexception{
 if (interrupt) {
 checkinterruption;
 }
 
 long start = system.currenttimemillis;
 long timeout = unit.tomillis(time); // if !usetimeout, then it's useless
 
 while (usetimeout ? istimeout(start, timeout) : true) {
 if (interrupt) {
 checkinterruption;
 }
 
 long lockexpiretime = system.currenttimemillis + lockexpires + 1;//锁超时时间
 string stringoflockexpiretime = string.valueof(lockexpiretime);
 
 if (jedis.setnx(lockkey, stringoflockexpiretime) == 1) { // 获取到锁
 // todo 成功获取到锁, 设置相关标识
 locked = true;
 setexclusiveownerthread(thread.currentthread);
 return true;
 }
 
 string value = jedis.get(lockkey);
 if (value != null && istimeexpired(value)) { // lock is expired
 // 假设多个线程(非单jvm)同时走到这里
 string oldvalue = jedis.getset(lockkey, stringoflockexpiretime); // getset is atomic
 // 但是走到这里时每个线程拿到的oldvalue肯定不可能一样(因为getset是原子性的)
 // 加入拿到的oldvalue依然是expired的,那么就说明拿到锁了
 if (oldvalue != null && istimeexpired(oldvalue)) {
  // todo 成功获取到锁, 设置相关标识
  locked = true;
  setexclusiveownerthread(thread.currentthread);
  return true;
 }
 } else { 
 // todo lock is not expired, enter next loop retrying
 }
 }
 return false;
 }
 
 public boolean trylock {
 long lockexpiretime = system.currenttimemillis + lockexpires + 1;//锁超时时间
 string stringoflockexpiretime = string.valueof(lockexpiretime);
 
 if (jedis.setnx(lockkey, stringoflockexpiretime) == 1) { // 获取到锁
 // todo 成功获取到锁, 设置相关标识
 locked = true;
 setexclusiveownerthread(thread.currentthread);
 return true;
 }
 
 string value = jedis.get(lockkey);
 if (value != null && istimeexpired(value)) { // lock is expired
 // 假设多个线程(非单jvm)同时走到这里
 string oldvalue = jedis.getset(lockkey, stringoflockexpiretime); // getset is atomic
 // 但是走到这里时每个线程拿到的oldvalue肯定不可能一样(因为getset是原子性的)
 // 假如拿到的oldvalue依然是expired的,那么就说明拿到锁了
 if (oldvalue != null && istimeexpired(oldvalue)) {
 // todo 成功获取到锁, 设置相关标识
 locked = true;
 setexclusiveownerthread(thread.currentthread);
 return true;
 }
 } else { 
 // todo lock is not expired, enter next loop retrying
 }
 
 return false;
 }
 
 /**
 * queries if this lock is held by any thread.
 * 
 * @return {@code true} if any thread holds this lock and
  *   {@code false} otherwise
 */
 public boolean islocked {
 if (locked) {
 return true;
 } else {
 string value = jedis.get(lockkey);
 // todo 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了,
 // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断
 // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就
 // 不是同步控制, 它只是一种锁状态的报告.
 return !istimeexpired(value);
 }
 }

 @override
 protected void unlock0 {
 // todo 判断锁是否过期
 string value = jedis.get(lockkey);
 if (!istimeexpired(value)) {
 dounlock;
 }
 }

 private void checkinterruption throws interruptedexception {
 if(thread.currentthread.isinterrupted) {
 throw new interruptedexception;
 }
 }
 
 private boolean istimeexpired(string value) {
 return long.parselong(value) < system.currenttimemillis;
 }
 
 private boolean istimeout(long start, long timeout) {
 return start + timeout > system.currenttimemillis;
 }
 
 private void dounlock {
 jedis.del(lockkey);
 }

}

如果将来还换一种实现方式(比如zookeeper之类的),到时直接继承abstractlock并实现lock(boolean usetimeout, long time, timeunit unit, boolean interrupt)unlock0方法即可(所谓抽象嘛)

测试

模拟全局id增长器,设计一个idgenerator类,该类负责生成全局递增id,其代码如下:

package cc.lixiaohui.lock;

import java.math.biginteger;
import java.util.concurrent.timeunit;

/**
 * 模拟id生成 
 * @author lixiaohui
 *
 */
public class idgenerator {

 private static biginteger id = biginteger.valueof(0);

 private final lock lock;

 private static final biginteger increment = biginteger.valueof(1);

 public idgenerator(lock lock) {
 this.lock = lock;
 }
 
 public string getandincrement {
 if (lock.trylock(3, timeunit.seconds)) {
 try {
 // todo 这里获取到锁, 访问临界区资源
 return getandincrement0;
 } finally {
 lock.unlock;
 }
 }
 return null;
 //return getandincrement0;
 }

 private string getandincrement0 {
 string s = id.tostring;
 id = id.add(increment);
 return s;
 }
}

测试主逻辑:同一个jvm内开两个线程死循环地(循环之间无间隔,有的话测试就没意义了)获取id(我这里并不是死循环而是跑20s),获取到id存到同一个set里面,在存之前先检查该idset中是否存在,如果已存在,则让两个线程都停止。如果程序能正常跑完20s,那么说明这个分布式锁还算可以满足要求,如此测试的效果应该和不同jvm(也就是真正的分布式环境中)测试的效果是一样的,下面是测试类的代码:

package cc.lixiaohui.distributedlock.distributedlock;

import java.util.hashset;
import java.util.set;

import org.junit.test;

import redis.clients.jedis.jedis;
import cc.lixiaohui.lock.idgenerator;
import cc.lixiaohui.lock.lock;
import cc.lixiaohui.lock.redisbaseddistributedlock;

public class idgeneratortest {
 
 private static set<string> generatedids = new hashset<string>;
 
 private static final string lock_key = "lock.lock";
 private static final long lock_expire = 5 * 1000;
 
 @test
 public void test throws interruptedexception {
 jedis jedis1 = new jedis("localhost", 6379);
 lock lock1 = new redisbaseddistributedlock(jedis1, lock_key, lock_expire);
 idgenerator g1 = new idgenerator(lock1);
 idconsumemission consume1 = new idconsumemission(g1, "consume1");
 
 jedis jedis2 = new jedis("localhost", 6379);
 lock lock2 = new redisbaseddistributedlock(jedis2, lock_key, lock_expire);
 idgenerator g2 = new idgenerator(lock2);
 idconsumemission consume2 = new idconsumemission(g2, "consume2");
 
 thread t1 = new thread(consume1);
 thread t2 = new thread(consume2);
 t1.start;
 t2.start;
 
 thread.sleep(20 * 1000); //让两个线程跑20秒
 
 idconsumemission.stop;
 
 t1.join;
 t2.join;
 }
 
 static string time {
 return string.valueof(system.currenttimemillis / 1000);
 }
 
 static class idconsumemission implements runnable {

 private idgenerator idgenerator;
 
 private string name;
 
 private static volatile boolean stop;
 
 public idconsumemission(idgenerator idgenerator, string name) {
 this.idgenerator = idgenerator;
 this.name = name;
 }
 
 public static void stop {
 stop = true;
 }
 
 public void run {
 system.out.println(time + ": consume " + name + " start ");
 while (!stop) {
 string id = idgenerator.getandincrement;
 if(generatedids.contains(id)) {
  system.out.println(time + ": duplicate id generated, id = " + id);
  stop = true;
  continue;
 } 
 
 generatedids.add(id);
 system.out.println(time + ": consume " + name + " add id = " + id);
 }
 system.out.println(time + ": consume " + name + " done ");
 }
 
 }
 
}

说明一点,我这里停止两个线程的方式并不是很好,我是为了方便才这么做的,因为只是测试,最好不要这么做。

测试结果

跑20s打印的东西太多,前面打印的被clear了,只有差不多跑完的时候才有,下面截图。说明了这个锁能正常工作:

idgererator没有加锁(即idgereratorgetandincrement方法内部获取id时不上锁)时,测试是不通过的,非常大的概率中途就会停止,下面是不加锁时的测试结果:

这个1秒都不到:

这个也1秒都不到:

结束语

好了,以上就是java实现基于redis的分布式锁的全部内容,各位如果发现问题希望能指正,希望这篇文章能对大家的学习和工作带来一定的帮助,如果有疑问可以留言交流。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网