JUC集合类之阻塞队列

    技术2022-07-11  89

    这里写目录标题

    概念架构BlockingQueue的七个实现类BlockingQueue的API 阻塞队列的应用生产者消费者线程池

    概念

    当阻塞队列为空时,获取(take)操作是阻塞的;当阻塞队列为满时,添加(put)操作是阻塞的。

    阻塞队列不用手动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作。许多消息中间件底层(就是阻塞队列生产消费)就是用它们实现的,还有线程池

    架构

    我们用diagrams查看一下

    BlockingQueue的七个实现类

    主要就是加粗的三个

    类名作用ArrayBlockingQueue由数组构成的有界阻塞队列LinkedBlockingQueue由链表构成的有界阻塞队列,但有个巨坑,其边界也就是其默认大小是Integer.MAX_VALUE,高达21亿,一般情况下内存早爆了(在线程池的ThreadPoolExecutor有体现)PriorityBlockingQueue支持优先级排序的无界阻塞队列DelayQueue支持优先级的延迟无界阻塞队列SynchronousQueue不存储元素的阻塞队列,单个元素的阻塞队列,也就是队列只有一个元素,元素不移除,不会进来新的元素LinkedTransferQueue由链表构成的无界阻塞队列LinkedBlockingDeque由链表构成的双向阻塞队列

    SynchronousQueue 队列只有一个元素,如果想插入多个,必须等队列元素取出后,才能插入,只能有一个“坑位”,用一个插一个

    import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class SynchronousQueueDemo { public static void main(String[] args) { BlockingQueue<String> blockingQueue=new SynchronousQueue<String>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+" put 1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName()+" put 2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName()+" put 3"); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"AAA").start(); new Thread(()->{ try { try{ TimeUnit.SECONDS.sleep(5); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take()); try{ TimeUnit.SECONDS.sleep(5); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take()); try{ TimeUnit.SECONDS.sleep(5); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" take"+blockingQueue.take()); } catch (Exception e) { e.printStackTrace(); } },"BBB").start(); } } AAA put 1 BBB take 1 AAA put 2 BBB take 2 AAA put 3 BBB take3

    BlockingQueue的API

    方法类型抛出异常返回布尔阻塞超时插入add(E e)offer(E e)put(E e)offer(E e,Time,TimeUnit)取出remove()poll()take()poll(Time,TimeUnit)队首element()peek()无无

    有4种类型的api,比如队列已经满了

    add会直接抛出异常offer会返回falseput让你陷入阻塞offer(e,t,t)阻塞有一个超时时间,超时后会返回false,开发中一般用这个

    阻塞队列的应用

    生产者消费者

    import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ProdConsBlockQueueDemo { public static void main(String[] args) { MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 生产线程启动"); try { myResource.myProd(); } catch (Exception e) { e.printStackTrace(); } }, "prod").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 消费线程启动"); try { TimeUnit.SECONDS.sleep(1);myResource.myCons(); } catch (Exception e) { e.printStackTrace(); } }, "cons").start(); try { TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(5); } catch (Exception e) { e.printStackTrace(); } System.out.println("5秒钟后,叫停"); myResource.stop(); } } class MyResource { private volatile boolean FLAG = true; //默认开启,进行生产+消费 private AtomicInteger atomicInteger = new AtomicInteger(); //架构级别的code一定用接口,他的实现类都可以适配 private BlockingQueue<String> blockingQueue; public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } public void myProd() throws Exception { String data = null; boolean retValue; while (FLAG) { data = atomicInteger.incrementAndGet() + "";//++i retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS); if (retValue) { System.out.println(Thread.currentThread().getName() + " 插入队列" + data + "成功"); } else { System.out.println(Thread.currentThread().getName() + " 插入队列" + data + "失败"); } TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName() + " FLAG==false,停止生产"); } public void myCons() throws Exception { String res; while (FLAG) { res = blockingQueue.poll(2L, TimeUnit.SECONDS); if (null == res || res.equalsIgnoreCase("")) { FLAG = false; System.out.println(Thread.currentThread().getName() + " 超过2秒钟没有消费,退出消费"); return; } System.out.println(Thread.currentThread().getName() + " 消费队列" + res + "成功"); } } public void stop() { this.FLAG = false; } } prod 生产线程启动 cons 消费线程启动 prod 插入队列1成功 prod 插入队列2成功 cons 消费队列1成功 cons 消费队列2成功 prod 插入队列3成功 cons 消费队列3成功 prod 插入队列4成功 cons 消费队列4成功 prod 插入队列5成功 cons 消费队列5成功 5秒钟后,叫停 prod FLAG==false,停止生产 cons 超过2秒钟没有消费,退出消费

    线程池

    线程池有讲这个

    Processed: 0.011, SQL: 9