Redis 实现分布式锁 && Redisson

Java 已经提供了 synchronized 关键字和 Lock 实现类,为什么还需要分布式锁?因为他们都是本地锁,或者说是进程锁,在 web 环境中,对于核心业务通常有大量的并发请求,采用 synchronized 和 Lock

Redis 实现分布式锁 && Redisson

Java 已经提供了 synchronized 关键字和 Lock 实现类,为什么还需要分布式锁?因为他们都是本地锁,或者说是进程锁,在 web 环境中,对于核心业务通常有大量的并发请求,采用 synchronized 和 Lock,假如四台订单服务负载均衡 10000 并发请求:
本地锁只能锁住自己当前服务,如果部署单节点,那么没有什么问题。以下面代码为例

codelayui.code
 @Service
    public class TestService {
        @Transactional
        public void test(){
            synchronized (this){
                //执行业务...
            }
        }
    }

让我们看下这段代码,synchronized 代码块锁的是 this ,this 是当前 Service 的实例,在一个服务中,只有一个 Spring 容器,默认 Bean 是单例的,所以这个 synchronized 能锁住当前这个服务的所有请求。然而生产环境都是服务集群,基本不可能有单服务节点的情况。这样一来,本地锁就有了一些局限,它无法保证同一时间只能有一个用户请求执行业务代码。如果是为了减少数据库的并发压力,那其实本地锁没有问题。如上图所示,即使本地锁无法锁住其他实例的请求,顶多并发请求和实例个数一样多,MySQL 还是能顶得住这些压力。只是特定对于某些业务场景必须同一时间只有一个请求抢占到锁,那么就要用分布式锁了。比如博主公司的 APP,会员提交订单,由于我们自己以前技术实现我们需要加分布式锁来确保同一个用户同一时间只能提交一个订单,避免重复使用优惠券。
实现分布式锁其核心就是我们要把锁放在一个公共访问的地方,这样才能真正锁住所有请求。Redis 这种基于内存的中间件数据库简直是量身打造。Redis 提供了 SET EX 命令可以用来实现分布式锁。在 SpringBoot 中 spring-boot-starter-data-redis 给我们提供了相关 API。

codelayui.code
@Transactional
    public void test(){
        Boolean flag = stringRedisTemplate.opsForValue().
                setIfAbsent("lock", "value", 5, TimeUnit.SECONDS);
        //抢占锁成功
        if(flag != null && flag){
            //执行业务...
            //释放锁
            stringRedisTemplate.delete("lock");
        }else {
            //自旋获取锁,也可以睡 100ms 来降低自旋频率
            test();
        }
    }

最简单的分布式锁就完成了。这个方法是往 Redis 设置一个 key 如果不存在就设置进去,返回 true,如果存在就返回 false。抢不到锁就去自旋,直到获取到锁为止。但其实仔细分析上面的分布式锁代码,这里有很大问题。
假设我们线程 A 业务代码执行的时间超过五秒钟,那么还没删除锁的时候其实这个锁就过期了。那么其他线程 B 就抢到锁了,其他线程 B 正在执行业务,线程 A 把锁删掉了,但是 A 删掉的其实是线程 B 设置的锁。线程 C 又抢到锁,线程 C 又执行业务,这样就乱掉了。
所以这里为了保证每一个线程删除的是自己的锁,我们可以在设置锁的时候给一个仅属于当前线程的 UUID,代码做如下改进

codelayui.code
@Transactional
public void test(){
    String uuid = UUID.randomUUID().toString().replace("-","");
    Boolean flag = stringRedisTemplate.opsForValue().
    setIfAbsent("lock", uuid, 5, TimeUnit.SECONDS);
    //抢占锁成功
    if(flag != null && flag){
        //执行业务...
        //释放锁(先判断锁是自己的,再去删除锁)
        if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){
            stringRedisTemplate.delete("lock");
        } else { 
           //自旋获取锁,也可以睡 100ms 来降低自旋频率
           test();
        }
    }
}

这样改进之后可以说是相对比较好的分布式锁了,线程不会误删其他线程的锁。但是如果较真的话,这里还有点问题。因为我们保证了加锁和设置过期时间是原子操作,所以加锁是安全的,但是解锁我们没有和判断是否是自己的锁做成原子操作。要保证判断锁是自己的和删除锁作为原子操作,这样才是真正安全的分布式锁,我们需要借助 Redis 的 lua 脚本。它可以绑定多个操作原子执行。参考 Redis 中文网 它给我们提供了一段删除锁的 Lua 脚本

codelayui.code
if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end
再次对上面分布式锁代码改进
codelayui.code
@Transactional
public void test(){
    String uuid = UUID.randomUUID().toString().replace("-","");
    Boolean flag = stringRedisTemplate.opsForValue()
    .setIfAbsent("lock", uuid, 5, TimeUnit.SECONDS);
    //抢占锁成功
    if(flag != null && flag){
        //执行业务...
        //释放锁(先判断锁是自己加的,再去删除锁)
        String lua = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else  return 0 end";
        Long lock = stringRedisTemplate.execute(new DefaultRedisScript<>(lua, Long.class), Collections.singletonList("lock"), uuid);
    } else {
        //自旋获取锁,也可以睡 100ms 来降低自旋频率
        test();
    }
}

这段代码就真正的实现了可靠的分布式锁,但是参考 Redis 中文网 它告诉我们这种方式并不推荐用来做分布式锁,分布式锁有更专业的框架 —— Redisson 。
Redisson Github Wiki网址:https://github.com/redisson/redisson/wiki/目录 基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。所以,以前本地锁 Lock 实现类怎么用,这个就怎么用。只不过这个功能更强大!常用的可重入锁:

codelayui.code
RLock lock = redissonClient.getLock("lock");//可重入锁
        try{
            lock.lock();
        }finally {
            lock.unlock();   
        }

为什么这里不用自旋呢?因为 lock.lock() 或者 lock.tryLock() 这些加锁的方法都是阻塞方法,拿不到锁就会阻塞在这一直等,所以这里不需要自旋。

codelayui.code
RLock lock = redissonClient.getLock("lock");//可重入锁
RLock fairLock = redissonClient.getFairLock("fairLock");//公平锁
RLock multiLock = redissonClient.getMultiLock(lock, fairLock);//联锁
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");//读写锁
RLock readLock = readWriteLock.readLock();//读锁
RLock writeLock = readWriteLock.writeLock();//写锁
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");//信号量
RPermitExpirableSemaphore mySemaphore = redissonClient.getPermitExpirableSemaphore("mySemaphore");//可过期信号量
RCountDownLatch latch = redissonClient.getCountDownLatch("anyCountDownLatch");//闭锁

以上是Redisson 提供的各种锁,Redisson使用看门狗机制不停的给锁续期来避免死锁。而且绝大部分 Redisson 的锁相关 API 底层都是 Lua 脚本执行的,保证了多个命令的原子性。
具体使用方法可以参考 wiki 网址,Redisson 不仅仅提供了分布式锁,还提供了各种Redis高级特性 ,具体可以查询Github https://github.com/redisson/redisson/wiki/目录