JUC:手写实现一个简易的线程池(Java)

目录

​编辑

先上完整代码:

解析:

任务队列:

线程池类:

拒绝策略:


先上完整代码:

public class MyThreadPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {// 1.死等queue.put(task);// 2.带超时时间等待加入等待队列// queue.offer(task, 500, TimeUnit.MICROSECONDS);// 3.放弃任务// 队列满了,没做人任何事情// 4.抛出异常// throw new RuntimeException("任务执行失败" + task);// 5.让调用者自己执行// task.run();});for (int i = 0; i < 15; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(j);});}}
}// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {void reject(BlockQueue queue, T task) ;
}
class ThreadPool {// 任务队列private BlockQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet();// 线程数private int coreSize;private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;// 构造方法public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockQueue<>(queueSize);this.rejectPolicy = rejectPolicy;}public void execute(Runnable task) {// 当任务数没有超过核心数时,直接交给woker对象执行// 如果超过,放入任务队列中存起来synchronized (workers) { // workers不安全,把他锁起来if (workers.size() < coreSize) {Worker worker = new Worker(task);System.out.println("新增worker");workers.add(worker); // 加入线程集合worker.start();} else {// taskQueue.put(task); // 任务添加进入// 1.死等// 2.带超时时间等待// 3.放弃任务// 4.抛出异常// 5.让调用者自己执行taskQueue.tryPut(rejectPolicy, task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 当task任务不为空,执行// 当任务为空,去任务队列中去取//  while (task != null || (task = taskQueue.take()) != null) 一直等待获取while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {System.out.println("正在执行" + task);task.run();} catch (Exception e) {} finally {task = null;}}synchronized (workers) {System.out.println("worker被移除" + this);workers.remove(this); // 移除当前集合对象}}}
}// 阻塞队列
class BlockQueue<T> {// 任务队列private Deque<T> queue = new ArrayDeque<>();// 锁private ReentrantLock lock = new ReentrantLock();// 满了等待,生产者private Condition fullWaitSet = lock.newCondition();// 空的等待,消费者private Condition emptyWaitSet = lock.newCondition();// 容量private int capacity;public BlockQueue(int capacity) {this.capacity = capacity;}// 阻塞队列中获取任务public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await(); // 进入等待} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 阻塞队列中添加任务public void put(T t) {lock.lock();try {while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" +  t);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + t);queue.addLast(t);emptyWaitSet.signal(); // 唤醒}finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();}finally {lock.unlock(); // 就算return也会执行}}// 带超时时间的获取,无需永久的等待了public T poll (long timeout, TimeUnit unit) {lock.lock();try {long nanos = unit.toNanos(timeout); // 时间转换为nswhile (queue.isEmpty()) {try {if (nanos <= 0) return null; // 超时了,直接返回吧nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 带超时时间的添加, return 添加成功 or 失败public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" +  task);if (nanos <= 0) return false;nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + task);queue.addLast(task);emptyWaitSet.signal(); // 唤醒return true;}finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否已满if (queue.size() == capacity) { // 有空闲rejectPolicy.reject(this, task); // 拒绝策略} else { // 有空闲queue.addLast(task);emptyWaitSet.signal();}}finally {lock.unlock();}}
}

解析:

任务队列:

// 阻塞队列
class BlockQueue<T> {// 任务队列private Deque<T> queue = new ArrayDeque<>();// 锁private ReentrantLock lock = new ReentrantLock();// 满了等待,生产者private Condition fullWaitSet = lock.newCondition();// 空的等待,消费者private Condition emptyWaitSet = lock.newCondition();// 容量private int capacity;public BlockQueue(int capacity) {this.capacity = capacity;}// 阻塞队列中获取任务public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await(); // 进入等待} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 阻塞队列中添加任务public void put(T t) {lock.lock();try {while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" +  t);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + t);queue.addLast(t);emptyWaitSet.signal(); // 唤醒}finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();}finally {lock.unlock(); // 就算return也会执行}}// 带超时时间的获取,无需永久的等待了public T poll (long timeout, TimeUnit unit) {lock.lock();try {long nanos = unit.toNanos(timeout); // 时间转换为nswhile (queue.isEmpty()) {try {if (nanos <= 0) return null; // 超时了,直接返回吧nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 带超时时间的添加, return 添加成功 or 失败public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" +  task);if (nanos <= 0) return false;nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + task);queue.addLast(task);emptyWaitSet.signal(); // 唤醒return true;}finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否已满if (queue.size() == capacity) { // 有空闲rejectPolicy.reject(this, task); // 拒绝策略} else { // 有空闲queue.addLast(task);emptyWaitSet.signal();}}finally {lock.unlock();}}
}
  1.  ArrayDeque 作为底层数据结构存储队列元素。
  2.  ReentrantLock 实现了线程安全。
  3. Condition 来实现阻塞等待机制,当队列为空时,消费者线程等待;当队列满时,生产者线程等待。
  4. 常规的入队 put()、出队 take() 操作。
  5. 带有超时的入队 offer() 和出队 poll() 操作。
  6. tryPut() 方法,该方法接受一个 RejectPolicy 接口,用于指定当队列已满时的拒绝策略

方法:

  • take(): 当队列为空时,消费者线程调用该方法将进入等待状态,直到队列中有元素可取。
  • put(T t): 当队列已满时,生产者线程调用该方法将进入等待状态,直到队列有空位可添加元素。
  • poll(long timeout, TimeUnit unit): 带有超时的出队操作,当队列为空时,会等待一段时间,如果在指定时间内仍未有元素可取,则返回 null。
  • offer(T task, long timeout, TimeUnit timeUnit): 带有超时的入队操作,当队列已满时,会等待一段时间,如果在指定时间内仍未有空位可添加元素,则返回 false。
  • tryPut(RejectPolicy<T> rejectPolicy, T task): 尝试添加元素,当队列已满时,根据拒绝策略 RejectPolicy 进行处理。

单看其实就是一个生产者消费者模式而已。

线程池类:

class ThreadPool {// 任务队列private BlockQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet();// 线程数private int coreSize;private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;// 构造方法public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockQueue<>(queueSize);this.rejectPolicy = rejectPolicy;}public void execute(Runnable task) {// 当任务数没有超过核心数时,直接交给woker对象执行// 如果超过,放入任务队列中存起来synchronized (workers) { // workers不安全,把他锁起来if (workers.size() < coreSize) {Worker worker = new Worker(task);System.out.println("新增worker");workers.add(worker); // 加入线程集合worker.start();} else {// taskQueue.put(task); // 任务添加进入// 1.死等// 2.带超时时间等待// 3.放弃任务// 4.抛出异常// 5.让调用者自己执行taskQueue.tryPut(rejectPolicy, task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 当task任务不为空,执行// 当任务为空,去任务队列中去取//  while (task != null || (task = taskQueue.take()) != null) 一直等待获取while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {System.out.println("正在执行" + task);task.run();} catch (Exception e) {} finally {task = null;}}synchronized (workers) {System.out.println("worker被移除" + this);workers.remove(this); // 移除当前集合对象}}}
}
  1. BlockQueue<Runnable> 来存储待执行的任务。
  2. HashSet<Worker> 来存储线程集合。
  3. 提供构造方法来初始化线程池的核心线程数、超时时间、任务队列大小和拒绝策略。
  4. execute(Runnable task) 方法来提交任务到线程池中执行。
  5. 内部定义了 Worker 内部类,用于执行任务的线程。

方法:

  • execute(Runnable task): 提交任务到线程池中执行。如果当前线程数小于核心线程数,则直接创建新的 Worker 线程执行任务;如果当前线程数已达到核心线程数,则尝试将任务放入任务队列中,根据拒绝策略 rejectPolicy 进行处理。
  • Worker: 内部类实现了线程执行任务的逻辑。在 run() 方法中,线程会不断从任务队列中取出任务执行,如果队列为空则会等待一段时间,超时时间由 timeouttimeUnit 决定。

拒绝策略:

函数式接口,由使用者提供实现。

// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {void reject(BlockQueue queue, T task) ;
}
```java
public class MyThreadPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {// 1.死等queue.put(task);// 2.带超时时间等待加入等待队列// queue.offer(task, 500, TimeUnit.MICROSECONDS);// 3.放弃任务// 队列满了,没做人任何事情// 4.抛出异常// throw new RuntimeException("任务执行失败" + task);// 5.让调用者自己执行// task.run();});for (int i = 0; i < 15; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(j);});}}
}

几种拒绝策略实现:

  1. 死等(Blocking): 当任务队列已满时,线程池会一直等待直到有空位。这里使用了 queue.put(task),该方法会阻塞当前线程直到队列有空位可用。

  2. 带超时时间等待(Timeout Blocking): 当任务队列已满时,线程池会等待一段时间,如果在指定时间内仍未有空位可用,则放弃当前任务。这里使用了 queue.offer(task, 500, TimeUnit.MICROSECONDS),该方法会在指定时间内等待,如果超时则返回 false。

  3. 放弃任务(Discard): 当任务队列已满时,线程池会放弃当前任务,不做任何处理。

  4. 抛出异常(Throw Exception): 当任务队列已满时,线程池会抛出异常,通知调用者任务执行失败。

  5. 让调用者自己执行(Caller Runs): 当任务队列已满时,不在线程池内执行任务,而是由调用者自己执行任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/797780.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux进程状态深度解析:探索进程的生命周期

文章目录 一、引言1、进程的概念与重要性2、Linux系统下进程状态的意义3、进程状态与系统性能的关系 二、Linux下进程状态概述1、Linux进程状态的分类2、进程状态信息的获取方法 三、Linux下进程状态详解1、运行状态&#xff08;Running&#xff09;2、可中断睡眠状态&#xff…

27.WEB渗透测试-数据传输与加解密(1)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a;26.WEB渗透测试-BurpSuite&#xff08;五&#xff09; BP抓包网站网址&#xff1a;http:…

实现Hello Qt 程序

&#x1f40c;博主主页&#xff1a;&#x1f40c;​倔强的大蜗牛&#x1f40c;​ &#x1f4da;专栏分类&#xff1a;QT❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、使用 "按钮" 实现 1、纯代码方式实现 2、可视化操作实现 &#xff08;1&#xff09…

【leetcode面试经典150题】15.分发糖果(C++)

【leetcode面试经典150题】专栏系列将为准备暑期实习生以及秋招的同学们提高在面试时的经典面试算法题的思路和想法。本专栏将以一题多解和精简算法思路为主&#xff0c;题解使用C语言。&#xff08;若有使用其他语言的同学也可了解题解思路&#xff0c;本质上语法内容一致&…

Hive3.0.0建库表命令测试

Hive创建表格格式如下&#xff1a; create [external] table [if not exists] table_name [(col_name data_type [comment col_comment],)] [comment table_comment] [partitioned by(col_name data_type [comment col_comment],)] [clustered by (col_name,col_name,...)…

贪心算法Java实现

贪心算法Java实现 贪心算法介绍 贪心算法&#xff08;贪婪算法&#xff09;是一个遵循启发式解决问题的算法范式&#xff0c;核心思想是通过在每一步的选择中都选用当前步骤下最优的选择&#xff0c;期望结果是最优的算法。贪心算法得到的结果不一定是最优结果&#xff0c;但是…

如何使用亮数据的数据IP代理及数据工具采集市场情报

如何使用亮数据的数据IP代理及数据工具采集市场情报 亮数据为粉丝提供了10美金的抵用券&#xff0c;成功注册账户&#xff0c;并登录后在用户界面里输入折扣代码即可享受抵扣&#xff01; 折扣代码&#xff1a;zhouzhou 访问页面&#xff1a;https://www.bright.cn/proxy-types…

对抗样本攻击

对抗样本是指经过特殊设计或调整的输入数据&#xff0c;旨在欺骗人工智能模型&#xff0c;使其产生错误的预测或输出。对抗样本通常是通过对原始输入进行微小但精心计算的改变&#xff0c;使得模型产生意外的结果。这种模糊化的输入可能难以从人类角度甄别&#xff0c;但对机器…

Laravel 开发Api规范

一&#xff0c;修改时区 配置 config/app.php 文件 // 时区修改&#xff0c;感觉两者皆可&#xff0c;自己根据实际情况定义 timezone > PRC, // 大陆时间二&#xff0c;设置 Accept 头中间件 accept头即为客户端请求头&#xff0c;做成中间件来使用。Accept 决定了响应返…

gma 教程:计算标准化降水指数(SPI)

安装 gma&#xff1a;pip install gma &#xff08;依赖的 gdal 需自行安装&#xff09; 本文基于&#xff1a;gma 2.0.8&#xff0c;Python 3.10 本文用到数据请从 gma 网站获取&#xff1a;https://gma.luosgeo.com/UserGuide/climet/Index/SPI.html 。 SPEI 函数简介 gma.c…

c#编程基础学习之方法

目录 C#方法方法参数默认参数值多个参数返回值命名参数 方法重载 C#方法 实例 在程序类内创建一个方法&#xff1a; class Program {static void MyMethod() //static 静态意味着方法属于程序类&#xff0c;而不是程序类的对象。void 表示此方法没有返回值。MyMethod() 是方法…

比较720组结构数列的收敛过程

在行&#xff0c;列可自由变换的平面上3点结构只有6个 这次计算由这6个结构排列组合&#xff0c;构成的所有720个不重复数列的递推收敛过程。 结果表明&#xff0c;所有的数列都可以在有限步内收敛。 有461个数列在3-4-3的递推过程中是天然稳定的&#xff0c;收敛结果就是本身…

上海计算机学会 2024年3月月赛 丙组T1 最近的数字(数学)

第一题&#xff1a;T1最近的数字 标签&#xff1a;数学题意&#xff1a;给定两个正整数 n n n与 d d d&#xff0c;请找到所有最接近 n n n且是 d d d的倍数的整数。如果有多个数字满足要求&#xff0c;从小到大输出。数据范围&#xff1a; 1 ≤ n , d ≤ 1 , 000 , 000 , 000…

STM32学习和实践笔记(4):分析和理解GPIO_InitTypeDef GPIO_InitStructure (c)

第二个成员变量是GPIOSpeed_TypeDef GPIO_Speed&#xff1b;也与int a一样同理。 GPIOSpeed_TypeDef是一个枚举类型&#xff0c;其定义如下&#xff1a; typedef enum { GPIO_Speed_10MHz 1, GPIO_Speed_2MHz, GPIO_Speed_50MHz }GPIOSpeed_TypeDef; #define IS_GPI…

.NET Standard、.NET Framework 、.NET Core三者的关系与区别?

.NET Standard、.NET Framework 和 .NET Core 是 .NET 平台生态中的三个关键概念&#xff0c;它们之间存在明确的关系和显著的区别。下面分别阐述它们各自的角色以及相互间的关系&#xff1a; .NET Standard 角色&#xff1a; .NET Standard 是一套正式的 API 规范&#xff0c…

项目经理常用的工具模型有哪些?

项目经理常用的工具模型包括但不限于以下几种&#xff1a; 甘特图&#xff1a;这是一种将大型项目划分为几个阶段&#xff0c;并展示项目进度的工具。在甘特图中&#xff0c;可以清晰地看到每个任务的开始和结束时间&#xff0c;以及任务之间的依赖关系。 工作分解结构&#…

Leetcode刷题-哈希表详细总结(Java)

哈希表 当我们想使⽤哈希法来解决问题的时候&#xff0c;我们⼀般会选择如下三种数据结构。 数组set &#xff08;集合&#xff09;map&#xff08;映射&#xff09; 当我们遇到了要快速判断⼀个元素是否出现集合⾥的时候&#xff0c;就要考虑哈希法。如果在做⾯试题⽬的时候…

解决npm install报错npm ERR Unsupported URL Type “npm:“: npm:vue-loader@^16.1.0 问题

node版本以及npm版本太旧会造成这个问题 1.下载安装nvm 网址&#xff1a;Releases coreybutler/nvm-windows GitHub 2.安装 后使用nvm命令安装切换node版本 安装node版本&#xff1a; nvm install 12.22.12 等待安装完成&#xff0c;使用命令切换版本 nvm use 12.22.1…

Samba 总是需要输入网络凭证

输入网络凭证&#xff1a; 用户名是 cat /etc/samba/smb.conf&#xff0c;查看 valid users mxw 为用户名。而不是其他账号名或者用户名&#xff0c;更不是登录计算机时的计算机名&#xff1b; 密码是 需要记住安装samba服务器时&#xff0c;自己设置的password&#xff1…

LED发光模组的故障及解决方法

LED发光模组在应用过程中可能会出现各种故障&#xff0c;正确诊断并采取相应的解决方法至关重要&#xff0c;以下是一些常见故障现象及其解决方法的总结&#xff1a; 一、现象&#xff1a;所有的LED闪烁 问题&#xff1a;接触不良 解决方法&#xff1a;检查并重新固定松动处&am…