Java JUC(四) 自定义线程池实现与原理分析

目录

一. 阻塞队列 BlockingQue

二. 拒绝策略 RejectPolicy

三. 线程池 ThreadPool 

四. 模拟运行


在 Java基础(二) 多线程编程 中,我们简单介绍了线程池 ThreadPoolExecutor 的核心概念与基本使用。在本文中,我们将基于前面学习的各种锁与同步工具来实现自定义的线程池,同时来探究和分析 Java 线程池的基本原理。

一. 阻塞队列 BlockingQue

在线程池的生态中,阻塞队列是至关重要的一环,其用于实现任务与工作线程之间的平衡(类似于生产者/消费者模式)。 在此处,我们实现了一个自定义的阻塞队列 BlockingQue,其代码如下:

// 阻塞队列实现
public class BlockingQue<T> {// 1. 任务队列private Deque<T> queue;// 2. 锁private ReentrantLock lock;// 3. 生产者条件变量private Condition fullWaitSet;// 4. 消费者条件变量private Condition emptyWaitSet;// 5. 容量private int capacity;public BlockingQue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.capacity = capacity;// ArrayDeque: 基于 Object[] 实现,可以自动扩容this.queue = new ArrayDeque<>();this.lock = new ReentrantLock(fair);// 读写共用一把锁this.fullWaitSet = lock.newCondition();this.emptyWaitSet = lock.newCondition();}public BlockingQue(int capacity) {this(capacity, false);}// 阻塞添加public void put(T element) throws InterruptedException {lock.lock();try {while (queue.size() == capacity) {fullWaitSet.await();}queue.addLast(element);// 唤醒消费线程emptyWaitSet.signal();} finally {lock.unlock();}}// 非阻塞添加public boolean offer(T element) {lock.lock();try {if (queue.size() == capacity)return false;queue.addLast(element);// 唤醒消费线程emptyWaitSet.signal();return  true;} finally {lock.unlock();}}// 超时阻塞添加public boolean offer(T element, long timeout, TimeUnit unit) throws InterruptedException {lock.lock();try {long nanos = unit.toNanos(timeout);while (queue.size() == capacity) {// 已经超时则返回 falseif (nanos <= 0)return false;nanos = fullWaitSet.awaitNanos(nanos); // awaitNanos 返回剩余等待时间(处理虚假唤醒)}queue.addLast(element);// 唤醒消费线程emptyWaitSet.signal();return true;}finally {lock.unlock();}}// 阻塞获取public T take() throws InterruptedException {lock.lock();try {while (queue.isEmpty()) {emptyWaitSet.await();}T element = queue.removeFirst();// 唤醒生产线程fullWaitSet.signal();return element;} finally {lock.unlock();}}// 超时阻塞获取public T poll(long timeout, TimeUnit unit) throws InterruptedException {lock.lock();try {// 将 timeout 统一转换为纳秒long nanos = unit.toNanos(timeout);while (queue.isEmpty()) {// 已经超时则返回 nullif(nanos <= 0){return null;}nanos = emptyWaitSet.awaitNanos(nanos); // awaitNanos 返回剩余等待时间(处理虚假唤醒)}T element = queue.removeFirst();// 唤醒生产线程fullWaitSet.signal();return element;}finally {lock.unlock();}}//获取大小public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}
}

可以看出,上述代码使用了 Deque 作为元素存储容器,但若将 Deque 换成 Object[] 数组,则其基本就是 ArrayBlockingQueue 的实现源码。在实际工作中,若要实现自定义阻塞队列,我们只需要实现 BlockingQueue<E> 接口及其抽象方法即可。

package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;public interface BlockingQueue<E> extends Queue<E> {boolean add(E e);boolean offer(E e);void put(E e) throws InterruptedException;boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;E take() throws InterruptedException;E poll(long timeout, TimeUnit unit) throws InterruptedException;int remainingCapacity();boolean remove(Object o);public boolean contains(Object o);int drainTo(Collection<? super E> c);int drainTo(Collection<? super E> c, int maxElements);
}

二. 拒绝策略 RejectPolicy

在线程数量已满且阻塞队列已满的情况下,主线程则会因为无法放置任务而一直阻塞等待,因此我们需要拒绝策略来处理这种溢出情况。拒绝策略一般定义为接口,并允许我们自定义策略,其代码如下:

// 拒绝策略
@FunctionalInterface
public interface RejectPolicy<T> {void reject(BlockingQue<T> queue, T task);
}

一般接口方法需要提供阻塞队列以及当前任务两个参数,并支持函数式编程;常见的拒绝策略包括:阻塞等待、放弃执行、抛出异常、由调用线程执行等(后续会实现)。在实际工作中,Java已经为我们提供了拒绝策略的顶层设计,若想自定义拒绝策略,我们只需实现 RejectedExecutionHandler 接口并实现其 rejectedExecution 抽象方法即可。

package java.util.concurrent;public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

三. 线程池 ThreadPool 

在本节,我们将实现一个简单的自定义线程池,其只包含核心线程数,并且规定线程池的运行规则如下:

1.若当前线程数 < corePoolSize,则新建线程处理任务;

2.若当前线程数 >= corePoolSize && 任务队列未满,则将任务放入任务队列等待;

3.若当前线程数 >= corePoolSize && 任务队列已满,则执行拒绝策略;

/*** 自定义线程池实现:*  1. 若当前线程数 < corePoolSize,则新建线程处理任务*  2. 若当前线程数 >= corePoolSize && 任务队列未满,则将任务放入任务队列等待*  3. 若当前线程数 >= corePoolSize && 任务队列已满,则执行拒绝策略*/
public class ThreadPool {// 任务队列private BlockingQue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();// 锁private ReentrantLock mainLock = new ReentrantLock();// 核心线程数private int coreSize;// 获取任务的超时时间(allowThreadTimeOut=true时有效)private long timeOut;// 时间单位(allowThreadTimeOut=true时有效)private TimeUnit timeUnit;// 是否允许线程超时等待(默认允许)private boolean allowThreadTimeOut = true;// 拒绝策略private RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeOut = timeOut;this.timeUnit = timeUnit;this.taskQueue = new BlockingQue<>(queueCapacity);this.rejectPolicy = rejectPolicy;}// 设置 allowThreadTimeOut 参数public void setAllowThreadTimeOut(boolean allowThreadTimeOut) {this.allowThreadTimeOut = allowThreadTimeOut;}// 执行任务 taskpublic void execute(Runnable task){mainLock.lock();try{if(workers.size() < coreSize){// 添加核心线程Worker worker = new Worker(task);workers.add(worker);worker.start();}else if(!taskQueue.offer(task)){// 执行拒绝策略rejectPolicy.reject(taskQueue, task);}} finally {mainLock.unlock();}}// 工作线程类private class Worker extends Thread{// 执行任务private Runnable task;public Worker(Runnable task){this.task = task;}@Overridepublic void run() {// 获取任务while(task != null || (task = getTask()) != null){try{task.run();}catch (Exception e){e.printStackTrace();}finally {task = null;}}// worker 线程终止synchronized (workers){// 移除 workerworkers.remove(this);}}}// 从阻塞队列中获取等待任务(提供给Worker的钩子方法)private Runnable getTask(){for(;;){try {Runnable r = allowThreadTimeOut ? taskQueue.poll(timeOut, timeUnit) : taskQueue.take();return r;} catch (InterruptedException e) {// 若被中断则重新等待e.printStackTrace();}}}
}

Java ThreadPoolExecutor 的实现相比我们自定义的线程池更加复杂和安全(增加了线程池状态的维护、最大线程数的逻辑、线程池终止方法等),但在核心思想的实现上基本一致,因此这段自定义代码的实现可以帮助我们更加方便的理解 ThreadPoolExecutor 的源码。

四. 模拟运行

public class Main {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(3,10000, TimeUnit.MILLISECONDS, 5,(queue, task) -> {// 1. 死等//try {//    queue.put(task);//} catch (InterruptedException e) {//    e.printStackTrace();//}// 2. 放弃任务执行// do nothing...System.out.println("do discard policy...");// 3. 抛出异常//throw new RuntimeException("task run fail" + task);// 4. 调用线程执行任务//task.run();});for (int i = 0; i < 10; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(j + "is running...");});}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/57750.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

施耐德M310PLC通讯之ModbusTCP(一)

这是另一个专题----施耐德国产化PLC(M310)的通讯篇 本节是ModbusTcp通讯 测试对象: M310plc与M241PLC 通讯协议: ModbusTcp 主站:M310PLC 从站:M241PLC 1.M310端: 1.1 新建工程(M310采用EcoStruxure Motion Expert 软件) 新建工程,这里不区分PLC型号的,只要是M310即…

电能表预付费系统-标准传输规范(STS)(30)

6.5.3.2 CONTROLBlock construction The 1 6 digit CONTROLBlock is constructed from the data elements in the APDU as defined in Table 36 and Table 37.The most significant digit is in position 1 5 and the least significant digit in position 0. APDU中的数据元素…

Jmeter基础篇(19)JSR223预处理器

前言 JSR223预处理器是Apache JMeter中的一个组件&#xff0c;它允许用户使用任何支持Java Scripting API (JSR 223) 的脚本语言来执行预处理任务。这个功能非常强大&#xff0c;因为它让测试人员能够利用如Groovy、JavaScript&#xff08;Nashorn引擎&#xff09;、BeanShell…

Python基于TensorFlow实现双向循环神经网络GRU加注意力机制分类模型(BiGRU-Attention分类算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后关注获取。 1.项目背景 随着深度学习技术的发展&#xff0c;循环神经网络&#xff08;RNN&#xff09;及其变种如门控循环…

echart实现地图数据可视化

文章目录 [TOC](文章目录) 前言一、基本地图展示2.数据可视化 总结 前言 最近工作安排使用echarts来制作图形报表&#xff0c;记录一下我的步骤&#xff0c;需求呈现一个地图&#xff0c;地图显示标签&#xff0c;根据业务指标值给地图不同省市填充不同颜色&#xff0c;鼠标放…

数学真题总结

举反例 看清正负号 对应的特征值一致 不用裁开计算行列式要注意符号&#xff01;&#xff01;&#xff01; 根据值的大小确定正负 没有思路就构建tanx求极值要考虑端点线性方程&#xff1a;求通解归并x几何意义 整体思想 u e^x y都设计好了&#xff0c;曲线是f(x,y) 0,直接把…

ES跟Kafka集成

配合流程 1. Kafka作为分布式流处理平台&#xff0c;能够实时收集和处理不同数据源的数据流&#xff1b; 2. 通过Kafka Connect或者Logstash等中间件&#xff0c;可以将Kafka中的数据流实时推送到Elasticsearch中&#xff1b; 3. Elasticsearch接收到数据后&#xff0c;会根据…

价格文本对齐

记录一下工作里常遇到的一些简单问题&#xff1a; 需求是一个购买按钮上同时展示原价和现价&#xff1a; 1.原价现价文本格式不同 2.原价切需要加打折红线&#xff0c;不方便用富文本一个文本处理。 3.需要对两条文本适配父节点的宽度&#xff0c;不能超出按钮 以下是实现代…

c++:vector模拟实现

一、vector成员变量 库里实现用的就是这三个成员变量&#xff0c;咱们实现跟库里一样&#xff0c; namespace myvector {template<class T>class vector{public://vecttor的迭代器是原生指针typedef T* iterator;typedef const T* const_iterator; private:iterator _sta…

【热门主题】000023 计算机视觉:算法与应用的深度探索

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 【热…

国产服务器平台离线部署k8s和kubesphere(含离线部署新方式)

"信创&#xff1a;鲲鹏麒麟&#xff0c;ARM64架构&#xff0c;实现K8s和Kubesphere的离线部署&#xff0c;全新方式助力企业高效运维。" 本文将深入探讨如何借助鲲鹏CPU(arm64)和操作系统Kylin V10 SP2/SP3,通过KubeKey制作KubeSphere与Kubernetes的离线安装包&#…

「C/C++」C/C++ 之 指针详解

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「C/C」C/C程序设计&#x1f4da;全部专栏「VS」Visual Studio「C/C」C/C程序设计「UG/NX」BlockUI集合「Win」Windows程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「PK」Parasoli…

CSS--导航栏案例

利用CSS制作北大官网导航栏 详细代码如下&#xff1a; <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title><style>*{margin: 0;padding: 0;}#menu{background-color: darkred;width: 100%;height: 50px…

【语义分割|代码解析】CMTFNet-2: CNN and Multiscale Transformer Fusion Network 用于遥感图像分割!

【语义分割|代码解析】CMTFNet-2: CNN and Multiscale Transformer Fusion Network 用于遥感图像分割&#xff01; 【语义分割|代码解析】CMTFNet-2: CNN and Multiscale Transformer Fusion Network 用于遥感图像分割&#xff01; 文章目录 【语义分割|代码解析】CMTFNet-2: …

基于 Python 的 Django 框架开发的电影推荐系统

项目简介&#xff1a;本项目是基于 Python 的 Django 框架开发的电影推荐系统&#xff0c;主要功能包括&#xff1a; 电影信息爬取&#xff1a;获取并更新电影数据。数据展示&#xff1a;提供电影数据的列表展示。推荐系统&#xff1a;基于协同过滤算法实现个性化推荐。用户系…

高并发场景下的性能测试方法!

在现代互联网应用中&#xff0c;高并发场景下的性能测试显得尤为重要。无论是电商平台的秒杀活动&#xff0c;还是社交应用的突发流量&#xff0c;都需要确保系统能够在高并发情况下稳定运行。本文将详细介绍高并发场景下的性能测试方法&#xff0c;并提供具体的方案和实战演练…

超萌!HTMLCSS:超萌卡通熊猫头

效果演示 创建了一个卡通风格的熊猫头 HTML <div class"box"><div class"head"><div class"head-copy"></div><div class"ears-left"></div><div class"ears-right"></di…

springboot高校运动会管理系统-计算机毕业设计源码33814

摘要 本文旨在介绍基于Spring Boot框架和HTML技术开发的高校运动会管理系统。通过该系统&#xff0c;学校能够更高效地组织和管理校园内的各项体育赛事&#xff0c;提升运动会的组织效率和参与体验。系统整合了Spring Boot的强大功能和HTML的灵活性&#xff0c;为高校运动会管理…

Linux特种文件系统--tmpfs文件系统

tmpfs类似于RamDisk&#xff08;只能使用物理内存&#xff09;&#xff0c;使用虚拟内存&#xff08;简称VM&#xff09;子系统的页面存储文件。tmpfs完全依赖VM&#xff0c;遵循子系统的整体调度策略。说白了tmpfs跟普通进程差不多&#xff0c;使用的都是某种形式的虚拟内存&a…

森利威尔SL2516D 耐压60V内置5V功率MOS 支持PWM LED恒流驱动器芯片

一、基本特性 型号&#xff1a;SL2516D封装&#xff1a;ESOP8工作频率&#xff1a;140kHz驱动MOS管&#xff1a;内置 二、电气特性 输入电压范围&#xff1a;8V~100V&#xff08;注意&#xff0c;虽然问题中提到耐压60V&#xff0c;但根据官方信息&#xff0c;其实际耐压范围…