当前位置: 移动技术网 > IT编程>数据库>Redis > 基于Redis实现的分布式锁

基于Redis实现的分布式锁

2020年07月17日  | 移动技术网IT编程  | 我要评论

前言

锁这种功能相信只要是接触过多线程的小伙伴们都比较熟悉,但是多线程内部加锁仅能解决单体架构给我们带来的相关问题,在微服务满天飞的今天,程序内部锁已经不能满足我们的需求了,于是便有了基于各种方式实现的分布式锁,今天就介绍一下在分布式的环境中用Redis实现较为常见的分布式锁

本文将会通过迭代多个版本的代码,来由浅到深的展示:
1.无分布式锁,仅在程序内加锁的场景
2.有简单的分布式锁的场景
3.标准分布式锁场景


源码

GitHub : 基于Redis的分布式锁简单demo(一)无分布式锁,仅在程序内加锁的场景
GitHub : 基于Redis的分布式锁简单demo(二)有简单的分布式锁的场景
GitHub : 基于Redis的分布式锁简单demo(三)标准分布式锁场景(基于Redisson的可重入锁)



分布式、微服务和集群的区别

有的刚接触的小伙伴,可能不太能够分清分布式和集群的区别,这两个东西经常一起谈到,有的人可能就会混为一谈,认为分布式就是集群环境,集群环境就会用到分布式。其实不然,这两个还是有本质的区别的。同时,提到分布式又会经常提到微服务,微服务和分布式有很多的共同点,又有着一些区别,分布式强调的是服务的分散化,微服务则是更强调服务的专业化。当然,在实际应用的场景,微服务多是分布式的。
分布式:

是指将一个业务拆分不同的子业务,分布在不同的机器上执行。

集群:

是指多台服务器集中在一起,实现同一业务,可以视为一台计算机。

微服务:

是一种架构风格,一个大型复杂软件应用由一个或多个微服务组成。系统中的各个微服务可被独立部署,各个微服务之间是松耦合的。每个微服务仅关注于完成一件任务并很好地完成该任务。在所有情况下,每个任务代表着一个小的业务能力。

通俗的来说,一个业务逻辑相同的http服务,运行在不同的机器上,同时有一台或一个集群的nginx做负载均衡和转发,就可以视为是分布式了。而微服务则强调注册和发现,会有独立的注册中心,相对独立的网关,通过网关便可以实现服务的负载均衡了。
下面介绍的项目,则是基于SpringCloud的微服务架构,使用了网关进行负载均衡。


Demo背景

模拟一个购物秒杀的请求,在单体架构情况下,都用不到分布式锁,只要在服务内加锁即可实现。而在真正的业务场景中,秒杀的架构,肯定是分布式的,这就得考虑到多服务间共享了,于是Redis就很好的引入了进来。虽然Redis是单线程的,但是有可能存在多个服务同时读取到了这个被秒杀商品的数量,这样再进行修改,任然可能出现超卖,这时候光靠加锁已经解决不了问题了,于是又引入了基于Redis的分布式锁


环境搭建

环境搭建的比较简单,在服务器上用Docker运行了一个Redis(为什么用Docker?因为真的很方便),我选择将Redis的配置文件自己配置,再映射进容器,配置相关的东西,就不在这里介绍了。在下载完Docer环境后运行:

docker run -d -p 6379:6379 --name=redis -v /data/redis/:/etc/redis/ redis:5.0.5 redis-server /etc/redis/redis.conf --appendonly yes

然后通过docker ps查看一下,此时我们的redis已经启动了。
在这里插入图片描述
还需要安装一个Jmeter的压测工具,来模拟高并发的请求。这里我就安装在了我开发的windows操作系统中。先去官网下一个Jmeter,然后需要配置一下环境变量,新建一个系统变量JMETER_HOME,路径填的是你的解压路径,然后在classpath里面添加,%JMETER_HOME%\lib\ext\ApacheJMeter_core.jar;%JMETER_HOME%\lib\jorphan.jar;%JMETER_HOME%\lib/logkit-2.0.jar;然后双击bin目录中的jmeter.bat就可以启动了。
这样基础环境就搭建完了。


项目的搭建

搭建的项目比较简单,就是基于SpringCloud的Eureka做了个注册中心,然后用Zuul做了个网关,然后用一个简单的shop项目,去调Redis,进行拿库存,减库存操作。这个简单的shop项目我们启动多个,用相同的spring.application.name,这样在Erueka中就会注册成同一个服务,而后通过网关的转发,便可实现访问网关+指定后缀,转发到几个shop服务中去,实现负载均衡。
首先搭建一个父工程,详见源码中最外层的pom。
然后以最简单最快捷的方式搭建Eureka-Server和Zuul网关,在配置文件中,方便指定端口,程序名称等参数。
1、再在Eureka-Server的启动类中,添加@EnableEurekaServer注解
2、在其他服务中添加@EnableDiscoveryClient注解。
3、在Zuul中添加@EnableZuulProxy,启用网关。
Erueka-Server的配置文件:

server:
  port: 8008

eureka:
  server:
    enable-self-preservation: false
  instance:
    hostname: 192.168.0.24
  client:
    fetch-registry: false
    register-with-eureka: false
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
spring:
  application:
    name: eureka-server

Zuul的配置文件中,则多了个关于服务的路由的配置:

server:
  port: 8080

spring:
  application:
    name: gateway
eureka:
  instance:
    hostname: 127.0.0.1
  client:
    eureka-server-port: 8008
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${eureka.client.eureka-server-port}/eureka/
zuul:
  routes:
    shop-api:
      path: /shop-api/**
      serviceId: shop

这样基础的配置就完成了,接下来可以启动两个程序看一下,在Erueka的管理界面中,可以看到zuul已经注册了。
在这里插入图片描述
而后,就是shop服务的配置了。shop服务需要引入spring-boot-starter-data-redis,添加关于redis的依赖。这样,我们需要在配置文件中,指定好redis的相关配置:

server:
  port: 8888 #服务端口,在启动多个服务时,仅需修改这个参数即可
eureka:
  instance:
    hostname: 127.0.0.1 #注册中心ip
  client:
    eureka-server-port: 8008 #注册中心端口
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${eureka.client.eureka-server-port}/eureka/
spring:
  redis:
    host: 192.168.0.128 #改成你的redis的ip
    port: 6379	#改成你的Redis的端口
    password: 123456 #改成你的Redis的密码
  application:
    name: shop #给服务起的名称

然后我们声明一个Controller,在Controller中,有一个接口,接口内是实现从redis拿剩余商品数、消费一个后将剩余的商品数,重新放到redis中的操作。对Redis的操作使用了StringRedisTemplate来实现。

    @Autowired
    private StringRedisTemplate redisTemplate;

    @GetMapping("/shop")
    public String shopPhone() {
        synchronized (this) {
            int number = Integer.parseInt(String.valueOf(redisTemplate.opsForValue().get("phone")));
            if (number > 0) {
                number--;
                redisTemplate.opsForValue().set("phone", String.valueOf(number));
                System.out.println("恭喜抢到啦!"+number);
                return "恭喜抢到啦!";
            } else {
                System.out.println("没抢到...");
                return "没抢到...";
            }
        }
    }

Demo演示

版本一:无分布式锁,仅在程序内加锁的场景

一切完成,我在redis中,将phone这个商品,添加20件库存,然后我们将几个服务一起运行一下。(当然,这里你可以先只启动一个shop去测试一下)
我启动了三个shop服务,在Erueka注册中心中可以看到,分别启动在了8887、8888、8889端口上。
在这里插入图片描述
然后我们使用JMeter
1.创建一个线程组,将Number of Threads 设置为100,这个参数就是模拟并发请求的人数;Ramp-up perlod 设置为0 ,意思是同时时刻启动这100个请求,最后Loop Count设置为3,意思这100个请求执行三次,总并发量为300
在这里插入图片描述
2.然后创建一个HTTPRequest请求,在HTTPRequest中,指定好Url,url需要填写网关的url,通过网关路由到各个服务上去。
在这里插入图片描述
3.再创建一个显示结果的Aggregate Report,里面可以看到实际请求的每秒并发量。最后我们点击中上方的绿色按钮,启动它(确保你的服务已经启动)。
4.启动后,我们打开程序的控制台,可以看到剩余商品数16、12、8、6、5、3、0都出现了一次或两次的重复,说明有几次重复消费了,这样就会造成我们的商品,超量销售,就出大问题啦。
在这里插入图片描述在这里插入图片描述
在这里插入图片描述
这时候就体现出了分布式锁的必要了。
以上的代码位于:基于Redis的分布式锁简单demo(一)无分布式锁,仅在程序内加锁的场景


版本二:在代码内手动添加简单锁的实现场景

首先介绍一下思路:
1.考虑在Redis中存放一把锁,Redis中有一个setNX操作,可以在指定的 key 不存在时,为 key 设置指定的值,并返回是否设置成功的一个boolean类型。
2.依赖setNX和Redis是单线程的特性,我们可以简单的实现一个分布式锁。
3.比如A服务、B服务、C服务同时有请求进来,但是能获取到该商品的该锁的只会有一个线程。这样在宏观情况下,可以说就是实现了分布式锁。
4.然后往下再细细考虑,加锁,就需要释放锁,那么用try…finally就可以实现了。

		String lock = "phoneLock";
        /*
         * 尝试获取锁,Redis单线程,所以同时只会有一个线程获得锁
         * 未设置超时时间,在服务不出问题的情况下(如服务挂了,网络断了等),可以实现分布式锁
         */
        Boolean result = redisTemplate.opsForValue().setIfAbsent(lock, "phone");
        assert result != null;
        //假如没获取到锁
        if (!result) {
            System.out.println("活动太火爆了,请稍后再试!");
            return "活动太火爆了,请稍后再试!";
        }
        //获取到了,才需要finally去释放锁
        try {
            //取出phone的剩余数量
            String phone = redisTemplate.opsForValue().get("phone");
            if (phone == null) {
                System.out.println("没抢到...");
                return "没抢到...";
            }
            int number = Integer.parseInt(phone);
            //还有剩余
            if (number > 0) {
                //消费一个
                int newNumber = number - 1;
                //将消费完的phone的数量,重新放到redis中
                redisTemplate.opsForValue().set("phone", String.valueOf(newNumber));
                System.out.println("恭喜抢到啦!" + newNumber);
                return "恭喜抢到啦!";
            } else {
                System.out.println("没抢到...");
                return "没抢到...";
            }
        } finally {
            //释放锁
            redisTemplate.delete(lock);
        }

执行结果的“活动太火爆了,请稍后再试!”有点多,所以我把日志做了下过滤:
在这里插入图片描述在这里插入图片描述在这里插入图片描述
可以看到,的确没有再出现超售的情况了。

但是再细想,还会出现什么问题呢?

当加了锁,程序挂了,那这个锁就成了死锁了,再也没有程序能获取到它,它也会在Redis,永远得不到释放。

怎么解决这个问题呢?

很简单加一个锁超时时间就行了嘛。

但是真有这么简单吗?

加超时时间,说的很简单,但是具体加多久的超时时间呢?
为了确保服务的可靠性,总不能我还没消费完,锁就自动失效了吧?
那样其他的服务就可以拿到锁了,还是会出现超卖现象,还有一个严重的问题。

1.假如A线程先抢到了锁lock,设置了10秒的超时时间,但是10秒后,其实A并没有执行完成,但此时的锁已经失效了,B线程也可以拿到同一把lock锁了,这是第一个问题。
2.第二个问题是,别忘记,A线程还有一个释放锁的操作呀,假设B线程现在并未执行完成,但是A线程执行完成了,A线程以为是它加的锁,然后把锁释放了,这样就会造成大量的循环的问题出现。

于是再考虑,有没有办法解决上面的这个问题呢?

当然是有的,我们给每一个程序加一个唯一标识,让线程解锁的时候判断一下,是不是当前线程加的锁,是的话再将它解锁。

于是便有了以下的代码。

        String lock = "phoneLock";
		/*
         * 标识当前线程,没有该参数,可能会存在的问题:
         *  线程还没消费完,但是锁却被别的线程拿到了。
         *  而后该线程再释放锁,导致又有第三个线程可以拿到该锁,造成循环的问题。
         * 有了该标识,便可解决该问题。
         */
        String uuid = UUID.randomUUID().toString();
        //此处的时间设置多少,都是不合理的,只是为了防止比如程序挂了,网络断了等问题造成的死锁
        Boolean result = redisTemplate.opsForValue().setIfAbsent(lock, uuid, 5, TimeUnit.SECONDS);
        assert result != null;
        //假如没获取到锁
        if (!result) {
            System.out.println("活动太火爆了,请稍后再试!");
            return "活动太火爆了,请稍后再试!";
        }
        //获取到了,才需要finally去释放锁
        try {
            //取出phone的剩余数量
            String phone = redisTemplate.opsForValue().get("phone");
            if (phone == null) {
                System.out.println("没抢到...");
                return "没抢到...";
            }
            int number = Integer.parseInt(phone);
            //还有剩余
            if (number > 0) {
                //消费一个
                int newNumber = number - 1;
                //将消费完的phone的数量,重新放到redis中
                redisTemplate.opsForValue().set("phone", String.valueOf(newNumber));
                System.out.println("恭喜抢到啦!" + newNumber);
                return "恭喜抢到啦!";
            } else {
                System.out.println("没抢到...");
                return "没抢到...";
            }
        } finally {
            //获取Redis中锁存放的ID
            String id = redisTemplate.opsForValue().get(lock);
            //是当前线程设置的,才释放锁
            if (uuid.equals(id)) {
                //释放锁
                redisTemplate.delete(lock);
            }
        }

重新执行,结果还是一样,没有出现超卖的情况
思考:但是真的就没有问题了吗?
以上的代码在基于Redis的分布式锁简单demo(二)有简单的分布式锁的场景


版本三:使用成熟的第三方锁Redisson加锁的实现场景

问题还是会有的,就比如上面提出的具体要加多久的超时时间呢?单体Redis虽然能支持每秒10w+QPS,但是假如还是不够呢?这就需要用到Redis集群了,引入Redis集群就又会出现新的问题,假如Redis写库已经写入了数据,但是读库还没有同步到这个数据就又被新的线程请求了,这样同样会导致超出预期的问题发生。
那么行业内成熟的分布式锁是怎么实现的呢?
有几种方案:

1.基于数据库
2.基于Zookeeper
3.基于Redis等单线程的内存数据库

基于Redis又有Jedis,Redisson,Lettuce等三种解决方案,本文就介绍基于Redisson实现的分布式锁。
Redisson中包含了很多的功能,Redisson官网介绍的有以下的功能。
大概有:支持分布式,分布式Java锁和同步器,分布式Java服务,分布式Java对象,云上的Redis等等等等,我们这里就用到了分布式Java锁和同步器这块的功能。
在这里插入图片描述
在Redisson的分布式锁中,它实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock)、联锁(MultiLock)、红锁(RedLock)、读写锁(ReadWriteLock)、信号量(Semaphore)、可过期性信号量(PermitExpirableSemaphore)、闭锁(CountDownLatch)等常见的8种锁。
本文就以最简单的可重入锁进行演示,想了解更多的小伙伴,可以自行尝试一下各种锁。GitHub中也有相关的介绍Redisson目录
可重入锁,简单来说便是在获取到锁之后,调用一个新的线程去轮询(轮询间隔大概是锁超时时间的1/3)的查看是否已到超时时间,到了而线程还没执行完,就给这个锁续上时间以便它能拿到锁继续执行,保证不被别的线程抢走。
官方的介绍是:

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

要使用Redisson,就当然要引入依赖,在spring-boot-starter中,也有相关的依赖,直接引入如下依赖就可以了。

		<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>${redisson.version}</version>
        </dependency>

而后,是需要配置相关参数的,如果是集群的,配置的就比较多了,这里提供一个官方的配置方法,作为演示,只使用最简单的单机式Redis,所以就添加一个Redisson的配置类就可以了。注入一个RedissonClient的bean,填好相关的地址和密码就可以了。

	@Bean
    public RedissonClient redisson() throws IOException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://"+url + ":" + port).setPassword(password);
        return Redisson.create(config);
    }

最后,再对我们的服务进行一些小的修改,把原先我们自己实现的分布式锁给替换成Redisson实现的分布式锁。

		//标识锁用
        String lock = "phoneLock";
        //获取锁
        RLock rLock = redissonClient.getLock(lock);
        if (rLock.isLocked()) {
            System.out.println("活动太火爆了,请稍后再试!");
            return "活动太火爆了,请稍后再试!";
        }
        //获取到了,才需要finally去释放锁
        try {
            //加锁
            rLock.lock(5, TimeUnit.SECONDS);
            //取出phone的剩余数量
            String phone = redisTemplate.opsForValue().get("phone");
            if (phone == null) {
                System.out.println("没抢到...");
                return "没抢到...";
            }
            int number = Integer.parseInt(phone);
            //还有剩余
            if (number > 0) {
                //消费一个
                int newNumber = number - 1;
                //将消费完的phone的数量,重新放到redis中
                redisTemplate.opsForValue().set("phone", String.valueOf(newNumber));
                System.out.println("恭喜抢到啦!" + newNumber);
                return "恭喜抢到啦!";
            } else {
                System.out.println("没抢到...");
                return "没抢到...";
            }
        } finally {
            if (rLock.isLocked()) {
                //释放锁
                rLock.unlock();
            }
        }

然后同样的执行一下,输出结果如下。
在这里插入图片描述在这里插入图片描述在这里插入图片描述
通过Redisson,我们可以非常方便快捷的使用分布式锁。本项目就演示了简单的可重入锁的使用,不同场景需求Redisson也给我们提供了不同的锁,具体问题,具体分析需要什么锁。

最后再简单介绍一下各种锁吧:

  1. 可重入锁
    基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
    大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
    另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
  2. 公平锁
    它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
  3. 联锁
    基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。
  4. 红锁(RedLock)
    基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
  5. 读写锁(ReadWriteLock)
    分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
  6. 信号量(Semaphore)
    基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
  7. 可过期性信号量(PermitExpirableSemaphore)
    基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore)是在RSemaphore对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。它提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
  8. 闭锁(CountDownLatch)
    基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。

以上代码在基于Redis的分布式锁简单demo(三)标准分布式锁场景(基于Redisson的可重入锁)

本文地址:https://blog.csdn.net/mingwei_cheng/article/details/107369378

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网