其实AQS并不难

    技术2022-07-10  197

    不啰嗦,直接上干货

    文章目录

    上锁解锁总结条件队列 newConditionCLH队列的数据结构扩展 interrupted

    上锁

    ReentrantLock reentrantLock = new ReentrantLock(true); 或者 ReentrantLock reentrantLock = new ReentrantLock(); 看构造函数: //无参的构造函数,默认为非公平锁 public ReentrantLock() { sync = new NonfairSync(); } //通过你的传参,创建公平锁或者非公平锁 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }

    在这里以非公平锁为例,因为公平锁用到的方法在非公平锁中都有。 先从上锁开始,万事开头难,先把上锁搞定,后面就顺理成章了:

    reentrantLock.lock();

    lock方法在抽象类abstract static class Sync extends AbstractQueuedSynchronizer中定义的抽象方法,所以具体实现为为子类,跟踪后,是在ReentrantLock$NonfairSync内部类中实现。

    ReentrantLock$NonfairSync{ final void lock() { //通过cas算法获取锁,获取到锁则设置当前线程为当前拥有独占访问权限的线程, //这就是非公平锁体现的地方,上了就竞争锁,没抢到就被加入到链表队列中,只要进入到链表队列中就只能是公平获取了 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //否则竞争锁 acquire(1); } } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

    tryAcquire在父类中定义了,但是是在ReentrantLock$NonfairSync内部类中被重写,所以调用的是内部类中的tryAcquire方法。好戏从这才开始

    ReentrantLock$NonfairSync{ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //state表示当前是否被上锁,0:未上锁,1:上锁 int c = getState(); if (c == 0) { //若为0,则再次尝试获取锁,并设置独占锁线程为当前线程 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果getState!=0,则表示当前被上锁,则判断此时独占锁是否为当 //前线程,若相等,则次数加1,如同sync锁的monitor+1。 //(重入锁的体现) else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //都不是,则返回false,表示未抢到锁 return false; }

    通过源码知道了tryAcquire只是通过cas方法区获取锁,如果获取不到锁则返回false。再看上面的代码(我写到这吧),

    public final void acquire(int arg) { //tryAcquire(arg)获取不到锁,则返回false,!false=true,则开始 //进行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法, //首先进入addWaiter方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //创建一个属性值为当前线程的【独占模式】的节点 /** 1. Node.EXCLUSIVE独占锁,EXCLUSIVE=null,也就是说同时也指定了下一个等待节点为null 2. Node.SHARED共享锁,创建读锁时使用,平常一般都是独占锁,与重入锁是两个概念 */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //获取未新增节点前的尾部节点,并且将其作为即将要插入节点的前一节点 Node pred = tail; if (pred != null) { node.prev = pred; //利用cas算法进行插入,如果插入失败则调用下面的enq方法 //compareAndSetTail也是只比较并交换,自身无自旋 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果pred为null,则表示链表队列为空,则执行enq方法新建链表队列 enq(node); return node; } //其实这个方法与addWaiter方法没啥区别,就是多了一个两个判断, //并且添加了for(;;),进行自旋 private Node enq(final Node node) { for (;;) { //获取尾部节点 Node t = tail; //如果尾部节点为空,则设置【新建】线程为空的头部节点,且为尾部节点(循环链表),for循环就会执行else操作 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //此时尾部节点已不为空,重新设置尾部节点为当前要插入节点,并插入链表队列中 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

    看了上面的源码,得知addWaiter这个方法,就是将当前节点设置为尾部节点(为啥不设置为头部节点,反而将头部节点thread设置为空??)然后返回。

    //node=通过上面的分析,node节点已创建,为尾部节点 //arg=1 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //标记当前线程是否需要中断 boolean interrupted = false; //死循环,表示自旋 //上面介绍的tryAcquire使用了cas:compareAndSwapInt算法, //cas:compareAndSwapInt这个算法只是进行比较并替换,本身并没有自旋,但是unsafe类中提供的getAndAddInt、getAndAddLong等api,它们本身才带有自旋,compareAndSwapInt这个只是交换 for (;;) { //获取该节点上一节点 final Node p = node.predecessor(); //如果p是head节点,就再次尝试获取锁,这里再次尝试获取 //锁的原因是如果线程在前面没有获取锁,但是没有立即park,而是入队列, //所以就想着万一在入队的过程中释放了锁呢,所以在这里再获取一次锁 //还有另一层意思就是,非公平锁时,一上来就会获取锁,如果锁被新来的线程获取到了呢,所以这里是尝试获取锁,而不是直接获取锁 if (p == head && tryAcquire(arg)) { //前置节点为头部节点,并且获取到锁,则说明头部节点已经执行完毕,并释放锁。 //setHead将当前节点设置为头部节点,并将thread属性置位null,如同上面的enq新建的头部节点格式。(当前线程已经被setExclusiveOwnerThread里面了) setHead(node); //既然获取到了锁,那么也就说明p已经执行完了,那么就要从链表中删除,所以p.next=null p.next = null; // help GC //取消获取锁的标记为false failed = false; //TODO 没搞懂 return interrupted; } //如果上一步失败,则进行此操作 //shouldParkAfterFailedAcquire从方法名上我们可以大概猜出这是判断是否要阻塞当前线程的,这是一个【核心】,看下面的说明 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

    shouldParkAfterFailedAcquire方法是核心方法 Node.SIGNAL = -1,表示当前线程正在阻塞。而这个方法的逻辑是将前置节点的waitStatus置为-1,因为当前节点自己不会主动设置为-1,而是依赖下一个节点,让下一个节点将当前节点设置为-1,为什么要这么做呢,需要结合parkAndCheckInterrupt方法一起分析,看逻辑:

    //pred为当前节点上一节点,node为当前节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前置节点的ws,因为每次新建的节点waitStatus都为0,所以 //执行此方法时,第一次只会执行最后的else操作,然后返回 //false,又因为上游是for(;;)循环,所以当再次调用的时候,执行if操作,返回true,然后执行parkAndCheckInterrupt方法, int ws = pred.waitStatus; if (ws == Node.SIGNAL)//SIGNAL -1 /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //状态为CANCELLED /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ //则一直往链表队列头部回溯直到找到一个状态不为CANCELLED的结点,将当前节点node挂在这个结点的后面。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //状态为初始化状态(ReentrentLock语境下) /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //修改前置节点的waitStatus从0改为Node.SIGNAL -1。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //shouldParkAfterFailedAcquire()返回为true时,则说明前置节点ws=-1,然后调用此方法 private final boolean parkAndCheckInterrupt() { //直接阻塞当前线程,直到另一个线程调用unpark(A)时被唤醒 LockSupport.park(this); //返回此线程线程中断标识,false return Thread.interrupted(); }

    当前节点阻塞,为啥不自己修改ws为-1呢,为啥要依赖下一个节点进行修改呢,有两个原因:

    如果在阻塞前设置了当前节点的ws=-1,但是因为线程调度的关系,还没来得及执行park,这岂不是乱套了。如果在阻塞后设置为ws=-1,那更不行,阻塞后设置,说明已经不是阻塞状态了,再设置状态为阻塞状态,更不合适了

    正是因为以上两个原因,所以才添加了以上shouldParkAfterFailedAcquire方法,当前节点设置前置节点为-1阻塞状态。 至此,获取锁的逻辑到此为止,只能等待被另一个线程调用unpark(threadId)进行唤醒,所以parkAndCheckInterrupt方法,只会在唤醒后才会被执行完,也就是说上锁的过程已经完了。

    解锁

    结算就容易点了,下面代码是整个调用链,

    public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

    代码就是这样,关键最后一行LockSupport.unpark(s.thread);唤醒下一个节点,那么也就是从parkAndCheckInterrupt方法开始执行呗,进行下一次循环获取锁,未获取到锁则继续进行阻塞。

    总结

    AQS的队列,头部节点的thread永远是空的,即使是初始化的时候,因为队列当中的head永远是持有锁的那个node,队列当中的除了head之外的所有的node都在park,当释放锁之后unpark(唤醒)下一个node(有时候也不会是第二个,比如第二个被cancel之后,至于为什么会被cancel,不在我们讨论范围之内,cancel的条件很苛刻,基本不会发生),node被唤醒,node被设置为head,在sethead方法里面会把node的Thread设置为null,为什么需要设置null?其实原因很简单,现在t2已经拿到锁了,node就不要排队了,那么node对Thread的引用就没有意义了。所以队列的head里面的Thread永远为null。

    条件队列 newCondition

    条件队列,大概原理如下:

    Condition testConditon = lock.newCondition(); //如果链表队列不为空,则初始化一个链表队列, //并将当前线程创建成一个节点,追加队尾 testConditon.await(); //将等待队列中的节点,循环取出,添加到同步队列中。 testConditon.signalAll(); await的大概原理就是,await被调用时,则将当前线程进行park阻塞加入到链表对列中,如果链表队列为空,以当前线程创建的节点为firstWaiter的循环链表。signalAll的大概原理就是,将await创建的等待队列,进行循环,将每个节点添加到同步队列中。

    也就是说一个创建链表队列,一个将队列中的节点取出。程序中 定义多个newCondition,逻辑之间相互调用,效果如同synchronize锁的wait/notifyAll方法,但不同的是,wait/notifyAll 只能阻塞/唤醒一个线程,而reentrantlock的条件对列,可以为多个线程

    AQS state head tail 内部类 Node{ waitStatus(node节点状态) mode(node模式,独占、共享) } 嵌套类ConditionObject{ //条件队列使用 } 等其他属性。 而形成AQS的关键点就是cas、park、队列

    CLH队列的数据结构

    扩展 interrupted

    interrupted只是表示这个线程是否中断的意思,只是表示的意思,也就是说,程序员可以根据interrupted值来判断自己的逻辑是否需要被中断,比如:

    @Test public void testIntreupted(){ System.out.println(Thread.currentThread().isInterrupted()); Thread.currentThread().interrupt(); System.out.println(Thread.currentThread().isInterrupted()); if(Thread.currentThread().isInterrupted()){ //do anything } } output: false true

    也就是说interrupt并不是暂停线程,而只是表示你应该暂停了,给程序一个温柔的反应,比方说,你某个逻辑,不能立即暂停,程序员可以通过这个方法来决定你的逻辑是不是应该暂停。

    Processed: 0.029, SQL: 9