生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
.
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。
由于前两点原因,因此需要保持线程间的同步,即一个线程消费(或生产)完,其他线程才能进行竞争CPU,获得消费(或生产)的机会。对于这一点,可以使用条件变量进行线程间的同步:生产者线程在product之前,需要wait直至获取自己所需的信号量之后,才会进行product的操作;同样,对于消费者线程,在consume之前需要wait直到没有线程在访问共享区(缓冲区),再进行consume的操作,之后再解锁并唤醒其他可用阻塞线程。
在访问共享区资源时,为避免多个线程同时访问资源造成混乱,需要对共享资源加锁,从而保证某一时刻只有一个线程在访问共享资源。
/**资源类**/
class Data {
private int number = 0;
/**
* 判断等待、业务、通知
*/
//+1
public synchronized void increment() throws InterruptedException {
if (number != 0) {
// 等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我+1完毕了
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
if (number == 0) {
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我-1完毕了
this.notifyAll();
}
}
package com.xgp.pc;
/**
* 线程之间的通信问题:生产者和消费者问题! 等待唤醒 通知
* 线程交替问题 A B 操作同一个变量 num = 0
* A num+1
* B num-1
* @author 薛国鹏
*/
@SuppressWarnings("all")
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
进程完成,退出码 0
package com.xgp.pc;
/**
* 线程之间的通信问题:生产者和消费者问题! 等待唤醒 通知
* 线程交替问题 A B 操作同一个变量 num = 0
* A num+1
* B num-1
* @author 薛国鹏
*/
@SuppressWarnings("all")
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
A=>1
B=>0
A=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
D=>0
C=>1
D=>0
进程完成,退出码 0
/**资源类**/
class Data {
private int number = 0;
/**
* 判断等待、业务、通知
*/
//+1
public synchronized void increment() throws InterruptedException {
while (number != 0) {
// 等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我+1完毕了
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
while (number == 0) {
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我-1完毕了
this.notifyAll();
}
}
A=>1
B=>0
A=>1
B=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
D=>0
C=>1
D=>0
进程完成,退出码 0
package com.xgp.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@SuppressWarnings("all")
public class B {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
/**资源类**/
class Data2 {
private int number = 0;
/**
* 判断等待、业务、通知
*/
Lock lock = new ReentrantLock();
//锁监视器(取代了对象监视器的使用)
Condition condition = lock.newCondition();
//+1
public void increment() throws InterruptedException {
lock.lock();
try {
while (number != 0) {
// 等待
condition.await(); //等待
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我+1完毕了
condition.signalAll();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
//-1
public void decrement() throws InterruptedException {
lock.lock();
try {
while (number == 0) {
//等待
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我-1完毕了
condition.signalAll();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
进程完成,退出码 0
package com.xgp.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@SuppressWarnings("all")
public class C {
public static void main(String[] args) {
Data3 data = new Data3();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.printA();
} catch (Exception e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.printB();
} catch (Exception e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.printC();
} catch (Exception e) {
e.printStackTrace();
}
}
},"C").start();
}
}
class Data3 {
private int number = 1;
private Lock lock = new ReentrantLock();
//锁监视器(取代了对象监视器的使用)
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void printA() {
lock.lock();
try {
while (number != 1) {
condition1.await();
}
System.out.println(Thread.currentThread().getName());
//唤醒指定的B
number = 2;
condition2.signal();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
while (number != 2) {
condition2.await();
}
System.out.println(Thread.currentThread().getName());
//唤醒指定的B
number = 3;
condition3.signal();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
while (number != 3) {
condition3.await();
}
System.out.println(Thread.currentThread().getName());
//唤醒指定的B
number = 1;
condition1.signal();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
进程完成,退出码 0
原文:https://www.cnblogs.com/xgp123/p/12339830.html