原文地址: https://debezium.io/blog/2018/03/08/creating-ddd-aggregates-with-debezium-and-kafka-streams/
欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.
使用 Debezium 和 Kafka Streams 创建 DDD 聚合
2018 年 3 月 8 日 作者: Hans-Peter Grahsl、Gunnar Morling
讨论 实例
基于微服务的架构可以被认为是一种行业趋势,因此最近经常出现在企业应用程序中。在多个服务及其支持数据存储之间保持数据同步的一种可能方法是使用一种称为变更数据捕获(简称 CDC)的方法。
本质上,CDC 允许侦听数据流一端(即数据源)发生的任何修改,并将它们作为更改事件传达给其他感兴趣的各方或将它们存储到数据接收器中。建议不要以点对点的方式执行此操作,而是将数据源和数据接收器之间的事件流解耦。这样的场景可以基于Debezium和Apache Kafka相对轻松地实现,并且实际上无需编码。
作为示例,请考虑以下基于微服务的订单管理系统架构:
图片来自于官网
该系统包含三个服务:Order、Item和Stock。如果订单服务收到订单请求,它将需要来自其他两个请求的信息,例如商品定义或特定商品的库存数量。CDC 可以用于为Item和Stock服务管理的数据设置更改事件流,而不是同步调用这些服务来获取此信息。命令_服务可以订阅这些事件流,并在自己的数据库中保留相关商品和库存数据的本地副本。这种方法有助于解耦服务(例如,不会受到服务中断的直接影响),并且也有利于整体性能,因为每个服务可以仅保留其感兴趣的其他服务拥有的那些数据项的优化视图。
如何处理聚合对象?
然而,在某些用例中,事情有点棘手。有时,通过所谓的聚合来跨服务和数据存储共享信息很有用,聚合是由领域驱动设计 (DDD) 定义的概念/模式。一般来说,DDD 聚合用于传输状态,该状态可以由多个不同的域对象组成,这些域对象一起被视为单个信息单元。
具体例子有:
客户及其地址,表示为存储客户和地址列表的客户记录聚合
订单和相应的行项目,表示为存储订单及其所有行项目的订单记录聚合
支持这些 DDD 聚合的相关域对象的数据很可能存储在 RDBMS 的单独关系中。当利用 Debezium 当前的 CDC 功能时,对域对象的所有更改都将被独立捕获,并默认最终反映在单独的 Kafka 主题中,每个 RDBMS 关系一个主题。虽然此行为对于许多用例非常有帮助,但对其其他用例可能有很大限制,例如上面描述的 DDD 聚合场景。因此,这篇博文探讨了如何使用Kafka Streams API基于 Debezium CDC 事件构建 DDD 聚合。
从数据源捕获更改事件
GitHub 上的 Debezium示例存储库提供了本博文的完整源代码。首先克隆此存储库并更改为kstreams目录:
git clone https://github.com/debezium/debezium-examples.git
cd kstreams
该项目提供了一个 Docker Compose 文件,其中包含您可能已经从Debezium 教程中了解的所有组件的服务:
阿帕奇动物园管理员
阿帕奇·卡夫卡
具有 Debezium CDC 连接器的Kafka Connect实例
MySQL(填充了一些测试数据)
此外,它还声明以下服务:
MongoDB将用作数据接收器
另一个 Kafka Connect 实例将托管 MongoDB 接收器连接器
我们将在下面构建一个用于运行 DDD 聚合过程的服务
我们稍后会讨论这三个,现在让我们准备管道的源端:
export DEBEZIUM_VERSION=0.7
docker-compose up mysql zookeeper kafka connect_source
所有服务启动后,通过提交以下 JSON 文档来注册 Debezium MySQL 连接器的实例:
{
“name”: “mysql-source”,
“config”: {
“connector.class”: “io.debezium.connector.mysql.MySqlConnector”,
“tasks.max”: “1”,
“database.hostname”: “mysql”,
“database.port”: “3306”,
“database.user”: “debezium”,
“database.password”: “dbz”,
“database.server.id”: “184054”,
“database.server.name”: “dbserver1”,
“table.whitelist”: “inventory.customers,inventory.addresses”,
“database.history.kafka.bootstrap.servers”: “kafka:9092”,
“database.history.kafka.topic”: “schema-changes.inventory”,
“transforms”: “unwrap”,
“transforms.unwrap.type”:“io.debezium.transforms.UnwrapFromEnvelope”,
“transforms.unwrap.drop.tombstones”:“false”
}
}
为此,请运行以下curl命令:
curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” http://localhost:8083/connectors/ -d @mysql-source.json
这将使用给定的凭据设置指定数据库的连接器。customers出于我们的目的,我们只对和表的更改感兴趣addresses,因此该table.whitelist属性仅用于选择这两个表。另一个值得注意的事情是应用的“展开”变换。默认情况下,Debezium 的 CDC 事件将包含已更改行的旧状态和新状态以及有关更改源的一些附加元数据。通过应用UnwrapFromEnvelope SMT(单消息转换),只有新状态才会传播到相应的 Kafka 主题中。
一旦连接器部署并完成了两个捕获表的初始快照,我们就可以查看它们:
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh
–bootstrap-server kafka:9092
–from-beginning
–property print.key=true
–topic dbserver1.inventory.customers # or dbserver1.inventory.addresses
例如,您应该看到以下输出
(为了可读性而格式化并省略架构信息)针对客户更改的主题:
{
“schema”: { … },
“payload”: {
“id”: 1001
}
}
{
“schema”: { … },
“payload”: {
“id”: 1001,
“first_name”: “Sally”,
“last_name”: “Thomas”,
“email”: “sally.thomas@acme.com”
}
}
…
构建 DDD 聚合
KStreams 应用程序将处理来自两个 Kafka 主题的数据。这些主题根据 MySQL 中发现的客户和地址关系接收 CDC 事件,每个主题都有其相应的 Jackson 注释的 POJO(客户和地址),并通过保存 CDC 事件类型(即 UPSERT/DELETE)的字段进行丰富。
由于 Kafka 主题记录采用 Debezium JSON 格式且未包装信封,因此编写了一个特殊的SerDe,以便能够分别使用 POJO 或 Debezium 事件表示来读取/写入这些记录。虽然序列化程序只是使用 Jackson 将 POJO 转换为 JSON,但反序列化程序是一种“混合”程序,能够从 Debezium CDC 事件或 json 化的 POJO 进行反序列化。
完成此操作后,可以按如下方式构建动态创建和维护 DDD 聚合的 KStreams 拓扑:
客户主题(“家长”)
所有客户记录都只是从客户主题读取到KTable中,KTable 将根据记录键(即客户的 PK)自动维护每个客户的最新状态
KTable<DefaultId, Customer> customerTable =
builder.table(parentTopic, Consumed.with(defaultIdSerde,customerSerde));
地址主题(“儿童”)
对于地址记录,处理过程稍微复杂一些,需要几个步骤。首先,所有地址记录被读入KStream。
KStream<DefaultId, Address> addressStream = builder.stream(childrenTopic,
Consumed.with(defaultIdSerde, addressSerde));
其次,根据这些地址记录的键(关系中的原始主键)对这些地址记录进行“伪”分组。在此步骤中,将维护与相应客户记录的关系。即使地址记录被删除,这也可以有效地跟踪哪个地址记录属于哪个客户记录。为了实现这一点,引入了额外的LatestAddress POJO,除了地址记录本身之外,它还允许存储最新的已知PK<→FK关系。
KTable<DefaultId,LatestAddress> tempTable = addressStream
.groupByKey(Serialized.with(defaultIdSerde, addressSerde))
.aggregate(
() -> new LatestAddress(),
(DefaultId addressId, Address address, LatestAddress latest) -> {
latest.update(
address, addressId, new DefaultId(address.getCustomer_id()));
return latest;
},
Materialized.<DefaultId,LatestAddress,KeyValueStore<Bytes, byte[]>>
as(childrenTopic+"_table_temp")
.withKeySerde(defaultIdSerde)
.withValueSerde(latestAddressSerde)
);
第三,中间KTable再次转换为KStream。LastAddress记录被转换为将客户 ID(FK 关系)作为新密钥,以便按客户对它们进行分组。在分组步骤中,客户特定地址会更新,这可能会导致地址记录被添加或删除。为此,引入了另一个名为Addresses的 POJO ,它包含相应更新的地址记录映射。结果是一个KTable,其中包含每个客户 ID 的最新地址。
KTable<DefaultId, Addresses> addressTable = tempTable.toStream()
.map((addressId, latestAddress) ->
new KeyValue<>(latestAddress.getCustomerId(),latestAddress))
.groupByKey(Serialized.with(defaultIdSerde,latestAddressSerde))
.aggregate(
() -> new Addresses(),
(customerId, latestAddress, addresses) -> {
addresses.update(latestAddress);
return addresses;
},
Materialized.<DefaultId,Addresses,KeyValueStore<Bytes, byte[]>>
as(childrenTopic+"_table_aggregate")
.withKeySerde(defaultIdSerde)
.withValueSerde(addressesSerde)
);
将客户与地址结合起来
最后,通过将客户 KTable 与地址 KTable 连接起来,从而构建由CustomerAddressAggregate POJO表示的 DDD 聚合,可以轻松地将客户和地址组合在一起。最后,KTable 更改被写入 KStream,而 KStream 又被保存到 kafka 主题中。这允许以多种方式利用生成的 DDD 聚合。
KTable<DefaultId,CustomerAddressAggregate> dddAggregate =
customerTable.join(addressTable, (customer, addresses) ->
customer.get_eventType() == EventType.DELETE ?
null :
new CustomerAddressAggregate(customer,addresses.getEntries())
);
dddAggregate.toStream().to(“final_ddd_aggregates”,
Produced.with(defaultIdSerde,(Serde)aggregateSerde));
客户 KTable 中的记录可能会收到 CDC 删除事件。如果是这样,可以通过检查客户 POJO 的事件类型字段来检测,例如返回“null”而不是 DDD 聚合。每当消费方也需要对删除采取相应行动时,这样的约定就会很有帮助。_
运行聚合管道
实现聚合管道后,是时候对其进行测试运行了。为此,请构建poc-ddd-aggregates Maven 项目,其中包含完整的实现:
mvn clean package -f poc-ddd-aggregates/pom.xml
然后从 Compose 文件运行该aggregator服务,该文件采用该项目构建的 JAR 并使用java-jboss-openjdk8-jdk基础映像启动它:
docker-compose up -d aggregator
聚合管道运行后,我们可以使用控制台消费者查看聚合事件:
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh
–bootstrap-server kafka:9092
–from-beginning
–property print.key=true
–topic final_ddd_aggregates
将 DDD 聚合传输到数据接收器
我们最初打算构建这些 DDD 聚合,以便在数据源(本例中为 MySQL 表)和方便的数据接收器之间传输数据并同步更改。根据定义,DDD 聚合通常是复杂的数据结构,因此将它们写入数据存储是非常有意义的,因为数据存储提供了灵活的方式和手段来查询和/或索引它们。谈到 NoSQL 数据库,文档存储似乎是最自然的选择,MongoDB是此类用例的领先数据库。
借助Kafka Connect和众多交钥匙连接器,几乎可以轻松完成此任务。使用来自开源社区的MongoDB 接收器连接器,可以轻松地将 DDD 聚合写入 MongoDB。它所需要的只是一个正确的配置,可以将其发布到Kafka Connect 的REST API以便运行连接器。
因此,让我们启动 MongoDb 和另一个 Kafka Connect 实例来托管接收器连接器:
docker-compose up -d mongodb connect_sink
如果 DDD 聚合不加修改地写入 MongoDB,配置可能看起来很简单,如下所示:
{
“name”: “mongodb-sink”,
“config”: {
“connector.class”: “at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector”,
“tasks.max”: “1”,
“topics”: “final_ddd_aggregates”,
“mongodb.connection.uri”: “mongodb://mongodb:27017/inventory?w=1&journal=true”,
“mongodb.collection”: “customers_with_addresses”,
“mongodb.document.id.strategy”: “at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy”,
“mongodb.delete.on.null.values”: true
}
}
与源连接器一样,使用curl 部署连接器:
curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” http://localhost:8084/connectors/ -d @mongodb-sink.json
该连接器将使用来自“final_ddd_aggregates”Kafka 主题的消息,并将它们作为MongoDB 文档写入“customers_with_addresses”集合中。
您可以通过启动 Mongo shell 并查询集合的内容来查看:
docker-compose exec mongodb bash -c ‘mongo inventory’
db.customers_with_addresses.find().pretty()
{
“_id”: {
“id”: “1001”
},
“addresses”: [
{
“zip”: “76036”,
“_eventType”: “UPSERT”,
“city”: “Euless”,
“street”: “3183 Moore Avenue”,
“id”: “10”,
“state”: “Texas”,
“customer_id”: “1001”,
“type”: “SHIPPING”
},
{
“zip”: “17116”,
“_eventType”: “UPSERT”,
“city”: “Harrisburg”,
“street”: “2389 Hidden Valley Road”,
“id”: “11”,
“state”: “Pennsylvania”,
“customer_id”: “1001”,
“type”: “BILLING”
}
],
“customer”: {
“_eventType”: “UPSERT”,
“last_name”: “Thomas”,
“id”: “1001”,
“first_name”: “Sally”,
“email”: “sally.thomas@acme.com”
}
}
由于单个文档中的数据组合,某些部分是不需要的或多余的。为了消除任何不需要的数据(例如,每个地址子文档的_eventType、customer_id),还可以调整配置以将所述字段列入黑名单。
最后,更新 MySQL 源数据库中的一些客户或地址数据:
docker-compose exec mysql bash -c ‘mysql -u M Y S Q L U S E R − p MYSQL_USER -p MYSQLUSER−pMYSQL_PASSWORD inventory’
mysql> update customers set first_name= “Sarah” where id = 1001;
此后不久,您应该看到 MongoDB 中相应的聚合文档已相应更新。
缺点和限制
虽然从基于表的 CDC 事件创建 DDD 聚合的第一个版本基本上可以工作,但了解其当前的局限性非常重要:
不普遍适用,因此需要 POJO 和中间类型的自定义代码
无法跨多个实例进行扩展,因为在处理之前缺少但必要的数据重新分区
仅限于基于 1:N 关系之间的单个 JOIN 构建聚合
产生的 DDD 聚合最终是一致的,这意味着它们有可能在收敛之前暂时表现出中间状态
前几个问题可以通过在 KStreams 应用程序上进行合理的工作来解决。最后一个问题是处理 DDD 聚合结果的最终一致性,纠正起来要困难得多,并且需要 Debezium 自己的 CDC 机制做出一些努力。
外表
在这篇文章中,我们描述了一种从 Debezium 的 CDC 事件创建聚合事件的方法。在后续博客文章中,我们可能会更深入地探讨如何通过运行多个 KStreams 聚合器实例来水平扩展 DDD 创建的主题。为此,在运行拓扑之前需要对数据进行适当的重新分区。此外,研究一个更通用的版本可能会很有趣,它只需要自定义类来描述所涉及的两个主要 POJO。
我们还考虑提供一个即用型组件,该组件将以通用方式工作(基于 Connect 记录,即不依赖于特定的序列化格式,例如 JSON),并且可以设置为运行的可配置独立进程给定的聚合。
另外,关于处理最终一致性的主题,我们得到了一些想法,但这些想法肯定需要更多的探索和调查。敬请关注!
我们很想听听您对事件聚合主题的反馈。如果您对此主题有任何想法或想法,请通过在下面发表评论或向我们的邮件列表发送消息来联系。