在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会被自动唤醒。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
当阻塞队列是空时,从队列中获取元素的操作将会被阻塞当阻塞队列是满时,往队列里添加元素的操作将会被阻塞试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
试图往满的阻塞队列中添加新元素的线程同样也会别阻塞,直到其他线程从队列中移除一个或多个元素或完全清空队列后是队列重新变得空闲并后续新增。
为什么需要阻塞队列? 不需要关心什么时候需要阻塞线程,什么时候唤醒线程,因为这一切都交给了阻塞队列 在concurrent包发布前,多线程环境下我们必须自己控制这些细节,尤其要兼顾效率和线程安全,这无疑给编程增加了复杂度。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
操作失败会报错抛异常
向队列中添加元素
public class bqTest { public static void main(String[] args) { BlockingQueue bq = new ArrayBlockingQueue(3); System.out.println(bq.add("aaa")); System.out.println(bq.add("bbb")); System.out.println(bq.add("ccc")); System.out.println(bq.add("ddd")); } }运行结果: 插入第4个时,报错: Queue full
移除队列中的第一个元素
public class bqTest { public static void main(String[] args) { BlockingQueue<String> bq = new ArrayBlockingQueue<>(3); System.out.println(bq.add("aaa")); System.out.println(bq.add("bbb")); System.out.println(bq.add("ccc")); // System.out.println(bq.add("ddd")); System.out.println("=========="); System.out.println(bq.element()); System.out.println("=========="); System.out.println(bq.remove()); System.out.println(bq.remove()); System.out.println(bq.remove()); System.out.println(bq.remove()); } }运行结果: 队列为空时,报错:java.util.NoSuchElementException
检查队列是否为空,若不空则输出队列的第一个元素,为空返回报错:java.util.NoSuchElementException
public class bqTest1 { public static void main(String[] args) { BlockingQueue<String> bq = new ArrayBlockingQueue<>(3); System.out.println(bq.add("aaa")); System.out.println(bq.add("bbb")); System.out.println(bq.add("ccc")); // System.out.println(bq.add("ddd")); System.out.println("=========="); System.out.println(bq.element()); System.out.println("=========="); System.out.println(bq.remove()); System.out.println(bq.remove()); System.out.println(bq.remove()); // System.out.println(bq.remove()); System.out.println("=========="); System.out.println(bq.element()); } }运行结果:
为了避免上组的报错抛异常
向队列中添加元素
public class bqTest2 { public static void main(String[] args) { BlockingQueue<String> bq = new ArrayBlockingQueue<>(3); System.out.println(bq.offer("a")); System.out.println(bq.offer("b")); System.out.println(bq.offer("c")); System.out.println(bq.offer("d")); } }运行结果: 添加失败时返回false。
移除队列中的第一个元素
public class bqTest2 { public static void main(String[] args) { BlockingQueue<String> bq = new ArrayBlockingQueue<>(3); System.out.println(bq.offer("a")); System.out.println(bq.offer("b")); System.out.println(bq.offer("c")); System.out.println(bq.offer("d")); System.out.println("=========="); System.out.println(bq.poll()); System.out.println(bq.poll()); System.out.println(bq.poll()); System.out.println(bq.poll()); } }检查队列是否为空,为空返回null,若不空则输出队列的第一个元素
public class bqTest2 { public static void main(String[] args) { BlockingQueue<String> bq = new ArrayBlockingQueue<>(3); System.out.println(bq.offer("a")); System.out.println(bq.offer("b")); System.out.println(bq.offer("c")); System.out.println(bq.offer("d")); System.out.println("=========="); System.out.println(bq.peek()); System.out.println("=========="); System.out.println(bq.poll()); System.out.println(bq.poll()); System.out.println(bq.poll()); System.out.println(bq.poll()); } }运行结果:
与上两组不同,运行失败会一直阻塞。
向队列中添加元素,无返回值,队列满时会一直阻塞。
public class bqTest3 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> bq = new ArrayBlockingQueue<>(3); bq.put("aa"); bq.put("bb"); bq.put("cc"); System.out.println("========"); bq.put("dd"); } }运行结果:
返回队列第一个元素,队列为空时阻塞。
public class bqTest3 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> bq = new ArrayBlockingQueue<>(3); bq.put("aa"); bq.put("bb"); bq.put("cc"); System.out.println("========"); // bq.put("dd"); System.out.println(bq.take()); System.out.println(bq.take()); System.out.println(bq.take()); System.out.println("========"); System.out.println(bq.take()); } }运行结果:
与上以组相比增加了两个参数:
timeout:时间unit:时间单位运行结果: 添加前3个元素时无需等待5秒;添加第4个元素时队列已满,阻塞5秒后返回false。
移除队列中第一个元素并返回,队列为空时,阻塞5秒返回null。
public class bqTest4 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> bq = new ArrayBlockingQueue<>(3); System.out.println(bq.offer("aaaa", 5, TimeUnit.SECONDS)); System.out.println(bq.offer("bbbb", 5, TimeUnit.SECONDS)); System.out.println(bq.offer("cccc", 5, TimeUnit.SECONDS)); // System.out.println(bq.offer("dddd", 5, TimeUnit.SECONDS)); System.out.println("========="); System.out.println(bq.poll(5, TimeUnit.SECONDS)); System.out.println(bq.poll(5, TimeUnit.SECONDS)); System.out.println(bq.poll(5, TimeUnit.SECONDS)); System.out.println(bq.poll(5, TimeUnit.SECONDS)); } }运行结果:
1. ArrayBlockingQueue 数组结构组成的有界阻塞队列
此队列按照先进先出(FIFO)的原则对元素进行排序,但是默认情况下不保证线程公平的访问队列,即如果队列满了,那么被阻塞在外面的线程对队列访问的顺序是不能保证线程公平(即先阻塞,先插入)的
2. LinkedBlockingQueue 链表结构组成的有界阻塞队列(大小默认值为Integer.MAX_VALUE)
此队列按照先出先进的原则对元素进行排序
3. PriorityBlockingQueue 支持优先级的无界阻塞队列
4. DelayQueue 支持延时获取元素的无界阻塞队列,即可以指定多久才能从队列中获取当前元素
5. SynchronousQueue 不存储元素的阻塞队列,每一个put必须等待一个take操作,否则不能继续添加元素。并且他支持公平访问队列。
public class SynBQ { public static void main(String[] args) { SynchronousQueue<String> bq = new SynchronousQueue<>();// 非公平 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "\t put 1"); bq.put("1"); System.out.println(Thread.currentThread().getName() + "\t put 2"); bq.put("2"); System.out.println(Thread.currentThread().getName() + "\t put 3"); bq.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"AAA" ).start(); new Thread(() -> { try { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t take"); System.out.println(Thread.currentThread().getName() + "\t" + bq.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"BBB" ).start(); } }运行结果:
6. LinkedTransferQueue 链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法
transfer 如果当前有消费者正在等待接收元素(take或者待时间限制的poll方法),transfer可以把生产者传入的元素立刻传给消费者。如果没有消费者等待接收元素,则将元素放在队列的tail节点,并等到该元素被消费者消费了才返回。
tryTransfer 用来试探生产者传入的元素能否直接传给消费者。,如果没有消费者在等待,则返回false。和上述方法的区别是该方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。
7. LinkedBlockingDeque 链表结构的双向阻塞队列,优势在于多线程入队时,减少一半的竞争
一个初始值为0的变量,两个线程对其交替操作,一个加1一个减1,循环5轮。
传统版:
/** * 多线程3句口诀: * 1. 线程 操作(方法) 资源类 * 2. 判断 干活 通知 * 3. 防止虚假唤醒机制(判断时不用if,要用while) */ public class ProdConsumer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.increment(); }catch (Exception e){ e.printStackTrace(); } } },"aaa" ).start(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.decrement(); }catch (Exception e){ e.printStackTrace(); } } },"bbb" ).start(); } } class ShareData{// 资源类 private int num = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment(){// 加1 lock.lock(); try { // 1.判断 while (num != 0){ // 等待,不能生产 condition.await(); } // 2.干活 num++; System.out.println(Thread.currentThread().getName() + "\t" + num); // 3.通知唤醒 condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void decrement(){// 减1 lock.lock(); try { // 1.判断 while (num == 0){ // 等待,不能生产 condition.await(); } // 2.干活 num--; System.out.println(Thread.currentThread().getName() + "\t" + num); // 3.通知唤醒 condition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }阻塞队列版
public class ProdConsumer_BlockQueueDemo { public static void main(String[] args) { MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t 生产线程启动"); try { myResource.myProd(); }catch (Exception e){ e.printStackTrace(); } },"Prod" ).start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t 消费线程启动"); try { myResource.myConsumer(); System.out.println(); System.out.println(); }catch (Exception e){ e.printStackTrace(); } },"Consume" ).start(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e){ e.printStackTrace(); } System.out.println(); System.out.println("5秒时间到,叫停"); myResource.stop(); } } class MyResource{ private volatile boolean flag = true;// 默认开启,进行生产+消费 private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null; public MyResource(BlockingQueue<String> blockingQueue){//传的是接口,更灵活 this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName());//看下到底是哪个具体实现类 } public void myProd() throws InterruptedException { String data = null; Boolean retValue; while (flag){ data = atomicInteger.incrementAndGet()+""; retValue = blockingQueue.offer(data,2, TimeUnit.SECONDS); if (retValue){ System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "成功"); }else { System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败"); } TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName()+"\t 中断,表示flag=false,生产结束"); } public void myConsumer() throws InterruptedException { String result = null; while (flag){ result = blockingQueue.poll(2,TimeUnit.SECONDS); if (null == result || result.equalsIgnoreCase("")){ flag = false; System.out.println(Thread.currentThread().getName() + "\t 超过2秒钟没有取到,消费退出"); return; } System.out.println(Thread.currentThread().getName() + "\t 消费队列" + result + "成功"); } } public void stop(){ this.flag = false; } }结果:
java.util.concurrent.ArrayBlockingQueue Prod 生产线程启动 Prod 插入队列1成功 Consume 消费线程启动 Consume 消费队列1成功 Consume 消费队列2成功 Prod 插入队列2成功 Prod 插入队列3成功 Consume 消费队列3成功 Prod 插入队列4成功 Consume 消费队列4成功 Prod 插入队列5成功 Consume 消费队列5成功 5秒时间到,叫停 Prod 中断,表示flag=false,生产结束 Consume 超过2秒钟没有取到,消费退出 Process finished with exit code 0