戳蓝字“CSDN云计算”关注我们哦!
作者 | 编程新说李新杰
责编 | 阿秃
多线程的问题都曾经困扰过每个开发人员,今天将从全新视角来解说,希望读者都能明白。强烈建议去运行下文章中的示例代码,自己体会下。
问题究竟出在哪里?
一个线程执行,固然是安全的,但是有时太慢了,怎么办?
老祖宗告诉我们,“一方有难,八方支援”,那不就是多叫几个线程来帮忙嘛,好办呀,多new几个不就行了,又不要钱。这样能管用吗?继续往下看。
俗话说,“在家靠父母,出门靠朋友”。有了朋友的帮助,就会事半功倍。是这样的吗?
不一定,如果朋友“不靠谱”,结果竟是在“添乱”。于是就演变为,“不怕神一样的对手,就怕猪一样的队友”。可见“人多力量大”纵然是对的,但也要配合好才能成事。
人和人是朋友,那线程和线程也是“朋友”,如果多线程之间不能配合好的话,最终也会变为“猪一样的队友”。事实证明,这也不是一件易事。且容我慢慢道来。
开发是一门技术,管理是一门艺术。也许你正想带着兄弟们大干一场,可偏偏就有人要辞职。或者你付出了这么多,但别人从来没有感动过。为什么会这样呢?
因为你面对的是人。每个人都是独立的个体,有思想,有灵魂,有情感,有三观。能够接受外界的“输入”,经过“处理”后,能够产生“输出”。
说白了就是会自主的分析问题,并做出决定。这叫什么呢?答案就是,主观能动性。
拥有主观能动性的物体(比如人),你需要和它协商着或配合着来共同完成一件事情,而不能“强迫”它去做什么,因为这样往往不会有好的结果。
费了这么多口舌,就是希望把问题尽量的简单化。终于可以回到程序了,那线程的情况是不是类似的呢?答案是肯定的。
一个线程准备好后,经过CPU的调度,就可以自主的运行了。此时它俨然成了一个独立的个体,且具有主观能动性。
这本是一件好事,但却也有不好的一面,那就是你对它的“掌控”能力变弱了,颇有一种“将在外,君命有所不受”的感觉。
可能你不同意这种看法,说我可以“强迫”它停止运行,调用Thread类的stop()方法来直接把它“掐死”,不好意思,该方法已废弃。
因为线程可能在运行一些“关键”代码(比如转账),此刻不能被终止。Thread类还有一些其它的方法也都废弃了,大抵原因其实都差不多。
讲了这么多,相信你已经明白了,简单总结一下:
事情起因:线程可以独立自主的运行,可以认为它具有主观能动性。
造成结果:对它的掌控能力变弱了,而且又不能直接把它“干掉”。
解决方案:凡事商量着来,互相配合着把事情完成。
作者观点:其实就是把线程当作人来对待。
一个线程执行,固然是安全的,但是有时太慢了,怎么办?
老祖宗告诉我们,“一方有难,八方支援”,那不就是多叫几个线程来帮忙嘛,好办呀,多new几个不就行了,又不要钱。这样能管用吗?继续往下看。
俗话说,“在家靠父母,出门靠朋友”。有了朋友的帮助,就会事半功倍。是这样的吗?
不一定,如果朋友“不靠谱”,结果竟是在“添乱”。于是就演变为,“不怕神一样的对手,就怕猪一样的队友”。可见“人多力量大”纵然是对的,但也要配合好才能成事。
人和人是朋友,那线程和线程也是“朋友”,如果多线程之间不能配合好的话,最终也会变为“猪一样的队友”。事实证明,这也不是一件易事。且容我慢慢道来。
开发是一门技术,管理是一门艺术。也许你正想带着兄弟们大干一场,可偏偏就有人要辞职。或者你付出了这么多,但别人从来没有感动过。为什么会这样呢?
因为你面对的是人。每个人都是独立的个体,有思想,有灵魂,有情感,有三观。能够接受外界的“输入”,经过“处理”后,能够产生“输出”。
说白了就是会自主的分析问题,并做出决定。这叫什么呢?答案就是,主观能动性。
拥有主观能动性的物体(比如人),你需要和它协商着或配合着来共同完成一件事情,而不能“强迫”它去做什么,因为这样往往不会有好的结果。
费了这么多口舌,就是希望把问题尽量的简单化。终于可以回到程序了,那线程的情况是不是类似的呢?答案是肯定的。
一个线程准备好后,经过CPU的调度,就可以自主的运行了。此时它俨然成了一个独立的个体,且具有主观能动性。
这本是一件好事,但却也有不好的一面,那就是你对它的“掌控”能力变弱了,颇有一种“将在外,君命有所不受”的感觉。
可能你不同意这种看法,说我可以“强迫”它停止运行,调用Thread类的stop()方法来直接把它“掐死”,不好意思,该方法已废弃。
因为线程可能在运行一些“关键”代码(比如转账),此刻不能被终止。Thread类还有一些其它的方法也都废弃了,大抵原因其实都差不多。
讲了这么多,相信你已经明白了,简单总结一下:
事情起因:线程可以独立自主的运行,可以认为它具有主观能动性。
造成结果:对它的掌控能力变弱了,而且又不能直接把它“干掉”。
解决方案:凡事商量着来,互相配合着把事情完成。
作者观点:其实就是把线程当作人来对待。
小试牛刀一下
一旦把线程当成人,就来到了人类的世界,这我们太熟悉了,所以很多问题都会变得非常简单明了。一起来看看吧。
场景一:停止
“大胖,大胖,12点了,该去吃饭了,别写了”
“好的,好的,稍等片刻,把这几行代码写完就走”
要点:把停止的信号传达给别人,别人处理完手头的事情就自己主动停止了。
一旦把线程当成人,就来到了人类的世界,这我们太熟悉了,所以很多问题都会变得非常简单明了。一起来看看吧。
场景一:停止
“大胖,大胖,12点了,该去吃饭了,别写了”
“好的,好的,稍等片刻,把这几行代码写完就走”
要点:把停止的信号传达给别人,别人处理完手头的事情就自己主动停止了。
static void stopByFlag() { ARunnable ar = new ARunnable(); new Thread(ar).start(); ar.tellToStop(); } static class ARunnable implements Runnable { volatile boolean stop; void tellToStop() { stop = true; } @Override public void run() { println("进入不可停止区域 1。。。"); doingLongTime(5); println("退出不可停止区域 1。。。"); println("检测标志stop = %s", String.valueOf(stop)); if (stop) { println("停止执行"); return; } println("进入不可停止区域 2。。。"); doingLongTime(5); println("退出不可停止区域 2。。。"); } }
解说:线程在预设的地点检测flag,来决定是否停止。
场景二:暂停/恢复
“大胖,大胖,先别发请求了,对方服务器快挂了”
“好的,好的,等这个执行完就不发了”
过了一会
“大胖,大胖,可以重新发请求了”
“好的,好的”
要点:把暂停的信号传达给别人,别人处理完手头的事情就自己主动暂停了。但是恢复是无法自主进行的,只能由操作系统来恢复线程的执行。
“大胖,大胖,先别发请求了,对方服务器快挂了”
“好的,好的,等这个执行完就不发了”
过了一会
“大胖,大胖,可以重新发请求了”
“好的,好的”
要点:把暂停的信号传达给别人,别人处理完手头的事情就自己主动暂停了。但是恢复是无法自主进行的,只能由操作系统来恢复线程的执行。
static void pauseByFlag() { BRunnable br = new BRunnable(); new Thread(br).start(); br.tellToPause(); sleep(8); br.tellToResume(); } static class BRunnable implements Runnable { volatile boolean pause; void tellToPause() { pause = true; } void tellToResume() { synchronized (this) { this.notify(); } } @Override public void run() { println("进入不可暂停区域 1。。。"); doingLongTime(5); println("退出不可暂停区域 1。。。"); println("检测标志pause = %s", String.valueOf(pause)); if (pause) { println("暂停执行"); try { synchronized (this) { this.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } println("恢复执行"); } println("进入不可暂停区域 2。。。"); doingLongTime(5); println("退出不可暂停区域 2。。。"); } }
解说:还是在预设的地点检测flag。然后就是wait/notify配合使用。
场景三:插队
“大胖,大胖,让我站到你前面,不想排队了”
“好吧”
要点:别人插队到你前面,必须等他完事后才轮到你。
“大胖,大胖,让我站到你前面,不想排队了”
“好吧”
要点:别人插队到你前面,必须等他完事后才轮到你。
static void jqByJoin() { CRunnable cr = new CRunnable(); Thread t = new Thread(cr); t.start(); sleep(1); try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } println("终于轮到我了"); } static class CRunnable implements Runnable { @Override public void run() { println("进入不可暂停区域 1。。。"); doingLongTime(5); println("退出不可暂停区域 1。。。"); } }
解说:join方法可以让某个线程插到自己前面,等它执行完,自己才会继续执行。
场景四:叫醒
“大胖,大胖,醒醒,醒醒,看谁来了”
“谁啊,我去”
要点:要把别人从睡梦中叫醒,一定要采取稍微暴力一点的手段。
“大胖,大胖,醒醒,醒醒,看谁来了”
“谁啊,我去”
要点:要把别人从睡梦中叫醒,一定要采取稍微暴力一点的手段。
static void stopByInterrupt() { DRunnable dr = new DRunnable(); Thread t = new Thread(dr); t.start(); sleep(2); t.interrupt(); } static class DRunnable implements Runnable { @Override public void run() { println("进入暂停。。。"); try { sleep2(5); } catch (InterruptedException e) { println("收到中断异常。。。"); println("做一些相关处理。。。"); } println("继续执行或选择退出。。。"); } }
解说:线程在sleep或wait时,是处于无法交互的状态的,此时只能使用interrupt方法中断它,线程会被激活并收到中断异常。
常见的协作配合
上面那些场景,其实都是对一个线程的操作,下面来看多线程间的一些配合。
事件一:考试
假设今天考试,20个学生,1个监考老师。规定学生可以提前交卷,即把卷子留下,直接走人就行了。
但老师必须等到所有的学生都走后,才可以收卷子,然后装订打包。
如果把学生和老师都看作线程,就是1个线程和20个线程的配合问题,即等20个线程都结束了,这1个线程才开始。
比如20个线程分别在计算数据,等它们都结束后得到20个中间结果,最后这1个线程再进行后续汇总、处理等。
上面那些场景,其实都是对一个线程的操作,下面来看多线程间的一些配合。
事件一:考试
假设今天考试,20个学生,1个监考老师。规定学生可以提前交卷,即把卷子留下,直接走人就行了。
但老师必须等到所有的学生都走后,才可以收卷子,然后装订打包。
如果把学生和老师都看作线程,就是1个线程和20个线程的配合问题,即等20个线程都结束了,这1个线程才开始。
比如20个线程分别在计算数据,等它们都结束后得到20个中间结果,最后这1个线程再进行后续汇总、处理等。
static final int COUNT = 20; public static void main(String[] args) throws Exception { new Thread(new Teacher(cdl)).start(); sleep(1); for (int i = 0; i < COUNT; i++) { new Thread(new Student(i, cdl)).start(); } synchronized (ThreadCo1.class) { ThreadCo1.class.wait(); } } static CountDownLatch cdl = new CountDownLatch(COUNT); static class Teacher implements Runnable { CountDownLatch cdl; Teacher(CountDownLatch cdl) { this.cdl = cdl; } @Override public void run() { println("老师发卷子。。。"); try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } println("老师收卷子。。。"); } } static class Student implements Runnable { CountDownLatch cdl; int num; Student(int num, CountDownLatch cdl) { this.num = num; this.cdl = cdl; } @Override public void run() { println("学生(%d)写卷子。。。", num); doingLongTime(); println("学生(%d)交卷子。。。", num); cdl.countDown(); } }
解说:每完成一个线程,计数器减1,当减到0时,被阻塞的线程自动执行。
事件二:旅游
最近景色宜人,公司组织去登山,大伙都来到了山脚下,登山过程自由进行。
但为了在特定的地点拍集体照,规定1个小时后在半山腰集合,谁最后到的,要给大家表演一个节目。
然后继续登山,在2个小时后,在山顶集合拍照,还是谁最后到的表演节目。
接着开始下山了,在2个小时后在山脚下集合,点名回家,最后到的照例表演节目。
最近景色宜人,公司组织去登山,大伙都来到了山脚下,登山过程自由进行。
但为了在特定的地点拍集体照,规定1个小时后在半山腰集合,谁最后到的,要给大家表演一个节目。
然后继续登山,在2个小时后,在山顶集合拍照,还是谁最后到的表演节目。
接着开始下山了,在2个小时后在山脚下集合,点名回家,最后到的照例表演节目。
static final int COUNT = 5; public static void main(String[] args) throws Exception { for (int i = 0; i < COUNT; i++) { new Thread(new Staff(i, cb)).start(); } synchronized (ThreadCo2.class) { ThreadCo2.class.wait(); } } static CyclicBarrier cb = new CyclicBarrier(COUNT, new Singer()); static class Singer implements Runnable { @Override public void run() { println("为大家唱歌。。。"); } } static class Staff implements Runnable { CyclicBarrier cb; int num; Staff(int num, CyclicBarrier cb) { this.num = num; this.cb = cb; } @Override public void run() { println("员工(%d)出发。。。", num); doingLongTime(); println("员工(%d)到达地点一。。。", num); try { cb.await(); } catch (Exception e) { e.printStackTrace(); } println("员工(%d)再出发。。。", num); doingLongTime(); println("员工(%d)到达地点二。。。", num); try { cb.await(); } catch (Exception e) { e.printStackTrace(); } println("员工(%d)再出发。。。", num); doingLongTime(); println("员工(%d)到达地点三。。。", num); try { cb.await(); } catch (Exception e) { e.printStackTrace(); } println("员工(%d)结束。。。", num); } }
解说:某个线程到达预设点时就在此等待,等所有的线程都到达时,大家再一起向下个预设点出发。如此循环反复下去。
事件三:劳动
大胖和小白去了创业公司,公司为了节约开支,没有请专门的保洁人员。让员工自己扫地和擦桌。
大胖觉得擦桌轻松,就让小白去扫地。可小白觉得扫地太累,也想擦桌。
为了公平起见,于是决定,每人先干一半,然后交换工具,再接着干对方剩下的那一个半。
大胖和小白去了创业公司,公司为了节约开支,没有请专门的保洁人员。让员工自己扫地和擦桌。
大胖觉得擦桌轻松,就让小白去扫地。可小白觉得扫地太累,也想擦桌。
为了公平起见,于是决定,每人先干一半,然后交换工具,再接着干对方剩下的那一个半。
public static void main(String[] args) throws Exception { new Thread(new Staff("大胖", new Tool("笤帚", "扫地"), ex)).start(); new Thread(new Staff("小白", new Tool("抹布", "擦桌"), ex)).start(); synchronized (ThreadCo3.class) { ThreadCo3.class.wait(); } } static Exchanger<Tool> ex = new Exchanger<>(); static class Staff implements Runnable { String name; Tool tool; Exchanger<Tool> ex; Staff(String name, Tool tool, Exchanger<Tool> ex) { this.name = name; this.tool = tool; this.ex = ex; } @Override public void run() { println("%s拿的工具是[%s],他开始[%s]。。。", name, tool.name, tool.work); doingLongTime(); println("%s开始交换工具。。。", name); try { tool = ex.exchange(tool); } catch (Exception e) { e.printStackTrace(); } println("%s的工具变为[%s],他开始[%s]。。。", name, tool.name, tool.work); } } static class Tool { String name; String work; Tool(String name, String work) { this.name = name; this.work = work; } }
解说:两个线程在预设点交换变量,先到达的等待对方。
事件四:魔性游戏
这是一个充满魔性的小游戏,由一个团队一起参加。所有人每隔5秒钟抽一次签,每个人有50%的概率留下来或被淘汰。
留下来的人下次抽签时同样有50%的概率被淘汰。被淘汰的人下次抽签时同样有50%的概率复活。
团队所有成员都被淘汰完,为挑战失败,团队所有成员都回到游戏中(除刚开始外),为挑战成功。
比如一开始10人参与游戏,第一轮抽签后,6人留下,4人淘汰。
第二轮抽签后,留下的6人中4人被淘汰,淘汰的4人中2人复活,那么目前是4人在游戏中,6人被淘汰。
一直如此继续下去,直到10人全部被淘汰,或全部回到游戏中。
可见,人数越多,全部被淘汰的概率越小,但全部回到游戏中的概率也越小。
反之,人数越少,全部回到游戏中的概率越大,但全部被淘汰的概率也越大。
是不是很有魔性啊。哈哈。
这是一个充满魔性的小游戏,由一个团队一起参加。所有人每隔5秒钟抽一次签,每个人有50%的概率留下来或被淘汰。
留下来的人下次抽签时同样有50%的概率被淘汰。被淘汰的人下次抽签时同样有50%的概率复活。
团队所有成员都被淘汰完,为挑战失败,团队所有成员都回到游戏中(除刚开始外),为挑战成功。
比如一开始10人参与游戏,第一轮抽签后,6人留下,4人淘汰。
第二轮抽签后,留下的6人中4人被淘汰,淘汰的4人中2人复活,那么目前是4人在游戏中,6人被淘汰。
一直如此继续下去,直到10人全部被淘汰,或全部回到游戏中。
可见,人数越多,全部被淘汰的概率越小,但全部回到游戏中的概率也越小。
反之,人数越少,全部回到游戏中的概率越大,但全部被淘汰的概率也越大。
是不是很有魔性啊。哈哈。
static final int COUNT = 6; public static void main(String[] args) throws Exception { new Thread(new Challenger("张三")).start(); new Thread(new Challenger("李四")).start(); new Thread(new Challenger("王五")).start(); new Thread(new Challenger("赵六")).start(); new Thread(new Challenger("大胖")).start(); new Thread(new Challenger("小白")).start(); synchronized (ThreadCo4.class) { ThreadCo4.class.wait(); } } static Phaser ph = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { println2("第(%d)局,剩余[%d]人", phase, registeredParties); return registeredParties == 0 || (phase != 0 && registeredParties == COUNT); }; }; static class Challenger implements Runnable { String name; int state; Challenger(String name) { this.name = name; this.state = 0; } @Override public void run() { println("[%s]开始挑战。。。", name); ph.register(); int phase = 0; int h; while (!ph.isTerminated() && phase < 100) { doingLongTime(5); if (state == 0) { if (Decide.goon()) { h = ph.arriveAndAwaitAdvance(); if (h < 0) println("No%d.[%s]继续,但已胜利。。。", phase, name); else println("No%d.[%s]继续at(%d)。。。", phase, name, h); } else { state = -1; h = ph.arriveAndDeregister(); println("No%d.[%s]退出at(%d)。。。", phase, name, h); } } else { if (Decide.revive()) { state = 0; h = ph.register(); if (h < 0) println("No%d.[%s]复活,但已失败。。。", phase, name); else println("No%d.[%s]复活at(%d)。。。", phase, name, h); } else { println("No%d.[%s]没有复活。。。", phase, name); } } phase++; } if (state == 0) { ph.arriveAndDeregister(); } println("[%s]结束。。。", name); } } static class Decide { static boolean goon() { return random(9) > 4; } static boolean revive() { return random(9) < 5; } }
解说:某个线程到达预设点后,可以选择等待同伴或自己退出,等大家都到达后,再一起向下一个预设点出发,随时都可以有新的线程加入,退出的也可以再次加入。
生产与销售的问题
在现实中,工厂生产出来的产品会先放到仓库存储,销售人员签了单子后,会从仓库把产品发给客户。
如果生产的过快,仓库里产品越堆越多,直到把仓库堆满,那就必须停止生产,因为没地方放了。
此时只能让销售人员赶紧出去签单子,把产品发出去,仓库就有了空间,可以恢复生产了。
如果销售的过快,仓库里产品越来越少,直到把仓库清空,那就必须停止销售,因为没产品了。
此时只能让生产人员赶紧生产产品,把产品放到仓库里,仓库里就有了产品,可以恢复销售了。
可能会有人问,为什么不让生产和销售直接挂钩呢,把仓库这个环节去掉?
这样会造成两种不好的情况:
一是突然来了很多单子,生产人员累成死Dog也生产不出来。
二是很长时间没有单子,生产人员闲成废Dog也无事可做。
用稍微“专业”点的术语就是此时的生产和销售是一种强耦合的关系,销售的波动对生产影响太大。
仓库就是一个缓冲区,能有效的吸收波动,很大程度上减少波动的传递,起到一种解耦作用,由强耦合变成一种松散耦合。
这其实就对应计算机里经典的生产者和消费者问题。
经典的生产者和消费者
一到多个线程充当生产者,生产元素。一到多个线程充当消费者,消费元素。
在两者之间插入一个队列(Queue)充当缓冲区,建立起生产者和消费者的松散耦合。
正常情况下,即生产元素的速度和消费元素的速度差不多时,生产者和消费者其实是不需要去关注对方的。
生产者可以一直生产,因为队列里总是有空间。消费者可以一直消费,因为队列里总是有元素。即达到一个动态的平衡。
但在特殊情况下,比如生产元素的速度很快,队列里没有了空间,此时生产者必须自我“ba工”,开始“睡大觉”。
一旦消费者消费了元素之后,队列里才会有空间,生产者才可以重启生产,所以,消费者在消费完元素后有义务去叫醒生产者复工。
更准确的说法应该是,只有在生产者“睡大觉”时,消费者消费完元素后才需要去叫醒生产者。否则,其实可以不用叫醒,因为人家本来就没睡。
反之,如果消费元素的速度很快,队列里没有了元素,只需把上述情况颠倒过来即可。
但这样的话就会引入一个新的问题,就是要能够准备的判断出对方有没有在睡大觉,为此就必须定义一个状态变量,在自己即将开始睡大觉时,自己设置下这个变量。
对方通过检测这个变量,来决定是否进行叫醒操作。当自己被叫醒后,首先要做的就是清除一下这个变量,表明我已经醒来复工了。
这样就需要多维护一个变量和多了一部分判断逻辑。可能有些人会觉得可以通过判断队列的“空”或“满”(即队列中的元素数目)来决定是否进行叫醒操作。
在高并发下,可能刚刚判断队列不为空,瞬间之后队列可能已经变为空的了,这样会导致逻辑出错。线程可能永远无法被叫醒。
因此,综合所有,生产者每生产一个元素后,都会通知消费者,“现在有元素的,你可以消费”。
同样,消费者每消费一个元素后,也会通知生产者,“现在有空间的,你可以生产”。
很明显,这些通知很多时候(即对方没有睡大觉时)是没有真正意义的,不过无所谓,只要忽略它们就行了。
就是“宁可错杀一千,也不放过一个”。首先要保证是正确的,然后才有资格去BB别的。
在现实中,工厂生产出来的产品会先放到仓库存储,销售人员签了单子后,会从仓库把产品发给客户。
如果生产的过快,仓库里产品越堆越多,直到把仓库堆满,那就必须停止生产,因为没地方放了。
此时只能让销售人员赶紧出去签单子,把产品发出去,仓库就有了空间,可以恢复生产了。
如果销售的过快,仓库里产品越来越少,直到把仓库清空,那就必须停止销售,因为没产品了。
此时只能让生产人员赶紧生产产品,把产品放到仓库里,仓库里就有了产品,可以恢复销售了。
可能会有人问,为什么不让生产和销售直接挂钩呢,把仓库这个环节去掉?
这样会造成两种不好的情况:
一是突然来了很多单子,生产人员累成死Dog也生产不出来。
二是很长时间没有单子,生产人员闲成废Dog也无事可做。
用稍微“专业”点的术语就是此时的生产和销售是一种强耦合的关系,销售的波动对生产影响太大。
仓库就是一个缓冲区,能有效的吸收波动,很大程度上减少波动的传递,起到一种解耦作用,由强耦合变成一种松散耦合。
这其实就对应计算机里经典的生产者和消费者问题。
经典的生产者和消费者
一到多个线程充当生产者,生产元素。一到多个线程充当消费者,消费元素。
在两者之间插入一个队列(Queue)充当缓冲区,建立起生产者和消费者的松散耦合。
正常情况下,即生产元素的速度和消费元素的速度差不多时,生产者和消费者其实是不需要去关注对方的。
生产者可以一直生产,因为队列里总是有空间。消费者可以一直消费,因为队列里总是有元素。即达到一个动态的平衡。
但在特殊情况下,比如生产元素的速度很快,队列里没有了空间,此时生产者必须自我“ba工”,开始“睡大觉”。
一旦消费者消费了元素之后,队列里才会有空间,生产者才可以重启生产,所以,消费者在消费完元素后有义务去叫醒生产者复工。
更准确的说法应该是,只有在生产者“睡大觉”时,消费者消费完元素后才需要去叫醒生产者。否则,其实可以不用叫醒,因为人家本来就没睡。
反之,如果消费元素的速度很快,队列里没有了元素,只需把上述情况颠倒过来即可。
但这样的话就会引入一个新的问题,就是要能够准备的判断出对方有没有在睡大觉,为此就必须定义一个状态变量,在自己即将开始睡大觉时,自己设置下这个变量。
对方通过检测这个变量,来决定是否进行叫醒操作。当自己被叫醒后,首先要做的就是清除一下这个变量,表明我已经醒来复工了。
这样就需要多维护一个变量和多了一部分判断逻辑。可能有些人会觉得可以通过判断队列的“空”或“满”(即队列中的元素数目)来决定是否进行叫醒操作。
在高并发下,可能刚刚判断队列不为空,瞬间之后队列可能已经变为空的了,这样会导致逻辑出错。线程可能永远无法被叫醒。
因此,综合所有,生产者每生产一个元素后,都会通知消费者,“现在有元素的,你可以消费”。
同样,消费者每消费一个元素后,也会通知生产者,“现在有空间的,你可以生产”。
很明显,这些通知很多时候(即对方没有睡大觉时)是没有真正意义的,不过无所谓,只要忽略它们就行了。
就是“宁可错杀一千,也不放过一个”。首先要保证是正确的,然后才有资格去BB别的。
public static void main(String[] args) { Queue queue = new Queue(); new Thread(new Producer(queue)).start(); new Thread(new Producer(queue)).start(); new Thread(new Consumer(queue)).start(); } static class Producer implements Runnable { Queue queue; Producer(Queue queue) { this.queue = queue; } @Override public void run() { try { for (int i = 0; i < 10000; i++) { doingLongTime(); queue.putEle(random(10000)); } } catch (Exception e) { e.printStackTrace(); } } } static class Consumer implements Runnable { Queue queue; Consumer(Queue queue) { this.queue = queue; } @Override public void run() { try { for (int i = 0; i < 10000; i++) { doingLongTime(); queue.takeEle(); } } catch (Exception e) { e.printStackTrace(); } } } static class Queue { Lock lock = new ReentrantLock(); Condition prodCond = lock.newCondition(); Condition consCond = lock.newCondition(); final int CAPACITY = 10; Object[] container = new Object[CAPACITY]; int count = 0; int putIndex = 0; int takeIndex = 0; public void putEle(Object ele) throws InterruptedException { try { lock.lock(); while (count == CAPACITY) { println("队列已满:%d,生产者开始睡大觉。。。", count); prodCond.await(); } container[putIndex] = ele; println("生产元素:%d", ele); putIndex++; if (putIndex >= CAPACITY) { putIndex = 0; } count++; println("通知消费者去消费。。。"); consCond.signalAll(); } finally { lock.unlock(); } } public Object takeEle() throws InterruptedException { try { lock.lock(); while (count == 0) { println("队列已空:%d,消费者开始睡大觉。。。", count); consCond.await(); } Object ele = container[takeIndex]; println("消费元素:%d", ele); takeIndex++; if (takeIndex >= CAPACITY) { takeIndex = 0; } count--; println("通知生产者去生产。。。"); prodCond.signalAll(); return ele; } finally { lock.unlock(); } } }
解说:其实就是对await/signalAll的应用,几乎面试必问。
源代码:
https://github.com/coding-new-talking/java-code-demo.git
???特邀各路大大免费入驻CSDN啦,除云计算相关书籍免费赠送外,还有海量福利奥~详情戳?下方图片,么么哒~
福利
扫描添加小编微信,备注“姓名+公司职位”,入驻【CSDN博客】,加入【云计算学习交流群】,和志同道合的朋友们共同打卡学习!
推荐阅读:
- 【不整虚的 全是干货】阿里云 K8S 集群网络详解
- 【ball ball you 不要再重复造轮子了】Elasticsearch 搜索之路(一)
搞开发没加薪、也没升职?都被你浪没了!
- 估值被砍700亿美元后,Waymo发重磅公开信:即将推出全自动驾驶打车服务
Python GUI开发,效率提升10倍的方法!
- 【光说不练假把式】今天说一说Kubernetes 在有赞的实践
- 2019年胡润百富榜发布,比特大陆创始人詹克团成「中国区块链首富」!
- 揭秘 OceanBase 勇夺 TPC 榜首的王者攻略!
真香,朕在看了!