【Java】实现一个简单的线程池

  📝个人主页:哈__

期待您的关注 

一、线程池的模式

线程池顾名思义就是管理线程的一个池子,我们把创建线程的过程交给线程池来处理,而这个线程池当中的线程都会从阻塞队列当中取获取任务执行。

我们不在直接把任务的创建过程写到我们初始化的线程对象中,而是通过调用线程池的execute()方法,同时把我们的具体任务交作为参数传给线程池,之后线程池就会把任务添加到阻塞队列当中,而线程池当中的线程会从阻塞队列当中获取任务并执行。

二、线程池的一些参数 

  1. corePoolSize:线程池核心线程大小,即最小线程数(初始化线程数)。线程池会维护当前数量的线程在线程池中,即使这些线程一直处于闲置状态,也不会被销毁,除非设置了allowCoreThreadTimeOut。
  2. maximumPoolSize:线程池最大线程数量。当任务提交到线程池后,如果当前线程数小于核心线程数,则会创建新线程来处理任务;如果当前线程数大于或等于核心线程数,但小于最大线程数,并且任务队列已满,则会创建新线程来处理任务。
  3. keepAliveTime:空闲线程的存活时间。当线程池中的线程数量大于核心线程数且线程处于空闲状态时,在指定时间后,这个空闲线程将会被销毁,从而逐渐恢复到稳定的核心线程数数量。
  4. unit:keepAliveTime的存活时间的计量单位,通常使用TimeUnit枚举类中的方法,如TimeUnit.SECONDS表示秒级。
  5. workQueue:任务队列。用于存放等待执行的任务,常见的实现类有LinkedBlockingQueue、ArrayBlockingQueue等。
  6. threadFactory:线程工厂。用于创建新的线程,可以自定义线程的名称、优先级等。
  7. handler:拒绝策略。当任务无法执行(如线程池已满)时,可以选择的策略有:AbortPolicy(抛出异常)、CallerRunsPolicy(调用者运行)、DiscardOldestPolicy(丢弃最老的任务)、DiscardPolicy(无声丢弃)。

三、代码实现

因为我们只是简单的实现,所以有一些情况和实际不太相似。

1.BlockingQueue

先来看看我们阻塞队列当中的一些参数,为了在多线程环境下防止并发问题,我使用了ReentrantLock,使用它的目的是为了创建多个不同的阻塞条件。

在我们调用一个对象的await()方法后,我们的当前线程就会加入到一个特定的队列当中去等待,直到有调用了这个对象的notify()方法后才会从这个队列中抽取一个线程唤醒。

举个例子,我们去医院的时候,一个医生同一时间只能看一个病人,剩下的人都只能等待,如果只有一个大厅的话,看不同病的病人都只能等待在一个候诊室中。使用ReentrentLock的意思就是为了创建多个不同的候诊室,将不同医生的病人分开在不同的候诊室当中。

    //1.阻塞队列private Deque<T> deque = new ArrayDeque<>();//2.实现阻塞的锁private ReentrantLock lock = new ReentrantLock();//3. 生产者等待条件private Condition fullWaitSet = lock.newCondition();//4.消费者等待条件private Condition emptyWaitSet = lock.newCondition();//5.阻塞队列的大小private  int CAPACITY;

在自定义的阻塞队列中,我使用了一个双向队列来存储任务,并且设置了一个队列大小的属性,在我们创建这个队列的时候我们可以进行初始化。

先来看看阻塞队列任务的添加过程。这个逻辑并不难,我们在代码的上方上锁,在finally中解锁。如果这时我们的队列是满的,就无法在继续添加任务了,这个时候我们就把当前线程挂起(注意我们的挂起条件)。如果队列不是满的话那我们就加入到队尾,同时把另一类挂起的线程唤醒(这类线程在队列为空的时候挂起,等待任务的添加)

 // 生产者放入数据public void put(T t) {lock.lock();try {while (deque.size() == CAPACITY) {fullWaitSet.await();}deque.addLast(t);emptyWaitSet.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}

在看看我们取任务的过程。同样加锁,当我们的队列为空的时候,线程挂起,等待任务的添加之后线程唤醒,如果队列不为空的话,我们从队列头部取出一个任务,并且唤起一类线程(这类线程在任务已经满了的时候无法在添加任务了,进行挂起,等待队列不为满)

  // 消费者从线程池当中获取任务public T take(){T t = null;lock.lock();try {while(deque.size() == 0){emptyWaitSet.await();}t = deque.removeFirst();fullWaitSet.signal();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}return t;}

我们上边的代码展示的队列的存取的过程都是死等状态,什么是死等状态?就是任务添加不进去或者取不出来的时候,线程会被一直挂起。真实并不是如此,这里只是简单的展示。

阻塞队列需要的就是这两个存取的过程。

2.ThreadPool

先看看线程池当中的属性。把刚才创建的任务队列加进去,因为线程池要时常和任务队列沟通。然后创建了一个HashSet结构用于存储我们的线程。下边的都是我们线程池需要的一些参数了,拒绝策略在这里没有写。

    // 任务队列private BlockedQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();//核心线程数private int coreSize;// 超时时间private int timeout;// 超时单位private TimeUnit timeUnit;

来看看我们的线程池是如何工作的吧,可以看到我们线程池保存的是Worker对象,我们来看看这个Worker对象是干啥的。这个Worker对象实现了Runnable接口,我们可以把这个类当作线程类,这个类中有一个task属性,因为我们线程池当中的线程是要获取任务执行的,这个任务就用这个task属性代表。

这个Worker类一直在干一件事情,就是不断地从我们的任务队列当中获取任务(Worker类是ThreadPool的内部类),如果获取的任务不为空的话就执行任务,一旦没有任务可以执行那么就把当前的线程从线程池当中移除。

class Worker implements Runnable{private Runnable task;public Worker(Runnable task){this.task = task;}@Overridepublic void run() {while(task!=null || (task = taskQueue.take())!=null){System.out.println("取出的任务是"+task);try {task.run();}catch (Exception e){e.printStackTrace();}finally {task = null;}synchronized (workers){workers.remove(this);}}}}

那什么时候用到这个Worker类呢?当我们调用ThreadPool中的execute()方法时,线程池中的线程会就调用这个run()方法。

来看我们的execute()方法。当我们的线程数小于我们的核心线程数的时候,我们可以直接创建一个新的线程,并且把我们的任务直接交给这个核心线程。反之我们不能创建,而是把任务添加到我们的任务队列当中,等待核心线程去执行这个任务。

 // 任务执行public void execute(Runnable task){synchronized (workers){if(workers.size() < coreSize){// 创建核心线程Worker worker = new Worker(task);workers.add(worker);Thread thread = new Thread(worker);thread.start();}else {taskQueue.put(task);}}}

写完了上边的代码我们测试一下。

 public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.MILLISECONDS,10);for(int i = 0;i<12;i++){int j = i;threadPool.execute(()->{System.out.println("当前线程"+Thread.currentThread().getName()+"task "+j+" is running");try {Thread.currentThread().sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});}}

方法运行了之后,即使任务全部执行,线程也不会结束。这是因为我们的worker类中的run方法调用了任务队列的take()方法,而take方法是会一直挂起的。

我们现在换一种带超时获取,在规定时间内获取不到任务就自动结束任务。这时候就用到我们传入的时间参数了,我们不再调用await()方法了,而是调用awaitNanos()方法,方法可以接收一个时间参数,这个方法可以消耗我们的nanos时间,在这个时间内如果获取不到的话线程就不在挂起了,这时还会进入到我们的while循环当中,判断我们的nanos是不是被消耗完了,如果被消耗完了就说明在规定时间内获取不到任务,直接return结束线程。

 // 带超时获取public T poll(int timeout,TimeUnit timeUnit){T t = null;lock.lock();try {long nanos = timeUnit.toNanos(timeout);while(deque.size() == 0){if(nanos <= 0){return null;}nanos = emptyWaitSet.awaitNanos(nanos);}t = deque.removeFirst();fullWaitSet.signal();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}return t;}

 修改Worker类。

 class Worker implements Runnable{private Runnable task;public Worker(Runnable task){this.task = task;}@Overridepublic void run() {while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){System.out.println("取出的任务是"+task);try {task.run();}catch (Exception e){e.printStackTrace();}finally {task = null;}synchronized (workers){workers.remove(this);}}}}

现在就可以正常结束了。

四、拒绝策略

全部代码如下。要使用拒绝策略,我们定义一个函数式接口,同时写一个参数传给线程池,参数的具体内容就是拒绝策略的拒绝方法,是我们自己定义的。

同时我们的execute()方法不在使用put来添加任务了,而是使用tryPut,如果大家对这一块感兴趣的话,可以在bilibili上观看黑马程序员的课程学习一下。

/*** 自定义线程池*/
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.SECONDS,10,((queue, task) -> {queue.put(task);}));for(int i = 0;i<12;i++){int j = i;threadPool.execute(()->{System.out.println("当前线程"+Thread.currentThread().getName()+"task "+j+" is running");try {Thread.currentThread().sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});}}
}/*** 拒绝策略*/
@FunctionalInterface
interface RejectPolicy<T>{void reject(BlockedQueue<T> queue,T task);
}
/*** 阻塞队列*/
class BlockedQueue <T>{//1.阻塞队列private Deque<T> deque = new ArrayDeque<>();//2.实现阻塞的锁private ReentrantLock lock = new ReentrantLock();//3. 生产者等待条件private Condition fullWaitSet = lock.newCondition();//4.消费者等待条件private Condition emptyWaitSet = lock.newCondition();//5.阻塞队列的大小private  int CAPACITY;public BlockedQueue(int queueCapacity) {this.CAPACITY = queueCapacity;}// 生产者放入数据public void put(T t) {lock.lock();try {while (deque.size() == CAPACITY) {fullWaitSet.await();}deque.addLast(t);emptyWaitSet.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}// 带超时添加public boolean offer(T t,int timeout,TimeUnit timeUnit) {lock.lock();long nanos = timeUnit.toNanos(timeout);try {while (deque.size() == CAPACITY) {if(nanos <= 0){return  false;}nanos = fullWaitSet.awaitNanos(nanos);}deque.addLast(t);emptyWaitSet.signal();return  true;} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}return true;}// 带超时获取public T poll(int timeout,TimeUnit timeUnit){T t = null;lock.lock();try {long nanos = timeUnit.toNanos(timeout);while(deque.size() == 0){if(nanos <= 0){return null;}nanos = emptyWaitSet.awaitNanos(nanos);}t = deque.removeFirst();fullWaitSet.signal();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}return t;}// 消费者从线程池当中获取任务public T take(){T t = null;lock.lock();try {while(deque.size() == 0){emptyWaitSet.await();}t = deque.removeFirst();fullWaitSet.signal();}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}return t;}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {if(deque.size()==CAPACITY){rejectPolicy.reject(this,task);}else{deque.addLast(task);emptyWaitSet.signal();}} finally {lock.unlock();}}
}
/*** 线程池*/
class ThreadPool{// 任务队列private BlockedQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();//核心线程数private int coreSize;// 超时时间private int timeout;// 超时单位private TimeUnit timeUnit;//拒绝策略private RejectPolicy<Runnable> rejectPolicy;// 任务执行public void execute(Runnable task){synchronized (workers){if(workers.size() < coreSize){// 创建核心线程Worker worker = new Worker(task);workers.add(worker);Thread thread = new Thread(worker);thread.start();}else {// 任务队列//taskQueue.offer(task,timeout,timeUnit);taskQueue.tryPut(rejectPolicy,task);//taskQueue.put(task);}}}public ThreadPool(int coreSize, int timeout, TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy){this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockedQueue<>(queueCapacity);this.rejectPolicy = rejectPolicy;}class Worker implements Runnable{private Runnable task;public Worker(Runnable task){this.task = task;}@Overridepublic void run() {while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){System.out.println("取出的任务是"+task);try {task.run();}catch (Exception e){e.printStackTrace();}finally {task = null;}synchronized (workers){workers.remove(this);}}}}
}

这个代码我自己觉得是有些问题,因为如果我的任务队列大小有10的时候,我给出了13个任务,两个交给核心线程不占任务队列大小,另外10个任务正好占满,剩下一个放不进去,这时就会卡住不输出。---------未解决

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/1330.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NLP预训练模型-GPT-3

ChatGPT GPT-3是OpenAI开发的一个自然语言处理&#xff08;NLP&#xff09;预训练模型。GPT代表“生成式预训练变换器”&#xff08;Generative Pretrained Transformer&#xff09;。GPT-3是GPT系列的第三代模型&#xff0c;是一种采用了深度学习技术的强大语言模型&#xff…

mapreduce中的ReduceTask工作机制(Hadoop)

ReduceTask 是 Hadoop 中的一个重要组件&#xff0c;负责对 MapTask 的输出进行合并、排序和归并&#xff0c;最终生成最终的输出结果。 ReduceTask 的工作机制 1. 分组&#xff08;Shuffle&#xff09;阶段&#xff1a; 在分组阶段&#xff0c;ReduceTask 会从多个 Mapper …

详解 C++ 实现K-means算法

一、K-means算法概述 K-means算法是一种非常经典的聚类算法,其主要目的是将数据点划分为K个集群,以使得每个数据点与其所属集群的中心点(质心)的平方距离之和最小。这种算法在数据挖掘、图像处理、模式识别等领域有着广泛的应用。 二、K-means算法的基本原理 K-means算法…

【Spring Boot】掌握Spring Boot:深入解析配置文件的使用与管理

&#x1f493; 博客主页&#xff1a;从零开始的-CodeNinja之路 ⏩ 收录文章&#xff1a;【Spring Boot】掌握Spring Boot&#xff1a;深入解析配置文件的使用与管理 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 Spring Boot 配置文件一. 配置文…

基于SpringBoot+Vue的幼儿园管理系统 免费获取源码

项目源码获取方式放在文章末尾处 项目技术 数据库&#xff1a;Mysql5.7/8.0 数据表&#xff1a;19张 开发语言&#xff1a;Java(jdk1.8) 开发工具&#xff1a;idea 前端技术&#xff1a;vue 后端技术&#xff1a;SpringBoot 功能简介 (有文档) 项目获取关键字&#…

Vue实现多角色登录,Vue-Router路由守卫控制权限页面

实现页面侧边栏和头部不变&#xff0c;当点击某个功能时&#xff0c;只有主体部分发生变化&#xff0c;这要用到子路由技术 我的项目结构如上&#xff0c;其中包含侧边栏和头部的文件是Manage.vue&#xff0c;主页面是Home.vue&#xff0c;个人页面是Person.vue&#xff0c;用户…

固态硬盘数据都不能恢复吗?

固态硬盘的数据在某些特定情况下是可以被成功恢复的。 尽管固态硬盘的工作原理与机械硬盘不同&#xff0c;数据恢复方面相对困难&#xff0c;但并不意味着所有情况下都无法恢复数据。 例如&#xff0c;当固态硬盘发生逻辑损坏时&#xff0c;数据恢复的几率会相对较高。此外&am…

回归预测 | Matlab实现DBO-HKELM蜣螂算法优化混合核极限学习机多变量回归预测

回归预测 | Matlab实现DBO-HKELM蜣螂算法优化混合核极限学习机多变量回归预测 目录 回归预测 | Matlab实现DBO-HKELM蜣螂算法优化混合核极限学习机多变量回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实现DBO-HKELM蜣螂算法优化混合核极限学习机多变量…

【GIS面试】GIS算法介绍

作者&#xff1a;后端小肥肠 1. 前言 在地理信息系统&#xff08;GIS&#xff09;的领域中&#xff0c;算法扮演着极其重要的角色&#xff0c;它们使得复杂的空间数据分析成为可能。无论是在环境科学、城市规划&#xff0c;还是在灾害管理等众多领域&#xff0c;高效和精确的算…

Zabbix监控Oracle归档日志空间

1、oracle查看归档日志空间的sql语句 select sum(PERCENT_SPACE_USED) from v$recovery_area_usage; 2、交互式查看oracle归档日志空间的命令&#xff0c;可以手动执行一下&#xff0c;注意要用oracle用户 sqlplus -S "/ as sysdba" << EOF select sum(PER…

解决“ImportError: DLL load failed while importing _rust: 找不到指定的程序的问题

运行 scrapy startproject wikiSpider 报错&#xff1a;ImportError: DLL load failed while importing _rust: 找不到指定的程序。 经过尝试 可以更换Python解释器版本来解决 1、点击crtlalts打开设置 点击项目>解释器 选择3.11解释器 &#xff08;我原来报错用的3.9的解…

企业车辆违章查询工具,批量查询企业名下车辆违章情况,专为网约车/出租车管理公司而生

功能介绍 功能分为&#xff1a;违章管理、车辆管理、任务管理 违章管理如图&#xff1a; 搜索条件为车牌号 筛选条件为&#xff1a;时间区间、企业选择、是否处理违章、是否缴纳罚款、所属车管员 车牌管理如图&#xff1a; 可以新增车牌 查询条件为&#xff1a;车牌信息、车…

【备战算法岗】—— 控制模块复习(持续更新!!!)

1 控制理论基础 1.1 控制模块概述 输入&#xff1a;轨迹线Reference、地图信息、定位信息、车辆反馈信息 输出&#xff1a;刹车、油门、转向 CANBUS&#xff1a;车辆底盘交互协议 参考博客&#xff1a;Apollo CANBUS模块解析 apollo&#xff1a;canbus模块&#xff08;1&…

如何完成三只青蛙任务?

如何完成三只青蛙任务&#xff1f; 本文介绍了如何有效完成 三只青蛙任务&#xff0c;包括匹配资源、保护青蛙和拒绝干扰事项。 同时&#xff0c;对于习惯缺乏动力的问题&#xff0c;建议考虑是否有必要去做这个习惯&#xff0c;或者寻找其他激励方法。 大家在践行过程中可能没…

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之二 简单人脸检测添加戴眼镜效果

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之二 简单人脸检测添加戴眼镜效果 目录 Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之二 简单人脸检测添加戴眼镜效果 一、简单介绍 二、简单人脸检测添加戴眼镜效…

《机器学习by周志华》学习笔记-线性模型-02

1、对数几率回归 1.1、背景 上一节我们考虑了线性模型的回归学习,但是想要做分类任务就需要用到上文中的广义线性模型。 当联系函数连续且充分光滑,考虑单调可微函数,令: 1.2、概念 找一个单调可谓函数,将分类任务的真实标记与线性回归模型的预测值联系起来,也叫做「…

Kafka集群搭建可视化指南

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 Kafka集群搭建可视化指南 前言准备工作硬件要求环境准备 kafka集群的部署与配置3.1 单节点部署与多节点集群搭建单节点部署&#xff1a;多节点集群搭建&#xff1a; 3.2 Broker配置与优化3.3 Topic的创…

政安晨:【Keras机器学习示例演绎】(七)—— 利用 NeRF 进行 3D 体积渲染

目录 简介 设置 下载并加载数据 NeRF 模型 训练 可视化训练步骤 推理 渲染三维场景 可视化视频 结论 政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 收录专栏: TensorFlow与Keras机器学习实战 希望政安晨的博客能够对您有所裨益&#xff0…

open Gauss 数据库-05 openGauss数据库备份恢复指导手册

发文章是为了证明自己真的掌握了一个知识&#xff0c;同时给他人带来帮助&#xff0c;如有问题&#xff0c;欢迎指正&#xff0c;祝大家万事胜意&#xff01; 目录 前言 openGauss数据库备份恢复 1 实验介绍 1.1 关于本实验 1.2 实验目的 2 实验前提 3 物理备份和恢复…

「GO基础」在Windows上配置VS Code GO语言开发环境

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…