Flink动态分区裁剪

1 原理

1.1 静态分区裁剪与动态分区裁剪

  静态分区裁剪的原理跟谓词下推是一致的,只是适用的是分区表,通过将where条件中的分区条件下推到数据源达到减少分区扫描的目的
  动态分区裁剪应用于Join场景,这种场景下,分区条件在join的一侧,另一侧无法应用条件进行裁剪
  静态分区裁剪是静态的规则优化下推,动态分区裁剪是运行时过滤,通过对有条件一侧的表进行过滤,提取结果中的分区数据,然后传递到另一侧的表中进行数据过滤

  • 静态分区过滤
SELECT * FROM t1 WHERE parcl = 'A'
  • 动态分区过滤
    正常SQL优化,只能在t1表应用Filter进行提前过滤,而动态分区过滤先对t1进行扫描,将结果中的分区值进行收集,基于join的等值条件,传递到t2
SELECT * FROM t1 JOIN t2 on t1.parcl =t2.parcl WHERE t1.cl1 = 'A'

1.2 基础设计

  Flink的FLIP-27设计当中,分片由SplitEnumerator产生,因此动态分区裁剪设计在SplitEnumerator当中
  Flink的设计中引入了Coordinator协调器的角色,运行时由Coordinator将维度表的结果传送至事实表的SplitEnumerator进行分区过滤
  由于之前的Source没有输入,此次为DPP引入一个新的TableScan的物理节点
  设计上,首先是基于规则将事实表的TableScan转为BatchPhysicalDynamicFilteringTableSourceScan,然后真正的动态分区裁剪行为在BatchPhysicalDynamicFilteringTableSourceScan当中进行

2 框架设计

2.1 整体流程

在这里插入图片描述

  首先是正常的SQL解析过程,这里会识别DPP模式并进行一定的转换设置

  • 提交JobGraph
    JobManager先调度维表和一个负责维表数据收集的DynamicFilteringDataCollector,维表的数据会向Join和DynamicFilteringDataCollector双向发送
    DynamicFilteringDataCollector将收集过滤后的数据发送至SourceCoordinator
    DynamicFileSplitEnumerator基于SourceCoordinator的维表分区数据对事实表进行切片
    事实表基于切片产生任务,由JobManager调度
    事实表的任务从DynamicFileSplitEnumerator获取切片,读取数据并发送到Join
  • Join计算
    从整体上来看,DPP对事实表进行了提前的数据过滤,可以大量减少IO;但另一方面,两张表的并行扫描在DPP后变成了顺序扫描

2.2 计划转换

在这里插入图片描述
  Flink基于DPP的执行计划转换如下,核心是加入了一个DynamicFilteringDataCollector的节点,用于维表数据的收集
  前三幅图的转化是加入有输入边的TableScan以后优化器进行优化处理的转化流程
  后两幅图的转化是因为DynamicFilteringDataCollector和事实表之间没有数据依赖关系,调度可能随机,在资源不足时产生死锁(这一块需要研究,线性的算子图为什么没有依赖关系)。为了解决这种死锁,加入了一个依赖运算符,以此来顺序调度DynamicFilteringDataCollector和事实表

3 相关接口

3.1 SupportsDynamicFiltering

  Flink Source数据源支持的能力都是通过提供公共接口然后由数据源各自实现的,动态分区裁剪也是如此,SupportsDynamicFiltering就是提供由数据源实现的公共接口(Flink数据源能力集在flink-table\flink-table-common\src\main\java\org\apache\flink\table\connector\source\abilities和flink-table-planner\src\main\java\org\apache\flink\table\planner\plan\abilities\source,后者没看到实现类,不确定用法)
  接口提供了两个方法抽象:listAcceptedFilterFields、applyDynamicFiltering
  listAcceptedFilterFields接口返回数据源支持做动态分区过滤的字段(也就是可能有些数据源只有部分字段支持)
  applyDynamicFiltering将用于动态分区的字段下推到数据源,具体的过滤数据在运行时提供,这里只是提供字段,用于数据源构建分片器时传入
  目前看只有Hive表的两个实现类HiveTableSource和HiveLookupTableSource实现了该接口(官方设计文档说文件系统也支持,得具体研究)

3.2 DynamicFilteringEvent

  传输DynamicFilteringData的Source数据源,由DynamicFilteringDataCollector发送,经由DynamicFilteringDataCollectorCoordinator和SourceCoordinator发送到数据源的enumerator
  接口的核心就是封装了DynamicFilteringData,并提供获取方法

public class DynamicFilteringEvent implements SourceEvent {private final DynamicFilteringData data;

3.3 DynamicFilteringData

  就是上面DynamicFilteringEvent封装的内容,是动态过滤的数据,数据以RowData形式组建,数据的顺序以SupportsDynamicFiltering#applyDynamicFiltering接口传入的字段为顺序,没有单独再定义schema
  接口有一个重点方法doPrepare,整体功能就是将接收到的序列化数据进行解析,然后重组成RowData队列数据

private void doPrepare() {this.dataMap = new HashMap<>();if (isFiltering) {this.fieldGetters =IntStream.range(0, rowType.getFieldCount()).mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i)).toArray(RowData.FieldGetter[]::new);TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());for (byte[] bytes : serializedData) {try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {RowData partition = serializer.deserialize(inView);List<RowData> partitions =dataMap.computeIfAbsent(hash(partition), k -> new ArrayList<>());partitions.add(partition);} catch (Exception e) {throw new TableException("Unable to deserialize the value.", e);}}}
}

3.4 DynamicFileEnumerator

  FLIP-27设计中,数据源(目前是Hive和文件系统)切片是由FileEnumerator提供的,这个接口不支持动态分区裁剪,因此在此接口上又提供了子接口DynamicFileEnumerator来支持动态分区裁剪,目前看也是只有Hive实现HiveSourceDynamicFileEnumerator
  DynamicFileEnumerator当中就是额外提供了一个创建自身的工厂类和一个setDynamicFilteringData方法,setDynamicFilteringData方法的作用就是传入上面的DynamicFilteringData

void setDynamicFilteringData(DynamicFilteringData data);/** Factory for the {@link DynamicFileEnumerator}. */
@FunctionalInterface
interface Provider extends FileEnumerator.Provider {DynamicFileEnumerator create();
}

3.5 HiveSourceDynamicFileEnumerator

  DynamicFileEnumerator的实现类,setDynamicFilteringData当中完成了分区过滤,就是拿全量分区与DynamicFilteringData做比较

public void setDynamicFilteringData(DynamicFilteringData data) {LOG.debug("Filtering partitions of table {} based on the data: {}", table, data);if (!data.isFiltering()) {finalPartitions = allPartitions;return;}finalPartitions = new ArrayList<>();RowType rowType = data.getRowType();Preconditions.checkArgument(rowType.getFieldCount() == dynamicFilterPartitionKeys.size());for (HiveTablePartition partition : allPartitions) {RowData partitionRow = createRowData(rowType, partition.getPartitionSpec());if (partitionRow != null && data.contains(partitionRow)) {finalPartitions.add(partition);}}LOG.info("Dynamic filtering table {}, original partition number is {}, remaining partition number {}",table,allPartitions.size(),finalPartitions.size());
}

  其中allPartitions是在HiveSourceBuilder当中确定的,后续通过接口传入到HiveSourceDynamicFileEnumerator,主要的获取方法如下,核心就是利用HiveClient的listPartitions方法获取分区

} else if (partitions == null) {partitions =HivePartitionUtils.getAllPartitions(jobConf, hiveVersion, tablePath, partitionKeys, null);
}

  enumerateSplits方法返回分片,本质就是调用HiveSourceFileEnumerator.createInputSplits,主要差异点就在与这里传入的分区是上面过滤过的分区

public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)throws IOException {return new ArrayList<>(HiveSourceFileEnumerator.createInputSplits(minDesiredSplits, finalPartitions, jobConf, false));
}

3.6 配置项

  动态分区裁剪新增了配置项:table.optimizer.dynamic-filtering.enabled

public static final ConfigOption<Boolean> TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED =key("table.optimizer.dynamic-filtering.enabled").booleanType().defaultValue(true).withDescription("When it is true, the optimizer will try to push dynamic filtering into scan table source,"+ " the irrelevant partitions or input data will be filtered to reduce scan I/O in runtime.");

4 优化器

  上面的接口是保证数据源上对动态分区裁剪的支持,完整的功能还需要优化器的参与,具体的优化器实现是FlinkDynamicPartitionPruningProgram,只支持Batch
  注意这里的基类不是calcite的RelRule,而是Flink的FlinkOptimizeProgram,因为目前calcite的HepPlanner不支持部分确定场景和动态替换场景

4.1 转化事实表

  优化器的核心就是对事实表做节点替换,变成BatchPhysicalDynamicFilteringTableSourceScan

Tuple2<Boolean, RelNode> relTuple =DynamicPartitionPruningUtils.canConvertAndConvertDppFactSide(rightSide,joinInfo.rightKeys,leftSide,joinInfo.leftKeys);

  最终的实现逻辑在DynamicPartitionPruningUtils.convertDppFactSide,最终目标就是创建一个BatchPhysicalDynamicFilteringDataCollector,并基于此完成BatchPhysicalDynamicFilteringTableSourceScan创建

BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector =createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices);isChanged = true;
return new BatchPhysicalDynamicFilteringTableSourceScan(scan.getCluster(),scan.getTraitSet(),scan.getHints(),tableSourceTable,dynamicFilteringDataCollector);

  在最终达到创建BatchPhysicalDynamicFilteringTableSourceScan之前,有很多的分支路径和条件判断
  分支路径是因为接口传入的RelNode可能不是TableScan(也就是join的左右字表并不是直接的TableScan,还有其他的如filter存在),分支路径的逻辑基本统一,就是继续解析RelNode并迭代调用convertDppFactSide方法,如下

} else if (rel instanceof Exchange || rel instanceof Filter) {return rel.copy(rel.getTraitSet(),Collections.singletonList(convertDppFactSide(rel.getInput(0), joinKeys, dimSide, dimSideJoinKey)));

  TableScan分支是最终创建BatchPhysicalDynamicFilteringTableSourceScan的分支,创建之前有很多的条件判断
  如果已经是BatchPhysicalDynamicFilteringTableSourceScan,直接返回

if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {// rule appliedreturn rel;
}

  如果不是TableSourceTable,直接返回

TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
if (tableSourceTable == null) {return rel;
}

  如果没有分区键,直接返回

List<String> partitionKeys = catalogTable.getPartitionKeys();
if (partitionKeys.isEmpty()) {return rel;
}

  如果数据源表不支持动态分区过滤,直接返回

DynamicTableSource tableSource = tableSourceTable.tableSource();
if (!(tableSource instanceof SupportsDynamicFiltering)|| !(tableSource instanceof ScanTableSource)) {return rel;
}

  不支持数据源聚合的情况

// Dpp cannot success if source have aggregate push down spec.
if (Arrays.stream(tableSourceTable.abilitySpecs()).anyMatch(spec -> spec instanceof AggregatePushDownSpec)) {return rel;
}

  只支持FLIP-27定义的新Source接口

if (!isNewSource((ScanTableSource) tableSource)) {return rel;
}

  join使用的字段不存在的不支持

List<String> candidateFields =joinKeys.stream().map(i -> scan.getRowType().getFieldNames().get(i)).collect(Collectors.toList());
if (candidateFields.isEmpty()) {return rel;
}

  接下来还是对分区字段的一个校验,前面SupportsDynamicFiltering有一个方法返回可做动态过滤的分区字段,这里就是基于这个做校验

List<String> acceptedFilterFields =getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields);if (acceptedFilterFields.size() == 0) {return rel;
}

  这里这个acceptedFilterFields接下来有多个使用,SupportsDynamicFiltering.applyDynamicFiltering的入参就是它

// Apply suitable accepted filter fields to source.
((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(acceptedFilterFields);

  BatchPhysicalDynamicFilteringDataCollector的构造参数也有基于它的数据

List<Integer> acceptedFieldIndices =acceptedFilterFields.stream().map(f -> scan.getRowType().getFieldNames().indexOf(f)).collect(Collectors.toList());
List<Integer> dynamicFilteringFieldIndices = new ArrayList<>();
for (int i = 0; i < joinKeys.size(); ++i) {if (acceptedFieldIndices.contains(joinKeys.get(i))) {dynamicFilteringFieldIndices.add(dimSideJoinKey.get(i));}
}BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector =createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices);

4.2 维表判断

  优化器在执行转化事实表之前还有很多的条件判断

  基础的配置开关判断

if (!ShortcutUtils.unwrapContext(root).getTableConfig().get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {return root;
}

  不是join或者join的类型不符

if (!(rel instanceof Join)|| !DynamicPartitionPruningUtils.isSuitableJoin((Join) rel)) {// Now dynamic partition pruning supports left/right join, inner and semi
// join. but now semi join can not join reorder.
if (join.getJoinType() != JoinRelType.INNER&& join.getJoinType() != JoinRelType.SEMI&& join.getJoinType() != JoinRelType.LEFT&& join.getJoinType() != JoinRelType.RIGHT) {return false;
}

  然后最核心的有一个维表的判断,因为动态分区过滤是对事实表进行的转换,而事实表的过滤数据来自维表,所以需要识别;此外,动态分区过滤需要看效果,如果收集的维表数据过大是会取消执行动态分区过滤的
  维表判断有一个专门的类DppDimSideChecker,判断逻辑是:如果joinKeys为空,则维表无需处理joinKeys,因为这些key已经被规则处理了;如果joinKeys发生了变化,则不是维表

public boolean isDppDimSide() {visitDimSide(this.relNode);return hasFilter && !hasPartitionedScan && tables.size() == 1;
}

  核心是hasFilter和hasPartitionedScan两个成员的值,在DppDimSideChecker.visitDimSide当中赋值,visitDimSide有4.1中转化事实表一样的多分支,本质上最后都走进TableScan分支

  TableScan分支当中,首先是对过滤条件的判断,部分过滤条件是不支持做动态分区过滤的,比如not null只过滤一个默认分区,效果几乎没有

if (!hasFilter&& table.abilitySpecs() != null&& table.abilitySpecs().length != 0) {for (SourceAbilitySpec spec : table.abilitySpecs()) {if (spec instanceof FilterPushDownSpec) {List<RexNode> predicates = ((FilterPushDownSpec) spec).getPredicates();for (RexNode predicate : predicates) {if (isSuitableFilter(predicate)) {hasFilter = true;}}}}
}

  其次是分区判断,必须是分区表

CatalogTable catalogTable = table.contextResolvedTable().getResolvedTable();
if (catalogTable.isPartitioned()) {hasPartitionedScan = true;return;
}

  最后就是数据源表必须只有一个,不能多数据源

// To ensure there is only one source on the dim side.
setTables(table.contextResolvedTable());

5 BatchExecTableSourceScan

  Flink SQL的整体流程当中,有一步FlinkPhysicalRel向ExecNodeGraph转化(PlannerBase.translateToExecNodeGraph),在这一步当中,BatchPhysicalDynamicFilteringTableSourceScan最终转为BatchExecTableSourceScan,在BatchPhysicalDynamicFilteringTableSourceScan的translateToExecNode

override def translateToExecNode(): ExecNode[_] = {val tableSourceSpec = new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,InputProperty.DEFAULT,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)
}

  BatchExecTableSourceScan有一个核心方法translateToPlanInternal,也是Flink SQL转化的一个核心步骤,完成向Transformation的转化,Transformation是Flink任务执行的基础,调用点就在PlannerBase.translate的最后一步translateToPlan

val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
val transformations = translateToPlan(execGraph)
afterTranslation()
transformations

5.1 初步转化

  首先是调用父类CommonExecTableSourceScan的translateToPlanInternal,这是一个通用转化,与动态分区过滤无关,主要就是根据provider的类型调用不同的接口方法创建Transformation
  以Hive表HiveTableSource来说,provider获取如下

public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {return new DataStreamScanProvider() {@Overridepublic DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv) {return getDataStream(providerContext, execEnv);}@Overridepublic boolean isBounded() {return !isStreamingSource();}};
}

  这是一个DataStreamScanProvider,所以在CommonExecTableSourceScan走DataStreamScanProvider分支

} else if (provider instanceof DataStreamScanProvider) {Transformation<RowData> transformation =((DataStreamScanProvider) provider).produceDataStream(createProviderContext(config), env).getTransformation();meta.fill(transformation);transformation.setOutputType(outputTypeInfo);return transformation;

  因为provider已经校验过boundedness了,所以后一步直接设置boundedness

// the boundedness has been checked via the runtime provider already, so we can safely
// declare all legacy transformations as bounded to make the stream graph generator happy
ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation);

  后一步是动态分区裁剪的支持性判断,要么还有输入,要么是SourceTransformation才支持

// no dynamic filtering applied
if (getInputEdges().isEmpty() || !(transformation instanceof SourceTransformation)) {return transformation;
}

5.2 dynamicFilteringDataCollector的监听设置

  前面创建BatchPhysicalDynamicFilteringTableSourceScan时传入的input是BatchPhysicalDynamicFilteringDataCollector,转化后是BatchExecDynamicFilteringDataCollector,这里直接获取

// handle dynamic filtering
BatchExecDynamicFilteringDataCollector dynamicFilteringDataCollector =getDynamicFilteringDataCollector(this);

  之后设置监听,首先把dynamicFilteringDataCollector转化为Transformation

// Set the dynamic filtering data listener ids for both sides. Must use translateToPlan to
// avoid duplication.
Transformation<Object> dynamicFilteringTransform =dynamicFilteringDataCollector.translateToPlan(planner);

  然后记录id,这个id是对象创建时通过UUID产生的

((SourceTransformation<?, ?, ?>) transformation).setCoordinatorListeningID(dynamicFilteringDataListenerID);
((DynamicFilteringDataCollectorOperatorFactory)((OneInputTransformation<?, ?>) dynamicFilteringTransform).getOperatorFactory()).registerDynamicFilteringDataListenerID(dynamicFilteringDataListenerID);

5.3 最终transformation

  之后生成最终的transformation,这里有两个分支,由needDynamicFilteringDependency区分,这是参数的设置在PlannerBase的translateToExecNodeGraph,最终设置在DynamicFilteringDependencyProcessor.process
  判断条件应该是BatchExecTableSourceScan有输入,并且输入不是BatchExecMultipleInput

// The character of the dynamic filter scan is that it
// has an input.
if (input instanceof BatchExecTableSourceScan&& input.getInputEdges().size() > 0) {dynamicFilteringScanDescendants.computeIfAbsent((BatchExecTableSourceScan) input,ignored -> new ArrayList<>()).add(node);
}for (Map.Entry<BatchExecTableSourceScan, List<ExecNode<?>>> entry :dynamicFilteringScanDescendants.entrySet()) {if (entry.getValue().size() == 1) {ExecNode<?> next = entry.getValue().get(0);if (next instanceof BatchExecMultipleInput) {// the source can be chained with BatchExecMultipleInputcontinue;}}// otherwise we need dependenciesentry.getKey().setNeedDynamicFilteringDependency(true);
}

  在BatchExecTableSourceScan当中,不依赖的分支如下

if (!needDynamicFilteringDependency) {planner.addExtraTransformation(dynamicFilteringInputTransform);return transformation;

  这个dynamicFilteringInputTransform最终被加进PlannerBase的extraTransformations,并最终在translateToPlan当中加在transformations的最后

transformations ++ planner.extraTransformations

  另一个分支是构建MultipleInputTransformation,两个输入分别是TableSource和dynamicFilteringInputTransform

MultipleInputTransformation<RowData> multipleInputTransformation =new MultipleInputTransformation<>("Order-Enforcer",new ExecutionOrderEnforcerOperatorFactory<>(),transformation.getOutputType(),transformation.getParallelism(),false);
multipleInputTransformation.addInput(dynamicFilteringInputTransform);
multipleInputTransformation.addInput(transformation);return multipleInputTransformation;

6 DynamicFilteringDataCollectorOperator

  BatchExecTableSourceScan当中很重要的一个点就是BatchExecDynamicFilteringDataCollector,这个对象最终也要转化成Flink的算子Operator,作用就是收集维表的数据,过滤后发送到Coordinator
  BatchExecDynamicFilteringDataCollector.translateToPlanInternal当中创建了DynamicFilteringDataCollectorOperatorFactory并传给InputTransformation,DynamicFilteringDataCollectorOperatorFactory.createStreamOperator当中创建了DynamicFilteringDataCollectorOperator,这就是最后的算子
  BatchExecDynamicFilteringDataCollector当中还有一个配置,配置了动态分区裁剪允许收集维表数据的最大值,以防止OOM的产生。这个配置最终传递到了DynamicFilteringDataCollectorOperator

private static final ConfigOption<MemorySize> TABLE_EXEC_DYNAMIC_FILTERING_THRESHOLD =key("table.exec.dynamic-filtering.threshold").memoryType().defaultValue(MemorySize.parse("8 mb")).withDescription("If the collector collects more data than the threshold (default is 8M), "+ "an empty DynamicFilterEvent with a flag only will be sent to Coordinator, "+ "which could avoid exceeding the akka limit and out-of-memory (see "+ AkkaOptions.FRAMESIZE.key()+ "). Otherwise a DynamicFilterEvent with all deduplicated records will be sent to Coordinator.");

  此外,DynamicFilteringDataCollectorOperatorFactory当中也提供了DynamicFilteringDataCollectorOperatorCoordinator的创建,在DefaultExecutionGraph.initializeJobVertex的流程中获取创建

public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {return new DynamicFilteringDataCollectorOperatorCoordinator.Provider(operatorID, new ArrayList<>(dynamicFilteringDataListenerIDs));
}

6.1 open

  open里面最重要的就是FieldGetter数组的创建

this.fieldGetters =IntStream.range(0, dynamicFilteringFieldIndices.size()).mapToObj(i ->RowData.createFieldGetter(dynamicFilteringFieldType.getTypeAt(i),dynamicFilteringFieldIndices.get(i))).toArray(FieldGetter[]::new);

  其本质就是根据字段类型用特定的方式读取数据

switch (fieldType.getTypeRoot()) {case CHAR:case VARCHAR:fieldGetter = row -> row.getString(fieldPos);break;case BOOLEAN:fieldGetter = row -> row.getBoolean(fieldPos);break;

6.2 processElement

  核心处理逻辑,接收维表的数据并做过滤,数据会缓存在buffer中并等待最后发送。因为数据全部缓存在buffer当中,所以过大的话会引起JobManager的OOM,需要前面将的配置做限制;此外数据需要一次发送给Coordinator,为防止akka消息超限,也需要做限制
  方法的头和尾都有一个数据超限的判断,尾部判断超限时会清空buffer缓存

if (exceedThreshold()) {// Clear the filtering data and disable self by leaving the currentSize unchangedbuffer.clear();LOG.warn("Collected data size exceeds the threshold, {} > {}, dynamic filtering is disabled.",currentSize,threshold);
}

  处理函数的基本逻辑就是取出维表数据当中用于动态分区过滤的字段数据,组建成一个新的数据并序列化缓存起来
  这里先取维表的部分字段组成动态分区过滤的数据

RowData value = element.getValue();
GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {rowData.setField(i, fieldGetters[i].getFieldOrNull(value));
}

  对数据做序列化并缓存进buffer,缓存的同时会做去重,利用的buffer的特性

try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);serializer.serialize(rowData, wrapper);boolean duplicated = !buffer.add(baos.toByteArray());if (duplicated) {return;}currentSize += baos.size();
}

6.3 finish/sendEvent

  动态分区裁剪只适用批作业,数据处理完成,operator也会接受finish调用,一次性处理数据,处理逻辑就是调用sendEvent发送buffer中缓存的数据
  finish当中只是做了一个简单的超限判断,然后就是直接调用的sendEvent

if (exceedThreshold()) {LOG.info("Finish collecting. {} bytes are collected which exceeds the threshold {}. Sending empty data.",currentSize,threshold);
} else {LOG.info("Finish collecting. {} bytes in {} rows are collected. Sending the data.",currentSize,buffer.size());
}
sendEvent();

  sendEvent就是把buffer封装成DynamicFilteringData,然后再封装成DynamicFilteringEvent,最后发送到Coordinator

if (exceedThreshold()) {dynamicFilteringData =new DynamicFilteringData(typeInfo, dynamicFilteringFieldType, Collections.emptyList(), false);
} else {dynamicFilteringData =new DynamicFilteringData(typeInfo, dynamicFilteringFieldType, new ArrayList<>(buffer), true);
}DynamicFilteringEvent event = new DynamicFilteringEvent(dynamicFilteringData);
operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));

7 DynamicFilteringDataCollectorOperatorCoordinator

  核心就是一个handle处理handleEventFromOperator,负责接收处理operator发送的维表数据
  先是从事件中还原DynamicFilteringData

DynamicFilteringData currentData =((DynamicFilteringEvent) ((SourceEventWrapper) event).getSourceEvent()).getData();

  接下来有一个数据有效性判断,因为在推测机制或者失败重试的场景下,消息可能发送两次,所以需要做一个数据是否已经接受的判断。因为只适用批处理,所以就一次赋值

if (receivedFilteringData == null) {receivedFilteringData = currentData;
} else {// Since there might be speculative execution or failover, we may receive multiple// notifications, and we can't tell for sure which one is valid for further processing.if (DynamicFilteringData.isEqual(receivedFilteringData, currentData)) {// If the notifications contain exactly the same data, everything is alright, and// we don't need to send the event again.return;} else {// In case the mismatching of the source filtering result and the dim data, which// may leads to incorrect result, trigger global failover for fully recomputing.throw new IllegalStateException("DynamicFilteringData is recomputed but not equal. "+ "Triggering global failover in case the result is incorrect. "+ " It's recommended to re-run the job with dynamic filtering disabled.");}
}

  经过一系列的判断,最后把event发送到SourceCoordinator

// Subtask index and attempt number is not necessary for handling
// DynamicFilteringEvent.
((OperatorCoordinator) oldValue).handleEventFromOperator(0, 0, event);

8 SourceCoordinator

  SourceCoordinator是FLP-27设计新增的一个内容,是一个独立的组件,不是ExecutionGraph的一部分,设计上可以运行在JobMaster上或者作为一个独立进程(具体实现应该是在JobMaster上)
  与SourceCoordinator(Enumerator)的通信经过RPC,分片的分配通过RPC以pull方式进行。SourceCoordinator的地址信息在TaskDeploymentDescriptor当中,或者基于JobMaster更新。SourceReader需要向SourceCoordinator注册并发送分片请求
  每个job至多有一个由JobMaster启动的SourceCoordinator。一个job可能有多个Enumerator,因为可能有多个不同的source,所有的Enumerator都运行在这个SourceCoordinator
  Split分配需要满足checkpoint语义。Enumerator有自己的状态(split分配),是全局checkpoint的一部分。当一个新的checkpoing被触发,CheckpointCoordinator首先发送barrier到SourceCoordinator。SourceCoordinator保存所有Enumerator的快照状态,然后发送barrier到SourceReader

8.1 handleSourceEvent

  handleEventFromOperator处理事件请求,判断是SourceEventWrapper类型,走handleSourceEvent分支

} else if (event instanceof SourceEventWrapper) {handleSourceEvent(subtask,attemptNumber,((SourceEventWrapper) event).getSourceEvent());

  handleSourceEvent调用enumerator的处理函数,进行数据分片,动态分区过滤有一个专门的实现:DynamicFileSplitEnumerator(这个会调用DynamicFileEnumerator)

} else {enumerator.handleSourceEvent(subtask, event);
}

9 Hive Connector变更

  官网提供的Hive变更如下,中间两行绿色部分为新增的类
在这里插入图片描述

9.1 DynamicFileSplitEnumerator

  就是8.1提到的DynamicFileSplitEnumerator,是SplitEnumerator的实现,所以有通用的handleSplitRequest等接口,DPP相关的核心点在handleSourceEvent接口

public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {if (sourceEvent instanceof DynamicFilteringEvent) {LOG.warn("Received DynamicFilteringEvent: {}", subtaskId);createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());} else {LOG.error("Received unrecognized event: {}", sourceEvent);}
}

  createSplitAssigner本质就是调用下层的DynamicFileEnumerator完成分片

private void createSplitAssigner(@Nullable DynamicFilteringData dynamicFilteringData) {DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();if (dynamicFilteringData != null) {fileEnumerator.setDynamicFilteringData(dynamicFilteringData);}Collection<FileSourceSplit> splits;try {splits = fileEnumerator.enumerateSplits(new Path[1], context.currentParallelism());allEnumeratingSplits =splits.stream().map(FileSourceSplit::splitId).collect(Collectors.toSet());} catch (IOException e) {throw new FlinkRuntimeException("Could not enumerate file splits", e);}splitAssigner = splitAssignerFactory.create(splits);
}

9.2 DynamicFileEnumerator

  目前的实现只看到HiveSourceDynamicFileEnumerator,重点接口是setDynamicFilteringData,方法中完成了分区提取,如3.5中所述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/707328.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Oracle中序列

1. Sequence 定义 在Oracle中可以用SEQUENCE生成自增字段。Sequence序列是Oracle中用于生成数字序列的对象&#xff0c;可以创建一个唯一的数字作为主键。 2. 为什么要用 Sequence 你可能有疑问为什么要使用序列&#xff1f; 不能使用一个存储主键的表并每次递增吗&#xf…

milvus upsert流程源码分析

milvus版本:v2.3.2 整体架构: Upsert 的数据流向: 1.客户端sdk发出Upsert API请求。 import numpy as np from pymilvus import (connections,Collection, )num_entities, dim 4, 3print("start connecting to Milvus") connections.connect("default",…

11.题目:编号3272 小蓝的漆房

题目&#xff1a; ###本题主要考察暴力&#xff0c;枚举&#xff0c;模拟 #include<bits/stdc.h> using namespace std; const int N1e410; int a[N],b[N]; int main(){ios::sync_with_stdio(0),cin.tie(0),cout.tie(0);int num;cin>>num;//样例个数循环for(int i…

socket套接字

前言 两个应用程序如果需要进行通讯最基本的一个前提就是能够唯一的标示一个进程&#xff0c;我们知道IP层的ip地址可以唯一标示主机&#xff0c;而TCP层协议和端口号可以唯一标示主机的一个进程&#xff0c;这样我们可以利用ip地址&#xff0b;协议&#xff0b;端口号唯一标示…

许战海矩阵|佐香园:“熟酱”的爆品战略

佐香园是辽宁帝华味精食品有限公司的主导品牌&#xff0c;是一个拥有自主研发与生产经营能力的民营企业&#xff0c;主要生产和销售香料、调味料以及食品添加剂等产品。该品牌自创办以来&#xff0c;一直坚持以市场为导向&#xff0c;走专业化发展之路&#xff0c;打造全方位的…

LACP——链路聚合控制协议

LACP——链路聚合控制协议 什么是LACP&#xff1f; LACP&#xff08;Link Aggregation Control Protocol&#xff0c;链路聚合控制协议&#xff09;是一种基于IEEE802.3ad标准的实现链路动态聚合与解聚合的协议&#xff0c;它是链路聚合中常用的一种协议。 链路聚合组中启用了…

TikTok矩阵系统的功能展示:深入解析与源代码分享!

今天我来和大家说说TikTok矩阵系统&#xff0c;在当今数字化时代&#xff0c;社交媒体平台已成为人们获取信息、交流思想和娱乐放松的重要渠道&#xff0c;其中&#xff0c;TikTok作为一款全球知名的短视频社交平台&#xff0c;凭借其独特的创意内容和强大的算法推荐系统&#…

【MQ05】异常消息处理

异常消息处理 上节课我们已经学习到了消息的持久化和确认相关的内容。但是&#xff0c;光有这些还不行&#xff0c;如果我们的消费者出现问题了&#xff0c;无法确认&#xff0c;或者直接报错产生异常了&#xff0c;这些消息要怎么处理呢&#xff1f;直接丢弃&#xff1f;这就是…

带大家做一个,易上手的家常蒜香菠菜

一捆 菠菜 四瓣蒜 蒜去皮切末 菠菜切段 多清洗几次 因为菠菜上面的土真的是太多了 菠菜下锅 加水煮一分钟左右 因为菠菜内的草酸成分非常高 所以这一步肯定是要的 然后将菠菜捞出来 干和叶子分开 锅中水倒掉 清洗一下 然后起锅烧油 下蒜末炒香 然后 下菠菜干 因为干熟的…

Python + Selenium —— 网页元素定位之标签名和链接文本定位

tag name tag name 为标签名定位&#xff0c;使用网页元素的标签名如a, div, input, span 等。 但是有一个问题&#xff0c;常见的标签名比如 在同一个页面上有非常多。会不会觉得 tag name 没什么用呢&#xff1f; 当然普通的模拟操作是不大有用&#xff0c;这个重复性实在…

笔记:GO1.19 带来的优化(重新编译juicefs)

## 背景 go编写的应用程序&#xff08;juicefs&#xff09;在k8s&#xff08;docker&#xff09;中运行&#xff0c;时不时出现 OOM Killed。 ## 分析 发现某些应用使用juicefs会导致内存使用飙升&#xff1b; k8s的pod给的内存资源&#xff1a;request 2G&#xff0c;limit…

基于springboot实现线上阅读系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现线上阅读系统演示 摘要 随着社会发展速度的愈来愈快&#xff0c;以及社会压力变化的越来越快速&#xff0c;致使很多人采取各种不同的方法进行解压。大多数人的稀释压力的方法&#xff0c;是捧一本书籍&#xff0c;心情地让自己沉浸在情节里面&#xff0c;以…

基于沁恒微 ch643q 多通道采集 adc 驱动层实现

一、代码 #include "main.h"/********************************************************************** fn ADC_Function_Init** brief Initializes ADC collection.** return none*/ void ADC_Function_Init(void) {ADC_InitTypeDef ADC_InitStructure …

pdffactory pro 8中文破解版

详细介绍 PdfFactory&#xff0c;PDF文档虚拟打印机&#xff0c;无须Acrobat即可创建Adobe PDF文件&#xff0c;创建PDF文件的方法比其他方法更方便和高效。支持将多个文档整合到一个PDF文件、增加字体和便签、PDF加密、去水印、压缩优化。 FinePrint&#xff0c;Windows虚拟…

【踩坑】修复xrdp无法关闭Authentication Required验证窗口

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhang.cn] 问题如下&#xff0c;时不时出现&#xff0c;有时还怎么都关不掉&#xff0c;很烦&#xff1a; 解决方法一&#xff1a;命令行输入 dbus-send --typemethod_call --destorg.gnome.Shell /org/gnome/Shell org.gn…

js 面试题--事件循环event loop--宏任务和微任务

1 事件循环event loop概念&#xff1a; js 是非阻塞单线程语言&#xff0c;js在执行过程中会产生执行环境&#xff0c;执行环境会按顺序添加到执行栈中&#xff0c;先执行同步栈中的任务&#xff0c;当遇到异步任务时会添加到task队列中&#xff0c;同步栈执行完后&#xff0c…

一文读懂什么是 OCR 识别

在数字化时代&#xff0c;信息处理和数据管理是企业运营的重要环节。然而&#xff0c;手工输入信息存在效率低和准确性低的问题&#xff0c;严重影响了企业的工作流程和决策过程。因此&#xff0c;OCR&#xff08;Optical Character Recognition&#xff09;识别技术的应用变得…

【Unity】导入IAP插件后依赖冲突问题 com.android.billingclient冲突

【Unity】Attribute meta-data#com.google.android.play.billingclient.version 多版本库冲突_unity billingclient-CSDN博客 打开mainTemplate.gradle 找到dependencies { } 在里面末尾加上如下&#xff1a; configurations.all {exclude group: com.android.billingclien…

uni-app 实现拍照后给照片加水印功能

遇到个需求需要实现&#xff0c;研究了一下后写了个demo 本质上就是把拍完照后的照片放到canvas里&#xff0c;然后加上水印样式然后再重新生成一张图片 代码如下&#xff0c;看注释即可~使用的话记得还是得优化下代码 <template><view class"content"&g…

单词倒排——c语言解法

以下是题目&#xff1a; 这个题中有三个点&#xff0c; 一个是将非字母的字符转换为空格&#xff0c; 第二是如果有两个连续的空格&#xff0c; 那么就可以将这两个连续的空格变成一个空格。 第三个点就是让单词倒排。 那么我们就可以将这三个点分别封装成三个函数。 还有就是…