原文地址: https://debezium.io/blog/2019/05/23/tutorial-using-debezium-connectors-with-apache-pulsar/
欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.
将 Debezium 连接器与 Apache Pulsar 结合使用的教程
2019 年 5 月 23 日 作者: Jia Zhai,StreamNative
讨论 实例
这是 Apache Pulsar PMC 成员兼提交者 Jia Zhai 的客座文章。
Debezium是一个用于变更数据捕获 (CDC) 的开源项目。它基于Apache Kafka Connect构建,支持多种数据库,例如 MySQL、MongoDB、PostgreSQL、Oracle 和 SQL Server。Apache Pulsar包含一组基于 Pulsar IO 框架的内置连接器,它与 Apache Kafka Connect 相对应。
从版本 2.3.0 开始,Pulsar IO 提供了对Debezium 源连接器的开箱即用支持,因此您可以利用 Debezium 将数据库中的更改流式传输到 Apache Pulsar。本教程将引导您通过 Pulsar IO 设置 MySQL 的 Debezium 连接器。
教程步骤
本教程与Debezium 教程类似,只是事件流的存储从 Kafka 更改为 Pulsar。主要包括六个步骤:
启动MySQL服务器;
启动独立的 Pulsar 服务;
在 Pulsar IO 中启动 Debezium 连接器。Pulsar IO 读取 MySQL 服务器中存在的数据库更改;
订阅 Pulsar 主题,监控 MySQL 变化;
在 MySQL 服务器中进行更改,并验证更改是否立即记录在 Pulsar 主题中;
清理。
第1步:启动MySQL服务器
启动包含数据库示例的 MySQL 服务器,Debezium 从中捕获更改。打开一个新终端来启动一个新容器,该容器运行预先配置了名为 inventory 的数据库的 MySQL 数据库服务器:
docker run --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9
显示以下信息:
2019-03-25T14:12:41.178325Z 0 [Note] Event Scheduler: Loaded 0 events
2019-03-25T14:12:41.178670Z 0 [Note] mysqld: ready for connections.
Version: ‘5.7.25-log’ socket: ‘/var/run/mysqld/mysqld.sock’ port: 3306 MySQL Community Server (GPL)
第 2 步:启动独立的 Pulsar 服务
以独立模式在本地启动 Pulsar 服务。Pulsar 2.3.0 中引入了对在 Pulsar IO 中运行 Debezium 连接器的支持。下载2.3.0 版本的 Pulsar 二进制文件和2.3.0 版本的 pulsar-io-kafka-connect-adaptor-2.3.0.nar。在 Pulsar 中,所有 Pulsar IO 连接器都打包为单独的NAR文件。
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar
$ tar zxf apache-pulsar-2.3.0-bin.tar.gz
$ cd apache-pulsar-2.3.0
$ mkdir connectors
$ cp …/pulsar-io-kafka-connect-adaptor-2.3.0.nar connectors
$ bin/pulsar standalone
图片来自于官网原文
步骤 3:在 Pulsar IO 中启动 Debezium MySQL 连接器
在另一个终端选项卡中以本地运行模式启动 Pulsar IO 中的 Debezium MySQL 连接器。“debezium-mysql-source-config.yaml”文件包含所有配置,主要参数列在“configs”节点下。.yaml 文件包含“task.class”参数。配置文件还包括 MySQL 相关参数(如服务器、端口、用户、密码)以及用于“历史”和“偏移”存储的 Pulsar 主题的两个名称。
$ bin/pulsar-admin source localrun --sourceConfigFile debezium-mysql-source-config.yaml
“debezium-mysql-source-config.yaml”文件中的内容如下。
tenant: “test”
namespace: “test-namespace”
name: “debezium-kafka-source”
topicName: “kafka-connect-topic”
archive: “connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar”
parallelism: 1
configs:
sourceTask
task.class: “io.debezium.connector.mysql.MySqlConnectorTask”
config for mysql, docker image: debezium/example-mysql:0.8
database.hostname: “localhost”
database.port: “3306”
database.user: “debezium”
database.password: “dbz”
database.server.id: “184054”
database.server.name: “dbserver1”
database.whitelist: “inventory”
database.history: “org.apache.pulsar.io.debezium.PulsarDatabaseHistory”
database.history.pulsar.topic: “history-topic”
database.history.pulsar.service.url: “pulsar://127.0.0.1:6650”
KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
key.converter: “org.apache.kafka.connect.json.JsonConverter”
value.converter: “org.apache.kafka.connect.json.JsonConverter”
PULSAR_SERVICE_URL_CONFIG
pulsar.service.url: “pulsar://127.0.0.1:6650”
OFFSET_STORAGE_TOPIC_CONFIG
offset.storage.topic: “offset-topic”
表是在前面提到的 MySQL 服务器中自动创建的。因此 Debezium 连接器从头开始从 MySQL binlog 文件中读取历史记录。在输出中,您会发现连接器已在 47 条记录中被触发和处理。
图片来自于官网原文
连接器启动过程记录
有关如何管理连接器的更多信息,请参阅Pulsar IO 文档。
Debezium 捕获和读取的记录会自动发布到 Pulsar 主题。当您启动新终端时,您将使用以下命令在 Pulsar 中找到当前主题:
$ bin/pulsar-admin topics list public/default
图片来自于官网原文
对于每个已更改的表,更改数据存储在单独的 Pulsar 主题中。除了数据库表相关主题外,另外两个名为“history-topic”和“offset-topic”的主题用于存储历史和偏移量相关数据。
persistent://public/default/history-topic
persistent://public/default/offset-topic
第四步:订阅Pulsar主题来监控MySQL变化
以persistent://public/default/dbserver1.inventory.products题目为例。使用 CLI 命令来使用该主题并在“产品”表更改时监视更改。
$ bin/pulsar-client consume -s “sub-products” public/default/dbserver1.inventory.products -n 0
输出如下:
…
22:17:41.201 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribing to topic on cnx [id: 0xfe0b4feb, L:/127.0.0.1:55585 - R:localhost/127.0.0.1:6650]
22:17:41.223 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribed to topic on localhost/127.0.0.1:6650 – consumer: 0
您还可以使用偏移量主题来监控偏移量更改,同时表更改存储在persistent://public/default/dbserver1.inventory.productsPulsar 主题中。
$ bin/pulsar-client consume -s “sub-offset” offset-topic -n 0
第 5 步:在 MySQL 服务器中进行更改,并验证更改是否立即记录在 Pulsar 主题中
启动 MySQL CLI docker 连接器,您可以更改 MySQL 服务器中的“产品”表。
d o c k e r r u n − i t − − r m − − n a m e m y s q l t e r m − − l i n k m y s q l − − r m m y s q l : 5.7 s h − c ′ e x e c m y s q l − h " docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h" dockerrun−it−−rm−−namemysqlterm−−linkmysql−−rmmysql:5.7sh−c′execmysql−h"MYSQL_PORT_3306_TCP_ADDR" -P" M Y S Q L P O R T 3 30 6 T C P P O R T " − u r o o t − p " MYSQL_PORT_3306_TCP_PORT" -uroot -p" MYSQLPORT3306TCPPORT"−uroot−p"MYSQL_ENV_MYSQL_ROOT_PASSWORD"’
运行该命令后,将显示MySQL CLI,您可以更改“products”表中两项的名称。
mysql> use inventory;
mysql> show tables;
mysql> SELECT * FROM products ;
mysql> UPDATE products SET name=‘1111111111’ WHERE id=101;
mysql> UPDATE products SET name=‘1111111111’ WHERE id=107;
图片来自于官网原文
在您消费产品主题的终端中,您发现添加了两个更改。
图片来自于官网原文
表 topic 存储 mysql 更新
在消费偏移量主题的终端中,您发现添加了两个偏移量。
图片来自于官网原文
偏移主题得到更新
在本地运行连接器的终端中,您会发现另外两条记录已被处理。
图片来自于官网原文
表主题获取更多记录
第6步:清理。
使用“Ctrl + C”关闭终端。使用“docker ps”和“docker Kill”停止MySQL相关容器。
mysql> quit
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
84d66c2f591d debezium/example-mysql:0.8 “docker-entrypoint.s…” About an hour ago Up About an hour 0.0.0.0:3306->3306/tcp, 33060/tcp mysql
$ docker kill 84d66c2f591d
要删除 Pulsar 数据,请删除 Pulsar 二进制目录中的 data 目录。
$ pwd
/Users/jia/ws/releases/apache-pulsar-2.3.0
$ rm -rf data
结论
Pulsar IO 框架允许运行 Debezium 连接器来捕获变更数据,将不同数据库中的数据变更流式传输到 Apache Pulsar。在本教程中,您学习了如何捕获 MySQL 数据库中的数据更改并将其传播到 Pulsar。我们正在不断改进对使用 Apache Pulsar 运行 Debezium 连接器的支持,在 Pulsar 2.4.0 发布后将更容易使用。