『并发包入坑指北』之阻塞队列

006tNc79ly1g1vn9xpgp4j31ak0u013m.jpg

前言

较长一段时间以来我都发现不少开发者对 jdk 中的 J.U.C(java.util.concurrent)也就是 Java 并发包的使用甚少,更别谈对它的理解了;但这却也是我们进阶的必备关卡。

之前或多或少也分享过相关内容,但都不成体系;于是便想整理一套与并发包相关的系列文章。

其中的内容主要包含以下几个部分:

  • 根据定义自己实现一个并发工具。
  • JDK 的标准实现。
  • 实践案例。

基于这三点我相信大家对这部分内容不至于一问三不知。

既然开了一个新坑,就不想做的太差;所以我打算将这个列表下的大部分类都讲到。

006tNc79ly1g1vpwdqbkrj30ab09nmy9.jpg

所以本次重点讨论 ArrayBlockingQueue

自己实现

在自己实现之前先搞清楚阻塞队列的几个特点:

  • 基本队列特性:先进先出。
  • 写入队列空间不可用时会阻塞。
  • 获取队列数据时当队列为空时将阻塞。

实现队列的方式多种,总的来说就是数组和链表;其实我们只需要搞清楚其中一个即可,不同的特性主要表现为数组和链表的区别。

这里的 ArrayBlockingQueue 看名字很明显是由数组实现。

我们先根据它这三个特性尝试自己实现试试。

初始化队列

我这里自定义了一个类:ArrayQueue,它的构造函数如下:

    public ArrayQueue(int size) {items = new Object[size];}

很明显这里的 items 就是存放数据的数组;在初始化时需要根据大小创建数组。

006tNc79ly1g1wd71n229j30wb0u043w.jpg

写入队列

写入队列比较简单,只需要依次把数据存放到这个数组中即可,如下图:

006tNc79ly1g1we7yeykej30b0060mxc.jpg

但还是有几个需要注意的点:

  • 队列满的时候,写入的线程需要被阻塞。
  • 写入过队列的数量大于队列大小时需要从第一个下标开始写。

先看第一个队列满的时候,写入的线程需要被阻塞,先来考虑下如何才能使一个线程被阻塞,看起来的表象线程卡住啥事也做不了。

有几种方案可以实现这个效果:

  • Thread.sleep(timeout)线程休眠。
  • object.wait() 让线程进入 waiting 状态。

当然还有一些 join、LockSupport.part 等不在本次的讨论范围。

阻塞队列还有一个非常重要的特性是:当队列空间可用时(取出队列),写入线程需要被唤醒让数据可以写入进去。

所以很明显Thread.sleep(timeout)不合适,它在到达超时时间之后便会继续运行;达不到空间可用时才唤醒继续运行这个特点。

其实这样的一个特点很容易让我们想到 Java 的等待通知机制来实现线程间通信;更多线程见通信的方案可以参考这里:深入理解线程通信

所以我这里的做法是,一旦队列满时就将写入线程调用 object.wait() 进入 waiting 状态,直到空间可用时再进行唤醒。

    /*** 队列满时的阻塞锁*/private Object full = new Object();/*** 队列空时的阻塞锁*/private Object empty = new Object();

006tNc79ly1g1wf8de8jzj30te0tin1i.jpg

所以这里声明了两个对象用于队列满、空情况下的互相通知作用。

在写入数据成功后需要使用 empty.notify(),这样的目的是当获取队列为空时,一旦写入数据成功就可以把消费队列的线程唤醒。

这里的 wait 和 notify 操作都需要对各自的对象使用 synchronized 方法块,这是因为 wait 和 notify 都需要获取到各自的锁。

消费队列

上文也提到了:当队列为空时,获取队列的线程需要被阻塞,直到队列中有数据时才被唤醒。

006tNc79ly1g1wfhr3r6qj30tg0tiwit.jpg

代码和写入的非常类似,也很好理解;只是这里的等待、唤醒恰好是相反的,通过下面这张图可以很好理解:

006tNc79ly1g1wfwr016gj30o20ksq59.jpg

总的来说就是:

  • 写入队列满时会阻塞直到获取线程消费了队列数据后唤醒写入线程
  • 消费队列空时会阻塞直到写入线程写入了队列数据后唤醒消费线程

测试

先来一个基本的测试:单线程的写入和消费。

006tNc79ly1g1wg97uqgpj30uu0dqwgu.jpg

3
123
1234
12345

通过结果来看没什么问题。


当写入的数据超过队列的大小时,就只能消费之后才能接着写入。

006tNc79ly1g1wgmshqfyj316o0n2ae5.jpg

2019-04-09 16:24:41.040 [Thread-0] INFO  c.c.concurrent.ArrayQueueTest - [Thread-0]123
2019-04-09 16:24:41.040 [main] INFO  c.c.concurrent.ArrayQueueTest - size=3
2019-04-09 16:24:41.047 [main] INFO  c.c.concurrent.ArrayQueueTest - 1234
2019-04-09 16:24:41.048 [main] INFO  c.c.concurrent.ArrayQueueTest - 12345
2019-04-09 16:24:41.048 [main] INFO  c.c.concurrent.ArrayQueueTest - 123456

从运行结果也能看出只有当消费数据后才能接着往队列里写入数据。


006tNc79ly1g1wiskvki8j30yy0eo0ve.jpg

006tNc79ly1g1witm4twpj31q60ai0vz.jpg

而当没有消费时,再往队列里写数据则会导致写入线程被阻塞。

并发测试

006tNc79ly1g1wiwyz4j5j30vz0u044f.jpg

三个线程并发写入300条数据,其中一个线程消费一条。

=====0
299

最终的队列大小为 299,可见线程也是安全的。

由于不管是写入还是获取方法里的操作都需要获取锁才能操作,所以整个队列是线程安全的。

ArrayBlockingQueue

下面来看看 JDK 标准的 ArrayBlockingQueue 的实现,有了上面的基础会更好理解。

初始化队列

006tNc79ly1g1wkaau8w7j30ze0lcagb.jpg

看似要复杂些,但其实逐步拆分后也很好理解:

第一步其实和我们自己写的一样,初始化一个队列大小的数组。

第二步初始化了一个重入锁,这里其实就和我们之前使用的 synchronized 作用一致的;

只是这里在初始化重入锁的时候默认是非公平锁,当然也可以指定为 true 使用公平锁;这样就会按照队列的顺序进行写入和消费。

更多关于 ReentrantLock 的使用和原理请参考这里:ReentrantLock 实现原理

三四两步则是创建了 notEmpty notFull 这两个条件,他的作用于用法和之前使用的 object.wait/notify 类似。

这就是整个初始化的内容,其实和我们自己实现的非常类似。

写入队列

006tNc79ly1g1wktuhxzuj30tk0bqq55.jpg
006tNc79ly1g1wktfkwu2j30ug09ugnn.jpg

其实会发现阻塞写入的原理都是差不多的,只是这里使用的是 Lock 来显式获取和释放锁。

同时其中的 notFull.await();notEmpty.signal(); 和我们之前使用的 object.wait/notify 的用法和作用也是一样的。

当然它还是实现了超时阻塞的 API

006tNc79ly1g1wl1n7ir5j30vm0iqdjb.jpg

也是比较简单,使用了一个具有超时时间的等待方法。

消费队列

再看消费队列:

006tNc79ly1g1wl3vcsioj30tc0ayq4y.jpg
006tNc79ly1g1wl4cfrnlj30u00eq0vm.jpg

也是差不多的,一看就懂。

而其中的超时 API 也是使用了 notEmpty.awaitNanos(nanos) 来实现超时返回的,就不具体说了。

实际案例

说了这么多,来看一个队列的实际案例吧。

背景是这样的:

有一个定时任务会按照一定的间隔时间从数据库中读取一批数据,需要对这些数据做校验同时调用一个远程接口。

简单的做法就是由这个定时任务的线程去完成读取数据、消息校验、调用接口等整个全流程;但这样会有一个问题:

假设调用外部接口出现了异常、网络不稳导致耗时增加就会造成整个任务的效率降低,因为他都是串行会互相影响。

所以我们改进了方案:

006tNc79ly1g1wm1v7mfxj30qs0aiq4g.jpg

其实就是一个典型的生产者消费者模型:

  • 生产线程从数据库中读取消息丢到队列里。
  • 消费线程从队列里获取数据做业务逻辑。

这样两个线程就可以通过这个队列来进行解耦,互相不影响,同时这个队列也能起到缓冲的作用。

但在使用过程中也有一些小细节值得注意。

因为这个外部接口是支持批量执行的,所以在消费线程取出数据后会在内存中做一个累加,一旦达到阈值或者是累计了一个时间段便将这批累计的数据处理掉。

但由于开发者的大意,在消费的时候使用的是 queue.take() 这个阻塞的 API;正常运行没啥问题。

可一旦原始的数据源,也就是 DB 中没数据了,导致队列里的数据也被消费完后这个消费线程便会被阻塞。

这样上一轮积累在内存中的数据便一直没机会使用,直到数据源又有数据了,一旦中间间隔较长时便可能会导致严重的业务异常。

所以我们最好是使用 queue.poll(timeout) 这样带超时时间的 api,除非业务上有明确的要求需要阻塞。

这个习惯同样适用于其他场景,比如调用 http、rpc 接口等都需要设置合理的超时时间。

总结

关于 ArrayBlockingQueue 的相关分享便到此结束,接着会继续更新其他并发容器及并发工具。

对本文有任何相关问题都可以留言讨论。

本文涉及到的所有源码:

https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/ArrayQueue.java

你的点赞与分享是对我最大的支持

转载于:https://www.cnblogs.com/crossoverJie/p/10681080.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/449267.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

个人理财有哪些基本原理和方法?

现金为王:不超额消费,不使用信用卡,不负债(房贷除外) 信贷消费已经成为主流的今天,强调使用现金似乎与时代格格不入。而对于信贷消费的依赖,常常来自于下面几个看起来十分有力的观点&#xff…

2019年3月4日 701. Insert into a Binary Search Tree

比较基础的二叉树排序树插入,写了个递归。# Definition for a binary tree node. # class TreeNode(object): # def __init__(self, x): # self.val x # self.left None # self.right Noneclass Solution(object):def insertIntoBST…

2020-3-18

题目一: JavaScript 字符串转换为数组 其一: let str"apple"; console.log([...str]);运行结果 其二(使用split()): let str"apple"; console.log(str.split());注1:如果将参数省略…

思维导图,流程图模板整合

思维导图与流程图在工作中都是经常使用的,出现频率较高的,有些不会绘制的或者是刚接触这一类的图表形式的都会选择使用模板来完成工作,但是很多朋友却不知道模板在,今天要给大家分享的是几款孩子走精美的思维导图,流程…

解决 List 执行 remove 时报异常 java.lang.UnsupportedOperationException

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到教程。 一、情况描述 报错如题: java.lang.UnsupportedOperationException: nullat java.util.Collections$UnmodifiableCollectio…

2020-3-19

题目一&#xff1a; js split() 分割字符串生成数组 let str"I am a student"; let arrstr.split(" "); for(let i0;i<arr.length;i){console.log(arr[i]); }分析&#xff1a;这里利用字符串的空格来分割字符串生成数组。split()方法的参数设置为"…

上班族怎么创业?白领一族创业当老板!

班族怎么创业?很多上班族无法面对每天平淡的生活&#xff0c;于是想要拥有一份属于自己的事业。上班族创业有哪些好的项目呢?结合自已的兴趣爱好&#xff0c;找到适合的项目&#xff0c;上班的同时也能当老板。 上班族怎么创业?创业项目1、开投资额小的特色店 尝试开店创业的…

一文告诉你 Event Loop 是什么?

Event Loop 也叫做“事件循环”&#xff0c;它其实与 JavaScript 的运行机制有关。 JS初始设计 JavaScript 在设计之初便是单线程&#xff0c;程序运行时&#xff0c;只有一个线程存在&#xff0c;在特定的时候只能有特定的代码被执行。这和 JavaScript 的用途有关&#xff0c;…

Spring Boot -Shiro配置多Realm

2019独角兽企业重金招聘Python工程师标准>>> 核心类简介 xxxToken&#xff1a;用户凭证 xxxFilter&#xff1a;生产token&#xff0c;设置登录成功&#xff0c;登录失败处理方法&#xff0c;判断是否登录连接等 xxxRealm&#xff1a;依据配置的支持Token来认证用户信…

idea工具debug断点红色变成灰色

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 没事别瞎点&#xff0c;禁用了断点当然不走了 转自&#xff1a;https://blog.csdn.net/anlve512/article/details/54583469

2020-3-20前端题目

题目一&#xff1a; 判断checked复选框是否有被选中 <!DOCTYPE html> <html> <head> <meta charset" utf-8"> <script> window.onload () > {let odivdocument.getElementById("ant");let ckdocument.getElementById(&…

上班族如何当老板 五大模式任你选

中国教育在线讯 辞职创业&#xff0c;还是维持现在稳定的工作?这个是很多上班族都纠结过的问题&#xff0c;一边是稳定的工作和收入&#xff0c;一边是创业当老板的诱惑&#xff0c;真是很难选择。 其实&#xff0c;如果安排合理是可以“鱼与熊掌”兼得的&#xff0c;沈阳市古…

利用 Linux tap/tun 虚拟设备写一个 ICMP echo 程序

利用 Linux tap/tun 虚拟设备写一个 ICMP echo 程序 前面两篇文章已经介绍过 tap/tun 的原理和配置工具。这篇文章通过一个编程示例来深入了解 tap/tun 的程序结构。 01 准备工作 首先通过 modinfo tun 查看系统内核是否支持 tap/tun 设备驱动。 Copy[rootby ~]# modinfo tun f…

2020-3-21

题目一&#xff1a; JavaScript 获取月份最后一天日期 月份最后一天日期可能是不同的&#xff0c;比如有的是30、有的是31还有的是28。 <!DOCTYPE html><html> <head> <meta charset" utf-8"> <script type"text/javascript"&…

正方形矩阵求对角线之和

nint(input()) a[] for i in range(n): #循环体里面加入input&#xff08;&#xff09;可以实现一共执行n次input&#xff08;&#xff09; lst[int(x) for x in input().split()]a.append(lst) #用列表解析&#xff0c;两层列表代表行列&#xff0c;很巧妙的方法 w0 bl…

解决: Unable to connect to zookeeper server within timeout: 5000

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 一个项目启动不起来了&#xff0c;报错如题&#xff1a; Caused by: org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to c…

闲钱请看如何处理

买一点基金定投。基金是专家帮你理财。基金的起始资金最低单笔是1000元,定投200元起投 买基金到银行或者基金公司都行。银行能代理很多基金公司的业务&#xff0c;具体开户找银行理财专柜办理。现在有些证券公司也有代理基金买卖的。在银行开通网上银行后网上购买一般收费上有优…

JAVA 数组元素的反转

package Code411;/*数组元素的反转本来[1,2,3,4]反转后[4,3,2,1]1.对称位置的元素交换2.对称位子需要两个索引3.int temp a&#xff1b;ab;btemp;4.什么时候停止交换&#xff08;1&#xff09;minmax (2)min>max */public class CodeArrayReverse { public static void m…

requests模块相关用法

requests模块 -1. 什么是requests模块- python原生的一个基于网络请求的模块&#xff0c;模拟浏览器发起请求。 -2. 为什么使用requests模块-1. 自动处理url编码-2. 自动处理post请求参数-3. 简化cookie和代理的操作-3. requests模块如何被使用安装&#xff1a; pip install re…

2020-3-22

题目一&#xff1a; JavaScript 天小时分钟和秒倒计时 代码与解析&#xff1a; <!DOCTYPE html> <html> <head> <meta charset" utf-8"> <style type"text/css"> *{margin:0;padding:0;list-style:none; } body{font-size:…