ClickHouse10-ClickHouse中Kafka表引擎

Kafka表引擎也是一种常见的表引擎,在很多大数据量的场景下,会从源通过Kafka将数据输送到ClickHouse,Kafka作为输送的方式,ClickHouse作为存储引擎与查询引擎,大数据量的数据可以得到快速的、高压缩的存储。
在这里插入图片描述

Kafka大家肯定不陌生:

  • 它可以用于发布和订阅数据流,是常见的队列使用方式
  • 它可以组织容错存储,是常见的容错存储的使用方式
  • 它可以在流可用时对其进行处理,是常见的大数据处理的使用方式

全文概览:

  • 基本语法
  • 从 Kafka 写入到 ClickHouse
  • 从 ClickHouse 写入到 Kafka
    • 测试1:queue->ck->queue
    • 测试2:ck->queue

基本语法

分为定义表结构和定义Kafka的接入参数,Kafka的接入参数都是常见的字段

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [ALIAS expr1],name2 [type2] [ALIAS expr2],...
) ENGINE = Kafka()
SETTINGSkafka_broker_list = 'host:port',kafka_topic_list = 'topic1,topic2,...',kafka_group_name = 'group_name',kafka_format = 'data_format'[,][kafka_schema = '',][kafka_num_consumers = N,][kafka_max_block_size = 0,][kafka_skip_broken_messages = N,][kafka_commit_every_batch = 0,][kafka_client_id = '',][kafka_poll_timeout_ms = 0,][kafka_poll_max_batch_size = 0,][kafka_flush_interval_ms = 0,][kafka_thread_per_consumer = 0,][kafka_handle_error_mode = 'default',][kafka_commit_on_select = false,][kafka_max_rows_per_message = 1];

示例:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'

从 Kafka 写入到 ClickHouse

创建topic:

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test_ck_sync1

创建同步表:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'CREATE TABLE IF NOT EXISTS test_ck_sync1_res
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(sys_time)
ORDER BY tuple()

创建物化视图,进行数据样式的转换:

CREATE MATERIALIZED VIEW test_ck_sync1_mv TO test_ck_sync1_res AS
SELECTsys_time,num
FROM test_ck_sync1

通过console写入数据:

[$ kafka_2.13-3.6.1]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test_ck_sync1
>2024-01-01 00:00:01|89  

验证数据:

$ :) select * from test_ck_sync1_res;SELECT *
FROM test_ck_sync1_resQuery id: a666f893-5be9-4022-9327-3a1507aa5485┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:01 │  89 │
└─────────────────────┴─────┘
┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:00 │  88 │
└─────────────────────┴─────┘2 rows in set. Elapsed: 0.049 sec.

从 ClickHouse 写入到 Kafka

kafka_writers_reader --(view)--> kafka_writers_queue ---> 

创建一个队列:

bin/kafka-topics.sh --topic kafka_writers --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

创建同步表:

CREATE TABLE kafka_writers_reader (     `id` Int,     `platForm` String,     `appname` String,     `time` DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'kafka_writers_reader', kafka_group_name = 'kafka_writers_reader_group', kafka_format = 'CSV';CREATE TABLE kafka_writers_queue (     id Int,     platForm String,     appname String,     time DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092',        kafka_topic_list = 'kafka_writers',        kafka_group_name = 'kafka_writers_group',        kafka_format = 'CSV',       kafka_max_block_size = 1048576;

测试1:queue->ck->queue

通过写入队列kafka_writers_reader,借助ClickHouse写入队列kafka_writers

bin/kafka-topics.sh --topic kafka_writers_reader --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic kafka_writers_readerbin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers

测试2:ck->queue

通过写入表kafka_writers_reader,写入队列kafka_writers

$ :) INSERT INTO kafka_writers_reader (id, platForm, appname, time) 
VALUES (8,'Data','Test','2020-12-23 14:45:31'), 
(9,'Plan','Test1','2020-12-23 14:47:32'), 
(10,'Plan','Test2','2020-12-23 14:52:15'), 
(11,'Data','Test3','2020-12-23 14:54:39');INSERT INTO kafka_writers_reader (id, platForm, appname, time) FORMAT ValuesQuery id: 223a63ab-97fa-488d-8ea7-c2e194155d26Ok.4 rows in set. Elapsed: 1.054 sec. 
[$ kafka_2.13-3.6.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers
8,"Data","Test","1970-01-01 08:00:00"9,"Plan","Test1","1970-01-01 08:00:00"10,"Plan","Test2","1970-01-01 08:00:00"11,"Data","Test3","1970-01-01 08:00:00"

如果喜欢我的文章的话,可以去GitHub上给一个免费的关注吗?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/775338.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu 配置 kubernetes 学习环境,让外部访问 dashboard

Ubuntu 配置 kubernetes 学习环境 一、安装 1. minikube 首先下载一下 minikube,这是一个单机版的 k8s,只需要有容器环境就可以轻松启动和学习 k8s。 首先你需要有Docker、QEMU、Hyperkit等其中之一的容器环境,以下使用 docker 进行。 对…

65W智能快充—同为科技桌面PDU插座推荐

近10年,移动设备的智能化、功能化已经完全且紧密的融入到我们的基础生活与工作当中。 在常态化的电子设备的应用中,设备的电力续航以及后续的供电充电就尤为重要。 就目前而言,所有消费电子产品中的输入以及充电的接口,usb-c可以…

酷开科技依托酷开系统用“平台+产品+场景”塑造全屋智能生活!

杰弗里摩尔的“鸿沟理论”中写道:高科技企业推进产品的早期市场和产品被广泛接受的主流市场之间,存在着一条巨大的“鸿沟”。“鸿沟”,指产品吸引早期接纳者后、赢得更多客户前的那段间歇,以及其中可预知和不可预知的阻碍。多数产…

基于Rflysim平台的无人机拦截三维比例导引算法仿真

【后厂村路钢铁侠出品】 一、Rflysim简介 RflySim是一套专为科研和教育打造的Pixhawk /PX4 和MATLAB/Simulink生态系统或工具链,采用基于模型设计(Model-Based Design, MBD)的思想,可用于无人系统的控制和安全测试。…

区块链食品溯源案例实现(一)

引言: 食品安全问题一直是社会关注的热点,而食品溯源作为解决食品安全问题的重要手段,其重要性不言而喻。传统的食品溯源系统往往存在数据易被篡改、信息不透明等问题,而区块链技术的引入,为食品溯源带来了革命性的变革…

设计模式之装饰模式解析

装饰模式 1)概述 1.定义 动态地给一个对象增加一些额外的职责,在增加对象功能时,装饰模式比生成子类实现更为灵活。 2.作用 装饰模式可以在不改变一个对象本身功能的基础上给对象增加额外的新行为。 3.结构图 4.角色 Component&#xf…

银行卡的分类

银行卡是银行账户的一种体现形式,它是由银行机构发行的具有消费信用、转账结算、存取现金等全部或部分功能作为结算支付工具的各类卡的统称。 (1)按是否具有授信额度分类 ①借记卡:借记卡是指发卡银行向申请人签发的,没…

Machine Learning机器学习之向量机(Support Vector Machine,SVM)

目录 前言 算法提出背景: 核心思想: 原理: 应用领域: 一、支持向量机分类(主要变体) 二、构建常见的支持向量机模型 基于Python 中的 Scikit-learn 库构建线性支持向量机(SVM) 三、向…

SQLite中的动态内存分配(五)

返回:SQLite—系列文章目录 上一篇:SQLite中的原子提交(四) 下一篇:SQLite使用的临时文件(二) ​概述 SQLite使用动态内存分配来获得 用于存储各种对象的内存 (例如&#xff1a…

Django开发复盘

一、URL 对于一个不会写正则表达式的蒟蒻来说,在urls.py中就只能傻傻的写死名字,但是即便这样,还会有很多相对路径和绝对路径的问题(相对ip端口的路径),因为我们网页中涉及到页面跳转,涉及到发送…

神经网络:梯度下降法更新模型参数

作者:CSDN _养乐多_ 在神经网络领域,梯度下降是一种核心的优化算法,本文将介绍神经网络中梯度下降法更新参数的公式,并通过实例演示其在模型训练中的应用。通过本博客,读者将能够更好地理解深度学习中的优化算法和损…

帆软报表在arm架构的linux

有朋友遇到一个问题在部署帆软报表时遇到报错。 问 我在 arm架构的linux服务器上部署帆软报表遇到了一个棘手的问题,你有空帮忙看下嘛。 我看后台日志报的错是 需要升级 gcc、libmawt.so ,是系统中缺少Tomcat需要的依赖库,你之前处理过类似…

超级会员卡积分收银系统源码:积分+收银+商城三合一小程序 带完整的安装代码包以及搭建教程

信息技术的迅猛发展,移动支付和线上购物已经成为现代人生活的常态。在这样的背景下,商家对于能够整合收银、积分管理和在线商城的综合性系统的需求日益强烈。下面,罗峰给大家分享一款超级会员卡积分收银系统源码,它集积分、收银、…

vector类(一)

文章目录 vector介绍和使用1.vector的介绍2.vector的使用2.1 vector的定义2.2 vector iterator的使用2.3 vector空间增长问题2.4 vector增删查改2.5 vector迭代器失效问题 3.vector 在OJ中的使用 vector介绍和使用 1.vector的介绍 vector是表示 可变大小数组的 序列容器。 就…

《数据结构学习笔记---第五篇》---链表OJ练习上

目录 CM11链表分割 OR36 链表的回文结构 160.相交链表 141&142环形链表 CM11链表分割 step1:思路分析 1.首先可以想到,我们可以将原链表的元素划分到两个新的链表之中,由于必须保持顺序,所以新链表我们要用尾插。 2.为了方便进行尾插我…

自动化与智能化并行:数字化运维体系助力企业腾飞

文章目录 文章目录 文章目录 一、引言二、数字化运维体系的核心要素三、构建数字化运维体系的策略四、数字化运维体系的实施与挑战主要内容读者对象 一、引言 随着信息技术的迅猛发展,数字化转型已成为企业提升竞争力、实现可持续发展的必由之路。在数字化转型的过…

JSP – 支持WORD上传的富文本编辑器

1.下载示例 https://gitee.com/xproer/zyoffice-tinymce5 2.引入组件 3.配置转换接口 效果 泽优Office文档转换服务(zyOffice) 功能:一键导入Word转HTML,不装控件,不装Office,任意平台兼容(Windows,macOS,Linux,安卓Android,苹果…

【GPU系列】选择最适合的 CUDA 版本以提高系统性能

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

nvm安装以后,node -v npm 等命令提示不是内部或外部命令

因为有vue2和vue3项目多种,所以为了适应各类版本node,使用nvm管理多种node版本,但是当我按教程安装nvm以后,nvm安装以后,node -v npm 等命令提示不是内部或外部命令 首先nvm官网网址:https://github.com/coreybutler/…

数据结构——栈(C语言版)

前言: 在学习完数据结构顺序表和链表之后,其实我们就可以做很多事情了,后面的栈和队列,其实就是对前面的顺序表和链表的灵活运用,今天我们就来学习一下栈的原理和应用。 准备工作:本人习惯将文件放在test.c…