探索ClickHouse——连接Kafka和Clickhouse

安装Kafka

新增用户

sudo adduser kafka
sudo adduser kafka sudo
su -l kafka

安装JDK

sudo apt-get install openjdk-8-jre

下载解压kafka

可以从https://downloads.apache.org/kafka/下找到希望安装的版本。需要注意的是,不要下载路径包含src的包,否则会报“Classpath is empty”之类的错误。

mkdir ~/Downloads
curl "https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz" -o ~/Downloads/kafka.tgz
mkdir ~/kafka && cd ~/kafka
tar -xvzf ~/Downloads/kafka.tgz --strip 1

配置

配置kafka

vim ~/kafka/config/server.properties

将下面这行加入文件的末尾

# ~/kafka/config/server.properties
delete.topic.enable=true

同时修改log的路径

# ~/kafka/config/server.properties
log.dirs=/home/kafka/logs

创建zookeeper service

sudo vim /etc/systemd/system/zookeeper.service

将下面内容填入上述文件中

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target

创建kafka service

sudo vim /etc/systemd/system/kafka.service

将下面内容填入上述文件中

[Unit]
Requires=zookeeper.service
After=zookeeper.service[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal[Install]
WantedBy=multi-user.target

启动kafka#

启动服务

sudo systemctl start kafka

查看状态

sudo systemctl status kafka

● kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2023-09-28 03:09:39 UTC; 4s ago
Main PID: 3561758 (sh)
Tasks: 42 (limit: 2143)
Memory: 292.4M
CPU: 2.768s
CGroup: /system.slice/kafka.service
├─3561758 /bin/sh -c “/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1”
└─3561760 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/>
Sep 28 03:09:39 ubuntua systemd[1]: Started kafka.service.

可以看到kafka已经处于running状态。

测试

创建Topic

~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

发送消息

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

订阅Topic

新启动一个界面,执行下面命令

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

它会收到上面发的消息

Hello, World

连接

创建表

使用kafka engine将kafka中的流映射到一个表中。我们以《探索ClickHouse——使用Projection加速查询》中的数据为例。

clickhouse-client --stream_like_engine_allow_direct_select 1
CREATE TABLE uk_price_paid_from_kafka (uuid_string String, price_string String, time String, postcode String, a String, b String, c String, addr1 String, addr2 String, street String, locality String, town String, district String, county String, d String, e String) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list='TutorialTopic', kafka_group_name='clickhouse', kafka_format='CSV', kafka_skip_broken_messages=1, kafka_num_consumers=1;

CREATE TABLE uk_price_paid_from_kafka
(
uuid_string String,
price_string String,
time String,
postcode String,
a String,
b String,
c String,
addr1 String,
addr2 String,
street String,
locality String,
town String,
district String,
county String,
d String,
e String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = ‘localhost:9092’, kafka_topic_list = ‘TutorialTopic’, kafka_group_name = ‘clickhouse’, kafka_format = ‘CSV’, kafka_skip_broken_messages = 1, kafka_num_consumers = 1
Query id: 07a063e9-6a61-42c0-8fec-1fe2f119ee28
Ok.
0 rows in set. Elapsed: 0.008 sec.

给kafka发送消息

~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic

进入消息输入模式,发送下面两个消息

"{F887F88E-7D15-4415-804E-52EAC2F10958}","70000","1995-07-07 00:00","MK15 9HP","D","N","F","31","","ALDRICH DRIVE","WILLEN","MILTON KEYNES","MILTON KEYNES","MILTON KEYNES","A","A"
"{40FD4DF2-5362-407C-92BC-566E2CCE89E9}","44500","1995-02-03 00:00","SR6 0AQ","T","N","F","50","","HOWICK PARK","SUNDERLAND","SUNDERLAND","SUNDERLAND","TYNE AND WEAR","A","A"

在这里插入图片描述

Clickhouse收到消息

在clickhouse-client交互终端中执行下面指令:

select * from uk_price_paid_from_kafka;

在这里插入图片描述
可以看到之前发送给kafka Topic的内容在Clickhouse中被收到了。

问题

后面我再在clickhouse-client交互终端中查询不到数据了。即使我们给kafka该主题发消息,也查询不到。后面我们再将《探索ClickHouse——使用MaterializedView存储kafka传递的数据》中讲解使用MaterializedView清洗和固化kafka的数据。

参考资料

  • https://openjdk.org/install/
  • https://kafka.apache.org/quickstart
  • https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04#step-2-mdash-downloading-and-extracting-the-kafka-binaries
  • https://cloud.tencent.com/developer/article/1892086
  • https://sineyuan.github.io/post/clickhouse-kafka/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/92156.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

最新ChatGPT网站系统源码+支持GPT4.0+支持AI绘画Midjourney绘画+支持国内全AI模型

一、SparkAI创作系统 SparkAi系统是基于很火的GPT提问进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作ChatGPT系统?小编这里写一个详细图文教程吧&a…

CCF CSP认证 历年题目自练Day18

CCF CSP认证 历年题目自练Day18 题目一 试题编号: 201809-1 试题名称: 卖菜 时间限制: 1.0s 内存限制: 256.0MB 问题描述: 问题描述   在一条街上有n个卖菜的商店,按1至n的顺序排成一排,这…

Apollo自动驾驶系统概述(文末参与活动赠送百度周边)

前言 「作者主页」:雪碧有白泡泡 「个人网站」:雪碧的个人网站 「推荐专栏」: ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄,vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…

大喜国庆,聊聊我正式进入职场的这三个月...

个人简介 👀个人主页: 前端杂货铺 🙋‍♂️学习方向: 主攻前端方向,正逐渐往全干发展 📃个人状态: 研发工程师,现效力于中国工业软件事业 🚀人生格言: 积跬步…

基础数据结构之——【顺序表】(上)

从今天开始更新数据结构的相关内容。(我更新博文的顺序一般是按照我当前的学习进度来安排,学到什么就更新什么(简单来说就是我的学习笔记),所以不会对一个专栏一下子更新到底,哈哈哈哈哈哈哈!&a…

八个不可不知的SQL高级方法

结构化查询语言(SQL)是一种广泛使用的工具,用于管理和操作数据库。基本的SQL查询简单易学,但掌握高级SQL技术可以将您的数据分析和管理能力提升到新的高度。 高级SQL技术是指一系列功能和函数,使您能够对数据执行复杂…

【day10.01】使用select实现服务器并发

用select实现服务器并发&#xff1a; linuxlinux:~/study/1001$ cat server.c #include <myhead.h>#define ERR_MSG(msg) do{\printf("%d\n",__LINE__);\perror(msg);\ }while(0)#define PORT 8880#define IP "192.168.31.38"int main(int argc, c…

【C/C++笔试练习】二维数组、二维数组的访问,解引用,地址计算、计算糖果、进制转换

文章目录 C/C笔试练习1.二维数组&#xff08;1&#xff09;二维数组的访问&#xff08;2&#xff09;二维数组的初始化&#xff08;3&#xff09;二维数组的解引用&#xff08;4&#xff09;二维数组的解引用&#xff08;5&#xff09;多维数组的解引用&#xff08;6&#xff0…

Blued引流脚本

于多数人来说&#xff0c;引流都是一个比较困难的操作&#xff0c;因为流量不会听你的。所以任何人在网上做生意&#xff0c;或者开一个实体店&#xff0c;都会为流量而发愁&#xff0c;其实对于流量的吸引来说&#xff0c;我们越是刻意为之&#xff0c;可能所获得的效果也越不…

Go结构体深度探索:从基础到应用

在Go语言中&#xff0c;结构体是核心的数据组织工具&#xff0c;提供了灵活的手段来处理复杂数据。本文深入探讨了结构体的定义、类型、字面量表示和使用方法&#xff0c;旨在为读者呈现Go结构体的全面视角。通过结构体&#xff0c;开发者可以实现更加模块化、高效的代码设计。…

【Android】安卓手机系统内置应用安装失败解决方案

现有的闲置手机有个内置app可老旧了&#xff0c;没有开发者维护&#xff0c;于是问题不断&#xff0c;影响了体验&#xff0c;后来在网上查找发现有它的新版本&#xff0c;想要更新却没有自动更新&#xff08;后台服务断开了&#xff09;&#xff0c;有类似的想法可以来这里了解…

Python学习之索引与切片

Python学习之索引与切片 s “0abcdefghijklmnopqrstuvwxyz”&#xff0c;第一个元素‘0’&#xff0c;索引号为0&#xff0c;最后一个元素‘z’&#xff0c;索引号为26 1. s[0]获取索引号为0的元素 2. s[1:3]获取索引号为1的元素&#xff0c;直到但不包括索引号为3的元素。即…

buuctf-[Zer0pts2020]Can you guess it?

点击source&#xff0c;进入源代码 <?php include config.php; // FLAG is defined in config.phpif (preg_match(/config\.php\/*$/i, $_SERVER[PHP_SELF])) {exit("I dont know what you are thinking, but I wont let you read it :)"); }if (isset($_GET[so…

RandomAccessFile实现断点续传

断点续传是指在文件传输过程中&#xff0c;当传输中断或失败时&#xff0c;能够恢复传输并继续从上次中断的位置继续传输。 RandomAccessFile类 RandomAccessFile是Java提供的一个用于文件读写的类&#xff0c;它可以对文件进行随机访问&#xff0c;即可以直接跳转到文件的任意…

IntelliJ IDEA 控制台中文乱码的四种解决方法

前言 IntelliJ IDEA 如果不进行配置的话&#xff0c;运行程序时控制台有时候会遇到中文乱码&#xff0c;中文乱码问题非常严重&#xff0c;甚至影响我们对信息的获取和程序的跟踪。开发体验非常不好。 本文中我总结出四点用于解决控制台中文乱码问题的方法&#xff0c;希望有助…

scrapy爬取图片

文章目录 ImagesPipeline使用步骤&#xff1a;1. 数据解析&#xff1a; 获取图片的地址 & 2. 将存储图片地址的item提交到指定的管道类&#xff08;hotgirls.py&#xff09;3. 在管道文件中自制一个基于ImagesPipeLine的一个管道类&#xff01;&#xff01;天大的坑 &#…

ChatGPT Prompting开发实战(十二)

一、如何开发prompts实现个性化的对话方式 通过设置“system”和“user”等roles&#xff0c;可以实现个性化的对话方式&#xff0c;并且可以结合参数“temperature”的设定来差异化LLM的输出内容。在此基础上&#xff0c;通过构建一个餐馆订餐对话机器人来具体演示对话过程。…

git你学“废”了吗?——git撤销操作指令详解

git你学“废”了吗&#xff1f;——git撤销操作指令详解&#x1f60e; 前言&#x1f64c;撤销的本质撤销修改情况一&#xff1a;撤销工作区的修改方式一&#xff1a;方式二&#xff1a;演示截图&#xff1a; 撤销修改情况二&#xff1a;撤销暂存区和工作区的修改操作截图&#…

为什么字节大量用GO而不是Java?

见字如面&#xff0c;我是军哥。 我看很多程序员对字节编程语言选型很好奇&#xff0c;为此我还特地问了在字节的两位4-1的技术大佬朋友&#xff0c;然后加上自己的思考&#xff0c;总结了一下就以下 2 个原因&#xff1a; 1、 选型上没有历史包袱 字节的早期的程序员大多来自于…

CISSP学习笔记:PKI和密码学应用

第七章 PKI和密码学应用 7.1 非对称密码学 对称密码系统具有共享的秘钥系统&#xff0c;从而产生了安全秘钥分发的问题非对称密码学使用公钥和私钥对&#xff0c;无需支出复杂密码分发系统 7.1.1 公钥与私钥 7.1.2 RSA&#xff08;兼具加密和数字签名&#xff09; RSA算法…