【Table/SQL Api】Flink Table/SQL Api表转流读取MySQL

引入依赖

jdbc依赖

flink-connector-jdbc + mysql-jdbc-driver 操作mysql数据库

        <!-- Flink-Connector-Jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId></dependency><!-- mysql jdbc driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency>

Table/SQL Api依赖

  1. Table/SQL Api 扩展依赖
  2. Table/SQL Api 基础依赖
  3. Table/SQL Api 和 DataStream Api 交互的依赖 bridge
  4. Flink Planner 依赖
        <!-- Table/SQL Api 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId></dependency><!-- Table/SQL Api 扩展依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId></dependency><!-- bridge桥接器,主要负责Table API和 DataStream API的连接支持 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId></dependency><!-- Flink Planner 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId></dependency>

对应版本在这 (项目Flink版本为1.14.5

image-20231210161727111

Flink读写MySQL工具类

Table Api 环境加载

Table API和SQL Api都是基于Table接口

Table Api上下文环境有3种类型

  1. TableEnvironment:只支持Batch作业
  2. BatchTableEnvironment:只支持Batch作业
  3. StreamTableEnvironment: 支持流计算【用这个】

Planner(查询处理器)

Planner(查询处理器):解析sql、优化sql和执行sql

Flink Planner的类型:

  1. Flink Planner (Old Planner)
  2. Blink Planner (Flink 1.14之前需要手动导入依赖)

Blink Planner从Flink 1.11版本开始为Flink-table的默认查询处理器

Blink Planner使得Table Api & SQL 层实现了流批统一

Catalog对象

Catalog对象是提供了元数据信息,数据源与数据表的信息则存储在Catalog中

// 创建Catalog对象
new JdbcCatalog(catalog_name, database, username, passwd, url);

Catalog对象是接口

Catalog接口的实现:(Flink 1.14版本之前)

  1. PG (PostgresSQL) Catalog
  2. HiveCatalog
  3. Mysql Catalog (Flink 1.15 才有)

DDL与数据库表结构必须一模一样,建立映射,这种方式数据库表结构如果变化,代码也必须随之变化重新打包,因此这种方式用的不多,一般catalog会用的比较多。

但由于项目Flink依赖用的是1.14.5,因此还是使用DDL语句实现。

代码实现

public class MysqlUtil {/*** 数据库连接对象*/private static Connection connection = null;/*** SQL语句对象*/private static PreparedStatement preparedStatement = null;/*** 结果集对象*/private static ResultSet rs = null;/*** 使用 Flink Table/SQL Api 读取Mysql** @param env:           流计算上下文环境* @param parameterTool: 参数工具* @param clazz:         流水线输出对象的类* @param tableName:     表名* @param ddlString:     DDL字符串* @param sql:           SQL查询语句* @return DataStream<T>:DataStream对象*/public static <T> DataStream<T> readWithTableOrSQLApi(StreamExecutionEnvironment env,ParameterTool parameterTool,Class<T> clazz,String tableName,String ddlString,String sql) throws Exception {// 创建TableApi运行环境EnvironmentSettings bsSettings =EnvironmentSettings.newInstance()// Flink 1.14不需要再设置 Planner//.useBlinkPlanner()// 设置流计算模式.inStreamingMode().build();// 创建StreamTableEnvironment实例StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);// 指定方言 (选择使用SQL语法还是HQL语法)tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);// 编写DDL ( 数据定义语言 )String ddl = buildMysqlDDL(parameterTool, tableName, ddlString);// StreamTableEnvironment注册虚拟表tableEnv.executeSql(ddl);// 查询结果是Table对象Table table = tableEnv.sqlQuery(sql);// 将Table对象转换为DataStream对象return tableEnv.toDataStream(table, clazz);}/*** 根据参数生成MySQL的DDL语句** @param parameterTool  参数工具,用于获取MySQL连接信息* @param tableName      要创建的表名* @param ddlFieldString 表字段的DDL语句* @return 生成的完整的MySQL DDL语句*/public static String buildMysqlDDL(ParameterTool parameterTool,String tableName,String ddlFieldString) {// 从参数工具中获取mysql连接的urlString url = parameterTool.get(ParameterConstants.Mysql_URL);// 从参数工具中获取mysql连接的用户名String username = parameterTool.get(ParameterConstants.Mysql_USERNAME);// 从参数工具中获取mysql连接的密码String passwd = parameterTool.get(ParameterConstants.Mysql_PASSWD);// 从参数工具中获取MySQL的驱动程序String driver = parameterTool.get(ParameterConstants.Mysql_DRIVER);// 返回完整的DDL语句return "CREATE TABLE IF NOT EXISTS " +tableName +" (\n" +ddlFieldString +")" +" WITH (\n" +"'connector' = 'jdbc',\n" +"'driver' = '" + driver + "',\n" +"'url' = '" + url + "',\n" +"'username' = '" + username + "',\n" +"'password' = '" + passwd + "',\n" +"'table-name' = '" + tableName + "'\n" +")";}/*** 初始化 jdbc Connection*/public static Connection init(ParameterTool parameterTool) {String _url = parameterTool.get(ParameterConstants.Mysql_URL);String _username = parameterTool.get(ParameterConstants.Mysql_USERNAME);String _passwd = parameterTool.get(ParameterConstants.Mysql_PASSWD);try {connection = DriverManager.getConnection(_url, _username, _passwd);} catch (Exception e) {throw new RuntimeException(e);}return connection;}/*** 生成 PreparedStatement*/public static PreparedStatement initPreparedStatement(String sql) {try {preparedStatement = connection.prepareStatement(sql);} catch (Exception e) {throw new RuntimeException(e);}return preparedStatement;}/*** 关闭 jdbc Connection*/public static void close() {try {if (preparedStatement != null) {preparedStatement.close();}if (connection != null) {connection.close();}} catch (Exception e) {throw new RuntimeException(e);}}/*** 关闭 PreparedStatement*/public static void closePreparedStatement() {try {if (preparedStatement != null) {preparedStatement.close();}} catch (Exception e) {throw new RuntimeException(e);}}/*** 关闭 ResultSet*/public static void closeResultSet() {try {if (rs != null) {rs.close();}} catch (Exception e) {throw new RuntimeException(e);}}/*** 执行 sql 语句*/public static ResultSet executeQuery(PreparedStatement ps) {preparedStatement = ps;try {rs = preparedStatement.executeQuery();} catch (Exception e) {throw new RuntimeException(e);}return rs;}}

测试一下

测试库中有个tb_user表

image-20231210174346826

创建与表映射的实体类

@Data
public class UserPO {private Long id;private String name;
}
class MysqlUtilTest {@DisplayName("测试使用 Flink Table/SQL Api 读取Mysql")@Testpublic void testReadWithTableOrSQLApi() throws Exception {// 初始化环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 设置并行度1env.setParallelism(1);// 获取参数工具实例ParameterTool parameterTool = ParameterUtil.getParameters();/* ************************ CREATE 语句用于向当前或指定的 Catalog 中注册表。* 注册后的表、视图和函数可以在 SQL 查询中使用** *********************/// 表名String tableName = "tb_user";// 表字段ddlString ddlFieldString ="id BIGINT,\n" +"name STRING \n";// 查询表的全部字段String sql = "SELECT * FROM " + tableName;DataStream<UserPO> rowDataStream =MysqlUtil.readWithTableOrSQLApi(env,parameterTool,UserPO.class,tableName,ddlFieldString,sql);rowDataStream.print("mysql");env.execute();}
}

image-20231210174720832

查询成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/212917.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu上安装 Git

在 Ubuntu 上安装 Git 可以通过包管理器 apt 进行。以下是在 Ubuntu 上安装 Git 的步骤&#xff1a; 打开终端。你可以按 Ctrl Alt T 组合键来打开终端。 运行以下命令以确保你的系统的软件包列表是最新的&#xff1a; sudo apt update 安装 Git&#xff1a; sudo apt inst…

RT-DERT改进策略:AKConv即插即用,轻松涨点

摘要 提出了一种算法&#xff0c;用于生成任意尺寸卷积核的初始采样坐标。与常规卷积核相比&#xff0c;提出的AKConv实现了不规则卷积核的函数来提取特征&#xff0c;为各种变化目标提供具有任意采样形状和尺寸的卷积核&#xff0c;弥补了常规卷积的不足。在COCO2017和VisDro…

Anaconda文件目录(打开默认路径)更改

Anaconda 文件默认目录更改 每次打开 Anaconda 都在C盘怎么办&#xff0c;如何改为D盘或是其他盘符位置&#xff1f; 可以进行下述操作。 1. 单次修改路径 单次修改路径&#xff1a;在 exe 文件(Anaconda Prompt (Anaconda_py))中写入下面代码&#xff1a; jupyter notebook …

STM32 标准外设SPL库、硬件抽象层HAL库、低层LL库区别?

1、STM32 之一 HAL库、标准外设库、LL库_ZCShou的博客-CSDN博客_ll库&#xff08;仔细阅读&#xff09; 2、STM32标准外设库、 HAL库、LL库 - King先生 - 博客园 3、STM32 之 HAL库_戈 扬的博客&#xff08;仔细阅读&#xff09; 4、STM32 LL 为什么比 HAL 高效&#xff1…

MAC下加载动态库

MAC引用动态库时报错&#xff1a; 查看一个可执行文件或者动态库引用的第三方库路径&#xff1a;otool -L xxx.dylib 第一行是动态库的安装名称&#xff08;INSTALL Name&#xff09;。当另一个客户端链接到这个 dylib 时&#xff0c;dylib 的安装 ID 会被复制到客户端中作为…

selenium库的使用

来都来了给我点个赞收藏一下再走呗&#x1f339;&#x1f339;&#x1f339;&#x1f339;&#x1f339; 目录 一、下载需要用到的python库selenium 二、selenium的基本使用 1.在python代码引入库 2.打开浏览器 3.元素定位 1&#xff09;通过id定位 2&#xff09;通过标…

一文掌握Ascend C孪生调试

1 What&#xff0c;什么是孪生调试 Ascend C提供孪生调试方法&#xff0c;即CPU域模拟NPU域的行为&#xff0c;相同的算子代码可以在CPU域调试精度&#xff0c;NPU域调试性能。孪生调试的整体方案如下&#xff1a;开发者通过调用Ascend C类库编写Ascend C算子kernel侧源码&am…

云计算大屏,可视化云计算分析平台(云实时数据大屏PSD源文件)

大屏组件可以让UI设计师的工作更加便捷&#xff0c;使其更高效快速的完成设计任务。现分享可视化云分析系统、可视化云计算分析平台、云实时数据大屏的大屏Photoshop源文件&#xff0c;开箱即用&#xff01; 若需 更多行业 相关的大屏&#xff0c;请移步小7的另一篇文章&#…

mapstruct个人学习记录

mapstruct核心技术学习 简介入门案例maven依赖 IDEA插件单一对象转换测试结果 mapping属性Spring注入的方式测试 集合的映射set类型的映射测试map类型的映射测试 MapMappingkeyDateFormatvalueDateFormat 枚举映射基础入门 简介 在工作中&#xff0c;我们经常要进行各种对象之…

【rabbitMQ】rabbitMQ用户,虚拟机地址(添加,修改,删除操作)

rabbitMQ的下载&#xff0c;安装和配置 https://blog.csdn.net/m0_67930426/article/details/134892759?spm1001.2014.3001.5502 rabbitMQ控制台模拟收发消息 https://blog.csdn.net/m0_67930426/article/details/134904365?spm1001.2014.3001.5502 目录 用户 添加用户…

MyBatis 四大核心组件之 StatementHandler 源码解析

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

CPU设计——Triumphcore——MP_work版本

该版本用作系统寄存器的实现&#xff0c;M/S/U状态的实现与切换&#xff0c;以及load/store的虚实地址转换 设计指标 2023.12.8 2023.12.9 不实现mideleg和medeleg&#xff0c;因此一旦出现异常&#xff0c;直接切换至M态&#xff0c; 调试记录 到存储区中取PTE要额外至少…

airserver mac 7.27官方破解版2024最新安装激活图文教程

airserver mac 7.27官方破解版是一款好用的airplay投屏工具&#xff0c;可以轻松将ios荧幕镜像&#xff08;airplay&#xff09;至mac上&#xff0c;在mac平台上实现视频、音频、幻灯片等文件资源的接收及投放演示操作&#xff0c;解决iphone或ipad的屏幕录像问题&#xff0c;满…

Linux C/C++ 从内存转储中恢复64位ELF可执行文件

ELF&#xff08;Executable and Linking Format&#xff09;是一种对象文件的格式&#xff0c;它主要用于定义ELF&#xff08;Executable and Linking Format&#xff09;是一种对象文件的格式&#xff0c;它主要用于定义不同类型的对象文件中的内容以及它们的存储方式。一个EL…

作业调度算法(含详细计算过程)和进程调度算法浅析

一.作业调度 作业调度算法需要知道以下公式 周转时间完成时间 - 到达时间 带权周转时间周转时间/运行时间 注&#xff1a;带权周转时间越大&#xff0c;作业&#xff08;或进程&#xff09;越短&#xff1b;带权周转时间越小&#xff0c;作业&#xff08;或进程&#xff09;越…

Redis生产实战-Redis集群故障探测以及降级方案设计

Redis 集群故障探测 在生产环境中&#xff0c;如果 Redis 集群崩溃了&#xff0c;那么会导致大量的请求打到数据库中&#xff0c;会导致整个系统都崩溃&#xff0c;所以系统需要可以识别缓存故障&#xff0c;限流保护数据库&#xff0c;并且启动接口的降级机制 降级方案设计 …

超过 50% 的内部攻击使用特权提升漏洞

特权提升漏洞是企业内部人员在网络上进行未经授权的活动时最常见的漏洞&#xff0c;无论是出于恶意目的还是以危险的方式下载有风险的工具。 Crowdstrike 根据 2021 年 1 月至 2023 年 4 月期间收集的数据发布的一份报告显示&#xff0c;内部威胁正在上升&#xff0c;而利用权…

基于SSM的剧本杀预约系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

【第三届】:“玄铁杯”RISC-V应用创新大赛(基于yolov5和OpenCv算法 — 智能警戒哨兵)

文章目录 前言 一、智能警戒哨兵是什么&#xff1f; 二、方案流程图 三、硬件方案 四、软件方案 五、演示视频链接 总结 前言 最近参加了第三届“玄铁杯”RISC-V应用创新大赛&#xff0c;我的创意题目是基于 yolov5和OpenCv算法 — 智能警戒哨兵 先介绍一下比赛&#xf…

docker容器配置MySQL与远程连接设置(纯步骤)

以下为ubuntu20.04环境&#xff0c;默认已安装docker&#xff0c;没安装的网上随便找个教程就好了 拉去mysql镜像 docker pull mysql这样是默认拉取最新的版本latest 这样是指定版本拉取 docker pull mysql:5.7查看已安装的mysql镜像 docker images通过镜像生成容器 docke…