众所周知,在单体服务中处理高并发场景时,可以使用 java 提供的两种内置的锁来保证数据的一致性。例如:
但是,当你的应用涉及到多机、多进程共同完成时,例如现在的互联网分布式架构,存在多个 Server 的情况下,由于负载均衡的路由规则随机,相同的请求可能会打到不同的 Server 上进行处理,那么这时候就需要一个第三方系统的全局锁来实现多个线程 (不同的进程) 之间的同步。这个第三方系统系统需要满足高可用、一致性比较强同时能应付高并发的请求。
业界目前的第三方系统的实现方案有数据库锁、Redis 分布式锁和 ZooKeeper 分布式锁,各个方案各有优缺点。其中 Redis 的使用范围最广,相关文章也最杂乱,随着 Redis 的不断演化,其中不少文章已经不具备时效性了。所以本文将重点梳理 Redis 分布式锁的演化过程,让你从头到尾的了解 Redis 分布式锁。
setNX 是 Redis 提供的一个原子操作,如果指定 key 存在,那么 setNX 失败,如果不存在会进行 Set 操作并返回成功。
if (setnx(key, 1) == 1){
expire(key, 30);
try {
//TODO 业务逻辑
} finally {
del(key);
}
}
步骤解释:
1、setnx(lockkey, 1) 如果返回 0,则说明占位失败;如果返回 1,则说明占位成功
2、expire() 命令对 lockkey 设置超时时间,为的是避免死锁问题。
3、执行完业务代码后,可以通过 delete 命令删除 key。
存在问题:
由于获取锁和给锁设置超时时间是两步操作,如果获取锁成功后应用异常或者重启,锁将无法过期。
Redis 从 2.6.12 起,SET 涵盖了 SETEX 的功能, SET 本身又包含了设置过期时间的功能,所以使用 SET 就可以解决获取锁和设置锁超时时间两条命令无法保证原子性的问题。
if (set(key, value, 30)){
try {
//TODO 业务逻辑
} finally {
del(key);
}
}
存在问题:
如果 A 线程拿到锁后,业务逻辑的执行超过 30 秒,A 线程的锁会自动释放。此时 B 线程获取到了锁,开始执行业务逻辑,但是 A 线程中继续执行了删除锁的操作,此时会出现 A 线程误删除了 B 线程的锁。
引入通用唯一识别码 UUID 来标识锁的拥有者,保证加锁解锁都是同一个线程。
String uuid = UUID.randomUUID().toString();
if (set(key, uuid, 30)){
try {
//TODO 业务逻辑
} finally {
if(get(key)==uuid){
del(key);
}
}
}
存在问题:
如果 A 线程正好已经判断完上锁的是当前线程,但是在删除锁之前正好 A 线程的锁过期导致锁自动释放了,此时 B 线程加锁成功,依然会出现 A 线程误删 B 线程锁的问题。根本原因就是判断是否为当前锁和删除锁是两个步骤,导致删除锁不具备原子性。
在 eval 命令执行 Lua 代码的时候,Lua 代码将被当成一个命令去执行,并且直到 eval 命令执行完成,Redis 才会执行其他命令。所以引入 Lua 脚本就可以保证删除锁的原子性。
String uuid = UUID.randomUUID().toString();
if (set(key, uuid, 30)){
try {
//TODO 业务逻辑
} finally {
EVAL (
//LuaScript
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0 end
)
}
}
存在问题:
如果在 Redis 集群环境下,由于 Redis 集群数据同步为异步,当主节点挂掉时,从节点会取而代之,但客户端无明显感知。当客户端 A 的线程 A 成功加锁,指令还未同步到从节点,此时如果主节点挂掉,从节点提升为主节点,新的主节点没有锁的数据,当客户端 B 的线程 B 加锁时就会成功加锁。
为了避免上述情况的发生,Redisson 内部提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是 30 秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。
RLock lock = redisson.getLock("foobar"); // 1.获得锁对象实例
lock.lock(); // 2.获取分布式锁
try {
//TODO 业务逻辑
} finally {
lock.unlock(); // 3.释放锁
}
步骤解释:
@Override
public RLock getLock(String name) {
return new RedissonLock(this.connectionManager.getCommandExecutor(),name);
}
其中的 RedissonLock 实例的创建过程源码:
public RedissonLock(CommandAsyncExecutor commandExecutor,String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = this.id + ":" + name;
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}
源码解读:
public void unlock() {
try {
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException var2) {
if (var2.getCause() instanceof IllegalMonitorStateException){
throw (IllegalMonitorStateException)var2.getCause();
} else {
throw var2;
}
}
}
其中 unlockAsync() 方法的源码:
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
if (e != null) {
this.cancelExpirationRenewal(threadId);
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new
IllegalMonitorStateException("attempt to unlock lock, not locked by current
thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
this.cancelExpirationRenewal(threadId);
result.trySuccess((Object)null);
}
});
return result;
}
源码解读:
public void lock() {
try {
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
其中 lock 方法的源码:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
while(true) {
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
try {
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
if (interruptibly) {
throw var13;
}
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
this.getEntry(threadId).getLatch().acquire();
} else {
this.getEntry(threadId).getLatch().acquireUninterruptibly();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
源码解读:
通过以上的源码解读,大家对于 Redisson 的功能强大应该有了初步了解。Redisson 除了上面列出的基本的可重入锁之外,还提供了公平锁、联锁、红锁、读写锁、信号量等多种锁的方式,感兴趣的同学可以前往 Redisson 的 github 查看研究,希望本片内容可以帮助你在工作中更合理的选择和正确的使用分布式锁。