手把手教学:通过 Flink connector 打通 ClickHouse 数据库,之后只需下发 Flink SQL 即可使用 ClickHouse(下文简称 CK)。
| 组件 | 版本 |
| --- | --- |
| jdk | 1.8 |
| flink | 1.17.2 |
| clickhouse | 23.12.2.59 |
1.背景
Flink 官方没有提供 ClickHouse 连接器,但实际工作中难免会用到。
2.方案
利用GitHub大佬提供的源代码,我用的是release-1.16:https://github.com/itinycheng/flink-connector-clickhouse/tree/release-1.16
3.编译
将源码导入 IDEA,使用 Maven 编译即可(例如在项目根目录执行 `mvn clean install -DskipTests`,同时会把产物安装到本地 Maven 仓库),生成 flink-connector-clickhouse-1.16.0-SNAPSHOT.jar
4.将此依赖包,导入flink工程
spring boot工程
4.1)pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the mit-microgrid-flink module: a Spring Boot application that embeds
  Flink and talks to ClickHouse through the flink-connector-clickhouse connector.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Alternative parent (disabled): inherit directly from Spring Boot.
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.13</version>
        <relativePath/> <!– lookup parent from repository –>
    </parent>
    -->
    <parent>
        <groupId>com.mit.microgrid</groupId>
        <artifactId>mit-microgrid</artifactId>
        <version>${project.build.version}</version>
    </parent>

    <artifactId>mit-microgrid-flink</artifactId>
    <name>mit-microgrid-flink</name>
    <description>flink connector clickhouse</description>

    <properties>
        <java.version>1.8</java.version>
        <!-- Single source of truth for the version of all Flink core artifacts below. -->
        <flink.version>1.17.2</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <!-- Exclude the logging starter that Spring Boot pulls in by default. -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- flink connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- flink connector clickhouse: the locally built 1.16.0-SNAPSHOT artifact -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.16.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <!-- <artifactId>flink-clients_2.12</artifactId> -->
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>

        <!-- flink sql -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink JDBC Connector -->
        <!-- Older coordinates (disabled):
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.14.6</version> <!– must match your Flink version –>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.2-1.17</version>
        </dependency>

        <!-- ClickHouse JDBC Driver -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.2</version> <!-- pick the latest stable version that fits your setup -->
        </dependency>

        <!-- ClickHouse sink for Flink -->
        <dependency>
            <groupId>ru.ivi.opensource</groupId>
            <artifactId>flink-clickhouse-sink</artifactId>
            <version>1.2.0</version>
        </dependency>

        <!-- module -->
        <dependency>
            <groupId>com.mit.microgrid</groupId>
            <artifactId>mit-microgrid-common-core</artifactId>
            <version>${project.build.version}</version>
        </dependency>
        <dependency>
            <groupId>com.mit.microgrid</groupId>
            <artifactId>mit-microgrid-api-history</artifactId>
            <version>${project.build.version}</version>
        </dependency>

        <!-- sql parse -->
        <dependency>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-core</artifactId>
            <version>1.37.0</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-server</artifactId>
            <version>1.37.0</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-parser</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-uber</artifactId>
            <!-- Uses the shared property instead of a hard-coded 1.17.2 so all Flink artifacts stay in step. -->
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-text</artifactId>
            <version>1.12.0</version>
        </dependency>
    </dependencies>

    <build>
        <!-- Alternative packaging (disabled): default Spring Boot repackage without Lombok.
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        -->
        <!-- Alternative packaging (disabled): shaded jar.
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        -->
        <!--
        </plugins>
        -->
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <!-- Repackage as an executable Spring Boot archive with a classifier. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.7.3</version>
                <configuration>
                    <mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
                    <fork>true</fork>
                    <layout>ZIP</layout>
                    <includeSystemScope>true</includeSystemScope>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                        <configuration>
                            <classifier>-with-dependencies</classifier>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Plain jar whose manifest points at the main class and a lib/ classpath prefix. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <addMavenDescriptor>false</addMavenDescriptor>
                        <manifest>
                            <mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!-- Assemble the distribution described by assembly.xml into ../out during package. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/resources/assembly/assembly.xml</descriptor>
                    </descriptors>
                    <outputDirectory>./../out</outputDirectory>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
4.2)核心方法:
/*** multiple sql execute** @param sqlList*/public static JobClient flinkSqlJobClientMultiple(List<String> sqlList) {log.info("参数sqlList: {}", sqlList);
// StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);EnvironmentSettings setting = EnvironmentSettings.newInstance().inBatchMode().build();TableEnvironment tEnv = TableEnvironment.create(setting);if (CollectionUtil.isNullOrEmpty(sqlList)) {log.warn("sqlList参数为空");return null;}for (String s : sqlList) {TableResult tableResult = tEnv.executeSql(s);Optional<JobClient> jobClientOptional = tableResult.getJobClient();if (jobClientOptional.isPresent()) {JobClient jobClient = jobClientOptional.get();log.info("jobClient: " + jobClient);return jobClient;}}log.error("没有可执行的job");return null;}
5.源码地址
https://github.com/genghongsheng0/mit-microgrid-flink