Elasticsearch 8.9 refresh刷Es缓冲区的数据到Lucene,更新segemnt,使数据可见

  • 一、相关API的handler
    • 1、接受HTTP请求的hander(RestRefreshAction)
    • 2、往数据节点发送刷新请求的action(TransportRefreshAction)
    • 3、数据节点接收主节点refresh传输的action(TransportShardRefreshAction)
  • 二、在IndexShard执行refresh操作
    • 1、根据入参决定是使用lucene提供的阻塞还是非阻塞API刷新数据
      • (1)、maybeRefresh和maybeRefreshBlocking的简单介绍
  • 三、lucene源码中执行逻辑
    • 1、判断是否需要刷新

下面的图来自ElasticSearch——刷盘原理流程,这篇文章主要讲的是refresh命令把ES写入索引缓冲区的数据刷进Lucene,使数据可供查询,搜索,否则,在索引缓冲区是不可见的,不涉及到在translog.logLucene的数据结构。
通过这个流程知道ES如何把索引缓冲区的数据刷进Lucene的,主要是下面左中部分refresh部分

在这里插入图片描述

其他部分源码梳理
1、主节点同时写入ES缓冲区和translog这一部分,请看Elasticsearch 8.9 Bulk批量给索引增加数据源码
2、下半边fsync的源码逻辑,请看Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码

一、相关API的handler

ActionModule.java

 registerHandler.accept(new RestRefreshAction());actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class);

1、接受HTTP请求的hander(RestRefreshAction)

public class RestRefreshAction extends BaseRestHandler {@Overridepublic List<Route> routes() {return List.of(new Route(GET, "/_refresh"),new Route(POST, "/_refresh"),new Route(GET, "/{index}/_refresh"),new Route(POST, "/{index}/_refresh"));}@Overridepublic String getName() {return "refresh_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {RefreshRequest refreshRequest = new RefreshRequest(Strings.splitStringByCommaToArray(request.param("index")));refreshRequest.indicesOptions(IndicesOptions.fromRequest(request, refreshRequest.indicesOptions()));return channel -> client.admin().indices().refresh(refreshRequest, new RestToXContentListener<RefreshResponse>(channel) {@Overrideprotected RestStatus getStatus(RefreshResponse response) {return response.getStatus();}});}
}

client.admin().indices().refresh()会执行到下面的父类TransportBroadcastReplicationActiondoExecute方法

2、往数据节点发送刷新请求的action(TransportRefreshAction)

public class TransportRefreshAction extends TransportBroadcastReplicationAction<RefreshRequest,RefreshResponse,BasicReplicationRequest,ReplicationResponse> {@Injectpublic TransportRefreshAction(ClusterService clusterService,TransportService transportService,ActionFilters actionFilters,IndexNameExpressionResolver indexNameExpressionResolver,NodeClient client) {super(RefreshAction.NAME,RefreshRequest::new,clusterService,transportService,client,actionFilters,indexNameExpressionResolver,TransportShardRefreshAction.TYPE,ThreadPool.Names.REFRESH);}//省略代码
}
public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest<Request>,Response extends BaseBroadcastResponse,ShardRequest extends ReplicationRequest<ShardRequest>,ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {@Overrideprotected void doExecute(Task task, Request request, ActionListener<Response> listener) {clusterService.threadPool().executor(executor).execute(ActionRunnable.wrap(listener, createAsyncAction(task, request)));}private CheckedConsumer<ActionListener<Response>, Exception> createAsyncAction(Task task, Request request) {return new CheckedConsumer<ActionListener<Response>, Exception>() {private int totalShardCopyCount;private int successShardCopyCount;private final List<DefaultShardOperationFailedException> allFailures = new ArrayList<>();@Overridepublic void accept(ActionListener<Response> listener) {assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice";final ClusterState clusterState = clusterService.state();final List<ShardId> shards = shards(request, clusterState);final Map<String, IndexMetadata> indexMetadataByName = clusterState.getMetadata().indices();try (var refs = new RefCountingRunnable(() -> finish(listener))) {//遍历所有的分片for (final ShardId shardId : shards) {// NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?shardExecute(task,request,shardId,ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire()));}}}};}protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {assert Transports.assertNotTransportThread("may hit all the shards");ShardRequest shardRequest = newShardRequest(request, shardId);shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);}}    

3、数据节点接收主节点refresh传输的action(TransportShardRefreshAction)

public class TransportShardRefreshAction extends TransportReplicationAction<BasicReplicationRequest,ShardRefreshReplicaRequest,ReplicationResponse> {private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class);public static final String NAME = RefreshAction.NAME + "[s]";public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME, ReplicationResponse::new);public static final String SOURCE_API = "api";@Injectpublic TransportShardRefreshAction(Settings settings,TransportService transportService,ClusterService clusterService,IndicesService indicesService,ThreadPool threadPool,ShardStateAction shardStateAction,ActionFilters actionFilters) {super(settings,NAME,transportService,clusterService,indicesService,threadPool,shardStateAction,actionFilters,BasicReplicationRequest::new,ShardRefreshReplicaRequest::new,ThreadPool.Names.REFRESH);// registers the unpromotable version of shard refresh actionnew TransportUnpromotableShardRefreshAction(clusterService, transportService, shardStateAction, actionFilters, indicesService);}@Overrideprotected void shardOperationOnPrimary(BasicReplicationRequest shardRequest,IndexShard primary,ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener) {primary.externalRefresh(SOURCE_API, listener.delegateFailure((l, refreshResult) -> {ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult);replicaRequest.setParentTask(shardRequest.getParentTask());logger.trace("{} refresh request executed on primary", primary.shardId());l.onResponse(new PrimaryResult<>(replicaRequest, new ReplicationResponse()));}));}
}    

primary.externalRefresh执行分片的刷新

二、在IndexShard执行refresh操作

 public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {verifyNotClosed();getEngine().externalRefresh(source, listener);}public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {ActionListener.completeWith(listener, () -> {logger.trace("external refresh with source [{}]", source);return refresh(source);});}   

getEngine()的实现是InternalEngine

  @Overridepublic RefreshResult refresh(String source) throws EngineException {return refresh(source, SearcherScope.EXTERNAL, true);}

1、根据入参决定是使用lucene提供的阻塞还是非阻塞API刷新数据

   protected final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {//这两种刷新类型都会导致内部刷新,但只有外部刷新类型也会将新的读取器引用传递给外部读取器管理器。//获取当前的本地检查点。final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();boolean refreshed;long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;try {//refresh 不需要按住 readLock,因为如果引擎在中途关闭,ReferenceManager 可以正确处理。if (store.tryIncRef()) {try {//尽管我们保留了 2 managers,但我们实际上只做过一次繁重的工作。第二次刷新只会做我们必须做的额外工作,以预热缓存等。ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();//根据参数决定是进行阻塞刷新还是非阻塞刷新if (block) { //刷新可能会导致阻塞referenceManager.maybeRefreshBlocking();refreshed = true;} else {//刷新不会导致阻塞refreshed = referenceManager.maybeRefresh();}//如果刷新成功,获取当前的读取器,并更新段的生成号if (refreshed) {//获取当前的目录final ElasticsearchDirectoryReader current = referenceManager.acquire();try {//更新segment信息segmentGeneration = Math.max(current.getIndexCommit().getGeneration(), generationBeforeRefresh);} finally {referenceManager.release(current);}}} finally {store.decRef();}if (refreshed) {lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);}} else {refreshed = false;}} catch (AlreadyClosedException e) {failOnTragicEvent(e);throw e;} catch (Exception e) {try {failEngine("refresh failed source[" + source + "]", e);} catch (Exception inner) {e.addSuppressed(inner);}throw new RefreshFailedEngineException(shardId, e);}assert refreshed == false || lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh: "refresh checkpoint was not advanced; "+ "local_checkpoint="+ localCheckpointBeforeRefresh+ " refresh_checkpoint="+ lastRefreshedCheckpoint();// TODO: maybe we should just put a scheduled job in threadPool?// We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes// for a long time:maybePruneDeletes();mergeScheduler.refreshConfig();return new RefreshResult(refreshed, segmentGeneration);}

其中referenceManager 根据入参是 SearcherScope.EXTERNAL 获得的实现是ExternalReaderManager

    private final ExternalReaderManager externalReaderManager;@Overrideprotected final ReferenceManager<ElasticsearchDirectoryReader> getReferenceManager(SearcherScope scope) {return switch (scope) {case INTERNAL -> internalReaderManager;case EXTERNAL -> externalReaderManager;};}

根据入参中的block=true 实际执行的是referenceManager.maybeRefreshBlocking(); 来刷新,是异步非阻塞的,
并且根据下图ExternalReaderManager继承了ReferenceManager,所以没有重写maybeRefreshBlocking 所以执行的是父类ReferenceManager

import org.apache.lucene.search.ReferenceManager;@SuppressForbidden(reason = "reference counting is required here")private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {@Overrideprotected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {//省略代码}@Overrideprotected boolean tryIncRef(ElasticsearchDirectoryReader reference) {return reference.tryIncRef();}@Overrideprotected int getRefCount(ElasticsearchDirectoryReader reference) {return reference.getRefCount();}@Overrideprotected void decRef(ElasticsearchDirectoryReader reference) throws IOException {reference.decRef();}}

(1)、maybeRefresh和maybeRefreshBlocking的简单介绍

下面是lucene源码中关于这两个API的实现,

//这个是会尝试获取刷新锁,如果没有则不执行刷新操作public final boolean maybeRefresh() throws IOException {this.ensureOpen();boolean doTryRefresh = this.refreshLock.tryLock();if (doTryRefresh) {try {this.doMaybeRefresh();} finally {this.refreshLock.unlock();}}return doTryRefresh;}//这里会等待获取刷新锁,所以会阻塞public final void maybeRefreshBlocking() throws IOException {this.ensureOpen();this.refreshLock.lock();try {this.doMaybeRefresh();} finally {this.refreshLock.unlock();}}

但是实际上最后执行刷新还是执行的this.doMaybeRefresh() 方法

三、lucene源码中执行逻辑

private void doMaybeRefresh() throws IOException {this.refreshLock.lock();boolean refreshed = false;try {Object reference = this.acquire();try {//通知刷新监听器。this.notifyRefreshListenersBefore();//调用 refreshIfNeeded(reference) 返回一个新的引用 (newReference)//用来判断是否需要刷新,如果不需要刷新,refreshIfNeeded 应返回 nullG newReference = this.refreshIfNeeded(reference);if (newReference != null) {assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";try {//调用 swapReference(newReference) 方法来交换旧的引用为新的引用。this.swapReference(newReference);//设置 refreshed 为 true 表示刷新成功。   refreshed = true;} finally {//如果刷新失败,释放新的引用if (!refreshed) {this.release(newReference);}}}} finally {//释放旧的引用this.release(reference);//通知刷新监听器刷新完成this.notifyRefreshListenersRefreshed(refreshed);}this.afterMaybeRefresh();} finally {//最后释放刷新锁this.refreshLock.unlock();}}

1、判断是否需要刷新

其中refreshIfNeeded用的是子类ExternalReaderManager的实现方法

private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {@Overrideprotected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {internalReaderManager.maybeRefreshBlocking();//获取其reader对象。final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire();//isWarmedUp为false或者获取到的新reader对象与传入的referenceToRefresh对象不相等,说明需要刷新if (isWarmedUp == false || newReader != referenceToRefresh) {boolean success = false;try {refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null);isWarmedUp = true;success = true;} finally {if (success == false) {internalReaderManager.release(newReader);}}}//没有任何变化 - 两个 ref 管理器共享同一个实例,因此我们可以使用引用相等性,不需要执行刷新操作if (referenceToRefresh == newReader) {internalReaderManager.release(newReader);return null;} else {return newReader; // steal the reference}}
}        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/212637.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【华为数据之道学习笔记】3-8以确保合规遵从为核心的外部数据管理

一、以确保合规遵从为核心的外部数据管理 外部数据是指华为公司引入的外部组织或者个人拥有处置权利的 数据&#xff0c;如供应商资质证明、消费者洞察报告等。外部数据治理的出发点是合规遵从优先&#xff0c;与内部数据治理的目的不同。 外部数据的治理主要遵循以下原则。 1&…

【设计模式--创建型--原型模式】

设计模式--创建型--原型模式 原型模式概述结构实现结果 案例代码结果使用场景 扩展&#xff08;深\浅克隆&#xff09;浅克隆演示&#xff1a;结果&#xff1a;使用深克隆&#xff08;利用对象流&#xff09;结果 原型模式 概述 用一个已经创建的实例作为原型&#xff0c;通过…

Go简单了解

0.一直很好奇,go是不是像传说中的速度快,解决了多线程问题,快速进行了解了解,和java进行对比,他是怎么解决语言发展的问题的…,所有语言都是差不多的,只是熟练程度不同而已 1.go图标是土拨鼠,2009发行 docker使用go,解决了并发问题 google facebook 腾讯 百度 七牛云 京东 小米…

Spring Cloud Gateway + Nacos + LoadBalancer实现企业级网关

1. Spring Cloud Gateway 整合Nacos、LoadBalancer 实现企业级网关 前置工作&#xff1a; 创建 SpringBoot 多模块项目创建网关&#xff08;gateway-service&#xff09;、用户&#xff08;user-service&#xff09;模块用户模块添加 Nacos discovery 支持以及 Spring Web&am…

gitbash下载安装

参考教程 零、下载 官网地址 2.43.0win64 链接&#xff1a;https://pan.baidu.com/s/16urs_nmky7j20-qNzUTTkg 提取码&#xff1a;7jaq 一、安装 图标组件&#xff08;Additional icons&#xff09;&#xff1a;选择是否创建桌面快捷方式&#xff1b;桌面浏览&#xff08;Win…

设计模式--命令模式的简单例子

引入&#xff1a;以一个对数组的增删改查为例。通过命令模式可以对数组进行增删改查以及撤销回滚。 一、基本概念 命令模式有多种分法&#xff0c;在本文中主要分为CommandMgr、Command、Receiver. CommandMgr主要用于控制命令执行等操作、Command为具体的命令、Receiver为命…

逸迅科技丁红阳:三种能力帮助企业打造GBI “护城河”

大数据产业创新服务媒体 ——聚焦数据 改变商业 近日&#xff0c;由上海市经济和信息化委员会、上海市科学技术委员会指导&#xff0c;数据猿与上海大数据联盟联合主办的“2023企业数智化转型升级发展论坛”在上海举行。本次论坛以“释放数字价值驱动智能升级”为主题&#xf…

piakachu越权漏洞

水平越权 首先打开这一关&#xff0c;在右侧有一些提示&#xff0c;我们可以看到 然后我们随便输入一组信息即可&#xff0c;可以在url中看到这样的字段 当我们尝试在url中直接更换另一个用户名时可以发现&#xff0c;直接切换到了另一个用户的身份 垂直越权 这里可以看到右边…

QML和C++交互中,实现C++中connect到qml的信号,再从qml发射信号传递数据给C++的一种方式

1.需求&#xff1a; 假设我们有一个需求&#xff0c;要求在用户点击列表中的项目时&#xff0c;不仅在控制台上输出项目的名称&#xff0c;还要在C端进行一些处理。我们希望在C端能够接收到用户点击的项目名称&#xff0c;并进行相应的处理。 2.分析&#xff1a; 在这种情况…

Android 10.0 系统framework修改低电量关机值为2%

1.前言 在10.0的系统产品开发中,在系统关于低电量关机的值,每个平台都不同,根据实际开发底层硬件的要求看实际情况来调整这个值, 所以需要分析相关的电量变化执行的代码流程,来实现这个功能 2.系统framework修改低电量关机值为2%的核心类 frameworks\base\services\cor…

一文学会使用 PyInstaller 将 Python 脚本打包为 .exe 可执行文件

文章目录 前言PyInstaller特点跨平台支持自动依赖项处理单文件发布支持图形用户界面&#xff08;GUI&#xff09;和命令行界面&#xff08;CLI&#xff09;应用支持多种打包选项 基本用法常用参数其它参数 版本 & 环境实现步骤安装 PyInstaller创建 Python 脚本使用 PyInst…

Strange-Towers-of-Hanoi

title: Strange Towers of Hanoi date: 2023-12-11 03:20:05 tags: 递推 categories: 算法进阶指南 题目大意 解出 n n n 个盒子 4 4 4 座塔的汉诺塔问题最少需要多少次&#xff1f; 思路 首先考虑 n n n 个盒子 3 3 3 座塔的经典汉诺塔问题&#xff0c;设 d [ n ] d[n] …

第三十章 控制到 XML 模式的映射 - Array of Classname

文章目录 第三十章 控制到 XML 模式的映射 - Array of ClassnameArray of Classname 第三十章 控制到 XML 模式的映射 - Array of Classname Array of Classname 本部分显示了从启用 XML 的类生成的XML 架构的一部分&#xff0c;此时该类包含定义为类名数组的属性。例如&…

【SpringBoot教程】SpringBoot 创建定时任务(配合数据库动态执行)

作者简介&#xff1a;大家好&#xff0c;我是撸代码的羊驼&#xff0c;前阿里巴巴架构师&#xff0c;现某互联网公司CTO 联系v&#xff1a;sulny_ann&#xff08;17362204968&#xff09;&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗…

transformer模型结构|李宏毅机器学习21年

来源&#xff1a;https://www.bilibili.com/video/BV1Bb4y1L7FT?p4&vd_sourcef66cebc7ed6819c67fca9b4fa3785d39 文章目录 概述seq2seqtransformerEncoderDecoderAutoregressive&#xff08;AT&#xff09;self-attention与masked-self attentionmodel如何决定输出的长度…

【亲测有效】支持横竖屏 微信小程序video禁止进度条拖动,微信小程序遮罩进度条,

背景&#xff1a;部分课程禁止客户拖动视频进度条直至播放结束 红色是遮罩区域遮罩区域 实际遮罩效果&#xff08;有一个很浅的阴影区域&#xff09; 实现代码 .wxml文件 <video enable-progress-gesture"false" ><cover-view class"cover">…

基于深度学习的yolov7植物病虫害识别及防治系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介简介YOLOv7 系统特性工作流程 二、功能三、系统四. 总结 一项目简介 # YOLOv7植物病虫害识别及防治系统介绍 简介 该系统基于深度学习技术&#xff0c;采…

Seata配置

参考教程 seata 分布式事务的环境搭建与使用 Seata 1.4.0 nacos配置和使用&#xff0c;超详细 Seata 1.4.2 的安装 Nacos的配置和使用 官网下载地址 本文以v1.4.1为例 1.数据库及表的创建 创建seata数据库&#xff0c;创建以下表&#xff08;右键连接-》新建数据库seata-》…

kubeadm搭建1.20.7版本k8s

资源 服务器名称ip地址服务master1&#xff08;2C/4G&#xff0c;cpu核心数要求大于2&#xff09;192.168.100.10docker、kubeadm、kubelet、kubectl、flannelnode01&#xff08;2C/2G&#xff09;192.168.100.30docker、kubeadm、kubelet、kubectl、flannelnode02&#xff08…

windows系统proteus中Ardunio Mega 2560和虚拟机上Ubuntu系统CuteCom进行串口通信

在文章利用proteus实现串口助手和arduino Mega 2560的串口通信-CSDN博客 中&#xff0c;实现了windows系统的proteus中Ardunio Mega 2560和SSCOM通过虚拟串口进行通信。虚拟串口的连接示意图如下图所示。 在文章windows系统和虚拟机上ubuntu系统通过虚拟串口进行通信-CSDN博客…