如上图所示:
step1:线程1之前获得过Monitor,在执行临界区代码时发现部分条件不满足,无法执行完代码,因此主动调用wait让出坑位,自己进入WaitSet ,让其他阻塞的线程能够获得Monitor,避免浪费资源。
step2: 线程1主动放弃Monitor,会唤醒BLOCKED的线程去获得Monitor,图中线程2获得了Monitor。
obj.wait() 让进入 object 监视器的线程到 waitSet 等待
obj.wait(n) 让进入 object 监视器的线程到 waitSet 等待一定时间然后唤醒
obj.notify() 在 object 上正在 waitSet 等待的线程中随机挑一个唤醒
obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒
注意:
package chapter3;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.Test6")
public class Test6 {
public static void main(String[] args) throws InterruptedException {
String tmp = "";
tmp.wait();
}
}
运行结果:
Exception in thread "main" java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at chapter3.Test6.main(Test6.java:8)
package chapter3;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.Test6")
public class Test6 {
public static void main(String[] args) throws InterruptedException {
String tmp = "";
new Thread(()->{
synchronized (tmp){
log.warn("this is thread 1");
try {
tmp.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.warn("thread 1 run after waiting.");
}
},"t1").start();
new Thread(()->{
synchronized (tmp){
log.warn("this is thread 2");
try {
tmp.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.warn("thread 2 run after waiting.");
}
},"t2").start();
Thread.sleep(5000); // 主线程休眠一会
// 必须成为monitor的ownerh后(获得锁),才有资格wait,notify
log.warn("start notify threads.");
synchronized (tmp){
tmp.notify(); // 主线程任意唤醒waitset中的一个线程
// tmp.notifyAll(); // 主线程任意唤醒waitset中的所有线程
}
}
}
运行结果:
[t1] WARN c.Test6 - this is thread 1
[t2] WARN c.Test6 - this is thread 2
[main] WARN c.Test6 - start notify threads.
[t1] WARN c.Test6 - thread 1 run after waiting.
sleep(n)与wait(n):
虚假唤醒:当多个线程由于执行条件不满足使用wait进入 WAITING状态。此时只有部分线程的条件满足却notify所有线程。对于那些执行条件仍未满足的线程来说就是虚假唤醒。
避免虚假唤醒引发问题的模板如下:
synchronized(lock) {
while(条件不成立) { //while语句保证线程执行的条件必须满足,避免条件不满足的情况下执行程序
lock.wait();
}
// 干活
}
//另一个线程
synchronized(lock) {
lock.notifyAll();
}
定义:保护性暂停(Guarded Suspension),针对一个线程等待另外一个线程的场景。
知识点:
保护性暂停的一个实例
package chapter3;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
class GuardedObject{
private Object response = null;
public Object get(){
synchronized (this){
while(response == null){
try{
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return response;
}
public void complete(Object response){
synchronized (this){
this.response = response;
this.notifyAll();
}
}
}
@Slf4j(topic = "c.Test7")
class Test7{
public static void main(String[] args) {
log.warn("start the main thread!");
GuardedObject guard = new GuardedObject();
// 定义一个线程t1去获取另外一个线程t2网页下载的结果
new Thread(()->{
log.warn("等待网页下载");
List<String> downres = (List<String>) guard.get();
log.warn("已经获得下载的网页");
log.warn("网页的大小{}",downres.size());
},"t1").start();
new Thread(()->{
List<String> tmp;
try {
tmp = Downloader.download();
guard.complete(tmp);
log.warn("下载完成");
} catch (IOException e) {
e.printStackTrace();
}
},"t2").start();
}
}
class Downloader{
public static List<String> download() throws IOException {
HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
List<String> lines = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))
){
String tmp;
while((tmp = reader.readLine()) != null){
lines.add(tmp);
}
};
return lines;
}
}
运行结果:
[main] WARN c.Test7 - start the main thread!
[t1] WARN c.Test7 - 等待网页下载
[t2] WARN c.Test7 - 下载完成
[t1] WARN c.Test7 - 已经获得下载的网页
[t1] WARN c.Test7 - 网页的大小3
总结:上面的代码中线程t1等待线程t2准备好对象,并通过GuardedObject获得对象。
优势:
有时间限制的保护性暂停的实例
package chapter3;
import javafx.beans.binding.ObjectExpression;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
// 定义了一个guard object进行有时限的等待
class GuardedObjectTime{
private Object response = null;
// timeout:线程最多等待的时间
public Object get(long timeout){
synchronized (this){
long begin = System.currentTimeMillis();
long passedTime = 0;
while(response == null){
// 之所以单独弄一个waittime是因为需要考虑虚假唤醒的情况
// 虚假唤醒后的线程之前等待的时间也要从总的等待时间减去
long waittime = timeout - passedTime;
if(waittime <= 0)
break;
try{
this.wait(waittime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passedTime = System.currentTimeMillis()-begin;
}
}
return response;
}
public void complete(Object response){
synchronized (this){
this.response = response;
this.notifyAll();
}
}
}
@Slf4j(topic = "c.Test7")
class Test8{
public static void main(String[] args) {
log.warn("start the main thread!");
GuardedObjectTime guard = new GuardedObjectTime();
// 定义一个线程t1去获取另外一个线程t2的结果
new Thread(()->{
Object response = guard.get(2000); // 获取结果最多等待2s
log.warn("The result is {}",response);
},"t1").start();
new Thread(()->{
Object tmp = new Object();
try {
// Thread.sleep(3000); // 线程等待时间超过规定时间,无法得到任何结果
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
guard.complete(tmp);
},"t2").start();
}
}
线程t2睡眠时间为3000ms的运行结果:
[main] WARN c.Test7 - start the main thread!
[t1] WARN c.Test7 - The result is null
线程t2睡眠时间为1000ms的运行结果:
[main] WARN c.Test7 - start the main thread!
[t1] WARN c.Test7 - The result is java.lang.Object@54ed57cd
join方法的源代码如下:
/**
* Waits for this thread to die.
*
* <p> An invocation of this method behaves in exactly the same
* way as the invocation
*
* <blockquote>
* {@linkplain #join(long) join}{@code (0)}
* </blockquote>
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
// 可以看到没有参数的join调用的是带时间的join
public final void join() throws InterruptedException {
join(0);
}
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
// 情况1:等待时间小于0
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
// 情况2:等待时间等于0
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
// ============================================================================
// 情况3:等待时间>0, 可以看到这段代码的逻辑 *有时间限制的保护性暂停的实例*中get的方法逻辑相一致
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
// =============================================================================
}
}
总结:join的源码的实现也是利用了保护性暂停的设计模式。
场景:考虑邮递员,收件人,信箱三种线程对象:
动机:多个类之间使用GuardedObject对象作为参数传递不是很方便,因此需要设计一个解耦的中间类,能够做到以下几点:
基本思想:
step1: 为guardedObject添加id。
step2: 定义管理类管理多个guardedObject,有以下功能:
step3:业务类调用管理类实现线程之间的通信
注意点:
package chapter4;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
import chapter2.Sleeper;
import lombok.extern.slf4j.Slf4j;
class GuardedObject {
private int id;
public GuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
private Object response;
public Object get(long timeout) {
synchronized (this) {
long begin = System.currentTimeMillis();
long passedTime = 0;
while (response == null) {
long waitTime = timeout - passedTime;
if (timeout - passedTime <= 0) {
break;
}
try {
this.wait(waitTime); // 虚假唤醒 15:00:01
} catch (InterruptedException e) {
e.printStackTrace();
}
passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
}
return response;
}
}
public void complete(Object response) {
synchronized (this) {
this.response = response;
this.notifyAll();
}
}
}
class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
// 产生唯一 id
private static synchronized int generateId() {
return id++;
}
// 由于HashTABLE是线程安全的,所以下面2个方法不需要去synchroized
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
@Slf4j(topic = "c.People")
class People extends Thread{
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.warn("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.warn("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}
@Slf4j(topic = "c.PostMan")
class Postman extends Thread {
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.warn("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
@Slf4j(topic = "c.test1")
public class test1 {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
System.out.println(Mailboxes.getIds());
Thread.sleep(1000);
System.out.println(Mailboxes.getIds());
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}
运行结果
[]
[Thread-2] WARN c.People - 开始收信 id:3
[Thread-0] WARN c.People - 开始收信 id:2
[Thread-1] WARN c.People - 开始收信 id:1
[3, 2, 1]
[Thread-3] WARN c.PostMan - 送信 id:3, 内容:内容3
[Thread-4] WARN c.PostMan - 送信 id:2, 内容:内容2
[Thread-2] WARN c.People - 收到信 id:3, 内容:内容3
[Thread-0] WARN c.People - 收到信 id:2, 内容:内容2
[Thread-5] WARN c.PostMan - 送信 id:1, 内容:内容1
[Thread-1] WARN c.People - 收到信 id:1, 内容:内容1
生产者与消费者模式特点:
应用场景:JDK 中各种阻塞队列,采用的就是这种模式
问题:生产者与消费者模式与保护性暂停模式的区别?
package chapter4;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import lombok.extern.slf4j.Slf4j;
// 获取网页下载的内容,并存入到List
class Downloader{
public static List<String> download() throws IOException {
HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
List<String> lines = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))
){
String tmp;
while((tmp = reader.readLine()) != null){
lines.add(tmp);
}
};
return lines;
}
}
final class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
// 使用双向链表实现消息队列,用于存储message instance
private LinkedList<Message> queue;
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
public Message take() {
synchronized (queue) {
// 消费者线程判断消息队列是否为空,为空则等待
while (queue.isEmpty()) {
log.warn("没货了, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从队列的头部获取message
Message message = queue.removeFirst();
queue.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (queue) {
// 生产者线程判断队列是否已满,为空则等待
while (queue.size() == capacity) {
log.warn("库存已达上限, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(message);
queue.notifyAll();
}
}
}
@Slf4j(topic = "c.test2")
public class test2 {
public static void main(String[] args) {
MessageQueue messageQueue = new MessageQueue(2);
// 4 个生产者线程, 下载任务
for (int i = 0; i < 4; i++) {
int id = i;
new Thread(() -> {
try{
List<String> response = Downloader.download();
log.warn("try put message({})", id);
messageQueue.put(new Message(id, response));
} catch (IOException e) {
e.printStackTrace();
}
}, "生产者" + i).start();
}
new Thread(() -> {
while (true) {
Message message = messageQueue.take();
List<String> response = (List<String>) message.getMessage();
log.warn("take message({}): [{}] lines", message.getId(), response.size());
}
}, "消费者").start();
}
}
总结:生产者与消费者模式本质上依旧是wait/notify组合的应用。
20210228
原文:https://www.cnblogs.com/kfcuj/p/14458395.html