Java高手的30k之路|面试宝典|精通BlockingQueue常用实现类

BlockingQueue

BlockingQueue 是 Java 并发包 (java.util.concurrent) 提供的一个接口,用于支持线程安全的队列操作。它特别适用于生产者-消费者模式,提供了阻塞的插入和移除操作。当队列为空时,获取操作将阻塞等待;当队列已满时,插入操作将阻塞等待。

主要特点

  1. 线程安全
    • BlockingQueue 实现了线程安全的队列操作,确保在多线程环境下的安全性。
  2. 阻塞操作
    • 提供了阻塞的插入、移除和检查操作,适用于需要等待队列变为非空或非满的场景。
  3. 可选超时
    • 可以设置操作的超时时间,当超时到达后操作会自动放弃。

常用实现类

  1. ArrayBlockingQueue
    • 基于数组的有界阻塞队列,容量固定,插入和移除操作需要锁定整个队列。
  2. LinkedBlockingQueue
    • 基于链表的阻塞队列,默认容量为 Integer.MAX_VALUE,可以指定容量,插入和移除操作锁定独立节点。
  3. PriorityBlockingQueue
    • 基于优先级堆的无界阻塞队列,元素按优先级排序。
  4. DelayQueue
    • 基于优先级堆的无界阻塞队列,元素带有延迟时间,只有延迟期满时才能从队列中取出。
  5. SynchronousQueue
    • 没有容量的阻塞队列,每个插入操作必须等待相应的移除操作,反之亦然。

ArrayBlockingQueue

ArrayBlockingQueue 是 Java 并发包 (java.util.concurrent) 中的一个线程安全的有界阻塞队列实现。它使用数组作为内部数据结构,并支持在多线程环境下进行高效的生产者-消费者模式操作。

主要特点

  1. 有界队列

    • ArrayBlockingQueue 是有界的,在初始化时需要指定队列的容量。当队列满时,插入操作会被阻塞,直到有空间可用。
  2. 线程安全

    • 通过内部锁(ReentrantLock)和条件变量(Condition)实现线程安全的入队和出队操作。
  3. 先进先出 (FIFO)

    • 队列遵循先进先出 (FIFO) 原则,先插入的元素先被移除。

使用场景

ArrayBlockingQueue 适用于以下场景:

  1. 生产者-消费者模式

    • 生产者线程向队列中添加元素,消费者线程从队列中取元素。通过阻塞机制,可以有效协调生产者和消费者的速度。
  2. 限流和缓冲

    • 可以用来实现限流,防止生产者生产速度过快导致系统过载。也可以用作缓冲区,在多个线程之间传递数据。
  3. 多线程环境中的任务调度

    • 适用于需要在多个线程之间调度任务的场景,如线程池。

示例代码

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class Main {public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);// 生产者线程Thread producer = new Thread(() -> {try {for (int i = 0; i < 20; i++) {queue.put(i);System.out.println("Produced: " + i);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 消费者线程Thread consumer = new Thread(() -> {try {while (true) {Integer value = queue.take();System.out.println("Consumed: " + value);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}});producer.start();consumer.start();producer.join();consumer.join();}
}

性能分析

  • 插入和删除操作:插入 (put) 和删除 (take) 操作的时间复杂度为 O(1),在绝大多数情况下具有很好的性能。
  • 阻塞操作:当队列为空时,消费者线程会被阻塞;当队列满时,生产者线程会被阻塞。这种阻塞机制有效地控制了线程间的协作。
  • 内存开销:由于使用数组作为内部存储结构,内存开销较小。但是需要在初始化时指定容量,并且容量不能动态扩展。

注意事项

  1. 容量限制

    • ArrayBlockingQueue 是有界队列,初始化时必须指定容量。如果容量过小,可能会导致生产者频繁阻塞;如果容量过大,可能会占用过多内存。
  2. 阻塞操作

    • 阻塞操作会使线程进入等待状态,在使用时需要考虑到可能的线程阻塞问题,防止发生死锁或线程饥饿。
  3. 性能

    • 由于内部使用锁机制,ArrayBlockingQueue 在高并发情况下可能会有一定的性能开销。对于无界队列或者需要更高并发性能的场景,可以考虑使用 ConcurrentLinkedQueue

总结

ArrayBlockingQueue 是一个适用于多线程环境中的有界阻塞队列,提供了高效的生产者-消费者模式实现。它通过先进先出 (FIFO) 原则和内部锁机制,保证了队列操作的线程安全性。适用于需要限流、缓冲和多线程任务调度的场景。

Condition

条件变量 (Condition) 是 Java 中用于线程间协调的重要机制之一。它是 Java 并发包 (java.util.concurrent) 中 Lock 接口的一个子接口,允许线程在特定条件下等待和唤醒。条件变量与传统的监视器方法 (wait, notify, notifyAll) 类似,但提供了更高的灵活性和功能。

主要特点

  1. 与锁配合使用

    • Condition 必须与 Lock 一起使用,一个 Condition 实例总是绑定到一个特定的 Lock 实例。通常是通过 Lock 实例的 newCondition() 方法来创建 Condition 实例。
  2. 等待和唤醒机制

    • Condition 提供了 await() 方法,使线程可以等待特定的条件满足。
    • signal()signalAll() 方法用于唤醒等待条件的一个或所有线程。

使用场景

条件变量适用于以下场景:

  1. 生产者-消费者模式

    • 生产者线程向缓冲区添加数据,消费者线程从缓冲区取数据。当缓冲区满时,生产者等待;当缓冲区空时,消费者等待。
  2. 线程同步

    • 适用于需要线程按照某种顺序执行的场景。

示例代码

以下是一个简单的生产者-消费者模式的示例,展示了 Condition 的使用:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class ProducerConsumer {private static final int BUFFER_SIZE = 10;private final int[] buffer = new int[BUFFER_SIZE];private int count = 0;private int putIndex = 0;private int takeIndex = 0;private final Lock lock = new ReentrantLock();private final Condition notFull = lock.newCondition();private final Condition notEmpty = lock.newCondition();public void produce(int value) throws InterruptedException {lock.lock();try {while (count == BUFFER_SIZE) {notFull.await();}buffer[putIndex] = value;putIndex = (putIndex + 1) % BUFFER_SIZE;count++;notEmpty.signal();} finally {lock.unlock();}}public int consume() throws InterruptedException {lock.lock();try {while (count == 0) {notEmpty.await();}int value = buffer[takeIndex];takeIndex = (takeIndex + 1) % BUFFER_SIZE;count--;notFull.signal();return value;} finally {lock.unlock();}}public static void main(String[] args) {ProducerConsumer pc = new ProducerConsumer();// Producer threadnew Thread(() -> {try {for (int i = 0; i < 20; i++) {pc.produce(i);System.out.println("Produced: " + i);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();// Consumer threadnew Thread(() -> {try {for (int i = 0; i < 20; i++) {int value = pc.consume();System.out.println("Consumed: " + value);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}
}

详细步骤

  1. 创建锁和条件变量

    • 使用 ReentrantLock 创建一个锁实例。
    • 使用锁的 newCondition() 方法创建两个条件变量:notFullnotEmpty
  2. 生产者线程逻辑

    • 获取锁,检查缓冲区是否满。
    • 如果缓冲区满,则调用 notFull.await() 使当前线程等待。
    • 插入数据,更新缓冲区状态。
    • 调用 notEmpty.signal() 唤醒等待数据的消费者线程。
    • 释放锁。
  3. 消费者线程逻辑

    • 获取锁,检查缓冲区是否空。
    • 如果缓冲区空,则调用 notEmpty.await() 使当前线程等待。
    • 取出数据,更新缓冲区状态。
    • 调用 notFull.signal() 唤醒等待空间的生产者线程。
    • 释放锁。

性能分析

  • 等待和唤醒机制:条件变量提供的 await()signal() 方法可以有效地协调多线程间的等待和唤醒,避免了繁忙等待(busy-waiting)带来的 CPU 资源浪费。
  • 锁的开销:使用 ReentrantLockCondition 需要在获取和释放锁时进行相应的开销,但相比繁忙等待,这种开销通常是值得的。

注意事项

  1. 条件检查:在调用 await() 前需要检查条件,以防止虚假唤醒。
  2. 中断处理await() 方法会响应中断,需要在使用时处理 InterruptedException
  3. 避免死锁:确保在每个 await() 调用后都有相应的 signal()signalAll() 调用,以防止线程永久等待导致死锁。

总结

条件变量 (Condition) 提供了一种灵活的线程间协调机制,允许线程在特定条件下等待和唤醒。它必须与锁 (Lock) 配合使用,适用于生产者-消费者模式、线程同步等场景。通过合理使用条件变量,可以有效地协调多线程间的协作,提高并发程序的性能和可靠性。

LinkedBlockingQueue

LinkedBlockingQueue 是 Java 并发包 (java.util.concurrent) 提供的一种基于链表实现的阻塞队列。它具有可选的容量限制,可以用于实现生产者-消费者模式,支持并发环境下的高效队列操作。

主要特点

  1. 基于链表实现

    • 内部使用链表存储元素,不同于基于数组的 ArrayBlockingQueue
  2. 可选容量限制

    • 可以在创建时指定容量限制,如果不指定,默认容量为 Integer.MAX_VALUE
  3. 线程安全

    • LinkedBlockingQueue 使用了独立的锁来控制插入和移除操作,减少了锁争用,提高了并发性能。
  4. 阻塞操作

    • 提供阻塞的插入 (put) 和移除 (take) 操作,当队列满或空时相应操作将阻塞。

常用方法

  • put(E e):将元素插入到队列末尾,如果队列已满,则等待直到有空间。
  • take():从队列头部移除并返回元素,如果队列为空,则等待直到有元素可用。
  • offer(E e, long timeout, TimeUnit unit):将元素插入到队列末尾,如果队列已满,则等待指定的时间,如果仍无法插入则返回 false
  • poll(long timeout, TimeUnit unit):从队列头部移除并返回元素,如果队列为空,则等待指定的时间,如果仍无元素可用则返回 null

示例代码

以下是一个使用 LinkedBlockingQueue 实现生产者-消费者模式的示例:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class ProducerConsumerExample {private static final int BUFFER_SIZE = 10;private static final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BUFFER_SIZE);public static void main(String[] args) {// Producer threadnew Thread(new Producer()).start();// Consumer threadnew Thread(new Consumer()).start();}static class Producer implements Runnable {@Overridepublic void run() {try {for (int i = 0; i < 20; i++) {queue.put(i);  // Blocks if the queue is fullSystem.out.println("Produced: " + i);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}static class Consumer implements Runnable {@Overridepublic void run() {try {while (true) {Integer value = queue.take();  // Blocks if the queue is emptySystem.out.println("Consumed: " + value);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}

详细步骤

  1. 创建队列

    • 使用 LinkedBlockingQueue 创建一个具有固定容量的阻塞队列 queue
  2. 生产者线程逻辑

    • 使用 put() 方法将元素插入到队列末尾。如果队列已满,则阻塞直到队列有空闲位置。
  3. 消费者线程逻辑

    • 使用 take() 方法从队列头部移除元素。如果队列为空,则阻塞直到队列有可用元素。

性能分析

  • 高并发性:由于 LinkedBlockingQueue 使用了独立的锁来控制插入和移除操作,可以减少锁争用,提高并发性能。
  • 灵活性:可以根据需要设置容量限制,灵活控制队列大小。

注意事项

  1. 容量限制

    • 对于有界队列,需要合理设置容量以避免频繁的阻塞和等待。
  2. 异常处理

    • 阻塞操作会响应中断,需要处理 InterruptedException
  3. 性能开销

    • 阻塞队列在高并发环境下可能会引入锁竞争,需要在设计时考虑性能影响。

总结

LinkedBlockingQueue 是 Java 并发编程中的一个重要工具,提供了基于链表的线程安全阻塞队列操作。通过合理使用 LinkedBlockingQueue,可以有效地实现生产者-消费者模式,确保多线程环境下的安全性和高效性。不同于基于数组的 ArrayBlockingQueueLinkedBlockingQueue 在插入和移除操作上使用独立锁,可以在高并发环境下提供更好的性能。

PriorityBlockingQueue

PriorityBlockingQueue 是 Java 并发包 (java.util.concurrent) 中提供的一个基于优先级堆的线程安全队列。它是一种无界的阻塞队列,支持对元素进行自然顺序或自定义顺序的优先级排序。

主要特点

  1. 基于堆实现

    • 内部采用二叉堆(通常是最小堆)来维护元素的优先级顺序。
  2. 无界队列

    • 没有容量限制,队列的大小只受限于可用的内存量。
  3. 线程安全

    • 支持并发访问,通过内部锁机制实现线程安全。
  4. 优先级排序

    • 元素按自然顺序(或通过自定义比较器指定的顺序)排序。
  5. 阻塞操作

    • 提供阻塞的插入 (put) 和移除 (take) 操作,当队列为空时相应操作将阻塞。

常用方法

  • put(E e):将元素插入队列。对于 PriorityBlockingQueue,该方法等同于 offer(E e),因为它是无界的。
  • take():从队列中移除并返回优先级最高的元素,如果队列为空,则等待直到有元素可用。
  • offer(E e, long timeout, TimeUnit unit):将元素插入队列,等待指定的时间,如果队列无法接受元素则返回 false
  • poll(long timeout, TimeUnit unit):从队列中移除并返回优先级最高的元素,等待指定的时间,如果队列为空则返回 null

示例代码

以下是一个使用 PriorityBlockingQueue 的示例:

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.BlockingQueue;public class PriorityBlockingQueueExample {private static final BlockingQueue<Task> queue = new PriorityBlockingQueue<>();public static void main(String[] args) {// Producer threadnew Thread(new Producer()).start();// Consumer threadnew Thread(new Consumer()).start();}static class Task implements Comparable<Task> {private final int priority;private final String name;public Task(int priority, String name) {this.priority = priority;this.name = name;}public int getPriority() {return priority;}public String getName() {return name;}@Overridepublic int compareTo(Task o) {return Integer.compare(this.priority, o.priority);}@Overridepublic String toString() {return "Task{name='" + name + "', priority=" + priority + '}';}}static class Producer implements Runnable {@Overridepublic void run() {try {for (int i = 0; i < 10; i++) {queue.put(new Task((int) (Math.random() * 10), "Task" + i));System.out.println("Produced: Task" + i);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}static class Consumer implements Runnable {@Overridepublic void run() {try {while (true) {Task task = queue.take();  // Blocks if the queue is emptySystem.out.println("Consumed: " + task);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}

详细步骤

  1. 创建队列

    • 使用 PriorityBlockingQueue 创建一个阻塞队列 queue
  2. 生产者线程逻辑

    • 使用 put() 方法将元素插入队列。由于 PriorityBlockingQueue 是无界的,put() 方法不会阻塞。
  3. 消费者线程逻辑

    • 使用 take() 方法从队列中移除并返回优先级最高的元素。如果队列为空,则阻塞直到有可用元素。

性能分析

  • 高并发性PriorityBlockingQueue 使用内部锁机制来保证线程安全,适用于高并发环境。
  • 优先级处理:基于堆的数据结构提供了高效的优先级排序和检索操作。

注意事项

  1. 无界特性

    • PriorityBlockingQueue 是无界的,在高负载情况下可能会导致内存占用过多,需要注意容量控制。
  2. 元素排序

    • 元素需要实现 Comparable 接口或者提供 Comparator 来定义优先级顺序。
  3. 性能开销

    • 由于使用锁机制来保证线程安全,可能会有一定的性能开销,特别是在高并发情况下。

总结

PriorityBlockingQueue 是 Java 并发编程中的一个重要工具,提供了基于优先级堆的阻塞队列操作。通过合理使用 PriorityBlockingQueue,可以在多线程环境下高效地处理优先级任务调度。它在处理需要优先级排序的任务队列时表现优越,如调度系统、任务管理器等。

DelayQueue

DelayQueue 是 Java 并发包(java.util.concurrent)中的一个实现,用于在一段时间后才能取出元素的阻塞队列。该队列中的元素必须实现 Delayed 接口,并且只有在延迟期满时才能从队列中提取元素。

主要特点

  1. 延迟队列

    • 元素只有在其延迟时间到期后才能被提取。
  2. 无界队列

    • 它是无界的,队列的大小仅受限于可用内存量。
  3. 线程安全

    • 通过内部锁机制实现线程安全。
  4. 基于优先级堆实现

    • 内部使用优先级堆来存储元素,确保延迟期满的元素总是优先出队。

常用方法

  • put(E e):将元素插入队列。由于 DelayQueue 是无界的,此方法永远不会阻塞。
  • take():从队列中取出并移除延迟期满的元素。如果没有延迟期满的元素,则等待。
  • poll(long timeout, TimeUnit unit):从队列中取出并移除延迟期满的元素,等待指定的时间,如果没有延迟期满的元素,则返回 null
  • peek():检索但不移除延迟期满的元素,如果没有这样的元素,则返回 null

示例代码

以下是一个使用 DelayQueue 的示例:

import java.util.concurrent.Delayed;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;public class DelayQueueExample {public static void main(String[] args) throws InterruptedException {DelayQueue<DelayedTask> queue = new DelayQueue<>();// Adding tasks to the DelayQueuequeue.put(new DelayedTask("Task1", 5, TimeUnit.SECONDS));queue.put(new DelayedTask("Task2", 10, TimeUnit.SECONDS));queue.put(new DelayedTask("Task3", 1, TimeUnit.MINUTES));while (!queue.isEmpty()) {DelayedTask task = queue.take();  // This will block until the task's delay has expiredSystem.out.println("Executing: " + task.getName());}}static class DelayedTask implements Delayed {private final String name;private final long startTime;public DelayedTask(String name, long delay, TimeUnit unit) {this.name = name;this.startTime = System.currentTimeMillis() + unit.toMillis(delay);}public String getName() {return name;}@Overridepublic long getDelay(TimeUnit unit) {long diff = startTime - System.currentTimeMillis();return unit.convert(diff, TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {if (this.startTime < ((DelayedTask) o).startTime) {return -1;}if (this.startTime > ((DelayedTask) o).startTime) {return 1;}return 0;}}
}

详细步骤

  1. 创建队列

    • 使用 DelayQueue 创建一个延迟队列 queue
  2. 定义延迟任务

    • 创建一个实现 Delayed 接口的任务类 DelayedTask。在这个类中,定义延迟时间和任务名称,并实现 getDelaycompareTo 方法。
  3. 添加任务

    • 使用 put 方法将任务添加到队列中。
  4. 取出并执行任务

    • 使用 take 方法从队列中取出并移除延迟期满的任务。如果没有延迟期满的任务,take 方法将阻塞直到有任务到期。

性能分析

  • 高并发性

    • DelayQueue 使用内部锁机制来保证线程安全,适用于高并发环境。
  • 时间复杂度

    • 插入和移除操作的时间复杂度为 O(log n),因为内部使用优先级堆进行排序。

注意事项

  1. 无界特性

    • DelayQueue 是无界的,在高负载情况下可能会导致内存占用过多,需要注意容量控制。
  2. 延迟时间

    • 队列中的元素必须实现 Delayed 接口,并且延迟时间的计算必须合理准确。
  3. 性能开销

    • 由于使用锁机制来保证线程安全,可能会有一定的性能开销,特别是在高并发情况下。

总结

DelayQueue 是一个用于在特定延迟时间后才能取出元素的高效阻塞队列。它在需要对任务进行延迟处理的场景中非常有用,如定时任务调度、缓存过期处理等。通过合理使用 DelayQueue,可以在多线程环境下高效地管理延迟任务。

SynchronousQueue

SynchronousQueue 是 Java 并发包(java.util.concurrent)中的一个特殊阻塞队列。与其他阻塞队列不同,SynchronousQueue 不存储元素。每个插入操作必须等待一个对应的移除操作,反之亦然。因此,它适用于需要直接传递数据或任务的高并发场景。

主要特点

  1. 不存储元素

    • 队列本身不存储任何元素,每个插入操作必须等待一个移除操作,反之亦然。
  2. 适用于传递性设计

    • 适用于需要在线程之间直接传递数据或任务的场景。
  3. 高并发

    • 由于没有容量限制,插入和移除操作可以实现高并发性能。

常用方法

  • put(E e):将元素插入队列。此方法将阻塞,直到另一个线程调用 take 方法移除元素。
  • take():从队列中移除元素。此方法将阻塞,直到另一个线程调用 put 方法插入元素。
  • offer(E e, long timeout, TimeUnit unit):尝试在指定时间内将元素插入队列,如果在超时时间内没有配对的移除操作,则返回 false
  • poll(long timeout, TimeUnit unit):尝试在指定时间内从队列中移除元素,如果在超时时间内没有配对的插入操作,则返回 null

示例代码

以下是一个使用 SynchronousQueue 的示例:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class SynchronousQueueExample {public static void main(String[] args) {SynchronousQueue<String> queue = new SynchronousQueue<>();ExecutorService executor = Executors.newFixedThreadPool(2);// 生产者executor.submit(() -> {try {System.out.println("Putting item into the queue...");queue.put("Hello");System.out.println("Item placed into the queue.");} catch (InterruptedException e) {Thread.currentThread().interrupt();}});// 消费者executor.submit(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println("Taking item from the queue...");String item = queue.take();System.out.println("Item taken from the queue: " + item);} catch (InterruptedException e) {Thread.currentThread().interrupt();}});executor.shutdown();}
}

详细步骤

  1. 创建队列

    • 使用 SynchronousQueue 创建一个同步队列 queue
  2. 创建生产者和消费者

    • 使用 ExecutorService 创建一个固定大小的线程池,分别提交生产者和消费者任务。
    • 生产者线程尝试将元素放入队列,并阻塞直到消费者线程取走该元素。
    • 消费者线程等待一段时间后,从队列中取出元素。
  3. 启动线程池

    • 启动线程池,执行生产者和消费者任务。

性能分析

  • 高并发性

    • SynchronousQueue 由于不存储元素,每个插入和移除操作都必须配对,可以实现高并发性能。
  • 低延迟

    • 直接传递元素,不需要存储和检索操作,延迟较低。

注意事项

  1. 使用场景

    • 适用于需要在线程之间直接传递数据或任务的场景,如线程池中的任务提交和执行。
  2. 阻塞行为

    • 插入和移除操作都是阻塞的,必须确保生产者和消费者线程能够及时配对。
  3. 容量限制

    • 由于不存储元素,没有容量限制,可能会导致某些线程长时间阻塞,需要合理设计生产者和消费者的配对策略。

总结

SynchronousQueue 是一个用于在线程之间直接传递数据的高效阻塞队列。它在不存储元素的情况下实现了高并发性能,适用于需要直接传递任务或数据的场景。通过合理使用 SynchronousQueue,可以在多线程环境下实现高效的数据传递和任务调度。

BlockingQueue in Spring

在 Spring 中的使用场景

  1. 任务调度和执行
    • Spring Task Executor:Spring 提供了 TaskExecutor 接口,用于异步任务的执行。常见的实现如 ThreadPoolTaskExecutor,可以通过 BlockingQueue 来管理任务队列,控制线程池的任务调度。
    • Scheduled Tasks:在使用 Spring 的 @Scheduled 注解进行定时任务调度时,可以使用 BlockingQueue 来存储和管理定时任务。
  2. 异步事件处理
    • ApplicationEvent:在 Spring 事件机制中,可以使用 BlockingQueue 来实现异步事件处理。例如,使用 LinkedBlockingQueue 存储事件,异步处理这些事件,避免主线程阻塞。
  3. 消息队列
    • Spring Integration:在 Spring Integration 中,可以使用 BlockingQueue 实现内存消息队列,处理消息的发送和接收。例如,使用 QueueChannel 来处理消息的异步传递。

ThreadPoolTaskExecutor 中的 BlockingQueue

ThreadPoolTaskExecutor 使用了 BlockingQueue 来存储待执行的任务。在 Spring 配置中,开发者可以指定不同的 BlockingQueue 实现类来控制任务队列的行为。

ThreadPoolTaskExecutor 源码分析

以下是 ThreadPoolTaskExecutor 的核心部分源码,重点展示了如何使用 BlockingQueue 来管理任务队列:

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {// 核心线程池大小private int corePoolSize = 1;// 最大线程池大小private int maxPoolSize = Integer.MAX_VALUE;// 队列容量private int queueCapacity = Integer.MAX_VALUE;// 线程池中线程的存活时间(秒)private int keepAliveSeconds = 60;// 线程池的具体实现private ThreadPoolExecutor threadPoolExecutor;// 初始化方法@Overridepublic void afterPropertiesSet() {initializeExecutor(threadFactory, rejectedExecutionHandler);}protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);this.threadPoolExecutor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);return this.threadPoolExecutor;}protected BlockingQueue<Runnable> createQueue(int queueCapacity) {if (queueCapacity > 0) {return new LinkedBlockingQueue<>(queueCapacity);} else {return new SynchronousQueue<>();}}@Overridepublic void execute(Runnable task) {this.threadPoolExecutor.execute(task);}
}
关键部分解析
  1. 核心配置参数corePoolSize, maxPoolSize, queueCapacity, keepAliveSeconds 这些参数用于配置线程池的行为。
  2. 初始化线程池initializeExecutor 方法中,创建了一个 BlockingQueue 实例,并用它初始化 ThreadPoolExecutor
  3. 创建阻塞队列createQueue 方法根据 queueCapacity 的值决定使用哪种 BlockingQueue 实现。
    • 如果 queueCapacity > 0,则使用 LinkedBlockingQueue
    • 如果 queueCapacity <= 0,则使用 SynchronousQueue

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/28191.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

60.WEB渗透测试-信息收集- 端口、目录扫描、源码泄露(8)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a;59.WEB渗透测试-信息收集- 端口、目录扫描、源码泄露&#xff08;7&#xff09; 御剑是用…

人工智能模型组合学习的理论和实验实践

组合学习&#xff0c;即掌握将基本概念结合起来构建更复杂概念的能力&#xff0c;对人类认知至关重要&#xff0c;特别是在人类语言理解和视觉感知方面。这一概念与在未观察到的情况下推广的能力紧密相关。尽管它在智能中扮演着核心角色&#xff0c;但缺乏系统化的理论及实验研…

c语言中的宏是什么?

宏的定义及用途 C语言中的宏是一种预处理指令&#xff0c;它允许程序员定义一个名称&#xff0c;该名称可以代表一段代码或一个值。宏的主要用途是简化代码的编写&#xff0c;提高代码的可读性和可维护性&#xff0c;以及实现代码的重复利用。 宏的定义使用#define指令&#…

DDPAI盯盯拍记录仪删除后的恢复方法(前后双路)

DDPAI盯盯拍行车记录仪的口碑相当不错&#xff0c;其产品一直以行车记录仪为主&#xff0c;曾经使用过比较早的产品&#xff0c;体验还不错。下面来看下这个DDPAI的视频恢复方法。 故障存储: 64G存储卡 /文件系统&#xff1a;FAT32 故障现象: 在发生事故后在记录仪上看到了…

工程设计问题---工业制冷系统的优化设计问题

参考文献&#xff1a; [1]李煜,梁晓,刘景森,等.基于改进平衡优化器算法求解工程优化问题[J/OL].计算机集成制造系统,1-34[2024-06-16].

水滴式粉碎机:玉米饲料加工的新篇章

在饲料加工业中&#xff0c;玉米作为一种重要的原料&#xff0c;其加工方式直接影响到饲料的品质以及动物对饲料的消化吸收率。近年来&#xff0c;随着科技的进步&#xff0c;越多的环保的饲料加工设备被引入到饲料生产中&#xff0c;其中&#xff0c;水滴式粉碎机以其独特的优…

Ubuntu Linux 24.04 C语言TCP/IP socket编程基础知识

socket起源于Unix&#xff0c;Unix/Linux基本哲学之一就是“一切皆文件”&#xff0c;都可以用“打开open –> 读写write/read –> 关闭close”模式来操作。Socket就是该模式的一个实现&#xff0c;socket即是一种特殊的文件&#xff0c;一些socket函数就是对其进行读/写…

【2024最新华为OD-C/D卷试题汇总】[支持在线评测] 部门项目任务分配(100分) - 三语言AC题解(Python/Java/Cpp)

🍭 大家好这里是清隆学长 ,一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-C/D卷的三语言AC题解 💻 ACM银牌🥈| 多次AK大厂笔试 | 编程一对一辅导 👏 感谢大家的订阅➕ 和 喜欢💗 📎在线评测链接 部门项目任务分配(100分) 🌍 评测功能需要订阅专栏后私信联…

举例说明 如何通过SparkUI和日志定位任务莫名失败?

有一个Task OOM&#xff1a; 通过概览信息&#xff0c;发现Stage 10的Task 36失败了4次导致Job失败。概览信息中显示最后一次失败的退出代码&#xff08;exit code&#xff09;是143&#xff0c;意味着发生了内存溢出&#xff08;OOM&#xff0c;即Out of Memory&#xff09;。…

mariadb

MariaDB安装配置、使用、授权、增删改查以及数据库备份与恢复 MariaDB安装配置、使用、授权、增删改查以及数据库备份与恢复_mariadb安装及配置教程-CSDN博客mariadb 恢复&#xff1a; ERROR! MySQL server PID file could not be found! 170104 23:04:21 InnoDB: The InnoD…

共同基金(Mutual Funds)是什么?各种不同的基金有什么区别?

共同基金是什么 中文版 共同基金 共同基金是一种投资工具&#xff0c;通过汇集众多投资者的资金&#xff0c;购买一组多样化的股票、债券或其他证券。共同基金由专业的投资组合经理管理&#xff0c;他们旨在实现特定的投资目标。以下是对共同基金的详细介绍&#xff1a; 共…

探索Edge

目录 1.概述 1.1.什么是浏览器 1.2.浏览器的作用 2.Edge 2.1.什么是Edge 2.2.诞生背景 2.3.历史版本 2.4.作用 2.5.优缺点 2.5.1.优点 2.5.2.缺点 3.对比 3.1.和360浏览器的对比 3.2.和谷歌浏览器&#xff08;Chrome&#xff09;的对比 4.未来展望 5.总结 1.概…

从“产品的RFM分析”看如何探索“职业方向”

我们在做产品分析时&#xff0c;经常会用到一种方法“产品的RFM分析”&#xff0c;它是一种客户细分和价值评估的常用方法&#xff0c;广泛应用于电子商务、零售和其他众多行业&#xff0c;它可以帮助企业和产品团队更好地理解用户行为&#xff0c;优化营销策略&#xff0c;提升…

Python基础教程(二十二):XML解析

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; &#x1f49d;&#x1f49…

揭秘newSingleThreadExecutor:深度解析与源码探秘

1. 概述 newSingleThreadExecutor是Java线程池框架中Executors类的一个静态方法,它返回一个线程池实例,该线程池维护一个单一的工作线程来执行任务。这个线程池的特性在于它保证了所有提交的任务会按照它们在队列中的顺序依次执行,而不会并发执行。它适用于需要保证任务顺序…

0 算法复杂度

算法复杂度 时间复杂度有关总结 一&#xff0c;常数时间的操作【基本操作】 常数时间——固定时间——O&#xff08;1&#xff09;——由实现细节决定 不会随着输入规模的变化而增加时间复杂度 1 基本操作解析 1.算数操作: ab a-b a*b a/b int a 32位 int b 32位11 178997…

Linux--MQTT(二)通信基本原理

一、MQTT 通信基本原理 MQTT 是一种基于 客户端 - 服务端 架构的消息传输协议&#xff0c;所以在 MQTT 协议通信中&#xff0c;有两个最为重要的角色&#xff0c;它们便是服务端 和 客户端 。 举例&#xff1a;若开发板向“芯片温度”这一主题发布消息&#xff0c;那么服务…

cocos开发的时候 wx.onShow在vscode里面显示红色

这个函数是在微信小游戏平台才会用到。 cocos识别不到wx这个变量。 可以改成下面的写法。 只要在变量前面加一个globalThis.就能识别这个变量了。也不报错了。 搞死强迫症了。orz 欢迎大家来玩我的微信小游戏。多多提意见啊。

欧阳修,仕途波澜中的文坛巨匠

欧阳修&#xff0c;字永叔&#xff0c;号醉翁、六一居士&#xff0c;生于北宋真宗景德四年&#xff08;公元1007年&#xff09;&#xff0c;卒于北宋神宗熙宁五年&#xff08;公元1072年&#xff09;&#xff0c;享年65岁。他是北宋时期著名的文学家、史学家&#xff0c;也是唐…

计算机缺失d3dcompiler_43.dll怎么办,介绍5种靠谱的解决方法

在电脑使用过程中&#xff0c;我们经常会遇到一些错误提示&#xff0c;其中之一就是“找不到d3dcompiler43.dll”的错误。那么&#xff0c;d3dcompiler43.dll到底是什么&#xff1f;为什么会出现丢失的情况&#xff1f;它对计算机有什么具体影响&#xff1f;如何解决这个问题&a…