1、LinkedBlockingQueue: 单向链表 可有界,无界时,最大容量是Integer.MAX_VALUE(默认构造器,带参构造器可以传入capacity) FIFO 两把 ReentrantLock锁,入队列和出队列锁,两个condition用于阻塞和通知 2、LinkedBlockingDeque: 双向链表 可有界,无界,同LinkedBlockingQueue 一把 ReentrantLock 锁,两个condition 3、ArrayBlockingQueue 数组 FIFO 构造方法可以支持公平和非公平 一把 ReentrantLock 锁,两个condition
1、入队列 add(e):调用父类AbstractQueue的add(e)模板方法,然后调用offer(e)方法。如果队列满了,抛出异常 offer(e):如果队列满了,返回false,插入成功则返回true put(e):如果队列满了,阻塞 offer(e,long,TimeUnit):如果队列满了,阻塞指定时间,如果超时返回false,线程退出 2、出队列 remove():调用父类AbstractQueue的remove()模板方法,然后调用poll()方法。从队列头取数据,如果队列为空,抛出异常 poll():当队列中存在元素,则从队列头取数据(dequeue()), 如果队列为空,则直接返回null take():当队列中存在元素,则从队列头取数据(dequeue()), 如果队列为空,阻塞 poll(long,TimeUnit):当队列中存在元素,则从队列头取数据(dequeue()), 如果队列为空,阻塞指定时间,超时返回null,线程退出
和LinkedBlockingDeque的区别之一就是,LinkedBlockingQueue采用了两把锁来对队列进行操作,也就是队尾添加的时候, 队头仍然可以删除等操作。接下来看典型的操作。
对于LinkedBlockingQueue来说,有两个ReentrantLock分别控制队头和队尾,这样就可以使得添加操作分开来做,一般的操作是获取一把锁就可以,但有些操作例如remove指定元素(注:父类的模板方法remove()是从头部移除,获取takeLock即可),则需要同时获取两把锁:
public boolean remove(Object o) { if (o == null) return false; fullyLock(); //获取锁 try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //依次循环遍历 if (o.equals(p.item)) { //找到了 unlink(p, trail); //解除链接 return true; } } return false; //没找到,或者解除失败 } finally { fullyUnlock(); } }除了上述的remove方法外,在Iterator的next方法,remove方法以及LBQSpliterator分割迭代器中也是需要加全锁进行操作的。
offerFirst就是在队头添加一个元素:
public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { return linkFirst(node); } finally { lock.unlock(); } } private boolean linkFirst(Node<E> node) { if (count >= capacity) //容量满了 return false; Node<E> f = first; //在队头添加 node.next = f; first = node; if (last == null) //第一个节点 last = node; else f.prev = node; ++count; //count自增 notEmpty.signal(); //说明不为null。唤醒等待队列 return true; }1、ArrayBlockingQueue的构造方法必须传参capacity,所以是有界的。 2、ArrayBlockingQueue支持公平锁(前面LinkedBlockingQueue和LinkedBlockingDeque都不支持)
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }以add方法作为入口,在add方法中会调用父类的add方 法,也就是 AbstractQueue.如果看源码看得比较多的话, 一般这种写法都是调用父类的模版方法来解决通用性问题
AbstractQueue.add //从父类的add方法可以看到,判断队列是否满了,如果队列满了,则直接抛出一个异常 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }add方法最终还是调用offer方法来添加数据
public boolean offer(E e) { //1. 判断添加的数据是否为空 checkNotNull(e); final ReentrantLock lock = this.lock; //2. 加锁 lock.lock(); try { //3.判断队列是否满了,满了返回false if (count == items.length) return false; else { //4.不满,调用enqueue方法 enqueue(e); return true; } } finally { lock.unlock(); } }enqueue操作:
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length)//下一次的putIndex越界时,从头开始 putIndex = 0; count++; notEmpty.signal();//通知消费者 }类似add操作,只是满了会阻塞而不是抛出异常
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await();//队列满了,notFull生产者阻塞 enqueue(e);//enqueue方法内部会唤醒notEmpty消费者 } finally { lock.unlock(); } }对应于put操作,队列为空时阻塞
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//队列为空时,notEmpty消费者阻塞 return dequeue(); } finally { lock.unlock(); } }dequeue操作,从takeIndex取数据
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }