Dubbo线程池

前言

    Dubbo使用Netty作为网络调用框架,Netty是一个Reactor模型的框架,线程模型分为boss线程池和worker线程池,boss线程池负责监听、分配事件,worker线程池负责处理事件,简单说就是boss线程池负责hold请求,并分发到worker池,worker线程池负责处理具体事件。

    dubbo在原本的netty中的线程(boss线程和worker)做了一些修改,将其定义为io线程,而后由实现了一套用于处理业务的业务线程池,这就和上一篇介绍的Dubbo协议下的服务端线程模型产生了关联,dubbo的io线程监听请求,业务处理由dubbo自定义的线程池处理,这里将请求分发到具体的业务线程池就是由Dispatcher实现的,默认是AllDispatcher,上一篇已经简单介绍了Dubbo协议的线程池的分发模型,这篇文章就介绍下Dubbo究竟自定义了哪几种线程池的实现,并且都是怎么实现的。

  • 注:Apache Dubbo版本为3.0.7

Dubbo线程池接口ThreadPool

在这里插入图片描述

    Dubbo自定义的线程池的核心接口是org.apache.dubbo.common.threadpool.ThreadPool,并且提供了四种实现分别是CachedThreadPoolFixedThreadPoolLimitedThreadPoolEagerThreadPoolThreadPool接口是SPI的,如果不指定线程池的具体实现默认是fixed,在项目中配置如下:配置线程池类型是fixed,线程数为100,线程模型是all

 

xml

复制代码

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

ThreadPool代码如下,接下来分别简单介绍一下四种线程池的具体实现

 

java

复制代码

@SPI(value = "fixed", scope = ExtensionScope.FRAMEWORK) 
public interface ThreadPool { /** * Thread pool * * @param url URL contains thread parameter* @return thread pool */         @Adaptive({THREADPOOL_KEY}) Executor getExecutor(URL url); 
}

CachedThreadPool缓存线程池

    该线程池是缓存类型的,当空闲到一定时间时会将线程删掉,使用时再创建,具体dubbo的实现如下,代码实现很简单,就是使用JUC的ThreadPoolExecutor创建了一个缓存类型的线程池,将maximumPoolSize设置成Integer.MAX_VALUE,keepAliveTime设置成60000毫秒,队列大小设置成0,当超过任务数超过corePoolSize就会直接创建worker线程,当线程空闲60s后就会被销毁。

public class CachedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}

FixedThreadPool固定线程数的线程池

    该线程池是固定线程数的线程池实现,具体实现也是使用JUC的ThreadPoolExecutor创建了一个固定线程数的线程池,通过url中配置的threads,将corePoolSize和maximumPoolSize都设置成threads的数量,并且keepAliveTime设置成0。

public class FixedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}
}

LimitedThreadPool可伸缩线程池

    虽然叫可伸缩线程池,但是实际上只能伸不能缩,官网上说是为了突然大量的流量引起性能问题,具体实现就是将keepAliveTime设置成无限大,这样当队列满了后就会创建线程达到maximumPoolSize,新创建的这些线程因为keepAliveTime设置成无限大所以也不会销毁了。

public class LimitedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}}

EagerThreadPool

    Eager单词是渴望的,热切地的意思,这个线程池所实现的逻辑是,当任务数超过corePoolSize但小于maximumPoolSize时不是将新任务放到队列中,而是优先创建新的worker线程,当线程数已经达到maximumPoolSize,接下来新的任务才会放到阻塞队列中,阻塞队列满了会抛出RejectedExecutionException

    EagerThreadPool线程池就不是通过JUC的ThreadPoolExecutor实现的了,而是继承ThreadPoolExecutor自己实现一些逻辑,下面一步一步看。

  • EagerThreadPool

    Dubbo自己实现了阻塞队列TaskQueue和线程池EagerThreadPoolExecutor,从EagerThreadPool的代码中看不到该类型线程池的核心逻辑,核心逻辑是在TaskQueue代码中,这里跳过直接看TaskQueue代码。

public class EagerThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);// init queue and executorTaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,threads,alive,TimeUnit.MILLISECONDS,taskQueue,new NamedInternalThreadFactory(name, true),new AbortPolicyWithReport(name, url));taskQueue.setExecutor(executor);return executor;}
}
  • TaskQueue

    Dubbo的EagerThreadPool是通过TaskQueueoffer方法实现的,逻辑就是当提交到线程池任务时,如果任务数大于corePoolSize,会将任务offerTaskQueue中,这时如果活跃的线程数大于等于线程池大小,并且当前线程数小于maximumPoolSize时就会伪装成放入到队列失败,这时线程池就会创建线程,从而实现超过corePoolSize不超过maximumPoolSize时创建worker线程而不是将任务放入到队列中。

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {private static final long serialVersionUID = -2635853580887179627L;private EagerThreadPoolExecutor executor;public TaskQueue(int capacity) {super(capacity);}public void setExecutor(EagerThreadPoolExecutor exec) {executor = exec;}@Overridepublic boolean offer(Runnable runnable) {if (executor == null) {throw new RejectedExecutionException("The task queue does not have executor!");}int currentPoolThreadSize = executor.getPoolSize();// have free worker. put task into queue to let the worker deal with task.if (executor.getActiveCount() < currentPoolThreadSize) {return super.offer(runnable);}// 伪装放入队列失败,让线程池创建线程if (currentPoolThreadSize < executor.getMaximumPoolSize()) {return false;}// currentPoolThreadSize >= maxreturn super.offer(runnable);}/*** retry offer task** @param o task* @return offer success or not* @throws RejectedExecutionException if executor is terminated.*/public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if (executor.isShutdown()) {throw new RejectedExecutionException("Executor is shutdown!");}return super.offer(o, timeout, unit);}
}
  • EagerThreadPoolExecutor

    当任务数大于maximumPoolSize时,线程池会抛出RejectedExecutionExceptionEagerThreadPoolExecutor捕获这个异常,并且调用TaskQueueretryOffer方法尝试放入队列,这样就实现了当线程数已经达到maximumPoolSize,接下来新的任务才会放到阻塞队列中,阻塞队列满了会抛出RejectedExecutionException,代码如下:

 
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {public EagerThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, TaskQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}@Overridepublic void execute(Runnable command) {if (command == null) {throw new NullPointerException();}try {super.execute(command);} catch (RejectedExecutionException rx) {// 重新尝试将任务放到队列中.final TaskQueue queue = (TaskQueue) super.getQueue();try {if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {throw new RejectedExecutionException("Queue capacity is full.", rx);}} catch (InterruptedException x) {throw new RejectedExecutionException(x);}}}
}

总结

    Dubbo实现了自定义线程池,其核心接口是ThreadPool,该接口是SPI的默认的实现是fixed,Dubbo提供了四种实现,分别是CachedThreadPoolFixedThreadPoolLimitedThreadPoolEagerThreadPool

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/233712.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++的static用法总结和代码示例

static用法 修饰普通变量&#xff1a;static关键字可用于修改普通变量的存储区域和生命周期&#xff0c;使其存储在静态区&#xff0c;在程序运行前就分配了空间。如果有初始值&#xff0c;将使用初始值进行初始化&#xff0c;否则系统会用默认值初始化它。修饰普通函数&#…

【算法】选择排序

1、排序逻辑 选择排序逻辑&#xff1a;对数组中的数据&#xff0c;先假定一个最小的数据下标&#xff0c;然后进行循环寻找到最小数据的下标&#xff0c;放在第一层循环的最初始位置 例&#xff1a; 从0 ~ N-1 寻找到最小值&#xff0c;放在0位置 从1~N-1 寻找到最小值 &…

自定义IDEA代码补全插件

目标&#xff1a; 对于项目中的静态方法&#xff08;主要是各种工具类里的静态方法&#xff09;&#xff0c;可以在输入方法名时直接提示相关的静态方法&#xff0c;选中后自动补全代码&#xff0c;并导入静态类。 设计&#xff1a; 初步构想&#xff0c;用户选择要导入的文…

SearchWP WordPress高级网站内容搜索插件

点击阅读SearchWP WordPress高级网站内容搜索插件原文 SearchWP WordPress高级网站内容搜索插件是一个非常强大的工具&#xff0c;可以显着增强您网站的搜索功能。通过向网站访问者提供高度相关和精确的搜索结果&#xff0c;它可以有效地简化他们的搜索过程&#xff0c;促进发…

CentOS 8离线安装telnet

下载telnet rpm安装包&#xff0c;可从https://www.rpmfind.net/linux/rpm2html/search.php?querytelnet&submitSearch…&systemcentos&arch 根据自己的操作系统下载对应的包&#xff0c;这里以CentOS8为例,分别下载如下的rtp包 xinetd-2.3.15-24.el8.x86_64.rpm…

设计师必备的Figma可视化组件库资产已更新至 7.0版本

在当今数字化时代&#xff0c;数据量呈爆炸式增长&#xff0c;大屏可视化的主要程度越来越高&#xff0c;而大屏背后的设计师们面对的挑战也越来越多&#xff0c;其中之一就是大屏可视化设计项目中的重复性元素设计。这一过程不仅耗费时间&#xff0c;还明显降低了设计团队的生…

Qt6.5类库详解:QLineEdit

哈喽大家好&#xff0c;我是20YC小二&#xff01;欢迎关注(20YC编程)&#xff0c;现在有免费《C程序员》视频教程下载哦&#xff01; ~下面开始今天的分享内容~ 1. QLineEdit介绍 QLineEdit是一个单行文本编辑器&#xff0c;允许用户输入和编辑纯文本。它提供了许多有用的编辑…

周记 从现在开始

每周笔记 2023&#xff1a;12.11 早上&#xff1a; ​ 全是课 下午&#xff1a; ​ 全是课 晚上&#xff1a; ​ 全是课 什么也没学 2023&#xff1a;12.12 早上&#xff1a; ​ 全是课 下午&#xff1a; ​ 全是课 晚上&#xff1a; ​ 全是课 什么也没学 20…

SSH的交互原理(wireshark的分析)

SSH的交换原理&#xff08;wireshark篇&#xff09; 首先要想了解ssh的交换原理&#xff0c;必须要先了解他的加密方式&#xff0c;他的加密方式是对称加密&#xff0c;和公钥加密。什么意思呢&#xff1f; 首先我们向服务器发送一个请求&#xff0c;然后服务器会发给我们他的…

GitBook安装及使用——使用 Markdown 创建你自己的博客网站和电子书

目录 前言一、依赖环境二、gitbook安装使用1.安装 gitbook-cli2.安装 gitbook3.Gitbook初始化4.创建你的文章5.修改 SUMMARY.md 和 README.md6.编译生成静态网页7.运行以便在浏览器预览8.运行效果 前言 GitBook是一个命令行工具&#xff0c;用于使用 Markdown 构建漂亮的博客网…

SSE(服务器推送事件)规范

SSE 是指 "Server-Sent Events"&#xff0c;即服务器推送事件。它是一种基于 HTTP 的服务器推送技术&#xff0c;允许服务器实时向客户端推送数据。SSE 规范定义了一种在客户端和服务器之间单向实时通信的方式&#xff0c;通常用于实现服务器向客户端推送更新、通知或…

【matlab】MATLAB常用内置函数示例

目录: 数值计算函数 1.1 abs函数 1.2 sin、cos、tan函数 1.3 exp、log函数 1.4 sqrt函数 1.5 sum、prod函数 矩阵和向量操作函数 2.1 size函数 2.2 reshape函数 2.3 repmat函数 2.4 eye函数 2.5 rand、randn函数 绘图函数 3.1 plot函数 3.2 scatter函数 3.3 bar函数 3.4 hist函…

面试题总结(十四)【系统移植】【华清远见西安中心】

你用过哪些Bootloader&#xff1f;有什么区别&#xff1f; 作为一个AI助手&#xff0c;我没有直接使用过Bootloader。但我可以为您提供一些关于常见Bootloader的信息和他们之间的区别。 1. GRUB (GRand Unified Bootloader)&#xff1a;GRUB是一个功能强大且广泛使用的Bootload…

【鉴权】JWT加密

目录 定义 官网 定义 JWT是JSON Web Token的缩写&#xff0c;是RFC7519规范。该规范目的是为了让客户端和服务端可靠的传递信息。 官网 JSON Web Tokens - jwt.io JWT是由三个部分组成&#xff0c;HMACSHA256( base64UrlEncode(header) "." base64UrlEncode(pa…

rk3568 MDIO总线

rk3568 MDIO总线 MDIO(Management Data Input/Output)是一种管理数据输入/输出协议,用于在以太网交换机和PHY(物理层收发器)之间进行通信。在网络设备中,MDIO总线用于控制网络接口的PHY芯片,例如通过MDIO总线访问PHY芯片的寄存器。这些寄存器包含了一些关于网络连接状态…

AWS 知识二:AWS同一个VPC下的ubuntu实例通过ldapsearch命令查询目录用户信息

前言&#xff1a; 前提&#xff1a;需要完成我的AWS 知识一创建一个成功运行的目录。 主要两个重要&#xff1a;1.本地windows如何通过SSH的方式连接到Ubuntu实例 2.ldapsearch命令的构成 一 &#xff0c;启动一个新的Ubuntu实例 1.创建一个ubuntu实例 具体创建实例步骤我就不…

vue el-date-picker中datetime类型对今天之后的日期包含时分禁用

vue el-date-picker中datetime类型对今天之后的日期包含时分禁用 目前对选择秒那一列未禁用 <template><div><el-date-pickerv-model"deactivateTime"type"datetime"format"yyyy-MM-dd HH:mm:ss"value-format"yyyy-MM-dd HH…

抖音直播间websocket礼物和弹幕消息推送可能出现重复的情况,解决办法

在抖音直播间里&#xff0c;通过websocket收到的礼物消息数据格式如下&#xff1a; {common: {method: WebcastGiftMessage,msgId: 7283420150152942632,roomId: 7283413007005207308,createTime: 1695803662805,isShowMsg: True,describe: 莎***:送给主播 1个入团卡,priority…

HarmonyOS4.0从零开始的开发教程17给您的应用添加通知

HarmonyOS&#xff08;十五&#xff09;给您的应用添加通知 通知介绍 通知旨在让用户以合适的方式及时获得有用的新消息&#xff0c;帮助用户高效地处理任务。应用可以通过通知接口发送通知消息&#xff0c;用户可以通过通知栏查看通知内容&#xff0c;也可以点击通知来打开应…

管理类联考——数学——真题篇——按题型分类——充分性判断题——蒙猜A/B

老规矩&#xff0c;看目录&#xff0c;平均3-5题 文章目录 A/B2023真题&#xff08;2023-19&#xff09;-A-选项特点&#xff1a;两个等号&#xff1b;-纯蒙猜-哪个长选哪个【不要用这招&#xff0c;因为两个选项&#xff0c;总会有一个长的&#xff0c;那不就大多都是A/B&…