移动测试开发 分布式锁的那些事

opentest-oper@360.cn · 2020年12月24日 · 1734 次阅读

场景升级为分布式所带来的问题

众所周知,在单体服务中处理高并发场景时,可以使用 java 提供的两种内置的锁来保证数据的一致性。例如:

  1. 使用 JVM 提供的 synchronized 关键字来实现对变量的同步访问以及用 wait 和 notify 来实现线程间通信。
  2. 使用 JDK 提供的 Lock 类,比如 ReentrantLock 提供的公平锁、非公平锁。

但是,当你的应用涉及到多机、多进程共同完成时,例如现在的互联网分布式架构,存在多个 Server 的情况下,由于负载均衡的路由规则随机,相同的请求可能会打到不同的 Server 上进行处理,那么这时候就需要一个第三方系统的全局锁来实现多个线程 (不同的进程) 之间的同步。这个第三方系统系统需要满足高可用、一致性比较强同时能应付高并发的请求。

业界目前的第三方系统的实现方案有数据库锁、Redis 分布式锁和 ZooKeeper 分布式锁,各个方案各有优缺点。其中 Redis 的使用范围最广,相关文章也最杂乱,随着 Redis 的不断演化,其中不少文章已经不具备时效性了。所以本文将重点梳理 Redis 分布式锁的演化过程,让你从头到尾的了解 Redis 分布式锁。

Redis 分布式锁的演化过程

1. 基于 setNX 实现

setNX 是 Redis 提供的一个原子操作,如果指定 key 存在,那么 setNX 失败,如果不存在会进行 Set 操作并返回成功。

if (setnx(key, 1) == 1){ 
    expire(key, 30);  
    try {        
        //TODO 业务逻辑   
        } finally {
            del(key);   
        }
 }

步骤解释:
1、setnx(lockkey, 1) 如果返回 0,则说明占位失败;如果返回 1,则说明占位成功
2、expire() 命令对 lockkey 设置超时时间,为的是避免死锁问题。
3、执行完业务代码后,可以通过 delete 命令删除 key。
存在问题:
由于获取锁和给锁设置超时时间是两步操作,如果获取锁成功后应用异常或者重启,锁将无法过期。

2. 基于 set 实现

Redis 从 2.6.12 起,SET 涵盖了 SETEX 的功能, SET 本身又包含了设置过期时间的功能,所以使用 SET 就可以解决获取锁和设置锁超时时间两条命令无法保证原子性的问题。

if (set(key, value, 30)){ 
    try {        
        //TODO 业务逻辑   
        } finally {
            del(key);   
        }
 }

存在问题:
如果 A 线程拿到锁后,业务逻辑的执行超过 30 秒,A 线程的锁会自动释放。此时 B 线程获取到了锁,开始执行业务逻辑,但是 A 线程中继续执行了删除锁的操作,此时会出现 A 线程误删除了 B 线程的锁。

3. 基于 set 实现 - 引入 UUID

引入通用唯一识别码 UUID 来标识锁的拥有者,保证加锁解锁都是同一个线程。

String uuid = UUID.randomUUID().toString();
if (set(key, uuid, 30)){ 
    try {        
        //TODO 业务逻辑   
        } finally {
            if(get(key)==uuid){
                del(key);   
            }   
        }
 }

存在问题:
如果 A 线程正好已经判断完上锁的是当前线程,但是在删除锁之前正好 A 线程的锁过期导致锁自动释放了,此时 B 线程加锁成功,依然会出现 A 线程误删 B 线程锁的问题。根本原因就是判断是否为当前锁和删除锁是两个步骤,导致删除锁不具备原子性。

4. 基于 set 实现 - 引入 Lua 脚本

在 eval 命令执行 Lua 代码的时候,Lua 代码将被当成一个命令去执行,并且直到 eval 命令执行完成,Redis 才会执行其他命令。所以引入 Lua 脚本就可以保证删除锁的原子性。

String uuid = UUID.randomUUID().toString();
if (set(key, uuid, 30)){ 
    try {        
        //TODO 业务逻辑   
        } finally {
            EVAL ( 
            //LuaScript 
            if redis.call("get",KEYS[1]) == ARGV[1] then
                return redis.call("del",KEYS[1]) 
            else 
                return 0 end
            ) 
        }
 }

存在问题:
如果在 Redis 集群环境下,由于 Redis 集群数据同步为异步,当主节点挂掉时,从节点会取而代之,但客户端无明显感知。当客户端 A 的线程 A 成功加锁,指令还未同步到从节点,此时如果主节点挂掉,从节点提升为主节点,新的主节点没有锁的数据,当客户端 B 的线程 B 加锁时就会成功加锁。

5. 基于 Redisson 实现

为了避免上述情况的发生,Redisson 内部提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是 30 秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。

RLock lock = redisson.getLock("foobar"); // 1.获得锁对象实例
lock.lock(); // 2.获取分布式锁
try {
     //TODO 业务逻辑 
} finally {
    lock.unlock(); // 3.释放锁
}

步骤解释:

  1. 通过 RedissonClient 的 getLock() 方法取得一个 RLock 实例。
  2. lock() 方法尝试获取锁,如果成功获得锁,则继续往下执行,否则等待锁被释放,然后再继续尝试获取锁,直到成功获得锁。
  3. unlock() 方法释放获得的锁,并通知等待的节点锁已释放。 这时会有细心的同学发现了,之前的写法依次增加了锁超时时间,加锁和设置超时的原子化,解锁的防误解锁和解锁的原子化操作,但是 Redisson 加解锁的写法从表面上是看不出来是否有这些特性。接下来我们来剖析一下 RedissonLock 中的源码实现(基于 redisson-spring-boot-starter 的 3.11.4 版本):

- getLock() 方法的源码

@Override
public RLock getLock(String name) {
  return new RedissonLock(this.connectionManager.getCommandExecutor(),name);
}

其中的 RedissonLock 实例的创建过程源码:

public RedissonLock(CommandAsyncExecutor commandExecutor,String name) {    
    super(commandExecutor, name);    
    this.commandExecutor = commandExecutor;    
    this.id = commandExecutor.getConnectionManager().getId();        
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();    
    this.entryName = this.id + ":" + name;    
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}

源码解读:

  1. commandExecutor 是与 Redis 节点通信并发送指令的真正实现,底层就是通过 eval 命令来执行 Lua 脚本
  2. id 就是全局 UUID
  3. internalLockLeaseTime,锁释放时间,就是默认的看门狗的超时时间 30 秒 通过以上三点,getLock() 方法可以实现锁超时时间,加锁和设置超时的原子化。

- unlock() 方法源码:

public void unlock() {    
    try {                                                                                   
        this.get(this.unlockAsync(Thread.currentThread().getId()));    
    } catch (RedisException var2) {        
        if (var2.getCause() instanceof IllegalMonitorStateException){            
            throw (IllegalMonitorStateException)var2.getCause();        
        } else {            
            throw var2;        
        }    
     }
 }

其中 unlockAsync() 方法的源码:

public RFuture<Void> unlockAsync(long threadId) {    
    RPromise<Void> result = new RedissonPromise();    
    RFuture<Boolean> future = this.unlockInnerAsync(threadId);    
    future.onComplete((opStatus, e) -> {        
        if (e != null) {            
            this.cancelExpirationRenewal(threadId);            
            result.tryFailure(e);        
        } else if (opStatus == null) {            
            IllegalMonitorStateException cause = new 
IllegalMonitorStateException("attempt to unlock lock, not locked by current 
thread by node id: " + this.id + " thread-id: " + threadId);            
            result.tryFailure(cause);        
        } else {            
            this.cancelExpirationRenewal(threadId);            
            result.trySuccess((Object)null);       
        }    
    });    
    return result;
}

源码解读:

  1. unlockInnerAsync() 方法其实就是通过 EVAL 和 Lua 脚本执行 Redis 命令释放锁
  2. 如果是非锁的持有者释放锁时抛出异常
  3. cancelExpirationRenewal() 方法,释放锁后取消刷新锁失效时间的调度任务 通过以上三点,unlock() 方法可以实现解锁的防误解锁和解锁的原子化操作。

-lock() 方法的源码

public void lock() {    
    try {        
        this.lock(-1L, (TimeUnit)null, false);    
    } catch (InterruptedException var2) {       
        throw new IllegalStateException();   
    }
}

其中 lock 方法的源码:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {    
    long threadId = Thread.currentThread().getId();    
    Long ttl = this.tryAcquire(leaseTime, unit, threadId);    
    if (ttl != null) {        
        RFuture<RedissonLockEntry> future = this.subscribe(threadId);        
        this.commandExecutor.syncSubscription(future);       
        try {            
            while(true) {                
                ttl = this.tryAcquire(leaseTime, unit, threadId);                
                if (ttl == null) {                    
                    return;                
                }                
                if (ttl >= 0L) {                    
                    try {                        
                        this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                    
                    } catch (InterruptedException var13) {                        
                        if (interruptibly) {                            
                            throw var13;                        
                        }                        
                        this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                    
                    }                
                } else if (interruptibly) {                    
                    this.getEntry(threadId).getLatch().acquire();                
                } else {                   
                    this.getEntry(threadId).getLatch().acquireUninterruptibly();                
                }           
            }        
        } finally {            
            this.unsubscribe(future, threadId);        
        }    
    }
}

源码解读:

  1. 使用 tryAcquire() 方法尝试获取锁,返回的是已存在的锁的剩余存活时间,为 null 则说明没有已存在的锁并成功获得锁。
  2. 如果获得锁成功,无需进行逻辑处理,继续执行用户的业务逻辑。
  3. 如果没有获得锁,则需等待锁被释放,并通过 Redis 的 channel 订阅锁释放的消息,这里的具体实现本文也不深入,只是简单提一下 Redisson 在执行 Redis 命令时提供了同步和异步的两种实现,但实际上同步的实现都是基于异步的,具体做法是使用 Netty 中的异步工具 Future 和 FutureListener 结合 JDK 中的 CountDownLatch 一起实现。
  4. 订阅锁的释放消息成功后,进入一个不断重试获取锁的循环,循环中每次都先试着获取锁,并得到已存在的锁的剩余存活时间。
  5. 如果在重试中拿到了锁,则结束循环,跳过第 6 步。
  6. 如果锁当前是被占用的,那么等待释放锁的消息,具体实现使用了 JDK 并发的信号量工具 Semaphore 来阻塞线程,当锁释放并发布释放锁的消息后,信号量的 release() 方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。
  7. 在成功获得锁后,就没必要继续订阅锁的释放消息了,因此要取消对 Redis 上相应 channel 的订阅。

通过以上的源码解读,大家对于 Redisson 的功能强大应该有了初步了解。Redisson 除了上面列出的基本的可重入锁之外,还提供了公平锁、联锁、红锁、读写锁、信号量等多种锁的方式,感兴趣的同学可以前往 Redisson 的 github 查看研究,希望本片内容可以帮助你在工作中更合理的选择和正确的使用分布式锁。

参考文章:
Redis 分布式锁进化史
Redisson 分布式锁实现分析
Redisson 的 github

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册