首先来说BlockingQueue是什么?从字面上来看,BlockingQueue就是阻塞队列,它是一个先进先出且线程安全的队列。那么它什么时候发生阻塞?有两种情况:
队列已满时进行入队操作,这个时候会阻塞,等待队列中的元素出队队列为空时进行出队操作,这个时候也会阻塞,等待有元素入队BlockingQueue适用于生产者-消费者的场景。生产者不断地生产对象丢到BlockingQueue中(入队),直到BlockingQueue爆满。生产者线程会阻塞,等待消费者消费对象。而消费者不断地从BlockingQueue中取出对象(出队),直到BlockingQueue为空,消费者线程会阻塞,等待生产者生产对象。
BlockingQueue是一个接口,继承自Queue。BlockingQueue提供了4套方法用于插入,移除,获取元素操作。应用于不同场景。如下面表格所示:
-Throws ExceptionSpecial ValueBlocksTimes OutInsertadd(o)offer(o)put(o)offer(o, timeout, timeunit)Removeremove(o)poll()take()poll(timeout, timeunit)Examineelement()peek()可见BlockingQueue操作一共有四种场景:
ThrowsException:抛出异常SpecialValue:返回特殊值,一般是null或者true/falseBlocks:阻塞等待此操作,直到完成TimesOut:阻塞等待此操作指定时间BlockingQueue 的实现类都遵循表格中的规则,需要注意的一点: 不能向BlockingQueue中插入null,否则会抛出NullPointerException 异常。因为在BlockingQueue中null值通常用做特殊值返回。
BlockingQueue 又分有界队列和无界队列,有界队列在队列中的元素到达上限后,再进行入队操作,入队操作将会被阻塞,无界队列其实也不是说真正的无界,只是队列的上线是Integer.MAX_VALUE。
下面我们从源码来分析BlockingQueue 的实现,比较常用的BlockingQueue主要有ArrayBlockingQueue和LinkedBlockingQueue,对于BlockingQueue,我们应该重点关注它的put和take这两个方法的实现,因为这两个方法是带阻塞的。所以接下来我们分别介绍ArrayBlockingQueue和LinkedBlockingQueue的put/take实现。
ArrayBlockingQueue是有界队列,它的底层实现是一个数组,在构造方法中可以指定队列容量的大小,容量一旦初始化就不可再更改。
ArrayBlockingQueue的并发控制是通过ReentrantLock和Condition来实现的。不管是入队操作还是出队操作,都需要获取到锁才能进行,老规矩,先来看一下ArrayBlockingQueue的构造方法
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }我们看到它有三个重载的构造方法,可在构造方法中设置以下三个参数:
capacity:队列容量,限制队列中元素的个数fail:指定使用公平策略还是非公平策略c:传入一个集合,将集合中的元素先添加到队列中再看一下ArrayBlockingQueue中有哪些重要属性
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; // 核心属性,这个数组就是用来存储元素的 final Object[] items; // 下一次读取元素的位置 int takeIndex; /** * 下一次写入元素的位置 */ int putIndex; /** * 队列中元素的数量 */ int count; /** * 所有读写操作的锁 */ final ReentrantLock lock; /** * 读取线程的condition队列 */ private final Condition notEmpty; /** *写入线程的condition队列 */ private final Condition notFull; ...... }从代码中,我们看到ArrayBlockingQueue依赖于一个ReentrantLock和两个Condition以及一个Object数组来实现。简单用一张图来描述它的结构
也就是说ArrayBlockingQueue的读取和写入操作都需要获取到ReentrantLock独占锁之后才能够进行。
如果队列已满,再进行写入操作的时候,写入操作的线程 需要进入到写入操作condition队列中排队,等待其他线程读取元素后,唤醒写入操作condition队列中的第一个线程。
如果队列为空,这个时候再进行读取操作。读取操作的线程 需要进入到读取操作condition队列中排队,等待其他线程写入新的元素,唤醒读取操作condition队列中的第一个线程。
ArrayBlockingQueue的源码非常简单,我们先来看一下它的写入操作:put
public void put(E e) throws InterruptedException { // 这里会检查null值 checkNotNull(e); final ReentrantLock lock = this.lock; // 获取独占锁 lock.lockInterruptibly(); try { // 只要队列中的元素是满的,当前线程就需要进入到写入操作condition队列中等待 while (count == items.length) notFull.await(); // 将元素加入到数组中 enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 将元素加入到数组中 items[putIndex] = x; if (++putIndex == items.length) // 重置putIndex,让它重新从0开始。 putIndex = 0; // 成功写入了一个元素,那么元素数量+1 count++; // 成功写入了一个元素,说明队列不为空了。 // 唤醒读取操作condition队列中正在等待的第一个线程 notEmpty.signal(); }put操作就此完毕,再来看下读取操作:take
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 一样要先获取独占锁 lock.lockInterruptibly(); try { // 只要队列是空的,当前线程就需要进入到读取操作condition队列中等待 while (count == 0) notEmpty.await(); // 取出元素,腾出位置 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 取出元素 E x = (E) items[takeIndex]; // 腾出位置 items[takeIndex] = null; // 重置takeIndex,让它重新从0开始。 if (++takeIndex == items.length) takeIndex = 0; // 成功读取出一个元素,那么元素数量-1 count--; if (itrs != null) itrs.elementDequeued(); // 成功读取出一个元素,说明队列不是满的了,唤醒写入操作condition队列中正在等待的第一个线程 notFull.signal(); // 返回取出的元素 return x; }ArrayBlockingQueue的源码就是如此简单,下面来分析LinkedBlockingQueue
LinkedBlockingQueue底层实现是一个单向链表,其精髓在于读写分离。使用其无参构造方法默认是无界队列(Integer.MAX_VALUE),也可传入参数指定队列大小。
先来看一下LinkedBlockingQueue类的结构
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; /** * LinkedBlockingQueue链表结构就是基于这个类 */ static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } } /** 队列的容量*/ private final int capacity; /** 当前队列中元素的数量 */ private final AtomicInteger count = new AtomicInteger(); /** * 单向链表的头节点 * Invariant: head.item == null */ transient Node<E> head; /** * 单向链表的尾节点 * Invariant: last.next == null */ private transient Node<E> last; /** take,poll,etc操作需要获取的锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 读取操作condition队列 */ private final Condition notEmpty = takeLock.newCondition(); /** put,offer,etc操作需要获取的锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** 写入操作condition队列 */ private final Condition notFull = putLock.newCondition(); ...... }再来看一下LinkedBlockingQueue的构造方法
// 构造方法中将头节点和尾节点都初始化为同一个空节点,但这里却没有对count进行+1处理, // 也就是说count计数不包括这个空的头节点,而take操作读取元素时也是获取head的下一个节点 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }从代码中我们看到,LinkedBlockingQueue并发控制也是基于ReentranLock和Condition来实现的,只不过LinkedBlockingQueue是使用了两个锁。其底层是一个Node对象组成的单向链表。
LinkedBlockingQueue一共有两把锁和两个Condition:
读取操作锁(takeLock)和写入操作锁(putLock),读取操作condition(notEmpty),写入操作condition(notFull)。线程要写入一个元素,需要获取putLock,如果此时队列已满,还需要等待队列不满,即满足notFull这个条件,才能够写入元素,意味着当前写入的线程需要等待 读取操作的线程执行notFull.signal()来唤醒自己。
线程要读取一个元素。需要获取takeLock,如果此时队列是空的,还需要等待队列不为空,即满足notEmpty这个条件,意味着当前读取的线程需要等待 写入操作的线程执行notEmpty.signal()来唤醒自己。
无论是put还是take,都是获取到锁的情况下执行的,很显然单独的put操作是线程安全的,单独的take操作也是线程安全的,但是如果put操作和take操作同时发生,LinkedBlockingQueue要怎么保证线程安全?毕竟put和take可不是同一把锁。
这里LinkedBlockingQueue的设计是非常巧妙的,写入操作(put)只是对单向链表的尾节点(last)进行修改,而读取操作(take)只是对单向链表的头节点(head)进行修改。这样一来就可以保证全局的线程安全。
先来看一下LinkedBlockingQueue写入的方法:put
// 这个方法就是将元素插入到链表的尾部 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; // 创建一个node节点 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 写入前必须先获取写入锁 putLock.lockInterruptibly(); try { /* * 如果队列已满,当前线程进入到notFull这个condition队列中等待 */ while (count.get() == capacity) { notFull.await(); } // 将元素加入到队尾 enqueue(node); // 这里的getAndIncrement是原子操作,可以保证线程安全。 // 对count+1,返回count+1前的值 c = count.getAndIncrement(); // 如果c+1<capacity,说明此次入队后,至少还有一个位置是空的, // 这个时候唤醒在notFull这个condition队列中等待的头节点 if (c + 1 < capacity) notFull.signal(); } finally { // 入队成功后释放掉写入锁 putLock.unlock(); } // 如果c为0说明在这个元素入队前,队列是空的, // 可能就有读取操作的线程因为获取不到元素而阻塞, // 这个时候唤醒在notEmpty这个condition队列中等待的头节点 if (c == 0) signalNotEmpty(); } // 我们看到这个入队操作十分简单,但是它是线程安全的, // 因为所有入队操作必先获取到putLock这把独占锁。而出队操作又是修改的head节点 private void enqueue(Node<E> node) { // 将last指向这个元素,并且last.next也指向这个元素 last = last.next = node; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; // 要唤醒读取操作的线程,必先获取读锁takeLock takeLock.lock(); try { // 唤醒读线程 notEmpty.signal(); } finally { takeLock.unlock(); } }再来看读取元素的方法:take
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 要读取元素,必先获得读锁takeLock takeLock.lockInterruptibly(); try { // 如果队列为空,当前线程进入到notEmpty这个condition队列中等待 while (count.get() == 0) { notEmpty.await(); } // 出队,其实是取出head的下一个节点 x = dequeue(); // 这里的getAndDecrement是原子操作,可以保证线程安全。 // 对count-1,返回count-1前的值 c = count.getAndDecrement(); // 如果c>1此次出队后,队列中至少还有一个元素可以获取, // 这个时候唤醒在notEmpty这个condition中等待的读线程 if (c > 1) notEmpty.signal(); } finally { // 出队成功后释放读锁 takeLock.unlock(); } // 如果c == capacity,说明在这个元素被取出前。 // 队列是满的,可能就有写操作线程因为队列已满而阻塞, // 那么取出这个元素后,队列就不满了, // 这个时候唤醒在notFull这个condition中等待的头节点(写线程) if (c == capacity) signalNotFull(); return x; } // 这个方法其实就是取出链表中head的下一个元素,并且将head的下一个元素设置为head。 private E dequeue() { // 一开始的时候head是一个空节点; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC /* * 将head的下一个元素设置为新的head之后,也将新head变成空节点 */ head = first; E x = first.item; first.item = null; return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; // 要唤醒写线程,必先持有写锁putLock putLock.lock(); try { // 唤醒写线程 notFull.signal(); } finally { putLock.unlock(); } }到这里BlockingQueue的源码分析就结束了,ArrayBlockingQueue和LinkedBlockingQueue的源码还是非常简单的。