阻塞队列BlockingQueue实战及其原理分析

1. 阻塞队列介绍

1.1 队列

  • 是限定在一端进行插入,另一端进行删除的特殊线性表。
  • 先进先出(FIFO)线性表。
  • 允许出队的一端称为队头,允许入队的一端称为队尾。

数据结构演示网站:https://www.cs.usfca.edu/~galles/visualization/Algorithms.html

Queue接口

public interface Queue<E> extends Collection<E> {//添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常boolean add(E e);//添加一个元素,添加成功返回true, 如果队列满了,返回falseboolean offer(E e);//返回并删除队首元素,队列为空则抛出异常E remove();//返回并删除队首元素,队列为空则返回nullE poll();//返回队首元素,但不移除,队列为空则抛出异常E element();//获取队首元素,但不移除,队列为空则返回nullE peek();}

1.2 阻塞队列

阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

BlockingQueue接口

方法

抛出异常

返回特定值

阻塞

阻塞特定时间

入队

add(e)

offer(e)

put(e)

offer(e, time, unit)

出队

remove()

poll()

take()

poll(time, unit)

获取队首元素

element()

peek()

不支持

不支持

应用场景

阻塞队列在实际应用中有很多场景,以下是一些常见的应用场景:

1.线程池

线程池中的任务队列通常是一个阻塞队列。当任务数超过线程池的容量时,新提交的任务将被放入任务队列中等待执行。线程池中的工作线程从任务队列中取出任务进行处理,如果队列为空,则工作线程会被阻塞,直到队列中有新的任务被提交。

2.生产者-消费者模型

在生产者-消费者模型中,生产者向队列中添加元素,消费者从队列中取出元素进行处理。阻塞队列可以很好地解决生产者和消费者之间的并发问题,避免线程间的竞争和冲突。

3.消息队列

消息队列使用阻塞队列来存储消息,生产者将消息放入队列中,消费者从队列中取出消息进行处理。消息队列可以实现异步通信,提高系统的吞吐量和响应性能,同时还可以将不同的组件解耦,提高系统的可维护性和可扩展性。

4.缓存系统

缓存系统使用阻塞队列来存储缓存数据,当缓存数据被更新时,它会被放入队列中,其他线程可以从队列中取出最新的数据进行使用。使用阻塞队列可以避免并发更新缓存数据时的竞争和冲突。

5.并发任务处理

在并发任务处理中,可以将待处理的任务放入阻塞队列中,多个工作线程可以从队列中取出任务进行处理。使用阻塞队列可以避免多个线程同时处理同一个任务的问题,并且可以将任务的提交和执行解耦,提高系统的可维护性和可扩展性。

总之,阻塞队列在实际应用中有很多场景,它可以帮助我们解决并发问题,提高程序的性能和可靠性。

1.3 JUC包下的阻塞队列

BlockingQueue 接口的实现类都被放在了 juc 包中,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理却是类似的。

队列

描述

ArrayBlockingQueue

基于数组结构实现的一个有界阻塞队列

LinkedBlockingQueue

基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列

PriorityBlockingQueue

支持按优先级排序的无界阻塞队列

DelayQueue

基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列

SynchronousQueue

不存储元素的阻塞队列

LinkedTransferQueue

基于链表结构实现的一个无界阻塞队列

LinkedBlockingDeque

基于链表结构实现的一个双端阻塞队列

https://www.processon.com/view/link/618ce3941e0853689b0818e2

2. ArrayBlockingQueue

ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。ArrayBlockingQueue可以用于实现数据缓存、限流、生产者-消费者模式等各种应用。

在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。

2.1 ArrayBlockingQueue使用

BlockingQueue queue = new ArrayBlockingQueue(1024);queue.put("1"); //向队列中添加元素Object object = queue.take(); //从队列中取出元素

2.2 ArrayBlockingQueue的原理

ArrayBlockingQueue使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。

数据结构

利用了Lock锁的Condition通知机制进行阻塞控制。

核心:一把锁,两个条件

//数据元素数组final Object[] items;//下一个待取出元素索引int takeIndex;//下一个待添加元素索引int putIndex;//元素个数int count;//内部锁final ReentrantLock lock;//消费者private final Condition notEmpty;//生产者private final Condition notFull;  public ArrayBlockingQueue(int capacity) {this(capacity, false);}public ArrayBlockingQueue(int capacity, boolean fair) {...lock = new ReentrantLock(fair); //公平,非公平notEmpty = lock.newCondition();notFull =  lock.newCondition();}

入队put方法

public void put(E e) throws InterruptedException {//检查是否为空checkNotNull(e);final ReentrantLock lock = this.lock;//加锁,如果线程中断抛出异常lock.lockInterruptibly();try {//阻塞队列已满,则将生产者挂起,等待消费者唤醒//设计注意点: 用while不用if是为了防止虚假唤醒while (count == items.length)notFull.await(); //队列满了,使用notFull等待(生产者阻塞)// 入队enqueue(e);} finally {lock.unlock(); // 唤醒消费者线程}}private void enqueue(E x) {final Object[] items = this.items;//入队   使用的putIndexitems[putIndex] = x;if (++putIndex == items.length)putIndex = 0;  //设计的精髓: 环形数组,putIndex指针到数组尽头了,返回头部count++;//notEmpty条件队列转同步队列,准备唤醒消费者线程,因为入队了一个元素,肯定不为空了notEmpty.signal();}

思考: 为什么ArrayBlockingQueue对数组操作要设计成双指针?

使用双指针的好处在于可以避免数组的复制操作。如果使用单指针,每次删除元素时需要将后面的元素全部向前移动,这样会导致时间复杂度为 O(n)。而使用双指针,我们可以直接将 takeIndex 指向下一个元素,而不需要将其前面的元素全部向前移动。同样地,插入新的元素时,我们可以直接将新元素插入到 putIndex 所指向的位置,而不需要将其后面的元素全部向后移动。这样可以使得插入和删除的时间复杂度都是 O(1) 级别,提高了队列的性能。

出队take方法

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//加锁,如果线程中断抛出异常lock.lockInterruptibly();try {//如果队列为空,则消费者挂起while (count == 0)notEmpty.await();//出队return dequeue();} finally {lock.unlock();// 唤醒生产者线程}}private E dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex]; //取出takeIndex位置的元素items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0; //设计的精髓: 环形数组,takeIndex 指针到数组尽头了,返回头部count--;if (itrs != null)itrs.elementDequeued();//notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位notFull.signal();return x;}

3. LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。

3.1 LinkedBlockingQueue使用

//指定队列的大小创建有界队列BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);//无界队列BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();

3.2 LinkedBlockingQueue原理

LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。

数据结构

// 容量,指定容量就是有界队列private final int capacity;// 元素数量private final AtomicInteger count = new AtomicInteger();// 链表头  本身是不存储任何元素的,初始化时item指向nulltransient Node<E> head;// 链表尾private transient Node<E> last;// take锁   锁分离,提高效率private final ReentrantLock takeLock = new ReentrantLock();// notEmpty条件// 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒private final Condition notEmpty = takeLock.newCondition();// put锁private final ReentrantLock putLock = new ReentrantLock();// notFull条件// 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒private final Condition notFull = putLock.newCondition();//典型的单链表结构static class Node<E> {E item;  //存储元素Node<E> next;  //后继节点    单链表结构Node(E x) { item = x; }}

构造器

public LinkedBlockingQueue() {// 如果没传容量,就使用最大int值初始化其容量this(Integer.MAX_VALUE);}public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;// 初始化head和last指针为空值节点last = head = new Node<E>(null);}

入队put方法

public void put(E e) throws InterruptedException {    // 不允许null元素if (e == null) throw new NullPointerException();int c = -1;// 新建一个节点Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 使用put锁加锁putLock.lockInterruptibly();try {// 如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程)while (count.get() == capacity) {notFull.await();}  // 队列不满,就入队enqueue(node);c = count.getAndIncrement();// 队列长度加1,返回原值// 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条件上的线程(可以继续入队)// 这里为啥要唤醒一下呢?// 因为可能有很多线程阻塞在notFull这个条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock(); // 真正唤醒生产者线程}  // 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程if (c == 0)signalNotEmpty();}private void enqueue(Node<E> node) {// 直接加到last后面,last指向入队元素last = last.next = node;}    private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();// 加take锁try {  notEmpty.signal();// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程} finally {takeLock.unlock();  // 真正唤醒消费者线程}}

出队take方法

public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;// 使用takeLock加锁takeLock.lockInterruptibly();try {// 如果队列无元素,则阻塞在notEmpty条件上(消费者线程阻塞)while (count.get() == 0) {notEmpty.await();}// 否则,出队x = dequeue();c = count.getAndDecrement();//长度-1,返回原值if (c > 1)// 如果取之前队列长度大于1,notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程,原因与入队同理notEmpty.signal();} finally {takeLock.unlock(); // 真正唤醒消费者线程}// 为什么队列是满的才唤醒阻塞在notFull上的线程呢?// 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,// 这也是锁分离带来的代价// 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程if (c == capacity)signalNotFull();return x;}private E dequeue() {// head节点本身是不存储任何元素的// 这里把head删除,并把head下一个节点作为新的值// 并把其值置空,返回原来的值Node<E> h = head;Node<E> first = h.next;h.next = h; // 方便GChead = first;E x = first.item;first.item = null;return x;}private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程} finally {putLock.unlock(); // 解锁,这才会真正的唤醒生产者线程}}

3.3 LinkedBlockingQueue与ArrayBlockingQueue对比

LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和ArrayBlockingQueue的不同点在于:

  • 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
  • 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
  • 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
  • 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

4. SynchronousQueue

SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take。

如图所示,SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。

需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

4.1 应用场景

SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。

SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

4.2 SynchronousQueue使用

#构建同步队列SynchronousQueue<Integer> queue = new SynchronousQueue<>();

需要注意的是,SynchronousQueue 的使用需要谨慎,因为它非常容易导致死锁,如果没有恰当地设计和同步生产者和消费者线程,可能会造成程序无法继续执行。因此,在使用 SynchronousQueue 时要注意线程同步和错误处理。

死锁场景示例

public class SynchronousQueueDeadlockDemo {public static void main(String[] args) {final SynchronousQueue<Integer> queue = new SynchronousQueue<>();Thread thread1 = new Thread(() -> {try {// 线程1尝试将数据放入队列int data = 42;queue.put(data);System.out.println("线程1放入数据:" + data);// 接着,线程1尝试从队列中获取数据,但此时没有其他线程来获取int result = queue.take();System.out.println("线程1获取数据:" + result);} catch (InterruptedException e) {e.printStackTrace();}});Thread thread2 = new Thread(() -> {try {// 接着,线程2尝试将数据放入队列,但此时没有其他线程来获取int result = 100;queue.put(result);System.out.println("线程2放入数据:" + result);// 线程2尝试从队列中获取数据,但此时没有数据可用int data = queue.take();System.out.println("线程2获取数据:" + data);} catch (InterruptedException e) {e.printStackTrace();}});thread1.start();thread2.start();}}

5. PriorityBlockingQueue

PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,数组的默认长度是11,虽然指定了数组的长度,但是可以无限的扩充,直到资源消耗尽为止,每次出队都返回优先级别最高的或者最低的元素。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。

优先级队列PriorityQueue: 队列中每个元素都有一个优先级,出队的时候,优先级最高的先出。

5.1 应用场景

电商抢购活动,会员级别高的用户优先抢购到商品

银行办理业务,vip客户插队

5.2 PriorityBlockingQueue使用

//创建优先级阻塞队列  Comparator为null,自然排序PriorityBlockingQueue<Integer> queue=new PriorityBlockingQueue<Integer>(5);//自定义ComparatorPriorityBlockingQueue queue=new PriorityBlockingQueue<Integer>(5, new Comparator<Integer>() {@Overridepublic int compare(Integer o1, Integer o2) {return o2-o1;}});

思考:如何实现一个优先级队列?

5.3 如何构造优先级队列

使用普通线性数组(无序)来表示优先级队列

  • 执行插入操作时,直接将元素插入到数组末端,需要的成本为O(1),
  • 获取优先级最高元素,我们需要遍历整个线性队列,匹配出优先级最高元素,需要的成本为o(n)
  • 删除优先级最高元素,我们需要两个步骤,第一找出优先级最高元素,第二步删除优先级最高元素,然后将后面的元素依次迁移,填补空缺,需要的成本为O(n)+O(n)=O(n)

使用一个按顺序排列的有序向量实现优先级队列

  • 获取优先级最高元素,O(1)
  • 删除优先级最高元素,O(1)
  • 插入一个元素,需要两个步骤,第一步我们需要找出要插的位置,这里我们可以使用二分查找,成本为O(logn),第二步是插入元素之后,将其所有后继进行后移操作,成本为O(n),所有总成本为O(logn)+O(n)=O(n)

二叉堆

完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。

二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型:

大顶堆和小顶堆。

  • 大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值;
  • 小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。

最小堆演示:https://www.cs.usfca.edu/~galles/visualization/Heap.html

6. DelayQueue

DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。

它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力,代码如下:

public interface Delayed extends Comparable<Delayed> {//getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,//如果返回 0 或者负数则代表任务已过期。//元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。long getDelay(TimeUnit unit);}

6.1 DelayQueue使用

DelayQueue 实现延迟订单

在实现一个延迟订单的场景中,我们可以定义一个 Order 类,其中包含订单的基本信息,例如订单编号、订单金额、订单创建时间等。同时,我们可以让 Order 类实现 Delayed 接口,重写 getDelay 和 compareTo 方法。在 getDelay 方法中,我们可以计算订单的剩余延迟时间,而在 compareTo 方法中,我们可以根据订单的延迟时间进行比较。

下面是一个简单的示例代码,演示了如何使用 DelayQueue 来实现一个延迟订单的场景:

public class DelayQueueExample {public static void main(String[] args) throws InterruptedException {DelayQueue<Order> delayQueue = new DelayQueue<>();// 添加三个订单,分别延迟 5 秒、2 秒和 3 秒delayQueue.put(new Order("order1", System.currentTimeMillis(), 5000));delayQueue.put(new Order("order2", System.currentTimeMillis(), 2000));delayQueue.put(new Order("order3", System.currentTimeMillis(), 3000));// 循环取出订单,直到所有订单都被处理完毕while (!delayQueue.isEmpty()) {Order order = delayQueue.take();System.out.println("处理订单:" + order.getOrderId());}}static class  Order implements Delayed{private String orderId;private long createTime;private long delayTime;public Order(String orderId, long createTime, long delayTime) {this.orderId = orderId;this.createTime = createTime;this.delayTime = delayTime;}public String getOrderId() {return orderId;}@Overridepublic long getDelay(TimeUnit unit) {long diff = createTime + delayTime - System.currentTimeMillis();return unit.convert(diff, TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {long diff = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);return Long.compare(diff, 0);}}}

由于每个订单都有不同的延迟时间,因此它们将会按照延迟时间的顺序被取出。当延迟时间到达时,对应的订单对象将会被从队列中取出,并被处理。

6.2 DelayQueue原理

数据结构

//用于保证队列操作的线程安全private final transient ReentrantLock lock = new ReentrantLock();// 优先级队列,存储元素,用于保证延迟低的优先执行private final PriorityQueue<E> q = new PriorityQueue<E>();// 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程private Thread leader = null;// 条件,用于表示现在是否有可取的元素   当新元素到达,或新线程可能需要成为leader时被通知private final Condition available = lock.newCondition();public DelayQueue() {}public DelayQueue(Collection<? extends E> c) {this.addAll(c);}

入队put方法

public void put(E e) {offer(e);}public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {// 入队q.offer(e);if (q.peek() == e) {// 若入队的元素位于队列头部,说明当前元素延迟最小// 将 leader 置空leader = null;// available条件队列转同步队列,准备唤醒阻塞在available上的线程available.signal();}return true;} finally {lock.unlock(); // 解锁,真正唤醒阻塞的线程}}

出队take方法

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)   if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。else {long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间             if (delay <= 0)// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素return q.poll();// 如果delay大于0 ,则下面要阻塞了// 将first置为空方便gcfirst = null;// 如果有线程争抢的Leader线程,则进行无限期等待。if (leader != null)available.await();else {// 如果leader为null,把当前线程赋值给它Thread thisThread = Thread.currentThread();leader = thisThread;try {// 等待剩余等待时间available.awaitNanos(delay);} finally {// 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素if (leader == thisThread)leader = null;}}}}} finally {// 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程if (leader == null && q.peek() != null)// available条件队列转同步队列,准备唤醒阻塞在available上的线程available.signal();// 解锁,真正唤醒阻塞的线程lock.unlock();}}
  1. 当获取元素时,先获取到锁对象。
  2. 获取最早过期的元素,但是并不从队列中弹出元素。
  3. 最早过期元素是否为空,如果为空则直接让当前线程无限期等待状态,并且让出当前锁对象。
  4. 如果最早过期的元素不为空
  5. 获取最早过期元素的剩余过期时间,如果已经过期则直接返回当前元素
  6. 如果没有过期,也就是说剩余时间还存在,则先获取Leader对象,如果Leader已经有线程在处理,则当前线程进行无限期等待,如果Leader为空,则首先将Leader设置为当前线程,并且让当前线程等待剩余时间。
  7. 最后将Leader线程设置为空
  8. 如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。

7. 如何选择适合的阻塞队列

7.1 选择策略

通常我们可以从以下 5 个角度考虑,来选择合适的阻塞队列:

功能

第 1 个需要考虑的就是功能层面,比如是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。

容量

第 2 个需要考虑的是容量,或者说是否有存储的要求,还是只需要“直接传递”。在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的,如 ArrayBlockingQueue;有的默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue。

能否扩容

第 3 个需要考虑的是能否扩容。因为有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。

内存结构

第 4 个需要考虑的点就是内存结构。我们分析过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。

性能

第 5 点就是从性能的角度去考虑。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue。

7.2 线程池对于阻塞队列的选择

线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。

Executors类下的线程池类型:

  • FixedThreadPool(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
  • CachedThreadPool 选取的是 SynchronousQueue
  • ScheduledThreadPool(SingleThreadScheduledExecutor同理)选取的是延迟队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/890785.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

hadoop搭建

前言 一般企业中不会使用master slave01 slave02来命名 vmware创建虚拟机 打开vmware软件&#xff0c;新建虚拟机 典型 稍后安装系统 选择centos7 虚拟机名称和安放位置自行选择&#xff08;最小化安装消耗空间较少&#xff09; 默认磁盘大小即可 自定义硬件 选择centos7的i…

测试 - 1 ( 9000 字详解 )

一&#xff1a; 测试入门 测试是指运用特定的方法、手段或工具&#xff0c;对某一对象进行验证、检查或评估&#xff0c;判断其是否符合预期标准或目标。例如&#xff0c;修理好一盏灯后通过按开关测试其是否正常工作&#xff1b;通过一次数学测验评估学生对代数知识的掌握程度…

【MATLAB第110期】#保姆级教学 | 基于MATLAB的PAWN全局敏感性分析方法(无目标函数)含特征变量置信区间分析

【MATLAB第110期】#保姆级教学 | 基于MATLAB的PAWN全局敏感性分析方法&#xff08;无目标函数&#xff09;含特征变量置信区间分析 一、介绍 PAWN&#xff08;Probabilistic Analysis With Numerical Uncertainties&#xff09;是一种基于密度的全局敏感性分析&#xff08;Gl…

DX12 快速教程(2) —— 渲染天蓝色窗口

快速导航 新建项目 "002-DrawSkyblueWindow"DirectX 12 入门1. COM 技术&#xff1a;DirectX 的中流砥柱什么是 COM 技术COM 智能指针 2.创建 D3D12 调试层设备&#xff1a;CreateDebugDevice什么是调试层如何创建并使用调试层 3.创建 D3D12 设备&#xff1a;CreateD…

【合作原创】使用Termux搭建可以使用的生产力环境(八)

前言 在上一篇【合作原创】使用Termux搭建可以使用的生产力环境&#xff08;七&#xff09;-CSDN博客中我们讲到了安装百度网盘、VS Code还有java&#xff0c;这篇我打算讲一下最后的编程&#xff0c;还有输入法相关问题解决。众所周知我的本职工作是Java程序猿&#xff0c;因…

VLMs之Gemma 2:PaliGemma 2的简介、安装和使用方法、案例应用之详细攻略

VLMs之Gemma 2&#xff1a;PaliGemma 2的简介、安装和使用方法、案例应用之详细攻略 导读&#xff1a;2024年12月4日&#xff0c;PaliGemma 2是一个基于Gemma 2系列语言模型的开源视觉语言模型 (VLM) 家族。PaliGemma 2 通过提供一个规模化、多功能且开源的VLM家族&#xff0c;…

24.12.26 SpringMVCDay01

SpringMVC 也被称为SpringWeb Spring提供的Web框架,是在Servlet基础上,构建的框架 SpringMVC看成是一个特殊的Servlet,由Spring来编写的Servlet 搭建 引入依赖 <dependency><groupId>org.springframework</groupId><artifactId>spring-webmvc<…

国产 HighGo 数据库企业版安装与配置指南

国产 HighGo 数据库企业版安装与配置指南 1. 下载安装包 访问 HighGo 官方网站&#xff08;https://www.highgo.com/&#xff09;&#xff0c;选择并下载企业版安装包。 2. 上传安装包到服务器 将下载的安装包上传至服务器&#xff0c;并执行以下命令&#xff1a; [rootmas…

Java程序设计,使用属性的选项库,轻松实现商品检索的复杂查询(上)

一、背景 本文我们以某商城的商品检索为例,说一说如何使用属性及选项,实现复杂的逻辑表达式的查询。 先贴图,总结出业务需求。 可以通过一系列属性及选项的组合,过滤出用户想要的商品列表。 1、属性 上文中的品牌、分类、屏幕尺寸、CPU型号、运行内存、机身内存、屏幕材…

机器学习(二)-简单线性回归

文章目录 1. 简单线性回归理论2. python通过简单线性回归预测房价2.1 预测数据2.2导入标准库2.3 导入数据2.4 划分数据集2.5 导入线性回归模块2.6 对测试集进行预测2.7 计算均方误差 J2.8 计算参数 w0、w12.9 可视化训练集拟合结果2.10 可视化测试集拟合结果2.11 保存模型2.12 …

WHAT KAN I SAY?Kolmogorov-Arnold Network (KAN)网络结构介绍及实战(文末送书)

一、KAN网络介绍 1.1 Kolmogorov-Arnold Network (KAN)网络结构的提出 2024年4月&#xff0c;来自MIT、加州理工学院、东北大学等团队的研究&#xff0c;引爆了一整个科技圈&#xff1a;Yes We KAN&#xff01; 这种创新方法挑战了多层感知器(Multilayer Perceptron&#xff…

YOLO11改进-模块-引入星型运算Star Blocks

当前网络设计中&#xff0c;“星型运算”&#xff08;逐元素乘法&#xff09;的应用原理未被充分探究&#xff0c;潜力有待挖掘。为解决此问题&#xff0c;我们引入 Star Blocks&#xff0c;其内部由 DW - Conv、BN、ReLU 等模块经星型运算连接&#xff0c;各模块有特定参数。同…

3.银河麒麟V10 离线安装Nginx

1. 下载nginx离线安装包 前往官网下载离线压缩包 2. 下载3个依赖 openssl依赖&#xff0c;前往 官网下载 pcre2依赖下载&#xff0c;前往Git下载 zlib依赖下载&#xff0c;前往Git下载 下载完成后完整的包如下&#xff1a; 如果网速下载不到请使用网盘下载 通过网盘分享的文件…

【理解机器学习中的过拟合与欠拟合】

在机器学习中&#xff0c;模型的表现很大程度上取决于我们如何平衡“过拟合”和“欠拟合”。本文通过理论介绍和代码演示&#xff0c;详细解析过拟合与欠拟合现象&#xff0c;并提出应对策略。主要内容如下&#xff1a; 什么是过拟合和欠拟合&#xff1f; 如何防止过拟合和欠拟…

【婚庆摄影小程序设计与实现】

摘 要 社会发展日新月异&#xff0c;用计算机应用实现数据管理功能已经算是很完善的了&#xff0c;但是随着移动互联网的到来&#xff0c;处理信息不再受制于地理位置的限制&#xff0c;处理信息及时高效&#xff0c;备受人们的喜爱。所以各大互联网厂商都瞄准移动互联网这个潮…

12.26 学习卷积神经网路(CNN)

完全是基于下面这个博客来进行学习的&#xff0c;感谢&#xff01; ​​【深度学习基础】详解Pytorch搭建CNN卷积神经网络LeNet-5实现手写数字识别_pytorch cnn-CSDN博客 基于深度神经网络DNN实现的手写数字识别&#xff0c;将灰度图像转换后的二维数组展平到一维&#xff0c;…

Unity URP多光源支持,多光源阴影投射,多光源阴影接收(优化版)

目录 前言&#xff1a; 一、属性 二、SubShader 三、ForwardLitPass 定义Tags 声明变体 声明变量 定义结构体 顶点Shader 片元Shader 四、全代码 四、添加官方的LitShader代码 五、全代码 六、效果图 七、结语 前言&#xff1a; 哈喽啊&#xff0c;我又来啦。这…

如何使用React,透传各类组件能力/属性?

在23年的时候&#xff0c;我主要使用的框架还是Vue&#xff0c;当时写了一篇“如何二次封装一个Vue3组件库&#xff1f;”的文章&#xff0c;里面涉及了一些如何使用Vue透传组件能力的方法。在我24年接触React之后&#xff0c;我发现这种扩展组件能力的方式有一个专门的术语&am…

109.【C语言】数据结构之求二叉树的高度

目录 1.知识回顾&#xff1a;高度&#xff08;也称深度&#xff09; 2.分析 设计代码框架 返回左右子树高度较大的那个的写法一:if语句 返回左右子树高度较大的那个的写法二:三目操作符 3.代码 4.反思 问题 出问题的代码 改进后的代码 执行结果 1.知识回顾&#xf…

分析排名靠前的一些自媒体平台,如何运用这些平台?

众所周知&#xff0c;现在做网站越来越难了&#xff0c;主要的原因还是因为流量红利时代过去了。并且搜索引擎都在给自己的平台做闭环改造。搜索引擎的流量扶持太低了。如百度投资知乎&#xff0c;给知乎带来很多流量扶持&#xff0c;也为自身内容不足做一个填补。 而我们站长…