Debezium日常分享系列之:Debezium and TimescaleDB

Debezium日常分享系列之:Debezium and TimescaleDB

  • 一、TimescaleDB
  • 二、完整案例
  • 三、Hypertables
  • 四、Continuous aggregates
  • 五、Compression
  • 六、结论

一、TimescaleDB

TimescaleDB 是一个开源数据库,旨在使 SQL 对于时间序列数据具有可扩展性。它是作为 PostgreSQL 数据库的扩展实现的。这一事实促使我们重新使用标准 Debezium PostgreSQL 连接器,并将 TimescaleDB 支持实现为单个消息转换 (SMT)。

TimescaleDB 提供了三个基本构建块/概念:

  • Hypertables
  • Continuous aggregates
  • Compression

描述实例定义的元数据(目录)和原始数据通常存储在 _timescaledb_internal_schema 中。TimescaleDb SMT 连接到数据库并读取和处理元数据。然后,从数据库读取的原始消息会使用存储在 Kafka Connect 标头中的元数据进行丰富,从而创建物理数据和 TimescaleDB 逻辑结构之间的关系。

二、完整案例

Debezium 示例存储库包含基于 Docker Compose 的部署,该部署提供了完整的环境来演示 TimescaleDB 集成。

第一步,开始部署

$ docker-compose -f docker-compose-timescaledb.yaml up --build

该命令将启动 Debezium(Zookeeper、Kafka、Kafka Connect)和源 TimescaleDB 数据库。

启动的数据库已准备好以下数据库对象:

  • 将温度和湿度测量值表示为时间序列数据的超稳定条件;使用 DDL
CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL); SELECT create_hypertable('conditions', 'time')
  • 测量数据的单一记录
INSERT INTO conditions VALUES(NOW(), 'Prague', 22.8, 53.3)

PostgreSQL 出版物用于将时间序列数据发布到复制槽中,因为演示使用 pgoutput 解码插件

CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')

下一步需要注册 Debezium PostgreSQL 连接器以捕获数据库中的更改

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-timescaledb.yaml

注册请求文件与常规文件不同,增加了这些行

{"name": "inventory-connector","config": {
..."schema.include.list": "_timescaledb_internal","transforms": "timescaledb","transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb","transforms.timescaledb.database.hostname": "timescaledb","transforms.timescaledb.database.port": "5432","transforms.timescaledb.database.user": "postgres","transforms.timescaledb.database.password": "postgres","transforms.timescaledb.database.dbname": "postgres"}
}

三、Hypertables

连接器将捕获内部 TimescaleDB 架构以及包含原始数据的物理表,并且将应用 TimescaleDb SMT 来丰富消息并根据逻辑名称将它们路由到正确命名的主题。 SMT 配置选项包含连接到数据库所需的信息。在这种情况下,条件超表将物理存储在 _timescaledb_internal._hyper_1_1_chunk 中,并且当由 SMT 处理时,它将重新路由到根据固定配置的前缀 timescaledb 和逻辑名称 public.conditions 命名的 timescaledb.public.conditions 主题符合超表名称。

让我们在表中添加更多测量值

 docker-compose -f docker-compose-timescaledb.yaml exec timescaledb env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 30, 50);
postgres=# INSERT INTO conditions VALUES (now(), 'Brno', 35, 55);
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 40, 60);

并读取捕获的主题消息(在命令中启用打印密钥和标题)

docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--from-beginning \--property print.key=true \--property print.headers=true \--topic timescaledb.public.conditions

这些消息包含两个标头 debezium_timescaledb_chunk_table:_hyper_1_1_chunk、debezium_timescaledb_chunk_schema:_timescaledb_internal,它们描述了逻辑超表名称与从中捕获它们的物理源表之间的映射。

四、Continuous aggregates

连续聚合对存储在超表中的数据提供自动统计计算。聚合被定义为物化视图,由其自己的超表支持,而超表又由一组物理表支持。重新计算聚合后(手动或自动),新值将存储在超表中,可以从中捕获和流式传输这些值。连接器捕获物理表中的新值,SMT 通过将物理目标重新映射回聚合逻辑名称来再次解决路由问题。还添加了带有原始超表和物理表名称的 Kafka Connect 标头。

让我们创建一个名为conditions_summary的连续聚合,用于计算每个位置和时间间隔的平均、最低和最高温度

postgres=# CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) ASSELECTlocation,time_bucket(INTERVAL '1 hour', time) AS bucket,AVG(temperature),MAX(temperature),MIN(temperature)FROM conditionsGROUP BY location, bucket;

并阅读捕获的主题消息

docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--from-beginning \--property print.key=true \--property print.headers=true \--topic timescaledb.public.conditions_summary

这些消息包含两个标头 debezium_timescaledb_hypertable_table:_materialized_hypertable_2,debezium_timescaledb_hypertable_schema:_timescaledb_internal 公开哪个支持超表用于存储聚合,以及两个附加标头 debezium_timescaledb_chunk_table:_hyper_2_2_chunk,debezium_timescaledb_chunk_schema:_timescaledb_internal 公开存储聚合的物理表。

`__debezium_timescaledb_chunk_table:_hyper_1_1_chunk,__debezium_timescaledb_chunk_schema:_timescaledb_internal` that describes the mapping between the logical hypertable name and the physical source table from which they were captured.

如果添加新的测量并触发聚合重新计算,则更新的聚合将发送到主题

postgres=# INSERT INTO conditions VALUES (now(), 'Ostrava', 10, 50);
postgres=# CALL refresh_continuous_aggregate('conditions_summary', CURRENT_DATE, CURRENT_DATE + 1);

看起来像

{"schema":{
...},"payload":{"before":null,"after":{"location":"Ostrava","bucket":"2024-01-09T13:00:00.000000Z","avg":10.0,"max":10.0,"min":10.0},"source":{"version":"2.5.0.Final","connector":"postgresql","name":"dbserver1","ts_ms":1704806938840,"snapshot":"false","db":"postgres","sequence":"[\"29727872\",\"29728440\"]","schema":"public","table":"conditions_summary","txId":764,"lsn":29728440,"xmin":null},"op":"c","ts_ms":1704806939163,"transaction":null}
}

因此,该主题包含针对两个不同位置计算的两条或多条消息。

五、Compression

TimescaleDB SMT 不会增强压缩数据块(物理表记录),而只是将其作为存储在超表中的副产品。压缩后的数据被捕获并存储在 Kafka 主题中。通常,带有压缩块的消息会被丢弃,并且不会被管道中的后续作业处理。

让我们为超表启用压缩并压缩它

postgres=# ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segment by = 'location');
postgres=# SELECT show_chunks('conditions');show_chunks
----------------------------------------_timescaledb_internal._hyper_1_1_chunk
(1 row)postgres=# SELECT compress_chunk( '_timescaledb_internal._hyper_1_1_chunk');

消息写入 timescaledb._timescaledb_internal._compressed_hypertable_3。

停止服务

docker-compose -f docker-compose-timescaledb.yaml down

六、结论

在这篇文章中,我们演示了从 TimescaleDB 时间序列数据库捕获数据以及通过 TimescaleDb SMT 对其进行处理。我们已经展示了如何根据作为数据源的超表和连续聚合来路由和丰富消息。

深入了解Debezium请阅读博主专栏:

  • Debezium专栏

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/634625.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营第二十四天| 77. 组合

77. 组合 题目链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 解题思路&#xff1a;纵向遍历&#xff0c;遇到叶子节点返回上一节点 java&#xff1a; class Solution {List<List<Integer>> result new ArrayList&l…

ACL配置

ACL&#xff1a;访问控制列表 在路由器流量进或出接口上&#xff0c;匹配流量产生动作-- 允许 拒绝 &#xff08;访问限制&#xff09;定义感兴趣流量--- 匹配流量后&#xff0c;将流量提交给其他的协议进行策略 匹配规则&#xff1a; 至上而下逐一匹配&#xff0c;上条匹配…

蓝桥杯(C++ 整数删除 优先队列 )

优先队列&#xff1a; 优先队列具有队列的所有特性&#xff0c;包括队列的基本操作&#xff0c;只是在这基础上添加了内部的一个排序&#xff0c;它本质是一个堆实现的。 1.头文件&定义 #include <queue> #include <functional> //greater<>// 定义 p…

1d 卷积网络笔记

目录 这个是1d 卷积网络合集&#xff1a; resnet1d的 这个是1d 卷积网络合集&#xff1a; https://github.com/StChenHaoGitHub/1D-deeplearning-model-pytorch/blob/main/ResNet50.py resnet1d的 https://github.com/hsd1503/resnet1d

2023 年顶级前端工具

谁不喜欢一个好的前端工具&#xff1f;在本综述中&#xff0c;您将找到去年流行的有用的前端工具&#xff0c;它们将帮助您加快开发工作流程。让我们深入了解一下&#xff01; 在过去的 12 个月里&#xff0c;我在我的时事通讯 Web Tools Weekly 中分享了数百种工具。我为前端…

经典数据库练习题及答案

数据表介绍 --1.学生表 Student(SId,Sname,Sage,Ssex) --SId 学生编号,Sname 学生姓名,Sage 出生年月,Ssex 学生性别 --2.课程表 Course(CId,Cname,TId) --CId 课程编号,Cname 课程名称,TId 教师编号 --3.教师表 Teacher(TId,Tname) --TId 教师编号,Tname 教师姓名 --4.成绩…

JavaScript DOM可以做什么?

1、通过id获取标签元素 DOM是文档对象模型&#xff0c;它提供了一些属性和方法来方便我们操作document对象&#xff0c;比如getElementById()方法可以通过某个标签元素的id来获取这个标签元素 // 用法 window.document.getElementById(id); // 例子 <!DOCTYPE html> &l…

深度学习模型之yolov8实例分割模型TesorRT部署-python版本

1 模型转换 从github上下载官方yolov8版本&#xff0c;当前使用的版本是2023年9月份更新的版本&#xff0c;作者一直在更新。官网地址 2 加载模型 模型的训练和测试在官方文档上&#xff0c;有详细的说明&#xff0c;yolov8中文文档这里不做过多说明&#xff0c;v8现在训练是…

C#设计模式教程(5):建造者模式

建造者模式的定义 建造者模式(Builder Pattern)是一种创建型设计模式,它允许你创建复杂对象的步骤分离,这样你可以使用相同的创建过程生成不同的表示。建造者模式通常用于处理那些包含多个成员变量的类,特别是当这些成员变量需要经过复杂步骤构建或者有大量可选参数时。 …

一个查看各个软件版本生命周期的网站

在做开发或学习时&#xff0c;经常翻文档找各个SDK版本的支持信息&#xff0c;比较麻烦。发现一个罗列 JDK、SpringBoot、PHP等常见的应用的生命周期追踪网站&#xff1a;https://endoflife.date/。 endoflife.date 目前追踪 286 个产品。

智能驾驶新浪潮:SSD与UFS存储技术如何破浪前行?-UFS篇

如果说SSD是赛道上的超级跑车&#xff0c;那UFS更像是专为智能汽车定制的高性能轻量化赛车。UFS采用串行接口技术&#xff0c;像是闪电侠一样&#xff0c;将数据传输的速度推向新高&#xff0c;大幅缩短了系统启动时间和应用程序加载时间&#xff0c;这对追求即时反应的ADAS系统…

从零开始的 dbt 入门教程 (dbt core 命令进阶篇)

引 根据第一篇文章的约定&#xff0c;我想通过接下来的几篇文章带大家进一步了解 dbt 的用法&#xff0c;原计划这篇文章我会介绍 dbt 命令的进阶用法&#xff0c;进一步认识 dbt 的配置以及如何创建增量表等等零零散散十几个方面的知识点&#xff0c;结果在我写完命令部分发现…

深度学习中Numpy的一些注意点(多维数组;数据类型转换、数组扁平化、np.where()、np.argmax()、图像拼接、生成同shape的图片)

文章目录 1多维数组压缩维度扩充维度 2numpy类型转换深度学习常见的float32类型。 3数组扁平化4np.where()的用法5np.argmax()6图像拼接7生成同shape的图片&#xff0c;指定数据类型 1多维数组 a.shape(3,2);既数组h3&#xff0c;w2 a.shape(2,3,2);这里第一个2表示axis0维度上…

Unity文字游戏开发日志(2)——存档与读档

存档与读档较为简单的实现 今天学习了如何存读档。 采用了Unity自带的方式PlayerPrefs 写了一个示例代码 功能是&#xff1a;建立一个名字的新档&#xff0c;每次打开游戏名字都会变。 PlayerPrefs.SetString("save","kkk");//创建名为save数据&#…

奇异值分解(SVD)【详细推导证明】

机器学习笔记 机器学习系列笔记&#xff0c;主要参考李航的《机器学习方法》&#xff0c;见参考资料。 第一章 机器学习简介 第二章 感知机 第三章 支持向量机 第四章 朴素贝叶斯分类器 第五章 Logistic回归 第六章 线性回归和岭回归 第七章 多层感知机与反向传播【Python实例…

Dubbo-admin监控中心

监控中心 Dubbo-admin监控中心执行操作启动provider和consumer项目进行测试总体流程 Dubbo-admin监控中心 dubbo-admin下载路径 git clone https://github.com/apache/dubbo-admin.git图1-1 dubbo-admin项目文件展示 执行操作 # 启动zookeeper# 前端 cd dubbo-admin-ui npm i…

Linux 设备树详解

目录 1、概述 2、节点&#xff08; node&#xff09;和属性&#xff08; property&#xff09; 2.1、DTS 描述键值对的语法&#xff1a; 2.2 节点语法规范说明 2.3节点名及节点路径 2.4 节点别名&#xff08;节点引用&#xff09; 2.5 合并节点内容 2.6 替换节点内容 2…

Vue2:用ref方式绑定自定义事件的注意事项

一、场景描述 我们知道绑定自定义事件可以用ref方式实现。 但是&#xff0c;这个方式&#xff0c;需要注意下&#xff0c;否则&#xff0c;实现不了我们的效果。 需求是这样的&#xff0c;我们通过ref绑定的事件&#xff0c;来给App的data块中的变量赋值。 二、绑定自定义事…

java:流程控制

一、流程控制语句分类 顺序结构分支结构&#xff08;if&#xff0c;switch&#xff09;循环结构&#xff08;for&#xff0c;while&#xff0c;do...while&#xff09; 二、顺序结构 定义&#xff1a;顺序结构是程序中最基本的流程控制&#xff0c;没有特定的语法结构&#…