JDK-调度线程池

归档

  • GitHub: JDK-调度线程池

使用示例

  • https://github.com/zengxf/small-frame-demo/blob/master/jdk-demo/simple-demo/src/main/java/test/jdkapi/juc/thread_pool/TestSchedule.java

JDK 版本

openjdk version "17" 2021-09-14
OpenJDK Runtime Environment (build 17+35-2724)
OpenJDK 64-Bit Server VM (build 17+35-2724, mixed mode, sharing)

原理

类结构

  • java.util.concurrent.ScheduledThreadPoolExecutor
/*** 调度线程池 */
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService 
{/*** 构造器 */public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue()); // 使用内部队列作任务队列 sign_c_001}}
  • java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue
    /*** sign_c_001 延迟任务队列 */static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {private static final int INITIAL_CAPACITY = 16; // 初始队列大小 16 个private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private final ReentrantLock lock = new ReentrantLock();private final Condition available = lock.newCondition();}
  • java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
    /*** sign_c_010 调度任务 */private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> // 继承 sign_c_011 实现 sign_i_011{private final long sequenceNumber;  // 序列号(添加时递增设置)private volatile long time;         // 触发时间(执行的时间点),基于纳米/*** 重复任务的周期,以纳秒为单位。* 正值表示固定速率执行。* 负值表示固定延迟执行。* 值为 0 表示非重复(单次)任务。*/private final long period;// 构造器ScheduledFutureTask(Runnable r, V result, long triggerTime,long period, long sequenceNumber) {super(r, result); // 执行体交给父类(sign_c_011)保存this.time = triggerTime;this.period = period;this.sequenceNumber = sequenceNumber;}}
  • java.util.concurrent.FutureTask
/*** sign_c_011 */
public class FutureTask<V> implements RunnableFuture<V> {private volatile int state;     // 状态private Callable<V> callable;   // 封装的任务public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable}
}
  • Future 相关接口定义
// sign_i_011
// java.util.concurrent.RunnableScheduledFuture
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {/*** 判断是不是周期任务*/boolean isPeriodic();
}// java.util.concurrent.ScheduledFuture
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}// java.util.concurrent.Delayed
public interface Delayed extends Comparable<Delayed> {/*** 获取延时时间,* Comparable 用于排序,延时小的排前面。*/long getDelay(TimeUnit unit);
}
  • java.util.concurrent.ThreadPoolExecutor.Worker
    /*** sign_c_020 工作者(线程) */private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{final Thread thread;Runnable firstTask;volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); // 创建一个线程,自己为执行体 sign_m_030}}

调用链

  • java.util.concurrent.ScheduledThreadPoolExecutor
    /*** 创建调度任务 */public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {... // 省略校验ScheduledFutureTask<Void> sft = // 创建调度任务 sign_c_010 new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period),sequencer.getAndIncrement());RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}// 延迟执行private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task); // 添加到队列if (!canRunInCurrentRunState(task) && remove(task))task.cancel(false);elseensurePrestart();       // 启动任务执行者线程 sign_sm_001}}
  • java.util.concurrent.ThreadPoolExecutor
    // sign_sm_001 启动任务执行者线程 void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)addWorker(null, true);  // 添加工作线程else if (wc == 0)addWorker(null, false);}// 添加工作线程private boolean addWorker(Runnable firstTask, boolean core) {... // 省略其他处理boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);  // 创建工作者 sign_c_020final Thread t = w.thread;if (t != null) {......workers.add(w); // 添加到工作者集合...t.start();          // 启动线程...}} ... // finallyreturn workerStarted;}// sign_m_022 执行队列中的任务 final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;... // 省略其他try {while (task != null || (task = getTask()) != null) { // 获取队列中的任务w.lock();... // 线程中断处理try {beforeExecute(wt, task);        // 调用执行前的钩子函数try {task.run();                 // 执行任务体(相当于定时任务的执行体 sign_m_040)afterExecute(task, null);   // 调用执行后的钩子函数} ... // catch} finally {task = null;w.completedTasks++; // 完成的任务数 +1w.unlock();}}...} ... // finally}private Runnable getTask() {...for (;;) {... // 省略其他判断处理try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();   // 默认调用 take(): 获取任务或限时等待 sign_m_050if (r != null)return r;...} ... // catch }}
  • java.util.concurrent.ThreadPoolExecutor.Worker
        // sign_m_030 执行体 public void run() {runWorker(this);    // 执行队列中的任务 sign_m_022}
  • java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask
        /*** sign_m_040 定时任务执行体 */public void run() {if (!canRunInCurrentRunState(this))cancel(false);else if (!isPeriodic())super.run();else if (super.runAndReset()) {     // 执行任务具体逻辑setNextRunTime();               // 设置任务下次被执行的时间reExecutePeriodic(outerTask);   // 重新添加到(当前线程池的)队列}}
  • java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue
        /*** sign_m_050 从队列中获取任务(或限时等待) */public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null)available.await();  // 没有任务则无限等待else {long delay = first.getDelay(NANOSECONDS);   // 需要延时的时间,实例 ref: sign_c_010if (delay <= 0L)return finishPoll(first);   // 小于 0 表示不需要延时,直接返回first = null; // don't retain ref while waitingif (leader != null)available.await();  // 相当于有线程在等待延时,就无限等待else {Thread thisThread = Thread.currentThread();leader = thisThread;    // 相当于设置标识(给其他线程判断是不是要无限等待)try {available.awaitNanos(delay);    // 等待需要延时的时间} finally {if (leader == thisThread)leader = null;  // 相当于清空标识}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}

总结

  • 线程等待是在队列 take() 方法中处理
    • 等待时延是由 ScheduledFutureTask #getDelay() 进行判断
    • 而不是通过 DelayQueue 实现,但底层原理一样
  • 重复延时执行的任务,每次执行完,会重新添加到队列中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/36291.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

边缘计算VNC智能盒子如何助力HMI设备实现二次开发?

HMI&#xff08;Human-Machine Interface&#xff09;又称人机界面&#xff0c;是用户与机器之间交互和通信的媒介。今天带你了解智能盒子如何助力HMI设备实现二次开发&#xff1f; HMI设备被广泛应用在工业自动化中&#xff0c;具有显示设备信息&#xff0c;实时监测&#xf…

python爬虫--scrapy框架

Scrapy 一 介绍 Scrapy简介 1.Scrapy是用纯Python实现一个为了爬取网站数据、提取结构性数据而编写的应用框架&#xff0c;用途非常广泛2.框架的力量&#xff0c;用户只需要定制开发几个模块就可以轻松的实现一个爬虫&#xff0c;用来抓取网页内容以及各种图片&#xff0c;非…

GPT-5对普通人有何影响

这篇文章对ChatGPT的使用方法和提问技巧进行了讨论&#xff0c;重点强调了背景信息和具体提问的重要性。文章清晰地传达了如何提高ChatGPT回答的质量&#xff0c;以及个人在使用ChatGPT时的体会和建议。然而&#xff0c;文章在逻辑组织和表达方面还有一些可以改进的地方&#x…

Spring Boot与分布式事务的最佳实践

Spring Boot与分布式事务的最佳实践 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们来探讨在Spring Boot应用中如何实现分布式事务的最佳实践。 什么是…

Android Launcher-----MainThreadInitializedObject介绍

MainThreadInitializedObject 是 Android 开发中用于确保对象在主线程上初始化的一种设计模式 一、用途 MainThreadInitializedObject 通常用于确保那些需要在主线程上创建的对象&#xff08;比如UI组件或依赖于主线程环境的对象&#xff09;能够安全地进行初始化 二、优点 …

LeetCode.438找到字符串中所有字母异位词

问题描述 给定两个字符串s和p&#xff0c;找到s中所有p的 异位词 的子串&#xff0c;返回这些子串的起始索引。不考虑答案输出的顺序。 异位词 指由相同字母重排列形成的字符串&#xff08;包括相同的字符串&#xff09;。 解题思路1 注意&#xff1a;该解题思路是错误的&am…

Microsoft VBA Excel 操控 Access资料表和查询代码进行搬运操作

问题场景 Run_NoSource_AddressSource_FileDestination_AddressDestination_FileCopy_IndicatorRun_Start_Time1C:\Users\EP\path\to\FileSSS-1.MDBC:\Users\EP\path\to\FileSSC-1.MDBY2C:\Users\EP\path\to\FileSSS-2.MDBC:\Users\EP\path\to\FileSSC-2.MDBY3C:\Users\EP\pat…

NC参照 根据名称转换为主键值,如部门、人员等参照根据部门名称、人员名称获取对应的主键值

NC参照 根据名称转换为主键值&#xff0c;如部门、人员等参照根据部门名称、人员名称获取对应的主键值 private BillCardPanel getEditBillCardPanel() {return getEditor().getBillCardPanel(); }private BillData getEditorBillData() {return this.getEditor().getBillCard…

静态库和动态库

1、编译过程 1.预处理&#xff1a;解释并展开源程序当中的所有的预处理指令&#xff0c;此时生成 *.i 文件。 2.编译&#xff1a;词法和语法的分析&#xff0c;生成对应硬件平台的汇编语言文件&#xff0c;此时生成 *.s 文件。 3.汇编&#xff1a;将汇编语言文件翻译为对应处理…

便携式烟气监测仪的应用主要有哪些?

烟气分析仪是一种用于检测和分析烟气中各种成分和污染物含量的仪器&#xff0c;通过采集和处理烟气样品&#xff0c;对其中的各种成分进行定量分析。那么&#xff0c;便携式烟气监测仪的应用主要有哪些&#xff1f;为方便大家了解&#xff0c;下面就让小编来为大家简单介绍一下…

2-2到2-4

计算出所有人的平均年龄&#xff1a; val lines sc.textFile("/root/data/scala/people/page.txt") val count lines.count() val total lines.map(line > line.split(" ")(1)).map(t>t.trim.toInt).collect().reduce((a,b)>ab) val avgAge …

如何防止SQL注入

为了防止SQL注入攻击&#xff0c;可以采取以下一系列的安全措施&#xff0c;这些措施结合了多篇参考文章中的关键信息和方法&#xff1a; 使用参数化查询或预编译语句&#xff1a; 这是防止SQL注入的最常见且最有效的方法之一。通过将用户输入的数据作为参数传递给SQL查询语句…

[Python]根据文件路径获取文件所在目录、文件名和后缀名

一、简介 本文介绍了在python中如何根据文件的路径名字&#xff08;字符串&#xff09;获取文件所在目录名、文件名&#xff08;带后缀&#xff09;、文件名&#xff08;无后缀&#xff09;和文件后缀名。 二、代码 假设文件路径为/home/user/temp.txt&#xff0c;使用以下代…

压缩pdf文件大小的方法,如何压缩pdf格式的大小

pdf太大怎么压缩&#xff1f;当你需要通过电子邮件发送一个PDF文件&#xff0c;却发现文件太大无法成功发出时&#xff0c;这些情况下&#xff0c;我们都需要找到一种方法来压缩PDF文件&#xff0c;以便更便捷地进行分享和传输。PDF文件的大小通常与其中包含的图片、图形和文本…

入门JavaWeb之 Response 下载文件

web 服务器接收到客户端的 http 请求 针对这个请求&#xff0c;分别创建一个代表请求的 HttpServletRequest 对象&#xff0c;代表响应的 HttpServletResponse 对象 获取客户端请求过来的参数&#xff1a;HttpServletRequest 给客户端响应一些信息&#xff1a;HttpServletRe…

数据库索引失效的11种情况

MySQL中 提高性能 的一个最有效的方式是对数据表 设计合理的索引。索引提供了高效访问数据的方法&#xff0c;并且加快查询的速度&#xff0c;因此索引对查询的速度有着至关重要的影响。使用索引可以 快速地定位 表中的某条记录&#xff0c;从而提高数据库査询的速度&#xff0…

js获取选中区域(window.getSelection的基本使用)

返回一个 Selection 对象&#xff0c;表示用户选择的文本范围或光标的当前位置。 const selection window.getSelection() 1.toString() //光标选中的文本 const selectedText selection.toString() 2.getRangeAt() //返回一个包含当前选区内容的区域对象。 selection…

数据与文字的表示方法

目录 1. 数据格式 1. 文本文件格式 2. 二进制文件格式 3. 数据库格式 4. 压缩格式 2. 数字机器码表示 整数表示 浮点数表示 3. 字符与数组的表示方法 1. ASCII&#xff08;美国信息交换标准代码&#xff09; 2. 扩展ASCII 3. Unicode 4. UTF-8&#xff08;8 位 Uni…

面试相关-接口测试常问的问题

1.为什么要做接口测试 (1)现在大多系统都是前后端分离的项目,前端和后端的进度可能不一样,那为了尽早的进入测试,前端界面没有开发完成的情况下,只要后端的接口开发完了,就可以提前做接口测试了; (2)基于安全考虑,只依赖前端进行限制,已经完全不满足系统的安全性…

Power Pivot——常用DAX 函数

常用DAX 函数 以下这些函数是 DAX 中最常用的一部分&#xff0c;通过熟练掌握这些函数&#xff0c;你可以有效地进行数据分析和建模。 聚合函数 (Aggregation Functions) SUM() 用途&#xff1a;对指定列中的所有数值求和。 语法&#xff1a;SUM() 示例&#xff1a;SUM(Sale…