Flink的 RecordWriter 数据通道 详解

        本文从基础原理到代码层面逐步解释 Flink 的RecordWriter 数据通道,尽量让初学者也能理解。


1. 什么是 RecordWriter

通俗理解

        RecordWriter 是 Flink 中负责将数据从一个任务(Task)发送到下游任务的组件。想象一下,Flink 是一个巨大的工厂,数据像流水线上的包裹,RecordWriter 就是负责把包裹打包、贴上地址标签,然后通过“传送带”送到下一个站点的工人。

        在 Flink 的分布式计算中,数据处理分为多个并行任务(Task),每个任务可能需要把自己的处理结果发送给其他任务(比如下游的计算节点)。RecordWriter 的作用是:

  • 序列化数据:把数据变成可以在网络上传输的字节流。
  • 分配数据:决定数据应该发送到哪个下游任务(基于分区策略,比如 keyBy)。
  • 发送数据:通过底层的网络通道(比如 Netty)把数据传出去。

官方定义

        根据 Flink 官方文档,RecordWriter 是 Flink 数据流(DataStream)处理中用于将记录(Record)写入到输出通道的核心组件。它是 Flink 运行时(Runtime)层的一部分,位于任务的输出端,负责将上游算子处理后的数据发送到下游算子的输入端。


2. RecordWriter 的工作原理(宏观视角)

        为了让非专业人士理解,我们先从高层次看 RecordWriter 的工作流程,之后再深入到代码和底层细节。

工作流程(类比快递分拣)

  1. 接收包裹(数据记录)RecordWriter 从上游算子(比如 Map 或 Filter)接收到一条数据记录(Record),就像快递员拿到一个包裹。
  2. 贴标签(分区决策):根据用户定义的分区策略(比如 keyBy 或 broadcast),RecordWriter 决定这个包裹要送到哪个下游站点(下游子任务)。
  3. 打包(序列化):包裹不能直接扔到传送带上,RecordWriter 会把数据“打包”成字节流(序列化),方便在网络上传输。
  4. 选择传送带(通道选择):Flink 的任务之间通过逻辑通道(Channel)连接,RecordWriter 选择合适的通道(对应下游的子任务)。
  5. 送上传送带(发送数据)RecordWriter 把打包好的数据通过底层的网络栈(Netty)发送到下游任务。

核心问题

  • 如何确保数据高效传输? Flink 使用缓冲区(Buffer)管理数据,避免频繁的网络调用。
  • 如何保证数据顺序或分区正确? 依赖分区器(Partitioner)和通道选择器(ChannelSelector)。
  • 如何处理分布式环境中的复杂性? Flink 的运行时通过 ResultPartition 和 RecordWriter 抽象化网络通信。

3. 深入 RecordWriter 的源码实现

        现在我们结合 Flink 源码(基于 1.17 版本),从底层逐步分析 RecordWriter 的实现。我会用注释和伪代码的方式解释关键部分,并尽量用类比让逻辑清晰。

3.1 RecordWriter 的类结构

        RecordWriter 的核心代码位于 org.apache.flink.runtime.io.network.api.writer 包中。主要类是 RecordWriter,它是一个抽象类,实际使用的是其子类,比如 RecordWriterDelegate 或 ChannelSelectorRecordWriter

public abstract class RecordWriter<T> {protected final ResultPartitionWriter partitionWriter; // 输出分区protected final int numberOfChannels; // 下游通道数量protected final Random random; // 用于随机分区protected RecordWriter(ResultPartitionWriter writer) {this.partitionWriter = writer;this.numberOfChannels = writer.getNumberOfSubpartitions();this.random = new Random();}// 核心方法:发送一条记录public abstract void emit(T record) throws IOException, InterruptedException;
}
  • ResultPartitionWriterRecordWriter 依赖的分区写入器,负责管理输出缓冲区和实际的网络发送。
  • numberOfChannels:下游子任务的数量,决定了数据可以发送到多少个通道。
  • emit:核心方法,负责将一条记录发送出去。

3.2 数据发送的核心流程(emit 方法)

        emit 方法是 RecordWriter 的核心入口,我们以 ChannelSelectorRecordWriter(支持自定义分区策略的实现)为例,逐步分析其实现。

源码分析(简化和注释)

以下是 ChannelSelectorRecordWriter 的 emit 方法的核心逻辑(简化版,带详细注释):

public class ChannelSelectorRecordWriter<T> extends RecordWriter<T> {private final ChannelSelector<T> channelSelector; // 通道选择器(决定分区)private final SerializationDelegate<T> serializationDelegate; // 序列化代理public ChannelSelectorRecordWriter(ResultPartitionWriter writer,ChannelSelector<T> channelSelector,SerializationDelegate<T> serializationDelegate) {super(writer);this.channelSelector = channelSelector;this.serializationDelegate = serializationDelegate;}@Overridepublic void emit(T record) throws IOException, InterruptedException {// 1. 设置待序列化的记录serializationDelegate.setInstance(record);// 2. 使用通道选择器决定目标通道int channelIndex = channelSelector.selectChannel(record);// 3. 将记录写入目标通道的缓冲区partitionWriter.emitRecord(serializationDelegate.getSerializedData(), // 序列化后的数据channelIndex // 目标通道索引);}
}
步骤拆解与类比
  1. 设置记录(serializationDelegate.setInstance)

    • 类比:快递员拿到包裹,先登记包裹内容。
    • 原理serializationDelegate 是一个序列化代理,负责将用户的数据(比如 Java 对象)变成字节流。Flink 使用 SerializationDelegate 包装用户记录,延迟实际序列化操作,以提高性能。
    • 源码细节serializationDelegate.setInstance(record) 只是简单地将记录存储到代理对象中,实际序列化发生在后续的 getSerializedData 调用时。
  2. 选择通道(channelSelector.selectChannel)

    • 类比:快递员根据包裹上的地址标签,决定送到哪个分拣中心。
    • 原理ChannelSelector 是 Flink 提供的分区逻辑接口,用户可以通过 keyBybroadcast 等算子自定义分区策略。selectChannel 方法返回一个整数(channelIndex),表示数据应该发送到哪个下游子任务。
    • 常见实现
      • KeyGroupStreamPartitioner:基于 Key 的哈希分区(keyBy)。
      • BroadcastPartitioner:将数据广播到所有下游子任务。
      • ForwardPartitioner:直接发送到对应的下游任务(一对一)。
    • 推导
      • 假设用户定义了 keyBy(x -> x.getId())ChannelSelector 会提取记录的 id 字段,计算哈希值(比如 id.hashCode()),然后通过取模(hash % numberOfChannels)决定目标通道。
      • 公式:channelIndex=hash(key)mod  numberOfChannels
      • 这确保相同 key 的记录总是发送到同一个下游任务,满足 keyBy 的语义。
  3. 写入缓冲区(partitionWriter.emitRecord)

    • 类比:快递员把包裹装进集装箱(缓冲区),等待卡车运走。
    • 原理ResultPartitionWriter 是 Flink 运行时中管理输出分区的组件。emitRecord 方法将序列化后的数据写入目标通道的缓冲区(Buffer)。Flink 使用内存池(MemoryPool)管理缓冲区,避免频繁分配内存。
    • 源码细节
      public void emitRecord(BufferBuilder bufferBuilder, int targetSubpartition)throws IOException, InterruptedException {// 将序列化数据写入 BufferBuilderBufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();// 添加到目标子分区的队列addBufferConsumer(bufferConsumer, targetSubpartition);
      }
      
      • BufferBuilder:用于构建缓冲区,负责将数据写入内存。
      • BufferConsumer:表示一个可消费的缓冲区,供下游任务读取。
      • addBufferConsumer:将缓冲区加入目标子分区的队列,等待网络层发送。

3.3 序列化与缓冲区管理

序列化和缓冲区是 RecordWriter 性能的关键。

  • 序列化

    • Flink 使用 TypeSerializer(用户定义或自动推导)将数据对象转为字节流。
    • 类比:把包裹的内容拍成照片(字节流),方便通过网络传输。
    • 源码:SerializationDelegate.getSerializedData 调用 TypeSerializer.serialize
      public class SerializationDelegate<T> {private T instance;private final TypeSerializer<T> serializer;public StreamElement getSerializedData() throws IOException {// 使用序列化器将 instance 转为字节流return serializer.serialize(instance);}
      }
      
  • 缓冲区管理

    • Flink 的缓冲区基于 NetworkBufferPool,每个缓冲区是一个固定大小的内存块(默认 32KB)。
    • 类比:快递员把多个小包裹装进一个大集装箱,避免频繁调用卡车。
    • BufferBuilder 动态分配缓冲区,当缓冲区满时,会触发 BufferConsumer 的创建,并交给 ResultPartitionWriter

3.4 网络传输

  • 底层实现RecordWriter 不直接处理网络传输,而是通过 ResultPartitionWriter 将缓冲区交给 Flink 的网络栈(基于 Netty)。
  • 类比:集装箱装满后,卡车(Netty)把数据送到下游站点。
  • 原理
    • ResultPartitionWriter 将缓冲区写入 PipelinableSubpartition 的队列。
    • Flink 的网络层定期检查队列,使用 Netty 的 Channel 将数据发送到下游 TaskManager。
    • Netty 使用 TCP 协议,确保数据可靠传输。

4. 完整步骤总结(带推导)

        为了让初学者彻底理解,我将 RecordWriter 的工作流程总结为以下步骤,并为每一步提供通俗解释和公式推导(如果适用)。

  1. 接收数据记录

    • 描述:上游算子调用 RecordWriter.emit(record),传入一条数据。
    • 类比:快递员收到一个包裹。
    • 推导:无复杂计算,只是将 record 传递给 serializationDelegate
  2. 选择目标通道

    • 描述:ChannelSelector.selectChannel(record) 返回目标通道索引。
    • 类比:快递员看包裹地址,决定送到哪个分拣中心。
    • 推导:
      • 对于 keyBy 分区:
        • 提取 key:key=keySelector(record)
        • 计算哈希:hash=key.hashCode()
        • 选择通道:channelIndex=hashmod  numberOfChannels
      • 对于广播分区:返回所有通道索引。
      • 公式:channelIndex=f(record,numberOfChannels)
  3. 序列化数据

    • 描述:serializationDelegate.getSerializedData() 将记录转为字节流。
    • 类比:把包裹内容压缩成数字信号。
    • 推导:序列化过程依赖 TypeSerializer,复杂度为 O(size of record)。
  4. 写入缓冲区

    • 描述:partitionWriter.emitRecord 将字节流写入目标通道的缓冲区。
    • 类比:把包裹装进集装箱。
    • 推导:
      • 缓冲区大小固定(默认 32KB)。
      • 如果缓冲区满,触发 BufferBuilder.finish(),创建一个新的 BufferConsumer
      • 公式:bufferSize≤maxBufferSize
  5. 发送数据

    • 描述:缓冲区通过 Netty 传输到下游任务。
    • 类比:卡车把集装箱运到下一个站点。
    • 推导:网络传输的吞吐量取决于 Netty 的配置(线程数、TCP 参数等)。

5. 非专业人士的通俗总结

如果你完全不了解编程或分布式系统,可以把 RecordWriter 想象成一个智能快递员:

  • 任务:把包裹(数据)从一个工厂(任务)送到正确的下游工厂。
  • 步骤
    1. 拿到包裹,检查地址(分区策略)。
    2. 把包裹压缩打包(序列化)。
    3. 装进集装箱(缓冲区)。
    4. 选择正确的传送带(通道)。
    5. 交给卡车(网络)运走。
  • 聪明之处
    • 它会根据包裹的类型(key)确保送到正确的下游工厂。
    • 它会攒够一车包裹再送(缓冲区),避免浪费时间。
    • 它还能同时处理很多包裹(并行处理)。

6. 常见问题解答(Q&A)

Q1:RecordWriter 如何保证数据不丢失?

  • :Flink 的 RecordWriter 通过缓冲区和 Netty 的可靠传输(TCP)确保数据不丢失。如果下游任务失败,Flink 的检查点(Checkpoint)机制会回滚并重试。

Q2:为什么需要序列化?

  • :序列化把复杂的数据对象(比如 Java 类)变成字节流,方便通过网络传输。就像把一本书的内容拍成照片,方便快递寄出。

Q3:ChannelSelector 怎么决定分区的?

  • ChannelSelector 根据用户定义的逻辑(比如 keyBy 的 key)计算目标通道。对于 keyBy,它用哈希函数确保相同 key 的数据总是送到同一个下游任务。

7. 结合官方文档的补充

根据 Flink 官方文档(https://flink.apache.org/):

  • RecordWriter 是 Flink 运行时网络栈的一部分,位于 ResultPartition 和下游 InputGate 之间。
  • 它支持多种分区策略(StreamPartitioner),用户可以通过 DataStream API 灵活配置。
  • Flink 的网络传输基于高效的缓冲区管理和 Netty 框架,RecordWriter 是这一流程的起点。

文档中还提到,RecordWriter 的设计目标是:

  • 高吞吐量:通过缓冲区批量发送数据。
  • 低延迟:优化序列化和通道选择逻辑。
  • 灵活性:支持用户自定义分区策略。

8. 总结

        RecordWriter 是 Flink 数据流处理中不可或缺的组件,负责将数据高效、正确地发送到下游任务。通过序列化、分区选择、缓冲区管理和网络传输,它实现了分布式环境下数据流的可靠传递。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/75473.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Dubbo、HTTP、RMI之间的区别

Dubbo、HTTP、RMI之间的区别如下&#xff1a; 表格 复制 特性DubboHTTPRMI通信机制基于Netty的NIO异步通信&#xff0c;采用长连接&#xff0c;支持多种序列化方式基于标准的HTTP协议&#xff0c;无状态&#xff0c;每次请求独立基于Java原生的RMI机制&#xff0c;支持Java对…

wkhtmltopdf生成图片的实践教程,包含完整的环境配置、参数解析及多语言调用示例

欢迎来到涛涛聊AI&#xff0c;最近在研究HTML生成卡片的功能&#xff0c;一起学习下吧。 一、工具特性与安装 wkhtmltoimage是基于WebKit引擎的开源命令行工具&#xff0c;可将HTML网页转换为JPG/PNG等图片格式&#xff0c;支持CSS渲染、JavaScript执行和响应式布局。安装方式…

【在Node.js项目中引入TypeScript:提高开发效率及框架选型指南】

一、TypeScript在Node.js中的核心价值 1.1 静态类型检测 // 错误示例&#xff1a;TypeScript会报错 function add(a: number, b: string) {return a b }1.2 工具链增强 # 安装必要依赖 npm install --save-dev typescript types/node ts-node tsconfig.json1.3 代码维护性提…

化工企业数字化转型:从数据贯通到生态重构的实践路径

一、战略定位&#xff1a;破解行业核心痛点 化工行业面临生产安全风险高&#xff08;全国危化品企业事故率年增5%&#xff09;、能耗与排放压力大&#xff08;占工业总能耗12%&#xff09;、供应链协同低效&#xff08;库存周转率低于制造业均值30%&#xff09;三大挑战。《石…

C#网络编程(Socket编程)

文章目录 0、写在前面的话1、Socket 介绍1.1 Socket是什么1.2 Socket在网络中的位置 2、C# 中的Socket参数2.1 超时控制参数2.2 缓冲区参数2.3 UDP专用参数 3、C# 中的Socket API3.1 Socket&#xff08;构造函数&#xff09;3.1.1 SocketType3.1.2 ProtocolType3.1.3 AddressFa…

Docker部署ES集群

引言&#xff1a; Elasticsearch&#xff08;ES&#xff09;作为分布式搜索引擎&#xff0c;其核心价值在于通过集群部署实现高可用性和数据冗余。 本实验对比两种典型部署方案&#xff1a; 原生Linux部署&#xff1a;直接安装ES服务&#xff0c;适用于生产环境&#xff0c;资…

老硬件也能运行的Win11 IoT LTSC (OEM)物联网版

#记录工作 Windows 11 IoT Enterprise LTSC 2024 属于物联网相关的版本。 Windows 11 IoT Enterprise 是为物联网设备和场景设计的操作系统版本。它通常针对特定的工业控制、智能设备等物联网应用进行了优化和定制&#xff0c;以满足这些领域对稳定性、安全性和长期支持的需求…

【教程】xrdp修改远程桌面环境为xfce4

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 目录 xfce4 vs GNOME对比 配置教程 1. 安装 xfce4 桌面环境 2. 安装 xrdp 3. 配置 xrdp 使用 xfce4 4. 重启 xrdp 服务 5. 配置防火墙&#xff…

【数据结构 · 初阶】- 顺序表

目录 一、线性表 二、顺序表 1.实现动态顺序表 SeqList.h SeqList.c Test.c 问题 经验&#xff1a;free 出问题&#xff0c;2种可能性 解决问题 &#xff08;2&#xff09;尾删 &#xff08;3&#xff09;头插&#xff0c;头删 &#xff08;4&#xff09;在 pos 位…

windows主机中构建适用于K8S Operator开发环境

基于win 10 打造K8S应用开发环境&#xff08;wsl & kind&#xff09; 一、wsl子系统安装 1.1 确认windows系统版本 cmd/powershell 或者win r 运行winver 操作系统要> 19044 1.2 开启wsl功能 控制面板 -> 程序 -> 启用或关闭Windows功能 开启适用于Linu…

计算机视觉色彩空间全解析:RGB、HSV与Lab的实战对比

计算机视觉色彩空间全解析&#xff1a;RGB、HSV与Lab的实战对比 一、前言二、RGB 色彩空间​2.1 RGB 色彩空间原理​2.1.1 基本概念​2.1.2 颜色混合机制​ 2.2 RGB 在计算机视觉中的应用​2.2.1 图像读取与显示​2.2.2 颜色识别​2.2.3 RGB 色彩空间的局限性​ 三、HSV 色彩空…

PyTorch多GPU训练实战:从零实现到ResNet-18模型

本文将介绍如何在PyTorch中实现多GPU训练&#xff0c;涵盖从零开始的手动实现和基于ResNet-18的简洁实现。代码完整可直接运行。 1. 环境准备与库导入 import torch from torch import nn from torch.nn import functional as F from d2l import torch as d2l from torchvisio…

micro介绍

micro介绍 Micro 的首要特点是易于安装&#xff08;它只是一个静态的二进制文件&#xff0c;没有任何依赖关系&#xff09;和易于使用Micro 支持完整的插件系统。插件是用 Lua 编写的&#xff0c;插件管理器可自动为你下载和安装插件。使用简单的 json 格式配置选项&#xff0…

Linux内核分页——线性地址结构

每个进程通过一个指针&#xff08;即进程的mm_struct→pgd&#xff09;指向其专属的页全局目录&#xff08;PGD&#xff09;&#xff0c;该目录本身存储在一个物理页框中。这个页框包含一个类型为pgd_t的数组&#xff0c;该类型是与架构相关的数据结构&#xff0c;定义在<as…

微信小程序开发:微信小程序上线发布与后续维护

微信小程序上线发布与后续维护研究 摘要 微信小程序作为移动互联网的重要组成部分,其上线发布与后续维护是确保其稳定运行和持续优化的关键环节。本文从研究学者的角度出发,详细探讨了微信小程序的上线发布流程、后续维护策略以及数据分析与用户反馈处理的方法。通过结合实…

分享一些使用DeepSeek的实际案例

文章目录 前言职场办公领域生活领域学习教育领域商业领域技术开发领域 前言 以下是一些使用 DeepSeek 的实际案例&#xff1a; DeepSeek使用手册资源链接&#xff1a;https://pan.quark.cn/s/fa502d9eaee1 职场办公领域 行业竞品分析&#xff1a;刚入职的小李被领导要求一天内…

flink iceberg写数据到hdfs,hive同步读取

目录 1、组件版本 环境变量配置 2、hadoop配置 hadoop-env.sh core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml 3、hive配置 hive-env.sh hive-site.xml HIVE LIB 原始JAR 4、flink配置集成HDFS和YARN 修改iceberg源码 编译iceberg-flink-runtime-1…

qq邮箱群发程序

1.界面设计 1.1 环境配置 在外部工具位置进行配置 1.2 UI界面设计 1.2.1 进入QT的UI设计界面 在pycharm中按顺序点击&#xff0c;进入UI编辑界面&#xff1a; 点击第三步后进入QT的UI设计界面&#xff0c;通过点击按钮进行界面设计&#xff0c;设计后进行保存到当前Pycharm…

【C++游戏引擎开发】第10篇:AABB/OBB碰撞检测

一、AABB(轴对齐包围盒) 1.1 定义 ​最小点: m i n = ( x min , y min , z min ) \mathbf{min} = (x_{\text{min}}, y_{\text{min}}, z_{\text{min}}) min=(xmin​,ymin​,zmin​)​最大点: m a x = ( x max , y max , z max ) \mathbf{max} = (x_{\text{max}}, y_{\text{…

大模型是如何把向量解码成文字输出的

hidden state 向量 当我们把一句话输入模型后&#xff0c;例如 “Hello world”&#xff1a; token IDs: [15496, 995]经过 Embedding Transformer 层后&#xff0c;会得到每个 token 的中间表示&#xff0c;形状为&#xff1a; hidden_states: (batch_size, seq_len, hidd…