20 - Lock-Condition 的等待通知

    技术2024-04-02  115

    Lock-Condition 的等待通知

    1. condition 的使用1.2 等待方法1.2 唤醒方法1.3 使用举例 2. condition 与 wait / notify3. 源码分析3.1 条件队列3.2 await3.3 signal3.4 过程总结 4. 生产者消费者5. 总结

      在上一篇文章中,我们讲到 Java SDK 并发包里的 Lock 有别于 synchronized 隐式锁的三个特性:能够响应中断、支持超时和非阻塞地获取锁。那今天我们接着再来详细聊聊 Java SDK 并发包里的 Condition,Condition 实现了管程模型里面的条件变量。

      在《07-管程:并发编程的万能钥匙》里我们提到过 Java 语言内置的管程里只有一个条件变量,而 Lock&Condition 实现的管程是支持多个条件变量的,这是二者的一个重要区别。

      在很多并发场景下,支持多个条件变量能够让我们的并发程序可读性更好,实现起来也更容易。例如,实现一个阻塞队列,就需要两个条件变量。

      

    1. condition 的使用

    1.2 等待方法

    // 当前线程进入等待状态,如果其他线程调用 condition 的 signal 或者 signalAll 方法 // 并且当前线程获取 Lock 从 await 方法返回,如果在等待状态中被中断会抛出被中断异常 void await() throws InterruptedException // 当前线程进入等待状态直到被通知,中断或者超时 long awaitNanos(long nanosTimeout) // 同第二个方法,支持自定义时间单位 boolean await(long time, TimeUnit unit)throws InterruptedException // 当前线程进入等待状态直到被通知,中断或者到了某个时间 boolean awaitUntil(Date deadline) throws InterruptedException

      

    1.2 唤醒方法

    // 唤醒一个等待在 condition 上的线程,将该线程从等待队列中转移到同步队列中, // 如果在同步队列中能够竞争到 Lock 则可以从等待方法中返回 void signal() // 与 1 的区别在于能够唤醒所有等待在 condition 上的线程 void signalAll()

      

    1.3 使用举例

    public class TestCondition { static ReentrantLock lock = new ReentrantLock(); static Condition condition = lock.newCondition(); static volatile boolean flag = false; public static void main(String[] args) { new Thread(() -> { awaiter(); }, "等待线程").start(); new Thread(() -> { signal(); }, "唤醒线程").start(); } public static void awaiter() { lock.lock(); try { while (!flag) { System.out.println(Thread.currentThread().getName() + ": 条件不满足,等待..."); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ": 条件已满足,接收数据..."); } finally { lock.unlock(); } } public static void signal() { lock.lock(); try { flag = true; System.out.println(Thread.currentThread().getName() + ": 条件准备完成,唤醒等待线程..."); condition.signalAll(); } finally { lock.unlock(); } } } # 运行结果如下: 等待线程: 条件不满足,等待... 唤醒线程: 条件准备完成,唤醒等待线程... 等待线程: 条件已满足,接收数据...

      

    2. condition 与 wait / notify

      Object 的 wait 和 notify/notify 是与 synchronized 配合完成线程间的等待/通知机制,是属于 Java 底层级别的。而 Condition 是语言级别的,具有更高的可控制性和扩展性。具体表现如下:

    wait/notify 方式是响应中断的,当线程处于 Object.wait()的等待状态中,线程中断会抛出中断异常;Condition 有响应中断和不响应中断模式可以选择;wait/notify 方式一个 synchronized 锁只有一个等待队列;一个 Lock 锁可以根据不同的条件,new 多个 Condition 对象,每个对象包含一个等待队列。

    需要注意的是,Condition 同 wait/notify 一样,在等待与唤醒方法使用之前必须获取到该锁。

      

    3. 源码分析

    需要在理解 AQS 及 ReentrantLock 基础上阅读本文源码。《15 - AQS 源码分析》《16 - ReentrantLock 可重入锁》

    3.1 条件队列

    首先看 Condition 对象的创建:

    ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); /** * Returns a {@link Condition} instance for use with this * {@link Lock} instance. * * <p>The returned {@link Condition} instance supports the same * usages as do the {@link Object} monitor methods ({@link * Object#wait() wait}, {@link Object#notify notify}, and {@link * Object#notifyAll notifyAll}) when used with the built-in * monitor lock. * * <ul> * * <li>If this lock is not held when any of the {@link Condition} * {@linkplain Condition#await() waiting} or {@linkplain * Condition#signal signalling} methods are called, then an {@link * IllegalMonitorStateException} is thrown. * * <li>When the condition {@linkplain Condition#await() waiting} * methods are called the lock is released and, before they * return, the lock is reacquired and the lock hold count restored * to what it was when the method was called. * * <li>If a thread is {@linkplain Thread#interrupt interrupted} * while waiting then the wait will terminate, an {@link * InterruptedException} will be thrown, and the thread's * interrupted status will be cleared. * * <li> Waiting threads are signalled in FIFO order. * * <li>The ordering of lock reacquisition for threads returning * from waiting methods is the same as for threads initially * acquiring the lock, which is in the default case not specified, * but for <em>fair</em> locks favors those threads that have been * waiting the longest. * * </ul> * * @return the Condition object */ public Condition newCondition() { return sync.newCondition(); } abstract static class Sync extends AbstractQueuedSynchronizer { final ConditionObject newCondition() { return new ConditionObject(); } }

      创建的 Condition 对象其实就是 ConditionObject 对象,ConditionObject 是 AbstractQueuedSynchronizer(AQS)的内部类,实现了 Condition 接口。

      每个 ConditionObject 对象都有一个条件等待队列,用于保存在该 Condition 对象上等待的线程。条件等待队列是一个单向链表,结点用的 AQS 的 Node 类,每个结点包含线程、next 结点、结点状态。ConditionObject 通过持有头尾指针类管理条件队列。

    注意区分 AQS 的同步队列和 Condition 的条件队列:

    线程抢锁失败时进入 AQS 同步队列,AQS 同步队列中的线程都是等待着随时准备抢锁的;线程因为没有满足某一条件而调用 condition.await()方法之后进入 Condition 条件队列,Condition 条件队列中的线程只能等着,没有获取锁的机会;当条件满足后调用 condition.signal()线程被唤醒,那么线程就从 Condition 条件队列移除,进入 AQS 同步队列,被赋予抢锁继续执行的机会。

    条件队列源码:

    /** * Condition implementation for a {@link * AbstractQueuedSynchronizer} serving as the basis of a {@link * Lock} implementation. * * <p>Method documentation for this class describes mechanics, * not behavioral specifications from the point of view of Lock * and Condition users. Exported versions of this class will in * general need to be accompanied by documentation describing * condition semantics that rely on those of the associated * {@code AbstractQueuedSynchronizer}. * * <p>This class is Serializable, but all fields are transient, * so deserialized conditions have no waiters. */ public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } /** * Adds a new waiter to wait queue. * 入队操作 * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. // 如果尾结点取消等待了,将其清除出去, // 并检查整个条件队列将已取消的所有结点清除 if (t != null && t.waitStatus != Node.CONDITION) { // 这个方法会遍历整个条件队列,然后会将已取消的所有结点清除出队列 unlinkCancelledWaiters(); t = lastWaiter; } // 将当前线程构造成结点,加入队尾 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; // 维护尾结点指针 return node; } /** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. * 遍历整个条件队列,清除已取消等待的结点 */ private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; // 用于保存前一个结点 while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { // t结点状态不是Node.CONDITION,说明已经取消等待,删除 t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; // 下次循环中t结点的前一个结点 t = next; } } static final class Node { volatile Thread thread;// 每一个节点对应一个线程 Node nextWaiter;// next结点 volatile int waitStatus;// 结点状态 static final int CONDITION = -2;// 结点状态:当前节点进入等待队列中 ... } }

      

    3.2 await

      当调用 condition.await()方法后会使得线程进入到条件队列,此时线程将被阻塞。当调用 condition.signal()方法后,线程从条件队列进入 AQS 同步队列排队等锁。线程在 AQS 中发生的事情这里就不介绍了,不明白的可以看下《15 - AQS 源码分析》。

    /** * 当前线程被阻塞,并加入条件队列 * 线程在AQS同步队列中被唤醒后尝试获取锁 */ public final void await() throws InterruptedException { // 响应打断 if (Thread.interrupted()) throw new InterruptedException(); // 将当前线程构造成结点,加入条件队列队尾,上文详细分析了该方法 Node node = addConditionWaiter(); // 释放锁,线程阻塞前必须将锁释放,下文详解fullyRelease()方法 int savedState = fullyRelease(node); int interruptMode = 0; /* * 1.isOnSyncQueue()检查node是否在AQS同步队列中,不在同步队列中返回false, * 下文详解isOnSyncQueue()方法 * 2.如果node不在AQS同步队列中,将当前线程阻塞 * 3.当其他代码调用signal()方法,线程进入AQS同步队列后被唤醒, * 继续从这里阻塞的地方开始执行 * 4.注意这里while循环的自旋,线程被唤醒以后还要再检查一下node是否在AQS同步队列中 */ while (!isOnSyncQueue(node)) { // 检查node是否在AQS同步队列中 LockSupport.park(this); // 阻塞,线程被唤醒后从这里开始执行 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } /* * 到这里,是当前线程在AQS同步队列中被唤醒了,尝试获取锁 * acquireQueued()方法抢锁,抢不到锁就在同步队列中阻塞 * acquireQueued()方法是AQS文章中详细重点讲解过的这里不详细分析了 */ if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } fullyRelease()方法: /** * 将node线程的锁全部释放 * “全部”是指多次重入的情况,这里一次全部释放 */ final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState();// 锁状态 if (release(savedState)) {// 释放锁 failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } isOnSyncQueue()方法: /** * 检查node是否在AQS同步队列中,在同步队列中返回true */ final boolean isOnSyncQueue(Node node) { // 状态为Node.CONDITION条件等待状态,肯定是在条件队列中,而不在同步队列中 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果node已经有后继节点next,那肯定是在同步队列了 if (node.next != null) return true; // 遍历同步队列,查看是否有与node相等的结点 return findNodeFromTail(node); } /** * 从同步队列的队尾开始从后往前遍历找,如果找到相等的,说明在同步队列, * 否则就是不在同步队列 */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }

      

    3.3 signal

      调用 condition.signal()方法后,线程从 Condition 条件队列移除,进入 AQS 同步队列排队等锁。

    注意:正常情况下 signal 只是将线程从 Condition 条件队列转移到 AQS 同步队列,并没有唤醒线程。线程的唤醒时机是 AQS 中线程的前驱节点释放锁之后。

    public final void signal() { // 验证当前线程持有锁才能调用该方法 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /** * 从条件队列队头往后遍历,找出第一个需要转移的结点node,将node从条件队列转移到AQS同步队列 * 为什么需要遍历找?因为前有些线程会取消等待,但是可能还在条件队列中 */ private void doSignal(Node first) { do { // 将first中条件队列中移除,将first的next结点作为头结点赋值给firstWaiter if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; /* * transferForSignal()将first结点加入AQS同步队列 * 如果first结点加入同步队列失败,是因为first结点取消了Node.CONDITION状态, * 原因在下面transferForSignal()的讲解中说明 * 如果first结点加入同步队列失败,那么选择first后面的第一个结点进行转移, * 依此类推 */ } while (!transferForSignal(first) && // 将first结点加入AQS同步队列 // first结点加入同步队列失败,选择first后面的结点进行转移 (first = firstWaiter) != null); } /** * 将结点转移到同步队列 * @return true-代表成功转移;false-代表在signal之前,节点已经取消等待了 */ final boolean transferForSignal(Node node) { /* * CAS设置结点状态 * CAS失败说明此node的waitStatus已不是Node.CONDITION,说明节点已经取消。 * 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点 * CAS失败为什么不是其他线程抢先操作了呢?因为这里还持有lock独占锁, * 只有当前线程可以访问。 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node);// 自旋进入同步队列的队尾 int ws = p.waitStatus; // 正常情况下不会走这里,这里是前驱节点取消或者 CAS 失败的情况 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } static final class Node { volatile Thread thread;// 每一个结点对应一个线程 Node nextWaiter;// next结点 volatile int waitStatus;// 结点状态 static final int CONDITION = -2;// 结点状态:当前结点进入等待队列中 }

      

    3.4 过程总结

    整个 Lock 等待通知的过程如下:

    ReentrantLock lock = new ReentrantLock();创建 lock 锁,对应生成 AQS 同步队列,一个 ReentrantLock 锁对应一个 AQS 同步队列;Condition condition = lock.newCondition();创建 condition,对应生成 condition 条件队列;线程 A 调用condition.await();,线程 A 阻塞并加入 condition 同步队列;线程 B 调用condition.signal();,线程 A 阻塞从 condition1 同步队列转移到 AQS 同步队列的队尾;当 AQS 队列中线程 A 的前驱节点线程执行完并释放锁时,将线程 A 唤醒;线程 A 被唤醒之后抢锁,执行逻辑代码。   

    4. 生产者消费者

      下面使用生产者消费者模式模拟一个阻塞队列:

    public class BlockQueue<T> { private Object[] datas; private int size; private int capacity; private Lock lock; private Condition putCondition; private Condition takeconditon; public BlockQueue(int capacity) { this.datas = new Object[capacity]; this.size = 0; this.capacity = capacity; this.lock = new ReentrantLock(); this.putCondition = lock.newCondition(); // 空了 this.takeconditon = lock.newCondition(); // 满了 } public void put(T t) throws Exception { lock.lock(); try { while (size >= capacity) { System.out.println(Thread.currentThread().getName() + " 队列已满"); putCondition.await(); } System.out.println(Thread.currentThread().getName() + " 队列添加数据:" + t); datas[size++] = t; takeconditon.signalAll(); } finally { lock.unlock(); } } public T take() throws Exception { lock.lock(); try { while (size <= 0) { System.out.println(Thread.currentThread().getName() + " 队列已空"); takeconditon.await(); } T value = (T) datas[--size]; System.out.println(Thread.currentThread().getName() + " 队列获取数据:" + value); putCondition.signalAll(); return value; } finally { lock.unlock(); } } public static void main(String[] args) { BlockQueue<Integer> queue = new BlockQueue<>(5); for (int i = 0; i < 10; i++) { new Thread(() -> { try { queue.put(new Random().nextInt(1000)); } catch (Exception e) { e.printStackTrace(); } }, "put 线程").start(); } for (int i = 0; i < 10; i++) { new Thread(() -> { try { queue.take(); } catch (Exception e) { e.printStackTrace(); } }, "take 线程").start(); } } } # 运行结果如下: put 线程 队列添加数据:828 put 线程 队列添加数据:91 put 线程 队列添加数据:750 put 线程 队列添加数据:168 put 线程 队列添加数据:658 put 线程 队列已满 put 线程 队列已满 put 线程 队列已满 put 线程 队列已满 put 线程 队列已满 take 线程 队列获取数据:658 put 线程 队列添加数据:50 put 线程 队列已满 put 线程 队列已满 put 线程 队列已满 put 线程 队列已满 take 线程 队列获取数据:50 take 线程 队列获取数据:168 put 线程 队列添加数据:599 put 线程 队列添加数据:207 put 线程 队列已满 put 线程 队列已满 take 线程 队列获取数据:207 take 线程 队列获取数据:599 take 线程 队列获取数据:750 take 线程 队列获取数据:91 put 线程 队列添加数据:548 put 线程 队列添加数据:684 take 线程 队列获取数据:684 take 线程 队列获取数据:548 take 线程 队列获取数据:828

      

    5. 总结

      Object 的 wait 和 notify/notify 是与 synchronized 配合完成线程间的等待/通知机制,而 Condition 与 Lock 配合完成等待通知机制。

      Condition 比 wait 和 notify 具有更高的可控制性和扩展性,一个 Lock 锁可以有多个 Condition 条件,此外 Condition 还有响应中断和不响应中断模式可以选择。Condition 的使用与 wait/notify 一样,在等待与唤醒方法使用之前必须获取到锁。

      Condition 的实现原理:每个 condition 都有一个条件队列,调用 condition.await()方法将线程阻塞后线程就进入了条件队列,调用 condition.sigal()方法后线程从 condition 条件队列转移到 AQS 同步队列等锁,该线程的前一节点释放锁之后会唤醒该线程抢锁执行。

      Condition 多用于实现的生产者消费者问题

    Processed: 0.020, SQL: 9