阻塞队列BlockingQueue

    技术2026-02-21  12

     

    特点:

    支持阻塞插入:当队列满的时候,会阻塞元素的插入,知道队列不满

    支持阻塞移除:当队列为空时,获取元素的线程会等待队列变为非空

    使用场景:

    阻塞队列常用与生产者和消费者的场景,生产者往队列中添加元素,消费者从队列中获取元素

    常用方法和处理方式

    方法、处理方式队列满抛出异常返回特殊值阻塞超时退出插入add(e)offer(e)put(e)offer(e,time,unit)移除remove()poll()take()pol(time,unit)检查方法element()peek()不可用不可用

    抛出异常:队列满添加元素抛出IllegalStateException;队列空从队列获取元素抛出NoSushElementException

    返回特殊值:插入成功返回true,否则false;获取元素没有返回null

    阻塞:往满阻塞队列中添加元素,阻塞生产者,知道队列有空位或者响应中断退出;往空队列获取take元素,队列阻塞消费者,知道队列不空或响应中断

    超时退出:阻塞时,超过等待时间还不能添加或者获取元素,线程就会退出

    队列有界和无界的定义:

    有限队列就是长度有限,满了以后生产者就会阻塞

    无界队列因为李曼能够放武术的元素,所以不会因为队列长度限制被阻塞,空间限制主要来自于系统资源的限制,导致内存越来越大,直接就被JVM给干掉了

    无界队列的阻塞:当无界队列为空时,消费者只能够阻塞了

    常用的阻塞队列:

     

    ArrayBlokngQueue数组+有界阻塞队列LinkedBlockingQueue链表+有界阻塞队列PrioritBlockingQueue支持优先级+无界阻塞队列DelayQueue优先级+无界阻塞队列SynchronousQueue不存储元素+阻塞队列LinkedtransferQueue链表+无界阻塞队列LinkedBlockingDeque链表+双向阻塞队列

    ArrayBlockingQueue

    是一个输入实现的有界阻塞队列,先进先出。可通过构造参数设置线程是公平还是非公平访问队列

    public ArrayBlockingQueue(int capacity, boolean fair),capacity是容量,fair是是否公平竞争,默认非公平

    LinkedBlockingQueue

    链表实现的有界阻塞队列,默认最大长度是Integer.MAX_VALUE

    Array实现和Linked实现、有界阻塞队列的区别

    1、锁不同:ArrayBlockingQueue队列中生产和消费使用同一个锁,LinkedBlockingQueue,生产者和消费者使用不同的锁

    2、生产和消费时操作不同:ArrayBlockingQueue在生产和消费的时候,是直接 将枚举对象插入或移除;LinkedBlockingQueue在生产和消费的时候,需要把枚举兑现个转换为Node进行插入和删除,会影响性能

    3、队列初始化方式不同:ArrayBlockingQueue必须制定对队列大小,否则报错,LinkedBlockingQueue可以不指定大小,默认是Integer.MAX_VALUE

    PriorityBlockingQueue

    支持优先级的无界阻塞队列:默认情况下元素采取自然顺序升序排列,可以以自定义compareTo()方法来指定元素的排序规则。

    demo:通过年龄比较大小,实现Comparable

    public class Person implements Comparable { private String name; private Integer age; public Person(String name, Integer age) { this.name = name; this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } //实现比较方法 @Override public int compareTo(Object o) { Person p = (Person) o; return this.age - p.getAge(); } @Override public String toString() { return "Person{" + "name='" + name + '\'' + ", age=" + age + '}'; } }

    测试

    public class Test { public static void main(String[] args) { PriorityBlockingQueue queue = new PriorityBlockingQueue(); queue.add(new Person("1",45)); queue.add(new Person("2",2)); queue.add(new Person("3",4)); queue.add(new Person("4",3)); System.out.println(queue); } }

    结果:年龄从小到大排序

    [Person{name='2', age=2}, Person{name='4', age=3}, Person{name='3', age=4}, Person{name='1', age=45}]

    DelayQueue

    支持延时获取元素的无界阻塞队列,队列元素必须实现Delayed接口,在创建元素的时候可以指定多久才能从队列中获取当前元素。时间到了才能从队列中获取到元素

    使用场景

    缓存系统的设计:可以使用DelayQueue缓存的元素的有效期,使用线程循环查询DelayQueue,一旦能够从DelayQueue中获取到元素,表示缓存有效期到了:参看此示例

    订单到期:查看此示例

    限时支付

    简单使用

    实体implements Delayed

    public class MyDelayed implements Delayed { private String key; private long currentTime; private long expireTime; public MyDelayed(String key, long expireTime) { this.key=key; this.expireTime=expireTime; this.currentTime = System.currentTimeMillis(); } public String getKey() { return key; } public void setKey(String key) { this.key = key; } /** * 获取剩余的时间 * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { return expireTime - unit.MILLISECONDS.toSeconds(System.currentTimeMillis()-currentTime); } /** * 剩余时间升序排列 * @param o * @return */ @Override public int compareTo(Delayed o) { MyDelayed p = (MyDelayed) o; if(this.getDelay(TimeUnit.MICROSECONDS) > p.getDelay(TimeUnit.MICROSECONDS)) return 1; if(this.getDelay(TimeUnit.MICROSECONDS) > p.getDelay(TimeUnit.MICROSECONDS)) return -1; return 0; } @Override public String toString() { return "MyDelayed{" + "key='" + key + '\'' + ", currentTime=" + currentTime + ", expireTime=" + expireTime + '}'; } }

    测试:

    public class Test2 { public static void main(String[] args) throws InterruptedException { DelayQueue<MyDelayed> queue = new DelayQueue<>(); queue.add(new MyDelayed("key1", 4)); queue.add(new MyDelayed("key2", 3)); queue.add(new MyDelayed("key3", 2)); System.out.println("会一直阻塞,直到元素过期"); System.out.println(queue.take()); System.out.println(queue); System.out.println(queue.take()); System.out.println(queue); } }

    结果:

    MyDelayed{key='key1', currentTime=1593955613953, expireTime=4} [MyDelayed{key='key3', currentTime=1593955613954, expireTime=2}, MyDelayed{key='key2', currentTime=1593955613953, expireTime=3}] MyDelayed{key='key3', currentTime=1593955613954, expireTime=2} [MyDelayed{key='key2', currentTime=1593955613953, expireTime=3}]

    设计缓存实现

    实体:

    public class MyDelayed implements Delayed { private String key; private long currentTime; private long expireTime; public MyDelayed(String key, long expireTime) { this.key=key; this.expireTime=expireTime; this.currentTime = System.currentTimeMillis(); } public String getKey() { return key; } public void setKey(String key) { this.key = key; } /** * 获取剩余的时间 * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { return expireTime - unit.MILLISECONDS.toSeconds(System.currentTimeMillis()-currentTime); } /** * 剩余时间升序排列 * @param o * @return */ @Override public int compareTo(Delayed o) { MyDelayed p = (MyDelayed) o; if(this.getDelay(TimeUnit.MICROSECONDS) > p.getDelay(TimeUnit.MICROSECONDS)) return 1; if(this.getDelay(TimeUnit.MICROSECONDS) > p.getDelay(TimeUnit.MICROSECONDS)) return -1; return 0; } @Override public String toString() { return "MyDelayed{" + "key='" + key + '\'' + ", currentTime=" + currentTime + ", expireTime=" + expireTime + '}'; } }

    缓存对象:还未过期的对象存储在map中

    public class MyCache extends Thread { DelayQueue<MyDelayed> queue = new DelayQueue(); //延迟队列只能保证延迟获取元素,但不会清楚时间归0的元素,所以使用map来作为中转 ConcurrentHashMap<String, MyDelayed> map = new ConcurrentHashMap<>(); public MyCache(){ this.start(); } //时刻检查队列中元素是不是到期 @Override public void run() { while (true) { Iterator<MyDelayed> iterator = queue.iterator(); while (iterator.hasNext()) { MyDelayed next = iterator.next(); if (next.getDelay(TimeUnit.MICROSECONDS) <= 0) { //System.out.println(next.getKey() + ":过期了--"+next.getDelay(TimeUnit.MICROSECONDS)); map.remove(next.getKey()); } } } } public void add(String key, Long expireTime) { MyDelayed myDelayed = new MyDelayed(key, expireTime); queue.add(myDelayed); map.put(myDelayed.getKey(), myDelayed); } public Delayed get(String key) { return map.get(key); } }

    使用

    public class Test extends Thread { public static void main(String[] args) throws InterruptedException { MyCache my = new MyCache(); //每3秒给延时队列添加一个元素 System.out.println("开始添加元素"); Thread.sleep(1000); my.add("key-1", 1l); Thread.sleep(1000); my.add("key-2", 1l); //只要还没有过期,就能够从缓存中拿到数据 while (true) { if (my.get("key-1") == null && my.get("key-2") == null) { System.out.println("key-1 过期了"); System.out.println("key-2 过期了"); break; } System.out.println("key-1 还没有过期:"); System.out.println("key-2 还没有过期:"); } } }

    结果:

    开始添加元素 key-1 还没有过期: key-2 还没有过期: 。。。。。。 key-2 还没有过期: key-1 过期了 key-2 过期了

    SynchronousQueue

    是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。本身不存储元素,只是负责传递

    demo

    public class Test extends Thread { public static void main(String[] args) throws InterruptedException { SynchronousQueue queue = new SynchronousQueue(); //生产者线程 new Thread(){ @Override public void run() { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); //消费者线程 new Thread(){ @Override public void run() { try { System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); } }

    结果

    1

    LinkedTransferQueue

    链表+无界阻塞队列,多了tryTransfer和transfer方法

    transfer()插队方法:如果当前有消费者正在等待接收元素(使用take()或者超时的poll()时),transfer可以把生产者传入的元素立刻传输给消费者。如果消费者等待消费,该方法就是正常的put操作

    tryTransfer()尝试插队:有消费者等待,插队成功返回true,否则返回false,变为正常的put操作,和transfer()方法区别就是他能够理解返回,而transfer()必须要等到有消费者消费了才能够返回

    LinkedBlockingDeque

    链表+双向阻塞队列:即可以在队列两端进行添加和移除

     

    多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。

     

    另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。使用时还是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。

    阻塞队列的实现原理:

    等待通知模式,使用Condition来实现

     

     

     

     

     

    Processed: 0.020, SQL: 9