Debezium发布历史165

原文地址: https://debezium.io/blog/2023/10/05/Debezium-JMX-signaling-and-notifications/

欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

Debezium signaling and notifications - Part 3: JMX channel
October 5, 2023 by Fiore Mario Vitale
debezium features notifications signaling integration

欢迎收看我们关于德贝兹信号和通知系列的第三部分。在这篇文章中,我们将继续我们对德贝兹信号和通知的探索。特别是,我们将研究如何使用JMDA通道启用和管理这些特性。

我们还将探索如何利用jlokia通过其他API发送信号和获得通知。

通过jmx与德贝兹的相互作用
jmx代表Java管理扩展,一种用于管理和监控Java应用程序的Java技术。它提供了一种标准化的方法来监控应用程序的性能,配置设置,并与使用各种管理工具和客户端运行的Java应用程序进行交互。对于复杂的、分布式的和企业级的Java应用程序的管理和监控,jmx特别有用。

可通过JMDA通道发出信号
德贝齐斯中的信号是关于在正常执行期间执行操作的触发动作。正如在前几篇文章中所讨论的,Debezum提供了不同的非常规信号通道。在这篇文章中,我们将重点讨论JMDA通道。

若要开始使用jmx信号通道,我们需要:

在卡夫卡连接服务上启用JDB2服务器

加起来jmx 到signal.enabled.channels 连接器配置属性

使用一个jmx客户端连接到jmx服务器发送信号。

德贝兹暴露了名为MBean的信号debezium.:type=management,context=signals,server= .这豆露出来了signal 接受三个参数的业务:

信号的标识。

信号的类型,例如,执行弹弓。

JSON数据字段,包含关于指定信号类型的附加信息。

通过JMDA通道启用通知
通知是告诉你在德贝兹铵中会发生什么的关键。通过JMDA通道访问通知允许您轻松地监视Debezns,例如,增量快照的进程。

要开始使用JDB2通知通道,我们需要:

在卡夫卡连接服务上启用JDB2服务器

加起来jmx 到notification.enabled.channels 连接器配置属性

使用一个JDB2客户端连接到JDB2服务器来访问通知。

通知书上的名称debezium.:type=management,context=notifications,server= .这个豆提供了一个Notification 包含一个jmx列表的BeanCompositeData 具有下列属性的类型:

财产 描述
身份证

分配给通知的唯一标识符。关于增量快照通知,id 是一样的execute-snapshot 信号。

总数_类型

与通知相关的聚合根的数据类型.在域驱动设计中,导出事件总是指聚合。

类型

提供在aggregate_type 场地。

附加_数据

地图<字符串,并附有通知的详细信息。

让我们花点时间,看看如何发送一个增量快照,并通过jmx通道接收关于其进展的通知。

通过jmx通道发送增量快照信号
对于这个例子,我们将使用带有后GERGSQL数据库的DEBeZMR文档图像。

我们可以使用下面的码头组合文件启动所有需要的服务

version: ‘2’
services:
zookeeper:
container_name: zookeeper
image: quay.io/debezium/zookeeper:2.4
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
container_name: kafka
image: quay.io/debezium/kafka:2.4
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
container_name: postgres
image: quay.io/debezium/example-postgres:2.4
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
connect:
container_name: connect
image: quay.io/debezium/connect:2.4
ports:
- 8083:8083
- 9012:9012
- 8778:8778
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- JMXPORT=9012
- JMXHOST=0.0.0.0
- ENABLE_JOLOKIA=true
这将暴露用于连接到jmx服务器的端口9012
启用jmx并指定将用于Jmx的端口号。该值用于指定JVM参数 -Dcom.sun.management.jmxremote.port= J M X P O R T . 这 个 地 址 或 可 解 析 的 主 机 名 的 码 头 主 机 , 使 用 它 来 构 造 一 个 发 送 到 j m x 客 户 端 的 U R L 。 局 部 宿 主 值 或 127.0.0.1 将 不 起 作 用 。 通 常 可 使 用 0.0.0.0 。 该 值 用 于 指 定 J V M 参 数 − D j a v a . r m i . s e r v e r . h o s t n a m e = JMX_PORT . 这个地址或可解析的主机名的码头主机,使用它来构造一个发送到jmx客户端的URL。局部宿主值或127.0.0.1将不起作用。通常可使用0.0.0.0。该值用于指定JVM参数 -Djava.rmi.server.hostname= JMXPORT.,使jmxURL宿127.0.0.1使0.0.0.0JVMDjava.rmi.server.hostname=JMXHOST
在保存文件后debezium.yaml ,所有服务均以:

docker compose -f debezium.yaml up -d
输出会像这样

[+] Running 5/5
✔ Network deploy_default Created 0.1s
✔ Container deploy-zookeeper-1 Started 0.1s
✔ Container deploy-postgres-1 Started 0.1s
✔ Container deploy-kafka-1 Started 0.1s
✔ Container deploy-connect-1 Started
现在我们可以检查所有的服务是否都在运行

docker ps
输出应该与此相似

CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f1d49fb79dba quay.io/debezium/connect:2.4 “/docker-entrypoint.…” 3 seconds ago Up 2 seconds 0.0.0.0:8083->8083/tcp, 0.0.0.0:8778->8778/tcp, 0.0.0.0:9012->9012/tcp, 9092/tcp deploy-connect-1
e164b2651fbf quay.io/debezium/kafka:2.4 “/docker-entrypoint.…” 3 seconds ago Up 2 seconds 0.0.0.0:9092->9092/tcp deploy-kafka-1
e61116f22f9d quay.io/debezium/example-postgres:2.4 “docker-entrypoint.s…” 4 seconds ago Up 2 seconds 0.0.0.0:5432->5432/tcp deploy-postgres-1
ccb502882928 quay.io/debezium/zookeeper:2.4 “/docker-entrypoint.…” 4 seconds ago Up 2 seconds 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp deploy-zookeeper-1
此时所有服务都已启动并运行,因此我们可以通过以下配置注册连接器

{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“tasks.max”: “1”,
“database.hostname”: “postgres”,
“database.port”: “5432”,
“database.user”: “postgres”,
“database.password”: “postgres”,
“database.server.id”: “184054”,
“database.dbname”: “postgres”,
“topic.prefix”: “dbserver1”,
“snapshot.mode”: “NEVER”,
“schema.history.internal.kafka.bootstrap.servers”: “kafka:9092”,
“schema.history.internal.kafka.topic”: “schema-changes.inventory”,
“signal.enabled.channels”: “source,jmx”,
“signal.data.collection”: “inventory.debezium_signal”,
“notification.enabled.channels”: “jmx”
}
}
此配置可使 来源 和 Jmx 通道。即使我们只希望使用JDB2来发送信号来执行增量快照, 来源 仍然需要发送信号,因为Debezns需要使用信号表来水印db日志以进行事件复制。
把用来发信号的表
现在,别担心 notification.enabled.channels 财产。我们稍后会深入研究的
把这个配置保存到一个文件中 后记-jmx.json ,我们可以登记。

注册连接器我们可以使用curl 调用卡夫卡连接API

curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” localhost:8083/connectors/ -d ‘{“name”:“inventory-connector”,“config”:{“connector.class”:“io.debezium.connector.postgresql.PostgresConnector”,“tasks.max”:“1”,“database.hostname”:“postgres”,“database.port”:“5432”,“database.user”:“postgres”,“database.password”:“postgres”,“database.server.id”:“184054”,“database.dbname”:“postgres”,“topic.prefix”:“dbserver1”,“snapshot.mode”:“NEVER”,“schema.history.internal.kafka.bootstrap.servers”:“kafka:9092”,“schema.history.internal.kafka.topic”:“schema-changes.inventory”,“signal.enabled.channels”:“source,jmx”,“signal.data.collection”:“inventory.debezium_signal”,“notification.enabled.channels”:“log,sink,jmx”,“notification.sink.topic.name”:“io.debezium.notification”}}’
或者我建议用 Kcctl 工具与卡夫卡互动连接。它是卡夫卡连接的现代直观的命令行客户端。

首先,我们需要创建一个配置上下文来连接卡夫卡连接

kcctl config set-context local --cluster http://localhost:8083
然后我们可以注册连接器运行以下命令

kcctl apply -f postgres-jmx.json
我们现在可以得到连接容器的日志

docker logs connect
检查连接器是否启动了流事件

INFO Postgres|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
为增量快照准备数据库
因为增量快照需要signal.data.collection 要定义此定义,我们需要在您的服务站数据库中创建信号表。

在使用与GTDS和read.only 准备好了。
要创建信号表,我们需要连接到我们的ESTGres实例。我们可以利用psql 客户在邮政集装箱内。

docker exec -it postgres bash
一旦进入容器,我们就可以连接到

psql -h localhost -d postgres -U postgres
密码是 波斯特格雷斯
我们就可以检查里面有没有桌子 存货 图解

\dt inventory.*
命令应该返回类似的东西

            List of relations

Schema | Name | Type | Owner
-----------±-----------------±------±---------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
(6 rows)
我们需要用以下命令创建信号表:

CREATE TABLE inventory.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
发送增量快照信号
我们必须连接到jmx服务器,才能通过jmx通道发送信号。我们使用 Jmxdg 客户,所以下载后,我们可以运行它

java -jar jmxterm-1.0.4-uber.jar

open localhost:9012

beans -d debezium.postgres

run -b debezium.postgres:context=signals,server=dbserver1,type=management signal 12345 execute-snapshot {“data-collections”:[“inventory.orders”],“type”:“INCREMENTAL”}
帮我查一下客户
打开一个连接到JDB2服务器
搜寻豆下 德贝齐姆。 领域
执行 发信号 执行递增快照的操作 存货. 表
核对数据
在那之后,我们要检查所有来自 命令 表在相应的卡夫卡主题中得到了正确的捕捉。

我们可以通过以下命令输入卡夫卡容器:

docker exec -it kafka bash
一旦进入容器,我们就可以在 dbserver1.inventory.orders 主题:以下命令

kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.inventory.orders --from-beginning
输出应该是这样的

{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10001,
“order_date”: 16816,
“purchaser”: 1001,
“quantity”: 1,
“product_id”: 102
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605203,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10002,
“order_date”: 16817,
“purchaser”: 1002,
“quantity”: 2,
“product_id”: 105
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605204,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10003,
“order_date”: 16850,
“purchaser”: 1002,
“quantity”: 2,
“product_id”: 106
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605204,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10004,
“order_date”: 16852,
“purchaser”: 1003,
“quantity”: 1,
“product_id”: 107
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605204,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
就这样!我们已经使用jmx通道发送了一个增量快照信号。

通过jmx通道监控增量快照进展
由于我们已经执行了一个增量快照,现在我们可以通过JMDA通道读取Debezns生成的通知。

我们使用下列配置注册连接器

{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“tasks.max”: “1”,
“database.hostname”: “postgres”,
“database.port”: “5432”,
“database.user”: “postgres”,
“database.password”: “postgres”,
“database.server.id”: “184054”,
“database.dbname”: “postgres”,
“topic.prefix”: “dbserver1”,
“snapshot.mode”: “NEVER”,
“schema.history.internal.kafka.bootstrap.servers”: “kafka:9092”,
“schema.history.internal.kafka.topic”: “schema-changes.inventory”,
“signal.enabled.channels”: “source,jmx”,
“signal.data.collection”: “inventory.debezium_signal”,
“notification.enabled.channels”: “jmx”
}
}
这种配置使 Jmx 通知频道。
要访问该通知,我们需要再次连接到JDB2服务器。就像我们为信号所做的那样,我们将使用jmxterm

java -jar jmxterm-1.0.4-uber.jar

open localhost:9012

beans -d debezium.postgres

get -b debezium.postgres:context=notifications,server=dbserver1,type=management Notifications
帮我查一下客户
打开一个连接到JDB2服务器
搜寻豆下 德贝齐姆。 领域
得到通知。
你应该期待下面的输出

#mbean = debezium.postgres:context=notifications,server=dbserver1,type=management:
Notifications = [ {
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
};
aggregateType = Initial Snapshot;
id = b20bec8d-f21f-4d74-bb75-cdd7f4c7d933;
type = SKIPPED;
},
{
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = STARTED;
},
{
additionalData = {
( current_collection_in_progress ) = {
key = current_collection_in_progress;
value = inventory.orders;
};
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( maximum_key ) = {
key = maximum_key;
value = 10004;
};
( last_processed_key ) = {
key = last_processed_key;
value = 10004;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = IN_PROGRESS;
},
{
additionalData = {
( scanned_collection ) = {
key = scanned_collection;
value = inventory.orders;
};
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( total_rows_scanned ) = {
key = total_rows_scanned;
value = 4;
};
( status ) = {
key = status;
value = SUCCEEDED;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = TABLE_SCAN_COMPLETED;
},
{
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = COMPLETED;
}
];
这是一个来自初始快照的通知,带有状态SKIPPED 因为我们的连接器配置了"snapshot.mode": “NEVER”
这是关于增量快照启动的通知
这个通知告诉我们inventory.orders 快照正在进行中,提供了关于最后处理和最大键的有用信息。在这个例子中,我们只有一个in progress 通知,但取决于你的桌子尺寸snapshot.fetch.size ,你可以得到更多。
此通知告知特定表的快照已经完成,并提供了所处理的全部行的信息。
对于这个示例,这是我们有的最后一个通知,它告诉我们整个增量快照进展已经完成。
jmx还提供了生成自己的通知的可能性。德贝唑也会产生这些通知。您可以订阅这些通知,因此您可以在不投票的情况下立即接收这些通知。 通知书 豆。
利用亚洛基亚
JLOLIA是一个功能强大的工具,可以让您与JDB2服务器进行交互,并通过REST来公开它。使用它,我们可以通过REST与Debezns进行交互,利用信号和通知的jmx豆。这样,您可以无缝地发送信号和接收通知,并使用更熟悉的RESTAPI。

要启用Joloya,我们需要启用它在我们卡夫卡连接容器上的代理。

这是我们示例中使用的码头组合文件

version: ‘2’
services:
zookeeper:
image: quay.io/debezium/zookeeper:2.4
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:2.4
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: quay.io/debezium/example-postgres:2.4
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
connect:
image: quay.io/debezium/connect:2.4
ports:
- 8083:8083
- 9012:9012
- 8778:8778
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- JMXPORT=9012
- JMXHOST=0.0.0.0
- ENABLE_JOLOKIA=true
会暴露出恐怖分子使用的港口
这将使在我们的测试图像中已经装运的Joloia代理。如果您想在您的安装上启用代理,请检查 正式文件
通过约洛基亚发出信号
若要通过JOLOKIA发送信号,我们可以向具有所需信号和参数的JOLOKIA端点发送一个HTTP邮件请求。

要继续使用增量快照示例,要触发它,您可以运行以下命令

curl -X POST ‘http://localhost:8778/jolokia/exec’ -d ‘{“type”:“EXEC”,“mbean”:“debezium.postgres:context=signals,server=dbserver1,type=management”,“operation”:“signal”,“arguments”:[“12345”,“execute-snapshot”,"{“data-collections”: [“inventory.products”], “type”: “INCREMENTAL”}"]}’ | jq
它应该是

{
“request”: {
“mbean”: “debezium.postgres:context=signals,server=dbserver1,type=management”,
“arguments”: [
“12345”,
“execute-snapshot”,
“{“data-collections”: [“inventory.products”], “type”: “INCREMENTAL”}”
],
“type”: “exec”,
“operation”: “signal”
},
“value”: null,
“timestamp”: 1695651387,
“status”: 200
}
接收通知书
还允许您使用httpGET请求从Debezum获取通知。

curl -X GET ‘http://localhost:8778/jolokia/read/debezium.postgres:context=notifications,server=dbserver1,type=management/Notifications’ | jq
它应该是

{
“request”: {
“mbean”: “debezium.postgres:context=notifications,server=dbserver1,type=management”,
“attribute”: “Notifications”,
“type”: “read”
},
“value”: [
{
“additionalData”: {
“connector_name”: “dbserver1”
},
“id”: “b20bec8d-f21f-4d74-bb75-cdd7f4c7d933”,
“type”: “SKIPPED”,
“aggregateType”: “Initial Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”,
“data_collections”: “inventory.orders”
},
“id”: “12345”,
“type”: “STARTED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“last_processed_key”: “10004”,
“current_collection_in_progress”: “inventory.orders”,
“connector_name”: “dbserver1”,
“maximum_key”: “10004”,
“data_collections”: “inventory.orders”
},
“id”: “12345”,
“type”: “IN_PROGRESS”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“scanned_collection”: “inventory.orders”,
“connector_name”: “dbserver1”,
“total_rows_scanned”: “4”,
“status”: “SUCCEEDED”,
“data_collections”: “inventory.orders”
},
“id”: “12345”,
“type”: “TABLE_SCAN_COMPLETED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”
},
“id”: “12345”,
“type”: “COMPLETED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”,
“data_collections”: “inventory.products”
},
“id”: “12345”,
“type”: “STARTED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“last_processed_key”: “109”,
“current_collection_in_progress”: “inventory.products”,
“connector_name”: “dbserver1”,
“maximum_key”: “109”,
“data_collections”: “inventory.products”
},
“id”: “12345”,
“type”: “IN_PROGRESS”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“scanned_collection”: “inventory.products”,
“connector_name”: “dbserver1”,
“total_rows_scanned”: “9”,
“status”: “SUCCEEDED”,
“data_collections”: “inventory.products”
},
“id”: “12345”,
“type”: “TABLE_SCAN_COMPLETED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”
},
“id”: “12345”,
“type”: “COMPLETED”,
“aggregateType”: “Incremental Snapshot”
}
],
“timestamp”: 1695652278,
“status”: 200
}
你也看到了我们也收到了inventory.products 我们通过RESTAPI发送的表增量快照

结论
在我们系列的第三部分中,我们学习了如何启用和管理使用jmx和joloia的信令和通知。信号可以让你动态地控制Debezium的行为,而通知可以让你了解关键事件。通过利用这些功能和JOLOKIA,您可以有效地管理、监视和与您的数据流工作流交互,确保您始终控制Debezium。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/720242.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Maven】Maven 基础教程(四):搭建 Maven 私服 Nexus

《Maven 基础教程》系列&#xff0c;包含以下 4 篇文章&#xff1a; Maven 基础教程&#xff08;一&#xff09;&#xff1a;基础介绍、开发环境配置Maven 基础教程&#xff08;二&#xff09;&#xff1a;Maven 的使用Maven 基础教程&#xff08;三&#xff09;&#xff1a;b…

我的NPI项目之Android 安全系列 -- Keymaster到底是个什么

最近因为一直在调研独立secure element集成的工作&#xff0c;不巧的是目前使用的高通平台只有NFC-eSE的方案。高通目前也并不支持独立的eSE集成&#xff0c;codebase中并无相对应的代码。举个例子&#xff0c;目前使用的STM的一款eSE&#xff0c;但是这款eSE的开发STM还没有完…

HarmonyOS—HAP唯一性校验逻辑

HAP是应用安装的基本单位&#xff0c;在DevEco Studio工程目录中&#xff0c;一个HAP对应一个Module。应用打包时&#xff0c;每个Module生成一个.hap文件。 应用如果包含多个Module&#xff0c;在应用市场上架时&#xff0c;会将多个.hap文件打包成一个.app文件&#xff08;称…

matlab 提取分割位于多边形区域边缘内部或边缘上的点

[in,on] = inpolygon(xq,yq,xv,yv) xv 和 yv 为定义的多边形区域的,如xv = [1 4 4 1 1 ];yv = [1 1 4 4 1 ];注意最后一个数字与第一个重复,保证多边形闭合; xq 和 yq 为待查询的点in:在多边形内部和边缘的点序号on:仅在多边形边缘的点序号 提取分割方法: matrix=[xq yq…

ios端接口代码语言-身份证号实名认证ios端接口

身份证核验&#xff0c;其背后必须要有权威&#xff0c;真实的身份证信息数据库作为基础&#xff0c;如中安未来翔云OCR云平台&#xff08;www.netocr.com&#xff09;近期上线的身份证核验功能&#xff0c;就是与权威机构合作&#xff0c;拥有权威&#xff0c;真实的身份证信息…

大数据技术学习笔记(五)—— MapReduce(1)

目录 1 MapReduce 概述1.1 MapReduce 定义1.2 MapReduce 优缺点1.3 MapReduce 核心思想1.4 MapReduce 进程1.5 Hadoop 序列化类型1.6 MapReduce 编程规范1.7 WordCount 案例实操1.7.1 案例需求1.7.2 环境准备1.7.3 编写程序1.7.4 测试 2 MapReduce 序列化2.1 序列化概述2.2 自定…

JavaScript中数组相关

JavaScript中有很多用于操作数组的内置函数。以下是一些常用的数组函数&#xff1a; 1. Array.prototype.push(): 向数组末尾添加一个或多个元素&#xff0c;并返回新数组的长度。 2. Array.prototype.pop(): 删除并返回数组的最后一个元素。 3. Array.prototype.unshift(): 向…

浅析volatile关键字

浅析volatile关键字 文章目录 浅析volatile关键字1. volatile关键字的意义2.volatile应用3. volatile常见问题总结 1. volatile关键字的意义 ​ 被 volatile 修饰的变量&#xff0c;在对其进行读写操作时&#xff0c;会引发一些可观测的副作用。而这些可观测的副作用&#xff…

sql单表运用11.3

一、进入数据库操作界面 1、mysql -u root -p 敲回车 &#xff0c;输入密码 &#xff0c;进入数据库操作界面 2、show databases 查看所有的数据&#xff08;如果没有数据库&#xff1a;创建数据库 create database 库名称&#xff09; 3、use 数据库名 使…

探索Terraform实践:优化基础设施管理

Terraform 是管理基础设施及代码&#xff08;IaC&#xff09;最常用的工具之一&#xff0c;它能使我们安全且可预测地对基础设施应用更改。 Terraform作为一个强大的基础设施即代码工具&#xff0c;为开发人员和运维团队提供了一种简单而强大的方式来定义、部署和管理基础设施。…

软件工程顶会——ICSE '24 论文清单、摘要

1、A Comprehensive Study of Learning-based Android Malware Detectors under Challenging Environments 近年来&#xff0c;学习型Android恶意软件检测器不断增多。这些检测器可以分为三种类型&#xff1a;基于字符串、基于图像和基于图形。它们大多在理想情况下取得了良好的…

Go-gin-example 第四部分 重启服务

文章目录 知识点本节目标何谓优雅 ctrlc信号 修改流程实现优雅重启endless安装 编写验证编译执行唤醒问题 续接 上一节 知识点 信号量的了解应用热更新 本节目标 在前文中&#xff0c;我们在配置玩之后直接使用 ctrlc 来进行进程的结束&#xff0c;我们将了解 ctrlc 的过程…

HarmonyOS | 状态管理(九) | Environment (设备环境查询)

系列文章目录 1.HarmonyOS | 状态管理(一) | State装饰器 2.HarmonyOS | 状态管理(二) | Prop装饰器 3.HarmonyOS | 状态管理(三) | Link装饰器 4.HarmonyOS | 状态管理(四) | Provide和Consume装饰器 5.HarmonyOS | 状态管理(五) | Observed装饰器和ObjectLink装饰器 6.Harmo…

为啥要用C艹不用C?

在很多时候&#xff0c;有人会有这样的疑问 ——为什么要用C&#xff1f;C相对于C优势是什么&#xff1f; 最近两年一直在做Linux应用&#xff0c;能明显的感受到C带来到帮助以及快感 之前&#xff0c;我在文章里面提到环形队列 C语言&#xff0c;环形队列 环形队列到底是怎么回…

NLP_文本数据分析_3(代码示例)

目标 了解文本数据分析的作用.掌握常用的几种文本数据分析方法. 1 文件数据分析介绍 文本数据分析的作用: 文本数据分析能够有效帮助我们理解数据语料, 快速检查出语料可能存在的问题, 并指导之后模型训练过程中一些超参数的选择. 常用的几种文本数据分析方法: 标签数量分布句…

阿里云的ssh的22端口,修改为2422端口,作为默认的ssh端口

要将阿里云&#xff08;或任何Linux服务器&#xff09;的SSH端口从默认的22端口更改为2422端口&#xff0c;你可以按照以下步骤操作&#xff1a; 编辑SSH配置文件&#xff1a; 打开SSH服务的配置文件/etc/ssh/sshd_config。你可以使用nano、vi或任何文本编辑器来编辑这个文件。…

Vue3_2024_4天【computer、watch、method在Vue2~3中的说明】未完待补

第一&#xff1a;从概念上介绍~~vue中计算属性、方法、监听器&#xff08;以Vue2描述&#xff09; 1.计算属性 (Computed Properties): 1.概念&#xff1a; 计算属性是基于响应式依赖进行缓存的属性&#xff0c;只有在相关依赖发生改变时才会重新求值。它们类似于具有缓存的函…

场景问题: VisualVM工具Profiler JDBC不是真实执行的SQL

1. 问题 诡异的问题表象&#xff1a; 前端反馈分页接口的Total字段一直为0 使用Visualvm中的 Profiler 注入到应用后&#xff0c;查看JDBC监控得到了分页接口执行的SQL&#xff0c;复制出来执行是55. 此时还没有注意到 IN 的范围中有一个特别的值 NULL &#x1f928; 2. 排查…

视觉Transformers中的位置嵌入 - 研究与应用指南

视觉 Transformer 中位置嵌入背后的数学和代码简介。 自从 2017 年推出《Attention is All You Need》以来&#xff0c;Transformer 已成为自然语言处理 (NLP) 领域最先进的技术。 2021 年&#xff0c;An Image is Worth 16x16 Words 成功地将 Transformer 应用于计算机视觉任务…

Windows C++:控制新进程的创建方式

目录 介绍 标志位介绍 代码示例 这些宏定义&#xff08;dwCreationFlag值&#xff09;是用于Windows操作系统中CreateProcess函数的标志&#xff0c;它们控制新进程的创建方式。下面是这些标志的中文介绍&#xff1a; 介绍 Winbase.h中的部分代码&#xff1a; // // Proc…