ReentrantLock与ConditionObject 管程模型的实现

    技术2022-07-10  105

    背景

    在并发编程领域,有两个核心问题:一是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之前如何通信的问题。这两个问题通过管程都可以解决。

    管程的结构如下图所示: Java目前是提供了两个管程的实现,一个是synchronized ,另一个是并发包下的ReentrantLock类与ConditionObject类。其中ReentrantLock类实现了Lock接口解决了互斥问题,ConditionObject类实现了Condition接口解决了同步问题。 本文简要分析一下这对组合是如何实现管程模型?

    管程模型的实现

    ReentrantLock

    ReentrantLock实现了Lock接口,提供了加锁、解锁和创建条件等待队列的方法。 结构上ReentrantLock持有了一个AbstractQueuedSynchronizer的子类对象,并将Lock接口的实现方法都委托给它来执行。

    public class ReentrantLock implements Lock{ private final Sync sync; // AbstractQueuedSynchronizer的抽象子类 abstract static class Sync extends AbstractQueuedSynchronizer { } // 非公平锁 static final class NonfairSync extends Sync { } // 公平锁 static final class FairSync extends Sync { } public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } } public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }

    管程的信号量

    线程进入临界区需要获取到管程的信号量,退出临界区需要释放获取到的信号量。 AbstractQueuedSynchronizer用它的成员变量 state来表示信号量,并要求想要实现互斥锁的子类重写tryAcquire方法和tryRelease方法。

    public abstract class AbstractQueuedSynchronizer // 信号量 private volatile int state; // 获取的信号量,需要子类重写 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 释放获取的信号量,需要子类重写 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // 尝试获取信号量 // 如果获取不到则添加到管程的入口等待队列,并阻塞等待获取到信号量 // 该方法子类无须重写 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } }

    管程的入口等待队列

    AbstractQueuedSynchronizer作为管程的入口等待队列,在数据结构上它是线程安全的双端队列,并且在该队列初始化后,队列头节点一直为一个空节点。

    public abstract class AbstractQueuedSynchronizer{ // 线程安全的入队操作 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 懒加载的方式设置队列初始的空节点 if (compareAndSetHead(new Node())) tail = head; } else { // 确保先link in 再compareAndSetTail node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } // 线程安全的出队操作 // 当前节点为头节点的后继节点才有机会出队-> 则当前节点可以出队可以得知此时头节点为空节点(头节点对应的线程早已出队) // 因此每次只有一个线程进行 tryAcquire(arg);setHead(node); // 因此出队操作也是线程安全的 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } }

    管程的条件等待队列

    ConditionObject类是AbstractQueuedSynchronizer的一个内部类,实现了Condition接口,作为管程的一个条件等待队列的实现。 一个ConditionObject对象是管程的一个条件等待队列。 由于对条件等待队列的入队和出队操作都是在临界区内执行的,所以该队列线程不安全即可。

    调用ConditionObject对象的await()方法:线程会释放当前持有的信号量,并创建一个节点添加到ConditionObject对象的条件等待队列,之后挂起等待其他线程的通知。调用ConditionObject对象的signal()或signalAll()方法,会移除条件等待队列中的节点,并唤醒节点中线程引用对应的线程。 public class ConditionObject implements Condition{ private transient Node firstWaiter; private transient Node lastWaiter; // 队尾入队 private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } // 队尾出队 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } }
    Processed: 0.012, SQL: 9