JUC:手写实现一个简易的线程池(Java)

目录

​编辑

先上完整代码:

解析:

任务队列:

线程池类:

拒绝策略:


先上完整代码:

public class MyThreadPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {// 1.死等queue.put(task);// 2.带超时时间等待加入等待队列// queue.offer(task, 500, TimeUnit.MICROSECONDS);// 3.放弃任务// 队列满了,没做人任何事情// 4.抛出异常// throw new RuntimeException("任务执行失败" + task);// 5.让调用者自己执行// task.run();});for (int i = 0; i < 15; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(j);});}}
}// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {void reject(BlockQueue queue, T task) ;
}
class ThreadPool {// 任务队列private BlockQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet();// 线程数private int coreSize;private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;// 构造方法public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockQueue<>(queueSize);this.rejectPolicy = rejectPolicy;}public void execute(Runnable task) {// 当任务数没有超过核心数时,直接交给woker对象执行// 如果超过,放入任务队列中存起来synchronized (workers) { // workers不安全,把他锁起来if (workers.size() < coreSize) {Worker worker = new Worker(task);System.out.println("新增worker");workers.add(worker); // 加入线程集合worker.start();} else {// taskQueue.put(task); // 任务添加进入// 1.死等// 2.带超时时间等待// 3.放弃任务// 4.抛出异常// 5.让调用者自己执行taskQueue.tryPut(rejectPolicy, task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 当task任务不为空,执行// 当任务为空,去任务队列中去取//  while (task != null || (task = taskQueue.take()) != null) 一直等待获取while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {System.out.println("正在执行" + task);task.run();} catch (Exception e) {} finally {task = null;}}synchronized (workers) {System.out.println("worker被移除" + this);workers.remove(this); // 移除当前集合对象}}}
}// 阻塞队列
class BlockQueue<T> {// 任务队列private Deque<T> queue = new ArrayDeque<>();// 锁private ReentrantLock lock = new ReentrantLock();// 满了等待,生产者private Condition fullWaitSet = lock.newCondition();// 空的等待,消费者private Condition emptyWaitSet = lock.newCondition();// 容量private int capacity;public BlockQueue(int capacity) {this.capacity = capacity;}// 阻塞队列中获取任务public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await(); // 进入等待} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 阻塞队列中添加任务public void put(T t) {lock.lock();try {while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" +  t);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + t);queue.addLast(t);emptyWaitSet.signal(); // 唤醒}finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();}finally {lock.unlock(); // 就算return也会执行}}// 带超时时间的获取,无需永久的等待了public T poll (long timeout, TimeUnit unit) {lock.lock();try {long nanos = unit.toNanos(timeout); // 时间转换为nswhile (queue.isEmpty()) {try {if (nanos <= 0) return null; // 超时了,直接返回吧nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 带超时时间的添加, return 添加成功 or 失败public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" +  task);if (nanos <= 0) return false;nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + task);queue.addLast(task);emptyWaitSet.signal(); // 唤醒return true;}finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否已满if (queue.size() == capacity) { // 有空闲rejectPolicy.reject(this, task); // 拒绝策略} else { // 有空闲queue.addLast(task);emptyWaitSet.signal();}}finally {lock.unlock();}}
}

解析:

任务队列:

// 阻塞队列
class BlockQueue<T> {// 任务队列private Deque<T> queue = new ArrayDeque<>();// 锁private ReentrantLock lock = new ReentrantLock();// 满了等待,生产者private Condition fullWaitSet = lock.newCondition();// 空的等待,消费者private Condition emptyWaitSet = lock.newCondition();// 容量private int capacity;public BlockQueue(int capacity) {this.capacity = capacity;}// 阻塞队列中获取任务public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await(); // 进入等待} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 阻塞队列中添加任务public void put(T t) {lock.lock();try {while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" +  t);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + t);queue.addLast(t);emptyWaitSet.signal(); // 唤醒}finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();}finally {lock.unlock(); // 就算return也会执行}}// 带超时时间的获取,无需永久的等待了public T poll (long timeout, TimeUnit unit) {lock.lock();try {long nanos = unit.toNanos(timeout); // 时间转换为nswhile (queue.isEmpty()) {try {if (nanos <= 0) return null; // 超时了,直接返回吧nanos = emptyWaitSet.awaitNanos(nanos);// 进入等待,超时不再等待,返回结果为剩余等待时间} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal(); // 唤醒return t;} finally {lock.unlock();}}// 带超时时间的添加, return 添加成功 or 失败public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capacity) { // 如果满了,进入等待try {System.out.println("等待加入任务队列" +  task);if (nanos <= 0) return false;nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("加入任务队列" + task);queue.addLast(task);emptyWaitSet.signal(); // 唤醒return true;}finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否已满if (queue.size() == capacity) { // 有空闲rejectPolicy.reject(this, task); // 拒绝策略} else { // 有空闲queue.addLast(task);emptyWaitSet.signal();}}finally {lock.unlock();}}
}
  1.  ArrayDeque 作为底层数据结构存储队列元素。
  2.  ReentrantLock 实现了线程安全。
  3. Condition 来实现阻塞等待机制,当队列为空时,消费者线程等待;当队列满时,生产者线程等待。
  4. 常规的入队 put()、出队 take() 操作。
  5. 带有超时的入队 offer() 和出队 poll() 操作。
  6. tryPut() 方法,该方法接受一个 RejectPolicy 接口,用于指定当队列已满时的拒绝策略

方法:

  • take(): 当队列为空时,消费者线程调用该方法将进入等待状态,直到队列中有元素可取。
  • put(T t): 当队列已满时,生产者线程调用该方法将进入等待状态,直到队列有空位可添加元素。
  • poll(long timeout, TimeUnit unit): 带有超时的出队操作,当队列为空时,会等待一段时间,如果在指定时间内仍未有元素可取,则返回 null。
  • offer(T task, long timeout, TimeUnit timeUnit): 带有超时的入队操作,当队列已满时,会等待一段时间,如果在指定时间内仍未有空位可添加元素,则返回 false。
  • tryPut(RejectPolicy<T> rejectPolicy, T task): 尝试添加元素,当队列已满时,根据拒绝策略 RejectPolicy 进行处理。

单看其实就是一个生产者消费者模式而已。

线程池类:

class ThreadPool {// 任务队列private BlockQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet();// 线程数private int coreSize;private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;// 构造方法public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockQueue<>(queueSize);this.rejectPolicy = rejectPolicy;}public void execute(Runnable task) {// 当任务数没有超过核心数时,直接交给woker对象执行// 如果超过,放入任务队列中存起来synchronized (workers) { // workers不安全,把他锁起来if (workers.size() < coreSize) {Worker worker = new Worker(task);System.out.println("新增worker");workers.add(worker); // 加入线程集合worker.start();} else {// taskQueue.put(task); // 任务添加进入// 1.死等// 2.带超时时间等待// 3.放弃任务// 4.抛出异常// 5.让调用者自己执行taskQueue.tryPut(rejectPolicy, task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 当task任务不为空,执行// 当任务为空,去任务队列中去取//  while (task != null || (task = taskQueue.take()) != null) 一直等待获取while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {System.out.println("正在执行" + task);task.run();} catch (Exception e) {} finally {task = null;}}synchronized (workers) {System.out.println("worker被移除" + this);workers.remove(this); // 移除当前集合对象}}}
}
  1. BlockQueue<Runnable> 来存储待执行的任务。
  2. HashSet<Worker> 来存储线程集合。
  3. 提供构造方法来初始化线程池的核心线程数、超时时间、任务队列大小和拒绝策略。
  4. execute(Runnable task) 方法来提交任务到线程池中执行。
  5. 内部定义了 Worker 内部类,用于执行任务的线程。

方法:

  • execute(Runnable task): 提交任务到线程池中执行。如果当前线程数小于核心线程数,则直接创建新的 Worker 线程执行任务;如果当前线程数已达到核心线程数,则尝试将任务放入任务队列中,根据拒绝策略 rejectPolicy 进行处理。
  • Worker: 内部类实现了线程执行任务的逻辑。在 run() 方法中,线程会不断从任务队列中取出任务执行,如果队列为空则会等待一段时间,超时时间由 timeouttimeUnit 决定。

拒绝策略:

函数式接口,由使用者提供实现。

// 拒接策略
@FunctionalInterface
interface RejectPolicy<T> {void reject(BlockQueue queue, T task) ;
}
```java
public class MyThreadPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000,TimeUnit.MICROSECONDS, 10, (queue, task) -> {// 1.死等queue.put(task);// 2.带超时时间等待加入等待队列// queue.offer(task, 500, TimeUnit.MICROSECONDS);// 3.放弃任务// 队列满了,没做人任何事情// 4.抛出异常// throw new RuntimeException("任务执行失败" + task);// 5.让调用者自己执行// task.run();});for (int i = 0; i < 15; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(j);});}}
}

几种拒绝策略实现:

  1. 死等(Blocking): 当任务队列已满时,线程池会一直等待直到有空位。这里使用了 queue.put(task),该方法会阻塞当前线程直到队列有空位可用。

  2. 带超时时间等待(Timeout Blocking): 当任务队列已满时,线程池会等待一段时间,如果在指定时间内仍未有空位可用,则放弃当前任务。这里使用了 queue.offer(task, 500, TimeUnit.MICROSECONDS),该方法会在指定时间内等待,如果超时则返回 false。

  3. 放弃任务(Discard): 当任务队列已满时,线程池会放弃当前任务,不做任何处理。

  4. 抛出异常(Throw Exception): 当任务队列已满时,线程池会抛出异常,通知调用者任务执行失败。

  5. 让调用者自己执行(Caller Runs): 当任务队列已满时,不在线程池内执行任务,而是由调用者自己执行任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/797780.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux进程状态深度解析:探索进程的生命周期

文章目录 一、引言1、进程的概念与重要性2、Linux系统下进程状态的意义3、进程状态与系统性能的关系 二、Linux下进程状态概述1、Linux进程状态的分类2、进程状态信息的获取方法 三、Linux下进程状态详解1、运行状态&#xff08;Running&#xff09;2、可中断睡眠状态&#xff…

27.WEB渗透测试-数据传输与加解密(1)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a;26.WEB渗透测试-BurpSuite&#xff08;五&#xff09; BP抓包网站网址&#xff1a;http:…

实现Hello Qt 程序

&#x1f40c;博主主页&#xff1a;&#x1f40c;​倔强的大蜗牛&#x1f40c;​ &#x1f4da;专栏分类&#xff1a;QT❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、使用 "按钮" 实现 1、纯代码方式实现 2、可视化操作实现 &#xff08;1&#xff09…

Hive3.0.0建库表命令测试

Hive创建表格格式如下&#xff1a; create [external] table [if not exists] table_name [(col_name data_type [comment col_comment],)] [comment table_comment] [partitioned by(col_name data_type [comment col_comment],)] [clustered by (col_name,col_name,...)…

对抗样本攻击

对抗样本是指经过特殊设计或调整的输入数据&#xff0c;旨在欺骗人工智能模型&#xff0c;使其产生错误的预测或输出。对抗样本通常是通过对原始输入进行微小但精心计算的改变&#xff0c;使得模型产生意外的结果。这种模糊化的输入可能难以从人类角度甄别&#xff0c;但对机器…

gma 教程:计算标准化降水指数(SPI)

安装 gma&#xff1a;pip install gma &#xff08;依赖的 gdal 需自行安装&#xff09; 本文基于&#xff1a;gma 2.0.8&#xff0c;Python 3.10 本文用到数据请从 gma 网站获取&#xff1a;https://gma.luosgeo.com/UserGuide/climet/Index/SPI.html 。 SPEI 函数简介 gma.c…

比较720组结构数列的收敛过程

在行&#xff0c;列可自由变换的平面上3点结构只有6个 这次计算由这6个结构排列组合&#xff0c;构成的所有720个不重复数列的递推收敛过程。 结果表明&#xff0c;所有的数列都可以在有限步内收敛。 有461个数列在3-4-3的递推过程中是天然稳定的&#xff0c;收敛结果就是本身…

STM32学习和实践笔记(4):分析和理解GPIO_InitTypeDef GPIO_InitStructure (c)

第二个成员变量是GPIOSpeed_TypeDef GPIO_Speed&#xff1b;也与int a一样同理。 GPIOSpeed_TypeDef是一个枚举类型&#xff0c;其定义如下&#xff1a; typedef enum { GPIO_Speed_10MHz 1, GPIO_Speed_2MHz, GPIO_Speed_50MHz }GPIOSpeed_TypeDef; #define IS_GPI…

Leetcode刷题-哈希表详细总结(Java)

哈希表 当我们想使⽤哈希法来解决问题的时候&#xff0c;我们⼀般会选择如下三种数据结构。 数组set &#xff08;集合&#xff09;map&#xff08;映射&#xff09; 当我们遇到了要快速判断⼀个元素是否出现集合⾥的时候&#xff0c;就要考虑哈希法。如果在做⾯试题⽬的时候…

Samba 总是需要输入网络凭证

输入网络凭证&#xff1a; 用户名是 cat /etc/samba/smb.conf&#xff0c;查看 valid users mxw 为用户名。而不是其他账号名或者用户名&#xff0c;更不是登录计算机时的计算机名&#xff1b; 密码是 需要记住安装samba服务器时&#xff0c;自己设置的password&#xff1…

LED发光模组的故障及解决方法

LED发光模组在应用过程中可能会出现各种故障&#xff0c;正确诊断并采取相应的解决方法至关重要&#xff0c;以下是一些常见故障现象及其解决方法的总结&#xff1a; 一、现象&#xff1a;所有的LED闪烁 问题&#xff1a;接触不良 解决方法&#xff1a;检查并重新固定松动处&am…

单片机为什么还在用C语言编程?

单片机产品的成本是非常敏感的。因此对于单片机开发来说&#xff0c;最重要的是在极其有限的ROM和RAM中实现最多产品的功能。或者反过来说&#xff0c;实现相同的产品功能&#xff0c;所需要的ROM和RAM越小越好&#xff0c;在开始前我有一些资料&#xff0c;是我根据网友给的问…

linux网络预备

网络预备 网络协议初识 协议分层 打电话例子 在这个例子中, 我们的协议只有两层; 但是实际的网络通信会更加复杂, 需要分更多的层次。 分层最大的好处在于 “封装” 。 OSI七层模型 OSI&#xff08;Open System Interconnection&#xff0c;开放系统互连&#xff09;七层网…

Datacom HCIP笔记-路由策略与路由控制 之二

路由策略和策略的区别&#xff1f; 路由策略&#xff1a; 操作的对象是路由表条目&#xff0c; 实现路由过滤&#xff0c;从而实现访问控制&#xff0c;引入时过滤&#xff0c;发送和接收路由时过滤。 通过配置cost&#xff0c;来实现路径的控制。 策略路由&#xff1a; 对…

【Vue3源码学习】— CH2.8 Vue 3 响应式系统小结

Vue 3 响应式系统小结 1.核心概念1.1 Proxy和Reflect1.2 响应式API1.3 依赖收集与更新触发1.4 触发更新&#xff08;Triggering Updates&#xff09;&#xff1a;1.5 副作用函数&#xff08;Effect&#xff09;1.6 计算属性和观察者1.7 EffectScope1.8 性能优化&#xff1a; 2.…

GPT-5将在6月发布前进行「红队进攻测试」

“GPT-5将在6月发布”的消息刷屏了AI朋友圈。这则消息之所以被无数人相信并转发&#xff0c;是因为已经有不少技术人员在社交平台上晒出了「红队进攻测试」邀请。 基于 GPT系列庞大的用户体量和影响力&#xff0c;OpenAI 将更加重视GPT-5 的安全性&#xff0c;作为GPT-5上市前的…

【编译原理】手工打造语法分析器

重点&#xff1a; 语法分析的原理递归下降算法&#xff08;Recursive Descent Parsing&#xff09;上下文无关文法&#xff08;Context-free Grammar&#xff0c;CFG&#xff09; 关键点&#xff1a; 左递归问题深度遍历求值 - 后续遍历 上一篇「词法分析器」将字符串拆分为…

水经微图网页版309项功能清单

让每一个人都有自己的地图&#xff01; 水经微图&#xff08;简称“微图”&#xff09;网页版&#xff0c;是越来越受到大家的亲睐了&#xff01; 就后台统计数据来看&#xff0c;日均IP数据在稳步增长&#xff0c;老访客的占比最高达35%以上。 在上上周&#xff0c;还分别有…

PMP考试费太贵,能不能自学?

最近6月PMP考试报名工作正在如火如荼的进行&#xff0c;可能大家对于考试费用已经有了基本了解&#xff0c;今天给大家分享一下PMP证书考下来需要花费多少&#xff1f;能不能自己学习&#xff1f; PMP是什么&#xff1f; PMP项目管理专业人士资格认证&#xff0c;由项目管理协…

uniapp - 微信小程序 - 使用uCharts的一些问题

文章目录 uniapp - 微信小程序 - 使用uCharts的一些问题一、开发者工具显示正常&#xff0c;真机调试统计图不随页面滚动二、数据过多开启滚动条&#xff0c;无法滑动滚动条三、饼图点击不显示提示窗/点击位置bug、多个同类型统计图点击不显示提示框问题四、 formatter 自定义 …