JUC并发工具十一-阻塞队列

    技术2022-07-10  99

    目录

    1 ArrayBlockingQueue使用及原理

    1.1 ArrayBlockingQueue使用

    1.2 核心方法概览

    1.2.1 add和offer

    1.2.2 带有超时时间的offer方法

    1.2.3 put方法

    1.2.4 remove和poll方法

    1.2.5 带有超时的poll方法

    1.2.6 take方法

    2 LinkedBlockingQueue使用及原理

    2.1 LinkedBlockingQueue的使用

    2.2 核心方法概览

    2.2.1 add和offer

    2.2.2 带有超时时间的offer方法

    2.2.3 put方法

    2.2.4 remove和poll方法

    2.2.5 带有超时的poll方法

    2.2.6 take方法

    3 DelayQueue使用及原理

    3.1 LinkedBlockingQueue的使用

    3.2 核心方法概览

    3.2.1 PriorityQueue简介

    3.2.2 add、put和offer

    3.2.3 take方法


    队列的基本使用在前面文章中已经讲过,这里再复习一下 add:将指定的元素插入到此队列中(如果立即可行且不会违反容量限制),在成功时返回 true,如果当前没有可用空间,则抛出 IllegalStateException。 offer:将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。可以指定超时时间 put:将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。

    remove:若队列为空,抛出NoSuchElementException异常。 poll:若队列为空,返回null。可以指定超时时间 take:若队列为空,发生阻塞,等待有元素。

    1 ArrayBlockingQueue使用及原理

    基于数组的有界阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。 

    1.1 ArrayBlockingQueue使用

    @Test public void testArrayBlockingQueue() throws InterruptedException {     // 这里创建一个容量为5的阻塞队列     BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);     queue.put("a");     queue.put("b");     queue.put("c");     queue.put("d");     queue.put("e");     new Thread(() ->{         try {             // 这里会阻塞三秒             Thread.sleep(3000L);             queue.remove();         } catch (Exception e) {             // TODO Auto-generated catch block             e.printStackTrace();         }     }).start();     // 输出false     System.out.println(queue.offer("f", 1, TimeUnit.SECONDS));     // 输出true     System.out.println(queue.offer("g", 5, TimeUnit.SECONDS));     // 一直阻塞     queue.put("h"); }

    1.2 核心方法概览

    1.2.1 add和offer

    add方法实际上是通过offer实现的,比较简单不做赘述,这里直接看offer方法代码了

    public boolean offer(E e) {     checkNotNull(e);     final ReentrantLock lock = this.lock;     // 加锁     lock.lock();     try {         if (count == items.length)             // 如果队列已满直接返回false             return false;         else {             // 如果不满则入列,返回true             enqueue(e);             return true;         }     } finally {         // 解锁         lock.unlock();     } } private void enqueue(E x) {     // assert lock.getHoldCount() == 1;     // assert items[putIndex] == null;     final Object[] items = this.items;     // putIndex指向队列尾部     items[putIndex] = x;     if (++putIndex == items.length)         putIndex = 0;     count++;     // 唤醒别的等待线程     notEmpty.signal(); }

    1.2.2 带有超时时间的offer方法

    public boolean offer(E e, long timeout, TimeUnit unit)     throws InterruptedException {     checkNotNull(e);     // 计算还有多长时间超时     long nanos = unit.toNanos(timeout);     final ReentrantLock lock = this.lock;     lock.lockInterruptibly();     try {         // 如果队列已满则进入超时等待         while (count == items.length) {             if (nanos <= 0)                 // 如果已超时则返回false                 return false;             nanos = notFull.awaitNanos(nanos);         }         // 入列         enqueue(e);         return true;     } finally {         lock.unlock();     } }

    1.2.3 put方法

    put方法和带超时时间的offer方法差不多,只不过这里等待是一直等待,不是超时等待

    public void put(E e) throws InterruptedException {     checkNotNull(e);     final ReentrantLock lock = this.lock;     lock.lockInterruptibly();     try {         while (count == items.length)             notFull.await();         enqueue(e);     } finally {         lock.unlock();     } }

    1.2.4 remove和poll方法

    remove方法也是通过poll方法实现的,这里只看poll方法

    public E poll() {     final ReentrantLock lock = this.lock;     // 加锁     lock.lock();     try {         // 如果有元素则弹出,如果没有则返回空         return (count == 0) ? null : dequeue();     } finally {         lock.unlock();     } } private E dequeue() {     final Object[] items = this.items;     @SuppressWarnings("unchecked")     E x = (E) items[takeIndex];     // takeindex指向队列头部     items[takeIndex] = null;     if (++takeIndex == items.length)         takeIndex = 0;     count--;     if (itrs != null)         itrs.elementDequeued();     // 弹出元素后唤醒别的等待线程     notFull.signal();     return x; }

    1.2.5 带有超时的poll方法

    // 和带有超时的offer方法类似,这里不做过多讲解

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {     // 计算阻塞倒计时     long nanos = unit.toNanos(timeout);     final ReentrantLock lock = this.lock;     lock.lockInterruptibly();     try {         // 如果队列为空进入超时等待         while (count == 0) {             if (nanos <= 0)                 return null;             nanos = notEmpty.awaitNanos(nanos);         }         return dequeue();     } finally {         lock.unlock();     } }

    1.2.6 take方法

    take方法和带超时时间的poll方法差不多,只不过这里等待是一直等待,不是超时等待

    public E take() throws InterruptedException {     final ReentrantLock lock = this.lock;     lock.lockInterruptibly();     try {         while (count == 0)             notEmpty.await();         return dequeue();     } finally {         lock.unlock();     } }

    2 LinkedBlockingQueue使用及原理

    基于链表的有界阻塞队列,内部维护者一个数据缓冲队列,该队列由一个链表组成, 采用添加移除两个锁高效的处理并发数据,从而实现添加元素和移除元素并行。

    2.1 LinkedBlockingQueue的使用

    使用比较简单,只需要将引用的对象改成LinkedBlockingQueue创建的对象就行

    @Test public void testLinkedBlockingQueue() throws InterruptedException {     // 这里创建一个容量为5的阻塞队列     BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);     queue.put("a");     queue.put("b");     queue.put("c");     queue.put("d");     queue.put("e");     new Thread(() ->{         try {             // 这里会阻塞三秒             Thread.sleep(3000L);             queue.remove();         } catch (Exception e) {             // TODO Auto-generated catch block             e.printStackTrace();         }     }).start();     // 输出false     System.out.println(queue.offer("f", 1, TimeUnit.SECONDS));     // 输出true     System.out.println(queue.offer("g", 5, TimeUnit.SECONDS));     // 一直阻塞     queue.put("h"); }

    2.2 核心方法概览

    2.2.1 add和offer

    add方法实际上是通过offer实现的,比较简单不做赘述,这里直接看offer方法代码了

    public boolean offer(E e) {     if (e == null) throw new NullPointerException();     final AtomicInteger count = this.count;     // 如果容量已满直接返回false     if (count.get() == capacity)         return false;     int c = -1;     Node<E> node = new Node<E>(e);     final ReentrantLock putLock = this.putLock;     // 加锁     putLock.lock();     try {         // 如果容量不满则元素入列         if (count.get() < capacity) {             enqueue(node);             c = count.getAndIncrement();             if (c + 1 < capacity)                 // 如果队列未满则唤醒添加阻塞的线程                 notFull.signal();         }     } finally {         // 释放锁         putLock.unlock();     }     if (c == 0)         // 如果原来容量为0则唤醒移除阻塞的线程         signalNotEmpty();     return c >= 0; } // 入列方法比较简单,把元素添加到末尾即可 private void enqueue(Node<E> node) {     last = last.next = node; }

    2.2.2 带有超时时间的offer方法

    public boolean offer(E e, long timeout, TimeUnit unit)     throws InterruptedException {     if (e == null) throw new NullPointerException();     // 计算还需要等待的时间     long nanos = unit.toNanos(timeout);     int c = -1;     final ReentrantLock putLock = this.putLock;     final AtomicInteger count = this.count;     putLock.lockInterruptibly();     try {         // 阻塞添加元素,直至超时         while (count.get() == capacity) {             if (nanos <= 0)                 return false;             nanos = notFull.awaitNanos(nanos);         }         enqueue(new Node<E>(e));         c = count.getAndIncrement();         if (c + 1 < capacity)             // 如果队列未满则唤醒添加阻塞的线程             notFull.signal();     } finally {         // 释放锁         putLock.unlock();     }     if (c == 0)         // 如果原来容量为0则唤醒移除阻塞的线程         signalNotEmpty();     return true; }

    2.2.3 put方法

    put方法和带超时时间的offer方法差不多,只不过这里等待是一直等待,不是超时等待

    public void put(E e) throws InterruptedException {     if (e == null) throw new NullPointerException();     int c = -1;     Node<E> node = new Node<E>(e);     final ReentrantLock putLock = this.putLock;     final AtomicInteger count = this.count;     putLock.lockInterruptibly();     try {         while (count.get() == capacity) {             notFull.await();         }         enqueue(node);         c = count.getAndIncrement();         if (c + 1 < capacity)             notFull.signal();     } finally {         putLock.unlock();     }     if (c == 0)         signalNotEmpty(); }

    2.2.4 remove和poll方法

    remove方法也是通过poll方法实现的,这里只看poll方法

    public E poll() {     final AtomicInteger count = this.count;     if (count.get() == 0)         return null;     E x = null;     int c = -1;     final ReentrantLock takeLock = this.takeLock;     // 加take锁     takeLock.lock();     try {         if (count.get() > 0) {             x = dequeue();             c = count.getAndDecrement();             if (c > 1)                 // 如果还有元素则唤醒移除阻塞的线程                 notEmpty.signal();         }     } finally {         // 释放take锁         takeLock.unlock();     }     if (c == capacity)         // 如果原来容量已满则唤醒添加阻塞的线程         signalNotFull();     return x; } // 出列方法比较简单,把元素first元素移除即可 private E dequeue() {     Node<E> h = head;     Node<E> first = h.next;     h.next = h; // help GC     head = first;     E x = first.item;     first.item = null;     return x; }

    2.2.5 带有超时的poll方法

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {     E x = null;     int c = -1;     // 计算还需要等待的时间     long nanos = unit.toNanos(timeout);     final AtomicInteger count = this.count;     final ReentrantLock takeLock = this.takeLock;     takeLock.lockInterruptibly();     try {         // 市场是移除元素,直至成功或超时         while (count.get() == 0) {             if (nanos <= 0)                 return null;             nanos = notEmpty.awaitNanos(nanos);         }         // 出列         x = dequeue();         c = count.getAndDecrement();         if (c > 1)             // 如果还有元素则唤醒移除阻塞的线程             notEmpty.signal();     } finally {         // 释放take锁         takeLock.unlock();     }     if (c == capacity)         // 如果原来容量已满则唤醒添加阻塞的线程         signalNotFull();     return x; }

    2.2.6 take方法

    take方法和带超时时间的poll方法差不多,只不过这里等待是一直等待,不是超时等待

    public E take() throws InterruptedException {     E x;     int c = -1;     final AtomicInteger count = this.count;     final ReentrantLock takeLock = this.takeLock;     takeLock.lockInterruptibly();     try {         while (count.get() == 0) {             notEmpty.await();         }         x = dequeue();         c = count.getAndDecrement();         if (c > 1)             notEmpty.signal();     } finally {         takeLock.unlock();     }     if (c == capacity)         signalNotFull();     return x; }

    3 DelayQueue使用及原理

    带有延迟时间的无界队列,其中元素只有当其指定的延迟时间到了才能够从队列中获取该元素。该队列应用场景很多,比如对缓存超时的对象移除、任务超时处理、空闲连接的关闭等等

    3.1 LinkedBlockingQueue的使用

    这里我们用一个比较有趣的案例,日常生活中的网吧上网用延迟队列来实现

    /**  * 定义一个网民类  */ class InternetMan implements Delayed {     private String name;     //身份证     private String id;     //截止时间     private Long endTime;     //定义时间工具类     private TimeUnit timeUnit = TimeUnit.SECONDS;     public InternetMan(String name, String id, Long endTime) {         super();         this.name = name;         this.id = id;         this.endTime = endTime;     }     /**      * 相互批较排序用      */     @Override     public int compareTo(Delayed man) {         return this.getDelay(this.timeUnit) - man.getDelay(this.timeUnit) > 0 ? 1:0;     }     /**      * 用来判断是否到了截止时间      */     @Override     public long getDelay(TimeUnit unit) {         return endTime - System.currentTimeMillis();     }     @Override     public String toString() {         return "InternetMan [name=" + name + ", id=" + id + "]";     } } /**  * 定义一个网吧类  */ class InternetBar implements Runnable {     private DelayQueue<InternetMan> queue = new DelayQueue<>();     public boolean yinye =true;     public void up(String name, String id, int money){         String time = DateUtil.datetimeToString(new Date());         InternetMan man = new InternetMan(name, id, 1000 * money + System.currentTimeMillis());         System.out.println("time:" + time + "," + man + "交钱" + money + "块,开始上机...");         this.queue.add(man);     }     public void down(InternetMan man){         String time = DateUtil.datetimeToString(new Date());         System.out.println("time:" + time + "," + man + "时间到下机...");     }     @Override     public void run() {         while(yinye){             try {                 // 每秒钟检查一次                 TimeUnit.SECONDS.sleep(1L);                 InternetMan man = queue.take();                 down(man);             } catch (InterruptedException e) {                 e.printStackTrace();             }         }     } }

    测试代码:

    @Test public void testDelayQueue() throws InterruptedException {     try{         System.out.println("网吧开始营业");         InternetBar bar = new InternetBar();         new Thread(bar).start();         bar.up("zhangsan", "1", 1);         bar.up("lisi", "2", 10);         bar.up("wanger", "3", 5);     }     catch(Exception e){         e.printStackTrace();     }     TimeUnit.MINUTES.sleep(1); }

    输出: 网吧开始营业 time:2020-06-30 21:18:18,InternetMan [name=zhangsan, id=1]交钱1块,开始上机... time:2020-06-30 21:18:18,InternetMan [name=lisi, id=2]交钱10块,开始上机... time:2020-06-30 21:18:18,InternetMan [name=wanger, id=3]交钱5块,开始上机... time:2020-06-30 21:18:19,InternetMan [name=zhangsan, id=1]时间到下机... time:2020-06-30 21:18:23,InternetMan [name=wanger, id=3]时间到下机... time:2020-06-30 21:18:28,InternetMan [name=lisi, id=2]时间到下机...

    3.2 核心方法概览

    3.2.1 PriorityQueue简介

    DelayQueue内部维护了一个PriorityQueue,该队列对实现Comparable的元素进行排序。排序规则越小,越先取出来。源码也比较简单,有兴趣的同学可以阅读下源码

    3.2.2 add、put和offer

    add和put方法都是调的offer方法

    public boolean offer(E e) {     final ReentrantLock lock = this.lock;     // 加锁     lock.lock();     try {         // 添加元素         q.offer(e);         if (q.peek() == e) {             leader = null;             // 唤醒取元素的线程             available.signal();         }         return true;     } finally {         lock.unlock();     } }

    3.2.3 take方法

    public E take() throws InterruptedException {     final ReentrantLock lock = this.lock;     // 加锁     lock.lockInterruptibly();     try {         for (;;) {             // 获取第一个元素,如果为空则阻塞等待             E first = q.peek();             if (first == null)                 available.await();             else {                 // 获取取出元素的等待时间                 long delay = first.getDelay(NANOSECONDS);                 if (delay <= 0)                     // 如果等待时间<=0则直接取出                     return q.poll();                 first = null;                 if (leader != null)                     // 如果当前线程不为空则等待                     available.await();                 else {                     // 如果当前线程为空则进行超时等待                     Thread thisThread = Thread.currentThread();                     leader = thisThread;                     try {                         available.awaitNanos(delay);                     } finally {                         if (leader == thisThread)                             leader = null;                     }                 }             }         }     } finally {         if (leader == null && q.peek() != null)             // 如果当前没有占用线程并且有元素则唤醒取元素的线程             available.signal();         // 释放锁         lock.unlock();     } }

    别的方法都比较简单,这里不做太多介绍

    常用的阻塞队列还有PriorityBlockingQueue、SynchronousQueue等,后续再更新  

    Processed: 0.011, SQL: 9