Flink Source 详解

Flink Source 详解

原文

flip-27
FLIP-27 介绍了新版本Source 接口定义及架构

相比于SourceFunction,新版本的Source更具灵活性,原因是将“splits数据获取”与真“正数据获取”逻辑进行了分离
在这里插入图片描述

重要部件

Source 作为工厂类,会创建以下两个重要部件

  1. SplitEnumerator

    • 通过createEnumerator创建

    • SplitEnumerator 响应request split请求

      • handleSplitRequest
    • 工作在SourceCoordinator (官方描述如下),可以理解为在JobMaster上运行一个单线程的逻辑,所以需要跟在worker上的reader通过rpc通信

      Where to run the enumerator
      There was a long discussion about where to run the enumerator which we documented in the appendix. The final approach we took was very similar to option 3 with a few differences. The approach is following.Each SplitEnumerator will be encapsulated in one SourceCoordinator. If there are multiple sources, multiple SourceCoordinator will there be. The SourceCoordinators will run in the JobMaster, but not as part of the ExecutionGraph. In this FLIP, we propose to failover the entire execution graph when the SplitEnumerator fails. A finer grained enumerator failover will be proposed in a later FLIP.
      
  2. SourceReader

    • 通过createReader创建

    • 工作在worker

    • 由于单独实现SourceReader过于复杂,官方抽象了3种比较通用的模型供开发者使用,MySqlSourceReader就是继承了SingleThreadMultiplexSourceReaderBase

      1. Sequential Single Split (File, database query, most bounded splits)
      2. Multi-split multiplexed (Kafka, Pulsar, Pravega, …)
      3. Multi-split multi-threaded (Kinesis, …)
        在这里插入图片描述

      在这里插入图片描述

      在这里插入图片描述

    • 使用了抽象后的类,开发者的关注点集中在实现一个SplitReader

      public interface SplitReader<E, SplitT extends SourceSplit> {RecordsWithSplitIds<E> fetch() throws InterruptedException;void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);void wakeUp();
      }
      
      1. fetch 获取数据,这里是包含了split信息的record
      2. 响应split改变
      3. 唤醒
  3. RecordEmitter

    1. The RecordsWithSplitIds returned by the SplitReader will be passed to an RecordEmitter one by one.
    2. The RecordEmitter is responsible for the following:
      • Convert the raw record type into the eventual record type
      • Provide an event time timestamp for the record that it processes.
    3. 在 emitRecord 方法中实现

由于通信使用mail风格的rpc(单线程串行),所以响应函数需要保证非阻塞,所以后面可以看到无论enumerator还是reader的最终响应都是在异步线程池中

Non-blocking progress methods, to it supports running in an actor/mailbox/dispatcher style operator

MysqlSource 举例

以flink cdc中的MysqlSource来举例分析

  1. MysqlSource
    • 通过 createEnumerator 创建 MySqlSourceEnumerator

      • 初始化调用start
        • 调用splitAssigner.open()
          • splitAssigner 是获取/分配split动作的真正实现
            • 创建异步线程,填充remainingSplits
      • handleSplitRequest 响应空闲worker的请求
        • assignSplits
          • splitAssigner.getNext()
            • 从 remainingSplits 拿一个可用的split
      • 调用 context.assignSplit 发送 AddSplitEvent
      • MySqlSourceEnumerator 中 splitAssigner 的实现说明
        • splitAssigner 默认实现是 MySqlHybridSplitAssigner
          • hybrid的含义,启动分为两个步骤 1. 读取全量数据 2. 全量数据读取完毕后读取增量数据。将两种模式混合在一起被称为hybird。所以MySqlSnapshotSplitAssigner可以创建两种split
            1. 通过MySqlSnapshotSplitAssigner创建存量数据的split
              • 在读取存量数据时通过chunkSplitter切分为多个split,之后分发给多个reader并行读取
                • chunkSplitter 通过 chunkKey 的范围将存量数据切分
                • 用户可以手动设置chunkKey,否则使用主key作为chunkKey,切分split
            2. 通过 createBinlogSplit 创建增量数据的split
              • 只assign一次binlog的split
              • 只能分发给一个reader,所以在进入增量模式后flink实际所有并行度上只有一个source有数据
                在这里插入图片描述
    • 通过 createReader 创建 MySqlSourceReader

      • 创建 SingleThreadFetcherManager 传入 elementQueue splitReaderSupplier
        • elementQueue: io线程和主线程公用队列,io线程写,主线程读
        • splitReaderSupplier: split reader的工厂
        • SingleThreadFetcherManager 启动后创建线程池
      • sourceOperator 收到 AddSplitEvent 调用 sourceReader.addSplits 这里 sourceReader 是 MySqlSourceReader
        • readerBase 中会调用 splitFetcherManager.addSplits(splits);
          • 由于使用的是 SingleThreadFetcherManager,所以addSplits中永远看到只同时存在一个fetcher
            • fetcher 初始化时加入默认任务 FetchTask 构造的时候传入 elementQueue 传入构造好的 splitReader
            • fetcher addSplits时加入任务 AddSplitsTask
            • fetcher 启动时调用 startFetcher
              • 调用 executors.submit(fetcher); 提交到线程池
              • 线程池中运行 runOnce
                • FetchTask 调用 splitReader.fetch() 获取records 写入 elementQueue
      • 主线程 SourceReaderBase 中的 pollNext 会被框架调用
        • 调用 getNextFetch
          • elementsQueue.poll() 取得 records
            在这里插入图片描述

其他

在Flink CDC 3.0 中

Flink Composer 中使用 WatermarkStrategy.noWatermarks()

 return env.fromSource(sourceProvider.getSource(),WatermarkStrategy.noWatermarks(),sourceDef.getName().orElse(generateDefaultSourceName(sourceDef)),new EventTypeInfo()).setParallelism(sourceParallelism);

很合理,因为pipeline的定义中不会出现聚合函数 window函数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/60841.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android Settings 单元测试 | 如何运行单元测试?

背景 在Android Settings 单元测试 | Telephony Network 模块 APN 案例中粗略介绍了单元测试逻辑内容&#xff0c;但是在独立APK里面如何将单元测试跑起来还是有疑问&#xff0c;因为APP不能直接install&#xff0c;无法借助Android Studio直接Run&#xff0c;在安装的一步会报…

【Qt聊天室】客户端实现总结

目录 1. 项目概述 2. 功能实现 2.1 主窗口设计 2.2 功能性窗口 2.3 主界面功能实现 2.4 聊天界面功能实现 2.5 个人信息功能开发 2.6 用户信息界面设置功能 2.7 单聊与群聊 2.8 登录窗口 2.9 消息功能 3. 核心设计逻辑 3.1 核心类 3.2 前后端交互与DataCenter 4…

java瑞吉外卖

环境搭建 一、数据库环境搭建 1.新建数据库reggie&#xff0c;这里字符集一般用utf8mb4&#xff0c;排序规则一般用utf8mb4_general_ci或utf8mb4_unicode_ci 2.然后导入表结构 二、创建springboot工程 然后检查maven仓库设置&#xff0c;jdk 这是我的pom.xml文件 <?xml …

App Store用户评论如何影响ASO优化

您是否专注于提高应用的知名度&#xff0c;并想知道应用商店评分和用户评论如何发挥作用&#xff1f;应用商店用户评论和评分对于塑造应用的成功至关重要&#xff0c;并且可以显著影响您的应用商店优化 (ASO) 策略。本文提供了利用这些元素为您带来优势的见解和策略。 如今&…

我谈二值形态学基本运算——腐蚀、膨胀、开运算、闭运算

Gonzalez从集合角度定义膨胀和腐蚀&#xff0c;不易理解。 Through these definitions, you can interpret dilation and erosion as sliding neighborhood operations analogous to convolution (or spatial filtering). 禹晶、肖创柏、廖庆敏《数字图像处理&#xff08;面向…

【AIGC】如何通过ChatGPT提示词Prompt定制个性学习计划

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | 提示词Prompt应用实例 文章目录 &#x1f4af;前言&#x1f4af;提示词&#x1f4af;配置信息使用方法 &#x1f4af;指令/language/plan/start/test/continue/config &#x1f4af;小结 &#x1f4af;前言 在这篇文章中…

RFID被装信息化监控:物联网解决方案深入分析

被装物联网信息化监控构成了一套复杂而高效的处理方案&#xff0c;它巧妙地将物联网技术与被装资源管理的具体需求相结合&#xff0c;实现了对被装资源实时监控、智能化调控和优化分配。以下是对被装物联网信息化监控的详细说明&#xff1a; 一、被装物联网信息化监控的定义 …

C++ 关于类与对象(中篇)一篇详解!(运算符重载)

赋值运算符重载 运算符重载 C 为了 增强代码的可读性 引入了运算符重载 &#xff0c; 运算符重载是具有特殊函数名的函数 &#xff0c;也具有其返回值类型&#xff0c;函数名字以及参数列表&#xff0c;其返回值类型与参数列表与普通的函数类似。 函数名字为&#xff1a;关键…

有效对接礼顿销售单:从数据获取到金蝶云存储

礼顿销售单对接项目&#xff1a;轻松实现数据集成 礼顿销售单对接&#xff08;91-零售业务/5-代销售(供货商发货)&#xff09; 在礼顿销售单对接项目中&#xff0c;我们面临的主要任务是将吉客云奇门的数据集成到金蝶云星空平台。这个过程不仅需要确保数据的准确性和完整性&am…

【C++】—— map 与 set 深入浅出:设计原理与应用对比

不要只因一次失败&#xff0c;就放弃你原来决心想达到的目的。 —— 莎士比亚 目录 1、序列式容器与关联式容器的概述与比较 2、set 与 multiset 2.1 性质分析&#xff1a;唯一性与多重性的差异 2.2 接口解析&#xff1a;功能与操作的全面解读 3、map 与 multimap 3.1 性…

基于微信小程序的平安驾校预约平台的设计与实现(源码+LW++远程调试+代码讲解等)

摘 要 互联网发展至今&#xff0c;广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。针对高校教师成果信息管理混乱&#xff0c;出错率高&#xff0c;信息安全性差&#xff0c;劳动强度大&#xff0c;费时费力…

SpringBoot+Vue3开发会议管理系统

1 项目介绍 会议管理系统&#xff0c;简化公司内会议方面的流程&#xff0c;提供便捷。实现对会议室的管理、会议的管理、会议预约的管理&#xff0c;三大主流程模块。 系统分为三种角色&#xff0c;分别是员工、管理员和超级管理员。 员工角色功能&#xff1a;查看会议室占…

Docker环境搭建Cloudreve网盘服务(附shell脚本一键搭建)

Docker搭建Cloudreve Cloudreve介绍&#xff1a; Cloudreve 是一个基于 ThinkPHP 框架构建的开源网盘系统&#xff0c;旨在帮助用户以较低的成本快速搭建起既能满足个人也能满足企业需求的网盘服务。Cloudreve 支持多种存储介质&#xff0c;包括但不限于本地存储、阿里云OSS、…

Cadence安装

记录一下安装过程&#xff0c;方便以后安装使用Cadence。 去吴川斌的博客下载安装包&#xff0c;吴川斌博客&#xff1a; https://www.mr-wu.cn/cadence-orcad-allegro-resource-downloads/ 下载阿狸狗破戒大师 我这边下载的是版本V3.2.6&#xff0c;同样在吴川斌的博客下载安装…

系统架构设计师:系统架构设计基础知识

从第一个程序被划分成模块开始&#xff0c;软件系统就有了架构。 现在&#xff0c;有效的软件架构及其明确的描述和设计&#xff0c;已经成为软件工程领域中重要的主题。 由于不同人对Software Architecture (简称SA) 的翻译不尽相同&#xff0c;企业界喜欢叫”软件架构“&am…

Java Web 工程全貌

通过下图&#xff0c;我们可以一览 Java Web 工程的全貌 通过上图&#xff0c;我们能够基本窥探整个 Java Web 工程的面貌&#xff0c;包括前端&#xff0c;后端&#xff0c;甚至是运维。 接下来&#xff0c;我们就结合文字描述&#xff0c;加深理解。 部署Vue前端和Spring…

Linux入门:环境变量与进程地址空间

一. 环境变量 1. 概念 1️⃣基本概念&#xff1a; 环境变量(environment variables)一般是指在操作系统中用来指定操作系统运行环境的一些参数 如&#xff1a;我们在编写C/C代码的时候&#xff0c;在链接的时候&#xff0c;从来不知道我们的所链接的动态静态库在哪里&#x…

【优选算法 — 滑动窗口】水果成篮 找到字符串中所有字母异位词

水果成篮 水果成篮 题目描述 因为只有两个篮子&#xff0c;每个篮子装的水果种类相同&#xff0c;如果从 0 开始摘&#xff0c;则只能摘 0 和 1 两个种类 &#xff1b; 因为当我们在两个果篮都装有水果的情况下&#xff0c;如果再走到下一颗果树&#xff0c;果树的水果种类…

Java 中使用Mockito 模拟对象的单元测试的快速示例

Mockito是一个流行的Java模拟框架&#xff0c;它允许你在单元测试中创建和配置模拟对象&#xff0c;以便在测试过程中替换那些不容易构造或获取的对象。 Mockito可以与JUnit无缝集成&#xff0c;下面的示例演示 Mockito JUnit实现模拟对象的单元测试。 依赖导入 这里使用Mav…

STM32 创建一个工程文件(寄存器、标准库)

首先到官网下载对应型号的固件包&#xff1a; 像我的STM32F103C8T6的就下载这个&#xff1a; 依次打开&#xff1a; .\STM32F10x_StdPeriph_Lib_V3.5.0\STM32F10x_StdPeriph_Lib_V3.5.0\Libraries\CMSIS\CM3\DeviceSupport\ST\STM32F10x\startup\arm 可以看到&#xff1a; 这…