Java多线程实战-从零手搓一个简易线程池(三)线程工厂,核心线程与非核心线程逻辑实现

🏷️个人主页:牵着猫散步的鼠鼠 

🏷️系列专栏:Java全栈-专栏

🏷️本系列源码仓库:多线程并发编程学习的多个代码片段(github)

🏷️个人学习笔记,若有缺误,欢迎评论区指正 

目录

1.前言

1.1.内容回顾

1.2.本节任务

2.实现思路

2.1 线程工厂实现思路

2.2 核心线程与非核心线程实现思路

3.代码实现

3.1.线程池工厂实现

3.2核心线程与非核心线程逻辑

4.总结


✨️本系列源码均已上传仓库 1321928757/Concurrent-MulThread-Demo(github.com)✨️ 

(本章节可参考liushijie-240329-core分支)

1.前言

1.1.内容回顾

往期文章传送门:
Java多线程实战-从零手搓一个简易线程池(一)定义任务等待队列-CSDN博客

Java多线程实战-从零手搓一个简易线程池(二)线程池与拒绝策略实现-CSDN博客

在上一节我们实现了线程池内部的基本运转逻辑,池化了线程资源进行任务处理,细心的同学可以发现,我们上章没有划分核心线程与非核心线程的概念,在JDK官方的提供的线程池中,线程池中的线程从概念上分为核心线程和非核心线程,核心线程是线程池中长久存在的线程,默认不会被回收,而非核心线程在空闲时间超过设置的最大空闲时间时会被回收,当然,我们也可以通过设置一个属性来运行核心线程被回收。

1.2.本节任务

本章节的任务如下:

  1. 实现线程工厂
  2. 实现核心线程与非核心线程

2.实现思路

2.1 线程工厂实现思路

线程工厂是运用了工厂设计模式,可以帮助我们隐藏创建线程的一些细节。我们可以通过线程工厂在创建线程数时定义线程的一些属性,如线程名称、线程组等。实现线程工厂一般有以下步骤:

  1. 定义一个线程工厂接口或抽象类,提供创建新线程的方法。
  2. 实现该接口或继承该抽象类,重写创建线程的方法逻辑。
  3. 在线程池的构造函数中,传入自定义的线程工厂实例。

整体实现还是比较简单,主要就是要注意编码规范

2.2 核心线程与非核心线程实现思路

这里首先要清楚一个概念,JDK线程池源码中没有显式的区别核心线程和非核心线程,他只是线程池在处理线程池不同情况下的线程的一种概念。我们接下来从源码分析(JDK1.8)是如何实现核心线程和非核心线程的管理的。

JDK官方线程池中的runWorker方法作用是用来执行worker线程

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {// 线程执行任务流程,省流}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

同我们上节运行线程一样,他会通过while (task != null || (task = getTask()) != null)来重复获取任务,如果task == null,也就是没获取到,会进入到processWorkerExit函数中,线程会被回收。也就是说,只要getTask方法返回为null,就代表了当前线程需要回收,所以我们接下来重点查看getTask方法的源码:

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?// 1.方法内部使用了一个无限循环for (;;),这意味着线程会一直尝试获取任务,直到成功获取到任务或者满足退出条件。for (;;) {// 2.获取到目前线程池的线程数,最大核心线程,最大总线程数等信息int c = ctl.get();int rs = runStateOf(c);// 3.如果线程池的运行状态至少为SHUTDOWN(在此状态以上的状态,都不会接受新任务了,所以我们直接返回null)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);//获取线程池当前线程数量// 4.根据当前线程数动态判断是否要回收boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

getTask方法主要负责从workQueue队列中获取任务,如果获取到了就返回任务,如果没有获取到就返回null。他会根据线程池的当前状态,当前线程数,来动态的选择是否从workQueue中拿取任务,以及拿取操作是否是超时操作。这里的设计特别巧妙,建议阅读源码仔细体会

如果 当前线程数 > 最大核心线程数,我们就判定存在非核心线程,可以进行回收判断

如果 当前线程数 < 最大线程数,我们就判定不存在核心线程

所以核心线程和非核心线程他们都是一类线程,只是在线程池不同情况下划分的概念恶意

3.代码实现

3.1.线程池工厂实现

3.1.1.线程工厂接口
/*** @author Luckysj @刘仕杰* @description 线程工厂接口* @create 2024/03/28 20:40:18*/
public interface ThreadFactory {/*** @description* @param * @return 创建的线程对象* @date 2024/03/28 21:01:35*/Thread newThread(Runnable r);
}
3.1.2.默认线程工厂实现类

默认线程工厂实现类主要是设置新建线程的线程组,线程名前缀等等信息,更加规范,方便后续日志排查错误

/*** @author Luckysj @刘仕杰* @description 默认线程工厂,我们这里仿照源码写法,为每个线程分配线程组(默认会自动分配),并为每个线程组* @create 2024/03/28 21:27:10*/
public class DefaultThreadFactory implements ThreadFactory{/** 原子序号类,我们可以通过该类为线程工厂来获取一个随机序号,主要是为了区分不同线程池实例*/private static final AtomicInteger poolNumber = new AtomicInteger(1);/** 线程组,每个线程都需要属于一个线程组(平常使用未指定线程组会默认分配)*/private final ThreadGroup group;/** 原子序号类,我们可以通过该类为每个线程来获取一个随机序号*/private static final AtomicInteger threadNumber = new AtomicInteger(1);/** 线程名前缀,以便于在日志、监控等场景下识别和管理线程。*/private final String namePrefix;public DefaultThreadFactory() {// 获取管理安全策略的类,通过这个类我们可以获取对应名称的线程组,SecurityManager 和 group 的存在是为了更好地控制线程的安全性和权限SecurityManager s = System.getSecurityManager();// 存在 SecurityManager实例,则通过 s.getThreadGroup() 获取一个受限制的线程组。// 如果不存在 SecurityManager 实例,则使用当前线程所在的线程组 Thread.currentThread().getThreadGroup()。this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();// 生成前缀this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";}@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);// 将线程设置为用户线程if(thread.isDaemon()){thread.setDaemon(false);}// 为线程设置默认优先级if(thread.getPriority() != Thread.NORM_PRIORITY){thread.setPriority(Thread.NORM_PRIORITY);}return thread;}
}
3.1.3.使用线程工厂

在Worker工作线程构造函数中使用工厂创建线程

    class Worker implements Runnable{private Runnable firstTask;private Thread thread;public Worker(Runnable task) {this.firstTask = task;this.thread = threadFactory.newThread(this);}// 省略}

3.2核心线程与非核心线程逻辑

3.2.1.编写getTask方法

getTask方法会根据线程池情况动态从任务队列中获取任务

    /*** @description 从等待队列中获取任务* @return Runnable 待执行的任务,没有获取到会返回null* @date 2024/04/02 10:46:37*/
public Runnable getTask(){//我们使用一个变量来记录上次循环获取任务是否超时boolean preIsTimeOut = false;// 内部使用一个while循环,线程会一直尝试获取任务,直到成功获取到任务或者满足退出条件while(true){// 获取线程池当前线程数量int wc = threadTotalNums.get();// 1.是否要进行核心线程回收操作,当allowCoreThreadTimeOut为true,或者当前线程池数大于核心线程数时,我们需要进行回收判断boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 2.根据情况动态调整线程数,以下情况需要直接返回null(返回null就会回收线程):// (1)当前线程大于最大线程数(就是超过规定大小了),且任务队列为空且存在工作线程// (2)timed为true,上次任务超时了(preIsTimeOut = true),且任务队列为空且存在工作if ( (wc > maximumPoolSize || (timed && preIsTimeOut)) && (wc > 1 || workQueue.isEmpty()) ) {return null;}// 3.根据timed这个条件来选择是超时堵塞Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;// 获取任务超时了,将preIsTimeOut设为true,下次可以执行回收preIsTimeOut = true;}}
  •  timed 变量决定了线程从等待队列中拿取任务的方式,如果当前线程数大于最大核心线程数,或者开启了允许核心线程回收(allowCoreThreadTimeOut = true),我们就超时拿取,这样如果拿取任务超时就会返回null,线程就会被回收
3.2.2.调整Worker工作线程的run方法

将原来直接从任务队列中获取任务改为通过getTask方法获取

 @Overridepublic void run() {log.info("工作线程====》工作线程{}开始运行", Thread.currentThread());// 1。首先消费当前任务,消费完再去任务队列取,while循环实现线程复用while(firstTask != null || (firstTask = getTask()) != null){try {firstTask.run();}catch (Exception e){throw new RuntimeException(e);}finally {// 执行完后清除任务firstTask = null;}}// 2.跳出循环,说明取任务超过了最大等待时间,线程歇菜休息吧synchronized (workerSet){workerSet.remove(this);threadTotalNums.decrementAndGet(); //计数扣减}log.info("工作线程====》线程{}已被回收,当前线程数:{}", Thread.currentThread(), threadTotalNums.get());}
 3.2.3.编写addWorker方法
/*** @description 添加工作线程* @param firstTask 线程第一次执行的任务* @param isCore 是否为核心线程* @return Boolean 线程是否添加成功* @date 2024/04/02 10:42:43*/public Boolean addWorker(Runnable firstTask, Boolean isCore){if(firstTask == null) {throw new NullPointerException();}// TODO 1.我们在添加线程时,首先可以进行一些与线程池生命周期相关的校验,比如在一些状态下,不允许再添加任务// 2.根据当前线程池和isCore条件判断是否需要创建int wc = threadTotalNums.get();if (wc >= (isCore ? corePoolSize : maximumPoolSize))return false;// 3.创建线程,并添加到线程集合中Worker worker = new Worker(firstTask);Thread t = worker.thread;if(t != null){synchronized (workerSet){workerSet.add(worker);threadTotalNums.getAndIncrement();}t.start();return true;}return false;}
3.2.4.完善excute方法

流程如下:

1.如果当前线程数小于核心线程,直接创建核心线程去运行

2.线程数大于核心线程,我们就将任务加入等待队列

3.队列满了,尝试创建非核心线程,如果失败就触发拒绝策略

public void execute(Runnable task){if(task == null){throw new NullPointerException("传递的Runnable任务为Null");}// 1.如果当前线程数小于核心线程,直接创建线程去运行if(threadTotalNums.get() < corePoolSize){if(addWorker(task, true)) return;}// 2.线程数大于核心线程,我们就将任务加入等待队列if(workQueue.offer(task)){return;}// 3.队列满了,尝试创建非核心线程,如果失败就触发拒绝策略else if(!addWorker(task, false)){reject(task);}}

4.测试

编写如下测试代码,我们会创建一个核心线程数为2,最大线程数为5,等待队列长度为5的线程池,并添加15个任务到线程池中,按照预期会有五个任务触发拒绝策略,在任务执行完成后只保留两个核心线程

@Slf4j
public class MainTest {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(new WorkQueue<>(5), 2, 5,5L, TimeUnit.SECONDS,(queue, task) -> {log.info("拒绝策略====》拒绝策略触发,直接丢弃当前任务");}, new DefaultThreadFactory());threadPool.setAllowCoreThreadTimeOut(false); //不回收核心线程for (int i = 0; i < 15; i++) {threadPool.execute(() -> {System.out.println("执行任务------->当前执行线程为" + Thread.currentThread().toString());try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}});}// ExecutorService executorService = Executors.newFixedThreadPool(2);}
}

运行结果如下:

可以看到运行结果符合预期,任务也被正常消费 

我们设置AllowCoreThreadTimeOut的属性为true,再次进行测试,

threadPool.setAllowCoreThreadTimeOut(true); //回收核心线程

结果输出:

可以看到,核心线程也会被回收,符合预期。

5.总结

在本章节中我们通过学习JDK线程池源码中的部分代码,实现了一个简易版带有核心线程与非核心线程处理逻辑的线程池,我们可以通过指定AllowCoreThreadTimeOut属性来设置是否允许核心线程的回收,默认只会回收非核心线程。线程池的官方源码还是写得相当巧妙的,阅读难度也不高,推荐小伙伴学习~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/788440.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mac上怎么合并多张图片?

Mac上怎么合并多张图片&#xff1f;上班过的小伙伴都应该知道&#xff0c;合并拼接图片是一件非常重要且经常需要使用到的图片处理技术&#xff0c;将多张图片合并拼成一张之后能够展现出更多的图片内容。在Mac电脑上&#xff0c;合并多张图片是一项常见的任务&#xff0c;无论…

【教程】Kotlin语言学习笔记(六)——泛型

写在前面&#xff1a; 如果文章对你有帮助&#xff0c;记得点赞关注加收藏一波&#xff0c;利于以后需要的时候复习&#xff0c;多谢支持&#xff01; 【Kotlin语言学习】系列文章 第一章 《认识Kotlin》 第二章 《数据类型》 第三章 《数据容器》 第四章 《方法》 第五章 《L…

关系型数据库设计

目录 1.数据库设计的重要性及定义 1.1 数据库设计的重要性 1.1.1 失败的数据库设计造成的后果 1.1.2 优秀的数据库设计带来的好处 1.2 数据库设计的定义 2.数据库需求分析 2.1 需求分析的步骤 2.1.1 收集信息 2.1.2 标识实体 2.1.3 标识每个实体的详细信息 2.1…

【HTML】制作一个简单的动态SVG图形

目录 前言 开始 HTML部分 CSS部分 效果图 总结 前言 无需多言&#xff0c;本文将详细介绍一段HTML和CSS代码&#xff0c;该代码用于创建一个动态的SVG图形&#xff0c;具体内容如下&#xff1a; 开始 首先新建文件夹&#xff0c;创建两个文本文档&#xff0c;其中HTML的文…

【2023】kafka原生以及配合springboot的使用(Kafka-3)

&#x1f4bb;目录 前言 一、依赖二、原生使用kafka1、发送消息1.1、生产者同步发送消息1.2、生产者异步发送消息1.3、常用配置&#xff1a; 2、接收消息2.1、关于消费者的自动提交和手动提交2.2、长轮训poll消息2.3、消费者的健康状态检查2.4、指定分区和偏移量&#xff0c;时…

使用docker-tc对host容器进行限流

docker-tc是一个github开源项目&#xff0c;项目地址是https://github.com/lukaszlach/docker-tc。 运行docker-tc docker run -d \ --name docker-tc \ --network host \ --cap-add NET_ADMIN \ --restart always \ -v /var/run/docker.sock:/var/run/docker.sock \ -v /var…

Transformer - model architecture

Transformer - model architecture flyfish Transformer总体架构可分为四个部分: 输⼊部分 输出部分 编码器部分 解码器部分 输入部分 输出部分 输⼊部分包含: 源嵌⼊层和位置编码 ⽬标嵌⼊层和位置编码 输出部分包含: 线性层 softmax处理器 左侧编码器部分和右侧解码器部…

微信小程序自定义弹窗组件

业务背景&#xff1a;弹窗有时字体较多&#xff0c;超过7个字&#xff0c;不适用wx.showToast. 组件代码 <view class"toast-box {{isShow? show:}}" animation"{{animationData}}"><view class"toast-content" ><view class&q…

Taro + vue3 小程序封装标题组件

分为没有跳转页面的title组件和 有跳转页面的title组件 我们可以把这个封装成一个组件 直接上代码 <template><div class"fixed-title-container"><div class"box"><div class"icon" v-if"isShow" click"…

【论文阅读】DETR 论文逐段精读

【论文阅读】DETR 论文逐段精读 文章目录 【论文阅读】DETR 论文逐段精读&#x1f4d6;DETR 论文精读【论文精读】&#x1f310;前言&#x1f4cb;摘要&#x1f4da;引言&#x1f9ec;相关工作&#x1f50d;方法&#x1f4a1;目标函数&#x1f4dc;模型结构⚙️代码 &#x1f4…

ubuntu-server部署hive-part4-部署hive

参照 https://blog.csdn.net/qq_41946216/article/details/134345137 操作系统版本&#xff1a;ubuntu-server-22.04.3 虚拟机&#xff1a;virtualbox7.0 部署hive 下载上传 下载地址 http://archive.apache.org/dist/hive/ apache-hive-3.1.3-bin.tar.gz 以root用户上传至…

多层PCB内部长啥样?

硬件工程师刚接触多层PCB的时候&#xff0c;很容易看晕。动辄十层八层的&#xff0c;线路像蜘蛛网一样。 画了几张多层PCB电路板内部结构图&#xff0c;用立体图形展示各种叠层结构的PCB图内部架构。 高密度互联板(HDI)的核心 在过孔 多层PCB的线路加工&#xff0c;和单层双…

Transformer - Positional Encoding 位置编码 代码实现

Transformer - Positional Encoding 位置编码 代码实现 flyfish import torch import torch.nn as nn import torch.nn.functional as F import os import mathclass PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len5000):super(PositionalEnco…

深度学习理论基础(六)注意力机制

目录 深度学习中的注意力机制&#xff08;Attention Mechanism&#xff09;是一种模仿人类视觉和认知系统的方法&#xff0c;它允许神经网络在处理输入数据时集中注意力于相关的部分。通过引入注意力机制&#xff0c;神经网络能够自动地学习并选择性地关注输入中的重要信息&…

http: server gave HTTP response to HTTPS client 分析一下这个问题如何解决中文告诉我详细的解决方案

这个错误信息表明 Docker 客户端在尝试通过 HTTPS 协议连接到 Docker 仓库时&#xff0c;但是服务器却返回了一个 HTTP 响应。这通常意味着 Docker 仓库没有正确配置为使用 HTTPS&#xff0c;或者客户端没有正确配置以信任仓库的 SSL 证书。以下是几种可能的解决方案&#xff1…

半导体制程离子注入注入的是哪些离子

离子注入是一种低温过程 通过该过程将一种元素的离子加速进入固体靶材&#xff0c;从而改变靶材的物理、化学或电学性质。离子注入用于半导体器件制造和金属精加工以及材料科学研究。如果离子停止并保留在目标中&#xff0c;则它们可以改变目标的元素成分&#xff08;如果离子…

6 个典型的Java 设计模式应用场景题

单例模式(Singleton) 场景: 在一个Web服务中,数据库连接池应当在整个应用生命周期中只创建一次,以减少资源消耗和提升性能。使用单例模式确保数据库连接池的唯一实例。 代码实现: import java.sql.Connection; import java.sql.SQLException;public class DatabaseConne…

上位机图像处理和嵌入式模块部署(qmacviusal边缘宽度测量)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面有一篇文章&#xff0c;我们了解了测量标定是怎么做的。即&#xff0c;我们需要提前知道测量的方向&#xff0c;灰度的方向&#xff0c;实际的…

“省钱有道”的太平鸟,如何真正“高飞”?

衣食住行产业中&#xff0c;服装品类消费弹性较大、可选属性较强&#xff0c;其发展可以显著反映当前的经济温度。 根据国家统计局数据&#xff0c;2023年1-12月&#xff0c;我国限额以上单位服装类商品零售额累计10352.9亿元&#xff0c;同比增长15.4%&#xff0c;增速比2022…

Python框架下的qt设计之JSON格式化转换小程序

JSON转换小程序 代码展示&#xff1a; 主程序代码&#xff1a; from PyQt6.QtWidgets import (QApplication, QDialog, QMessageBox )import sys import jsonclass MyJsonFormatter(jsonui.Ui_jsonFormatter,QDialog): # jsonui是我qt界面py文件名def __init__(self):super()…