目录
1 ArrayBlockingQueue使用及原理
1.1 ArrayBlockingQueue使用
1.2 核心方法概览
1.2.1 add和offer
1.2.2 带有超时时间的offer方法
1.2.3 put方法
1.2.4 remove和poll方法
1.2.5 带有超时的poll方法
1.2.6 take方法
2 LinkedBlockingQueue使用及原理
2.1 LinkedBlockingQueue的使用
2.2 核心方法概览
2.2.1 add和offer
2.2.2 带有超时时间的offer方法
2.2.3 put方法
2.2.4 remove和poll方法
2.2.5 带有超时的poll方法
2.2.6 take方法
3 DelayQueue使用及原理
3.1 LinkedBlockingQueue的使用
3.2 核心方法概览
3.2.1 PriorityQueue简介
3.2.2 add、put和offer
3.2.3 take方法
队列的基本使用在前面文章中已经讲过,这里再复习一下 add:将指定的元素插入到此队列中(如果立即可行且不会违反容量限制),在成功时返回 true,如果当前没有可用空间,则抛出 IllegalStateException。 offer:将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。可以指定超时时间 put:将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
remove:若队列为空,抛出NoSuchElementException异常。 poll:若队列为空,返回null。可以指定超时时间 take:若队列为空,发生阻塞,等待有元素。
基于数组的有界阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
add方法实际上是通过offer实现的,比较简单不做赘述,这里直接看offer方法代码了
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { if (count == items.length) // 如果队列已满直接返回false return false; else { // 如果不满则入列,返回true enqueue(e); return true; } } finally { // 解锁 lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // putIndex指向队列尾部 items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 唤醒别的等待线程 notEmpty.signal(); }put方法和带超时时间的offer方法差不多,只不过这里等待是一直等待,不是超时等待
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }remove方法也是通过poll方法实现的,这里只看poll方法
public E poll() { final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { // 如果有元素则弹出,如果没有则返回空 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // takeindex指向队列头部 items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 弹出元素后唤醒别的等待线程 notFull.signal(); return x; }// 和带有超时的offer方法类似,这里不做过多讲解
public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 计算阻塞倒计时 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果队列为空进入超时等待 while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }take方法和带超时时间的poll方法差不多,只不过这里等待是一直等待,不是超时等待
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }基于链表的有界阻塞队列,内部维护者一个数据缓冲队列,该队列由一个链表组成, 采用添加移除两个锁高效的处理并发数据,从而实现添加元素和移除元素并行。
使用比较简单,只需要将引用的对象改成LinkedBlockingQueue创建的对象就行
@Test public void testLinkedBlockingQueue() throws InterruptedException { // 这里创建一个容量为5的阻塞队列 BlockingQueue<String> queue = new LinkedBlockingQueue<>(5); queue.put("a"); queue.put("b"); queue.put("c"); queue.put("d"); queue.put("e"); new Thread(() ->{ try { // 这里会阻塞三秒 Thread.sleep(3000L); queue.remove(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }).start(); // 输出false System.out.println(queue.offer("f", 1, TimeUnit.SECONDS)); // 输出true System.out.println(queue.offer("g", 5, TimeUnit.SECONDS)); // 一直阻塞 queue.put("h"); }add方法实际上是通过offer实现的,比较简单不做赘述,这里直接看offer方法代码了
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; // 如果容量已满直接返回false if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; // 加锁 putLock.lock(); try { // 如果容量不满则元素入列 if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) // 如果队列未满则唤醒添加阻塞的线程 notFull.signal(); } } finally { // 释放锁 putLock.unlock(); } if (c == 0) // 如果原来容量为0则唤醒移除阻塞的线程 signalNotEmpty(); return c >= 0; } // 入列方法比较简单,把元素添加到末尾即可 private void enqueue(Node<E> node) { last = last.next = node; }put方法和带超时时间的offer方法差不多,只不过这里等待是一直等待,不是超时等待
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }remove方法也是通过poll方法实现的,这里只看poll方法
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; // 加take锁 takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) // 如果还有元素则唤醒移除阻塞的线程 notEmpty.signal(); } } finally { // 释放take锁 takeLock.unlock(); } if (c == capacity) // 如果原来容量已满则唤醒添加阻塞的线程 signalNotFull(); return x; } // 出列方法比较简单,把元素first元素移除即可 private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }take方法和带超时时间的poll方法差不多,只不过这里等待是一直等待,不是超时等待
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }带有延迟时间的无界队列,其中元素只有当其指定的延迟时间到了才能够从队列中获取该元素。该队列应用场景很多,比如对缓存超时的对象移除、任务超时处理、空闲连接的关闭等等
这里我们用一个比较有趣的案例,日常生活中的网吧上网用延迟队列来实现
/** * 定义一个网民类 */ class InternetMan implements Delayed { private String name; //身份证 private String id; //截止时间 private Long endTime; //定义时间工具类 private TimeUnit timeUnit = TimeUnit.SECONDS; public InternetMan(String name, String id, Long endTime) { super(); this.name = name; this.id = id; this.endTime = endTime; } /** * 相互批较排序用 */ @Override public int compareTo(Delayed man) { return this.getDelay(this.timeUnit) - man.getDelay(this.timeUnit) > 0 ? 1:0; } /** * 用来判断是否到了截止时间 */ @Override public long getDelay(TimeUnit unit) { return endTime - System.currentTimeMillis(); } @Override public String toString() { return "InternetMan [name=" + name + ", id=" + id + "]"; } } /** * 定义一个网吧类 */ class InternetBar implements Runnable { private DelayQueue<InternetMan> queue = new DelayQueue<>(); public boolean yinye =true; public void up(String name, String id, int money){ String time = DateUtil.datetimeToString(new Date()); InternetMan man = new InternetMan(name, id, 1000 * money + System.currentTimeMillis()); System.out.println("time:" + time + "," + man + "交钱" + money + "块,开始上机..."); this.queue.add(man); } public void down(InternetMan man){ String time = DateUtil.datetimeToString(new Date()); System.out.println("time:" + time + "," + man + "时间到下机..."); } @Override public void run() { while(yinye){ try { // 每秒钟检查一次 TimeUnit.SECONDS.sleep(1L); InternetMan man = queue.take(); down(man); } catch (InterruptedException e) { e.printStackTrace(); } } } }测试代码:
@Test public void testDelayQueue() throws InterruptedException { try{ System.out.println("网吧开始营业"); InternetBar bar = new InternetBar(); new Thread(bar).start(); bar.up("zhangsan", "1", 1); bar.up("lisi", "2", 10); bar.up("wanger", "3", 5); } catch(Exception e){ e.printStackTrace(); } TimeUnit.MINUTES.sleep(1); }输出: 网吧开始营业 time:2020-06-30 21:18:18,InternetMan [name=zhangsan, id=1]交钱1块,开始上机... time:2020-06-30 21:18:18,InternetMan [name=lisi, id=2]交钱10块,开始上机... time:2020-06-30 21:18:18,InternetMan [name=wanger, id=3]交钱5块,开始上机... time:2020-06-30 21:18:19,InternetMan [name=zhangsan, id=1]时间到下机... time:2020-06-30 21:18:23,InternetMan [name=wanger, id=3]时间到下机... time:2020-06-30 21:18:28,InternetMan [name=lisi, id=2]时间到下机...
DelayQueue内部维护了一个PriorityQueue,该队列对实现Comparable的元素进行排序。排序规则越小,越先取出来。源码也比较简单,有兴趣的同学可以阅读下源码
add和put方法都是调的offer方法
public boolean offer(E e) { final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { // 添加元素 q.offer(e); if (q.peek() == e) { leader = null; // 唤醒取元素的线程 available.signal(); } return true; } finally { lock.unlock(); } }别的方法都比较简单,这里不做太多介绍
常用的阻塞队列还有PriorityBlockingQueue、SynchronousQueue等,后续再更新