JUC并发包之阻塞队列LinkedBlockingQueue、LinkedBlockingDeque和ArrayBlockingQueue

    技术2022-07-14  96

    JUC并发包之阻塞队列

    特点阻塞队列的操作方法源码分析LinkedBlockingQueue属性put操作take操作 LinkedBlockingDequeofferFirst操作 ArrayBlockingQueue构造方法add操作offer操作put操作take操作

    特点

    1、LinkedBlockingQueue: 单向链表 可有界,无界时,最大容量是Integer.MAX_VALUE(默认构造器,带参构造器可以传入capacity) FIFO 两把 ReentrantLock锁,入队列和出队列锁,两个condition用于阻塞和通知 2、LinkedBlockingDeque: 双向链表 可有界,无界,同LinkedBlockingQueue 一把 ReentrantLock 锁,两个condition 3、ArrayBlockingQueue 数组 FIFO 构造方法可以支持公平和非公平 一把 ReentrantLock 锁,两个condition

    阻塞队列的操作方法

    1、入队列 add(e):调用父类AbstractQueue的add(e)模板方法,然后调用offer(e)方法。如果队列满了,抛出异常 offer(e):如果队列满了,返回false,插入成功则返回true put(e):如果队列满了,阻塞 offer(e,long,TimeUnit):如果队列满了,阻塞指定时间,如果超时返回false,线程退出 2、出队列 remove():调用父类AbstractQueue的remove()模板方法,然后调用poll()方法。从队列头取数据,如果队列为空,抛出异常 poll():当队列中存在元素,则从队列头取数据(dequeue()), 如果队列为空,则直接返回null take():当队列中存在元素,则从队列头取数据(dequeue()), 如果队列为空,阻塞 poll(long,TimeUnit):当队列中存在元素,则从队列头取数据(dequeue()), 如果队列为空,阻塞指定时间,超时返回null,线程退出

    源码分析

    LinkedBlockingQueue

    属性

    /** * 单向链表。 * FIFO *最大容量是Integer.MAX_VALUE. */ public class LinkedBlockingQueueAnalysis<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /* * putLock * takeLock * remove(e)操作会需要同时获取两把锁,注意是remove指定元素,不是父类的remove() */ //的node节点 static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } //容量,构造器入参,默认构造器取Integer.MAX_VALUE private final int capacity; //用AtomicInteger 来记录数量。 private final AtomicInteger count = new AtomicInteger(); //重点,队列的头部存储null,在阅读出队列的方法源码时注意。 //疑问1:这样设计的原因暂时不理解,麻烦理解的同学留个言 //head节点 head.item == null transient Node<E> head; //last节点,last.next == null private transient Node<E> last; //take锁 private final ReentrantLock takeLock = new ReentrantLock(); //等待take的节点序列。 private final Condition notEmpty = takeLock.newCondition(); //put的lock。 private final ReentrantLock putLock = new ReentrantLock(); //等待puts的队列。 private final Condition notFull = putLock.newCondition(); ... }

    和LinkedBlockingDeque的区别之一就是,LinkedBlockingQueue采用了两把锁来对队列进行操作,也就是队尾添加的时候, 队头仍然可以删除等操作。接下来看典型的操作。

    put操作

    public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); //e不能为null int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; //获取put锁 final AtomicInteger count = this.count; //获取count putLock.lockInterruptibly();//和 lock 的区别是,这个方法优先允许在等待时由其他线程调用等待线程的 interrupt 方法来中断等待直接返回。 //而 lock方法是尝试获得锁成功后才响应中断 try { while (count.get() == capacity) { //如果满了,那么就需要使用notFull阻塞 notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) //count+1后仍然有空间,唤醒notFull notFull.signal(); } finally { putLock.unlock(); //释放锁 } //疑问2:一般在这里应该不会出现c==0,而且c为0,为啥要通知notEmpty? if (c == 0) //当c为0时候,唤醒notEmpty,允许取数据。 signalNotEmpty(); } private void enqueue(Node<E> node) { //入队操作。 last = last.next = node; //队尾进 } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); //加take锁 try { notEmpty.signal(); //唤醒notEmpty } finally { takeLock.unlock(); } }

    take操作

    public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); //加锁 try { while (count.get() == 0) { //如果没有元素,那么就阻塞性等待 notEmpty.await(); } x = dequeue(); //一定可以拿到。 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); //报告还有元素,唤醒队列 } finally { takeLock.unlock(); //释放锁锁 } if (c == capacity) //同疑问2 signalNotFull(); return x; } private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC 指向自己,帮助gc回收 head = first; E x = first.item; //从队头出。 first.item = null; //将head.item设为null。注意,队列头部存的是null return x; }

    对于LinkedBlockingQueue来说,有两个ReentrantLock分别控制队头和队尾,这样就可以使得添加操作分开来做,一般的操作是获取一把锁就可以,但有些操作例如remove指定元素(注:父类的模板方法remove()是从头部移除,获取takeLock即可),则需要同时获取两把锁:

    public boolean remove(Object o) { if (o == null) return false; fullyLock(); //获取锁 try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //依次循环遍历 if (o.equals(p.item)) { //找到了 unlink(p, trail); //解除链接 return true; } } return false; //没找到,或者解除失败 } finally { fullyUnlock(); } }

    除了上述的remove方法外,在Iterator的next方法,remove方法以及LBQSpliterator分割迭代器中也是需要加全锁进行操作的。

    LinkedBlockingDeque

    /** * 双向链表。 * 最大值是Integer.MAX_VALUE * 利用lock锁去控制并发访问,利用condition去控制阻塞 */ public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { //双向联结的节点。 static final class Node<E> { E item; //泛型的item变量 // 前一个节点 Node<E> prev; //next后一个节点 Node<E> next; Node(E x) { item = x; } } //头节点 transient Node<E> first; //尾节点。 transient Node<E> last; //count,表示数值。 private transient int count; //容量 private final int capacity; //实现控制访问的锁 final ReentrantLock lock = new ReentrantLock(); //take的Condition private final Condition notEmpty = lock.newCondition(); //put的Condition private final Condition notFull = lock.newCondition(); ... }

    offerFirst操作

    offerFirst就是在队头添加一个元素:

    public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { return linkFirst(node); } finally { lock.unlock(); } } private boolean linkFirst(Node<E> node) { if (count >= capacity) //容量满了 return false; Node<E> f = first; //在队头添加 node.next = f; first = node; if (last == null) //第一个节点 last = node; else f.prev = node; ++count; //count自增 notEmpty.signal(); //说明不为null。唤醒等待队列 return true; }

    ArrayBlockingQueue

    public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /* *(和 LinkedBlockingDeque 一样) * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; ... }

    构造方法

    1、ArrayBlockingQueue的构造方法必须传参capacity,所以是有界的。 2、ArrayBlockingQueue支持公平锁(前面LinkedBlockingQueue和LinkedBlockingDeque都不支持)

    public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }

    add操作

    以add方法作为入口,在add方法中会调用父类的add方 法,也就是 AbstractQueue.如果看源码看得比较多的话, 一般这种写法都是调用父类的模版方法来解决通用性问题

    AbstractQueue.add //从父类的add方法可以看到,判断队列是否满了,如果队列满了,则直接抛出一个异常 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }

    offer操作

    add方法最终还是调用offer方法来添加数据

    public boolean offer(E e) { //1. 判断添加的数据是否为空 checkNotNull(e); final ReentrantLock lock = this.lock; //2. 加锁 lock.lock(); try { //3.判断队列是否满了,满了返回false if (count == items.length) return false; else { //4.不满,调用enqueue方法 enqueue(e); return true; } } finally { lock.unlock(); } }

    enqueue操作:

    private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length)//下一次的putIndex越界时,从头开始 putIndex = 0; count++; notEmpty.signal();//通知消费者 }

    put操作

    类似add操作,只是满了会阻塞而不是抛出异常

    public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await();//队列满了,notFull生产者阻塞 enqueue(e);//enqueue方法内部会唤醒notEmpty消费者 } finally { lock.unlock(); } }

    take操作

    对应于put操作,队列为空时阻塞

    public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//队列为空时,notEmpty消费者阻塞 return dequeue(); } finally { lock.unlock(); } }

    dequeue操作,从takeIndex取数据

    private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
    Processed: 0.021, SQL: 9