flink部署使用(flink-connector-jdbc)连接达梦数据库并写入读取数据

flink介绍

1)Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

2)在实时计算或离线任务中,往往需要与关系型数据库交互,例如 MySQL、PostgreSQL 等。Apache Flink 提供了 JDBC Connector,可以方便地将流式数据写入或读取数据库。

3)flink版本下载:https://archive.apache.org/dist/flink/

flink单机搭建

## 1. 下载并解压flink
[root@localhost flink_soft]# mkdir /data/flink_soft
[root@localhost flink_soft]# tar -zxvf flink-1.16.1-bin-scala_2.12.tgz
## 2. 修改配置文件,把下面的三行全部去掉
[root@localhost flink-1.16.1]# cd /data/flink_soft/flink-1.16.1
[root@localhost flink-1.16.1]# vim /data/flink_soft/flink-1.16.1/conf/flink-conf.yaml
rest.port: 8081
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
## 3. 启动flink
[root@localhost flink-1.16.1]# ./bin/start-cluster.sh
## 4. 查询进程是否存在
[root@localhost flink-1.16.1]# ps aux | grep flink
## 5. 访问http://192.168.112.162:8081/ 即可。

将已经适配dameng的jar包放到lib目录下

1)下载已经适配好的包https://github.com/gaoyuan98/flink-connector-jdbc-dameng/releases

提供了两个版本的dameng适配驱动包,一个是实现JdbcFactory接口,还有一个是实现JdbcDialectFactory接口。

2)截止发文v3.3版本官方还未正式发版,所以大概率是用这个版本:flink-connector-jdbc-dameng_20250331_(适用于v3.2及以下版本)

3)将下载好的适配包放到flink的lib目录下

DmJdbcDriver8.jar 达梦数据库jdbc驱动,可以更换为与数据库版本相同的驱动。

flink-connector-jdbc-3.1.jar flink使用jdbc方式连接数据库时的桥接包,如果项目本身已经有flink-connector-jdbc包可忽略该包。

flink-connector-jdbc-dameng-1.0.jar flink使用jdbc方式连接达梦数据库的适配包,源码基于flink-connector-jdbc.jar包进行调整,所以该包必须存在。

如项目中已经有flink-connector-jdbc的包,那么只需要使用DmJdbcDriver8.jar跟flink-connector-jdbc-dameng-1.0.jar的驱动包即可。

如项目中没有flink-connector-jdbc的包,就把这三个包全部放到lib下。

[root@localhost lib]# cd /data/flink_soft/flink-1.16.1/lib
[root@localhost lib]# ll
total 204020
-rw-r--r--. 1 root root   1615303 Jan 17 00:30 DmJdbcDriver8.jar
-rwxrwxrwx. 1 root root    198857 Jan 19  2023 flink-cep-1.16.1.jar
-rwxrwxrwx. 1 root root    516144 Jan 19  2023 flink-connector-files-1.16.1.jar
-rw-r--r--. 1 root root    277945 Mar 28 23:46 flink-connector-jdbc-3.1-SNAPSHOT.jar
-rw-r--r--. 1 root root     13458 Mar 29 00:13 flink-connector-jdbc-dameng-1.0-SNAPSHOT.jar
-rwxrwxrwx. 1 root root    102470 Jan 19  2023 flink-csv-1.16.1.jar
-rwxrwxrwx. 1 root root 117107159 Jan 19  2023 flink-dist-1.16.1.jar
-rwxrwxrwx. 1 root root    180248 Jan 19  2023 flink-json-1.16.1.jar
-rwxrwxrwx. 1 root root  21052640 Jan 19  2023 flink-scala_2.12-1.16.1.jar
-rwxrwxrwx. 1 root root  10737871 Jan 13  2023 flink-shaded-zookeeper-3.5.9.jar
-rwxrwxrwx. 1 root root  15367504 Jan 19  2023 flink-table-api-java-uber-1.16.1.jar
-rwxrwxrwx. 1 root root  36249667 Jan 19  2023 flink-table-planner-loader-1.16.1.jar
-rwxrwxrwx. 1 root root   3133690 Jan 19  2023 flink-table-runtime-1.16.1.jar
-rwxrwxrwx. 1 root root    208006 Jan 13  2023 log4j-1.2-api-2.17.1.jar
-rwxrwxrwx. 1 root root    301872 Jan 13  2023 log4j-api-2.17.1.jar
-rwxrwxrwx. 1 root root   1790452 Jan 13  2023 log4j-core-2.17.1.jar
-rwxrwxrwx. 1 root root     24279 Jan 13  2023 log4j-slf4j-impl-2.17.1.jar

重启flink

[root@localhost flink-1.16.1]# cd /data/flink_soft/flink-1.16.1
[root@localhost flink-1.16.1]# ./bin/stop-cluster.sh
[root@localhost flink-1.16.1]# ./bin/start-cluster.sh## 如果报错的话查看这个日志
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log

flink驱动验证

在达梦数据库上创建表数据

CREATE TABLE source_table (id INT PRIMARY KEY,name VARCHAR(50),age INT
);
INSERT INTO source_table (id, name, age) VALUES (1, 'Alice', 30);
INSERT INTO source_table (id, name, age) VALUES (2, 'Bob', 25);
INSERT INTO source_table (id, name, age) VALUES (3, 'Charlie', 40);
COMMIT;

在 Flink SQL CLI 中定义达梦表

[root@localhost lib]# cd /data/flink_soft/flink-1.16.1/
[root@localhost flink-1.16.1]#  ./bin/sql-client.sh embeddedCREATE TABLE source (id INT,name STRING,age INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:dm://192.168.127.2:5236/SYSDBA','table-name' = 'source_table','driver' = 'dm.jdbc.driver.DmDriver','username' = 'SYSDBA','password' = 'SYSDBA123'
);## 在 Flink SQL CLI 中查询数据
SELECT * FROM source;
## 筛选数据,比如 查询年龄大于 30 的用户:
SELECT id, name FROM source WHERE age > 30;
## 插入数据
INSERT INTO source (id, name, age) VALUES (3, '33', 33);

CREATE TABLE source1 (id INT,name STRING,age INT
) WITH ('connector' = 'dameng','url' = 'jdbc:dm://81.70.105.201:5236/SYSDBA','table-name' = 'source_table','driver' = 'dm.jdbc.driver.DmDriver','username' = 'SYSDBA','password' = '123456'
);
SELECT * FROM source1;

flink-jdbc-dameng选错会怎么?

目前flink-connector-jdbc中,v3.0 - v3.2 都是同一个实现思路,也就是只需要集成实现JdbcDialectFactory接口的方法即可,main分支的话是实现JdbcFactory接口函数,也就是需要适配两个版本。

因使用的是v3.3的dameng包,但flink-connector-jdbc是v3.2及以下版本,驱动包接口实现不对所以会报这个错。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/77267.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用swift playground写个ios应用和大模型或者网站交互

import SwiftUIstruct ContentView: View {State private var textFieldText: String ""State private var outputText: String "输出将会显示在这里"private let tip:String "消息已发送,请等待"State private var history:[Stri…

springboot+vue2集成JWT token实现权限验证

前端项目搭建参考: Vue项目的搭建和启动_vue项目启动 csdn-CSDN博客 Vue ElementUI 登录页面_vue用户登录页面-CSDN博客 跨域问题前端解决-CSDN博客 实现思路: 1. 实现的目的:为了保护网站安全信息,使用jwt进行权限验证&#xf…

Cursor编程-从入门到精通__0409

早期的Github Copilot 最近更新了,支持Agent编程,字节跳动Trae使用(免费),但成熟程度不如Cursor,Cursor前50次免费 Copilot VS Cursor*** 1,Cursor VSCode 二次开发,IDE级别 2&…

MyBatis 详解及代码示例

MyBatis 是一个 半自动 ORM 框架,主要用于 Java 与数据库之间的持久化操作,它本质是对 JDBC 的封装 全名:MyBatis(前身 iBATIS)核心作用:自动将 SQL 执行结果映射为 Java 对象;也可以将 Java 对…

1.6-抓包技术(Burp Suite\Yakit抓包\Web、APP、小程序)

1.6-抓包技术(Burp Suite\Yakit抓包\Web、APP、小程序) 如果要使用抓包软件,基本上第一步都是要安装证书的。原因如下: 客户端(浏览器或应用)会检测到证书不受信任,并弹出 证书错误&#xff0…

Java 大视界 -- 基于 Java 的大数据隐私保护在金融客户信息管理中的实践与挑战(178)

💖亲爱的朋友们,热烈欢迎来到 青云交的博客!能与诸位在此相逢,我倍感荣幸。在这飞速更迭的时代,我们都渴望一方心灵净土,而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识,也…

第十届 蓝桥杯 嵌入式 省赛

一、分析 这届的真题,有点像第七届的液位检测。 这届的题目开始,貌似比赛描述的功能,逻辑上变得更好梳理了。一开始就把大致的功能给你说明一遍,不像之前都是一块一块的说明。 1. 基本功能 1)测量竞赛板上电位器 R…

实现usb的MTP功能

前言:最终结果根据用户自主选择可实现host和device功能的切换。 效果展示: 当插入usb时设备会弹窗 当用户选择设备模式时pc端就会出现mtp设备盘符 实现mtp设备 ubuntu架构根文件系统通过uMTP-Responder实现usb的MTP功能 添加服务 /home/flynn/firfly_rootfs/lib/system…

React-05React中props属性(传递数据),propTypes校验,类式与函数式组件props的使用

1.类式组件props基本数据读取与解构运算符传递 <script type"text/babel">// 创建组件class PersonalInfo extends React.Component {render() {// 读取props属性 并读取值console.log(props,this.props);return(<ul><li>姓名&#xff1a;{this.p…

PCI认证 密钥注入 ECC算法工具 NID_secp521r1 国密算法 openssl 全套证书生成,从证书提取公私钥数组 x,y等

步骤 1.全套证书已经生成。OK 2.找国芯要ECC加密解密签名验签代码。给的逻辑说明没有示例代码很难的上。 3.集成到工具 与SP联调。 1.用openssl全套证书生成及验证 注意&#xff1a;这里CA 签发 KLD 证书用的是SHA256。因为芯片只支持SHA256算法,不支持SHA512。改成统一。…

蓝桥杯每日刷题c++

目录 P9240 [蓝桥杯 2023 省 B] 冶炼金属 - 洛谷 (luogu.com.cn) P8748 [蓝桥杯 2021 省 B] 时间显示 - 洛谷 (luogu.com.cn) P10900 [蓝桥杯 2024 省 C] 数字诗意 - 洛谷 (luogu.com.cn) P10424 [蓝桥杯 2024 省 B] 好数 - 洛谷 (luogu.com.cn) P8754 [蓝桥杯 2021 省 AB2…

oracle 数据库字段类型为NUMBER(5,2)时,并且数据库值为0.1,为什么Java执行SQL查出来时为“.1“?

在 Oracle 数据库中&#xff0c;当字段类型为 NUMBER(5,2) 且存储的值为 0.1 时&#xff0c;Java 程序查询结果可能显示为 ".1"&#xff08;省略前导零&#xff09;&#xff0c;这是由 Oracle JDBC 驱动默认的数字格式化行为 导致的。以下是原因分析和解决方案&#…

3月AI论文精选十篇

1. Feature-Level Insights into Artificial Text Detection with Sparse Autoencoders[1] 核心贡献&#xff1a;通过稀疏自编码器揭示AI生成文本的检测特征&#xff0c;提出基于特征分布的鉴别方法。研究发现&#xff0c;AI文本在稀疏编码空间中呈现独特的"高频低幅"…

STM32在裸机(无RTOS)环境下,需要手动实现队列机制来替代FreeRTOS的CAN发送接收函数

xQueueSendToBackFromISR(ecuCanRxQueue, hcan->pRxMsg, &xHigherPriorityTaskWoken)&#xff0c;xQueueReceive(mscCanRxQueue,&mscRxMsg,0)和xQueueSendToBack(mscCanTxQueue, &TxMessageTemp, 0 )这3个函数&#xff0c;在裸机下实现&#xff1a; 在裸机&…

使用PX4,gazebo,mavros为旋翼添加下视的相机(仿真采集openrealm数据集-第一步)

目录 一.方法一&#xff08;没成功&#xff09; 1.运行PX4 2.运行mavros通讯 3.启动仿真世界和无人机 &#xff08;1&#xff09;单独测试相机 &#xff08;2&#xff09;make px4_sitl gazebo启动四旋翼iris无人机 二.方法二&#xff08;成功&#xff09; 1.通过 rosl…

7、nRF52xx蓝牙学习(nrf_gpiote.c库函数学习)

续前一篇文章。 3、nrfx_gpiote_in_event_enable void nrfx_gpiote_in_event_enable(nrfx_gpiote_pin_t pin, bool int_enable) {NRFX_ASSERT(nrf_gpio_pin_present_check(pin));NRFX_ASSERT(pin_in_use_by_gpiote(pin));if (pin_in_use_by_port(pin)){nrf_gpiote_polarity_t…

Java 实现插入排序:[通俗易懂的排序算法系列之三]

引言 大家好!欢迎继续关注我的排序算法系列。今天,我们要学习的是另一种非常基础且重要的排序算法——插入排序 (Insertion Sort)。 插入排序的思路非常贴近我们日常整理扑克牌的方式,理解起来相对自然。虽然它在最坏情况下的效率不高,但在某些特定场景下,它的表现甚至优…

Java的spring boot项目编译成功启动报错

问题现象&#xff1a;spring boot项目&#xff0c;候删除一些无用代码后&#xff0c;build成功&#xff0c;启动时报错&#xff1a;找不到java.util.Map或者其他对象&#xff08;用Lombok注解Data&#xff09;中的字段属性找不到等错误。解答&#xff1a; 常见是Lombok版本问题…

PyTorch参数管理详解:从访问到初始化与共享

本文通过实例代码讲解如何在PyTorch中管理神经网络参数&#xff0c;包括参数访问、多种初始化方法、自定义初始化以及参数绑定技术。所有代码可直接运行&#xff0c;适合深度学习初学者进阶学习。 1. 定义网络与参数访问 1.1 定义单隐藏层多层感知机 import torch from torch…

基于springboot+vue的课程管理系统

一、系统架构 前端&#xff1a;vue | element-ui 后端&#xff1a;springboot | mybatis-plus 环境&#xff1a;jdk1.8 | mysql8 | maven | node v16.20.2 | idea 二、代码及数据 三、功能介绍 01. 登录 02. 管理员-首页 03. 管理员-系管理 04. 管理员-专业管理 05. 管…