java 多线程阻塞队列 与 阻塞方法与和非阻塞方法

Queue是什么

队列,是一种数据结构。除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的。无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的。在FIFO队列中,所有新元素都插入队列的末尾。队列都是线程安全的,内部已经实现安全措施,不用我们担心

 

Queue中的方法

Queue中的方法不难理解,6个,每2对是一个也就是总共3对。看一下JDK API就知道了:

注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议。



使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。

一.几种主要的阻塞队列

自从Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:

  ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

  LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

  PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

  DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。


二.阻塞队列中的方法 VS 非阻塞队列中的方法
1.非阻塞队列中的几个主要方法:
add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;
remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;
offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;
poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;
peek():获取队首元素,若成功,则返回队首元素;否则返回null

对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。

//在这篇笔记中没有介绍非阻塞队列,大部分阻塞队列都可以有非阻塞方法和阻塞方法
2.阻塞队列中的几个主要方法:
阻塞队列包括了非阻塞队列中的大部分方法,上面列举的5个方法在阻塞队列中都存在,但是要注意这5个方法在阻塞队列中都进行了同步措施。除此之外,阻塞队列提供了另外4个非常有用的方法:
put(E e)
take()
offer(E e,long timeout, TimeUnit unit)
poll(long timeout, TimeUnit unit)
put方法用来向队尾存入元素,如果队列满,则等待;
take方法用来从队首取元素,如果队列为空,则等待;
offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;
 

注意:

1、必须要使用take()方法在获取的时候达成阻塞结果
2、使用poll()方法将产生非阻塞效果


三.阻塞队列的实现原理
前面谈到了非阻塞队列和阻塞队列中常用的方法,下面来探讨阻塞队列的实现原理,本文以ArrayBlockingQueue为例,其他阻塞队列实现原理可能和ArrayBlockingQueue有一些差别,但是大体思路应该类似,有兴趣的朋友可自行查看其他阻塞队列的实现源码。
首先看一下ArrayBlockingQueue类中的几个成员变量:

  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. private static final long serialVersionUID = -817911632652898426L;
  4. /** The queued items */
  5. private final E[] items;
  6. /** items index for next take, poll or remove */
  7. private int takeIndex;
  8. /** items index for next put, offer, or add. */
  9. private int putIndex;
  10. /** Number of items in the queue */
  11. private int count;
  12. /*
  13. * Concurrency control uses the classic two-condition algorithm
  14. * found in any textbook.
  15. */
  16. /** Main lock guarding all access */
  17. private final ReentrantLock lock;
  18. /** Condition for waiting takes */
  19. private final Condition notEmpty;
  20. /** Condition for waiting puts */
  21. private final Condition notFull;
  22. }

可以看出,ArrayBlockingQueue中用来存储元素的实际上是一个数组,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。
lock是一个可重入锁,notEmpty和notFull是等待条件。
下面看一下ArrayBlockingQueue的构造器,构造器有三个重载版本:
  1. public ArrayBlockingQueue(int capacity) {
  2. }
  3. public ArrayBlockingQueue(int capacity, boolean fair) {
  4.  
  5. }
  6. public ArrayBlockingQueue(int capacity, boolean fair,
  7.                           Collection<? extends E> c) {
  8. }

第一个构造器只有一个参数用来指定容量,第二个构造器可以指定容量和公平性,第三个构造器可以指定容量、公平性以及用另外一个集合进行初始化。
然后看它的两个关键方法的实现:put()和take():
  1. public void put(E e) throws InterruptedException {
  2.     if (e == null) throw new NullPointerException();
  3.     final E[] items = this.items;
  4.     final ReentrantLock lock = this.lock;
  5.     lock.lockInterruptibly();
  6.     try {
  7.         try {
  8.             while (count == items.length)
  9.                 notFull.await();
  10.         } catch (InterruptedException ie) {
  11.             notFull.signal(); // propagate to non-interrupted thread
  12.             throw ie;
  13.         }
  14.         insert(e);
  15.     } finally {
  16.         lock.unlock();
  17.     }
  18. }

从put方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。
当被其他线程唤醒时,通过insert(e)方法插入元素,最后解锁。
我们看一下insert方法的实现:
  1. private void insert(E x) {
  2.     items[putIndex] = x;
  3.     putIndex = inc(putIndex);
  4.     ++count;
  5.     notEmpty.signal();
  6. }

它是一个private方法,插入成功后,通过notEmpty唤醒正在等待取元素的线程。
下面是take()方法的实现:
  1. public E take() throws InterruptedException {
  2.     final ReentrantLock lock = this.lock;
  3.     lock.lockInterruptibly();
  4.     try {
  5.         try {
  6.             while (count == 0)
  7.                 notEmpty.await();
  8.         } catch (InterruptedException ie) {
  9.             notEmpty.signal(); // propagate to non-interrupted thread
  10.             throw ie;
  11.         }
  12.         E x = extract();
  13.         return x;
  14.     } finally {
  15.         lock.unlock();
  16.     }
  17. }

跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。在take方法中,如果可以取元素,则通过extract方法取得元素,下面是extract方法的实现:
  1. private E extract() {
  2.     final E[] items = this.items;
  3.     E x = items[takeIndex];
  4.     items[takeIndex] = null;
  5.     takeIndex = inc(takeIndex);
  6.     --count;
  7.     notFull.signal();
  8.     return x;
  9. }

跟insert方法也很类似。
其实从这里大家应该明白了阻塞队列的实现原理,事实它和我们用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者的思路类似,只不过它把这些工作一起集成到了阻塞队列中实现。


四.示例和使用场景
下面先使用Object.wait()和Object.notify()、非阻塞队列实现生产者-消费者模式:
  1. public class Test {
  2.     private int queueSize = 10;
  3.     private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
  4.  
  5.     public static void main(String[] args)  {
  6.         Test test = new Test();
  7.         Producer producer = test.new Producer();
  8.         Consumer consumer = test.new Consumer();
  9.  
  10.         producer.start();
  11.         consumer.start();
  12.     }
  13.  
  14.     class Consumer extends Thread{
  15.  
  16.         @Override
  17.         public void run() {
  18.             consume();
  19.         }
  20.  
  21.         private void consume() {
  22.             while(true){
  23.                 synchronized (queue) {
  24.                     while(queue.size() == 0){
  25.                         try {
  26.                             System.out.println("队列空,等待数据");
  27.                             queue.wait();
  28.                         } catch (InterruptedException e) {
  29.                             e.printStackTrace();
  30.                             queue.notify();
  31.                         }
  32.                     }
  33.                     queue.poll();          //每次移走队首元素
  34.                     queue.notify();
  35.                     System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
  36.                 }
  37.             }
  38.         }
  39.     }
  40.  
  41.     class Producer extends Thread{
  42.  
  43.         @Override
  44.         public void run() {
  45.             produce();
  46.         }
  47.  
  48.         private void produce() {
  49.             while(true){
  50.                 synchronized (queue) {
  51.                     while(queue.size() == queueSize){
  52.                         try {
  53.                             System.out.println("队列满,等待有空余空间");
  54.                             queue.wait();
  55.                         } catch (InterruptedException e) {
  56.                             e.printStackTrace();
  57.                             queue.notify();
  58.                         }
  59.                     }
  60.                     queue.offer(1);        //每次插入一个元素
  61.                     queue.notify();
  62.                     System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
  63.                 }
  64.             }
  65.         }
  66.     }
  67. }

这个是经典的生产者-消费者模式,通过阻塞队列和Object.wait()和Object.notify()实现,wait()和notify()主要用来实现线程间通信。
具体的线程间通信方式(wait和notify的使用)在后续问章中会讲述到。
下面是使用阻塞队列实现的生产者-消费者模式:

  1. public class Test {
  2. private int queueSize = 10;
  3. private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
  4. public static void main(String[] args) {
  5. Test test = new Test();
  6. Producer producer = test.new Producer();
  7. Consumer consumer = test.new Consumer();
  8. producer.start();
  9. consumer.start();
  10. }
  11. class Consumer extends Thread{
  12. @Override
  13. public void run() {
  14. consume();
  15. }
  16. private void consume() {
  17. while(true){
  18. try {
  19. queue.take();
  20. System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }
  26. }
  27. class Producer extends Thread{
  28. @Override
  29. public void run() {
  30. produce();
  31. }
  32. private void produce() {
  33. while(true){
  34. try {
  35. queue.put(1);
  36. System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }
  42. }
  43. }


 有没有发现,使用阻塞队列代码要简单得多,不需要再单独考虑同步和线程间通信的问题。
在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。
阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。


转载于:https://www.cnblogs.com/signheart/p/6606475.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/393399.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

批量移动AD用户到指定OU

作为域管理员&#xff0c;在日常工作中使用ADUC&#xff08;AD用户和计算机&#xff09;工具在图形界面中进行账号管理操作可谓是家常便饭了。然而一个个增加、移动、删除用户&#xff0c;这样操作有时真的够烦&#xff0c;当管理大批量的账户时&#xff0c;重复操作浪费的时间…

vs 编译说明

静态编译/MT&#xff0c;/MTD 是指使用libc和msvc相关的静态库(lib)。动态编译&#xff0c;/MD&#xff0c;/MDd是指用相应的DLL版本编译。其中字母含义 d&#xff1a;debug m&#xff1a;multi-threading(多线程) t&#xff1a;text代码 d&#xff1a;dynamic(动态)…

python numeric_Python pandas.to_numeric函数方法的使用

pandas.to_numeric(arg, errorsraise, downcastNone) [source]将参数转换为数字类型。默认返回dtype为float64或int64&#xff0c; 具体取决于提供的数据。使用downcast参数获取其他dtype。请注意&#xff0c;如果传入非常大的数字&#xff0c;则可能会导致精度损失。由…

javascript 分号_让我们谈谈JavaScript中的分号

javascript 分号要使用它们&#xff0c;还是不使用它们… (To use them, or not to use them…) Semicolons in JavaScript divide the community. Some prefer to use them always, no matter what. Others like to avoid them.JavaScript中的分号分隔社区。 有些人更喜欢始终…

leetcode436. 寻找右区间(二分法)

给定一组区间&#xff0c;对于每一个区间 i&#xff0c;检查是否存在一个区间 j&#xff0c;它的起始点大于或等于区间 i 的终点&#xff0c;这可以称为 j 在 i 的“右侧”。 对于任何区间&#xff0c;你需要存储的满足条件的区间 j 的最小索引&#xff0c;这意味着区间 j 有最…

python篇第6天【数据类型】

Python有五个标准的数据类型&#xff1a;Numbers&#xff08;数字&#xff09;String&#xff08;字符串&#xff09;List&#xff08;列表&#xff09;Tuple&#xff08;元组&#xff09;Dictionary&#xff08;字典&#xff09;Python数字数字数据类型用于存储数值。他们是不…

如何确定Ionic是否适合您的项目

by Simon Grimm西蒙格里姆(Simon Grimm) 如何确定Ionic是否适合您的项目 (How to find out if Ionic is the right choice for your project) Ionic has been around for quite some years. With the latest release of version 4, it has become an even better option for d…

二维数组的查找 java_查找二维数组java的总和

我正在一个项目中&#xff0c;我必须读取文件并将内容输入2D数组。然后&#xff0c;我必须对每一行&#xff0c;每一列和矩阵的周长求和。到目前为止&#xff0c;除外围功能外&#xff0c;我一切正常。我正在尝试为两个外部列的顶行&#xff0c;底行和中间创建单独的for循环。矩…

递归法解决兔子问题

记得以前過相似问题&#xff0c;今天有同事问道&#xff0c;竟然不知所答&#xff0c;故写篇文章以记之。 一般而言&#xff0c;兔子在出生两个月后&#xff0c;就有繁殖能力&#xff0c;一对兔子每个月能生出一对小兔子来。如果所有兔子都不死&#xff0c;那么若干月以后可以繁…

mysql本地连接错误解决办法

今天公司同事在测试服务器上死活不能用一个账号在本地登陆,但是远程就可以,于是我帮忙看了下,测试服务器的IP是10.10.2.226,错误如下:linux-0fdr:/home1/mysql_data # mysql -h 10.10.2.226 -u jxq2 -pjxq2ERROR 1045 (28000): Access denied for user jxq2linux-0fdr (using p…

leetcode546. 移除盒子(dp)

给出一些不同颜色的盒子&#xff0c;盒子的颜色由数字表示&#xff0c;即不同的数字表示不同的颜色。 你将经过若干轮操作去去掉盒子&#xff0c;直到所有的盒子都去掉为止。每一轮你可以移除具有相同颜色的连续 k 个盒子&#xff08;k > 1&#xff09;&#xff0c;这样一轮…

408. Valid Word Abbreviation

题目&#xff1a; Given a non-empty string s and an abbreviation abbr, return whether the string matches with the given abbreviation. A string such as "word" contains only the following valid abbreviations: ["word", "1ord", &qu…

oracle常用操作指令

登录oracle用户: sqlplus 用户名/密码 创建用户&#xff1a;create user 要创建的用户名 identified by 当前用户名; 授权&#xff1a;grant resource,connect to 要授权的用户名; 删除用户&#xff1a;drop user 用户名 创建表&#xff1a; create table student( id n…

java接收二进制数据_java-从套接字读取二进制数据

我正在尝试连接到服务器,然后向其发送HTTP请求(在这种情况下为GET).这个想法是请求一个文件,然后从服务器接收它.它应同时适用于文本文件和二进制文件(例如imgs).我对文本文件没有任何问题,它可以完美工作,但是对二进制文件有一些麻烦.首先,我声明一个BufferedReader(用于读取标…

web开发入门_Web开发人员和设计师的自由职业入门

web开发入门Learn how to get started with freelancing as a web developer and designer. Cara Bell shares lessons and tips she has learned from her years as a freelancer.了解如何以网络开发人员和设计师的身份开始自由职业。 卡拉贝尔(Cara Bell)分享了她从自由职业者…

leetcode1343. 大小为 K 且平均值大于等于阈值的子数组数目(队列)

给你一个整数数组 arr 和两个整数 k 和 threshold 。 请你返回长度为 k 且平均值大于等于 threshold 的子数组数目。 示例 1&#xff1a; 输入&#xff1a;arr [2,2,2,2,5,5,5,8], k 3, threshold 4 输出&#xff1a;3 解释&#xff1a;子数组 [2,5,5],[5,5,5] 和 [5,5,8…

二分查找递归和非递归方法分析

递归实现&#xff1a; 自己写的递归&#xff1a;多一个赋值操作&#xff0c;虽然可以得到正确的结果。但是比较难以理解。 问题&#xff1a;没有深刻理解递归返回值。return会在递归调用到最后&#xff0c;在递归结束的地方&#xff0c;会将返回值一层一层返回给方法&#xff0…

BaseYii_autoload

BaseYii_autoload 判断是否是classMap还是命名空间的 然后 转换成 绝对路径 include 文件  public static function autoload($className){      //classMap 一般都是类库 官方 或者自定义类映射 if (isset(static::$classMap[$className])) {$classFile static::$cla…

sasl java_javaSASL_SSL帐号密码方式访问kafka

java SASL_SSL帐号密码 方式访问 kafkaProducer Java Sample java生产者:Properties props new Properties();props.put("bootstrap.servers","*******:9092,*******:9092");props.put("acks", "all");//props.put("retries&quo…

RedHat5.2下Linux Oracle 10g ASM 安装详细实录-第二篇-ASM安装

五、安装ASM 1、在oracle网站下载支持包&#xff1a;http://www.oracle.com/technology ... x/asmlib/rhel5.html 2、根据linux内核下载相应的asm安装包:根据uname –a查看内核&#xff08;黄底红字为内核&#xff09;&#xff1a;$ uname -aLinux L-DB-3-6 2.6.18-92.el5 #1 S…