Java并发编程——线程池(基础,使用,拒绝策略,命名,提交方式,状态)

我是一个计算机专业研0的学生卡蒙Camel🐫🐫🐫(刚保研)
记录每天学习过程(主要学习Java、python、人工智能),总结知识点(内容来自:自我总结+网上借鉴)
希望大家能一起发现问题和补充,也欢迎讨论👏👏👏

文章目录

    • 线程池🏊
      • 线程池的好处👍
      • 线程池的创建🏗️
      • 线程池(ThreadPoolExecutor)常见参数🔢
      • 处理任务流程🔃
      • 拒绝策略⭐
        • 使用数据库任务表来自定义拒绝策略
      • 线程池中两种提交方式
      • 线程池命名♂️♀️
      • 线程池状态

线程池🏊

线程池是多线程应用中的一种资源管理技术,它旨在减少创建和销毁线程所带来的开销,并且通过复用已存在的线程来执行任务,提高响应速度。线程池提供了一种限制和管理资源(包括执行一个任务的线程)的方法。

线程池的好处👍

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的创建🏗️

  1. 使用ThreadPoolExecutor构造函数来创建自定义线程池(建议这种方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险)
  2. 通过使用 java.util.concurrent.Executors 工厂类创建

常用的线程池:

Executors 返回线程池对象特点
FixedThreadPool创建一个固定大小的线程池。如果所有线程都处于活动状态,新任务将在队列中等待,直到有线程可用。
CachedThreadPool创建一个可根据需要创建新线程的线程池,但在前一次构造的线程可用时将重用它们。适用于执行大量短期异步任务的应用程序。
SingleThreadExecutor创建一个单线程化的 Executor,它会确保所有任务都在同一个线程中按顺序执行。
ScheduledThreadPool创建一个支持定时及周期性的任务执行的线程池,类似于 Timer
WorkStealingPool创建一个具有多个任务队列的工作窃取线程池,适用于处理大量可并行的任务。

线程池对象的构造方法:

// LinkedBlockingQueue有界队列
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}// LinkedBlockingQueue 无界队列
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}// SynchronousQueue同步队列
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}// DelayedWorkQueue(延迟阻塞队列)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}...
}

线程池(ThreadPoolExecutor)常见参数🔢

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

参数解释

参数解释
corePoolSize线程池的核心线程数量:任务队列未达到队列容量时,最大可以同时运行的线程数量。
maximumPoolSize线程池的最大线程数:任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
keepAliveTime当线程数大于核心线程数时,多余的空闲线程存活的最长时间
unit时间单位,使用java.util.concurrent.TimeUnit枚举值来指定时间单位
workQueue任务队列,用来储存等待执行任务的队列
threadFactory线程工厂,用来创建线程,一般默认即可
handler拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务(常见的拒绝策略包括抛出异常、丢弃任务、执行者自身运行任务等。)

工作队列(workQueue):用于存放待执行的任务的阻塞队列。可以使用BlockingQueue接口的任何实现,常见的有:

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个基于链表结构的有界阻塞队列。如果构造时未指定容量,则默认容量为Integer.MAX_VALUE,即视为无界队列
  • SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等待另一个线程的对应移除操作。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • **LinkedBlockingDeque:**一个由链表结构组成的双端阻塞队列。

img

处理任务流程🔃

ThreadPoolExecutor中最关键的execute源码:

public void execute(Runnable command) {// 先检查传入的command是否为空,若空则报错if (command == null)throw new NullPointerException();int c = ctl.get(); // 原子操作,返回线程池状态和线程计数的控制字段值// 1. 如果线程数少于corePoolSize,调用addWorker创建核心线程数if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 2. 如果线程数大于corePoolSize,将任务添加进workQueue队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 2.1 如果检查isRunning的状态为false,则remove这个任务,然后执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// 2.2 线程池处于running状态,但是没有线程,则创建线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3.如果放入workQueue失败,则创建非核心线程执行任务,如果创建失败(当前线程总数不小于maximumPoolSize),就会拒绝else if (!addWorker(command, false))reject(command);
}

img

拒绝策略⭐

在Java的ThreadPoolExecutor中,当线程池无法处理新提交的任务时(例如,线程池已关闭或者队列已满),可以配置不同的拒绝策略来决定如何处理这些任务。

拒绝策略详细
ThreadPoolExecutor.AbortPolicy**默认的拒绝策略。**抛出 RejectedExecutionException来拒绝新任务的处理。
ThreadPoolExecutor.CallerRunsPolicy如果使用这种策略,那么当任务被拒绝时,该任务将在调用者的线程中执行,即直接在调用execute方法的线程中运行这个任务。这可能会降低任务提交线程的速度,从而减缓任务提交的速度,给线程池一些时间来处理队列中的任务。
ThreadPoolExecutor.DiscardPolicy该策略会悄悄地丢弃无法处理的任务,既不会抛出异常也不会通知调用者。因此,使用此策略时需要注意可能丢失任务的情况。
ThreadPoolExecutor.DiscardOldestPolicy此策略会丢弃队列中最旧的一个未处理任务,并尝试重新提交当前任务。这意味着最老的任务将从队列中移除,然后尝试将当前任务添加到队列中进行处理。
自定义拒绝策略通过实现接口可以自定义任务拒绝策略。

在日常开发中,我们不希望任务被丢弃,我们就会使用CallerRunsPolicy拒绝策略。

// 被拒绝任务的处理程序,直接在execute方法的调用线程中运行被拒绝的任务,除非执行器已关闭,在这种情况下,任务将被丢弃。
public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/***在调用者的线程中执行任务r,除非执行器已关闭,在这种情况下,任务将被丢弃* @param r 请求执行的可运行任务* @param e 试图执行此任务的执行器*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}
}

又源代码可知:只要当前程序不关闭就会使用执行execute方法的线程执行该任务。

但是会有一定风险

如果走到CallerRunsPolicy的任务是个非常耗时的任务,且处理提交任务的线程是主线程,可能会导致主线程阻塞,影响程序的正常运行。也可能会造成死锁(如果调用者线程正在等待线程池中的某个任务完成,而这个任务又依赖于调用者线程能够继续运行,那么就会形成循环依赖,进而导致死锁。)

解决方案:

  1. 继续采用CallerRunsPolicy拒绝策略

  2. 在内存允许的情况下,我们可以增加阻塞队列BlockingQueue的大小并调整堆内存以容纳更多的任务,确保任务能够被准确执行。

    1. 1为了充分利用 CPU,我们还可以调整线程池的maximumPoolSize (最大线程数)参数,这样可以提高任务处理速度,避免累计在 BlockingQueue的任务过多导致内存用完。
  3. 持久化思路(自定义)

    1. 设计一个数据库表来存储到数据库中

    2. 使用Redis缓存

    3. 提交到消息队列

使用数据库任务表来自定义拒绝策略
  1. 实现RejectedExecutionHandler接口自定义拒绝策略,自定义拒绝策略负责将线程池暂时无法处理(此时阻塞队列已满)的任务入库(保存到 MySQL 中)。注意:线程池暂时无法处理的任务会先被放在阻塞队列中,阻塞队列满了才会触发拒绝策略。
public class DatabaseRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 保存任务到数据库的方法saveTaskToDatabase(r);}private void saveTaskToDatabase(Runnable task) {// 实现保存任务到数据库的逻辑// 可以使用JDBC或者其他ORM框架如MyBatis、Hibernate等}
}
  1. 继承BlockingQueue实现一个混合式阻塞队列,该队列包含 JDK 自带的ArrayBlockingQueue。另外,该混合式阻塞队列需要修改取任务处理的逻辑,也就是重写take()方法,取任务时优先从数据库中读取最早的任务,数据库中无任务时再从 ArrayBlockingQueue中去取任务。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class HybridBlockingQueue<E> implements BlockingQueue<E> {private final ArrayBlockingQueue<E> queue;public HybridBlockingQueue(int capacity) {this.queue = new ArrayBlockingQueue<>(capacity);}// 其他BlockingQueue接口方法的实现...@Overridepublic E take() throws InterruptedException {// 尝试从数据库中取任务E taskFromDb = takeFromDatabase();if (taskFromDb != null) {return taskFromDb;} else {// 如果数据库中没有任务,则从queue中取return queue.take();}}private E takeFromDatabase() {// 实现从数据库中取任务的逻辑// 注意这里需要根据你的E类型来确定如何反序列化或转换成任务对象return null; // 返回null表示数据库中没有任务}@Overridepublic boolean offer(E e) {return queue.offer(e);}@Overridepublic int remainingCapacity() {return queue.remainingCapacity();}// ...其他必须实现的方法,例如put(), poll(), etc.
}

线程池中两种提交方式

  1. execute`方法

void execute(Runnable command): Executor接口中的方

  1. submit方法

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

两个方法区别:

  1. 异常处理

  2. 使用execute()时,未捕获异常导致线程终止,线程池创建新线程替代

  3. 使用submit()时,异常被封装在Future中,线程继续复用。(更加灵活的错误处理机制,允许调用者决定如何处理异常)

  4. 接受参数

  5. 返回值

线程池命名♂️♀️

给线程池命名可以帮助你在调试和监控多线程应用程序时更容易识别不同的线程池。默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

  1. 使用ThreadFactory来创建一个带有自定义名称的线程池:
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** 线程工厂,它设置线程名称,有利于我们定位问题。*/
public final class NamingThreadFactory implements ThreadFactory {private final AtomicInteger threadNum = new AtomicInteger();private final String name;/*** 创建一个带名字的线程池生产工厂*/public NamingThreadFactory(String name) {this.name = name;}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName(name + " [#" + threadNum.incrementAndGet() + "]");return t;}
}

线程池状态

线程池有5种状态:

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
线程池状态详细介绍
RUNNING表示线程池处于正常运行状态,可以接受新的任务并处理已提交的任务。
SHUTDOWN表示线程池不再接受新任务,但会继续执行已经提交的任务直到所有任务完成。通过调用 shutdown() 方法进入此状态。
STOP表示线程池不再接受新任务,并尝试停止正在执行的所有任务。通过调用 shutdownNow() 方法进入此状态。
TIDYING表示所有任务都已完成,工作线程数量为零,即将进入 TERMINATED 状态。这是一个短暂的过渡状态,在这个状态下,terminated() 钩子方法会被调用。
TERMINATED表示线程池完全终止,所有的任务都已完成并且所有的工作线程都已被销毁。

shutdownNow为STOP,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,但是这种方法的作用有限,如果线程中没有sleep、wait、Condition、定时锁等应用,interrupt()方法是无法中断当前的线程的。所以,shutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/67288.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nginx 配置代理,根据 不同的请求头进行转发至不同的代理

解决场景&#xff1a;下载发票的版式文件&#xff0c;第三方返回的是url链接地址&#xff0c;但是服务是部署在内网环境&#xff0c;无法访问互联网进行下载。此时需要进行走反向代理出去&#xff0c;如果按照已有套路&#xff0c;就是根据不同的访问前缀&#xff0c;跳转不同的…

设计一个流程来生成测试模型安全性的问题以及验证模型是否安全

要使用 Ollama 运行 llama3.3:70b 模型&#xff0c;并设计一个流程来生成测试模型安全性的问题以及验证模型是否安全&#xff0c;可以按照以下步骤进行设计和实现。整个过程包括环境配置、设计安全测试提示词、执行测试以及分析结果。以下是详细的步骤和指导&#xff1a; 1. 环…

iOS - TLS(线程本地存储)

从源码中&#xff0c;详细总结 TLS (Thread Local Storage) 的实现&#xff1a; 1. TLS 基本结构 // TLS 的基本结构 struct tls_data {pthread_key_t key; // 线程本地存储的键void (*destructor)(void *); // 清理函数 };// 自动释放池的 TLS class Autorelease…

docker在不删除容器的情况下修改端口映射

注意&#xff1a;必须先停止docker服务&#xff01;&#xff01;&#xff01;&#xff01; 1) 停止容器 2) 停止docker服务(systemctl stop docker) 3) 修改这个容器的hostconfig.json和config.v2.json文件中的端口 先查看容器id docker inspect jenkins 进入该目录 hostcon…

【js进阶】设计模式之单例模式的几种声明方式

单例模式&#xff0c;简言之就是一个类无论实例化多少次&#xff0c;最终都是同一个对象 原生js的几个辅助方式的实现 手写forEch,map,filter Array.prototype.MyForEach function (callback) {for (let i 0; i < this.length; i) {callback(this[i], i, this);} };con…

专题 - STM32

基础 基础知识 STM所有产品线&#xff08;列举型号&#xff09;&#xff1a; STM产品的3内核架构&#xff08;列举ARM芯片架构&#xff09;&#xff1a; STM32的3开发方式&#xff1a; STM32的5开发工具和套件&#xff1a; 若要在电脑上直接硬件级调试STM32设备&#xff0c;则…

-bash: /java: cannot execute binary file

在linux安装jdk报错 -bash: /java: cannot execute binary file 原因是jdk安装包和linux的不一致 程序员的面试宝典&#xff0c;一个免费的刷题平台

【MySQL】使用C语言链接

&#x1f308; 个人主页&#xff1a;Zfox_ &#x1f525; 系列专栏&#xff1a;MySQL 目录 一&#xff1a;&#x1f525; MySQL connect &#x1f98b; Connector / C 使用&#x1f98b; mysql 接口介绍&#x1f98b; 完整代码样例 二&#xff1a;&#x1f525; 共勉 一&#…

平滑算法 效果比较

目录 高斯平滑 效果对比 移动平均效果比较: 高斯平滑 效果对比 右边两个参数是1.5 2 代码: smooth_demo.py import numpy as np import cv2 from scipy.ndimage import gaussian_filter1ddef gaussian_smooth_array(arr, sigma):smoothed_arr = gaussian_filter1d(arr, s…

通过ssh连接debian

使用方法 ssh usernameipaddress [inputpasswd]root用户默认无法由ssh连接&#xff0c; 可以通过修改配置 sudo vim /etc/ssh/sshd_config去掉PermitRootLogin前的‘#’,并修改为 PermitRootLogin yes 重启sshd服务 sudo systemctl restart sshd参考 https://linuxconfig.or…

Outlook 无网络连接[2604] 错误解决办法

Outlook 是微软公司开发的一款广泛使用的电子邮件客户端&#xff0c;广泛应用于个人用户和企业办公环境中。然而&#xff0c;许多用户在使用 Outlook 时可能会遇到“无网络连接”或者“错误代码 [2604]”等问题。这个错误通常会导致 Outlook 无法正常连接到邮件服务器&#xff…

.NET 9.0 的 Blazor Web App 项目中 Hash 变换(MD5、Pbkdf2) 使用备忘

一、生成 string 对应的 MD5 码 /// <summary>/// 生成 string 对应的 MD5 码/// </summary>/// <param name"str">需要转换的字符串 string&#xff1a;用于登录认证时&#xff0c;str username 线下传递的key DateTime.Now.Ticks.ToString() …

“UniApp的音频播放——点击视频进入空白+解决视频播放器切换视频时一直加载的问题”——video.js、video-js.css

今天&#xff0c;又解决了一个单子“UniApp的音频播放——点击视频进入空白解决视频播放器切换视频时一直加载的问题” 一、问题描述 在开发一个基于 video.js 的视频播放器时&#xff0c;用户通过上下滑动切换视频时&#xff0c;视频一直处于加载状态&#xff0c;无法正常播放…

P3数据结构、数据类型、数字编码、字符编码:保姆级图文详解

文章目录 前言1、数据结构分类1.1、逻辑结构&#xff1a;线性与非线性1.2、物理结构&#xff1a;连续与分散1.3、数据结构的实现方式1.4、数据结构的选择依据 2、基本数据类型2.1、定义与分类2.2、存储形式 3、数字编码3.1、原码、反码与补码3.2、浮点数编码3.3、整数与浮点数区…

【JavaScript】基础内容,HTML如何引用JavaScript, JS 常用的数据类型

HTML 嵌入 Javascript 的方式 引入外部 js 文件 <head> <script Language "javaScript" src"index.js"/> </head>内部声明 <head> <script language"javascript">function hello(){alert("hello word&qu…

解密AIGC三大核心算法:GAN、Transformer、Diffusion Models原理与应用

在当今数字化时代&#xff0c;人工智能生成内容&#xff08;AIGC&#xff09;技术正以前所未有的速度改变着我们的生活和工作方式。从创意无限的文本生成&#xff0c;到栩栩如生的图像创作&#xff0c;再到动听的音乐旋律&#xff0c;AIGC的魔力无处不在。而这一切的背后&#…

Web前端------HTML表格

一.表格标签介绍 表格&#xff0c;类似操作的软件excel一样&#xff0c;通过规范的行列方式展示数据的一种视图&#xff01; 网页中&#xff08;初级开发&#xff09;&#xff0c;对于这种规范的数据&#xff0c;使用表格标签最方便的&#xff1b; 实际开发&#xff08;高级开…

关于AWS网络架构的思考

目录&#xff1a; AWS概述 EMR Serverless AWS VPC及其网络 关于AWS网络架构的思考 在AWS K8S中部署的业务&#xff0c;有不同的流量路径。 流量进入 客户端请求 普通的客户端流量流向从前到后是: 客户端公司网关(endpoint)业务的Endpoint ServiceLoad Balancers(监听80和…

玩转随机数:用 JavaScript 掌控不可预测的魔力!

玩转随机数&#xff1a;用 JavaScript 掌控不可预测的魔力&#xff01; 当计算机遇上“随机”&#xff0c;我们能做什么&#xff1f; 你曾想过在生活中拥有“超能力”吗&#xff1f;比如&#xff0c;可以预测下一个天气变化&#xff0c;或是猜中下一个彩票号码&#xff1f;虽…

ThreeJs功能演示——几何体操作导入导出

1、内部创建几何体导出编辑能力 1&#xff09;支持内部创建的面、正方体、球体 内部创建物体时&#xff0c;如果是三维物体&#xff0c;要创建集合形状geometry&#xff0c;和对应的材质material。再一起创建一个三维物体。 // 存储创建的几何体列表const geometries [];cre…