ClickHouse10-ClickHouse中Kafka表引擎

Kafka表引擎也是一种常见的表引擎,在很多大数据量的场景下,会从源通过Kafka将数据输送到ClickHouse,Kafka作为输送的方式,ClickHouse作为存储引擎与查询引擎,大数据量的数据可以得到快速的、高压缩的存储。
在这里插入图片描述

Kafka大家肯定不陌生:

  • 它可以用于发布和订阅数据流,是常见的队列使用方式
  • 它可以组织容错存储,是常见的容错存储的使用方式
  • 它可以在流可用时对其进行处理,是常见的大数据处理的使用方式

全文概览:

  • 基本语法
  • 从 Kafka 写入到 ClickHouse
  • 从 ClickHouse 写入到 Kafka
    • 测试1:queue->ck->queue
    • 测试2:ck->queue

基本语法

分为定义表结构和定义Kafka的接入参数,Kafka的接入参数都是常见的字段

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [ALIAS expr1],name2 [type2] [ALIAS expr2],...
) ENGINE = Kafka()
SETTINGSkafka_broker_list = 'host:port',kafka_topic_list = 'topic1,topic2,...',kafka_group_name = 'group_name',kafka_format = 'data_format'[,][kafka_schema = '',][kafka_num_consumers = N,][kafka_max_block_size = 0,][kafka_skip_broken_messages = N,][kafka_commit_every_batch = 0,][kafka_client_id = '',][kafka_poll_timeout_ms = 0,][kafka_poll_max_batch_size = 0,][kafka_flush_interval_ms = 0,][kafka_thread_per_consumer = 0,][kafka_handle_error_mode = 'default',][kafka_commit_on_select = false,][kafka_max_rows_per_message = 1];

示例:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'

从 Kafka 写入到 ClickHouse

创建topic:

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test_ck_sync1

创建同步表:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'CREATE TABLE IF NOT EXISTS test_ck_sync1_res
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(sys_time)
ORDER BY tuple()

创建物化视图,进行数据样式的转换:

CREATE MATERIALIZED VIEW test_ck_sync1_mv TO test_ck_sync1_res AS
SELECTsys_time,num
FROM test_ck_sync1

通过console写入数据:

[$ kafka_2.13-3.6.1]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test_ck_sync1
>2024-01-01 00:00:01|89  

验证数据:

$ :) select * from test_ck_sync1_res;SELECT *
FROM test_ck_sync1_resQuery id: a666f893-5be9-4022-9327-3a1507aa5485┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:01 │  89 │
└─────────────────────┴─────┘
┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:00 │  88 │
└─────────────────────┴─────┘2 rows in set. Elapsed: 0.049 sec.

从 ClickHouse 写入到 Kafka

kafka_writers_reader --(view)--> kafka_writers_queue ---> 

创建一个队列:

bin/kafka-topics.sh --topic kafka_writers --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

创建同步表:

CREATE TABLE kafka_writers_reader (     `id` Int,     `platForm` String,     `appname` String,     `time` DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'kafka_writers_reader', kafka_group_name = 'kafka_writers_reader_group', kafka_format = 'CSV';CREATE TABLE kafka_writers_queue (     id Int,     platForm String,     appname String,     time DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092',        kafka_topic_list = 'kafka_writers',        kafka_group_name = 'kafka_writers_group',        kafka_format = 'CSV',       kafka_max_block_size = 1048576;

测试1:queue->ck->queue

通过写入队列kafka_writers_reader,借助ClickHouse写入队列kafka_writers

bin/kafka-topics.sh --topic kafka_writers_reader --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic kafka_writers_readerbin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers

测试2:ck->queue

通过写入表kafka_writers_reader,写入队列kafka_writers

$ :) INSERT INTO kafka_writers_reader (id, platForm, appname, time) 
VALUES (8,'Data','Test','2020-12-23 14:45:31'), 
(9,'Plan','Test1','2020-12-23 14:47:32'), 
(10,'Plan','Test2','2020-12-23 14:52:15'), 
(11,'Data','Test3','2020-12-23 14:54:39');INSERT INTO kafka_writers_reader (id, platForm, appname, time) FORMAT ValuesQuery id: 223a63ab-97fa-488d-8ea7-c2e194155d26Ok.4 rows in set. Elapsed: 1.054 sec. 
[$ kafka_2.13-3.6.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers
8,"Data","Test","1970-01-01 08:00:00"9,"Plan","Test1","1970-01-01 08:00:00"10,"Plan","Test2","1970-01-01 08:00:00"11,"Data","Test3","1970-01-01 08:00:00"

如果喜欢我的文章的话,可以去GitHub上给一个免费的关注吗?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/775338.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu 配置 kubernetes 学习环境,让外部访问 dashboard

Ubuntu 配置 kubernetes 学习环境 一、安装 1. minikube 首先下载一下 minikube,这是一个单机版的 k8s,只需要有容器环境就可以轻松启动和学习 k8s。 首先你需要有Docker、QEMU、Hyperkit等其中之一的容器环境,以下使用 docker 进行。 对…

【C++】编码规范之可靠性原则

C编码规范中的可靠性原则是确保代码的可读性、可维护性和稳定性,以下是几个小点及其例子: 避免使用全局变量: 如果需要多个变量在全局范围内使用,可用context(结构体/类)解决耦合性问题 // 不推荐的写法&…

65W智能快充—同为科技桌面PDU插座推荐

近10年,移动设备的智能化、功能化已经完全且紧密的融入到我们的基础生活与工作当中。 在常态化的电子设备的应用中,设备的电力续航以及后续的供电充电就尤为重要。 就目前而言,所有消费电子产品中的输入以及充电的接口,usb-c可以…

酷开科技依托酷开系统用“平台+产品+场景”塑造全屋智能生活!

杰弗里摩尔的“鸿沟理论”中写道:高科技企业推进产品的早期市场和产品被广泛接受的主流市场之间,存在着一条巨大的“鸿沟”。“鸿沟”,指产品吸引早期接纳者后、赢得更多客户前的那段间歇,以及其中可预知和不可预知的阻碍。多数产…

面试中会被问到的GIT问题解答(含答案)

在现代软件开发中,Git已经成为了版本控制系统的事实标准。无论是在个人项目还是大型企业级开发中,Git都是不可或缺的工具。因此,掌握Git的基本操作和高级特性对于软件开发者来说是非常重要的。以下是根据提供的文件内容,总结出的3…

基于Rflysim平台的无人机拦截三维比例导引算法仿真

【后厂村路钢铁侠出品】 一、Rflysim简介 RflySim是一套专为科研和教育打造的Pixhawk /PX4 和MATLAB/Simulink生态系统或工具链,采用基于模型设计(Model-Based Design, MBD)的思想,可用于无人系统的控制和安全测试。…

如何创建azure pipeline

Azure Pipelines是一种持续集成和持续交付(CI/CD)工具,可以帮助开发团队自动化构建、测试和部署应用程序。以下是创建Azure Pipeline的步骤: 登录到Azure DevOps(https://dev.azure.com/)。在Azure DevOps…

区块链食品溯源案例实现(一)

引言: 食品安全问题一直是社会关注的热点,而食品溯源作为解决食品安全问题的重要手段,其重要性不言而喻。传统的食品溯源系统往往存在数据易被篡改、信息不透明等问题,而区块链技术的引入,为食品溯源带来了革命性的变革…

设计模式之装饰模式解析

装饰模式 1)概述 1.定义 动态地给一个对象增加一些额外的职责,在增加对象功能时,装饰模式比生成子类实现更为灵活。 2.作用 装饰模式可以在不改变一个对象本身功能的基础上给对象增加额外的新行为。 3.结构图 4.角色 Component&#xf…

【React】React 内置 Hook

React 内置 Hook 是一组允许你在函数组件中使用 state 和其他 React 特性的函数。它们极大地扩展了函数组件的功能,使得在无需编写 class 的情况下也能使用 React 的全部功能。以下是一些主要的 React 内置 Hook 的介绍: 1.useState useState 是用于在函数组件中添加状态(…

深入理解 @Transactional 注解在 Spring 中的应用

前言:在 Java 开发中,事务管理是非常重要的一环。Spring 框架提供了Transactional注解来简化事务管理的操作,本文将深入介绍Transactional注解的用法,并结合代码示例进行详细讨论。 1.Transactional 注解简介 Transactional注解是…

银行卡的分类

银行卡是银行账户的一种体现形式,它是由银行机构发行的具有消费信用、转账结算、存取现金等全部或部分功能作为结算支付工具的各类卡的统称。 (1)按是否具有授信额度分类 ①借记卡:借记卡是指发卡银行向申请人签发的,没…

Machine Learning机器学习之向量机(Support Vector Machine,SVM)

目录 前言 算法提出背景: 核心思想: 原理: 应用领域: 一、支持向量机分类(主要变体) 二、构建常见的支持向量机模型 基于Python 中的 Scikit-learn 库构建线性支持向量机(SVM) 三、向…

06. 详解 Java 的 Object 类和常见类

Object 类 java.lang.Object 作为所有 Java 类的祖先&#xff0c;编译系统默认继承 Object 类&#xff0c;Object 类包含了所有 Java 类的公共属性和方法。 Object() 构造方法getClass():Class<?>public boolean equals(Object obj) 比较两对象封装的数据是否相等&…

SQLite中的动态内存分配(五)

返回&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;SQLite中的原子提交&#xff08;四&#xff09; 下一篇&#xff1a;SQLite使用的临时文件&#xff08;二&#xff09; ​概述 SQLite使用动态内存分配来获得 用于存储各种对象的内存 &#xff08;例如&#xff1a…

Django开发复盘

一、URL 对于一个不会写正则表达式的蒟蒻来说&#xff0c;在urls.py中就只能傻傻的写死名字&#xff0c;但是即便这样&#xff0c;还会有很多相对路径和绝对路径的问题&#xff08;相对ip端口的路径&#xff09;&#xff0c;因为我们网页中涉及到页面跳转&#xff0c;涉及到发送…

uniapp 用web-view 嵌套uniapp

1. uniapp 用web-view 嵌套uniapp uniapp开发的APP要嵌套uniapp开发的h5,并且APP后面还要打包H5,这就涉及app和h5之间的通信,h5和h5之间的通信。 1.1. 准备工作 无论是app和h5通信还是 h5和h5之间的通信都是需要引入web-view的sdk文件 我下载的是1.5.2版本,代码如下 !(functi…

目标检测系列模型发展历程

常见数据集&#xff1a; VOC-->COCO 模型发展&#xff1a; RCNN-->Fast RCNN-->Faster RCNN-->Mask RCNN 这一系列的模型&#xff08;RCNN、Fast RCNN、Faster RCNN、Mask RCNN&#xff09;代表了计算机视觉特别是在物体检测和分割领域的一系列重大进展。下面&a…

神经网络:梯度下降法更新模型参数

作者&#xff1a;CSDN _养乐多_ 在神经网络领域&#xff0c;梯度下降是一种核心的优化算法&#xff0c;本文将介绍神经网络中梯度下降法更新参数的公式&#xff0c;并通过实例演示其在模型训练中的应用。通过本博客&#xff0c;读者将能够更好地理解深度学习中的优化算法和损…

【LeetCode】20. 有效的括号(Java自用版)

栈 首先&#xff0c;我们定义一个isValid方法&#xff0c;该方法接受一个字符串s作为参数&#xff0c;并返回一个布尔值来表示该字符串是否有效。 public boolean isValid(String s) {// 如果字符串为空&#xff0c;则自然是有效的if (s.isEmpty())return true;// 创建一个栈…