生产者消费者模式
什么是生产者消费者模式?
生产者消费者模式不属于 23 种设计模式,它是一种用于在多线程中分割产生数据的线程以及消费数据的线程的模式
通过添加缓冲区,每次都要求生产者(也就是产生数据的线程)将生产好的数据先添加到缓冲区中,然后再由消费者从缓冲区中获取数据进行处理。如果缓冲区已经没有办法在存放新的数据,那么生产者就会陷入阻塞;反之,如果缓冲区中没有任何数据,消费者也是会陷入阻塞的
注:生产者消费者常见的应用场景:消息队列和线程池
为什么要使用生产者消费者模式?
本身的目的就是为了通过增加缓冲区从而平衡消费者和生产者之间的处理能力,也就是避免因为生产者数据产生过快导致数据无处存放的情况,以及消费者消费数据过快,从而导致的线程空转的现象
此外,可以让生产者和消费者之间的通信解耦,避免两者直接进行数据交互。这样就意味着双方仅需要关注缓冲区中是否还有数据,而不需要关注彼此处于什么状态,也就是实现了异步通信。
注:还有个分布式的理由没有提,视情况而定到底说不说,想起来就说
如何实现生产者消费者模式?
实现的方式主要分为五种,下面分别实现这五种
注:为了方便,这里先把生产者和消费者本身作为公共类抽出来,只有缓冲区的逻辑是不同的
public class Producer implements Runnable{ private Container container; public Producer(Container container){ this.container = container; } public void run(){ while(true) container.put(new Random().nextInt()); } } public class Consumer implements Runnable{ private Container container; public Consumer(container){ this.container = container; } public void run(){ while(true) Integer value = container.take(); } }
|
- synchronized + wait / notify(内置锁)
注:其实生产者消费者的核心实现思路就是这个,后面的不同方式就是采用不同的工具类实现
public class Container{ private static final int SIZE = 10; private int capacity = SIZE; private Deque<Integer> buffer = new ArrayDeque<>(); public void put(int value){ synchronized(this.buffer){ while(this.capacity == buffer.size()){ wait(); } buffer.offer(value); notifyAll(); } } public int take(){ int value = -1; synchronized(this.buffer){ while(buffer.isEmpty()){ wait(); } value = buffer.poll(); notifyAll(); } return value; } }
|
注:这个是 JUC 工具类中的,如果写了这种写法,很有可能扩展到对其源码或者对其他工具类的相关问题
public class Container{ private static final int SIZE = 10; private final int capacity = SIZE; private Deque<Integer> buffer = new ArrayDeque<>();
private Semaphore fullCount = new Semaphore(0); private Semaphore emptyCount = new Semaphore(capacity); private Semaphore isUse = new Semaphore(1); public void put(int value){ try{ emptyCount.acquire(); isUse.acquire(); buffer.offer(value); }catch(Exception e){ e.printStackTrace(); }finally{ fullCount.release(); isUse.release(); } } public int take(){ int value = -1; try{ fullCount.acquire(); isUse.acquire(); value = buffer.poll(); }catch(Exception e){ e.printStackTrace(); }finally{ emptyCount.release(); isUse.acquire(); } return value; } }
|
- 两个 P 操作是否可以交换位置?如果不能,为什么?
- 这两个 P 操作绝对不可以交换位置
- 如果交换位置后,可能会出现 isUse 信号量为 0,那么此时其余线程是无法进一步获取信号量的
- 如果此时 emptyCount 信号量也为 0,那么此时当前的线程就会陷入阻塞,只能够等待其余线程增加信号量
- 但是此时其他的线程在等待当前线程增加 isUse 信号量,所以最终会造成死锁
- BlockingQueue(阻塞队列实现)
注:这个也是 JUC 工具类中的,如果写了这种写法,很有可能问各个阻塞队列之间的区别或者源码
public class Container{ private BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(); public void put(int value){ buffer.put(value); } public int take(){ return buffer.take(); } }
|
注:这个类还是 JUC 工具类中的,如果写了这种写法,很大概率会问 synchronized 和 reentrantlock 的区别
public class Container{ private static final int SIZE = 10; private final int capacity = SIZE; private Deque<Integer> buffer = new ArrayDeque<>(); private ReentrantLock lock = new ReentrantLock(); private Condition emptyWaitingSet = lock.newCondition(); private Condition fullWaitingSet = lock.newCondition(); public void put(int value){ lock.lock(); try{ while(buffer.size() == this.capacity){ fullWaitingSet.await(); } buffer.offer(value); emptyWaitingSet.signalAll(); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } } public int take(){ int value = -1; lock.lock(); try{ while(buffer.isEmpty()){ emptyWaitingSet.await(); } value = buffer.poll(); fullWaitingSet.signalAll(); }catch(Exception e){ e.printStackTrace(); }finally{ lock.unlock(); } return value; } }
|
生产者消费者有哪些形式?