https://blog.csdn.net/wei_ya_wen/article/details/19344939
在java多线程操作中, BlockingQueue<E> 常用的一种方法之一。在看jdk内部尤其是一些多线程,大量使用了blockinkQueue 来做的。
借用jdk api解释下:
BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:
抛出异常 特殊值 阻塞 超时 插入 add(e) offer(e) put(e) offer(e, time, unit) 移除 remove() poll() take() poll(time, unit) 检查 element() peek() 不可用 不可用
offer: 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false,不会抛异常:
java源代码
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } }
put: 将指定元素插入此队列中,将等待可用的空间.通俗点说就是>maxSize 时候,阻塞,直到能够有空间插入元素
java源代码:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
take: 获取并移除此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞
java 源代码:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }
add: 和collection的add一样,没什么可以说的。如果当前没有可用的空间,则抛出 IllegalStateException。
例子如下:
public static void main(String[] args) { java.util.concurrent.Executor executor = Executors.newFixedThreadPool(10); Runnable task = new Runnable() { @Override public void run() { System.out.println("ggg"); } }; executor.execute(task); */ BlockingQueue q = new ArrayBlockingQueue(10); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } } class Producer implements Runnable { private final BlockingQueue<Object> queue; Producer(BlockingQueue q) { queue = q; } public void run() { for(int i=0;i<100;i++){ try { queue.put(i); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } try { Thread.sleep(20); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { consume( queue.take() ); } } catch (InterruptedException ex) { } } void consume(Object x) { System.out.println(x); }