目录
wait , notify
wait vs sleep
正确使用方法
同步保护性暂停
join的源码
Future
异步生产者/消费者模型
定义
Park & Unpark
原理
wait , notify
小故事小南需要烟才能工作,但它又要占这锁让别人无法进来。那么这个时候开一个waitSet相当于就是休息室让小南进去。并且释放锁。如果烟到了,那么notify小南就能够继续工作了。
Blocked和Waiting区别其实就是waiting是释放了锁,blocked是没有锁waiting被notify之后仍然需要进入到entrylist进行等待。
@Slf4j(topic = "c.TestWaitNotify")public class Test {// 锁对象final static Object obj = new Object();public static void main(String[] args) throws InterruptedException {new Thread(() -> {synchronized (obj) {log.debug("执行....");try {obj.wait(); // 让线程在obj上一直等待下去} catch (InterruptedException e) {e.printStackTrace();}log.debug("其它代码....");}},"t1").start();new Thread(() -> {synchronized (obj) {log.debug("执行....");try {obj.wait(); // 让线程在obj上一直等待下去} catch (InterruptedException e) {e.printStackTrace();}log.debug("其它代码....");}},"t2").start();// 主线程两秒后执行Thread.sleep(5000);log.debug("唤醒 obj 上其它线程");synchronized (obj) {// obj.notify(); // 唤醒obj上一个线程obj.notifyAll(); // 唤醒obj上所有等待线程}}}
20:17:53.579 [t1] DEBUG c.TestWaitNotify - 执行....20:17:53.581 [t2] DEBUG c.TestWaitNotify - 执行....20:17:58.584 [main] DEBUG c.TestWaitNotify - 唤醒 obj 上其它线程20:17:58.584 [t2] DEBUG c.TestWaitNotify - 其它代码....20:17:58.584 [t1] DEBUG c.TestWaitNotify - 其它代码....进程已结束,退出代码0
wait vs sleep
sleep:Thread调用,静态方法,而且不会释放锁
wait:所有obj,但是要配合synchronize使用,可以释放锁
sleep在睡眠时,不会释放锁,wait会释放对象锁
通常锁会加上final防止被修改
正确使用方法
小南需要烟才能工作,如果是使用sleep不释放锁,那么其他需要等待干活的人就会干等着,等烟来。但是wait可以让小南释放锁,让其他线程工作,并且唤醒小南
@Slf4j(topic = "c.TestCorrectPosture")public class Test {static final Object room = new Object();// 有无烟static boolean hasCigarette = false;static boolean hasTakeout = false;public static void main(String[] args) throws InterruptedException {new Thread(() -> {synchronized (room) {log.debug("有烟没?[{}]", hasCigarette);if (!hasCigarette) {log.debug("没烟,先歇会!");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}log.debug("有烟没?[{}]", hasCigarette);if (hasCigarette) {log.debug("可以开始干活了");}}}, "小南").start();for (int i = 0; i < 5; i++) {new Thread(() -> {synchronized (room) {log.debug("可以开始干活了");}}, "其它人").start();}Thread.sleep(1000);// 送烟线程new Thread(() -> {synchronized (room) {hasCigarette = true;log.debug("烟到了噢!");}}, "送烟的").start();}}
20:32:22.014 [小南] DEBUG c.TestCorrectPosture - 有烟没?[false]20:32:22.019 [小南] DEBUG c.TestCorrectPosture - 没烟,先歇会!20:32:24.024 [小南] DEBUG c.TestCorrectPosture - 有烟没?[false]20:32:24.024 [送烟的] DEBUG c.TestCorrectPosture - 烟到了噢!20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以开始干活了20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以开始干活了20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以开始干活了20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以开始干活了20:32:24.024 [其它人] DEBUG c.TestCorrectPosture - 可以开始干活了进程已结束,退出代码0
存在的问题 :
-
其它干活的线程,都要一致阻塞,效率低
-
就算烟提前送到,也无法立刻醒来
-
送烟加上锁之后,相当于门一直锁着,烟送不进去
改进 :
@Slf4j(topic = "c.TestCorrectPosture")public class Test {static final Object room = new Object();// 有无烟static boolean hasCigarette = false;static boolean hasTakeout = false;public static void main(String[] args) throws InterruptedException {new Thread(() -> {synchronized (room) {log.debug("有烟没?[{}]", hasCigarette);if (!hasCigarette) {log.debug("没烟,先歇会!");try {room.wait();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("有烟没?[{}]", hasCigarette);if (hasCigarette) {log.debug("可以开始干活了");}}}, "小南").start();for (int i = 0; i < 5; i++) {new Thread(() -> {synchronized (room) {log.debug("可以开始干活了");}}, "其它人").start();}Thread.sleep(1000);// 送烟线程new Thread(() -> {synchronized (room) {hasCigarette = true;log.debug("烟到了噢!");room.notify(); // 叫醒小南线程}}, "送烟的").start();}}
存在问题
会不会有其他线程在等待着锁?如果是那么会不会唤醒错了线程?(虚假唤醒)
解决 :
可以通过while多次判断条件是否成立,直接使用notifyAll来唤醒所有的线程。然后线程被唤醒之后先再次判断条件是否成立,成立那么往下面执行,如果不成立那么继续执行wait。
while (!hasCigarette) {log.debug("没烟,先歇会!");try {room.wait();} catch (InterruptedException e) {e.printStackTrace();}}
正确使用 :
synchronized(lock){while(条件不成立){lock.wait();} // 干活}// 另一个线程synchronized(lock){lock.notifyAll();}
同步保护性暂停
定义
-
t1需要t2的结果,那么就可以通过一个中间对象guardedObject来充当这个中间商,t2执行完就发送消息到obj,然后obj交给t1
-
如果是不断发送结果那么可以使用消息队列
-
要等待所以是同步
-
join和future就是用的这个原理
public class Test {public static void main(String[] args) {GuaObj guaObj = new GuaObj();Thread thread = new Thread(() -> {System.out.println("锁住,等待结果");guaObj.get(2000);System.out.println("解锁");}, "t1");thread.start();Thread thread1 = new Thread(() -> {System.out.println("先睡两秒");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("解锁,设置对象");guaObj.set(new Object());}, "t2");thread1.start();}}class GuaObj{// 结果public Object response;// 获取结果// timeout表示最多等多久public Object get(long timeout){synchronized (this){// 开始时间long cur = System.currentTimeMillis();// 经历的时间long paseTime=0;while(response==null){try {// 这一轮应该等的时间long waitTime=timeout-paseTime;//超时就不等了if(waitTime<=0) break;this.wait(waitTime);paseTime=System.currentTimeMillis()-cur;} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("等待结束");return response;}}// 产生结果public void set(Object response){synchronized (this){this.response=response;this.notifyAll();}}}
锁住,等待结果先睡两秒解锁,设置对象等待结束解锁进程已结束,退出代码0
-
需要记录超时的时间,并且重新设置waittime,原因是可能会有虚假唤醒,那么这个时候超时时间不是timeout而是timeout-passedTime,也就是线程执行的时间。
-
如果超时的话,那么就会自动结束
join的源码
public final synchronized void join(long millis)throws InterruptedException {//一开始的时间long base = System.currentTimeMillis();//线程执行的时间long now = 0;//如果是<0那么就抛出异常if (millis < 0) {throw new IllegalArgumentException("timeout value is negative");}//如果是0那么就一直等待线程执行完,isAlive是否生存if (millis == 0) {while (isAlive()) {wait(0);}} else {//timeout超时那么就结束while (isAlive()) {long delay = millis - now;if (delay <= 0) {break;}wait(delay);now = System.currentTimeMillis() - base;}}}
Future
相当于就是一个信箱,里面装了很多GuardObject对象,线程可以通过对应的地址访问对象获取结果
异步生产者/消费者模型
定义
相当于就是生产者给队列生产结果,消费者负责处理结果
-
不需要一一对应
-
平衡资源
-
消息队列有容量控制
-
阻塞队列控制结果出队列
public class Test {public static void main(String[] args) {MesageQueue queue = new MesageQueue(2);for (int i = 0; i < 3; i++) {int id = i;new Thread(() -> {queue.set(new Message(id, "值" + id));},"生产者" + i).start();}new Thread(() -> {while(true){try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}Message message = queue.take();}}, "消费者").start();}}@Slf4jclass MesageQueue{//存消息的集合private LinkedList<Message> list = new LinkedList();// 消息容量private int capacity;public MesageQueue(int capacity){this.capacity = capacity;}// 获取消息public Message take() {// 检查队列是否为空synchronized (list){while(list.isEmpty()){try {log.debug("队列为空,消费者线程等待");list.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}Message message = list.removeFirst();log.debug("已经消费了消息 {}",message);list.notifyAll();return message;}}// 存入消息public void set(Message message) {// 检查是不是满了synchronized (list){while(list.size() == capacity){try {log.debug("队列已满,生产者线程等待");list.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}list.addLast(message);log.debug("已经生产了消息 {}",message);list.notifyAll();}}}// 消息类final class Message{private int id;private Object value;public Message(int id, Object value){this.id = id;this.value = value;}public int getId() {return id;}public Object getValue() {return value;}@Overridepublic String toString() {return "Message{" +"id=" + id +", value=" + value +'}';}}
12:58:24.373 [生产者1] DEBUG MesageQueue - 已经生产了消息 Message{id=1, value=值1}12:58:24.375 [生产者2] DEBUG MesageQueue - 已经生产了消息 Message{id=2, value=值2}12:58:24.377 [生产者0] DEBUG MesageQueue - 队列已满,生产者线程等待12:58:25.371 [消费者] DEBUG MesageQueue - 已经消费了消息 Message{id=1, value=值1}12:58:25.371 [生产者0] DEBUG MesageQueue - 已经生产了消息 Message{id=0, value=值0}12:58:26.386 [消费者] DEBUG MesageQueue - 已经消费了消息 Message{id=2, value=值2}12:58:27.397 [消费者] DEBUG MesageQueue - 已经消费了消息 Message{id=0, value=值0}12:58:28.405 [消费者] DEBUG MesageQueue - 队列为空,消费者线程等待
Park & Unpark
与wait和notify的区别
-
不需要与monitor一起使用
-
可以精准唤醒和阻塞线程
-
可以先unpark,但是不能先notify。但是unpark之后park不起作用。
原理
①park,先去到counter里面判断是不是0,如果是那么就让线程进入队列。接着就是把counter设置为0
②unpark,那么唤醒线程,恢复运行,并且把counter设置为1
③先unpark后park,那么就unpark补充counter为1,那么park判断counter是1,认为还有体力可以继续执行。