Elasticsearch 8.9 refresh刷Es缓冲区的数据到Lucene,更新segemnt,使数据可见

  • 一、相关API的handler
    • 1、接受HTTP请求的hander(RestRefreshAction)
    • 2、往数据节点发送刷新请求的action(TransportRefreshAction)
    • 3、数据节点接收主节点refresh传输的action(TransportShardRefreshAction)
  • 二、在IndexShard执行refresh操作
    • 1、根据入参决定是使用lucene提供的阻塞还是非阻塞API刷新数据
      • (1)、maybeRefresh和maybeRefreshBlocking的简单介绍
  • 三、lucene源码中执行逻辑
    • 1、判断是否需要刷新

下面的图来自ElasticSearch——刷盘原理流程,这篇文章主要讲的是refresh命令把ES写入索引缓冲区的数据刷进Lucene,使数据可供查询,搜索,否则,在索引缓冲区是不可见的,不涉及到在translog.logLucene的数据结构。
通过这个流程知道ES如何把索引缓冲区的数据刷进Lucene的,主要是下面左中部分refresh部分

在这里插入图片描述

其他部分源码梳理
1、主节点同时写入ES缓冲区和translog这一部分,请看Elasticsearch 8.9 Bulk批量给索引增加数据源码
2、下半边fsync的源码逻辑,请看Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码

一、相关API的handler

ActionModule.java

 registerHandler.accept(new RestRefreshAction());actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class);

1、接受HTTP请求的hander(RestRefreshAction)

public class RestRefreshAction extends BaseRestHandler {@Overridepublic List<Route> routes() {return List.of(new Route(GET, "/_refresh"),new Route(POST, "/_refresh"),new Route(GET, "/{index}/_refresh"),new Route(POST, "/{index}/_refresh"));}@Overridepublic String getName() {return "refresh_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {RefreshRequest refreshRequest = new RefreshRequest(Strings.splitStringByCommaToArray(request.param("index")));refreshRequest.indicesOptions(IndicesOptions.fromRequest(request, refreshRequest.indicesOptions()));return channel -> client.admin().indices().refresh(refreshRequest, new RestToXContentListener<RefreshResponse>(channel) {@Overrideprotected RestStatus getStatus(RefreshResponse response) {return response.getStatus();}});}
}

client.admin().indices().refresh()会执行到下面的父类TransportBroadcastReplicationActiondoExecute方法

2、往数据节点发送刷新请求的action(TransportRefreshAction)

public class TransportRefreshAction extends TransportBroadcastReplicationAction<RefreshRequest,RefreshResponse,BasicReplicationRequest,ReplicationResponse> {@Injectpublic TransportRefreshAction(ClusterService clusterService,TransportService transportService,ActionFilters actionFilters,IndexNameExpressionResolver indexNameExpressionResolver,NodeClient client) {super(RefreshAction.NAME,RefreshRequest::new,clusterService,transportService,client,actionFilters,indexNameExpressionResolver,TransportShardRefreshAction.TYPE,ThreadPool.Names.REFRESH);}//省略代码
}
public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest<Request>,Response extends BaseBroadcastResponse,ShardRequest extends ReplicationRequest<ShardRequest>,ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {@Overrideprotected void doExecute(Task task, Request request, ActionListener<Response> listener) {clusterService.threadPool().executor(executor).execute(ActionRunnable.wrap(listener, createAsyncAction(task, request)));}private CheckedConsumer<ActionListener<Response>, Exception> createAsyncAction(Task task, Request request) {return new CheckedConsumer<ActionListener<Response>, Exception>() {private int totalShardCopyCount;private int successShardCopyCount;private final List<DefaultShardOperationFailedException> allFailures = new ArrayList<>();@Overridepublic void accept(ActionListener<Response> listener) {assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice";final ClusterState clusterState = clusterService.state();final List<ShardId> shards = shards(request, clusterState);final Map<String, IndexMetadata> indexMetadataByName = clusterState.getMetadata().indices();try (var refs = new RefCountingRunnable(() -> finish(listener))) {//遍历所有的分片for (final ShardId shardId : shards) {// NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?shardExecute(task,request,shardId,ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire()));}}}};}protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {assert Transports.assertNotTransportThread("may hit all the shards");ShardRequest shardRequest = newShardRequest(request, shardId);shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);}}    

3、数据节点接收主节点refresh传输的action(TransportShardRefreshAction)

public class TransportShardRefreshAction extends TransportReplicationAction<BasicReplicationRequest,ShardRefreshReplicaRequest,ReplicationResponse> {private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class);public static final String NAME = RefreshAction.NAME + "[s]";public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME, ReplicationResponse::new);public static final String SOURCE_API = "api";@Injectpublic TransportShardRefreshAction(Settings settings,TransportService transportService,ClusterService clusterService,IndicesService indicesService,ThreadPool threadPool,ShardStateAction shardStateAction,ActionFilters actionFilters) {super(settings,NAME,transportService,clusterService,indicesService,threadPool,shardStateAction,actionFilters,BasicReplicationRequest::new,ShardRefreshReplicaRequest::new,ThreadPool.Names.REFRESH);// registers the unpromotable version of shard refresh actionnew TransportUnpromotableShardRefreshAction(clusterService, transportService, shardStateAction, actionFilters, indicesService);}@Overrideprotected void shardOperationOnPrimary(BasicReplicationRequest shardRequest,IndexShard primary,ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener) {primary.externalRefresh(SOURCE_API, listener.delegateFailure((l, refreshResult) -> {ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult);replicaRequest.setParentTask(shardRequest.getParentTask());logger.trace("{} refresh request executed on primary", primary.shardId());l.onResponse(new PrimaryResult<>(replicaRequest, new ReplicationResponse()));}));}
}    

primary.externalRefresh执行分片的刷新

二、在IndexShard执行refresh操作

 public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {verifyNotClosed();getEngine().externalRefresh(source, listener);}public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {ActionListener.completeWith(listener, () -> {logger.trace("external refresh with source [{}]", source);return refresh(source);});}   

getEngine()的实现是InternalEngine

  @Overridepublic RefreshResult refresh(String source) throws EngineException {return refresh(source, SearcherScope.EXTERNAL, true);}

1、根据入参决定是使用lucene提供的阻塞还是非阻塞API刷新数据

   protected final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {//这两种刷新类型都会导致内部刷新,但只有外部刷新类型也会将新的读取器引用传递给外部读取器管理器。//获取当前的本地检查点。final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();boolean refreshed;long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;try {//refresh 不需要按住 readLock,因为如果引擎在中途关闭,ReferenceManager 可以正确处理。if (store.tryIncRef()) {try {//尽管我们保留了 2 managers,但我们实际上只做过一次繁重的工作。第二次刷新只会做我们必须做的额外工作,以预热缓存等。ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();//根据参数决定是进行阻塞刷新还是非阻塞刷新if (block) { //刷新可能会导致阻塞referenceManager.maybeRefreshBlocking();refreshed = true;} else {//刷新不会导致阻塞refreshed = referenceManager.maybeRefresh();}//如果刷新成功,获取当前的读取器,并更新段的生成号if (refreshed) {//获取当前的目录final ElasticsearchDirectoryReader current = referenceManager.acquire();try {//更新segment信息segmentGeneration = Math.max(current.getIndexCommit().getGeneration(), generationBeforeRefresh);} finally {referenceManager.release(current);}}} finally {store.decRef();}if (refreshed) {lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);}} else {refreshed = false;}} catch (AlreadyClosedException e) {failOnTragicEvent(e);throw e;} catch (Exception e) {try {failEngine("refresh failed source[" + source + "]", e);} catch (Exception inner) {e.addSuppressed(inner);}throw new RefreshFailedEngineException(shardId, e);}assert refreshed == false || lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh: "refresh checkpoint was not advanced; "+ "local_checkpoint="+ localCheckpointBeforeRefresh+ " refresh_checkpoint="+ lastRefreshedCheckpoint();// TODO: maybe we should just put a scheduled job in threadPool?// We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes// for a long time:maybePruneDeletes();mergeScheduler.refreshConfig();return new RefreshResult(refreshed, segmentGeneration);}

其中referenceManager 根据入参是 SearcherScope.EXTERNAL 获得的实现是ExternalReaderManager

    private final ExternalReaderManager externalReaderManager;@Overrideprotected final ReferenceManager<ElasticsearchDirectoryReader> getReferenceManager(SearcherScope scope) {return switch (scope) {case INTERNAL -> internalReaderManager;case EXTERNAL -> externalReaderManager;};}

根据入参中的block=true 实际执行的是referenceManager.maybeRefreshBlocking(); 来刷新,是异步非阻塞的,
并且根据下图ExternalReaderManager继承了ReferenceManager,所以没有重写maybeRefreshBlocking 所以执行的是父类ReferenceManager

import org.apache.lucene.search.ReferenceManager;@SuppressForbidden(reason = "reference counting is required here")private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {@Overrideprotected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {//省略代码}@Overrideprotected boolean tryIncRef(ElasticsearchDirectoryReader reference) {return reference.tryIncRef();}@Overrideprotected int getRefCount(ElasticsearchDirectoryReader reference) {return reference.getRefCount();}@Overrideprotected void decRef(ElasticsearchDirectoryReader reference) throws IOException {reference.decRef();}}

(1)、maybeRefresh和maybeRefreshBlocking的简单介绍

下面是lucene源码中关于这两个API的实现,

//这个是会尝试获取刷新锁,如果没有则不执行刷新操作public final boolean maybeRefresh() throws IOException {this.ensureOpen();boolean doTryRefresh = this.refreshLock.tryLock();if (doTryRefresh) {try {this.doMaybeRefresh();} finally {this.refreshLock.unlock();}}return doTryRefresh;}//这里会等待获取刷新锁,所以会阻塞public final void maybeRefreshBlocking() throws IOException {this.ensureOpen();this.refreshLock.lock();try {this.doMaybeRefresh();} finally {this.refreshLock.unlock();}}

但是实际上最后执行刷新还是执行的this.doMaybeRefresh() 方法

三、lucene源码中执行逻辑

private void doMaybeRefresh() throws IOException {this.refreshLock.lock();boolean refreshed = false;try {Object reference = this.acquire();try {//通知刷新监听器。this.notifyRefreshListenersBefore();//调用 refreshIfNeeded(reference) 返回一个新的引用 (newReference)//用来判断是否需要刷新,如果不需要刷新,refreshIfNeeded 应返回 nullG newReference = this.refreshIfNeeded(reference);if (newReference != null) {assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";try {//调用 swapReference(newReference) 方法来交换旧的引用为新的引用。this.swapReference(newReference);//设置 refreshed 为 true 表示刷新成功。   refreshed = true;} finally {//如果刷新失败,释放新的引用if (!refreshed) {this.release(newReference);}}}} finally {//释放旧的引用this.release(reference);//通知刷新监听器刷新完成this.notifyRefreshListenersRefreshed(refreshed);}this.afterMaybeRefresh();} finally {//最后释放刷新锁this.refreshLock.unlock();}}

1、判断是否需要刷新

其中refreshIfNeeded用的是子类ExternalReaderManager的实现方法

private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {@Overrideprotected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {internalReaderManager.maybeRefreshBlocking();//获取其reader对象。final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire();//isWarmedUp为false或者获取到的新reader对象与传入的referenceToRefresh对象不相等,说明需要刷新if (isWarmedUp == false || newReader != referenceToRefresh) {boolean success = false;try {refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null);isWarmedUp = true;success = true;} finally {if (success == false) {internalReaderManager.release(newReader);}}}//没有任何变化 - 两个 ref 管理器共享同一个实例,因此我们可以使用引用相等性,不需要执行刷新操作if (referenceToRefresh == newReader) {internalReaderManager.release(newReader);return null;} else {return newReader; // steal the reference}}
}        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/212637.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【设计模式--创建型--原型模式】

设计模式--创建型--原型模式 原型模式概述结构实现结果 案例代码结果使用场景 扩展&#xff08;深\浅克隆&#xff09;浅克隆演示&#xff1a;结果&#xff1a;使用深克隆&#xff08;利用对象流&#xff09;结果 原型模式 概述 用一个已经创建的实例作为原型&#xff0c;通过…

Spring Cloud Gateway + Nacos + LoadBalancer实现企业级网关

1. Spring Cloud Gateway 整合Nacos、LoadBalancer 实现企业级网关 前置工作&#xff1a; 创建 SpringBoot 多模块项目创建网关&#xff08;gateway-service&#xff09;、用户&#xff08;user-service&#xff09;模块用户模块添加 Nacos discovery 支持以及 Spring Web&am…

gitbash下载安装

参考教程 零、下载 官网地址 2.43.0win64 链接&#xff1a;https://pan.baidu.com/s/16urs_nmky7j20-qNzUTTkg 提取码&#xff1a;7jaq 一、安装 图标组件&#xff08;Additional icons&#xff09;&#xff1a;选择是否创建桌面快捷方式&#xff1b;桌面浏览&#xff08;Win…

逸迅科技丁红阳:三种能力帮助企业打造GBI “护城河”

大数据产业创新服务媒体 ——聚焦数据 改变商业 近日&#xff0c;由上海市经济和信息化委员会、上海市科学技术委员会指导&#xff0c;数据猿与上海大数据联盟联合主办的“2023企业数智化转型升级发展论坛”在上海举行。本次论坛以“释放数字价值驱动智能升级”为主题&#xf…

piakachu越权漏洞

水平越权 首先打开这一关&#xff0c;在右侧有一些提示&#xff0c;我们可以看到 然后我们随便输入一组信息即可&#xff0c;可以在url中看到这样的字段 当我们尝试在url中直接更换另一个用户名时可以发现&#xff0c;直接切换到了另一个用户的身份 垂直越权 这里可以看到右边…

一文学会使用 PyInstaller 将 Python 脚本打包为 .exe 可执行文件

文章目录 前言PyInstaller特点跨平台支持自动依赖项处理单文件发布支持图形用户界面&#xff08;GUI&#xff09;和命令行界面&#xff08;CLI&#xff09;应用支持多种打包选项 基本用法常用参数其它参数 版本 & 环境实现步骤安装 PyInstaller创建 Python 脚本使用 PyInst…

【SpringBoot教程】SpringBoot 创建定时任务(配合数据库动态执行)

作者简介&#xff1a;大家好&#xff0c;我是撸代码的羊驼&#xff0c;前阿里巴巴架构师&#xff0c;现某互联网公司CTO 联系v&#xff1a;sulny_ann&#xff08;17362204968&#xff09;&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗…

transformer模型结构|李宏毅机器学习21年

来源&#xff1a;https://www.bilibili.com/video/BV1Bb4y1L7FT?p4&vd_sourcef66cebc7ed6819c67fca9b4fa3785d39 文章目录 概述seq2seqtransformerEncoderDecoderAutoregressive&#xff08;AT&#xff09;self-attention与masked-self attentionmodel如何决定输出的长度…

【亲测有效】支持横竖屏 微信小程序video禁止进度条拖动,微信小程序遮罩进度条,

背景&#xff1a;部分课程禁止客户拖动视频进度条直至播放结束 红色是遮罩区域遮罩区域 实际遮罩效果&#xff08;有一个很浅的阴影区域&#xff09; 实现代码 .wxml文件 <video enable-progress-gesture"false" ><cover-view class"cover">…

基于深度学习的yolov7植物病虫害识别及防治系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介简介YOLOv7 系统特性工作流程 二、功能三、系统四. 总结 一项目简介 # YOLOv7植物病虫害识别及防治系统介绍 简介 该系统基于深度学习技术&#xff0c;采…

Seata配置

参考教程 seata 分布式事务的环境搭建与使用 Seata 1.4.0 nacos配置和使用&#xff0c;超详细 Seata 1.4.2 的安装 Nacos的配置和使用 官网下载地址 本文以v1.4.1为例 1.数据库及表的创建 创建seata数据库&#xff0c;创建以下表&#xff08;右键连接-》新建数据库seata-》…

windows系统proteus中Ardunio Mega 2560和虚拟机上Ubuntu系统CuteCom进行串口通信

在文章利用proteus实现串口助手和arduino Mega 2560的串口通信-CSDN博客 中&#xff0c;实现了windows系统的proteus中Ardunio Mega 2560和SSCOM通过虚拟串口进行通信。虚拟串口的连接示意图如下图所示。 在文章windows系统和虚拟机上ubuntu系统通过虚拟串口进行通信-CSDN博客…

3DMAX关于显示驱动问题的解决方法大全

3DMAX与显卡驱动有关的问题主要有以下几种情况&#xff1a; 1.3DMAX启动弹出这样的界面&#xff1a; 2.主工具栏按钮不显示&#xff0c;或者鼠标移上去才显示&#xff08;刷新问题&#xff09;。 3&#xff0e;视口菜单不显示或显示不全。 问题分析&#xff1a; 首先&#x…

安全基础从0开始

文章目录 常见名词小实战 网站搭建小实战抓包模拟器状态码返回值网站搭建WEB应用安全漏洞 数据包&封包&信息收集**参考点** 常见名词 前后端&#xff0c;POC/EXP&#xff0c;Payload/Shellcode&#xff0c;后门/Webshell&#xff0c;木马/病毒&#xff0c; 反弹&…

基于ssm应急资源管理系统论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本应急资源管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息…

排序算法之七:归并排序(递归)

基本思想 基本思想&#xff1a; 归并排序&#xff08;MERGE-SORT&#xff09;是建立在归并操作上的一种有效的排序算法,该算法是采用分治法&#xff08;Divide and Conquer&#xff09;的一个非常典型的应用。将已有序的子序列合并&#xff0c;得到完全有序的序列&#xff1…

C++:this指针

目录 前言 成员函数返回this指向的对象本身时&#xff0c;为什是返回引用类型&#xff1f; 成员函数返回this对象本身时&#xff0c;内部通常会通过拷贝构造函数来创建一个临时对象&#xff1f; 总结 前言 c通过提供特殊的对象指针&#xff0c;this指针 指向被调用的成员函…

Nodejs 第二十二章(脚手架)

编写自己的脚手架 那什么是脚手架&#xff1f; 例如:vue-cli Angular CLI Create React App 编写自己的脚手架是指创建一个定制化的工具&#xff0c;用于快速生成项目的基础结构和代码文件&#xff0c;以及提供一些常用的命令和功能。通过编写自己的脚手架&#xff0c;你可以…

Linux和Windows环境下如何使用gitee?

1. Linux 1.1 创建远程仓库 1.2 安装git sudo yum install -y git 1.3 克隆远程仓库到本地 git clone 地址 1.4 将文件添加到git的暂存区&#xff08;git三板斧之add&#xff09; git add 文件名 # 将指定文件添加到git的暂存区 git add . # 添加新文件和修改过的…

vs2017+qt5.14.2遇到的问题

1、在安装qt插件后&#xff0c;导入pro文件时&#xff0c;报 msvc-version.conf loaded but QMAKE_MSC_VER isn’t set 修改E:\Qt\Qt5.14.2\5.14.2\msvc2017_64\mkspecs\common\msvc-version.conf文件中添加