其他锁: StampedLock(戳)、Semaphore(信号量)、CountdownLatch(倒计时)

    技术2022-07-14  71

    文章目录

    1. StampedLock(戳)2. Semaphore(信号量)1.自定义例子2. Semaphore加锁解锁流程3. Semaphore源码 3. CountdownLatch(倒计时)

    1. StampedLock(戳)

    该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用

    加锁解锁:读锁

    long stamp = lock.readLock(); lock.unlockRead(stamp);

    加锁解锁:写锁

    long stamp = lock.writeLock(); lock.unlockWrite(stamp);

    乐观读,StampedLock 支持tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

    long stamp = lock.tryOptimisticRead(); // 验戳 if(!lock.validate(stamp)){ // 锁升级 }

    提供一个数据容器类内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

    class DataContainerStamped { private int data; private final StampedLock lock = new StampedLock(); public DataContainerStamped(int data) { this.data = data; } public int read(int readTime) { //这里是乐观读,不是真正意义上的加锁 long stamp = lock.tryOptimisticRead(); log.debug("optimistic read locking...{}", stamp); sleep(readTime); //对乐观读取后,需要做一次戳校验,校验通过,表示这期间确实没有写操作,数据可以安全使用返回data //校验没通过,则从无锁的乐观读,锁升级 - 读锁 if (lock.validate(stamp)) { log.debug("read finish...{}, data:{}", stamp, data); return data; } // 锁升级 - 读锁 log.debug("updating to read lock... {}", stamp); try { //带戳的读锁 stamp = lock.readLock(); log.debug("read lock {}", stamp); sleep(readTime); log.debug("read finish...{}, data:{}", stamp, data); return data; } finally { log.debug("read unlock {}", stamp); lock.unlockRead(stamp); } } public void write(int newData) { //带戳的写锁 long stamp = lock.writeLock(); log.debug("write lock {}", stamp); try { sleep(2); this.data = newData; } finally { log.debug("write unlock {}", stamp); lock.unlockWrite(stamp); } } }

    测试 读-读 可以优化

    public static void main(String[] args) { DataContainerStamped dataContainer = new DataContainerStamped(1); new Thread(() -> { dataContainer.read(1); }, "t1").start(); sleep(0.5); new Thread(() -> { dataContainer.read(0); }, "t2").start(); } 结果:输出结果,可以看到实际没有加读锁,因为读读校验通过,不走锁升级--读锁 15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1

    测试 读-写 时优化读补加读锁

    public static void main(String[] args) { DataContainerStamped dataContainer = new DataContainerStamped(1); new Thread(() -> { dataContainer.read(1); }, "t1").start(); sleep(0.5); new Thread(() -> { dataContainer.write(100); }, "t2").start(); } 结果:有写锁,校验不通过,升级到读锁,等到t2线程释放写锁,t1才能加读锁 15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 15:57:00.717 c.DataContainerStamped [t2] - write lock 384 15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 15:57:02.719 c.DataContainerStamped [t1] - read lock 513 15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 15:57:03.719 c.DataContainerStamped [t1] - read unlock 513

    注意

    StampedLock 不支持条件变量StampedLock 不支持可重入

    2. Semaphore(信号量)

    信号量,用来限制能同时访问共享资源的线程上限。

    1.自定义例子

    public static void main(String[] args) { // 1. 创建 semaphore 对象,每次都有3个线程能同时工作, //和线程池类似 Semaphore semaphore = new Semaphore(3); // 2. 10个线程同时运行 for (int i = 0; i < 10; i++) { new Thread(() -> { // 3. 获取许可 try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running..."); sleep(1); log.debug("end..."); } finally { // 4. 释放许可 semaphore.release(); } }).start(); } }

    2. Semaphore加锁解锁流程

    Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后 停车场显示空余车位减一 刚开始,permits(state)为 3,这时 5 个线程来获取资源

    注意:这里的state表示可以加进来的线程数,0就是没有线程空位了,与之前的state正好相反。

    假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功, 而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞

    这时 Thread-4 释放了 permits,状态如下

    接下来 Thread-0 竞争成功,permits 再次设置为 0, 设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0, 因此 Thread-3 在尝试不成功后再次进入 park 状态

    3. Semaphore源码

    其实和之前的源码分析差不多,这里直接上源码了

    static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { // permits 即 state super(permits); } // Semaphore 方法, 方便阅读, 放在此处 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // AQS 继承过来的方法, 方便阅读, 放在此处 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 尝试获得共享锁 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // Sync 继承过来的方法, 方便阅读, 放在此处 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if ( // 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly remaining < 0 || // 如果 cas 重试成功, 返回正数, 表示获取成功 compareAndSetState(available, remaining) ) { return remaining; } } } // AQS 继承过来的方法, 方便阅读, 放在此处 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 再次尝试获取许可 int r = tryAcquireShared(arg); if (r >= 0) { // 成功后本线程出队(AQS), 所在 Node设置为 head // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark // 如果 head.waitStatus == 0 ==> Node.PROPAGATE // r 表示可用资源数, 为 0 则不会继续传播 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } // Semaphore 方法, 方便阅读, 放在此处 public void release() { sync.releaseShared(1); } // AQS 继承过来的方法, 方便阅读, 放在此处 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // Sync 继承过来的方法, 方便阅读, 放在此处 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } }

    3. CountdownLatch(倒计时)

    用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

    public static void main(String[] args) throws InterruptedException { //3代表倒计时从3开始到0结束 CountDownLatch latch = new CountDownLatch(3); new Thread(() -> { log.debug("begin..."); sleep(1); //countDown() 用来让计数减一 latch.countDown(); log.debug("end...{}", latch.getCount()); }).start(); new Thread(() -> { log.debug("begin..."); sleep(2); //countDown() 用来让计数减一 latch.countDown(); log.debug("end...{}", latch.getCount()); }).start(); new Thread(() -> { log.debug("begin..."); sleep(1.5); //countDown() 用来让计数减一 latch.countDown(); log.debug("end...{}", latch.getCount()); }).start(); log.debug("waiting..."); //当计数减为0的时候,开始运行在这之后的程序 latch.await(); log.debug("wait end..."); } 结果: 18:44:00.778 c.TestCountDownLatch [main] - waiting... 18:44:00.778 c.TestCountDownLatch [Thread-2] - begin... 18:44:00.778 c.TestCountDownLatch [Thread-0] - begin... 18:44:00.778 c.TestCountDownLatch [Thread-1] - begin... 18:44:01.782 c.TestCountDownLatch [Thread-0] - end...2 18:44:02.283 c.TestCountDownLatch [Thread-2] - end...1 18:44:02.782 c.TestCountDownLatch [Thread-1] - end...0 18:44:02.782 c.TestCountDownLatch [main] - wait end...
    Processed: 0.030, SQL: 9