Debezium发布历史153

原文地址: https://debezium.io/blog/2023/07/10/custom-http-signaling-notification/

欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

Debezium signaling and notifications - Part 2: Customisation
July 10, 2023 by Anisha Mohanty
debezium features notifications signaling custom channels

欢迎来到这个系列文章专门讨论信号和通知德贝齐姆!这篇文章作为本系列的第二部分,我们将在这里讨论如何自定义Debezns中的信号和通知通道。

Debezum2.3引入了信号和通知能力的新改进。您可以设置新的信号和通知通道,除了预先定义的信号和通知通道由DEBeZUS提供。这一功能使用户能够定制系统,以适应其独特的需求,并将其与现有的基础设施或第三方解决方案相结合。它能够通过精确捕捉和传递信号事件并通过首选渠道触发通知,对数据变化进行有效的监测和积极的响应。

本系列的第一篇文章, 代贝兹的信号和通知 ,概述德贝兹的信号和通知功能。它还讨论了可用的渠道及其在各种情况下的用例。

定制信号和通知通道
在Debezum中,可以定制信号和通知通道以适应特定的需求。例如,我们可以通过创建一个HTTP 信号和通知的通道。这个HTTP 通道接收来自HTTP端点的信号,并且可以在信号传递时将通知送回端点。

让我们来探索一个演示如何创建和利用HTTP 使用德贝兹波斯特连接器的信号和通知通道,A 模拟服务器 发出信号, 邮筒 通过http端点接收通知。

设置HTTP 信号通道:
在发生相关数据库更改时,配置德贝兹(Debezum)连接器以接收信号。

建立一个服务,利用HTTP 频道。该服务可以是数据库、第三方应用程序或任何其他可以发送HTTP请求的系统。在本例中,我们将使用模拟服务器将信号发送到Debezns。模拟服务器是一种可以用来模拟HTTP请求和响应的服务。

配置模拟服务器,使用适当的http方法(例如)通过http端点发送信号。,邮政)。

定制HTTP 通道设置,以定义http端点网址、身份验证、标题和任何所需的其他参数。

设置HTTP 通知渠道:
一旦信号被Debezum接收和处理,它就可以触发将通知张贴到http端点。在本例中,我们将使用HTTP 频道。邮政箱是一种服务,可以用来接收HTTP请求并查看请求详细信息。

定制HTTP 通知的通道设置,在邮件箱中创建一个资料库,并定义http端点URL、身份验证、标题和任何所需的附加参数。

使用适当的HTTP方法将通知事件转发到http端点即邮政资料箱(例如。,邮政)。通知有效载荷可以根据需要定制。

在博客帖子中,此示例的完整源代码在Debezum示例存储库中提供。 http-signal-notification 目录。

创建一个Java项目来构建HTTP 信号和通知渠道。运行以下命令,以使用MAVIN创建一个新的Java项目:

mvn archetype:generate
-DgroupId=io.debezium.examples
-DartifactId=http-signaling-notification
将下列从属关系添加到pom.xml 德贝佐姆版本的文件(2.3及后版本):

io.debezium debezium-core 2.3.0.Final 若要使用模拟服务器接收信号,请创建一个定义模拟服务器服务的码头组合文件。模拟服务器服务的配置如下:

services:
mockServer:
image: mockserver/mockserver:latest
ports:
- 1080:1080
environment:
- MOCKSERVER_WATCH_INITIALIZATION_JSON=true
- MOCKSERVER_INITIALIZATION_JSON_PATH=/config/initializerJson.json
volumes:
- ./initializerJson.json:/config/initializerJson.json
环境变量MOCKSERVER_WATCH_INITIALIZATION_JSON 和MOCKSERVER_INITIALIZATION_JSON_PATH 设置为使模拟服务器监视初始化JSON文件中的更改并指定其路径。…initializerJson.json 文件,其中包含的http请求和响应信息的信号,是安装到模拟服务器容器。

…initializerJson.json 文件定义了对路径的模拟http请求/api/signal 使用查询字符串参数code=10969 .当模拟服务器收到此请求时,它将与一个含有id ,type ,以及data .响应的状态代码为200,表示成功响应。定义:initializerJson.json 文件如下:

[
{
“httpRequest” : {
“method” : “GET”,
“path” : “/api/signal”,
“queryStringParameters” : {
“code” : [“10969”]
}
},
“httpResponse” : {
“body”: “{“id”:“924e3ff8-2245-43ca-ba77-2af9af02fa07”,“type”:“log”,“data”:{“message”: “Signal message received from http endpoint.”}}”,
“statusCode”: 200
}
}
]
id一个用来识别信号实例的任意唯一字符串。

type :发送的信号类型。在这个例子中,类型是log 它要求连接器向连接器的日志文件添加一个条目。在处理信号之后,连接器在日志中打印指定的消息。

data :传递到信号事件的jon格式参数。在这个例子中,message 参数传递到信号事件。

创造HTTP 通过实施SignalChannelReader 接口如下:

public class HttpSignalChannel implements SignalChannelReader {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpSignalChannel.class);

public static final String CHANNEL_NAME = "http";
private static final List<SignalRecord> SIGNALS = new ArrayList<>();
public CommonConnectorConfig connectorConfig;@Override
public String name() { return CHANNEL_NAME;
}@Override
public void init(CommonConnectorConfig connectorConfig) { this.connectorConfig = connectorConfig;
}@Override
public List<SignalRecord> read() { try {String requestUrl = "http://mockServer:1080/api/signal?code=10969";// send http request to the mock serverHttpClient httpClient = HttpClient.newHttpClient();HttpRequest request = HttpRequest.newBuilder().uri(URI.create(requestUrl)).GET().header("Content-Type", "application/json").build();// read the responseHttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());if (response.statusCode() == 200) {ObjectMapper mapper = new ObjectMapper();String responseBody = response.body();// parse the response bodyJsonNode signalJson = mapper.readTree(responseBody);Map<String, Object> additionalData = signalJson.has("additionalData") ? mapper.convertValue(signalJson.get("additionalData"), new TypeReference<>() {}) : new HashMap<>();String id = signalJson.get("id").asText();String type = signalJson.get("type").asText();String data = signalJson.get("data").toString();SignalRecord signal = new SignalRecord(id, type, data, additionalData);LOGGER.info("Recorded signal event '{}' ", signal);// process the signalSIGNALS.add(signal);} else {LOGGER.warn("Error while reading signaling events from endpoint: {}", response.statusCode());}} catch (IOException | InterruptedException e) {LOGGER.warn("Exception while preparing to process the signal '{}' from the endpoint", e.getMessage());e.printStackTrace();}return SIGNALS;}@Override
public void close() { SIGNALS.clear();
}

}
…name() 方法返回信号通道的名称。若要启用德贝兹使用通道,请指定名称http 在连接器的signal.enabled.channels 财产。
…init() 方法可以用来初始化HTTP通道需要的特定配置、变量或连接。
…read() 方法从http端点读取信号并返回一个列表SignalRecord 将由德贝兹连接器处理的对象。
…close() 方法关闭所有分配的资源。
通过实现NotificationChannel 接口如下:

public class HttpNotificationChannel implements NotificationChannel {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpNotificationChannel.class);

public static final String CHANNEL_NAME = "http";
private static final String NOTIFICATION_PREFIX = "[HTTP NOTIFICATION SERVICE]";@Override
public String name() { return CHANNEL_NAME;
}@Override
public void init(CommonConnectorConfig config) { // custom configuration
}@Override
public void send(Notification notification) { LOGGER.info(String.format("%s Sending notification to http channel", NOTIFICATION_PREFIX));String binId = createBin();sendNotification(binId, notification);
}private static String createBin()  {// Create a bin on the servertry {HttpRequest request = HttpRequest.newBuilder().uri(new URI("https://www.toptal.com/developers/postbin/api/bin")).POST(HttpRequest.BodyPublishers.ofString(" ")).build();HttpClient httpClient = HttpClient.newHttpClient();HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());if (response.statusCode() == HTTP_CREATED) {String binId = response.body().replaceAll(".*\"binId\":\"([^\"]+)\".*", "$1");LOGGER.info("Bin created: " + response.body());return binId;}} catch (URISyntaxException | InterruptedException | IOException e) {throw new RuntimeException(e);}return null;
}private static void sendNotification (String binId, Notification notification) {// Get notification from the bintry {ObjectMapper mapper = new ObjectMapper();String notificationString = mapper.writeValueAsString(notification);HttpRequest request = HttpRequest.newBuilder().uri(new URI("https://www.toptal.com/developers/postbin/" + binId)).header("Content-Type", "application/json").POST(HttpRequest.BodyPublishers.ofString(notificationString)).build();HttpClient httpClient = HttpClient.newHttpClient();HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());if (response.statusCode() == HTTP_OK) {LOGGER.info("Notification received : " + response.body());}} catch (URISyntaxException | InterruptedException | IOException e) {throw new RuntimeException(e);}
}@Override
public void close() { 
}

}
…name() 方法返回通知通道的名称。若要启用德贝兹使用通道,请指定http 在连接器的notification.enabled.channels 财产。
…init() 方法可以用来初始化通道所需的特定配置、变量或连接。
…send() 方法将通知发送到通道。该通知载有SignalRecord 由德贝兹接头处理的对象。
…close() 方法关闭所有分配的资源。
宣布HTTP 的信号和通知渠道META-INF/services 目录下io.debezium.pipeline.signal.SignalChannelReader 和io.debezium.pipeline.notification.channels.NotificationChannel 文件。

将Java项目编译并导出为JAR文件。这可以使用MAVIN或您首选的构建工具来完成。将JAR文件复制到包含您想要使用的Debezum连接器的JAR文件的目录中。例如,如果您想使用与Debezum后角连接器的自定义信号和通知通道,请将JAR文件复制到/kafka/connect/debezium-connector-postgres 目录。

这个例子提供了一个码头组合文件,其中定义了必要的服务,其中包括模拟服务器、动物园管理员、卡夫卡连接和波斯特格雷斯数据库。

要启动服务,请运行以下命令:

export DEBEZIUM_VERSION=2.3
docker-compose up -d
在确保服务的运行和运行之后,以及之后的下一步是注册连接器。这包括创建一个连接器配置文件。我们创建一个名为register-postgres.json 拥有下列属性:

{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“tasks.max”: 1,
“database.hostname”: “postgres”,
“database.port”: 5432,
“database.user”: “postgres”,
“database.password”: “postgres”,
“database.dbname” : “postgres”,
“topic.prefix”: “dbserver1”,
“schema.include.list”: “inventory”,
“signal.enabled.channels”: “http”,
“notification.enabled.channels”: “http”
}
}
…signal.enabled.channels 属性指定连接器要使用的信号通道。在这种情况下,连接器使用http 信号通道。
…notification.enabled.channels 属性指定连接器要使用的通知通道。在这种情况下,连接器使用http 通知频道。
现在我们已经准备好了连接器配置文件,我们可以通过执行以下命令来注册卡夫卡连接器:

curl -i -X POST -H “Accept:application/json”
-H “Content-Type:application/json” http://localhost:8083/connectors/
-d @register-postgres.json
一旦连接器成功注册,您可以查看连接器日志以观察信号事件。日志提供了对连接器处理和进展的深入了解,包括任何与信号相关的信息。你会遇到类似以下的日志信息:

Recorded signal event ‘SignalRecord{id=‘924e3ff8-2245-43ca-ba77-2af9af02fa07’, type=‘log’, data=’{“message”:“Signal message received from http endpoint.”}’, additionalData={}}’ [io.debezium.examples.signal.HttpSignalChannel]
此外,您可能会注意到与发送到邮件箱的通知事件有关的日志消息。例如:

[HTTP NOTIFICATION SERVICE] Sending notification to http channel [io.debezium.examples.notification.HttpNotificationChannel]
Bin created: {“binId”:“1688742588469-1816775151528”,“now”:1688742588470,“expires”:1688744388470} [io.debezium.examples.notification.HttpNotificationChannel]
它提供关于通知事件的信息,例如创建带有唯一标识符(BINID)的垃圾箱和其他相关细节。若欲从邮政局检索通知事件,请取binId 从日志消息,并使用它来请求来自邮政信息库的相应通知事件。要查看通知事件,您可以使用以下网址访问邮政资料库:https://www.toptal.com/developers/postbin/b/:binId .替换:binId 从连接器日志中获得的实际二进制的URL中。

发送到邮政局的通知事件如下:
图片来自官网原文
在这里插入图片描述

资料夹后预览
结论
在本教程中,我们探讨了如何为德贝齐亚连接器创建自定义信号和通知通道。我们创建了一个自定义信号通道,它接收来自http端点的信号事件。我们还创建了一个自定义通知通道,该通道将通知事件发送到http端点。

Debezum的全面的信号和通知系统提供了与第三方解决方案的无缝集成,使用户能够随时了解Debezum连接器的状态和进展。该系统的可扩展性使用户能够定制信号和通知通道,以适应其定制需求。

请继续关注本系列的第3部分,我们将在这里探索关于jmx信号和通知的内容。与此同时,您可以查看Debezum文档,了解更多有关信号和通知通道的信息。

如果您有任何问题或反馈,请随时与我们联系。 邮寄清单 或 #通信秘书长 关于郁金香聊天的频道。我们很乐意听到你的消息!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/705096.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

<网络安全>《55 概念讲解<第二课 MAC地址>》

1 MAC地址是什么&#xff1f; MAC地址&#xff08;Media Access Control Address&#xff09;是一种标识网络设备的唯一地址&#xff0c;也被称为物理地址或硬件地址。它由网络设备制造商在生产过程中写入网卡的EPROM&#xff08;一种可擦写的闪存芯片&#xff09;。 IP地址和…

LabVIEW变压器振动信号数据采集与分析

LabVIEW变压器振动信号数据采集与分析 随着电力系统的快速发展&#xff0c;对变压器的安全监控和故障诊断需求日益增加。设计了一套基于LabVIEW的变压器振动信号数据采集与分析系统&#xff0c;提高变压器的运行安全性和可靠性&#xff0c;实现对变压器振动信号的实时监测和故…

产品经理学习-产品运营《什么是SOP》

目录 什么是SOP 如何执行SOP 执行SOP的重点 什么是SOP SOP就是项目流程操作的说明书 日常工作中的例行操作&#xff1a; 例行操作是指&#xff0c;在每一天&#xff0c;针对每一个用户&#xff0c;在每个项目之中&#xff0c;都必须完成的操作&#xff0c;这些必须完成的操…

微服务-微服务链路追踪组件Skywalking实战

自动化监控系统Prometheus&Grafana实战&#xff1a; 4 trem APM-性能监控项目班&#xff1a; https://vip.tulingxueyuan.cn/detail/p_602e574ae4b035d3cdb8f8fe/6 1. skywalking是什么 1.1 Skywalking主要功能特性 1.2 Skywalking整体架构 1.3 SkyWalking 环境搭建部…

用 SIL 和 PIL 仿真测试生成的代码

目录 PIL 的目标连接配置 对顶层模型运行 SIL 或 PIL 仿真 对 Model 模块运行 SIL 或 PIL 仿真 SIL 或 PIL 模块仿真 硬件实现设置 使用软件在环 (SIL) 和处理器在环 (PIL) 仿真,测试模型组件与从组件生成的生产代码之间的数字等效性。 使用 SIL 仿真,在您的开发…

【析】装卸一体化车辆路径问题的自适应并行遗传算法

0 引言 国内外有关 &#xff36;&#xff32;&#xff30;&#xff33;&#xff30;&#xff24;的文献较多&#xff0c;求解目标多以最小化车辆行驶距离为主&#xff0c;但现实中可能存在由租赁费用产生的单次派出成本&#xff0c;需要综合考 虑单次派车成本和配送路径成本。…

Doris实战——结合Flink构建极速易用的实时数仓

目录 一、实时数仓的需求与挑战 二、构建极速易用的实时数仓架构 三、解决方案 3.1 如何实现数据的增量与全量同步 3.1.1 增量及全量数据同步 3.1.2 数据一致性保证 3.1.3 DDL 和 DML 同步 Light Schema Change Flink CDC DML 和DDL同步 3.2 如何基于Flink实现多种数…

初学学习408之数据结构--数据结构基本概念

初学学习408之数据结构我们先来了解一下数据结构的基本概念。 数据结构&#xff1a;是相互之间存在一种或多种特定关系的数据元素的集合。 本内容来源于参考书籍《大话数据结构》与《王道数据结构》。除去书籍中的内容&#xff0c;作为初学者的我会尽力详细直白地介绍数据结构的…

YOLOv7改进 | 更换主干网络之GhostNet

前言:Hello大家好,我是小哥谈。GhostNet是一种轻量级的卷积神经网络架构,它的设计目标是在保持高精度的同时,减少模型的参数和计算量,以便在资源受限的设备上进行高效推理。GhostNet通过引入Ghost模块来实现这一目标,该模块利用低成本的附加通道来学习主要特征,并通过信…

FMM 笔记:在colab上执行FMM

windows上配置FMM很麻烦&#xff0c;一直没整好&#xff0c;于是尝试了在colab上执行FMM 参考内容&#xff1a;jalal1/fmm_jupyter: Install Fast map matching (FMM) using Jupyter Notebook (github.com) 1 下载数据 # download file from GitHub ! wget https://raw.gith…

【ArcGIS】基于DEM/LUCC等数据统计得到各集水区流域特征

基于DEM/LUCC等数据统计得到各集水区流域特征 提取不同集水区各类土地利用类型比例步骤1&#xff1a;划分集水区为独立面单元步骤2&#xff1a;批量掩膜提取得到各集水区土地利用类型比例步骤3&#xff1a;导入各集水区LUCC数据并统计得到各类型占比 提取坡度特征流域面坡度河道…

Mysql 的高可用详解

Mysql 高可用 复制 复制是解决系统高可用的常见手段。其思路就是&#xff1a;不要把鸡蛋都放在一个篮子里。 复制解决的基本问题是让一台服务器的数据与其他服务器保持同步。一台主库的数据可以同步到多台备库上&#xff0c;备库本身也可以被配置成另外一台服务器的主库。主…

20240226-100. 同一棵树

题目要求 给定两个二叉树 p 和 q 的根&#xff0c;编写一个函数来检查它们是否相同。 如果两个二叉树结构相同并且节点具有相同的值&#xff0c;则认为它们是相同的。 Example 1: Input: p [1,2,3], q [1,2,3] Output: trueExample 2: Input: p [1,2], q [1,null,2] Outp…

数一满分150分总分451东南大学920电子信息通信考研Jenny老师辅导班同学,真题大纲,参考书。

记录用来打破的&#xff0c;信息通信考研Jenny老师2024级辅导班同学&#xff0c;数一满分150分&#xff0c;专业课920专业基础综合143&#xff0c;总分451分&#xff0c;一位及其优秀的本科985报考东南大学信息学院的学生&#xff0c;东南大学920考研&#xff0c;东南大学信息科…

vue - - - - Vue3+i18n多语言动态国际化设置

Vue3i18n多语言动态国际化设置 前言一、 i18n 介绍二、插件安装三、i18n配置3.1 创建i18n对应文件夹/文件3.2 en-US.js3.3 zh-CN.js3.4 index.js 四、 mian.js 引入 i18n配置文件五、 组件内使用六、使用效果 前言 继续【如何给自己的网站添加中英文切换】一文之后&#xff0c…

41.仿简道云公式函数实战-数学函数-SUMIF

1. SUMIF函数 SUMIF 函数可用于计算子表单中满足某一条件的数字相加并返回和。 2. 函数用法 SUMIF(range, criteria, [sum_range]) 其中各参数的含义及使用方法如下&#xff1a; range&#xff1a;必需&#xff1b;根据 criteria 的条件规则进行检测的判断字段。支持的字段…

【Leetcode每日一题】二分查找 - 在排序数组中查找元素的第一个和最后一个位置(难度⭐⭐)(18)

1. 题目解析 Leetcode链接&#xff1a;34. 在排序数组中查找元素的第一个和最后一个位置 这个问题的理解其实相当简单&#xff0c;只需看一下示例&#xff0c;基本就能明白其含义了。 核心在于找到给定目标值所在的数组下标区间&#xff0c;设计一个O(logn)的算法。 2. 算法原…

基于“python+”潮汐、风驱动循环、风暴潮等海洋水动力模拟

原文&#xff1a;基于“python”潮汐、风驱动循环、风暴潮等海洋水动力模拟 前沿 ADCIRC是新一代海洋水动力计算模型&#xff0c;它采用了非结构三角形网格广义波动连续方程的设计&#xff0c;在提高计算精确度的同时还减小了计算时间。被广泛应用于&#xff1a;模拟潮汐和风驱…

2024牛客寒假算法基础集训营2

目录 A.Tokitsukaze and Bracelet B.Tokitsukaze and Cats C.Tokitsukaze and Min-Max XOR D.Tokitsukaze and Slash Draw E and F.Tokitsukaze and Eliminate (easy)(hard) G.Tokitsukaze and Power Battle (easy) 暂无 I.Tokitsukaze and Short Path (plus) J.Tokits…