package ThreadTest.myBlockQueue;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A bounded, blocking FIFO queue implemented with a {@link ReentrantLock} and two
 * {@link Condition}s.
 *
 * <p>{@link #put(Object)} blocks while the queue holds {@code maxCap} elements and
 * {@link #take()} blocks while it is empty. All state is guarded by a single lock, so the
 * element count is read directly from the backing list; no separate atomic counter is needed
 * (an atomic counter alone could not make the compound check-then-act steps safe anyway).
 *
 * <p>{@code null} elements are accepted and returned as-is.
 *
 * @param <T> the type of elements held in this queue
 */
public class MyBlockedQueue<T> {
    /** Backing storage in insertion order; only accessed while {@link #lock} is held. */
    private final ArrayList<T> blockqueue;
    /** Maximum number of elements the queue may hold at once. */
    private final int maxCap;
    /** Provides mutual exclusion between all producers and consumers. */
    private final Lock lock;
    /** Producers wait here while the queue is full; signalled after every removal. */
    private final Condition notFull;
    /** Consumers wait here while the queue is empty; signalled after every insertion. */
    private final Condition notEmpty;

    /**
     * Creates an empty queue.
     *
     * @param cap    initial capacity used to pre-size the backing list
     * @param maxCap maximum number of elements the queue may hold; must be positive
     * @throws IllegalArgumentException if {@code maxCap} is not positive, or if {@code cap}
     *                                  is negative (rejected by {@link ArrayList})
     */
    public MyBlockedQueue(int cap, int maxCap) {
        if (maxCap <= 0) {
            // A non-positive bound would make every put() block forever.
            throw new IllegalArgumentException("maxCap must be positive: " + maxCap);
        }
        this.blockqueue = new ArrayList<>(cap);
        this.maxCap = maxCap;
        this.lock = new ReentrantLock();
        this.notFull = lock.newCondition();
        this.notEmpty = lock.newCondition();
    }

    /**
     * Appends an element to the tail of the queue, waiting while the queue is full.
     *
     * @param t the element to add
     * @throws InterruptedException if the calling thread is interrupted while waiting for space
     */
    public void put(T t) throws InterruptedException {
        // Acquire outside the try block so that finally only unlocks a lock we actually hold.
        lock.lock();
        try {
            // Re-check in a loop: await() may return spuriously or after another
            // producer has already taken the freed slot.
            while (blockqueue.size() >= maxCap) {
                notFull.await();
            }
            blockqueue.add(t);
            // Exactly one element became available, so waking one consumer is enough.
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the element at the head of the queue, waiting while the queue is empty.
     *
     * @return the oldest element in the queue
     * @throws InterruptedException if the calling thread is interrupted while waiting for an element
     */
    public T take() throws InterruptedException {
        // Acquire outside the try block so that finally only unlocks a lock we actually hold.
        lock.lock();
        try {
            // Re-check in a loop: await() may return spuriously or after another
            // consumer has already taken the element.
            while (blockqueue.isEmpty()) {
                notEmpty.await();
            }
            // Removing index 0 shifts the remaining elements (linear in the queue size).
            T head = blockqueue.remove(0);
            // Exactly one slot became free, so waking one producer is enough.
            notFull.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }
}
// Original article (please credit the source when reposting): https://ipadbbs.8miu.com/read-64884.html