ClickHouse Kafka 引擎教程

如果您刚开始并且第一次设置 Kafka 和 ClickHouse 需要帮助怎么办?这篇文章也许会提供下帮助。

我们将通过一个端到端示例,使用 Kafka 引擎将数据从 Kafka 主题加载到 ClickHouse 表中。我们还将展示如何重置偏移量和重新加载数据,以及如何更改表架构。最后,我们将演示如何将数据从 ClickHouse 写回 Kafka 主题。

先决条件

下面的练习假设你已经安装并运行了 Kafka 和 ClickHouse。为了方便起见,我们使用了 Kubernetes。Kafka 版本是 Confluent 5.4.0,使用带有三个 Kafka 代理的 Kafka helm chart 安装。ClickHouse版本为20.4.2,使用ClickHouse Kubernetes Operator安装在单个节点上。

这些练习应该适用于任何类型的安装,但您需要相应地更改主机名。如果 Kafka 代理较少,则可能还需要更改复制因子。

Kafka-ClickHouse 集成概述

Kafka 是一种极具可扩展性的消息总线。它的核心是由运行在不同主机上的代理管理的分布式日志。以下是应用程序模型的简短说明。

生产者将消息写入主题,主题是一组消息。使用者从主题中读取消息,该主题分布在分区上。消费者被安排在消费者组中,这允许应用程序从 Kafka 并行读取消息,而不会丢失或重复。

下图说明了上述主要部分。

ClickHouse 可以使用 Kafka 表引擎和物化视图直接从 Kafka 主题读取消息,该视图获取消息并将其推送到 ClickHouse 目标表。目标表通常使用 MergeTree 引擎或 ReplicatedMergeTree 等变体来实现。消息流如下图所示。

也可以从 ClickHouse 写回 Kafka。消息流更简单 - 只需插入到 Kafka 表中即可。下面是流程图。

在 Kafka 上创建主题

现在让我们在 Kafka 上设置一个主题,我们可以使用它来加载消息。登录到 Kafka 服务器,然后使用以下示例中的命令创建主题。在此示例中,“kafka”是服务器的 DNS 名称。如果您有其他 DNS 名称,请改用该名称。您还可以调整分区数以及复制因子。

kafka-topics \
--bootstrap-server kafka:9092 \
--topic readings \
--create --partitions 6 \
--replication-factor 2

检查主题是否已成功创建。

kafka-topics --bootstrap-server kafka:9092 --describe readings
你将看到如下所示的输出,其中显示了其分区的主题和当前状态。
Topic: readings    PartitionCount: 6    ReplicationFactor: 2    Configs:Topic: readings    Partition: 0    Leader: 0    Replicas: 0,2    Isr: 0,2Topic: readings    Partition: 1    Leader: 2    Replicas: 2,1    Isr: 2,1Topic: readings    Partition: 2    Leader: 1    Replicas: 1,0    Isr: 1,0Topic: readings    Partition: 3    Leader: 0    Replicas: 0,1    Isr: 0,1Topic: readings    Partition: 4    Leader: 2    Replicas: 2,0    Isr: 2,0Topic: readings    Partition: 5    Leader: 1    Replicas: 1,2    Isr: 1,2

现在 Kafak 准备工作已完成。让我们转向ClickHouse。

ClickHouse Kafka 引擎设置

要将数据从 Kafka 主题读取到 ClickHouse 表,我们需要做三件事:

  • 一个目标 MergeTree 表,用于为引入的数据提供主目录

  • 一个 Kafka 引擎表,使主题看起来像一个 ClickHouse 表

  • 用于自动将数据从 Kafka 移动到目标表的具体化视图

首先,我们将定义目标 MergeTree 表。登录到 ClickHouse 执行以下 SQL

CREATE TABLE readings (readings_id Int32 Codec(DoubleDelta, LZ4),time DateTime Codec(DoubleDelta, LZ4),date ALIAS toDate(time),temperature Decimal(5,2) Codec(T64, LZ4)
) Engine = MergeTree
PARTITION BY toYYYYMM(time)
ORDER BY (readings_id, time);

接下来,我们需要使用 Kafka 引擎创建一个表来连接主题并读取数据。引擎将使用主题“readings”和消费者组名称“readings consumer_group1”从主机 kafka 的代理读取数据。输入格式为 CSV。

请注意,我们省略了“date”列。它是目标表中的别名,将从“time”列自动填充。


CREATE TABLE readings_queue (readings_id Int32,time DateTime,temperature Decimal(5,2)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-headless.kafka:9092',kafka_topic_list = 'readings',kafka_group_name = 'readings_consumer_group1',kafka_format = 'CSV',kafka_max_block_size = 1048576;

前面的设置处理最简单的情况:单个代理、单个主题且没有专用配置。

最后,我们创建一个物化视图,用于在 Kafka 和合并树表之间传输数据。

CREATE MATERIALIZED VIEW readings_queue_mv TO readings AS
SELECT readings_id, time, temperature
FROM readings_queue;

这就是 Kafka 到 ClickHouse 的集成。让我们来测试一下。

加载数据

现在是时候使用 kafka-console-producer 命令加载一些输入数据了。以下示例使用 CSV 格式添加三条记录。


kafka-console-producer --broker-list kafka:9092 --topic readings <<END
1,"2020-05-16 23:55:44",14.2
2,"2020-05-16 23:55:45",20.1
3,"2020-05-16 23:55:51",12.9
END
传输到 readings  表需要几秒钟。如果我们从表中进行查询,我们会得到以下输出。
SELECT *
FROM readings┌─readings_id─┬────────────────time─┬─temperature─┐
│           1 │ 2020-05-16 23:55:44 │       14.20 │
│           2 │ 2020-05-16 23:55:45 │       20.10 │
│           3 │ 2020-05-16 23:55:51 │       12.90 │
└─────────────┴─────────────────────┴─────────────┘

Kafka 和 ClickHouse 现已连接。

从 Kafka 重读消息

前面的示例从 Kafka 主题中的起始位置开始,并在消息到达时读取消息。这是正常方式,但有时再次阅读消息很有用。例如,您可能希望在修复架构中的 bug 或重新加载备份后重新读取消息。幸运的是,这很容易做到。我们只是重置使用者组中的偏移量。

假设我们丢失了 readings 表中的所有消息,并希望从 Kafka 重新加载它们。首先,让我们使用 TRUNCATE 命令“丢失”消息。

TRUNCATE TABLE readings;

在重置分区上的偏移量之前,我们需要关闭消息消费。为此,请在 ClickHouse 中分离 readings_queue 表,如下所示。

DETACH TABLE readings_queue

接下来,使用以下 Kafka 命令重置用于 readings_queue 表的使用者组中的分区偏移量 (kafka 节点执行)。

kafka-consumer-groups --bootstrap-server kafka:9092 \--topic readings --group readings_consumer_group1 \--reset-offsets --to-earliest --execute

现在重新连接readings_queue表。

ATTACH TABLE readings_queue

等待几秒钟,丢失的记录将被恢复。您可以运行 SELECT 来确认它们已恢复。

添加虚拟列

使用显示原始 Kafka 消息坐标的信息标记行通常很有用。为此,Kafka 表引擎自动定义了虚拟列。下面介绍如何更改 readings 表以显示源主题、分区和偏移量。

首先,让我们通过分离 Kafka 表来禁用消息使用。消息可能会堆积在主题上,但我们不会丢失它们。

DETACH TABLE readings_queue

接下来,我们通过连续执行以下 SQL 命令来更改目标表和物化视图。请注意,我们只是删除并重新创建具体化视图,而更改目标表,从而保留现有数据。

ALTER TABLE readingsADD COLUMN _topic String,ADD COLUMN _offset UInt64,ADD COLUMN _partition UInt64DROP TABLE readings_queue_mvCREATE MATERIALIZED VIEW readings_queue_mv TO readings ASSELECT readings_id, time, temperature, _topic, _offset, _partitionFROM readings_queue;

最后,我们通过重新附加 readings_queue 表来再次启用消息使用。


ATTACH TABLE readings_queue

您可以通过截断表并重新加载消息来确认新架构,就像我们在上一节中所做的那样。如果查询数据,它将如下所示。

SELECTreadings_id AS id, time, temperature AS temp,_topic, _offset, _partition
FROM readings┌─id─┬────────────────time─┬──temp─┬─_topic───┬─_offset─┬─_partition─┐
│  1 │ 2020-05-16 23:55:44 │ 14.20 │ readings │       0 │          5 │
│  2 │ 2020-05-16 23:55:45 │ 20.10 │ readings │       1 │          5 │
│  3 │ 2020-05-16 23:55:51 │ 12.90 │ readings │       2 │          5 │
└────┴─────────────────────┴───────┴──────────┴─────────┴────────────┘

顺便说一句,上述过程与在消息格式更改时升级架构的方式相同。此外,物化视图提供了一种非常通用的方法,可以使 Kafka 消息适应目标表行。您甚至可以定义多个具体化视图,以将消息流拆分到不同的目标表中。

从 ClickHouse 写入 Kafka

在本教程的最后,我们将展示如何将消息从 ClickHouse 写回 Kafka。这是一个相对较新的功能,在当前的 Altinity 稳定版本 19.16.18.85 中可用。

让我们首先在 Kafka 中创建一个新主题来包含消息。我们称其为“readings_high”

kafka-topics \
--bootstrap-server kafka:9092 \
--topic readings_high \
--create --partitions 6 \
--replication-factor 2

接下来,我们需要使用 Kafka 表引擎定义一个指向新主题的表。事实证明,此表可以读取和写入消息,但在此示例中,我们将仅使用它进行写入。

CREATE TABLE readings_high_queue (readings_id Int32,time DateTime,temperature Decimal(5,2)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',kafka_topic_list = 'readings_high',kafka_group_name = 'readings_high_consumer_group1',kafka_format = 'CSV',kafka_max_block_size = 1048576;

最后,让我们添加一个实例化视图,将温度大于 20.0 的所有行传输到 readings_high_queue 表。此示例说明了 ClickHouse 物化视图的另一个用例,即在特定条件下生成事件。

CREATE MATERIALIZED VIEW readings_high_queue_mv TO readings_high_queue AS
SELECT readings_id, time, temperature FROM readings
WHERE toFloat32(temperature) >= 20.0

在单独的终端窗口中启动消费者,以从 Kafka 上的 readings_high 主题打印消息,如下所示。这将允许您在 ClickHouse 将行写入 Kafka 时查看行。

kafka-console-consumer --bootstrap-server kafka:9092 --topic readings_high

最后,加载一些数据,这些数据将演示如何写回 Kafka。让我们在原始主题中添加一个新批量。在另一个窗口中运行以下命令。

kafka-console-producer --broker-list kafka:9092 --topic readings <<END
4,"2020-05-16 23:55:52",9.7
5,"2020-05-16 23:55:56",25.3
6,"2020-05-16 23:55:58",14.1
END

几秒钟后,您将在运行 kafka-console-consumer 命令的窗口中看到第二行弹出。它应如下所示:

5,"2020-05-16 23:55:56",25.3

故障处理

如果您在使用任何示例时遇到问题,请查看 ClickHouse 日志。如果尚未启用跟踪日志记录,请启用跟踪日志记录。您可以看到如下消息,这些消息表示 Kafka 表引擎中的活动。

2020.05.17 07:24:20.609147 [ 64 ] {} <Debug> StorageKafka (readings_queue): Started streaming to 1 attached views

所有错误将保存在在clickhouse-server.err.log中。

结论

正如这篇博客文章所展示的,Kafka 表引擎提供了一种简单而强大的方法来集成 Kafka 主题和 ClickHouse 表。显然,管理集成还有很多工作要做,尤其是在生产系统中。我们希望本文能帮助您入门,并使您能够自己探索其他可能性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/225390.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于技术架构的思考

技术选型实则是取舍的艺术 这句话是我偶然在一篇技术架构方面的文章上看到的&#xff0c;每当我需要给新项目进行技术选型&#xff0c;决定技术架构时&#xff0c;一直坚信的。 当我们做技术选型时&#xff0c;需要考虑的东西非常多。比如&#xff0c;用关系型数据库还是非关…

智能优化算法应用:基于海洋捕食者算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于海洋捕食者算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于海洋捕食者算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.海洋捕食者算法4.实验参数设定5.算法…

Lvs-NAT部署

目录 一.什么是lvs 二.集群的类型 三.lvs的三种模式 四.lvs调度算法 五.LVS-NAT模式部署 一.什么是lvs lvs负载均衡群集&#xff1a;由多台主机构成&#xff0c;对外表现为一个整体&#xff0c;只提供一个访问入口&#xff0c;相当于一个大计算机。 二.集群的类型 1&am…

【系统设计】如何确保消息不会丢失?

一、前言 对于大部分业务系统来说&#xff0c;丢消息意味着数据丢失&#xff0c;是完全无法接受的。其实&#xff0c;现在主流的消息队列产品都提供了非常完善的消息可靠性保证机制&#xff0c;完全可以做到在消息传递过程中&#xff0c;即使发生网络中断或者硬件故障&#xf…

解决kernel32.dll丢失的修复方式,kernel32.dll预防错误的方法

kernel32.dll文件是电脑中的一个重要文件&#xff0c;如果电脑出现kernel32.dll丢失的错误提示&#xff0c;那么电脑中的一些程序将不能正常使用&#xff0c;那么出现这样的问题有什么解决办法呢&#xff1f;那么今天就和大家说说解决kernel32.dll丢失的修复方式。 一.kernel32…

[Knowledge Distillation]论文分析:Distilling the Knowledge in a Neural Network

文章目录 一、完整代码二、论文解读2.1 介绍2.2 Distillation2.3 结果 三、整体总结 论文&#xff1a;Distilling the Knowledge in a Neural Network 作者&#xff1a;Geoffrey Hinton, Oriol Vinyals, Jeff Dean 时间&#xff1a;2015 一、完整代码 这里我们使用python代码进…

探索Linux服务器配置信息的命令

目录 前言1 uname2 lscpu3 free4 df5 lspci6 lsusb7 lshw结语 前言 Linux系统提供了许多命令&#xff0c;用于获取和查看服务器的软硬件配置信息。这些命令可以帮助管理员和用户了解系统的状态、资源使用情况以及硬件设备的相关信息。以下是一些常用的命令以及它们的作用、使用…

[Vulnhub靶机] DC-1

[Vulnhub靶机] DC-1靶机渗透思路及方法&#xff08;个人分享&#xff09; 靶机下载地址&#xff1a; https://download.vulnhub.com/dc/DC-1.zip 靶机地址&#xff1a;192.168.67.25 攻击机地址&#xff1a;192.168.67.3 一、信息收集 1.使用 arp-scan 命令扫描网段内存活的…

html的学习笔记

开发工具&#xff1a;vscode 文字标签 h1:一级标题&#xff0c;h2&#xff1a;二级标题h6 p&#xff1a;段落标签 hr&#xff1a;分隔线 br&#xff1a;换行 strong/b&#xff1a;文字加粗 ins/u:下划线 em/i&#xff1a;倾斜 del/s&#xff1a;删除线 媒体标签 图片…

vue中使用ailwind css

官网地址&#xff1a; 安装 - Tailwind CSS 中文网 推荐一个网站&#xff0c;里面可以查询所有的TailWindCSS的class样式&#xff1a; Tailwind CSS Cheat Sheet npm安装&#xff1a; 注意&#xff1a;1、这里要用npm&#xff0c;不要用cnpm。2、最好用install&#xff0c;不要…

泛型的相关内容

首先我们来了解一下什么是泛型&#xff0c;泛型的作用又是什么。 泛型的形式是 ArrayList<Object> objects new ArrayList<>(); 这里的<Object>这个就是泛型&#xff0c;添加泛型的作用又是什么呢&#xff0c;它可以限制添加对象的类型&#xff0c;比如A…

黑马点评05分布式锁 1互斥锁和过期时间

实战篇-09.分布式锁-基本原理和不同实现方式对比_哔哩哔哩_bilibili 1.分布式锁 因为jvm内部的sychonized锁无法在不同jvm之间共享锁监视器&#xff0c;所以需要一个jvm外部的锁来共享。 2.redis setnx互斥锁 加锁解锁即可 2.1不释放锁可能死锁 redis 的setnx不会自动释放锁…

用CC三维建模建出的OSGB格式,用模方打不开,显示该路径包含OSGB瓦块数量0,是什么原因?

答&#xff1a;模方只识别tile命名的模型文件&#xff0c;此模型是不分块输出&#xff0c;要平面切块重新跑。 模方是一款针对实景三维模型的冗余碎片、水面残缺、道路不平、标牌破损、纹理拉伸模糊等共性问题研发的实景三维模型修复编辑软件。模方4.1新增自动单体化建模功能&…

巴贝拉葡萄酒是单一品种还是混合品种制成的?

大多数巴贝拉葡萄酒都是由单一的巴贝拉葡萄品种制成的&#xff0c;许多意大利葡萄酒商开始尝试在巴贝拉葡萄酒中加入其它葡萄品种&#xff0c;其中两个最受欢迎的意大利品种是皮埃蒙特的巴贝拉德阿尔巴和达斯蒂。和朋友在一家意大利餐厅吃饭&#xff0c;被酒单吓到了&#xff1…

10.1Linux输入子系统介绍

输入设备介绍 鼠标、键盘、按键、触摸屏等提供输入支持的设备都属于输入设备&#xff0c;在Linux也提供了一套驱动框架“input 子系统”与之对应&#xff0c;用于抽象输入设备&#xff0c;并提供管理输入设备驱动和输入事件处理程序的功能 input 子系统 input 子系统用于管理…

GPT 魔力涌现

GPT 二、Prompt 的典型构成 角色&#xff1a;给 AI 定义一个最匹配任务的角色&#xff0c;比如&#xff1a;「你是一位软件工程师」「你是一位小学老师」指示&#xff1a;对任务进行描述上下文&#xff1a;给出与任务相关的其它背景信息&#xff08;尤其在多轮交互中&#xff…

Java: Random

/*** encoding: utf-8* 版权所有 2023 涂聚文有限公司* 许可信息查看&#xff1a;* 描述&#xff1a; //https://commons.apache.org/proper/commons-lang/javadocs/api-2.6/org/apache/commons/lang/RandomStringUtils.html* //https://commons.apache.org/pro…

详解RTC:以华人文化打造链上生态

文化是人类在发展的历史长河中淘洗出来的智慧结晶&#xff0c;随着人类社会的进步和变迁&#xff0c;经历了从口口相传到互联网等不同历史时代的传承和创新。在数字技术飞速发展的当今&#xff0c;区块链技术为文化的创新与传承提供了全新的空间和方式&#xff0c;使其得以在新…