【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作

ClickHouse官网文档

Flink 读取 ClickHouse 数据两种驱动

  1. ClickHouse 官方提供Clickhouse JDBC.【建议使用
  2. 第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver

ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护

ClickHouse 官方提供Clickhouse JDBC的包名:com.clickhouse.jdbc.*

有些版本com.clickhouse.jdbc.* 包含了 ru.yandex.clickhouse.ClickHouseDriver.

因此加载包的时候一定要注意导入的包名

引入依赖

        <!-- clickhouse jdbc driver --><dependency><groupId>com.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId></dependency>

使用的是 0.3 这个版本,该版本就包含上述3方CH jdbc包

     <!-- CH JDBC版本推荐使用 0.3, 0.4的版本是要 JDK 17 --><clickhouse-jdbc.version>0.3.2-patch11</clickhouse-jdbc.version>

自定义Source

测试表映射实体类,该表仅有一个name字段

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CHTestPO {private String name;}

Flink Clickhouse Source

public class ClickHouseSource implements SourceFunction<CHTestPO> {private final String URL;private final String SQL;public ClickHouseSource(String URL, String SQL) {this.URL = URL;this.SQL = SQL;}@Overridepublic void run(SourceContext<CHTestPO> output) throws Exception {//  Properties是持久化的属性集 Properties的key和value都是字符串Properties properties = new Properties();ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(URL, properties);// 使用 try-with-resource 方式关闭JDBC连接 无需手动关闭try (ClickHouseConnection conn = clickHouseDataSource.getConnection()) {// clickhouse 通过游标的方式读取数据Statement stmt = conn.createStatement();ResultSet rs = stmt.executeQuery(SQL);while (rs.next()) {String name = rs.getString(1);output.collect(new CHTestPO(name));}}}@Overridepublic void cancel() {}
}

自定义Sink

需额外引入依赖

        <!-- Flink-Connector-Jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId></dependency>

Java 对sql语句处理的两个对象

  1. PreparedStatement对象:能够对预编译之后的sql语句进行处理【SQL 语句预编译:通过占位符'?'实现,可以防止sql注入】
  2. Statement对象:只能对静态的sql语句进行处理

核心代码

/*** 使用 Flink-jdbc-connector + 批量写入 + sql语句的预编译 写入 Clickhouse*/
public class ClickHouseJdbcSink<T> {private final SinkFunction<T> sink;private final static String NA = "null";public ClickHouseJdbcSink(String sql, int batchSize, String url) {sink = JdbcSink.sink(sql,// 对sql语句进行预编译new ClickHouseJdbcStatementBuilder<T>(),// 设置批量插入数据new JdbcExecutionOptions.Builder().withBatchSize(batchSize).build(),// 设置ClickHouse连接配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(url).build());}public SinkFunction<T> getSink() {return this.sink;}/*** 对预编译之后的sql语句进行占位符替换** @param ps:     PreparedStatement对象 下标从 1 开始* @param fields: clickhouse表PO对象的属性字段* @param object: clickhouse表PO对象的属性字段所对应的数据类型*/public static void setPreparedStatement(PreparedStatement ps,Field[] fields,Object object) throws IllegalAccessException, SQLException {// 遍历 Field[]for (int i = 1; i <= fields.length; i++) {// 取出每个Field实例Field field = fields[i - 1];// 指示反射的对象在使用时应该取消 Java 语言访问检查field.setAccessible(true);// 通过Field实例的get方法返回指定的对象Object o = field.get(object);if (o == null) {ps.setNull(i, 0);continue;}// 这里统一设为字符型String fieldValue = o.toString();// 变量和常量的比较,通常将常量放前,可以避免空指针if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {// 替换对应位置的占位符ps.setObject(i, fieldValue);} else {ps.setNull(i, 0);}}}}

对sql语句进行预编译

@Slf4j
public class ClickHouseJdbcStatementBuilder<T> implements JdbcStatementBuilder<T> {@Overridepublic void accept(PreparedStatement preparedStatement, T t) throws SQLException {/* *********************** Java通过反射获取类的字段:** 1. getDeclaredFields():获取所有的字段,不会获取父类的字段* 2. getFields(): 只能会public字段,获取包含父类的字段** *********************/Field[] fields = t.getClass().getDeclaredFields();// 将获取到的字段替换sql预编译之后的占位符。try {ClickHouseJdbcSink.setPreparedStatement(preparedStatement, fields, t);} catch (IllegalAccessException e) {log.error("sql 预编译失败", e);e.printStackTrace();}}
}

ClickHouse读写工具类

image-20231209233006017

public class ClickHouseUtil {private static final String URL;static {ParameterTool parameterTool = ParameterUtil.getParameters();URL = parameterTool.get("clickhouse.url");}/*** 读取clickhouse*/public static DataStream<CHTestPO> read(StreamExecutionEnvironment env, String sql) {return env.addSource(new ClickHouseSource(URL, sql));}/*** 批量写入ClickHouse*/public static <T> DataStreamSink<T> batchWrite(DataStream<T> dataStream,String sql,int batchSize) {//生成 SinkFunctionClickHouseJdbcSink<T> clickHouseJdbcSink =new ClickHouseJdbcSink<T>(sql, batchSize, URL);return dataStream.addSink(clickHouseJdbcSink.getSink());}}

测试一下

public class ClickHouseUtilTest {@DisplayName("测试Flink+jdbc+游标读取Clickhouse")@Testvoid testRead() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 从default数据库的user表中读取数据String sql = "select * from default.user";DataStream<CHTestPO> ds = ClickHouseUtil.read(env, sql);// 打印数据流中的元素ds.print("clickhouse");// 执行程序env.execute();}@DisplayName("测试Flink-Connector-jdbc+预编译批量写入Clickhouse")@Testvoid testBatchWrite() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);CHTestPO po = new CHTestPO();po.setName("Lucy");CHTestPO po1 = new CHTestPO();po1.setName("Jack");DataStream<CHTestPO> ds = env.fromCollection(Arrays.asList(po, po1));// 定义将数据写入ClickHouse数据库的SQL语句String sql = "insert into default.user(name) values(?)";// 调用ClickHouseUtil的batchWrite方法将数据流ds中的数据批量写入ClickHouse数据库ClickHouseUtil.batchWrite(ds, sql, 2);// 执行程序env.execute();}
}

此时表中仅一行记录

image-20231209232619959

读取没有问题!

image-20231209232741522

写入没有问题!

image-20231209232902469

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/225948.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为ensp-无线小型wlan配置教程

实验拓扑图&#xff1a; 实验平台&#xff1a;ENSP510 实验设备&#xff1a;Centered Cloud、AC6005、AP4030、STA、Cellphone vlan范围划分 vlan 101 : 10.23.101.1/24 vlan 100 : 10.23.100.1/24实验步骤&#xff1a; 一、创建VLAN100、101配置端口类型 [AC1]vlan batch 100…

STM32启动流程详解(超全,startup_stm32xx.s分析)

单片机上电后执行的第一段代码 1.初始化堆栈指针 SP_initial_sp 2.初始化 PC 指针Reset_Handler 3.初始化中断向量表 4.配置系统时钟 5.调用 C 库函数_main 初始化用户堆栈&#xff0c;然后进入 main 函数。 在正式讲解之前&#xff0c;我们需要了解STM32的启动模式。 STM32的…

【Vue】日期格式化(全局)

系列文章 【Vue】vue增加导航标签 本文链接&#xff1a;https://blog.csdn.net/youcheng_ge/article/details/134965353 【Vue】Element开发笔记 本文链接&#xff1a;https://blog.csdn.net/youcheng_ge/article/details/133947977 【Vue】vue&#xff0c;在Windows IIS平台…

关于“Python”的核心知识点整理大全23

目录 ​编辑 第&#xff11;0 章 文件和异常 10.1 从文件中读取数据 10.1.1 读取整个文件 pi_digits.txt file_reader.py 10.1.2 文件路径 10.1.3 逐行读取 file_reader.py 10.1.4 创建一个包含文件各行内容的列表 10.1.5 使用文件的内容 pi_string.py 往期快速传…

Microsoft visual studio 2013卸载方法

1、问 题 Microsoft visual studio 2013 无法通过【程序与功能】卸载 2、解决方法 使用微软的Microsoft visual studio 2013 专用卸载工具 工具下载链接&#xff1a;https://github.com/Microsoft/VisualStudioUninstaller/releases 或 链接&#xff1a;https://pan.baidu.c…

【自动化测试】web3py 连接 goerli

web3py 连接 goerli 直接使用库里方法 if __name__ __main__:from web3.auto.infura.goerli import w3w3.eth.get_balance(get_address_by_private_key(os.getenv("AAA_KEY")))error info: websockets.exceptions.InvalidStatusCode: server rejected WebSocket …

02-MQ入门之RabbitMQ简单概念说明

二&#xff1a;RabbitMQ 介绍 1.RabbitMQ的概念 RabbitMQ 是一个消息中间件&#xff1a;它接受并转发消息。你可以把它当做一个快递站点&#xff0c;当你要发送一个包裹时&#xff0c;你把你的包裹放到快递站&#xff0c;快递员最终会把你的快递送到收件人那里&#xff0c;按…

LeetCode 1143最长公共子序列 1035不相交的线 53最大子序和 | 代码随想录25期训练营day53

动态规划算法11 LeetCode 1143 最长公共子序列 2023.12.16 题目链接代码随想录讲解[链接] int longestCommonSubsequence(string text1, string text2) {//1确定dp二维数组&#xff0c;dp[i][j]表示以text1[i-1]、text2[j-1]结尾相同的公共子序列的最大长度vector<vecto…

普通二叉树和右倾斜二叉树--LeetCode 111题《Minimum Depth of Binary Tree》

本文将以解释计算二叉树的最小深度的思路为例&#xff0c;致力于用简洁易懂的语言详细描述普通二叉树和右倾斜二叉树在计算最小深度时的区别。通过跟随作者了解右倾斜二叉树的概念以及其最小深度计算过程&#xff0c;读者也将对左倾斜二叉树有更深入的了解。这将为解决LeetCode…

BIM 技术:CIM (City Information Modeling) 1-7 级

本心、输入输出、结果 文章目录 BIM 技术&#xff1a;CIM &#xff08;City Information Modeling&#xff09; 1-7 级前言城市信息模型&#xff08;CIM&#xff09;概述城市信息模型分级介绍CIM 1CIM 2CIM 3CIM 4CIM 5CIM 6CIM 7 花有重开日&#xff0c;人无再少年实践是检验真…

【Linux】dump命令使用

dump命令 dump命令用于备份文件系统。使用dump命令可以检查ext2/3/4文件系统上的文件&#xff0c;并确定哪些文件需要备份。这些文件复制到指定的磁盘、磁带或其他存储介质保管。 语法 dump [选项] [目录|文件系统] bash: dump: 未找到命令... 安装dump yum -y install …

TableView复用机制的坑

TableView复用机制的坑 复用机制 UITableView 首先加载能够覆盖一屏幕的 UITableViewCell&#xff08;具体个数要根据每个 cell 的高度而定&#xff09;。 然后当我们往上滑动时&#xff08;往下滑动同理&#xff09;&#xff0c;需要一个新的 cell 放置在列表的下方。此时&…

SpringBoot Starter机制(自定义Start案例,实际开发场景中的短信模拟,AOP实现日志打印)

前言&#xff1a; 在我们上一篇博客中&#xff0c;实现Freemarke的增删改查&#xff0c;今天分享的是关于SpringBoot Starter机制-- 1.SpringBoot Starter 1.1.什么是SpringBoot Starter SpringBoot中的starter是一种非常重要的机制(自动化配置)&#xff0c;能够抛弃以前繁杂…

一款专业的磁盘坏道清除、彻底清除填充数据根据-硬盘数据彻底清除的方法分享

工具提供了硬盘坏道修复功能&#xff0c;你可以将损坏的磁盘放到软件分析&#xff0c;让软件找到错误的地方&#xff0c;让软件找到损坏的区域&#xff0c;通过内置的修复功能就可以将不能正常使用的部分恢复&#xff0c;从而让您的电脑磁盘可以保存很多数据&#xff0c;避免造…

Windows安装Tesseract OCR与Python中使用pytesseract进行文字识别

文章目录 前言一、下载并安装Tesseract OCR二、配置环境变量三、Python中安装使用pytesseract总结 前言 Tesseract OCR是一个开源OCR&#xff08;Optical Character Recognition&#xff09;引擎&#xff0c;用于从图像中提取文本。Pytesseract是Tesseract OCR的Python封装&am…

【python】Debian安装miniconda、spyder、tushare

1. miniconda 安装 — 动手学深度学习 2.0.0 documentation中有安装Miniconda的一些说明。 Miniconda — miniconda documentation是Miniconda网站&#xff0c;里面也有安装说明。 Debian安装按照linux安装即可&#xff1a; mkdir -p ~/miniconda3 wget https://repo.anaco…

macbookpro 2024怎么恢复出厂设置

可能你的MacBook曾经是高性能的代表&#xff0c;但是现在它正慢慢地逝去了自己的光芒&#xff1f;随着逐年的使用以及文件的添加和程序的安装&#xff0c;你的MacBook可能会开始变得迟缓卡顿&#xff0c;或者失却了以往的光彩。如果你发现你的Mac开始出现这些严重问题&#xff…

英语综合教程1第三版的一些题

unit1 unit2 unit3 unit4 unit5 unit6

3.1 内容管理模块 - 工程搭建、课程查询、配置Swagger、数据字典

文章目录 内容管理模块一、基础工程搭建1.1 需求分析1.2 业务流程1.3 数据模型1.4 创建模块工程1.4.1 介绍1.4.2 xuecheng-plus-content 聚合工程1.4.3 模块演示 二、课程查询准备2.1 需求分析2.1.1 业务流程2.1.2 数据模型 2.2 生成PO类2.2.1 新增Maven配置2.2.2 课程基本信息…

CSS的盒子模型(重点)

网页布局的三大核心&#xff1a;盒子模型、浮动、定位 网页布局的过程&#xff1a; 1. 先准备好相关的网页元素&#xff0c;网页元素基本都是盒子 Box 。 2. 利用 CSS 设置好盒子样式&#xff0c;然后摆放到相应位置。 3. 往盒子里面装内容.网页布局的核心本质&#xff1a; 就…