Java中队列的解析

    技术2022-07-10  210

    Java中队列的解析 定义 队列是一种特殊的线性表,遵循的原则就是“先入先出”。在我们日常使用中,经常会用来并发操作数据。在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列通常有两种方式:一种是使用阻塞队列,另一种是使用线程同步锁。

    什么是阻塞队列? 假设有一个面包房,里面有一个客人吃面包,一个师傅烤面包。篮子里面最多放2个面包,师傅考完了面包放到篮子里,而客人吃面包则从篮子里面往外拿,为了保证客人吃面包的时候篮子里有面包或者师傅烤面包的时候篮子不会溢出,这时候就需要引用出来阻塞队列的概念,就是我们常说的生产者消费者的模式。

    阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

    1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。

    2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

    系统内不阻塞队列:PriorityQueue 和 ConcurrentLinkedQueue 我们来看一下不阻塞队列的关系(以PriorityQueue 为例):

    PriorityQueue 类继承自AbstractQueue,实现了Serializable接口。实质上维护了一个有序列表,PriorityQueue位于Java util包中,观其名字前半部分的单词Priority是优先的意思,实际上这个队列就是具有“优先级”。加入到 Queue 中的元素根据它们的天然排序(通过其 java.util.Comparable 实现)或者根据传递给构造函数的 java.util.Comparator 实现来定位。   ConcurrentLinkedQueue 是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以不需要知道队列的大小, ConcurrentLinkedQueue 对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列;ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。

    实现阻塞接口的队列: java.util.concurrent 中加入了 BlockingQueue 接口和五个阻塞队列类。它实质上就是一种带有一点扭曲的 FIFO 数据结构。不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者元素可用。 五个队列所提供的各有不同:

    ArrayBlockingQueue :一个由数组支持的有界队列。

    LinkedBlockingQueue :一个由链接节点支持的可选有界队列。

    PriorityBlockingQueue :一个由优先级堆支持的无界优先级队列。

    DelayQueue :一个由优先级堆支持的、基于时间的调度队列。

    SynchronousQueue :一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。

    我们看一下ArrayBlockingQueue 和LinkedBlockingQueue 的继承关系:

    通过查看两个类的继承关系,我们可以知道,他们也是继承自AbstractQueue,实现了Serializable接口;不同的是他们同时实现了BlockingQueue接口。

    简单介绍下其中的几个:

    LinkedBlockingQueueLinkedBlockingQueue默认大小是Integer.MAX_VALUE,可以理解为一个缓存的有界等待队列,可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。当生产者往队列中放入一个数据时,缓存在队列内部,当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者同理。

    ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队列,此队列按FIFO(先进先出)原则对元素进行排序。

    PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元 素要具有比较能力。

    关于ConcurrentLinkedQueue和LinkedBlockingQueue: 也可以理解为阻塞队列和非阻塞队列的区别:

    1.LinkedBlockingQueue是使用锁机制,ConcurrentLinkedQueue是使用CAS算法,虽然LinkedBlockingQueue的底层获取锁也是使用的CAS算法

    2.关于取元素,ConcurrentLinkedQueue不支持阻塞去取元素,LinkedBlockingQueue支持阻塞的take()方法。

    3.关于插入元素的性能,但在实际的使用过程中,尤其在多cpu的服务器上,有锁和无锁的差距便体现出来了,ConcurrentLinkedQueue会比LinkedBlockingQueue快很多。

    生产者消费者代码: 在网上看到一个生产者消费者的小例子,对于理解阻塞队列非常有帮助,代码如下:

    import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;

    public class BlockingQueueTest { public static class Basket { BlockingQueue basket = new ArrayBlockingQueue<>(3);

    private void produce() throws InterruptedException { basket.put("苹果"); } private void consume() throws InterruptedException { basket.take(); } private int getAppleNumber() { return basket.size(); } } private static void testBasket() { final Basket basket = new Basket(); class Producer implements Runnable { public void run() { try { while (true) { System.out.println("生产者开始生产苹果###"); basket.produce(); System.out.println("生产者生产苹果完毕###"); System.out.println("篮子中的苹果数量:" + basket.getAppleNumber() + "个"); Thread.sleep(300); } } catch (InterruptedException e) {} } } class Consumer implements Runnable { public void run() { try { while (true) { System.out.println("消费者开始消费苹果***"); basket.consume(); System.out.println("消费者消费苹果完毕***"); System.out.println("篮子中的苹果数量:" + basket.getAppleNumber() + "个"); Thread.sleep(1000); } } catch (InterruptedException e) {} } } ExecutorService service = Executors.newCachedThreadPool(); Producer producer = new Producer(); Consumer consumer = new Consumer(); service.submit(producer); service.submit(consumer); try { Thread.sleep(10000); } catch (InterruptedException e) {} service.shutdownNow(); } public static void main(String[] args) { BlockingQueueTest.testBasket(); }

    }

    Processed: 0.033, SQL: 9