记一次 Flink mongoDB CDC 到Kafka遇到的问题

背景

最近在做一个数据接入的部分事情,从mongo导入到 adb,趁着做的事情聊一下Flink内部的一些机制。
首先这会拆分两个部分,一部分是从 mongo 到 Kafka,另一部分是从 Kafka 到 adb,其中遇到了一些问题,比如说 CDC 的机制,
upset kafka source 和 kafka source的一些区别等
mongo 的版本为 4.4.x

分析

mongo -> kafka

一开始时候 Flink source 是 mongo cdc sink 选择是 正常的 kafka
部分配置如下:

// source
CREATE TABLE products (...PRIMARY KEY(_id) NOT ENFORCED
) WITH ('connector' = 'mongodb-cdc','hosts' = 'localhost:27017,localhost:27018,localhost:27019','username' = 'flinkuser','password' = 'flinkpw','database' = 'inventory','collection' = 'products'
);// sink
CREATE TABLE KafkaTable (`ts` TIMESTAMP(3) METADATA FROM 'timestamp',`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING
) WITH (..'connector' = 'kafka','key.json.ignore-parse-errors' = 'true','format' = 'debezium-json',
)

这里选择的formatdebezium-json ,这在后续读取kafka数据进行 Row Number over操作的时候,会报错:

StreamPhysicalOverAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[]..)

从意思来看 Row Number over 操作是不支持 CDC产生的数据的(CDC会产生 +i +U -U 等数据),于是选择了 upsert kafka,upsert kafka这里会有一个解释:

作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)

所以这里我们选择把 kakfa的数据转换成的正常的 数据流,而不是CDC数据,因为我们最终存储的 Adb 是可以支持upsert操作。
可以看到Flink 物理计划中 会额外多出一个StreamExecChangelogNormalize算子,该流的具体流如下:

mongo(4.4.x) ->  StreamExecChangelogNormalize -> ConstraintEnforcer(NotNullEnforcer(fields=[_id])) -> kafkaSink

可以看到StreamExecChangelogNormalize是在kafka sink之前的,也就是说StreamExecChangelogNormalize是用来Flink用来产生CDC数据的,Flink SQL Planner 会自动为 Upsert 类型的 Source 生成一个 ChangelogNormalize 节点,并按照上述操作将其转换为完整的变更流;代价则是该算子节点需要存储体积巨大的 State 数据。具体可以参考深入解读 MongoDB CDC 的设计与实现,产生的CDC数据流如下:

StreamExecChangelogNormalize.translateToPlanInternal||\/ProcTimeDeduplicateKeepLastRowFunction.processElement||\/ProcTimeDeduplicateKeepLastRowFunction.processLastRowOnChangelog 

processLastRowOnChangelog这里会存有 keyedState 状态,但是为了补足这个带有CDC的数据的,所以这里得有依赖状态在flink端进行状态的转换,具体可以看:DeduplicateFunctionHelper.processLastRowOnChangelog 方法:

 static void processLastRowOnChangelog(RowData currentRow,boolean generateUpdateBefore,ValueState<RowData> state,Collector<RowData> out,boolean isStateTtlEnabled,RecordEqualiser equaliser)throws Exception {RowData preRow = state.value();RowKind currentKind = currentRow.getRowKind();if (currentKind == RowKind.INSERT || currentKind == RowKind.UPDATE_AFTER) {if (preRow == null) {// the first row, send INSERT messagecurrentRow.setRowKind(RowKind.INSERT);out.collect(currentRow);} else {if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) {// currentRow is the same as preRow and state cleaning is not enabled.// We do not emit retraction and update message.// If state cleaning is enabled, we have to emit messages to prevent too early// state eviction of downstream operators.return;} else {if (generateUpdateBefore) {preRow.setRowKind(RowKind.UPDATE_BEFORE);out.collect(preRow);}currentRow.setRowKind(RowKind.UPDATE_AFTER);out.collect(currentRow);}}// normalize row kindcurrentRow.setRowKind(RowKind.INSERT);// save to statestate.update(currentRow);} else {// DELETE or UPDATER_BEFOREif (preRow != null) {// always set to DELETE because this row has been removed// even the input is UPDATE_BEFORE, there may no UPDATE_AFTER after it.preRow.setRowKind(RowKind.DELETE);// output the preRow instead of currentRow,// because preRow always contains the full content.// currentRow may only contain key parts (e.g. Kafka tombstone records).out.collect(preRow);// clear state as the row has been removedstate.clear();}// nothing to do if removing a non-existed row}}

kafka -> adb

一开始时候的kafka 我们选择了常规的 kafka source

CREATE TABLE KafkaTable (...
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'json'
)

这里获取到的数据就是 正常的json数据,而不是 debezium-json数据,具体区别,可以参考下面的说明。
如果选择一开始的 kafka sink是 kafka的话 可以看到这里的物理计划流向为:

kafka -> SinkMaterializer -> adb sink 

对于为什么会出现 SinkMaterializer, 为了解决 changlog的乱序问题,为下游提供一个正确的upsert视图, 产生 SinkMaterializer 物理算子的数据流如下:

StreamExecSink ||\/
createSinkTransformation // 这里有   final boolean needMaterialization = !inputInsertOnly && upsertMaterialize; 会插入SinkUpsertMaterializer算子||\/
SinkUpsertMaterializer //table.exec.state.ttl的设置||\/SinkUpsertMaterializer.processElement // 这里有 keyed state

当然SinkUpsertMaterializer 这个算子也是可以通过配置 table.exec.sink.upsert-materialize 控制的

,因为我们现在选择 kafka sink的是upsert kafka 这里会消除掉cdc数据,所以不存在以上的SinkUpsertMaterializer.

debezium-json的格式与 json的格式区别

debezium-json 变成 {before:, after:, op:} before 和after里才是真正的数据
json 直接就是 json格式数据

这里的区别的具体数据流可以参考如下,主要是 对mongo数据的处理:

KafkaDynamicTableFactory.createDynamicTableSink||\/KafkaDynamicSink.getSinkRuntimeProvider||\/final SerializationSchema<RowData> valueSerialization =createSerialization(context, valueEncodingFormat, valueProjection, null);||\/
DynamicKafkaRecordSerializationSchema 这里会用到||\/
valueSerialized = valueSerialization.serialize(valueRow);

valueSerialization 这会有多种序列化的方式,如:
debezium-json 对应 DebeziumJsonSerializationSchema
json对应JsonRowDataSerializationSchema

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/56092.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java多线程之死锁(死锁产生条件、手写简单死锁程序、破坏死锁)(面试常有)

目录 一、死锁。 &#xff08;1&#xff09;实际生活"死锁"情景。 &#xff08;2&#xff09;程序中举例。 &#xff08;3&#xff09;死锁产生必要的条件。 <1> 互斥使用。 <2> 不可抢占。 <3> 请求和保持。 <4> 循环等待。 &#xff08;4&…

iOS 14 自定义画中画悬浮窗 Custom AVPictureInPictureController 实现方案

iOS 14&#xff0c;基于 AVPictureInPictureController&#xff0c;实现自定义画中画&#xff0c;涵盖所有功能与难点。 市面上的各种悬浮钟和提词器的原理都是基于此。 Demo源码在文末。 使用 iOS 画中画的要求&#xff1a; 真机&#xff0c;不能使用模拟器&#xff1b;iO…

starrocks-删除表字段

1、背景 之前做了个大宽表&#xff0c;将近100个字段&#xff0c;但是后来发现很多字段在实际生产上都没有用到&#xff0c;并且随着数据量的增加&#xff0c;给集群的存储以及消费任务的解析带来了比较大的压力。所以决定对字段做删除处理。 当前的表是使用routine load任务从…

微服务架构:核心组件解析与设计思考(服务发现、API网关、 配置中心、负载均衡、服务调用、服务熔断、链路追踪、消息队列、服务安全、分布式事务)

微服务架构已成为大型系统设计中不可忽视的趋势&#xff0c;它通过将单一系统拆分为多个自治的服务&#xff0c;解决了传统单体架构难以应对的复杂性和扩展性问题。然而&#xff0c;微服务架构的成功依赖于多个核心组件的协同工作&#xff0c;从服务发现到API网关&#xff0c;从…

hadoop全分布式搭建(三台虚拟机,一个主节点,两个从节点)

根据尚硅谷哔哩哔哩视频搭建&#xff1a;bilibili.com/video/BV1Qp4y1n7EN/ 安装虚拟机教程可参考&#xff1a;VMware虚拟机 安装 Centos7(linux)&#xff08;新手超详细教程&#xff09;_vmware安装centos7教程-CSDN博客 集群配置如下&#xff1a; 一、先配置一台虚拟机hadoo…

python:假的身份信息生成模块faker

前言 发现一个有趣的python模块&#xff08;faker&#xff09;&#xff0c;他支持生成多个国家语言下的假身份信息&#xff0c;包含人名、地址、邮箱、公司名、电话号码、甚至是个人简历&#xff01; 你可以拿它做一些自动化测试&#xff0c;或一些跟假数据有关的填充工作。 代…

【计算机网络 - 基础问题】每日 3 题(三十八)

✍个人博客&#xff1a;https://blog.csdn.net/Newin2020?typeblog &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/fYaBd &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞…

【华为HCIP实战课程七】OSPF邻居关系排错MTU问题,网络工程师

一、MTU MUT默认1500,最大传输单元,一致性检测 [R3-GigabitEthernet0/0/1]mtu 1503//更改R3的MTU为1503 查看R3和SW1之间的OSPF邻居关系正常: 默认华为设备没有开启MTU一致性检测! [R3-GigabitEthernet0/0/1]ospf mtu-enable //手动开启MTU检测 [SW1-Vlanif30]ospf mtu…

PCL点云处理之求法向量

求法向量干什么&#xff1f;将点渲染成面 1、一个点垂直于一个曲线的切线叫法线 2、在点云中取一块区域&#xff0c;用最小二乘将区域中的点云拟合成一个面&#xff08;贴合在曲面上的一个切面&#xff09;在相近的区域计算出n个这样的面&#xff0c;用这个面求出法向量&#…

第十五届蓝桥杯C++B组省赛

文章目录 1.握手问题解题思路1&#xff08;组合数学&#xff09;解题思路2&#xff08;暴力枚举&#xff09; 2.小球反弹做题思路 3.好数算法思路&#xff08;暴力解法&#xff09;---不会超时 4.R格式算法思路 5.宝石组合算法思路---唯一分解定理 6.数字接龙算法思路----DFS 7…

分布式数据库的进度管理:TiDB 备份恢复工具 PiTR 的原理与实践

导读 对于一款企业级数据库产品而言&#xff0c;数据的安全性和可恢复性是至关重要的。PiTR&#xff08;Point in Time Restore&#xff09;作为 TiDB 备份工具的核心功能之一&#xff0c;提供了一种精细的数据恢复能力&#xff0c;允许用户将数据库集群恢复到过去的任意时间点…

C语言 | 第十六章 | 共用体 家庭收支软件-1

P 151 结构体定义三种形式 2023/3/15 一、创建结构体和结构体变量 方式1-先定义结构体&#xff0c;然后再创建结构体变量。 struct Stu{ char *name; //姓名 int num; //学号 int age; //年龄 char group; //所在学习小组 float score; //成绩 }; struct Stu stu1, stu2; //…

基于SpringBoot+Vue+Uniapp的植物园管理小程序系统(2024最新,源码+文档+远程部署+讲解视频等)

3. 论文参考 4. 项目运行截图 5. 技术框架 5.1 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不是配置文件。Spring …

Spring Boot在知识管理中的应用

1系统概述 1.1 研究背景 如今互联网高速发展&#xff0c;网络遍布全球&#xff0c;通过互联网发布的消息能快而方便的传播到世界每个角落&#xff0c;并且互联网上能传播的信息也很广&#xff0c;比如文字、图片、声音、视频等。从而&#xff0c;这种种好处使得互联网成了信息传…

MySQL 之索引和查询优化

在 MySQL 数据库中&#xff0c;索引是提高查询性能的重要手段之一。而理解和应用最左前缀原则对于有效地利用索引进行查询优化至关重要。 一、索引的作用 索引是一种数据结构&#xff0c;它可以帮助数据库系统快速地定位和检索数据。通过在表的某些列上创建索引&#xff0c;数…

小白投资理财 - 中国股票市场

小白投资理财 - 中国股票市场 股票交易所上海证券交易所&#xff08;SSE&#xff09;深圳证券交易所&#xff08;SZSE&#xff09;北京证券交易所&#xff08;BSE&#xff09;全国中小企业股份转让系统&#xff08;NEEQ&#xff0c;俗称新三板&#xff09;香港联合交易所&#…

数据检测和响应:DDR 用于数据安全

数据检测和响应 (DDR) 用于主动数据安全态势管理 企业必须保护其数据免受网络攻击&#xff0c;主要有三个原因&#xff1a; 1. 公司有法律义务保证客户信息的安全&#xff1b; 2. 不这样做会损害公司的声誉&#xff1b; 3. 补救数据泄露的影响可能代价高昂&#xff0c;而且…

数据结构前置知识(上)

1. 初识集合框架 1.1 什么是集合框架 在了解集合框架之前,我们先来认识一下数据结构,所谓数据结构就是描述和组织数据的一个东西. 那什么是集合框架呢?在java里面集合框架(Java Collection Framework),又被称为容器container,说白了就是很多个接口,抽象类,实现类组成的一个包,…

解决Element-ui input 在搜狗输入法下,限制输入数字时先输入汉字后无法绑定的问题

在使用 Element UI 的 el-input 组件时&#xff0c;如果需要限制用户只能输入数字&#xff0c;并且确保在输入汉字后再输入数字能够正确绑定&#xff0c;以下提供两种解决方案&#xff0c;需要根据情况适当修改 监听 input 事件并处理值&#xff1a; 可以在 el-input 组件上监听…

架构设计笔记-12-信息系统架构设计理论与实践

目录 知识要点 案例分析 1.Java企业级应用系统 2.c/s架构&#xff0c;b/s架构 知识要点 软件架构风格是描述某一特定应用领域中系统组织方式的惯用模式。架构风格定义了一类架构所共有的特征&#xff0c;主要包括架构定义、架构词汇表和架构约束。 数据挖掘是从数据库的大…