ScheduleThreadPoolExecutor应用源码

4.1 ScheduleThreadPoolExecutor介绍

从名字上就可以看出,当前线程池是用于执行定时任务的线程池。
Java比较早的定时任务工具是Timer类。但是Timer问题很多,串行的,不靠谱,会影响到其他的任务执行。
其实除了Timer以及ScheduleThreadPoolExecutor之外,正常在企业中一般会采用Quartz或者是SpringBoot提供的Schedule的方式去实现定时任务的功能。
ScheduleThreadPoolExecutor支持延迟执行以及周期性执行的功能。

4.2 ScheduleThreadPoolExecutor应用

定时任务线程池的有参构造

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

发现ScheduleThreadPoolExecutor在构建时,直接调用了父类的构造方法
ScheduleThreadPoolExecutor的父类就是ThreadPoolExecutor
首先ScheduleThreadPoolExecutor最多允许设置3个参数:
● 核心线程数
● 线程工厂
● 拒绝策略
首先没有设置阻塞队列,以及最大线程数和空闲时间以及单位
阻塞队列设置的是DelayedWorkQueue,其实本质就是DelayQueue,一个延迟队列。DelayQueue是一个无界队列。所以最大线
程数以及非核心线程的空闲时间是不需要设置的。
代码落地使用

public static void main(String[] args) {
//1. 构建定时任务线程池
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
5,
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
return t;
}
},
new ThreadPoolExecutor.AbortPolicy());
//2. 应用ScheduledThreadPoolExecutor
// 跟直接执行线程池的execute没啥区别
pool.execute(() -> {
System.out.println("execute");
});
// 指定延迟时间执行
System.out.println(System.currentTimeMillis());
pool.schedule(() -> {
System.out.println("schedule");
System.out.println(System.currentTimeMillis());
},2, TimeUnit.SECONDS);
// 指定第一次的延迟时间,并且确认后期的周期执行时间,周期时间是在任务开始时就计算
// 周期性执行就是将执行完毕的任务再次社会好延迟时间,并且重新扔到阻塞队列
// 计算的周期执行,也是在原有的时间上做累加,不关注任务的执行时长。
System.out.println(System.currentTimeMillis());
pool.scheduleAtFixedRate(() -> {
System.out.println("scheduleAtFixedRate");
System.out.println(System.currentTimeMillis());
},2,3,TimeUnit.SECONDS);
// // 指定第一次的延迟时间,并且确认后期的周期执行时间,周期时间是在任务结束后再计算下次的延迟时间
System.out.println(System.currentTimeMillis());pool.scheduleWithFixedDelay(() -> {
System.out.println("scheduleWithFixedDelay");
System.out.println(System.currentTimeMillis());
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},2,3,TimeUnit.SECONDS);
}

4.3 ScheduleThreadPoolExecutor源码剖析

4.3.1 核心属性

后面的方法业务流程会涉及到这些属性。

// 这里是针对任务取消时的一些业务判断会用到的标记
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
private volatile boolean removeOnCancel = false;
// 计数器,如果两个任务的执行时间节点一模一样,根据这个序列来判断谁先执行
private static final AtomicLong sequencer = new AtomicLong();
// 这个方法是获取当前系统时间的毫秒值
final long now() {
return System.nanoTime();
}
// 内部类。核心类之一。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
// 全局唯一的序列,如果两个任务时间一直,基于当前属性判断
private final long sequenceNumber;
// 任务执行的时间,单位纳秒
private long time;
/**
* period == 0:执行一次的延迟任务
* period > 0:代表是At
* period < 0:代表是With
*/
private final long period;
// 周期性执行时,需要将任务重新扔回阻塞队列,基础当前属性拿到任务,方便扔回阻塞队列
RunnableScheduledFuture<V> outerTask = this;
/**
* 构建schedule方法的任务*/
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 构建At和With任务的有参构造
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
}
// 内部类。核心类之一。
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
// 这个类就是DelayQueue,不用过分关注,如果没看过,看阻塞队列中的优先级队列和延迟队列

4.3.2 schedule方法

execute方法也是调用的schedule方法,只不过传入的延迟时间是0纳秒
schedule方法就是将任务和延迟时间封装到一起,并且将任务扔到阻塞队列中,再去创建工作线程去take阻塞队列。

// 延迟任务执行的方法。
// command:任务
// delay:延迟时间
// unit:延迟时间的单位
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// 健壮性校验。
if (command == null || unit == null)
throw new NullPointerException();
// 将任务和延迟时间封装到一起,最终组成ScheduledFutureTask
// 要分成三个方法去看
// triggerTime:计算延迟时间。最终返回的是当前系统时间 + 延迟时间
// triggerTime就是将延迟时间转换为纳秒,并且+当前系统时间,再做一些健壮性校验
// ScheduledFutureTask有参构造:将任务以及延迟时间封装到一起,并且设置任务执行的方式
// decorateTask:当前方式是让用户基于自身情况可以动态修改任务的一个扩展口
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 任务封装好,执行delayedExecute方法,去执行任务
delayedExecute(t);
// 返回FutureTaskreturn t;
}
// triggerTime做的事情
// 外部方法,对延迟时间做校验,如果小于0,就直接设置为0
// 并且转换为纳秒单位
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
// 将延迟时间+当前系统时间
// 后面的校验是为了避免延迟时间超过Long的取值范围
long triggerTime(long delay) {
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
// ScheduledFutureTask有参构造
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
// time就是任务要执行的时间
this.time = ns;
// period,为0,代表任务是延迟执行,不是周期执行
this.period = 0;
// 基于AtmoicLong生成的序列
this.sequenceNumber = sequencer.getAndIncrement();
}
// delayedExecute 执行延迟任务的操作
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 查看当前线程池是否还是RUNNING状态,如果不是RUNNING,进到if
if (isShutdown())
// 不是RUNNING。
// 执行拒绝策略。
reject(task);
else {
// 线程池状态是RUNNING
// 直接让任务扔到延迟的阻塞队列中
super.getQueue().add(task);
// DCL的操作,再次查看线程池状态
// 如果线程池在添加任务到阻塞队列后,状态不是RUNNING
if (isShutdown() &&
// task.isPeriodic():现在反回的是false,因为任务是延迟执行,不是周期执行
// 默认情况,延迟队列中的延迟任务,可以执行
!canRunInCurrentRunState(task.isPeriodic()) &&
// 从阻塞队列中移除任务。
remove(task))
task.cancel(false);
else
// 线程池状态正常,任务可以执行
ensurePrestart();
}
}
// 线程池状态不为RUNNING,查看任务是否可以执行
// 延迟执行:periodic==false
// 周期执行:periodic==true
// continueExistingPeriodicTasksAfterShutdown:周期执行任务,默认为false
// executeExistingDelayedTasksAfterShutdown:延迟执行任务,默认为true
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
// 当前情况,shutdownOK为true
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
// 如果状态是RUNNING,正常可以执行,返回true
// 如果状态是SHUTDOWN,根据shutdownOK来决定
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
// 任务可以正常执行后,做的操作
void ensurePrestart() {
// 拿到工作线程个数
int wc = workerCountOf(ctl.get());
// 如果工作线程个数小于核心线程数
if (wc < corePoolSize)
// 添加核心线程去处理阻塞队列中的任务
addWorker(null, true);
else if (wc == 0)// 如果工作线程数为0,核心线程数也为0,这是添加一个非核心线程去处理阻塞队列任务
addWorker(null, false);
}

4.3.3 At和With方法&任务的run方法

这两个方法在源码层面上的第一个区别,就是在计算周期时间时,需要将这个值传递给period,基于正负数在区别At和With
所以查看一个方法就ok,查看At方法

// At方法,
// command:任务
// initialDelay:第一次执行的延迟时间
// period:任务的周期执行时间
// unit:上面两个时间的单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
// 健壮性校验
if (command == null || unit == null)
throw new NullPointerException();
// 周期时间不能小于等于0.
if (period <= 0)
throw new IllegalArgumentException();
// 将任务以及第一次的延迟时间,和后续的周期时间封装好。
ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 扩展口,可以对任务做修改。
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 周期性任务,需要在任务执行完毕后,重新扔会到阻塞队列,为了方便拿任务,将任务设置到outerTask成员变量中
sft.outerTask = t;
// 和schedule方法一样的方式
// 如果任务刚刚扔到阻塞队列,线程池状态变为SHUTDOWN,默认情况,当前任务不执行
delayedExecute(t);
return t;
}
// 延迟任务以及周期任务在执行时,都会调用当前任务的run方法。
public void run() {
// periodic == false:一次性延迟任务
// periodic == true:周期任务
boolean periodic = isPeriodic();
// 任务执行前,会再次判断状态,能否执行任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 判断是周期执行还是一次性任务
else if (!periodic)
// 一次性任务,让工作线程直接执行command的逻辑
ScheduledFutureTask.super.run();// 到这个else if,说明任务是周期执行
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次任务执行的时间
setNextRunTime();
// 将任务重新扔回线程池做处理
reExecutePeriodic(outerTask);
}
}
// 设置下次任务执行的时间
private void setNextRunTime() {
// 拿到period值,正数:At,负数:With
long p = period;
if (p > 0)
// 拿着之前的执行时间,直接追加上周期时间
time += p;
else
// 如果走到else,代表任务是With方式,这种方式要重新计算延迟时间
// 拿到当前系统时间,追加上延迟时间,
time = triggerTime(-p);
}
// 将任务重新扔回线程池做处理
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 如果状态ok,可以执行
if (canRunInCurrentRunState(true)) {
// 将任务扔到延迟队列
super.getQueue().add(task);
// DCL,判断线程池状态if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 添加工作线程
ensurePrestart();
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/203855.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

学生成绩管理系统(Java)

开发环境: Windows 11 IDEA 2021.3.3 需求: package com.it.neu;import java.util.ArrayList; import java.util.Scanner;import static java.time.Clock.system;class Student { //创建学生类private String Stu_name;private String Stu_id;public Student(String id, S…

圆通单号查询,圆通速递物流查询,对需要的单号进行颜色标记

批量查询圆通速递单号的物流信息&#xff0c;并对需要的单号进行颜色标记。 所需工具&#xff1a; 一个【快递批量查询高手】软件 圆通速递单号若干 操作步骤&#xff1a; 步骤1&#xff1a;运行【快递批量查询高手】软件&#xff0c;第一次使用的伙伴记得先注册&#xff0c…

【Java】实现顺序表基本的操作(数据结构)

文章目录 前言顺序表1、打印顺序表2、增加元素3、在任意位置增加元素4、判断是否包含某个元素5、查找某个元素对于的位置6、获取任意位置的元素7、将任意位置的元素设为value8、删除第一次出现的关键字9、获取顺序表长度10、清空顺序表总结 前言 在了解顺序表之前我们要先了解…

Kubernetes集群安装高可用postgresql

Kubernetes集群安装高可用postgresql Bitnami 提供的 postgresql-ha 解决方案是一个预配置的、高可用的 PostgreSQL 集群配置&#xff0c;通常部署在 Kubernetes 环境中。它使用了一些关键技术和组件来实现数据库的高可用性。&#xff0c;Bitnami postgresql-ha 主要采用以下构…

JVM 虚拟机(二)类的生命周期

类的声明周期描述了一个类加载、使用和卸载的整个过程。 一个类的声明周期包括五个阶段&#xff1a;加载、连接、初始化、使用、卸载&#xff0c;其中连接部分分为验证、准备和解析阶段。 加载阶段 加载阶段是第一步是类加载器根据类的全限定名通过不同的渠道以二进制流的方式…

区块链媒体:Web3.0时代的推广创新10爆款策略概览-华媒舍

随着Web3.0时代的到来&#xff0c;互联网推广正经历着一场创新的革命。在这个新的时代背景下&#xff0c;一系列全新的推广策略正在兴起&#xff0c;引领着市场的变革。本文将基于这一背景&#xff0c;为大家介绍Web3.0时代中的10大爆款推广策略概览。 1. 个性化推广 在Web3.0…

【MATLAB】SSA+FFT+HHT组合算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 SSAFFTHHT组合算法是一种基于奇异谱分析&#xff08;SSA&#xff09;、快速傅里叶变换&#xff08;FFT&#xff09;和希尔伯特-黄变换&#xff08;HHT&#xff09;的组合算法。 其中&am…

谈谈SQL的优化经验

目录 前言 表设计优化 索引优化 读写分离&#xff0c;主从复制优化 ​编辑sql语句优化 前言 SQL调优在项目中是比较常见的&#xff0c;SQL调优不仅仅包括SQL语句的编写&#xff0c;其中还应包括了数据库的表设计&#xff0c;数据库的配置架构&#xff08;主从复制&#xf…

pure::variants—产品平台化及变体管理工具

产品概述 pure::variants是德国pure-systems公司的产品&#xff0c;其目的是帮助企业实现对产品线的变体管理&#xff0c;提高企业项目资产的复用效率。pure::variants的核心理念是运用产品线管理方法对项目资产&#xff08;项目计划、需求、模型、功能模块、代码、测试用例&am…

如何将 MySQL 数据库转换为 SQL Server

本文解释了为什么组织希望将其 MySQL 数据库转换为 Microsoft SQL 数据库。本文接着详细介绍了尝试转换之前需要记住的事项以及所涉及的方法。专业的数据库转换器工具将帮助您快速将 MySQL 数据库记录转换为 MS SQL Server。 在继续之前&#xff0c;我们先讨论一下 MySQL 到 M…

Stable Diffusion WebUI常用Tag收集

捆绑(nsfw)*可以直接加人物lora Masterpiece, high quality, beautiful wallpaper, 16k, animation, illustration, positive perspective, perfect body, complete body, detailed face, delicate features, (solo:1.2), ((1girl)), thin, sexy, (medium to large breasts :1…

Linux(centos)学习笔记(初学)

[rootlocalhost~]#:[用户名主机名 当前所在目录]#超级管理员标识 $普通用户的标识 Ctrlshift放大终端字体 Ctrl缩小终端字体 Tab可以补全命令 Ctrlshiftc/V复制粘贴 / &#xff1a;根目录&#xff0c;Linux系统起点 ls&#xff1a; #list列出目录的内容&#xff0c;通常用户查看…

HTML试题——附答案

HTML试题题目 1. HTML是什么意思&#xff1f;它是什么类型的语言&#xff1f; 2. 请解释HTML标签和元素之间的区别。 3. 以下HTML标记用于什么目的&#xff1f; <!DOCTYPE html> <html> <head><title>我的网页</title> </head> <bo…

什么是 performance_schema ?

MySQL的performance_schema是运行在较低级别的用于监控MySQL Server运行过程中的资源消耗、资源等待等情况的一个功能特性&#xff0c;它具有以下特点。 performance_schema提供了一种在数据库运行时实时检查Server内部执行情况的方法。performance_schema数据库中的表使用per…

Redis研学-认识与安装

一 NoSql-非关系型数据库 1 NoSql特点&#xff1a;解耦 数据模型简单&#xff0c;灵活性更强&#xff0c;对数据库的性能要求比较高(可能出现数据不一致或丢数据)&#xff0c;不需要高度的数据一致性&#xff0c;对给定的key比较容易映射到更复杂的环境 优点&#xff1a; 对数…

WordPress发布文件随机设置作者昵称信息

我们是否看到有些明显是个人网站&#xff0c;但是他有很多的发布者。其实这个都是他个人发布的&#xff0c;只是选择的不同用户&#xff0c;感觉这个网站是多人编辑的。包括我们看到有些明显是采编采集的网站&#xff0c;他们就说是投稿的&#xff0c;实际上哪里有人投稿&#…

单片机开发常用的软件构架

对于单片机程序来说&#xff0c;大家都不陌生&#xff0c;但是真正使用架构&#xff0c;考虑架构的恐怕并不多&#xff0c;随着程序开发的不断增多&#xff0c;架构是非常必要的。 一、时间片轮询法 介于前后台顺序执行法和操作系统之间的一种程序架构设计方案。该设计方案需能…

Modbus TCP工业RFID读写器的选型要点

Modbus TCP工业RFID读写器是一种采用Modbus TCP通信协议的RFID读写器。它可以通过TCP/IP网络与计算机或其它设备进行通信&#xff0c;实现远程读取和写入RFID标签数据的目的。 与传统的RFID读写器相比&#xff0c;Modbus TCP工业RFID读写器具有更远的读写距离、更高的读写灵敏度…

DTD文档约束讲解及其使用案例

DTD&#xff08;Document Type Definition&#xff09;文档类型定义是一种用于描述XML文档结构的语法规则。它定义了XML文档的元素、属性和实体等的规范格式。在XML中&#xff0c;可以将DTD定义在XML文档中或者在一个单独的外部文件中&#xff0c;以便在多个XML文档中共享。 下…

Restarting Application Engine Programs 重新启动应用程序引擎程序

Restarting Application Engine Programs 重新启动应用程序引擎程序 A key feature of Application Engine is its built-in checkpoint and restart capabilities. If a program step terminates abnormally or fails, you can restart the request from the last successf…