Flink CDC -Sqlserver to Sqlserver java 模版编写

1.基本环境

     <flink.version>1.17.0</flink.version>

2. 类文件

package com.flink.tablesql;import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.io.File;
import java.util.List;public class Main2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);String path = "E:\\test\\flinktestsql\\orders.sql";
//        String path = "/data/flink/flinksql/orders.sql";List<String> list = FileUtils.readLines(new File(path),"UTF-8");StringBuilder stringBuilder = new StringBuilder("");String sql = "";for(String var : list){if(StringUtils.isNotBlank(var)){stringBuilder.append(var);if(var.contains("$")){sql = stringBuilder.toString().replace("$","");System.out.println(sql);System.out.println("end-----");tabEnv.executeSql(sql);stringBuilder = new StringBuilder("");}else{stringBuilder.append("\n");}}}}
}

3.pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinktestsql</artifactId><version>1.0-SNAPSHOT</version><!--    &lt;!&ndash; 指定仓库位置,依次为aliyun、apache和cloudera仓库 &ndash;&gt;--><repositories><repository><id>aliyun</id><url>https://maven.aliyun.com/repository/public</url></repository><repository><id>apache</id><url>https://maven.aliyun.com/repository/apache-snapshots</url></repository><repository><id>cloudera</id><url>https://maven.aliyun.com/repository/gradle-plugin</url></repository><repository><id>central</id><url>https://maven.aliyun.com/repository/central</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><!--版本--><properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.17.0</flink.version><slf4j.version>2.0.5</slf4j.version><scala.binary.version>2.11</scala.binary.version><scala.version>2.11.12</scala.version></properties><dependencies><!-- Flink相关依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.microsoft.sqlserver</groupId><artifactId>mssql-jdbc</artifactId><version>11.2.1.jre8</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.11.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-sqlserver-cdc</artifactId><version>2.4.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-test-utils</artifactId><version>1.17.1</version><scope>test</scope></dependency><!-- 日志管理相关依赖 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.19.0</version></dependency></dependencies></project>

4.后续只需要修改增加 .sql文件即可

       String path = "E:\\test\\flinktestsql\\orders.sql";

        // String path = "/data/flink/flinksql/orders.sql";

5.以上在本地运行未通过报错,在flink web 界面配置运行可以。我看网上可能是需要修改本地jdk配置,没有修改。

Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target”。 ClientConnectionId:d5fb789f-e4e9-416f-b47c-57dc09429ebfat com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:3806)at com.microsoft.sqlserver.jdbc.TDSChannel.enableSSL(IOBuffer.java:1906)

6.flink web 端配置如下:只上传jar不行,还需要配置配置类,才可以

7.以上遇到很多问题,总算解决了,

如增加:

<mirror>
<id>aliyunmaven</id>
<mirrorOf>*</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/175861.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

护眼灯值不值得买?国AA级别标准的护眼台灯推荐

在中国&#xff0c;近视人口占全国总人数的30%左右。中全国患近视眼的总数达到3.6亿。城市近视人口的近视率达到33%。其中&#xff0c;大学生的近视率是最高的&#xff0c;达到了75&#xff05;左右。中学生的近视率大约为50&#xff05;。在校佩戴眼镜的小学生比例为30&#x…

开始使用Spring Boot Admin吧-使用Nacos注册SBA

什么是 Spring Boot Admin&#xff08;SBA&#xff09;? Spring Boot Admin 是 codecentric 公司开发的一款开源社区项目&#xff0c;目标是让用户更方便的管理以及监控 Spring Boot 应用。 应用可以通过我们的Spring Boot Admin客户端&#xff08;通过HTTP的方式&#xff0…

GaussDB数据库SQL系列-触发器

目录 一、前言 二、触发器概念 三、GaussDB数据库中的触发器 1、语法格式 2、创建步骤 3、注意事项 4、附&#xff1a;表和视图上支持的触发器种类 四、GaussDB数据库中的示例 示例一、在GaussDB数据库中创建一个触发器&#xff0c;以便在插入新记录时自动将记录的创建…

论文笔记--Toolformer: Language Models Can Teach Themselves to Use Tools

论文笔记--Toolformer: Language Models Can Teach Themselves to Use Tools 1. 文章简介2. 文章概括3 文章重点技术3.1 Toolformer3.2 APIs 4. 文章亮点5. 原文传送门 1. 文章简介 标题&#xff1a;Toolformer: Language Models Can Teach Themselves to Use Tools作者&#…

堆详解(C语言实现)

文章目录 写在前面1. 堆的概念和性质1.1 堆的概念1.2 堆的性质 2 堆的实现2.1 堆结构的定义2.2 堆的初始化2.3 堆的插入2.3.1 向上调整算法2.3.2 堆的插入元素过程 2.4 堆的删除2.4.1 向下调整算法2.4.2 堆的删除元素过程 2.5 获取堆顶元素2.6 获取堆元素个数2.7 判断堆是否为空…

vscode代码调试配置

C/C代码调试 点击 vscode左侧的 run and debug&#xff0c;新建launch.json 和 tasks.json&#xff0c;并进行配置如下 launch.json 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 { // Use IntelliSense to learn ab…

STM32H7 RTC及PC13问题

程序加了RTC时间过后&#xff0c;发现原本的RTC定时唤醒中断也不好使了&#xff0c;开始以为是PC13入侵检测引脚问题&#xff0c;经过测试&#xff0c;发现了一个大问题&#xff0c;当使用 HAL_RTC_SetTime(&hrtc, &time, RTC_FORMAT_BCD); 函数后&#xff0c;RTC变得…

Nano文本编辑器

目录 一、概述 1.1、起源 二、安装 2.1、Centos系统 2.2、Debian / Ubuntu 系统 三、基本操作 3.1、光标移动和选择文本 3.2、插入和删除文本 3.3、保存和退出文件 3.4、搜索和替换文本 四、配置Nano 4.1、修改Nano的配置文件 4.2、自定义快捷键 4.3、更改外观和…

Spring Security 6.x 系列(6)—— 显式设置和修改登录态信息

一、前言 此篇是对上篇 Spring Security 6.x 系列&#xff08;5&#xff09;—— Servlet 认证体系结构介绍 中4.9章节显式调用SecurityContextRepository#saveContext进行详解分析。 二、设置和修改登录态 2.1 登录态存储形式 使用Spring Security框架&#xff0c;认证成功…

opencv安装过程与问题解决

主流程看这篇文章Ubuntu下Opencv的安装&#xff08;亲测有效&#xff0c;超级简单&#xff01;&#xff09; 但是里面的cmake用我这里的&#xff0c;不然安装后会dpkg会找不到opencv sudo cmake -D CMAKE_BUILD_TYPERelease -D CMAKE_INSTALL_PREFIX/usr/local -D OPENCV_GEN…

使用 ChatGPT 创建 Makefile 构建系统:从 Docker 开始

使用 Docker 搭配 ChatGPT 创建 Makefile 构建系统 Makefile 构建系统是嵌入式软件团队实现其开发流程现代化的基础。构建系统不仅允许开发人员选择各种构建目标&#xff0c;还可以将这些构建集成到持续集成/持续部署 (CI/CD) 流程中。使用诸如 ChatGPT 这样的人工智能 (AI) 工…

深度剖析API接口测试工具的企业价值

随着企业软件开发的日益复杂和互联网应用的普及&#xff0c;API接口成为不同软件系统之间信息传递的桥梁。在这一背景下&#xff0c;API接口测试工具的应用变得愈加重要&#xff0c;对企业的发展和软件质量起到了关键性的作用。本文将深入探讨API接口测试工具在企业中的重要性&…

shell编程系列(2)-数组的定义和使用

文章目录 前言数组的定义关联数组&#xff08;字典&#xff09;数组的常用操作获取数组长度获取数组中所有元素数组中新增元素删除数组中元素删除整个数组复制数组拼接数组加载文件内容到数组遍历数组 结语 前言 在上一篇中讲了普通变量的定义和使用&#xff0c;在本篇中会讲解…

leetcode:2133. 检查是否每一行每一列都包含全部整数(python3解法)

难度&#xff1a;简单 对一个大小为 n x n 的矩阵而言&#xff0c;如果其每一行和每一列都包含从 1 到 n 的 全部 整数&#xff08;含 1 和 n&#xff09;&#xff0c;则认为该矩阵是一个 有效 矩阵。 给你一个大小为 n x n 的整数矩阵 matrix &#xff0c;请你判断矩阵是否为一…

modbus中如何将float转换为short[]

modbus4j中 有一个发送的方法 public final ModbusResponse send(ModbusRequest request) throws ModbusTransportException,其中 WriteRegistersRequest 是一个用于向 Modbus 设备写入多个寄存器的请求。Modbus 是一种工业通信协议&#xff0c;用于连接电子设备。在 Modbus 通…

matlab配置

matlab配置 windowslinux挂载安装MATLAB windows 按照这里一步步配置就行( 移动硬盘中软件备份中自取) linux linux配置步骤 挂载 sudo mount -t auto -o loop /media/oyk/Elements/ubuntu/MATLAB/R2017a_glnxa64_dvd1.iso ./matlab/安装MATLAB 挂载完成后&#xff0c;先…

SpringCloudAlibaba之Nacos的持久化和高可用——详细讲解

目录 一、Nacos持久化 1.持久化说明 2.安装mysql数据库5.6.5以上版本(略) 3.修改配置文件 二、nacos高可用 1.集群说明 2.nacos集群架构图 2.集群搭建注意事项 3.集群规划 4.搭建nacos集群 5.安装Nginx 6.配置nginx conf配置文件 7.启动nginx进行测试即可 一、Nacos持久…

laravel8中常用路由使用(笔记四)

目录 1、框架路由目录统一放该目录 2、基本路由,路由都调用Route方法 3、控制器使用路由 4、路由参数 5、路由组 6、命名路由 7、命令查看当前路由列表 8、路由缓存 在Laravel 8中&#xff0c;路由定义了应用程序中接受请求的方式。它们定义了URL和相应的控制器方法之间的…

HTML input 属性笔记

Input表示Form表单中的一种输入对象&#xff0c;其又随Type类型的不同而分文本输入框&#xff0c;密码输入框&#xff0c;单选/复选框&#xff0c;提交/重置按钮等&#xff0c;下面一一介绍。 1&#xff0c;typetext 输入类型是text&#xff0c;这是我们见的最多也是使用最多的…

13、LCD1602调试工具

LCD1602调试工具 使用LCD1602液晶屏作为调试窗口&#xff0c;提供类似Printf函数的功能&#xff0c;可实时观察单片机内部数据的变化情况&#xff0c;便于调试和演示。 main.c #include <REGX52.H> #include "LCD1602.h" #include "Delay.h"//存储…