页面查询多项数据组合的线程池设计 | 京东云技术团队

背景

我们应对并发场景时一般会采用下面方式去预估线程池的线程数量,比如QPS需求是1000,平均每个任务需要执行的时间是t秒,那么我们需要的线程数是t * 1000。

但是在一些情况下,这个t是不好估算的,即便是估算出来了,在实际的线程环境上也需要进行验证和微调。比如在本文所阐述分页查询的数据项组合场景中。

1、数据组合依赖不同的上游接接口, 它们的响应时间参差不齐,甚至差距还非常大。有些接口支持批量查询而另一些则不支持批量查询。有些接口因为性能问题还需要考虑降级和平滑方案。

2、为了提升用户体验,这里的查询设计了动态列,因此每一次访问所需要组合的数据项和数量也是不同的。

因此这里如果需要估算出一个合理的t是不太现实的。

方案

一种可动态调节的策略,根据监控的反馈对线程池进行微调。整体设计分为装配逻辑线程池封装设计。

1、装配逻辑

查询结果,拆分分片(水平拆分),并行装配(垂直拆分),获得装配项列表(动态列), 并行装配每一项。

2、线程池封装

可调节的核心线程数、最大线程数、线程保持时间,队列大小,提交任务重试等待时间,提交任务重试次数。 固定异常拒绝策略。

调节参数:

字段名称说明
corePoolSize核心线程数参考线程池定义
maximumPoolSize最大线程数参考线程池定义
keepAliveTime线程存活时间参考线程池定义
queueSize队列长度参考线程池定义
resubmitSleepMillis提交任务重试等待时间添加任务被拒绝后重试时的等待时间
resubmitTimes提交任务重试次数添加任务被拒绝后重试添加的最大次数
    @Dataprivate static class PoolPolicy {/** 核心线程数 */private Integer corePoolSize;/** 最大线程数 */private Integer maximumPoolSize;/** 线程存活时间 */private Integer keepAliveTime;/** 队列容量 */private Integer queueSize;/** 重试等待时间 */private Long resubmitSleepMillis;/** 重试次数 */private Integer resubmitTimes;}

创建线程池:

线程池的创建考虑了动态的需求,满足根据压测结果进行微调的要求。首先缓存旧的线程池后再创建新的线程,当新的线程池创建成功后再去关闭旧的线程池。保证在这个替换过程中不影响正在执行的业务。线程池使用了中断策略,用户可以及时感知到系统繁忙并保证了系统资源占用的安全。

public void reloadThreadPool(PoolPolicy poolPolicy) {if (poolPolicy == null) {throw new RuntimeException("The thread pool policy cannot be empty.");}if (poolPolicy.getCorePoolSize() == null) {poolPolicy.setCorePoolSize(0);}if (poolPolicy.getMaximumPoolSize() == null) {poolPolicy.setMaximumPoolSize(Runtime.getRuntime().availableProcessors() + 1);}if (poolPolicy.getKeepAliveTime() == null) {poolPolicy.setKeepAliveTime(60);}if (poolPolicy.getQueueSize() == null) {poolPolicy.setQueueSize(Runtime.getRuntime().availableProcessors() + 1);}if (poolPolicy.getResubmitSleepMillis() == null) {poolPolicy.setResubmitSleepMillis(200L);}if (poolPolicy.getResubmitTimes() == null) {poolPolicy.setResubmitTimes(5);}// - 线程池策略没有变化直接返回已有线程池。ExecutorService original = this.executorService;this.executorService = new ThreadPoolExecutor(poolPolicy.getCorePoolSize(),poolPolicy.getMaximumPoolSize(),poolPolicy.getKeepAliveTime(), TimeUnit.SECONDS,new ArrayBlockingQueue<>(poolPolicy.getQueueSize()),new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build(),new ThreadPoolExecutor.AbortPolicy());this.poolPolicy = poolPolicy;if (original != null) {original.shutdownNow();}
}

任务提交:

线程池封装对象中使用的线程池拒绝策略是AbortPolicy,因此在线程数和阻塞队列到达上限后会触发异常。另外在这里为了保证提交的成功率利用重试策略实现了一定程度的延迟处理,具体场景中可以结合业务特点进行适当的调节和配置。

public <T> Future<T> submit(Callable<T> task) {RejectedExecutionException exception = null;Future<T> future = null;for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {try {// - 添加任务future = this.executorService.submit(task);exception = null;break;} catch (RejectedExecutionException e) {exception = e;this.theadSleep(this.poolPolicy.getResubmitSleepMillis());}}if (exception != null) {throw exception;}return future;
}

监控:

1、submit提交的监控

见代码中的「监控点①」,在submit方法中添加监控点,监控key的需要添线程池封装对象的线程名称前缀,用于区分具体的线程池对象。

「监控点①」用于监控添加任务的动作是否正常,以便对线程池对象及策略参数进行微调。

public <T> Future<T> submit(Callable<T> task) {// - 监控点①CallerInfo callerInfo = Profiler.registerInfo(UmpConstant.THREAD_POOL_WAP + threadNamePrefix,UmpConstant.APP_NAME,UmpConstant.UMP_DISABLE_HEART,UmpConstant.UMP_ENABLE_TP);RejectedExecutionException exception = null;Future<T> future = null;for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {try {// - 添加任务future = this.executorService.submit(task);exception = null;break;} catch (RejectedExecutionException e) {exception = e;this.theadSleep(this.poolPolicy.getResubmitSleepMillis());}}if (exception != null) {// - 监控点①Profiler.functionError(callerInfo);throw exception;}// - 监控点①Profiler.registerInfoEnd(callerInfo);return future;
}

2、线程池并行任务

见代码的「监控点②」,分别在添加任务和任务完成后。

「监控点②」实时统计在线程中执行的总任务数量,用于评估线程池的任务的数量的满载水平。

/** 任务并行数量统计 */
private AtomicInteger parallelTaskCount = new AtomicInteger(0);public <T> Future<T> submit(Callable<T> task) {RejectedExecutionException exception = null;Future<T> future = null;for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {try {// - 添加任务future = this.executorService.submit(()-> {T rst = task.call();// - 监控点②log.info("{} - Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.decrementAndGet());return rst;});// - 监控点②log.info("{} + Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.incrementAndGet());exception = null;break;} catch (RejectedExecutionException e) {exception = e;this.theadSleep(this.poolPolicy.getResubmitSleepMillis());}}if (exception != null) {throw exception;}return future;
}

3、调节

线程池封装对象策略的调节时机

1)上线前基于流量预估的压测阶段;

2)上线后跟进监控数据和线程池中任务的满载水平进行人工微调,也可以通过JOB在指定的时间自动调整;

3)大促前依据往期大促峰值来调高相关参数。

线程池封装对象策略的调节经验

1)访问时长要求较低时,我们可以考虑调小线程数和阻塞队列,适当调大提交任务重试等待时间和次数,以便降低资源占用。

2)访问时长要求较高时,就需要调大线程数并保证相对较小的阻塞队列,调小提交任务的重试等待时间和次数甚至分别调成0和1(即关闭重试提交逻辑)。

作者:京东零售 王文明

来源:京东云开发者社区 转载请注明来源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/115269.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql 5.7.31 创建账号并赋予权限

前言 mysql 5.7.31 mysql创建账号并赋予权限 为新的数据库flowdb&#xff0c;创建账户flowdba。将flow库的所有权限赋予flowdba账户。 操作步骤 创建flowdba账户 mysql> use mysql; Database changed mysql> create user flowdba% identified by 123456; Query OK…

VS Code C# 开发工具包正式发布

前言 微软于本月正式发布Visual Studio Code C#开发工具包&#xff0c;此前该开发套件已经以预览版的形式在6月份问世。经过4个月的测试和调整&#xff0c;微软修复了350多个问题&#xff0c;其中大部分是用户反馈导致的问题。此外&#xff0c;微软还对产品进行了300多项有针对…

PCH或stdafx预编译头导致的找不到声明的问题

PCH或stdafx预编译头导致的找不到声明的问题 明明类和函数都已经声明了&#xff0c;声明定义都存在&#xff0c;检查过好几遍&#xff0c;但是编译器还是报错 错误 C2065 “init”: 未声明的标识符刚开始想从文件编码的角度入手&#xff0c;查看是不是编码出现了问题&#xf…

1024 CSDN 程序员节-知存科技-基于存内计算芯片开发板验证语音识别

前言 在今年的 CSDN 程序员节上&#xff0c;我参与了这次知存科技举办的一个 AI Workshop 小活动——“基于存内计算芯片开发板验证语音识别”&#xff0c;并且有幸成为完成任务的学习者之一XD。上一次参与类似的活动是算能公司举办的“千校万里行”AIGC 大模型编译部署活动&a…

Spring 事务一些探讨

关于spring事务的一些思考&#xff1a; 1、spring 事务什么时候会生效什么时候会失效 2、spring 事务与JDBC事务的一些关联 3、spring 事务与数据库死锁之间的关联 下面是一些开发中的经验&#xff0c;写的比较杂&#xff0c;想到什么写什么&#xff0c;见谅 1、spring 事务…

企业微信调用JSSDK wx.agentConfig 签名 安卓正常,IOS失败

1.res.wx.qq.com/wwopen/js/jsapi/jweixin-1.0.0.js open.work.weixin.qq.com/wwopen/js/jwxwork-1.0.0.js 引入两个js 2.授权url,问题就出现在这 login_url 的最后 #wechat_redirect window.location.href https://open.weixin.qq.com/connect/oauth2/authorize?appid${…

【Django 04】Serialization 序列化的高级使用

序列化器 serializers 序列化器的作用 序列化将 queryset 和 instance 转换为 json/xml/yaml 返回给前端 反序列化与序列化则相反 定义序列化器 定义类&#xff0c;继承自 Serializer 通常新建一个 serializers.py 文件 撰写序列化内容 suah as 目前只支持 read_only 只…

软考——软件工程基础知识

软件工程概述 软件工程是由于60年代中期&#xff0c;计算机应用范围迅速扩大&#xff0c;软件的复杂度和开发使用量急剧增长&#xff0c;人们为了解决软件开发过程中不可考的因素而引进的一种工程学概念。 软件工程过程是程序员借助软件工具从而完成的一系列软件工程活动。PDC…

设计模式:外观模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)

大家好&#xff01;本节主要介绍设计模式中的外观模式。 简介&#xff1a; 外观模式&#xff0c;它是一种设计模式&#xff0c;它为子系统中的一组接口提供一个统一的、简单的接口。这种模式主张按照描述和判断资料来评价课程&#xff0c;关键活动是在课程实施的全过程中进行…

科学计算语言Julia编程初步

文章目录 安装基本类型和计算函数初步条件和判断循环向量计算 Julia号称有着比肩C的速度&#xff0c;同时又像Python一样便捷的编程语言&#xff0c;非常适合科研狗使用。之前写了很多博客介绍Julia在数值分析中的应用&#xff0c;这次写一个适合初学者学习的Julia教程系列。 …

中科芯与IAR共建生态合作,IAR集成开发环境全面支持CKS32系列MCU

中国上海–2023年10月18日–嵌入式开发软件和服务的全球领导者IAR今日宣布&#xff0c;与中科芯集成电路有限公司&#xff08;以下简称中科芯&#xff09;达成生态合作&#xff0c;IAR已全面支持CKS32系列MCU的应用开发。这一合作将进一步推动嵌入式系统的发展&#xff0c;并为…

【吞噬星空】战神宫全体投票,为罗峰脱罪,徐欣补办婚礼,洪成功恢复脑电波

【侵权联系删除】【文/郑尔巴金】 吞噬星空动画第90集即将更新&#xff0c;官方相当给力&#xff0c;提前曝光了图文情报与先行预告。虽然罗峰与巴巴塔尚未正式开始闯荡宇宙&#xff0c;但却是斩杀阿特金三大巨头的平稳生活。不但有战神宫为罗峰脱罪&#xff0c;而且还给徐欣补…

Linux安装Redis(这里使用Redis6,其它版本类似)

目录 一、选择需要安装的Redis版本二、下载并解压Redis三、编译安装Redis四、启动Redis4.1、修改配置文件4.2、启动 五、测试连接5.1、本地连接使用自带客户端redis-cli连接操作redis5.2、外部连接使用RedisDesktopManager操作redis 六、关闭Redis七、删除Redis 一、选择需要安…

java的网络编程

网络编程 网络编程&#xff1a;java语言支持网络间的数据传输&#xff0c;将底层的细节封装起来了&#xff0c;给程序员提供了一套标准类库&#xff0c;方便java开发出可以进行网络通信的软件 核心问题&#xff1a;如何找到网络世界中的目标主机&#xff0c;和目标软件&#…

ts json的中boolean布尔值或者int数字都是字符串,转成对象对应类型

没啥好写的再水一篇 json中都是字符串&#xff0c;转换一下就好&#xff0c;简单来说就是转换一次不行&#xff0c;再转换换一次&#xff0c;整体转换不够&#xff0c;细分的再转换一次 这是vue中 ts写法 ,我这里是拿对象做对比&#xff0c;不好字符和对象做对比&#xff0c;…

【Chrome】使用k8s、docker部署无头浏览器Headless,Java调用示例

什么是无头浏览器&#xff1f; 无头浏览器是一种没有图形用户界面的浏览器。无头浏览器不通过其图形用户界面(GUI)控制浏览器的操作&#xff0c;而是使用命令行。 为什么要用Chrome无头&#xff1f; Chrome Headless用于抓取(谷歌)、测试(开发者)和黑客(黑客)。搜索引擎&…

倾斜摄影三维模型根节点合并技术方法探讨

倾斜摄影三维模型根节点合并技术方法探讨 倾斜摄影技术是一种通过无人机或其他航空器采集大量高分辨率照片&#xff0c;并使用特殊软件将这些照片拼接成三维模型的方法。在这个过程中&#xff0c;摄影机以倾斜角度拍摄照片&#xff0c;从而捕捉到目标物体的多个视角&#xff0c…

LTE系统TDD无线帧结构特点

LTE系统TDD无线帧结构的特点主要表现在以下几个方面&#xff1a; 无线帧结构时间描述的最小单位是采样周期Ts。在LTE中&#xff0c;每个子载波为2048阶IFFT采样&#xff0c;△f15kHz&#xff0c;因此采样周期Ts1/(204815000)0.033us。 TDD的帧结构包括两个5ms的半帧&#xff0…

特殊类设计

文章目录 特殊类设计1. 请设计一个类&#xff0c;不能被拷贝2. 请设计一个类&#xff0c;只能在堆上创建对象3. 请设计一个类&#xff0c;只能在栈上创建对象4. 请设计一个类&#xff0c;不能被继承5. 单例模式5.1 设计模式5.2 单例模式(1) 饿汉模式(2) 懒汉模式 特殊类设计 1…

python如何创建自己的对冲交易算法

在这篇文章中&#xff0c;我解释了如何创建一个人工智能来每天为我进行自动交易。 随着机器学习的现代进步和在线数据的轻松访问&#xff0c;参与量化交易变得前所未有的容易。为了让事情变得更好&#xff0c;AWS 等云工具可以轻松地将交易想法转化为真正的、功能齐全的交易机器…