1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

分布式锁之Redis实现

Last updated at Posted at 2022-12-07

redisson 3.16.7

Redisson作为Redis的Java框架,通过Redis实现了一套相对完整的分布式锁的解决方案。这里主要通过其源码分析分布式锁实现的细节。接下来的内容包括:

  • Redis简单介绍
  • Redis分布式锁实现的理论基础
  • 锁的核心概念实现细节
  • 总结

Redis简单介绍

传统的Java锁机制在单体应用中能解决并发访问的问题,但在如今分布式、微服务技术下应用部署都是集群的方式,传统的Java锁无法跨JVM使用,这样分布式锁就应运而生了。通俗的讲锁的核心概念是:当前线程去获取一个锁[资源],能拿到就执行操作,不能拿到就等待[一些锁可设置等待时间]...。

数据库的方式实现分布式锁

根据锁的概念,可通过传统关系型数据库[如MySQL]的SELECT ... FOR UPDATE语义实现分布式锁,这也就是行锁的方式。[这里就不描述细节了]。这种解决方案的缺点是:首先需要封装一套完整的并发锁解决方案处理锁互斥、重入、超时等各种细节;其次行锁性能消耗大,高并发下容易出现性能瓶颈,对应用系统来说是致命的。

Redis作为一款NoSQL的分布式内存数据库,其高速的数据访问特点加上单线程访问特性解决了并发问题及行锁存在的性能问题。

Redis分布式锁实现的理论基础

单线程机制

Redis是通过单线程执行命令的,这样即使是并发的获取锁资源的请求也会被塞进一个命令管道顺序执行。单线程操作不需要任何锁来解决并发问题,另外Redis是内存操作,其速度是非常快的。

Lua脚本的原语操作

Redisson框架中本质就是通过执行Lua脚本来和Redis服务端打交道的。Lua脚本的好处就是:1.可以像C语言一样处理逻辑,可读性高容易理解;2.可以将一块操作[如:存取值、计算、分支处理和发布订阅等]打包执行,而这一块操作是拥有原语语义的原子操作,这是整个锁机制的关键。

实现JDK并发包的Lock接口

Redisson分布式锁实现了JDK并发包中Lock接口,对Java程序员来说在使用起来没有任何学习成本。开箱即用,这才是优秀开源框架该有的样子:)。好了,下面进入源码细节。

锁的核心概念实现细节

加“锁”的本质

首先看一下redislock.lock()的操作主要的流程:

第一步,先去调用尝试获取锁的方法,方法返回的是一个ttl,这个ttl[生存时间]稍微有点难理解,我们把它当成拿到锁还需要的时间【或者说上一个线程持有锁的剩余时间】。ttl为空时说明之前没有线程持有这个锁,获取锁成功。

第二步,如果ttl不为空,将去订阅[subscribe]一个消息频道[channel]。消息订阅是Redis的一个亮点功能,简单的说就是订阅者先指定订阅频道的名称,在往该频道发布消息的时候,订阅者能及时收到该频道值的变化。在这里的使用场景就是观察锁释放情况,当某个线程释放锁redislock.unlock()的时候,就会通过Lua脚本发布一个释放锁的消息UNLOCK_MESSAGE到对应的频道。

第三步,通过自旋的方式获取锁,也就是在一个死循环中不断尝试获取。这一步主要做两个事情:1.重试第一步的取锁的方法,并判断ttl是否为空,如果为空则跳出循环,获取锁成功;2.等待ttl的时间后再次尝试获取锁。注:这里的ttl每次都会重新获取。这里除了获取到锁会跳出循环,另外在内部线程中断的时候也会跳出循环。代码简洁版如下:

// # RedissonLock.java

void lock(long leaseTime, TimeUnit unit, boolean interruptibly) 
{
    long threadId = Thread.currentThread().getId();
    // 1、尝试获取锁
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // 获取到锁
    if (ttl == null) { return; }
    
    // 2、订阅消息
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    
    // 3、自旋获取锁
    while (true) {
        // 重复步骤1获取锁
        ttl = tryAcquire(-1, leaseTime, unit, threadId);
        if (ttl == null) { break; }
        
        // 等待消息
        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        // ...
    }
    
    finally {
        // 取消订阅。已经获取到锁了就不需要监听任何消息了。
        unsubscribe(future, threadId);
    }
}

接下来主要深入到获取锁的核心方法tryAcquire(...)。获取锁的内部实现是异步的,所以在tryAcquire方法里用了一个get(...)操作将异步方法同步化,以便直接获取到结果。代码如下:

private Long tryAcquire(...) {
    // 异步操作同步获取结果
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

异步获取锁的方法tryAcquireAsync()中除了获取锁,还在获取到锁之后做了一个续期的操作。这里的续期怎么理解?暂且认为我们获取锁的时间是有限的,如果一段时间后我们的操作还没有结束就需要去通知Redis延长这个时间。下面是获取锁的实质操作了:Redisson分布式锁的获取锁的本质就是通过执行一段Lua脚本判断redis中是否存在某个值[锁的名称],若不存在则创建以锁的名称为key的HashMap。该数据结构如下:

"lockName": {
    "uuid:threadId":value // value为整数
}

数据结构很简单,lockName代表一个锁对象,一般最好以业务相关的信息命名。map中的key值由随机值的uuid加线程号拼接而成,这样可以联合key所对应的数值实现锁的可重入。类似于JDK的可重入锁,ReentrantLock在获取锁的时候发现是当前线程再次获取则将state变量++1。这个state就和当前的value存储的值一一对应了。创建完锁资源信息[map]后或者锁重入设置value++1后,会给当前lockName的数据设置一个过期时间,并返回空nil[之前说过,这代表已经获取到锁]。

上锁的时候如果发现lockName已经存在了,而且也不是重入的情况。【这里就包括了,其他集群节点发送获取锁的请求或者同一节点其他线程发送请求】这个时候的操作很简单,直接返回查询到lockName的过期时间ttl。拿到过期时间只后,线程继续自旋等待锁释放再尝试获取。下面是具体的Lua脚本相关的代码:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(
    // 锁名称
    getRawName(), 
    // redis编码相关
    LongCodec.INSTANCE, 
    // redis命令相关
    command,
    // 核心:Lua脚本
    "if (redis.call('exists', KEYS[1]) == 0) then " +  
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " + 
        "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " +
        "end; " +
    "return redis.call('pttl', KEYS[1]);",
    // KEY[...]
    // 这里表示KEY的数组只有一个元素也就是KEY[1]=锁名称
    Collections.singletonList(getRawName()), 
    // ARGV...,
    // unit.toMillis(leaseTime)表示ARGV[1]
    // getLockName(threadId)表示ARGV[2]
    unit.toMillis(leaseTime), getLockName(threadId));
}

再来看看续期操作,之前有提到过在成功获取到锁之后会自动续期,官方文档也叫看门狗机制watchdog。首先,为什么在Redis设置锁资源[lockName的value]之后要设置一个超时时间呢?锁用完之后直接删除掉lockName不就行了吗?问题就在于万一什么情况下没有把lockName给及时删除掉呢,那么其他线程或者节点永远也获取不到这把锁了!比如程序异常或者应用宕机都有可能没办法自动删除掉lockName,所以设置过期时间是有必要的。既然有了过期时间就需要维护这个过期时间,不合理处理这个过期时间有什么问题呢?举一个例子:比如获取到锁时过期时间默认设置为30s,线程A拿到锁之后由于任务繁重没能在30s内完成任务,这时如果lockName自动过期给删除了,线程B再次获得锁并且能获取成功...这时线程A和B就同时获取锁了:(。为了避免这一问题,在线程A未完成操作前需要对lockName进行续期

从下面的代码可见续期操作就是开启一个调度任务,定时通过执行Redis的pexpire命令不断刷新过期时间。

// # RedissonBaseLock.java

// 续期操作
void renewExpiration() 
{
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        // 执行Lua脚本
        RFuture<Boolean> future = renewExpirationAsync(threadId);
        
        // 重新调用续期操作方法
        renewExpiration();
    },
    // 默认每隔30/3 秒
    internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
}

// Lua脚本[通过锁名称刷新锁资源过期时间]
RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return 1; " +
            "end; " +
        "return 0;",
        Collections.singletonList(getRawName()),
        internalLockLeaseTime, getLockName(threadId));
    }

Lua脚本的语法非常简单,这里就不过多描述。加锁的整个过程需要理解的是:

  1. 使用Lua脚本批量执行多条Redis命令能达到原子操作的效果。
  2. 使用自旋的方式来实现分布式锁的阻塞效果。
  3. 续期的本质。

tryLock(过期时间)说明:一般情况下我们会使用tryLock的方式获取锁,这样可以设置一个等待时间不用一直阻塞线程。tryLock的实现无非就是在自旋的时候去检查一下是否达到了等待的最大时长,如果是则获取失败退出即可。

下面我们用一幅图来归纳下加锁的过程:

Redis_001_分布式锁.png

公平锁的实现

Redisson实现的公平锁相对于非公平锁来说,就是在获取锁和释放锁的Lua脚本中加入了两个辅助容器的操作:一个是名为redisson_lock_queue:{lockKey}的等待队列,装的是等待获取锁的线程信息uuid_x:tid_y;另一个则是名为redisson_lock_timeout:{lockKey}的有序集合,装的是uuid_x:tid_y的信息以及超时信息waitTime。超时信息有序集合的作用在于,等待时间到了之后,这两个容器将该线程的排队信息删除掉。

获取锁的条件:1.当前锁未创建[或者已释放]并且等待队列不存在;2.当前锁未创建[或者已释放]且等待队列的头元素是当前线程;3.当前锁已创建,属于锁重入的情况。

添加队列信息:尝试获取锁失败,需要将线程信息添加到等待队列及超时有序集合中。

移除队列信息:获取到锁之后或者超时时需要将线程信息在等待队列及超时有序集合中移除。

锁的释放

锁的释放的实质就是通过del命令将锁信息清除,以及发布publish锁释放的信息。在公平锁释放的时候,发布的信息会带有等待队列中下一个元素的线程信息,客户端通过之前的订阅就可以感知到哪个线程可以来获取锁了。

MuitiLock & RedLock

RedissonMultiLock是将多个独立的锁组成一个锁,并将它们作为一个锁管理。主要的应用场景是,当需要同时锁定多个资源的时候,可以同时对多个锁进行锁定。其原理就是在获取组合锁MuitiLock的时候迭代获取每一个锁,每个锁都获取到则表示组合锁获取成功,其中每个锁的获取过程同上面一致。在释放组合锁的时候,也是将所有获取的锁依次释放,释放过程也是一样的。

RedLock是基于RedLock算法实现的锁机制,实现的方式是继承于MuitiLock,所以功能实现上几乎与组合锁一致。

总结

Redisson还实现了其他类JDK的锁机制,如SemaphoreCountDownLatch读写锁等。其实现的方式和上面介绍的方式如出一辙:核心都是Lua脚本的使用,主要的核心算法也都都对Redis数据的增删改查。Lua脚本执行Redis命令将多个命令执行原子化;另外Redis执行命令的单线程串行方式有效避免了并发竞争问题;Redis作为内存数据库其处理命令的速度也是非常之快,对分布式加锁的性能影响也相对较小。除此之外,Redisson实现的各种锁,遵循JDK的锁操作的接口定义,使用上对程序员来说也是相当友好。基于这些特性,Redisson实现的分布式锁还是比较优秀的。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?