Key characteristics of Trino's SQL execution process
- Communication between the Client, Coordinator, and Workers is based on HTTP.
- SQL submission, parsing, scheduling, and execution are fully asynchronous, maximizing execution efficiency.
- On the Coordinator side, the logical plan tree is split into PlanFragments, which roughly correspond to Stages in Spark; they are scheduled through a StageScheduler.
- One PlanFragment corresponds to one Stage, and one Stage corresponds to one or more data partitions; each data partition corresponds to one SqlTask, the entity (container) that processes data on a Worker node.
- A partition is split into one or more Splits, the smallest unit of task scheduling; Splits are generated by the Coordinator and their number is fixed.
- A Split is processed by a Worker, and a Split is further divided into one or more Pages, the smallest unit of data processing.
- A Worker can process Splits in parallel; scheduling threads run Splits in time slices, and if an execution thread exceeds its time-slice quota or is blocked for some other reason, it voluntarily yields the current Split. (The details are analyzed in a separate article.)
- A Block is the smallest unit of data transfer in a Trino cluster, stored and compressed in a columnar format.
- By default, given sufficient resources, all Stages of a query are scheduled and created at the same time. The processing model then resembles Flink: it is a streaming process, and as soon as a source Split is generated, its data can flow through all Stages to produce the final output.
- By default, all queries share a single Resource Group, and a query is scheduled for execution as soon as one Worker is available.
- During execution, every SqlTask runs entirely in memory, which means no fault recovery is possible: once a task fails, the whole query ultimately fails.
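To make this hierarchy concrete, the sketch below models the nesting described above with hypothetical Java types (these are not Trino's real classes, which carry far more state; this is only a structural illustration):

import java.util.List;

// Hypothetical sketch of the scheduling hierarchy: a query is fragmented into
// stages, each stage runs one SqlTask per data partition on some worker, each
// task consumes splits, and each split is processed page by page.
record Page(int positionCount) {}
record Split(String connectorSplitHandle) {}
record SqlTask(int partitionId, List<Split> splits) {}
record Stage(String planFragmentId, List<SqlTask> tasks) {}
record Query(String queryId, List<Stage> stages) {}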
SQL submission & registration
A new query is submitted to the coordinator through the /v1/statement/queued API. The coordinator first puts the query into the QueryManager's cache pool and then returns to the client the URL it should poll next.
Once the SQL has been submitted successfully, the client immediately starts calling the queued/{queryId}/{slug}/{token} REST API to poll the query's execution status.
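As a rough illustration of this protocol, here is a minimal client round-trip using java.net.http (this is not trino-jdbc or trino-cli code; the coordinator host and the extractNextUri helper are hypothetical, and JSON handling is elided):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Minimal sketch of the client protocol described above: POST the statement,
// then follow "nextUri" links until the query leaves the queued state.
public class QueuedStatementClientSketch
{
    public static void main(String[] args) throws Exception
    {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest submit = HttpRequest.newBuilder(URI.create("http://coordinator:8080/v1/statement"))
                .header("X-Trino-User", "demo")
                .POST(HttpRequest.BodyPublishers.ofString("SELECT 1"))
                .build();
        HttpResponse<String> response = client.send(submit, HttpResponse.BodyHandlers.ofString());

        // The response body is a QueryResults JSON document containing a "nextUri"
        // such as .../v1/statement/queued/{queryId}/{slug}/{token}. Polling that
        // URI is what triggers Query::waitForDispatched() on the coordinator.
        String nextUri = extractNextUri(response.body()); // hypothetical JSON helper
        while (nextUri != null) {
            HttpRequest poll = HttpRequest.newBuilder(URI.create(nextUri)).GET().build();
            response = client.send(poll, HttpResponse.BodyHandlers.ofString());
            nextUri = extractNextUri(response.body());
        }
    }

    private static String extractNextUri(String queryResultsJson)
    {
        // placeholder: parse the "nextUri" field with a JSON library of your choice
        throw new UnsupportedOperationException("JSON parsing elided in this sketch");
    }
}

On the coordinator side, the initial POST is handled by QueuedStatementResource: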
public class QueuedStatementResource
{
    @ResourceSecurity(AUTHENTICATED_USER)
    @POST
    @Produces(APPLICATION_JSON)
    public Response postStatement(
            String statement,
            @Context HttpServletRequest servletRequest,
            @Context HttpHeaders httpHeaders,
            @Context UriInfo uriInfo)
    {
        if (isNullOrEmpty(statement)) {
            throw badRequest(BAD_REQUEST, "SQL statement is empty");
        }
        // Register the new query: this only creates a Query instance and adds it
        // to the QueryManager's cache pool
        Query query = registerQuery(statement, servletRequest, httpHeaders);
        return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo));
    }

    private Query registerQuery(String statement, HttpServletRequest servletRequest, HttpHeaders httpHeaders)
    {
        Optional<String> remoteAddress = Optional.ofNullable(servletRequest.getRemoteAddr());
        Optional<Identity> identity = Optional.ofNullable((Identity) servletRequest.getAttribute(AUTHENTICATED_IDENTITY));
        MultivaluedMap<String, String> headers = httpHeaders.getRequestHeaders();

        SessionContext sessionContext = sessionContextFactory.createSessionContext(headers, alternateHeaderName, remoteAddress, identity);
        // Create a Query instance that maintains all state for this statement's lifecycle
        Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory);
        // Register the Query instance with the QueryManager
        queryManager.registerQuery(query);

        // let authentication filter know that identity lifecycle has been handed off
        servletRequest.setAttribute(AUTHENTICATED_IDENTITY, null);
        return query;
    }
}
The Query class
Maintains the runtime state of a SQL statement, so the query's status can be retrieved through this class during execution. It also handles interaction with the client and provides management operations over the SQL task.
private static final class Query
{
    private final String query;
    private final SessionContext sessionContext;
    private final DispatchManager dispatchManager;
    private final QueryId queryId;
    private final Optional<URI> queryInfoUrl;
    private final Slug slug = Slug.createNew();
    private final AtomicLong lastToken = new AtomicLong();
    private final long initTime = System.nanoTime();
    private final AtomicReference<Boolean> submissionGate = new AtomicReference<>();
    private final SettableFuture<Void> creationFuture = SettableFuture.create();

    public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, QueryInfoUrlFactory queryInfoUrlFactory)
    {
        this.query = requireNonNull(query, "query is null");
        this.sessionContext = requireNonNull(sessionContext, "sessionContext is null");
        this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
        this.queryId = dispatchManager.createQueryId();
        requireNonNull(queryInfoUrlFactory, "queryInfoUrlFactory is null");
        this.queryInfoUrl = queryInfoUrlFactory.getQueryInfoUrl(queryId);
    }

    public boolean isCreated()
    {
        return creationFuture.isDone();
    }

    private ListenableFuture<Void> waitForDispatched()
    {
        // Only invoked when the `queued/{queryId}/{slug}/{token}` REST API is called to fetch
        // the query's status; this is what triggers submission of the query
        submitIfNeeded();
        if (!creationFuture.isDone()) {
            return nonCancellationPropagating(creationFuture);
        }
        // otherwise, wait for the query to finish
        return dispatchManager.waitForDispatched(queryId);
    }

    private void submitIfNeeded()
    {
        if (submissionGate.compareAndSet(null, true)) {
            // try to submit the SQL task to the DispatchManager
            creationFuture.setFuture(dispatchManager.createQuery(queryId, slug, sessionContext, query));
        }
    }

    public QueryResults getQueryResults(long token, UriInfo uriInfo)
    {
        // the client fetches results through this method (body elided)
    }

    public void cancel()
    {
        creationFuture.addListener(() -> dispatchManager.cancelQuery(queryId), directExecutor());
    }

    public void destroy()
    {
        sessionContext.getIdentity().destroy();
    }
}
The QueuedStatementResource.QueryManager class
Maintains all live Query instances so the REST API can look them up quickly. It is also responsible for detecting client submission timeouts; see the check in tryAbandonSubmissionWithTimeout(clientTimeout).
From the server's point of view, a query only counts as submitted once Query::waitForDispatched() has been triggered. If a client submits a SQL statement and then disappears, it will never call the REST API to poll the query's status again, so that method is never triggered; the whole period therefore counts as submission time, and the query is eventually purged.
@ThreadSafe
private static class QueryManager
{
    private final ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduledExecutorService = newSingleThreadScheduledExecutor(daemonThreadsNamed("drain-state-query-manager"));

    private final Duration querySubmissionTimeout;

    public QueryManager(Duration querySubmissionTimeout)
    {
        this.querySubmissionTimeout = requireNonNull(querySubmissionTimeout, "querySubmissionTimeout is null");
    }

    public void initialize(DispatchManager dispatchManager)
    {
        scheduledExecutorService.scheduleWithFixedDelay(() -> syncWith(dispatchManager), 200, 200, MILLISECONDS);
    }

    private void syncWith(DispatchManager dispatchManager)
    {
        queries.forEach((queryId, query) -> {
            if (shouldBePurged(dispatchManager, query)) {
                removeQuery(queryId);
            }
        });
    }

    private boolean shouldBePurged(DispatchManager dispatchManager, Query query)
    {
        if (query.isSubmissionAbandoned()) {
            // Query submission was explicitly abandoned
            return true;
        }
        if (query.tryAbandonSubmissionWithTimeout(querySubmissionTimeout)) {
            // Query took too long to be submitted by the client
            return true;
        }
        if (query.isCreated() && !dispatchManager.isQueryRegistered(query.getQueryId())) {
            // Query was created in the DispatchManager, and DispatchManager has already purged the query
            return true;
        }
        return false;
    }

    private void removeQuery(QueryId queryId)
    {
        Optional.ofNullable(queries.remove(queryId))
                .ifPresent(QueryManager::destroyQuietly);
    }

    public void registerQuery(Query query)
    {
        Query existingQuery = queries.putIfAbsent(query.getQueryId(), query);
        checkState(existingQuery == null, "Query already registered");
    }

    @Nullable
    public Query getQuery(QueryId queryId)
    {
        return queries.get(queryId);
    }
}
Scheduling of the Query instance
The SQL task is only submitted, to the DispatchManager, when the client polls for the query's execution status.
public class DispatchManager
{
    public ListenableFuture<Void> createQuery(QueryId queryId, Slug slug, SessionContext sessionContext, String query)
    {
        requireNonNull(queryId, "queryId is null");
        requireNonNull(sessionContext, "sessionContext is null");
        requireNonNull(query, "query is null");
        checkArgument(!query.isEmpty(), "query must not be empty string");
        checkArgument(queryTracker.tryGetQuery(queryId).isEmpty(), "query %s already exists", queryId);

        // It is important to return a future implementation which ignores cancellation request.
        // Using NonCancellationPropagatingFuture is not enough; it does not propagate cancel to wrapped future
        // but it would still return true on call to isCancelled() after cancel() is called on it.
        DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture();
        // create the query asynchronously
        dispatchExecutor.execute(() -> {
            try {
                createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
            }
            finally {
                queryCreationFuture.set(null);
            }
        });
        return queryCreationFuture;
    }

    private <C> void createQueryInternal(QueryId queryId, Slug slug, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
    {
        Session session = null;
        PreparedQuery preparedQuery = null;
        try {
            if (query.length() > maxQueryLength) {
                int queryLength = query.length();
                query = query.substring(0, maxQueryLength);
                throw new TrinoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
            }

            // decode session
            session = sessionSupplier.createSession(queryId, sessionContext);

            // check query execute permissions
            accessControl.checkCanExecuteQuery(sessionContext.getIdentity());

            // prepare query: parse the user's SQL and produce the AST
            preparedQuery = queryPreparer.prepareQuery(session, query);

            // select resource group
            Optional<String> queryType = getQueryType(preparedQuery.getStatement()).map(Enum::name);
            // If no ResourceGroup assignment policy is configured, the query is assigned to the
            // global group by default, i.e. all queries share the cluster
            SelectionContext<C> selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                    sessionContext.getIdentity().getPrincipal().isPresent(),
                    sessionContext.getIdentity().getUser(),
                    sessionContext.getIdentity().getGroups(),
                    sessionContext.getSource(),
                    sessionContext.getClientTags(),
                    sessionContext.getResourceEstimates(),
                    queryType));

            // apply system default session properties (does not override user set properties)
            session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, queryType, selectionContext.getResourceGroupId());

            // mark existing transaction as active
            transactionManager.activateTransaction(session, isTransactionControlStatement(preparedQuery.getStatement()), accessControl);

            // Wrap query and preparedQuery into a DispatchQuery instance (in practice a
            // LocalDispatchQuery); this step is asynchronous. The instance exposes the following
            // methods so upper layers can observe the dispatch state:
            //     ListenableFuture<Void> getDispatchedFuture();
            //     DispatchInfo getDispatchInfo();
            //
            // Almost every phase of SQL execution in Trino is asynchronous; to manage a query's
            // lifecycle correctly under asynchrony, each phase creates a corresponding instance,
            // such as the DispatchQuery here.
            DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
                    session,
                    query,
                    preparedQuery,
                    slug,
                    selectionContext.getResourceGroupId());

            // Once the DispatchQuery is created it is added to the QueryTracker, which manages
            // the query's execution lifecycle
            boolean queryAdded = queryCreated(dispatchQuery);
            if (queryAdded && !dispatchQuery.isDone()) {
                // The query was added to the QueryTracker but dispatchQuery has not finished
                // being created, so submit it to the resource group to wait to be scheduled
                try {
                    resourceGroupManager.submit(dispatchQuery, selectionContext, dispatchExecutor);
                }
                catch (Throwable e) {
                    // dispatch query has already been registered, so just fail it directly
                    dispatchQuery.fail(e);
                }
            }
        }
        catch (Throwable throwable) {
            // creation must never fail, so register a failed query in this case
            if (session == null) {
                session = Session.builder(sessionPropertyManager)
                        .setQueryId(queryId)
                        .setIdentity(sessionContext.getIdentity())
                        .setSource(sessionContext.getSource().orElse(null))
                        .build();
            }
            // On failure, create a FailedDispatchQuery instance that records the failure details
            Optional<String> preparedSql = Optional.ofNullable(preparedQuery).flatMap(PreparedQuery::getPrepareSql);
            DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, preparedSql, Optional.empty(), throwable);
            queryCreated(failedDispatchQuery);
        }
    }
}
Creating the LocalDispatchQuery instance
public class LocalDispatchQueryFactory
        implements DispatchQueryFactory
{
    @Override
    public DispatchQuery createDispatchQuery(
            Session session,
            String query,
            PreparedQuery preparedQuery,
            Slug slug,
            ResourceGroupId resourceGroup)
    {
        WarningCollector warningCollector = warningCollectorFactory.create();
        // create a new state machine for the newly submitted query
        QueryStateMachine stateMachine = QueryStateMachine.begin(
                query,
                preparedQuery.getPrepareSql(),
                session,
                locationFactory.createQueryLocation(session.getQueryId()),
                resourceGroup,
                isTransactionControlStatement(preparedQuery.getStatement()),
                transactionManager,
                accessControl,
                executor,
                metadata,
                warningCollector,
                getQueryType(preparedQuery.getStatement()));

        // It is important that `queryCreatedEvent` is called here. Moving it past the `executor.submit` below
        // can result in delivering query-created event after query analysis has already started.
        // That can result in misbehaviour of plugins called during analysis phase (e.g. access control auditing)
        // which depend on the contract that event was already delivered.
        //
        // Note that for immediate and in-order delivery of query events we depend on synchronous nature of
        // QueryMonitor and EventListenerManager.
        queryMonitor.queryCreatedEvent(stateMachine.getBasicQueryInfo(Optional.empty()));

        // asynchronously create the QueryExecution instance (in practice a SqlQueryExecution)
        ListenableFuture<QueryExecution> queryExecutionFuture = executor.submit(() -> {
            QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(preparedQuery.getStatement().getClass());
            if (queryExecutionFactory == null) {
                throw new TrinoException(NOT_SUPPORTED, "Unsupported statement type: " + preparedQuery.getStatement().getClass().getSimpleName());
            }

            try {
                return queryExecutionFactory.createQueryExecution(preparedQuery, stateMachine, slug, warningCollector);
            }
            catch (Throwable e) {
                if (e instanceof Error) {
                    if (e instanceof StackOverflowError) {
                        log.error(e, "Unhandled StackOverFlowError; should be handled earlier; to investigate full stacktrace you may need to enable -XX:MaxJavaStackTraceDepth=0 JVM flag");
                    }
                    else {
                        log.error(e, "Unhandled Error");
                    }
                    // wrapping as RuntimeException to guard us from problem that code downstream which investigates queryExecutionFuture may not necessarily handle
                    // Error subclass of Throwable well.
                    RuntimeException wrappedError = new RuntimeException(e);
                    stateMachine.transitionToFailed(wrappedError);
                    throw wrappedError;
                }
                stateMachine.transitionToFailed(e);
                throw e;
            }
        });

        // Return a LocalDispatchQuery instance. Note that it receives the queryExecutionFuture,
        // which means the instance only counts as fully created once queryExecutionFuture.isDone().
        return new LocalDispatchQuery(
                stateMachine,
                queryExecutionFuture,
                queryMonitor,
                clusterSizeMonitor,
                executor,
                // queryManager is a SqlQueryManager instance; it holds a reference to the
                // QueryTracker and therefore manages the query lifecycle at a higher level
                queryManager::createQuery);
    }
}
Creating the SqlQueryExecution instance
The object is created through SqlQueryExecutionFactory.createQueryExecution().
@ThreadSafe
public class SqlQueryExecution
        implements QueryExecution
{
    private SqlQueryExecution(
            PreparedQuery preparedQuery,
            QueryStateMachine stateMachine,
            Slug slug,
            PlannerContext plannerContext,
            AnalyzerFactory analyzerFactory,
            SplitSourceFactory splitSourceFactory,
            NodePartitioningManager nodePartitioningManager,
            NodeScheduler nodeScheduler,
            List<PlanOptimizer> planOptimizers,
            PlanFragmenter planFragmenter,
            RemoteTaskFactory remoteTaskFactory,
            int scheduleSplitBatchSize,
            ExecutorService queryExecutor,
            ScheduledExecutorService schedulerExecutor,
            FailureDetector failureDetector,
            NodeTaskMap nodeTaskMap,
            ExecutionPolicy executionPolicy,
            SplitSchedulerStats schedulerStats,
            StatsCalculator statsCalculator,
            CostCalculator costCalculator,
            DynamicFilterService dynamicFilterService,
            WarningCollector warningCollector,
            TableExecuteContextManager tableExecuteContextManager,
            TypeAnalyzer typeAnalyzer,
            TaskManager coordinatorTaskManager)
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            this.slug = requireNonNull(slug, "slug is null");
            this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
            this.splitSourceFactory = requireNonNull(splitSourceFactory, "splitSourceFactory is null");
            this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
            this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null");
            this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null");
            this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null");
            this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
            this.schedulerExecutor = requireNonNull(schedulerExecutor, "schedulerExecutor is null");
            this.failureDetector = requireNonNull(failureDetector, "failureDetector is null");
            this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
            this.executionPolicy = requireNonNull(executionPolicy, "executionPolicy is null");
            this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
            this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null");
            this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
            this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
            this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");

            checkArgument(scheduleSplitBatchSize > 0, "scheduleSplitBatchSize must be greater than 0");
            this.scheduleSplitBatchSize = scheduleSplitBatchSize;

            // keep a reference to the query state machine
            this.stateMachine = requireNonNull(stateMachine, "stateMachine is null");

            // analyze query
            // preparedQuery holds the Statement (AST) produced by parsing the SQL text,
            // so the analysis here runs against that AST
            this.analysis = analyze(preparedQuery, stateMachine, warningCollector, analyzerFactory);

            // Register a listener with the state machine: once it reaches a done state,
            // unregister the dynamicFilterService for this query (that service is covered
            // in detail in a separate article)
            stateMachine.addStateChangeListener(state -> {
                if (!state.isDone()) {
                    return;
                }
                unregisterDynamicFilteringQuery(
                        dynamicFilterService.getDynamicFilteringStats(stateMachine.getQueryId(), stateMachine.getSession()));
                tableExecuteContextManager.unregisterTableExecuteContextForQuery(stateMachine.getQueryId());
            });

            // when the query finishes cache the final query info, and clear the reference to the output stage
            AtomicReference<SqlQueryScheduler> queryScheduler = this.queryScheduler;
            stateMachine.addStateChangeListener(state -> {
                if (!state.isDone()) {
                    return;
                }
                // query is now done, so abort any work that is still running
                // (failure is one of the done states)
                SqlQueryScheduler scheduler = queryScheduler.get();
                if (scheduler != null) {
                    scheduler.abort();
                }
            });

            this.remoteTaskFactory = new MemoryTrackingRemoteTaskFactory(requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"), stateMachine);
            this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
            this.coordinatorTaskManager = requireNonNull(coordinatorTaskManager, "coordinatorTaskManager is null");
        }
    }
}
Creation and execution of the LocalDispatchQuery
Executing the LocalDispatchQuery is in effect executing the QueryExecution instance. Before that happens, the query still has to pass through ResourceGroup selection. The selection details are not the focus here, so they are skipped; it is enough to know that the ResourceGroup eventually calls LocalDispatchQuery::startWaitingForResources.
Resource check
public class LocalDispatchQuery
        implements DispatchQuery
{
    @Override
    public void startWaitingForResources()
    {
        // transition the state machine to WAITING_RESOURCES
        if (stateMachine.transitionToWaitingForResources()) {
            waitForMinimumWorkers();
        }
    }

    private void waitForMinimumWorkers()
    {
        // The queryExecution only starts once enough Workers are available. With the default
        // configuration the condition is: as soon as 1 Worker is available,
        // startExecution(queryExecution) is triggered.
        // wait for query execution to finish construction
        addSuccessCallback(queryExecutionFuture, queryExecution -> {
            Session session = stateMachine.getSession();
            int executionMinCount = 1; // always wait for 1 node to be up
            if (queryExecution.shouldWaitForMinWorkers()) {
                executionMinCount = getRequiredWorkers(session);
            }
            ListenableFuture<Void> minimumWorkerFuture = clusterSizeMonitor.waitForMinimumWorkers(executionMinCount, getRequiredWorkersMaxWait(session));
            // when worker requirement is met, start the execution
            addSuccessCallback(minimumWorkerFuture, () -> startExecution(queryExecution));
            addExceptionCallback(minimumWorkerFuture, throwable -> queryExecutor.execute(() -> stateMachine.transitionToFailed(throwable)));

            // cancel minimumWorkerFuture if query fails for some reason or is cancelled by user
            stateMachine.addStateChangeListener(state -> {
                if (state.isDone()) {
                    minimumWorkerFuture.cancel(true);
                }
            });
        });
    }

    private void startExecution(QueryExecution queryExecution)
    {
        queryExecutor.execute(() -> {
            // transition the state machine to DISPATCHING
            if (stateMachine.transitionToDispatching()) {
                try {
                    // Hand the execution to querySubmitter (the queryManager::createQuery method
                    // mentioned earlier), which eventually routes to SqlQueryExecution::start
                    querySubmitter.accept(queryExecution);
                    if (notificationSentOrGuaranteed.compareAndSet(false, true)) {
                        queryExecution.addFinalQueryInfoListener(queryMonitor::queryCompletedEvent);
                    }
                }
                catch (Throwable t) {
                    // this should never happen but be safe
                    stateMachine.transitionToFailed(t);
                    log.error(t, "query submitter threw exception");
                    throw t;
                }
                finally {
                    submitted.set(null);
                }
            }
        });
    }
}
SqlQueryExecution::start()
@ThreadSafe
public class SqlQueryExecution
        implements QueryExecution
{
    @Override
    public void start()
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            try {
                // transition the state machine to PLANNING
                if (!stateMachine.transitionToPlanning()) {
                    // query already started or finished
                    return;
                }

                // Register a listener: if the state machine ever reaches FAILED while planning,
                // forcibly interrupt the planning thread
                AtomicReference<Thread> planningThread = new AtomicReference<>(currentThread());
                stateMachine.getStateChange(PLANNING).addListener(() -> {
                    if (stateMachine.getQueryState() == FAILED) {
                        synchronized (this) {
                            Thread thread = planningThread.get();
                            if (thread != null) {
                                thread.interrupt();
                            }
                        }
                    }
                }, directExecutor());

                try {
                    // Optimize the logical plan tree and split it into PlanFragments so the plan
                    // fragments can be scheduled for execution
                    PlanRoot plan = planQuery();

                    // DynamicFilterService needs plan for query to be registered.
                    // Query should be registered before dynamic filter suppliers are requested in distribution planning.
                    registerDynamicFilteringQuery(plan);

                    // Schedule the plan: internally this creates the SqlQueryScheduler instance
                    // that dispatches PlanFragments and manages their state; the process is asynchronous
                    planDistribution(plan);
                }
                finally {
                    synchronized (this) {
                        planningThread.set(null);
                        // Clear the interrupted flag in case there was a race condition where
                        // the planning thread was interrupted right after planning completes above
                        Thread.interrupted();
                    }
                }

                tableExecuteContextManager.registerTableExecuteContextForQuery(getQueryId());

                // transition the state machine to STARTING
                if (!stateMachine.transitionToStarting()) {
                    // query already started or finished
                    return;
                }

                // if query is not finished, start the scheduler, otherwise cancel it
                SqlQueryScheduler scheduler = queryScheduler.get();
                if (!stateMachine.isDone()) {
                    // call SqlQueryScheduler::start() to begin scheduling
                    scheduler.start();
                }
            }
            catch (Throwable e) {
                fail(e);
                throwIfInstanceOf(e, Error.class);
            }
        }
    }
}
SqlQueryExecution::planDistribution
@ThreadSafe
public class SqlQueryExecution
        implements QueryExecution
{
    private void planDistribution(PlanRoot plan)
    {
        // if query was canceled, skip creating scheduler
        if (stateMachine.isDone()) {
            return;
        }

        // record output field
        PlanFragment rootFragment = plan.getRoot().getFragment();
        stateMachine.setColumns(
                ((OutputNode) rootFragment.getRoot()).getColumnNames(),
                rootFragment.getTypes());

        // build the stage execution objects (this doesn't schedule execution)
        SqlQueryScheduler scheduler = new SqlQueryScheduler(
                stateMachine,
                plan.getRoot(),
                nodePartitioningManager,
                nodeScheduler,
                remoteTaskFactory,
                plan.isSummarizeTaskInfos(),
                scheduleSplitBatchSize,
                queryExecutor,
                schedulerExecutor,
                failureDetector,
                nodeTaskMap,
                executionPolicy,
                schedulerStats,
                dynamicFilterService,
                tableExecuteContextManager,
                plannerContext.getMetadata(),
                splitSourceFactory,
                coordinatorTaskManager);

        queryScheduler.set(scheduler);

        // if query was canceled during scheduler creation, abort the scheduler
        // directly since the callback may have already fired
        if (stateMachine.isDone()) {
            scheduler.abort();
            queryScheduler.set(null);
        }
    }
}
Creating the StageManager instance and a SqlStage instance for every PlanFragment
A SqlStage tracks the lifecycle and maintains the state of all tasks that belong to it.
public class SqlQueryScheduler
{
    private static class StageManager
    {
        private static StageManager create(
                QueryStateMachine queryStateMachine,
                Session session,
                Metadata metadata,
                RemoteTaskFactory taskFactory,
                NodeTaskMap nodeTaskMap,
                ExecutorService executor,
                SplitSchedulerStats schedulerStats,
                SubPlan planTree,
                boolean summarizeTaskInfo)
        {
            ImmutableMap.Builder<StageId, SqlStage> stages = ImmutableMap.builder();
            ImmutableList.Builder<SqlStage> coordinatorStagesInTopologicalOrder = ImmutableList.builder();
            ImmutableList.Builder<SqlStage> distributedStagesInTopologicalOrder = ImmutableList.builder();
            StageId rootStageId = null;
            ImmutableMap.Builder<StageId, Set<StageId>> children = ImmutableMap.builder();
            ImmutableMap.Builder<StageId, StageId> parents = ImmutableMap.builder();

            // traverse the plan top-down, breadth-first, starting from the root SubPlan,
            // to collect all SubPlans
            for (SubPlan planNode : Traverser.forTree(SubPlan::getChildren).breadthFirst(planTree)) {
                PlanFragment fragment = planNode.getFragment();
                // A SubPlan / PlanFragment is a Stage (close to the Spark concept);
                // the StageId is {queryId}-{fragmentId}
                SqlStage stage = createSqlStage(
                        getStageId(session.getQueryId(), fragment.getId()),
                        fragment,
                        extractTableInfo(session, metadata, fragment),
                        taskFactory,
                        session,
                        summarizeTaskInfo,
                        nodeTaskMap,
                        executor,
                        schedulerStats);
                StageId stageId = stage.getStageId();
                stages.put(stageId, stage);

                // keep all stages in topological order
                if (fragment.getPartitioning().isCoordinatorOnly()) {
                    coordinatorStagesInTopologicalOrder.add(stage);
                }
                else {
                    distributedStagesInTopologicalOrder.add(stage);
                }
                // Because the traversal is top-down, the first stage seen is the most
                // upstream one, i.e. the root stage
                if (rootStageId == null) {
                    rootStageId = stageId;
                }
                // record the dependencies between stages
                Set<StageId> childStageIds = planNode.getChildren().stream()
                        .map(childStage -> getStageId(session.getQueryId(), childStage.getFragment().getId()))
                        .collect(toImmutableSet());
                children.put(stageId, childStageIds);
                childStageIds.forEach(child -> parents.put(child, stageId));
            }
            StageManager stageManager = new StageManager(
                    queryStateMachine,
                    stages.build(),
                    coordinatorStagesInTopologicalOrder.build(),
                    distributedStagesInTopologicalOrder.build(),
                    rootStageId,
                    children.build(),
                    parents.build());
            stageManager.initialize();
            return stageManager;
        }
    }
}
SqlQueryScheduler::start()
public class SqlQueryScheduler
{
    public synchronized void start()
    {
        if (started) {
            return;
        }
        started = true;

        if (queryStateMachine.isDone()) {
            return;
        }

        // when query is done or any time a stage completes, attempt to transition query to "final query info ready"
        queryStateMachine.addStateChangeListener(state -> {
            if (!state.isDone()) {
                return;
            }
            DistributedStagesScheduler distributedStagesScheduler;
            // synchronize to wait on distributed scheduler creation if it is currently in process
            synchronized (this) {
                distributedStagesScheduler = this.distributedStagesScheduler.get();
            }
            if (state == QueryState.FINISHED) {
                // once the state machine reaches FINISHED, cancel all stages still being scheduled
                coordinatorStagesScheduler.cancel();
                if (distributedStagesScheduler != null) {
                    distributedStagesScheduler.cancel();
                }
                // finish through the StageManager
                stageManager.finish();
            }
            else if (state == QueryState.FAILED) {
                coordinatorStagesScheduler.abort();
                if (distributedStagesScheduler != null) {
                    distributedStagesScheduler.abort();
                }
                stageManager.abort();
            }

            queryStateMachine.updateQueryInfo(Optional.ofNullable(getStageInfo()));
        });

        // schedule the stages
        coordinatorStagesScheduler.schedule();
        Optional<DistributedStagesScheduler> distributedStagesScheduler = createDistributedStagesScheduler(currentAttempt.get());
        distributedStagesScheduler.ifPresent(scheduler -> distributedStagesSchedulingTask = executor.submit(scheduler::schedule, null));
    }
}
DistributedStagesScheduler::schedule()
Creating the DistributedStagesScheduler
The DistributedStagesScheduler is responsible for scheduling all SqlStages.
public static PipelinedDistributedStagesScheduler create(
        QueryStateMachine queryStateMachine, // query-level state machine
        SplitSchedulerStats schedulerStats, // records split scheduling statistics
        NodeScheduler nodeScheduler, // assigns a suitable worker node to each split
        NodePartitioningManager nodePartitioningManager, // provides the information needed to partition data pages
        StageManager stageManager, // manages all stages
        CoordinatorStagesScheduler coordinatorStagesScheduler, // schedules coordinator-only stages onto the coordinator node
        ExecutionPolicy executionPolicy, // execution policy: AllAtOnceExecutionPolicy or PhasedExecutionPolicy
        FailureDetector failureDetector,
        ScheduledExecutorService executor, // thread pool used while scheduling stages
        SplitSourceFactory splitSourceFactory, // factory that creates source splits
        int splitBatchSize, // maximum number of splits scheduled in one batch
        DynamicFilterService dynamicFilterService,
        TableExecuteContextManager tableExecuteContextManager,
        RetryPolicy retryPolicy,
        int attempt)
{
    // The DistributedStagesScheduler schedules stages, whose state is distinct from the
    // QueryStateMachine's state, so a dedicated state machine is created here to track the
    // state of the PipelinedDistributedStagesScheduler
    DistributedStagesSchedulerStateMachine stateMachine = new DistributedStagesSchedulerStateMachine(queryStateMachine.getQueryId(), executor);

    // Cache NodePartitionMap instances keyed by PartitioningHandle. PlanFragments with the same
    // PartitioningHandle share the same NodePartitionMap, so this avoids generating the
    // NodePartitionMap multiple times
    Map<PartitioningHandle, NodePartitionMap> partitioningCacheMap = new HashMap<>();
    // Build the NodePartitionMap from the connector-provided PartitioningHandle:
    // NodePartitionMap records the WorkerNode -> PartitionId mapping. It can be provided by the
    // connector (e.g. IcebergPartitioningHandle) or fall back to the built-in
    // SystemPartitioningHandle. How the NodePartitionMap is produced is covered in a later subsection.
    Function<PartitioningHandle, NodePartitionMap> partitioningCache = partitioningHandle ->
            partitioningCacheMap.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(queryStateMachine.getSession(), handle));

    // Create the bucket -> partition mapping for every PlanFragment.
    // A bucket (similar to Hive's bucket concept) further subdivides the data of one partition,
    // so one partition can contain multiple buckets
    Map<PlanFragmentId, Optional<int[]>> bucketToPartitionMap = createBucketToPartitionMap(
            coordinatorStagesScheduler.getBucketToPartitionForStagesConsumedByCoordinator(),
            stageManager,
            partitioningCache);
    // Create an OutputBufferManager per PlanFragment; it creates and maintains that fragment's
    // output buffers. Depending on the PartitioningHandle, one of three buffer types is used:
    //     BufferType type =
    //         if partitioningHandle.equals(FIXED_BROADCAST_DISTRIBUTION) then BROADCAST;
    //         else if partitioningHandle.equals(FIXED_ARBITRARY_DISTRIBUTION) then ARBITRARY;
    //         else PARTITIONED;
    Map<PlanFragmentId, OutputBufferManager> outputBufferManagers = createOutputBufferManagers(
            coordinatorStagesScheduler.getOutputBuffersForStagesConsumedByCoordinator(),
            stageManager,
            bucketToPartitionMap);

    TaskLifecycleListener coordinatorTaskLifecycleListener = coordinatorStagesScheduler.getTaskLifecycleListener();
    if (retryPolicy != RetryPolicy.NONE) {
        // when retries are enabled only close exchange clients on coordinator when the query is finished
        TaskLifecycleListenerBridge taskLifecycleListenerBridge = new TaskLifecycleListenerBridge(coordinatorTaskLifecycleListener);
        coordinatorTaskLifecycleListener = taskLifecycleListenerBridge;
        stateMachine.addStateChangeListener(state -> {
            if (state == DistributedStagesSchedulerState.FINISHED) {
                taskLifecycleListenerBridge.notifyNoMoreSourceTasks();
            }
        });
    }

    // Create a PipelinedStageExecution for every stage to be scheduled; each instance manages
    // the lifecycle of its own stage
    Map<StageId, PipelinedStageExecution> stageExecutions = new HashMap<>();
    for (SqlStage stage : stageManager.getDistributedStagesInTopologicalOrder()) {
        Optional<SqlStage> parentStage = stageManager.getParent(stage.getStageId());
        // TaskLifecycleListener provides the interface for creating tasks for a stage
        TaskLifecycleListener taskLifecycleListener;
        if (parentStage.isEmpty() || parentStage.get().getFragment().getPartitioning().isCoordinatorOnly()) {
            // output will be consumed by coordinator
            // If the parent stage is the root, or its PlanFragment is partitioned to run only on
            // the coordinator, bind this stage's task lifecycle to the coordinator listener
            taskLifecycleListener = coordinatorTaskLifecycleListener;
        }
        else {
            // for a non-root stage, use the listener already bound to the parent's stage execution
            StageId parentStageId = parentStage.get().getStageId();
            PipelinedStageExecution parentStageExecution = requireNonNull(stageExecutions.get(parentStageId), () -> "execution is null for stage: " + parentStageId);
            taskLifecycleListener = parentStageExecution.getTaskLifecycleListener();
        }

        PlanFragment fragment = stage.getFragment();
        // Create the PipelinedStageExecution that schedules and executes this stage; it creates
        // a RemoteTask per partition and dispatches it to the corresponding worker node
        PipelinedStageExecution stageExecution = createPipelinedStageExecution(
                stageManager.get(fragment.getId()),
                outputBufferManagers,
                taskLifecycleListener,
                failureDetector,
                executor,
                bucketToPartitionMap.get(fragment.getId()),
                attempt);
        stageExecutions.put(stage.getStageId(), stageExecution);
    }

    ImmutableMap.Builder<StageId, StageScheduler> stageSchedulers = ImmutableMap.builder();
    for (PipelinedStageExecution stageExecution : stageExecutions.values()) {
        List<PipelinedStageExecution> children = stageManager.getChildren(stageExecution.getStageId()).stream()
                .map(stage -> requireNonNull(stageExecutions.get(stage.getStageId()), () -> "stage execution not found for stage: " + stage))
                .collect(toImmutableList());

        // Create a StageScheduler per StageExecution instance; it drives the scheduling of that
        // stage. Trino provides several implementations:
        //     FixedSourcePartitionedScheduler
        //     FixedCountScheduler
        //     () -> SourcePartitionedScheduler
        StageScheduler scheduler = createStageScheduler(
                queryStateMachine,
                stageExecution,
                splitSourceFactory,
                children,
                partitioningCache,
                nodeScheduler,
                nodePartitioningManager,
                splitBatchSize,
                dynamicFilterService,
                executor,
                tableExecuteContextManager);
        stageSchedulers.put(stageExecution.getStageId(), scheduler);
    }

    // Create the PipelinedDistributedStagesScheduler, which drives the scheduling of all stages
    PipelinedDistributedStagesScheduler distributedStagesScheduler = new PipelinedDistributedStagesScheduler(
            stateMachine,
            queryStateMachine,
            schedulerStats,
            stageManager,
            executionPolicy.createExecutionSchedule(stageExecutions.values()),
            stageSchedulers.build(),
            ImmutableMap.copyOf(stageExecutions),
            dynamicFilterService);
    distributedStagesScheduler.initialize();
    return distributedStagesScheduler;
}

/**
 * Compute the bucketToPartition mapping for every PlanFragment.
 */
private static Map<PlanFragmentId, Optional<int[]>> createBucketToPartitionMap(
        Map<PlanFragmentId, Optional<int[]>> bucketToPartitionForStagesConsumedByCoordinator,
        StageManager stageManager,
        Function<PartitioningHandle, NodePartitionMap> partitioningCache)
{
    ImmutableMap.Builder<PlanFragmentId, Optional<int[]>> result = ImmutableMap.builder();
    // can be ignored here: only non-empty for stages consumed by the coordinator
    result.putAll(bucketToPartitionForStagesConsumedByCoordinator);
    for (SqlStage stage : stageManager.getDistributedStagesInTopologicalOrder()) {
        PlanFragment fragment = stage.getFragment();
        Optional<int[]> bucketToPartition = getBucketToPartition(fragment.getPartitioning(), partitioningCache, fragment.getRoot(), fragment.getRemoteSourceNodes());
        for (SqlStage childStage : stageManager.getChildren(stage.getStageId())) {
            result.put(childStage.getFragment().getId(), bucketToPartition);
        }
    }
    return result.build();
}

private static Optional<int[]> getBucketToPartition(
        PartitioningHandle partitioningHandle,
        Function<PartitioningHandle, NodePartitionMap> partitioningCache,
        PlanNode fragmentRoot,
        List<RemoteSourceNode> remoteSourceNodes)
{
    if (partitioningHandle.equals(SOURCE_DISTRIBUTION) || partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
        // SOURCE_DISTRIBUTION marks a TableScan fragment and SCALED_WRITER_DISTRIBUTION a table
        // write fragment; such PlanFragments have exactly one bucket
        return Optional.of(new int[1]);
    }
    else if (searchFrom(fragmentRoot).where(node -> node instanceof TableScanNode).findFirst().isPresent()) {
        if (remoteSourceNodes.stream().allMatch(node -> node.getExchangeType() == REPLICATE)) {
            return Optional.empty();
        }
        else {
            // remote source requires nodePartitionMap
            // A remote-source operator reads partitioned data from an upstream PlanFragment, so
            // the bucket -> partition mapping must be derived from the bound partitioningHandle;
            // partitioningCache was initialized earlier
            NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
            return Optional.of(nodePartitionMap.getBucketToPartition());
        }
    }
    else {
        // Other types, e.g. ARBITRARY_DISTRIBUTION or FIXED_HASH_DISTRIBUTION, are computed the
        // same way as the remote-source case
        NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
        List<InternalNode> partitionToNode = nodePartitionMap.getPartitionToNode();
        // todo this should asynchronously wait a standard timeout period before failing
        checkCondition(!partitionToNode.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");
        return Optional.of(nodePartitionMap.getBucketToPartition());
    }
}
Creating the NodePartitionMap instance, which maintains the bucket -> partition -> node mapping
This instance holds two important data structures:
- partitionToNode: the mapping from data partition to worker node
- bucketToPartition: the mapping from data bucket to data partition
With these two maps, the engine can compute a bucket ID for each data row from its partitioning key, determine which partition that row belongs to, and in turn which worker node the row should be routed to.
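The round-robin derivation performed by the method below can be illustrated with a small standalone sketch (plain Java, not Trino code; worker names and the bucket count are made up):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the bucket -> partition -> node derivation described
// above. With workers [w2, w1] and 4 buckets, buckets are assigned round-robin
// (w2, w1, w2, w1) and each distinct worker becomes one partition, so
// bucketToPartition = [0, 1, 0, 1] and nodeToPartition = {w2=0, w1=1}.
public class BucketMappingSketch
{
    public static void main(String[] args)
    {
        List<String> shuffledWorkers = List.of("w2", "w1");
        int bucketCount = 4;

        // bucket -> node, round-robin over the shuffled worker list
        List<String> bucketToNode = new ArrayList<>();
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            bucketToNode.add(shuffledWorkers.get(bucket % shuffledWorkers.size()));
        }

        // bucket -> partition: a new partition id is allocated the first time a worker
        // is seen, i.e. one worker node corresponds to exactly one partition
        int[] bucketToPartition = new int[bucketCount];
        Map<String, Integer> nodeToPartition = new HashMap<>();
        int nextPartitionId = 0;
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            String node = bucketToNode.get(bucket);
            Integer partitionId = nodeToPartition.get(node);
            if (partitionId == null) {
                partitionId = nextPartitionId++;
                nodeToPartition.put(node, partitionId);
            }
            bucketToPartition[bucket] = partitionId;
        }
        System.out.println(java.util.Arrays.toString(bucketToPartition)); // [0, 1, 0, 1]
        System.out.println(nodeToPartition);                              // {w2=0, w1=1}
    }
}

The actual logic in NodePartitioningManager::getNodePartitioningMap: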
public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHandle partitioningHandle)
{
    requireNonNull(session, "session is null");
    requireNonNull(partitioningHandle, "partitioningHandle is null");

    if (partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle) {
        // return the system default mapping
        return ((SystemPartitioningHandle) partitioningHandle.getConnectorHandle()).getNodePartitionMap(session, nodeScheduler);
    }

    // Get the connector's own bucket -> node mapping. A connector can implement this interface
    // to define the number of buckets and how buckets map to worker nodes. Since we are
    // discussing the Iceberg connector, the instance comes from createArbitraryBucketToNode(...)
    ConnectorBucketNodeMap connectorBucketNodeMap = getConnectorBucketNodeMap(session, partitioningHandle);

    // safety check for crazy partitioning
    checkArgument(connectorBucketNodeMap.getBucketCount() < 1_000_000, "Too many buckets in partitioning: %s", connectorBucketNodeMap.getBucketCount());

    List<InternalNode> bucketToNode;
    if (connectorBucketNodeMap.hasFixedMapping()) {
        bucketToNode = getFixedMapping(connectorBucketNodeMap);
    }
    else {
        CatalogName catalogName = partitioningHandle.getConnectorId()
                .orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle));
        // Create a bucket to node mapping. Consecutive buckets are assigned
        // to shuffled nodes (e.g "1 -> node2, 2 -> node1, 3 -> node2, 4 -> node1, ...").
        // The invariant here is: number of buckets >= number of available workers.
        // The Iceberg connector only defines the bucket count, not a bucket -> node mapping,
        // and its bucket count equals the number of active workers
        bucketToNode = createArbitraryBucketToNode(
                nodeScheduler.createNodeSelector(session, Optional.of(catalogName)).allNodes(),
                connectorBucketNodeMap.getBucketCount());
    }

    // The bucket -> worker mapping was built above; now derive the bucket -> partition mapping.
    // Create an array sized to the number of buckets, where bucketToPartition[i] holds the
    // partition id of bucket i
    int[] bucketToPartition = new int[connectorBucketNodeMap.getBucketCount()];
    // A BiMap guarantees keys and values are each unique, which means one worker node maps to
    // exactly one partition
    BiMap<InternalNode, Integer> nodeToPartition = HashBiMap.create();
    int nextPartitionId = 0; // initial value
    for (int bucket = 0; bucket < bucketToNode.size(); bucket++) {
        InternalNode node = bucketToNode.get(bucket);
        // bucketToNode can contain duplicate values, i.e. multiple buckets map to the same worker node
        Integer partitionId = nodeToPartition.get(node);
        if (partitionId == null) {
            // A missing partitionId means a new worker was found, so allocate the next
            // partition id. Clearly, inside Trino, one worker node is one partition
            partitionId = nextPartitionId++;
            nodeToPartition.put(node, partitionId);
        }
        // record the bucketId -> partitionId mapping
        bucketToPartition[bucket] = partitionId;
    }

    // collect all worker nodes, ordered by partition id
    List<InternalNode> partitionToNode = IntStream.range(0, nodeToPartition.size())
            .mapToObj(partitionId -> nodeToPartition.inverse().get(partitionId))
            .collect(toImmutableList());

    // return the NodePartitionMap instance
    return new NodePartitionMap(partitionToNode, bucketToPartition, getSplitToBucket(session, partitioningHandle));
}
Creating the StageScheduler instance, which drives the scheduling of a single stage
private static class PipelinedDistributedStagesScheduler
        implements DistributedStagesScheduler
{
    private static StageScheduler createStageScheduler(
            QueryStateMachine queryStateMachine,
            PipelinedStageExecution stageExecution,
            SplitSourceFactory splitSourceFactory,
            List<PipelinedStageExecution> childStageExecutions,
            Function<PartitioningHandle, NodePartitionMap> partitioningCache,
            NodeScheduler nodeScheduler,
            NodePartitioningManager nodePartitioningManager,
            int splitBatchSize,
            DynamicFilterService dynamicFilterService,
            ScheduledExecutorService executor,
            TableExecuteContextManager tableExecuteContextManager)
    {
        Session session = queryStateMachine.getSession();
        PlanFragment fragment = stageExecution.getFragment();
        PartitioningHandle partitioningHandle = fragment.getPartitioning();

        // For this fragment, create one SplitSource per TableScanNode; it slices the source data
        // into a series of splits. For the Iceberg connector this means slicing DataFiles and
        // returning batches of IcebergSplit; calls on the SplitSource interface are ultimately
        // delegated to IcebergSplitSource. The SplitManager is also involved during creation,
        // but is not analyzed here.
        Map<PlanNodeId, SplitSource> splitSources = splitSourceFactory.createSplitSources(session, fragment);
        if (!splitSources.isEmpty()) {
            queryStateMachine.addStateChangeListener(new StateChangeListener<>()
            {
                private final AtomicReference<Collection<SplitSource>> splitSourcesReference = new AtomicReference<>(splitSources.values());

                @Override
                public void stateChanged(QueryState newState)
                {
                    if (newState.isDone()) {
                        // ensure split sources are closed and release memory
                        Collection<SplitSource> sources = splitSourcesReference.getAndSet(null);
                        if (sources != null) {
                            closeSplitSources(sources);
                        }
                    }
                }
            });
        }

        if (partitioningHandle.equals(SOURCE_DISTRIBUTION)) {
            // A SOURCE_DISTRIBUTION fragment is an upstream SubPlan responsible for loading data
            // from the source.
            // nodes are selected dynamically based on the constraints of the splits and the system load
            Entry<PlanNodeId, SplitSource> entry = getOnlyElement(splitSources.entrySet());
            PlanNodeId planNodeId = entry.getKey();
            SplitSource splitSource = entry.getValue();
            Optional<CatalogName> catalogName = Optional.of(splitSource.getCatalogName())
                    .filter(catalog -> !isInternalSystemConnector(catalog));
            NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, catalogName);
            // placementPolicy assigns a suitable worker node to each split, based on the nodeSelector
            SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stageExecution::getAllTasks);

            checkArgument(!fragment.getStageExecutionDescriptor().isStageGroupedExecution());
            // return an object wrapping a SourcePartitionedScheduler instance
            return newSourcePartitionedSchedulerAsStageScheduler(
                    stageExecution,
                    planNodeId,
                    splitSource,
                    placementPolicy,
                    splitBatchSize,
                    dynamicFilterService,
                    tableExecuteContextManager,
                    () -> childStageExecutions.stream().anyMatch(PipelinedStageExecution::isAnyTaskBlocked));
        }
        else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
            // ...
            return scheduler;
        }
        else {
            // A fragment without a TableScan (e.g. a JOIN fragment) falls into one of three cases:
            //     left is Source, right is RemoteSource
            //     left is RemoteSource, right is RemoteSource
            //     left is Source, right is Source
            if (!splitSources.isEmpty()) {
                // contains local source
                List<PlanNodeId> schedulingOrder = fragment.getPartitionedSources();
                Optional<CatalogName> catalogName = partitioningHandle.getConnectorId();
                checkArgument(catalogName.isPresent(), "No connector ID for partitioning handle: %s", partitioningHandle);

                List<ConnectorPartitionHandle> connectorPartitionHandles;
                boolean groupedExecutionForStage = fragment.getStageExecutionDescriptor().isStageGroupedExecution();
                // If a stage is marked as grouped it must be partitioned, so a group can be
                // treated as equivalent to a bucket: the splits of one group all belong to the
                // same partition and hence the same worker node
                if (groupedExecutionForStage) {
                    connectorPartitionHandles = nodePartitioningManager.listPartitionHandles(session, partitioningHandle);
                    checkState(!ImmutableList.of(NOT_PARTITIONED).equals(connectorPartitionHandles));
                }
                else {
                    // not grouped
                    connectorPartitionHandles = ImmutableList.of(NOT_PARTITIONED);
                }

                BucketNodeMap bucketNodeMap;
                List<InternalNode> stageNodeList;
                if (fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)) {
                    // no remote source
                    boolean dynamicLifespanSchedule = fragment.getStageExecutionDescriptor().isDynamicLifespanSchedule();
                    bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, dynamicLifespanSchedule);

                    // verify execution is consistent with planner's decision on dynamic lifespan schedule
                    verify(bucketNodeMap.isDynamic() == dynamicLifespanSchedule);

                    // If the fragment only contains local TableScanNodes, every available worker
                    // node is a candidate for scheduling this stage
                    stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session, catalogName).allNodes());
                    Collections.shuffle(stageNodeList);
                }
                else {
                    // cannot use dynamic lifespan schedule
                    verify(!fragment.getStageExecutionDescriptor().isDynamicLifespanSchedule());

                    // remote source requires nodePartitionMap
                    NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
                    if (groupedExecutionForStage) {
                        // A grouped stage needs M distinct ConnectorPartitionHandle instances to
                        // compute bucket IDs, with M equal to the number of buckets, so that
                        // every bucket ID maps to a distinct partition
                        checkState(connectorPartitionHandles.size() == nodePartitionMap.getBucketToPartition().length);
                    }
                    stageNodeList = nodePartitionMap.getPartitionToNode();
                    bucketNodeMap = nodePartitionMap.asBucketNodeMap();
                }

                // In this case the number of buckets is fixed, hence the number of source
                // partitions is fixed too, so create a FixedSourcePartitionedScheduler
                return new FixedSourcePartitionedScheduler(
                        stageExecution,
                        splitSources,
                        fragment.getStageExecutionDescriptor(),
                        schedulingOrder,
                        stageNodeList,
                        bucketNodeMap,
                        splitBatchSize,
                        getConcurrentLifespansPerNode(session),
                        nodeScheduler.createNodeSelector(session, catalogName),
                        connectorPartitionHandles,
                        dynamicFilterService,
                        tableExecuteContextManager);
            }
            else {
                // all sources are remote
                // All plan nodes are RemoteSources, i.e. the data to read comes from upstream
                // OutputBuffers, so the number of partitions is determined by the upstream stage
                // and the number of partitioned tasks created for this stage is fixed as well.
                // For example, when bucket count == partition count == node count, tasks of
                // different partitions of this stage go to different worker nodes; but if there
                // are more buckets than nodes, several partitions of one stage may run on the
                // same node at once.
                NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
                List<InternalNode> partitionToNode = nodePartitionMap.getPartitionToNode();
                // todo this should asynchronously wait a standard timeout period before failing
                checkCondition(!partitionToNode.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");
                return new FixedCountScheduler(stageExecution, partitionToNode);
            }
        }
    }
}
Scheduling by the DistributedStagesScheduler
private static class PipelinedDistributedStagesScheduler
        implements DistributedStagesScheduler
{
    @Override
    public void schedule()
    {
        // scheduling starts
        checkState(started.compareAndSet(false, true), "already started");

        try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
            while (!executionSchedule.isFinished()) {
                List<ListenableFuture<Void>> blockedStages = new ArrayList<>();
                // Fetch the stages to schedule. With the default configuration all stages are
                // scheduled at once, regardless of the dependencies between them
                for (PipelinedStageExecution stageExecution : executionSchedule.getStagesToSchedule()) {
                    // the StageExecution instance drives scheduling of its bound stage
                    stageExecution.beginScheduling();

                    // perform some scheduling work, asynchronously
                    ScheduleResult result = stageSchedulers.get(stageExecution.getStageId())
                            .schedule();

                    // modify parent and children based on the results of the scheduling
                    if (result.isFinished()) {
                        // the stage finished scheduling, so mark it complete
                        stageExecution.schedulingComplete();
                    }
                    else if (!result.getBlocked().isDone()) {
                        // The stage is BLOCKED, possibly because an upstream stage has not
                        // produced any output yet
                        blockedStages.add(result.getBlocked());
                    }
                    schedulerStats.getSplitsScheduledPerIteration().add(result.getSplitsScheduled());
                    if (result.getBlockedReason().isPresent()) {
                        switch (result.getBlockedReason().get()) {
                            case WRITER_SCALING:
                                // no-op
                                break;
                            case WAITING_FOR_SOURCE:
                                schedulerStats.getWaitingForSource().update(1);
                                break;
                            case SPLIT_QUEUES_FULL:
                                schedulerStats.getSplitQueuesFull().update(1);
                                break;
                            case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE:
                            case NO_ACTIVE_DRIVER_GROUP:
                                break;
                            default:
                                throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get());
                        }
                    }
                }

                // wait for a state change and then schedule again; if some stages are still
                // BLOCKED, wait on them with a timeout
                if (!blockedStages.isEmpty()) {
                    try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
                        tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS);
                    }
                    for (ListenableFuture<Void> blockedStage : blockedStages) {
                        blockedStage.cancel(true);
                    }
                }
            }

            for (PipelinedStageExecution stageExecution : stageExecutions.values()) {
                PipelinedStageExecution.State state = stageExecution.getState();
                if (state != SCHEDULED && state != RUNNING && state != FLUSHING && !state.isDone()) {
                    throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Scheduling is complete, but stage %s is in state %s", stageExecution.getStageId(), state));
                }
            }
        }
        catch (Throwable t) {
            fail(t, Optional.empty());
        }
        finally {
            RuntimeException closeError = new RuntimeException();
            for (StageScheduler scheduler : stageSchedulers.values()) {
                try {
                    scheduler.close();
                }
                catch (Throwable t) {
                    fail(t, Optional.empty());
                    // Self-suppression not permitted
                    if (closeError != t) {
                        closeError.addSuppressed(t);
                    }
                }
            }
        }
    }
}
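The executionSchedule driving the loop above is produced by the configured ExecutionPolicy (AllAtOnceExecutionPolicy by default, or PhasedExecutionPolicy). As a condensed sketch of the all-at-once behavior (simplified, not Trino's actual AllAtOnceExecutionSchedule; the isScheduled predicate stands in for the real stage-state check):

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Simplified sketch of an "all at once" execution schedule: every stage is
// offered for scheduling immediately, and a stage is dropped from the working
// set once its scheduling is complete. A phased policy would instead release
// stages in dependency order.
class AllAtOnceScheduleSketch<T>
{
    private final Set<T> schedulingStages;
    private final Predicate<T> isScheduled; // assumption: caller supplies the completion check

    AllAtOnceScheduleSketch(List<T> stages, Predicate<T> isScheduled)
    {
        this.schedulingStages = new LinkedHashSet<>(stages);
        this.isScheduled = isScheduled;
    }

    Set<T> getStagesToSchedule()
    {
        schedulingStages.removeIf(isScheduled); // drop stages that finished scheduling
        return schedulingStages;
    }

    boolean isFinished()
    {
        return getStagesToSchedule().isEmpty();
    }
}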
FixedSourcePartitionedScheduler::schedule()
- GROUP_WIDE: the Split's lifecycle is scoped to a task group. It only arises in intermediate Stages/PlanFragments, whose Splits read upstream data that has already been partitioned (so a group can loosely be regarded as one data partition). When multiple SourceSchedulers are scheduling Splits, scheduling within one source proceeds in group-ID order, and only one group's Splits can be scheduled at a time; the other SourceSchedulers, however, may schedule different groups in parallel. A condensed sketch follows this list.
- TASK_WIDE: the Split's lifecycle is scoped to a task; for now, think of it as the lifecycle of a Split while a table-scan Stage executes. The SourceSchedulers do not affect each other; each decides whether to schedule new Splits based only on the remaining resources of the current SqlTask.
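As a rough illustration of the group-wide ordering described above (hypothetical classes, not Trino's Lifespan/LifespanScheduler machinery; splits are represented as plain strings):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

// Sketch of GROUP_WIDE scheduling: splits are queued per driver group, and a
// source only hands out splits for the lowest-numbered group that still has
// work, so groups run strictly in group-id order. A TASK_WIDE source would
// instead expose a single queue consumed whenever the task has capacity.
class GroupWideSourceSketch
{
    private final TreeMap<Integer, Queue<String>> splitsByGroup; // groupId -> pending splits

    GroupWideSourceSketch(TreeMap<Integer, Queue<String>> splitsByGroup)
    {
        this.splitsByGroup = splitsByGroup;
    }

    // schedule the next batch: only the current (lowest) group is eligible
    List<String> nextBatch(int batchSize)
    {
        Map.Entry<Integer, Queue<String>> current = splitsByGroup.firstEntry();
        if (current == null) {
            return List.of();
        }
        Queue<String> pending = current.getValue();
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !pending.isEmpty()) {
            batch.add(pending.poll());
        }
        if (pending.isEmpty()) {
            splitsByGroup.pollFirstEntry(); // group done; the next group becomes eligible
        }
        return batch;
    }
}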
public class FixedSourcePartitionedScheduler
        implements StageScheduler
{
    @Override
    public ScheduleResult schedule()
    {
        // schedule a task on every node in the distribution
        List<RemoteTask> newTasks = ImmutableList.of();
        if (scheduledTasks.isEmpty()) {
            // this stage has not scheduled any tasks yet, so create a RemoteTask for every partition
            ImmutableList.Builder<RemoteTask> newTasksBuilder = ImmutableList.builder();
            for (InternalNode node : nodes) {
                // iterate over all worker nodes available to this stage;
                // one node corresponds to exactly one partition
                Optional<RemoteTask> task = stageExecution.scheduleTask(node, partitionIdAllocator.getNextId(), ImmutableMultimap.of(), ImmutableMultimap.of());
                if (task.isPresent()) {
                    scheduledTasks.put(node, task.get());
                    newTasksBuilder.add(task.get());
                }
            }
            newTasks = newTasksBuilder.build();
        }

        boolean allBlocked = true;
        List<ListenableFuture<Void>> blocked = new ArrayList<>();
        BlockedReason blockedReason = BlockedReason.NO_ACTIVE_DRIVER_GROUP;

        if (groupedLifespanScheduler.isPresent()) {
            // Start new driver groups on the first scheduler if necessary,
            // i.e. when previous ones have finished execution (not finished scheduling).
            //
            // Invoke schedule method to get a new SettableFuture every time.
            // Reusing previously returned SettableFuture could lead to the ListenableFuture retaining too many listeners.
            blocked.add(groupedLifespanScheduler.get().schedule(sourceSchedulers.get(0)));
        }

        int splitsScheduled = 0;
        // sourceSchedulers holds the scheduler instance for each source, i.e. the
        // SourcePartitionedScheduler instances that each schedule their own splits
        Iterator<SourceScheduler> schedulerIterator = sourceSchedulers.iterator();
        List<Lifespan> driverGroupsToStart = ImmutableList.of();
        boolean shouldInvokeNoMoreDriverGroups = false;
        while (schedulerIterator.hasNext()) {
            SourceScheduler sourceScheduler = schedulerIterator.next();

            // With grouped scheduling, splits are scheduled group by group: only after one
            // SourceScheduler has finished scheduling the splits of a group can the next
            // SourceScheduler schedule that group's splits, while the other groups stay BLOCKED,
            // until the first SourceScheduler in turn finishes scheduling on the other groups
            for (Lifespan lifespan : driverGroupsToStart) {
                sourceScheduler.startLifespan(lifespan, partitionHandleFor(lifespan));
            }
            if (shouldInvokeNoMoreDriverGroups) {
                sourceScheduler.noMoreLifespans();
            }

            // invoke the underlying SourcePartitionedScheduler::schedule() method
            ScheduleResult schedule = sourceScheduler.schedule();
            // accumulate the total number of splits scheduled by this stage
            splitsScheduled += schedule.getSplitsScheduled();
            if (schedule.getBlockedReason().isPresent()) {
                blocked.add(schedule.getBlocked());
                blockedReason = blockedReason.combineWith(schedule.getBlockedReason().get());
            }
            else {
                verify(schedule.getBlocked().isDone(), "blockedReason not provided when scheduler is blocked");
                allBlocked = false;
            }

            // if sourceScheduler is an AsGroupedSourceScheduler, drainCompletedLifespans()
            // always returns the corresponding Lifespan objects
            driverGroupsToStart = sourceScheduler.drainCompletedLifespans();

            if (schedule.isFinished()) {
                stageExecution.schedulingComplete(sourceScheduler.getPlanNodeId());
                schedulerIterator.remove();
                sourceScheduler.close();
                shouldInvokeNoMoreDriverGroups = true;
            }
            else {
                shouldInvokeNoMoreDriverGroups = false;
            }
        }

        if (allBlocked) {
            // if all SourcePartitionedSchedulers are BLOCKED, return the blocked information
            return new ScheduleResult(sourceSchedulers.isEmpty(), newTasks, whenAnyComplete(blocked), blockedReason, splitsScheduled);
        }
        else {
            // some SourcePartitionedScheduler is still running, so return the splits scheduled so far
            return new ScheduleResult(sourceSchedulers.isEmpty(), newTasks, splitsScheduled);
        }
    }
}
SourcePartitionedScheduler::schedule()
The lowest-level split scheduler; it drives the scheduling and execution of SOURCE-distributed (table scan) stages.
public class SourcePartitionedSchedulerimplements SourceScheduler
{@Overridepublic synchronized ScheduleResult schedule(){dropListenersFromWhenFinishedOrNewLifespansAdded();int overallSplitAssignmentCount = 0;ImmutableSet.Builder<RemoteTask> overallNewTasks = ImmutableSet.builder();List<ListenableFuture<?>> overallBlockedFutures = new ArrayList<>();boolean anyBlockedOnPlacements = false;boolean anyBlockedOnNextSplitBatch = false;boolean anyNotBlocked = false;// 遍历每一个ScheduleGroup实例,一个ScheduleGroup对应了// for (Entry<Lifespan, ScheduleGroup> entry : scheduleGroups.entrySet()) {Lifespan lifespan = entry.getKey();ScheduleGroup scheduleGroup = entry.getValue();Set<Split> pendingSplits = scheduleGroup.pendingSplits;if (scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS || scheduleGroup.state == ScheduleGroupState.DONE) {verify(scheduleGroup.nextSplitBatchFuture == null);}else if (pendingSplits.isEmpty()) {// try to get the next batchif (scheduleGroup.nextSplitBatchFuture == null) {// 实际上是通过IcebergConnectorSplitSource获取下一批要调度处理的Splits,// 注意通过splitBatchSize - pendingSplits.size()限制了最大被调度的Splits数量scheduleGroup.nextSplitBatchFuture = splitSource.getNextBatch(scheduleGroup.partitionHandle, lifespan, splitBatchSize - pendingSplits.size());long start = System.nanoTime();addSuccessCallback(scheduleGroup.nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start));}if (scheduleGroup.nextSplitBatchFuture.isDone()) {// 如果nextSplitBatchFuture完成,意味着拿到了Splits实例,因此就可以立即调度了SplitBatch nextSplits = getFutureValue(scheduleGroup.nextSplitBatchFuture);scheduleGroup.nextSplitBatchFuture = null;// 将所有的Splits添加到等待队列中pendingSplits.addAll(nextSplits.getSplits());if (nextSplits.isLastBatch()) {// 如果是最后一批要调度的Splits,则追加一个EmptySplit的实例,以便通知Worker Node上的SqlTask任务停止运行if (scheduleGroup.state == ScheduleGroupState.INITIALIZED && pendingSplits.isEmpty()) {// Add an empty split in case no splits have been produced for the source.// For source operators, they never take input, but they may produce output.// This is well handled by the execution engine.// However, there are certain non-source operators that may produce output without any input,// for example, 1) an AggregationOperator, 2) a HashAggregationOperator where one of the grouping sets is ().// Scheduling an empty split kicks off necessary driver instantiation to make this work.pendingSplits.add(new Split(splitSource.getCatalogName(),new EmptySplit(splitSource.getCatalogName()),lifespan));}// 通知当前的SourceScheduler,不需要再调度了scheduleGroup.state = ScheduleGroupState.NO_MORE_SPLITS;}}else {overallBlockedFutures.add(scheduleGroup.nextSplitBatchFuture);anyBlockedOnNextSplitBatch = true;continue;}}Multimap<InternalNode, Split> splitAssignment = ImmutableMultimap.of();if (!pendingSplits.isEmpty()) {if (!scheduleGroup.placementFuture.isDone()) {anyBlockedOnPlacements = true;continue;}if (scheduleGroup.state == ScheduleGroupState.INITIALIZED) {scheduleGroup.state = ScheduleGroupState.SPLITS_ADDED;}if (state == State.INITIALIZED) {state = State.SPLITS_ADDED;}// calculate placements for splits,为每一个Split计算应该被分发到哪个Worker NodeSplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);splitAssignment = splitPlacementResult.getAssignments();// remove splits with successful placementssplitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.overallSplitAssignmentCount += splitAssignment.size();// if not completed placed, mark scheduleGroup as blocked on placementif (!pendingSplits.isEmpty()) {scheduleGroup.placementFuture = 
```java
            splitPlacementResult.getBlocked();
                    overallBlockedFutures.add(scheduleGroup.placementFuture);
                    anyBlockedOnPlacements = true;
                }
            }

            // if no new splits will be assigned, update state and attach completion event
            Multimap<InternalNode, Lifespan> noMoreSplitsNotification = ImmutableMultimap.of();
            if (pendingSplits.isEmpty() && scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS) {
                scheduleGroup.state = ScheduleGroupState.DONE;
                if (!lifespan.isTaskWide()) {
                    InternalNode node = ((BucketedSplitPlacementPolicy) splitPlacementPolicy).getNodeForBucket(lifespan.getId());
                    noMoreSplitsNotification = ImmutableMultimap.of(node, lifespan);
                }
            }

            // assign the splits with successful placements
            overallNewTasks.addAll(assignSplits(splitAssignment, noMoreSplitsNotification));

            // Assert that "placement future is not done" implies "pendingSplits is not empty".
            // The other way around is not true. One obvious reason is (un)lucky timing, where the placement is unblocked between `computeAssignments` and this line.
            // However, there are other reasons that could lead to this.
            // Note that `computeAssignments` is quite broken:
            // 1. It always returns a completed future when there are no tasks, regardless of whether all nodes are blocked.
            // 2. The returned future will only be completed when a node with an assigned task becomes unblocked. Other nodes don't trigger future completion.
            // As a result, to avoid busy loops caused by 1, we check pendingSplits.isEmpty() instead of placementFuture.isDone() here.
            if (scheduleGroup.nextSplitBatchFuture == null && scheduleGroup.pendingSplits.isEmpty() && scheduleGroup.state != ScheduleGroupState.DONE) {
                anyNotBlocked = true;
            }
        }

        // * `splitSource.isFinished` invocation may fail after `splitSource.close` has been invoked.
        //   If state is NO_MORE_SPLITS/FINISHED, splitSource.isFinished has previously returned true, and splitSource is closed now.
        // * Even if `splitSource.isFinished()` returns true, it is not necessarily safe to tear down the split source.
        //   * If anyBlockedOnNextSplitBatch is true, it means we have not checked out the recently completed nextSplitBatch futures,
        //     which may contain recently published splits. We must not ignore those.
        //   * If any scheduleGroup is still in DISCOVERING_SPLITS state, it means it hasn't realized that there will be no more splits.
        //     Next time it invokes getNextBatch, it will realize that. However, that invocation will fail if we tear down splitSource now.
        if ((state == State.NO_MORE_SPLITS || state == State.FINISHED) || (noMoreScheduleGroups && scheduleGroups.isEmpty() && splitSource.isFinished())) {
            switch (state) {
                case INITIALIZED:
                    // We have not scheduled a single split so far.
                    // But this shouldn't be possible. See usage of EmptySplit in this method.
                    throw new IllegalStateException("At least 1 split should have been scheduled for this plan node");
                case SPLITS_ADDED:
                    state = State.NO_MORE_SPLITS;
                    Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();
                    // Here we assume that we can get non-empty tableExecuteSplitsInfo only for queries which facilitate single split source.
                    // TODO support grouped execution
                    tableExecuteSplitsInfo.ifPresent(info -> {
                        TableExecuteContext tableExecuteContext = tableExecuteContextManager.getTableExecuteContextForQuery(stageExecution.getStageId().getQueryId());
                        tableExecuteContext.setSplitsInfo(info);
                    });
                    splitSource.close();
                    // fall through
                case NO_MORE_SPLITS:
                    state = State.FINISHED;
                    whenFinishedOrNewLifespanAdded.set(null);
                    // fall through
                case FINISHED:
                    splitSource.getMetrics().ifPresent(stageExecution::updateConnectorMetrics);
                    return new ScheduleResult(
                            true,
                            overallNewTasks.build(),
                            overallSplitAssignmentCount);
            }
            throw new IllegalStateException("Unknown state");
        }

        if (anyNotBlocked) {
            return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
        }

        if (anyBlockedOnNextSplitBatch
                && scheduledTasks.isEmpty()
                && dynamicFilterService.isCollectingTaskNeeded(stageExecution.getStageId().getQueryId(), stageExecution.getFragment())) {
            // schedule a task for collecting dynamic filters in case probe split generator is waiting for them
            createTaskOnRandomNode().ifPresent(overallNewTasks::add);
        }

        boolean anySourceTaskBlocked = this.anySourceTaskBlocked.getAsBoolean();
        if (anySourceTaskBlocked) {
            // Dynamic filters might not be collected due to build side source tasks being blocked on full buffer.
            // In such case probe split generation that is waiting for dynamic filters should be unblocked to prevent deadlock.
            dynamicFilterService.unblockStageDynamicFilters(stageExecution.getStageId().getQueryId(), stageExecution.getAttemptId(), stageExecution.getFragment());
        }

        if (groupedExecution) {
            overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
        }
        else if (anyBlockedOnPlacements && anySourceTaskBlocked) {
            // In a broadcast join, output buffers of the tasks in build source stage have to
            // hold onto all data produced before probe side task scheduling finishes,
            // even if the data is acknowledged by all known consumers. This is because
            // new consumers may be added until the probe side task scheduling finishes.
            //
            // As a result, the following line is necessary to prevent deadlock
            // due to neither build nor probe can make any progress.
            // The build side blocks due to a full output buffer.
            // In the meantime the probe side split cannot be consumed since
            // builder side hash table construction has not finished.
            overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
        }

        ScheduleResult.BlockedReason blockedReason;
        if (anyBlockedOnNextSplitBatch) {
            blockedReason = anyBlockedOnPlacements ? MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : WAITING_FOR_SOURCE;
        }
        else {
            blockedReason = anyBlockedOnPlacements ? SPLIT_QUEUES_FULL : NO_ACTIVE_DRIVER_GROUP;
        }

        overallBlockedFutures.add(whenFinishedOrNewLifespanAdded);
        return new ScheduleResult(
                false,
                overallNewTasks.build(),
                nonCancellationPropagating(asVoid(whenAnyComplete(overallBlockedFutures))),
                blockedReason,
                overallSplitAssignmentCount);
    }
}
```
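To make the ScheduleResult contract above concrete, here is a minimal, hypothetical polling loop showing how a caller is meant to consume it: keep calling `schedule()` until it reports completion, and when it reports "blocked", wait on the returned future instead of busy-looping. The `ScheduleResultSketch`/`SourceSchedulerSketch` types are stand-ins invented for this sketch; the real classes carry new tasks, split counts, a BlockedReason, and Guava `ListenableFuture`s.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Minimal stand-ins for the sketch; not Trino's actual scheduler loop.
record ScheduleResultSketch(boolean finished, Optional<CompletableFuture<Void>> blocked) {}

interface SourceSchedulerSketch
{
    ScheduleResultSketch schedule();
}

class SchedulerLoopSketch
{
    static void driveToCompletion(SourceSchedulerSketch scheduler)
    {
        while (true) {
            ScheduleResultSketch result = scheduler.schedule();
            if (result.finished()) {
                return; // corresponds to the State.FINISHED branch above
            }
            // Either progress was made (no blocked future: call schedule() again right away),
            // or we must wait on the blocked future instead of spinning.
            result.blocked().ifPresent(CompletableFuture::join);
        }
    }
}
```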
SqlTask Creation
A SqlTask runs on a Worker node. Each SqlTask corresponds to one partition of one Stage, and it is responsible for processing all the Splits of that partition.
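The query → stage → task (partition) → split hierarchy can be pictured with a few toy records. These are illustration-only types; Trino's real QueryId/StageId/TaskId classes have their own string formats and extra fields such as an attempt ID.

```java
// Toy records only, to fix the mental model; not Trino's real identifier classes.
record QueryId(String id) {}
record StageId(QueryId queryId, int stageNumber) {}
record TaskId(StageId stageId, int partition) {}

// One task per (stage, partition); each task then processes many splits.
TaskId task = new TaskId(new StageId(new QueryId("20240101_000000_00000_abcde"), 1), 3);
```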
The client (here, the Coordinator) calls the `/v1/task/{taskId}` REST API to ask the corresponding Worker node to create the task instance.
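On the wire this is just an HTTP POST carrying a JSON-serialized TaskUpdateRequest. The sketch below uses `java.net.http` and is purely hypothetical: the real coordinator-side client uses Airlift's asynchronous HTTP client and generated JSON codecs, and sends more headers than shown here. The worker-side resource method that receives the request follows.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client-side sketch of POST /v1/task/{taskId}; illustration only.
class TaskUpdateClientSketch
{
    static HttpResponse<String> postTaskUpdate(String workerBaseUri, String taskId, String taskUpdateRequestJson)
            throws Exception
    {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(workerBaseUri + "/v1/task/" + taskId))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(taskUpdateRequestJson))
                .build();
        // the response body is the JSON TaskInfo returned by the worker
        return HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```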
```java
@ResourceSecurity(INTERNAL_ONLY)
@POST
@Path("{taskId}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public void createOrUpdateTask(
        @PathParam("taskId") TaskId taskId,
        TaskUpdateRequest taskUpdateRequest,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");

    Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager, taskUpdateRequest.getExtraCredentials());

    if (injectFailure(session.getTraceToken(), taskId, RequestType.CREATE_OR_UPDATE_TASK, asyncResponse)) {
        return;
    }

    // create (or update) the task
    TaskInfo taskInfo = taskManager.updateTask(
            session,
            taskId,
            taskUpdateRequest.getFragment(),
            taskUpdateRequest.getSources(),
            taskUpdateRequest.getOutputIds(),
            taskUpdateRequest.getDynamicFilterDomains());

    if (shouldSummarize(uriInfo)) {
        taskInfo = taskInfo.summarize();
    }
    asyncResponse.resume(Response.ok().entity(taskInfo).build());
}
```
SqlTaskManager::updateTask
```java
public class SqlTaskManager
        implements TaskManager, Closeable
{
    private final LoadingCache<TaskId, SqlTask> tasks = CacheBuilder.newBuilder().build(CacheLoader.from(
            taskId -> createSqlTask(
                    taskId,
                    locationFactory.createLocalTaskLocation(taskId),
                    nodeInfo.getNodeId(),
                    queryContexts.getUnchecked(taskId.getQueryId()),
                    sqlTaskExecutionFactory,
                    taskNotificationExecutor,
                    sqlTask -> finishedTaskStats.merge(sqlTask.getIoStats()),
                    maxBufferSize,
                    maxBroadcastBufferSize,
                    failedTasks)));

    @Override
    public TaskInfo updateTask(
            Session session,
            TaskId taskId,
            Optional<PlanFragment> fragment,
            List<TaskSource> sources,
            OutputBuffers outputBuffers,
            Map<DynamicFilterId, Domain> dynamicFilterDomains)
    {
        try {
            return versionEmbedder.embedVersion(() -> doUpdateTask(session, taskId, fragment, sources, outputBuffers, dynamicFilterDomains)).call();
        }
        catch (Exception e) {
            throwIfUnchecked(e);
            // impossible, doUpdateTask does not throw checked exceptions
            throw new RuntimeException(e);
        }
    }

    private TaskInfo doUpdateTask(
            Session session,
            TaskId taskId,
            Optional<PlanFragment> fragment,
            List<TaskSource> sources,
            OutputBuffers outputBuffers,
            Map<DynamicFilterId, Domain> dynamicFilterDomains)
    {
        requireNonNull(session, "session is null");
        requireNonNull(taskId, "taskId is null");
        requireNonNull(fragment, "fragment is null");
        requireNonNull(sources, "sources is null");
        requireNonNull(outputBuffers, "outputBuffers is null");

        SqlTask sqlTask = tasks.getUnchecked(taskId); // looks up the SqlTask, creating a new instance on first access
        QueryContext queryContext = sqlTask.getQueryContext();
        if (!queryContext.isMemoryLimitsInitialized()) {
            // if per-query memory limits apply, the related properties need to be initialized
            long sessionQueryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();
            long sessionQueryTotalMaxMemoryPerNode = getQueryMaxTotalMemoryPerNode(session).toBytes();
            // Session properties are only allowed to decrease memory limits, not increase them
            queryContext.initializeMemoryLimits(
                    resourceOvercommit(session),
                    min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode),
                    min(sessionQueryTotalMaxMemoryPerNode, queryMaxTotalMemoryPerNode));
        }

        // Record the SqlTask heartbeat, which is simply the current system time.
        // The heartbeat is refreshed on every lookup or update, so the last heartbeat
        // time can be used to decide whether the task has been lost.
        sqlTask.recordHeartbeat();
        // update the runtime parameters of the SqlTask instance
        return sqlTask.updateTask(session, fragment, sources, outputBuffers, dynamicFilterDomains);
    }
```
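The `tasks` field relies on Guava's LoadingCache to make lookup-or-create atomic: the first `getUnchecked(taskId)` builds the SqlTask via the loader, and every later call returns the same instance, even under concurrency. A minimal, self-contained illustration of that idiom (not Trino code; `StringBuilder` stands in for SqlTask):

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

// Minimal illustration of the get-or-create idiom used by SqlTaskManager.tasks:
// the loader runs at most once per key, so concurrent callers with the same
// taskId all observe the same instance.
public class TaskRegistryDemo
{
    private final LoadingCache<String, StringBuilder> tasks = CacheBuilder.newBuilder()
            .build(CacheLoader.from(taskId -> new StringBuilder("task-" + taskId)));

    public StringBuilder lookupOrCreate(String taskId)
    {
        return tasks.getUnchecked(taskId); // creates on first access, cached afterwards
    }
}
```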
Updating the SqlTask
The update path runs both when the SqlTask instance is first created and whenever the Coordinator dispatches new Splits to it.
```java
public class SqlTask
{
    /**
     * All parameters of this method come from the TaskUpdateRequest sent by the client, so by the time
     * the worker-side execution task is generated, both the inputs (Splits) and the outputs (outputBuffers)
     * of this fragment are already fixed.
     *
     * session: the client-side session information for this SQL execution
     * fragment: the logical plan fragment this SqlTask will execute
     * sources: the Split descriptors of the data sources this SqlTask reads; each Split reads either a remote source or a table
     * outputBuffers: the output buffer queues of the SqlTask, fixed when the PipelinedStageExecution instance
     *   was created on the Coordinator. There are three kinds:
     *   BroadcastOutputBufferManager: broadcasts output data; there is a single partition and therefore a single buffer
     *   ScaledOutputBufferManager: scales the number of output buffers dynamically, so the buffer count is not fixed; used for data-writing tasks
     *   PartitionedOutputBufferManager: creates one output buffer per partition, so every buffer maps to one partition ID
     *     consumed by the downstream Stage. This is the kind used in the execution flow analyzed here.
     */
    public TaskInfo updateTask(
            Session session,
            Optional<PlanFragment> fragment,
            List<TaskSource> sources,
            OutputBuffers outputBuffers,
            Map<DynamicFilterId, Domain> dynamicFilterDomains)
    {
        try {
            // trace token must be set first to make sure failure injection for getTaskResults requests works as expected
            session.getTraceToken().ifPresent(traceToken::set);

            // The LazyOutput buffer does not support write methods, so the actual
            // output buffer must be established before drivers are created (e.g.
            // a VALUES query).
            outputBuffer.setOutputBuffers(outputBuffers);

            // assure the task execution is only created once
            SqlTaskExecution taskExecution;
            synchronized (this) {
                // is task already complete?
                TaskHolder taskHolder = taskHolderReference.get();
                if (taskHolder.isFinished()) {
                    return taskHolder.getFinalTaskInfo();
                }
                taskExecution = taskHolder.getTaskExecution();
                if (taskExecution == null) {
                    checkState(fragment.isPresent(), "fragment must be present");
                    // create the SqlTaskExecution instance, responsible for analyzing and executing the fragment on this worker
                    taskExecution = sqlTaskExecutionFactory.create(
                            session,
                            queryContext,
                            taskStateMachine,
                            outputBuffer,
                            fragment.get(),
                            this::notifyStatusChanged);
                    taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
                    needsPlan.set(false);
                }
            }

            if (taskExecution != null) {
                // once the taskExecution instance exists, add the source Splits to its pending queues
                taskExecution.addSources(sources);
                // and update the dynamic-filter domains (value sets usable to prune data while processing Splits)
                taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains);
            }
        }
        catch (Error e) {
            failed(e);
            throw e;
        }
        catch (RuntimeException e) {
            failed(e);
        }

        return getTaskInfo();
    }
}
```
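The "create the execution exactly once" logic above combines a synchronized block with an AtomicReference. Stripped to its skeleton, with hypothetical names, it is double-checked creation: a lock-free fast path for readers, plus a locked check-then-create for the first writer.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Skeleton of the create-once pattern in SqlTask.updateTask: the synchronized
// block makes check-then-create atomic, while the AtomicReference lets other
// code paths read the published value without taking the lock.
public class CreateOnceDemo<T>
{
    private final AtomicReference<T> holder = new AtomicReference<>();

    public T getOrCreate(Supplier<T> factory)
    {
        T existing = holder.get();
        if (existing != null) {
            return existing; // fast path: already created
        }
        synchronized (this) {
            existing = holder.get();
            if (existing == null) {
                existing = factory.get();
                holder.set(existing); // publish exactly one instance
            }
            return existing;
        }
    }
}
```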
Creating the SqlTaskExecution
While this instance is being created, the truly executable physical plan, a LocalExecutionPlan, is generated as part of the construction process.
```java
public class SqlTaskExecutionFactory
{
    public SqlTaskExecution create(
            Session session,
            QueryContext queryContext,
            TaskStateMachine taskStateMachine,
            OutputBuffer outputBuffer,
            PlanFragment fragment,
            Runnable notifyStatusChanged)
    {
        // create the TaskContext instance, which holds the runtime state of this SqlTask, e.g. various metrics
        TaskContext taskContext = queryContext.addTaskContext(
                taskStateMachine,
                session,
                notifyStatusChanged,
                perOperatorCpuTimerEnabled,
                cpuTimerEnabled);

        LocalExecutionPlan localExecutionPlan;
        try (SetThreadName ignored = new SetThreadName("Task-%s", taskStateMachine.getTaskId())) {
            try {
                // planner is a LocalExecutionPlanner instance, used to convert the logical PlanFragment
                // into the locally executable physical plan, a LocalExecutionPlan
                localExecutionPlan = planner.plan(
                        taskContext,
                        fragment.getRoot(),
                        TypeProvider.copyOf(fragment.getSymbols()),
                        fragment.getPartitioningScheme(),
                        fragment.getStageExecutionDescriptor(),
                        fragment.getPartitionedSources(),
                        outputBuffer);
            }
            catch (Throwable e) {
                // planning failed
                taskStateMachine.failed(e);
                throwIfUnchecked(e);
                throw new RuntimeException(e);
            }
        }
        return createSqlTaskExecution(
                taskStateMachine,
                taskContext,
                outputBuffer,
                localExecutionPlan,
                taskExecutor,
                taskNotificationExecutor,
                splitMonitor);
    }
}
```
LocalExecutionPlanner::plan
Converts the PlanFragment into a locally executable physical plan, the LocalExecutionPlan.
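Conceptually, the output of this planning step is a set of pipelines: a source operator produces pages, intermediate operators transform them, and an output operator hands them to the output buffer. The toy model below makes that shape concrete; it is not Trino's Operator interface (which also handles blocking, `needsInput`, and so on). The real method follows.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model of a driver pipeline: a source yields "pages" (here just lists of
// rows), an intermediate operator transforms each page, and an output operator
// consumes the result. Illustration only.
public class PipelineDemo
{
    public static void main(String[] args)
    {
        List<List<Integer>> pages = List.of(List.of(1, 2, 3), List.of(4, 5));
        UnaryOperator<List<Integer>> filterEven = page -> page.stream().filter(v -> v % 2 == 0).toList();

        for (List<Integer> page : pages) {              // source operator
            List<Integer> out = filterEven.apply(page); // intermediate operator
            System.out.println(out);                    // output operator
        }
    }
}
```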
```java
public class LocalExecutionPlanner
{
    public LocalExecutionPlan plan(
            TaskContext taskContext,
            PlanNode plan,
            TypeProvider types,
            PartitioningScheme partitioningScheme,
            StageExecutionDescriptor stageExecutionDescriptor,
            List<PlanNodeId> partitionedSourceOrder,
            OutputBuffer outputBuffer)
    {
        // get the output layout of the current fragment
        List<Symbol> outputLayout = partitioningScheme.getOutputLayout();

        if (partitioningScheme.getPartitioning().getHandle().equals(FIXED_BROADCAST_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(SCALED_WRITER_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(SINGLE_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(COORDINATOR_DISTRIBUTION)) {
            // these distribution types need no partition function over the data, so plan directly with a TaskOutputFactory
            return plan(taskContext, stageExecutionDescriptor, plan, outputLayout, types, partitionedSourceOrder, new TaskOutputFactory(outputBuffer));
        }

        // We can convert the symbols directly into channels, because the root must be a sink and therefore the layout is fixed
        List<Integer> partitionChannels;
        List<Optional<NullableValue>> partitionConstants;
        List<Type> partitionChannelTypes;
        if (partitioningScheme.getHashColumn().isPresent()) {
            partitionChannels = ImmutableList.of(outputLayout.indexOf(partitioningScheme.getHashColumn().get()));
            partitionConstants = ImmutableList.of(Optional.empty());
            partitionChannelTypes = ImmutableList.of(BIGINT);
        }
        else {
            // collect the indexes of the partitioning columns; a constant partition value is marked with -1
            partitionChannels = partitioningScheme.getPartitioning().getArguments().stream()
                    .map(argument -> {
                        if (argument.isConstant()) {
                            return -1;
                        }
                        return outputLayout.indexOf(argument.getColumn());
                    })
                    .collect(toImmutableList());
            // collect the constant partition values
            partitionConstants = partitioningScheme.getPartitioning().getArguments().stream()
                    .map(argument -> {
                        if (argument.isConstant()) {
                            return Optional.of(argument.getConstant());
                        }
                        return Optional.<NullableValue>empty();
                    })
                    .collect(toImmutableList());
            // collect the types of the partitioning columns
            partitionChannelTypes = partitioningScheme.getPartitioning().getArguments().stream()
                    .map(argument -> {
                        if (argument.isConstant()) {
                            return argument.getConstant().getType();
                        }
                        return types.get(argument.getColumn());
                    })
                    .collect(toImmutableList());
        }

        // Obtain the function that computes partition IDs; for a typical read query this is a BucketPartitionFunction instance.
        // PartitionFunction exposes getPartition(Page page, int position), i.e. it computes the partition ID of one row of a data page.
        // The built-in strategies are:
        //   SINGLE: a single partition can only have one bucket
        //   HASH: HashBucketFunction derives a hash from the partitioning columns, then takes it modulo the number of partitions to get the row's partition ID
        //   ROUND_ROBIN: assigns rows to partition IDs in sequential, round-robin order
        PartitionFunction partitionFunction = nodePartitioningManager.getPartitionFunction(taskContext.getSession(), partitioningScheme, partitionChannelTypes);
        OptionalInt nullChannel = OptionalInt.empty();
        Set<Symbol> partitioningColumns = partitioningScheme.getPartitioning().getColumns();

        // partitioningColumns expected to have one column in the normal case, and zero columns when partitioning on a constant
        // For a constant partition no extra channel is needed; when a partitioning column is specified, one extra channel
        // holds each row's partition value (e.g. the hash value for the HASH strategy).
        checkArgument(!partitioningScheme.isReplicateNullsAndAny() || partitioningColumns.size() <= 1);
        if (partitioningScheme.isReplicateNullsAndAny() && partitioningColumns.size() == 1) {
            nullChannel = OptionalInt.of(outputLayout.indexOf(getOnlyElement(partitioningColumns)));
        }

        return plan(
                taskContext,
                stageExecutionDescriptor,
                plan,
                outputLayout,
                types,
                partitionedSourceOrder,
                // Create a PartitionedOutputFactory instance, which knows how to create PartitionedOutputOperator instances.
                // The PartitionedOutputOperator is the last in the chain of operators executing this PlanFragment: it computes
                // the partition ID of each row of a data Page from the partitioning columns and pushes the row into the OutputBuffer.
                operatorFactories.partitionedOutput(
                        taskContext,
                        partitionFunction,
                        partitionChannels,
                        partitionConstants,
                        partitioningScheme.isReplicateNullsAndAny(),
                        nullChannel,
                        outputBuffer,
                        maxPagePartitioningBufferSize));
    }

    public LocalExecutionPlan plan(
            TaskContext taskContext,
            StageExecutionDescriptor stageExecutionDescriptor,
            PlanNode plan, // root node of the PlanFragment, i.e. the top of this logical sub-plan
            List<Symbol> outputLayout, // layout of this PlanFragment's result, i.e. the output column symbols
            TypeProvider types, // describes the type of every Symbol
            List<PlanNodeId> partitionedSourceOrder, // PlanNodeIds of all source nodes of this fragment; a JOIN, for example, has a left and a right source
            OutputFactory outputOperatorFactory)
    {
        Session session = taskContext.getSession();
        // holds the runtime context for building the local physical execution plan
        LocalExecutionPlanContext context = new LocalExecutionPlanContext(taskContext, types);

        // Visit the plan tree starting from the fragment's root node and build the physical execution plan,
        // which is effectively a chain of Operators (a Driver) applied to Splits.
        PhysicalOperation physicalOperation = plan.accept(new Visitor(session, stageExecutionDescriptor), context);
        // align the logical plan's outputLayout with the physical plan's output layout
        Function<Page, Page> pagePreprocessor = enforceLoadedLayoutProcessor(outputLayout, physicalOperation.getLayout());
        // collect the types of the logical plan's output columns
        List<Type> outputTypes = outputLayout.stream()
                .map(types::get)
                .collect(toImmutableList());

        // Create a new driver factory: chain the output operator after physicalOperation into one physical pipeline,
        // assigning a new pipeline ID. physicalOperation acts as the source side of the pipeline, and the output
        // operator acts as its sink.
        context.addDriverFactory(
                context.isInputDriver(),
                true, // marks the new driver as an output driver
                new PhysicalOperation(
                        outputOperatorFactory.createOutputOperator(
                                context.getNextOperatorId(),
                                plan.getId(),
                                outputTypes,
                                pagePreprocessor,
                                new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), isExchangeCompressionEnabled(session))),
                        physicalOperation),
                context.getDriverInstanceCount());

        // notify operator factories that planning has completed
        context.getDriverFactories().stream()
                .map(DriverFactory::getOperatorFactories)
                .flatMap(List::stream)
                .filter(LocalPlannerAware.class::isInstance)
                .map(LocalPlannerAware.class::cast)
                .forEach(LocalPlannerAware::localPlannerComplete);

        return new LocalExecutionPlan(context.getDriverFactories(), partitionedSourceOrder, stageExecutionDescriptor);
    }
}
```
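The HASH strategy described above boils down to "hash the partitioning columns of a row, then map the hash onto [0, partitionCount)". A self-contained sketch of that idea follows; it is not the real BucketPartitionFunction, which additionally routes through a bucket-to-partition table and uses Trino's own hashing over Pages.

```java
import java.util.List;
import java.util.Objects;

// Sketch of the HASH partitioning strategy: hash the partitioning values of a
// row, then map the hash onto [0, partitionCount). Illustration only.
public final class HashPartitionFunctionDemo
{
    private final int partitionCount;

    public HashPartitionFunctionDemo(int partitionCount)
    {
        this.partitionCount = partitionCount;
    }

    public int getPartition(List<Object> partitioningValues)
    {
        int hash = Objects.hash(partitioningValues.toArray());
        return Math.floorMod(hash, partitionCount); // floorMod keeps the result non-negative
    }

    public static void main(String[] args)
    {
        HashPartitionFunctionDemo fn = new HashPartitionFunctionDemo(8);
        System.out.println(fn.getPartition(List.of("user-42"))); // stable partition id in [0, 8)
    }
}
```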
The SqlTaskExecution Constructor
```java
private SqlTaskExecution(
        TaskStateMachine taskStateMachine,
        TaskContext taskContext,
        OutputBuffer outputBuffer,
        LocalExecutionPlan localExecutionPlan,
        TaskExecutor taskExecutor,
        SplitMonitor splitMonitor,
        Executor notificationExecutor)
{
    this.taskStateMachine = requireNonNull(taskStateMachine, "taskStateMachine is null");
    this.taskId = taskStateMachine.getTaskId();
    this.taskContext = requireNonNull(taskContext, "taskContext is null");
    this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null");
    this.taskExecutor = requireNonNull(taskExecutor, "taskExecutor is null");
    this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
    this.splitMonitor = requireNonNull(splitMonitor, "splitMonitor is null");

    try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
        // index driver factories
        // get the source node IDs from the execution plan
        Set<PlanNodeId> partitionedSources = ImmutableSet.copyOf(localExecutionPlan.getPartitionedSourceOrder());
        // holds all DriverSplitRunnerFactory instances whose lifecycle is bound to a Split
        ImmutableMap.Builder<PlanNodeId, DriverSplitRunnerFactory> driverRunnerFactoriesWithSplitLifeCycle = ImmutableMap.builder();
        // holds all DriverSplitRunnerFactory instances with task-level lifecycle
        ImmutableList.Builder<DriverSplitRunnerFactory> driverRunnerFactoriesWithTaskLifeCycle = ImmutableList.builder();
        // holds all DriverSplitRunnerFactory instances with driver-group-level lifecycle
        ImmutableList.Builder<DriverSplitRunnerFactory> driverRunnerFactoriesWithDriverGroupLifeCycle = ImmutableList.builder();
        for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
            // get the most upstream PlanNodeId of this driver
            Optional<PlanNodeId> sourceId = driverFactory.getSourceId();
            if (sourceId.isPresent() && partitionedSources.contains(sourceId.get())) {
                // If this driver's input is a partitioned source node, the driver's lifecycle is bound to a Split:
                // once its bound Split has been processed, the driver is no longer needed.
                driverRunnerFactoriesWithSplitLifeCycle.put(sourceId.get(), new DriverSplitRunnerFactory(driverFactory, true));
            }
            else {
                // otherwise this driver is a downstream driver instance
                switch (driverFactory.getPipelineExecutionStrategy()) {
                    case GROUPED_EXECUTION:
                        // for a group lifespan, create one runner factory per driver and add it to the group-level waiting list
                        driverRunnerFactoriesWithDriverGroupLifeCycle.add(new DriverSplitRunnerFactory(driverFactory, false));
                        break;
                    case UNGROUPED_EXECUTION:
                        // for a task-wide lifespan, create one runner factory per driver at the task level
                        driverRunnerFactoriesWithTaskLifeCycle.add(new DriverSplitRunnerFactory(driverFactory, false));
                        break;
                    default:
                        throw new UnsupportedOperationException();
                }
            }
        }
        this.driverRunnerFactoriesWithSplitLifeCycle = driverRunnerFactoriesWithSplitLifeCycle.build();
        this.driverRunnerFactoriesWithDriverGroupLifeCycle = driverRunnerFactoriesWithDriverGroupLifeCycle.build();
        this.driverRunnerFactoriesWithTaskLifeCycle = driverRunnerFactoriesWithTaskLifeCycle.build();

        this.pendingSplitsByPlanNode = this.driverRunnerFactoriesWithSplitLifeCycle.keySet().stream()
                .collect(toImmutableMap(identity(), ignore -> new PendingSplitsForPlanNode()));
        this.status = new Status(
                taskContext,
                localExecutionPlan.getDriverFactories().stream()
                        .collect(toImmutableMap(DriverFactory::getPipelineId, DriverFactory::getPipelineExecutionStrategy)));
        this.schedulingLifespanManager = new SchedulingLifespanManager(localExecutionPlan.getPartitionedSourceOrder(), localExecutionPlan.getStageExecutionDescriptor(), this.status);
        checkArgument(this.driverRunnerFactoriesWithSplitLifeCycle.keySet().equals(partitionedSources),
                "Fragment is partitioned, but not all partitioned drivers were found");

        // Pre-register Lifespans for ungrouped partitioned drivers in case they end up getting no splits.
        for (Entry<PlanNodeId, DriverSplitRunnerFactory> entry : this.driverRunnerFactoriesWithSplitLifeCycle.entrySet()) {
            PlanNodeId planNodeId = entry.getKey();
            DriverSplitRunnerFactory driverSplitRunnerFactory = entry.getValue();
            if (driverSplitRunnerFactory.getPipelineExecutionStrategy() == UNGROUPED_EXECUTION) {
                this.schedulingLifespanManager.addLifespanIfAbsent(Lifespan.taskWide());
                this.pendingSplitsByPlanNode.get(planNodeId).getLifespan(Lifespan.taskWide());
            }
        }

        // don't register the task if it is already completed (most likely failed during planning above)
        if (!taskStateMachine.getState().isDone()) {
            taskHandle = createTaskHandle(taskStateMachine, taskContext, outputBuffer, localExecutionPlan, taskExecutor);
        }
        else {
            taskHandle = null;
        }

        // add a listener: when the output buffer reaches the FINISHED state, check whether this SqlTaskExecution is complete
        outputBuffer.addStateChangeListener(new CheckTaskCompletionOnBufferFinish(SqlTaskExecution.this));
    }
}
```
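The constructor's routing of DriverFactory instances into the three buckets follows a simple decision rule. Restated as a tiny function with hypothetical names, purely to summarize the branching above:

```java
// Hypothetical restatement of the routing rule in the constructor above.
public class LifecycleClassifierDemo
{
    enum DriverLifecycle { SPLIT, DRIVER_GROUP, TASK }

    static DriverLifecycle classify(boolean hasPartitionedSourceNode, boolean groupedExecution)
    {
        if (hasPartitionedSourceNode) {
            return DriverLifecycle.SPLIT; // driver dies when its bound (table scan) split is done
        }
        return groupedExecution
                ? DriverLifecycle.DRIVER_GROUP // one set of drivers per lifespan (driver group)
                : DriverLifecycle.TASK;        // one set of drivers for the whole task
    }
}
```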
SqlTaskExecution::addSources
The addSources() method takes the new Splits sent by the client (the Coordinator), appends them to the appropriate scheduling queues according to their type, and then attempts to schedule them.
```java
public class SqlTaskExecution
{
    public void addSources(List<TaskSource> sources)
    {
        requireNonNull(sources, "sources is null");
        checkState(!Thread.holdsLock(this), "Cannot add sources while holding a lock on the %s", getClass().getSimpleName());

        try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
            // update our record of sources and schedule drivers for new partitioned splits
            // The returned updatedUnpartitionedSources map contains all unpartitioned sources that are not yet finished;
            // partitioned splits are scheduled inside updateSources via SqlTaskExecution::schedulePartitionedSource(..)
            Map<PlanNodeId, TaskSource> updatedUnpartitionedSources = updateSources(sources);

            // deliver all unpartitioned splits to the existing drivers
            // tell existing drivers about the new splits; it is safe to update drivers
            // multiple times and out of order because sources contain full record of
            // the unpartitioned splits
            for (WeakReference<Driver> driverReference : drivers) {
                Driver driver = driverReference.get();
                // the driver can be GCed due to a failure or a limit
                if (driver == null) {
                    // remove the weak reference from the list to avoid a memory leak
                    // NOTE: this is a concurrent safe operation on a CopyOnWriteArrayList
                    drivers.remove(driverReference);
                    continue;
                }
                Optional<PlanNodeId> sourceId = driver.getSourceId();
                if (sourceId.isEmpty()) {
                    continue;
                }
                TaskSource sourceUpdate = updatedUnpartitionedSources.get(sourceId.get());
                if (sourceUpdate == null) {
                    continue;
                }
                driver.updateSource(sourceUpdate);
            }

            // we may have transitioned to no more splits, so check for completion
            checkTaskCompletion();
        }
    }
}
```
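The bookkeeping pattern used for `drivers` above, iterating a CopyOnWriteArrayList of WeakReferences and pruning cleared entries as a side effect of traversal, is worth isolating. A self-contained illustration (not Trino code):

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Illustration of weak-reference bookkeeping: entries that have been
// garbage-collected are pruned lazily while iterating. Iteration on a
// CopyOnWriteArrayList works over a snapshot, so remove() during the
// loop is safe.
public class WeakListDemo<T>
{
    private final CopyOnWriteArrayList<WeakReference<T>> items = new CopyOnWriteArrayList<>();

    public void add(T item)
    {
        items.add(new WeakReference<>(item));
    }

    public void forEachLive(Consumer<T> action)
    {
        for (WeakReference<T> ref : items) {
            T item = ref.get();
            if (item == null) {
                items.remove(ref); // prune dead reference to avoid a leak
                continue;
            }
            action.accept(item);
        }
    }
}
```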
SqlTask Scheduling & Execution
SqlTaskExecution::schedulePartitionedSource
Whenever the SqlTask receives new Splits, it calls schedulePartitionedSource(TaskSource) to schedule them.
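Before reading the method, it helps to have a minimal model of the pending-splits bookkeeping it drains. The sketch below is hypothetical and much simpler than the real PendingSplits class, which also tracks a cleaned-up state and per-lifespan bookkeeping; only the add/drain/no-more-splits shape matters here.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model of the per-(plan node, lifespan) pending-splits queue that
// schedulePartitionedSource drains. Illustration only.
public class PendingSplitsDemo
{
    enum State { ADDING_SPLITS, NO_MORE_SPLITS }

    private final Set<String> splits = new LinkedHashSet<>();
    private State state = State.ADDING_SPLITS;

    public void addSplits(Set<String> newSplits, boolean noMoreSplits)
    {
        splits.addAll(newSplits);
        if (noMoreSplits) {
            state = State.NO_MORE_SPLITS; // no further splits will ever arrive
        }
    }

    public Set<String> removeAllSplits()
    {
        Set<String> drained = new LinkedHashSet<>(splits);
        splits.clear();
        return drained;
    }

    public State getState()
    {
        return state;
    }
}
```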
```java
private synchronized void schedulePartitionedSource(TaskSource sourceUpdate)
{
    mergeIntoPendingSplits(sourceUpdate.getPlanNodeId(), sourceUpdate.getSplits(), sourceUpdate.getNoMoreSplitsForLifespan(), sourceUpdate.isNoMoreSplits());

    while (true) {
        // SchedulingLifespanManager tracks how far each Lifespan has been scheduled. Here is an example.
        // Let's say there are 4 source pipelines/nodes: A, B, C, and D, in scheduling order.
        // And we're processing 3 concurrent lifespans at a time. In this case, we could have
        //
        // * Lifespan 10: A B [C] D; i.e. Pipeline A and B has finished scheduling (but not necessarily finished running).
        // * Lifespan 20: [A] B C D
        // * Lifespan 30: A [B] C D
        //
        // To recap, SchedulingLifespanManager records the next scheduling source node for each lifespan.
        //
        // The schedulingLifespanManager maintains two kinds of Lifespan:
        //   Task-wide: the splits' scope matches the split/task lifecycle. It is mutually exclusive with grouped
        //     source pipelines: at any moment only one task-wide pipeline and one grouped pipeline can be scheduled in parallel.
        //   Group-wide: the splits' scope matches the driver-group lifecycle. With multiple source pipelines, only one
        //     source pipeline per group (i.e. per partition) can be scheduled and executed at a time; different groups
        //     can be scheduled in parallel.
        //
        // Get the lifespans that still need scheduling, and schedule the splits belonging to them.
        Iterator<SchedulingLifespan> activeLifespans = schedulingLifespanManager.getActiveLifespans();

        boolean madeProgress = false;

        while (activeLifespans.hasNext()) {
            SchedulingLifespan schedulingLifespan = activeLifespans.next();
            Lifespan lifespan = schedulingLifespan.getLifespan();

            // Continue using the example from above. Let's say the sourceUpdate adds some new splits for source node B.
            //
            // For lifespan 30, it could start new drivers and assign a pending split to each.
            // Pending splits could include both pre-existing pending splits, and the new ones from sourceUpdate.
            // If there is enough driver slots to deplete pending splits, one of the below would happen.
            // * If it is marked that all splits for node B in lifespan 30 has been received, SchedulingLifespanManager
            //   will be updated so that lifespan 30 now processes source node C. It will immediately start processing them.
            // * Otherwise, processing of lifespan 30 will be shelved for now.
            //
            // It is possible that the following loop would be a no-op for a particular lifespan.
            // It is also possible that a single lifespan can proceed through multiple source nodes in one run.
            //
            // When different drivers in the task have different pipelineExecutionStrategy, it adds additional complexity.
            // For example, when driver B is ungrouped and driver A, C, D is grouped, you could have something like this:
            //     TaskWide   :     [B]
            //     Lifespan 10: A [ ] C D
            //     Lifespan 20: [A]    C D
            //     Lifespan 30: A [ ] C D
            // In this example, Lifespan 30 cannot start executing drivers in pipeline C because pipeline B
            // hasn't finished scheduling yet (albeit in a different lifespan).
            // Similarly, it wouldn't make sense for TaskWide to start executing drivers in pipeline B until at least
            // one lifespan has finished scheduling pipeline A.
            // This is why getSchedulingPlanNode returns an Optional.
            while (true) {
                Optional<PlanNodeId> optionalSchedulingPlanNode = schedulingLifespan.getSchedulingPlanNode();
                if (optionalSchedulingPlanNode.isEmpty()) {
                    break;
                }
                PlanNodeId schedulingPlanNode = optionalSchedulingPlanNode.get();

                // The plan nodes stored in driverRunnerFactoriesWithSplitLifeCycle are source nodes, so these splits
                // carry source data that needs repartitioning: Trino re-buckets the data to satisfy the
                // PartitionId -> WorkerNode distribution strategy described in earlier sections.
                DriverSplitRunnerFactory partitionedDriverRunnerFactory = driverRunnerFactoriesWithSplitLifeCycle.get(schedulingPlanNode);
                PendingSplits pendingSplits = pendingSplitsByPlanNode.get(schedulingPlanNode).getLifespan(lifespan);

                // Enqueue driver runners with driver group lifecycle for this driver life cycle, if not already enqueued.
                if (!lifespan.isTaskWide() && !schedulingLifespan.getAndSetDriversForDriverGroupLifeCycleScheduled()) {
                    // If the lifespan being scheduled is grouped and has not been scheduled yet, take this path and
                    // create DriverRunners for all pipelines of this SqlTask. In that case this SqlTask is the task
                    // instance of one partition of an intermediate Stage/PlanFragment, so the splits belonging to this
                    // lifespan are already known and already partitioned; internally enqueueDriverSplitRunner(true, runners)
                    // is called, starting every DriverRunner immediately.
                    scheduleDriversForDriverGroupLifeCycle(lifespan);
                }

                // Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination.
                // If the lifespan is task-wide, these splits are leaf splits, i.e. TableScan splits, so their runners
                // cannot be started directly; they must be scheduled according to the worker's current load.
                ImmutableList.Builder<DriverSplitRunner> runners = ImmutableList.builder();
                for (ScheduledSplit scheduledSplit : pendingSplits.removeAllSplits()) {
                    // create a new driver for the split
                    runners.add(partitionedDriverRunnerFactory.createDriverRunner(scheduledSplit, lifespan));
                }
                enqueueDriverSplitRunner(false, runners.build());

                // If all driver runners have been enqueued for this plan node and driver life cycle combination,
                // move on to the next plan node.
                if (pendingSplits.getState() != NO_MORE_SPLITS) {
                    break;
                }
                // At this point no more splits will arrive, so clean up this plan-node/lifespan combination.
                partitionedDriverRunnerFactory.noMoreDriverRunner(ImmutableList.of(lifespan));
                pendingSplits.markAsCleanedUp();

                schedulingLifespan.nextPlanNode();
                madeProgress = true;
                if (schedulingLifespan.isDone()) {
                    break;
                }
            }
        }

        if (!madeProgress) {
            break;
        }
    }

    if (sourceUpdate.isNoMoreSplits()) {
        // notify the SchedulingLifespanManager that the plan node of this TaskSource has no more work
        schedulingLifespanManager.noMoreSplits(sourceUpdate.getPlanNodeId());
    }
}
```
SqlTaskExecution::scheduleDriversForTaskLifeCycle
When the SqlTask receives a creation request, it creates a SqlTaskExecution as its execution entity; once the instance has been constructed, it calls scheduleDriversForTaskLifeCycle() to start scheduling.
```java
// scheduleDriversForTaskLifeCycle and scheduleDriversForDriverGroupLifeCycle are similar.
// They are invoked under different circumstances, and schedule a disjoint set of drivers, as suggested by their names.
// They also have a few differences, making it more convenient to keep the two methods separate.
private void scheduleDriversForTaskLifeCycle()
{
    // This method is called at the beginning of the task.
    // It schedules drivers for all the pipelines that have task life cycle.
    List<DriverSplitRunner> runners = new ArrayList<>();
    for (DriverSplitRunnerFactory driverRunnerFactory : driverRunnerFactoriesWithTaskLifeCycle) {
        for (int i = 0; i < driverRunnerFactory.getDriverInstances().orElse(1); i++) {
            runners.add(driverRunnerFactory.createDriverRunner(null, Lifespan.taskWide()));
        }
    }
    // driverRunnerFactoriesWithTaskLifeCycle holds the DriverSplitRunners of one partition of an
    // intermediate Stage/PlanFragment, so they can be started immediately
    enqueueDriverSplitRunner(true, runners);
    for (DriverSplitRunnerFactory driverRunnerFactory : driverRunnerFactoriesWithTaskLifeCycle) {
        driverRunnerFactory.noMoreDriverRunner(ImmutableList.of(Lifespan.taskWide()));
        verify(driverRunnerFactory.isNoMoreDriverRunner());
    }
}
```
SqlTaskExecution::scheduleDriversForDriverGroupLifeCycle
```java
private void scheduleDriversForDriverGroupLifeCycle(Lifespan lifespan)
{
    // This method is called when a split that belongs to a previously unseen driver group is scheduled.
    // It schedules drivers for all the pipelines that have driver group life cycle.
    if (lifespan.isTaskWide()) {
        checkArgument(driverRunnerFactoriesWithDriverGroupLifeCycle.isEmpty(), "Instantiating pipeline of driver group lifecycle at task level is not allowed");
        return;
    }

    List<DriverSplitRunner> runners = new ArrayList<>();
    for (DriverSplitRunnerFactory driverSplitRunnerFactory : driverRunnerFactoriesWithDriverGroupLifeCycle) {
        for (int i = 0; i < driverSplitRunnerFactory.getDriverInstances().orElse(1); i++) {
            runners.add(driverSplitRunnerFactory.createDriverRunner(null, lifespan));
        }
    }
    // as in scheduleDriversForTaskLifeCycle: these DriverSplitRunners belong to an intermediate
    // Stage/PlanFragment, so they can be started immediately
    enqueueDriverSplitRunner(true, runners);
    for (DriverSplitRunnerFactory driverRunnerFactory : driverRunnerFactoriesWithDriverGroupLifeCycle) {
        driverRunnerFactory.noMoreDriverRunner(ImmutableList.of(lifespan));
    }
}
```
DriverSplitRunner Execution: Time-Sliced Split Scheduling
A DriverSplitRunner is responsible for applying a chain of physical Operators to one Split; it represents a complete data-processing pass, and the smallest unit of data it processes is a Page.
The details are left to a follow-up article; covering them here would make this post far too long.
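As a preview of the idea, the sketch below shows the core of time-sliced split processing in a toy form: each runner works for at most one quantum, then re-queues itself so other splits get CPU time. This is a concept sketch only, not Trino's TaskExecutor, which schedules split runners on a fixed thread pool with prioritized multilevel queues; the one-second quantum is an assumption for illustration.

```java
import java.util.Queue;

// Concept sketch of time-sliced split processing: work for at most one quantum,
// then yield by going to the back of the queue. Illustration only.
public class TimeSliceDemo
{
    private static final long QUANTUM_NANOS = 1_000_000_000L; // assumed 1-second slice

    interface SplitRunner
    {
        boolean isFinished();
        void processFor(long durationNanos); // work until the quantum expires or the split ends
    }

    public static void runAll(Queue<SplitRunner> queue)
    {
        while (!queue.isEmpty()) {
            SplitRunner runner = queue.poll();
            runner.processFor(QUANTUM_NANOS);
            if (!runner.isFinished()) {
                queue.add(runner); // yield: let other splits make progress
            }
        }
    }
}
```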