线程的并发协作:生产者消费者模式 应用场景:生产者消费者问题 1、假设仓库只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中的产品取走消费 2、如果仓库没有产品,则生产者将产品放入仓库,否则停止生产并等待,知道仓库中的产品被消费者取走为止 3、如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓库中再次放入产品为止
分析:这是一个线程同步问题,生产者消费者共享同一个资源,并且消费者生产者之间相互依赖,互为条件 1、对于生产者,没有生产产品之前,要通知消费者等待;而生产了产品之后,又要马上通知消费者消费 2、对于消费者,在消费之后,要通知生产者已经消费结束,需要继续生产产品以供消费 3、在生产者消费者模式中,仅有synchronized是不够的 3.1、synchronized可阻止并发更新同一个共享资源,实现了同步 3.2、synchronized不能用来实现不同线程之间的消息传递(通信)
解决方式1:并发协作模型:“生产者/消费者模式” --> 管程法,借助缓冲区 生产者:负责生产数据的模块(这里的模块可能是:方法、对象、线程、进程) 消费者:负责处理数据的模块(这里的模块可能是:方法、对象、线程、进程) 缓冲区:消费者不能直接使用生产者的数据,他们之间有个缓冲区;生产者将产品放入缓冲区,消费者从缓冲区拿要处理的数据
- 实现线程的并发协作 有了缓冲区以后,生产者线程只需要往缓冲区里面放置数据,而不需要管消费者消费的情况; 同样,消费者只需要从缓冲区拿数据处理即可,也不需要管生产者生产的情况。 这样,就从逻辑上实现了“生产者线程”和“消费者线程”的分离。
- 解耦了生产者和消费者 生产者不需要和消费者直接打交道。
- 解决忙闲不均,提高效率 生产者生产数据慢时,缓冲区仍有数据,不影响消费者消费; 消费者处理数据慢时,生产者仍然可以继续往缓冲区里面放置数据 。
解决方式2:并发协作模型:“生产者/消费者模式” --> 信号灯法,借助标志位 生产者:负责生产数据的模块(这里的模块可能是:方法、对象、线程、进程) 消费者:负责处理数据的模块(这里的模块可能是:方法、对象、线程、进程) 缓冲对象:生产者消费者使用同一资源,他们之间有个标志位,类似于信号灯的作用,通过信号灯控制生产者和消费者的循环使用
注:不同场景使用不同的解决方法
一、使用管程法模拟
/** * 协作模型:生产者消费者生产模型方式1:管程法 * 借助缓冲区 */ public class ProduceAndConsumption { public static void main(String[] args) { SynContainer container = new SynContainer(); //缓冲区对象 new Productor(container).start(); //定义生产者线程 new Consumer(container).start(); //定义消费者线程 } } //馒头 class SteamedBread { private int id; SteamedBread(int id){ this.id = id; } public int getId(){ return id; } } //缓冲区,相当于馒头筐 class SynContainer { SteamedBread[] sbs = new SteamedBread[10]; int count = 0; //计数器 //存储 生产 public synchronized void push(SteamedBread sb) { if(count == sbs.length){ //缓冲区满,将if替换为while也可以 //wait后,线程会将持有的锁释放,进入阻塞状态; //这样其它需要锁的线程就可以获得锁; try { this.wait(); //这里的含义是执行此方法的线程暂停,进入阻塞状态, //等消费者消费了馒头后再生产。 } catch (InterruptedException e) { e.printStackTrace(); } } this.notify(); // 唤醒在当前对象等待池中等待的第一个线程。 // notifyAll叫醒所有在当前对象等待池中等待的所有线程。 sbs[count++] = sb; } //消费 获取 public synchronized SteamedBread pop() { //没有可以消费的,需要等待 if(count == 0){ //缓冲区空 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.notify(); //可以消费 return sbs[--count]; } } //生产者 class Productor extends Thread { SynContainer container; public Productor(SynContainer container) { this.container = container; } @Override public void run() { for(int i = 0; i < 20; i++){ System.out.println("生产的产品号" + i); container.push(new SteamedBread(i)); } } } //消费者 class Consumer extends Thread{ SynContainer container; public Consumer(SynContainer container) { this.container = container; } @Override public void run() { for(int i = 0; i < 15; i++){ System.out.println("----------消费的产品号" + container.pop().getId()); } } }模拟结果
生产的产品号0 -----------消费的产品号0 生产的产品号1 生产的产品号2 生产的产品号3 生产的产品号4 -----------消费的产品号2 -----------消费的产品号4 -----------消费的产品号3 -----------消费的产品号1 生产的产品号5 生产的产品号6 生产的产品号7 生产的产品号8 生产的产品号9 生产的产品号10 生产的产品号11 生产的产品号12 -----------消费的产品号9 生产的产品号13 -----------消费的产品号12 生产的产品号14 -----------消费的产品号13 生产的产品号15 -----------消费的产品号14 -----------消费的产品号15 生产的产品号16 -----------消费的产品号11 生产的产品号17 -----------消费的产品号16 -----------消费的产品号17 -----------消费的产品号10 -----------消费的产品号8 生产的产品号18 生产的产品号19 Process finished with exit code 0二、使用信号灯法模拟
/** * 协作模型:生产者消费者生产模型方式2:信号灯法 * 借助标志位 */ public class ProduceAndConsumption02 { public static void main(String[] args) { TV tv = new TV(); //缓冲区对象 new Player(tv).start(); //定义生产者线程 new Watcher(tv).start(); //定义消费者线程 } } //生产者 演员 class Player extends Thread { TV tv; public Player(TV tv) { this.tv = tv; } @Override public void run() { for(int i = 0; i < 10; i++){ if(i % 2 == 0){ this.tv.play("hello"); }else{ this.tv.play("world"); } } } } //消费者 观众 class Watcher extends Thread { TV tv; public Watcher(TV tv) { this.tv = tv; } @Override public void run() { for(int i = 0; i < 10; i++){ this.tv.watch(); } } } //同一个资源 电视 class TV { String voice; //加入信号灯 //true 表示演员表演,观众等待 //false 表示观众观看,演员等待 boolean flag = true; //表演 public synchronized void play(String voice){ //演员等待 if(!flag){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //开始表演 this.voice = voice; System.out.println("表演了" + voice); //表演结束,切换flag,唤醒观众, this.flag = !this.flag; //this.flag = false; 也行,不过建议采用取反的方式,防止错误 this.notify(); } //观看 public synchronized void watch(){ //观众等待 if(flag){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //观众开始观看 System.out.println("----------听到了" + voice); //观众观看完,切换flag,唤醒演员 this.flag = !this.flag; this.notify(); } }模拟结果
表演了hello ----------听到了hello 表演了world ----------听到了world 表演了hello ----------听到了hello 表演了world ----------听到了world 表演了hello ----------听到了hello 表演了world ----------听到了world 表演了hello ----------听到了hello 表演了world ----------听到了world 表演了hello ----------听到了hello 表演了world ----------听到了world参考资料:
https://www.sxt.cn/Java_jQuery_in_action/eleven-threadconcurrent-collaboration.html