并发-Executor框架笔记

Executor框架

jdk5开始,把工作单元与执行机制分离开来,工作单元包括Runable和Callable,执行机制由Executor框架来提供。

Executor框架简介

Executor框架的两级调度模型

  • Java线程被一对一映射为本地操作系统线程
    • java线程启动会创建一个本地操作系统线程
    • java线程终止操作系统线程也会被回收
    • 操作系统会调度所有线程并将它们分配给可用的cpu
  • 在上层,java多线程程序通常把应用分解为若干任务,然后使用用户级调度器将这些任务映射为固定数量的线程。在底层,操作系统内核将这些线程映射到硬件处理器上
  • 应用通过Executor框架控制上层调度,下层由操作系统内核控制,下层调度不受应用程序控制。

在这里插入图片描述

Executor框架的结构与成员

Executor框架结构
  • 主要由任务,任务执行和异步计算结果组成

    • 任务:包括被执行任务需要实现的接口:Runnable和Callable
    • 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorServie接口。(ThreadPoolExecutor和ScheduledThreadPoolExecutor两个关键类实现了)
    • 异步计算的结果:包括接口Future和实现Future接口的FutureTask类
  • 类和接口

  • 在这里插入图片描述

    • Executor是一个接口,是Executor框架的基础,将任务的提交与任务的执行分离开来
    • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务
    • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。比Timer更灵活,功能更强大。
    • Future接口和实现Future接口的FutureTask类,代表异步计算的结果
    • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行
  • 使用:

    • 主线程创建实现Runnable或Callable接口的任务对象,工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object result))
    • 可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command)),或者吧Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable task))
    • 执行ExecutorService.submit(…),将会返回一个实现Future接口的对象,也可以创建FutureTask然后直接交给ExecutorService执行
    • 主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel来取消此任务
Executor框架的成员
  • ThreadPoolExecutor:通常使用工厂类Executors创建

    • FixedThreadPool:创建固定线程数的FixedThreadPool

      • 适用于为了满足资源管理的需求,需要限制当前线程数量的应用场景,适用于负载比较重的服务器

      • public static ExecutorService newFixedThreadPool(int nThreads)
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
        
    • SingleThreadExecutor:创建单个线程

      • 适用于需要保证顺序地执行各个任务,并且在任意时间点,不会有多个线程是活动的应用场景

        public static ExecutorService newSingleThreadExecutor()
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
        
    • CachedThreadPool:创建一个会根据需要创建新线程

      • 大小无界的线程池,适用于执行很多短期异步任务的小程序,或负载教轻的服务器

      • public static ExecutorService newCachedThreadPool()
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
        
  • ScheduledThreadPoolExecutor:通常使用工厂类Executors创建

    • 创建ScheduledThreadPoolExecutor:包含若干线程的ScheduledThreadPoolExecutor

      • 适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求需要限制后台线程数量的应用场景

      • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
        
    • 创建SingleThreadScheduledExecutor:包含一个线程的ScheduledThreadPoolExecutor

      • 适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景

      • public static ScheduledExecutorService newSingleThreadScheduledExecutor(int corePoolSize)
        public static ScheduledExecutorService newSingleThreadScheduledExecutor(int corePoolSize, ThreadFactory threadFactory)
        
  • Future接口

    • Future接口和Future接口的FutureTask类用来表示异步计算的结果

    • 把Runnable接口或Callable接口的实现类提交给ThreadPoolExecutors或ScheduledThreadPoolExecutors时,会返回给一个FutureTask对象

    • <T> Future<T> submit(Callable<T> task)
      <T> Future<T> submit(Runnable task, T result)
      Future<> submit(Runnable task)
      
  • Runnable接口和Callable接口

    • Runnable不会返回结果,Callable会返回结果

    • 可以把Runnable包装成Callable

    • public static Callable<Object> callable(Runnable task)
      
    • 可以把Runnable和一个待返回的结果包装成一个Callable

    • public static <T> Callable<T> callable(Runnable task, T result)
      
    • 把callable对象提交给执行时,submit会返回一个FutureTask对象

      • 执行get方法等待任务执行完成,任务执行完成后get方法将返回该任务的结果

ThreadPoolExecutor详解

构成组件

  • corePool:核心线程池大小
  • maximumPool:最大线程池大小
  • BlockingQueue:用来暂时保存任务的工作队列
  • RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或者饱和时,execute方法将要调用的Handler

创建类型

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool

FixedThreadPool详解

固定线程数线程池

public static ExecutorService newFixedThreadPool(int nThreads){return new ThreadPoolExecutor(nThreads,nThreads, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
}
  • corePoolSize 和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数
  • 当啊线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止
  • keepAliveTime设置为0,多余空闲线程会被立即终止

流程

  1. 如果当前运行线程数少于corePoolSize,则创建新线程来执行任务
  2. 在线程池完成预热后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
  3. 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务执行
  • 使用无界队列LinkedBlockingQueue作为线程池工作队列,使用无界队列作为工作队列会对线程池带来影响
    • 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize
    • 由于上条,使用无界队列时,maximumPoolSize将是一个无效参数
    • 由于上条和上上条,使用无界队列时keepAliveTIme将是一个无效参数
    • 由于使用无界队列,运行中的FixedThreadPool不会拒绝任务

SingleThreadExecutor详解

使用单个worker线程的Executor

public static ExecutorService newSingleThreadExecutor(){return new ThreadPoolExecutor(1,1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
}
  • corePoolSize 和maximumPoolSize都被设置为1
  • keepAliveTime设置为0,多余空闲线程会被立即终止
  • 使用无界队列LinkedBlockingQueue作为线程池的工作队列

工作流程

  1. 如果当前运行的线程数少于corePoolSize,则创建一个新线程来执行任务
  2. 在线程池完成预热后,将任务加入LinkedBlockingQueue
  3. 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行

CachedThreadPool详解

根据需要创建新线程的线程池

public static ExecutorService newCachedThreadPool(int nThreads){return new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>())
}
  • corePoolSize设置为0
  • maximumPoolSize被设置为Integer.MAX_VALUE
  • keepAliveTime设置为60L,线程池中的空闲线程等待新任务的最长时间是60秒,空闲线程超过60秒将会被终止
  • 使用没有容量的SynchronousQueue作为线程池的工作队列
  • 如果主线程提交的任务速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程,极端情况下,会因为创建过多线程为耗尽cpu和内存资源

流程:

  1. 首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正则执行SynchronousQueue.poll,那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行步骤2
  2. 当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll.这种情况下,步骤1 将失败,此时CachedThreadPool会创建一个新线程执行任务,execute方法执行完成
  3. 在步骤2中新创建将任务执行完后,会执行SynchronousQueue.poll,这个poll操作会让空闲线程最多在SynchronousQueue中等待60s,如果60s内主线程提交了一个新任务,那么这个空闲线程将执行主线程提交的新任务,否则这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

ScheduledThreadPoolExecutor详解

  • 继承自ThreadPoolExecutor
  • 主要用来在给定的延迟之后运行任务,会定期执行任务。
  • 可以在构造函数中指定多个对应的后台线程数

ScheduledThreadPoolExecutor运行机制

  • DelayQueue是一个无界队列。

主要两部分

  • 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法或者scheduleWithFixedDelay方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask
  • 线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了如下修改

  • 使用DelayQueue作为任务队列
  • 获取任务的方式不同
  • 执行周期任务后,增加了额外的处理

ScheduledThreadPoolExecutor的实现

ScheduledFutureTask成员变量:

  • long time:表示这个任务将要被执行的具体时间
  • long sequenceNumber:表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
  • long period:表示任务执行的间隔周期

DelayQueue封装了一个优先级队列,这个优先级队列会将队列的任务根据time排列,小的在前,如果time相同,比较sequenceNumber,小的在前

ScheduledThreadPoolExecutor执行某个周期任务步骤

  • 线程从DelayQueue中获取已到期的ScheduledFutureTask,到期任务是指ScheduledFutureTask的time大于等于当前时间
    • 获取Lock
    • 获取周期任务
      • 如果PriorityQueue为空,当前线程到Condition中等待
      • 如果PriorityQueue的头元素的time时间比当前时间大,到condition中等待到time时间
      • 获取PriorityQueue的头元素,如果不为空,则唤醒condition中等待的所有线程
    • 释放Lock
  • 线程执行ScheduledFutureTask
  • 线程修改ScheduledFutureTask的time后边变为下次将要被执行的时间
  • 线程把这个修改time之后的ScheduledFutureTask放回到DelayQueue中
    • 获取Lock
    • 添加任务
      • 向PriorityQueue添加任务
      • 如果添加的任务是头元素,唤醒Condition中等待的所有线程
    • 释放Lock

FutureTask详解

Future接口和实现Future接口的FutureTask类,代表异步计算的结果

简介

  • FutureTask除了实现Future接口外,还实现了Runnable接口。
  • FutureTask可以交给Executor执行,也可以由调用线程直接执行
  • FutureTask可以处于3种状态
    • 未启动:run方法还没有被执行前
      • 执行FutureTask.get将导致调用线程阻塞
      • 执行FutureTask.cancel将导致此任务永远不会被执行
    • 已启动:run方法被执行过程中
      • 执行FutureTask.get将导致调用线程阻塞
      • 执行FutureTask.cancel(true)将以中断执行此任务线程的方式来试图停止任务
      • 执行FutureTask.cancel(false)将不会对正在执行此任务的线程产生影响
    • 已完成:run方法执行完后正常结束,或被取消,或执行run方法时抛出异常而异常结束
      • 执行FutureTask.get将导致调用线程立即返回结果或抛出异常
      • 执行FutureTask.cancel()返回false

使用

  • 可以把FutureTask交给Executor执行
  • 可以通过ExecutorService.submit返回一个FutureTask,然后执行FutureTask.get或cancel方法
  • 也可以单独使用FutureTask

当一个线程需要等待另一个线程把某个任务执行完后才能执行,此时可以使用FutureTask

实现

  • 基于AQS实现,包含两种类型操作

    • 至少一个acquire操作,阻塞调用线程,除非直到AQS状态允许这线程继续执行,FutureTask的acquire操作为get方法调用
    • 至少一个release操作,这个操作改变AQS的状态,改变后的状态可以允许一个或多个阻塞线程被解除阻塞,FutureTask的release操作包括run和cancel
  • Sync是FutureTask的内部私有类,继承自AQS,FutureTask的所有公有方法都直接委托给了内部私有Sync

  • FutureTask.get方法会用AQS的acquireSharedInterruptibly方法,执行过程

    • 调用AQS的acquireSharedInterryptibly方法,
      • 回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功
        • 成功条件:state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null
    • 如果成功则get方法立即返回,如果失败则到线程等待队列中去等待其他线程执行release操作
    • 当其他线程执行release操作唤醒当前线程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤醒它的后继线程
    • 最后返回计算的结果或抛出异常

    FutureTask.run方法

    • 执行在构造函数中指定的任务(Callable.call)
    • 以原子方式来更新同步,如果这个原则操作成功,就设置代表计算结果的变量result的值为callable.call的返回值,然后调用AQS.releaseShared
    • AQS.releaseShared首先会回调在子类Sync中实现的tryReleaseShared来执行release操作,AQS.releaseShared,然后唤醒线程等待队列中的第一个线程
    • 调用FutureTask.done1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/74453.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity3D URP 仿蜘蛛侠风格化BloomAO

Unity3D URP 仿蜘蛛侠风格化Bloom&AO BloomBloom效果流程&#xff1a;制作控制面板VolumeComponent.CSCustom Renderer FeatherCustom Renderer PassBloom ShaderComposite Shader 完善Custom Feather风格化AO 总结 本篇文章介绍在URP中如何进行风格化后处理&#xff0c;使…

高阶数据结构-----三种平衡树的实现以及原理(未完成)

TreeMap和TreeSet的底层实现原理就是红黑树 一)AVL树: 1)必须是一棵搜索树:前提是二叉树&#xff0c;任取一个节点&#xff0c;它的左孩子的Key小于父亲节点的Key小于右孩子节点的Key&#xff0c;中序遍历是有序的&#xff0c;按照Key的大小进行排列&#xff0c;高度平衡的二叉…

2024字节跳动校招面试真题汇总及其解答(一)

1. 【算法题】重排链表 给定一个单链表 L 的头节点 head ,单链表 L 表示为: L0 → L1 → … → Ln - 1 → Ln请将其重新排列后变为: L0 → Ln → L1 → Ln - 1 → L2 → Ln - 2 → … 不能只是单纯的改变节点内部的值,而是需要实际的进行节点交换。 示例 1: 输入:hea…

SSH是如何配置的

目录 什么是SSH SSH可以做什么其他用途&#xff1f; ssh有几种连接方法吗 我应该用哪种方法连接SSH1或SSH2&#xff1f; 每天都在用SSH你知道SSH的原理吗 开启ssh后telnet会关闭吗 SSH的优缺点 SSH和Telnet之间优缺点的对比 SSH的配置实验 ensp Cisco H3C 1、什么是…

QT—基于http协议的网络文件下载

1.常用到的类 QNetworkAccessManager类用于协调网络操作&#xff0c;负责发送网络请求&#xff0c;创建网络响应 QNetworkReply类表示网络请求的响应。在QNetworkAccessManager发送一个网络请求后创建一个网络响应。它提供了以下信号&#xff1a; finished()&#xff1a;完成…

【八大经典排序算法】:直接插入排序、希尔排序实现 ---> 性能大比拼!!!

【八大经典排序算法】&#xff1a;直接插入排序、希尔排序实现 ---> 性能大比拼&#xff01;&#xff01;&#xff01; 一、 直接插入排序1.1 插入排序原理1.2 代码实现1.3 直接插入排序特点总结 二、希尔排序 ( 缩小增量排序 )2.1 希尔排序原理2.2 代码实现2.3 希尔排序特点…

数据可视化、BI和数字孪生软件:用途和特点对比

在现代企业和科技领域&#xff0c;数据起着至关重要的作用。为了更好地管理和理解数据&#xff0c;不同类型的软件工具应运而生&#xff0c;其中包括数据可视化软件、BI&#xff08;Business Intelligence&#xff09;软件和数字孪生软件。虽然它们都涉及数据&#xff0c;但在功…

文献阅读:Chain-of-Thought Prompting Elicits Reasoning in Large Language Models

文献阅读&#xff1a;Chain-of-Thought Prompting Elicits Reasoning in Large Language Models 1. 文章简介2. 具体方法3. 实验结果 1. 数学推理 1. 实验设计2. 实验结果3. 消解实验4. 鲁棒性考察 2. 常识推理 1. 实验设计2. 实验结果 3. 符号推理 1. 实验设计2. 实验结果 4.…

华为数通方向HCIP-DataCom H12-821题库(单选题:241-260)

第241题 ​​LS Request​​报文不包括以下哪一字段? A、通告路由器(Advertising Router) B、链路状态 ID (Link Srate ID) C、数据库描述序列号(Database Dascription Sequence lumber) D、链路状态类型 Link state type) 答案:C 解析: LS Request 报文中包括以下字段…

协议定制 + Json序列化反序列化

文章目录 协议定制 Json序列化反序列化1. 再谈 "协议"1.1 结构化数据1.2 序列化和反序列化 2. 网络版计算器2.1 服务端2.2 协议定制(1) 网络发送和读取的正确理解(2) 协议定制的问题 2.3 客户端2.4 代码 3. Json实现序列化反序列化3.1 简单介绍3.2 使用 协议定制 J…

【送书活动】揭秘分布式文件系统大规模元数据管理机制——以Alluxio文件系统为例

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 「推荐专栏」&#xff1a; ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄&#xff0c;vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…

python调用GPT实现:智能用例生成工具

工具作用&#xff1a; 根据输入的功能点&#xff0c;生成通用测试点 实现步骤 工具实现主要分2个步骤&#xff1a; 1.https请求调用Gpt,将返回响应结果保存为.md文件 2.用python实现 将 .md文件转换成.xmind文件 3.写个简单的前端页面&#xff0c;调用上述步骤接口 详细代…

操作系统强化认识之Shell编程学习与总结

目录 1.Shell的概述 2.Shell脚本入门 3.变量 3.1.系统预定义变量 3.2.自定义变量 3.3.特殊变量 4.运算符 5.条件判断 6.流程控制 6.1.if判断 6.2.case语句 6.3.for循环 6.4.while循环 7.read读取控制台输入 8.函数 8.1.系统函数 8.2.自定义函数 9.正则表示式入…

【C++入门到精通】C++入门 ——搜索二叉树(二叉树进阶)

阅读导航 前言一、搜索二叉树简介1. 概念2. 基本操作⭕搜索操作&#x1f36a;搜索操作基本代码&#xff08;非递归&#xff09; ⭕插入操作&#x1f36a;插入操作基本代码&#xff08;非递归&#xff09; ⭕删除操作&#x1f36a;删除操作基本代码&#xff08;非递归&#xff0…

MySQL无法查看系统默认字符集以及校验规则

show variables like character_set_database; show variables like collation_database;这个错误信息表示MySQL在尝试访问performance_schema.session_variables表时&#xff0c;发现该表不存在。这个问题可能是由于MySQL的版本升级导致的。解决这个问题的一种方法是运行mysql…

论文浅尝 | 训练语言模型遵循人类反馈的指令

笔记整理&#xff1a;吴亦珂&#xff0c;东南大学硕士&#xff0c;研究方向为大语言模型、知识图谱 链接&#xff1a;https://arxiv.org/abs/2203.02155 1. 动机 大型语言模型&#xff08;large language model, LLM&#xff09;可以根据提示完成各种自然语言处理任务。然而&am…

Java JUC 并发编程(笔记)

文章目录 再谈多线程并发与并行顺序执行并发执行并行执行 再谈锁机制重量级锁轻量级锁偏向锁锁消除和锁粗化 JMM内存模型Java内存模型重排序volatile关键字happens-before原则 多线程编程核心锁框架Lock和Condition接口可重入锁公平锁与非公平锁 读写锁锁降级和锁升级 队列同步…

[构建 Vue 组件库] 小尾巴 UI 组件库 —— 横向商品卡片(仿淘宝)

文章归档于&#xff1a;https://www.yuque.com/u27599042/row3c6 组件库地址 npm&#xff1a;https://www.npmjs.com/package/xwb-ui?activeTabreadmegitee&#xff1a;https://gitee.com/tongchaowei/xwb-ui 下载 npm i xwb-ui配置 按需导入 import {组件名 } from xwb-…

【Unity】 2D 游戏 库存模块实现

库存模块主要参考了 youtube 上的视频 BMo 的 Flexible INVENTORY SYSTEM in Unity with Events and Scriptable Objects 和 Simple Inventory UI in Unity With Grid Layouts 这两个视频是一个系列 还是一个视频也是 BMo的 How To INTERACT with Game Objects using UNITY E…

Nginx详解 第五部分:Ngnix反向代理(负载均衡 动静分离 缓存 透传 )

Part 5 一、正向代理与反向代理1.1 正向代理简介1.2 反向代理简介 二、配置反向代理2.1 反向代理配置参数2.1.1 proxy_pass2.1.2 其余参数 2.2 配置实例:反向代理单台web服务器2.3 代理转发 三、反向代理实现动静分离四、缓存功能五、反向代理客户端的IP透传5.1 原理概述5.2 一…