生产者消费者

    技术2022-07-14  59

    生产者消费者模式介绍

    生产者-消费者模式是一个十分经典的多线程并发协作的模式,弄懂生产者-消费者问题能够让我们对并发编程的理解加深。所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:

    如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;如果共享数据区为空的话,阻塞消费者继续消费数据;

    在实现生产者消费者问题时,可以采用三种方式:

    1.使用Object的wait/notify的消息通知机制;

    2.使用Lock的Condition的await/signal的消息通知机制;

    3.使用BlockingQueue实现。本文主要将这三种实现方式进行总结归纳。

    使用Object的wait/notify的消息通知机制

    主要成员变量

    public final LinkedList<E> list=new LinkedList<>(); public static final int Default=5; public int max;

    链表是用来存放生产者放入的元素的。 max是链表最大长度

    构造器

    public testDemo1() { this(Default); } public testDemo1(int max) { this.max = max; }

    生产者代码

    public void put(E value){ synchronized (list){ while(list.size()>=max){ System.out.println("生产线已满,不能继续添加"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("该值"+value+"已被放置"); list.addLast(value); list.notifyAll(); } }

    当生产线满了的话,就不能继续放入们就要让生产线的线程等待。否则就将要传入的值传入,并且唤醒消费者线程。

    消费者代码

    public void take(){ synchronized (list){ while (list.size()==0){ System.out.println("已经没有东西可以取出来了"); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } E result=list.removeFirst(); System.out.println("取出来的是"+result); list.notifyAll(); } }

    测试代码

    public static void main(String[] args) { testDemo1<Integer> demo1=new testDemo1<>(); new Thread("生产者"){ @Override public void run() { while (true){ demo1.put((int)(1+Math.random() * 1000)); } } }.start(); new Thread("消费者"){ @Override public void run() { while (true){ demo1.take(); } } }.start(); }

    测试结果如图

    使用ReentrantLock实现

    主要成员变量

    private final LinkedList<E> queue=new LinkedList<E>(); private static final int Max_Value = 5; //链表的默认容量 private int max; //手动设置的容量 private final Lock lock = new ReentrantLock(); //锁 private final Condition putCon = lock.newCondition(); //关于判满的Condition private final Condition takeCon = lock.newCondition(); //关于判空的Condition

    构造器

    public ConsumerAndCustomer1() { this(Max_Value); } public ConsumerAndCustomer1(int max) { this.max = max; }

    主要代码

    生产者代码

    public void put(E val){ try { lock.lock(); while (queue.size()>=max){ System.out.println(Thread.currentThread().getName()+"Queue已经满了"); putCon.await(); } System.out.println("生产者已经生产一个新产品"); queue.addLast(val); takeCon.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } }

    消费者代码

    public void take() { try { lock.lock(); while (queue.isEmpty()) { System.out.println("已经没有产品可以销售了"); takeCon.await(); } E result = queue.removeFirst(); System.out.println("消费者卖了一个新产品"+result); putCon.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }

    测试代码同上。

    BlockingQueue

    使用阻塞队列BlockingQueue实现生产者消费者是最容易的,我们上面实现的两种方法,实际上就是简易版的阻塞队列,所以我们就不用了阻塞队列实现了。

    Processed: 0.008, SQL: 9