Debezium发布历史56

原文地址: https://debezium.io/blog/2019/05/23/tutorial-using-debezium-connectors-with-apache-pulsar/

欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

将 Debezium 连接器与 Apache Pulsar 结合使用的教程
2019 年 5 月 23 日 作者: Jia Zhai,StreamNative
讨论 实例
这是 Apache Pulsar PMC 成员兼提交者 Jia Zhai 的客座文章。

Debezium是一个用于变更数据捕获 (CDC) 的开源项目。它基于Apache Kafka Connect构建,支持多种数据库,例如 MySQL、MongoDB、PostgreSQL、Oracle 和 SQL Server。Apache Pulsar包含一组基于 Pulsar IO 框架的内置连接器,它与 Apache Kafka Connect 相对应。

从版本 2.3.0 开始,Pulsar IO 提供了对Debezium 源连接器的开箱即用支持,因此您可以利用 Debezium 将数据库中的更改流式传输到 Apache Pulsar。本教程将引导您通过 Pulsar IO 设置 MySQL 的 Debezium 连接器。

教程步骤
本教程与Debezium 教程类似,只是事件流的存储从 Kafka 更改为 Pulsar。主要包括六个步骤:

启动MySQL服务器;

启动独立的 Pulsar 服务;

在 Pulsar IO 中启动 Debezium 连接器。Pulsar IO 读取 MySQL 服务器中存在的数据库更改;

订阅 Pulsar 主题,监控 MySQL 变化;

在 MySQL 服务器中进行更改,并验证更改是否立即记录在 Pulsar 主题中;

清理。

第1步:启动MySQL服务器
启动包含数据库示例的 MySQL 服务器,Debezium 从中捕获更改。打开一个新终端来启动一个新容器,该容器运行预先配置了名为 inventory 的数据库的 MySQL 数据库服务器:

docker run --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9
显示以下信息:

2019-03-25T14:12:41.178325Z 0 [Note] Event Scheduler: Loaded 0 events
2019-03-25T14:12:41.178670Z 0 [Note] mysqld: ready for connections.
Version: ‘5.7.25-log’ socket: ‘/var/run/mysqld/mysqld.sock’ port: 3306 MySQL Community Server (GPL)
第 2 步:启动独立的 Pulsar 服务
以独立模式在本地启动 Pulsar 服务。Pulsar 2.3.0 中引入了对在 Pulsar IO 中运行 Debezium 连接器的支持。下载2.3.0 版本的 Pulsar 二进制文件和2.3.0 版本的 pulsar-io-kafka-connect-adaptor-2.3.0.nar。在 Pulsar 中,所有 Pulsar IO 连接器都打包为单独的NAR文件。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar
$ tar zxf apache-pulsar-2.3.0-bin.tar.gz
$ cd apache-pulsar-2.3.0
$ mkdir connectors
$ cp …/pulsar-io-kafka-connect-adaptor-2.3.0.nar connectors
$ bin/pulsar standalone
图片来自于官网原文
在这里插入图片描述

步骤 3:在 Pulsar IO 中启动 Debezium MySQL 连接器
在另一个终端选项卡中以本地运行模式启动 Pulsar IO 中的 Debezium MySQL 连接器。“debezium-mysql-source-config.yaml”文件包含所有配置,主要参数列在“configs”节点下。.yaml 文件包含“task.class”参数。配置文件还包括 MySQL 相关参数(如服务器、端口、用户、密码)以及用于“历史”和“偏移”存储的 Pulsar 主题的两个名称。

$ bin/pulsar-admin source localrun --sourceConfigFile debezium-mysql-source-config.yaml
“debezium-mysql-source-config.yaml”文件中的内容如下。

tenant: “test”
namespace: “test-namespace”
name: “debezium-kafka-source”
topicName: “kafka-connect-topic”
archive: “connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar”

parallelism: 1

configs:

sourceTask

task.class: “io.debezium.connector.mysql.MySqlConnectorTask”

config for mysql, docker image: debezium/example-mysql:0.8

database.hostname: “localhost”
database.port: “3306”
database.user: “debezium”
database.password: “dbz”
database.server.id: “184054”
database.server.name: “dbserver1”
database.whitelist: “inventory”

database.history: “org.apache.pulsar.io.debezium.PulsarDatabaseHistory”
database.history.pulsar.topic: “history-topic”
database.history.pulsar.service.url: “pulsar://127.0.0.1:6650”

KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG

key.converter: “org.apache.kafka.connect.json.JsonConverter”
value.converter: “org.apache.kafka.connect.json.JsonConverter”

PULSAR_SERVICE_URL_CONFIG

pulsar.service.url: “pulsar://127.0.0.1:6650”

OFFSET_STORAGE_TOPIC_CONFIG

offset.storage.topic: “offset-topic”
表是在前面提到的 MySQL 服务器中自动创建的。因此 Debezium 连接器从头开始从 MySQL binlog 文件中读取历史记录。在输出中,您会发现连接器已在 47 条记录中被触发和处理。
图片来自于官网原文
在这里插入图片描述

连接器启动过程记录
有关如何管理连接器的更多信息,请参阅Pulsar IO 文档。

Debezium 捕获和读取的记录会自动发布到 Pulsar 主题。当您启动新终端时,您将使用以下命令在 Pulsar 中找到当前主题:

$ bin/pulsar-admin topics list public/default
图片来自于官网原文
在这里插入图片描述

对于每个已更改的表,更改数据存储在单独的 Pulsar 主题中。除了数据库表相关主题外,另外两个名为“history-topic”和“offset-topic”的主题用于存储历史和偏移量相关数据。

persistent://public/default/history-topic
persistent://public/default/offset-topic
第四步:订阅Pulsar主题来监控MySQL变化
以persistent://public/default/dbserver1.inventory.products题目为例。使用 CLI 命令来使用该主题并在“产品”表更改时监视更改。

$ bin/pulsar-client consume -s “sub-products” public/default/dbserver1.inventory.products -n 0
输出如下:


22:17:41.201 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribing to topic on cnx [id: 0xfe0b4feb, L:/127.0.0.1:55585 - R:localhost/127.0.0.1:6650]
22:17:41.223 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribed to topic on localhost/127.0.0.1:6650 – consumer: 0
您还可以使用偏移量主题来监控偏移量更改,同时表更改存储在persistent://public/default/dbserver1.inventory.productsPulsar 主题中。

$ bin/pulsar-client consume -s “sub-offset” offset-topic -n 0
第 5 步:在 MySQL 服务器中进行更改,并验证更改是否立即记录在 Pulsar 主题中
启动 MySQL CLI docker 连接器,您可以更改 MySQL 服务器中的“产品”表。

d o c k e r r u n − i t − − r m − − n a m e m y s q l t e r m − − l i n k m y s q l − − r m m y s q l : 5.7 s h − c ′ e x e c m y s q l − h " docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h" dockerrunitrmnamemysqltermlinkmysqlrmmysql:5.7shcexecmysqlh"MYSQL_PORT_3306_TCP_ADDR" -P" M Y S Q L P O R T 3 30 6 T C P P O R T " − u r o o t − p " MYSQL_PORT_3306_TCP_PORT" -uroot -p" MYSQLPORT3306TCPPORT"urootp"MYSQL_ENV_MYSQL_ROOT_PASSWORD"’
运行该命令后,将显示MySQL CLI,您可以更改“products”表中两项的名称。

mysql> use inventory;
mysql> show tables;
mysql> SELECT * FROM products ;
mysql> UPDATE products SET name=‘1111111111’ WHERE id=101;
mysql> UPDATE products SET name=‘1111111111’ WHERE id=107;

图片来自于官网原文
在这里插入图片描述

在您消费产品主题的终端中,您发现添加了两个更改。
图片来自于官网原文
在这里插入图片描述

表 topic 存储 mysql 更新
在消费偏移量主题的终端中,您发现添加了两个偏移量。
图片来自于官网原文
在这里插入图片描述

偏移主题得到更新
在本地运行连接器的终端中,您会发现另外两条记录已被处理。
图片来自于官网原文
在这里插入图片描述

表主题获取更多记录
第6步:清理。
使用“Ctrl + C”关闭终端。使用“docker ps”和“docker Kill”停止MySQL相关容器。

mysql> quit

$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
84d66c2f591d debezium/example-mysql:0.8 “docker-entrypoint.s…” About an hour ago Up About an hour 0.0.0.0:3306->3306/tcp, 33060/tcp mysql

$ docker kill 84d66c2f591d
要删除 Pulsar 数据,请删除 Pulsar 二进制目录中的 data 目录。

$ pwd
/Users/jia/ws/releases/apache-pulsar-2.3.0

$ rm -rf data
结论
Pulsar IO 框架允许运行 Debezium 连接器来捕获变更数据,将不同数据库中的数据变更流式传输到 Apache Pulsar。在本教程中,您学习了如何捕获 MySQL 数据库中的数据更改并将其传播到 Pulsar。我们正在不断改进对使用 Apache Pulsar 运行 Debezium 连接器的支持,在 Pulsar 2.4.0 发布后将更容易使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/607383.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络——实验七

使用socket实现一个基于C/S架构的通信程序 (1)客户端发送给服务器请求,发送表征身份的用户名和密码("admin","123456"); (2)服务器根据客户端发来的信息验证身份,如果验证…

1. 两数之和(Java)

题目描述: 给定一个整数数组 nums 和一个整数目标值 target,请你在该数组中找出 和为目标值 target 的那 两个 整数,并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是,数组中同一个元素在答案里不能重复出现。 你…

linux 使用多版本 go goenv.sh

创建goenv sh 文件 在 /usr/local/bin/ 下面创建一个goenv.sh 文件 内容如下 #!/bin/bash version$1 if [[ ${version} "" ]]; then version"1.6" fi GOROOTTMP/usr/local/lib/go${version} if [[ ! -d ${GOROOTTMP} ]]; then echo "go ${versi…

笔试案例2

文章目录 1、笔试案例22、思维导图 1、笔试案例2 09)查询学过「张三」老师授课的同学的信息 selects.*,c.cname,t.tname,sc.score from t_mysql_teacher t, t_mysql_course c, t_mysql_student s, t_mysql_score sc where t.tidc.cid and c.cidsc.cid and sc.sids…

简洁大气带进度条的URL跳转页面HTML源码

源码介绍 简洁大气带进度条的URL跳转页面HTML源码,记事本修改里面的内容即可,喜欢的同学可以拿去使用 获取方式: 蓝奏云:https://wfr.lanzout.com/ic1iZ1kj6yde CSDN免积分下载:https://download.csdn.net/download/huayula/88…

Java桶排序、基数排序、剪枝算法

桶排序算法 桶排序的基本思想是: 把数组 arr 划分为 n 个大小相同子区间(桶),每个子区间各自排序,最后合并 。计数排序是桶排序的一种特殊情况,可以把计数排序当成每个桶里只有一个元素的情况。 1.找出待…

答疑解惑:核技术利用辐射安全与防护考核

前言 最近通过了《核技术利用辐射安全与防护考核》,顺利拿到了合格证。这是从事与辐射相关行业所需要的一个基本证书,考试并不难,在此写篇博客记录一下主要的知识点。 需要这个证书的行业常见的有医疗方面的,如放疗,…

黑马苍穹外卖学习Day3

目录 公共字段自动填充问题分析实现思路代码实现 新增菜品需求分析和设计接口设计代码开发开发文件上传接口功能开发 菜品分页查询需求分析和设计代码开发 菜品删除功能需求分析与设计代码实现代码优化 修改菜品需求分析和设计代码实现 公共字段自动填充 问题分析 员工表和分…

el-button点击后不恢复原样

1、单纯的span点击不恢复原样 let target event.target;if(target.nodeName "SPAN"){target event.target.parentNode;}target.blur();将此句加入到函数中即可,不用管语句中出现的红色下划线 2、假如是点击其他标签,譬如带有图标的什么 将…

静态网页设计——崩坏3(HTML+CSS+JavaScript)

前言 声明:该文章只是做技术分享,若侵权请联系我删除。!! 感谢大佬的视频: 使用技术:HTMLCSSJS(静态网页设计) 主要内容:对游戏崩坏3进行简单介绍。 https://www.bilib…

Codeforces Round 761 (Div. 2) D2. Too Many Impostors (hard version)(交互+构造 最小次数)

题目 n(6<n<1e4&#xff0c;n是3的倍数)个人&#xff0c;其中k个人是好人&#xff0c;n-k个人是坏人 k是未知的&#xff0c;但保证1/3n<k<2/3n&#xff0c;你可以询问若干次&#xff0c; 每次你可以选择三个不同的人a,b,c&#xff0c;系统告诉你这三个人中好人更…

【Linux进程】查看进程fork创建进程

目录 前言 1. 查看进程 2. 通过系统调用创建进程-fork初识 总结 前言 你有没有想过在使用Linux操作系统时&#xff0c;后台运行的程序是如何管理的&#xff1f;在Linux中&#xff0c;进程是一个非常重要的概念。本文将介绍如何查看当前运行的进程&#xff0c;并且讨论如何使用…

将项目同时提交到GitHub和码云Gitee上面,GitHub与Gitee同步

多个远程仓库同时使用 新建GitHub仓库 创建成功 在终端中创建仓库 如果你想在本地机器上创建Git仓库&#xff0c;或者想添加一个文件夹或文件到已经存在的Git仓库中&#xff0c;你应该在终端中创建你的Git仓库。在你可以通过终端来创建一个Git仓库。以下是在终端中创建Git仓…

java解析json复杂数据的第三种思路

文章目录 一、概述二、数据预览1. 接口json数据2. json转xml数据 三、代码实现1. pom.xml2. 核心代码3. 运行结果 四、源码传送 一、概述 接上篇 java解析json复杂数据的两种思路 我们已经通过解析返回json字符串得到数据,现在改变思路, 按照如下流程获取数据: #mermaid-svg-k…

AcrelCloud-3000环保用电监管云平台解决方案——安科瑞赵嘉敏

概述 国家全面推进打赢蓝天保卫战&#xff0c;打好碧水保卫战&#xff0c;打胜净土保卫战&#xff0c;加快生态环境保护、建设美丽中国。环保用电监管系统针对企业内的环保设施、设备运行状况进行检测&#xff0c;发挥环保设备的作用&#xff0c;提高监察效率&#xff0c;并为…

洛谷 P1217 [USACO1.5] 回文质数 Prime Palindromes 刷题笔记

P1217 [USACO1.5] 回文质数 Prime Palindromes - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 思路 直接枚举 减枝优化判断 优化1 只有偶数才会是质数 优化2 回文数的判断次数要优于检查素数 先判断是否为回文数再检查是否为质数 if( hw(i)&&isprime(i)) 这里…

工业基础类IFC—开源库汇总

ifc 基础解析库 支持语言 和授权协议&#xff01; namedescriplicenseAdapyAdapy是一个用于结构分析和设计的python库&#xff0c;致力于IFC与各种有限元格式的交互。同时&#xff0c;它还提供一套CAD模型和FEM网格模型的参数化建模框架。GPL-3.0bcfpluginbcfplugin 是一个用…

Springcloud Gateway网关

简介 SpringCloud Gateway是基于WebFlux框架实现的&#xff0c;而WebFlux框架底层则使用了高性能的Reactor模式通信框架Netty。 Spring Cloud Gateway的目标提供统一的路由方式且基于 Filter链的方式提供了网关基本的功能&#xff0c;例如:安全&#xff0c;监控/指标&#xf…

Python的工作日判断库(含调休)——chinese_calendar

chinese_calendar 库使用场景 chinese_calendar 库主要用于处理中国农历和节假日相关的日期计算和判断。以下是一些常见的使用场景&#xff1a; 判断日期是否为工作日&#xff1a;chinese_calendar 提供了 is_workday() 函数&#xff0c;可以判断指定日期是否为工作日&#xf…

lv14 IO模型:阻塞和非阻塞 7

1 五种IO模型------读写外设数据的方式 阻塞: 不能操作就睡觉 非阻塞&#xff1a;不能操作就返回错误(通过轮询即才能实现阻塞的情况 &#xff09; 多路复用&#xff1a;委托中介监控 信号驱动&#xff1a;让内核如果能操作时发信号&#xff0c;在信号处理函数中操作 异步IO&a…