bilibili-Java并发学习笔记13 AQS 概览
基于 java 1.8.0
AbstractQueuedSynchronizer 源码
package java.util.concurrent.locks; import java.util.concurrent.TimeUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import sun.misc.Unsafe; /** * 为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。 * 此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。 * 子类必须定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。 * 假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制。 * 子类可以维护其他状态字段,但只是为了获得同步而只追踪使用 getState()、setState(int) * 和 compareAndSetState(int, int) 方法来操作以原子方式更新的 int 值。 * * 应该将子类定义为非公共内部帮助器类,可用它们来实现其封闭类的同步属性。 * 类 AbstractQueuedSynchronizer 没有实现任何同步接口。 * 而是定义了诸如 acquireInterruptibly(int) 之类的一些方法, * 在适当的时候可以通过具体的锁和相关同步器来调用它们,以实现其公共方法。 * * 此类支持默认的独占模式和共享模式之一,或者二者都支持。 * 处于独占模式下时,其他线程试图获取该锁将无法取得成功。 * 在共享模式下,多个线程获取某个锁可能(但不是一定)会获得成功。 * 此类并不“了解”这些不同,除了机械地意识到当在共享模式下成功获取某一锁时,下一个等待线程(如果存在)也必须确定自己是否可以成功获取该锁。 * 处于不同模式下的等待线程可以共享相同的 FIFO 队列。通常,实现子类只支持其中一种模式,但两种模式都可以在(例如)ReadWriteLock 中发挥作用。 * 只支持独占模式或者只支持共享模式的子类不必定义支持未使用模式的方法。 * * 此类通过支持独占模式的子类定义了一个嵌套的 AbstractQueuedSynchronizer.ConditionObject 类,可以将这个类用作 Condition 实现。 * isHeldExclusively() 方法将报告同步对于当前线程是否是独占的; * 使用当前 getState() 值调用 release(int) 方法则可以完全释放此对象; * 如果给定保存的状态值,那么 acquire(int) 方法可以将此对象最终恢复为它以前获取的状态。 * 没有别的 AbstractQueuedSynchronizer 方法创建这样的条件,因此,如果无法满足此约束,则不要使用它。 * AbstractQueuedSynchronizer.ConditionObject 的行为当然取决于其同步器实现的语义。 * * 此类为内部队列提供了检查、检测和监视方法,还为 condition 对象提供了类似方法。 * 可以根据需要使用用于其同步机制的 AbstractQueuedSynchronizer 将这些方法导出到类中。 * * 此类的序列化只存储维护状态的基础原子整数,因此已序列化的对象拥有空的线程队列。 * 需要可序列化的典型子类将定义一个 readObject 方法,该方法在反序列化时将此对象恢复到某个已知初始状态。 * * 使用 * * 为了将此类用作同步器的基础,需要适当地重新定义以下方法, * 这是通过使用 getState()、setState(int) 和/或 compareAndSetState(int, int) 方法来检查和/或修改同步状态来实现的: * * tryAcquire * tryRelease * tryAcquireShared * tryReleaseShared * isHeldExclusively * * 默认情况下,每个方法都抛出 UnsupportedOperationException。 * 这些方法的实现在内部必须是线程安全的,通常应该很短并且不被阻塞。 * 定义这些方法是使用此类的‘唯一’受支持的方式。其他所有方法都被声明为 final,因为它们无法是各不相同的。 * * 您也可以查找从 AbstractOwnableSynchronizer 继承的方法,用于跟踪拥有独占同步器的线程。 * 鼓励使用这些方法,这允许监控和诊断工具来帮助用户确定哪个线程保持锁。 * * 即使此类基于内部的某个 FIFO 队列,它也无法强行实施 FIFO 获取策略。独占同步的核心采用以下形式: * * Acquire: * while (!tryAcquire(arg)) { * enqueue thread if it is not already queued; * possibly block current thread; * } * * Release: * if (tryRelease(arg)) * unblock the first queued thread; * * (共享模式与此类似,但可能涉及级联信号。.) * * 因为要在加入队列之前检查线程的获取状况,所以新获取的线程可能闯入其他被阻塞的和已加入队列的线程之前。 * 不过如果需要,可以内部调用一个或多个检查方法,通过定义 tryAcquire 和/或 tryAcquireShared 来禁用闯入。 * 特别是 getFirstQueuedThread() 没有返回当前线程的时候,严格的 FIFO 锁定可以定义 tryAcquire 立即返回 false。 * 只有 hasQueuedThreads() 返回 true 并且 getFirstQueuedThread 不是当前线程时,更好的非严格公平的版本才可能会立即返回 false; * 如果 getFirstQueuedThread 不为 null 并且不是当前线程,则产生的结果相同。出现进一步的变体也是有可能的。 * * 对于默认闯入(也称为 greedy、renouncement 和 convoy-avoidance)策略,吞吐量和可伸缩性通常是最高的。 * 尽管无法保证这是公平的或是无偏向的,但允许更早加入队列的线程先于更迟加入队列的线程再次争用资源,并且相对于传入的线程,每个参与再争用的线程都有平等的成功机会。 * 此外,尽管从一般意义上说,获取并非“自旋”,它们可以在阻塞之前对用其他计算所使用的 tryAcquire 执行多次调用。 * 在只保持独占同步时,这为自旋提供了最大的好处,但不是这种情况时,也不会带来最大的负担。 * 如果需要这样做,那么可以使用“快速路径”检查来先行调用 acquire 方法,以这种方式扩充这一点, * 如果可能不需要争用同步器,则只能通过预先检查 hasContended() 和/或 hasQueuedThreads() 来确认这一点。 * * 通过特殊化其同步器的使用范围,此类为部分同步化提供了一个有效且可伸缩的基础, * 同步器可以依赖于 int 型的 state、acquire 和 release 参数,以及一个内部的 FIFO 等待队列。 * 这些还不够的时候,可以使用 atomic 类、自己的定制 Queue 类和 LockSupport 阻塞支持,从更低级别构建同步器。 * * 使用示例 * * 以下是一个非再进入的互斥锁类,它使用值 0 表示未锁定状态,使用 1 表示锁定状态。 * 当非重入锁定不严格地需要当前拥有者线程的记录时,此类使得使用监视器更加方便。 * 它还支持一些条件并公开了一个检测方法: * * class Mutex implements Lock, java.io.Serializable { * * // Our internal helper class * private static class Sync extends AbstractQueuedSynchronizer { * // Reports whether in locked state * protected boolean isHeldExclusively() { * return getState() == 1; * } * * // Acquires the lock if state is zero * public boolean tryAcquire(int acquires) { * assert acquires == 1; // Otherwise unused * if (compareAndSetState(0, 1)) { * setExclusiveOwnerThread(Thread.currentThread()); * return true; * } * return false; * } * * // Releases the lock by setting state to zero * protected boolean tryRelease(int releases) { * assert releases == 1; // Otherwise unused * if (getState() == 0) throw new IllegalMonitorStateException(); * setExclusiveOwnerThread(null); * setState(0); * return true; * } * * // Provides a Condition * Condition newCondition() { return new ConditionObject(); } * * // Deserializes properly * private void readObject(ObjectInputStream s) * throws IOException, ClassNotFoundException { * s.defaultReadObject(); * setState(0); // reset to unlocked state * } * } * * // The sync object does all the hard work. We just forward to it. * private final Sync sync = new Sync(); * * public void lock() { sync.acquire(1); } * public boolean tryLock() { return sync.tryAcquire(1); } * public void unlock() { sync.release(1); } * public Condition newCondition() { return sync.newCondition(); } * public boolean isLocked() { return sync.isHeldExclusively(); } * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } * public void lockInterruptibly() throws InterruptedException { * sync.acquireInterruptibly(1); * } * public boolean tryLock(long timeout, TimeUnit unit) * throws InterruptedException { * return sync.tryAcquireNanos(1, unit.toNanos(timeout)); * } * }} * * 以下是一个锁存器类,它类似于 CountDownLatch,除了只需要触发单个 signal 之外。 * 因为锁存器是非独占的,所以它使用 shared 的获取和释放方法。 * * class BooleanLatch { * * private static class Sync extends AbstractQueuedSynchronizer { * boolean isSignalled() { return getState() != 0; } * * protected int tryAcquireShared(int ignore) { * return isSignalled() ? 1 : -1; * } * * protected boolean tryReleaseShared(int ignore) { * setState(1); * return true; * } * } * * private final Sync sync = new Sync(); * public boolean isSignalled() { return sync.isSignalled(); } * public void signal() { sync.releaseShared(1); } * public void await() throws InterruptedException { * sync.acquireSharedInterruptibly(1); * } * }} * * @since 1.5 * @author Doug Lea */ public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; /** * 创建具有初始同步状态 0 的新 AbstractQueuedSynchronizer 实例 */ protected AbstractQueuedSynchronizer() { } /** * 等待队列 node 类. * * 等待队列是"CLH"(Craig, Landin, and Hagersten)锁队列的变体。CLH 锁通常用于自旋锁。 * 相反,我们使用它们来阻止同步器,但使用相同的基本策略,在其节点的前身中存储有关线程的一些控制信息。 * 每个节点中的 "status" 字段跟踪线程是否应该阻止。 * 节点在其前置任务释放时被通知。队列的每个节点都充当一个特定的通知样式监视器,其中包含一个等待线程。 * 但是,status字段不控制线程是否被授予锁等。如果线程是队列中的第一个线程,它可能会尝试获取。但做第一并不能保证成功; * 它只给了争辩的权利。所以当前发布的竞争者线程可能需要重新等待。 * * 为了排队进入一个CLH锁,你需要把它作为一个新的尾部进行原子拼接。要出列,只需设置head字段。 * * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ * * 插入 CLH 队列只需要在“tail”上执行单个原子操作,因此有一个从 unqueued 到 queued 的简单原子划分点。 * 类似地,出列只涉及更新“头部”。然而,节点需要更多的工作来确定谁是它们的继承者,部分地处理由于超时和中断而可能发生的取消。 * * “prev”链接(在最初的CLH锁中没有使用),主要用于处理取消。如果节点被取消,其后续节点(通常)将重新链接到未取消的前置节点。 * 有关自旋锁的类似机制的解释,请参阅Scott和Scherer的论文 * http://www.cs.rochester.edu/u/scott/synchronization/ * * 我们还使用“next”链接来实现阻塞机制。 * 每个节点的线程id都保存在它自己的节点中,因此前置节点通过遍历下一个链接来确定它是哪个线程来通知下一个节点唤醒。 * 确定后继者必须避免与新排队的节点争用,以设置其前置节点的“下一个”字段。 * 当一个节点的后继者显示为空时,可以通过从原子更新的“tail”向后检查来解决这个问题。 * (或者,换言之,下一个链接是一个优化,这样我们通常不需要反向扫描。) * 对消在基本算法中引入了一些保守性。 * 因为我们必须轮询其他节点的取消,所以我们可能忽略了被取消的节点是在我们前面还是后面。 * 处理这一问题的办法是,在取消继承人时,总是不加区分,让他们稳定在一个新的前任身上,除非我们能确定一个未被取消的前任将承担这一责任。 * * CLH队列需要一个虚拟头节点才能开始。但我们不会在建筑上创造它们,因为如果没有争论的话,那将是浪费精力。 * 相反,在第一次争用时构造节点并设置头和尾指针。等待条件的线程使用相同的节点,但使用附加链接。 * 条件只需要链接简单(非并发)链接队列中的节点,因为它们只有在独占保留时才被访问。等待时,将节点插入条件队列。 * 收到信号后,节点被转移到主队列。status字段的一个特殊值用于标记节点所在的队列。 * * 感谢Dave Dice、Mark Moir、Victor Luchangco、Bill Scherer和Michael Scott,以及JSR-166专家组的成员,感谢他们对本课程设计的有益想法、讨论和评论。 */ static final class Node { /** 表示节点正在共享模式下等待的标记 */ static final Node SHARED = new Node(); /** 标记以指示节点正在独占模式下等待 */ static final Node EXCLUSIVE = null; /** waitStatus值,指示线程已取消 */ static final int CANCELLED = 1; /** waitStatus值,指示后续线程需要断开连接 */ static final int SIGNAL = -1; /** waitStatus值,指示线程正在等待条件 */ static final int CONDITION = -2; /** * waitStatus值,指示下一个acquireShared应无条件传播 */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * 返回上一个节点,如果为null,则抛出NullPointerException。当前置任务不能为空时使用。可以省略null检查,但它的存在是为了帮助VM。 * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } /** * 等待队列的头,延迟初始化。除了初始化,它只能通过方法setHead进行修改。注意:如果head存在,它的waitStatus保证不会被取消。 */ private transient volatile Node head; /** * 等待队列的尾部,延迟初始化。仅通过方法enq修改以添加新的等待节点。 */ private transient volatile Node tail; /** * The synchronization state. */ private volatile int state; /** * 返回同步状态的当前值。此操作具有 volatile 读的内存语义。 */ protected final int getState() { return state; } /** * 设置同步状态的值。此操作具有 volatile 写的内存语义。 */ protected final void setState(int newState) { state = newState; } // Main exported methods /** * 试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。 * * 此方法总是由执行 acquire 的线程来调用。 * 如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。 * 可以用此方法来实现 Lock.tryLock() 方法。 * * 默认实现将抛出 UnsupportedOperationException。 * * @param arg acquire 参数。该值总是传递给 acquire 方法的那个值,或者是因某个条件等待而保存在条目上的值。该值是不间断的,并且可以表示任何内容。 * @return 如果成功,则返回 true。在成功的时候,此对象已经被获取。 * @throws IllegalMonitorStateException 如果正在进行的获取操作将在非法状态下放置此同步器。必须以一致的方式抛出此异常,以便同步正确运行。 * @throws UnsupportedOperationException 如果不支持独占模式 */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * 试图设置状态来反映独占模式下的一个释放。 * * 此方法总是由正在执行释放的线程调用。 * * 默认实现将抛出 UnsupportedOperationException。 * * @param arg release 参数。该值总是传递给 release 方法的那个值,或者是因某个条件等待而保存在条目上的当前状态值。该值是不间断的,并且可以表示任何内容。 * @return 如果此对象现在处于完全释放状态,从而使等待的线程都可以试图获得此对象,则返回 true;否则返回 false。 * @throws IllegalMonitorStateException 如果正在进行的释放操作将在非法状态下放置此同步器。必须以一致的方式抛出此异常,以便同步正确运行。 * @throws UnsupportedOperationException 如果不支持独占模式 */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** * 试图在共享模式下获取对象状态。此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它。 * * 此方法总是由执行 acquire 线程来调用。 * 如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。 * * 默认实现将抛出 UnsupportedOperationException。 * * @param arg acquire 参数。该值总是传递给 acquire 方法的那个值,或者是因某个条件等待而保存在条目上的值。该值是不间断的,并且可以表示任何内容。 * @return 在失败时返回负值;如果共享模式下的获取成功但其后续共享模式下的获取不能成功,则返回 0;如果共享模式下的获取成功并且其后续共享模式下的获取可能够成功,则返回正值,在这种情况下,后续等待线程必须检查可用性。(对三种返回值的支持使得此方法可以在只是有时候以独占方式获取对象的上下文中使用。)在成功的时候,此对象已被获取。 * @throws IllegalMonitorStateException 如果正在进行的获取操作将在非法状态下放置此同步器。必须以一致的方式抛出此异常,以便同步正确运行。 * @throws UnsupportedOperationException 如果不支持共享模式 */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } /** * 试图设置状态来反映共享模式下的一个释放。 * * 此方法总是由正在执行释放的线程调用。 * * 默认实现将抛出 UnsupportedOperationException。 * * @param arg release 参数。该值总是传递给 release 方法的那个值,或者是因某个条件等待而保存在条目上的当前状态值。该值是不间断的,并且可以表示任何内容。 * @return 如果此对象现在处于完全释放状态,从而使正在等待的线程都可以试图获得此对象,则返回 true;否则返回 false * @throws IllegalMonitorStateException 如果正在进行的释放操作将在非法状态下放置此同步器。必须以一致的方式抛出此异常,以便同步正确运行 * @throws UnsupportedOperationException 如果不支持共享模式 */ protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } /** * 如果对于当前(正调用的)线程,同步是以独占方式进行的,则返回 true。此方法是在每次调用非等待 AbstractQueuedSynchronizer.ConditionObject 方法时调用的。(等待方法则调用 release(int)。) * * 默认实现将抛出 UnsupportedOperationException。此方法只是 AbstractQueuedSynchronizer.ConditionObject 方法内进行内部调用,因此,如果不使用条件,则不需要定义它。 * * @return 如果同步是以独占方式进行的,则返回true;其他情况则返回 false * @throws UnsupportedOperationException 如果不支持这些条件 */ protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } // ... ... }