Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码

  • 一、相关API的handler
    • 1、接收HTTP请求的hander
    • 2、每一个数据节点(node)执行分片刷新的action是TransportShardFlushAction
  • 二、对indexShard执行刷新请求
    • 1、首先获取读锁,再获取刷新锁,如果获取不到根据参数决定是否直接返回还是等待
    • 2、在刷新之后translog需要滚动生成新的,这样不会影响正在进行的写入和删除时方便
    • 3、把IndexWriter中的数据持久化磁盘
    • 4、开始处理translog.log
      • (1) 首先把内存中的translog全部写入磁盘
      • (2) 删除磁盘中的对应的translog文件
    • 5、更新最后刷新时间和刷新最后提交的段信息
  • 三、通过源码得到一些结论

下面的图来自ElasticSearch——刷盘原理流程,这篇文章主要讲的是人工通过flush命令把批量写入内存segment的数据刷新进磁盘,不涉及到在translog.logLucene的数据结构。
通过这个流程知道ES如何把数据刷新进磁盘的,主要是下图的下半部分中fsync部分

在这里插入图片描述

一、相关API的handler

ActionModule.java

registerHandler.accept(new RestFlushAction()); 
actions.register(FlushAction.INSTANCE, TransportFlushAction.class); 
actions.register(TransportShardFlushAction.TYPE, TransportShardFlushAction.class);

1、接收HTTP请求的hander

public class RestFlushAction extends BaseRestHandler {@Overridepublic List<Route> routes() {return List.of(new Route(GET, "/_flush"),new Route(POST, "/_flush"),new Route(GET, "/{index}/_flush"),new Route(POST, "/{index}/_flush"));}@Overridepublic String getName() {return "flush_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index")));flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions()));flushRequest.force(request.paramAsBoolean("force", flushRequest.force()));flushRequest.waitIfOngoing(request.paramAsBoolean("wait_if_ongoing", flushRequest.waitIfOngoing()));return channel -> client.admin().indices().flush(flushRequest, new RestToXContentListener<>(channel));}
}

上面会执行下面这个,至于怎么到这里的,可以看Elasticsearch 8.9 Master节点处理请求源码

/*** Flush Action.*/
public class TransportFlushAction extends TransportBroadcastReplicationAction<FlushRequest,FlushResponse,ShardFlushRequest,ReplicationResponse> {@Injectpublic TransportFlushAction(ClusterService clusterService,TransportService transportService,NodeClient client,ActionFilters actionFilters,IndexNameExpressionResolver indexNameExpressionResolver) {super(FlushAction.NAME,FlushRequest::new,clusterService,transportService,client,actionFilters,indexNameExpressionResolver,TransportShardFlushAction.TYPE, //注意这个ThreadPool.Names.FLUSH);}//省略代码
}

这里需要注意上面的TransportShardFlushAction.TYPE

这里看一下它的父类TransportBroadcastReplicationAction

public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest<Request>,Response extends BaseBroadcastResponse,ShardRequest extends ReplicationRequest<ShardRequest>,ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {private final ActionType<ShardResponse> replicatedBroadcastShardAction;public TransportBroadcastReplicationAction(String name,Writeable.Reader<Request> requestReader,ClusterService clusterService,TransportService transportService,NodeClient client,ActionFilters actionFilters,IndexNameExpressionResolver indexNameExpressionResolver,ActionType<ShardResponse> replicatedBroadcastShardAction,String executor) {//省略代码//这里即上面的TransportShardFlushAction.TYPEthis.replicatedBroadcastShardAction = replicatedBroadcastShardAction;}@Overrideprotected void doExecute(Task task, Request request, ActionListener<Response> listener) {clusterService.threadPool().executor(executor).execute(ActionRunnable.wrap(listener, createAsyncAction(task, request)));}private CheckedConsumer<ActionListener<Response>, Exception> createAsyncAction(Task task, Request request) {return new CheckedConsumer<ActionListener<Response>, Exception>() {//省略代码@Overridepublic void accept(ActionListener<Response> listener) {final ClusterState clusterState = clusterService.state();final List<ShardId> shards = shards(request, clusterState);final Map<String, IndexMetadata> indexMetadataByName = clusterState.getMetadata().indices();//遍历分片try (var refs = new RefCountingRunnable(() -> finish(listener))) {for (final ShardId shardId : shards) {// NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?shardExecute(task,request,shardId,ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire()));}}}//省略代码};}protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {assert Transports.assertNotTransportThread("may hit all the shards");ShardRequest shardRequest = newShardRequest(request, shardId);shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());//通过执行replicatedBroadcastShardAction,即TransportShardFlushAction.class来实现分片的刷新client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);}
}

2、每一个数据节点(node)执行分片刷新的action是TransportShardFlushAction

public class TransportShardFlushAction extends TransportReplicationAction<ShardFlushRequest, ShardFlushRequest, ReplicationResponse> {//主分片执行刷新@Overrideprotected void shardOperationOnPrimary(org.elasticsearch.action.admin.indices.flush.ShardFlushRequest shardRequest,IndexShard primary,ActionListener<PrimaryResult<ShardFlushRequest, ReplicationResponse>> listener) {ActionListener.completeWith(listener, () -> {primary.flush(shardRequest.getRequest());logger.trace("{} flush request executed on primary", primary.shardId());return new PrimaryResult<>(shardRequest, new ReplicationResponse());});}//副本分片执行刷新@Overrideprotected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {ActionListener.completeWith(listener, () -> {replica.flush(request.getRequest());logger.trace("{} flush request executed on replica", replica.shardId());return new ReplicaResult();});}  
}

其中shardOperationOnReplicaTransportReplicationAction下的onResponse方法触发

public abstract class TransportReplicationAction<Request extends ReplicationRequest<Request>,ReplicaRequest extends ReplicationRequest<ReplicaRequest>,Response extends ReplicationResponse> extends TransportAction<Request, Response> {//省略代码
private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener<Releasable> {@Overridepublic void onResponse(Releasable releasable) {assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";try {shardOperationOnReplica(replicaRequest.getRequest(),replica,ActionListener.wrap((replicaResult) -> replicaResult.runPostReplicaActions(ActionListener.wrap(r -> {final ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint(),replica.getLastSyncedGlobalCheckpoint());releasable.close(); // release shard operation lock before responding to callerif (logger.isTraceEnabled()) {logger.trace("action [{}] completed on shard [{}] for request [{}]",transportReplicaAction,replicaRequest.getRequest().shardId(),replicaRequest.getRequest());}setPhase(task, "finished");onCompletionListener.onResponse(response);}, e -> {Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to callerresponseWithFailure(e);})), e -> {Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to callerAsyncReplicaAction.this.onFailure(e);}));// TODO: Evaluate if we still need to catch this exception} catch (Exception e) {Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to callerAsyncReplicaAction.this.onFailure(e);}}
}}

二、对indexShard执行刷新请求

/*** 对引擎执行给定的刷新请求。** @param request the flush request* @return <code>false</code> if <code>waitIfOngoing==false</code> and an ongoing request is detected, else <code>true</code>.*         If <code>false</code> is returned, no flush happened.*/public boolean flush(FlushRequest request) {final boolean waitIfOngoing = request.waitIfOngoing();final boolean force = request.force();logger.trace("flush with {}", request);verifyNotClosed();final long time = System.nanoTime();// TODO: Transition this method to async to support async flush 将此方法转换为异步以支持异步刷新PlainActionFuture<Engine.FlushResult> future = PlainActionFuture.newFuture();getEngine().flush(force, waitIfOngoing, future);Engine.FlushResult flushResult = future.actionGet();flushMetric.inc(System.nanoTime() - time);return flushResult.flushPerformed();}
/**刷新引擎的状态,包括事务日志、清除内存以及将 Lucene 索引中的文档写入磁盘。此方法将在调用线程上同步刷新。但是,根据引擎实现的不同,在触发侦听器之前,无法保证完全的耐用性。** @param force         如果为 true则即使不需要提交任何更改,也会执行 Lucene 提交* @param waitIfOngoing 如果为 true,则此调用将阻止,直到所有当前正在运行的刷新都完成。否则,此调用将返回而不阻塞。* @param listener      在达到完全耐久性后通知。如果 waitIfOngoing==false 并且检测到正在进行的请求,则不会发生刷新,侦听器将完成,并带有指示没有刷新和未知生成的标记*/public abstract void flush(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) throws EngineException;

其中实际调用的是Engine的子类InternalEngine

1、首先获取读锁,再获取刷新锁,如果获取不到根据参数决定是否直接返回还是等待

 @Overridepublic void flush(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) throws EngineException {//省略代码final long generation;//获取读锁(readLock.acquire())try (ReleasableLock lock = readLock.acquire()) {ensureOpen();//尝试获取刷新锁(flushLock)if (flushLock.tryLock() == false) {//如果无法立即获取到锁,则根据 waitIfOngoing 的值决定是等待刷新完成还是立即返回。// if we can't get the lock right away we block if needed otherwise barfif (waitIfOngoing == false) {logger.trace("detected an in-flight flush, not blocking to wait for it's completion");listener.onResponse(FlushResult.NO_FLUSH);return;}logger.trace("waiting for in-flight flush to finish");//如果成功获取到刷新锁flushLock.lock();logger.trace("acquired flush lock after blocking");} else {logger.trace("acquired flush lock immediately");}try {//仅当下面这四种情况才进行刷新操作//(1) Lucene 有未提交的文档,//或 (2) 被调用方强制执行,//或 (3) 新创建的提交指向不同的 translog 生成(可以释放 translog),//或 (4) 上次提交中的本地检查点信息过时boolean hasUncommittedChanges = hasUncommittedChanges(); 检查是否有未提交的文档变更(hasUncommittedChanges())if (hasUncommittedChanges|| force  //是否需要强制刷新(force)|| shouldPeriodicallyFlush() //是否需要定期刷新(shouldPeriodicallyFlush())//或者本地检查点信息是否过时|| getProcessedLocalCheckpoint() > Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY))) {//确保可以执行刷新(ensureCanFlush())ensureCanFlush();//获取最后一个写入的事务日志位置(getTranslogLastWriteLocation())Translog.Location commitLocation = getTranslogLastWriteLocation();try {//如果当前 translog 不为空,则将其滚动到新生成。这不会提交 translog。这样正在进行的写入和刷新磁盘用的translog就不冲突translog.rollGeneration();logger.trace("starting commit for flush; commitTranslog=true");long lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong();//记录即将生成的区段,以便实时版本地图存档记录在刷新时转到存档的文档 ID 的正确区段生成。// 否则,如果在提交 IndexWriter 后立即对新文档进行索引更新,并且刷新将它们移动到存档中,则一旦我们在搜索分片上看到该段生成,// 我们就会将它们从存档中清除,但这些更改不包括在提交中,因为它们发生在提交之后//提交索引写入器preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1);//执行刷新操作commitIndexWriter(indexWriter, translog);logger.trace("finished commit for flush");//我们需要刷新以清除旧版本值refresh("version_table_flush", SearcherScope.INTERNAL, true);translog.trimUnreferencedReaders();//更新最后一次刷新的时间戳this.lastFlushTimestamp = lastFlushTimestamp;} catch (AlreadyClosedException e) {failOnTragicEvent(e);throw e;} catch (Exception e) {throw new FlushFailedEngineException(shardId, e);}//刷新最后提交的段信息(refreshLastCommittedSegmentInfos()refreshLastCommittedSegmentInfos();//获取刷新后的段信息generation = lastCommittedSegmentInfos.getGeneration();//调用刷新监听器的 afterFlush 方法flushListener.afterFlush(generation, commitLocation);} else {//如果不满足刷新条件,则直接获取最后提交的段信息的代数。generation = lastCommittedSegmentInfos.getGeneration();}} catch (FlushFailedEngineException ex) {maybeFailEngine("flush", ex);listener.onFailure(ex);return;} catch (Exception e) {listener.onFailure(e);return;} finally {//释放刷新锁flushLock.unlock();logger.trace("released flush lock");}}if (engineConfig.isEnableGcDeletes()) {pruneDeletedTombstones();}//等待提交的持久性完成,并通过回调函数返回刷新结果waitForCommitDurability(generation, listener.map(v -> new FlushResult(true, generation)));}

2、在刷新之后translog需要滚动生成新的,这样不会影响正在进行的写入和删除时方便

/**如果当前 translog 生成不为空,则将其滚动到新生成。这不会提交 translog。* @throws IOException if an I/O exception occurred during any file operations*/public void rollGeneration() throws IOException {syncBeforeRollGeneration();//检查当前操作数是否为0,并且主要期限与当前的主要期限相同。如果满足条件,则直接返回,不执行后续操作if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) {return;}//使用writeLock.acquire()获取写锁,确保代码块中的操作是以独占方式进行的。try (Releasable ignored = writeLock.acquire()) {//调用ensureOpen()方法确保资源处于打开状态ensureOpen();try {//将当前的Translog关闭并转换为TranslogReader对象。final TranslogReader reader = current.closeIntoReader();//,将reader添加到readers集合中readers.add(reader);//使用断言确认检查点文件中的generation与当前的generation相匹配。assert Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)).generation == current.getGeneration();//将检查点文件复制到指定的位置copyCheckpointTo(location.resolve(getCommitCheckpointFileName(current.getGeneration())));//创建一个新的Translog文件,并更新检查点数据current = createWriter(current.getGeneration() + 1);logger.trace("current translog set to [{}]", current.getGeneration());} catch (final Exception e) {tragedy.setTragicException(e);closeOnTragicEvent(e);throw e;}}}

3、把IndexWriter中的数据持久化磁盘

/*** 把索引写入器(IndexWriter)数据持久化到磁盘,* 用到了lucene 索引与 translog 关联的 translog uuid,这样会把这个uuid及之前的数据写入到磁盘* @param writer   the index writer to commit* @param translog the translog*/protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {//确保可以执行刷新操作ensureCanFlush();try {//获取本地检查点(localCheckpoint)final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();//设置一个回调函数,用来设置索引写入器的提交数据,writer.setLiveCommitData(() -> {/** 上面捕获的用户数据(例如本地检查点)包含在 Lucene 刷新段之前必须评估的数据,包括本地检查点和其他值。* 最大序列号是不同的,我们永远不希望最大序列号小于进入 Lucene 提交的最后一个序列号,否则在从此提交点恢复并随后将新文档写入索引时,我们将面临对两个不同文档重复使用序列号的风险。* 由于我们只知道哪些 Lucene 文档在 {@link IndexWritercommit()} 调用刷新所有文档后进入最终提交,因此我们将最大序列号的计算推迟到提交数据迭代器的调用时间(在所有文档刷新到 Lucene 之后发生)。*///创建一个包含提交数据的映射(commitData),包括事务日志的UUID、本地检查点、最大序列号等信息。final Map<String, String> commitData = Maps.newMapWithExpectedSize(8);// translog.getTranslogUUID()返回用于将 lucene 索引与 translog 关联的 translog uuid。commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));commitData.put(HISTORY_UUID_KEY, historyUUID);final String currentForceMergeUUID = forceMergeUUID;if (currentForceMergeUUID != null) {commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);}commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));commitData.put(ES_VERSION, Version.CURRENT.toString());logger.trace("committing writer with commit data [{}]", commitData);//将提交数据返回给索引写入器return commitData.entrySet().iterator();});//禁用大合并后的定期刷新(shouldPeriodicallyFlushAfterBigMerge)shouldPeriodicallyFlushAfterBigMerge.set(false);//提交索引写入器,将数据写入磁盘writer.commit();} catch (final Exception ex) {try {failEngine("lucene commit failed", ex);} catch (final Exception inner) {ex.addSuppressed(inner);}throw ex;} catch (final AssertionError e) {//省略代码}}

在Bulk批量给索引增加数据源码 中,后面的文档添加到Lucene的这个方法indexIntoLucene 里面就是写入了indexWriter 这里commit会直接写入到磁盘

其中indexWriter的全路径org.apache.lucene.index.IndexWriter;

4、开始处理translog.log

在上面rollGeneration()方法就把最后一个translog放入了readers,也为删除做准备

   //Translog 列表保证按 Translog 生成的顺序排列,private final List<TranslogReader> readers = new ArrayList<>();/*** Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum* required generation* 通过要求 {@link TranslogDeletionPolicy} 提供所需的最小生成来修剪未引用的 translog 生成*/public void trimUnreferencedReaders() throws IOException {//首先获取读取锁,并检查是否有可以修剪的读取器try (ReleasableLock ignored = readLock.acquire()) {//如果已关闭或者最小引用的代数与最小文件代数相同,则不进行修剪操作。if (closed.get()) {// 我们可能会因一些悲惨事件而关闭,不要删除任何内容return;}if (getMinReferencedGen() == getMinFileGeneration()) {return;}}// 将大部分数据写入磁盘,以减少写入锁的持有时间sync();//获取写入锁,并再次检查是否已关闭或者最小引用的代数与最小文件代数相同。try (ReleasableLock ignored = writeLock.acquire()) {if (closed.get()) {// we're shutdown potentially on some tragic event, don't delete anything//我们可能会因一些悲惨事件而关闭,不要删除任何内容return;}final long minReferencedGen = getMinReferencedGen();//代码遍历读取器列表,for (Iterator<TranslogReader> iterator = readers.iterator(); iterator.hasNext();) {TranslogReader reader = iterator.next();if (reader.getGeneration() >= minReferencedGen) {break;}//删除不再被引用的读取器iterator.remove();IOUtils.closeWhileHandlingException(reader);//translogPath是translogfinal Path translogPath = reader.path();logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath);//打开translog时使用检查点来了解应从哪些文件中恢复。现在,我们更新检查点以忽略要删除的文件。// 请注意,recoverFromFiles 中有一个规定,允许我们同步检查点但在删除文件之前崩溃的情况。立即同步,以确保最多有一个未引用的生成。//将所有缓冲的运算写入磁盘和 fsync 文件。current是TranslogWritercurrent.sync();//并删除相关文件deleteReaderFiles(reader);}assert readers.isEmpty() == false || current.generation == minReferencedGen: "all readers were cleaned but the minReferenceGen ["+ minReferencedGen+ "] is not the current writer's gen ["+ current.generation+ "]";} catch (final Exception ex) {closeOnTragicEvent(ex);throw ex;}}

(1) 首先把内存中的translog全部写入磁盘

/*** Sync's the translog.*/public void sync() throws IOException {try (ReleasableLock lock = readLock.acquire()) {if (closed.get() == false) {current.sync();}} catch (final Exception ex) {closeOnTragicEvent(ex);throw ex;}}
/***将所有缓冲的运算写入磁盘和 fsync 文件。* 同步过程中的任何异常都将被解释为悲剧性异常,写入器将在引发异常之前关闭。*/public void sync() throws IOException {//这里一个是最大,一个是-2L,直接就是强制把translog从内存中刷进磁盘syncUpTo(Long.MAX_VALUE, SequenceNumbers.UNASSIGNED_SEQ_NO);}
 /***将数据同步到指定的偏移量和全局检查点,确保数据的持久性。如果满足一定条件,会执行同步操作* @return <code>true</code> if this call caused an actual sync operation*/final boolean syncUpTo(long offset, long globalCheckpointToPersist) throws IOException {//检查lastSyncedCheckpoint的偏移量和全局检查点是否小于指定的偏移量和全局检查点,并且需要进行同步操作if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist) && syncNeeded()) {//省略代码//获取一个同步锁synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we waitif ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist)&& syncNeeded()) {//双重检查锁定 - 除非我们必须这样做,否则我们不想 fsync,现在我们有了锁,我们应该再次检查,因为如果这个代码很忙,我们可能已经足够 fsync 了final Checkpoint checkpointToSync;final List<Long> flushedSequenceNumbers;final ReleasableBytesReference toWrite;//再获取一个写锁try (ReleasableLock toClose = writeLock.acquire()) {synchronized (this) {ensureOpen();checkpointToSync = getCheckpoint();//获取最新的写入数据buffer。toWrite = pollOpsToWrite();//如果没有未同步的序列号,则设置flushedSequenceNumbers为null;if (nonFsyncedSequenceNumbers.isEmpty()) {flushedSequenceNumbers = null;} else {//否则,将nonFsyncedSequenceNumbers赋值给flushedSequenceNumbersflushedSequenceNumbers = nonFsyncedSequenceNumbers;//nonFsyncedSequenceNumbers重新初始化为一个空的列表nonFsyncedSequenceNumbers = new ArrayList<>(64);}}try {      //写入管道操作,下面channel.force会强制刷盘writeAndReleaseOps(toWrite);assert channel.position() == checkpointToSync.offset;} catch (final Exception ex) {closeWithTragicEvent(ex);throw ex;}}//现在在同步块之外执行实际的 fsync,以便我们可以继续写入缓冲区等。try {assert lastSyncedCheckpoint.offset != checkpointToSync.offset || toWrite.length() == 0;if (lastSyncedCheckpoint.offset != checkpointToSync.offset) {//则调用channel.force(false)方法来强制刷新通道的数据到磁盘。channel.force(false);}//更新检查点//将checkpointToSync写入到指定的checkpointChannel和checkpointPath中Checkpoint.write(checkpointChannel, checkpointPath, checkpointToSync);} catch (final Exception ex) {closeWithTragicEvent(ex);throw ex;}if (flushedSequenceNumbers != null) {//则遍历flushedSequenceNumbers列表,并对每个元素调用persistedSequenceNumberConsumer::accept方法处理。flushedSequenceNumbers.forEach(persistedSequenceNumberConsumer::accept);}assert lastSyncedCheckpoint.offset <= checkpointToSync.offset: "illegal state: " + lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset;lastSyncedCheckpoint = checkpointToSync; // write protected by syncLockreturn true;}}}return false;}

其中 toWrite = pollOpsToWrite(); 就是下面这个,至于buffer是什么?
可以看一下Elasticsearch 8.9 Bulk批量给索引增加数据源码 中的 public Translog.Location add(final BytesReference data, final long seqNo)方法体中会把给this.buffer赋值

 private synchronized ReleasableBytesReference pollOpsToWrite() {ensureOpen();if (this.buffer != null) {//则将 buffer 赋值给 toWrite 变量,并将 buffer 置为 null,同时将 bufferedBytes 置为 0。ReleasableBytesStreamOutput toWrite = this.buffer;this.buffer = null;this.bufferedBytes = 0;//创建一个新的 ReleasableBytesReference 对象,该对象的字节为 toWrite.bytes(),并将 toWrite 作为释放引用return new ReleasableBytesReference(toWrite.bytes(), toWrite);} else {//返回一个空的 ReleasableBytesReference 对象return ReleasableBytesReference.empty();}}

下面就是正常的把数据写入到writeToFile

//将给定的 ReleasableBytesReference 对象写入到文件中,上面有channel.force会强制把Channels刷盘private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException {try (ReleasableBytesReference toClose = toWrite) {assert writeLock.isHeldByCurrentThread();final int length = toWrite.length();//检查 toWrite 的长度是否为0,如果是,则直接返回if (length == 0) {return;}//试从 diskIoBufferPool 中获取一个 ioBuffer,用于写入操作。ByteBuffer ioBuffer = diskIoBufferPool.maybeGetDirectIOBuffer();//如果获取不到 ioBuffer,则说明当前线程不使用直接缓冲区,代码会直接将数据写入到文件中,而不是先复制到 ioBuffer 中。if (ioBuffer == null) {// not using a direct buffer for writes from the current thread so just write without copying to the io bufferBytesRefIterator iterator = toWrite.iterator();BytesRef current;while ((current = iterator.next()) != null) {Channels.writeToChannel(current.bytes, current.offset, current.length, channel);}return;}//如果成功获取到 ioBuffer,代码会使用迭代器遍历 toWrite 中的数据,并将数据逐个写入到 ioBuffer 中。BytesRefIterator iterator = toWrite.iterator();BytesRef current;while ((current = iterator.next()) != null) {int currentBytesConsumed = 0;while (currentBytesConsumed != current.length) {int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining());ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite);currentBytesConsumed += nBytesToWrite;//如果 ioBuffer 的空间已满,if (ioBuffer.hasRemaining() == false) {//则将 ioBuffer 翻转(从写模式切换到读模式)ioBuffer.flip();//然后调用 writeToFile 方法将数据写入到文件中,writeToFile(ioBuffer);//并清空 ioBuffer。ioBuffer.clear();}}}//再次翻转 ioBufferioBuffer.flip();//并调用 writeToFile 方法将剩余的数据写入到文件中。writeToFile(ioBuffer);}}

(2) 删除磁盘中的对应的translog文件

/*** 删除与读取器关联的所有文件。package-private,以便此时能够模拟节点故障*/void deleteReaderFiles(TranslogReader reader) {IOUtils.deleteFilesIgnoringExceptions(reader.path(),reader.path().resolveSibling(getCommitCheckpointFileName(reader.getGeneration())));}
 /** 删除所有给定的文件,禁止所有抛出的 {@link IOException}。某些文件可能为 null,如果是这样,则忽略它们。** @param files the paths of files to delete*/public static void deleteFilesIgnoringExceptions(final Path... files) {for (final Path name : files) {if (name != null) {// noinspection EmptyCatchBlocktry {Files.delete(name);} catch (final IOException ignored) {}}}}

5、更新最后刷新时间和刷新最后提交的段信息

//刷新最后提交的段信息(refreshLastCommittedSegmentInfos()
refreshLastCommittedSegmentInfos();
//获取刷新后的段信息
generation = lastCommittedSegmentInfos.getGeneration();
//调用刷新监听器的 afterFlush 方法flushListener.afterFlush(generation, commitLocation);
private void refreshLastCommittedSegmentInfos() {/** 在某些情况下,如果引擎由于意外事件关闭,我们无法获取写锁并等待独占访问。* 这可能会减少对存储的引用计数,从而关闭存储。为了保证能够使用存储,我们需要增加引用计数*/store.incRef();try {//读取存储中的最后提交的段信息,并将结果赋给lastCommittedSegmentInfos变量。lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();} catch (Exception e) {//读取过程中发生异常,代码会检查引擎是否已关闭。如果引擎未关闭,则记录一个警告日志,并检查异常是否是Lucene的损坏异常。// 如果是损坏异常,代码会抛出FlushFailedEngineException异常if (isClosed.get() == false) {logger.warn("failed to read latest segment infos on flush", e);if (Lucene.isCorruptionException(e)) {throw new FlushFailedEngineException(shardId, e);}}} finally {//代码会减少对存储的引用计数,以确保引用计数的正确性和资源释放。store.decRef();}}

三、通过源码得到一些结论

1、translog也会刷进磁盘,不只是在内存中存在,异步方式会定时刷translog到磁盘
2、等segment刷进磁盘后,会把对应translog.log磁盘文件删除,所以translog说是临时文件,没有问题,
3、translog的删除方式是先刷到磁盘,直接通过删除文件的方式删除translog

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/206224.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android Audio实战——音频链路分析(二十五)

在 Android 系统的开发过程当中,音频异常问题通常有如下几类:无声、调节不了声音、爆音、声音卡顿和声音效果异常(忽大忽小,低音缺失等)等。尤其声音效果这部分问题通常从日志上信息量较少,相对难定位根因。想要分析此类问题,便需要对声音传输链路有一定的了解,能够在链…

【论文解读】:大模型免微调的上下文对齐方法

本文通过对alignmenttuning的深入研究揭示了其“表面性质”&#xff0c;即通过监督微调和强化学习调整LLMs的方式可能仅仅影响模型的语言风格&#xff0c;而对模型解码性能的影响相对较小。具体来说&#xff0c;通过分析基础LLMs和alignment-tuned版本在令牌分布上的差异&#…

100多种视频转场素材|专业胶片,抖动,光效电影转场特效PR效果预设

100多种 Premiere Pro 效果预设&#xff0c;包含&#xff1a;“胶片框架”、“胶片烧录”、“彩色LUT”、“相机抖动”、“电影Vignette”和“胶片颗粒”。非常适合制作复古风格的视频&#xff0c;添加独特的色彩。包括视频教程。 来自PR模板网&#xff1a;https://prmuban.com…

git 本地有改动,远程也有改动,且文件是自动生成的配置文件

在改动过的地方 文件是.lock文件&#xff0c;自动生成的。想切到远程的分支&#xff0c;但是远程的分支也有改动过。这时候就要解决冲突&#xff0c;因为这是两个分支&#xff0c;代码都是不一样的&#xff0c;要先把这改动的代码提交在本地或者提交在本分支的远程才可以切到其…

ke13--10章-1数据库JDBC介绍

注册数据库(两种方式),获取连接,通过Connection对象获取Statement对象,使用Statement执行SQL语句。操作ResultSet结果集 ,回收数据库资源. 需要语句: 1Class.forName("DriverName");2Connection conn DriverManager.getConnection(String url, String user, String…

Qt国际化翻译Linguist使用

QT的国际化是非常方便的&#xff0c;简单的说就是QT有自带的翻译工具把我们源代码中的字符串翻译成任何语言文件&#xff0c;再把这个语言文件加载到项目中就可以显示不同的语言。下面直接上手&#xff1a; 步骤一&#xff1a;打开pro文件&#xff0c;添加&#xff1a;TRANSLA…

idea快速定位文件、自动定位文件位置

如何快速定位到Student类的位置 如下图&#xff1a;点击类似瞄准按钮的图标即可 自动定位到文件位置 打开设置&#xff0c;勾选这个Always Select Opened File 这样子你点击文件他就会自动追踪&#xff0c;切换一个追踪一个&#xff1b; 事半功倍 &#xff01;&#xff01…

[揭秘] 文件恢复工具背后的真相!所有删除的文件都可以恢复吗

许多数据恢复工具声称它们可以在您意外删除文件时为您提供帮助。然而&#xff0c;并非所有数据恢复工具都有相同的目的。有些是针对特定文件类型而设计的&#xff0c;而另一些则无法处理用户的请求。 当我们谈论数据恢复工具时&#xff0c;用户存在很多误解。每个人最常见的问…

Vue配置代理解决跨域

Network的status中报CORS error指在前端&#xff08;Vue.js&#xff09;发起跨域请求时&#xff0c;被服务器拒绝访问的错误 在本地开发环境中&#xff0c;Vue.js 将默认从 http://localhost:8080 启动服务器。如果浏览器访问服务器时使用的 URL 不是该地址&#xff0c;就可能…

编程创意汇聚地,打造个性作品集 | 开源日报 No.97

spring-projects/spring-boot Stars: 70.4k License: Apache-2.0 Spring Boot 是一个用于简化 Spring 应用程序开发的框架&#xff0c;它通过提供默认配置和约定大于配置的方式来减少开发者的工作量。Spring Boot 可以快速地创建独立的、生产级别的基于 Spring 框架的应用程序…

Element-ui框架完成vue2项目的vuex的增删改查

看效果图是否是你需要的 这是原来没有Element-ui框架的 首先&#xff0c;你要在你的项目里安装Element-ui yarn命令 yarn add element-uinpm命令 npm install element-ui --save好了现在可以粘贴代码 //main.js import Vue from vue import Vuex from vuex import VueRouter …

【react】动态页面转换成html文件下载,解决样式问题

需求 今天遇到一个需求&#xff0c;挺恶心人的&#xff0c;将一个在线文档页面&#xff0c;可以导出成为html页面查看。 看到网上有使用fs模块&#xff0c;通过react的ReactDOMServer.renderToStaticMarkup将组件转成html字符串&#xff0c;输出文件了。 但是我尝试了&#x…

STL(一)(pair篇)

1.pair的定义和结构 在c中,pair是一个模板类,用于表示一对值的组合它位于<utility>头文件中 pair的定义如下: template<class T1, class T2> struct pair{T1 first; //第一个值T2 second; //第二个值//构造函数pair();pair(const T1&x,const T2&y);//比较…

3、Linux_系统用户管理

1.Linux 用户管理 1.1概述 Linux系统是一个多用户多任务的操作系统&#xff0c;任何一个要使用系统资源的用户&#xff0c;都必须首先向系统管理员申请一个账号&#xff0c;然后以这个账号的身份进入系统。root用户是系统默认创建的管理员账号。 1.2添加用户 语法 useradd […

[Linux] nginx配置的主配置文件

一、六个模块的作用 全局块&#xff1a;全局配置&#xff0c;对全局生效&#xff1b; events块&#xff1a;配置影响 Nginx 服务器与用户的网络连接&#xff1b; http块&#xff1a;配置代理&#xff0c;缓存&#xff0c;日志定义等绝大多数功能和第三方模块的配置&#xff1b;…

Qt 输入一组数,排序后用柱状图显示

Qt柱状图&#xff0c;需要使用到QChart模块&#xff0c;因此需要在安装Qt时勾选上QChart模块。然后在工程.pro文件中加上 QT charts 参考代码&#xff1a; //MainWindow.h #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow> #include <QPushButton…

模块一:双指针——1089.复写零

文章目录 题目解析算法原理异地原地 代码实现 题目解析 题目链接&#xff1a;1089.复写零 这题的暴力解法还是很简单的&#xff0c;不过这道题因为加了两个限制之后&#xff0c;多了一些细节需要去处理。我们通过一个例子来讲解这道题目&#xff1a; 在这个示例中&#xff0…

六个自媒体写作方法,提升自媒体创作收益

在自媒体时代&#xff0c;写作成为了一个不可或缺的技能。特别是对于新手来说&#xff0c;掌握一些有效的写作方法&#xff0c;可以事半功倍&#xff0c;更好地展现个人创意和观点。在这里&#xff0c;我将分享六个适合新手的自媒体写作方法&#xff0c;希望能够为你在写作之路…

贪心算法背包问题c

在背包问题中&#xff0c;贪心算法通常用来解决0-1背包问题&#xff0c;也就是每种物品都有固定数量&#xff0c;你可以选择拿或者不拿&#xff0c;但不可以拿走部分。以下是一个用C语言实现的贪心算法的例子&#xff1a; #include <stdio.h>#define MAX_N 1000#define …

大数据项目——基于Django/协同过滤算法的房源可视化分析推荐系统的设计与实现

大数据项目——基于Django/协同过滤算法的房源可视化分析推荐系统的设计与实现 技术栈&#xff1a;大数据爬虫/机器学习学习算法/数据分析与挖掘/大数据可视化/Django框架/Mysql数据库 本项目基于 Django框架开发的房屋可视化分析推荐系统。这个系统结合了大数据爬虫、机器学…