Debezium发布历史30

原文地址: https://debezium.io/blog/2018/03/08/creating-ddd-aggregates-with-debezium-and-kafka-streams/

欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

使用 Debezium 和 Kafka Streams 创建 DDD 聚合
2018 年 3 月 8 日 作者: Hans-Peter Grahsl、Gunnar Morling
讨论 实例
基于微服务的架构可以被认为是一种行业趋势,因此最近经常出现在企业应用程序中。在多个服务及其支持数据存储之间保持数据同步的一种可能方法是使用一种称为变更数据捕获(简称 CDC)的方法。

本质上,CDC 允许侦听数据流一端(即数据源)发生的任何修改,并将它们作为更改事件传达给其他感兴趣的各方或将它们存储到数据接收器中。建议不要以点对点的方式执行此操作,而是将数据源和数据接收器之间的事件流解耦。这样的场景可以基于Debezium和Apache Kafka相对轻松地实现,并且实际上无需编码。

作为示例,请考虑以下基于微服务的订单管理系统架构:
图片来自于官网
在这里插入图片描述

该系统包含三个服务:Order、Item和Stock。如果订单服务收到订单请求,它将需要来自其他两个请求的信息,例如商品定义或特定商品的库存数量。CDC 可以用于为Item和Stock服务管理的数据设置更改事件流,而不是同步调用这些服务来获取此信息。命令_服务可以订阅这些事件流,并在自己的数据库中保留相关商品和库存数据的本地副本。这种方法有助于解耦服务(例如,不会受到服务中断的直接影响),并且也有利于整体性能,因为每个服务可以仅保留其感兴趣的其他服务拥有的那些数据项的优化视图。

如何处理聚合对象?
然而,在某些用例中,事情有点棘手。有时,通过所谓的聚合来跨服务和数据存储共享信息很有用,聚合是由领域驱动设计 (DDD) 定义的概念/模式。一般来说,DDD 聚合用于传输状态,该状态可以由多个不同的域对象组成,这些域对象一起被视为单个信息单元。

具体例子有:

客户及其地址,表示为存储客户和地址列表的客户记录聚合

订单和相应的行项目,表示为存储订单及其所有行项目的订单记录聚合

支持这些 DDD 聚合的相关域对象的数据很可能存储在 RDBMS 的单独关系中。当利用 Debezium 当前的 CDC 功能时,对域对象的所有更改都将被独立捕获,并默认最终反映在单独的 Kafka 主题中,每个 RDBMS 关系一个主题。虽然此行为对于许多用例非常有帮助,但对其其他用例可能有很大限制,例如上面描述的 DDD 聚合场景。因此,这篇博文探讨了如何使用Kafka Streams API基于 Debezium CDC 事件构建 DDD 聚合。

从数据源捕获更改事件
GitHub 上的 Debezium示例存储库提供了本博文的完整源代码。首先克隆此存储库并更改为kstreams目录:

git clone https://github.com/debezium/debezium-examples.git
cd kstreams
该项目提供了一个 Docker Compose 文件,其中包含您可能已经从Debezium 教程中了解的所有组件的服务:

阿帕奇动物园管理员

阿帕奇·卡夫卡

具有 Debezium CDC 连接器的Kafka Connect实例

MySQL(填充了一些测试数据)

此外,它还声明以下服务:

MongoDB将用作数据接收器

另一个 Kafka Connect 实例将托管 MongoDB 接收器连接器

我们将在下面构建一个用于运行 DDD 聚合过程的服务

我们稍后会讨论这三个,现在让我们准备管道的源端:

export DEBEZIUM_VERSION=0.7
docker-compose up mysql zookeeper kafka connect_source
所有服务启动后,通过提交以下 JSON 文档来注册 Debezium MySQL 连接器的实例:

{
“name”: “mysql-source”,
“config”: {
“connector.class”: “io.debezium.connector.mysql.MySqlConnector”,
“tasks.max”: “1”,
“database.hostname”: “mysql”,
“database.port”: “3306”,
“database.user”: “debezium”,
“database.password”: “dbz”,
“database.server.id”: “184054”,
“database.server.name”: “dbserver1”,
“table.whitelist”: “inventory.customers,inventory.addresses”,
“database.history.kafka.bootstrap.servers”: “kafka:9092”,
“database.history.kafka.topic”: “schema-changes.inventory”,
“transforms”: “unwrap”,
“transforms.unwrap.type”:“io.debezium.transforms.UnwrapFromEnvelope”,
“transforms.unwrap.drop.tombstones”:“false”
}
}
为此,请运行以下curl命令:

curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” http://localhost:8083/connectors/ -d @mysql-source.json
这将使用给定的凭据设置指定数据库的连接器。customers出于我们的目的,我们只对和表的更改感兴趣addresses,因此该table.whitelist属性仅用于选择这两个表。另一个值得注意的事情是应用的“展开”变换。默认情况下,Debezium 的 CDC 事件将包含已更改行的旧状态和新状态以及有关更改源的一些附加元数据。通过应用UnwrapFromEnvelope SMT(单消息转换),只有新状态才会传播到相应的 Kafka 主题中。

一旦连接器部署并完成了两个捕获表的初始快照,我们就可以查看它们:

docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh
–bootstrap-server kafka:9092
–from-beginning
–property print.key=true
–topic dbserver1.inventory.customers # or dbserver1.inventory.addresses
例如,您应该看到以下输出

(为了可读性而格式化并省略架构信息)针对客户更改的主题:

{
“schema”: { … },
“payload”: {
“id”: 1001
}
}
{
“schema”: { … },
“payload”: {
“id”: 1001,
“first_name”: “Sally”,
“last_name”: “Thomas”,
“email”: “sally.thomas@acme.com”
}
}

构建 DDD 聚合
KStreams 应用程序将处理来自两个 Kafka 主题的数据。这些主题根据 MySQL 中发现的客户和地址关系接收 CDC 事件,每个主题都有其相应的 Jackson 注释的 POJO(客户和地址),并通过保存 CDC 事件类型(即 UPSERT/DELETE)的字段进行丰富。

由于 Kafka 主题记录采用 Debezium JSON 格式且未包装信封,因此编写了一个特殊的SerDe,以便能够分别使用 POJO 或 Debezium 事件表示来读取/写入这些记录。虽然序列化程序只是使用 Jackson 将 POJO 转换为 JSON,但反序列化程序是一种“混合”程序,能够从 Debezium CDC 事件或 json 化的 POJO 进行反序列化。

完成此操作后,可以按如下方式构建动态创建和维护 DDD 聚合的 KStreams 拓扑:

客户主题(“家长”)
所有客户记录都只是从客户主题读取到KTable中,KTable 将根据记录键(即客户的 PK)自动维护每个客户的最新状态

KTable<DefaultId, Customer> customerTable =
builder.table(parentTopic, Consumed.with(defaultIdSerde,customerSerde));
地址主题(“儿童”)
对于地址记录,处理过程稍微复杂一些,需要几个步骤。首先,所有地址记录被读入KStream。

KStream<DefaultId, Address> addressStream = builder.stream(childrenTopic,
Consumed.with(defaultIdSerde, addressSerde));
其次,根据这些地址记录的键(关系中的原始主键)对这些地址记录进行“伪”分组。在此步骤中,将维护与相应客户记录的关系。即使地址记录被删除,这也可以有效地跟踪哪个地址记录属于哪个客户记录。为了实现这一点,引入了额外的LatestAddress POJO,除了地址记录本身之外,它还允许存储最新的已知PK<→FK关系。

KTable<DefaultId,LatestAddress> tempTable = addressStream
.groupByKey(Serialized.with(defaultIdSerde, addressSerde))
.aggregate(
() -> new LatestAddress(),
(DefaultId addressId, Address address, LatestAddress latest) -> {
latest.update(
address, addressId, new DefaultId(address.getCustomer_id()));
return latest;
},
Materialized.<DefaultId,LatestAddress,KeyValueStore<Bytes, byte[]>>
as(childrenTopic+"_table_temp")
.withKeySerde(defaultIdSerde)
.withValueSerde(latestAddressSerde)
);
第三,中间KTable再次转换为KStream。LastAddress记录被转换为将客户 ID(FK 关系)作为新密钥,以便按客户对它们进行分组。在分组步骤中,客户特定地址会更新,这可能会导致地址记录被添加或删除。为此,引入了另一个名为Addresses的 POJO ,它包含相应更新的地址记录映射。结果是一个KTable,其中包含每个客户 ID 的最新地址。

KTable<DefaultId, Addresses> addressTable = tempTable.toStream()
.map((addressId, latestAddress) ->
new KeyValue<>(latestAddress.getCustomerId(),latestAddress))
.groupByKey(Serialized.with(defaultIdSerde,latestAddressSerde))
.aggregate(
() -> new Addresses(),
(customerId, latestAddress, addresses) -> {
addresses.update(latestAddress);
return addresses;
},
Materialized.<DefaultId,Addresses,KeyValueStore<Bytes, byte[]>>
as(childrenTopic+"_table_aggregate")
.withKeySerde(defaultIdSerde)
.withValueSerde(addressesSerde)
);
将客户与地址结合起来
最后,通过将客户 KTable 与地址 KTable 连接起来,从而构建由CustomerAddressAggregate POJO表示的 DDD 聚合,可以轻松地将客户和地址组合在一起。最后,KTable 更改被写入 KStream,而 KStream 又被保存到 kafka 主题中。这允许以多种方式利用生成的 DDD 聚合。

KTable<DefaultId,CustomerAddressAggregate> dddAggregate =
customerTable.join(addressTable, (customer, addresses) ->
customer.get_eventType() == EventType.DELETE ?
null :
new CustomerAddressAggregate(customer,addresses.getEntries())
);

dddAggregate.toStream().to(“final_ddd_aggregates”,
Produced.with(defaultIdSerde,(Serde)aggregateSerde));
客户 KTable 中的记录可能会收到 CDC 删除事件。如果是这样,可以通过检查客户 POJO 的事件类型字段来检测,例如返回“null”而不是 DDD 聚合。每当消费方也需要对删除采取相应行动时,这样的约定就会很有帮助。_

运行聚合管道
实现聚合管道后,是时候对其进行测试运行了。为此,请构建poc-ddd-aggregates Maven 项目,其中包含完整的实现:

mvn clean package -f poc-ddd-aggregates/pom.xml
然后从 Compose 文件运行该aggregator服务,该文件采用该项目构建的 JAR 并使用java-jboss-openjdk8-jdk基础映像启动它:

docker-compose up -d aggregator
聚合管道运行后,我们可以使用控制台消费者查看聚合事件:

docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh
–bootstrap-server kafka:9092
–from-beginning
–property print.key=true
–topic final_ddd_aggregates
将 DDD 聚合传输到数据接收器
我们最初打算构建这些 DDD 聚合,以便在数据源(本例中为 MySQL 表)和方便的数据接收器之间传输数据并同步更改。根据定义,DDD 聚合通常是复杂的数据结构,因此将它们写入数据存储是非常有意义的,因为数据存储提供了灵活的方式和手段来查询和/或索引它们。谈到 NoSQL 数据库,文档存储似乎是最自然的选择,MongoDB是此类用例的领先数据库。

借助Kafka Connect和众多交钥匙连接器,几乎可以轻松完成此任务。使用来自开源社区的MongoDB 接收器连接器,可以轻松地将 DDD 聚合写入 MongoDB。它所需要的只是一个正确的配置,可以将其发布到Kafka Connect 的REST API以便运行连接器。

因此,让我们启动 MongoDb 和另一个 Kafka Connect 实例来托管接收器连接器:

docker-compose up -d mongodb connect_sink
如果 DDD 聚合不加修改地写入 MongoDB,配置可能看起来很简单,如下所示:

{
“name”: “mongodb-sink”,
“config”: {
“connector.class”: “at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector”,
“tasks.max”: “1”,
“topics”: “final_ddd_aggregates”,
“mongodb.connection.uri”: “mongodb://mongodb:27017/inventory?w=1&journal=true”,
“mongodb.collection”: “customers_with_addresses”,
“mongodb.document.id.strategy”: “at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy”,
“mongodb.delete.on.null.values”: true
}
}
与源连接器一样,使用curl 部署连接器:

curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” http://localhost:8084/connectors/ -d @mongodb-sink.json
该连接器将使用来自“final_ddd_aggregates”Kafka 主题的消息,并将它们作为MongoDB 文档写入“customers_with_addresses”集合中。

您可以通过启动 Mongo shell 并查询集合的内容来查看:

docker-compose exec mongodb bash -c ‘mongo inventory’

db.customers_with_addresses.find().pretty()
{
“_id”: {
“id”: “1001”
},
“addresses”: [
{
“zip”: “76036”,
“_eventType”: “UPSERT”,
“city”: “Euless”,
“street”: “3183 Moore Avenue”,
“id”: “10”,
“state”: “Texas”,
“customer_id”: “1001”,
“type”: “SHIPPING”
},
{
“zip”: “17116”,
“_eventType”: “UPSERT”,
“city”: “Harrisburg”,
“street”: “2389 Hidden Valley Road”,
“id”: “11”,
“state”: “Pennsylvania”,
“customer_id”: “1001”,
“type”: “BILLING”
}
],
“customer”: {
“_eventType”: “UPSERT”,
“last_name”: “Thomas”,
“id”: “1001”,
“first_name”: “Sally”,
“email”: “sally.thomas@acme.com”
}
}
由于单个文档中的数据组合,某些部分是不需要的或多余的。为了消除任何不需要的数据(例如,每个地址子文档的_eventType、customer_id),还可以调整配置以将所述字段列入黑名单。

最后,更新 MySQL 源数据库中的一些客户或地址数据:

docker-compose exec mysql bash -c ‘mysql -u M Y S Q L U S E R − p MYSQL_USER -p MYSQLUSERpMYSQL_PASSWORD inventory’

mysql> update customers set first_name= “Sarah” where id = 1001;
此后不久,您应该看到 MongoDB 中相应的聚合文档已相应更新。

缺点和限制
虽然从基于表的 CDC 事件创建 DDD 聚合的第一个版本基本上可以工作,但了解其当前的局限性非常重要:

不普遍适用,因此需要 POJO 和中间类型的自定义代码

无法跨多个实例进行扩展,因为在处理之前缺少但必要的数据重新分区

仅限于基于 1:N 关系之间的单个 JOIN 构建聚合

产生的 DDD 聚合最终是一致的,这意味着它们有可能在收敛之前暂时表现出中间状态

前几个问题可以通过在 KStreams 应用程序上进行合理的工作来解决。最后一个问题是处理 DDD 聚合结果的最终一致性,纠正起来要困难得多,并且需要 Debezium 自己的 CDC 机制做出一些努力。

外表
在这篇文章中,我们描述了一种从 Debezium 的 CDC 事件创建聚合事件的方法。在后续博客文章中,我们可能会更深入地探讨如何通过运行多个 KStreams 聚合器实例来水平扩展 DDD 创建的主题。为此,在运行拓扑之前需要对数据进行适当的重新分区。此外,研究一个更通用的版本可能会很有趣,它只需要自定义类来描述所涉及的两个主要 POJO。

我们还考虑提供一个即用型组件,该组件将以通用方式工作(基于 Connect 记录,即不依赖于特定的序列化格式,例如 JSON),并且可以设置为运行的可配置独立进程给定的聚合。

另外,关于处理最终一致性的主题,我们得到了一些想法,但这些想法肯定需要更多的探索和调查。敬请关注!

我们很想听听您对事件聚合主题的反馈。如果您对此主题有任何想法或想法,请通过在下面发表评论或向我们的邮件列表发送消息来联系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/580110.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 Redis Streams在Spring Boot中的应用&#xff1a;构建可靠的消息队列解决方案 引言前言Redis Streams的基本概念和特性1. 日志数据结构2. 消息和字段3. 消费者组4. 消息ID5. 实时和历史数据处理6. 性能…

7.3 uvm_config_db in UVM

uvm_config_db类派生自uvm_resource_db类。它是uvm_resource_db顶部的另一层便利层&#xff0c;简化了用于uvm_component实例的基本接口&#xff08;资源库的访问方法&#xff09;。 下面uvm_config_db类的代码段取自uvm源代码。 class uvm_config_db#(type Tint) extends uv…

html之为什么使用表单,常用表单元素使用?

文章目录 一、为什么使用表单呢&#xff1f;二、常用表单元素使用三、总结 一、为什么使用表单呢&#xff1f; 为什么使用表单呢&#xff0c;使用表单是为了更好的收集用户数据&#xff0c;并且安全 二、常用表单元素使用 1、password密码框 密码框&#xff1a;会隐藏数据&a…

网络摄像头爆破实战

*** 重要说明&#xff1a;仅用于交流网络安全测试技术&#xff0c;并唤起大家对网络安全的重视&#xff0c;如用本文的技术干违法的事情&#xff0c;博主概不负责。*** 文章目录 前言1. 发现摄像头2. 发现端口3. 确定品牌信息4. 确定RTSP地址5. 获取视频流6. 获取密码7. 再次获…

flutter学习-day20-使用SafeArea组件处理各机型的安全距离

&#x1f4da; 目录 介绍分析示例和效果图特殊情况 1. 介绍 安全区域&#xff0c;指的是移动端设备的可视窗口范围。处于安全区域的内容不受圆角、刘海屏、iPhone 小黑条、状态栏等的影响&#xff0c;也就是说&#xff0c;我们要做好适配&#xff0c;必须保证页面可视、可操作…

亚马逊鲲鹏系统全自动化操作注册下单更快捷

亚马逊鲲鹏系统的强大崛起&#xff0c;让买家号的注册、养号、下单留评等繁琐任务迎来了一场全新的自动化革命。这一创新性软件系统的横空出世&#xff0c;为广大亚马逊卖家提供了一种高效、智能的解决方案&#xff0c;成功摆脱了繁重的手动操作。 在这一系统中&#xff0c;买家…

安卓恢复指南:五种安卓数据恢复软件推荐

我们的手机随身携带。我们抓住他们快速拍照、发送消息并保持娱乐。有时我们对它们过于冒险&#xff0c;将它们扔在混凝土或水中&#xff0c;安装我们不应该安装的软件&#xff0c;然后将它们留在电影中或公园的长椅上。 如果您要在任何地方丢失重要数据&#xff0c;很可能是在…

C# WPF上位机开发(扩展上位机之外的技能)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 如果把c# wpf只是看成是一个做界面的框架&#xff0c;那确实有点狭隘了。单独的上位机软件&#xff0c;如果不需要上下游的支持&#xff0c;没有与…

linux中top参数详解

top命令是Linux下常用的性能分析工具&#xff0c;能够实时显示系统中各个进程的资源占用状况&#xff0c;类似于Windows的任务管理器 top参数详解 第一行&#xff0c;任务队列信息&#xff0c;同 uptime 命令的执行结果 系统时间&#xff1a;07:27:05 运行时间&#xff1a;up …

Oracle查询重复数据取第二行,好用来删除重复数据

Oracle查询重复数据取第二行&#xff0c;好用来删除重复数据 SELECT * FROM ( SELECT e.* , ROW_NUMBER() over(PARTITION BY product_category_id,model_size_id ORDER BY product_category_id,model_size_id) rn FROM equ_check_rules e ) s WHERE rn 2;

Plantuml之序列图语法介绍(十七)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

springboot对接WebSocket实现消息推送

1.修改pom文件 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency> 2.增加配置WebSocketConfig.java import org.springframework.context.annotation.Bean…

实训4---硬件部分---点灯实验--按键控制灯实验--uart串口实验

目录 三、硬件部分 【1】点灯实验 【2】按键控制灯实验 【3】uart串口实验 核心代码&#xff1a; 实验视频 实现流水灯 uart串口实验 三、硬件部分 GPIO 【1】点灯实验 1.首先找到要点的灯&#xff0c;在板子上看到对应的白色丝印&#xff0c;比如绿灯D10.然后打开底板…

服务器数据恢复-raid6离线磁盘强制上线后分区打不开的数据恢复案例

服务器数据恢复环境&#xff1a; 服务器上有一组由12块硬盘组建的raid6磁盘阵列&#xff0c;raid6阵列上层有一个lun&#xff0c;映射到WINDOWS系统上使用&#xff0c;WINDOWS系统划分了一个GPT分区。 服务器故障&分析&#xff1a; 服务器在运行过程中突然无法访问。对服务…

什么是EMC工程师?

摘要: 今天来介绍一下什么是EMC工程师。一 EMC工程师起源要了解什么是EMC工程师&#xff0c;我们首先要了解什么是EMC。 今天来介绍一下什么是EMC工程师。 一 EMC工程师起源 要了解什么是EMC工程师&#xff0c;我们首先要了解什么是EMC。 工程师这个职业相信大家都耳熟能详…

1.决策树

目录 1. 什么是决策树? 2. 决策树的原理 2.1 如何构建决策树&#xff1f; 2.2 构建决策树的数据算法 2.2.1 信息熵 2.2.2 ID3算法 2.2.2.1 信息的定义 2.2.2.2 信息增益 2.2.2.3 ID3算法举例 2.2.2.4 ID3算法优缺点 2.2.3 C4.5算法 2.2.3.1 C4.5算法举例 2.2.4 CART算法 2.2.4…

基于VUE3+Layui从头搭建通用后台管理系统(前端篇)十六:统计报表模块相关功能实现

一、本章内容 本章使用Echarts及DataV实现常用图表、特殊图表、地图及综合图表等图表展示功能。 1. 详细课程地址: https://edu.csdn.net/course/detail/38183 2. 源码下载地址: 点击下载 二、界面预览 三、开发视频 3.1 B站视频地址: 基于VUE3+Layui从

Python——yolov8识别车牌2.0

目录 一、前言 二、关于项目UI 2.1、修改界面内容的文本 2.2、修改界面的图标和图片 三、项目修改地方 四、其他配置问题 一、前言 因为后续有许多兄弟说摄像头卡顿&#xff0c;我在之前那个MATS上面改一下就可以了&#xff0c;MAST项目&#xff1a;基于YOLOv8的多端车流检…

【leetcode100-019】【矩阵】螺旋矩阵

【题干】 给你一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 【思路】 不难注意到&#xff0c;每进行一次转向&#xff0c;都有一行/列被输出&#xff08;并失效&#xff09;&#xff1b;既然已经失效&#xff0c;那我…

倒计时1天!WAVE SUMMIT+ 2023将开启,五大亮点抢鲜看!

10句话2分钟&#xff0c;挑战成功说服宿管阿姨开门&#xff0c;这个人群中的“显眼包”是一个接入文心大模型4.0游戏里的NPC&#xff0c;妥妥 “工具人”实锤&#xff5e; 尝试用AI一键自动识别好坏咖啡豆&#xff0c;看一眼便知好坏&#xff0c;真正“颜值即正义”&#xff0…