15_基于Flink将pulsar数据写入到ClickHouse

3.8.基于Flink将数据写入到ClickHouse

编写Flink完成数据写入到ClickHouse操作, 后续基于CK完成指标统计操作

3.8.1.ClickHouse基本介绍

ClickHouse 是俄罗斯的Yandex于2016年开源的列式存储数据库(DBMS),使用C++语言编写,主要用于在线分析处理查询(OLAP),能够使用SQL查询实时生成分析数据报告。
在这里插入图片描述
结论: ClickHouse像很多OLAP数据库一样,单表查询速度由于关联查询,而且ClickHouse的两者差距更为明显。

3.8.2.ClickHouse安装步骤

本项目中,我们仅需要安装单机测试版本即可使用(node2安装), 在实际生产中, 大家可以直接将分布式集群版本

  • 1-设置yum源
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
  • 2- 直接基于yum安装即可
sudo yum install clickhouse-server clickhouse-client
  • 3-修改配置文件
vim /etc/clickhouse-server/config.xml 
修改178行: 打开这一行的注释 
<listen_host>::</listen_host>

在这里插入图片描述

  • 4-启动clickhouse的server
systemctl start clickhouse-server 
停止:
systemctl stop clickhouse-server 
重启
systemctl restart clickhouse-server
  • 5-进入客户端
    在这里插入图片描述

3.8.3.在ClickHouse中创建目标表

create database itcast_ck; 
use itcast_ck; 
create table itcast_ck.itcast_ck_ems( 
id int, 
sid varchar(128), 
ip varchar(128), 
create_time varchar(128), 
session_id varchar(128), 
yearInfo varchar(128), 
monthInfo varchar(128), 
dayInfo varchar(128), 
hourInfo varchar(128), 
seo_source varchar(128), 
area varchar(128), 
origin_channel varchar(128), 
msg_count int(128), 
from_url varchar(128), 
PRIMARY KEY (`id`) 
) ENGINE=ReplacingMergeTree();

3.8.4.编写Flink代码完成写入到CK操作

import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.types.Row;import java.sql.Types;
import java.util.Properties;// 基于Flink完成读取Pulsar中数据将消息数据写入到clickhouse中
public class ItcastFlinkToClickHouse {public static void main(String[] args) throws Exception {//1. 创建Flinnk流式处理核心环境类对象 和 Table API 核心环境类对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2. 添加Source组件, 从Pulsar中读取消息数据Properties props = new Properties();props.setProperty("topic","persistent://public/default/itcast_ems_tab");props.setProperty("partition.discovery.interval-millis","5000");FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>("pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",JsonDeser.of(PulsarTopicPojo.class),props);//2.1 设置pulsarSource组件在消费数据的时候, 默认从什么位置开始消费pulsarSource.setStartFromLatest();DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);//2.2  转换数据操作: 将 PulsarTopicPojo 转换为ROW对象SingleOutputStreamOperator<Row> rowDataSteam = dataStreamSource.map(new MapFunction<PulsarTopicPojo, Row>() {@Overridepublic Row map(PulsarTopicPojo pulsarTopicPojo) throws Exception {return Row.of(pulsarTopicPojo.getId(), pulsarTopicPojo.getSid(), pulsarTopicPojo.getIp(), pulsarTopicPojo.getCreate_time(),pulsarTopicPojo.getSession_id(), pulsarTopicPojo.getYearInfo(), pulsarTopicPojo.getMonthInfo(), pulsarTopicPojo.getDayInfo(),pulsarTopicPojo.getHourInfo(), pulsarTopicPojo.getSeo_source(), pulsarTopicPojo.getArea(), pulsarTopicPojo.getOrigin_channel(),pulsarTopicPojo.getMsg_count(), pulsarTopicPojo.getFrom_url());}});//2.3: 设置sink操作写入到CK操作String insertSql = "insert into itcast_ck.itcast_ck_ems (id,sid,ip,create_time,session_id,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder().setDrivername("ru.yandex.clickhouse.ClickHouseDriver").setDBUrl("jdbc:clickhouse://node2:8123/itcast_ck").setQuery(insertSql).setBatchSize(1).setParameterTypes(Types.INTEGER,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.INTEGER,Types.VARCHAR).build();tableSink.emitDataStream(rowDataSteam);//3. 提交执行env.execute("itcast_to_ck");}
}

3.9.HBase对接Phoenix实现即席查询

3.9.1.Phoenix安装操作

Phoenix是属于apache旗下的一款基于hbase的工具, 此工具提供一种全新的方式来操作hbase中数据(SQL),
同时Phoenix对hbase进行大量的优化工作, 能够让我们更加有效的操作hbase

整个安装操作, 大家可以参考资料中安装手册, 进行安装即可

3.9.2.在Phoenix中创建表

create view "itcast_h_ems" ( 
"id" integer primary key, 
"f1"."sid" varchar, 
"f1"."ip" varchar, 
"f1"."create_time" varchar, 
"f1"."session_id" varchar, 
"f1"."yearInfo" varchar, 
"f1"."monthInfo" varchar, 
"f1"."dayInfo" varchar, 
"f1"."hourInfo" varchar, 
"f1"."seo_source" varchar, 
"f1"."area" varchar, 
"f1"."origin_channel" varchar, 
"f1"."msg_count" integer, 
"f1"."from_url" varchar 
);

3.9.3.在Phoenix中类型说明

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28421.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

K最近邻算法:简单高效的分类和回归方法(三)

文章目录 &#x1f340;引言&#x1f340;训练集和测试集&#x1f340;sklearn中封装好的train_test_split&#x1f340;超参数 &#x1f340;引言 本节以KNN算法为主&#xff0c;简单介绍一下训练集和测试集、超参数 &#x1f340;训练集和测试集 训练集和测试集是机器学习和深…

【LeetCode】数据结构题解(11)[用队列实现栈]

用队列实现栈 &#x1f609; 1.题目来源&#x1f440;2.题目描述&#x1f914;3.解题思路&#x1f973;4.代码展示 所属专栏&#xff1a;玩转数据结构题型❤️ &#x1f680; >博主首页&#xff1a;初阳785❤️ &#x1f680; >代码托管&#xff1a;chuyang785❤️ &…

C语言文件操作基本方法

1、文件的分类 ANSI C 的缓冲文件系统 缓冲文件系统 缓冲文件系统是指&#xff0c;系统自动地在内存区为每个正在使用的文件开辟一个缓冲区。 从内存向磁盘输出数据时&#xff0c;必须首先输出到缓冲区中。待缓冲区装满后&#xff0c;再一起输出到磁盘文件中。 从磁盘文件向内…

Python爬虫的Selenium(学习于b站尚硅谷)

目录 一、Selenium  1.为什么要学习Selenium  &#xff08;1&#xff09;什么是Selenium  &#xff08;2&#xff09;为什么使用selenium?  &#xff08;3&#xff09;代码演示 2. selenium的基本使用  &#xff08;1&#xff09;如何安装selenium  &#xff08;2…

探析STM32标准库与HAL库之间的差异与优劣

引言&#xff1a; 在嵌入式开发领域&#xff0c;STMicroelectronics的STM32系列芯片广受欢迎。STM32提供了两种主要的软件库&#xff0c;即标准库和HAL库&#xff0c;用于开发各种应用。本文将探讨这两种库之间的差异&#xff0c;比较它们的优劣&#xff0c;并分析在选择库时需…

数仓架构模型设计参考

1、数据技术架构 1.1、技术架构 1.2、数据分层 将数据仓库分为三层&#xff0c;自下而上为&#xff1a;数据引入层&#xff08;ODS&#xff0c;Operation Data Store&#xff09;、数据公共层&#xff08;CDM&#xff0c;Common Data Model&#xff09;和数据应用层&#xff…

Android 数据库之GreenDAO

GreenDAO 是一款开源的面向 Android 的轻便、快捷的 ORM 框架&#xff0c;将 Java 对象映射到 SQLite 数据库中&#xff0c;我们操作数据库的时候&#xff0c;不再需要编写复杂的 SQL语句&#xff0c; 在性能方面&#xff0c;greenDAO 针对 Android 进行了高度优化&#xff0c;…

IPWorks S3 Delphi Edition Crack

IPWorks S3 Delphi Edition Crack IPWorksS3使集成基于云的文件存储变得容易。易于使用的组件可用于与任何S3兼容的存储提供商集成&#xff0c;如Amazon S3、Digital Ocean Spaces、Wasabi、Backblaze B2、IBM Cloud Object storage、Oracle Cloud、Linode等。强大的客户端加密…

springboot+vue智能化网络电子相册图片管理系统_84ds3

随着计算机技术发展&#xff0c;计算机系统的应用已延伸到社会的各个领域&#xff0c;大量基于网络的广泛应用给生活带来了十分的便利。所以把智能化电子相册与现在网络相结合&#xff0c;利用计算机搭建智能化电子相册系统&#xff0c;实现智能化电子相册的信息化。则对于进一…

01_什么是ansible、基本架构、ansible工作机制、Ansible安装、配置主机清单、设置SSH无密码登录等

1.什么是ansible 1.1.基本介绍 1.2.基本架构 1.3.基本特征 1.4.优点 1.5.ansible工作机制 2.Ansible安装 2.1.机器准备 2.2.安装ansible 2.2.1.安装epel源 2.2.2.安装ansible 2.2.3.查看ansible版本 2.2.4.树状结构展示文件夹 2.2.4.1.其中ansible.cfg的内容如下 2.2.4.2.host的…

jmeter 二次开发详解

目录 背景&#xff1a; 自定义 BeanShell 功能 自定义请求编写&#xff08;Java Sampler&#xff09; 实现 Java Sampler 功能的两种方式 案例&#xff1a;使用 JavaSampler 重写 HTTP 的 POST 请求 自定义函数助手 背景&#xff1a; JMeter 是一个功能强大的性能测试工具…

The ‘kotlin-android-extensions‘ Gradle plugin is no longer supported.

Android使用kotlin开发&#xff0c;运行报错 The kotlin-android-extensions Gradle plugin is no longer supported. Please use this migration guide (https://goo.gle/kotlin-android-extensions-deprecation) to start working with View Binding (https://developer.an…

linux umask 详解

1. umask 定义 在 linux 系统中&#xff0c;umask 被定义在 /etc/profile 配置文件中&#xff0c;有一段 shell 脚本对 umask 是这么定义的。在 shell 会话输入命令&#xff1a; $ cat /etc/profile # 查看 /etc/profile 配置文件的内容 if [ $UID -gt 199 ] &&…

Vue3中v-model在原生元素和自定义组件上的使用

目录 前言 一、原生元素上的用法 1. 输入框(input) 2. 多行文本域(textarea) 3. 单选按钮(radio) 4. 多选框(checkbox) 5. 下拉选择框(select) 二、自定义组件上的用法 1. 定义一个名为 modelValue 的 props 属性和一个名为 update:modelValue 的事件 2.使用一个可…

网络编程 tcp udp http编程流程 网络基础知识

讲解 网络基础知识网络编程tcp编程流程图示理解bind和accept函数理解监视套接字和链接套接字理解linux和window下的编程实现tcp特点 udp编程流程图示理解udp特点 http编程流程图示理解编程实现-网站服务器 网络基础知识 OSI分层&#xff1a;应用层 表示层 会话层 传输层 网络层…

springBoot集成caffeine,自定义缓存配置 CacheManager

目录 springboot集成caffeine Maven依赖 配置信息&#xff1a;properties文件 config配置 使用案例 Caffeine定制化配置多个cachemanager springboot集成redis并且定制化配置cachemanager springboot集成caffeine Caffeine是一种基于服务器内存的缓存库。它将数据存储在…

【MongoDB】解决ProxmoxVE下CentOS7虚拟机安装MongoDB6后启动失败的问题

目录 安装步骤&#xff1a; 2.1 配置yum源 2.2 安装MongoDB 2.3 启 动MongoDB ProxmoxVE上新装的CentOS7.4虚拟机&#xff0c;安装MongoDB6。 安装步骤&#xff1a; 2.1 配置yum源 # 创建mongodb yum源&#xff08;https://www.mongodb.com/docs/manual/tutorial/insta…

clickhouse调研报告2

由Distributed表发送分片数据 clickhouse分区目录合并 clickhouse副本协同流程 clickhouse索引查询逻辑 clickhouse一级索引生成逻辑(两主键) clickhouse的data目录下包含如下目录: [root@brfs-stress-01 201403_10_10_0]# ll /data01/clickhouse/data total 4 drwxr-x---…

【Vue3】插槽全家桶

插槽&#xff08;Slots&#xff09;是 Vue.js 框架中的一个功能&#xff0c;允许在组件内部预留一些可替换的内容。通过插槽&#xff0c;可以给父组件填充模板代码&#xff0c;让父组件向子组件传递自定义的内容&#xff0c;以便在子组件中进行展示或处理。 1. 匿名插槽 Son.…

初次使用GPU云服务器

前言&#xff1a; 在体验了GPU云服务器&#xff08;GPU Cloud Computing&#xff0c;GPU&#xff09;后&#xff0c;我认为这是一个非常强大的弹性计算服务。它为深度学习、科学计算、图形可视化、视频处理等多种应用场景提供了强大的GPU算力&#xff0c;能够满足各类用户的计算…