服务容错之限流之 Tomcat 限流 Tomcat 线程池的拒绝策略

在文章开头,先和大家抛出两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

JUC 线程池

在谈 Tomcat 的线程池前,先看一下 JUC 中线程池的执行流程,这里使用《Java 并发编程的艺术》中的一张图:
在这里插入图片描述

即执行流程为:

  1. 收到提交任务
  2. 当前线程数小于核心线程数,创建一个新的线程来执行任务
  3. 当前线程数大于等于核心线程数,
    • 如果阻塞队列未满,将任务存储到队列
    • 如果阻塞队列已满
      • 如果当前线程数小于最大线程数,则创建一个线程来执行新提交的任务
      • 如果当前线程数大于等于最大线程数,执行拒绝策略

可以看到设计思想是任务可以等待执行,但要尽量少的创造过多线程。如果队列很大,则很难扩大到最大线程数,同时会有大量的任务等待。

Tomcat 线程池分析

Tomcat 线程池是在 LifeCycle 中创建的。跳过前面繁琐的流程,直接看 org.apache.tomcat.util.net.NioEndpoint#startInternal

    /*** Start the NIO endpoint, creating acceptor, poller threads.*/@Overridepublic void startInternal() throws Exception {if (!running) {running = true;paused = false;processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getProcessorCache());eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getEventCache());nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getBufferPool());// Create worker collectionif ( getExecutor() == null ) {createExecutor();}initializeConnectionLatch();// Start poller threadspollers = new Poller[getPollerThreadCount()];for (int i=0; i<pollers.length; i++) {pollers[i] = new Poller();Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);pollerThread.setPriority(threadPriority);pollerThread.setDaemon(true);pollerThread.start();}startAcceptorThreads();}}

再看 org.apache.tomcat.util.net.AbstractEndpoint#createExecutor

    public void createExecutor() {internalExecutor = true;TaskQueue taskqueue = new TaskQueue(); //无界队列TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);taskqueue.setParent( (ThreadPoolExecutor) executor);}

要注意这里的 ThreadPoolExecutor 不是 JUC 里面的 java.util.concurrent.ThreadPoolExecutor,而是 Tomcat 的 org.apache.tomcat.util.threads.ThreadPoolExecutor,它继承了 JUC 的 java.util.concurrent.ThreadPoolExecutor

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {...
}

查看它的构造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());//提前启动核心线程prestartAllCoreThreads();}

可以发现它在构造的时候就会启动核心线程,而 java.util.concurrent.ThreadPoolExecutor 则是需要手动启动。而阻塞队列使用是 org.apache.tomcat.util.threads.TaskQueue

public class TaskQueue extends LinkedBlockingQueue<Runnable> {private static final long serialVersionUID = 1L;private volatile ThreadPoolExecutor parent = null;// No need to be volatile. This is written and read in a single thread// (when stopping a context and firing the  listeners)private Integer forcedRemainingCapacity = null;public TaskQueue() {super();}...
}

而在创建 org.apache.tomcat.util.threads.TaskQueue 的时候,并没有传递 capacity,也就是说 Tomcat 的线程池使用的是无界队列。

接下来看一下最核心的org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable)

/*** {@inheritDoc}*/@Overridepublic void execute(Runnable command) {//重载 java.util.concurrent.ThreadPoolExecutor#executeexecute(command,0,TimeUnit.MILLISECONDS);}public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet();throw rx;}}}

本质上还是执行的 java.util.concurrent.ThreadPoolExecutor#execute 方法:

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// Tomcat 中这块逻辑不会执行,因为构造时已经初始化了核心线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}//强制入队public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected}

这里的 workQueueorg.apache.tomcat.util.threads.TaskQueueorg.apache.tomcat.util.threads.TaskQueue#offer

    @Overridepublic boolean offer(Runnable o) {//we can't do any checksif (parent==null) return super.offer(o);//we are maxed out on threads, simply queue the object//当前线程数达到最大,任务入队if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);//we have idle threads, just add it to the queue//如果已提交未执行完的任务数小于当前线程数(来了任务先+1,再入队,执行完才-1,说明还有空闲的worker线程),任务入队if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);//if we have less threads than maximum force creation of a new thread// 如果当前线程数小于最大线程数量,则直接返回false,java.util.concurrent.ThreadPoolExecutor#execute 会创建新的线程来执行任务if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;//if we reached here, we need to add it to the queue//任务入队(当前线程数大于最大线程数)return super.offer(o);}

再看下拒绝策略,结合 java.util.concurrent.ThreadPoolExecutor#execute 方法,需要 java.util.concurrent.ThreadPoolExecutor#addWorker 返回 false 才会触发,即达到了最大线程数才会触发,而 org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable) 在触发了拒绝策略后还有一个特殊处理:

					//如果是 TaskQueueif (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {//强制入队if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else { //非 TaskQueue 直接触发拒绝策略submittedCount.decrementAndGet();throw rx;}

再看 org.apache.tomcat.util.threads.TaskQueue#force(java.lang.Runnable, long, java.util.concurrent.TimeUnit)

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected}

说白了就是直接入队(无界队列):

    public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) { //capacity是Integer最大值if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}

这么看,Tomcat 的线程池基本上不会触发拒绝策略。可以写个例子试一下:

package blog.dongguabai.others.tomcat_threadpool;import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;import java.util.Date;
import java.util.concurrent.TimeUnit;/*** @author dongguabai* @date 2023-11-18 22:04*/
public class Demo {public static void main(String[] args) {//无界队列TaskQueue taskqueue = new TaskQueue();TaskThreadFactory tf = new TaskThreadFactory("dongguabai_blog" + "-exec-", false, 2);final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, taskqueue, tf);taskqueue.setParent(executor);observe(executor);while (true) {executor.execute(new Runnable() {public void run() {excuteForever();}});}}private static void observe(final ThreadPoolExecutor executor) {Runnable task = new Runnable() {public void run() {while (true) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new Date().toLocaleString() + "->" + executor.getQueue().size());}}};new Thread(task).start();}public static void excuteForever() {while (true) {}}
}

输出:

2023-11-18 22:18:27->6541506
2023-11-18 22:18:34->14395417
2023-11-18 22:18:37->25708908
2023-11-18 22:18:50->32014458
2023-11-18 22:19:07->47236736
2023-11-18 22:19:10->65616058
2023-11-18 22:19:32->66856933
...

可以看到,队列里的任务都有六千多万了,还没有触发拒绝策略,线程池还是可以继续接收任务。

当然我们也是可以自定义的,只需要重写 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 即可:

    public Executor getExecutor() { return executor; }

org.apache.tomcat.util.net.NioEndpoint#startInternal 会进行判断:

@Overridepublic void startInternal() throws Exception {if (!running) {...if ( getExecutor() == null ) {createExecutor(); //如果没有自定义实现,就会使用默认实现}}...}

Tomcat 默认线程池优先创建线程执行任务,达到了最大线程数,不会直接执行拒绝策略,而是尝试返回等待队列,但由于等待队列的容量是 Integer 最大值,所以几乎不会触发拒绝策略。

最后

最后再回过头看文章开头的两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?

    Tomcat 的确可以用来做限流,比如可以控制最大线程数,这样后续的任务均会在队列等待,并不会执行。org.apache.tomcat.util.net.AbstractEndpoint#setMaxConnectionsConnector 的角度设置,这块不在本文探讨范围之内。

    虽然基于 Tomcat 的限流是一种可能的方案,但在实际应用中,我们通常会选择其他的层次来实现服务限流:

    • 可扩展性:基于 Tomcat 的限流方案通常只能在单个服务实例上工作,且只能针对HTTP/HTTPS协议的请。而在微服务或者分布式系统中,我们可能需要分布式限流方案和针对多协议的 限流。
    • 灵活性:在应用层或者分布式系统层实现的限流方案通常可以提供更多的配置选项和更精细的控制。例如,请求的资源、来源或者其他属性来进行限流。
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

    Tomcat 默认无限队列,难以触发拒绝策略,所以会有内存泄漏的风险。可以基于 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 自定义线程池进行控制。

References

  • 《Java 并发编程的艺术》

欢迎关注公众号:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/147114.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java中利用OpenCV进行人脸识别

OpenCV 概述 ​ OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是一个开源计算机视觉库&#xff0c;它提供了丰富的工具和算法&#xff0c;用于处理图像和视频数据。该库由一系列高效的计算机视觉算法组成&#xff0c;涵盖了许多领域&#xff0c;包括目…

修改服务器端Apache默认根目录

目标&#xff1a;修改默认Apache网站根目录 /var/www/html 一、找到 DocumentRoot “/var/www/html” 这一段 apache的根目录&#xff0c;把/var/www/html 这个目录改 #DocumentRoot "/var/www/html" DocumentRoot "/home/cloud/tuya_mini_h5/build" 二、…

小程序如何添加打印机来打印订单

在采云小程序中&#xff0c;支持打印订单的小票、标签、发货单和电子面单。小票打印机用于打印小票&#xff0c;类似于超市小票、外卖小票等。标签打印机用于打印商品标签&#xff0c;类似于奶茶上面粘贴的标签&#xff0c;用于表示饮料名称和规格等。货单打印机用于打印发货单…

【好奇心驱动力】ESP8266驱动SG90舵机开关灯

0.前言 ESP8266弄丢了好几个都忘记放在哪&#xff0c;重新买了个typeC接口的方便多了&#xff0c;看到驱动SG90舵机作为智能开关&#xff0c;简单复现了一下&#xff0c;代码比较简单&#xff0c;没有连接小爱同学或者其他语音助手。 1.实验方法 ESP8266连接SG90舵机&#x…

32位单片机PY32F040,主频72M,外设丰富,支持断码LCD

PY32F040 系列微控制器采用高性能的 32 位 ARM Cortex-M0 内核,宽电压工作范围的 MCU。嵌入高达 128 Kbytes flash 和 16 Kbytes SRAM 存储器,最高工作频率 72 MHz。LQFP64封装两块出头就可以拿到&#xff0c;我们还有开发板和开发资料帮助客户更好的开发。 PY32F040 系列微控…

C# NAudio 音频库

C# NAudio 音频库 NAudio安装NAudio简述简单示例1录制麦克风录制系统声卡WAV格式播放MP3格式播放AudioFileReader读取播放音频MediaFoundationReader 读取播放音频 NAudio安装 项目>NuGet包管理器 搜索NAudio点击安装&#xff0c;自动安装依赖库。 安装成功后工具箱会新增…

视频推拉流EasyDSS直播点播平台获取指定时间快照的实现方法

视频推拉流直播点播系统EasyDSS平台&#xff0c;可提供流畅的视频直播、点播、视频推拉流、转码、管理、分发、录像、检索、时移回看等功能&#xff0c;可兼容多操作系统&#xff0c;在直播点播领域具有广泛的场景应用。为了便于用户集成、调用与二次开发。 今天我们来介绍下在…

KT148A语音芯片使用串口uart本控制的完整说明_包含硬件和指令举例

一、功能简介 KT148A肯定是支持串口的&#xff0c;有客户反馈使用一线还是不方便&#xff0c;比如一些大型的系统不适合有延时的操作&#xff0c;所以更加倾向于使用uart控制&#xff0c;这里我们也给出解决方案 延伸出来另外一个版本&#xff0c;KT158A 注意次版本芯片还是…

ArcGIS Maps SDK for JS:监听图层的visible属性

文章目录 1 问题描述2 解决方案3 拓展 1 问题描述 近期有这么一个需求。在 ArcGIS Maps SDK for JavaScript 中&#xff0c;使用图层的visible属性同步显示某个组件&#xff0c;即打开图层时显示组件&#xff0c;关闭图层时隐藏组件。 首先想到的是&#xff0c;通过点击图层列…

HTTP 到 HTTPS 再到 HSTS 的转变

近些年&#xff0c;随着域名劫持、信息泄漏等网络安全事件的频繁发生&#xff0c;网站安全也变得越来越重要&#xff0c;也促成了网络传输协议从 HTTP 到 HTTPS 再到 HSTS 的转变。 HTTP HTTP&#xff08;超文本传输协议&#xff09; 是一种用于分布式、协作式和超媒体信息系…

OpenAI的多函数调用(Multiple Function Calling)简介

我在六月份写了一篇关于GPT 函数调用&#xff08;Function calling) 的博客https://blog.csdn.net/xindoo/article/details/131262670&#xff0c;其中介绍了函数调用的方法&#xff0c;但之前的函数调用&#xff0c;在一轮对话中只能调用一个函数。就在上周&#xff0c;OpenAI…

Ubuntu22.04 部署Mqtt服务器

1、打开Download EMQX &#xff08;www.emqx.io&#xff09;下载mqtt服务器版本 2、Download the EMQX repository curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash 3.Install EMQX sudo apt-get install emqx 4.Run EMQX sudo systemctl start…

开源与闭源:数字化时代的辩论与未来走向

在当今的数字化时代&#xff0c;关于开源和闭源软件的辩论一直是技术界的热门话题。 特斯拉CEO马斯克最近也加入了这场辩论&#xff0c;公开表示OpenAI不应该闭源&#xff0c;而他自己的首款聊天机器人将选择开源。 这引发了人们对开源与闭源软件的进一步思考&#xff1a;开源是…

《全程软件测试 第三版》拆书笔记

第一章 对软件测试的全面认识&#xff0c;测试不能是穷尽的 软件测试的作用&#xff1a; 1.产品质量评估&#xff1b;2.持续质量反馈&#xff1b;3.客户满意度提升&#xff1b;4.缺陷的预防 正反思维&#xff1a;正向思维&#xff08;广度&#xff0c;良好覆盖面&#xff09;逆…

sql注入 [极客大挑战 2019]LoveSQL 1

打开题目 几次尝试&#xff0c;发现输1 1"&#xff0c;页面都会回显NO,Wrong username password&#xff01;&#xff01;&#xff01; 只有输入1&#xff0c;页面报错&#xff0c;说明是单引号的字符型注入 那我们万能密码试试能不能登录 1 or 11 # 成功登录 得到账号…

系列六、JVM的内存结构【栈】

一、产生背景 由于跨平台性的设计&#xff0c;Java的指令都是根据栈来设计的&#xff0c;不同平台的CPU架构不同&#xff0c;所以不能设计为基于寄存器的。 二、概述 栈也叫栈内存&#xff0c;主管Java程序的运行&#xff0c;是在线程创建时创建&#xff0c;线程销毁时销毁&…

从0开始学习JavaScript--JavaScript 循环与迭代详解

JavaScript中的循环和迭代是编写高效和灵活代码的关键。循环用于重复执行一段代码&#xff0c;而迭代则用于遍历数据结构。本文将深入研究JavaScript中常见的循环结构和迭代方法&#xff0c;并通过丰富的示例代码来帮助读者更好地理解和运用这些概念。 基本的for循环 for循环…

el-table固定表头(设置height)出现内容过多时不能滚动问题

主要原因是el-table没有div包裹 解决&#xff1a;加一个div并设置其高度和overflow 我自己的主要代码 <div class"contentTable"><el-tableref"table":data"tableData"striperow-dblclick"onRowDblclick"height"100%&q…

B031-网络编程 Socket Http TomCat

目录 计算机网络网络编程相关术语IP地址ip的概念InerAdress的了解与测试 端口URLTCP、UDP和7层架构TCPUDPTCP与UDP的区别和联系TCP的3次握手七层架构 Socket编程服务端代码客户端代码 http协议概念Http报文 Tomcat模拟 计算机网络 见文档 网络编程相关术语 见文档 IP地址 …

3.3 Windows驱动开发:内核MDL读写进程内存

MDL内存读写是一种通过创建MDL结构体来实现跨进程内存读写的方式。在Windows操作系统中&#xff0c;每个进程都有自己独立的虚拟地址空间&#xff0c;不同进程之间的内存空间是隔离的。因此&#xff0c;要在一个进程中读取或写入另一个进程的内存数据&#xff0c;需要先将目标进…