我手写了AQS实现、画了3张流程图,就是为了让你彻底搞明白AQS原理

    技术2022-07-13  83

    什么是AQS

    字面上来看,AQS是jdk1.5加入的java.util.concurrent.locks.AbstractQueuedSynchronizer类,类名翻译成中文就是抽象的队列同步器。由大名鼎鼎的Doug Lea李大爷来操刀设计并开发实现。 它提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。

    为何要了解AQS

    因为AQS是实现 Lock 的基础。想要深入了解Java的并发编程,AQS是锁的实现根基。

    AQS原理

    AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。 图:AQS原理图

    手动实现AQS

    首先,我们模拟一个在线电商的秒杀场景。多位用户一起来抢购某件商品,看不加锁时,会不会发生超卖现象。 然后,基于AQS原理,我们实现一个AQS,看加锁之后,能否解决问题。

    模拟秒杀场景

    import lombok.extern.slf4j.Slf4j; /** * 程序入口 * created at 2020-06-27 20:00 * @author lerry */ @Slf4j public class DiyAqsDemo { /** * 剩余库存 */ private volatile int stock = 5; /** * 模拟用户个数 */ public static final long USER_COUNT = 100; public static void main(String[] args) { DiyAqsDemo diyAqsDemo = new DiyAqsDemo(); for (int i = 0; i < USER_COUNT; i++) { Thread thread = new Thread(() -> diyAqsDemo.buy(), String.format("第%d位顾客的线程", i + 1)); thread.start(); } } /** * 购买 */ public void buy() { try { // 模拟购买的耗时 Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } if (stock > 0) { log.info("购买成功,剩余库存为:{}", this.stock); stock--; } else { log.info("购买失败,库存不足,剩余库存为:{}", this.stock); } } }

    截取部分运行结果如下

    2020-07-02 06:52:22.392 [第70位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买成功,剩余库存为:5 2020-07-02 06:52:22.392 [第71位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买成功,剩余库存为:5 2020-07-02 06:52:22.392 [第73位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买成功,剩余库存为:5 …… 2020-07-02 06:52:22.394 [第84位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买成功,剩余库存为:3 2020-07-02 06:52:22.397 [第98位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买失败,库存不足,剩余库存为:0 2020-07-02 06:52:22.399 [第99位顾客的线程] INFO com.hua.threadtest.aqs.DiyAqsDemo - 购买失败,库存不足,剩余库存为:-28

    可以发现,第99位顾客来购买时,库存是负的。虽然我们使用了volatile关键字来修饰库存变量,但是主内存与工作内存交互时的lock、unlock、read、load、use、assign、store、write步骤,保证不了原子性,读取每个线程拷贝了主内存的库存值到自己的工作内存,它们认为还有库存,继续购买,于是发生了超卖。 图:主内存和工作内存的交互

    那么,我们实现一个AQS锁,在判断库存是否充足时,加锁,等库存修改后,再释放锁,不就解决问题了么。说干就干:

    手动实现AQS

    流程图如下: 图:Thread1线程获取锁 这时,Thread1尝试获取锁,队列为空,获取锁的动作,需要是原子的。这里采用sun.misc.Unsafe的compareAndSwapInt(Object var1, long var2, int var4, int var5)函数,来保证原子性。 线程1修改state=1后,lockHolder引用指向线程1,程序获取锁成功,退出lock()方法,继续业务逻辑。


    业务逻辑执行完成后,执行unlock()方法。首先检查当前线程是不是lockHolder指向的线程,其他线程是无权限释放锁的。 修改state=0,然后把lockHolder对象置空。如果等待队列有值,则取栈首的对象出来,然后唤醒该线程。如果等待队列没有对象,则不作处理。 图:Thread1释放锁

    源码

    import java.lang.reflect.Field; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import sun.misc.Unsafe; /** * 自定义AQS * created at 2020-06-27 19:55 * @author lerry */ @Slf4j public class DiyAqsLock { /** * <pre> * 使用一个Volatile的int类型的成员变量来表示同步状态 * 记录锁的状态 0表示没有线程持有锁 * >0表示有线程持有锁 * </pre> */ private volatile int state = 0; /** * 用于记录持有锁的线程 */ private Thread lockHolder; /** * 存放获取锁失败的线程对象 */ private ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>(); /** * 通过Unsafe进行cas操作 */ private static final Unsafe unsafe = UnsafeInstance.getInstance(); private static long stateOffset; static { try { stateOffset = unsafe.objectFieldOffset(DiyAqsLock.class.getDeclaredField("state")); } catch (NoSuchFieldException e) { e.printStackTrace(); } } /** * 加锁 */ public void lock() { // 同步获取锁 if (acquire()) { return; } Thread current = Thread.currentThread(); log.debug("线程状态为:{}", current.getState()); // 获取锁失败的 添加进队列里 waiters.add(current); // 自旋获取锁 for (; ; ) { // 如果当前线程是栈首的对象,并且获取锁成功,则在等待队列中移除栈首对象,否则继续等待 if (current == waiters.peek() && acquire()) { // 移除队列 waiters.poll(); return; } // 让出cpu的使用权 LockSupport.park(current); } } /** * 获取锁 * @return */ private boolean acquire() { int state = getState(); Thread current = Thread.currentThread(); boolean waitCondition = waiters.size() == 0 || current == waiters.peek(); if (state == 0 && waitCondition) { // 没有线程获取到锁 if (compareAndSwapState(0, 1)) { log.info("获取锁成功"); // 同步修改成功 将线程持有者修改为当前线程 setLockHolder(current); return true; } } return false; } /** * cas操作 * @param expect * @param update * @return */ public final boolean compareAndSwapState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } /** * 解锁 */ public void unlock() { System.out.printf("当前等待队列为:%s\n", waiters.stream().map(w -> w.getName()).collect(Collectors.toList())); // 1.校验释放锁的线程是不是当前持有锁的线程 if (Thread.currentThread() != lockHolder) { throw new RuntimeException("threadHolder is not current thread"); } // 2. 释放锁修改state if (getState() == 1 && compareAndSwapState(1, 0)) { log.info("释放锁成功"); // 将锁的持有线程置为空 setLockHolder(null); // 2.唤醒队列里的第一个线程 Thread first = waiters.peek(); if (first != null) { // 解除线程的阻塞 LockSupport.unpark(first); } } } public int getState() { return state; } public void setLockHolder(Thread lockHolder) { this.lockHolder = lockHolder; } } @Slf4j class UnsafeInstance { public static Unsafe getInstance() { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); return (Unsafe) field.get(null); } catch (Exception e) { log.error(e.getMessage(), e); return null; } } }

    源码关键部分解读

    boolean waitCondition = waiters.size() == 0 || current == waiters.peek();

    为何获取锁时,要判断这一句呢? 看流程图,如果是线程1获取锁,此时等待队列为空,可以正常获取锁,没有问题。 如果是线程2来获取锁,假设队列不为空(队列里有线程3、线程4等),为了保证排在队伍前面的线程2可以获取到锁,我们加上了current == waiters.peek(),这样就确保了公平性。先入先出。 我们试着去掉这个条件判断,在释放锁时加上当前等待队列的打印

    /** * 解锁 */ public void unlock() { System.out.printf("当前等待队列为:%s\n", waiters.stream().map(w -> w.getName()).collect(Collectors.toList())); ……

    然后再次运行程序,结果如下: 图:插队的情况 可以看到,此时获取锁,本来排在第1位顾客后面的是第6位顾客,却被第92位顾客插队了,不是“先来先得”了。 加上waitCondition判断后,运行结果如下: 图:按照排队顺序获取锁 可以看到,这次没有人再插队了。


    public static Unsafe getInstance()

    这里获取Unsafe对象,没有直接new,因为这个类比较特殊,Java不建议用户直接使用。 查看Unsafe.getUnsafe()源码:

    @CallerSensitive public static Unsafe getUnsafe() { Class var0 = Reflection.getCallerClass(); if (!VM.isSystemDomainLoader(var0.getClassLoader())) { throw new SecurityException("Unsafe"); } else { return theUnsafe; } }

    当且仅当调用getUnsafe方法的类为引导类加载器所加载时才合法,否则抛出SecurityException异常。


    public final boolean compareAndSwapState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

    cas操作,解决原子操作,确保对state进行修改是原子性的。


    LockSupport.park(current);

    当获取锁失败时,我们采用自旋的方式,让当前线程先等待。如果这里使用wait,则在notify时,我们无法准确唤醒指定的线程。而java.util.concurrent.locks.LockSupport类,则提供了public static void unpark(Thread thread),可以唤醒指定线程。


    Unsafe.objectFieldOffset()

    //返回对象成员属性在内存地址相对于此对象的内存地址的偏移量 public native long objectFieldOffset(Field f);

    参考

    图灵学院:手写高并发秒杀场景同步器锁防超卖,现场压测 手写AQS锁解决秒杀超卖 - 知乎 Java魔法类:Unsafe应用解析 - 美团技术团队

    环境说明

    java -version java version "1.8.0_251" Java(TM) SE Runtime Environment (build 1.8.0_251-b08) Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode) OS:macOS High Sierra 10.13.4日志:logback
    Processed: 0.010, SQL: 10