Apache SeaTunnel MongoDB CDC 使用指南

随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。

file

本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业,助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力,帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。

支持的引擎

SeaTunnel Zeta
Flink

主要特性

  • 批处理
  • 流处理
  • 精确一次
  • 列投影
  • 并行度
  • 支持用户定义分片

功能描述

MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。

支持的数据源信息

要使用 MongoDB CDC 连接器,需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。

数据源支持的版本依赖
MongoDB通用下载

可用性设置

  1. MongoDB版本:MongoDB 版本 >= 4.0。
  2. 集群部署:副本集或分片集群。
  3. 存储引擎:WiredTiger 存储引擎。
  4. 权限:changeStream 和 read
use admin;
db.createRole({role: "strole",privileges: [{resource: { db: "", collection: "" },actions: ["splitVector","listDatabases","listCollections","collStats","find","changeStream" ]}],roles: [{ role: 'read', db: 'config' }]}
);db.createUser({user: 'stuser',pwd: 'stpw',roles: [{ role: 'strole', db: 'admin' }]}
);

数据类型映射

以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。

MongoDB BSON 类型SeaTunnel 数据类型
ObjectIdSTRING
StringSTRING
BooleanBOOLEAN
BinaryBINARY
Int32INTEGER
Int64BIGINT
DoubleDOUBLE
Decimal128DECIMAL
DateDATE
TimestampTIMESTAMP
ObjectROW
ArrayARRAY

对于 MongoDB 中的特定类型,我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。

MongoDB BSON 类型SeaTunnel STRING 表示
Symbol{"_value": {"$symbol": "12"}}
RegularExpression{"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}
JavaScript{"_value": {"$code": "function() { return 10; }"}}
DbPointer{"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}}
提示

在 SeaTunnel 中使用 DECIMAL 类型时,请注意最大范围不能超过 34 位数字,这意味着你应该使用 decimal(34, 18)。

名称类型必须默认值描述
hostsString-MongoDB 服务器的主机名和端口对的逗号分隔列表。例如:localhost:27017,localhost:27018
usernameString-连接 MongoDB 时使用的数据库用户名。
passwordString-连接 MongoDB 时使用的密码。
databaseList-要监视更改的数据库名称。如果未设置,则会捕获所有数据库。数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如:db1,db2。
collectionList-数据库中要监视更改的集合名称。如果未设置,则会捕获所有集合。集合也支持正则表达式,以监视与完全限定的集合标识符匹配的多个集合。例如:db1.coll1,db2.coll2。
connection.optionsString-MongoDB 的连接选项的和号分隔列表。例如:replicaSet=test&connectTimeoutMS=300000。
batch.sizeLong1024游标批大小。
poll.max.batch.sizeEnum1024轮询新数据时包含在单个批次中的更改流文档的最大数量。
poll.await.time.msLong1000等待检查更改流上的新结果之前的时间量。
heartbeat.interval.msString0发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。
incremental.snapshot.chunk.size.mbLong64增量快照的块大小(MB)。
common-options-源插件通用参数,请参考源通用选项获取详情。

提示:

  • 如果集合变更速度较慢,强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时,心跳事件可以将 resumeToken 推进以避免其过期。
  • MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息,因此即使原始文档不大于 15MB,更改文档也可能超过 16MB 限制,导致更改流操作终止。
  • 建议使用不可变的分片键。在 MongoDB 中,分片键在启用事务后允许修改,但更改分片键可能导致频繁的分片迁移,造成额外的性能开销。此外,修改分片键还可能导致更新查找功能变得无效,在 CDC(更改数据捕获)场景中导致不一致的结果。

如何创建 MongoDB CDC 数据同步作业

将 CDC 数据打印到客户端

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业:

env {# 您可以在此处设置引擎配置parallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpwschema = {fields {"_id" : string,"name" : string,"description" : string,"weight" : string}}}
}# 在本地客户端打印读取的 MongoDB 数据
sink {Console {parallelism = 1}
}

将 CDC 数据写入 MysqlDB

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业:

env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpw}
}sink {jdbc {url = "jdbc:mysql://mysql_cdc_e2e:3306"driver = "com.mysql.cj.jdbc.Driver"user = "st_user"password = "seatunnel"generate_sink_sql = true# You need to configure both database and tabledatabase = mongodb_cdctable = productsprimary_keys = ["_id"]}
}

多表同步

以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业:

env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory","crm"]collection = ["inventory.products","crm.test"]username = stuserpassword = stpw}
}# Console printing of the read Mongodb data
sink {Console {parallelism = 1}
}

提示: 多库表 CDC 同步不能指定 schema,只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息,所以如果想支持多表,所有表只能作为一个结构读取。

使用正则表达式匹配多表

以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业:

匹配示例表达式描述
前缀匹配^(test).*匹配数据库名或表名以 test 为前缀的,如 test1, test2 等。
后缀匹配.*[p$]匹配数据库名或表名以 p 为后缀的,如 cdcp, edcp 等。
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source { MongoDB-CDC { hosts = "mongo0:27017" # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database = ["(^(test).|^(tpc).|txc|.[p$]|t{2})"] collection = ["(t[5-8]|tt)"] username = stuser password = stpw } }

Console printing of the read Mongodb data

sink { Console { parallelism = 1 } }


### 实时流数据格式

{ _id : { }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream "operationType" : " ", // The type of change operation that occurred, such as: insert, delete, update, etc. "fullDocument" : { }, // The full document data involved in the change operation. This field does not exist in delete operations "ns" : {
"db" : " ", // The database where the change operation occurred "coll" : " " // The collection where the change operation occurred }, "to" : { // These fields are displayed only when the operation type is 'rename' "db" : " ", // The new database name after the change "coll" : " " // The new collection name after the change }, "source":{ "ts_ms":" ", // The timestamp when the change operation occurred "table":" " // The collection where the change operation occurred "db":" ", // The database where the change operation occurred "snapshot":"false" // Identify the current stage of data synchronization }, "documentKey" : { "_id" : }, // The _id field value of the document involved in the change operation "updateDescription" : { // Description of the update operation "updatedFields" : { }, // The fields and values that the update operation modified "removedFields" : [ " ", ... ] // The fields and values that the update operation removed } "clusterTime" : , // The timestamp of the Oplog log entry corresponding to the change operation "txnNumber" : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" : , "uid" : } }

```

到此本指南就结束了,MongoDB CDC Sink连接器的发布,不仅强化了 Apache SeaTunnel 在数据集成领域的地位,也为开发者提供了更多的可能性。

Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来,让我们携手共建一个更加强大、开放、互助的社区!

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/755711.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jackson 2.x 系列【1】概述

有道无术,术尚可求,有术无道,止于术。 本系列Jackson 版本 2.17.0 源码地址:https://gitee.com/pearl-organization/study-seata-demo 文章目录 1. 前言2. 什么是 JSON3. 常用 Java JSON 库4. Jackson4.1 简介4.2 套件4.3 模块4.…

prompt开发生命周期

1.定义任务场景和成功标准 任务场景可分为简单任务:实体抽取、qa等 复杂任务:代码生成、创意写作等 在定义任务后,就要定义模型实现该任务的成功标准: 模型表现和准确率;延迟;价格。 2.开发测试用例 多…

Vue2(七):超详细vue开发环境搭建(win7),nodejs下载与安装,安装淘宝镜像(报错已解决),配置脚手架

一、安装node.js 本来想粗略写一下的,但是搭建脚手架的时候,遇到了很多问题,浪费快两天时间,记录一下自己的解决办法希望对你们有帮助! 1.下载nodejs 安装包下载链接【CNPM Binaries Mirror】 下载我划线的这个&am…

代码随想录算法训练营第25天| 216.组合总和III、17.电话号码的字母组合

216.组合总和III 题目链接:组合总和III 题目描述:找出所有相加之和为 n **的 k ****个数的组合,且满足下列条件: 只使用数字1到9每个数字 最多使用一次 返回 所有可能的有效组合的列表 。该列表不能包含相同的组合两次&#xff0c…

2024热门外贸独立站wordpress模板

工艺品wordpress外贸主题 简约大气的wordpress外贸主题,适合做工艺品进出品外贸的公司官网使用。 https://www.jianzhanpress.com/?p5377 日用百货wordpress外贸主题 蓝色大气的wordpress外贸主题,适合做日用百货的外贸公司搭建跨境电商网站使用。 …

Qt教程 — 3.3 深入了解Qt 控件:Input Widgets部件(2)

目录 1 Input Widgets简介 2 如何使用Input Widgets部件 2.1 QSpinBox组件-窗口背景不透明调节器 2.2 DoubleSpinBox 组件-来调节程序窗口的整体大小 2.3 QTimeEdit、QDateEdit、QDateTimeEdit组件-编辑日期和时间的小部件 Input Widgets部件部件较多,将分为三…

centos上安装Docker

0.安装Docker Docker 分为 CE 和 EE 两大版本。CE 即社区版(免费,支持周期 7 个月),EE 即企业版,强调安全,付费使用,支持周期 24 个月。 Docker CE 分为 stable test 和 nightly 三个更新频道…

PlantUML Integration 编写短信服务类图

PlantUML Integration 写一个类图,主要功能为 1、编写一个serviceSms短信服务类; 2、需要用到短信的地方统一调用基建层的服务即可; 3、可以随意切换、增加短信厂商,不需要更改场景代码,只需要更改application.yml 里面…

13个外贸业务员常用邮件模板-订单沟通

除了报价后跟进客户,我们在实际工作过程当中也会遇到很多非常规性的情况,需要和客户及时沟通处理。 以下是13个外贸业务员常用邮件模板-订单沟通:你可以根据自己的行业、公司、产品情况以及自身的经验判断进行调整和完善,做出一套…

水下蓝牙耳机哪个牌子好?业界公认四大高口碑游泳耳机

在这个活力四溢的时代,人们对于健康生活方式的追求愈发热切,游泳作为一项兼顾休闲与健身的运动,深受大众喜爱。在水下世界,音乐的陪伴能增添游泳的乐趣,一款好的水下蓝牙耳机成为游泳爱好者们的新宠。 近年来&#xff…

DZY-212中间继电器 DC 220V 板后接线 面板安装 JOSEF约瑟

系列型号: DZY-200系列中间继电器;DZY-201中间继电器; DZY-202中间继电器;DZY-203中间继电器; DZY-204中间继电器;DZY-205中间继电器; DZY-206中间继电器;DZY-207中间继电器; DZY-20…

openEuler 22.03(华为欧拉)一键安装 Oracle 19C(19.22) 数据库

前言 Oracle 一键安装脚本,演示 openEuler 22.03 一键安装 Oracle 19C 单机版过程(全程无需人工干预):(脚本包括 ORALCE PSU/OJVM 等补丁自动安装) ⭐️ 脚本下载地址:Shell脚本安装Oracle数据…

ssh免密登陆更换目标主机后无法连接

在进行hadoop分布式环境搭建时(三台机,master,slave1,slave2),后期slave2系统出现问题,更换新机后,master与slave2文件传输失败: 以为是秘钥过期的问题,更换…

走出大模型部署新手村!小明这样用魔搭+函数计算

作者:拓山 前文介绍了魔搭 ModelScope 社区模型服务 SwingDeploy 服务。开发者可以将模型从魔搭社区的模型库一键部署至阿里云函数计算,当选择模型并部署时,系统会选择对应的机器配置。按需使用可以在根据工作负载动态的减少资源&#xff0c…

c++多长时间会被Python或者其他语言取代?

c多长时间会被Python或者其他语言取代? 如果不考虑市场因素,C#今天就可以取代C。 自.NET跨平台至今,C能做的工作,C#都能做了,且性能差别不大。 在C最有优势的嵌入式UI方面,C#可以拿出Avalonia替代QT。用 …

9.16单词拆分(LC139-M)

算法: 这道题可以用回溯,但是可能会超时 可以用背包问题解决: 物品:单词 背包:字符串: 单词能否组成字符串s,就是问物品能不能把背包装满。 拆分时可以重复使用字典中的单词,就…

M4芯片和核心板应用于工业网关解决方案

在工业级应用中,M4芯片和核心板凭借其丰富的接口和强大的性能及高性价比特点,已经成为众多工业设备中的首选。本文将从M4芯片和核心板的特点、应用场景、以及其在工业级网关中的应用等方面进行详细阐述。 一、M4芯片和核心板的特点 M4芯片和核心板以其…

nfs介绍与配置

NFS 1. nfs简介 nfs特点 NFS(Network File System)即网络文件系统,是FreeBSD支持的文件系统中的一种,它允许网络中的计算机之间通过TCP/IP网络共享资源在NFS的应用中,本地NFS的客户端应用可以透明地读写位于远端NFS服…

camelot pdf提取表格实践(记录)

参考: 巧用Python的camelot库批量提取PDF发票信息 关于文本pdf的表格抽取 AttributeError: module ‘camelot‘ has no attribute ‘read_pdf‘及类似问题解决办法 camelot 参数 https://blog.csdn.net/INTSIG/article/details/123000010 报错解决: Mod…

美团大规模KV存储挑战与架构实践

KV 存储作为美团一项重要的在线存储服务,承载了在线服务每天万亿级的请求量,并且保持着 99.995% 的服务可用性。在 DataFunSummit 2023 数据基础架构峰会上,我们分享了《美团大规模 KV 存储挑战与架构实践》,本文为演讲内容的整理…