redis专题:使用redis实现分布式锁

    技术2023-10-03  72

    文章目录

    1. synchronized解决单应用下并发安全问题2. 手写redis分布式锁解决分布式环境下并发安全问题3. redisson分布式锁解决分布式环境下并发安全问题3.1 redission的lock和unlock方法源码 4. redisson的其他的分布式锁

            redis由于操作的是内存空间,访问速度比磁盘io(数据库)高很多,所以当项目中有较高的访问量时,热门数据一般会存放在redis中,这样可以有效减轻数据库的压力。但是在享受redis高性能的同时,也会存在各种缓存失效、并发安全、数据不一致等问题!

    1. synchronized解决单应用下并发安全问题

            说到多线程并发问题,根据以往经验,首先想到的应该是加锁保证并发安全。在单体项目架构中,确实是这样的,以减库存为例,操作如下

    ①:从redis中获取库存数量 ②:如果库存大于0,执行减库存。如果库存小于0,给出提示 ③:将减少后剩余的库存写入redis中

    在上面三个步骤的执行过程中,如果多线程并发执行,那将会有数据不一致的问题。

    比如线程1 和 线程2同时获取到库存数量为50,两个线程各自在自己的工作空间内减库存,并写入redis。这样一来,本来应该剩余48个库存,实际上redis的库存量却为49个,这样就会因数据不一致引发了超卖问题!

    解决方案:使用synchronized代码块,或者在方法上加synchronized

    代码示例:

    synchronized (this) { //获取库存 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { //减库存 int realStock = stock - 1; //更新缓存 stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } }

    注意:

    使用synchronized代码块,或者在方法上加synchronized,都属于本地锁,只针对单个应用起作用,锁的对象是this,因为springboot默认单例,不会产生多个实例,如果在分布式环境下,因为锁的不是单个实例,所以本地锁不起作用!!!

    查到的数据一定要在synchronized代码块内放入redis,“查库存-减库存-更新缓存” 要保证原子操作

    2. 手写redis分布式锁解决分布式环境下并发安全问题

            分布式环境下因为有多个应用实例,比如使用了8个商品微服务,用来做负载均衡。本地锁synchronized只能锁住当前服务下的对象,不能锁住8个对象,会产生缓存失效问题。

    多应用情况下可以考虑使用redis的set NX PX来实现锁的功能!

    NX : 如果当前key有值,则无法修改(修改失败返回null,成功返回ok) PX: 设置当前key的过期时间(毫秒) EX: 设置当前key的过期时间(秒)

    步骤如下:

    boolean ok = set key value nx if(ok){ 获取库存... 减少库存... 更新库存... } finally{ 删除key }

    但还可能存在以下问题:

    问题一:如果执行业务代码执行抛出异常,没人删除锁,就会造成死锁!

    解决:设置锁的过期时间,set NX EX 设置分布式锁 和 设置过期时间一定要保证原子操作

    问题二:如果业务代码超长(45秒),执行时间超过了锁的自动过期时间(30秒),当前主线程执行完业务代码后,删除锁时把别的线程的锁删除了,怎么办?

    解决: ①:设置分布式锁(占坑)时,给value加上一个uuid,删除锁时比较value,如果相等才执行删除,保证每个线程只能删除自己的锁! ②:使用定时器给锁自动续命。主线程在执行任务时,开辟出一个子线程执行定时任务,每隔一定时间检测当前主线程的锁是否还存在,如果存在,再续30秒时间。如果不在,则结束子线程!

    问题三:执行删除锁前,当前线程要从redis先获取到key的值,与自己的uuid比较,在这个从redis获取到key的值网络交互的过程中,是要消耗一定时间的。比如在获取到自己的key值返回的过程中,刚巧redis中自己的key过期了,这样,执行删除时还是把别人的key给删了!怎么办?

    解决:使用redis提供的lua脚本,这个问题和第一条问题类似,都是需要保证原子性!

    //redis 官网提供的解锁lua脚本 String lua = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";

    最终代码示例如下:

    //从数据库中获取首页数据(分布式锁) public Map<String, List<Catelog2Vo>> getCatelogJsonFromDbWithRedisLock() { /**1 多个线程先去占分布式锁 ,去redis中占个坑 相当于 set nx (key :lock, value:1111) *(1)设置过期时间,防止业务逻辑出现异常,锁没人删除,出现死锁! *(2)设置uuid,保证每个线程都删除的是自己的锁 */ String uuid = UUID.randomUUID().toString(); Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS); if (lock) { try { //加锁成功 ..执行业务逻辑 获取库存... 减少库存... 更新库存... } finally { /**执行完先比较value 再释放锁 * 弃用原因:redis获取到key的值网络交互的过程中,是要消耗一定时间的。 * 比如在获取到自己的key值返回的过程中,刚巧redis中自己的key过期了,这样,执行删除时还是把别人的key给删了! String lock1 = redisTemplate.opsForValue().get("lock"); if (uuid.equals(lock1)){ //比较,相等时才删除自己的锁,防止误删别人的 redisTemplate.delete("lock"); } */ //保证解锁的原子性:使用redis 官网提供的解锁lua脚本 String lua = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; //调用redisTemplate提供的调用脚本方法!execute //Integer:返回值类型 成功返回1 失败返回0 //如果uuid和redis中的值一样才删除 redisTemplate.execute(new DefaultRedisScript<Integer>(lua, Integer.class), Arrays.asList("lock"), uuid); } return dataFromDb; } else { //加锁失败,lock已经有值,被别的线程占用,重试,自旋 try { Thread.sleep(100); //睡眠100毫秒,防止线程溢出 return getCatelogJsonFromDbWithRedisLock(); //自旋 } catch (InterruptedException e) { e.printStackTrace(); } } return null; }

    3. redisson分布式锁解决分布式环境下并发安全问题

    redisson完全实现了juc的功能,不仅有锁,还都是分布式锁! 问题1:redisson框架的实现原理,为什么会满足分布式锁的要求?

            原理图如上,虽然redisson可以保证大部分时候分布式锁的安全性,但是有些特殊情况,还是会出现并发安全问题!比如哨兵架构下:redis的主节点master 在向slave节点同步数据时,主节点挂掉了,剩余的slave节点通过哨兵选举出了一个新的master节点。此时旧的master节点中setnx的key还没有同步到新的master节点,新的master节点中由于已经对外提供服务,其他线程也会由setnx创建key,这样就存在多个setnx的key,此时redis的setnx失效!可能会出现并发安全(超卖)等问题!

    解决方案: ①:如果必须保证强一致性,可以使用zookeeper实现分布式锁,但zookeeper没有redis的性能高。需要权衡利弊来使用,其实大部分情况下。可以容忍一些极小概率才可能出险的问题,如果出现,可以使用日志记录下来并通过人工客服协助解决!

    ②:如果还是想用redis解决这个问题,可以使用红锁redlock。redlock规定使用setnx命令时不再只向一个master节点发送命令,而是向所有节点发送,当超过半数以上节点加锁成功时才算加锁成功!但是不建议使用redis的redlock,最主要的原因还是性能问题!以前只需要与一台redis交互,现在变为多台,性能下降!

    代码示例:

    String lockKey = "product_001"; RLock redissonLock = redisson.getLock(lockKey); try { //加锁 redissonLock.lock(); // setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS) int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { //释放锁 redissonLock.unlock(); }

    3.1 redission的lock和unlock方法源码

    lock:redisson在调用lock方法时,会使用lua脚本尝试获取锁。 ①:如果获取到锁,则以hash类型来存储分布式锁,默认30秒过期,并绑定当前线程。然后创建一个看门狗监听器,每隔10秒监听当前线程的锁是否过期,如果业务时间超长,10秒后锁未过期,看门狗就会再次为当前线程的锁续期30秒!

    ②:如果通过比对线程发现锁发生了重入,就使用hincrby命令进行自增。这个步骤和ReentranLock类似!

    ③:如果没有获取到锁,则返回锁的过期时间,用于看门狗判断!

    lock的Lua脚本如下:

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }

    unlock:redisson在调用unlock方法时,也会使用lua脚本尝试解锁 unlock会使用del命令删除锁,如果是可重入锁,则一层一层删除!

    unlock的Lua脚本如下:

    protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }

    问题:lock.lock() 和 lock.lock(10, TimeUnit.SECONDS)两种加锁方式有什么区别? 答:首先如果我们自己设置了过期时间,就不会调用看门狗机制,不会给锁自动续期,当业务逻辑时间大于锁的超时时间时,会抛出异常!

    1.如果我们传递了锁的超时时间,就会发送redis执行脚本,进行占锁,超时时间就是我们自己设置的时间,没有定时任务续期机制!

    2.如果我们未指定锁的超时时间,就默认使用30s【lockWatchdogTimeout看门狗的默认时间】,只要占锁成功,就会启动一个定时任务,重新给锁设置30秒过期时间,定时任务每隔10秒执行一次,为看门狗时间的1/3

    4. redisson的其他的分布式锁

    4.1 读写锁 读写锁的作用是什么? 答:保证一定能读到最新的数据,写锁是一个排它锁(互斥锁),读锁是一个共享锁。当正常读的时候不用等待,如果写数据时,所有读锁必须等待写锁释放,才能读到数据!

    读数据过程中,写数据用不用等待? 答:用,只要有写锁参与,就必须等待,高并发读时,无需等待,相当于没有加锁

    /** 读写锁的作用是什么? 答:保证一定能读到最新的数据,写锁是一个排它锁(互斥锁),读锁是一个共享锁。 当正常读的时候不用等待,如果写数据时,所有读锁必须等待写锁释放,才能读到数据! */ @GetMapping("write") @ResponseBody public String writeLock(){ //获取读写锁rw_lock 互斥锁 RReadWriteLock aaa = redissonClient.getReadWriteLock("rw_lock"); //获取写锁 RLock lock = aaa.writeLock(); //加写锁 lock.lock(); String s= UUID.randomUUID().toString(); try { //业务代码 redisTemplate.opsForValue().set("aaa",s); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }finally { //业务代码执行完,释放写锁 lock.unlock(); } return s; } @GetMapping("read") @ResponseBody public String readLock(){ //获取读写锁rw_lock 共享锁 RReadWriteLock aaa = redissonClient.getReadWriteLock("rw_lock"); //获取读锁 RLock lock = aaa.readLock(); //加读锁 lock.lock(); String s = ""; try { //业务代码 s = (String) redisTemplate.opsForValue().get("aaa"); } catch (Exception e) { e.printStackTrace(); }finally { //业务代码执行完,释放读锁 lock.unlock(); } return s; }

    6.2 闭锁 // 场景:班级里5个人全部走完,才能锁门

    //分布式闭锁, // 场景:班级里5个人全部走完,才能锁门 @GetMapping("/lockdoor") @ResponseBody public String lockDoor() throws Exception{ //获取闭锁 RCountDownLatch door = redissonClient.getCountDownLatch("door"); //设置闭锁的数量 door.trySetCount(5L); //在count数量执行完之前,等待! door.await(); //count执行完,不再等待,执行业务代码 return "人都走完了!放假!"; } @GetMapping("/gogogo") @ResponseBody public String go() throws Exception{ //获取闭锁 RCountDownLatch door = redissonClient.getCountDownLatch("door"); //每次请求,闭锁设置的数量 -1 door.countDown(); return "走了一个人"; }

    6.3 信号量 解释 :类似停车时等车位,只有当有车开走,空出车位时才能停 分布式场景应用:做限流,指定10000个总请求量,到达10000个请求时,阻塞(或者返回false),等其他请求释放再执行

    问:acquire() 和 tryAcquire()的区别? 答:acquire()会一直等待车位! tryAcquire()如果没有车位会返回false!

    代码

    //信号量 占侧位 @GetMapping("/park") @ResponseBody public String park() throws Exception { //获取信号量 比如redis中park原始有3个车位 RSemaphore park = redissonClient.getSemaphore("park"); //占一个车位,如果没有车位,就阻塞等待 park.acquire(); boolean b = park.tryAcquire(); if (b){ //执行业务 }else { //给出提示 return "error"; } return "ok=》"+b; } //信号量 车开走了 @GetMapping("/go1") @ResponseBody public String go1() throws Exception { //获取信号量 比如redis中park原始有3个车位 RSemaphore park = redissonClient.getSemaphore("park"); //释放一个车位 park.release(); return "ok=》"; }

    1、可重入锁,指一个类中,A方法和B方法同时上一把锁,当A获得锁后,B方法也可以执行。 2.公平锁:先申请锁的线程先得到锁,其余在队列中等待锁释放。

    更多详情请关注 redis官网 redisson的其他分布式锁

    Processed: 0.010, SQL: 9