【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、关于clickhouse的介绍
  • 三、clickhouse作为Flink的source示例
    • 1、maven依赖
    • 2、实现
      • 1)、user bean
      • 2)、source实现
    • 3、验证
      • 1)、clickhouse数据源准备
      • 2)、启动应用程序并观察控制台输出


本文主要介绍Flink 的clickhouse作为数据源的使用,以一个简单的示例进行展示。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文依赖clickhouse数据库环境好用。

本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

一、maven依赖

本文依赖见【flink番外篇】3、flink的source介绍及示例(1)- File、Socket、Collection,不再赘述。

如果有新增的maven依赖,则会在示例时加以说明,避免篇幅的过大。

二、关于clickhouse的介绍

关于clickhouse的基本知识详见该系列文章,关于该部分不再赘述。

ClickHouse系列文章
1、ClickHouse介绍
2、clickhouse安装与简单验证(centos)
3、ClickHouse表引擎-MergeTree引擎
4、clickhouse的Log系列表引擎、外部集成表引擎和其他特殊的表引擎介绍及使用
5、ClickHouse查看数据库容量、表的指标、表分区、数据大小等

三、clickhouse作为Flink的source示例

1、maven依赖

<dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.1.40</version>
</dependency>

2、实现

1)、user bean

import lombok.Data;/*** @author alanchan**/
@Data
public class UserSource {private int id;private String name;private int age;public UserSource(int id, String name, int age) {this.id = id;this.name = name;this.age = age;}public UserSource() {}
}

2)、source实现

import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;/*** @author alanchan**/
public class Source_Clickhouse {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// sourceDataStream<UserSource> users = env.addSource(new ClickhouseSource());// transformation// sinkusers.print();// executeenv.execute();}private static class ClickhouseSource extends RichParallelSourceFunction<UserSource> {private boolean flag = true;private ClickHouseConnection conn = null;private ClickHouseStatement stmt = null;private ResultSet rs = null;private Map<ClickHouseQueryParam, String> additionalDBParams = new HashMap<>();UserSource user = null;private String sql = "select id,name,age from t_flink_sink_clickhouse";// open只执行一次,适合开启资源@Overridepublic void open(Configuration parameters) throws Exception {ClickHouseProperties properties = new ClickHouseProperties();String url = "jdbc:clickhouse://192.168.10.42:8123/tutorial";properties.setSessionId(UUID.randomUUID().toString());
//			properties.setDatabase("tutorial");
//			properties.setHost("192.168.10.42");ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);// ClickHousePropertiesadditionalDBParams.put(ClickHouseQueryParam.SESSION_ID, UUID.randomUUID().toString());conn = dataSource.getConnection();stmt = conn.createStatement();}@Overridepublic void run(SourceContext<UserSource> ctx) throws Exception {while (flag) {rs = stmt.executeQuery(sql, additionalDBParams);while (rs.next()) {user = new UserSource(rs.getInt(1), rs.getString(2), rs.getInt(3));ctx.collect(user);}}}// 接收到cancel命令时取消数据生成@Overridepublic void cancel() {flag = false;}@Overridepublic void close() throws Exception {if (conn != null)conn.close();if (stmt != null)stmt.close();if (rs != null)rs.close();}}
}

3、验证

启动应用程序,查看应用程序控制台输出是不是与期望的一致即可。

1)、clickhouse数据源准备

1、启动clickhouse服务
2、创建数据库 tutorial
3、创建表 t_flink_sink_clickhouse
4、插入数据并查询

server2 :) select * from t_flink_sink_clickhouse limit 10;SELECT *
FROM t_flink_sink_clickhouse
LIMIT 10Query id: cd8bbe95-e8b0-448d-a7e2-dae3b5b2602d┌─id─┬─name─────┬─age─┐
│  1 │ alanchan │  19 │
│  2 │ alan     │  20 │
│  3 │ chan     │  21 │
└────┴──────────┴─────┘3 rows in set. Elapsed: 0.052 sec. 

2)、启动应用程序并观察控制台输出

由于本应用程序未设置查询频率,所以会一直输出,可以设置以一定的频率来查询。

4> UserSource(id=1, name=alanchan, age=19)
4> UserSource(id=2, name=alan, age=20)
4> UserSource(id=3, name=chan, age=21)

以上,本文主要介绍Flink 的clickhouse作为数据源的使用,以一个简单的示例进行展示。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/217654.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软考 系统架构设计师系列知识点之大数据(2)

接前一篇文章&#xff1a;软考 系统架构设计师系列知识点之大数据&#xff08;1&#xff09; 所属章节&#xff1a; 第11章. 未来信息综合技术 第6节. 云计算和大数据技术概述 大数据和云计算已成为IT领域的两种主流技术。“数据是重要资产”这一概念已成为大家的共识&#xf…

云基础软件深化合作,云轴科技ZStack与麒麟软件战略签约

12月8日&#xff0c;云轴科技ZStack与麒麟软件战略合作签约仪式在北京举行&#xff0c;双方对过往紧密合作表达了充分肯定&#xff0c;并就进一步联合技术创新、打造重点行业标杆和持续赋能客户达成高度共识。云轴科技创始人&CEO张鑫和麒麟软件高级副总经理谢文征共同见证双…

Oracle(2-17) RMAN Maintenance

文章目录 一、基础知识1、Retention Policy 保留政策2、Recovery Window - Part 1 恢复窗口-第1部分3、Cross Checking 交叉检查4、The CROSSCHECK Command CROSSCHECK命令5、OBSOLETE VS EXPIRED 过时与过期6、Deleting Backups and Copies 删除备份和副本7、The DELETE Comma…

无参RCE [GXYCTF2019]禁止套娃1

打开题目 毫无思绪&#xff0c;先用御剑扫描一下 只能扫出index.php 我们尝试能不能用php伪协议读取flag php://filter/readconvert.base64-encode/resourceindex.php php://filter/readconvert.base64-encode/resourceflag.php 但是页面都回显了429 怀疑是不是源码泄露 用…

【Linux学习】深入理解动态库与静态库

目录 十三.动态库与静态库 13.1 认识动静态库 13.2 深入理解动静态库 什么是库? 编译链接过程 动静态库的基本原理 13.3 静态库 静态库的打包: 静态库的使用: 13.4 动态库 动态库的打包: 动态库的使用: 13.5 动态库与静态库怎么选? 十三.动态库与静态库 13.1 认识动静态库 …

【毕业设计】基于STM32的解魔方机器人

1、方案设计 1.采用舵机作为魔方机器人的驱动电机&#xff0c;从舵机的驱动原理可知&#xff1a;舵机运行的速度和控制器的主频没有关系&#xff0c;所以采用单片机和采用更高主频的嵌入式处理器相比在控制效果上没有什么差别。单片机编程过程简单&#xff0c;非常容易上手&am…

uniapp使用u-empty以及其相关属性

Uni-app 是一款基于 Vue.js 的跨平台开发框架&#xff0c;可以用于同时开发多个平台的应用程序。其中&#xff0c;u-empty 是 Uni-app 提供的一个组件&#xff0c;用于展示空状态的页面。 u-empty 组件有以下几个相关属性&#xff1a; image&#xff1a;设置空状态显示的图片。…

orb-slam2学习总结

目录 视觉SLAM 1、地图初始化 2、ORB_SLAM地图初始化流程 3、ORB特征提取及匹配 1、对极几何 2、对极约束 &#xff08;epipolar constraint&#xff09; 3、基础矩阵F、本质矩阵E 5、单目尺度不确定性 6、单应矩阵&#xff08;Homography Matrix&#xff09; 6.1 什么是单应矩…

Python入门第5篇(爬虫相关)

目录 爬虫初步说明 html相关基础 urllib之读取网页内容 http相关基础 requests之webapi调用 爬虫初步说明 爬虫&#xff0c;一句话来说&#xff0c;即模拟浏览器爬取一些内容&#xff0c;如自动下载音乐、电影、图片这种的 具体可能是直接访问网页进行解析&#xff0c;也…

【Spark精讲】RDD特性之数据本地化

首选运行位置 上图红框为RDD的特性五&#xff1a;每个RDD的每个分区都有一组首选运行位置&#xff0c;用于标识RDD的这个分区数据最好能够在哪台主机上运行。通过RDD的首选运行位置可以让RDD的某个分区的计算任务直接在指定的主机上运行&#xff0c;从而实现了移动计算而不是移…

【matlab进阶学习-6】 读取log数据data.txt文件,并做处理,导出报告/表格/图表

原始文件 原始文件格式txt&#xff0c;每一行对应一个数据&#xff0c;数据之间由逗号分割开 对应意思 时刻&#xff0c;电压&#xff0c;电流&#xff0c;功率&#xff0c;容量&#xff0c;&#xff0c;电流&#xff0c;功率&#xff0c;&#xff0c;RTC时间&#xff0c;状态…

内网服务器部署maven私服简记

前言 很多企业希望创建自己的maven私服&#xff0c;但服务器无法和外网连通&#xff0c;所以这里介绍一套完整的内网部署nexus的解决方案。实现的方式也很简单&#xff0c;将下载好的nexus安装和项目所需的依赖仓库都上传到服务i去上去&#xff0c;通过脚本的方式实现批量导入…

MySQL和 Oracle查看表信息

在日常Mysql和Oracle数据库使用时&#xff0c;经常使用到查看表、索引等信息&#xff0c;记录下来&#xff0c;方便备查。 MySQL 主要是使用 information_schema 信息表&#xff1b; Oracle 主要是使用 各种视图&#xff0c;如user_ind_columns。 一、查看所有表 MySQL查看表…

CSS的三大特性(层叠性、继承性、优先级---------很重要)

CSS 有三个非常重要的三个特性&#xff1a;层叠性、继承性、优先级。 层叠性 场景&#xff1a;相同选择器给设置相同的样式&#xff0c;此时一个样式就会覆盖&#xff08;层叠&#xff09;另一个冲突的样式。层叠性主要解决样式冲突 的问题 原则&#xff1a;  样式冲突&am…

autojs-练手-视频号点赞(进阶版)

注释很详细&#xff0c;直接上代码 较初阶版新增内容 1. 简单但好用的ui界面 为方便大家参考&#xff0c;ui界面的模板单独拿出来了 ui界面模板 2. opencv图像识别 3. 需加载情况特殊处理&#xff08;防卡壳&#xff09; 4. 增加自动判断是否已点赞的情况 源码部分 // 启用…

HarmonyOS4.0从零开始的开发教程14Web组件的使用

HarmonyOS&#xff08;十二&#xff09;Web组件的使用 1 概述 相信大家都遇到过这样的场景&#xff0c;有时候我们点击应用的页面&#xff0c;会跳转到一个类似浏览器加载的页面&#xff0c;加载完成后&#xff0c;才显示这个页面的具体内容&#xff0c;这个加载和显示网页的…

智能优化算法应用:基于水循环算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于水循环算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于水循环算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.水循环算法4.实验参数设定5.算法结果6.参考文…

拓扑排序实现循环依赖判断 | 京东云技术团队

本文记录如何通过拓扑排序&#xff0c;实现循环依赖判断 前言 一般提到循环依赖&#xff0c;首先想到的就是Spring框架提供的Bean的循环依赖检测&#xff0c;相关文档可参考&#xff1a; https://blog.csdn.net/cristianoxm/article/details/113246104 本文方案脱离Spring Be…

无需公网IP联机Minecraft,我的世界服务器本地搭建教程

目录 前言 1.Mcsmanager安装 2.创建Minecraft服务器 3.本地测试联机 4. 内网穿透 4.1 安装cpolar内网穿透 4.2 创建隧道映射内网端口 5.远程联机测试 6. 配置固定远程联机端口地址 6.1 保留一个固定TCP地址 6.2 配置固定TCP地址 7. 使用固定公网地址远程联机 8.总…

Vue 中 v-model 的修饰符

lazy 修饰符&#xff1a;将 v-model 改为失去焦点后更新数据。 number 修饰符&#xff1a;将 v-model 数据转为数字类型。 trim 修饰符&#xff1a;去除 v-model 数据中的首尾空格。 语法格式&#xff1a; // lazy 修饰符 <input v-model.lazy"数据"> // nu…