什么是AQS
字面上来看,AQS是jdk1.5加入的java.util.concurrent.locks.AbstractQueuedSynchronizer类,类名翻译成中文就是抽象的队列同步器。由大名鼎鼎的Doug Lea李大爷来操刀设计并开发实现。 它提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。
为何要了解AQS
因为AQS是实现 Lock 的基础。想要深入了解Java的并发编程,AQS是锁的实现根基。
AQS原理
AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。 图:AQS原理图
手动实现AQS
首先,我们模拟一个在线电商的秒杀场景。多位用户一起来抢购某件商品,看不加锁时,会不会发生超卖现象。 然后,基于AQS原理,我们实现一个AQS,看加锁之后,能否解决问题。
模拟秒杀场景
import lombok
.extern
.slf4j
.Slf4j
;
@Slf4j
public class DiyAqsDemo {
private volatile int stock
= 5;
public static final long USER_COUNT
= 100;
public static void main(String
[] args
) {
DiyAqsDemo diyAqsDemo
= new DiyAqsDemo();
for (int i
= 0; i
< USER_COUNT
; i
++) {
Thread thread
= new Thread(() -> diyAqsDemo
.buy(), String
.format("第%d位顾客的线程", i
+ 1));
thread
.start();
}
}
public void buy() {
try {
Thread
.sleep(10);
}
catch (InterruptedException e
) {
e
.printStackTrace();
}
if (stock
> 0) {
log
.info("购买成功,剩余库存为:{}", this.stock
);
stock
--;
}
else {
log
.info("购买失败,库存不足,剩余库存为:{}", this.stock
);
}
}
}
截取部分运行结果如下
2020-07-02 06:52:22.392 [第70位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买成功,剩余库存为:5
2020-07-02 06:52:22.392 [第71位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买成功,剩余库存为:5
2020-07-02 06:52:22.392 [第73位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买成功,剩余库存为:5
……
2020-07-02 06:52:22.394 [第84位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买成功,剩余库存为:3
2020-07-02 06:52:22.397 [第98位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买失败,库存不足,剩余库存为:0
2020-07-02 06:52:22.399 [第99位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买失败,库存不足,剩余库存为:-28
可以发现,第99位顾客来购买时,库存是负的。虽然我们使用了volatile关键字来修饰库存变量,但是主内存与工作内存交互时的lock、unlock、read、load、use、assign、store、write步骤,保证不了原子性,读取每个线程拷贝了主内存的库存值到自己的工作内存,它们认为还有库存,继续购买,于是发生了超卖。 图:主内存和工作内存的交互
那么,我们实现一个AQS锁,在判断库存是否充足时,加锁,等库存修改后,再释放锁,不就解决问题了么。说干就干:
手动实现AQS
流程图如下: 图:Thread1线程获取锁 这时,Thread1尝试获取锁,队列为空,获取锁的动作,需要是原子的。这里采用sun.misc.Unsafe的compareAndSwapInt(Object var1, long var2, int var4, int var5)函数,来保证原子性。 线程1修改state=1后,lockHolder引用指向线程1,程序获取锁成功,退出lock()方法,继续业务逻辑。
业务逻辑执行完成后,执行unlock()方法。首先检查当前线程是不是lockHolder指向的线程,其他线程是无权限释放锁的。 修改state=0,然后把lockHolder对象置空。如果等待队列有值,则取栈首的对象出来,然后唤醒该线程。如果等待队列没有对象,则不作处理。 图:Thread1释放锁
源码
import java
.lang
.reflect
.Field
;
import java
.util
.concurrent
.ConcurrentLinkedQueue
;
import java
.util
.concurrent
.locks
.LockSupport
;
import java
.util
.stream
.Collectors
;
import lombok
.extern
.slf4j
.Slf4j
;
import sun
.misc
.Unsafe
;
@Slf4j
public class DiyAqsLock {
private volatile int state
= 0;
private Thread lockHolder
;
private ConcurrentLinkedQueue
<Thread> waiters
= new ConcurrentLinkedQueue<>();
private static final Unsafe unsafe
= UnsafeInstance
.getInstance();
private static long stateOffset
;
static {
try {
stateOffset
= unsafe
.objectFieldOffset(DiyAqsLock
.class.getDeclaredField("state"));
}
catch (NoSuchFieldException e
) {
e
.printStackTrace();
}
}
public void lock() {
if (acquire()) {
return;
}
Thread current
= Thread
.currentThread();
log
.debug("线程状态为:{}", current
.getState());
waiters
.add(current
);
for (; ; ) {
if (current
== waiters
.peek() && acquire()) {
waiters
.poll();
return;
}
LockSupport
.park(current
);
}
}
private boolean acquire() {
int state
= getState();
Thread current
= Thread
.currentThread();
boolean waitCondition
= waiters
.size() == 0 || current
== waiters
.peek();
if (state
== 0 && waitCondition
) {
if (compareAndSwapState(0, 1)) {
log
.info("获取锁成功");
setLockHolder(current
);
return true;
}
}
return false;
}
public final boolean compareAndSwapState(int expect
, int update
) {
return unsafe
.compareAndSwapInt(this, stateOffset
, expect
, update
);
}
public void unlock() {
System
.out
.printf("当前等待队列为:%s\n", waiters
.stream().map(w
-> w
.getName()).collect(Collectors
.toList()));
if (Thread
.currentThread() != lockHolder
) {
throw new RuntimeException("threadHolder is not current thread");
}
if (getState() == 1 && compareAndSwapState(1, 0)) {
log
.info("释放锁成功");
setLockHolder(null
);
Thread first
= waiters
.peek();
if (first
!= null
) {
LockSupport
.unpark(first
);
}
}
}
public int getState() {
return state
;
}
public void setLockHolder(Thread lockHolder
) {
this.lockHolder
= lockHolder
;
}
}
@Slf4j
class UnsafeInstance {
public static Unsafe
getInstance() {
try {
Field field
= Unsafe
.class.getDeclaredField("theUnsafe");
field
.setAccessible(true);
return (Unsafe
) field
.get(null
);
}
catch (Exception e
) {
log
.error(e
.getMessage(), e
);
return null
;
}
}
}
源码关键部分解读
boolean waitCondition
= waiters
.size() == 0 || current
== waiters
.peek();
为何获取锁时,要判断这一句呢? 看流程图,如果是线程1获取锁,此时等待队列为空,可以正常获取锁,没有问题。 如果是线程2来获取锁,假设队列不为空(队列里有线程3、线程4等),为了保证排在队伍前面的线程2可以获取到锁,我们加上了current == waiters.peek(),这样就确保了公平性。先入先出。 我们试着去掉这个条件判断,在释放锁时加上当前等待队列的打印
public void unlock() {
System
.out
.printf("当前等待队列为:%s\n", waiters
.stream().map(w
-> w
.getName()).collect(Collectors
.toList()));
……
然后再次运行程序,结果如下: 图:插队的情况 可以看到,此时获取锁,本来排在第1位顾客后面的是第6位顾客,却被第92位顾客插队了,不是“先来先得”了。 加上waitCondition判断后,运行结果如下: 图:按照排队顺序获取锁 可以看到,这次没有人再插队了。
public static Unsafe
getInstance()
这里获取Unsafe对象,没有直接new,因为这个类比较特殊,Java不建议用户直接使用。 查看Unsafe.getUnsafe()源码:
@CallerSensitive
public static Unsafe
getUnsafe() {
Class
var0 = Reflection
.getCallerClass();
if (!VM
.isSystemDomainLoader(var0
.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe
;
}
}
当且仅当调用getUnsafe方法的类为引导类加载器所加载时才合法,否则抛出SecurityException异常。
public final boolean compareAndSwapState(int expect
, int update
) {
return unsafe
.compareAndSwapInt(this, stateOffset
, expect
, update
);
}
cas操作,解决原子操作,确保对state进行修改是原子性的。
LockSupport
.park(current
);
当获取锁失败时,我们采用自旋的方式,让当前线程先等待。如果这里使用wait,则在notify时,我们无法准确唤醒指定的线程。而java.util.concurrent.locks.LockSupport类,则提供了public static void unpark(Thread thread),可以唤醒指定线程。
Unsafe.objectFieldOffset()
public native long objectFieldOffset(Field f
);
参考
图灵学院:手写高并发秒杀场景同步器锁防超卖,现场压测 手写AQS锁解决秒杀超卖 - 知乎 Java魔法类:Unsafe应用解析 - 美团技术团队
环境说明
java -version
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)
OS:macOS High Sierra 10.13.4日志:logback