分布式锁能够实现以下两种功能
a、提高效率,避免重复计算。比如多节点同时执行一个批量任务。如果一个节点已经在执行某个任务,其他节点就没必要重复执行这个任务。这时允许存在少量的重复计算,也就是说允许存在偶尔的失败。
b、保证正确性。比如两个客户购买同一件商品,如果一个客户购买了,其他客户就不能购买。这种情况对分布式锁的要求很高,如果重复计算,会对业务的正确性产生影响。也就是不允许失败。
使用redis实现分布式锁需要注意以下两点:
a、加锁和解锁的实现,必须保证是同一把锁。常见的解决方案是:给锁设置唯一ID,加锁时生成,解锁时先判断,再解锁。
b、不能让一个资源永久被锁住。解决方案是给锁设置过期时间,如果加锁的节点宕机,在经过了过期时间之后,锁消失,资源自动释放。
以下提出了几种redis分布式锁的解决方案:
引入springboot redis的jar包:
1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-data-redis</artifactId> 4 </dependency>
配置文件中加入redis相关配置:
spring:
redis:
host: 192.168.1.110
database: 2
port: 6380
timeout: 2000
password:
在需要加锁的操作前,使用
Boolean lockResult = stringRedisTemplate.opsForValue().setIfAbsent(key, value, time);
在finally代码块中,要释放这个锁:
if ((driverId + "").equals(stringRedisTemplate.opsForValue().get(lock.intern()))) { stringRedisTemplate.delete(lock.intern()); }
释放这个锁时,要先判断这个锁是否是自己的锁,以防止错误的释放了别的服务设置的锁,判断是否是自己的锁的依据是value,每个服务有一个特殊的value,比如:如果是滴滴司机抢一个订单,那这个value可以是司机的id。
注意:
a、key必须能够唯一表示某个资源;
b、value必须能够唯一确定资源竞争者,以防止释放锁的时候,释放了别人的锁;(为什么可能会存在释放别人锁的情况呢?当前服务设置了锁之后,在锁的过期时间之内,业务并没有完成,导致锁过期自动释放,下一个服务获取锁之后,业务才完成,此时,可能会释放下一个服务设置的锁)
c、加锁操作和给锁设置过期时间的操作必须保证原子性,以防止加锁成功,设置过期时间失败,导致锁无法释放;
缺点:
a、单点问题。单一redis,对redis的可用性要求很高,一旦redis发生宕机,则整个服务不可用;
b、当因为某种异常情况(比如JVM的DC过程,或者网络抖动),导致业务处理时间超过锁的过期时间,会产生业务还未执行完成,锁就释放的情况,如果此时有其他服务来获取这个资源,会导致两个服务同时拥有这个资源的情况,导致业务可能会出现问题。这个问题的解决方案可以是:自己实现一个deamon线程任务,当业务执行时间超过设定的锁过期时间的三分之一的时候,判断业务是否完成,如果还没完成,就延长这个锁的过期时间,延长长度设置为原始的过期时间的长度。(redisson框架就是这样操作的。)
c、即使采用deamon线程的方案,也不能完全保证不出问题。如果上锁之后,服务端与redis服务器失联,导致续期失败,也会出现b的问题。
除了引入redis的jar包,还需要引入redisson的jar包:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.14.0</version> </dependency>
定义redisClient的bean:
@Bean public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);return Redisson.create(config); }
在业务中先获取redisson锁:
String lock = "redisson_lock_orderid_3";
RLock lock1 = redissonClient.getLock(lock.intern());
在具体业务执行前,先给资源上锁:
lock1.lock();
在finally语句块中,释放锁:
lock1.unlock();
注意:
a、redisson默认的锁过期时间是30s,也可以指定锁的过期时间,但是方法并不是这样:
lock1.lock(10, TimeUnit.SECONDS);
(因为用这个方法,虽然锁的过期时间自定义成了10s,但是redisson将不会自动维护这个锁的TTL。)
而是需要在定义RedissonClient这个Bean的时候,配置lockWatchdogTimeout这个量:
@Bean public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort).setDatabase(database); config.setLockWatchdogTimeout(8000); return Redisson.create(config); }
这里config.setLockWatchdogTimeout(8000);表示,将lock的过期时间expireTime设置为8s,而且watchDog会每过2s将这个key的TTL重新设置为8s(前提是这个锁还存在的情况下)。
redisson框架会自己维护当前锁的TTL,以防止业务执行的时间因为GC或者网络的原因异常增长,超过锁的过期时间。源码在RedissonLock.class中:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1L) { return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining == null) { this.scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } } private void scheduleExpirationRenewal(long threadId) { RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry(); RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); this.renewExpiration(); } } private void renewExpiration() { RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (ee != null) { Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName()); if (ent != null) { Long threadId = ent.getFirstThreadId(); if (threadId != null) { RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { RedissonLock.log.error("Can‘t update lock " + RedissonLock.this.getName() + " expiration", e); RedissonLock.EXPIRATION_RENEWAL_MAP.remove(RedissonLock.this.getEntryName()); } else { if (res) { RedissonLock.this.renewExpiration(); } } }); } } } }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); ee.setTimeout(task); } }
当经过了锁的过期时间的三分之一时,watchDog会检查当前锁是否还存在,如果还存在,就给当前锁的过期时间重新设置为初始值。(默认情况下,redisson锁的过期时间是30s,redis中,这个锁的TTL从30开始倒数。在这个锁的TTL为20s时,watchDog将这个锁的TTL设置为30,继续从30开始倒数。)
缺点:
a、单点问题。单一redis,对redis的可用性要求高,一旦redis宕机,则这个服务不可用。
b、无法完全保证不会出现“业务执行时间超过锁TTL的时间”这个问题。假设获取到锁之后,如果与redis失联,锁的TTL无法被延长。
定义多个redissonClient:
@Bean("redissonClient1") @Primary public RedissonClient redissonClient1() { Config config = new Config(); config.useSingleServer().setAddress("redis://localhost:6388").setDatabase(0); return Redisson.create(config); } @Bean("redissonClient2") public RedissonClient redissonClient2() { Config config = new Config(); config.useSingleServer().setAddress("redis://localhost:6399").setDatabase(0); return Redisson.create(config); } @Bean("redissonClient3") public RedissonClient redissonClient3() { Config config = new Config(); config.useSingleServer().setAddress("redis://localhost:6380").setDatabase(2); return Redisson.create(config); }
RLock lock1 = redissonClient1.getLock(lock.intern()); RLock lock2 = redissonClient2.getLock(lock.intern()); RLock lock3 = redissonClient3.getLock(lock.intern()); RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
然后执行业务前先上锁:
redLock.lock();
业务完成,在finally代码块中释放锁:
redLock.unlock();
a、获取当前时间;
b、依次获取N个节点的锁。每个节点获取锁的方法和单一redisson的方法一样,但这里有个细节,在每个节点获取锁的时候,设置的过期时间都不同,需要减去之前获取锁操作所花费的时间:
第一个节点,设置锁的过期时间是500ms,操作时间1ms;
第二个节点,设置锁的过期时间是499ms,操作时间2ms;
第三个节点,设置锁的过期时间是497ms······依次类推;
c、判断上锁是否成功:如果超过N/2 + 1个节点上锁成功,并且每个节点的锁过期时间都大于0,就说明成功获取到了锁,否则,获取锁失败。获取锁失败时,释放锁。
d、释放锁。对所有节点发出释放锁的指令,每个节点释放锁的逻辑和上边单一redisson的逻辑一致。为什么不仅仅对于加锁成功的节点发释放锁的指令,而是对所有节点都发?因为在某个节点上锁失败,不一点表示该节点上锁失败,有可能是因为网络延时导致操作超时,实际上锁成功了。
上边是红锁RedLock的运行流程,但是依然可能出现一些问题,尤其是在高并发情况下:
a、性能问题。分两方面:
一方面,如果节点比较多,挨个加锁,耗时可能会比较长,影响性能。解决办法是:每个节点加锁的操作可以是异步操作,可以同时向多个节点获取锁。
另一方面,被加锁的资源太大。加锁操作本身就是为了保证正确性而牺牲了并发,牺牲和资源大小成正比,这时可以考虑对资源进行拆分。
b、重试问题。当多个client共同竞争一个资源时,每个client都获取了部分锁,但是没有一个超过半数。这时候需要重试。且重试的时间要保证随机,以便让client重新获取锁的操作错开。虽然无法根治,但是可以有效缓解这个问题。
c、节点宕机问题。对于红锁RedLock,如果redis节点不做持久化,某个节点宕机重启了,可能导致多个client重复上锁问题:比如,有A、B、C、D、E五个节点,client1从A、B、C三个节点获取到了锁,这时C宕机重启,client2从C、D、E获取到了锁,这时就出现了两个client同时获取到锁的情况。解决方案三种:
延时启动。让运维配合,当redis节点宕机需要重启时,设置延时启动,延时的时长要大于所有锁的TTL。
总结,无。。。
参考信息:
https://blog.csdn.net/lpd_tech/article/details/104773257/
https://redis.io/topics/distlock
原文:https://www.cnblogs.com/youbing-top/p/14115295.html