23 - Semaphore 信号量

    技术2026-01-16  11

    Semaphore 信号量

    1. Semaphore 模型2. Semaphore 使用3. 源码分析3.1 类结果3.2 acquire3.3 release 4. 总结

      本篇介绍第三个并发工具类 Semaphore,Semaphore 可以理解为信号量,用于控制资源能够被并发访问的线程数量,以保证多个线程能够合理的使用特定资源。   

    1. Semaphore 模型

      信号量模型还是很简单的,可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。你可以结合下图来形象化地理解。

    这三个方法详细的语义具体如下所示:

    init():设置计数器的初始值;down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行;up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

      这里提到的 init()、down() 和 up() 三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。

      信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语。另外,还有些人喜欢用 semWait() 和 semSignal() 来称呼它们,虽然叫法不同,但是语义都是相同的。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()。

      

    2. Semaphore 使用

      Semaphore管理着一组许可permit,许可的初始数量通过构造函数设定。

      当线程要访问共享资源时,需要先通过acquire()方法获取许可。获取到之后许可就被当前线程占用了,在归还许可之前其他线程不能获取这个许可。

      调用acquire()方法时,如果没有许可可用了,就将线程阻塞,等待有许可被归还了再执行。

      当执行完业务功能后,需要通过release()方法将许可证归还,以便其他线程能够获得许可证继续执行。

    如果初始化了一个许可为1的Semaphore,那么就相当于一个不可重入的互斥锁(Mutex)。

    举个例子理解一下:

      我们假设停车场仅有3个停车位,停车位就是有限的共享资源,许可数为3。一开始停车场没有车辆所有车位全部空着,然后先后到来三辆车,停车场车位够,安排进去停车。之后来的车必须在外面候着,直到停车场有空车位。当停车场有车开出去,里面有空位了,则安排一辆车进去(至于是哪辆要看选择的机制是公平还是非公平)。

      从程序角度看,停车场就相当于有限的公共资源,许可数为3,车辆就相当于线程。当来一辆车时,许可数就会减1,当停车场没有车位了(许可数为0),其他来的车辆需要在外面等候着。如果有一辆车开出停车场,许可数+1,然后放进来一辆车。

    代码实现如下:

    public class TestSemaphore { public static void main(String[] args) { Parking parking = new Parking(3); for (int i = 0; i < 10; i++) { new Thread(() -> { parking.park(); }).start(); } } static class Parking { Semaphore semaphore; public Parking(int count) { this.semaphore = new Semaphore(count); } public void park() { try { semaphore.acquire(); long time = (long) (Math.random() * 10); System.out.println(Thread.currentThread().getName() + " 进入停车场,停车 + " + time + "秒..."); Thread.sleep(time); System.out.println(Thread.currentThread().getName() + " 开出停车场..."); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } } } } # 运行结果如下: Thread-1 进入停车场,停车 + 2... Thread-2 进入停车场,停车 + 2... Thread-0 进入停车场,停车 + 9... Thread-1 开出停车场... Thread-2 开出停车场... Thread-3 进入停车场,停车 + 9... Thread-4 进入停车场,停车 + 1... Thread-4 开出停车场... Thread-5 进入停车场,停车 + 3... Thread-5 开出停车场... Thread-6 进入停车场,停车 + 0... Thread-6 开出停车场... Thread-7 进入停车场,停车 + 9... Thread-0 开出停车场... Thread-8 进入停车场,停车 + 3... Thread-3 开出停车场... Thread-8 开出停车场... Thread-9 进入停车场,停车 + 2... Thread-9 开出停车场... Thread-7 开出停车场...

      Semaphore 可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。假如有多个线程读取数据后,需要将数据保存在数据库中,而可用的最大数据库连接只有10个,这时候就需要使用 Semaphore 来控制能够并发访问到数据库连接资源的线程个数最多只有10个。在限制资源使用的应用场景下,Semaphore 是特别合适的。

      

    3. 源码分析

    3.1 类结果

      Semaphore 同样是由 AQS 实现的,用内部类 Sync 来管理锁,Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁)。

      这个类结构有没有似曾相识的感觉,重入锁 ReentrantLock 也是同样的类结构,Semaphore 的源码跟 ReentrantLock 有很多相似但又比 ReentrantLock简单。

    public class Semaphore implements java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {} static final class NonfairSync extends Sync {} static final class FairSync extends Sync {} }

    看下构造方法,设置许可数 permits 其实就是将 AQS.state 设置为 permits:

    public Semaphore(int permits) { sync = new NonfairSync(permits); } NonfairSync(int permits) { super(permits); } Sync(int permits) { setState(permits); }

      

    3.2 acquire

      acquire() 方法就是获取许可,获取到许可就可以继续执行访问共享资源,获取不到就阻塞等待其他线程归还许可。

    AQS.state 用来记录可用的许可数量,每获取一个许可 state 减1。

    ** * 获取许可的方法其实就是获取锁的方法 */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 响应打断 if (Thread.interrupted()) throw new InterruptedException(); // 真正获取锁的方法,由Semaphore.NonfairSync实现 if (tryAcquireShared(arg) < 0) // 获取锁失败,当前线程阻塞并进入AQS同步队列 doAcquireSharedInterruptibly(arg); } /** * Semaphore.NonfairSync实现的获取锁的方法 */ protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } /** * 每获取一个许可,将state-1,state表示剩余的许可数 * 如果许可已经用完,返回remaining<0,表示获取不到锁/许可,线程阻塞 * 如果还有许可,返回remaining>=0,表示获取到锁/许可,线程继续执行 */ final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); // 每获取一个许可,将state-1,state表示剩余的许可数 int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }

      

    3.3 release

      release() 方法归还许可,其实就是将 AQS.state 加1。归还成功,唤醒 AQS 队列中等锁的线程,从被阻塞的位置开始执行。

    /** * 释放许可调用释放锁的方法 */ public void release() { sync.releaseShared(1); } /** * 释放锁,完全成功,依次唤醒AQS队列中等待共享锁的线程 */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 释放锁,由Semaphore.Sync实现 doReleaseShared(); // 释放锁成功,唤醒AQS队列中等锁的线程 return true; } return false; } /** * 每归还一个许可将state加1 */ protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases;// 每归还一个许可将state加1 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }

      

    4. 总结

      信号量 Semaphore 用于控制资源能够被并发访问的线程数量,以保证多个线程能够合理的使用特定资源,比如数据库连接等。

      Semaphore 在构造时设置一个许可数量,这个许可数量用 AQS.state来记录。

      acquire() 方法就是获取许可,只有获取到许可才可以继续执行访问共享资源,获取到许可之后 AQS.state 减1,以记录当前可用的许可数量;如果获取不到许可,线程就阻塞等待其他线程归还许可。

      release() 方法将许可归还,AQS.state 加1;归还之后,唤醒 AQS 队列中阻塞的线程获取许可。

    Processed: 0.015, SQL: 9