原文地址: https://debezium.io/blog/2023/10/05/Debezium-JMX-signaling-and-notifications/
欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.
Debezium signaling and notifications - Part 3: JMX channel
October 5, 2023 by Fiore Mario Vitale
debezium features notifications signaling integration
欢迎收看我们关于德贝兹信号和通知系列的第三部分。在这篇文章中,我们将继续我们对德贝兹信号和通知的探索。特别是,我们将研究如何使用JMDA通道启用和管理这些特性。
我们还将探索如何利用jlokia通过其他API发送信号和获得通知。
通过jmx与德贝兹的相互作用
jmx代表Java管理扩展,一种用于管理和监控Java应用程序的Java技术。它提供了一种标准化的方法来监控应用程序的性能,配置设置,并与使用各种管理工具和客户端运行的Java应用程序进行交互。对于复杂的、分布式的和企业级的Java应用程序的管理和监控,jmx特别有用。
可通过JMDA通道发出信号
德贝齐斯中的信号是关于在正常执行期间执行操作的触发动作。正如在前几篇文章中所讨论的,Debezum提供了不同的非常规信号通道。在这篇文章中,我们将重点讨论JMDA通道。
若要开始使用jmx信号通道,我们需要:
在卡夫卡连接服务上启用JDB2服务器
加起来jmx 到signal.enabled.channels 连接器配置属性
使用一个jmx客户端连接到jmx服务器发送信号。
德贝兹暴露了名为MBean的信号debezium.:type=management,context=signals,server= .这豆露出来了signal 接受三个参数的业务:
信号的标识。
信号的类型,例如,执行弹弓。
JSON数据字段,包含关于指定信号类型的附加信息。
通过JMDA通道启用通知
通知是告诉你在德贝兹铵中会发生什么的关键。通过JMDA通道访问通知允许您轻松地监视Debezns,例如,增量快照的进程。
要开始使用JDB2通知通道,我们需要:
在卡夫卡连接服务上启用JDB2服务器
加起来jmx 到notification.enabled.channels 连接器配置属性
使用一个JDB2客户端连接到JDB2服务器来访问通知。
通知书上的名称debezium.:type=management,context=notifications,server= .这个豆提供了一个Notification 包含一个jmx列表的BeanCompositeData 具有下列属性的类型:
财产 描述
身份证
分配给通知的唯一标识符。关于增量快照通知,id 是一样的execute-snapshot 信号。
总数_类型
与通知相关的聚合根的数据类型.在域驱动设计中,导出事件总是指聚合。
类型
提供在aggregate_type 场地。
附加_数据
地图<字符串,并附有通知的详细信息。
让我们花点时间,看看如何发送一个增量快照,并通过jmx通道接收关于其进展的通知。
通过jmx通道发送增量快照信号
对于这个例子,我们将使用带有后GERGSQL数据库的DEBeZMR文档图像。
我们可以使用下面的码头组合文件启动所有需要的服务
version: ‘2’
services:
zookeeper:
container_name: zookeeper
image: quay.io/debezium/zookeeper:2.4
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
container_name: kafka
image: quay.io/debezium/kafka:2.4
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
container_name: postgres
image: quay.io/debezium/example-postgres:2.4
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
connect:
container_name: connect
image: quay.io/debezium/connect:2.4
ports:
- 8083:8083
- 9012:9012
- 8778:8778
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- JMXPORT=9012
- JMXHOST=0.0.0.0
- ENABLE_JOLOKIA=true
这将暴露用于连接到jmx服务器的端口9012
启用jmx并指定将用于Jmx的端口号。该值用于指定JVM参数 -Dcom.sun.management.jmxremote.port= J M X P O R T . 这 个 地 址 或 可 解 析 的 主 机 名 的 码 头 主 机 , 使 用 它 来 构 造 一 个 发 送 到 j m x 客 户 端 的 U R L 。 局 部 宿 主 值 或 127.0.0.1 将 不 起 作 用 。 通 常 可 使 用 0.0.0.0 。 该 值 用 于 指 定 J V M 参 数 − D j a v a . r m i . s e r v e r . h o s t n a m e = JMX_PORT . 这个地址或可解析的主机名的码头主机,使用它来构造一个发送到jmx客户端的URL。局部宿主值或127.0.0.1将不起作用。通常可使用0.0.0.0。该值用于指定JVM参数 -Djava.rmi.server.hostname= JMXPORT.这个地址或可解析的主机名的码头主机,使用它来构造一个发送到jmx客户端的URL。局部宿主值或127.0.0.1将不起作用。通常可使用0.0.0.0。该值用于指定JVM参数−Djava.rmi.server.hostname=JMXHOST
在保存文件后debezium.yaml ,所有服务均以:
docker compose -f debezium.yaml up -d
输出会像这样
[+] Running 5/5
✔ Network deploy_default Created 0.1s
✔ Container deploy-zookeeper-1 Started 0.1s
✔ Container deploy-postgres-1 Started 0.1s
✔ Container deploy-kafka-1 Started 0.1s
✔ Container deploy-connect-1 Started
现在我们可以检查所有的服务是否都在运行
docker ps
输出应该与此相似
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f1d49fb79dba quay.io/debezium/connect:2.4 “/docker-entrypoint.…” 3 seconds ago Up 2 seconds 0.0.0.0:8083->8083/tcp, 0.0.0.0:8778->8778/tcp, 0.0.0.0:9012->9012/tcp, 9092/tcp deploy-connect-1
e164b2651fbf quay.io/debezium/kafka:2.4 “/docker-entrypoint.…” 3 seconds ago Up 2 seconds 0.0.0.0:9092->9092/tcp deploy-kafka-1
e61116f22f9d quay.io/debezium/example-postgres:2.4 “docker-entrypoint.s…” 4 seconds ago Up 2 seconds 0.0.0.0:5432->5432/tcp deploy-postgres-1
ccb502882928 quay.io/debezium/zookeeper:2.4 “/docker-entrypoint.…” 4 seconds ago Up 2 seconds 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp deploy-zookeeper-1
此时所有服务都已启动并运行,因此我们可以通过以下配置注册连接器
{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“tasks.max”: “1”,
“database.hostname”: “postgres”,
“database.port”: “5432”,
“database.user”: “postgres”,
“database.password”: “postgres”,
“database.server.id”: “184054”,
“database.dbname”: “postgres”,
“topic.prefix”: “dbserver1”,
“snapshot.mode”: “NEVER”,
“schema.history.internal.kafka.bootstrap.servers”: “kafka:9092”,
“schema.history.internal.kafka.topic”: “schema-changes.inventory”,
“signal.enabled.channels”: “source,jmx”,
“signal.data.collection”: “inventory.debezium_signal”,
“notification.enabled.channels”: “jmx”
}
}
此配置可使 来源 和 Jmx 通道。即使我们只希望使用JDB2来发送信号来执行增量快照, 来源 仍然需要发送信号,因为Debezns需要使用信号表来水印db日志以进行事件复制。
把用来发信号的表
现在,别担心 notification.enabled.channels 财产。我们稍后会深入研究的
把这个配置保存到一个文件中 后记-jmx.json ,我们可以登记。
注册连接器我们可以使用curl 调用卡夫卡连接API
curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” localhost:8083/connectors/ -d ‘{“name”:“inventory-connector”,“config”:{“connector.class”:“io.debezium.connector.postgresql.PostgresConnector”,“tasks.max”:“1”,“database.hostname”:“postgres”,“database.port”:“5432”,“database.user”:“postgres”,“database.password”:“postgres”,“database.server.id”:“184054”,“database.dbname”:“postgres”,“topic.prefix”:“dbserver1”,“snapshot.mode”:“NEVER”,“schema.history.internal.kafka.bootstrap.servers”:“kafka:9092”,“schema.history.internal.kafka.topic”:“schema-changes.inventory”,“signal.enabled.channels”:“source,jmx”,“signal.data.collection”:“inventory.debezium_signal”,“notification.enabled.channels”:“log,sink,jmx”,“notification.sink.topic.name”:“io.debezium.notification”}}’
或者我建议用 Kcctl 工具与卡夫卡互动连接。它是卡夫卡连接的现代直观的命令行客户端。
首先,我们需要创建一个配置上下文来连接卡夫卡连接
kcctl config set-context local --cluster http://localhost:8083
然后我们可以注册连接器运行以下命令
kcctl apply -f postgres-jmx.json
我们现在可以得到连接容器的日志
docker logs connect
检查连接器是否启动了流事件
INFO Postgres|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
为增量快照准备数据库
因为增量快照需要signal.data.collection 要定义此定义,我们需要在您的服务站数据库中创建信号表。
在使用与GTDS和read.only 准备好了。
要创建信号表,我们需要连接到我们的ESTGres实例。我们可以利用psql 客户在邮政集装箱内。
docker exec -it postgres bash
一旦进入容器,我们就可以连接到
psql -h localhost -d postgres -U postgres
密码是 波斯特格雷斯
我们就可以检查里面有没有桌子 存货 图解
\dt inventory.*
命令应该返回类似的东西
List of relations
Schema | Name | Type | Owner
-----------±-----------------±------±---------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
(6 rows)
我们需要用以下命令创建信号表:
CREATE TABLE inventory.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
发送增量快照信号
我们必须连接到jmx服务器,才能通过jmx通道发送信号。我们使用 Jmxdg 客户,所以下载后,我们可以运行它
java -jar jmxterm-1.0.4-uber.jar
open localhost:9012
beans -d debezium.postgres
run -b debezium.postgres:context=signals,server=dbserver1,type=management signal 12345 execute-snapshot {“data-collections”:[“inventory.orders”],“type”:“INCREMENTAL”}
帮我查一下客户
打开一个连接到JDB2服务器
搜寻豆下 德贝齐姆。 领域
执行 发信号 执行递增快照的操作 存货. 表
核对数据
在那之后,我们要检查所有来自 命令 表在相应的卡夫卡主题中得到了正确的捕捉。
我们可以通过以下命令输入卡夫卡容器:
docker exec -it kafka bash
一旦进入容器,我们就可以在 dbserver1.inventory.orders 主题:以下命令
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.inventory.orders --from-beginning
输出应该是这样的
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10001,
“order_date”: 16816,
“purchaser”: 1001,
“quantity”: 1,
“product_id”: 102
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605203,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10002,
“order_date”: 16817,
“purchaser”: 1002,
“quantity”: 2,
“product_id”: 105
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605204,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10003,
“order_date”: 16850,
“purchaser”: 1002,
“quantity”: 2,
“product_id”: 106
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605204,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10004,
“order_date”: 16852,
“purchaser”: 1003,
“quantity”: 1,
“product_id”: 107
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605204,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
就这样!我们已经使用jmx通道发送了一个增量快照信号。
通过jmx通道监控增量快照进展
由于我们已经执行了一个增量快照,现在我们可以通过JMDA通道读取Debezns生成的通知。
我们使用下列配置注册连接器
{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“tasks.max”: “1”,
“database.hostname”: “postgres”,
“database.port”: “5432”,
“database.user”: “postgres”,
“database.password”: “postgres”,
“database.server.id”: “184054”,
“database.dbname”: “postgres”,
“topic.prefix”: “dbserver1”,
“snapshot.mode”: “NEVER”,
“schema.history.internal.kafka.bootstrap.servers”: “kafka:9092”,
“schema.history.internal.kafka.topic”: “schema-changes.inventory”,
“signal.enabled.channels”: “source,jmx”,
“signal.data.collection”: “inventory.debezium_signal”,
“notification.enabled.channels”: “jmx”
}
}
这种配置使 Jmx 通知频道。
要访问该通知,我们需要再次连接到JDB2服务器。就像我们为信号所做的那样,我们将使用jmxterm
java -jar jmxterm-1.0.4-uber.jar
open localhost:9012
beans -d debezium.postgres
get -b debezium.postgres:context=notifications,server=dbserver1,type=management Notifications
帮我查一下客户
打开一个连接到JDB2服务器
搜寻豆下 德贝齐姆。 领域
得到通知。
你应该期待下面的输出
#mbean = debezium.postgres:context=notifications,server=dbserver1,type=management:
Notifications = [ {
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
};
aggregateType = Initial Snapshot;
id = b20bec8d-f21f-4d74-bb75-cdd7f4c7d933;
type = SKIPPED;
},
{
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = STARTED;
},
{
additionalData = {
( current_collection_in_progress ) = {
key = current_collection_in_progress;
value = inventory.orders;
};
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( maximum_key ) = {
key = maximum_key;
value = 10004;
};
( last_processed_key ) = {
key = last_processed_key;
value = 10004;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = IN_PROGRESS;
},
{
additionalData = {
( scanned_collection ) = {
key = scanned_collection;
value = inventory.orders;
};
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( total_rows_scanned ) = {
key = total_rows_scanned;
value = 4;
};
( status ) = {
key = status;
value = SUCCEEDED;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = TABLE_SCAN_COMPLETED;
},
{
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = COMPLETED;
}
];
这是一个来自初始快照的通知,带有状态SKIPPED 因为我们的连接器配置了"snapshot.mode": “NEVER”
这是关于增量快照启动的通知
这个通知告诉我们inventory.orders 快照正在进行中,提供了关于最后处理和最大键的有用信息。在这个例子中,我们只有一个in progress 通知,但取决于你的桌子尺寸snapshot.fetch.size ,你可以得到更多。
此通知告知特定表的快照已经完成,并提供了所处理的全部行的信息。
对于这个示例,这是我们有的最后一个通知,它告诉我们整个增量快照进展已经完成。
jmx还提供了生成自己的通知的可能性。德贝唑也会产生这些通知。您可以订阅这些通知,因此您可以在不投票的情况下立即接收这些通知。 通知书 豆。
利用亚洛基亚
JLOLIA是一个功能强大的工具,可以让您与JDB2服务器进行交互,并通过REST来公开它。使用它,我们可以通过REST与Debezns进行交互,利用信号和通知的jmx豆。这样,您可以无缝地发送信号和接收通知,并使用更熟悉的RESTAPI。
要启用Joloya,我们需要启用它在我们卡夫卡连接容器上的代理。
这是我们示例中使用的码头组合文件
version: ‘2’
services:
zookeeper:
image: quay.io/debezium/zookeeper:2.4
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:2.4
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: quay.io/debezium/example-postgres:2.4
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
connect:
image: quay.io/debezium/connect:2.4
ports:
- 8083:8083
- 9012:9012
- 8778:8778
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- JMXPORT=9012
- JMXHOST=0.0.0.0
- ENABLE_JOLOKIA=true
会暴露出恐怖分子使用的港口
这将使在我们的测试图像中已经装运的Joloia代理。如果您想在您的安装上启用代理,请检查 正式文件
通过约洛基亚发出信号
若要通过JOLOKIA发送信号,我们可以向具有所需信号和参数的JOLOKIA端点发送一个HTTP邮件请求。
要继续使用增量快照示例,要触发它,您可以运行以下命令
curl -X POST ‘http://localhost:8778/jolokia/exec’ -d ‘{“type”:“EXEC”,“mbean”:“debezium.postgres:context=signals,server=dbserver1,type=management”,“operation”:“signal”,“arguments”:[“12345”,“execute-snapshot”,"{“data-collections”: [“inventory.products”], “type”: “INCREMENTAL”}"]}’ | jq
它应该是
{
“request”: {
“mbean”: “debezium.postgres:context=signals,server=dbserver1,type=management”,
“arguments”: [
“12345”,
“execute-snapshot”,
“{“data-collections”: [“inventory.products”], “type”: “INCREMENTAL”}”
],
“type”: “exec”,
“operation”: “signal”
},
“value”: null,
“timestamp”: 1695651387,
“status”: 200
}
接收通知书
还允许您使用httpGET请求从Debezum获取通知。
curl -X GET ‘http://localhost:8778/jolokia/read/debezium.postgres:context=notifications,server=dbserver1,type=management/Notifications’ | jq
它应该是
{
“request”: {
“mbean”: “debezium.postgres:context=notifications,server=dbserver1,type=management”,
“attribute”: “Notifications”,
“type”: “read”
},
“value”: [
{
“additionalData”: {
“connector_name”: “dbserver1”
},
“id”: “b20bec8d-f21f-4d74-bb75-cdd7f4c7d933”,
“type”: “SKIPPED”,
“aggregateType”: “Initial Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”,
“data_collections”: “inventory.orders”
},
“id”: “12345”,
“type”: “STARTED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“last_processed_key”: “10004”,
“current_collection_in_progress”: “inventory.orders”,
“connector_name”: “dbserver1”,
“maximum_key”: “10004”,
“data_collections”: “inventory.orders”
},
“id”: “12345”,
“type”: “IN_PROGRESS”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“scanned_collection”: “inventory.orders”,
“connector_name”: “dbserver1”,
“total_rows_scanned”: “4”,
“status”: “SUCCEEDED”,
“data_collections”: “inventory.orders”
},
“id”: “12345”,
“type”: “TABLE_SCAN_COMPLETED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”
},
“id”: “12345”,
“type”: “COMPLETED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”,
“data_collections”: “inventory.products”
},
“id”: “12345”,
“type”: “STARTED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“last_processed_key”: “109”,
“current_collection_in_progress”: “inventory.products”,
“connector_name”: “dbserver1”,
“maximum_key”: “109”,
“data_collections”: “inventory.products”
},
“id”: “12345”,
“type”: “IN_PROGRESS”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“scanned_collection”: “inventory.products”,
“connector_name”: “dbserver1”,
“total_rows_scanned”: “9”,
“status”: “SUCCEEDED”,
“data_collections”: “inventory.products”
},
“id”: “12345”,
“type”: “TABLE_SCAN_COMPLETED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”
},
“id”: “12345”,
“type”: “COMPLETED”,
“aggregateType”: “Incremental Snapshot”
}
],
“timestamp”: 1695652278,
“status”: 200
}
你也看到了我们也收到了inventory.products 我们通过RESTAPI发送的表增量快照
结论
在我们系列的第三部分中,我们学习了如何启用和管理使用jmx和joloia的信令和通知。信号可以让你动态地控制Debezium的行为,而通知可以让你了解关键事件。通过利用这些功能和JOLOKIA,您可以有效地管理、监视和与您的数据流工作流交互,确保您始终控制Debezium。