ClickHouse--11--ClickHouse API操作

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 1.Java 读写 ClickHouse API
    • 1.1 首先需要加入 maven 依赖
    • 1.2 Java 读取 ClickHouse 集群表数据
        • JDBC--01--简介
      • ClickHouse java代码
    • 1.3 Java 向 ClickHouse 表中写入数据
  • 2.Spark 写入 ClickHouse API
    • 2.1 导入依赖
    • 2.2 代码编写
  • 3.Flink 写入 ClickHouse API
    • 3.1 Flink 1.10.x 之前版本使用 flink-jdbc,只支持 Table API
    • 3.2 Flink 1.11.x 之后版本使用 flink-connector-jdbc,只支持DataStream API


1.Java 读写 ClickHouse API

1.1 首先需要加入 maven 依赖

<!-- 连接 ClickHouse 需要驱动包-->
<dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.4</version>
</dependency>

1.2 Java 读取 ClickHouse 集群表数据

JDBC–01–简介

在这里插入图片描述

public class Test01 {public static void main(String[] args) throws Exception {//1.注册数据库驱动Class.forName("com.mysql.jdbc.Driver");//2.获取数据库连接Connection conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/jt_db?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8","root", "root");//3.获取传输器Statement stat = conn.createStatement();//4.发送SQL到服务器执行并返回执行结果String sql = "select * from account";ResultSet rs = stat.executeQuery( sql );//5.处理结果while( rs.next() ) {int id = rs.getInt("id");String name = rs.getString("name");double money = rs.getDouble("money");System.out.println(id+" : "+name+" : "+money);}//6.释放资源rs.close();stat.close();conn.close();System.out.println("TestJdbc.main()....");}}

ClickHouse java代码


import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;import java.sql.ResultSet;
import java.sql.SQLException;public class test01 {public static void main(String[] args) throws SQLException {ClickHouseProperties props = new ClickHouseProperties();props.setUser("default");props.setPassword("");//1.注册数据库驱动配置BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123,node2:8123,node3:8123/default", props);//2.获取数据库连接ClickHouseConnection conn = dataSource.getConnection();//3.获取传输器ClickHouseStatement statement = conn.createStatement();//4.发送SQL到服务器执行并ResultSet rs = statement.executeQuery("select id,name,age from test");//5.处理结果while (rs.next()) {int id = rs.getInt("id");String name = rs.getString("name");int age = rs.getInt("age");System.out.println("id = " + id + ",name = " + name + ",age = " + age);}//6.释放资源conn.close();statement.close();rs.close();}
}

1.3 Java 向 ClickHouse 表中写入数据

package com.cy.demo;import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;import java.sql.ResultSet;
import java.sql.SQLException;public class test01 {public static void main(String[] args) throws SQLException {ClickHouseProperties props = new ClickHouseProperties();props.setUser("default");props.setPassword("");//1.注册数据库驱动配置BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123/default", props);//2.获取数据库连接ClickHouseConnection conn = dataSource.getConnection();//3.获取传输器ClickHouseStatement statement = conn.createStatement();//4.发送SQL到服务器执行并statement.execute("insert into test values (100,'王五',30)");//可以拼接批量插入多条//6.释放资源conn.close();statement.close();rs.close();}
}

在这里插入图片描述

2.Spark 写入 ClickHouse API

  • SparkCore 写入 ClickHouse,可以直接采用写入方式。下面案例是使用 SparkSQL 将结果存入 ClickHouse对应的表中。在 ClickHouse 中需要预先创建好对应的结果表

2.1 导入依赖

        <!-- 连接 ClickHouse 需要驱动包--><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.4</version><!-- 去除与 Spark 冲突的包 --><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion><exclusion><groupId>net.jpountz.lz4</groupId><artifactId>lz4</artifactId></exclusion></exclusions></dependency><!-- Spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.3.1</version></dependency><!-- SparkSQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.3.1</version></dependency><!-- SparkSQL ON Hive--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.3.1</version></dependency>

2.2 代码编写

val session: SparkSession =
SparkSession.builder().master("local").appName("test").getOrCreate()
val jsonList = List[String](
"{\"id\":1,\"name\":\"张三\",\"age\":18}",
"{\"id\":2,\"name\":\"李四\",\"age\":19}",
"{\"id\":3,\"name\":\"王五\",\"age\":20}"
)
//将 jsonList 数据转换成 DataSet
import session.implicits._
val ds: Dataset[String] = jsonList.toDS()
val df: DataFrame = session.read.json(ds)
df.show()
//将结果写往 ClickHouse
val url = "jdbc:clickhouse://node1:8123/default"
val table = "test"
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "default")
properties.put("password", "")
properties.put("socket_timeout", "300000")
df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url,
table, properties)

3.Flink 写入 ClickHouse API

  • 可以通过 Flink 原生 JDBC Connector 包将 Flink 结果写入 ClickHouse 中,Flink 在1.11.0 版本对其 JDBC Connnector 进行了重构:

在这里插入图片描述

3.1 Flink 1.10.x 之前版本使用 flink-jdbc,只支持 Table API

  1. maven 中需要导入以下包:
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
</dependency>
<!--添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码:
/**
* 通过 flink-jdbc API 将 Flink 数据结果写入到 ClickHouse 中,只支持 Table API
*
* 注意:
* 1.由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
* 2.在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数
据。
*/
case class PersonInfo(id:Int,name:String,age:Int)
object FlinkWriteToClickHouse1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1,后期每个并行度满批次需要的条数时,会插入 click 中
env.setParallelism(1)
val settings: EnvironmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//读取 Socket 中的数据
val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
val ds: DataStream[PersonInfo] = sourceDS.map(line => {
val arr: Array[String] = line.split(",")
PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
})
//将 ds 转换成 table 对象
import org.apache.flink.table.api.scala._
val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)
//将 table 对象写入 ClickHouse 中
//需要在 ClickHouse 中创建表:create table flink_result(id Int,name String,age Int) engine =
MergeTree() order by id;
val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//准备 ClickHouse table sink
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl("jdbc:clickhouse://node1:8123/default")
.setUsername("default")
.setPassword("")
.setQuery(insertIntoCkSql)
.setBatchSize(2) //设置批次量,默认 5000 条
.setParameterTypes(Types.INT, Types.STRING, Types.INT)
.build()
//注册 ClickHouse table Sink,设置 sink 数据的字段及 Schema 信息
tableEnv.registerTableSink("ck-sink",
sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))
//将数据插入到 ClickHouse Sink 中
tableEnv.insertInto(table,"ck-sink")
//触发以上执行
env.execute("Flink Table API to ClickHouse Example")
}
}

3.2 Flink 1.11.x 之后版本使用 flink-connector-jdbc,只支持DataStream API

  1. 在 Maven 中导入以下依赖包
<!-- Flink1.11 后需要 Flink-client 包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码
/**
* Flink 通过 flink-connector-jdbc 将数据写入 ClickHouse ,目前只支持 DataStream API
*/
object FlinkWriteToClickHouse2 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
val ds: DataStream[String] = env.socketTextStream("node5",9999)
val result: DataStream[(Int, String, Int)] = ds.map(line => {
val arr: Array[String] = line.split(",")
(arr(0).toInt, arr(1), arr(2).toInt)
})
//准备向 ClickHouse 中插入数据的 sql
val insetIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//设置 ClickHouse Sink
val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
//插入数据 SQL
insetIntoCkSql,
//设置插入 ClickHouse 数据的参数
new JdbcStatementBuilder[(Int, String, Int)] {
override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
ps.setInt(1, tp._1)
ps.setString(2, tp._2)
ps.setInt(3, tp._3)
}
},
//设置批次插入数据
new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
//设置连接 ClickHouse 的配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl("jdbc:clickhouse://node1:8123/default")
.withUsername("default")
.withUsername("")
.build()
)
//针对数据加入 sink
result.addSink(ckSink)
env.execute("Flink DataStream to ClickHouse Example")
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/688581.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机设计大赛 深度学习乳腺癌分类

文章目录 1 前言2 前言3 数据集3.1 良性样本3.2 病变样本 4 开发环境5 代码实现5.1 实现流程5.2 部分代码实现5.2.1 导入库5.2.2 图像加载5.2.3 标记5.2.4 分组5.2.5 构建模型训练 6 分析指标6.1 精度&#xff0c;召回率和F1度量6.2 混淆矩阵 7 结果和结论8 最后 1 前言 &…

Mysql5.6忘记密码,如何找回(windows)

mysql5.6安装 第一步&#xff1a;关闭正在运行的数据库服务 net stop mysql第二步&#xff1a;在my.ini文件当中的[mysqld] 任意一个位置放入 skip-grant-tables第三步&#xff1a;启动mysql服务 net start mysql第四步&#xff1a;服务启动成功后就可以登录了&#xff0c;…

举个栗子!Tableau 技巧(265):灵活对比文本表的行数据

通过文本表查看数据时&#xff0c;我们经常需要将某一行数据与其他行进行对比&#xff0c;如何能更灵活更直观的对比分析各行数据情况呢&#xff1f; 可以试试这个方法&#xff01;如下示例&#xff1a;点击某明细行时&#xff0c;该明细行会自动置顶&#xff0c;且其它行会新…

Linux系统:iptables 防火墙

目录 一、安全技术与防火墙 1、安全技术概念 2、防火墙 2.1 防火墙概念 2.2 防火墙分类 2.3 linux的防火墙Netfilter 2.4 防火墙工具介绍 2.5 netfilter 和 iptables 的关系 二、iptables 1、概念 2、五表五链 2.1 五个table表 2.2 五个chain链 2.3 内核中数据包…

ClickHouse--06--其他扩展MergeTree系列表引擎

其他扩展MergeTree系列 MergeTree 系列表引擎 --种类 MergeTree 系 列 表 引 擎 包 含 &#xff1a; MergeTreeReplacingMergeTreeSummingMergeTree&#xff08;汇总求和功能&#xff09;AggregatingMergeTree&#xff08;聚合功能&#xff09;CollapsingMergeTree&#xff08…

- 项目落地 - 《选择项目工具的方法论》

本文属于专栏《构建工业级QPS百万级服务》 提纲&#xff1a; 选择大概率能完成业务目标的工具选择最适合的工具制作最适合的工具 本文所说的项目工具&#xff0c;泛指业务软件开发&#xff0c;所依赖的第三方提供的成熟的资源。包括但不限于开发语言、编辑工具、编译工具、三方…

IgG1 (mouse), ELISA kit——ENZO热销产品

90分钟内可得结果的高特异性定量ELISA试剂盒 免疫球蛋白G&#xff08;IgG&#xff09;是一种免疫球蛋白单体&#xff0c;由两条&#xff08;γ&#xff09;重链和两条轻链组成。每个IgG分子包含两个抗原结合域和一个效应&#xff08;Fc&#xff09;域。Enzo Life Sciences可提供…

WebService接口测试

WebService的理解 WebService就是Web服务的意思&#xff0c;对应的应用层协议为SOAP&#xff08;相当于HTTP协议&#xff09;&#xff0c;可理解为远程调用技术。 特点&#xff1a; 客户端发送的请求主体内容&#xff08;请求报文&#xff09;的格式为XML格式 接口返回的响…

学习数据结构和算法的第9天

题目讲解 移除元素 ​ 给你一个数组nums和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val的元素&#xff0c;并返回移除后数组的新长度。 ​ 不要使用额外的数组空间&#xff0c;你必须仅使用0(1)额外空间并 原地 修改输入数组。 ​ 元素的顺序可以改变。你不需要…

电脑屏幕录制工具 Top10 榜单,免费无水印方法集

随着媒体行业的突飞猛进&#xff0c;不同服务之间对有效屏幕录制的竞争日益激烈。这导致市场上出现了质量参差不齐的屏幕录像机。特别是有些录屏器会自动给你录制的视频加上水印&#xff0c;给需要在公共场合使用的人留下不专业的印象。除此之外&#xff0c;它们甚至不能保护您…

OS文件管理

文件管理 文件的属性 文件所包含的属性&#xff1a; 文件名&#xff1a;由创建文件的用户决定文件名&#xff0c;主要为了方便用户找到文件&#xff0c;同一目录下不允许有重名文件。标识符&#xff1a;一个系统内的各文件标识符唯一&#xff0c;对用户来说毫无可读性&#…

vm centos7 docker 安装 mysql 5.7.28(2024-02-18)

centos系统版本 [rootlocalhost mysql5.7]# cat /etc/redhat-release CentOS Linux release 7.9.2009 (Core) docker版本 拉取指定版本镜像 docker pull mysql:5.7.28 docker images 创建挂载目录&#xff08;数据存储在centos的磁盘上&#xff09; mkdir -p /app/softwa…

全面的ASP.NET Core Blazor简介和快速入门

前言 因为咱们的MongoDB入门到实战教程Web端准备使用Blazor来作为前端展示UI&#xff0c;本篇文章主要是介绍Blazor是一个怎样的Web UI框架&#xff0c;其优势和特点在哪&#xff1f;并带你快速入门上手ASP.NET Core Blazor(当然这个前提是你要有一定的C#编程基础的情况&#x…

大数据01-导论

零、文章目录 大数据01-导论 1、数据与数据分析 **数据&#xff1a;是事实或观察的结果&#xff0c;是对客观事物的逻辑归纳&#xff0c;是用于表示客观事物的未经加工的原始素材。**数据可以是连续的值&#xff0c;比如声音、图像&#xff0c;称为模拟数据&#xff1b;也可…

探索【注解】、【反射】、【动态代理】,深入掌握高级 Java 开发技术

文章目录 Java注解1.注解基础2.注解原理 反射1.Class对象的获取1.基础公共类1.1.Object > getClass()1.2.类名.class 的方式1.3.Class.forName() 2.获取类的成员变量3.获取成员方法并调用4.反射优缺点 代理1.结构2.静态代理2.1.案例1-计算前后校验2.1.1.创建接口2.1.2.创建实…

Ubuntu20.04 安装jekyll

首先使根据官方文档安装&#xff1a;Jekyll on Ubuntu | Jekyll • Simple, blog-aware, static sites 如果没有报错&#xff0c;就不用再继续看下去了。 我这边在执行gem install jekyll bundler时报错&#xff0c;所以安装了rvm&#xff0c;安装rvm可以参考这篇文章Ubuntu …

javaweb——socket

定义 Socket&#xff08;套接字&#xff09;是计算机网络编程中的一种抽象&#xff0c;用于在网络上进行通信。它允许计算机之间通过网络进行数据传输。在Java中&#xff0c;Socket类提供了对TCP/IP协议的支持&#xff0c;通过它可以创建客户端和服务端程序&#xff0c;实现网…

FLUENT Meshing Watertight Geometry工作流入门 - 9 生成体网格

本视频中学到的内容&#xff1a; 讨论体网格的重要性&#xff0c;并了解生成体网格的不同方法 了解体网格质量&#xff0c;以及如何改进 视频链接&#xff1a; FLUENT Meshing入门教程-9生成体网格_哔哩哔哩_bilibili 体网格生成是使用大量离散体积或单元来离散化/表示计算模…

跨境云手机如何简化tiktok运营流程

如今&#xff0c;tiktok已经成为世界范围内都非常流行的社交媒体平台。然而在大多数情况下&#xff0c;由于网络原因&#xff0c;tiktok无法在国内使用&#xff0c;但依然有越来越多的人注册tiktok号码、建立tiktok矩阵。原因是tiktok仍然有大量的流量可供商业使用&#xff0c;…

java面试题基础篇

1.java面向对象三大特性 ​ 封装&#xff08;Encapsulation&#xff09;&#xff1a;是面向对象方法的重要原则&#xff0c;就是把对象的属性和操作&#xff08;或服务&#xff09;结合为一个独立的整体&#xff0c;并尽可能隐藏对象的内部实现细节。 ​ 继承&#xff1a;就是…