阻塞队列其实就是额外增加了两个附加操作的队列
在队列已满的情况下,如果继续put一个元素就会阻塞等待;在队列为空的情况下,如果从队列中take元素就会阻塞等待;接下来我会使用ReentrantLock+Condition的方式来手撸一个阻塞队列,主要的方法就是put()和take()。
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * @program: tengxun * @description: 阻塞队列 * @author: cjr * @create: 2020-06-30 08:33 **/ public class BlockingQueue<E> { //存放对象的数组 final Object[] item; //当前待获取元素的下标 int takeIndex; //当前待插入元素的下标 int putIndex; //队列中元素个数 int count; //对象数组的容量 int capcity; //代表生产者的条件变量 final Condition producer; //代表消费者的条件变量 final Condition consumer; //锁对象 final ReentrantLock lock; public BlockingQueue(int capcity) { this.capcity = capcity; this.takeIndex = 0; this.putIndex = 0; this.count = 0; this.item = new Object[capcity]; lock = new ReentrantLock(); producer = lock.newCondition(); consumer = lock.newCondition(); } public void put(E var) { //判断插入对象是否为空,如果为空,抛出空指针异常 if (var == null) { throw new NullPointerException(); } ReentrantLock lock1 = lock; try { //使用lockInterruptibly()方式运行当前线程阻塞状态被中断 lock1.lockInterruptibly(); //如果队列已经满了,生产者就会阻塞等待消费者消费,并且唤醒自己 if (count == item.length) { producer.await(); } this.enqueue(var); } catch (Exception e) { e.printStackTrace(); } finally { //最后别忘了手动释放lock lock1.unlock(); } } private void enqueue(E var) { Object[] tmp = this.item; tmp[putIndex] = var; //更新待插入元素下标,如果超过了数组长度,则从0开始 if (++putIndex == tmp.length) { putIndex++; } count++; //生产者生产了一个元素后,唤醒阻塞等待的消费者去消费 consumer.signal(); } public E take() { ReentrantLock lock2 = lock; E var = null; try { lock2.lockInterruptibly(); //如果队列为空,消费者就会阻塞等待生产者生产,并且唤醒自己 if (count == 0) { consumer.await(); } var = this.dequeue(); } catch (Exception e) { e.printStackTrace(); } finally { lock2.unlock(); } return var; } private E dequeue() { Object[] tmp = this.item; E var = (E)tmp[takeIndex]; tmp[takeIndex] = null; if (++takeIndex == tmp.length) { takeIndex = 0; } count--; //消费者消费完就会唤醒阻塞等待的生产者 producer.signal(); return var; } }其实阻塞队列内部实现就是使用ReentrantLock+Condition实现的一个生产者和消费者模型,其实也可以使用synchronized+wait/notify的方式来实现。