mongodb使用debezium

前置

服务器上需要安装jdk11
jdk下载地址

kafka安装

官网下载地址

安装教程

debezium 安装

运行 Debezium 连接器需要 Java 11 或更高版本
Debezium 并不是一个独立的软件,而是很多个 Kafka 连接器的总称。这些 Kafka 连接器分别对应不同的数据库,比如 MySQL、Oracle 等。按 Kafka 连接器的常见命名规则,可能我们会把它们叫做 MySQL Kafka Source Connector 之类。

部署

1.下载对应版本的debezium插件

插件地址
在这里插入图片描述
在这里插入图片描述

2.文件解压

将下载的文件解压,将解压后的文件放到kafka的plugin文件夹下(该plugin文件夹为自己创建的plugin文件夹)*,例如
在这里插入图片描述

3. 通过 kafka connect部署

kafka connect有两种部署方式,一是单机部署,二是分布式部署。单机部署配置kafka/config/connect-standalone.properties 文件,分布式部署则配置kafka/config/connect-distributed.properties。分布式部署支持通过rest api管理connector

此处是单机部署,配置文件为kafka/config/connect-standalone.properties,主要修改以下内容:

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/local/kafka/plugin

4.启动kafka-connect

需要先启动kafka

bin/connect-standalone.sh config/connect-standalone.properties

5.创建对应的debezium配置文件

在这里插入图片描述

curl -X POST http://${debezium所在服务器}:8083/connectors

{"name": "cdc-connector","config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin","collection.include.list": "db_cdc_1.c_cdc_2","topic.prefix": "mycdc","capture.mode":"change_streams"}
}
  • 如果需要在cdc输出的语句上显示before信息,需要开启mongodb版本 6.0 中的新增功能changeStreamPreAndPostImages,并且在capture.mode上使用change_streams_with_pre_image或change_streams_update_full_with_pre_image
  • 如果capture.mode未设置成change_streams_with_pre_imagechange_streams_update_full_with_pre_image的话,在进行删除时cdc输出会没有before信息
    在这里插入图片描述
db.runCommand({collMod: "对应的controllerName", changeStreamPreAndPostImages: {enabled: true} 
})
例如:
use db_cdc_1
db.runCommand({collMod: "c_cdc_2", changeStreamPreAndPostImages: {enabled: true} 
}){"name": "cdc-connector","config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin","collection.include.list": "db_cdc_1.c_cdc_2","topic.prefix": "mycdc","capture.mode":"change_streams_with_pre_image"}
}

在这里插入图片描述

重点参数

参数描述
connector.class固定值io.debezium.connector.mongodb.MongoDbConnector
mongodb.connection.stringmongodb连接信息
collection.include.list需要监听的具体collection
topic.prefixkafkaTopic前缀
capture.mode输出模式

capture.mode

模式描述
change_streams输出变化流,但是在进行update操作时,不输出after字段
change_streams_update_full在change_streams的基础上,增加after字段,用于输出现在变化后的数据的内容
change_streams_with_pre_image在change_streams的基础上,增加before字段的输出,但需要进行配置
change_streams_update_full_with_pre_image在change_streams_with_pre_image的基础上增加,增加after字段,用于输出现在变化后的数据的内容

其他未使用参数

参数描述
database.include.list需要监听的具体database
database.exclude.list不监听的database(不要与database.include.list填写相同的db)
collection.exclude.list不监听的collection(不要与collection.include.list填写相同的collection)

更多参数请参考
在这里插入图片描述

cdc结果

原数据

{"userId": "1000000","allPoints": 190,"createTime": {"$date": "2024-04-25T13:31:59.678Z"},"updateTime": {"$date": "2024-04-25T13:31:59.678Z"}
}

添加数据

在这里插入图片描述
在这里插入图片描述
capture.mode两种模式输出结果一样

push数据

{$push: {"history":{"historyId": "1","changerPoints": 0,"beforePoints": 0,"afterPoints": 0,"status": "0","createTime": {"$date": "2024-01-01T16:00:00.000Z"},"comment": "测试数据","versionNo": 0}},
}

在这里插入图片描述

第一次添加

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history\": [{\"historyId\": \"1\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}]}","truncatedArrays": null}}

在这里插入图片描述

第二次添加

在这里插入图片描述

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history.1\": {\"historyId\": \"2\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}}","truncatedArrays": null}}

在这里插入图片描述

如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据的完整数据,例如
在这里插入图片描述

修改数组中的值

在这里插入图片描述

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history.1.historyId\": \"100\"}","truncatedArrays": null}}

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据,例如
在这里插入图片描述

pull操作

在这里插入图片描述

{$pull: {history: {historyId: "100",},},
}

在这里插入图片描述
此时会把现有的所有数据都返回

{"payload": {"before": null,"after": null,"updateDescription": {"removedFields": null,"updatedFields": "{\"history\": [{\"historyId\": \"1\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"2\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"3\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"4\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}]}","truncatedArrays": null}}

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据,例如在这里插入图片描述

删除字段

在这里插入图片描述

{  "payload": {"before": null,"after": null,"updateDescription": {"removedFields": ["updateTime"],"updatedFields": "{}","truncatedArrays": null}}

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据(此处删除的是另外一个字段),例如
在这里插入图片描述

删除数据

在这里插入图片描述
在这里插入图片描述
如果capture.mode未设置成change_streams_with_pre_imagechange_streams_update_full_with_pre_image的话,在进行删除时cdc输出会没有before信息
在这里插入图片描述
通过开启mongodb版本 6.0 中的新增功能changeStreamPreAndPostImages,并且在capture.mode上使用change_streams_with_pre_image或change_streams_update_full_with_pre_image即可解决

db.runCommand({collMod: "对应的controllerName", changeStreamPreAndPostImages: {enabled: true} 
})
例如:
use db_cdc_1
db.runCommand({collMod: "c_cdc_2", changeStreamPreAndPostImages: {enabled: true} 
}){"name": "cdc-connector","config": {"connector.class": "io.debezium.connector.mongodb.MongoDbConnector","mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin","collection.include.list": "db_cdc_1.c_cdc_2","topic.prefix": "mycdc","capture.mode":"change_streams_with_pre_image"}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/4534.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++】C\C++内存管理

下面是围绕C\C内存管理这一块知识谈论相关的内存管理机制,有需要借鉴即可。 同时,我在下面也放了快速建立链表的模板,方便oj题目拿到vs上进行调试。 内存管理目录 1.CPP内存管理1.1new、delete关键字概念1.2特性1.3总结 2.new、delete的底层…

电商API数据采集接口||大数据的发展,带动电子商务产业链,促进了社会的进步

最近几年计算机技术在诸多领域得到了有效的应用,同时在多方面深刻影响着我国经济水平的发展。除此之外,人民群众的日常生活水平也受大数据技术的影响。 主流电商API数据采集接口||在这其中电子商务领域也在大数据技术的支持下,得到了明显的进…

《逃离塔科夫》PVE模式加入付费特别版引发玩家不满

《逃离塔科夫》PVE模式加入付费特别版引发玩家不满 近期,《逃离塔科夫》开发者Battlestate发布了多个新版本,但其中PVE模式只能在价格最高的“Unheard Edition”中购买,导致玩家不满。据悉,“Unheard Edition”售价高达250欧元&a…

Linxu系统服务管理,systemd知识/进程优先级/平均负载/php进程CPU100%怎么解决系列知识!

shell脚本(命令)放后台 sleep 300& 放到后台运行,脚本或命令要全路径 nohup:用户推出系统进程继续工作 【功能说明】 nohup 命令可以将程序以忽略挂起信号的方式运行起来,被运行程序的输出信息将不会显示到终端 如…

小程序的合同是怎么样写的

​很多商家找第三方做小程序都遭遇到了各种问题,如访问速度慢、服务器关闭、反复收费等。如果当初商家找的是正规的第三方服务商,双方签订了明确的合同条款,出现任何问题后,相信都能够进行解决。下面将具体介绍合同内容&#xff0…

大田场景下的路径检测论文汇总

文章目录 2020Visual Servoing-based Navigation for Monitoring Row-Crop Fields 2020 Visual Servoing-based Navigation for Monitoring Row-Crop Fields code: https://github.com/PRBonn/visual-crop-row-navigation 摘要: 自主导航是野外机器人执行精确农业…

ElasticSearch自动补全

一、拼音分词器: 当用户在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,如图: 这种根据用户输入的字母,提示完整词条的功能,就是自动补全了。 GET /_analyze {"text":"我爱螺蛳粉…

opencv图片绘制图形-------c++

绘制图形 #include <opencv2/opencv.hpp> #include <opencv2/core.hpp> #include <filesystem>bool opencvTool::drawPolygon(std::string image_p, std::vector<cv::Point> points) {cv::Mat ima cv::imread(image_p.c_str()); // 读取图像&#xf…

制造业智慧工厂

在信息化、智能化浪潮的推动下&#xff0c;制造业正迎来一场前所未有的变革。智慧工厂&#xff0c;作为这一变革的核心载体&#xff0c;正逐渐成为制造业发展的新引擎。HiWoo Cloud平台&#xff0c;以其卓越的物联网云技术&#xff0c;为制造业智慧工厂的构建提供了强有力的支持…

Linux逻辑方式合并物理磁盘

在日常生活中&#xff0c;我们总是遇到一个文件太大&#xff0c;以至于我们的两个磁盘都装不下&#xff0c;这时我们就需要将两块物理磁盘逻辑化的连接在一起&#xff0c;把物理磁盘使用逻辑化的方法合并在一起&#xff0c;形成卷组&#xff0c;使得磁盘空间可以公用&#xff1…

【 AIGC 研究最新方向(上)】面向平面、视觉、时尚设计的高可用 AIGC 研究方向总结

目前面向平面、视觉、时尚等设计领域的高可用 AIGC 方向有以下 4 种&#xff1a; 透明图层生成可控生成图像定制化SVG 生成 本篇&#xff08;上篇&#xff09;介绍 1、2&#xff0c;而下篇将介绍 3、4。 透明图层生成 LayerDiffuse 代表性论文&#xff1a;Transparent Imag…

请编写函数fun,该函数的功能是:实现B=A+A‘,即把矩阵A加上A的转置,存放在矩阵B中。计算结果在main函数中输出。

本文收录于专栏:算法之翼 https://blog.csdn.net/weixin_52908342/category_10943144.html 订阅后本专栏全部文章可见。 本文含有题目的题干、解题思路、解题思路、解题代码、代码解析。本文分别包含C语言、C++、Java、Python四种语言的解法完整代码和详细的解析。 题干 请编…

redis基础(一)

启动与关闭 启动命令在/usr/local/bin目录 服务端后台启动&#xff1a;redis-server opt/redis-6.2.1/redis.conf 客户端连接&#xff1a;执行 redis-cli 关闭操作 ​ 方式1&#xff1a;进入终端后关闭 ​ 方式2&#xff1a;直接kill 掉进程 方式3&#xff1a;通过实例关闭 …

【力扣周赛】第394场周赛

文章目录 1.统计特殊字母的数量2.使矩阵满足条件的最少操作次数 1.统计特殊字母的数量 题目链接 &#x1f34e;该题涉及的小技巧&#xff1a;&#x1f425; &#x1f427;①大写字母和对应的小写字母低5位都是相等的&#xff1b; &#x1f427;②大写字母ASCII二进制第 6 位…

应用实战|只需几步,即可享有外卖订餐小程序

本示例是一个简单的外卖查看店铺点菜的外卖微信小程序&#xff0c;小程序后端服务使用了MemFire Cloud&#xff0c;其中使用到的MemFire Cloud功能包括&#xff1a; 其中使用到的MemFire Cloud功能包括&#xff1a; 云数据库&#xff1a;存储外卖微信小程序所有数据表的信息。…

实时采集麦克风并播放(springboot+webscoekt+webrtc)

项目技术 springbootwebscoektwebrtc 项目介绍 项目通过前端webrtc采集麦克风声音&#xff0c;通过websocket发送后台&#xff0c;然后处理成g711-alaw字节数据发生给广播UDP并播放。 后台处理项目使用线程池(5个线程)接受webrtc数据并处理g711-alaw字节数组放到Map容器中&…

PotatoPie 4.0 实验教程(26) —— FPGA实现摄像头图像拉普拉斯锐化

为什么要对图像进行拉普拉斯锐化 对图像进行拉普拉斯锐化的目的是增强图像的边缘和细节&#xff0c;使图像看起来更加清晰和锐利。这种技术常用于图像处理中&#xff0c;具体原因如下&#xff1a; 增强图像的边缘信息&#xff1a;拉普拉斯锐化可以突出图像中的边缘特征&#x…

程序不包含适用于入口点的静态Main方法

&#x1f3c6;本文收录于「Bug调优」专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收藏&&…

西湖大学赵世钰老师【强化学习的数学原理】学习笔记1节

强化学习的数学原理是由西湖大学赵世钰老师带来的关于RL理论方面的详细课程&#xff0c;本课程深入浅出地介绍了RL的基础原理&#xff0c;前置技能只需要基础的编程能力、概率论以及一部分的高等数学&#xff0c;你听完之后会在大脑里面清晰的勾勒出RL公式推导链条中的每一个部…

数据结构_时间复杂度

✨✨所属专栏&#xff1a;数据结构✨✨ ✨✨作者主页&#xff1a;嶔某✨✨ 什么是时间复杂度&#xff1f; 时间复杂度的定义&#xff1a;在计算机科学中&#xff0c;算法的时间复杂度是一个函数&#xff0c;它定量描述了该算法的运行时间。一个算法执行所耗费的时间&#xff0…