AQS的核心方法-acquire()解析

    技术2025-03-20  40

    以下是AQS的acquire()方法源码:

    public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

    一步步拆解,先看下tryAcquire()方法,这个方法是由AQS子类来做具体实现的,暂不关注,重点关注AQS的几个模版方法。

    addWaiter()方法

    先看addWaiter()方法,参数是一个Node对象,那么就先看下Node对象所对应的类是怎样的定义: 可以发现Node类是AQS类的一个内部类,有若干的方法和属性,这时候还无法知道这些方法和属性的作用是什么,所以继续跟进addWaiter()方法中

    addWaiter()方法

    先看看addWaiter()方法的源码:

    private Node addWaiter(Node mode) { //构造一个Node对象,参数是当前线程对象以及mode对象,mode表示该节点的共享/排他性,值为null为排他模式,不为null则共享模式 Node node = new Node(Thread.currentThread(), mode); //拿到AQS的尾节点 Node pred = tail; //如果尾节点不为空 if (pred != null) { //先把新加入的节点的前驱节点设置为尾节点,新加入的节点会加入队列的尾部 node.prev = pred; //通过CAS操作把新节点设置为尾节点,传入原来的尾节点pred和新节点node做判断,保证并发安全 if (compareAndSetTail(pred, node)) { //把新节点设置为原来尾节点的后继节点 pred.next = node; //返回新节点,这个节点里封装了当前的线程 return node; } } //尾节点为null,则将新节点加入队列 enq(node); return node; }

    enq()方法

    继续跟进enq()方法,看看它是怎么将节点加入队列的:

    private Node enq(final Node node) { //这是一个死循环,不满足一定的条件就不会跳出循环 for (;;) { //获取AQS尾节点 Node t = tail; //如果为null,其实这是个循环判断,可能下次再做判断时,就有其他线程已经往队列中添加了节点,那么tail尾节点可能就不为空了,就走else逻辑了 if (t == null) { // 必须初始化 //则新建一个Node对象,通过CAS设置成头节点,这个head其实是冗余节点 if (compareAndSetHead(new Node())) //把尾节点设置为head tail = head; } else { //尾节点不为空,则把尾节点设置为新节点的前驱节点 node.prev = t; //做CAS操作,把新节点设置为尾节点 if (compareAndSetTail(t, node)) { //CAS成功后,则把新节点设置为原来尾节点的后继节点 t.next = node; //返回新节点,这个节点里封装了当前的线程 return t; } } } } //这个方法关注一下,可能会豁然开朗,这就是当尾节点尾null时,需要设置头节点,来做个初始化的方法,把头节点从null设置为update对象 private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); }

    所以enq()方法是很有意思的,它通过死循环的方式,来保证节点的正确添加,可以发现只有当新节点被设置为尾节点时,当前线程才能从enq()方法返回,然后再配合上CAS,是不是有一种很抽象的感觉,节点一个一个的被加到队列中,一个一个的接着被设置为尾节点,并发的操作,串行的感觉。

    所以通过addWaiter()方法,竞争同步状态失败的线程就被成功的加入到对列的尾部了

    acquireQueued()方法

    再接着看看acquireQueued()方法:

    final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取node前继节点 final Node p = node.predecessor(); //如果node的前继节点是头节点,同时当前线程获取同步状态成功 if (p == head && tryAcquire(arg)) { //那么把当前节点设置为头节点,同时把当前节点的前继节点置为null setHead(node); //再把前头节点p的后继节点设置为null,这样前头节点就没有任何引用了,帮助GC,清理前头节点 p.next = null; // help GC //这里设置把标志位failed设为false,说明成功走到了这步逻辑 failed = false; //要注意,无限循环只有这个出口,返回interrupted后,跳出循环,这个返回值就表示了要不要中断 return interrupted; } //当node的前继节点不是头节点或者获取锁失败时,判断是否需要阻塞等待,如果需要等待,那么就调用parkAndCheckInterrupt()方法阻塞等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //如果线程被中断的,那么重新设置中断状态为true,然后返回表示需要中断 interrupted = true; } } finally { //如果出现不正常情况,failed标志位还没被置为false,就会取消 if (failed) cancelAcquire(node); } }

    shouldParkAfterFailedAcquire()方法

    深入进去看看shouldParkAfterFailedAcquire()方法:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前继节点状态 int ws = pred.waitStatus; //如果等于SIGNAL,则直接返回true,表示要阻塞 if (ws == Node.SIGNAL) //要注意只有这个分支会返回true return true; //如果状态大于0,表示前继节点需要做的请求被取消了, if (ws > 0) { //这个分支循环做一件事,把所有的被取消的前继节点移除,直到waitStatus值不再大于0,然后把这个没有被取消的节点和node节点连接起来 do { node.prev = pred = pred.prev; //上述代码转换成如下模式可能会更好理解,其实就是反复取值赋值 (node.prev = pred; pred = pred.prev;) } while (pred.waitStatus > 0); pred.next = node; } else { //通过CAS设置前置节点的状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //返回false return false; }

    这里需要再回顾acquireQueued()方法,shouldParkAfterFailedAcquire()方法的结果一旦返回false,那么acquireQueued()方法的死循环就不会跳出,还是会继续检查node的前继节点是否是头节点,同时当前线程获取同步状态是否成功,而如果shouldParkAfterFailedAcquire()方法的结果是true,就会调用parkAndCheckInterrupt()方法:

    private final boolean parkAndCheckInterrupt() { //LockSupport.park()实现阻塞等待,等着unpark和interrupt叫醒他 LockSupport.park(this); //检查是否被中断,清除中断状态,并返回中断标志 return Thread.interrupted(); }

    到这步可以发现,acquireQueued()方法的死循环逻辑配合上shouldParkAfterFailedAcquire()方法的去除取消节点和设置SIGNAL状态的操作,整个队列慢慢的会趋向于:

    只要不是头节点,那么其他的节点都是返回true,表示需要中断,这都是shouldParkAfterFailedAcquire()方法的功劳只要不是尾节点,那么其他的节点状态都是SIGNAL,因为只有节点状态是SIGNAL,才会返回true,这也是shouldParkAfterFailedAcquire()方法的功劳

    最后如果acquireQueued()方法中出现了异常,则会调用cancelAcquire()方法来取消节点。

    selfInterrupt()方法

    最后再看看selfInterrupt()方法:

    static void selfInterrupt() { Thread.currentThread().interrupt(); }

    这个线程没能获取到同步状态,同时acquireQueued()方法返回true,那么就会调用selfInterrupt()方法来设置当前线程的中断状态。

    acquire()方法流程图

    总结

    看完源码分析,做到大致熟悉,然后以代码为切入口理解流程图即可。

    Processed: 0.014, SQL: 9