Debezium发布历史56

原文地址: https://debezium.io/blog/2019/05/23/tutorial-using-debezium-connectors-with-apache-pulsar/

欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

将 Debezium 连接器与 Apache Pulsar 结合使用的教程
2019 年 5 月 23 日 作者: Jia Zhai,StreamNative
讨论 实例
这是 Apache Pulsar PMC 成员兼提交者 Jia Zhai 的客座文章。

Debezium是一个用于变更数据捕获 (CDC) 的开源项目。它基于Apache Kafka Connect构建,支持多种数据库,例如 MySQL、MongoDB、PostgreSQL、Oracle 和 SQL Server。Apache Pulsar包含一组基于 Pulsar IO 框架的内置连接器,它与 Apache Kafka Connect 相对应。

从版本 2.3.0 开始,Pulsar IO 提供了对Debezium 源连接器的开箱即用支持,因此您可以利用 Debezium 将数据库中的更改流式传输到 Apache Pulsar。本教程将引导您通过 Pulsar IO 设置 MySQL 的 Debezium 连接器。

教程步骤
本教程与Debezium 教程类似,只是事件流的存储从 Kafka 更改为 Pulsar。主要包括六个步骤:

启动MySQL服务器;

启动独立的 Pulsar 服务;

在 Pulsar IO 中启动 Debezium 连接器。Pulsar IO 读取 MySQL 服务器中存在的数据库更改;

订阅 Pulsar 主题,监控 MySQL 变化;

在 MySQL 服务器中进行更改,并验证更改是否立即记录在 Pulsar 主题中;

清理。

第1步:启动MySQL服务器
启动包含数据库示例的 MySQL 服务器,Debezium 从中捕获更改。打开一个新终端来启动一个新容器,该容器运行预先配置了名为 inventory 的数据库的 MySQL 数据库服务器:

docker run --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9
显示以下信息:

2019-03-25T14:12:41.178325Z 0 [Note] Event Scheduler: Loaded 0 events
2019-03-25T14:12:41.178670Z 0 [Note] mysqld: ready for connections.
Version: ‘5.7.25-log’ socket: ‘/var/run/mysqld/mysqld.sock’ port: 3306 MySQL Community Server (GPL)
第 2 步:启动独立的 Pulsar 服务
以独立模式在本地启动 Pulsar 服务。Pulsar 2.3.0 中引入了对在 Pulsar IO 中运行 Debezium 连接器的支持。下载2.3.0 版本的 Pulsar 二进制文件和2.3.0 版本的 pulsar-io-kafka-connect-adaptor-2.3.0.nar。在 Pulsar 中,所有 Pulsar IO 连接器都打包为单独的NAR文件。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar
$ tar zxf apache-pulsar-2.3.0-bin.tar.gz
$ cd apache-pulsar-2.3.0
$ mkdir connectors
$ cp …/pulsar-io-kafka-connect-adaptor-2.3.0.nar connectors
$ bin/pulsar standalone
图片来自于官网原文
在这里插入图片描述

步骤 3:在 Pulsar IO 中启动 Debezium MySQL 连接器
在另一个终端选项卡中以本地运行模式启动 Pulsar IO 中的 Debezium MySQL 连接器。“debezium-mysql-source-config.yaml”文件包含所有配置,主要参数列在“configs”节点下。.yaml 文件包含“task.class”参数。配置文件还包括 MySQL 相关参数(如服务器、端口、用户、密码)以及用于“历史”和“偏移”存储的 Pulsar 主题的两个名称。

$ bin/pulsar-admin source localrun --sourceConfigFile debezium-mysql-source-config.yaml
“debezium-mysql-source-config.yaml”文件中的内容如下。

tenant: “test”
namespace: “test-namespace”
name: “debezium-kafka-source”
topicName: “kafka-connect-topic”
archive: “connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar”

parallelism: 1

configs:

sourceTask

task.class: “io.debezium.connector.mysql.MySqlConnectorTask”

config for mysql, docker image: debezium/example-mysql:0.8

database.hostname: “localhost”
database.port: “3306”
database.user: “debezium”
database.password: “dbz”
database.server.id: “184054”
database.server.name: “dbserver1”
database.whitelist: “inventory”

database.history: “org.apache.pulsar.io.debezium.PulsarDatabaseHistory”
database.history.pulsar.topic: “history-topic”
database.history.pulsar.service.url: “pulsar://127.0.0.1:6650”

KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG

key.converter: “org.apache.kafka.connect.json.JsonConverter”
value.converter: “org.apache.kafka.connect.json.JsonConverter”

PULSAR_SERVICE_URL_CONFIG

pulsar.service.url: “pulsar://127.0.0.1:6650”

OFFSET_STORAGE_TOPIC_CONFIG

offset.storage.topic: “offset-topic”
表是在前面提到的 MySQL 服务器中自动创建的。因此 Debezium 连接器从头开始从 MySQL binlog 文件中读取历史记录。在输出中,您会发现连接器已在 47 条记录中被触发和处理。
图片来自于官网原文
在这里插入图片描述

连接器启动过程记录
有关如何管理连接器的更多信息,请参阅Pulsar IO 文档。

Debezium 捕获和读取的记录会自动发布到 Pulsar 主题。当您启动新终端时,您将使用以下命令在 Pulsar 中找到当前主题:

$ bin/pulsar-admin topics list public/default
图片来自于官网原文
在这里插入图片描述

对于每个已更改的表,更改数据存储在单独的 Pulsar 主题中。除了数据库表相关主题外,另外两个名为“history-topic”和“offset-topic”的主题用于存储历史和偏移量相关数据。

persistent://public/default/history-topic
persistent://public/default/offset-topic
第四步:订阅Pulsar主题来监控MySQL变化
以persistent://public/default/dbserver1.inventory.products题目为例。使用 CLI 命令来使用该主题并在“产品”表更改时监视更改。

$ bin/pulsar-client consume -s “sub-products” public/default/dbserver1.inventory.products -n 0
输出如下:


22:17:41.201 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribing to topic on cnx [id: 0xfe0b4feb, L:/127.0.0.1:55585 - R:localhost/127.0.0.1:6650]
22:17:41.223 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribed to topic on localhost/127.0.0.1:6650 – consumer: 0
您还可以使用偏移量主题来监控偏移量更改,同时表更改存储在persistent://public/default/dbserver1.inventory.productsPulsar 主题中。

$ bin/pulsar-client consume -s “sub-offset” offset-topic -n 0
第 5 步:在 MySQL 服务器中进行更改,并验证更改是否立即记录在 Pulsar 主题中
启动 MySQL CLI docker 连接器,您可以更改 MySQL 服务器中的“产品”表。

d o c k e r r u n − i t − − r m − − n a m e m y s q l t e r m − − l i n k m y s q l − − r m m y s q l : 5.7 s h − c ′ e x e c m y s q l − h " docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h" dockerrunitrmnamemysqltermlinkmysqlrmmysql:5.7shcexecmysqlh"MYSQL_PORT_3306_TCP_ADDR" -P" M Y S Q L P O R T 3 30 6 T C P P O R T " − u r o o t − p " MYSQL_PORT_3306_TCP_PORT" -uroot -p" MYSQLPORT3306TCPPORT"urootp"MYSQL_ENV_MYSQL_ROOT_PASSWORD"’
运行该命令后,将显示MySQL CLI,您可以更改“products”表中两项的名称。

mysql> use inventory;
mysql> show tables;
mysql> SELECT * FROM products ;
mysql> UPDATE products SET name=‘1111111111’ WHERE id=101;
mysql> UPDATE products SET name=‘1111111111’ WHERE id=107;

图片来自于官网原文
在这里插入图片描述

在您消费产品主题的终端中,您发现添加了两个更改。
图片来自于官网原文
在这里插入图片描述

表 topic 存储 mysql 更新
在消费偏移量主题的终端中,您发现添加了两个偏移量。
图片来自于官网原文
在这里插入图片描述

偏移主题得到更新
在本地运行连接器的终端中,您会发现另外两条记录已被处理。
图片来自于官网原文
在这里插入图片描述

表主题获取更多记录
第6步:清理。
使用“Ctrl + C”关闭终端。使用“docker ps”和“docker Kill”停止MySQL相关容器。

mysql> quit

$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
84d66c2f591d debezium/example-mysql:0.8 “docker-entrypoint.s…” About an hour ago Up About an hour 0.0.0.0:3306->3306/tcp, 33060/tcp mysql

$ docker kill 84d66c2f591d
要删除 Pulsar 数据,请删除 Pulsar 二进制目录中的 data 目录。

$ pwd
/Users/jia/ws/releases/apache-pulsar-2.3.0

$ rm -rf data
结论
Pulsar IO 框架允许运行 Debezium 连接器来捕获变更数据,将不同数据库中的数据变更流式传输到 Apache Pulsar。在本教程中,您学习了如何捕获 MySQL 数据库中的数据更改并将其传播到 Pulsar。我们正在不断改进对使用 Apache Pulsar 运行 Debezium 连接器的支持,在 Pulsar 2.4.0 发布后将更容易使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/607383.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

笔试案例2

文章目录 1、笔试案例22、思维导图 1、笔试案例2 09)查询学过「张三」老师授课的同学的信息 selects.*,c.cname,t.tname,sc.score from t_mysql_teacher t, t_mysql_course c, t_mysql_student s, t_mysql_score sc where t.tidc.cid and c.cidsc.cid and sc.sids…

简洁大气带进度条的URL跳转页面HTML源码

源码介绍 简洁大气带进度条的URL跳转页面HTML源码,记事本修改里面的内容即可,喜欢的同学可以拿去使用 获取方式: 蓝奏云:https://wfr.lanzout.com/ic1iZ1kj6yde CSDN免积分下载:https://download.csdn.net/download/huayula/88…

Java桶排序、基数排序、剪枝算法

桶排序算法 桶排序的基本思想是: 把数组 arr 划分为 n 个大小相同子区间(桶),每个子区间各自排序,最后合并 。计数排序是桶排序的一种特殊情况,可以把计数排序当成每个桶里只有一个元素的情况。 1.找出待…

答疑解惑:核技术利用辐射安全与防护考核

前言 最近通过了《核技术利用辐射安全与防护考核》,顺利拿到了合格证。这是从事与辐射相关行业所需要的一个基本证书,考试并不难,在此写篇博客记录一下主要的知识点。 需要这个证书的行业常见的有医疗方面的,如放疗,…

黑马苍穹外卖学习Day3

目录 公共字段自动填充问题分析实现思路代码实现 新增菜品需求分析和设计接口设计代码开发开发文件上传接口功能开发 菜品分页查询需求分析和设计代码开发 菜品删除功能需求分析与设计代码实现代码优化 修改菜品需求分析和设计代码实现 公共字段自动填充 问题分析 员工表和分…

静态网页设计——崩坏3(HTML+CSS+JavaScript)

前言 声明:该文章只是做技术分享,若侵权请联系我删除。!! 感谢大佬的视频: 使用技术:HTMLCSSJS(静态网页设计) 主要内容:对游戏崩坏3进行简单介绍。 https://www.bilib…

【Linux进程】查看进程fork创建进程

目录 前言 1. 查看进程 2. 通过系统调用创建进程-fork初识 总结 前言 你有没有想过在使用Linux操作系统时,后台运行的程序是如何管理的?在Linux中,进程是一个非常重要的概念。本文将介绍如何查看当前运行的进程,并且讨论如何使用…

将项目同时提交到GitHub和码云Gitee上面,GitHub与Gitee同步

多个远程仓库同时使用 新建GitHub仓库 创建成功 在终端中创建仓库 如果你想在本地机器上创建Git仓库,或者想添加一个文件夹或文件到已经存在的Git仓库中,你应该在终端中创建你的Git仓库。在你可以通过终端来创建一个Git仓库。以下是在终端中创建Git仓…

java解析json复杂数据的第三种思路

文章目录 一、概述二、数据预览1. 接口json数据2. json转xml数据 三、代码实现1. pom.xml2. 核心代码3. 运行结果 四、源码传送 一、概述 接上篇 java解析json复杂数据的两种思路 我们已经通过解析返回json字符串得到数据,现在改变思路, 按照如下流程获取数据: #mermaid-svg-k…

AcrelCloud-3000环保用电监管云平台解决方案——安科瑞赵嘉敏

概述 国家全面推进打赢蓝天保卫战,打好碧水保卫战,打胜净土保卫战,加快生态环境保护、建设美丽中国。环保用电监管系统针对企业内的环保设施、设备运行状况进行检测,发挥环保设备的作用,提高监察效率,并为…

洛谷 P1217 [USACO1.5] 回文质数 Prime Palindromes 刷题笔记

P1217 [USACO1.5] 回文质数 Prime Palindromes - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 思路 直接枚举 减枝优化判断 优化1 只有偶数才会是质数 优化2 回文数的判断次数要优于检查素数 先判断是否为回文数再检查是否为质数 if( hw(i)&&isprime(i)) 这里…

Springcloud Gateway网关

简介 SpringCloud Gateway是基于WebFlux框架实现的,而WebFlux框架底层则使用了高性能的Reactor模式通信框架Netty。 Spring Cloud Gateway的目标提供统一的路由方式且基于 Filter链的方式提供了网关基本的功能,例如:安全,监控/指标&#xf…

lv14 IO模型:阻塞和非阻塞 7

1 五种IO模型------读写外设数据的方式 阻塞: 不能操作就睡觉 非阻塞:不能操作就返回错误(通过轮询即才能实现阻塞的情况 ) 多路复用:委托中介监控 信号驱动:让内核如果能操作时发信号,在信号处理函数中操作 异步IO&a…

ES6(一部分)未完...

文章目录 ES61.ES6 let声明变量2.ES6 const声明常量3.变量解构赋值3-1解构对象3-2解构数组3-3字符串解构 4.模板字符串5.字符串扩展5-1 include函数5-2 repeat函数(重复) 6.数值扩展6-1二进制和八进制表示法6-2isFinite 与 isNaN方法6-3islnteger方法6-4…

Vue3+TS+Vite 构建自动导入开发环境

关注⬆️⬆️⬆️⬆️ 专栏后期更新更多前端内容 在一个使用 Vue 3、Vite 和 TypeScript 的项目中,配置 unplugin-auto-import 和 unplugin-vue-components 插件可以极大地提高开发效率,因为它们可以自动导入 Vue 相关的 API 和 Vue 组件,从而减少了手动导入的需要。 文章目…

FPGA UDP协议栈:基于88E1111,支持RGMII、GMII、SGMII三种模式,提供3套工程源码和技术支持

目录 1、前言免责声明 2、相关方案推荐我这里已有的以太网方案本协议栈的 1G-UDP版本本协议栈的 10G-UDP版本本协议栈的 25G-UDP版本1G 千兆网 TCP-->服务器 方案1G 千兆网 TCP-->客户端 方案10G 万兆网 TCP-->服务器客户端 方案 3、该UDP协议栈性能4、详细设计方案设…

Hyperledger Fabric 消息协议

Fabric 中大量采用了 gRPC 消息在不同组件之间进行通信交互,主要包括如下几种情况:客户端访问 Peer 节点,客户端和 Peer 节点访问排序节点,链码容器与 Peer 节点交互,以及多个 Peer 节点之间的 Gossip 交互。 消息结构…

Android 架构 - 模块化

参考文章 谷歌官方指南 一、概念 将大型、复杂问题拆解成一个个小的、简单问题,从而可以做到各个击破。模块化简单讲就是把多功能高耦合的代码逻辑拆散成多个功能单一职责明确的模块。模块指 Android 项目中的 module,通常会包含 Gradle 构建脚本、源代…

【Android Studio】创建第一个APP工程及生成APK安装包

🌟博主领域:嵌入式领域&人工智能&软件开发 前言:本文详细介绍创建Android Studio第一个APP工程及打包生成APK安装包。 如下两个博客我记录了第一次创建项目时出现的问题,若你也遇见了同样的问题,可参考&#…

QT上位机开发(会员充值软件)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 所有的控件当中,除了label、edit、radio、combobox和button之外,另外一个用的比较多的控件就是grid,也可称之为…