并发编程(八)之ReentrantLock

    技术2024-01-28  110

    概述

    可重入锁,简单地讲就是:“同一个线程对于已经获得到的锁,可以多次继续申请到该锁的使用权”。而 synchronized 关键字隐式的支持重进入,比如一个 synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁。ReentrantLock 在调用 lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。

    该特性的实现需要解决以下两个问题:

    线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。锁的最终释放。线程重复 n 次获取了锁,随后在第 n 次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于 0 时表示锁已经成功释放。

    源码分析

    类的继承关系

    public class ReentrantLock implements Lock, java.io.Serializable

    ReentrantLock实现了Lock接口,Lock接口中定义了lock与unlock相关操作,并且还存在newCondition方法,表示生成一个条件。

    类的内部类

    ReentrantLock类内部总共存在Sync、NonfairSync、FairSync三个类,继承关系如下

    Sync类

    abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; //获取锁 abstract void lock(); //非公平方式获取锁 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取状态 int c = getState(); if (c == 0) {//表示没有线程正在竞争该锁 //比较并设置状态成功,状态0表示锁没有被占用 if (compareAndSetState(0, acquires)) { //设置当前线程独占 setExclusiveOwnerThread(current); return true; } } //当前线程拥有该锁 else if (current == getExclusiveOwnerThread()) { //增加重入次数 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } //试图在共享模式下获取对象状态,此方法应该查询是否允许它在共享模式下获取对象状态,如果允 //许,则获取 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //判断资源是否被当前线程占有 protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } //返回资源的占用线程 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } //返回状态 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } //资源是否被占用 final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } 方法描述void lock()子类实现boolean nonfairTryAcquire(int acquires)非公平获取锁,默认方式boolean tryRelease(int releases)试图在共享模式下获取对象状态,此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取boolean isHeldExclusively()判断资源是否被当前线程占有Thread getOwner()返回占有资源的线程int getHoldCount()返回状态boolean isLocked()资源是否被占用

    nonfairTryAcquire 方法增加了再次获取同步状态的处理逻辑:

    通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回 true,表示获取同步状态成功。同步状态表示锁被一个线程重复获取的次数。如果该锁被获取了 n 次,那么前(n-1)次 tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回 true。可以看到,该方法将同步状 态是否为 0 作为最终释放的条件,当同步状态为 0 时,将占有线程设置为 null,并返回 true,表示释放成功。

    NonfairSync

    // 非公平锁 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; // 获得锁 final void lock() { if (compareAndSetState(0, 1)) // 比较并设置状态成功,状态0表示锁没有被占用 // 把当前线程设置独占了锁 setExclusiveOwnerThread(Thread.currentThread()); else // 锁已经被占用,或者set失败 // 以独占模式获取对象,忽略中断 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }

    从lock方法的源码可知,每一次都尝试获取锁,而并不会按照公平等待的原则进行等待,让等待时间最久的线程获得锁。

    FairSyn

    // 公平锁 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { // 以独占模式获取对象,忽略中断 acquire(1); } // 尝试公平获取锁 protected final boolean tryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取状态 int c = getState(); if (c == 0) { // 状态为0 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 不存在已经等待更久的线程并且比较并且设置状态成功 // 设置当前线程独占 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 状态不为0,即资源已经被线程占据 // 下一个状态 int nextc = c + acquires; if (nextc < 0) // 超过了int的表示范围 throw new Error("Maximum lock count exceeded"); // 设置状态 setState(nextc); return true; } return false; } }

    跟踪lock方法的源码可知,当资源空闲时,它总是会先判断sync队列(AbstractQueuedSynchronizer中的数据结构)是否有等待时间更长的线程,如果存在,则将该线程加入到等待队列的尾部,实现了公平获取原则。

    因此,只要资源被其他线程占用,该线程就会添加到sync queue中的尾部,而不会先尝试获取资源。这也是和Nonfair最大的区别,Nonfair每一次都会尝试去获取资源,如果此时该资源恰好被释放,则会被当前线程获取,这就造成了不公平的现象,当获取不成功,再加入队列尾部。

    实现一个自己的重入锁

    public class ReenterSelfLock implements Lock { // 静态内部类,自定义同步器 private static class Sync extends AbstractQueuedSynchronizer { // 是否处于占用状态 protected boolean isHeldExclusively() { return getState() > 0; } // 当状态为0的时候获取锁 public boolean tryAcquire(int acquires) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; }else if(getExclusiveOwnerThread()==Thread.currentThread()){ setState(getState()+1); return true; } return false; } // 释放锁,将状态设置为0 protected boolean tryRelease(int releases) { if(getExclusiveOwnerThread()!=Thread.currentThread()){ throw new IllegalMonitorStateException(); } if (getState() == 0) throw new IllegalMonitorStateException(); setState(getState()-1); if(getState()==0){ setExclusiveOwnerThread(null); } return true; } // 返回一个Condition,每个condition都包含了一个condition队列 Condition newCondition() { return new ConditionObject(); } } // 仅需要将操作代理到Sync上即可 private final Sync sync = new Sync(); public void lock() { System.out.println(Thread.currentThread().getName()+" ready get lock"); sync.acquire(1); System.out.println(Thread.currentThread().getName()+" already got lock"); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { System.out.println(Thread.currentThread().getName()+" ready release lock"); sync.release(1); System.out.println(Thread.currentThread().getName()+" already released lock"); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
    Processed: 0.014, SQL: 10