1.基本环境
<flink.version>1.17.0</flink.version>
2. 类文件
package com.flink.tablesql;import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.io.File;
import java.util.List;public class Main2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);String path = "E:\\test\\flinktestsql\\orders.sql";
// String path = "/data/flink/flinksql/orders.sql";List<String> list = FileUtils.readLines(new File(path),"UTF-8");StringBuilder stringBuilder = new StringBuilder("");String sql = "";for(String var : list){if(StringUtils.isNotBlank(var)){stringBuilder.append(var);if(var.contains("$")){sql = stringBuilder.toString().replace("$","");System.out.println(sql);System.out.println("end-----");tabEnv.executeSql(sql);stringBuilder = new StringBuilder("");}else{stringBuilder.append("\n");}}}}
}
3.pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinktestsql</artifactId><version>1.0-SNAPSHOT</version><!-- <!– 指定仓库位置,依次为aliyun、apache和cloudera仓库 –>--><repositories><repository><id>aliyun</id><url>https://maven.aliyun.com/repository/public</url></repository><repository><id>apache</id><url>https://maven.aliyun.com/repository/apache-snapshots</url></repository><repository><id>cloudera</id><url>https://maven.aliyun.com/repository/gradle-plugin</url></repository><repository><id>central</id><url>https://maven.aliyun.com/repository/central</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><!--版本--><properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.17.0</flink.version><slf4j.version>2.0.5</slf4j.version><scala.binary.version>2.11</scala.binary.version><scala.version>2.11.12</scala.version></properties><dependencies><!-- Flink相关依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.microsoft.sqlserver</groupId><artifactId>mssql-jdbc</artifactId><version>11.2.1.jre8</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.11.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-sqlserver-cdc</artifactId><version>2.4.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-test-utils</artifactId><version>1.17.1</version><scope>test</scope></dependency><!-- 日志管理相关依赖 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.19.0</version></dependency></dependencies></project>
4.后续只需要修改增加 .sql文件即可
String path = "E:\\test\\flinktestsql\\orders.sql";
// String path = "/data/flink/flinksql/orders.sql";
5.以上在本地运行未通过报错,在flink web 界面配置运行可以。我看网上可能是需要修改本地jdk配置,没有修改。
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target”。 ClientConnectionId:d5fb789f-e4e9-416f-b47c-57dc09429ebfat com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:3806)at com.microsoft.sqlserver.jdbc.TDSChannel.enableSSL(IOBuffer.java:1906)
6.flink web 端配置如下:只上传jar不行,还需要配置配置类,才可以
7.以上遇到很多问题,总算解决了,
如增加:
<mirror>
<id>aliyunmaven</id>
<mirrorOf>*</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>