这里写目录标题
概念架构BlockingQueue的七个实现类BlockingQueue的API
阻塞队列的应用生产者消费者线程池
概念
当阻塞队列为空时,获取(take)操作是阻塞的;当阻塞队列为满时,添加(put)操作是阻塞的。
阻塞队列不用手动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作。许多消息中间件底层(就是阻塞队列生产消费)就是用它们实现的,还有线程池
架构
我们用diagrams查看一下
BlockingQueue的七个实现类
主要就是加粗的三个
类名作用
ArrayBlockingQueue由数组构成的有界阻塞队列LinkedBlockingQueue由链表构成的有界阻塞队列,但有个巨坑,其边界也就是其默认大小是Integer.MAX_VALUE,高达21亿,一般情况下内存早爆了(在线程池的ThreadPoolExecutor有体现)PriorityBlockingQueue支持优先级排序的无界阻塞队列DelayQueue支持优先级的延迟无界阻塞队列SynchronousQueue不存储元素的阻塞队列,单个元素的阻塞队列,也就是队列只有一个元素,元素不移除,不会进来新的元素LinkedTransferQueue由链表构成的无界阻塞队列LinkedBlockingDeque由链表构成的双向阻塞队列
SynchronousQueue 队列只有一个元素,如果想插入多个,必须等队列元素取出后,才能插入,只能有一个“坑位”,用一个插一个
import java
.util
.concurrent
.BlockingQueue
;
import java
.util
.concurrent
.SynchronousQueue
;
import java
.util
.concurrent
.TimeUnit
;
public class SynchronousQueueDemo {
public static void main(String
[] args
) {
BlockingQueue
<String> blockingQueue
=new SynchronousQueue<String>();
new Thread(()->{
try {
System
.out
.println(Thread
.currentThread().getName()+" put 1");
blockingQueue
.put("1");
System
.out
.println(Thread
.currentThread().getName()+" put 2");
blockingQueue
.put("2");
System
.out
.println(Thread
.currentThread().getName()+" put 3");
blockingQueue
.put("3");
} catch (InterruptedException e
) {
e
.printStackTrace();
}
},"AAA").start();
new Thread(()->{
try {
try{ TimeUnit
.SECONDS
.sleep(5); }catch (InterruptedException e
){ e
.printStackTrace(); }
System
.out
.println(Thread
.currentThread().getName()+" take "+blockingQueue
.take());
try{ TimeUnit
.SECONDS
.sleep(5); }catch (InterruptedException e
){ e
.printStackTrace(); }
System
.out
.println(Thread
.currentThread().getName()+" take "+blockingQueue
.take());
try{ TimeUnit
.SECONDS
.sleep(5); }catch (InterruptedException e
){ e
.printStackTrace(); }
System
.out
.println(Thread
.currentThread().getName()+" take"+blockingQueue
.take());
} catch (Exception e
) {
e
.printStackTrace();
}
},"BBB").start();
}
}
AAA put
1
BBB take
1
AAA put
2
BBB take
2
AAA put
3
BBB take3
BlockingQueue的API
方法类型抛出异常返回布尔阻塞超时
插入add(E e)offer(E e)put(E e)offer(E e,Time,TimeUnit)取出remove()poll()take()poll(Time,TimeUnit)队首element()peek()无无
有4种类型的api,比如队列已经满了
add会直接抛出异常offer会返回falseput让你陷入阻塞offer(e,t,t)阻塞有一个超时时间,超时后会返回false,开发中一般用这个
阻塞队列的应用
生产者消费者
import java
.util
.concurrent
.ArrayBlockingQueue
;
import java
.util
.concurrent
.BlockingQueue
;
import java
.util
.concurrent
.TimeUnit
;
import java
.util
.concurrent
.atomic
.AtomicInteger
;
public class ProdConsBlockQueueDemo {
public static void main(String
[] args
) {
MyResource myResource
= new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System
.out
.println(Thread
.currentThread().getName() + " 生产线程启动");
try { myResource
.myProd(); } catch (Exception e
) { e
.printStackTrace(); }
}, "prod").start();
new Thread(() -> {
System
.out
.println(Thread
.currentThread().getName() + " 消费线程启动");
try { TimeUnit
.SECONDS
.sleep(1);myResource
.myCons(); } catch (Exception e
) { e
.printStackTrace(); }
}, "cons").start();
try { TimeUnit
.SECONDS
.sleep(1);
TimeUnit
.SECONDS
.sleep(5);
} catch (Exception e
) {
e
.printStackTrace();
}
System
.out
.println("5秒钟后,叫停");
myResource
.stop();
}
}
class MyResource {
private volatile boolean FLAG
= true;
private AtomicInteger atomicInteger
= new AtomicInteger();
private BlockingQueue
<String> blockingQueue
;
public MyResource(BlockingQueue
<String> blockingQueue
) {
this.blockingQueue
= blockingQueue
;
}
public void myProd() throws Exception
{
String data
= null
;
boolean retValue
;
while (FLAG
) {
data
= atomicInteger
.incrementAndGet() + "";
retValue
= blockingQueue
.offer(data
, 2L
, TimeUnit
.SECONDS
);
if (retValue
) {
System
.out
.println(Thread
.currentThread().getName() + " 插入队列" + data
+ "成功");
} else {
System
.out
.println(Thread
.currentThread().getName() + " 插入队列" + data
+ "失败");
}
TimeUnit
.SECONDS
.sleep(1);
}
System
.out
.println(Thread
.currentThread().getName() + " FLAG==false,停止生产");
}
public void myCons() throws Exception
{
String res
;
while (FLAG
) {
res
= blockingQueue
.poll(2L
, TimeUnit
.SECONDS
);
if (null
== res
|| res
.equalsIgnoreCase("")) {
FLAG
= false;
System
.out
.println(Thread
.currentThread().getName() + " 超过2秒钟没有消费,退出消费");
return;
}
System
.out
.println(Thread
.currentThread().getName() + " 消费队列" + res
+ "成功");
}
}
public void stop() {
this.FLAG
= false;
}
}
prod 生产线程启动
cons 消费线程启动
prod 插入队列
1成功
prod 插入队列
2成功
cons 消费队列
1成功
cons 消费队列
2成功
prod 插入队列
3成功
cons 消费队列
3成功
prod 插入队列
4成功
cons 消费队列
4成功
prod 插入队列
5成功
cons 消费队列
5成功
5秒钟后,叫停
prod FLAG
==false,停止生产
cons 超过
2秒钟没有消费,退出消费
线程池
线程池有讲这个