假设有这样一个场景,在一个购票软件上买一张票,但是此时剩余票数只有一张或几张,这个时候有几十个人都在同时使用这个软件购票。在不考虑任何影响下,正常的逻辑是首先判断当前是否还有剩余的票,如果有,那么就进行购买并扣减库存数,否则就会提示票数不足,购买失败。伪代码如下:void buyTicket() {
intstockNum = byTicketMapper.selectStockNum();if(stockNum>0){//TODO 买票流程....byTicketMapper.reduceStock();// 扣减库存}else{ log.info("=====>票卖完了<====");}
}
复制代码这段代码在逻辑上没有问题,但是在并发场景下,可能会存在一个严重的问题。当剩余票数为1时,有A,B两个用户同时点击了购买按钮,A用户通过了库存大于0的校验并开始执行购票逻辑,但是由于一些原因造成A用户的购票线程有短暂的阻塞。而在这个阻塞的过程中,用户B发起了购买请求,并且也通过了库存大于0的校验,直到整个购买流程执行完成并且扣减了库存。那么这个时候剩余库存刚好为0,不会再有用户发起购买请求,这时用户A的购买请求阻塞被唤醒,因为在此之前已经校验过库存大于0,所以执行完购买流程后,库存还会被扣减一次。那么此时的库存为-1,这就是常听到的超卖问题。
为了避免这个问题,我们可以通过加锁了方式,来保证并发的安全性。像JVM提供的内置锁synchronized,JUC提供的重入锁ReentrantLock,但是这两种锁只能保证单机环境下并发安全问题,一般在实际工作中很少会部署单节点的项目,通常都是多节点集群部署,这两个锁就失去了意义。这个时候就可以借助redis来实现分布式锁。setnx在集群部署的情况下,通常使用redis来实现分布式锁。其中redis提供了setnx命令,标识只有key不存在时才能设值成功,从而达到加锁的效果。下面通过redis来改造上述的代码,其方式是购票线程首先获取锁,如果获取锁成功,那么继续执行购票业务流程,直到所有流程执行完成并扣减库存后,最终在释放锁。如果获取锁失败,那么就给出一个友好的系统提示。void buyTicket() {
//获取锁Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","1");if(lock) { int stockNum = byTicketMapper.selectStockNum();if(stockNum>0){//TODO 买票流程.... byTicketMapper.reduceStock();//扣减库存 }else{ log.info("=====>票卖完了<===="); }//释放锁 redisTemplate.delete("lock");}else{ log.info("=====>系统繁忙,请稍后!<====");}
}
复制代码问题1:死锁问题通过上面的一顿梭哈,你以为这样就可以了吗,其实不然。设想一下,如果线程A在获取锁成功后,在执行购票的逻辑中出现了异常,那么这个时候就会造成锁得不到释放,其他线程始终获取不到锁,这就造成严重的死锁问题。为了避免死锁问题的出现,我们可以对异常进行捕获,在finally中去释放锁,这样不管业务执行成功或失败,最后都会去释放锁。void buyTicket() {
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","1");if(lock) {try{intstockNum = byTicketMapper.selectStockNum();if(stockNum >0) {//TODO 买票流程....byTicketMapper.reduceStock();// 扣减库存}else{ log.info("=====>票卖完了<===="); } }finally { redisTemplate.delete("lock");// 释放锁}}else{ log.info("=====>系统繁忙,请稍后!<====");}
}
复制代码你以为这就结束了吗?死锁就不会发生了吗?如果你认为这样就能避免死锁的发生,那你就太不细心啦。如果在程序刚想像执行释放锁的逻辑时,redis服务突然宕机了,那么这时锁释放就失败了。在将redis服务重启后,加锁的数据又被恢复了,这样又出现了死锁的现象。为了避免这个问题,可以为锁设置一个过期时间,这样即使redis重启恢复数据后,也会很快的过期掉。不过需要注意的是,在设置锁的过期时间时,一定要保证原子性操作,不然还是会出现死锁问题。
//不是原子操作,会出现死锁问题
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1");
//如果刚要执行该语句时,redis宕机了。上面的锁无法释放
redisTemplate.expire("lock",Duration.ofSeconds(5L));
//原子操作
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1", Duration.ofSeconds(5L));
复制代码问题2:锁被其他线程释放问题经过上面的又一顿梭哈,死锁的问题可以避免了,这样在高并发的情况下就能安全的执行了吗。如果锁的过期时间设置了5秒,当A线程发起购票请求并获取到了锁,但是A线程在执行购票流程时花费了6秒,此时线程A的锁已经过期。这时线程B重新获取了锁并且也开始执行购票流程,但是A线程要比B线程执行的要快,当A线程释放锁时,问题就出现了。由于A线程执行的过程锁已经过期了,那么在执行释放锁的流程时,最终被释放的是线程B的锁,这就导致B的锁被A线程释放问题。
对于这个现象,可以给每个锁设置一个唯一标识,比如像UUID,线程ID。在释放锁时,校验一下这个锁的标识是否为需要删除的锁,如果是,在进行锁的释放。public void buyTicket() {
String uuid =UUID.randomUUID().toString();// 为锁设置一个唯一标识Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid, Duration.ofSeconds(5L));if(lock) {try{intstockNum = byTicketMapper.selectStockNum();if(stockNum >0) {//TODO 买票流程....byTicketMapper.reduceStock();// 扣减库存}else{ log.info("=====>票卖完了<===="); } }finally { String lockValue = redisTemplate.opsForValue().get("lock");if(lockValue.equals(uuid)){//校验标识,通过则释放锁redisTemplate.delete("lock"); } }}else{ log.info("=====>系统繁忙,请稍后!<====");}
}
复制代码问题3:锁续期问题使用setnx命令做分布式锁时,无法避免的一个问题就是:线程尚未执行完成,但是锁已经过期。在解决锁被其他线程误删的代码中,并不是100%能解决的,问题点在于下面这段代码。如果线程A已经执行到了if语句并且通过了判断,当刚要执行释放锁的逻辑时,线程A的锁过期了并且线程B重新获取到了锁,那么线程A在释放锁时,释放的是B的锁。为了完全能够解决这个问题,可以采用锁续期的方式,其实现方式是单独开一个线程用来定时监听线程的锁是否还被持有,如果还持有,那么就给这把锁增加一些过期时间,这样就不会出现上述问题了。目前市面上已经为我们提供了锁自动续期的中间件,比如redisson String lockValue = redisTemplate.opsForValue().get("lock");
if(lockValue.equals(uuid)){ // 线程A的锁过期
redisTemplate.delete("lock");//线程A删除了线程B的锁
}
复制代码Redissonredisson一般使用最多的场景就是分布式锁了,它不仅保证了并发场景下线程安全的问题,也解决了锁续期的问题。使用方式也比较简单,以3.5.7版本为例,首先需要配置redisson信息,根据自己的redis集群模式自由选择配置。在配置完成后,再来改造上面的购票方法。 @Bean
public RedissonClient redissonClient() { Config config = new Config();//单机配置 config.useSingleServer().setAddress("redis://127.0.0.1:3306").setDatabase(0);//主从配置//config.useMasterSlaveServers().setMasterAddress("").addSlaveAddress("","");//哨兵配置//config.useSentinelServers().addSentinelAddress("").setMasterName("");//Cluster配置//config.useClusterServers().addNodeAddress(""); return Redisson.create(config);}
复制代码对于redisson使用起来也非常简单,通过getLock方法获取到RLock对象。通过RLock的tryLock或lock方法来进行加锁(底层都是通过Lua脚本来实现的)。当获取到锁并且扣减库存后,可以使用unlock方法进行锁释放。void buyTicket() {
RLock lock = redissonClient.getLock("lock");if(lock.tryLock()) {// 获取锁try{intstockNum = byTicketMapper.selectStockNum();if(stockNum >0) {//TODO 买票流程....byTicketMapper.reduceStock();// 扣减库存}else{ log.info("=====>票卖完了<===="); } } finally { lock.unlock();//释放锁}}else{ log.info("=====>系统繁忙,请稍后!<====");}
}
复制代码Watch Dog机制那redisson是如何做到锁续期的呢?其实在redisson内部有一个看watch dog机制(看门狗机制),但是看门狗机制并不是在加锁时就能启动的。需要注意的是在加锁时,如果使用tryLock(long t1,long t2, TimeUnit unit)或lock(long t1,long t2, TimeUnit unit)方法并且将t2参数值设为了一个不为-1的值,那么看门口将无法生效。看门狗在启动后会监听主线程还在执行,如果还在执行那么将会通过Lua脚本每10秒给锁续期30秒。watchlog的延时时间默认为30秒,这个值可以在配置config时自己定义。private RFuture tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
if(leaseTime != -1L) {// 如果leaseTime不是-1,那么将无法使用看门狗return this.tryLockInnerAsync(leaseTime,unit,threadId, RedisCommands.EVAL_NULL_BOOLEAN);}else{ RFuture ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); ttlRemainingFuture.addListener(newFutureListener(){ public void operationComplete(Futurefuture)throws Exception {if(future.isSuccess()) { Boolean ttlRemaining = (Boolean)future.getNow();if(ttlRemaining) {// 看门口机制RedissonLock.this.scheduleExpirationRenewal(threadId); } } } }); return ttlRemainingFuture;}
}
复制代码private long lockWatchdogTimeout = 30000L; //默认30秒
复制代码private void scheduleExpirationRenewal(final long threadId) {
if(!expirationRenewalMap.containsKey(this.getEntryName())) {// 每10秒执行续期Timeout task = this.commandExecutor.getConnectionManager().newTimeout(newTimerTask(){ public void run(Timeout timeout) throws Exception {// 通过LUA脚本为锁续期RFuture future =RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;",Collections.singletonList(RedissonLock.this.getName()),newObject[]{RedissonLock.this.internalLockLeaseTime,RedissonLock.this.getLockName(threadId)}); future.addListener(newFutureListener(){ public void operationComplete(Futurefuture)throws Exception {RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());if(!future.isSuccess()) {RedissonLock.log.error("Can't update lock "+RedissonLock.this.getName()+" expiration", future.cause()); }else{if((Boolean)future.getNow()) {RedissonLock.this.scheduleExpirationRenewal(threadId); } } } }); } }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);// 每10秒执行一次if(expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) { task.cancel(); }}
}
复制代码问题4:主从切换导致锁丢失问题虽然redisson帮助我们解决了锁续期的问题,但是在redis集群架构中,由于主从复制具有一定的延时,那么在极端情况下就会出现这样一个问题:当一个线程获取锁成功,并且成功向主节点保存了锁信息,当主节点还未像从节点同步锁信息时,主节点宕机了,那么当发生故障转移从节点切换为主节点时,线程加的锁就丢失了。为了解决这个问题,redis引入了红锁RedLock,RedLock与大多数中间件的选举机制类似,采用过半的方式来决定操作成功还是不成功。RedLock加锁RedLock在工作中,并不接受redis的集群架构,无论是主从,哨兵还是Cluster。每台redis服务都是独立的,都是一台独立的Master节点。在加锁的过程中,RedLock会记录开始加锁时的时间以及加锁成功后的时间,这两个时间差就是一台机器加锁成功所需要的时间。比如启动了5个redis服务,线程A设置锁的超时时间为5秒,当像第一台redis服务加锁成功后花费了1秒,像第二台服务加锁成功后也花费了一秒。这个时候加到第二台机器时,已经花费了两秒的时间,但是加锁数并未过半,还需要加锁一台才能完全算加锁成功,这个时候第三台机器加锁成功又花费了1秒。那么总的加锁时间就是3秒,锁的实际过期时间就为2秒。特别需要注意的是,在向redis服务建立网络连接时,要设置一个超时时间,避免redis服务宕机时,客户端还在傻傻的等待回应,这里超时时间官方给到建议是5-50毫秒之间,当连接超时时,客户端会继续向下一个节点发起连接。https://b23.tv/hC965Wb
https://b23.tv/qdQ0al5
https://b23.tv/yDrYqsl
https://b23.tv/jkZFwKJ
https://b23.tv/QRnEkSu
https://b23.tv/yUpJXh7
https://b23.tv/w6Ouu6j
https://b23.tv/b9XfpAA
加锁失败如果因为某些原因,获取锁失败(加锁没有超半数或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁,即便某些Redis实例根本就没有加锁成功。失败重试在并发场景下,RedLock会出现这样一个问题,比如有三个线程同时去获取了同一张票的锁,此时A线程已经成功给redis-1和reids-2加上了锁,线程B已经成功给redis-3,reids-4加上了锁,线程C成功的给reids-5加上了锁,这个时候三个线程再去加锁时,没有机器可加了,发现加锁成功数都未过半,那么就导致客户端始终获取不到锁。