bilibili-Java并发学习笔记14 AQS 之 ReentrantLock
基于 java 1.8.0
ReentrantLock 使用案例
Lock lock = new ReentrantLock(); lock.lock(); try { // do something } finally { lock.unlock(); } 公平锁和非公平锁 // 默认非公平锁 Lock lock = new ReentrantLock(); // 公平锁 Lock lock = new ReentrantLock(true); // 非公平锁 Lock lock = new ReentrantLock(false); /** * 底层实现 */ private final Sync sync; public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } /** * Sync object for fair locks */ static final class FairSync extends Sync {...} /** * Sync object for non-fair locks */ static final class NonfairSync extends Sync {...} /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer {...} 非公平锁实现 lock() 上锁实现 /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { // cas 操作,若 state 是 0,则将其改为 1,并将独占锁的线程标记指向自己 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); // 若互斥锁已被别的线程占有,则 else // AQS 的 acquire(int) 方法 acquire(1); } /** * 以独占模式获取,忽略中断。通过调用至少一次 tryAcquire 在成功时返回来实现。 * 否则线程将排队,可能会反复阻塞和取消阻塞,调用 tryAcquire 直到成功。 * 此方法可用于实现方法 Lock.lock() * * @param arg the acquire argument. 这个值被传递到tryAcquire,但在其他方面是不令人惊讶的,可以表示任何您喜欢的。 */ public final void acquire(int arg) { // tryAcquire 尝试获取锁 // addWaiter 将线程加入等待队列以独占模式 // acquireQueued 查看当前线程是否被中断 if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) selfInterrupt(); } // NonfairSync in ReentrantLock 实现 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } /** * Performs non-fair tryLock. * tryAcquire是在子类中实现的,但这两个类都需要trylock方法的非空尝试。 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // state 为 0 表示当前无线程占有锁 // 通过 CAS 操作去获取锁 if (compareAndSetState(0, acquires)) { // 当前占有锁的线程标记为自己 setExclusiveOwnerThread(current); return true; } } // 如果当前线程已被占有,且占有的线程正是自己 // 重新进入这个锁 // 所以说 ReentrantLock 是可重入锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 不过 state 值 +1 setState(nextc); return true; } return false; } /** * 为当前线程和给定模式创建并排队节点。 * * @param mode Node 模式 : Node.EXCLUSIVE 独占锁, Node.SHARED 共享锁 * @return the new node */ private Node addWaiter(Node mode) { // 创建队列节点 Node node = new Node(Thread.currentThread(), mode); // 尝试enq的快速路径;失败时备份到完整enq // 将新节点加入队尾 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * 以独占不间断模式获取队列中已存在的线程。由条件等待方法和获取使用。 * * @param node the node * @param arg the acquire argument * @return 如果在等待时中断返回 true */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } static void selfInterrupt() { Thread.currentThread().interrupt(); } 公平锁实现 lock() 上锁实现 final void lock() { // 请求锁 acquire(1); } public final void acquire(int arg) { // 尝试获取独占锁 // 若获取失败加入等待队列 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * 公平锁版本的 tryAcquire. * 除非递归调用或没有等待者或是第一个,否则不要授予访问权限。 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 无线程占有锁 if (c == 0) { // hasQueuedPredecessors 判断等待队列中有没有其他等待线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 重入锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * 查询是否有任何线程等待获取的时间比当前线程长。 * * 调用此方法相当于(但可能比)更有效: * getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()} * * 请注意,由于中断和超时可能会在任何时候发生取消, * 所以一个真正的返回并不保证其他线程会在当前线程之前获取。 * 同样,由于队列为空,因此在该方法返回false之后,另一个线程也有可能赢得排队竞争。 * * 这种方法被设计用于公平同步器,以避免碰撞。 * 这种同步器的 tryAcquire 方法应该返回 false,如果这个方法返回 true(除非这是可重入的获取), * 那么它的 tryAcquireShared 方法应该返回一个负值。 * 例如,公平、可重入、独占模式同步器的 tryAcquire 方法可能如下所示: * * protected boolean tryAcquire(int arg) { * if (isHeldExclusively()) { * // A reentrant acquire; increment hold count * return true; * } else if (hasQueuedPredecessors()) { * return false; * } else { * // try to acquire normally * } * }} * * @return 如果在当前线程之前有一个排队线程,则为true;如果当前线程位于队列的头部或队列为空,则为 false * * @since 1.7 */ public final boolean hasQueuedPredecessors() { // 它的正确性取决于head在tail之前被初始化头。下一个如果当前线程是队列中的第一个线程,则为精确的。 Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 释放锁 public void unlock() { sync.release(1); } /** * 以独占模式释放。如果 tryRelease 返回true,则通过取消阻止一个或多个线程来实现。此方法可用于实现方法 Lock#unlock * * * * @param arg 释放参数。这个值被传递到tryRelease,但在其他方面是不令人惊讶的,可以表示任何您喜欢的。 * @return 从tryRelease返回的值 */ public final boolean release(int arg) { // 若锁完全释放(state为0) if (tryRelease(arg)) { // 唤醒节点的后续节点 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; // 只能由占有锁的线程释放锁 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 完全释放锁 free = true; setExclusiveOwnerThread(null); } // 重入锁层次 -1 setState(c); return free; } /** * 唤醒节点的后续节点(如果存在)。 * * @param node the node */ private void unparkSuccessor(Node node) { /* * 如果状态为阴性(即可能需要信号),则尝试清除,以等待信号。如果失败或状态被等待线程更改,则正常。 */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * unpark的线程被保存在后续节点中,后者通常只是下一个节点。但如果取消或明显为空,则从尾部向后遍历,以找到实际未取消的后继项。 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }