生产者消费者模式

生产者消费者模式

什么是生产者消费者模式?

生产者消费者模式不属于 23 种设计模式,它是一种用于在多线程中分割产生数据的线程以及消费数据的线程的模式

通过添加缓冲区,每次都要求生产者(也就是产生数据的线程)将生产好的数据先添加到缓冲区中,然后再由消费者从缓冲区中获取数据进行处理。如果缓冲区已经没有办法在存放新的数据,那么生产者就会陷入阻塞;反之,如果缓冲区中没有任何数据,消费者也是会陷入阻塞的

注:生产者消费者常见的应用场景:消息队列和线程池

为什么要使用生产者消费者模式?

本身的目的就是为了通过增加缓冲区从而平衡消费者和生产者之间的处理能力,也就是避免因为生产者数据产生过快导致数据无处存放的情况,以及消费者消费数据过快,从而导致的线程空转的现象

此外,可以让生产者和消费者之间的通信解耦,避免两者直接进行数据交互。这样就意味着双方仅需要关注缓冲区中是否还有数据,而不需要关注彼此处于什么状态,也就是实现了异步通信。

注:还有个分布式的理由没有提,视情况而定到底说不说,想起来就说

如何实现生产者消费者模式?

实现的方式主要分为五种,下面分别实现这五种

注:为了方便,这里先把生产者和消费者本身作为公共类抽出来,只有缓冲区的逻辑是不同的

  • 基础代码:生产者和消费者
public class Producer implements Runnable{
// 缓冲区
private Container container;
public Producer(Container container){
this.container = container;
}
// 生产数据
public void run(){
while(true)
container.put(new Random().nextInt());
}
}
public class Consumer implements Runnable{
private Container container;
public Consumer(container){
this.container = container;
}
// 获取数据
public void run(){
while(true)
Integer value = container.take();
}
}
  • synchronized + wait / notify(内置锁)

注:其实生产者消费者的核心实现思路就是这个,后面的不同方式就是采用不同的工具类实现

public class Container{
private static final int SIZE = 10;
private int capacity = SIZE;
private Deque<Integer> buffer = new ArrayDeque<>();

public void put(int value){
synchronized(this.buffer){
// 注: 这里需要使用 while, 因为 synchronized 存在虚假唤醒的情况, ReentrantLock 不用
while(this.capacity == buffer.size()){
// 这里直接省略异常处理, 太麻烦了
wait();
}
// 放入数据
buffer.offer(value);
// 唤醒消费者
notifyAll();
}
}

public int take(){
int value = -1;
synchronized(this.buffer){
while(buffer.isEmpty()){
wait();
}
// 获取数据
value = buffer.poll();
// 唤醒生产者
notifyAll();
}
return value;
}
}
  • Semaphore(信号量实现)

注:这个是 JUC 工具类中的,如果写了这种写法,很有可能扩展到对其源码或者对其他工具类的相关问题

public class Container{
private static final int SIZE = 10;
private final int capacity = SIZE;
private Deque<Integer> buffer = new ArrayDeque<>();

/** 需要准备的变量
* 1. fullCount: 表示缓冲区是否已满
* 2. emptyCount: 表示缓冲区是否为空
* 3. isUse: 每次仅允许一个线程使用缓冲区, 避免出现并发异常
* 注: synchronized 因为只使用单个条件变量, 所以会存在虚假唤醒的问题
* 注: 信号量为 0 就会被阻塞, 信号量为 >= 0 才可以执行
*/
private Semaphore fullCount = new Semaphore(0);
private Semaphore emptyCount = new Semaphore(capacity);
private Semaphore isUse = new Semaphore(1);
public void put(int value){
try{
// 获取信号量
emptyCount.acquire();
isUse.acquire();
// 存放数据
buffer.offer(value);
}catch(Exception e){
e.printStackTrace();
}finally{
// 记得修改信号量
fullCount.release();
isUse.release();
}
}

public int take(){
int value = -1;
try{
fullCount.acquire();
isUse.acquire();
value = buffer.poll();
}catch(Exception e){
e.printStackTrace();
}finally{
emptyCount.release();
isUse.acquire();
}
return value;
}
}
  • 两个 P 操作是否可以交换位置?如果不能,为什么?
    • 这两个 P 操作绝对不可以交换位置
    • 如果交换位置后,可能会出现 isUse 信号量为 0,那么此时其余线程是无法进一步获取信号量的
    • 如果此时 emptyCount 信号量也为 0,那么此时当前的线程就会陷入阻塞,只能够等待其余线程增加信号量
    • 但是此时其他的线程在等待当前线程增加 isUse 信号量,所以最终会造成死锁
  • BlockingQueue(阻塞队列实现)

注:这个也是 JUC 工具类中的,如果写了这种写法,很有可能问各个阻塞队列之间的区别或者源码

// 直接使用阻塞队列,因为阻塞队列的内部就是使用生产者消费者模式实现的
public class Container{
// 完全交给阻塞队列完成
private BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>();

public void put(int value){
buffer.put(value);
}

public int take(){
return buffer.take();
}
}
  • ReentrantLock(互斥锁)

注:这个类还是 JUC 工具类中的,如果写了这种写法,很大概率会问 synchronized 和 reentrantlock 的区别

public class Container{
private static final int SIZE = 10;
private final int capacity = SIZE;
private Deque<Integer> buffer = new ArrayDeque<>();

private ReentrantLock lock = new ReentrantLock();
private Condition emptyWaitingSet = lock.newCondition();
private Condition fullWaitingSet = lock.newCondition();

public void put(int value){
lock.lock();
try{
while(buffer.size() == this.capacity){
fullWaitingSet.await();
}
buffer.offer(value);
emptyWaitingSet.signalAll();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}

public int take(){
int value = -1;
lock.lock();
try{
while(buffer.isEmpty()){
emptyWaitingSet.await();
}
value = buffer.poll();
fullWaitingSet.signalAll();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
return value;
}
}

生产者消费者有哪些形式?

Author: Fuyusakaiori
Link: http://example.com/2021/11/04/juc/设计模式/生产者消费者模式/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.