ES线程池设置

一文搞懂ES中的线程池 - 知乎

ES线程池设置-阿里云开发者社区

文章目录

  • 一、简介

  • 二、线程池类型

    • 2.1、fixed

    • 2.2、scaling

    • 2.3、direct

    • 2.4、fixed_auto_queue_size

  • 三、处理器设置

  • 四、查看线程池

    • 4.1、cat thread pool

    • 4.2、nodes info

    • 4.3、nodes stats

    • 4.4、nodes hot threads

    • 4.5、Java 的线程池结构

  • 五、ES的线程池实现

    • 5.1、ThreadPool 类结构与初始化

    • 5.2、fixed类型线程池构建过程

    • 5.3、scaling类型线程池构建过程

    • 5.4、direct类型线程池构建过程

    • 5.5、fixed_auto_queue_size类型线程池构建过程

  • 六、其他线程池

  • 七、思考与总结

一、简介

每个节点都会创建一系列的线程池来执行任务,许多线程池都有与其相关任务队列,用来允许挂起请求,而不是丢弃它。 下面列出目前ES版本中的线程池。

官网: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html

  • generic
    :用于通用的操作(例如,节点发现),线程池类型为 scaling。

  • index
    :用于index/delete操作,线程池类型为fixed,大小为处理器的数量,队列大小为200,允许设置的最大线程数为1+处理器数量。

  • search
    :用于count/search/suggest操作。线程池类型为fixed, 大小为int((处理器数量3)/2)+1,队列大小为1000。

  • get
    :用于get操作。线程池类型为fixed, 大小为处理器的数量,队列大小为1000。

  • bulk
    :用于bulk操作,线程池类型为fixed,大小为处理器的数量,队列大小为200,该线程池允许设置的最大线程数为1+处理器数量。

  • snapshot
    :用于snaphostrestore操作。线程池类型为scaling,线程保持存活时间为5min,最大线程数为min(5, (处理器数量)/2)。

  • warme
    :用于segment warm-up操作。线程池类型为scaling, 线程保持存活时间为5min,最大线程数为min(5, (处理器数量)/2)。

  • refresh
    :用于refresh 操作。线程池类型为scaling, 线程空闲保持存活时间为5min,最大线程数为min(10, (处理器数量)/2)。

  • listener
    :主要用于Java客户端线程监听器被设置为true时执行动作。线程池类型为scaling,最大线程数为min(10, (处理器数量)/2)。

  • same
    :在调用者线程执行,不转移到新的线程池。

  • management
    :管理工作的线程池,例如,Node info、Node tats、 List tasks等。

  • flush
    :用于索引数据的flush操作。

  • force_merge
    :顾名思义,用于Lucene分段的force merge。

  • fetch_shard_started
    :用于TransportNodesAction.

  • fetch_shard_store
    :用于TransportNodesListShardStoreMetaData。

  • thread_pool.search.size: 30
    :线程池和队列的大小可以通过配置文件进行调整,例如,为search增加线程数和队列大小:

二、线程池类型

如同任何要并发处理任务的服务程序一样,线程池要处理的任务类型大致可以分为两类:CPU计算密集型和I/O密集型。对于两种不同的任务类型,需要为线程池设置不同的线程数量。

一般说来,线程池的大小可以参考如下设置,其中N为CPU的个数:

  • 对于CPU密集型任务,线程池大小设置为N+1;

  • 对于I/O密集型任务,线程池大小设置为2N+1;

对于计算密集型任务,线程池的线程数量 一 般不应该超过N+1。如果线程数量太多,则会导致更高的线程间上下文切换的代价。加1是为了当计算线程出现偶尔的故障,或者偶尔的I/O、发送数据、写日志等情况时,这个额外的线程可以保证CPU时钟周期不被浪费。

I/O密集型任务的线程数可以稍大一些,因为I/O密集型任务大部分时间阻塞在I/O过程,适当增加线程数可以增加并发处理能力。而上下文切换的代价相对来说已经不那么敏感。但是线程数量不一定设置为2N+1,具体需要看I/O等待时间有多长。等待时间越长,需要越多的线程,等待时间越少,需要越少的线程。因此也可以参考下面的公式:

最佳线程数= ((线程等待时间 + 线程CPU时间) 线程CPU时间) * CPU数

为了应对这两种类型的任务,ES封装了以下类型的线程池。

2.1、fixed

线程池拥有固定数量的线程来处理请求,当线程空闲时不会销毁,当所有线程都繁忙时,请求被添加到队列中。

  • size
    参数用来控制线程的数量。

  • queue_size
    参数用来控制线程池相关的任务队列大小。设置为-1表示无限制。当请求到达时,如果队列已满,则请求将被拒绝。

例如:

thread_pool.search.size: 30
thread_pool.search.queue_size: 1500

2.2、scaling

scaling 线程池的线程数量是动态的,介于core和max参数之间变化。线程池的最小线程数为配置的core大小,随着请求的增加,当core数量的线程全都繁忙时,线程数逐渐增大到max数量。max是线程池可拥有的线程数.上限。当线程空闲时,线程数从max大小逐渐降低到core大小。

  • keep_alive
    参数用来控制线程在线程池中的最长空闲时间。

例如:

thread_pool.warmer.core: 1
thread_pool.warmer.max:8
thread_pool.warmer.keep_alive: 2m 

2.3、direct

这种线程池对用户并不可见,当某个任务不需要在独立的线程执行,又想被线程池管理时,于是诞生了这种特殊类型的线程池:在调用者线程中执行任务。

2.4、fixed_auto_queue_size

与fixed类型的线程池相似,该线程池的线程数量为固定值,但是队列类型不一样。 其队列大小根据利特尔法则( Little's Law) 自动调整大小。该法则的详细信息可以参考https://en.wikipedia.org/wiki/Little%27s_law。 该线程池有以下参数可以调整:

  • size
    :用于指定线程数量;

  • queue_size
    ,:指定初始队列大小; .

  • min_queue_size
    :最小队列大小;

  • max_queue_size
    :最大队列大小;

  • auto_gueue_frame_size
    : 调整队列之前进行测量的操作数;

  • target_response_time
    :一个时间值设置,用来指示任务的平均响应时间,如果任务经常高于这个时间,则线程池队列将被调小,以便拒绝任务。

该线程类型为实验性质,未来可能会移除。目前只有search线程池使用这种类型。( Deprecated in 7.7.0 and will be removed in 8.0. )

三、处理器设置

https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html#node.processors

默认情况下,ES自动探测处理器数量。各个线程池的大小基于这个数量进行初始化。在某些情况下,如果想手工指定处理器数量,则可以通过设置 processors 参数实现:

processors: 2

有以下几种场景是需要明确设置processors数量的:

(1)、在同一台主机上运行多个ES实例,但希望每个实例的线程池只根据一部分CPU来设置,此时可以通过processors参数来设置处理器数量。例如,在16核的服务器上运行2个实例,可以将processors设置为8。请注意,在单台主机上运行多个实例,除了设置processors数量,还有许多更复杂的参数需要设置。例如,修改GC线程数,绑定进程到CPU等。

(2)、有时候自动探测出的处理器数量是错误的,在这种情况下,需要明确设置处理器数量。要检查自动探测的处理器数量,可以使用节点信息API中的os字段来查看。

四、查看线程池

ES提供了丰富的API查看线程池状态,在监控节点健康、排查问题时非常有用。

4.1、cat thread pool

该命令显示每个节点的线程池统计信息。默认情况下,所有线程池都返回活动、队列和被拒绝的统计信息。我们需要特别留意被拒接的信息,例如,bulk 请求被拒绝意味着客户端写入失败。在正常情况下客户端应该捕获这种错误(错误码429)并延迟重试,但有时客户端不一定对这种错误做了处理,导致写入集群的数据量低于预期值。

curl -X GET "localhost:9200/_cat/thread_pool"

返回信息如下:

在这里插入图片描述

  • active 表示当前正在执行任务的线程数量

  • queue 表示队列中等待处理的请求数量

  • rejected 表示由于队列已满,请求被拒绝的次数。

对返回结果进行过滤等更多用法可参考官方手册: https://www.elastic.co/guide/en/elasticsearch/reference/6.1/cat-thread-pool.html。

4.2、nodes info

节点信息API可以返回每个线程池的类型和配置信息,例如,线程数量、队列大小等。下面的第一条命令返回所有节点的信息,第二条命令返回特定节点的信息。.

curl -X GET "localhost:9200/_nodes"
curl -X GET "localhost:9200/_nodes/nodeId1, nodeId2"

节点信息API返回的信息非常大,其中与线程池相关信息在thread_ pool 字段中,选取部分信息如下:

"thread_pool" : {"force_merge" : {"type" : "fixed","min" : 1,"max" : 1,"queue_size" : - 1},"fetch_shard started" : {"type" : "scaling","min" : 1,"max" : 16,"keep_alive" : "5m","queue_size" : -1}
}

该命令的完整信息可参考官方手册: https://www.elastic.co/guide/en/elasticsearch/reference/6.1/cluster-nodes-info.html

4.3、nodes stats

statsAPI返回集群中一个或全部节点的统计数据。

下面的第一条命令返回所有节点的统计数据,第二条命令返回特定节点的统计数据。

curl -X GET "localhost:9200/_nodes/stats" curl -X GET "localhost:9200/_nodes/nodeId1, nodeId2/stats"

默认情况下,该API返回全部 indices、oS、process、jvm、transport、http、fs、breaker 和thread_pool 方面的统计数据。其中线程池相关的返回结果摘要如下:

"thread_pool" : {"bulk" : {"threads" : 0,"queue" : 0,"active" : 0,"rejected" : 0,"largest" : 0,"completed" : 0}
}

该命令的完整使用方式可参考官方手册: https://www.elastic.co/guide/en/elasticsearch/reference/6.1/cluster-nodes-stats.html。

4.4、nodes hot threads

该API返回集群中一个或全部节点的热点线程。

当发现节点进程占用CPU非常高时,想知道是哪些线程导致的,这些线程具体在执行什么操作,常规做法是通过top命令配合jstack来定位线程,现在ES提供了更便捷的方式,通过hot threads API可以直接返回这些信息。

下面的第一条命令返回所有节点的热点线程,第二条命令返回特定节点的热点线程。

curl -X GET "localhost:9200/_nodes/hot_threads"
curl -X GET "localhost:9200/nodes/nodeId1, nodeId2/hot_threads"

该命令支持以下参数:

  • threads
    :返回的热点线程数,默认为3。

  • interval:ES
    对线程做两次检查,来计算某个操作.上花费时间的百分比,此参数定义这个间隔时间。默认为500 ms。

  • type
    :定义要检查的线程状态类型,默认为CPU.API可以检查线程的CPU占用时间、阻塞(block)时间和等待(wait) 时间。

  • ignore_idle_threads
    :如果设置为true, 则空闲线程(例如,在套接字中等待,或者从空队列中获取任务)将被过滤。默认值为true。

其返回信息的样例如下图所示。

在这里插入图片描述

返回信息中的第一行表明这个是哪个节点的信息,以及这个节点的IP地址等。

::: {node1} {un- 9UZ4PS8-K6hF59x1MWA} {bjk2C_ _6USh0YgMYKcBWKLQ} {node1.eshost}
{10.10.13.15:9300}

接下来列出哪个线程占用较多的CPU,以及CPU的占用比:

82.2% (411.2ms out of 500ms) cpu usage by thread 'elasticsearch [node1]
[bulk] [T#1] '

最后是该线程的堆栈信息。

ES中的线程池是基于对Java线程池的封装和扩展。我们先看一下Java线程池的结构和使用方式,这些是ES内部线程原理的基础知识。

4.5、Java 的线程池结构

Java内部的线程池称为Executor框架,几个基本的类概念如下:

  • Runable定义一个要执行的任务。

  • Executor提供了execute 方法,接受一个Runable实例,用来执行一个任务。

  • ExecutorService是线程池的虚基类,继承自Executor, 提供了shutdown, shutdownNow等关闭线程池接口。

  • ThreadPoolExecutor线程池的具体实现。继承自ExecutorService,维护线程创建、线程生命周期、任务队列等。

  • EsThreadPoolExecutor是ES对ThreadPoolExecutor的扩展实现。未来会增加一些统计信息。

这几个类的继承结构如下图所示。

我们以一个简单的例子来看看Java 线程池的用法,ExecutorService 类用于保存创建的线程池实例,后续调用execute方法执行任务。在下面的例子中,任务类TestRunnable只是打印当前线程名称。

import java. util.concurrent.ExecutorService;
import java. util.concurrent.Executors;
publicclass ThreadPoolDemo {public static void main(String[] args) {//通过Executors构建一个固定大小的线程池,线程数量为2,返回线程池实例ExecutorService executorService = Executors.newFixedThreadPool (2) ;//调用 线程池的execute方法执行一个任务executorService.execute(new TestRunnable());}
}
class TestRunnable implements Runnable {public void run() {System.out.println (Thread.currentThread().getName()) ;}
}

ES内部创建线程池时,返回类型同样是ExecutorService类。接下来我们通过构建过程来看看ThreadPoolExecutor的结构,其构造函数如下:

public ThreadPoolExecutor (int corePoolSize,int maximumPoolSize,long keepAl iveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 || max.imumPoolSize <= 0 || maximumPoolSize < corePoolSize ||keepAliveTime < 0)thrownew IllegalArgumentException() ;if (workQueue == null || threadFactory == null || handler == null)thrownewNul lPointerException() ;this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue ;this.keepAliveTime = unit.toNanos (keepAliveTime) ;this.threadFactory = threadFactory;this.handler = handler;
}

几个重要参数的含义如下:

  • corePoolSize
    :线程池大小;

  • maximumPoolSize
    :最大线程数量;

  • keepAliveTime
    :线程空闲回收时间;

  • BlockingQueue
    :任务队列;

  • handler
    : 队列满,拒绝请求时的回调函数。

ThreadPoolExecutor类是Java线程池的具体实现,是整个线程池中最重要的类,ES 基于这个类进行了一些扩展。

五、ES的线程池实现

**ES中使用的线程池绝大部分封装在ThreadPool类中,**个别独立线程池的实现在本章末尾讨论。除了个别情况,在ThreadPool类中,会创建各个模块要使用的全部线程池。本章开始所讨论的几种线程池就是在ThreadPool类中创建的。

ThreadPool类创建各个线程池,要使用线程池的各个内部模块会引用ThreadPool类对象,通过其对外提供executor方法,根据线程池名称获取对应的线程池引用,进而执行某个任务。

ThreadPool对外提供的重要方法如下表所示。

在这里插入图片描述

当某个模块要在新的线程中启动任务时,典型的使用方式如下:

threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() ->beginSnapshot (newState, newSnapshot, request.partial(), listener));threadPool.executor方法返回snapshot线程池( ExecutorService类)的引用,通过线程池的execute方法执行任务,在本例中,任务的Runnable是Lambda表达式定义的。

5.1、ThreadPool 类结构与初始化

ThreadPool类对象在节点启动时初始化,在Node类的构造函数中初始化ThreadPool类:

final ThreadPool threadPool = new ThreadPool (settings, executorBuilders.toArray (new ExecutorBuilder[0]));

线程池对象构建完毕,将这个类的引用传递给其他要使用线程池的模块:

final ResourcewatcherService resourceWatcherService = new ResourceWatcherService (settings, threadPool);

线程池的名称在内部类Names中,最好记住它们的名字,有时需要通过jstack查看堆栈,ES的堆栈非常长,这就需要通过线程池的名称去查找关注的内容。

publicstaticclass Names {publicstaticfinal String SAME = "same";publicstaticfinal String GENERIC = "generic";publicstaticfinal String LISTENER = "listener";publicstaticfinal String GET = "get";publicstaticfinal String INDEX = "index" ;publicstaticfinal String BULK = "bulk";publicstaticfinal String SEARCH = "search";publicstaticfinal String MANAGEMENT = "management" ;publicstaticfinal String FLUSH = "flush";publicstaticfinal String REFRESH = "refresh";publicstaticfinal String WARMER = "warmer" ;publicstaticfinal String SNAPSHOT = "snapshot";publicstaticfinal String FORCE MERGE = "force_merge";publicstaticfinal String FETCH\_SHARD\_STARTED = "fetch\_shard\_started";publicstaticfinal String FETCH\_SHARD\_STORE = "fetch\_shard\_store";
}

线程池类型由枚举类型ThreadPoolType 定义:

enum ThreadPoolType {DIRECT ("direct") ,FIXED ("fixed") ,FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size") ,SCALING ("scaling") ;
}

在ThreadPool类构造函数中,全部的线程池被初始化:

public ThreadPool (final Settings settings, final ExecutorBuilder<?> ... customBuilders) {final Map<String, ExecutorBuilder> builders = new HashMap<> () ;builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue. timeValueSeconds (30))) ;builders.put (Names. INDEX, new FixedExecutorBuilder (settings, Names.INDEX, availableProcessors, 200)) ;//index/delete操作与bulk使用同一个线程池builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200));builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)) ;builders.put (Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSiz (availableProcessors), 1000, 1000, 1000, 2000));builders.put (Names.MANAGEMENT, newScal ingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))) ;builders.put (Names.LISTENER, new FixedExecutorBuilder (settings, Names.LISTENER, halfProcMaxAt10, -1));builders.put (Names.FLUSH, new ScalingExecutorBuilder (Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))) ;builders.put (Names . REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))) ;builders.put (Names.WARMER, newScal ingExecutorBuilder (Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))) ;builders.put (Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))) ;builders.put (Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))) ;builders.put (Names.FORCE_MERGE, new FixedExecutorBuilder (settings, Names.FORCE_MERGE, 1, -1) ) ;builders.put (Names.FETCH_SHARD_STORE, newScal ingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))) ;
}

这些线程池构建成功后,最终保存到一个map结构中,map 列表根据builders信息构建, 将SAME线程池单独添加进去。

Map<String, ExecutorHolder> executors

当某个模块使用线程池时,通过线程池名称从这个map中获取对应的线程池。

public ExecutorService executor (String name) {final ExecutorHolder holder = executors.get(name);return holder.executor() ;
}

map 中的值: ExecutorHolder 是 ThreadPool 内部类,它只是简单封装了一下ExecutorService。

class ExecutorHolder {privatefinal ExecutorService executor ;//Info类主要是线程池名称、类型、队列大小、线程数量的max和min、keepAlive时间publicfinal Info info;
}

5.2、fixed类型线程池构建过程

FіхеdЕхесutоrВuіldеr 类用于fixed类型的线程池构建,它的主要实现是通过 ЕѕЕхесutоrѕ.newFixed 方法构建一个ExecutorService。由于是fixed类型的线程池,因此EsThreadPoolExecutor传入的corePoolSize 和 maximumPoolSize 的大小相同。

public static EsThreadPoolExecutor newFixed (String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) {//使用有限或无限大小的阻塞队列初始化线程池队列BlockingQueue<Runnable> queue;if (queueCapacity < 0) fqueue = Concur rentCollections . newBlockingQueue () ;} else {queue =new SizeBlockingQueue<> (ConcurrentCollections.<Runnable> newBlockingQueue(), queueCapacity) ;}//创建线程池returnnew EsThreadPoolExecutor (name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder) ;
}

5.3、scaling类型线程池构建过程

ScalingExecutorBuilder用于 scaling类型线程池的构建,它的主要实现是通过EsExecutors.newScaling方法创建一个ExecutorService, min 和 max 分别对应corePoolSize 和 maximumPoolSize。

public static EsThreadPoolExecutor newScaling (String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {//创建线程队列ExecutorScal ingQueue<Runnable> queue = new ExecutorScalingQueue<>() ;//min corePoolSize, max maximumPoolSizeEsThreadPoolExecutor executor = new EsThreadPoolExecutor (name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder) ;queue.executor = executor;return executor ;
}

5.4、direct类型线程池构建过程

direct类型的线程池没有通过*ExecutorBuilder类创建,而是通过EsExecutors.newDirectExecutorService
方法直接创建的,该方法中直接返回一个定义好的简单的线程池DIRECT_EXECUTOR_SERVICE
。该线程池的实现如下,在execute方法中直接运行这个任务,因此任务在调用者所执行。

privatestaticfinal ExecutorService DIRECT\_EXECUTOR\_SERVICE = new AbstractExecutorService() {//不支持关闭public void shutdown() {thrownew UnsupportedOperationException() ;}//不支持中止public boolean awaitTermination (long timeout,TimeUnit unit) throws InterruptedException {thrownewUnsuppor tedOperationException() ;}//直接调用任务的运行方法,因此在调用者的线程中执行任务public void execute (Runnable command) {command.run () ;}
};

5.5、fixed_auto_queue_size类型线程池构建过程

该类型的线程池通过AutoQueueAdjustingExecutorBuilder 类构建,构建过程的主要实现是通过EsExecutors.newAutoQueueFixed
方法创建一个ExecutorService
。 该线程池的队列是一个大小可调整的队列,而QueueResizingEsThreadPoolExecutor
继承自EsThreadPoolExecutor
, 在它的基础上实现了动态调整队列大小的利特尔法则( Little's Law)。

public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize, int maxQueueSize, int frameSize, TimeValue targetedResponseTime, ThreadFactory threadFactory, ThreadContext contextHolder) {//初始化大小必须大于1if (initialQueueCapacity <= 0) {thrownew IllegalArgumentException() ;}//初始化一个动态调整的队列ResizableBlockingQueue<Runnable> queue = new ResizableBlockingQueue<> (ConcurrentCollections. <Runnable>newBlockingQueue() , initialQueueCapacity) ;//创建线程池returnnewQueueResi Z ingEsThreadPool Executor (name, size, size, 0, TimeUnit.MILLISECONDS, queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory, new EsAbortPolicy(), contextHolder) ;
}

六、其他线程池

除了ThreadPool中封装的各种线程池,ES中还有一种支持优先级的线程池: PrioritizedEs-ThreadPoolExecutor
,这个线程池同样继承自EsThreadPoolExecutor 类。目前,只有主节点执行集群任务,以及从节点应用集群状态时使用该类型的线程池。

该线程池通过EsExecutors.newSinglePrioritizing方法构建,线程池有固定大小,线程数为1。

public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing (String name, ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {//corePoolSize和maximumPoolSize都为1returnnew PrioritizedEsThreadPoolExecutor (name, 1,1,0L, TimeUnit.MILLISECONDS,threadFactory, contextHolder, timer) ;
}

在PrioritizedEsThreadPoolExecutor类的构造函数中,会为线程池创建一个支持优先级的队列: PriorityBlockingQueue,该队列是JDK中的实现,初始大小为11,最大为Integer.MAX_VALUE-8,基本是无限的。关于该队列的更多信息可以参考JDK手册: htps://ocs.oracle.com/javase/10/docs/api/java/util/concurrentPriorityBlockingQueue.html。

七、思考与总结

  • (1) 每种不同类型的线程池有各自不同的队列类型。scaling类型的线程池动态维护线程池数量。fixed_auto_queue_size 与fix类型的线程池都有固定的线程数。

  • (2) ThreadPool 类在节点启动时初始化,然后将类的引用传递给其他模块,其他模块通过明确指定线程名称从ThreadPool类中获取对应的线程池,然后执行自定义的任务。

  • (3)节点关闭时,对线程池模块先调用shutdown,等待10秒后,执行shutdownNow。因此线程池中的任务有机会执行完毕,但在超时后会尝试终止线程池中的任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/62479.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【python爬虫】8.温故而知新

文章目录 前言回顾前路代码实现体验代码功能拆解获取数据解析提取数据存储数据 程序实现与总结 前言 Hello又见面了&#xff01;上一关我们学习了爬虫数据的存储&#xff0c;并成功将QQ音乐周杰伦歌曲信息的数据存储进了csv文件和excel文件。 学到这里&#xff0c;说明你已经…

nvm集合node版本,解决新版本jeecgboot3.5.3前端启动失败问题

jeecgboot前端3.5.3页面如下 使用之前的pnpm启动会报错&#xff0c;pnpm是node进行安装的&#xff0c;查询后发现&#xff0c;vue3版本的页面至少需要node16版本&#xff0c;我之前的版本只有15.5&#xff0c;适用于vue2 那么我将先前的node15.5版本删除&#xff0c;然后安装…

【Hadoop】DataNode 详解

&#x1f341; 博主 "开着拖拉机回家"带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——&#x1f390;开着拖拉机回家_Linux,Java基础学习,大数据运维-CSDN博客 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341; 希望本文能够给您带来一定的…

【解决】idea启动spring MVC报错:一个或多个listeners启动失败Listener ClassNotFoundException

idea配置教程。tomcat调试报错Artifact :war exploded: Error during artifact deployment。 修改代码后&#xff0c;启动不生效&#xff0c;仍是旧代码。 根本原因是&#xff1a; Modules output path和Artifacts output directory不匹配 Modules output path一定要等于Ar…

网络编程 day 5

1、根据select TCP服务器流程图编写服务器 #include <myhead.h>#define ERR_MSG(msg) do{\fprintf(stderr, "__%d__:", __LINE__); \perror(msg);\ }while(0)#define PORT 8888 //端口号&#xff0c;范围1024~49151 #define IP "192.168.…

Flink SQL你用了吗?

分析&回答 Flink 1.1.0&#xff1a;第一次引入 SQL 模块&#xff0c;并且提供 TableAPI&#xff0c;当然&#xff0c;这时候的功能还非常有限。Flink 1.3.0&#xff1a;在 Streaming SQL 上支持了 Retractions&#xff0c;显著提高了 Streaming SQL 的易用性&#xff0c;使…

linuxdeploy安装CentOS7搭建django服务

目录 一、busybox安装 二、linuxdeploy安装 三、linuxdeploy软件设置及安装 四、CentOS基础环境配置 五、CentOS7 上安装Python3.8.10 六、systemctl的替代品 七、CentOS7 上安装mysql5.2.27数据库 八、CentOS7 上安装Nginx服务 九、Django项目应用部署 参考文献: 一…

《Python入门到精通》webbrowser模块详解,Python webbrowser标准库,Python浏览器控制工具

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;小白零基础《Python入门到精通》 webbrowser模块详解 1、常用操作2、函数大全webbrowser.open() 打开浏览器webbro…

线程同步与互斥

目录 前言&#xff1a;基于多线程不安全并行抢票 一、线程互斥锁 mutex 1.1 加锁解锁处理多线程并发 1.2 如何看待锁 1.3 如何理解加锁解锁的本质 1.4 CRAII方格设计封装锁 前言&#xff1a;基于线程安全的不合理竞争资源 二、线程同步 1.1 线程同步处理抢票 1.2 如何…

Python爬虫(十七)_糗事百科案例

糗事百科实例 爬取糗事百科段子&#xff0c;假设页面的URL是: http://www.qiushibaike.com/8hr/page/1 要求&#xff1a; 使用requests获取页面信息&#xff0c;用XPath/re做数据提取获取每个帖子里的用户头像连接、用户姓名、段子内容、点赞次数和评论次数保存到json文件内…

实现不同局域网文件共享的解决方案:使用Python自带HTTP服务和端口映射

文章目录 1. 前言2. 本地文件服务器搭建2.1 python的安装和设置2.2 cpolar的安装和注册 3. 本地文件服务器的发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试5. 结语 1. 前言 数据共享作为和连接作为互联网的基础应用&#xff0c;不仅在商业和办公场景有广泛的应用…

2023.8.26-2023.9.3 周报【3D+GAN+Diffusion基础知识+训练测试】

目录 学习目标 学习内容 学习时间 学习产出 学习目标 1. 3D方向的基础知识 2. 图像生成的基础知识&#xff08;GAN \ Diffusion&#xff09; 3. 训练测试GAN和Diffusion 学习内容 1. 斯坦福cv课程-3D &#xff08;网课含PPT&#xff09; 2. sjtu生成模型课件 3. ge…

TCP--半连接队列和全连接队列

原文地址&#xff1a;https://plantegg.github.io/2020/04/07/%E5%B0%B1%E6%98%AF%E8%A6%81%E4%BD%A0%E6%87%82TCP–%E5%8D%8A%E8%BF%9E%E6%8E%A5%E9%98%9F%E5%88%97%E5%92%8C%E5%85%A8%E8%BF%9E%E6%8E%A5%E9%98%9F%E5%88%97–%E9%98%BF%E9%87%8C%E6%8A%80%E6%9C%AF%E5%85%AC%E…

【OpenCL基础 · 一】因源

文章目录 前言一、单核标量处理器的瓶颈1.提升时钟频率2.提升指令级并行能力 二、多核和向量化1.多核2.向量化 三、异构并行和OpenCL1.GPGPU2.CUDA和OpenCL 前言 随着人工智能的发展以及大部分场景中实时性的要求&#xff0c;人们对于计算机算力要求逐渐增加。为了提高计算速度…

【ES】笔记-Promise基本使用

笔记-基本使用 一、初始Promise1. 抽象表达:2. 具体表达:为什么要用 Promise?promise的基本流程 二、fs读取文件三、AJAX请求四、Promise封装fs模块五、util.promisify方法六、Promise封装AJAX操作 一、初始Promise 1. 抽象表达: 1. Promise 是一门新的技术(ES6 规范) 2. Pr…

短视频矩阵系统接口部署技术搭建

前言 短视频矩阵系统开发涉及到多个领域的技术&#xff0c;包括视频编解码技术、大数据处理技术、音视频传输技术、电子商务及支付技术等。因此&#xff0c;短视频矩阵系统开发人员需要具备扎实的计算机基础知识、出色的编程能力、熟练掌握多种开发工具和框架&#xff0c;并掌握…

C++(17):异常处理

异常处理机制允许程序中独立开发的部分能够在运行时就出现的问题进行通信并做出相应的处理。 异常使得能够将问题的检测与解决过程分离开来&#xff1a;程序的一部分负责检测问题的出现&#xff0c;然后解决该问题的任务传递给程序的另一部分。检测环节无须知道问题处理模块的…

数据结构入门 — 栈

本文属于数据结构专栏文章&#xff0c;适合数据结构入门者学习&#xff0c;涵盖数据结构基础的知识和内容体系&#xff0c;文章在介绍数据结构时会配合上动图演示&#xff0c;方便初学者在学习数据结构时理解和学习&#xff0c;了解数据结构系列专栏点击下方链接。 博客主页&am…

【C++】关于using namepace xxx 使用命名空间和冲突

官方定义 namespace是指 标识符的各种可见范围。命名空间用关键字namespace来定义。 命名空间是C的一种机制&#xff0c;用来把单个标识符下的大量有逻辑联系的程序实体组合到一起。此标识符作为此组群的名字。 基本使用 编译及执行命令&#xff1a; g test.cpp -o test ./…

汽车制造行业,配电柜如何实施监控?

工业领域的生产过程依赖于高效、稳定的电力供应&#xff0c;而配电柜作为电力分配和控制的关键组件&#xff0c;其监控显得尤为重要。 配电柜监控通过实时监测、数据收集和远程控制&#xff0c;为工业企业提供了一种有效管理电能的手段&#xff0c;从而确保生产的连续性、安全性…