本讲解如何通过Redis实现分布式锁。
在Idea中,创建Maven工程。pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.zyp</groupId> <artifactId>RedisLock</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.3.0</version> </dependency> </dependencies> </project>创建一个Java类,名为:org.zyp.TestDemo
package org.zyp; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing */ public class TestDemo { private int ticketCount = 10; // 剩余票数量 public static void main(String[] args) { TestDemo testDemo = new TestDemo(); testDemo.test(); } public void test() { try { TicketsSale ticketssale = new TicketsSale(); Thread thread1=new Thread(ticketssale,"售票窗口1"); Thread thread2=new Thread(ticketssale,"售票窗口2"); Thread thread3=new Thread(ticketssale,"售票窗口3"); Thread thread4=new Thread(ticketssale,"售票窗口4"); Thread thread5=new Thread(ticketssale,"售票窗口5"); thread1.start(); thread2.start(); thread3.start(); thread4.start(); thread5.start(); Thread.currentThread().join(); // 主线程一直等待不停。 } catch (InterruptedException e) { e.printStackTrace(); } } // 内部类,售票动作 class TicketsSale implements Runnable{ @Override public void run() { while (ticketCount>0){ try { if(ticketCount>0){ TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000, 2000)); // 随机时长,模拟业务逻辑时间 ticketCount--; System.out.println(Thread.currentThread().getName()+": 已售出1张票, 还剩余["+ticketCount+"]张票!"); } }catch (Exception e){ e.printStackTrace(); } } } } }可以看到票出现了超售问题。
在多线程并发时,由于无锁,导致ticketCount读、写动作整体不是原子的。
上面的超售的问题,如果增加锁机制即可解决。
但是传统使用Java自带的锁,这些锁都是在JVM内部,锁的有效范围是同一个JVM中的多个线程。
当一个系统是多进程,或者多台机器上进行多节点部署时。 需要要从整体上进行锁控制,而不是JVM局部,这就引出了分布式锁。
可以通过Redis实现分布式锁,发挥Redis的高性能和单进程串行的优势。 在多个节点(多个JVM)运行时,不使用JVM内部自带的锁,而是统一都把锁交个Redis来进行。
Redis中有setnx命令,既该命令会判断Redis中是否已经存在key,若存在则set失败。若不存在key,则set成功。我们可以利用此机制来实现分布式加锁。
当需要释放锁(解锁),则可以del删除这个key,这样其他线程就可以set成功。
创建一个Java类,名为:org.zyp.RedisLock
package org.zyp; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.params.SetParams; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing * @date 2020/7/3 23:49 */ public class RedisLock { private JedisPool jedisPool = null; // Jedis连接池(如果是Cluster集群,则应该使用JedisCluster) private SetParams params = new SetParams().nx().ex(30); // set的参数。 expire time=30sec public RedisLock(JedisPool pool) { this.jedisPool = pool; } /** * 加锁。一直阻塞(反复循环) * @param lockID 锁ID */ public void lock(String lockID) { while( ! tryLock(lockID) ) { // 死循环,直到拿到锁。 try { TimeUnit.MICROSECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 单词的获得锁的尝试 * @param lockID 锁ID * @return */ private boolean tryLock(String lockID) { try(Jedis jedis = jedisPool.getResource()) { String random_value = UUID.randomUUID().toString(); // 随机生成作为value String result = jedis.set(lockID, random_value, params); // set命令必须在一条语句中,确保原子性 if("OK".equalsIgnoreCase(result)) { return true; } return false; } } /** * 释放锁 * @param lockID 锁ID */ public void unLock(String lockID) { try(Jedis jedis = jedisPool.getResource()) { jedis.del(lockID); // 通过del key 释放锁 } } }注意:为何需要设置expire time?
为了避免:某个线程拿到锁后,因某种原因宕掉而未执行unlock释放锁,导致锁长期无法释放,其他现在在申请锁时永远死等待。 通过设置expire time,即使某个已拿到锁的线程宕掉未释放,也有超时机制保障其最终释放。
多次运行结果如下:
售票窗口4: 已售出1张票, 还剩余[9]张票! 售票窗口4: 已售出1张票, 还剩余[8]张票! 售票窗口5: 已售出1张票, 还剩余[7]张票! 售票窗口1: 已售出1张票, 还剩余[6]张票! 售票窗口1: 已售出1张票, 还剩余[5]张票! 售票窗口2: 已售出1张票, 还剩余[4]张票! 售票窗口4: 已售出1张票, 还剩余[3]张票! 售票窗口1: 已售出1张票, 还剩余[2]张票! 售票窗口2: 已售出1张票, 还剩余[1]张票! 售票窗口2: 已售出1张票, 还剩余[0]张票!可以发现,增加Redis锁后,未出现超售。结果符合预期。
上面的RedisLock实现,在每个任务的执行时长大于Redis key的expire time时,则存在问题。
例如,若将上面的RedisLock.java代码修改:Redis key的expire time减少为1秒。 这时,小于每个任务的执行时长(每个任务执行时长约为1~2秒)。再次运行发现结果仍会出现超售。
为什么?因为
当线程1还在执行过程中,由于expire time比较短,Redis将其Key删除。这时其他线程(比如线程2)可以获得锁(set一个key),但此时线程1还未执行完。当线程1执行完后,执行unlock释放锁是,del key动作。 这时del删除的key并不是线程1原来的,而是线程2的。所以导致误删除key。这时其他线程(比如线程3)可以获得锁(set一个key),但此时线程2还未执行完。其他类似。。。。由于key被误删除,整体上未能实现资源同步。
为了避免误删除key,我们可以为每个线程生成不同的value(通过UUDI),只有value属于本线程的,才执行del,否则不删除。
改进org.zyp.RedisLock代码如下:
package org.zyp; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.params.SetParams; import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing */ public class RedisLock { private JedisPool jedisPool = null; // Jedis连接池(如果是Cluster集群,则应该使用JedisCluster) private SetParams params = new SetParams().nx().ex(1); // set的参数。 expire time private ThreadLocal<String> threadLocal=new ThreadLocal<>(); // 线程私有变量,仅线程自己可以访问 public RedisLock(JedisPool pool) { this.jedisPool = pool; } /** * 加锁。一直阻塞(反复循环) * @param lockID 锁ID */ public void lock(String lockID) { while( ! tryLock(lockID) ) { // 死循环,直到拿到锁。 try { TimeUnit.MICROSECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 单词的获得锁的尝试 * @param lockID 锁ID * @return */ private boolean tryLock(String lockID) { try(Jedis jedis = jedisPool.getResource()) { String random_value = UUID.randomUUID().toString(); // 随机生成作为value String result = jedis.set(lockID, random_value, params); // set命令必须在一条语句中,确保原子性 if("OK".equalsIgnoreCase(result)) { threadLocal.set(random_value); // 获得锁后,将其随机产生的value也保持起来。 return true; } return false; } } /** * 释放锁 * @param lockID 锁ID */ public void unLock(String lockID) throws Exception { // lua脚本,先判断value是否相同,只有value相同才执行del,否则不执行del。 String lua_script="if redis.call(\"get\",KEYS[1])==ARGV[1] then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; try(Jedis jedis = jedisPool.getResource()) { Object result = jedis.eval(lua_script, Arrays.asList(lockID), Arrays.asList(threadLocal.get())); // 为了保证原子性,放在lua脚本中一次执行。 if(Integer.valueOf(result.toString())==0){ throw new Exception("解锁失败"); // 抛出这个异常是为了让调用方的事务回滚 } } } }测试代码TestDemo不变。 多次执行,可以发现:
超售问题发生的概率,已经大幅度降低。但仍有超时发生。 因为只是避免了误删除key,但是仍存在多个线程同时进入同步资源代码。这时应该,捕获unLock()方法抛出的异常,配合数据回滚,来保证一致性,避免超售。可以发现,这种方案比较繁琐,需要开发代码代码较多(回滚需要结合具体代码),不具有通用性。
再此分析发生问题的前提时,获得锁业务线程的执行时长(持有锁时长)大于Redis中key的expire time。 那么我们只要保证这个前提不发生,就不会有问题了。既不断的给Redis中key的expire time延期,确保它的时长大于线程的执行时长。
对称,对于每个lockID,分别启动一个对应的watchdog线程。 watchdog线程不断的查看ttl剩余时长,如未过期的key则进行延期。
这里仍有一个问题:万一获得锁业务线程崩溃,watchdog仍在不停延时,这仍存在问题。 解决这个问题的办法是将watchdog设置为守护线程,当业务线程崩溃停止只剩下守护线程时,Java会自动全部停止。
改进org.zyp.RedisLock代码如下:
package org.zyp; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.params.SetParams; import java.util.ArrayList; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing */ public class RedisLock { private JedisPool jedisPool = null; // Jedis连接池(如果是Cluster集群,则应该使用JedisCluster) private SetParams params = new SetParams().nx().ex(1); // set的参数。 expire time private ArrayList<String> watch_lockid_set = new ArrayList<String>(); // 存放已被watchdog监控的lockid列表 public RedisLock(JedisPool pool) { this.jedisPool = pool; } /** * 加锁。一直阻塞(反复循环) * @param lockID 锁ID */ public void lock(String lockID) { while( ! tryLock(lockID) ) { // 死循环,直到拿到锁。 try { TimeUnit.MICROSECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 单词的获得锁的尝试 * @param lockID 锁ID * @return */ private boolean tryLock(String lockID) { try(Jedis jedis = jedisPool.getResource()) { String random_value = UUID.randomUUID().toString(); // 随机生成作为value String result = jedis.set(lockID, random_value, params); // set命令必须在一条语句中,确保原子性 if("OK".equalsIgnoreCase(result)) { if( ! watch_lockid_set.contains(lockID)) { // list没有包含,则说明是第一次,则需要开启watchdog(既对于同一个lockid,只需要启动一个watchdog) new Thread(new LockWatchLog(jedisPool, lockID), "LockWatchLog-"+ lockID).start(); // 启动监控watchdog watch_lockid_set.add(lockID); } return true; } return false; } } /** * 释放锁 * @param lockID 锁ID */ public void unLock(String lockID) { try(Jedis jedis = jedisPool.getResource()) { jedis.del(lockID); // 通过del key 释放锁 } } // 内部类,定义一个守护线程。不断的给仍存在的key(未被del)延长时间 class LockWatchLog implements Runnable{ private JedisPool jedisPool; private String watch_lockid = null; public LockWatchLog(JedisPool jedisPool, String watch_lockid){ this.jedisPool = jedisPool; this.watch_lockid = watch_lockid; } @Override public void run() { Jedis jedis = jedisPool.getResource(); while (true){ Long current_ttl = jedis.ttl(watch_lockid); // 获得剩余的ttl if( current_ttl != null && current_ttl > 0){ // key还没有过期。 jedis.expire(watch_lockid, (int) (current_ttl+1)); // 则延期 try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } }else { // 否则 watch_lockid_set.remove(watch_lockid); // 移除 break; // 退出循环,结束本watchdog线程。 } } } } }测试代码TestDemo不变。 多次执行,可以发现不会出现超售。
其实在Redis官方文档中,有专门介绍分布式锁内容:Distributed locks with Redis(https://redis.io/topics/distlock)
其中Java语言的实现是Redisson, 该Redisson的文档为(https://github.com/redisson/redisson/wiki)
该RedissonRedLock的基本原理如同前文中的改进方案B,也是使用了一个watchdog不断的延长锁的有效期。
同时,前文例子中都是基于Redis服务器为单机的情况。实际环境中Redis为多机(包括读写分离、Cluster等等)。RedissonRedLock的功能比较强大,它支持Redis Server为多节点,包括主从复制、包括Cluster机器。
概念:
Jedis:是Redis的Java实现客户端,提供了比较全面的Redis命令的支持。 (官方推荐的Java客户端)Redisson:实现了分布式和可扩展的Java数据结构。 (官方推荐的Java客户端)Lettuce:高级Redis客户端,用于线程安全同步,异步和响应使用,支持集群,Sentinel,管道和编码器。(官方推荐的Java客户端)优点:
Jedis:比较全面的提供了Redis的操作特性(A blazingly small and sane redis java client)
Redisson:促使使用者对Redis的关注分离,提供很多分布式相关操作服务,例如,分布式锁,分布式集合,可通过Redis支持延迟队列(distributed and scalable Java data structures on top of Redis server)
Lettuce:主要在一些分布式缓存框架上使用比较多(Advanced Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.)
可伸缩:
Jedis:使用阻塞的I/O,且其方法调用都是同步的,程序流需要等到sockets处理完I/O才能执行,不支持异步。Jedis客户端实例不是线程安全的,所以需要通过连接池来使用Jedis。
Redisson:基于Netty框架的事件驱动的通信层,其方法调用是异步的。Redisson的API是线程安全的,所以可以操作单个Redisson连接来完成各种操作
Lettuce:基于Netty框架的事件驱动的通信层,其方法调用是异步的。Lettuce的API是线程安全的,所以可以操作单个Lettuce连接来完成各种操作
结论:
建议使用:Jedis + Redisson
Jedis是Redis的Java实现的客户端,其API提供了比较全面的Redis命令的支持;Redisson实现了分布式和可扩展的Java数据结构,与Jedis相比,功能较为简单,不支持字符串操作,不支持排序、事务、管道、分区等Redis特性。Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上。
Jedis中的方法调用是比较底层的暴露的Redis的API,也即Jedis中的Java方法基本和Redis的API保持着一致,了解Redis的API,也就能熟练的使用Jedis。而Redisson中的方法则是进行比较高的抽象,每个方法调用可能进行了一个或多个Redis方法调用。
如下分别为Jedis和Redisson操作的简单示例:
// Jedis设置key-value与set操作: Jedis jedis = …; jedis.set("key", "value"); List<String> values = jedis.mget("key", "key2", "key3"); // Redisson通过map操作 Redisson redisson = … RMap map = redisson.getMap("my-map"); // implement java.util.Map map.put("key", "value"); map.containsKey("key"); map.get("key");Jedis仅支持基本的数据类型如:String、Hash、List、Set、Sorted Set。
Redisson不仅提供了一系列的分布式Java常用对象,基本可以与Java的基本数据结构通用,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。
Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。。
在分布式开发中,Redisson可提供更便捷的方法。
Redis的命令与Redisson的API方法对应关系,见:https://github.com/redisson/redisson/wiki/11.-Redis-commands-mapping
基于Redis的分布式可重入Lock对象,实现了java.util.concurrent.locks.Lock接口。
如果已获取锁的Redisson实例崩溃,则该锁无法解锁而永久挂起。为避免此此问题,Redisson维护锁看门狗,它会在锁持有人Redisson实例处于活动状态时延长锁的到期时间。默认情况下,锁看门狗超时为30秒,可以通过Config.lockWatchdogTimeout设置进行更改。
Redisson还允许leaseTime在锁定获取期间指定参数。在指定的时间间隔后,锁定的锁将自动释放。
RLock对象的行为符合Java Lock规范。这意味着只有锁拥有者线程才能解锁它,否则IllegalMonitorStateException将被抛出。否则考虑使用RSemaphore对象。
锁的基本使用代码,如下:
RLock lock = redisson.getLock("myLock"); // traditional lock method lock.lock(); // or acquire lock and automatically unlock it after 10 seconds lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }Redisson除了支持支持可重入锁,还支持公平锁Fair Lock、MultiLock、ReadWriteLock、Semaphore、PermitExpirableSemaphore、CountDownLatch。下面以最常用的可重入锁为例。
在pom.xml增加Redisson的依赖。然后修改TestDemo.java,让他使用Redisson的锁。
package org.zyp; import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing */ public class TestDemo { private int ticketCount = 10; // 剩余票数量 private RedissonClient redisson = null; private final String LOCK_ID = "lock_ticket"; public static void main(String[] args) { TestDemo testDemo = new TestDemo(); testDemo.test(); } public void test() { try { Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.43.201:6379"); redisson = Redisson.create(config); TicketsSale ticketssale = new TicketsSale(); Thread thread1=new Thread(ticketssale,"售票窗口1"); Thread thread2=new Thread(ticketssale,"售票窗口2"); Thread thread3=new Thread(ticketssale,"售票窗口3"); Thread thread4=new Thread(ticketssale,"售票窗口4"); Thread thread5=new Thread(ticketssale,"售票窗口5"); thread1.start(); thread2.start(); thread3.start(); thread4.start(); thread5.start(); Thread.currentThread().join(); // 主线程一直等待不停。 } catch (InterruptedException e) { e.printStackTrace(); } } // 内部类,售票动作 class TicketsSale implements Runnable{ @Override public void run() { RLock rLock = redisson.getLock(LOCK_ID);; // 使用Redisson分布式锁 while (ticketCount>0){ try { rLock.lock(10, TimeUnit.SECONDS); // 加锁 acquire lock and automatically unlock it after 10 seconds if(ticketCount>0){ TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000, 2000)); // 随机时长,模拟业务逻辑时间 ticketCount--; System.out.println(Thread.currentThread().getName()+": 已售出1张票, 还剩余["+ticketCount+"]张票!"); } }catch (Exception e){ e.printStackTrace(); }finally { try { rLock.unlock(); // 释放锁 } catch (Exception e) { e.printStackTrace(); } } } } } }多次运行结果,可以发现:
如果rlock.lock()设置的自动过期时间比较长(大于业务执行时长),则没有超售现象。如果rlock.lock()设置的自动过期时间比较短,仍会有超时现象。且报异常:java.lang.IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id: cda3b92a-f89c-46c1-834e-25d5ca134cb6 thread-id: 47为什么存在超售且有异常呢? 网上有很多文章讲是lockInterruptibly的问题,其说法是错误的。
真正的原因是:
问题一:如果lock()方法的入参,如果显示指定了leaseTime(自动过期时长),则watchdog机制不会生效。问题二:如果lock()没有指定入参,则watchdog生效。当LockWatchdogTimeout设置过短(小于业务执行时长)仍会存在问题,LockWatchdogTimeout定义了watchdog看门狗的存活时长,超过时长则不在“看门”(不在执行锁续期),因此LockWatchdogTimeout的时长应该设置长一点(大于所有业务的执行时间),LockWatchdogTimeout设置的长一点,几乎没有负作用。另外:unlock()方法执行前可以增加判断if(rLock.isLocked() && rLock.isHeldByCurrentThread()),只有if条件满足才执行unlock()动作。虽然有个这个判断可避免报异常java.lang.IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id。 但个人强烈不建议增加此判断(官方的代码也无此判断),因为报此异常通常不是unlock的问题而是程序有问题,应该从根本上去分析解决。而加上这个if判断后,容易把存在程序问题掩盖,难以第一时间发现诊断出问题修改TestDemo.java,改动两处:
代码中显示设置config.setLockWatchdogTimeout()的时长使用无参的rLock.lock()方法,使watchdog生效修改后的TestDemo.java代码如下:
package org.zyp; import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** * @author ZhangYuPing * @date 2020/7/3 23:54 */ public class TestDemo { private int ticketCount = 10; // 剩余票数量 private RedissonClient redisson = null; private final String LOCK_ID = "lock_ticket"; public static void main(String[] args) { TestDemo testDemo = new TestDemo(); testDemo.test(); } public void test() { try { Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.43.201:6379"); config.setLockWatchdogTimeout(60000); // 确保LockWatchdogTimeout足够长 redisson = Redisson.create(config); TicketsSale ticketssale = new TicketsSale(); Thread thread1=new Thread(ticketssale,"售票窗口1"); Thread thread2=new Thread(ticketssale,"售票窗口2"); Thread thread3=new Thread(ticketssale,"售票窗口3"); Thread thread4=new Thread(ticketssale,"售票窗口4"); Thread thread5=new Thread(ticketssale,"售票窗口5"); thread1.start(); thread2.start(); thread3.start(); thread4.start(); thread5.start(); Thread.currentThread().join(); // 主线程一直等待不停。 } catch (InterruptedException e) { e.printStackTrace(); } } // 内部类,售票动作 class TicketsSale implements Runnable{ @Override public void run() { RLock rLock = redisson.getLock(LOCK_ID);; // 分布式锁 while (ticketCount>0){ try { //rLock.lock(1, TimeUnit.SECONDS); // 加锁(watchdog不生效) acquire lock and automatically unlock it after 10 seconds rLock.lock(); // 加锁(watchdog生效) if(!Thread.currentThread().isInterrupted() && ticketCount>0){ TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000, 2000)); // 随机时长,模拟业务逻辑时间 ticketCount--; System.out.println(Thread.currentThread().getName()+": 已售出1张票, 还剩余["+ticketCount+"]张票!"); } }catch (Exception e){ e.printStackTrace(); }finally { try { rLock.unlock(); // 释放锁 } catch (Exception e) { e.printStackTrace(); } } } } } }再次反复运行,可以发现不再报异常,也未出现超售。
综上,建议直接使用Redisson使用分布式锁。