👏作者简介:大家好,我是若明天不见,BAT的Java高级开发工程师,CSDN博客专家,后端领域优质创作者
📕系列专栏:多线程及高并发系列
📕其他专栏:微服务框架系列、MySQL系列、Redis系列、Leetcode算法系列、GraphQL系列
📜如果感觉博主的文章还不错的话,请👍点赞收藏关注👍支持一下博主哦❤️
✨时间是条环形跑道,万物终将归零,亦得以圆全完美
并发集合及线程池详解
- BlockingQueue
- BlockingQueue & BlockingDeque 对比
- ArrayBlockingQueue & LinkedBlockingQueue 对比
- PriorityBlockingQueue
- DelayQueue
- Future
- FutureTask
- ThreadPoolExecutor 线程池
- 配置参数
- 合理配置线程池
- 线程池监控
- 动态化线程池
- @Async 自定义线程池
- 线程池和 ThreadLocal 共用的坑
多线程及高并发系列
- 【多线程及高并发 一】内存模型及理论基础
- 【多线程及高并发 二】线程基础及线程中断同步
- 【多线程及高并发 三】volatile & synchorized 详解
- 【多线程与高并发 四】CAS、Unsafe 及 JUC 原子类详解
- 【多线程及高并发 五】AQS & ReentranLock 详解
- 【多线程及高并发 番外篇】虚拟线程怎么被 synchronized 阻塞了?
在 Java 并发编程中,BlockingQueue
、Future
、FutureTask
和ThreadPoolExecutor
是相互关联的重要概念和组件
- BlockingQueue:是一个支持线程安全的、阻塞操作的队列。提供了线程间的数据传递机制
- Future:是一个接口,表示一个异步计算的结果。提供了异步任务的结果获取机制
- FutureTask: 是
Future
的实现类,同时也是一个可执行的任务 - ThreadPoolExecutor:线程池是一个线程管理的工具,用于管理和复用线程资源。管理和调度任务的执行,将任务封装成
FutureTask
并通过BlockingQueue
进行交互
BlockingQueue
BlockingQueue
是一个支持线程安全的、阻塞操作的队列,它的实现类都有这两个特性,在后文介绍时就不详细介绍了。常见的实现类有ArrayBlockingQueue
、LinkedBlockingQueue
、PriorityBlockingQueue
等。BlockingQueue
在并发编程中广泛应用于实现生产者-消费者模式,其中生产者将数据放入队列,消费者从队列中取出数据进行处理
BlockingQueue 的阻塞操作(如 put() 和 take())可以确保生产者和消费者之间的同步,避免了线程之间的竞争条件
BlockingQueue
类型:
ArrayBlockingQueue
:由数组结构组成的有界阻塞队列LinkedBlockingQueue
:由链表结构组成的有界阻塞队列PriorityBlockingQueue
:支持优先级排序的无界阻塞队列DealyQueue
:使用优先级队列实现的无界阻塞队列SynchronousQueue
:不存储元素的阻塞队列LinkedTransferQueue
:由链表结构组成的无界阻塞队列LinkedBlockingDeque
:由链表结构组成的双向阻塞队列
ConcurrentLinkedQueue
是一个线程安全的无界队列实现,队列按照FIFO
原则对元素进行排序。使用 链表数据结构 和 CAS 操作来实现高并发的插入和提取操作
BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛异常 | 特定值 | 阻塞 | 超时 |
---|---|---|---|
插入 | add(o) | offer(o) | put(o) |
移除 | remove() | poll() | take() |
检查 | element() | peek() |
BlockingQueue & BlockingDeque 对比
BlockingQueue
和BlockingDeque
是Java中用于多线程编程的接口,它们都提供了阻塞操作的功能,但在使用方式和特性上有一些异同。
相同点:
- 都是用于在多线程环境下进行安全的数据交换的接口
- 都提供了阻塞操作,即在队列为空时,获取元素的操作会被阻塞,直到队列中有元素可用;在队列已满时,插入元素的操作会被阻塞,直到队列有空闲位置
不同点:
-
数据结构差异:
BlockingQueue
是一种队列,它按照先进先出(FIFO)的顺序处理元素BlockingDeque
是一种双端队列,它允许在队列的两端进行插入和提取操作
-
操作的位置差异:
BlockingQueue
的操作只涉及到队列的一端,即插入和提取操作只发生在队列的一端BlockingDeque
的操作可以在队列的两端进行,可以在队列的头部和尾部进行插入和提取操作
BlockingDeque
为双端队列,因此可以根据使用方式模拟堆或栈的特性
BlockingDeque
模拟栈的使用示例:
public class BlockingDequeStackExample {private BlockingDeque<Integer> stack;public BlockingDequeStackExample() {// 创建一个双端阻塞队列作为栈的实现stack = new LinkedBlockingDeque<>();}public void push(int element) {// 在队列的头部插入元素,模拟入栈操作。同addFirst方法stack.push(element);System.out.println("Pushed element: " + element);}public int pop() {// 从队列的头部提取元素,模拟出栈操作。同removeFirst方法int element = stack.pop();System.out.println("Popped element: " + element);return element;}public static void main(String[] args) {BlockingDequeStackExample stackExample = new BlockingDequeStackExample();// 模拟入栈和出栈操作stackExample.push(1);stackExample.push(2);stackExample.push(3);stackExample.pop();stackExample.pop();stackExample.pop();}
}
ArrayBlockingQueue & LinkedBlockingQueue 对比
ArrayBlockingQueue
和LinkedBlockingQueue
都是Java中的阻塞队列,一个是数组结构,一个是链表结构
异同点如下:
-
实现方式:
ArrayBlockingQueue
基于数组实现,内部使用ReentrantLock
来保证线程安全LinkedBlockingQueue
基于链表实现,内部使用两个锁(一个用于生产者,一个用于消费者)来保证线程安全
-
长度限制:
ArrayBlockingQueue
在创建时需要指定一个固定的容量,即队列的长度是固定的,不能动态改变LinkedBlockingQueue
可以选择在创建时指定一个可选的固定容量,如果未指定,则默认为 Integer.MAX_VALUE,即队列长度可以无限扩展
-
内存消耗:
ArrayBlockingQueue
使用数组作为底层数据结构,因此在创建时需要预分配固定大小的内存空间,即使队列中只有少量元素,也会占用整个数组的空间LinkedBlockingQueue
使用链表作为底层数据结构,内存空间按需分配,只会占用实际元素所需的内存空间
-
公平性:
ArrayBlockingQueue
和LinkedBlockingQueue
都支持公平性设置。公平性表示线程是否按照它们加入队列的顺序来获取元素。当设置为公平模式时,线程将按照先进先出的顺序获取元素,但会对性能产生一定影响。默认情况下,ArrayBlockingQueue
和LinkedBlockingQueue
都是非公平,ArrayBlockingQueue
可以通过构造函数指定使用公平锁的ReentranLock
-
性能差异:
- 由于内部实现方式不同,
ArrayBlockingQueue
在高并发环境下的性能通常优于LinkedBlockingQueue
。这是因为ArrayBlockingQueue
使用单锁来保证线程安全,而LinkedBlockingQueue
使用两个锁,增加了一些额外的开销
- 由于内部实现方式不同,
根据具体的使用场景和需求,可以选择适合的阻塞队列实现。最后根据场景,控制变量后分别压测,选择最合适的阻塞队列
PriorityBlockingQueue
PriorityBlockingQueue
是Java中的一个基于优先级的无界阻塞队列。它具有以下特性:
- 按优先级排序:
PriorityBlockingQueue
会根据元素的优先级进行排序。优先级高的元素在队列中排在前面。元素的优先级可以通过元素自身的比较器(Comparator
)或者元素自身的自然顺序来确定 - 无界队列:
PriorityBlockingQueue
没有容量限制,可以根据需要动态地添加元素。它不会出现因队列已满而阻塞添加操作的情况 - 线程安全
- 阻塞操作
import java.util.concurrent.PriorityBlockingQueue;public class PriorityBlockingQueueExample {public static void main(String[] args) {// 创建一个PriorityBlockingQueue实例PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();// 添加元素到队列中queue.offer(5);queue.offer(3);queue.offer(1);queue.offer(4);queue.offer(2);// 提取并打印队列中的元素 12345while (!queue.isEmpty()) {int element = queue.poll();System.out.println("Polled element: " + element);}}
}
在实际应用中,PriorityBlockingQueue可用于实现任务调度、优先级队列等场景,其中需要按照优先级处理元素
DelayQueue
DelayQueue是Java中的一个基于延迟时间的阻塞队列。它具有以下特性:
- 延迟处理:
DelayQueue
中的元素必须实现Delayed
接口。Delayed
接口定义了一个getDelay(TimeUnit unit)
方法,用于获取元素的剩余延迟时间。只有当延迟时间小于等于零时,元素才可以从队列中提取 - 按延迟时间排序:
DelayQueue
根据元素的延迟时间进行排序。延迟时间越短的元素在队列中排在前面 - 无界队列:
DelayQueue
没有容量限制,可以根据需要动态地添加元素。它不会出现因队列已满而阻塞添加操作的情况 - 线程安全
- 阻塞操作
在下述示例中,我们创建了一个DelayQueue
实例,并添加了一些延迟元素。延迟元素的延迟时间通过构造函数指定,并在getDelay
方法中计算剩余延迟时间
public class DelayQueueExample {static class DelayedElement implements Delayed {private String value;private long endTime;public DelayedElement(String value, long delayMs) {this.value = value;this.endTime = System.currentTimeMillis() + delayMs;}@Overridepublic long getDelay(TimeUnit unit) {long remainingTime = endTime - System.currentTimeMillis();return unit.convert(remainingTime, TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed other) {long diff = this.getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);return Long.compare(diff, 0);}@Overridepublic String toString() {return "DelayedElement{" +"value='" + value + '\'' +", endTime=" + endTime +'}';}}public static void main(String[] args) {// 创建一个DelayQueue实例DelayQueue<DelayedElement> queue = new DelayQueue<>();// 添加延迟元素到队列中queue.offer(new DelayedElement("Element 1", 2000));queue.offer(new DelayedElement("Element 2", 5000));queue.offer(new DelayedElement("Element 3", 3000));// 提取并打印延迟元素while (!queue.isEmpty()) {try {DelayedElement element = queue.take();System.out.println("Polled element: " + element);} catch (InterruptedException e) {e.printStackTrace();}}}
}
使用take
方法从队列中提取延迟元素,并将其打印出来。由于DelayQueue
是一个阻塞队列,当队列为空时,提取操作会被阻塞,直到有元素的延迟时间到期
输出结果示例
Polled element: DelayedElement{value='Element 1', endTime=1641418006091}
Polled element: DelayedElement{value='Element 3', endTime=1641418009091}
Polled element: DelayedElement{value='Element 2', endTime=1641418013091}
在实际应用中,DelayQueue可用于实现定时任务、缓存过期等场景,其中需要根据延迟时间对元素进行排序和处理
Future
Future
用于异步结果计算。它提供了一些方法来检查计算是否完成,使用get
方法将阻塞线程直到结果返回
cancel
:尝试取消任务的执行,如果任务已完成或已取消,此操作无效isCancelled
:任务是否已取消isDone
:任务是否已完成get
:阻塞线程以获取计算结果,直至任务执行完毕返回结果get(long timeout, TimeUnit unit)
:阻塞线程以获取计算结果,若在指定时间没返回结果,则返回null
public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDone();V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
Future
结合线程池的使用
public void futureTest(){ExecutorService executorService = Executors.newFixedThreadPool(10);Future<String> nickFuture = executorService.submit(() -> userService.getNick());Future<String> nameFuture = executorService.submit(() -> userService.getUserName());// 阻塞开始,等待结果String nick = nickFuture.get(1000, TimeUnit.MILLISECONDS);String name = nameFuture.get();
}
FutureTask
FutureTask
是 Java 中的一个类,实现了Future
接口,同时也可以用作可执行的任务,特别适用于需要异步执行任务并获取结果的场景。FutureTask
常用来封装Callable
和Runnable
,将任务提交给线程池执行,并通过FutureTask
获取任务的执行结果。同时,FutureTask
也提供了一些方法来管理和控制任务的执行状态、取消任务的执行,并处理任务执行过程中的异常
FutureTask
是使用CAS操作来实现对任务状态的并发操作的。CAS机制保证了对任务状态的更新操作是原子性的,避免了竞态条件和数据不一致的问题
FutureTask
主要包括以下几种状态:
- NEW:任务的初始状态,表示任务尚未执行
- COMPLETING:表示任务正在执行完成的过程中,但结果尚未设置完毕
- NORMAL:任务执行成功完成
- EXCEPTIONAL:任务执行过程中发生了异常
- CANCELLED:任务被取消
- INTERRUPTING:任务正在被中断的过程中
- INTERRUPTED:任务被中断
ThreadPoolExecutor 线程池
ThreadPoolExecutor 是 Java 中 Executor 框架提供的一个线程池实现类。它提供了一种方便的方式来管理和复用线程,并执行提交的任务。线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程
常用默认实现:
Executors#newCachedThreadPool
:无边界线程池,带有自动线程回收Executors#newFixedThreadPool
:固定大小的线程池Executors#newSingleThreadExecutor
:单个后台线程,大多数场景用于预初始化配置
有需要执行的任务进入线程池时
- 当前线程数小于核心线程数时,创建线程。
- 当前线程数大于等于核心线程数,且工作队列未满时,将任务放入工作队列。
- 当前线程数大于等于核心线程数,且工作队列已满
- 若线程数小于最大线程数,创建线程
- 若线程数等于最大线程数,抛出异常,拒绝任务(具体处理方式取决于
handler
的策略)
当ThreadPoolExecutor
执行execute
方法时,当前worker数小于corePoolSize
,会调用addWorker
方法,而workers.add(w)
是在ReentranLock全局锁里执行的,可能会导致以下问题:
- 阻塞其他线程:在
ReentrantLock
的锁范围内执行workers.add(w)
操作,那么其他线程在尝试获取该锁时将被阻塞,直到当前线程释放锁。这可能会导致其他线程在等待期间出现延迟或阻塞 - 性能下降:它们将按顺序等待
ReentrantLock
的释放。这可能导致线程竞争和延迟,从而降低整体性能
预热线程池是一种优化方法,可以在系统启动时提前创建一定数量的线程,以减少在系统运行时动态创建线程的开销
配置参数
-
corePoolSize
核心线程数。空闲时仍会保留在池中的线程数,除非设置了allowCoreThreadTimeOut
参数 -
maximumPoolSize
最大线程数。允许在池中的最大线程数 -
keepAliveTime
存活时间。当前线程数大于核心线程数时,空余线程的最长存活时间 -
unit
单位。keepAliveTime
参数的时间单位 -
workQueue
工作队列,接口类为阻塞队列。任务执行前存储的队列,只有通过submit
方法提交的任务才会进入队列 -
threadFactory
线程工厂。创建线程。默认使用Executors.defaultThreadFactory()
,所有的线程都属于同一个ThreadGroup
,都有相同的优先级,且均不是守护线程。
(可用new NamedThreadFactory("test")
来对线程池中的线程添加前缀标识) -
handler
任务丢弃策略。若线程池已经关闭、或线程池已满,那么新的任务会被拒绝。ThreadPoolExecutor.AbortPolicy
:丢弃任务并抛出RejectedExecutionException
异常ThreadPoolExecutor.DiscardPolicy
:丢弃任务,但不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列最前面的任务,然后重新尝试执行任务(循环此过程)ThreadPoolExecutor.CallerRunsPolicy
:由调用线程处理该任务
合理配置线程池
-
线程池必须手动通过
ThreadPoolExecutor
的构造函数来声明,避免使用Executors
类创建线程池,否则会因为使用了无界队列或任务队列最大长度为 Integer.MAX_VALUE,导致堆积大量的请求 会有 OOM 风险 -
推荐使用有界队列,可以有效地控制线程池占用的内存和其他资源的数量,且
maximumPoolSize
配置能排上用场
如果线程池的工作队列已满,但是线程池的线程数还没有达到maximumPoolSize
,那么线程池会创建新的非核心线程来处理这些任务,以避免任务积压和系统性能下降。
- corePoolSize 配置
-
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
-
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用
线程池监控
可以利用ThreadPoolExecutor
的相关API
做一个基础的监控。从下图可以看出,ThreadPoolExecutor
提供了获取线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等
也可以使用 SpringBoot 中的 Actuator 组件或 有监控功能的开源动态线程池Dynamic TP
动态化线程池
通过ThreadPoolExecutor
提供的 public 方法可以动态修改参数配置
注意的是程序运行期间的时候,我们调用
setCorePoolSize()
这个方法的话,线程池会首先判断当前工作线程数是否大于corePoolSize
,如果大于的话就会回收工作线程
更多动态修改线程池参数的功能,可以使用开源软件:
- Hippo4jopen:异步线程池框架,支持线程池动态变更&监控&报警,无需修改代码轻松引入。支持多种使用模式,轻松引入,致力于提高系统运行保障能力
- Dynamic TP:轻量级动态线程池,内置监控告警功能,集成三方中间件线程池管理,基于主流配置中心(已支持 Nacos、Apollo,Zookeeper、Consul、Etcd,可通过 SPI 自定义实现)
@Async 自定义线程池
在@Async
注解在使用时,不指定线程池的名称,默认SimpleAsyncTaskExecutor
线程池。
默认的线程池配置为核心线程数为8,等待队列为无界队列,即当所有核心线程都在执行任务时,后面的任务会进入队列等待,若逻辑执行速度较慢会导致线程池阻塞,从而出现监听器抛弃和无响应的结果
spring默认线程池配置参数
org.springframework.boot.autoconfigure.task.TaskExecutionProperties
/*** Configuration properties for task execution.** @author Stephane Nicoll* @since 2.1.0*/
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {private final Pool pool = new Pool();/*** Prefix to use for the names of newly created threads.*/private String threadNamePrefix = "task-";public static class Pool {/*** Queue capacity. An unbounded capacity does not increase the pool and therefore* ignores the "max-size" property.*/private int queueCapacity = Integer.MAX_VALUE;/*** Core number of threads.*/private int coreSize = 8;/*** Maximum allowed number of threads. If tasks are filling up the queue, the pool* can expand up to that size to accommodate the load. Ignored if the queue is* unbounded.*/private int maxSize = Integer.MAX_VALUE;/*** Whether core threads are allowed to time out. This enables dynamic growing and* shrinking of the pool.*/private boolean allowCoreThreadTimeout = true;/*** Time limit for which threads may remain idle before being terminated.*/private Duration keepAlive = Duration.ofSeconds(60);//getter/setter}
}
线程池和 ThreadLocal 共用的坑
线程池和 ThreadLocal共用,可能会导致线程从ThreadLocal获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的ThreadLocal 值
阿里开源的TransmittableThreadLocal(TTL)能解决线程池中ThreadLocal的问题。
TransmittableThreadLocal
类继承并加强了 JDK 内置的InheritableThreadLocal
类,在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题
参考资料:
- Java线程池实现原理及其在美团业务中的实践
- Java 线程池最佳实践
- Java 线程池作用及类型
- Java 并发编程 Future及CompletionService
- TransmittableThreadLocal(TTL)
- 案例分析|线程池相关故障梳理&总结