文章目录
- 一. RecordWriter封装数据并发送到网络
- 1. 数据发送到网络的具体流程
- 2. 源码层面
- 2.1. Serializer的实现逻辑
- a. SpanningRecordSerializer的实现
- b. SpanningRecordSerializer中如何对数据元素进行序列化
- 2.2. 将ByteBuffer中间数据写入BufferBuilder
- 二. BufferBuilder申请资源并创建
- 1. ChannelSelectorRecordWriter创建BufferBuilder
- 2. BroadcastRecordWriter创建BufferBuilder
一. RecordWriter封装数据并发送到网络
1. 数据发送到网络的具体流程
RecordWriter对接入的StreamRecord数据进行序列化并等待下游任务消费的过程,整个过程细节如下。
StreamRecord通过RecordWriterOutput写入RecordWriter,并在RecordWriter中通过RecordSerializer组件将StreamRecord序列化为ByteBuffer数据格式。
RecordWriter
向ResultPartition申请BufferBuilder对象
,用于构建BufferConsumer对象,将序列化后的二进制数据存储在申请到的Buffer中。ResultPartition会向LocalBufferPool申请MemorySegment内存块,用于存储Buffer数据
。BufferBuilder中会不断接入ByteBuffer数据,
直到将BufferBuilder中的Buffer空间占满
,此时会申请新的BufferBuilder继续构建BufferConsumer数据集。Buffer构建完成后,会调用flushTargetPartition()方法,
让ResultPartition向下游输出数据
,此时会通知NetworkSequenceViewReader组件开始消费ResultSubPartition中的BufferConsumer对象。当BufferConsumer中Buffer数据被
推送到网络后
,回收BufferConsumer中的MemorySegment内存空间,继续用于后续的消息处理。
2. 源码层面
接下来我们从源码的角度了解RecordWriter具体处理数据的逻辑。在RecordWriterOutput中调用pushToRecordWriter方法将数据写出。
通过recordWriter.emit(serializationDelegate)方法,将数据元素发送到RecordWriter中进行处理。主要逻辑如下
- 序列化数据为ByteBuffer二进制数据,并缓存在SpanningRecordSerializer.serializationBuffer对象中。
- 将序列化器生成的中间数据复制到指定分区中,实际上就是将ByteBuffer数据复制到BufferBuiler对象中。
- 如果BufferBuiler中存储了完整的数据元素,就会清空序列化器的中间数据,因为序列化器中累积的数据不宜过大。
protected void emit(T record, int targetSubpartition) throws IOException { checkErroneous(); targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); if (flushAlways) { targetPartition.flush(targetSubpartition); }
}protected void emit(T record, int targetChannel) throws IOException, InterruptedException {checkErroneous();// 数据序列化serializer.serializeRecord(record);// 将序列化器中的数据复制到指定分区中if (copyFromSerializerToTargetChannel(targetChannel)) {// 清空序列化器serializer.prune();}
}
2.1. Serializer的实现逻辑
接着了解如何将序列化器中的数据转换成Buffer并存储到ResultPartiton中,最终将数据发送到下游。
a. SpanningRecordSerializer的实现
SpanningRecordSerializer实现将序列化后的BytesBuffer数据写入BufferBuilder。
SpanningRecordSerializer对象主要包含了DataOutputSerializer serializationBuffer和ByteBuffer dataBuffer两个成员变量。
- DataOutputSerializer可以将数据转换成二进制格式并存储在byte[]数组中。在serialization中会调用serializationBuffer.wrapAsByteBuffer()方法,将serializationBuffer中生成的byte[]数组转换成ByteBuffer数据结构,并赋值给dataBuffer对象。
- ByteBuffer是Java NIO中用于对二进制数据进行操作的Buffer接口,底层有DirectByteBuffer和HeapByteBuffer等实现,通过ByteBuffer提供的方法,可以轻松实现对二进制数据的操作。
b. SpanningRecordSerializer中如何对数据元素进行序列化
SpanningRecordSerializer.serializeRecord()方法主要逻辑如下。
1)清理serializationBuffer的中间数据,实际上就是将byte[]数组的position参数置为0。
2)设定serialization buffer的初始容量,默认不小于4。
3)将数据元素写入serializationBuffer的bytes[]数组。(所有数据元素都实现了IOReadableWritable接口,可以直接将数据对象转换为二进制格式)
4)获取serializationBuffer的长度信息,并写入serializationBuffer。
5)将serializationBuffer中的byte[]数据封装为java.io.ByteBuffer数据结构,最终赋值到dataBuffer的中间结果中。
public void serializeRecord(T record) throws IOException {if (CHECKED) {if (dataBuffer.hasRemaining()) {throw new IllegalStateException("Pending serialization of previous record.");}}// 首先清理serializationBuffer中的数据serializationBuffer.clear();// 设定serialization buffer数量serializationBuffer.skipBytesToWrite(4);// 将record数据写入serializationBufferrecord.write(serializationBuffer);// 获取serializationBuffer的长度信息并记录到serializationBuffer对象中int len = serializationBuffer.length() - 4;serializationBuffer.setPosition(0);serializationBuffer.writeInt(len);serializationBuffer.skipBytesToWrite(len);// 对serializationBuffer进行wrapp处理,转换成ByteBuffer数据结构dataBuffer = serializationBuffer.wrapAsByteBuffer();
}
Flink 1.12版本中RecordWriter就提供了serializeRecord的能力,没有单拎出来实现。
2.2. 将ByteBuffer中间数据写入BufferBuilder
首先BufferBuilder用于构建完整的Buffer数据。在copyFromSerializerToTargetChannel()方法中实现了将RecordSerializer中的ByteBuffer中间数据写入BufferBuilder的逻辑:
- 对序列化器进行Reset操作,重置初始化位置。
- 将序列化器的ByteBuffer中间数据写入BufferBuilder。
- 判断当前BufferBuilder是否构建了完整的Buffer数据,完成BufferBuilder中Buffer的构建。
- 判断SerializationResult中是否具有完整的数据元素,如果是则将pruneTriggered置为True,然后清空当前的BufferBuilder,并跳出循环。
- 创建新的bufferBuilder,继续从序列化器中将中间数据复制到BufferBuilder中。
- 指定flushAlways参数为True,
调用flushTargetPartition()方法将数据写入ResultPartition
。为防止过度频繁地将数据写入ResultPartiton,在RecordWriter中会有独立的outputFlusher线程
(在构造器中),周期性地将构建出来的Buffer数据推送到ResultPartiton本地队列中存储,默认延迟为100ms。
protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {// 对序列化器进行Reset操作,初始化initial positionserializer.reset();// 创建BufferBuilderboolean pruneTriggered = false;BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);// 调用序列化器将数据写入bufferBuilderSerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);// 如果SerializationResult是完整Bufferwhile (result.isFullBuffer()) {// 则完成创建Buffer数据的操作finishBufferBuilder(bufferBuilder);// 如果是完整记录,则将pruneTriggered置为Trueif (result.isFullRecord()) {pruneTriggered = true;emptyCurrentBufferBuilder(targetChannel);break;}// 创建新的bufferBuilder,继续复制序列化器中的数据到BufferBuilder中bufferBuilder = requestNewBufferBuilder(targetChannel);result = serializer.copyToBufferBuilder(bufferBuilder);}checkState(!serializer.hasSerializedData(), "All data should be written at once");// 如果指定的flushAlways,则直接调用flushTargetPartition将数据写入ResultPartitionif (flushAlways) {flushTargetPartition(targetChannel);}return pruneTriggered;
}
二. BufferBuilder申请资源并创建
1. ChannelSelectorRecordWriter创建BufferBuilder
在ChannelSelectorRecordWriter.getBufferBuilder()方法中定义了BufferBuilder的创建过程。
//1. targetChannel确认数据写入的分区,ID与下游InputGate中的InputChannelID是对应的
//2.
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {//在ChannelSelectorRecordWriter中维护了//bufferBuilders[]数组,用于存储创建好的BufferBuilder对象if (bufferBuilders[targetChannel] != null) {return bufferBuilders[targetChannel];} else {//只有在无法从bufferBuilders[]中获取BufferBuilder时,//才会调用requestNewBufferBuilder()方法创建新的BufferBuilder对象。return requestNewBufferBuilder(targetChannel);}
}
requestNewBufferBuilder()方法逻辑如下
- 检查bufferBuilders[]的状态,确保bufferBuilders[targetChannel]为空或者bufferBuilders[targetChannel].isFinished()方法返回值为True。
- 调用targetPartition.getBufferBuilder()方法获取新的BufferBuilder,这里的targetPartition就是前面提到的ResultPartition。
在ResultPartition中会向LocalBufferPool申请Buffer内存空间,用于存储序列化后的ByteBuffer数据。
- 向targetPartition添加通过bufferBuilder构建的BufferConsumer对象,bufferBuilder和BufferConsumer内部维护了同一个Buffer数据。BufferConsumer会被存储到ResultSubpartition的BufferConsumer队列中。
- 将创建好的bufferBuilder添加至数组,用于下次直接获取和构建BufferConsumer对象。
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());// 调用targetPartition获取BufferBuilderBufferBuilder bufferBuilder = targetPartition.getBufferBuilder();// 向targetPartition中添加BufferConsumertargetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(),targetChannel);// 将创建好的bufferBuilder添加至数组bufferBuilders[targetChannel] = bufferBuilder;return bufferBuilder;
}
2. BroadcastRecordWriter创建BufferBuilder
在BroadcastRecordWriter内部创建BufferBuilder的过程中,会将创建的bufferConsumer对象添加到所有的ResultSubPartition中,实现将Buffer数据下发至所有InputChannel,如下代码:
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {checkState(bufferBuilder == null || bufferBuilder.isFinished());BufferBuilder builder = targetPartition.getBufferBuilder();if (randomTriggered) {targetPartition.addBufferConsumer(builder.createBufferConsumer(), targetChannel);} else {try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {for (int channel = 0; channel < numberOfChannels; channel++) {targetPartition.addBufferConsumer(bufferConsumer.copy(), channel);}}}bufferBuilder = builder;return builder;
}
以上步骤就是在RecordWriter组件中将数据元素序列化成二进制格式,然后通过BufferBuilder构建成Buffer类型数据,最终存储在ResultPartition的ResultSubPartition中。
这是从Task的层面了解数据网络传输过程,下篇了解在TaskManager中如何构建底层的网络传输通道。