可动态调节参数的线程池实现

背景
线程池是一种基于池化思想管理线程的工具,使用线程池可以减少创建销毁线程的开销,避免线程过多导致系统资源耗尽。在高并发的任务处理场景,线程池的使用是必不可少的。在双11主图价格表达项目中为了提升处理性能,很多地方使用到了线程池。随着线程池的使用,逐渐发现一个问题,线程池的参数如何设置?

线程池参数中有三个比较关键的参数,分别是corePoolSize(核心线程数)、maximumPoolSize(最大线程数)、workQueueSzie(工作队列大小)。根据任务的类型可以区分为IO密集型和CPU密集型,对于CPU密集型,一般经验是设置corePoolSize=CPU核数+1,对于IO密集型需要根据具体的RT和流量来设置,没有普适的经验值。然而,我们一般遇到的情况多数是处理IO密集型任务,如果线程池参数不可动态调节,就没办法根据实际情况实时调整处理速度,只能通过发布代码调整参数。

如果线程池参数不合理会导致什么问题呢?下面列举几种可能出现的场景:

最大线程数设置偏小,工作队列大小设置偏小,导致服务接口大量抛出RejectedExecutionException。
最大线程数设置偏小,工作队列大小设置过大,任务堆积过度,接口响应时长变长。
最大线程数设置过大,线程调度开销增大,处理速度反而下降。
核心线程数设置过小,流量突增时需要先创建线程,导致响应时长过大。
核心线程数设置过大,空闲线程太多,占用系统资源。
线程池任务调度机制
要明白线程池参数对运行时的影响,就必须理解其中的原理,所以下面先简单总结了线程池的核心原理。

Java中的线程池核心实现类是ThreadPoolExecutor,ThreadPoolExecutor一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务的呢?其运行机制如下图所示:

所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
其执行流程如下图所示:

动态调节线程池参数实现
线程池相关的重要参数有三个,分别是核心线程数、最大线程数和工作队列大小,接下来将阐述如何实现动态调节线程池参数。

调节核心和最大线程数的原理
ThreadPoolExecutor已经提供了两个方法在运行时设置核心线程数和最大线程数,分别是ThreadPoolExecutor.setCorePoolSize()和ThreadPoolExecutor.setMaximumPoolSize()。

setCorePoolSize方法的执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:

setMaximumPoolSize方法执行流程是:首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。

调节工作队列大小的原理
线程池中是以生产者消费者模式,通过一个阻塞队列来缓存任务,工作线程从阻塞队列中获取任务。工作队列的接口是阻塞队列(BlockingQueue),在队列为空时,获取元素的线程会等待队列变为非空,当队列满时,存储元素的线程会等待队列可用。

目前JDK提供了以下阻塞队列的实现:

但是很不幸,这些阻塞队列的实现都不支持动态调整大小,那么为什么不自己实现一个可动态调整大小的阻塞队列呢。重复造轮子是不可取的,所以我选择改造轮子。LinkedBlockingQueue是比较常用的一个阻塞队列,它无法修改大小的原因是capacity字段设置成了final private final int capacity;。如果我把final去掉,并提供修改capacity的方法,是不是就满足我们的需求呢?事实证明是可行的,文章末尾上传了ResizeLinkedBlockingQueue的实现。

结合Diamond进行实现
Diamond可以管理我们的配置,如果可以通过Diamond实现线程池参数管理那就再好不过了。接下来就开始上代码了,首先实现一个Diamond配置管理类DispatchConfig,然后,实现一个线程池管理的工厂方法StreamExecutorFactory。

DispatchConfig类是一个静态类,在初始化的时候获取了对应Diamond的内容并设置了监听,使用的时候只需要DispatchConfig.getConfig().getCorePoolSize()。

/**

  • @author moda
    */
    @Slf4j
    @Data
    public class DispatchConfig {
    public static final String DATA_ID = “com.alibaba.mkt.turbo.DispatchConfig”;
    public static final String GROUP_ID = “mkt-turbo”;
    private static DispatchConfig config;

    static {
    try {
    String content = Diamond.getConfig(DATA_ID, GROUP_ID, 3000);
    config = JSON.parseObject(content, DispatchConfig.class);
    Diamond.addListener(DATA_ID, GROUP_ID, new ManagerListenerAdapter() {
    @Override
    public void receiveConfigInfo(String content) {
    try {
    config = JSON.parseObject(content, DispatchConfig.class);
    } catch (Throwable t) {
    log.error("[DispatchConfig] receiveConfigInfo an exception occurs,", t);
    }
    }
    });
    } catch (Exception e) {
    log.error(String.format("[DispatchConfig - init] dataId:%s, groupId:%s ", DATA_ID, GROUP_ID), e);
    }
    }

    public static DispatchConfig getConfig() {
    return config;
    }

    private int corePoolSize = 10;

    private int maximumPoolSize = 30;

    private int workQueueSize = 1024;

    /**

    • 商品分批处理每批大小
      */
      private int itemBatchProcessPageSize = 200;
      }
      StreamExecutorFactory是一个静态类,维护了一个静态属性executor,并通过initExecutor()进行初始化。在初始化的时候,工作队列使用了可调节大小的阻塞队列ResizeLinkedBlockingQueue,并设置了监听Diamond变更。Diamond发生变更的时候通过在callback中对比值是否发生改变,如果发生改变则调整workQueueSize、corePoolSize、maximumPoolSize。使用的时候只需要StreamExecutorFactory.getExecutor(),修改Diamond配置就能动态修改线程池参数。

/**

  • @author moda
    */
    @Slf4j
    public class StreamExecutorFactory {
    private static final String THREAD_NAME = “mkt-turbo_stream_dispatch”;

    private static ThreadPoolExecutor executor = initExecutor();

    private static ThreadPoolExecutor initExecutor() {
    ThreadFactory nameThreadFactory = new ThreadFactoryBuilder().setNameFormat(THREAD_NAME).build();
    ResizeLinkedBlockingQueue workQueue = new ResizeLinkedBlockingQueue<>(DispatchConfig.getConfig().getWorkQueueSize());
    //拒绝策略,调用者线程处理
    RejectedExecutionHandler rejectedExecutionHandler = (r, e) -> {
    String msg = String.format("[S.E.F - rejectedHandler] Thread pool is EXHAUSTED!" +
    " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
    " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)",
    THREAD_NAME, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
    e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
    log.warn(msg);
    if (!e.isShutdown()) {
    r.run();
    }
    };
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    DispatchConfig.getConfig().getCorePoolSize(),
    DispatchConfig.getConfig().getMaximumPoolSize(),
    10,
    TimeUnit.SECONDS,
    workQueue,
    nameThreadFactory,
    rejectedExecutionHandler
    );

     Diamond.addListener(DispatchConfig.DATA_ID, DispatchConfig.GROUP_ID, new ManagerListenerAdapter() {@Overridepublic void receiveConfigInfo(String content) {try {DispatchConfig config = JSON.parseObject(content, DispatchConfig.class);if (workQueue.getCapacity() != config.getWorkQueueSize()) {workQueue.setCapacity(config.getWorkQueueSize());}if (threadPoolExecutor.getCorePoolSize() != config.getCorePoolSize()) {threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());}if (threadPoolExecutor.getMaximumPoolSize() != config.getMaximumPoolSize()) {threadPoolExecutor.setMaximumPoolSize(config.getMaximumPoolSize());}} catch (Throwable t) {log.error("[S.E.F-receiveConfigInfo] an exception occurs,", t);}}});return threadPoolExecutor;
    

    }

    public static Executor getExecutor() {
    return executor;
    }
    }

原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/514550.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

我的世界服务器linux加mod,在Linux下搭建带MOD 我的世界(Minecraft)服务器

在Linux下搭建带MOD 我的世界(Minecraft)服务器系统要求官方服务器推荐配置要求如下&#xff1a;CPU&#xff1a;Intel Core-Based CPUs or AMD K8-Based CPUs IBM 970 2.0 GHz and better内存&#xff1a;5 GiB硬盘空间&#xff1a;16 GiB上行宽带&#xff1a;8 Mbit/s下行宽带…

2020-12-17

集团关于Blink的相关使用文档已经十分齐全&#xff0c;这里不准备再过多赘述。这篇文章准备对Blink所基于的Apache社区开源产品–Flink的架构做一些浅显分析。 一&#xff1a;Flink历史、基本架构及分布式部署 历史 Flink项目最早开始于2010年由柏林技术大学、柏林洪堡大学、…

分布式、云原生技术之后,分布式云或成数字化转型新利器

编辑 | 宋 慧 出品 | CSDN云计算 头图 | 2021可信云大会现场 7月27日&#xff0c;2021年可信云大会在京顺利开幕。本届大会以“数字裂变&#xff0c;可信发展”为主题&#xff0c;云计算行业专家学者、众多国内一线云计算厂商、头部客户同台论道&#xff0c;围绕云计算行业趋势…

linux磁盘配额edquota,Linux磁盘配额(Quota)

开启磁盘的 quota 功能&#xff1a;由于 quota 需要在 ext 的 Linux 延伸格式档案才可以启动&#xff0c;所以你就必须要将准备开启quota 的磁盘启动参数&#xff0c;写进入 quota 的磁盘设定才行 ( /etc/fstab )&#xff01;以我的例子而言&#xff0c;我想要在/home 底下进行…

图文存储常识:单机、集中、分布式、云、云原生存储

背景 本文主要对杨传辉&#xff08;日照&#xff09;《大规模分布式存储系统原理解析与架构实战》、大话存储、网络资源(具体参考文末链接)及个人理解进行整理&#xff0c;意在构建出存储发展基本轨迹和一些基本常识&#xff0c;让更多像我一样的初入者有个宏观上的认知。 存储…

年终盘点 | 七年零故障支撑 双11 的消息中间件 RocketMQ,怎么做到的?

作者 | 愈安 来源|阿里巴巴云原生公众号 2020 年双十一交易峰值达到 58.3W 笔/秒&#xff0c;消息中间件 RocketMQ 继续数年 0 故障丝般顺滑地完美支持了整个集团大促的各类业务平稳。今年双十一大促中&#xff0c;消息中间件 RocketMQ 发生了以下几个方面的变化&#xff1a; …

2021 ISC会上山石网科重磅发布智能下一代防火墙A系列,重新定义边界安全防御

勒索病毒频繁、威胁隐匿于加密流量、高级威胁藏于内部、物联网安全盲区众多&#xff0c;数字化发展及其带来的网络威胁态势正在发生着质的变化&#xff0c;网络安全所要求的防护能力不断提高。防火墙作为企业安全基础架构最重要的基石之一&#xff0c;其辐射到企业的边界、内网…

十年,他们在云上修了一条“高速公路”

简介&#xff1a; 阿里云网络的工程师们希望&#xff0c;通过这个平台&#xff0c;帮助企业更加智能地运维自己的网络、更加便捷地配置自己的网络&#xff0c;让上云的企业在“云高速”中实现“自动驾驶”。他们说&#xff0c;把路修的更好&#xff0c;让网络更简单&#xff0c…

linux可平通网关但不能上网,redhat问题:能ping通网关和本网段的IP,但是不能ping通DNS,也不能上网...

redhat问题&#xff1a;能ping通网关和本网段的IP&#xff0c;但是不能ping通DNS&#xff0c;也不能上网(2011-12-20 06:11:51)标签&#xff1a;上网杂谈redhat问题&#xff1a;能ping通网关和本网段的IP&#xff0c;但是不能ping通DNS&#xff0c;也不能上网查看路由的信息如下…

阿里云超算异构Spot集群,助力深势科技30%成本驱动MDaaS海量算力

本文主要介绍药物研发算法科技公司深势科技是如何实现低成本在阿里云上构建分子模拟MDaaS &#xff08;Molecular Dynamics as a Service&#xff09;超算集群。 客户简介 公司名称&#xff1a;深势科技 公司网址&#xff1a;http://dptech.deepmd.net/ 公司介绍&#xff1a…

一文读懂DataOps

作者&#xff1a;彭锋 宋文欣等来源&#xff1a;智领云科技大部分企业的数据平台建设要想顺利过渡到第三阶段&#xff0c;则离不开一个关键方法论—DataOps&#xff08;数据运维&#xff09;的帮助。DataOps 与 DevOps 十分形似&#xff0c;也有着与 DevOps 类似的软件开发角色…

Python 命令行库的大乱

当你想实现一个命令行程序时&#xff0c;或许第一个想到的是用 Python 来实现。比如 CentOS 上大名鼎鼎的包管理工具 yum 就是基于 Python 实现的。 而 Python 的世界中有很多命令行库&#xff0c;每个库都各具特色。但我们往往不知道其背后的设计理念&#xff0c;也因此在选择…

Linux系统初学者指南,观点|Linux 系统调用的初学者指南

在过去的几年中&#xff0c;我一直在做大量容器相关的工作。先前&#xff0c;我看到 Julien Friedman 的一个很棒的演讲&#xff0c;它用几行 Go 语言写了一个容器框架。这让我突然了解到容器只是一个受限的 Linux 进程中的机器。构建这个受限视图涉及到 Golang 系统调用包中的…

我在阿里云做前端代码智能化

作为一个整天以代码为伴的码农&#xff0c;避免不了会接触到各种代码提示工具&#xff0c;但是呢&#xff0c;用久了之后会发现他们都有个共同点&#xff0c;那就是 模型巨大&#xff0c;动辄几百兆&#xff1b;并且模型大必然需要更多的计算&#xff0c;同样会导致电脑内存占用…

英特尔携手百度全方位深化合作 共筑智能生态

2021年7月29日&#xff0c;英特尔公司今日出席智能经济高峰论坛暨百度云智峰会2021并分享了一系列与百度在人工智能、云计算、智能边缘等方面的最新合作进展。在智能技术方面&#xff0c;百度基于第三代英特尔至强可扩展处理器&#xff0c;打造全功能AI开发平台Baidu Machine L…

决策树之 GBDT 算法 - 回归部分

GBDT&#xff08;Gradient Boosting Decision Tree&#xff09;是被工业界广泛使用的机器学习算法之一&#xff0c;它既可以解决回归问题&#xff0c;又可以应用在分类场景中&#xff0c;该算法由斯坦福统计学教授 Jerome H. Friedman 在 1999 年发表。本文中&#xff0c;我们主…

广技师17专插本c语言答案,广东技术师范学院2017年专插本C语言程序设计(1)

1、广东技术师范学院2017 年专插本 C语言程序设计注意&#xff1a;请将答案写在答题纸上&#xff0c;否则无效&#xff01;一、判断题&#xff1a; (12 分每题 2 分)1、 C 语言规定 :在一个源程序中 ,main 函数的位置必须在最开始。2、假设所有变量均为整型,则表达式 (a2,b5,b,…

深度强化学习在时序数据压缩中的应用--ICDE 2020收录论文

彼节者有间&#xff0c;而刀刃者无厚&#xff1b;以无厚入有间&#xff0c;恢恢乎其于游刃必有余地矣 ----- 庖丁解牛 前言&#xff1a;随着移动互联网、IoT、5G等的应用和普及&#xff0c;一步一步地我们走进了数字经济时代。随之而来的海量数据将是一种客观的存在&#xff0…

技术干货 | mPaaS 框架下如何使用 Crash SDK 对闪退进行分析?

简介&#xff1a; Android Native Crash 处理案例分享 目前 mPaaS Android 是使用的是 Crash SDK 对闪退进行的处理&#xff0c;Crash SDK 是 Android 平台上一款功能强大的崩溃日志收集 SDK&#xff0c;有着极高的崩溃收集率和完整、全面的崩溃日志信息&#xff0c;生成的日志…

山石网科蒋东毅:网络连接矩阵复杂化,传统安全防护框架需重构

编辑 | 宋慧 供稿 | 山石网科 头图 | 蒋东毅在 ISC 2021主题论坛发表演讲 7月28日上午&#xff0c;在ISC 2021 第九届互联网安全大会主题论坛上&#xff0c;山石网科高级副总裁、首席战略官&#xff08;CSO&#xff09;蒋东毅带来了一场主题为《政企安全面临的多重挑战和未来趋…