最近看了看 java.util.concurrent 包下的AbstractQueuedSynchronizer,先说一下我对于它的理解,然后再逐步去分析它的每一个方法具体的含义。AQS,它提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类(通常是sync)作为同步组件的内部类。
AQS是JDK1.5之后出现的,由Doug Lea开发设计并实现,本文源码基于JDK1.8版本
AQS基本框架如下图所示:
AQS维护了一个volatile语义(支持多线程下的可见性)的共享资源变量state和一个FIFO线程等待队列(多线程竞争state被阻塞时会进入此队列)。
首先介绍一下AQS的内部类,阻塞队列的节点对象Node:
static final class Node { // 表示节点在共享模式下等待的标记 static final Node SHARED = new Node(); // 表示节点在独占模式下等待的标记 static final Node EXCLUSIVE = null; // 表示等待线程已取消的 static final int CANCELLED = 1; // 表示需要唤醒后续线程 static final int SIGNAL = -1; // 表示线程正在等待触发条件(condition) static final int CONDITION = -2; // 表示下一个acquireShared应无条件传播 static final int PROPAGATE = -3; /** * SIGNAL: 当前节点释放state或者取消后,将通知后续节点竞争state。表示当前节点的后继节点需要被唤醒 * CANCELLED: 线程因timeout和interrupt而放弃竞争state * CONDITION: 表示当前节点处于条件队列中,它将不能用作同步队列节点,直到其waitStatus被重置为0 * PROPAGATE: 表征下一个acquireShared应无条件传播 * 0: 都不是上面的几种状态,我的理解0是一个临时状态,当node初始化的时候是0、当head节点释放锁 * 的时候会把waitStatus改为0、当condition的等待队列中的节点向同步队列转移的时候会改为0 */ volatile int waitStatus; // 前继节点 volatile Node prev; // 后继节点 volatile Node next; // 持有的线程 volatile Thread thread; /** * 链接下一个等待条件触发的节点 * 这里要特殊说明一下,nextWaiter在Condition条件阻塞时存储的是下一个等待节点,在非Condition条件下, * 存储的是SHARED和EXCLUSIVE标记,主要是为了在唤醒节点的时候区分是独占模式还是共享模式 */ Node nextWaiter; // 返回节点是否处于共享模式下 final boolean isShared() { return nextWaiter == SHARED; } // 返回前继节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // Shared模式下的Node构造函数 Node() { } // 用于addWaiter,mode:(SHARED或EXCLUSIVE) Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // 用于Condition Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }下面介绍一下AQS内部定义的几个成员属性:
1、同步队列的head节点:
/** * 等待队列的头节点,延迟初始化(在向队列里添加新节点的时候会判断队列是否为空, * 如果队列是空的,会通过compareAndSetHead(new Node())方法初始化head。除初始化外,只能通过setHead方法进行修改。 * 注意:如果head存在,则保证其waitStatus不是CANCELED。 */ private transient volatile Node head;2、同步队列的tail节点:
/** * 等待队列的尾部节点,延迟初始化(在队列是空的时候,会通过compareAndSetHead方法设置head节点, * 并把tail节点指向head节点,此时head节点==tail节点==new Node())。除初始化之外,仅通过方法enq修改以添加新的等待节点。 */ private transient volatile Node tail;3、共享变量state:
/** * 共享资源变量state,多个线程通过获取它的值并通过CAS操作修改它的值,修改成功的意味着获取锁成功 */ private volatile int state; /** * 返回state当前值 * 这个操作具有内存读的可见性语义 */ protected final int getState() { return state; } /** * 设置state新的值 * 这个操作具有内存写的可见性语义 */ protected final void setState(int newState) { state = newState; } /** * 原子性设置新的值,如果当前内存中的值和期望的值一样,则通过CAS操作将其改为新的值 * 这个操作具有内存读/写可见性语义 * * @param expect 期望的值 * @param update 需要新设置的值 * @return 如果修改成功返回true,如果修改失败返回false */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }AQS里面的方法有自己的命名逻辑,稍微介绍一下吧:
需要子类去实现的方法:
1、tryAcquire(int arg) 独占模式尝试获取锁
2、tryRelease(int arg) 独占模式尝试释放锁
3、tryAcquireShared(int arg) 共享模式尝试获取锁
4、tryReleaseShared(int arg) 共享模式尝试释放锁
开放给子类可以直接调用的方法:
独占模式:
1、acquire(int arg) 独占模式下获取锁
2、acquireInterruptibly(int arg) 独占模式下获取锁,是可以被中断的
3、tryAcquireNanos(int arg, long nanosTimeout) 独占模式下获取锁,可以被中断,并且可以设置超时时间,个人感觉这个方法命名为acquireNanos更合适,这样大家多统一啊,哈哈
4、release(int arg) 独占模式下释放锁
共享模式:
1、acquireShared(int arg) 共享模式下释放锁
2、acquireSharedInterruptibly(int arg) 共享模式下获取锁,是可以被中断的
3、tryAcquireSharedNanos(int arg, long nanosTimeout) 共享模式下获取锁,可以被中断,并且可以设置超时时间,同样我感觉这个方法命名为acquireSharedNanos更合适
4、releaseShared(int arg) 共享模式下释放锁
AQS内部使用的一些方法:
独占模式:
1、acquireQueued(final Node node, int arg) 用于已经在阻塞队列中的节点尝试获取锁,方法内部是重复 阻塞—>唤醒—>阻塞...直到获取锁成功
2、doAcquireInterruptibly(int arg) 和acquireQueued方法类似,只不过是可以中断的,线程在获取锁的过程中如果被其他线程标记为interrupt 就抛出中断异常。
3、doAcquireNanos(int arg, long nanosTimeout) 在doAcquireInterruptibly的方法基础上,支持设置等待时长,超时返回false
共享模式:
1、doAcquireShared(int arg) 共享模式下获取锁,不会中断
2、doAcquireSharedInterruptibly(int arg) 共享模式下获取锁,如果在获取锁阻塞期间被标记为中断的,则抛出异常
3、doAcquireSharedNanos(int arg, long nanosTimeout) 与 doAcquireSharedInterruptibly方法类似,只不过支持设置等待时间,超时返回false
接下来开始介绍一些基本的方法:
1、enq(final Node node) 方法:
/** * 将节点插入队列,必要的时候要进行初始化 * @param node 要插入的节点 * @return 返回节点的前置节点 */ private Node enq(final Node node) { // 自旋添加,直到成功 for (;;) { // 获取原tail节点引用 Node t = tail; //判断原tail节点是否为null,如果是null说明原来阻塞队列是空的 if (t == null) { // 必须进行初始化 // 通过CAS原子操作设置新的头节点,设置成功之后将跳出这次循环,下次进入的时候,t != null,将走else里面的逻辑 if (compareAndSetHead(new Node())) // 并将tail节点指向新创建的head节点 tail = head; } else { // 当tail节点不是null的时候,将新添加的节点的前置节点指向原tail节点 node.prev = t; /** * 通过CAS原子操作,将新添加的节点设置成新的tail,这里有一个隐藏的信息,因为当前只有设置新的tail的操作是原子性的, * 只能保证 node.prev = t是没问题的,但是在设置t.next = node的时候由于不是原子操作, * 有可能出现还没来得及设置,就被其他线程唤醒了,因为可能持有锁的线程已经释放了资源,并把刚刚加进去的线程唤醒了, * 这个时候t.next还是null,在unparkSuccessor方法里会有相关逻辑的判断,等讲解unparkSuccessor方法的时候可以留意一下 */ if (compareAndSetTail(t, node)) { // 将原来的tail的后置节点指向新添加的节点 t.next = node; return t; } } } }2、addWaiter(Node mode) 方法:
/** * 把当前线程,根据给定的mode(独占还是共享)包装成一个新的阻塞节点,并添加到阻塞队列中去 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 在调用enq方法之前,做一次快速尝试,enq循环体中的方法逻辑一样 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // 这步操作不是原子操作 同样存在pred节点被唤醒的时候 pred.next == null的可能性 pred.next = node; return node; } } // 如果快速尝试失败,则执行enq方法,自旋将自己加到队列中去,直到添加成功为止 enq(node); return node; }快速尝试插入和enq插入都依赖于CAS操作,其实现依赖于unsafe类,具体代码如下:
/** * CAS head field. Used only by enq. */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * CAS tail field. Used only by enq. */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }unsafe中的CAS操作均是native方法,由计算机CPU的cmpxchg指令来保证其原子性。
3、setHead(Node node) 方法
/** * 将传入的节点设置为阻塞队列的head节点,为出队做准备,只在获取锁的方法中调用, * 并将用不到的值设置成null方便GC回收 * @param node the node */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }4、unparkSuccessor(Node node) 方法:
/** * 如果当前节点存在继任节点(注意这里不一定是后继节点),则唤醒继任节点 * @param node the node */ private void unparkSuccessor(Node node) { /* * 如果waitStatus是负数(很可能是SIGNAL),则尝试通过CAS操作修改waitStatus为0,意思是当前节点 * 后面的节点马上要被唤醒了,不需要SIGNAL指示下一个节点应该被唤醒了 * 如果修改失败或者被其他等待的线程修改了也没关系 */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * 很可能head的next节点就是我们需要唤醒的节点,如果s != null则立即唤醒 */ Node s = node.next; // 如果head.next == null 或者 waitStatus == CANCELED,则从tail向前查找,直到找到一个未被取消的节点 if (s == null || s.waitStatus > 0) { // 两种情况会走到这里,1.s == null 2.s != null && s.waitStatus == CANCELED // 此操作将s节点引用擦除,因为如果不擦除的话,如果是上面第2种情况,下面的for循环如果没有找到一个合适的节点赋值给s, // 那么当前s != null,会执行LockSupport.unpark操作,唤醒一个已经取消的线程 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 此方法会调用unsafe类的unpark方法,属于JVM操作,这里就不赘述了 LockSupport.unpark(s.thread); }5、doReleaseShared() 方法:
/** * 共享模式下的释放锁操作,唤醒后继节点并确保能传播下去 * 这里特殊说明一下,只有在setHeadAndPropagate方法和releaseShared方法时会调用当前方法, * 即在前一个节点获取锁之后将自己设置为头的时刻和持有锁的节点释放锁的时刻会调用 */ private void doReleaseShared() { // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark // 如果 head.waitStatus == 0 ==> Node.PROPAGATE for (; ; ) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0 // 防止 unparkSuccessor 被多次执行 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } // 如果已经是 0 了,改为 -3,用来解决传播性 // 共享模式下节点获得锁之后,如果满足传播条件即propagate>=0,需要向下传递唤醒后面的节点, // PROPAGATE状态是为了解决早期版本的bug,如何解决的 我会在setHeadAndPropagate方法中介绍 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 这里说明一下:如果h != head 说明head已经变成了其他节点,也就是说h的后继节点已经被其他线程唤醒了, // 需要重新获取新的head继续唤醒新head的后继节点 if (h == head) // loop if head changed break; } }6、setHeadAndPropagate(Node node, int propagate) 方法:
/** * 设置队列头,并检查后继者是否可能在共享模式下等待 * 如果propagate > 0 或者 head != null && head.waitStatus == SIGNAL/PROPAGATE,则向下传递。 * @param node the node 即将被设置为新head的节点 * @param propagate tryAcquireShared方法的返回值,>=0 表示向下传递,<0 表示没有剩余可用资源 */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 记录旧的head方便下面检测 setHead(node); /* * 下面的检查可能会造成不必要的唤醒,但也只是在很多线程获取锁/释放锁的时候(也就是说两个线程交替的时候基本上是没问题的), * 即使这样也没有关系,因为大多数线程都是需要给他们信号的,无非就是继续去竞争锁,抢不到还是会重新回到等待队列的 * * 这里特殊说明一下: * PROPAGATE的节点状态在这里就要发挥作用了 * 假设存在某次循环中队列里排队的结点情况为 head(-1) -> t1(-1) -> t2(-1) * 假设存在将要锁释放的 T3 和 T4,释放顺序为先 T3 后 T4 * * 如果没有 PROPAGATE 这个状态,那么情况是: * T3 调用 releaseShared(1),直接调用了 unparkSuccessor(head),head 的等待状态从 -1 变为 0 * T1 由于 T3 释放锁被唤醒,调用 tryAcquireShared,假设返回值为0(获取锁成功,但没有剩余资源量) * T4 调用 releaseShared(1),此时 head.waitStatus 为0(此时读到的 head 和 t3读到的为同一个head,因为T1此时还没有setHead),不满足条件,因此不调用 unparkSuccessor(head) * T1 获取锁成功,调用 setHeadAndPropagate 时,因为不满足 propagate > 0(T1调用tryAcquireShared的返回值也就是 propagate(剩余资源量)== 0),从而不会唤醒后继结点,T2 线程得不到唤醒 * * 如果有了 PROPAGATE 这个状态,那么情况是: * T3 调用 releaseShared(),直接调用了 unparkSuccessor(head),head 的等待状态从 -1 变为 0 * T1 由于 T3 释放锁被唤醒,调用 tryAcquireShared,假设返回值为0(获取锁成功,但没有剩余资源量) * T4 调用 releaseShared(),此时 head.waitStatus 为0(此时读到的 head 和 t3读到的为同一个head),调用 doReleaseShared() 将等待状态置为 PROPAGATE(-3) * T1 获取锁成功,调用 setHeadAndPropagate 时,读到 h.waitStatus < 0,从而调用 doReleaseShared() 唤醒 T2 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 获取传入节点的后置节点 Node s = node.next; // 如果 s==null 或者 s是共享模式等待节点 则调用 doReleaseShared 尝试继续唤醒阻塞队列中的等待节点 // s == null 我认为有两种可能 第一种情况是:传入的node已经是最后一个节点,第二种情况是:有新的节点设置head成功,当前节点已经是旧的head,它的next已经被设置为null // 总结一下:除非node.next 是EXCLUSIVE类型的节点,不然就应该将唤醒传递下去,具体能不能唤醒后面的节点,doReleaseShared 方法会自己做判断 if (s == null || s.isShared()) doReleaseShared(); } }7、cancelAcquire(Node node) 方法:
/** * 取消正在尝试获取锁的节点 * @param node the node */ private void cancelAcquire(Node node) { // 如果节点为null直接返回 if (node == null) return; node.thread = null; // 跳过那些已经被CANCELLED的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 本地保存 当前节点的前置节点的 next节点,用于后面的判断 Node predNext = pred.next; // 将node的waitStatus置为CANCELLED node.waitStatus = Node.CANCELLED; // 如果当前节点是tail节点,则把当前节点从队列中删掉(即让tail指向我前面的第一个未被取消的节点) if (node == tail && compareAndSetTail(node, pred)) { // 现在pred已经是tail节点了,把他的next置为null,这里会用到之前保存的predNext compareAndSetNext(pred, predNext, null); } else { // 如果不是tail节点,说明我的后继节点可能需要唤醒,所以需要把当前节点的pred节点的next指向当前节点的next 即:当前节点.pred.next == 当前节点.next int ws; // 这个if判断条件有点多,我来拆解一下 // 条件一:pred节点不是head节点,因为如果是head节点,那当前节点的next节点马上就是第二个节点了,直接唤醒效率更高 // 条件二:pred的waitStatus是SIGNAL,如果不是SIGNAL并且不是CANCELED,那么将它改为SIGNAL // 条件三:pred的thread不是null,因为如果是null的情况,pred即使被唤醒也不会有机会释放锁,后继节点就不能唤醒了 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 上面三个条件都满足的时候 Node next = node.next; // 利用CAS操作将 pred原来的 predNext 设置当前节点的 next if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 如果三个条件有一个不满足,则直接唤醒当前节点的后置节点 unparkSuccessor(node); } node.next = node; // help GC } }8、shouldParkAfterFailedAcquire(Node pred, Node node) 方法:
/** * 为没有抢到锁的节点 检查并更新他的前置节点的状态 * @param pred 掌握着当前线程状态的前置节点 * @param node 当前节点 * @return true 如果当前线程可以安心的去阻塞(即:park)则返回true */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 前置节点状态已经是 SIGNAL,unparkSuccessor(前置节点)的时候可以唤醒当前节点,就可以安心的去park了 */ return true; if (ws > 0) { /* * 如果前置节点被取消了,则向前遍历查找未被取消的节点,把当前节点挂到 新的pred.next上面, * 然后返回false,跳出方法,通过外面的循环会重新进入当前方法,这时候 ws <= 0,走其他分支 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus 一定是 0 或者 PROPAGATE * 首先说是 0 为什么不能马上park,0的话有两种情况 * 第一种情况:pred节点是之前的tail节点,waitStatus是初始化的0,需要将status设置为SIGNAL,以便pred节点能根据SIGNAL状态唤醒当前节点 * 第二种情况:pred节点是head节点,当前节点获取锁失败,进入当前方法,但是在进入方法之后,head的waitStatus被其他线程修改为0,此时不应该park,退出当前方法,再次试图抢锁,很可能成功 * 再说说PROPAGATE的情况,PROPAGATE只会出现在head节点,所以当前节点此时试图抢锁,很可能成功,将status设置为SIGNAL并返回false,退出方法再抢一次,如果还未成功再次进入方法,返回true * 总结一下: * 对于非队尾节点,如果它的状态为0或PROPAGATE,那么它肯定是head。 * 等待队列中有多个节点时,如果head的状态为0或PROPAGATE,说明head处于一种中间状态,且此时有线程刚才释放锁了。而对于acquire thread来说,如果检测到这种状态,说明再次acquire是极有可能获得到锁的 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }9、selfInterrupt() 方法:
/** * 调用线程的interrupt()方法,将线程标记为中断的 */ static void selfInterrupt() { Thread.currentThread().interrupt(); }10、parkAndCheckInterrupt() 方法
/** * 将当前线程park,JVM会将当前线程喊停 * @return true 如果是标记为中断的则返回true */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // Thread.interrupted() 方法会清除线程的 interrupt标记,并返回清除前的值,即调用完该方法后,interrupt == false return Thread.interrupted(); }接下来说一下获取锁和释放锁的一些方式,包括独占式的和共享式的:
1、acquireQueued(final Node node, int arg) 方法,用于独占模式下已经在队列中的线程获取锁,并且不会中断,同时也用在condition相关的await方法中。
/** * 用于独占模式下已经在队列中的线程获取锁,并且不支持中断,同时也用在condition相关的await方法中 * @param node 当前获取锁失败的 * @param arg 简单理解为获取锁的次数 * @return true 如果在阻塞的时候被中断 则返回true */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 自旋抢锁,抢不到就继续阻塞 for (;;) { // 被唤醒的线程每次会检查自己是不是 head的后继节点,如果是就去抢锁,其实就是去改state的值,能改成功说明抢到了 // 第一次进来的线程在park之前 也会判断一下,如果是head的后继节点 直接抢一下,这样还可以提升效率, // 因为head的后置节点很可能会抢到,可能比head自己unpark要快 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 成功抢到锁,将自己设置为head节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // shouldParkAfterFailedAcquire 会检测前置节点的状态是否符合,上面有介绍 // 如果条件符合则 调用 parkAndCheckInterrupt 将线程挂起,上面有介绍 // 注意线程在被唤醒的时候,是从parkAndCheckInterrupt 里面唤醒的,会再次检测中断状态,如果是true, // 条件满足会设置interrupted = true,之后循环拿到锁的时候,在外面会调用selfInterrupt()方法, // 中断当前线程,但是在当前方法内是不中断的,只是把中断状态传递出去 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 如果发生异常,取消当前节点,cancelAcquire方法上面有介绍 cancelAcquire(node); } }2、doAcquireInterruptibly(int arg) 方法,用于独占模式下获取锁,并且可以被中断(如果中断会抛出InterruptedException)
/** * 独占模式下获取锁的方法,并且不支持中断(如果中断会抛出InterruptedException) * @param arg 想要获取资源的数量 */ private void doAcquireInterruptibly(int arg) throws InterruptedException { // 将当前抢锁的节点封装成一个 独占型的 node节点,并添加到阻塞队列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 自旋方式抢锁,抢成功将自己设置为head,退出方法,抢失败继续阻塞 for (;;) { // 下面逻辑和acquireQueued方法类似 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 这里和acquireQueued方法的区别就是: // 在唤醒的时候,发现自己被标记为中断了,则直接抛出InterruptedException throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }3、doAcquireNanos(int arg, long nanosTimeout) 方法,用于独占模式下获取锁,支持设置获取的时间,超过时间返回false,也是可以中断的。
/** * 用于独占模式下获取锁,支持设置获取的时间,超过时间返回false,也是支持中断的 * @param arg 想要获取资源的个数 * @param nanosTimeout 等待的时间 * @return 如果获取锁成功返回 true,如果超时返回false,如果中断抛出异常 */ private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 获取等待的最终时间点 单位纳秒 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { // 同样是自旋抢锁 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 获取锁成功,将自己设置为head 并返回true setHead(node); p.next = null; // help GC failed = false; return true; } // 这次没抢到,算一下还剩多少时间 nanosTimeout = deadline - System.nanoTime(); // 如果已经超时了,返回false if (nanosTimeout <= 0L) return false; // 如果满足park的条件,并且剩余时间大于spinForTimeoutThreshold 则park // 这里说明一下 spinForTimeoutThreshold是内部变量,定义为1000,意思是剩余的时间已经不足1000纳秒了, // 那就不要park了,再去抢,因为这样效率要比先park,然后等待线程自己唤醒要更快 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) // 在这里会判断线程是否被标记为interrupt,如果是则抛出异常 // 注意Thread.interrupted()方法会重置 interrupt == false throw new InterruptedException(); } } finally { if (failed) // 如果没有成功获取锁,并抛出异常则取消当前节点 cancelAcquire(node); } }4、doAcquireShared(int arg) 方法,用在共享模式下获取锁,并且是不会中断的。
/** * 用在共享模式下获取锁,并且是不会中断的 * @param arg 要获取的资源数 */ private void doAcquireShared(int arg) { // 将当前节点组装成一个共享式的节点,并添加到阻塞队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 自旋方式获取锁 for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 获取锁成功,将自己设置为head,满足传播条件的话,会继续唤醒后置节点, // 具体逻辑请看setHeadAndPropagate方法,我上面有介绍 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) // 如果线程被标记为中断了,则调用selfInterrupt()方法,重新将线程标记为中断 // 其实和acquireQueued类似,当前方法只保证线程拿到锁继续执行,没拿到锁阻塞,具体中断的逻辑由调用者去处理 selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 异常的时候,取消当前节点,具体逻辑请看上面cancelAcquire的介绍 cancelAcquire(node); } }5、doAcquireSharedInterruptibly(int arg) 方法,用于共享模式下获取锁,可以被中断
/** * 用于共享模式下获取锁,可以被中断 */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 逻辑和doAcquireShared方法类似 for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 和doAcquireShared方法的区别:如果线程被标记为中断的,则抛出InterruptedException if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }6、doAcquireSharedNanos(int arg, long nanosTimeout) 方法,用于共享模式下获取锁,并支持设置等待时间,超时返回false,获取锁成功返回true。
/** * 用于共享模式下获取锁,并支持设置等待时间,超时返回false,获取锁成功返回true * @param arg 获取资源数 * @param nanosTimeout 最大超时时间 */ private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 保存等待截止的时间点 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 自旋获取锁 for (;;) { final Node p = node.predecessor(); if (p == head) { // 如果当前节点是head的后置节点,则尝试抢锁 int r = tryAcquireShared(arg); if (r >= 0) { // 抢锁成功,设置当前节点为head节点,如果满足条件继续唤醒后置节点,具体逻辑请看setHeadAndPropagate方法 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } // 抢锁失败,获取当前剩余的等待时间 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; // 如果满足park的条件,并且剩余时间大于spinForTimeoutThreshold 则park // 这里说明一下 spinForTimeoutThreshold是内部变量,定义为1000,意思是剩余的时间已经不足1000纳秒了, // 那就不要park了,再去抢,因为这样效率要比先park,然后等待线程自己唤醒要更快 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) // 判断如果线程被其他线程标记为中断的,则抛出InterruptedException throw new InterruptedException(); } } finally { if (failed) // 抛出异常,取消当前节点,逻辑请看上面的cancelAcquire方法讲解 cancelAcquire(node); } }下面的方法在AQS里只做了定义,具体的实现交给子类去完成:
/** * 独占模式下尝试获取锁,子类可以根据需求自己实现逻辑,比如公平式的或者不公平式的 * @param arg 尝试获取的资源数 * @throws UnsupportedOperationException 如果独占模式不支持将抛出异常 */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * 独占模式下尝试释放锁,子类可以根据需求自己实现逻辑 * @param arg 尝试释放的资源数 * @throws UnsupportedOperationException 如果独占模式不支持将抛出异常 */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** * 共享模式下尝试获取锁,子类可以根据需求自己实现逻辑,比如公平式的或者不公平式的 * @param arg 尝试获取的资源数 * @throws UnsupportedOperationException 如果共享模式不支持将抛出异常 */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } /** * 共享模式下尝试释放锁,子类可以根据需求自己实现逻辑 * @param arg 尝试释放的资源数 * @throws UnsupportedOperationException 如果共享模式不支持将抛出异常 */ protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }7、acquire(int arg) 方法,用在独占模式下获取锁,并且不会中断
/** * 用在独占模式下获取锁,并且不会中断 * 如果上来 tryAcquire 方法成功,则直接结束 * 如果 tryAcquire失败,则线程将排队,并可能反复阻塞和解除阻塞,并调用 tryAcquire 直到成功 * 这个方法可以用来实现 lock.lock()方法 * @param arg 想要获取的资源数 */ public final void acquire(int arg) { // 如果调用 tryAcquire成功,即抢到锁,将短路后面的方法 if (!tryAcquire(arg) && // 当tryAcquire失败的时候,线程需要去排队,具体逻辑请看acquireQueued方法详解 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }8、acquireInterruptibly(int arg) 方法,用于独占模式下获取锁,如果线程被标记为中断,则抛出InterruptedException
/** * 用于独占模式下获取锁,如果线程被标记为中断,则抛出InterruptedException * @param arg 想要获取的资源数 * @throws InterruptedException */ public final void acquireInterruptibly(int arg) throws InterruptedException { // 先判断线程的interrupt状态,如果是true则抛出异常,线程终止 if (Thread.interrupted()) throw new InterruptedException(); // 这个逻辑和 acquire 方法一样,只不过 doAcquireInterruptibly在线程中断时会抛出异常 if (!tryAcquire(arg)) // 具体逻辑请看 doAcquireInterruptibly 方法介绍 doAcquireInterruptibly(arg); }9、tryAcquireNanos(int arg, long nanosTimeout) 方法,用在独占模式下获取锁,可以被中断,可以设置等待的时间,超时则返回false
/** * 用在独占模式下获取锁,可以被中断,可以设置等待的时间,超时则返回false * 如果线程没有被标记为中断,调用tryAcquire 方法成功,则直接结束 * 如果 tryAcquire失败,则线程将排队,并可能反复阻塞和解除阻塞,并调用 tryAcquire 直到成功 * 这个方法可以用来实现 lock的tryLock(long, TimeUnit)方法 * @param arg 想要获取的资源数 * @param nanosTimeout 等待时长 * @return 如果获取锁成功 返回true 如果超时返回false * @throws InterruptedException 如果线程被标记为中断则抛出异常 */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 先检查 interrupt 状态 if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }10、release(int arg) 方法,独占模式下释放锁的操作
/** * 独占模式下释放锁的操作 * 这个方法可以用来实现 Lock.unlock()方法 * @param arg 想要释放的资源数 * @return 返回 tryRelease 的返回结果,即释放成功返回true 否则返回false */ public final boolean release(int arg) { if (tryRelease(arg)) { // 释放锁成功,如果head节点不是null并且head节点waitStatus不是0 则唤醒head的后继节点 Node h = head; if (h != null && h.waitStatus != 0) // 这里的判断,如果 h.waitStatus == 0 表示已经被其他线程调用unparkSuccessor了,这里就不用再调了 unparkSuccessor(h); return true; } return false; }11、acquireShared(int arg) 方法,共享模式下获取锁的方式,不会中断
/** * 共享模式下获取锁的方式,不会中断 * 如果上来 tryAcquireShared 方法成功,则直接结束 * 如果 tryAcquireShared 失败,则线程将排队,并可能反复阻塞和解除阻塞,并调用tryAcquireShared 直到成功 * * @param arg 想要获取的资源数 */ public final void acquireShared(int arg) { // 如果调用 tryAcquireShared 返回 >= 0,代表获取锁成功 if (tryAcquireShared(arg) < 0) // 如果获取锁失败 则去排队,知道获取成功,具体逻辑请看 doAcquireShared方法介绍 doAcquireShared(arg); }12、acquireSharedInterruptibly(int arg) 方法,用于共享模式获取锁,如果线程被其他线程标记中断,则抛出 InterruptedException
/** * 用于共享模式获取锁,如果线程被其他线程标记中断,则抛出 InterruptedException * @param arg 想要获取的资源数 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 首先检查是否被中断,如果被标记为中断,则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 先调tryAcquireShared尝试抢锁,如果成功则直接结束 if (tryAcquireShared(arg) < 0) // 如果获取锁失败,则去排队循环获取,知道成功为止,具体逻辑请看doAcquireSharedInterruptibly方法介绍 doAcquireSharedInterruptibly(arg); }13、tryAcquireSharedNanos(int arg, long nanosTimeout) 方法,用于共享模式下获取锁,如果线程被其他线程标记中断则抛出中断异常,如果超时返回false,获取锁成功返回true
/** * 用于共享模式下获取锁,如果线程被其他线程标记中断则抛出中断异常,如果超时返回false * @param arg 想要获取的资源数 * @param nanosTimeout 等待时间 * @return 如果获取锁成功返回true 如果超时返回false */ public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 如果 tryAcquireShared >= 0 说明抢锁成功,短路后面方法,直接返回true // 如果 tryAcquireShared < 0 说明抢锁失败,则调用doAcquireSharedNanos排队获取锁,具体实现逻辑请看doAcquireSharedNanos方法介绍 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }14、releaseShared(int arg) 方法,用于共享模式下释放锁
/** * 用于共享模式下释放锁 * @param arg 想要释放的资源数 * @return 返回 tryReleaseShared 方法的执行结果,释放成功返回true,释放失败返回false */ public final boolean releaseShared(int arg) { // 尝试释放锁,具体逻辑由子类实现 if (tryReleaseShared(arg)) { // 释放成功则调用 doReleaseShared 唤醒后继节点,具体逻辑请看doReleaseShared方法介绍 doReleaseShared(); return true; } return false; }剩下的还有一些ConditionObject 相关的方法,这里就先不做介绍了,等有时间再写一篇专门介绍Condition相关的方法。
如果哪里写的不对,欢迎指正!或者大家有什么补充也可以提出来