AQS即AbstractQueuedSynchronizer,是一个用于构建锁和同步器的框架。它能降低构建锁和同步器的工作量,还可以避免处理多个位置上发生的竞争问题。在基于AQS构建的同步器中,只可能在一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。
AQS支持独占锁(exclusive)和共享锁(share)两种模式。
独占锁:只能被一个线程获取到(Reentrantlock) 共享锁:可以被多个线程同时获取(CountDownLatch,ReadWriteLock). 无论是独占锁还是共享锁,本质上都是对AQS内部的一个变量state的获取。state是一个原子的int变量,用来表示锁状态、资源数等。
用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁 • getState-获取state状态 • setState-设置state状态 • compareAndSetState-乐观锁机制设置锁状态
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
提供了基于fifo的等待队列,类似于monitor的entrylist。
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于monitor的waitSet。
AQS内部实现了两个队列,一个同步队列,一个条件队列。 同步队列的作用是:当线程获取资源失败之后,就进入同步队列的尾部保持自旋等待,不断判断自己是否是链表的头节点,如果是头节点,就不断参试获取资源,获取成功后则退出同步队列。 条件队列是为Lock实现的一个基础同步器,并且一个线程可能会有多个条件队列,只有在使用了Condition才会存在条件队列。 同步队列和条件队列都是由一个个Node组成的。AQS内部有一个静态内部类Node。
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; //当前节点由于超时或中断被取消 static final int CANCELLED = 1; //表示当前节点的前节点被阻塞 static final int SIGNAL = -1; //当前节点在等待condition static final int CONDITION = -2; //状态需要向后传播 static final int PROPAGATE = -3; volatile int waitStatus; // 上一个节点 volatile Node prev; // 下一个节点 volatile Node next; // 所属线程 volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }acquire(int arg)首先调用tryAcquire(arg)尝试直接获取资源,如果获取成功,因为与运算的短路性质,就不再执行后面的判断,直接返回。tryAcquire(int arg)的具体实现由子类负责。如果没有直接获取到资源,就将当前线程加入等待队列的尾部,并标记为独占模式,使线程在等待队列中自旋等待获取资源,直到获取资源成功才返回。如果线程在等待的过程中被中断过,就返回true,否则返回false。 如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg)执行过程中被中断过,那么if语句的条件就全部成立,就会执行selfInterrupt();方法。因为在等待队列中自旋状态的线程是不会响应中断的,它会把中断记录下来,如果在自旋时发生过中断,就返回true。然后就会执行selfInterrupt()方法,而这个方法就是简单的中断当前线程Thread.currentThread().interrupt();其作用就是补上在自旋时没有响应的中断。
可以看出在整个方法中,最重要的就是acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。
我们首先看Node addWaiter(Node mode)方法,顾名思义,这个方法的作用就是添加一个等待者,根据之前对AQS中数据结构的分析,可以知道,添加等待者就是将该节点加入等待队列.
private Node addWaiter(Node mode) { //新建与一个当前线程关联的node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // tail等待队列尾部节点 Node pred = tail; //尾部节点不为空,队列已经初始化 尝试快速入队 ? if (pred != null) { //将新建的node加入到队尾 node.prev = pred; // cas操作替换 将当前队列尾部节点替换为当前node对象 if (compareAndSetTail(pred, node)) { // 当前等待队列的队尾指向新建的node对象 pred.next = node; //快速入队成功后,就直接返回了 return node; } } //快速入队失败,也就是说队列都还每初始化,就使用enq enq(node); return node; } //执行入队 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //如果队列为空,用一个空节点充当队列头 if (compareAndSetHead(new Node())) tail = head;//尾部指针也指向队列头 } else { //队列已经初始化,入队 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t;//打断循环 } } } }构建完等待队列后,尝试获取锁
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//拿到node的上一个节点 //前置节点为head,说明可以尝试获取资源。排队成功后,尝试拿锁 if (p == head && tryAcquire(arg)) { setHead(node);//获取成功,更新head节点 p.next = null; // help GC failed = false; return interrupted; } //尝试拿锁失败后,根据条件进行park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //获取资源失败后,检测并更新等待状态 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { //如果前节点取消了,那就往前找到一个等待状态的接待你,并排在它的后面 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //阻塞当前线程,返回中断状态 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }具体的boolean tryAcquire(int acquires)实现有所不同。 公平锁的实现如下:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }在AQS的实现中有一个出现了一个park的概念。park即LockSupport.park().它的作用是阻塞当前线程,并且可以调用LockSupport.unpark(Thread)去停止阻塞。它们的实质都是通过UnSafe类使用了CPU的原语。在AQS中使用park的主要作用是,让排队的线程阻塞掉(停止其自旋,自旋会消耗CPU资源),并在需要的时候,可以方便的唤醒阻塞掉的线程。
尝试获取锁,acquires参数为整数1
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取当前状态 int c = getState(); //当前状态为零,也就是处于未锁定状态。 if (c == 0) { // 如果没有后续后续节点,并且设置状态值为1成功。 //将当前线程设为独占。并返回true if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 当前线程已经独占该共享资源。将锁的状态加一,表示重入锁为二。 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }释放锁
protected final boolean tryRelease(int releases) { // Release参数为1。如果当前状态为上锁状态。c就是0 int c = getState() - releases; // 当前线程不是该锁的Owner,抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 返回的释放成功状态值 boolean free = false; // 如果当前状态为锁定,释放锁并将独占线程设为空 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //设置锁状态为未锁定 setState(c); return free; }是否持有独占锁
protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner // 判断当前锁持有着是不是当前线程 return getExclusiveOwnerThread() == Thread.currentThread(); }部分内容引用来自:
https://www.cnblogs.com/zofun/p/12206759.html#707765108