Apache Flink类型及序列化研读生产应用|得物技术

一、背景

序列化是指将数据从内存中的对象序列化为字节流,以便在网络中传输或持久化存储。序列化在Apache Flink中非常重要,因为它涉及到数据传输和状态管理等关键部分。Apache Flink以其独特的方式来处理数据类型以及序列化,这种方式包括它自身的类型描述符、泛型类型提取以及类型序列化框架。本文将简单介绍它们背后的概念和基本原理,侧重分享在DataStream、Flink SQL自定义函数开发中对数据类型和序列的应用,以提升任务的运行效率。

二、简单理论阐述(基于Flink 1.13)

主要参考Apache Flink 1.13

支持的数据类型

  • Java Tuples and Scala Case Classes
  • Java POJOs
  • Primitive Types
  • Regular Classes
  • Values
  • Hadoop Writables
  • Special Types

具体的数据类型定义在此就不详细介绍了,具体描述可以前往Flink官网查看。

TypeInformation

Apache Flink量身定制了一套序列化框架,好处就是选择自己定制的序列化框架,对类型信息了解越多,可以在早期完成类型检查,更好地选取序列化方式,进行数据布局,节省数据的存储空间,甚至直接操作二进制数据。

TypeInformation类是Apache Flink所有类型描述符的基类,通过阅读源码,我们可以大概分成以下几种数据类型。

  • Basic types:所有的Java类型以及包装类:void,String,Date,BigDecimal,and BigInteger等。
  • Primitive arrays以及Object arrays
  • Composite types
  • Flink Java Tuples(Flink Java API的一部分):最多25个字段,不支持空字段
  • Scala case classes(包括Scala Tuples):不支持null字段
  • Row:具有任意数量字段并支持空字段的Tuples
  • POJO 类:JavaBeans
  • Auxiliary types (Option,Either,Lists,Maps,…)
  • Generic types:Flink内部未维护的类型,这种类型通常是由Kryo序列化。

我们简单看下该类的方法,核心是createSerializer,获取org.apache.flink.api.common.typeutils.TypeSerializer,执行序列化以及反序列化方法,主要是:

  • org.apache.flink.api.common.typeutils.TypeSerializer#serialize
  • org.apache.flink.api.common.typeutils.TypeSerializer#deserialize(org.apache.flink.core.memory.DataInputView)

何时需要数据类型获取

在Apache Flink中,算子间的数据类型传递是通过流处理的数据流来实现的。数据流可以在算子之间流动,每个算子对数据流进行处理并产生输出。当数据流从一个算子流向另一个算子时,数据的类型也会随之传递。Apache Flink使用自动类型推断机制来确定数据流中的数据类型。在算子之间传递数据时,Apache Flink会根据上下文自动推断数据的类型,并在运行时保证数据的类型一致性。

举个例子:新增一个kafka source,这个时候我们需要指定数据输出类型。

@Experimental
public <OUT> DataStreamSource<OUT> fromSource(Source<OUT, ?, ?> source,WatermarkStrategy<OUT> timestampsAndWatermarks,String sourceName,TypeInformation<OUT> typeInfo) {final TypeInformation<OUT> resolvedTypeInfo =getTypeInfo(source, sourceName, Source.class, typeInfo);return new DataStreamSource<>(this,checkNotNull(source, "source"),checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"),checkNotNull(resolvedTypeInfo),checkNotNull(sourceName));
}

那输入类型怎么不需要指定呢?可以简单看下OneInputTransformation(单输入算子的基类)类的getInputType()方法,就是以输入算子的输出类型为输入类型的。

/** Returns the {@code TypeInformation} for the elements of the input. */
public TypeInformation<IN> getInputType() {return input.getOutputType();
}

这样source的输出类型会变成下一个算子的输入。整个DAG的数据类型都会传递下去。Apache Flink获取到数据类型后,就可以获取对应的序列化方法。

还有一种情况就是与状态后端交互的时候需要获取数据类型,特别是非JVM堆存储的后端,需要频繁的序列化以及反序列化,例如RocksDBStateBackend

举个例子,当我们使用ValueState时需要调用以下API:

org.apache.flink.streaming.api.operators.StreamingRuntimeContext#getState

@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);stateProperties.initializeSerializerUnlessSet(getExecutionConfig());return keyedStateStore.getState(stateProperties);
}public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {if (serializerAtomicReference.get() == null) {checkState(typeInfo != null, "no serializer and no type info");// try to instantiate and set the serializerTypeSerializer<T> serializer = typeInfo.createSerializer(executionConfig);// use cas to assure the singletonif (!serializerAtomicReference.compareAndSet(null, serializer)) {LOG.debug("Someone else beat us at initializing the serializer.");}}
}

可以从org.apache.flink.api.common.state.StateDescriptor#initializeSerializerUnlessSet方法看出,需要通过传入的数据类型来获取具体的序列化器。来执行具体的序列化和反序列化逻辑,完成数据的交互。

数据类型的自动推断

乍一看很复杂,各个环节都需要指定数据类型。其实大部分应用场景下,我们不用关注数据的类型以及序列化方式。Flink会尝试推断有关分布式计算期间交换和存储的数据类型的信息。

这里简单介绍Flink类型自动推断的核心类:

org.apache.flink.api.java.typeutils.TypeExtractor

在数据流操作中,Flink使用了泛型来指定输入和输出的类型。例如,DataStream表示一个具有类型T的数据流。在代码中使用的泛型类型参数T会被TypeExtractor类解析和推断。在运行时,Apache Flink会通过调用TypeExtractor的静态方法来分析操作的输入和输出,并将推断出的类型信息存储在运行时的环境中。

举个例子:用的最多的flatMap算子,当我们不指定返回类型的时候,Flink会调用TypeExtractor类自动去推断用户的类型。

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);return this.flatMap(flatMapper, outType);
}

一般看开源框架某个类的功能我都会先看类的注释,也看TypeExtractor的注释,大概意思这是一个对类进行反射分析的实用程序,用于确定返回的数据类型。

/*** A utility for reflection analysis on classes, to determine the return type of implementations of* transformation functions.** <p>NOTES FOR USERS OF THIS CLASS: Automatic type extraction is a hacky business that depends on a* lot of variables such as generics, compiler, interfaces, etc. The type extraction fails regularly* with either {@link MissingTypeInfo} or hard exceptions. Whenever you use methods of this class,* make sure to provide a way to pass custom type information as a fallback.*/

我们来看下其中一个核心的静态推断逻辑,org.apache.flink.api.java.typeutils.TypeExtractor#getUnaryOperatorReturnType

@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function,Class<?> baseClass,int inputTypeArgumentIndex,int outputTypeArgumentIndex,int[] lambdaOutputTypeArgumentIndices,TypeInformation<IN> inType,String functionName,boolean allowMissing) {Preconditions.checkArgument(inType == null || inputTypeArgumentIndex >= 0,"Input type argument index was not provided");Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");Preconditions.checkArgument(lambdaOutputTypeArgumentIndices != null,"Indices for output type arguments within lambda not provided");// explicit result type has highest precedenceif (function instanceof ResultTypeQueryable) {return ((ResultTypeQueryable<OUT>) function).getProducedType();}// perform extractiontry {final LambdaExecutable exec;try {exec = checkAndExtractLambda(function);} catch (TypeExtractionException e) {throw new InvalidTypesException("Internal error occurred.", e);}if (exec != null) {// parameters must be accessed from behind, since JVM can add additional parameters// e.g. when using local variables inside lambda function// paramLen is the total number of parameters of the provided lambda, it includes// parameters added through closurefinal int paramLen = exec.getParameterTypes().length;final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);// number of parameters the SAM of implemented interface has; the parameter indexing// applies to this rangefinal int baseParametersLen = sam.getParameterTypes().length;final Type output;if (lambdaOutputTypeArgumentIndices.length > 0) {output =TypeExtractionUtils.extractTypeFromLambda(baseClass,exec,lambdaOutputTypeArgumentIndices,paramLen,baseParametersLen);} else {output = exec.getReturnType();TypeExtractionUtils.validateLambdaType(baseClass, output);}return new TypeExtractor().privateCreateTypeInfo(output, inType, null);} else {if (inType != null) {validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType);}return new TypeExtractor().privateCreateTypeInfo(baseClass,function.getClass(),outputTypeArgumentIndex,inType,null);}} catch (InvalidTypesException e) {if (allowMissing) {return (TypeInformation<OUT>)new MissingTypeInfo(functionName != null ? functionName : function.toString(), e);} else {throw e;}}
}
  • 首先判断该算子是否实现了ResultTypeQueryable接口,本质上就是用户是否显式指定了数据类型,例如我们熟悉的Kafka source就实现了该方法,当使用了JSONKeyValueDeserializationSchema,就显式指定了类型,用户自定义Schema就需要自己指定。
public class KafkaSource<OUT>implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,ResultTypeQueryable<OUT>
//deserializationSchema 是需要用户自己定义的。
@Override
public TypeInformation<OUT> getProducedType() {return deserializationSchema.getProducedType();
}                
//JSONKeyValueDeserializationSchema
@Override
public TypeInformation<ObjectNode> getProducedType() {return getForClass(ObjectNode.class);
}
  • 未实现ResultTypeQueryable接口,就会通过反射的方法获取ReturnType,判断逻辑大概是从是否是Java 8 lambda方法开始判断的。获取到返回类型后再通过new TypeExtractor()).privateCreateTypeInfo(output,inType,(TypeInformation)null)封装成Flink内部能识别的数据类型;大致分为2类,泛型类型变量TypeVariable以及非泛型类型变量。这个封装的过程也是非常重要的,推断的数据类型是Flink内部封装好的类型,序列化基本都很高效,如果不是, 就会推断为GenericTypeInfo走Kryo等序列化方式。如感兴趣,可以看下这块的源码,在此不再赘述。

通过以上的代码逻辑的阅读,我们大概能总结出以下结论:Flink内部维护了很多高效的序列化方式,通常只有数据类型被推断为org.apache.flink.api.java.typeutils.GenericTypeInfo时我们才需要自定义序列化类型,否则性能就是灾难;或者无法推断类型的时候,例如Flink SQL复杂类型有时候是无法自动推断类型的,当然某些特殊的对象Kryo也无法序列化,比如之前遇到过TreeMap无法Kryo序列化 (也可能是自己姿势不对),建议在开发Apache Flink作业时可以养成显式指定数据类型的好习惯。

三、开发实践

Flink代码作业

如何显式指定数据类型

这个简单了,几乎所有的source、Keyby、算子等都暴露了指定TypeInformation typeInfo的构造方法,以下简单列举几个:

  • source
@Experimental
public <OUT> DataStreamSource<OUT> fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName, TypeInformation<OUT> typeInfo) {TypeInformation<OUT> resolvedTypeInfo = this.getTypeInfo(source, sourceName, Source.class, typeInfo);return new DataStreamSource(this, (Source)Preconditions.checkNotNull(source, "source"), (WatermarkStrategy)Preconditions.checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"), (TypeInformation)Preconditions.checkNotNull(resolvedTypeInfo), (String)Preconditions.checkNotNull(sourceName));
}
  • map
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
  • 自定义Operator
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {return this.doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
  • keyBy
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {Preconditions.checkNotNull(key);Preconditions.checkNotNull(keyType);return new KeyedStream(this, (KeySelector)this.clean(key), keyType);
}
  • 状态后端
public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) {super(name, typeInfo, (Object)null);
}

自定义数据类型&自定义序列化器

当遇到复杂数据类型,或者需要优化任务性能时,需要自定义数据类型,以下分享几种场景以及实现代码:

  • POJO类

例如大家最常用的POJO类,何为POJO类大家可以自行查询,Flink对POJO类做了大量的优化,大家使用Java对象最好满足POJO的规范。

举个例子,这是一个典型的POJO类:

@Data
public class BroadcastConfig implements Serializable {public String config_type;public String date;public String media_id;public String account_id;public String label_id;public long start_time;public long end_time;public int interval;public String msg;public BroadcastConfig() {}}

我们可以这样指定其数据类型,返回的数据类就是一个TypeInformation

HashMap<String, TypeInformation<?>> pojoFieldName = new HashMap<>();
pojoFieldName.put("config_type", Types.STRING);
pojoFieldName.put("date", Types.STRING);
pojoFieldName.put("media_id", Types.STRING);
pojoFieldName.put("account_id", Types.STRING);
pojoFieldName.put("label_id", Types.STRING);
pojoFieldName.put("start_time", Types.LONG);
pojoFieldName.put("end_time", Types.LONG);
pojoFieldName.put("interval", Types.INT);
pojoFieldName.put("msg", Types.STRING);return Types.POJO(BroadcastConfig.class,pojoFieldName
);

如感兴趣,可以看下org.apache.flink.api.java.typeutils.runtime.PojoSerializer,看Flink本身对其做了哪些优化。

  • 自定义TypeInformation

某些特殊场景可能还需要复杂的对象,例如,需要极致的性能优化,在Flink Table Api中数据对象传输,大部分都是BinaryRowdata,效率非常高。我们在Flink Datastram代码作业中也想使用,怎么操作呢?这里分享一种实现方式——自定义TypeInformation,当然还有更优的实现方式,这里就不介绍了。

代码实现:本质上就是继承TypeInformation,实现对应的方法。核心逻辑是createSerializer()方法,这里我们直接使用Table Api中已经实现的BinaryRowDataSerializer,就可以达到同Flink SQL相同的性能优化。

public  class BinaryRowDataTypeInfo extends TypeInformation<BinaryRowData> {private static final long serialVersionUID = 4786289562505208256L;private final int numFields;private final Class<BinaryRowData> clazz;private final TypeSerializer<BinaryRowData> serializer;public BinaryRowDataTypeInfo(int numFields) {this.numFields=numFields;this.clazz=BinaryRowData.class;serializer= new BinaryRowDataSerializer(numFields);}@Overridepublic boolean isBasicType() {return false;}@Overridepublic boolean isTupleType() {return false;}@Overridepublic int getArity() {return numFields;}@Overridepublic int getTotalFields() {return numFields;}@Overridepublic Class<BinaryRowData> getTypeClass() {return this.clazz;}@Overridepublic boolean isKeyType() {return false;}@Overridepublic TypeSerializer<BinaryRowData> createSerializer(ExecutionConfig config) {return serializer;}@Overridepublic String toString() {return "BinaryRowDataTypeInfo<" + clazz.getCanonicalName() + ">";}@Overridepublic boolean equals(Object obj) {if (obj instanceof BinaryRowDataTypeInfo) {BinaryRowDataTypeInfo that = (BinaryRowDataTypeInfo) obj;return that.canEqual(this)&& this.numFields==that.numFields;} else {return false;}}@Overridepublic int hashCode() {return Objects.hash(this.clazz,serializer.hashCode());}@Overridepublic boolean canEqual(Object obj) {return obj instanceof BinaryRowDataTypeInfo;}
}

所以这里建议Apache Flink代码作业开发可以尽可能使用已经优化好的数据类型,例如BinaryRowdata,可以用于高性能的数据处理场景,例如在内存中进行批处理或流式处理。由于数据以二进制形式存储,可以更有效地使用内存和进行数据序列化。同时,BinaryRowData还提供了一组方法,用于访问和操作二进制数据。

  • 自定义TypeSerializer

上面的例子只是自定义了TypeInformation,当然还会遇到自定义TypeSerializer的场景,例如Apache Flink本身没有封装的数据类型。

代码实现:这里以位图存储Roaring64Bitmap为例,在某些特殊场景可以使用bitmap精准去重,减少存储空间。

我们需要继承TypeSerializer,实现其核心逻辑也是serialize() 、deserialize() 方法,可以使用Roaring64Bitmap自带的序列化、反序列化方法。如果你使用的复杂对象没有提供序列化方法,你也可以自己实现或者找开源的序列化器。有了自定义的TypeSerializer就可以在你自定义的TypeInformation中调用。

public class Roaring64BitmapTypeSerializer extends TypeSerializer<Roaring64Bitmap> {/*** Sharable instance of the Roaring64BitmapTypeSerializer.*/public static final Roaring64BitmapTypeSerializer INSTANCE = new Roaring64BitmapTypeSerializer();private static final long serialVersionUID = -8544079063839253971L;@Overridepublic boolean isImmutableType() {return false;}@Overridepublic TypeSerializer<Roaring64Bitmap> duplicate() {return this;}@Overridepublic Roaring64Bitmap createInstance() {return new Roaring64Bitmap();}@Overridepublic Roaring64Bitmap copy(Roaring64Bitmap from) {Roaring64Bitmap copiedMap = new Roaring64Bitmap();from.forEach(copiedMap::addLong);return copiedMap;}@Overridepublic Roaring64Bitmap copy(Roaring64Bitmap from, Roaring64Bitmap reuse) {from.forEach(reuse::addLong);return reuse;}@Overridepublic int getLength() {return -1;}@Overridepublic void serialize(Roaring64Bitmap record, DataOutputView target) throws IOException {record.serialize(target);}@Overridepublic Roaring64Bitmap deserialize(DataInputView source) throws IOException {Roaring64Bitmap navigableMap = new Roaring64Bitmap();navigableMap.deserialize(source);return navigableMap;}@Overridepublic Roaring64Bitmap deserialize(Roaring64Bitmap reuse, DataInputView source) throws IOException {reuse.deserialize(source);return reuse;}@Overridepublic void copy(DataInputView source, DataOutputView target) throws IOException {Roaring64Bitmap deserialize = this.deserialize(source);copy(deserialize);}@Overridepublic boolean equals(Object obj) {if (obj == this) {return true;} else if (obj != null && obj.getClass() == Roaring64BitmapTypeSerializer.class) {return true;} else {return false;}}@Overridepublic int hashCode() {return this.getClass().hashCode();}@Overridepublic TypeSerializerSnapshot<Roaring64Bitmap> snapshotConfiguration() {return new Roaring64BitmapTypeSerializer.Roaring64BitmapSerializerSnapshot();}public static final class Roaring64BitmapSerializerSnapshotextends SimpleTypeSerializerSnapshot<Roaring64Bitmap> {public Roaring64BitmapSerializerSnapshot() {super(() -> Roaring64BitmapTypeSerializer.INSTANCE);}}
}

Flink SQL自定义函数

如何显式指定数据类型

这里简单分享下,在自定义Function开发下遇到复杂数据类型无法在accumulator 或者input、output中使用的问题,这里我们只介绍使用复杂数据对象如何指定数据类型的场景。

我们可以先看下FunctionDefinitionConvertRule,这是Apache Flink中的一个规则(Rule),用于将用户自定义的函数定义转换为对应的实现。其中通过getTypeInference()方法返回用于执行对此函数定义的调用的类型推理的逻辑。

@Override
public Optional<RexNode> convert(CallExpression call, ConvertContext context) {FunctionDefinition functionDefinition = call.getFunctionDefinition();// built-in functions without implementation are handled separatelyif (functionDefinition instanceof BuiltInFunctionDefinition) {final BuiltInFunctionDefinition builtInFunction =(BuiltInFunctionDefinition) functionDefinition;if (!builtInFunction.getRuntimeClass().isPresent()) {return Optional.empty();}}TypeInference typeInference =functionDefinition.getTypeInference(context.getDataTypeFactory());if (typeInference.getOutputTypeStrategy() == TypeStrategies.MISSING) {return Optional.empty();}switch (functionDefinition.getKind()) {case SCALAR:case TABLE:List<RexNode> args =call.getChildren().stream().map(context::toRexNode).collect(Collectors.toList());final BridgingSqlFunction sqlFunction =BridgingSqlFunction.of(context.getDataTypeFactory(),context.getTypeFactory(),SqlKind.OTHER_FUNCTION,call.getFunctionIdentifier().orElse(null),functionDefinition,typeInference);return Optional.of(context.getRelBuilder().call(sqlFunction, args));default:return Optional.empty();}
}

那我们指定复杂类型也会从通过该方法实现,不多说了,直接上代码实现。

  • 指定accumulatorType

这是之前写的AbstractLastValueWithRetractAggFunction功能主要是为了实现具有local-global的逻辑的LastValue,提升作业性能。

accumulator对象:LastValueWithRetractAccumulator,可以看到该对象是一个非常复杂的对象,包含5个属性,还有List 复杂嵌套,以及MapView等可以操作状态后端的对象,甚至有Object这种通用的对象。

public static class LastValueWithRetractAccumulator {public Object lastValue = null;public Long lastOrder = null;public List<Tuple2<Object, Long>> retractList = new ArrayList<>();public MapView<Object, List<Long>> valueToOrderMap = new MapView<>();public MapView<Long, List<Object>> orderToValueMap = new MapView<>();@Overridepublic boolean equals(Object o) {if (this == o) {return true;}if (!(o instanceof LastValueWithRetractAccumulator)) {return false;}LastValueWithRetractAccumulator that = (LastValueWithRetractAccumulator) o;return Objects.equals(lastValue, that.lastValue)&& Objects.equals(lastOrder, that.lastOrder)&& Objects.equals(retractList, that.retractList)&& valueToOrderMap.equals(that.valueToOrderMap)&& orderToValueMap.equals(that.orderToValueMap);}@Overridepublic int hashCode() {return Objects.hash(lastValue, lastOrder, valueToOrderMap, orderToValueMap, retractList);}}

getTypeInference() 是FunctionDefinition接口的方法,而所有的用户自定义函数都实现了该接口,我们只需要重新实现下该方法就可以,以下是代码实现。

这里我们还需要用到工具类TypeInference,这是Flink中的一个模块,用于进行类型推断和类型推理。

可以看出我们在accumulatorTypeStrategy方法中传入了一个构建好的TypeStrategy;这里我们将LastValueWithRetractAccumulator定义为了一个STRUCTURED,不同的属性定义为具体的数据类型,DataTypes工具类提供了很多丰富的对象形式,还有万能的RAW类型。

public TypeInference getTypeInference(DataTypeFactory typeFactory) {return TypeInference.newBuilder().accumulatorTypeStrategy(callContext -> {List<DataType> dataTypes = callContext.getArgumentDataTypes();DataType argDataType;if (dataTypes.get(0).getLogicalType().getTypeRoot().getFamilies().contains(LogicalTypeFamily.CHARACTER_STRING)) {argDataType = DataTypes.STRING();} elseargDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));DataType accDataType = DataTypes.STRUCTURED(LastValueWithRetractAccumulator.class,DataTypes.FIELD("lastValue", argDataType.nullable()),DataTypes.FIELD("lastOrder", DataTypes.BIGINT()),DataTypes.FIELD("retractList", DataTypes.ARRAY(DataTypes.STRUCTURED(Tuple2.class,DataTypes.FIELD("f0", argDataType.nullable()),DataTypes.FIELD("f1", DataTypes.BIGINT()))).bridgedTo(List.class)),DataTypes.FIELD("valueToOrderMap",MapView.newMapViewDataType(argDataType.nullable(),DataTypes.ARRAY(DataTypes.BIGINT()).bridgedTo(List.class))),//todo:blink 使用SortedMapView 优化性能,开源使用MapView key天然字典升序,倒序遍历性能可能不佳DataTypes.FIELD("orderToValueMap",MapView.newMapViewDataType(DataTypes.BIGINT(),DataTypes.ARRAY(argDataType.nullable()).bridgedTo(List.class))));return Optional.of(accDataType);}).build();
}
  • 指定outputType

这个也很简单,直接上代码实现,主要就是outputTypeStrategy中传入需要输出的数据类型即可。

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {return TypeInference.newBuilder().outputTypeStrategy(callContext -> {List<DataType> dataTypes = callContext.getArgumentDataTypes();DataType argDataType;if (dataTypes.get(0).getLogicalType().getTypeRoot().getFamilies().contains(LogicalTypeFamily.CHARACTER_STRING)) {argDataType = DataTypes.STRING();} elseargDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));return Optional.of(argDataType);}).build();
}
  • 指定intputType

在此就不做介绍了,同以上类似,在inputTypeStrategy方法传入定义好的TypeStrategy就好。

  • 根据inputType动态调整outType或者accumulatorType

在某些场景下,我们需要让函数功能性更强,比如当我输入是bigint类型的时候,我输出bigint类型等,类似的逻辑。

大家可以发现outputTypeStrategy或者 accumulatorTypeStrategy的入参都是 实现了 TypeStrategy接口的对象,并且需要实现inferType方法。在Flink框架调用该方法的时候会传入一个上下文对象CallContext,提供了获取函数入参类型的api getArgumentDataTypes();

代码实现:这里的逻辑是将获取到的第一个入参对象的类型指定为输出对象的类型。

.outputTypeStrategy(callContext -> {List<DataType> dataTypes = callContext.getArgumentDataTypes();DataType argDataType;if (dataTypes.get(0).getLogicalType().getTypeRoot().getFamilies().contains(LogicalTypeFamily.CHARACTER_STRING)) {argDataType = DataTypes.STRING();} elseargDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));return Optional.of(argDataType);
}

自定义DataType

可以发现以上分享几乎都是使用的DataTypes封装好的类型,比如DataTypes.STRING()、DataTypes.Long()等。那如果我们需要封装一些其他对象如何操作呢?上文提到DataTypes提供了一个自定义任意类型的方法。

/*** Data type of an arbitrary serialized type. This type is a black box within the table* ecosystem and is only deserialized at the edges.** <p>The raw type is an extension to the SQL standard.** <p>This method assumes that a {@link TypeSerializer} instance is present. Use {@link* #RAW(Class)} for automatically generating a serializer.** @param clazz originating value class* @param serializer type serializer* @see RawType*/
public static <T> DataType RAW(Class<T> clazz, TypeSerializer<T> serializer) {return new AtomicDataType(new RawType<>(clazz, serializer));
}

我们有这样的一个场景,需要在自定义的函数中使用bitmap计算UV值,需要定义Roaring64Bitmap为accumulatorType,直接上代码实现。

这里的Roaring64BitmapTypeSerializer已经在《自定义TypeSerializer》小段中实现,有兴趣的同学可以往上翻翻。

public TypeInference getTypeInference(DataTypeFactory typeFactory) {return TypeInference.newBuilder().accumulatorTypeStrategy(callContext -> {DataType type = DataTypes.RAW(Roaring64Bitmap.class,Roaring64BitmapTypeSerializer.INSTANCE);return Optional.of(type);}).outputTypeStrategy(callContext -> Optional.of(DataTypes.BIGINT())).build();
}

四、结语

本文主要简单分享了一些自身对Flink类型及序列化的认识和应用实践,能力有限,不足之处欢迎指正。

引用:
https://nightlies.apache.org/flink/flink-docs-release-1.13/

*文/ 木木

本文属得物技术原创,更多精彩文章请看:得物技术

未经得物技术许可严禁转载,否则依法追究法律责任!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/35050.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vscode使用Black Formatter以及Flake8实现代码格式化

vscode使用Black Formatter以及Flake8实现代码格式化&#xff08;实现和pycharm相似的效果&#xff09; 简单介绍一下这两个插件的功能&#xff0c;flake8可以实现对python代码风格的检测&#xff0c;对空格换行等内容都会有提示。 Black Formatter则可以实现在保存时&#x…

kafka 消费者 API 使用总结

前言 应用程序使用KafkaConsumer向Kafka订阅主题&#xff0c;并从订阅的主题中接收消息。不同于从其他消息系统读取数据&#xff0c;从Kafka读取数据涉及一些独特的概念和想法。如果不先理解这些概念&#xff0c;则难以理解如何使用消费者API。本文将先解释这些重要的概念&…

【乐吾乐2D可视化组态编辑器】文件

1 文件 文件&#xff1a;文件的新建、打开、导入、保存、另存为、下载JOSN文件、下载ZIP打包文件、导出为HTML、导出为Vue2组件、导出为Vue3组件、导出为React组件&#xff08;老版将不再维护&#xff09;、下载为PNG、下载为SVG 乐吾乐2D可视化组态编辑器demo&#xff1a;ht…

Elasticsearch 聚合查询

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f4a5;&#x1f4a5;个人主页&#xff1a;奋斗的小羊 &#x1f4a5;&#x1f4a5;所属专栏&#xff1a;C语言 &#x1f680;本系列文章为个人学习…

一、安装VMware16

本篇来源&#xff1a;山海同行 本篇地址&#xff1a;https://shanhaigo.cn/courseDetail/1805875642621952000 本篇资源&#xff1a;以整理到-山海同行 一、VMware虚拟机下载 1. 官网下载 1. 打开官网 打开VMware官网地址&#xff1a;https://www.vmware.com/ 2. 选择下载产…

【研究】两千亿高碳投资:头部资管气候行动观察

在气候语境中&#xff0c;“棕”与“绿”是一组对立色&#xff0c;前者被用来描述与低碳理念不符的行动。近日资管机构的“含棕量”受到了气候倡议者的检视。 正文 国际环保机构绿色和平在与海南成美慈善基金会联合发布《中国资产管理机构气候表现研究报告》&#xff08;以下简…

Pytorch实战(一):LeNet神经网络

文章目录 一、模型实现1.1数据集的下载1.2加载数据集1.3模型训练1.4模型预测 LeNet神经网络是第一个卷积神经网络&#xff08;CNN&#xff09;&#xff0c;首次采用了卷积层、池化层这两个全新的神经网络组件&#xff0c;接收灰度图像&#xff0c;并输出其中包含的手写数字&…

告别模糊时代,扫描全能王带来清晰世界

模糊碑文引发的思考 上个月中旬去洛阳拜访了著名的龙门石窟&#xff0c;本就对碑文和文字图画感兴趣的我们&#xff0c;准备好好欣赏一下龙门石窟的历史文化古迹。到了地方之后&#xff0c;我发现石窟的高度和宽度远远超出了想象&#xff0c;正因如此&#xff0c;拍出来的文字…

NewspaceGPT带你玩系列之美人鱼图表

这里写目录标题 注册一个账号&#xff0c;用qq邮箱&#xff0c;然后登录选一个可用的Plus&#xff0c;不要选3.5探索GPT今天的主角是开始寻梦美人鱼图表我选第一个试一下问&#xff1a;重新回答上面的问题&#xff0c;一切都用汉语重新生成一个流程图&#xff1a;生成一个网站登…

OpenAI“跌倒”,国产大模型“吃饱”?

大数据产业创新服务媒体 ——聚焦数据 改变商业 在AI的世界里&#xff0c;OpenAI就像是一位高高在上的霸主&#xff0c;它的一举一动&#xff0c;都能引发行业里的地震。然而&#xff0c;就在不久前&#xff0c;这位霸主突然宣布了一个决定&#xff0c;自7月9日起&#xff0c;…

2024热门骨传导蓝牙耳机怎么选?超全的选购攻略附带好物推荐!

对于很多喜欢运动健身的小伙伴&#xff0c;在现在市面上这么多种类耳机的选择上&#xff0c;对于我来说的话还是很推荐大家去选择骨传导运动耳机的&#xff0c;相较于普通的入耳式蓝牙耳机&#xff0c;骨传导耳机是通过振动来传输声音的&#xff0c;而入耳式耳机则是通过空气传…

以Bert训练为例,测试torch不同的运行方式,并用torch.profile+HolisticTraceAnalysis分析性能瓶颈

以Bert训练为例,测试torch不同的运行方式,并用torch.profileHolisticTraceAnalysis分析性能瓶颈 1.参考链接:2.性能对比3.相关依赖或命令4.测试代码5.HolisticTraceAnalysis代码6.可视化A.优化前B.优化后 以Bert训练为例,测试torch不同的运行方式,并用torch.profileHolisticTra…

正则表达式阅读理解

这段正则表达式可以匹配什么呢&#xff1f; ((max|min)\\s*\\([^\\)]*(,[^\\)]*)*\\)|[a-zA-Z][a-zA-Z0-9]*(_[a-zA-Z][a-zA-Z0-9]*)?(\\*||%)?|[0-9](\\.[0-9])?|\\([^\\)]*(,[^\\)]*)*\\))(\\s*[-*/%]\\s*([a-zA-Z][a-zA-Z0-9]*(_[a-zA-Z][a-zA-Z0-9]*)?(\\*||%)?|[0-…

Charls数据库+预测模型发二区top | CHARLS等七大老年公共数据库周报(6.19)

七大老年公共数据库 七大老年公共数据库共涵盖33个国家的数据&#xff0c;包括&#xff1a;美国健康与退休研究 (Health and Retirement Study, HRS)&#xff1b;英国老龄化纵向研究 &#xff08;English Longitudinal Study of Ageing, ELSA&#xff09;&#xff1b;欧洲健康、…

HashMap第5讲——resize方法扩容源码分析及细节

put方法的源码和相关的细节已经介绍完了&#xff0c;下面我们进入扩容功能的讲解。 一、为什么需要扩容 这个也比较好理解。假设现在HashMap里的元素已经很多了&#xff0c;但是链化比较严重&#xff0c;即便树化了&#xff0c;查询效率也是O(logN)&#xff0c;肯定没有O(1)好…

IDEA注释快只有一行时不分行的设置

在编写注释时&#xff0c;有时使用注释块来标注一个变量或者一段代码时&#xff0c;为了节约空间&#xff0c;希望只在一行中显示注释快。只需要按照下图将“一行注释不分行”勾选上即可。

M Farm RPG Assets Pack(农场RPG资源包)

🌟塞尔达的开场动画:风鱼之歌风格!🌟 像素参考:20*20 字体和声音不包括在内 资产包括: 1名身体部位分离的玩家和4个方向动画: 闲逛|散步|跑步|持有物品|使用工具|拉起|浇水 6个带有4个方向动画的工具 斧头|镐|喙|锄头|水壶|篮子 4个NPC,有4个方向动画: 闲逛|散步 �…

LSH算法:高效相似性搜索的原理与Python实现II

局部敏感哈希&#xff08;LSH&#xff09;是一种高效的近似相似性搜索技术&#xff0c;广泛应用于需要处理大规模数据集的场景。在当今数据驱动的世界中&#xff0c;高效的相似性搜索算法对于维持业务运营至关重要&#xff0c;它们是许多顶尖公司技术堆栈的核心。 相似性搜索面…

去掉window11设备和驱动器中的百度网盘图标

背景 window系统设备驱动器中显示百度网盘图标&#xff0c;个人强迫症&#xff0c;要去掉&#xff01;&#xff01;&#xff01; 去掉window11->设备和驱动器->百度网盘 的图标 登录百度网盘点击”同步“ 点击设置 在基本设置里面去掉勾选“在我的电脑中显示百度网盘…

麒麟桌面操作系统上使用命令行添加软件图标到任务栏

原文链接&#xff1a;麒麟桌面操作系统上使用命令行添加软件图标到任务栏 Hello&#xff0c;大家好啊&#xff01;今天给大家带来一篇在麒麟桌面操作系统上使用命令行添加软件图标到任务栏的文章。通过命令行添加软件图标到任务栏&#xff0c;可以快速、便捷地将常用的软件固定…