BlockingQueue详解(含动画演示)

目录

  • BlockingQueue详解
    • 0、BlockingQueue简介
      • BlockingQueue接口中方法注释
      • BlockingQueue的实现,总结计划
    • 1、ArrayBlockingQueue简介
    • 2、ArrayBlockingQueue的继承体系
    • 3、ArrayBlockingQueue的构造方法
      • ①、 `ArrayBlockingQueue(int capacity)`
      • ②、`ArrayBlockingQueue(int capacity, boolean fair)`
      • ③、`ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)`
    • 4、ArrayBlockingQueue的适用场景
      • ①、资源池管理
      • ②、多线程任务调度
      • ③、实现生产者-消费者模式
      • ④、使用有界队列
      • ArrayBlockingQueue 的简单使用代码示例:
    • 5、ArrayBlockingQueue的数据结构
      • 看下ArrayBlockingQueue类的部分属性
    • 6、ArrayBlockingQueue的`put`方法
    • 7、ArrayBlockingQueue的`take`方法
    • 8、ArrayBlockingQueue,take和put方法动画演示
    • 9、ArrayBlockingQueue的其他方法
      • `add(E e)`方法
      • `offer(E e)` 方法
      • `offer(E e, long timeout, TimeUnit unit) ` 方法
      • `poll()`方法
      • `poll(long timeout, TimeUnit unit)` 方法
      • `peek()`方法
      • `remove()`方法
      • `contains(Object o)`方法
      • `drainTo(Collection<? super E> c)`方法
    • 10、LinkedBlockingQueue简介和数据结构
      • LinkedBlockingQueue属性和构造函数
      • LinkedBlockingQueue的`take`和`put`方法
      • ArrayBlockingQueue和LinkedBlockingQueue的一些区别?
      • 为什么ArrayBlockingQueue不设计成读写锁分离的模式?
    • 11、其他的BlockingQueue实现
      • PriorityBlockingQueue 简单介绍
      • SynchronousQueue简单介绍
      • LinkedTransferQueue简单介绍
      • LinkedBlockingDeque简单介绍

BlockingQueue详解

总结到这儿,总感觉这集合的多线程味越来越重了~
为什么这样呢,因为 BlockingQueue和前面说过的CopyOnWriteArrayList、
ConcurrentHashMap都是 java.util.concurrent包下的。

这个包就是为了解决多线程的各种问题而设计的,所以java.util.concurrent包下的集合大都和多线程有关系。

0、BlockingQueue简介

阻塞队列的基本概念:
阻塞队列(Blocking Queue)是一种特殊的队列,支持两个特殊的操作:

  • ①、在队列为空时,获取元素的操作将会被阻塞,直到队列变为非空。
  • ②、在队列已满时,插入元素的操作将会被阻塞,直到队列不再是满的。

阻塞队列是线程安全的,且通常用于生产者-消费者模型中。
BlockingQueue 提供了四种不同类型的操作方式:抛出异常、特殊值、阻塞和超时。

BlockingQueue接口中方法注释

public interface BlockingQueue<E> extends Queue<E> {// 尝试添加元素到队列中,如果队列已满,则抛出 IllegalStateExceptionboolean add(E e);// 尝试添加元素到队列中,如果队列已满,则返回 falseboolean offer(E e);// 将元素添加到队列中,如果队列已满,则等待空间可用void put(E e) throws InterruptedException;// 尝试将元素添加到队列中,在指定的等待时间内,如果队列已满,则等待空间可用boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;// 获取并移除队列头部的元素,如果队列为空,则等待直到有元素可用E take() throws InterruptedException;// 尝试获取并移除队列头部的元素,在指定的等待时间内,如果队列为空,则返回 nullE poll(long timeout, TimeUnit unit) throws InterruptedException;// 返回队列剩余的容量int remainingCapacity();// 从队列中移除指定的元素boolean remove(Object o);// 检查队列中是否包含指定的元素boolean contains(Object o);// 从队列中移除所有可用的元素,并将它们添加到指定的集合中int drainTo(Collection<? super E> c);// 从队列中移除最多 maxElements 个可用的元素,并将它们添加到指定的集合中int drainTo(Collection<? super E> c, int maxElements);
}

知道了抽象层接口的规范后。再去看实现就有整体的把握了。

BlockingQueue的实现,总结计划

ArrayBlockingQueue 下面会详细讲解。
LinkedBlockingQueue 讲解一下底层数据结构和 take、put方法。
PriorityBlockingQueue 简单介绍。
SynchronousQueue简单介绍。
LinkedTransferQueue简单介绍。
LinkedBlockingDeque简单介绍。

1、ArrayBlockingQueue简介

ArrayBlockingQueue是JDK的JUC包下对阻塞队列的一种实现。

ArrayBlockingQueue的主要特性如下:

  • ①、有界:队列有固定的容量,容量在创建时指定,且不能改变。如果队列已满,插入操作将被阻塞,直到有空间可用。
  • ②、阻塞:队列支持阻塞的插入和移除操作。这意味着当队列满时,插入操作会等待,直到有空间可用;当队列空时,移除操作会等待,直到有元素可用。
  • ③、线程安全:内部使用可重入锁(ReentrantLock)和两个条件变量(notEmpty和notFull)来管理并发访问。

2、ArrayBlockingQueue的继承体系

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable 

在这里插入图片描述

可以看到ArrayBlockingQueue实现了Collection和Queue接口,是单值集合,具有队列的特性。
又实现了BlockingQueue接口,具有阻塞队列的特性。

3、ArrayBlockingQueue的构造方法

①、 ArrayBlockingQueue(int capacity)

这个构造方法创建一个具有指定容量的 ArrayBlockingQueue,其内部锁的公平性设置为 false。

/*** 创建一个具有指定容量的ArrayBlockingQueue,锁的公平性设置为false。** @param capacity 队列的容量* @throws IllegalArgumentException 如果容量小于等于0*/
public ArrayBlockingQueue(int capacity) {this(capacity, false);
}

②、ArrayBlockingQueue(int capacity, boolean fair)

这个构造方法创建一个具有指定容量的 ArrayBlockingQueue,并允许指定锁的公平性。如果 fair 为 true,锁将采用公平策略,即按顺序分配锁;如果 false,锁将采用非公平策略。

/*** 创建一个具有指定容量的ArrayBlockingQueue,并指定锁的公平性。** @param capacity 队列的容量* @param fair     指定锁的公平性,如果为true则锁是公平的* @throws IllegalArgumentException 如果容量小于等于0*/
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException(); // 如果容量小于等于0,抛出异常this.items = new Object[capacity]; // 初始化存储元素的数组lock = new ReentrantLock(fair);    // 创建一个可重入锁,并指定其公平性notEmpty = lock.newCondition();    // 创建一个条件变量,用于在队列不为空时通知notFull = lock.newCondition();     // 创建一个条件变量,用于在队列未满时通知
}

③、ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

这个构造方法创建一个具有指定容量和锁公平性的 ArrayBlockingQueue,并将指定集合中的元素添加到队列中。

/*** 创建一个具有指定容量和锁公平性的ArrayBlockingQueue,并将集合中的元素添加到队列中。** @param capacity 队列的容量* @param fair     指定锁的公平性,如果为true则锁是公平的* @param c        要添加到队列中的集合* @throws IllegalArgumentException 如果容量小于等于0或者集合中的元素数量超过队列容量* @throws NullPointerException     如果集合或其中任何一个元素为null*/
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {this(capacity, fair); // 调用前一个构造方法,初始化容量和锁的公平性final ReentrantLock lock = this.lock;lock.lock(); // 加锁以确保可见性,而不是互斥try {int i = 0;try {for (E e : c) {checkNotNull(e); // 检查元素是否为null,如果是则抛出NullPointerExceptionitems[i++] = e;  // 将元素添加到队列中}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException(); // 如果集合元素超过容量,抛出异常}count = i; // 设置队列中的元素数量putIndex = (i == capacity) ? 0 : i; // 设置放置下一个元素的索引} finally {lock.unlock(); // 释放锁}
}

4、ArrayBlockingQueue的适用场景

我们先看下这个集合的用途,然后简单的使用体验一下,之后再去看数据结构和一些方法的实现。

①、资源池管理

适用于实现资源池管理,例如数据库连接池。资源池中的资源可以放入 ArrayBlockingQueue,线程可以安全地获取和释放资源。

②、多线程任务调度

在多线程任务调度中,可以使用 ArrayBlockingQueue 来存放任务。工作线程从队列中取任务并执行,新的任务可以被安全地添加到队列中。

③、实现生产者-消费者模式

这个本质上也是多线程任务调度。

④、使用有界队列

当需要限制队列的最大容量时,ArrayBlockingQueue 是一个很好的选择。它在队列满时阻塞生产者,在队列空时阻塞消费者,从而有效地控制系统资源的使用。

还有哪些应用呢? 随便找个Spring项目,然后 Ctrl+鼠标左键点击JDK源码中ArrayBlockingQueue 的类名
就能看到了,可以挑几个感兴趣的研究研究,比如说 兔子消息队列的模板方法类 RabbitTemplate里面有用到ArrayBlockingQueue来实现同步的请求-回复机制。

在这里插入图片描述

ArrayBlockingQueue 的简单使用代码示例:

import java.util.concurrent.ArrayBlockingQueue;public class TestA {private static final ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);public static void main(String[] args) {Thread producer = new Thread(() -> {try {for (int i = 1; i <= 5; i++) {blockingQueue.put(i);System.out.println("生产者生产了:" + i);}} catch (InterruptedException e) {e.printStackTrace();}}, "生产者");Thread consumer = new Thread(() -> {try {for (int i = 1; i <= 5; i++) {blockingQueue.take();System.out.println("消费者消费了:" + i);}} catch (InterruptedException e) {e.printStackTrace();}}, "消费者");producer.start();consumer.start();}}

执行结果:

生产者生产了:1
消费者消费了:1
生产者生产了:2
消费者消费了:2
生产者生产了:3
消费者消费了:3
生产者生产了:4
消费者消费了:4
生产者生产了:5
消费者消费了:5

是不是感觉变味了,明明在讲集合,咋又扯上多线程了。
因为JUC包里的集合基本上都是为了解决多线程问题而设计的,没办法,不扯多线程不行呀。

不过使用ArrayBlockingQueue 来实现生产者和消费者模型是真的简单呀,只需要利用put和take方法即可,什么线程安全问题,线程通信问题都被ArrayBlockingQueue 给解决了,这些内部的线程安全实现对使用者来说算是透明的,后面会分析这些方法是如何实现的。

5、ArrayBlockingQueue的数据结构

毫无疑问 又是Object[] 数组 。

看下ArrayBlockingQueue类的部分属性

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 存储队列元素的数组final Object[] items;// 下一个要取出、轮询、查看或移除的元素的索引int takeIndex;// 下一个要放置、提供或添加的元素的索引int putIndex;// 队列中的元素数量int count;// 主锁,用于保护所有访问final ReentrantLock lock;// 条件变量,用于等待取出操作private final Condition notEmpty;// 条件变量,用于等待放置操作private final Condition notFull;}

6、ArrayBlockingQueue的put方法

public void put(E e) throws InterruptedException {// 检查要添加的元素是否为空checkNotNull(e);// 获取队列的锁final ReentrantLock lock = this.lock;// 以可中断的方式获取锁// lock.lockInterruptibly();try {// 如果队列已满,则等待while (count == items.length)notFull.await();// 将元素加入队列enqueue(e);} finally {// 释放锁lock.unlock();}
}

注意: 当线程尝试通过lockInterruptibly()方法获取锁时,如果在此期间线程被其他线程中断(通过Thread.interrupt()方法),那么这个方法会立即抛出InterruptedException,从而允许线程优雅地处理中断,比如停止执行某些操作或清理资源。

enqueue方法

private void enqueue(E x) {  // 获取队列数组final Object[] items = this.items;// 将元素放入当前插入位置items[putIndex] = x;// 更新插入位置索引,如果到达数组末尾则回绕if (++putIndex == items.length)putIndex = 0;// 增加元素计数count++;// 唤醒等待“非空”条件的线程notEmpty.signal();
}

7、ArrayBlockingQueue的take方法

public E take() throws InterruptedException {// 获取ReentrantLock实例,lock是ArrayBlockingQueue类中的一个成员变量final ReentrantLock lock = this.lock;// 以中断方式获取锁,如果当前线程被中断则抛出InterruptedExceptionlock.lockInterruptibly();try {// 如果队列为空(count == 0),则等待notEmpty条件while (count == 0)notEmpty.await();// 从队列中移除并返回队头元素return dequeue();} finally {// 确保在任何情况下都释放锁,以避免死锁lock.unlock();}
}
private E dequeue() {// 获取存储队列元素的数组final Object[] items = this.items;// 获取takeIndex位置的元素,并将其强制转换为E类型@SuppressWarnings("unchecked")E x = (E) items[takeIndex];// 将取出位置的元素设为null,以便GC回收items[takeIndex] = null;// 增加takeIndex,如果到达数组末尾则回绕到开头if (++takeIndex == items.length)takeIndex = 0;// 递减count,表示队列中的元素数减少count--;// 如果有迭代器在迭代队列,则通知它们元素已被移除if (itrs != null)itrs.elementDequeued();// 唤醒等待队列非满的线程notFull.signal();// 返回取出的元素return x;
}

8、ArrayBlockingQueue,take和put方法动画演示

先看下面的代码示例:

import java.util.concurrent.ArrayBlockingQueue;public class TestA {public static void main(String[] args) {try {ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);blockingQueue.put(1);blockingQueue.put(2);blockingQueue.put(3);System.out.println("主线程添加元素:1 2 3");// 使用新线程 t1 再添加 一个元素  由于阻塞队列满了 所以t1线程阻塞Thread t1 = new Thread(() -> {try {blockingQueue.put(4);System.out.println("t1线程添加元素:4");} catch (InterruptedException e) {e.printStackTrace();}});t1.start();Integer take1 = blockingQueue.take();// 主线程拿走一个元素System.out.println("主线程拿走元素:" + take1);t1.join(); // 等待t1 添加完成System.out.println("当前队列中元素:" + blockingQueue);blockingQueue.clear(); // 清空 阻塞队列System.out.println("==========队列已清空===========");// 使用新线程 t2 获取元素  由于阻塞队列为空 所以t2线程阻塞Thread t2 = new Thread(() -> {try {Integer take = blockingQueue.take();System.out.println("t2线程获取元素: " + take + "成功");} catch (InterruptedException e) {e.printStackTrace();}});t2.start();blockingQueue.put(5);System.out.println("主线程添加元素:5");t2.join();} catch (InterruptedException e) {e.printStackTrace();}}}

执行结果:
一定要结合代码的执行顺序看结果

主线程添加元素:1 2 3
主线程拿走元素:1
t1线程添加元素:4
当前队列中元素:[2, 3, 4]
==========队列已清空===========
主线程添加元素:5
t2线程获取元素: 5成功

下面用动画演示下

ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.put(3);

添加元素,把队列添加满的情况
在这里插入图片描述

下面再来看当队列满了之后再去添加元素的情况:

 // 使用新线程 t1 再添加 一个元素  由于阻塞队列满了 所以t1线程阻塞
Thread t1 = new Thread(() -> {try {blockingQueue.put(4);System.out.println("t1线程添加元素:4");} catch (InterruptedException e) {e.printStackTrace();}
});
t1.start();
Integer take1 = blockingQueue.take();// 主线程拿走一个元素

在这里插入图片描述

最后再看下队列清空后 执行take的情况:

   blockingQueue.clear(); // 清空 阻塞队列System.out.println("==========队列已清空===========");// 使用新线程 t2 获取元素  由于阻塞队列为空 所以t2线程阻塞Thread t2 = new Thread(() -> {try {Integer take = blockingQueue.take();System.out.println("t2线程获取元素: " + take + "成功");} catch (InterruptedException e) {e.printStackTrace();}});t2.start();blockingQueue.put(5);

在这里插入图片描述

9、ArrayBlockingQueue的其他方法

add(E e)方法

将元素添加到队列中,当队列已满时,抛出 IllegalStateException。

public boolean add(E e) {// 调用父类的add方法(实际就是调用下面重写的add方法)return super.add(e);
}public boolean add(E e) {// 调用offer方法 尝试将元素插入队列  if (offer(e))return true;else// 如果插入失败(队列已满),抛出IllegalStateExceptionthrow new IllegalStateException("Queue full");
}

offer(E e) 方法

将元素添加到队列中,如果队列已满,立即返回 false。

public boolean offer(E e) {// 检查传入的元素是否为null,如果为null,抛出NullPointerExceptioncheckNotNull(e);// 获取队列的重入锁final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 如果队列已满,返回falseif (count == items.length)return false;else {// 将元素添加到队列enqueue(e);return true;}} finally {// 释放锁lock.unlock();}
}

offer(E e, long timeout, TimeUnit unit) 方法

offer方法的重载,带超时时间。
尝试将元素添加到队列中,如果队列已满,则等待指定的时间。
如果在等待时间内队列有空闲空间,则将元素添加到队列中并返回 true;如果等待超时,则返回 false。

public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {// 检查传入的元素是否为null,如果为null,抛出NullPointerExceptioncheckNotNull(e);// 将等待时间转换为纳秒long nanos = unit.toNanos(timeout);// 获取队列的重入锁final ReentrantLock lock = this.lock;// 以可中断的方式获取锁,如果当前线程在等待获取锁时被中断,则抛出InterruptedExceptionlock.lockInterruptibly();try {// 如果队列已满,进入等待状态,直到队列有空闲空间或等待超时while (count == items.length) {// 如果等待时间已到,返回falseif (nanos <= 0)return false;// 等待notFull条件变量,返回剩余的等待时间  = 传入的nanos - notFull条件实际等待的时间nanos = notFull.awaitNanos(nanos);}// 将元素添加到队列enqueue(e);return true;} finally {// 释放锁lock.unlock();}
}

poll()方法

从队列中移除并返回队头元素,如果队列为空,返回 null。

public E poll() {// 获取队列的重入锁final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 如果队列为空,返回null,否则移除并返回队头元素return (count == 0) ? null : dequeue();} finally {// 释放锁lock.unlock();}
}

poll(long timeout, TimeUnit unit) 方法

从队列中移除并返回队头元素。如果队列为空,则等待指定的时间。
如果在等待时间内队列有元素入队,则返回队头元素;如果等待超时,则返回 null。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 将等待时间转换为纳秒long nanos = unit.toNanos(timeout);// 获取队列的重入锁final ReentrantLock lock = this.lock;// 以可中断的方式获取锁,如果当前线程在等待获取锁时被中断,则抛出InterruptedExceptionlock.lockInterruptibly();try {// 如果队列为空,进入等待状态,直到队列有元素或等待超时while (count == 0) {// 如果等待时间已到,返回nullif (nanos <= 0)return null;// 等待notEmpty条件变量,返回剩余的等待时间  = 传入的nanos - notFull条件实际等待的时间nanos = notEmpty.awaitNanos(nanos);}// 从队列中移除并返回队头元素return dequeue();} finally {// 释放锁lock.unlock();}
}

peek()方法

返回队头元素但不移除它,如果队列为空,返回 null。

public E peek() {// 获取队列的重入锁final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 返回队头元素但不移除它,如果队列为空,返回nullreturn itemAt(takeIndex);} finally {// 释放锁lock.unlock();}
}final E itemAt(int i) {// 获取底层数组中相应索引位置的元素return (E) items[i];}

remove()方法

从队列中移除并返回队头元素,如果队列为空,抛出 NoSuchElementException。

public E remove() {// 尝试移除并返回队头元素E x = poll();// 如果队头元素不为空,返回该元素if (x != null)return x;else// 如果队列为空,抛出NoSuchElementExceptionthrow new NoSuchElementException();
}

contains(Object o)方法

检查队列中是否包含指定元素。

public boolean contains(Object o) {// 如果传入的对象为null,返回falseif (o == null) return false;// 获取队列的数组final Object[] items = this.items;// 获取队列的重入锁final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 如果队列中有元素,检查每个元素是否与传入的对象相等if (count > 0) {// 获取当前插入位置索引final int putIndex = this.putIndex;// 从队列的读取位置开始检查int i = takeIndex;do {// 如果找到相等的元素,返回trueif (o.equals(items[i]))return true;// 更新检查位置索引if (++i == items.length)i = 0;} while (i != putIndex);}// 如果未找到相等的元素,返回falsereturn false;} finally {// 释放锁lock.unlock();}
}

drainTo(Collection<? super E> c)方法

将队列中的所有元素转移到指定的集合中。
此操作是一个批量操作,试图一次性地清空队列并把队列中所有元素转移到指定的集合中。

public int drainTo(Collection<? super E> c) {// 调用重载方法return drainTo(c, Integer.MAX_VALUE);}public int drainTo(Collection<? super E> c, int maxElements) {// 检查目标集合是否为null,如果为null,抛出NullPointerExceptioncheckNotNull(c);// 检查目标集合是否为队列本身,如果是,抛出IllegalArgumentExceptionif (c == this)throw new IllegalArgumentException();// 如果maxElements小于等于0,直接返回0if (maxElements <= 0)return 0;// 获取队列的重入锁final ReentrantLock lock = this.lock;// 获取锁lock.lock();try {// 计算实际要转移的元素数量int n = Math.min(maxElements, count);// 将元素从队列中转移到目标集合for (int i = 0; i < n; i++) {c.add(this.dequeue());}// 返回实际转移的元素数量return n;} finally {// 释放锁lock.unlock();}
}

10、LinkedBlockingQueue简介和数据结构

LinkedBlockingQueue属性和构造函数

看下面两个类属性就知道了
  transient Node<E> head;private transient Node<E> last;

LinkedBlockingQueue 是一个基于链表实现的阻塞队列。

再看下构造函数:

// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue 实例。
public LinkedBlockingQueue() {// 调用带有容量参数的构造函数,默认容量为Integer.MAX_VALUEthis(Integer.MAX_VALUE);
}// 创建一个指定容量的 LinkedBlockingQueue 实例。
public LinkedBlockingQueue(int capacity) {// 检查容量是否大于0,否则抛出IllegalArgumentExceptionif (capacity <= 0) throw new IllegalArgumentException();// 初始化队列容量this.capacity = capacity;// 初始化头节点和尾节点为哨兵节点,不存储实际数据last = head = new Node<E>(null);
}// 使用给定的集合创建一个 LinkedBlockingQueue 实例,并将集合中的元素添加到队列中。
public LinkedBlockingQueue(Collection<? extends E> c) {// 调用带有容量参数的构造函数,默认容量为Integer.MAX_VALUEthis(Integer.MAX_VALUE);// 获取插入锁final ReentrantLock putLock = this.putLock;// 获取锁以确保可见性putLock.lock();try {int n = 0;// 遍历集合中的每个元素for (E e : c) {// 如果元素为null,抛出NullPointerExceptionif (e == null)throw new NullPointerException();// 如果元素数量达到容量,抛出IllegalStateExceptionif (n == capacity)throw new IllegalStateException("Queue full");// 将元素包装成节点并添加到队列中enqueue(new Node<E>(e));++n;}// 更新队列中元素的数量count.set(n);} finally {// 释放锁putLock.unlock();}
}

LinkedBlockingQueue的takeput方法

首先先明确一点LinkedBlockingQueue的读锁和写锁是分开的,这点和ArrayBlockingQueue不同。ArrayBlockingQueue读写用的都是同一个锁。
LinkedBlockingQueue 使用了读写分离锁,即 takeLock 和 putLock,来分别控制读取和插入操作。这种设计可以减少锁竞争,提高并发性能。

private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();

take方法

public E take() throws InterruptedException {E x; // 存储从队列中取出的元素int c = -1; // 用于记录当前队列元素数量final AtomicInteger count = this.count; // 队列中元素的数量  缓存成员变量到局部变量final ReentrantLock takeLock = this.takeLock; // 读取操作的锁takeLock.lockInterruptibly(); // 获取读取锁,可被中断try {// 当队列为空时,等待notEmpty条件while (count.get() == 0) {notEmpty.await(); // 等待队列非空}x = dequeue(); // 从队列中取出一个元素c = count.getAndDecrement(); // 获取并递减当前队列中的元素数量// 如果队列中还有剩余元素,唤醒其他等待线程if (c > 1)notEmpty.signal(); // 唤醒其他等待的读取操作} finally {takeLock.unlock(); // 释放读取锁}// 如果取出元素后,队列变得不满,唤醒等待的插入操作if (c == capacity)signalNotFull(); // 通知等待的插入操作return x; // 返回取出的元素
}

take方法步骤说明:

  • 获取 takeLock 锁,确保只有一个线程可以执行读取操作。
  • 如果队列为空,等待 notEmpty 条件。
  • 从队列中取出一个元素,并递减元素数量。
  • 如果队列中仍有元素,唤醒其他等待的读取操作。
  • 释放 takeLock 锁。
  • 如果队列之前已满,现在有空闲,唤醒等待的插入操作。
  • 返回取出的元素。

put方法

public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException(); // 检查插入元素是否为nullint c = -1; // 用于记录当前队列元素数量Node<E> node = new Node<E>(e); // 创建一个新的节点包装插入的元素final ReentrantLock putLock = this.putLock; // 插入操作的锁final AtomicInteger count = this.count; // 队列中元素的数量  缓存成员变量到局部变量putLock.lockInterruptibly(); // 获取插入锁,可被中断try {// 当队列已满时,等待notFull条件while (count.get() == capacity) {notFull.await(); // 等待队列非满}enqueue(node); // 将新节点插入到队列中c = count.getAndIncrement(); // 获取并递增当前队列中的元素数量// 如果插入后队列仍未满,唤醒其他等待线程if (c + 1 < capacity)notFull.signal(); // 唤醒其他等待的插入操作} finally {putLock.unlock(); // 释放插入锁}// 如果插入元素前队列为空,唤醒等待的读取操作if (c == 0)signalNotEmpty(); // 通知等待的读取操作
}

put方法步骤说明:

  • 检查插入的元素是否为 null,如果是,则抛出 NullPointerException。
  • 创建一个新节点包装插入的元素。
  • 获取 putLock 锁,确保只有一个线程可以执行插入操作。
  • 如果队列已满,等待 notFull 条件。
  • 将新节点插入到队列中,并递增元素数量。
  • 如果插入后队列仍未满,唤醒其他等待的插入操作。
  • 释放 putLock 锁。
  • 如果插入元素前队列为空,现在有元素,唤醒等待的读取操作。

注意: 队列中元素数量使用AtomicInteger保证原子性。同时缓存成员变量到局部变量有一些好处。
例如可以提高代码的可读性和可维护性,同时也有助于性能优化和确保线程安全的引用。

  final AtomicInteger count = this.count; // 队列中元素的数量

并且源码中对于上面这段代码不在lock锁范围内使用有一段注释:
关于count的非保护使用: 注释指出,尽管count变量没有直接由putLock保护(即在读取count时没有加锁),这种做法在这里是安全的。原因是当线程持有putLock时,其他试图插入或者移除元素的线程都被阻塞了,因此count的值只可能因为当前线程或其他线程从等待中被唤醒并完成了插入或移除操作而改变。这意味着,即使在等待期间检查count的值,也不会因为并发修改而导致不一致。

ArrayBlockingQueue和LinkedBlockingQueue的一些区别?

特性ArrayBlockingQueueLinkedBlockingQueue
底层数据结构数组链表
有界/无界有界(在构造时指定固定容量)可以有界(指定容量)也可以无界(默认)
锁机制单一锁(用于 put 和 take 操作)读写锁分离(分别用于 put 和 take 操作)
性能在单线程或低并发场景中性能较好,因为锁开销较少在高并发场景中性能较好,因为读写操作分离减少了锁竞争
内存开销固定内存开销(数组大小)潜在的更高内存开销(链表节点)
容量扩展不支持动态扩展,容量固定支持动态扩展(如果无界)
适用场景适用于固定大小的队列和低并发场景适用于需要动态扩展的队列和高并发场景

为什么ArrayBlockingQueue不设计成读写锁分离的模式?

我觉得有如下原因:

  1. 数据结构导致的设计差异

ArrayBlockingQueue:
使用数组作为底层数据结构。
需要在固定大小的数组中进行元素的插入和删除操作,这些操作涉及到对数组索引的维护和管理。
队列满或空时,需要阻塞插入或删除操作,涉及到队列头尾指针的调整。
如果使用读写锁分离,需要考虑并处理队列头尾指针调整的线程安全问题,设计起来比较复杂。

LinkedBlockingQueue:
使用链表作为底层数据结构。
插入和删除操作只需调整链表的头尾指针,不涉及到数组索引的管理。
链表的结构使得读写操作更容易分离,适合使用不同的锁进行管理。

  1. 性能和设计权衡
    ArrayBlockingQueue是针对固定大小的队列,设计目标就是简单、高效。
    ArrayBlockingQueue也并非不能设计成读写锁模式,只是投入和回报比不成正比。读写锁的引入会增加复杂度,性能提升可能有限,不一定值得。

11、其他的BlockingQueue实现

PriorityBlockingQueue 简单介绍

PriorityBlockingQueue 是一个基于优先级的无界阻塞队列。

继承体系

public class PriorityBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable

在这里插入图片描述
数据结构:
底层数据结构:PriorityBlockingQueue 基于一个可调整大小的数组实现的二叉小顶堆(最小堆),它使用数组来表示二叉堆。后面会再写一篇PriorityQueue详解的文章详细介绍其内部数据结构(二叉小顶堆)。

优先级排序:
队列中的元素按照自然顺序(通过实现 Comparable 接口)或通过提供的比较器(Comparator)进行排序。元素的优先级决定了它们在队列中的顺序。

适用场景:
**任务调度:**适用于需要按照优先级处理任务的场景。
路径查找: 在图算法中,优先队列通常用于实现最短路径查找算法,如 Dijkstra 算法。
**多线程环境:**需要在多线程环境中进行优先级调度的场景。

SynchronousQueue简单介绍

SynchronousQueue 是 Java 并发包中的一种特殊类型的队列。

继承体系

public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable

在这里插入图片描述
特点:
无存储元素: SynchronousQueue 内部并不存储任何元素,即使是空间大小为 0。
直接传输: 它主要用于线程之间直接传输元素,生产者线程将元素直接交给消费者线程,而不会将元素存储在队列中。
阻塞操作: SynchronousQueue 的插入和移除操作是阻塞的。
公平性: 它是一个公平的队列,采用公平的顺序来处理元素的访问。
容量: 容量为 0,即只能容纳一个正在进行交换的元素。

适用场景
直接传输: 适用于需要在生产者和消费者之间进行直接传输的场景,例如线程池任务调度等。
流水线处理: 适用于将生产者生成的数据直接传输给消费者处理的情况,避免数据存储和额外的线程开销。
同步控制: 在并发编程中,用于线程同步和控制流量的传输。

LinkedTransferQueue简单介绍

LinkedTransferQueue 是 Java 并发包中的一个特殊类型的队列,它结合了队列(Queue)和传输队列(TransferQueue)的特性。

继承体系

public class LinkedTransferQueue<E> extends AbstractQueue<E>implements TransferQueue<E>, java.io.Serializable

在这里插入图片描述
数据结构:
链表

特点
无界队列: LinkedTransferQueue 是一个无界队列,可以动态增长以容纳更多的元素。
插入和移除操作: 支持常规的插入(offer、add)和移除(poll、take)操作。
传输操作: 支持直接的元素传输操作,即使队列为空也可以进行元素传输。
阻塞和非阻塞操作: 提供了阻塞和非阻塞的插入和移除方法,以及等待传输的操作。
公平性: 在处理元素时采用公平的顺序,即按照等待时间长短处理。

适用场景
生产者-消费者模式: 适用于多线程环境下的生产者和消费者模式,支持高并发的元素传输和处理。
任务调度: 在任务调度器中,可以使用 LinkedTransferQueue 来管理和调度任务,实现任务之间的依赖和传递。
优先级处理: 可以基于元素的属性或优先级进行传输和处理,支持优先级队列的特性。

LinkedBlockingDeque简单介绍

LinkedBlockingDeque 是 Java 并发包中的双向阻塞队列,结合了双向队列(Deque)和阻塞队列(BlockingDeque)的特性。

继承体系

public class LinkedBlockingDeque<E>extends AbstractQueue<E>implements BlockingDeque<E>, java.io.Serializable

在这里插入图片描述
数据结构:
双向链表

特点
双向队列: LinkedBlockingDeque 是一个双向队列,支持在队列两端(头部和尾部)进行插入和移除操作。
阻塞操作: 队列的插入和移除操作是阻塞的,即当队列为空或已满时,插入和移除操作会阻塞调用线程,直到有可用的空间或元素。
无界队列: 与 LinkedBlockingQueue 类似,LinkedBlockingDeque 也是一个无界队列,可以动态增长以容纳更多的元素。
线程安全: LinkedBlockingDeque 是线程安全的,支持多个线程同时进行插入、移除和访问操作。
公平性: 使用公平的顺序来处理等待队列的线程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/857424.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机组成原理 —— 存储系统(概述)

计算机组成原理 —— 存储系统&#xff08;概述&#xff09; 存储系统按层次划分按照存储介质分类按照存储方式分类按照信息可更改性分类根据信息的可保存性分类存储器性能指标 我们今天来学习计算机组成原理中的存储系统&#xff1a; 存储系统 存储系统是计算机系统中用于存…

vue实现的商品列表网页

一、商品列表效果如下 二、代码&#xff1b; vue实现的商品列表网页 &#xff0c; 图片在vue项目的Public文件夹里的 imgs中 <template><div class"common-layout"><!-- el-container:外层容器。 当子元素中包含 <el-header> 或 <el-foo…

mysql:简单理解mysql mvcc的可重复读

# 原理 假设有这样的sql begin select&#xff08;或update、insert、delete&#xff09; ... commit当执行【begin】的时候&#xff0c;标记有一个新事务要开始&#xff0c;但是事务还没有真正开始&#xff0c;事务id还没有产生当执行事务里面的第一个sql语句时&#xff08;…

java之url任意跳转漏洞

1 漏洞介绍 URLRedirect url重定向漏洞也称url任意跳转漏洞&#xff0c;网站信任了用户的输入导致恶意攻击&#xff0c;url重定向主要用来钓鱼&#xff0c;比如url跳转中最常见的跳转在登陆口&#xff0c;支付口&#xff0c;也就是一旦登陆将会跳转任意自己构造的网站&#xf…

Xshell7免费版下载安装使用

​一、下载安装​ 1.打开官网下载 https://www.xshell.com/zh/free-for-home-school/ 2.选择合适的下载路径&#xff0c;点击下载按钮&#xff0c;然后按照提示完成安装。 二、Xshell7的使用&#xff0c;Xhell连接Linux 1.连接之前&#xff0c;确保在Linux中开启SSH。参考&a…

YOLOv8中的C2f模块

文章目录 一、结构概述二、模块功能 一、结构概述 C2f块:首先由一个卷积块(Conv)组成&#xff0c;该卷积块接收输入特征图并生成中间特征图特征图拆分:生成的中间特征图被拆分成两部分&#xff0c;一部分直接传递到最终的Concat块&#xff0c;另一部分传递到多个Botleneck块进…

QT基础 - 文本文件读写

目录 零. 前言 一.读取文件 二. 写入文件 三. 和二进制读写的区别 零. 前言 在 Qt 中&#xff0c;对文本文件进行读写操作是常见的任务之一。这对于保存和加载配置信息、处理数据文件等非常有用。 Qt 提供了多种方式来读写文本文件&#xff0c;使得文件操作变得相对简单和…

SwiftUI 6.0(iOS 18)ScrollView 全新的滚动位置(ScrollPosition)揭秘

概览 在只有方寸之间大小的手持设备上要想体面的向用户展示海量信息&#xff0c;滚动视图&#xff08;ScrollView&#xff09;无疑是绝佳的“东牀之选”。 在 SwiftUI 历史的长河中&#xff0c;总觉得苹果对于 ScrollView 视图功能的升级是在“挤牙膏”。这不&#xff0c;在本…

spire.Pdf 将pdf转成image

一、nuget安装 <ItemGroup><PackageReference Include"Spire.PDF" Version"10.6.7" /></ItemGroup> 二、直接上代码 using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging; using System; using System.IO;namespace …

乱弹篇(35)掩耳盗铃与两三十年

成语“ 掩耳盗铃 ”&#xff0c;比喻自己欺骗自己&#xff0c;明明是掩盖不住的事情偏要想法子掩盖。它多用来讽刺那些做事不想让别人知道&#xff0c;却偏偏又引起他人注意的人。 现在网络上以新浪微博和邪恶的“800727”为典型的自媒体平台和其掌控者&#xff0c;就是现代版…

提升研发效率:三品PLM解决方案在汽车汽配行业的实践

随着全球汽车市场的快速发展&#xff0c;中国汽车汽配行业迎来了前所未有的发展机遇。然而&#xff0c;在这一过程中&#xff0c;企业也面临着诸多挑战&#xff0c;如研发能力的提升、技术资料管理的复杂性、以及跨部门协作的困难等。为了应对这些挑战&#xff0c;三品产品生命…

模式分解算法-满足3NF的无损且保持函数依赖的分解算法、满足BCNF的无损连接分解算法

一、引言 1、对指定的关系模式&#xff0c;若范式级别较低&#xff0c;为第一范式或第二范式&#xff0c;由于存在数据冗余或更新异常问题&#xff0c;在实际中一般是不可用的&#xff0c;关系模式的规范化就是将满足低一级的关系模式分解为若干满足高一级范式的关系模式的集合…

Python轻松设置Excel单元格数字显示格式

Excel作为强大的数据处理与分析工具&#xff0c;不仅能够存储大量数据&#xff0c;还支持复杂的数据处理与可视化功能。而如何恰当地展示Excel表格中的数据是Excel文件制作的关键之一。这便涉及到Excel单元格数字格式的设置。数字格式不仅关乎数据的美学呈现&#xff0c;如货币…

聊聊 C# dynamic 类型,并分享一个将 dynamic 类型变量转为其它类型的技巧和实例

前言 dynamic 是一种有别于传统变量类型的动态类型声明&#xff0c;刚开始接触可能在理解上会有些困难&#xff0c;可以简单地把它理解为一个盲盒&#xff0c;你可以任意猜测盒子有什么东西&#xff0c;并认为这些东西真正存在而进行处理&#xff0c;等到真正打开时&#xff0…

网上的流量卡真的可以免费领取吗?

网上的流量卡真的可以免费领取吗&#xff1f;当然可以&#xff0c;目前运营商推出的流量卡都是可以免费领取的。 有很多朋友私信给小编&#xff0c;听说流量卡是免费领取的就觉得不太靠谱&#xff0c;其实这种想法是不对的&#xff0c;首先大家要换位思考一下&#xff0c;如果我…

Ubuntu配置ssh+vnc(完整版)

Ubuntu配置sshvnc&#xff08;完整版&#xff09; 1 配置ssh 1. 安装openssh-server&#xff0c;配置开机自启 # 更新包 sudo apt-get update # 安装openssh-server sudo apt-get install -y openssh-server # 启动服务 sudo service ssh start # 配置开机自启 sudo systemc…

细说MCU定时器模块的输入捕捉功能的实现方法

目录 一、工程背景 二、建立工程 1、配置GPIO 2、选择时钟源和Debug 3、 配置定时器TIM1 4、 配置定时器TIM13 5、配置串口 6、配置中断 7、配置系统时钟 三、代码修改 1、使能TIM1输入捕捉功能和TIM3的PWM输出功能 2、自定义变量 3、重定义回调函数 4、输出到…

【国际化I18n使用方法】vue2使用i18简单实现多语种切换,刷新保持,动态数据处理

效果图 使用流程 总结就是&#xff0c;安装好插件后&#xff0c;配置几个语言的js文件&#xff0c;每个词都要在每个js内写一遍对应的语言&#xff0c;然后通过切换js文件拿到对应的语言&#xff0c;实现翻译的效果。然后当前使用什么语言保存到本地&#xff0c;这样刷新就可以…

【进阶篇-Day4:使用JAVA编写石头迷阵游戏】

目录 1、绘制界面2、打乱石头方块3、移动业务4、游戏判定胜利5、统计步数6、重新游戏7、完整代码&#xff1a; 1、绘制界面 上述思路是&#xff1a;使用一个二维数组存放图片的编号&#xff0c;然后在后持遍历即可获取对应的图片。 代码如下&#xff1a; package com.itheima.s…

取代煤气灶,新能源电燃灶真有那么牛吗

在当今社会&#xff0c;能源问题日益凸显&#xff0c;能源危机的警钟不断敲响。与此同时&#xff0c;人们对于生活品质和安全环保的要求也越来越高。在这样的背景下&#xff0c;一种号称能取代燃气灶和电磁炉的新能源电燃灶——华火电燃灶进入了人们的视野。 华火电燃灶的出现似…