四、Redis实现分布式锁(Redisson的正确使用)

    技术2024-12-29  26

    本讲解如何通过Redis实现分布式锁。

    1 无锁的例子

    1.1 创建Maven工程

    在Idea中,创建Maven工程。pom.xml文件如下

    <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.zyp</groupId> <artifactId>RedisLock</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.3.0</version> </dependency> </dependencies> </project>

    创建一个Java类,名为:org.zyp.TestDemo

    package org.zyp; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing */ public class TestDemo { private int ticketCount = 10; // 剩余票数量 public static void main(String[] args) { TestDemo testDemo = new TestDemo(); testDemo.test(); } public void test() { try { TicketsSale ticketssale = new TicketsSale(); Thread thread1=new Thread(ticketssale,"售票窗口1"); Thread thread2=new Thread(ticketssale,"售票窗口2"); Thread thread3=new Thread(ticketssale,"售票窗口3"); Thread thread4=new Thread(ticketssale,"售票窗口4"); Thread thread5=new Thread(ticketssale,"售票窗口5"); thread1.start(); thread2.start(); thread3.start(); thread4.start(); thread5.start(); Thread.currentThread().join(); // 主线程一直等待不停。 } catch (InterruptedException e) { e.printStackTrace(); } } // 内部类,售票动作 class TicketsSale implements Runnable{ @Override public void run() { while (ticketCount>0){ try { if(ticketCount>0){ TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000, 2000)); // 随机时长,模拟业务逻辑时间 ticketCount--; System.out.println(Thread.currentThread().getName()+": 已售出1张票, 还剩余["+ticketCount+"]张票!"); } }catch (Exception e){ e.printStackTrace(); } } } } }

    1.2 运行结果

    售票窗口4: 已售出1张票, 还剩余[9]张票! 售票窗口3: 已售出1张票, 还剩余[8]张票! 售票窗口2: 已售出1张票, 还剩余[7]张票! 售票窗口1: 已售出1张票, 还剩余[6]张票! 售票窗口5: 已售出1张票, 还剩余[5]张票! 售票窗口4: 已售出1张票, 还剩余[4]张票! 售票窗口3: 已售出1张票, 还剩余[3]张票! 售票窗口5: 已售出1张票, 还剩余[2]张票! 售票窗口2: 已售出1张票, 还剩余[1]张票! 售票窗口1: 已售出1张票, 还剩余[0]张票! 售票窗口4: 已售出1张票, 还剩余[-1]张票! 售票窗口2: 已售出1张票, 还剩余[-2]张票! 售票窗口5: 已售出1张票, 还剩余[-3]张票! 售票窗口3: 已售出1张票, 还剩余[-4]张票!

    可以看到票出现了超售问题。

    在多线程并发时,由于无锁,导致ticketCount读、写动作整体不是原子的。

    2 分布式锁

    上面的超售的问题,如果增加锁机制即可解决。

    但是传统使用Java自带的锁,这些锁都是在JVM内部,锁的有效范围是同一个JVM中的多个线程。

    当一个系统是多进程,或者多台机器上进行多节点部署时。 需要要从整体上进行锁控制,而不是JVM局部,这就引出了分布式锁。

    可以通过Redis实现分布式锁,发挥Redis的高性能和单进程串行的优势。 在多个节点(多个JVM)运行时,不使用JVM内部自带的锁,而是统一都把锁交个Redis来进行。

    2.1 实现思路

    Redis中有setnx命令,既该命令会判断Redis中是否已经存在key,若存在则set失败。若不存在key,则set成功。我们可以利用此机制来实现分布式加锁。

    当需要释放锁(解锁),则可以del删除这个key,这样其他线程就可以set成功。

    2.2 RedisLock的实现

    创建一个Java类,名为:org.zyp.RedisLock

    package org.zyp; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.params.SetParams; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing * @date 2020/7/3 23:49 */ public class RedisLock { private JedisPool jedisPool = null; // Jedis连接池(如果是Cluster集群,则应该使用JedisCluster) private SetParams params = new SetParams().nx().ex(30); // set的参数。 expire time=30sec public RedisLock(JedisPool pool) { this.jedisPool = pool; } /** * 加锁。一直阻塞(反复循环) * @param lockID 锁ID */ public void lock(String lockID) { while( ! tryLock(lockID) ) { // 死循环,直到拿到锁。 try { TimeUnit.MICROSECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 单词的获得锁的尝试 * @param lockID 锁ID * @return */ private boolean tryLock(String lockID) { try(Jedis jedis = jedisPool.getResource()) { String random_value = UUID.randomUUID().toString(); // 随机生成作为value String result = jedis.set(lockID, random_value, params); // set命令必须在一条语句中,确保原子性 if("OK".equalsIgnoreCase(result)) { return true; } return false; } } /** * 释放锁 * @param lockID 锁ID */ public void unLock(String lockID) { try(Jedis jedis = jedisPool.getResource()) { jedis.del(lockID); // 通过del key 释放锁 } } }

    注意:为何需要设置expire time?

    为了避免:某个线程拿到锁后,因某种原因宕掉而未执行unlock释放锁,导致锁长期无法释放,其他现在在申请锁时永远死等待。 通过设置expire time,即使某个已拿到锁的线程宕掉未释放,也有超时机制保障其最终释放。

    2.3 测试运行

    package org.zyp; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.JedisPool; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing */ public class TestDemo { private int ticketCount = 10; // 剩余票数量 private JedisPool jedisPool = null; private RedisLock distributedLock; // 分布式锁 private final String LOCK_ID = "lock_ticket"; public static void main(String[] args) { TestDemo testDemo = new TestDemo(); testDemo.test(); } public void test() { try { GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); poolConfig.setMinIdle(5); poolConfig.setMaxTotal(20); jedisPool = new JedisPool(poolConfig, "192.168.43.201", 6379); TicketsSale ticketssale = new TicketsSale(); Thread thread1=new Thread(ticketssale,"售票窗口1"); Thread thread2=new Thread(ticketssale,"售票窗口2"); Thread thread3=new Thread(ticketssale,"售票窗口3"); Thread thread4=new Thread(ticketssale,"售票窗口4"); Thread thread5=new Thread(ticketssale,"售票窗口5"); thread1.start(); thread2.start(); thread3.start(); thread4.start(); thread5.start(); Thread.currentThread().join(); // 主线程一直等待不停。 } catch (InterruptedException e) { e.printStackTrace(); } } // 内部类,售票动作 class TicketsSale implements Runnable{ @Override public void run() { distributedLock = new RedisLock(jedisPool); // 分布式锁 while (ticketCount>0){ try { distributedLock.lock(LOCK_ID); //加锁 if(ticketCount>0){ TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000, 2000)); // 随机时长,模拟业务逻辑时间 ticketCount--; System.out.println(Thread.currentThread().getName()+": 已售出1张票, 还剩余["+ticketCount+"]张票!"); } }catch (Exception e){ e.printStackTrace(); }finally { distributedLock.unLock(LOCK_ID); // 释放锁 } } } } }

    多次运行结果如下:

    售票窗口4: 已售出1张票, 还剩余[9]张票! 售票窗口4: 已售出1张票, 还剩余[8]张票! 售票窗口5: 已售出1张票, 还剩余[7]张票! 售票窗口1: 已售出1张票, 还剩余[6]张票! 售票窗口1: 已售出1张票, 还剩余[5]张票! 售票窗口2: 已售出1张票, 还剩余[4]张票! 售票窗口4: 已售出1张票, 还剩余[3]张票! 售票窗口1: 已售出1张票, 还剩余[2]张票! 售票窗口2: 已售出1张票, 还剩余[1]张票! 售票窗口2: 已售出1张票, 还剩余[0]张票!

    可以发现,增加Redis锁后,未出现超售。结果符合预期。

    3 改进RedisLock的实现

    3.1 存在的问题

    上面的RedisLock实现,在每个任务的执行时长大于Redis key的expire time时,则存在问题。

    例如,若将上面的RedisLock.java代码修改:Redis key的expire time减少为1秒。 这时,小于每个任务的执行时长(每个任务执行时长约为1~2秒)。再次运行发现结果仍会出现超售。

    为什么?因为

    当线程1还在执行过程中,由于expire time比较短,Redis将其Key删除。这时其他线程(比如线程2)可以获得锁(set一个key),但此时线程1还未执行完。当线程1执行完后,执行unlock释放锁是,del key动作。 这时del删除的key并不是线程1原来的,而是线程2的。所以导致误删除key。这时其他线程(比如线程3)可以获得锁(set一个key),但此时线程2还未执行完。其他类似。。。。

    由于key被误删除,整体上未能实现资源同步。

    3.2 RedisLock实现(改进方案A:避免误删key)

    为了避免误删除key,我们可以为每个线程生成不同的value(通过UUDI),只有value属于本线程的,才执行del,否则不删除。

    改进org.zyp.RedisLock代码如下:

    package org.zyp; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.params.SetParams; import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing */ public class RedisLock { private JedisPool jedisPool = null; // Jedis连接池(如果是Cluster集群,则应该使用JedisCluster) private SetParams params = new SetParams().nx().ex(1); // set的参数。 expire time private ThreadLocal<String> threadLocal=new ThreadLocal<>(); // 线程私有变量,仅线程自己可以访问 public RedisLock(JedisPool pool) { this.jedisPool = pool; } /** * 加锁。一直阻塞(反复循环) * @param lockID 锁ID */ public void lock(String lockID) { while( ! tryLock(lockID) ) { // 死循环,直到拿到锁。 try { TimeUnit.MICROSECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 单词的获得锁的尝试 * @param lockID 锁ID * @return */ private boolean tryLock(String lockID) { try(Jedis jedis = jedisPool.getResource()) { String random_value = UUID.randomUUID().toString(); // 随机生成作为value String result = jedis.set(lockID, random_value, params); // set命令必须在一条语句中,确保原子性 if("OK".equalsIgnoreCase(result)) { threadLocal.set(random_value); // 获得锁后,将其随机产生的value也保持起来。 return true; } return false; } } /** * 释放锁 * @param lockID 锁ID */ public void unLock(String lockID) throws Exception { // lua脚本,先判断value是否相同,只有value相同才执行del,否则不执行del。 String lua_script="if redis.call(\"get\",KEYS[1])==ARGV[1] then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; try(Jedis jedis = jedisPool.getResource()) { Object result = jedis.eval(lua_script, Arrays.asList(lockID), Arrays.asList(threadLocal.get())); // 为了保证原子性,放在lua脚本中一次执行。 if(Integer.valueOf(result.toString())==0){ throw new Exception("解锁失败"); // 抛出这个异常是为了让调用方的事务回滚 } } } }

    测试代码TestDemo不变。 多次执行,可以发现:

    超售问题发生的概率,已经大幅度降低。但仍有超时发生。 因为只是避免了误删除key,但是仍存在多个线程同时进入同步资源代码。这时应该,捕获unLock()方法抛出的异常,配合数据回滚,来保证一致性,避免超售。

    可以发现,这种方案比较繁琐,需要开发代码代码较多(回滚需要结合具体代码),不具有通用性。

    3.3 RedisLock实现(改进方案B:锁续期)

    再此分析发生问题的前提时,获得锁业务线程的执行时长(持有锁时长)大于Redis中key的expire time。 那么我们只要保证这个前提不发生,就不会有问题了。既不断的给Redis中key的expire time延期,确保它的时长大于线程的执行时长。

    对称,对于每个lockID,分别启动一个对应的watchdog线程。 watchdog线程不断的查看ttl剩余时长,如未过期的key则进行延期。

    这里仍有一个问题:万一获得锁业务线程崩溃,watchdog仍在不停延时,这仍存在问题。 解决这个问题的办法是将watchdog设置为守护线程,当业务线程崩溃停止只剩下守护线程时,Java会自动全部停止。

    改进org.zyp.RedisLock代码如下:

    package org.zyp; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.params.SetParams; import java.util.ArrayList; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing */ public class RedisLock { private JedisPool jedisPool = null; // Jedis连接池(如果是Cluster集群,则应该使用JedisCluster) private SetParams params = new SetParams().nx().ex(1); // set的参数。 expire time private ArrayList<String> watch_lockid_set = new ArrayList<String>(); // 存放已被watchdog监控的lockid列表 public RedisLock(JedisPool pool) { this.jedisPool = pool; } /** * 加锁。一直阻塞(反复循环) * @param lockID 锁ID */ public void lock(String lockID) { while( ! tryLock(lockID) ) { // 死循环,直到拿到锁。 try { TimeUnit.MICROSECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 单词的获得锁的尝试 * @param lockID 锁ID * @return */ private boolean tryLock(String lockID) { try(Jedis jedis = jedisPool.getResource()) { String random_value = UUID.randomUUID().toString(); // 随机生成作为value String result = jedis.set(lockID, random_value, params); // set命令必须在一条语句中,确保原子性 if("OK".equalsIgnoreCase(result)) { if( ! watch_lockid_set.contains(lockID)) { // list没有包含,则说明是第一次,则需要开启watchdog(既对于同一个lockid,只需要启动一个watchdog) new Thread(new LockWatchLog(jedisPool, lockID), "LockWatchLog-"+ lockID).start(); // 启动监控watchdog watch_lockid_set.add(lockID); } return true; } return false; } } /** * 释放锁 * @param lockID 锁ID */ public void unLock(String lockID) { try(Jedis jedis = jedisPool.getResource()) { jedis.del(lockID); // 通过del key 释放锁 } } // 内部类,定义一个守护线程。不断的给仍存在的key(未被del)延长时间 class LockWatchLog implements Runnable{ private JedisPool jedisPool; private String watch_lockid = null; public LockWatchLog(JedisPool jedisPool, String watch_lockid){ this.jedisPool = jedisPool; this.watch_lockid = watch_lockid; } @Override public void run() { Jedis jedis = jedisPool.getResource(); while (true){ Long current_ttl = jedis.ttl(watch_lockid); // 获得剩余的ttl if( current_ttl != null && current_ttl > 0){ // key还没有过期。 jedis.expire(watch_lockid, (int) (current_ttl+1)); // 则延期 try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } }else { // 否则 watch_lockid_set.remove(watch_lockid); // 移除 break; // 退出循环,结束本watchdog线程。 } } } } }

    测试代码TestDemo不变。 多次执行,可以发现不会出现超售。

    4 Redisson

    其实在Redis官方文档中,有专门介绍分布式锁内容:Distributed locks with Redis(https://redis.io/topics/distlock)

    其中Java语言的实现是Redisson, 该Redisson的文档为(https://github.com/redisson/redisson/wiki)

    该RedissonRedLock的基本原理如同前文中的改进方案B,也是使用了一个watchdog不断的延长锁的有效期。

    同时,前文例子中都是基于Redis服务器为单机的情况。实际环境中Redis为多机(包括读写分离、Cluster等等)。RedissonRedLock的功能比较强大,它支持Redis Server为多节点,包括主从复制、包括Cluster机器。

    4.1 常用的Java客户端

    概念:

    Jedis:是Redis的Java实现客户端,提供了比较全面的Redis命令的支持。 (官方推荐的Java客户端)Redisson:实现了分布式和可扩展的Java数据结构。 (官方推荐的Java客户端)Lettuce:高级Redis客户端,用于线程安全同步,异步和响应使用,支持集群,Sentinel,管道和编码器。(官方推荐的Java客户端)

    优点:

    Jedis:比较全面的提供了Redis的操作特性(A blazingly small and sane redis java client)

    Redisson:促使使用者对Redis的关注分离,提供很多分布式相关操作服务,例如,分布式锁,分布式集合,可通过Redis支持延迟队列(distributed and scalable Java data structures on top of Redis server)

    Lettuce:主要在一些分布式缓存框架上使用比较多(Advanced Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.)

    可伸缩:

    Jedis:使用阻塞的I/O,且其方法调用都是同步的,程序流需要等到sockets处理完I/O才能执行,不支持异步。Jedis客户端实例不是线程安全的,所以需要通过连接池来使用Jedis。

    Redisson:基于Netty框架的事件驱动的通信层,其方法调用是异步的。Redisson的API是线程安全的,所以可以操作单个Redisson连接来完成各种操作

    Lettuce:基于Netty框架的事件驱动的通信层,其方法调用是异步的。Lettuce的API是线程安全的,所以可以操作单个Lettuce连接来完成各种操作

    结论:

    建议使用:Jedis + Redisson

    4.2 Jedis与Redisson选型对比

    4.2.1 概述

    Jedis是Redis的Java实现的客户端,其API提供了比较全面的Redis命令的支持;Redisson实现了分布式和可扩展的Java数据结构,与Jedis相比,功能较为简单,不支持字符串操作,不支持排序、事务、管道、分区等Redis特性。Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上。

    4.2.2 编程代码

    Jedis中的方法调用是比较底层的暴露的Redis的API,也即Jedis中的Java方法基本和Redis的API保持着一致,了解Redis的API,也就能熟练的使用Jedis。而Redisson中的方法则是进行比较高的抽象,每个方法调用可能进行了一个或多个Redis方法调用。

    如下分别为Jedis和Redisson操作的简单示例:

    // Jedis设置key-value与set操作: Jedis jedis =; jedis.set("key", "value"); List<String> values = jedis.mget("key", "key2", "key3"); // Redisson通过map操作 Redisson redisson = … RMap map = redisson.getMap("my-map"); // implement java.util.Map map.put("key", "value"); map.containsKey("key"); map.get("key");

    4.2.3 数据结构

    Jedis仅支持基本的数据类型如:String、Hash、List、Set、Sorted Set。

    Redisson不仅提供了一系列的分布式Java常用对象,基本可以与Java的基本数据结构通用,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。

    Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。。

    在分布式开发中,Redisson可提供更便捷的方法。

    4.2.4 Redis命令和Redisson API对应

    Redis的命令与Redisson的API方法对应关系,见:https://github.com/redisson/redisson/wiki/11.-Redis-commands-mapping

    4.3 使用Redisson

    4.3.1 在pom.xml加入依赖

    <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.2</version> </dependency>

    4.3.2 基本使用

    package org.zyp; import org.redisson.Redisson; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; import org.redisson.config.Config; /** * @author ZhangYuPing */ public class RedissonDemo { public static void main(String[] args) { Config config = new Config(); // Redisson支持多种模式:单节点服务端、集群服务端、主从复制服务端、哨兵服务端、代理模式等。 // config.useClusterServers() // .setScanInterval(2000) // cluster state scan interval in milliseconds // .addNodeAddress("redis://192.168.43.201:6379", "redis://192.168.43.203:6379", "redis://192.168.43.205:6379"); // use "rediss://" for SSL connection config.useSingleServer().setAddress("redis://192.168.43.201:6379"); RedissonClient redisson = Redisson.create(config); // RMap 继承了 java.util.concurrent.ConcurrentMap 接口 RMap<String, String> rmap = redisson.getMap("personalInfo"); rmap.put("name", "ZhansSan"); //这步不仅设置了Java对象,还同时将数据set到了Redis服务端。下同 rmap.put("address", "Beijing"); rmap.put("phone", "13812341234"); RMap<String, String> rmap_new = redisson.getMap("personalInfo"); System.out.println("New Map size: " + rmap_new.size()); System.out.println("Is New map contains key 'link' ? : " + rmap_new.containsKey("phone")); System.out.println("Value New mapped by key 'name': " + rmap_new.get("name")); boolean added = (rmap_new.putIfAbsent("phone2", "222222222") == null) ; //类似于hsetnx personalInfo phone 222222222 System.out.println("Is value mapped by key 'link' added: " + added); redisson.shutdown(); } }

    4.3.3 使用分布式锁

    基于Redis的分布式可重入Lock对象,实现了java.util.concurrent.locks.Lock接口。

    如果已获取锁的Redisson实例崩溃,则该锁无法解锁而永久挂起。为避免此此问题,Redisson维护锁看门狗,它会在锁持有人Redisson实例处于活动状态时延长锁的到期时间。默认情况下,锁看门狗超时为30秒,可以通过Config.lockWatchdogTimeout设置进行更改。

    Redisson还允许leaseTime在锁定获取期间指定参数。在指定的时间间隔后,锁定的锁将自动释放。

    RLock对象的行为符合Java Lock规范。这意味着只有锁拥有者线程才能解锁它,否则IllegalMonitorStateException将被抛出。否则考虑使用RSemaphore对象。

    锁的基本使用代码,如下:

    RLock lock = redisson.getLock("myLock"); // traditional lock method lock.lock(); // or acquire lock and automatically unlock it after 10 seconds lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }

    Redisson除了支持支持可重入锁,还支持公平锁Fair Lock、MultiLock、ReadWriteLock、Semaphore、PermitExpirableSemaphore、CountDownLatch。下面以最常用的可重入锁为例。

    在pom.xml增加Redisson的依赖。然后修改TestDemo.java,让他使用Redisson的锁。

    package org.zyp; import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing */ public class TestDemo { private int ticketCount = 10; // 剩余票数量 private RedissonClient redisson = null; private final String LOCK_ID = "lock_ticket"; public static void main(String[] args) { TestDemo testDemo = new TestDemo(); testDemo.test(); } public void test() { try { Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.43.201:6379"); redisson = Redisson.create(config); TicketsSale ticketssale = new TicketsSale(); Thread thread1=new Thread(ticketssale,"售票窗口1"); Thread thread2=new Thread(ticketssale,"售票窗口2"); Thread thread3=new Thread(ticketssale,"售票窗口3"); Thread thread4=new Thread(ticketssale,"售票窗口4"); Thread thread5=new Thread(ticketssale,"售票窗口5"); thread1.start(); thread2.start(); thread3.start(); thread4.start(); thread5.start(); Thread.currentThread().join(); // 主线程一直等待不停。 } catch (InterruptedException e) { e.printStackTrace(); } } // 内部类,售票动作 class TicketsSale implements Runnable{ @Override public void run() { RLock rLock = redisson.getLock(LOCK_ID);; // 使用Redisson分布式锁 while (ticketCount>0){ try { rLock.lock(10, TimeUnit.SECONDS); // 加锁 acquire lock and automatically unlock it after 10 seconds if(ticketCount>0){ TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000, 2000)); // 随机时长,模拟业务逻辑时间 ticketCount--; System.out.println(Thread.currentThread().getName()+": 已售出1张票, 还剩余["+ticketCount+"]张票!"); } }catch (Exception e){ e.printStackTrace(); }finally { try { rLock.unlock(); // 释放锁 } catch (Exception e) { e.printStackTrace(); } } } } } }

    多次运行结果,可以发现:

    如果rlock.lock()设置的自动过期时间比较长(大于业务执行时长),则没有超售现象。如果rlock.lock()设置的自动过期时间比较短,仍会有超时现象。且报异常:java.lang.IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id: cda3b92a-f89c-46c1-834e-25d5ca134cb6 thread-id: 47

    为什么存在超售且有异常呢? 网上有很多文章讲是lockInterruptibly的问题,其说法是错误的。

    真正的原因是:

    问题一:如果lock()方法的入参,如果显示指定了leaseTime(自动过期时长),则watchdog机制不会生效。问题二:如果lock()没有指定入参,则watchdog生效。当LockWatchdogTimeout设置过短(小于业务执行时长)仍会存在问题,LockWatchdogTimeout定义了watchdog看门狗的存活时长,超过时长则不在“看门”(不在执行锁续期),因此LockWatchdogTimeout的时长应该设置长一点(大于所有业务的执行时间),LockWatchdogTimeout设置的长一点,几乎没有负作用。另外:unlock()方法执行前可以增加判断if(rLock.isLocked() && rLock.isHeldByCurrentThread()),只有if条件满足才执行unlock()动作。虽然有个这个判断可避免报异常java.lang.IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id。 但个人强烈不建议增加此判断(官方的代码也无此判断),因为报此异常通常不是unlock的问题而是程序有问题,应该从根本上去分析解决。而加上这个if判断后,容易把存在程序问题掩盖,难以第一时间发现诊断出问题

    修改TestDemo.java,改动两处:

    代码中显示设置config.setLockWatchdogTimeout()的时长使用无参的rLock.lock()方法,使watchdog生效

    修改后的TestDemo.java代码如下:

    package org.zyp; import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing * @date 2020/7/3 23:54 */ public class TestDemo { private int ticketCount = 10; // 剩余票数量 private RedissonClient redisson = null; private final String LOCK_ID = "lock_ticket"; public static void main(String[] args) { TestDemo testDemo = new TestDemo(); testDemo.test(); } public void test() { try { Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.43.201:6379"); config.setLockWatchdogTimeout(60000); // 确保LockWatchdogTimeout足够长 redisson = Redisson.create(config); TicketsSale ticketssale = new TicketsSale(); Thread thread1=new Thread(ticketssale,"售票窗口1"); Thread thread2=new Thread(ticketssale,"售票窗口2"); Thread thread3=new Thread(ticketssale,"售票窗口3"); Thread thread4=new Thread(ticketssale,"售票窗口4"); Thread thread5=new Thread(ticketssale,"售票窗口5"); thread1.start(); thread2.start(); thread3.start(); thread4.start(); thread5.start(); Thread.currentThread().join(); // 主线程一直等待不停。 } catch (InterruptedException e) { e.printStackTrace(); } } // 内部类,售票动作 class TicketsSale implements Runnable{ @Override public void run() { RLock rLock = redisson.getLock(LOCK_ID);; // 分布式锁 while (ticketCount>0){ try { //rLock.lock(1, TimeUnit.SECONDS); // 加锁(watchdog不生效) acquire lock and automatically unlock it after 10 seconds rLock.lock(); // 加锁(watchdog生效) if(!Thread.currentThread().isInterrupted() && ticketCount>0){ TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000, 2000)); // 随机时长,模拟业务逻辑时间 ticketCount--; System.out.println(Thread.currentThread().getName()+": 已售出1张票, 还剩余["+ticketCount+"]张票!"); } }catch (Exception e){ e.printStackTrace(); }finally { try { rLock.unlock(); // 释放锁 } catch (Exception e) { e.printStackTrace(); } } } } } }

    再次反复运行,可以发现不再报异常,也未出现超售。

    综上,建议直接使用Redisson使用分布式锁。

    Processed: 0.009, SQL: 9