Elasticsearch 8.9 Master节点处理请求源码

大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1)

在这里插入图片描述

这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上

  • 一、Master节点处理请求的逻辑
    • 1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
    • 2、Master节点处理来自客户端的请求(以创建索引请求举例)
      • (1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
      • (2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
  • 二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
    • 1、首先通过nodeClient执行doExecute()
    • 2、创建一个task任务异步执行TransportAction
    • 3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
    • 4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引

一、Master节点处理请求的逻辑

不是所有的请求都需要Master节点处理,但是有些请求必须让Master节点处理,比如创建index,下面的3就是用创建索引做的示例

1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest

主节点在 Elasticsearch 集群中负责集群的管理和协调工作。当节点需要执行某些操作时,它将创建相应的 MasterNodeRequest 实现类的实例,填充请求的参数和数据,并将其发送给主节点。主节点根据不同的 MasterNodeRequest 实现类的类型,执行相应的操作

/*** A based request for master based operation.* 在master上*/
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest {public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;protected MasterNodeRequest() {}protected MasterNodeRequest(StreamInput in) throws IOException {super(in);masterNodeTimeout = in.readTimeValue();}@Overridepublic void writeTo(StreamOutput out) throws IOException {super.writeTo(out);out.writeTimeValue(masterNodeTimeout);}/*** A timeout value in case the master has not been discovered yet or disconnected.*/@SuppressWarnings("unchecked")public final Request masterNodeTimeout(TimeValue timeout) {this.masterNodeTimeout = timeout;return (Request) this;}/*** A timeout value in case the master has not been discovered yet or disconnected.*/public final Request masterNodeTimeout(String timeout) {return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".masterNodeTimeout"));}public final TimeValue masterNodeTimeout() {return this.masterNodeTimeout;}
}

这里有点模糊,后面学到数据节点向主节点请求或者同步什么时,我再挂个链接

2、Master节点处理来自客户端的请求(以创建索引请求举例)

(1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)

至于请求如何找到RestCreateIndexAction的,可以参考Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码

@ServerlessScope(Scope.PUBLIC)
public class RestCreateIndexAction extends BaseRestHandler {//省略代码  @Overridepublic List<Route> routes() {return List.of(new Route(PUT, "/{index}"));}@Overridepublic String getName() {return "create_index_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {CreateIndexRequest createIndexRequest;if (request.getRestApiVersion() == RestApiVersion.V_7) {createIndexRequest = prepareRequestV7(request);} else {createIndexRequest = prepareRequest(request);}return channel -> client.admin().indices().create(createIndexRequest, new RestToXContentListener<>(channel));}//省略代码  
}    

(2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法

TransportMasterNodeAction 主要用于处理来自节点的各种管理操作请求,如创建索引、删除索引、更新集群设置等。
当节点(数据节点)发送请求到主节点时,请求会被传递给相应的 TransportMasterNodeAction 实现类进行处理。实现类会根据请求的类型,执行相应的操作逻辑,并返回执行结果给主节点。


/*** 需要在主节点上执行的操作的基类。* A base class for operations that needs to be performed on the master node.**/
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {//省略代码     }
/*** 创建索引操作*/
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {@Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {//省略代码createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));}
}    

二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)

这个场景为主节点和数据节点分离的情况

1、首先通过nodeClient执行doExecute()

client.admin().indices().createcreate方法调用IndicesAdmin类的create方法,再调用execute方法的入参是 CreateIndexAction.INSTANCE

static class IndicesAdmin implements IndicesAdminClient {@Overridepublic void create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {execute(CreateIndexAction.INSTANCE, request, listener);}@Overridepublic <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(ActionType<Response> action,Request request) {return client.execute(action, request);}}

调用的是AbstractClientexecute方法

  /*** This is the single execution point of *all* clients.* 这是所有客户端的单个执行点。*/@Overridepublic final <Request extends ActionRequest, Response extends ActionResponse> void execute(ActionType<Response> action,Request request,ActionListener<Response> listener) {try {doExecute(action, request, listener);} catch (Exception e) {assert false : new AssertionError(e);listener.onFailure(e);}}

doExecute方法调用的是NodeClient类的方法

  @Overridepublic <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,Request request,ActionListener<Response> listener) {// Discard the task because the Client interface doesn't use it.try {executeLocally(action, request, listener);} catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {listener.onFailure(e);}}/***在本地执行 {@link ActionType},返回用于跟踪它的 {@link Task},并链接 {@link ActionListener}。如果在侦听响应时不需要访问任务,则首选此方法。这是用于实现 {@link 客户端} 接口的方法。*/public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(ActionType<Response> action,Request request,ActionListener<Response> listener) {//注册并执行任务return taskManager.registerAndExecute("transport",transportAction(action),request,localConnection,new SafelyWrappedActionListener<>(listener));}   

之后调用TaskManager.java的方法

2、创建一个task任务异步执行TransportAction

public <Request extends ActionRequest, Response extends ActionResponse> Task registerAndExecute(String type,TransportAction<Request, Response> action,Request request,Transport.Connection localConnection,ActionListener<Response> taskListener) { //检查请求是否有父任务,如果有,则注册子连接。final Releasable unregisterChildNode;if (request.getParentTask().isSet()) {unregisterChildNode = registerChildConnection(request.getParentTask().getId(), localConnection);} else {unregisterChildNode = null;}//创建一个新的跟踪上下文try (var ignored = threadPool.getThreadContext().newTraceContext()) {final Task task;//注册一个任务,并捕获可能的取消任务异常。try {task = register(type, action.actionName, request);} catch (TaskCancelledException e) {Releasables.close(unregisterChildNode);throw e;}//执行操作,并在操作完成时调用相应的监听器。action.execute(task, request, new ActionListener<>() {@Overridepublic void onResponse(Response response) {try {release();} finally {taskListener.onResponse(response);}}//根据操作的成功或失败情况,取消子任务并释放资源。@Overridepublic void onFailure(Exception e) {try {if (request.getParentTask().isSet()) {cancelChildLocal(request.getParentTask(), request.getRequestId(), e.toString());}release();} finally {taskListener.onFailure(e);}}@Overridepublic String toString() {return this.getClass().getName() + "{" + taskListener + "}{" + task + "}";}private void release() {Releasables.close(unregisterChildNode, () -> unregister(task));}});//返回任务对象。return task;}}

下面是TransportAction.java类中的方法

    /*** Use this method when the transport action should continue to run in the context of the current task* 当传输操作应继续在当前任务的上下文中运行时,请使用此方法*/public final void execute(Task task, Request request, ActionListener<Response> listener) {final ActionRequestValidationException validationException;//对请求进行验证,如果验证过程中出现异常,则记录错误日志并通知监听器执行失败。try {validationException = request.validate();} catch (Exception e) {assert false : new AssertionError("validating of request [" + request + "] threw exception", e);logger.warn("validating of request [" + request + "] threw exception", e);listener.onFailure(e);return;}if (validationException != null) {listener.onFailure(validationException);return;}//检查是否存在任务且请求需要存储结果,如果满足条件,则创建一个TaskResultStoringActionListener实例,用于在任务完成后将结果存储起来。if (task != null && request.getShouldStoreResult()) {listener = new TaskResultStoringActionListener<>(taskManager, task, listener);}//创建一个请求过滤器链(RequestFilterChain),然后调用proceed方法,将任务、动作名称、请求和监听器传递给过滤器链进行处理。RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);requestFilterChain.proceed(task, actionName, request, listener);}
 @Overridepublic void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {int i = index.getAndIncrement();try {if (i < this.action.filters.length) {this.action.filters[i].apply(task, actionName, request, listener, this);} else if (i == this.action.filters.length) {//`this.action.doExecute(task, request, listener);` 中`action`对应的是`TransportMasterNodeAction`。this.action.doExecute(task, request, listener);} else {listener.onFailure(new IllegalStateException("proceed was called too many times"));}} catch (Exception e) {logger.trace("Error during transport action execution.", e);listener.onFailure(e);}}

this.action.doExecute(task, request, listener);action对应的是TransportMasterNodeAction

3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法

TransportMasterNodeAction继承HandledTransportAction
HandledTransportAction继承自TransportAction

public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {@Overrideprotected void doExecute(Task task, final Request request, ActionListener<Response> listener) {//省略代码new AsyncSingleAction(task, request, listener).doStart(state);}
}    
 protected void doStart(ClusterState clusterState) {threadPool.executor(executor).execute(ActionRunnable.wrap(delegate, l -> executeMasterOperation(task, request, clusterState, l)));}
private void executeMasterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception {//调用子类实现masterOperation(task, request, state, listener);}
//子类实现   
protected abstract void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception;

4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引

其中创建索引的actionTransportCreateIndexAction

 @Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}

之后调用createIndexService.createIndex创建索引

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/113743.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言常量

常量的概念 常量&#xff1a; 在程序运行期间不可以该改变的量 作用&#xff1a; 用于记录程序中不可更改的数据 常量的五种表现形式 语法&#xff1a; 1、数值常量&#xff08;整数型常量&#xff08;整数&#xff09;、实数型常量&#xff08;小数&#xff09;&#xff09…

C++中的智能指针:更安全、更便利的内存管理

在C++编程中,动态内存管理一直是一个重要且具有挑战性的任务。传统的C++中,程序员需要手动分配和释放内存,这往往会导致内存泄漏和悬挂指针等严重问题。为了解决这些问题,C++11引入了智能指针(Smart Pointers)这一概念,它们是一种高级的内存管理工具,可以自动管理内存的…

【数据结构】线性表(四)双向链表的各种操作(插入、删除、查找、修改、遍历打印)

目录 线性表的定义及其基本操作&#xff08;顺序表插入、删除、查找、修改&#xff09; 四、线性表的链接存储结构 1. 单链表 2. 循环链表 3. 双向链表 a. 双向链表节点结构 b. 创建一个新的节点 c. 在链表末尾插入节点 d. 在指定位置插入节点 e. 删除指定位置的节点…

LeetCode 2172. 数组的最大与和【状压DP,记忆化搜索;最小费用最大流】2392

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…

Juniper防火墙SSG-140 session 过高问题

1.SSG-140性能参数 2.问题截图 3.解决方法 &#xff08;1&#xff09;通过telnet 或 consol的方法登录到防火墙&#xff1b; &#xff08;2&#xff09;使用get session 查看总的session会话数&#xff0c;如果大于300 一般属于不正常情况 &#xff08;3&#xff09;使用get…

Python的编辑器VScode中文设置和Hello World

Python的编辑器 个人比较常用的用于Python开发的编辑器是VScode&#xff0c;大概的原因应该是免费&#xff0c;且便于项目文件的管理。 VScode中文设置插件及使用方法 VScode下载安装好之后&#xff0c;可以在软件左侧的“扩展”中搜索安装一些插件&#xff0c;用于辅助开发…

H5随机短视频滑动版带打赏源码,可封装APP软件或嵌入式观看

H5随机短视频滑动版带打赏源码&#xff0c;可封装APP软件或嵌入式观看&#xff0c;网站引流必备源码&#xff01; 数据来源抖音和快手官方短视频链接&#xff0c;无任何违规内容&#xff01;可自行添加广告等等&#xff01; 手机端完美支持滑动屏幕观看&#xff08;向上或向右…

思维模型 上瘾模型(hook model)

本系列文章 主要是 分享 思维模型&#xff0c;涉及各个领域&#xff0c;重在提升认知。你到底是怎么上瘾&#xff08;游戏/抖音&#xff09;的&#xff1f;我们该如何“积极的上瘾”&#xff1f;让我们来一切揭晓这背后的秘密。 1 上瘾模型的应用 1.1上瘾模型的积极应用 1 学…

RT-Thread学习笔记(三):线程管理

线程管理 线程管理相关概念什么是时间片轮转调度器锁线程运行机制线程的五种状态 动态和静态创建线程区别动态和静态创建线程优缺点RT-Thread动态线程管理函数动态创建线程动态删除线程 RT-Thread静态线程管理函数静态创建线程 线程其他操作线程启动线程延时获得当前执行的线程…

四、基本组件

1. Designer设计师 Designer程序是Qt官方推出的专为设计人员使用的UI设计工具&#xff0c;程序员可以使用此工具大幅降低UI设计的代码量。 Designer设计文件的格式是.ui&#xff0c;需要配合同名的头文件与源文件使用。.ui文件通常被称为界面文件&#xff0c;其内部是xml语法的…

微机原理:汇编语言语句类型与格式

文章目录 壹、语句类型1、语句分类2、常用伪代码和运算符2.1数据定义伪指令2.1.1字节定义伪指令DB&#xff08;8位&#xff09;2.1.2字定义伪指令DW&#xff08;16位&#xff09;2.1.3双字节伪指令DD2.1.4 多字节定义DF/DQ/DT&#xff08;了解&#xff09; 2.2 常用运算符2.2.1…

【数据库】SQL 过滤数据

过滤数据 简单过滤where 子句操作符检查单个值范围值检擦空值检查 高级过滤多个过滤条件求值顺序IN 操作符NOT 操作符 在 s q l sql sql 语句中&#xff0c;通过 WHERE 子句指定搜索条件进行过滤。 简单过滤 包含&#xff1a;WHERE&#xff0c;BETWEEN&#xff0c;IS NULL&a…

面向对象与面向过程讲解

目录 简介 面向过程编程&#xff08;Procedural Programming&#xff09; 什么是面向过程编程&#xff1f; 特点&#xff1a; 面向对象编程&#xff08;Object-Oriented Programming&#xff09; 什么是面向对象编程&#xff1f; 特点&#xff1a; 面向对象 vs. 面向过程…

学习人工智能

在线课程 优达学城 当斯坦福大学讲师 Sebastian Thrun 和 Peter Norvig 将他们的课程“人工智能概论”免费放到网上时&#xff0c;Udacity 开始了在线学习的实验。从那时起&#xff0c;它就受到了巨大的欢迎&#xff08;来自 190 多个国家的 160,000 名学生&#xff09;&#x…

[java进阶]——异常详解,try catch捕获异常,抛出异常

&#x1f308;键盘敲烂&#xff0c;年薪30万&#x1f308; 目录 一、异常的体系结构 二、处理异常的本质 三、异常处理的三种方式 3.1虚拟机jvm处理(默认) 3.2 try catch捕获异常 3.3抛出异常 3.4finally关键字 四、自定义异常 五、总结 一、异常的体系结构 分析&#…

【uniapp】proxy 代理切换至线上测试地址调试接口

本地测试地址形如&#xff1a;http://192.168.124.x:xxxx 线上测试地址形如&#xff1a;https://xxxx.xxxx.com 使用线上地址之后需要修改配置项 secure 为 true const constant require(./src/utils/constant) module.exports {devServer: {proxy: {/api: {target: constan…

Node-EventEmitter的用法

题记 EventEmitter的用法&#xff0c;以下是详细过程和代码。 Node.js 所有的异步 I/O 操作在完成时都会发送一个事件到事件队列。 Node.js 里面的许多对象都会分发事件&#xff1a;一个 net.Server 对象会在每次有新连接时触发一个事件&#xff0c; 一个 fs.readStream 对象会…

[python-大语言模型]从浅到深一系列学习笔记记录

整体学习路径参照&#xff1a;点这里 python-机器学习-深度学习-大语言模型-数据开发 面向开发者的LLM入门提示原则 面向开发者的LLM入门 学习链接&#xff1a; github地址&#xff1a;https://github.com/datawhalechina/prompt-engineering-for-developers 在线阅读地址&…

【LeetCode】145. 二叉树的后序遍历 [ 左子树 右子树 根结点]

题目链接 文章目录 Python3方法一&#xff1a; 递归 ⟮ O ( n ) ⟯ \lgroup O(n) \rgroup ⟮O(n)⟯方法二&#xff1a; 迭代 ⟮ O ( n ) ⟯ \lgroup O(n) \rgroup ⟮O(n)⟯方法三&#xff1a; Morris ⟮ O ( n ) 、 O ( 1 ) ⟯ \lgroup O(n)、O(1) \rgroup ⟮O(n)、O(1)⟯写…

MySQL表操作—存储

建表&#xff1a; mysql> create table sch( -> id int primary key, -> name varchar(50) not null, -> glass varchar(50) not null -> ); Query OK, 0 rows affected (0.01 sec) 插入数据&#xff1a; mysql> insert into sch (id,name,…