ClickHouse表引擎之Integration系列

​ Integration系统表引擎主要用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。

1 Kafka

1.1 Kafka引擎

​ 将Kafka Topic中的数据直接导入到ClickHouse。

​ 语法如下:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],...
) ENGINE = Kafka()
SETTINGSkafka_broker_list = 'host:port',kafka_topic_list = 'topic1,topic2,...',kafka_group_name = 'group_name',kafka_format = 'data_format'[,][kafka_row_delimiter = 'delimiter_symbol',][kafka_schema = '',][kafka_num_consumers = N,][kafka_max_block_size = 0,][kafka_skip_broken_messages = N,][kafka_commit_every_batch = 0]

​ 参数说明:

​ ①必需的参数

参数说明
kafka_broker_listKafka broker列表,以逗号分隔
kafka_topic_listKafka topic列表
kafka_group_nameKafka消费者组,如果不希望消息在集群中重复,使用相同的组名
kafka_format消息格式。使用与SQL格式函数相同的符号,例如JSONEachRow

​ ②可选参数

参数说明
kafka_row_delimiter分隔符字符,用于一行的结束标识符号
kafka_schema如果kafka_format参数需要schema定义,则通过该参数来支持
kafka_num_consumers每张表的消费者个数。默认值:1。如果一个使用者的吞吐量不足,则指定更多使用者。使用者的总数不应该超过主题中的分区数,因为每个分区只能分配一个使用者。
kafka_max_block_size轮询的最大批处理大小
kafka_skip_broken_messages忽略无效记录的条数。默认值:0
kafka_commit_every_batch在编写整个块之后提交每个使用和处理的批而不是单个提交(默认值:0)

​ 测试:(1)建表

  CREATE TABLE test_kafka (\timestamp UInt64,\level String,\message String\) ENGINE = Kafka() SETTINGS kafka_broker_list = 'ambari01:6667,ambari02:6667,ambari03:6667',\kafka_topic_list = 'test',\kafka_group_name = 'group1',\kafka_format = 'JSONEachRow',\kafka_row_delimiter = '\n'

​ 注意:如果后面在查询过程中报如下错误。是因为有些引擎版本存在的,消息中数据之间的分割符号未指定,导致无法处理。解决办法: 添加 kafka_row_delimiter = ‘\n’。

Cannot parse input: expected { before: \0: (at row 2)

​ (2)在kafka建立一个新的topic

sh kafka-topics.sh --create --zookeeper ambari01:2181,ambari02:2181,ambari03:2181 --replication-factor 1 --partitions 3 --topic test

​ (3)在kafka建立发布者console-producer

sh kafka-console-producer.sh --broker-list ambari01:6667,ambari02:6667,ambari03:6667 --topic test

​ (4)发送数据

{"timestamp":1515897460,"level":"one","message":"aa"}

​ 注意:由于一个kafka的partition 只能由一个 group consumer 消费,所以clickhouse 节点数需要大于 topic 的 partition 数。

​ (5)第一次查询

SELECT *
FROM test_kafka ┌──timestamp─┬─level─┬─message─┐
│ 1515897460 │ one   │ aa      │
└────────────┴───────┴─────────┘

​ (6)第二次查询

SELECT *
FROM test_kafka Ok.

​ 发现第二次查询的时候没有数据了,因为 Kafka引擎 表只是 kafka 流的一个视图而已,当数据被 select 了一次之后,这个数据就会被认为已经消费了,下次 select 就不会再出现。所以Kafka表单独使用是没什么用的,一般是用来和 MaterialView 配合,将Kafka表里面的数据自动导入到 MaterialView 里面。

​ (7)与 MaterialView 集成

​ 我们现在每一节点建一个 MaterialView 保存 Kafka 里面的数据, 再建一个全局的Distributed表。

CREATE MATERIALIZED VIEW test_kafka_view ENGINE = SummingMergeTree() PARTITION BY day ORDER BY  (day, level) AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as count FROM test_kafka GROUP BY day, level;

​ (6)再次发送数据

{"timestamp":1515897461,"level":"2","message":'bb'}
{"timestamp":1515897462,"level":"3","message":'cc'}
{"timestamp":1515897462,"level":"3","message":'ee'}
{"timestamp":1515897463,"level":"4","message":'dd'}

​ (7)查询数据

SELECT *
FROM test_kafka Ok.0 rows in set. Elapsed: 2.686 sec. 
---------------------------------------
SELECT *
FROM test_kafka_view Ok.0 rows in set. Elapsed: 0.002 sec.

​ 发现没有数据,原因:kafka 引擎默认消费根据条数与时间进行入库,不然肯定是没效率的。其中对应的参数有两个。 max_insert_block_size(默认值为: 1048576),stream_flush_interval_ms(默认值为: 7500)这两个参数都是全局性的。

​ 业务系统需要从kafka读取数据,按照官方文档建好表后,也能看到数据,但是延时很高。基本要延时15分钟左右。kafka的数据大约每秒50条左右。基本规律是累计到65535行以后(最小的块大小)才会在表中显示数据。尝试更改stream_flush_interval_ms 没有作用,但是有不想改max_block_size,因为修改以后影响到全局所有表,并且影响搜索效率。希望能每N秒保证不管block有没有写满都flush一次。

​ 虽然ClickHouse和 Kafka的配合可以说是十分的便利,只有配置好,但是相当的局限性对 kafka 数据格式的支持也有限。下面介绍WaterDrop这个中间件将Kafka的数据接入ClickHouse。

1.2 WaterDrop

​ WaterDrop: 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark和 Apache Flink之上。github地址:https://github.com/InterestingLab/waterdrop

​ ①下载并解压

wget https://github.com/InterestingLab/waterdrop/releases/download/v1.4.3/waterdrop-1.4.3.zip
unzip waterdrop-1.4.3.zip

​ ②修改配置文件waterdrop-env.sh

vim /opt/module/waterdrop-1.4.3/config/waterdrop-env.sh
SPARK_HOME=/usr/jdp/3.2.0.0-108/spark2  #配置为spark的路径

​ ③增加配置文件test.conf

spark {spark.streaming.batchDuration = 5spark.app.name = "test_waterdrop"spark.ui.port = 14020spark.executor.instances = 3spark.executor.cores = 1spark.executor.memory = "1g"
}input {kafkaStream  {topics = "test_wd"consumer.bootstrap.servers = "10.0.0.50:6667,10.0.0.52:6667,10.0.0.53:6667"consumer.zookeeper.connect = "10.0.0.50:2181,10.0.0.52:2181,10.0.0.53:2181"consumer.group.id = "group1"consumer.failOnDataLoss = falseconsumer.auto.offset.reset = latestconsumer.rebalance.max.retries = 100}
}
filter {json{source_field = "raw_message"}
}output {clickhouse {host = "10.0.0.50:8123"database = "test"table = "test_wd"fields = ["act","b_t","s_t"]username = "admin"password = "admin"retry_codes = [209, 210 ,1002]retry = 10bulk_size = 1000}
}

​ ④创建Clickhouse表

create table test.test_wd( act String, b_t String, s_t Date) ENGINE = MergeTree() partition by s_t order by s_t;

​ ⑤启动写入程序

cd /data/work/waterdrop-1.4.1
sh /opt/module/waterdrop-1.4.3/bin/start-waterdrop.sh --master yarn --deploy-mode client --config /opt/module/waterdrop-1.4.3/config/test.conf

​ ⑥插入数据

{"act":"aaaa","b_t":"100","s_t":"2019-12-22"}
{"act":"bxc","b_t":"200","s_t":"2020-01-01"}
{"act":"dd","b_t":"50","s_t":"2020-02-01"}

​ ⑦查看表数据

SELECT *
FROM test_wd ┌─act─┬─b_t─┬────────s_t─┐
│ dd  │ 50  │ 2020-02-01 │
└─────┴─────┴────────────┘
┌─act──┬─b_t─┬────────s_t─┐
│ aaaa │ 100 │ 2019-12-22 │
└──────┴─────┴────────────┘
┌─act─┬─b_t─┬────────s_t─┐
│ bxc │ 200 │ 2020-01-01 │
└─────┴─────┴────────────┘

2 MySQL

​ 将Mysql作为存储引擎,可以对存储在远程 MySQL 服务器上的数据执行 select查询

​ 语法:

MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);

​ 参数说明

参数说明
host:portMySQL 服务器地址
database数据库的名称
table表名称
user数据库用户
password用户密码
replace_query将 INSERT INTO 查询是否替换为 REPLACE INTO 的标志。如果 replace_query=1,则替换查询
on_duplicate_clause将 ON DUPLICATE KEY UPDATE on_duplicate_clause 表达式添加到 INSERT 查询语句中。

​ 测试:

​ 在Mysql中建表,并插入数据

CREATE TABLE `user` (`id` int(11) NOT NULL,`username` varchar(50) DEFAULT NULL,`sex` varchar(5) DEFAULT NULL
)INSERT INTO user values(11,"zs","0");
INSERT INTO user values(12,"ls","0");
INSERT INTO user values(13,"ww","0");
INSERT INTO user values(14,"ll","1");

​ 创建ClickHouse表,insert_time字段为默认字段

CREATE TABLE test.from_mysql(\id UInt64,\username String,\sex String,\insert_time Date DEFAULT toDate(now())\
) ENGINE = MergeTree()\
PARTITION BY insert_time \
ORDER BY (id,username)

​ 插入数据

INSERT INTO test.from_mysql (id,username,sex) SELECT id, username,sex FROM mysql('10.0.0.42:3306','test', 'user', 'root', 'admin');

​ 查询数据

SELECT *
FROM from_mysql ┌─id─┬─username─┬─sex─┬─insert_time─┐
│ 11 │ zs       │ 0   │  2020-05-24 │
│ 12 │ ls       │ 0   │  2020-05-24 │
│ 13 │ ww       │ 0   │  2020-05-24 │
│ 14 │ ll       │ 1   │  2020-05-24 │
└────┴──────────┴─────┴─────────────┘4 rows in set. Elapsed: 0.003 sec. 

3 HDFS

​ 用户通过执行SQL语句,可以在ClickHouse中直接读取HDFS的文件,也可以将读取的数据导入到ClickHouse本地表。

​ HDFS引擎:ENGINE = HDFS(URI, format)。URI:HDFS的URI,format:存储格式,格式链接https://clickhouse.tech/docs/en/interfaces/formats/#formats

3.1 查询文件

​ 这种使用场景相当于把HDFS做为ClickHouse的外部存储,当查询数据时,直接访问HDFS的文件,而不是把HDFS文件导入到ClickHouse再进行查询。相对于ClickHouse的本地存储查询,速度较慢。

​ 在HDFS上新建一个数据文件:user.csv,上传hadoop fs -cat /user/test/user.csv,内容如下:

1,zs,18
2,ls,19
4,wu,25
3,zl,22

​ 在ClickHouse上创建一个访问user.csv文件的表:

CREATE TABLE test_hdfs_csv(\id UInt64,\name String,\age UInt8\
)ENGINE = HDFS('hdfs://ambari01:8020/user/test/user.csv', 'CSV')

​ 查询hdfs_books_csv表

SELECT *
FROM test_hdfs_csv ┌─id─┬─name─┬─age─┐
│  1 │ zs   │  18 │
│  2 │ ls   │  19 │
│  4 │ wu   │  25 │
│  3 │ zl   │  22 │
└────┴──────┴─────┘

3.2 从HDFS导入数据

​ 从HDFS导入数据,数据在ClickHouse本地表,建本地表

CREATE TABLE test_hdfs_local(\id UInt64,\name String,\age UInt8\
)ENGINE = Log

​ 在数据存储目录下可以找到这个表的文件夹

/data/clickhouse/data/test/test_hdfs_local

​ 从HDFS导入数据

INSERT INTO test_hdfs_local SELECT * FROM test_hdfs_csv

​ 查询

SELECT *
FROM test_hdfs_local ┌─id─┬─name─┬─age─┐
│  1 │ zs   │  18 │
│  2 │ ls   │  19 │
│  4 │ wu   │  25 │
│  3 │ zl   │  22 │
└────┴──────┴─────┘

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473611.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 756. 金字塔转换矩阵(回溯)

文章目录1. 题目2. 解题1. 题目 现在,我们用一些方块来堆砌一个金字塔。 每个方块用仅包含一个字母的字符串表示。 使用三元组表示金字塔的堆砌规则如下: 对于三元组(A, B, C) ,“C”为顶层方块,方块“A”、“B”分别作为方块“…

Flask框架项目实例:**租房网站(一)

Flask是一款MVC框架,主要是从模型、视图、模板三个方面对Flask框架有一个全面的认识,通过完成作者-读书功能,先来熟悉Flask框架的完整使用步骤。 操作步骤为: 1.创建项目2.配置数据库3.定义模型类4.定义视图并配置URL 5.定义模板…

mysql中datetime比较大小问题 (转的)

方法一&#xff1a; 你也可以&#xff1a;select * from t1 where unix_timestamp(time1) > unix_timestamp(2011-03-03 17:39:05) and unix_timestamp(time1) < unix_timestamp(2011-03-03 17:39:52);就是用unix_timestamp函数&#xff0c;将字符型的时间&#xff0c;转…

ClickHouse常见问题及其解决方案

1 概述 在对ClickHouse进行分布表复制表zookeeper保证高可用的情况下进行性能测试时遇到如下坑&#xff0c;进行整理 2 分布表join问题Unknown identifier: LO_CUSTKEY, context:… 1.1 问题描述 SQL如下&#xff1a; SELECT count(1) FROM performance.line_all AS c LEFT…

Python中单引号,双引号,3个单引号及3个双引号的区别

单引号和双引号 在Python中我们都知道单引号和双引号都可以用来表示一个字符串&#xff0c;比如 [python] view plaincopy str1 python str2 "python" str1和str2是没有任何区别的。我们知道Python以其易用性而著名&#xff0c;所以刚开始看教程学习看到单引号…

LeetCode 316. 去除重复字母 / 1081. 不同字符的最小子序列(单调栈)

文章目录1. 题目2. 解题1. 题目 LC 316&#xff1a; 给你一个字符串 s &#xff0c;请你去除字符串中重复的字母&#xff0c;使得每个字母只出现一次。需保证 返回结果的字典序最小&#xff08;要求不能打乱其他字符的相对位置&#xff09;。 示例 1&#xff1a; 输入&#…

VSS 请求程序和 SharePoint 2013

Windows Server 中的 VSS 可用于创建可备份和还原 Microsoft SharePoint Foundation 的应用程序。VSS 提供了一个基础结构&#xff0c;使第三方存储管理程序、业务程序&#xff0c;以及硬件提供程序进行合作&#xff0c;以创建和管理卷影副本。基于此基础结构的解决方案可以使用…

Confluent介绍及其使用

1 confluent介绍 Confluent是用来管理和组织不同数据源的流媒体平台&#xff0c;可以实时地把不同源和位置的数据集成到一个中心的事件流平台。并且很可靠、性能很高。 Confluent目前提供了社区版&#xff08;免费&#xff09;和商业版&#xff08;收费&#xff09;两个版本&…

如何使用 Pylint 来规范 Python 代码风格

Pylint 是什么 Pylint 是一个 Python 代码分析工具&#xff0c;它分析 Python 代码中的错误&#xff0c;查找不符合代码风格标准&#xff08;Pylint 默认使用的代码风格是 PEP 8&#xff0c;具体信息&#xff0c;请参阅参考资料&#xff09;和有潜在问题的代码。目前 Pylint 的…

LeetCode 809. 情感丰富的文字

文章目录1. 题目2. 解题1. 题目 有时候人们会用重复写一些字母来表示额外的感受&#xff0c;比如 "hello" -> "heeellooo", "hi" -> "hiii"。 我们将相邻字母都相同的一串字符定义为相同字母组&#xff0c;例如&#xff1a;&qu…

confluent connect写出到ES及ClickHouse

1 连接Elasticsearch测试 1.1 启动confluent /home/kafka/.local/confluent/bin/confluent start This CLI is intended for development only, not for production https://docs.confluent.io/current/cli/index.htmlUsing CONFLUENT_CURRENT: /tmp/confluent.swpIapNw Sta…

tomcat内存溢出问题解决思路

1、修改启动时内存参数、并指定JVM时区 &#xff08;在windows server 2008 下时间少了8个小时&#xff09;在Tomcat上运行j2ee项目代码时&#xff0c;经常会出现内存溢出的情况&#xff0c;解决办法是在系统参数中增加系统参数&#xff1a; window下&#xff0c; 在catalina.b…

网站部署nginx--uwsgi

网站代码写完之后就是项目部署&#xff0c;主要包括两个方面&#xff1a; 1.nginx安装与配置&#xff1a; 1、Nginx 安装 系统平台&#xff1a;CentOS release 6.6 (Final) 64位。 一、安装编译工具及库文件 yum -y install make zlib zlib-devel gcc-c libtool openssl open…

天池 在线编程 滑动数独(滑动窗口)

文章目录1. 题目2. 解题1. 题目 描述 给定一个 3xn的矩阵 number&#xff0c;并且该矩阵只含有1到9的正整数。 考虑有一个大小为 3x3 滑动窗口&#xff0c;从左到右遍历该矩阵 number&#xff0c; 那么该滑动窗口在遍历整个矩阵的过程中会有n-2个。 现在你的任务是找出这些滑…

TIGK监控平台介绍

1 概述 众所周知监控平台对大数据平台是非常重要的&#xff0c;监控是故障诊断和分析的重要辅助利器&#xff0c;在发生事故之前就能预警&#xff0c;最大限度降低系统故障率。   监控系统我们可以分为业务层面&#xff0c;应用层面&#xff0c;系统层面 1.1 业务层面 业务系…

有意思的网站

谱聚类 http://blog.pluskid.org/?p287 Qt Graphics View 框架 http://yleesun.blog.163.com/blog/static/2941340220096110165817/ 谷歌编码规范 http://google-styleguide.googlecode.com/svn/trunk/cppguide.xml 匈牙利命名法 http://blog.csdn.net/buglu/article/details/…

天池 在线编程 队列检查(排序)

文章目录1. 题目2. 解题1. 题目 描述 班上的学生根据他们的年级照片的身高升序排列&#xff0c;确定当前未站在正确位置的学生人数 数组长度 < 10^5示例 输入: heights [1,1,3,3,4,1]输出: 3解释: 经过排序后 heights变成了[1,1,1,3,3,4]&#xff0c;有三个学生不在应在…

celery异步执行任务在Django中的应用实例

1. 创建django项目celery_demo, 创建应用demo: django-admin startproject celery_demo python manage.py startapp demo2.在celery_demo模块中创建celery.py模块, 文件目录为: celery.py模块内容为: from celery import Celery from django.conf import settings import os#…

Spring自学教程-注解的使用(三)

一、java中的注解定义注解下面是一个定义注解的实例。Target(ElementType.TYPE)Retention(RetentionPolicy.RUNTIME)DocumentedInheritedpublic interface Description { String value();}其中的interface是一个关键字&#xff0c;在设计annotations的时候必须把一个类型定义为…

Django单元测试

一.前言/准备 测Django的东西仅限于在MTV模型。哪些可以测&#xff1f;哪些不可以。 1.html里的东西不能测。①Html里的HTML代码大部分都是写死的②嵌套在html中的Django模板语言也不能测&#xff0c;即使有部分逻辑。 但写测试用例时至少要调用一个类或者方法。模板语言没有出…