Debezium发布历史40

原文地址: https://debezium.io/blog/2018/09/20/materializing-aggregate-views-with-hibernate-and-debezium/

欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

使用 Hibernate 和 Debezium 实现聚合视图
2018 年 9 月 20 日 作者: Gunnar Morling
讨论 实例
数据更改后更新外部全文搜索索引(例如Elasticsearch)是更改数据捕获 (CDC) 的一个非常流行的用例。

正如我们之前在博客文章中讨论的那样,Debezium 的 CDC 源连接器和 Confluence 的Elasticsearch 接收器连接器的组合可以直接捕获 MySQL、Postgres 等中的数据更改,并将它们近乎实时地推送到 Elasticsearch 。这会导致源数据库中的表与 Elasticsearch 中相应的搜索索引之间形成 1:1 的关系,这对于许多用例来说非常适合。

但如果您想将整个聚合放入单个索引中,它会变得更具挑战性。例如,客户及其所有地址;这些通常存储在 RDBMS 中的两个单独的表中,通过外键链接,而您希望 Elasticsearch 中只有一个索引,其中包含嵌入了地址的客户文档,使您能够根据以下条件高效地搜索客户:他们的地址。

继我们最近描述的基于 KStreams 的解决方案之后,我们希望在这篇文章中提出一种替代方案,用于具体化由应用程序层驱动的此类聚合视图。

概述
这个想法是在原始数据发生更改时在源数据库中的单独表中具体化视图。

聚合被序列化为 JSON 结构(自然可以表示任何嵌套对象结构)并存储在特定的表中。这是在更改数据的实际事务中完成的,这意味着聚合视图始终与主数据一致。特别是,这种方法不容易像上面链接的帖子中讨论的基于 KStreams 的解决方案那样暴露中间聚合。

整体架构如下图:
图片来自于官网
在这里插入图片描述

将物化聚合视图流式传输到 Elasticsearch
这里聚合视图通过Hibernate ORM的一个小扩展实现,它在源数据库中存储 JSON 聚合(注意“聚合视图”在概念上可以被认为与不同 RDBMS 中已知的“物化视图”相同,如它们实现了“连接”操作的结果,但从技术上讲,我们不使用后者来存储聚合视图,而是使用常规表)。然后 Debezium 捕获对该聚合表的更改,并将其传输到每种聚合类型的一个主题。Elasticsearch接收器连接器可以订阅这些主题并更新相应的全文索引。

您可以在我们的示例存储库中找到此想法的概念验证实现(即 Hibernate 扩展和相关代码)。当然,总体思路并不限于 Hibernate ORM 或 JPA,您可以使用用于访问数据的任何其他 API 来实现类似的功能。

通过 Hibernate ORM 创建聚合视图
接下来,我们假设我们在数据库中保存一个简单的域模型(包含一个Customer实体和一些相关的模型,例如Address,(客户)等)。使用 Hibernate 可以让我们使用Hibernate 事件监听器使Category聚合的创建对实际应用程序代码完全透明。由于其可扩展的架构,我们只需将此类侦听器添加到类路径即可将其插入 Hibernate,在引导实体管理器/会话工厂时将自动从该类路径中获取它。

我们的示例侦听器对注释做出反应,@MaterializeAggregate该注释标记了那些应该是物化聚合根的实体类型。

@Entity
@MaterializeAggregate(aggregateName=“customers-complete”)
public class Customer {

@Id
private long id;private String firstName;@OneToMany(mappedBy = "customer", fetch = FetchType.EAGER, cascade = CascadeType.ALL)
private Set<Address> addresses;@ManyToOne
private Category category;...

}
现在,如果通过 Hibernate 插入、更新或删除带有 注释的任何实体@MaterializeAggregate,侦听器将启动并具体化聚合根(客户)及其关联实体(地址、类别)的 JSON 视图。

在底层,Jackson API用于将模型序列化为 JSON。这意味着您可以使用其任何注释来自定义 JSON 输出,例如@JsonIgnore排除Addressto 的逆关系Customer:

@Entity
public class Address {

@Id
private long id;@ManyToOne
@JoinColumn(name = "customer_id")
@JsonIgnore
private Customer customer;private String street;private String city;...

}
请注意,Address它本身没有标记为@MaterializeAggregate,即它本身不会具体化为聚合视图。

使用 JPAEntityManager插入或更新一些客户后,让我们看一下aggregates侦听器填充的表(为了简洁起见,省略了值模式):

select * from aggregates;

| rootType | keySchema | rootId | materialization | valueSchema |

| customers-complete

| {
“schema” : {
“type” : “struct”,
“fields” : [ {
“type” : “int64”,
“optional” : false,
“field” : “id”
} ],
“optional” : false,
“name” : “customers-complete.Key”
}
}

| { “id” : 1004 }

| { “schema” : { … } }

| {
“id” : 1004,
“firstName” : “Anne”,
“lastName” : “Kretchmar”,
“email” : “annek@noanswer.org”,
“tags” : [ “long-term”, “vip” ],
“birthday” : 5098,
“category” : {
“id” : 100001,
“name” : “Retail”
},
“addresses” : [ {
“id” : 16,
“street” : “1289 University Hill Road”,
“city” : “Canehill”,
“state” : “Arkansas”,
“zip” : “72717”,
“type” : “SHIPPING”
} ]
} |
该表包含以下列:

rootType@MaterializeAggregate:注释中给出的聚合名称

rootId:聚合的 id 作为序列化 JSON

materialization:聚合本身作为序列化 JSON;在这种情况下,客户及其地址、类别等。

keySchema:行键的 Kafka Connect 架构

valueSchema:物化的 Kafka Connect 模式

让我们稍微讨论一下两个模式列。JSON 本身就其支持的数据类型而言非常有限。例如,我们会丢失有关数字字段的值范围(int 与 long 等)的信息,而无需任何附加信息。因此,侦听器从实体模型中派生出键和聚合视图的相应架构信息,并将其存储在聚合记录中。

现在 Jackson 本身只支持 JSON Schema,这对于我们的目的来说有点太有限了。因此,示例实现为 Jackson 的模式系统提供了自定义序列化器,这允许我们发出 Kafka Connect 的模式表示(具有更精确的类型信息)而不是普通的 JSON 模式。当我们想要将键和值的基于字符串的 JSON 表示形式扩展为正确类型的 Kafka Connect 记录时,这将在下面派上用场。

捕获聚合表的更改
我们现在拥有一种机制,每当通过 Hibernate 更改应用程序数据时,该机制可以透明地将聚合持久保存到源数据库中的单独表中。请注意,这发生在源事务的边界内,因此如果由于某种原因回滚同一事务,聚合视图也不会更新。

Hibernate 侦听器在编写聚合视图时使用插入或更新语义,即对于给定的聚合根,聚合表中始终存在一个反映其当前状态的对应条目。如果删除聚合根实体,侦听器也会从聚合表中删除该条目。

现在让我们设置 Debezium 来捕获对aggregates表的任何更改:

curl -i -X POST
-H “Accept:application/json”
-H “Content-Type:application/json”
http://localhost:8083/connectors/ -d @- <<-EOF
{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.mysql.MySqlConnector”,
“tasks.max”: “1”,
“database.hostname”: “mysql”,
“database.port”: “3306”,
“database.user”: “debezium”,
“database.password”: “dbz”,
“database.server.id”: “184054”,
“database.server.name”: “dbserver1”,
“database.whitelist”: “inventory”,
“table.whitelist”: “.*aggregates”,
“database.history.kafka.bootstrap.servers”: “kafka:9092”,
“database.history.kafka.topic”: “schema-changes.inventory”
}
}
EOF
这会将 MySQL 连接器注册到“inventory”数据库(我们使用Debezium 教程中模式的扩展版本),捕获对“aggregates”表的任何更改。

扩展 JSON
如果我们现在要浏览相应的 Kafka 主题,我们会看到表中所有更改的已知 Debezium 格式的数据更改事件aggregates。

不过,具有记录“之后”状态的“物化”字段仍然是包含 JSON 字符串的单个字段。我们更希望拥有的是强类型的 Kafka Connect 记录,其模式准确地描述了聚合结构及其字段的类型。为此,示例项目提供了一个 SMT(单消息转换),它采用 JSON 物化和相应的内容valueSchema,并将其转换为成熟的 Kafka Connect 记录。对键也是如此。DELETE 事件被重写为墓碑事件。最后,SMT 将每条记录重新路由到以聚合根命名的主题,允许消费者仅订阅特定聚合类型的更改。

因此,让我们在注册 Debezium CDC 连接器时添加该 SMT:


“transforms”:“expandjson”,
“transforms.expandjson.type”:“io.debezium.aggregation.smt.ExpandJsonSmt”,

现在浏览“customers-complete”主题时,我们将看到我们期望的强类型 Kafka Connect 记录:

{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “int64”,
“optional”: false,
“field”: “id”
}
],
“optional”: false,
“name”: “customers-complete.Key”
},
“payload”: {
“id”: 1004
}
}
{
“schema”: {
“type”: “struct”,
“fields”: [ … ],
“optional”: true,
“name”: “urn:jsonschema:com:example:domain:Customer”
},
“payload”: {
“id”: 1004,
“firstName”: “Anne”,
“lastName”: “Kretchmar”,
“email”: “annek@noanswer.org”,
“active”: true,
“tags” : [ “long-term”, “vip” ],
“birthday” : 5098,
“category”: {
“id”: 100001,
“name”: “Retail”
},
“addresses”: [
{
“id”: 16,
“street”: “1289 University Hill Road”,
“city”: “Canehill”,
“state”: “Arkansas”,
“zip”: “72717”,
“type”: “LIVING”
}
]
}
}
要确认这些是实际键入的 Kafka Connect 记录,而不仅仅是单个 JSON 字符串字段,您可以使用Avro 消息转换器并检查架构注册表中的消息架构。

将聚合消息放入 Elasticsearch
最后缺少的一步是注册 Confluence Elasticsearch 接收器连接器,将其与“customers-complete”主题挂钩,并让它将任何更改推送到相应的索引:

curl -i -X POST
-H “Accept:application/json”
-H “Content-Type:application/json”
http://localhost:8083/connectors/ -d @- <<-EOF
{
“name”: “es-customers”,
“config”: {
“connector.class”: “io.confluent.connect.elasticsearch.ElasticsearchSinkConnector”,
“tasks.max”: “1”,
“topics”: “customers-complete”,
“connection.url”: “http://elastic:9200”,
“key.ignore”: “false”,
“schema.ignore” : “false”,
“behavior.on.null.values” : “delete”,
“type.name”: “customer-with-addresses”,
“transforms” : “key”,
“transforms.key.type”: “org.apache.kafka.connect.transforms.ExtractField$Key”,
“transforms.key.field”: “id”
}
}
EOF
这使用 Connect 的ExtractField转换从键结构中获取实际的 id 值,并将其用作相应 Elasticsearch 文档的键。指定“behavior.on.null.values”选项将使连接器在遇到逻辑删除消息(即带有键但没有值的消息)时从索引中删除相应的文档。

最后,我们可以使用 Elasticsearch REST API 来浏览索引,当然还可以使用其强大的全文查询语言通过地址或嵌入到聚合结构中的任何其他属性来查找客户:

curl -X GET -H “Accept:application/json”
http://localhost:9200/customers-complete/_search?pretty

{
“_shards”: {
“failed”: 0,
“successful”: 5,
“total”: 5
},
“hits”: {
“hits”: [
{
“_id”: “1004”,
“_index”: “customers-complete”,
“_score”: 1.0,
“_source”: {
“active”: true,
“addresses”: [
{
“city”: “Canehill”,
“id”: 16,
“state”: “Arkansas”,
“street”: “1289 University Hill Road”,
“type”: “LIVING”,
“zip”: “72717”
}
],
“tags” : [ “long-term”, “vip” ],
“birthday” : 5098,
“category”: {
“id”: 100001,
“name”: “Retail”
},
“email”: “annek@noanswer.org”,
“firstName”: “Anne”,
“id”: 1004,
“lastName”: “Kretchmar”,
“scores”: [],
“someBlob”: null,
“tags”: []
},
“_type”: “customer-with-addresses”
}
],
“max_score”: 1.0,
“total”: 1
},
“timed_out”: false,
“took”: 11
}
现在您已经看到了:客户的完整数据,包括他们的地址、类别、标签等,在 Elasticsearch 中具体化为单个文档。如果您使用 JPA 更新客户,您将看到索引中的数据近乎实时地相应更新。

优点和缺点
那么,与基于 KStreams 的方法相比,这种从多个源表实现聚合的方法有哪些优点和缺点呢?

最大的优点是一致性和事务边界意识,而建议形式的基于 KStreams 的解决方案很容易暴露中间聚合。例如,如果您要存储一个客户和三个地址,则流式查询可能会首先创建客户和先插入的两个地址的聚合,然后不久创建包含所有三个地址的完整聚合。此处讨论的方法并非如此,因为您只能将完整的聚合流式传输到 Kafka。此外,这种方法感觉更“轻量级”,即一个简单的标记注释(与一些用于微调发出的 JSON 结构的 Jackson 注释一起)就足以实现域模型中的聚合,

通过应用程序层驱动聚合的缺点是它不能完全不知道您访问主要数据的方式。如果绕过应用程序,例如直接在数据库中修补数据,这些更新自然会丢失,从而需要刷新受影响的聚合。尽管这也可以通过更改数据捕获和 Debezium 来完成:源表的更改事件可以由应用程序本身捕获和使用,从而允许它在外部数据更改后重新实现聚合。您还可能会争辩说,在源事务中运行 JSON 序列化以及在源数据库中存储聚合会产生一些开销。不过,这通常是可以接受的。

另一个要问的问题是,与简单地将 REST 请求发布到 Elasticsearch 相比,在中间聚合表上使用更改数据捕获有何优势。答案是大幅提高的稳健性和容错能力。如果由于某种原因无法访问 Elasticsearch 集群,一旦接收器再次启动,Kafka 和 Kafka Connect 的机制将确保最终传播任何更改事件。Elasticsearch 之外的其他消费者也可以订阅聚合主题,日志可以从头开始重播等。

请注意,虽然我们主要讨论使用 Elasticsearch 作为数据接收器,但还有其他支持复杂结构化记录的数据存储和连接器。一个例子是 MongoDB 和Hans-Peter Grahsl 维护的接收器连接器,可以使用该连接器将客户聚合接收到 MongoDB 中,例如通过单个主键查找即可高效检索客户及其所有相关数据。

外表
Hibernate ORM 扩展以及本文中讨论的 SMT 可以在我们的示例存储库中找到。目前它们应该被认为处于“概念验证”级别。

话虽这么说,我们正在考虑使其成为一个合适的 Debezium 组件,让您只需引入这个新组件即可在基于 Hibernate 的应用程序中采用这种聚合方法。不过,为此我们必须首先改进一些事情。最重要的是,需要一个 API,它可以让您按需(重新)创建聚合,例如针对现有数据或通过 Criteria API / JPQL 批量更新更新的数据(听众会错过)。如果任何引用的实体发生变化,聚合也应该自动重新创建(在当前的 PoC 中,只有对客户实例本身的更改才会触发其聚合视图的重建,但不会对其地址之一进行更改)。

如果您喜欢这个想法,请告诉我们,以便我们评估对此的普遍兴趣。另外,如果您有兴趣为 Debezium 项目做出贡献,这将是一个很好的项目。期待您的来信,例如在下面的评论部分或我们的邮件列表中。

非常感谢 Hans-Peter Grahsl 对本文早期版本的反馈!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/588861.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2021-06-25 51蛋骗鸡按键切合LED

缘由ISIS 7 Professional_有问必答-CSDN问答 #include "REG52.h" sbit K1 P3^0; sbit K2 P3^1; sbit K3 P3^2; sbit K4 P3^3; void main() {unsigned char Xd0,xz0,cs0;unsigned int wei0;P1255;while(1){if(K10&&Xd0){P10;while(K10);}if(K20&&…

【Redis-05】Redis如何实现保存键值对的保存及过期键的管理策略

在之前的文章我们介绍过&#xff0c;Redis服务器在启动之初&#xff0c;会初始化RedisServer的实例&#xff0c;在这个实例中存在很多重要的属性结构&#xff0c;同理本篇博客中介绍的数据库实现原理也会和其中的某些属性相关&#xff0c;我们继续看一下吧。 1.服务器和客户端…

【后端】Docker学习笔记

文章目录 Docker一、Docker安装&#xff08;Linux&#xff09;二、Docker概念三、Docker常用命令四、数据卷五、自定义镜像六、网络七、DockerCompose Docker Docker是一个开源平台&#xff0c;主要基于Go语言构建&#xff0c;它使开发者能够将应用程序及其依赖项打包到一个轻…

多维时序 | MATLAB实现SSA-CNN-GRU-SAM-Attention麻雀算法优化卷积网络结合门控循环单元网络融合空间注意力机制多变量时间序列预测

多维时序 | MATLAB实现SSA-CNN-GRU-SAM-Attention麻雀算法优化卷积网络结合门控循环单元网络融合空间注意力机制多变量时间序列预测 目录 多维时序 | MATLAB实现SSA-CNN-GRU-SAM-Attention麻雀算法优化卷积网络结合门控循环单元网络融合空间注意力机制多变量时间序列预测预测效…

uni-app模版(扩展插件)

锋哥原创的uni-app视频教程&#xff1a; 2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中..._哔哩哔哩_bilibili2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中...共计23条视频&#xff0c;包括&#xff1a;第1讲 uni…

使用css实现 Typora markdown 标题自动编号

第一&#xff0c;找到主题文件夹 第二&#xff0c;复制下面代码放入 AutoNumber.css文件中 body {counter-reset: h1; }#write h1, .markdown-section h1 {counter-reset: h2; }#write h2, .markdown-section h2 {counter-reset: h3; }#write h3, .markdown-section h3 {counte…

分割数组的最大差值 - 华为OD统一考试

分割数组的最大差值 - 华为OD统一考试 OD统一考试 分值&#xff1a; 100分 题解&#xff1a; Java / Python / C 题目描述 给定一个由若干整数组成的数组nums &#xff0c;可以在数组内的任意位置进行分割&#xff0c;将该数组分割成两个非空子数组(即左数组和右数组)&#xf…

第三部分 连续型需要的积分

目录 温馨提示&#xff1a; 求积分 求分段函数在确定区间的定积分 方法&#xff1a; 例1 例2 例3 例4 例5 例6 例7 求分段函数在到未知数的定积分 方法&#xff1a; 例8 求简单的二重积分 方法&#xff1a; 例9 例10 例11 求f(x,y)的二重积分 方法&#xff1a; 例12 例13 …

Langchain-Chatchat开源库使用的随笔记(一)

笔者最近在研究Langchain-Chatchat&#xff0c;所以本篇作为随笔记进行记录。 最近核心探索的是知识库的使用&#xff0c;其中关于文档如何进行分块的详细&#xff0c;可以参考笔者的另几篇文章&#xff1a; 大模型RAG 场景、数据、应用难点与解决&#xff08;四&#xff09;R…

怎么解决 Nginx反向代理加载速度慢?

Nginx反向代理加载速度慢可能由多种原因引起&#xff0c;以下是一些可能的解决方法&#xff1a; 1&#xff0c;网络延迟&#xff1a; 检查目标服务器的网络状况&#xff0c;确保其网络连接正常。如果目标服务器位于不同的地理位置&#xff0c;可能会有较大的网络延迟。考虑使用…

Good Bye 2023

Good Bye 2023 Good Bye 2023 A. 2023 题意&#xff1a;序列a中所有数的乘积应为2023&#xff0c;现在给出序列中的n个数&#xff0c;找到剩下的k个数并输出&#xff0c;报告不可能。 思路&#xff1a;把所有已知的数字乘起来&#xff0c;判断是否整除2023&#xff0c;不够…

Android Studio如何创建尺寸大小及API通用的模拟器

目录 前言 一、操作步骤 二、总结 三、更多资源 前言 在开发移动应用程序的过程中&#xff0c;使用模拟器进行测试是一种常见和方便的方式。Android Studio是一款功能强大的集成开发环境&#xff0c;它提供了创建和管理模拟器的功能。在本文中&#xff0c;我们将介绍如何创…

qs.stringify 使用arrayFormat属性 + allowDots的数据处理 - 附示例

qs&#xff1a;将url中的参数转为对象&#xff1b;将对象转为url参数形式 一、介绍 1、官方文档&#xff1a; https://github.com/ljharb/qs https://github.com/ljharb/qshttps://github.com/ljharb/qs 二、准备工作 1、安装依赖包 npm install qs --save 2、示例版本 &…

Autodesk Maya各版本安装指南

链接地址如下&#xff1a; https://pan.baidu.com/s/1Fg7MvUJS0tl5t2XAwMK9xg?pwd0531 1.鼠标右击【Maya2024(64bit)】压缩包&#xff08;win11及以上系统需先点击“显示更多选项”&#xff09;【解压到 Maya2024(64bit)】。 2.打开解压后的文件夹&#xff0c;双击打开【Setu…

vue-springboot基于JavaWeb的宠物店兽医站管理系统

ide工具&#xff1a;IDEA 或者eclipse 编程语言: java 数据库: mysql5.7 框架&#xff1a;ssmspringboot都有 前端&#xff1a;vue.jsElementUI 详细技术&#xff1a;HTMLCSSJSspringbootSSMvueMYSQLMAVEN 数据库工具&#xff1a;Navicat结合现有兽医站体系的特点&#xff0c;运…

Android 实现 Slots 游戏旋转效果

文章目录 前言一、效果展示二、代码实现1.UI布局2.SlotAdapter2.SlotsActivity 总结 前言 slots游戏&#xff1a; Slots游戏是一种极具流行度的赌博和娱乐形式&#xff0c;通常被称为老虎机或水果机。它们在赌场、线上游戏平台和手机应用中广泛存在。一般这类游戏都使用Unity…

MySQL例行检查

MySQL例行检查 1.实例例行检查1.1线程1.2索引1.3临时表1.4连接数1.5BINLOG1.6锁1.7WAIT事件1.8MySQL状态 2.事务与锁例行检查2.1查看索引的cardinality2.2查看是否存在事务阻塞现象2.3查看事务执行时长以及执行的所有SQL2.4事务与锁 3.库表例行检查3.1查看缺失主键的表3.2冗余索…

C# 给方形图片切圆角

写在前面 在有些场景中&#xff0c;给图片加上圆角处理会让视觉效果更美观。 代码实现 /// <summary>/// 将图片处理为圆角/// </summary>/// <param name"image"></param>/// <returns></returns>private Image DrawTranspar…

自动驾驶学习笔记(二十四)——车辆控制开发

#Apollo开发者# 学习课程的传送门如下&#xff0c;当您也准备学习自动驾驶时&#xff0c;可以和我一同前往&#xff1a; 《自动驾驶新人之旅》免费课程—> 传送门 《Apollo开放平台9.0专项技术公开课》免费报名—>传送门 文章目录 前言 控制算法 控制标定 控制协议…

《深入理解JAVA虚拟机笔记》并发与线程安全原理

除了增加高速缓存之外&#xff0c;为了使处理器内部的运算单元能尽量被充分利用&#xff0c;处理器可能对输入代码进行乱序执行&#xff08;Out-Of-Order Execution&#xff09;优化。处理器会在计算之后将乱序执行的结果重组&#xff0c;保证该结果与顺序执行的结果一致&#…