自定义Flink SourceFunction定时读取数据库

文章目录

  • 前言
  • 一、自定义Flink SourceFunction定时读取数据库
  • 二、java代码实现
  • 总结


前言

Source 是Flink获取数据输入的地方,可以用StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现继承 RichSourceFunction 类编写自定义的 sources。Flink提供了多种预定义的 stream source:基于文件、 套接字、集合等source;但没用提供数据库相关的Source。

一、自定义Flink SourceFunction定时读取数据库

有些场景需要定时的读取不断变化的数据库数据作为流数据。本文中的代码实现适用于所有关系数据库。

  • 在构造方法中传递数据库连接参数、定时周期等信息
  • run:在run中定时读取数据库数据并emit到发送到下一节点。
  • cancel: 取消一个 source,running状态改为false将 run 中的循环 emit 元素的行为终止。

二、java代码实现

/*** 关系库流数据源 **/
public class DbSourceFunction extends RichSourceFunction<Row> {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(DbSourceFunction.class);private volatile boolean isRunning = true;private String driver = null;//执行周期(秒)private Long period = null;private JSONObject conf;private DataBaseType baseType;public DbFullSourceFunction(JSONObject conf, DataBaseType baseType) {this.conf = conf;this.baseType = baseType;this.driver = baseType.getDriverClassName();// 执行周期period = conf.getLong("period");//周期单位String unit = conf.getString("executionWay", "seconds");if (period != null && period > 0) {//根据时间单位转换为秒period = FuntionUtil.getSeconds(unit, period);}}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic void run(SourceContext<Row> ctx) throws Exception {while (isRunning) {String querySql = conf.getString(Key.QUERY_SQL);List<JSONObject> columnList = conf.getList(Key.COLUMN);int len = columnList.size();Connection connect = null;PreparedStatement ps = null;ResultSet rs = null;try {while (connect == null) {try {connect = getConnection();if (connect != null) {break;}} catch (Exception w) {LOG.error("获取连接异常", w.getMessage());}}ps = connect.prepareStatement(querySql);try {rs = ps.executeQuery();while (rs.next()) {Row row = new Row(len);for (int i = 0; i < len; i++) {JSONObject column = columnList.get(i);Integer columnType = column.getInt(Key.COLUMN_TYPE);//将ResultSet数据转换为Flink RowRowSetFieldUtil.rowSetFieldResultSet(row, rs, i, columnType, baseType);}// 发送结果ctx.collect(row);}} catch (Exception e) {LOG.error("查询出现异常",e);if (ps != null) {ps.close();}if (connect != null) {connect.close();}}} catch (Exception e) {LOG.error("查询数据异常", e);throw e;} finally {if (rs != null) {rs.close();}if (ps != null) {ps.close();}if (connect != null) {connect.close();}}if (period == null || period <= 0) {isRunning = false;} else {Long takeTime = (end - start) / 1000;//去掉执行消耗时间LOG.error("sleep time:" + (period - takeTime));TimeUnit.SECONDS.sleep(period - takeTime);}}}@Overridepublic void cancel() {isRunning = false;}private Connection getConnection() {Connection connection = null;try {String username = conf.getString(Key.USERNAME);String password = conf.getString(Key.PASSWORD);password = PubFunction.decryptStr(password);String jdbcUrl = conf.getString(String.format("%s[0]", Key.JDBC_URL));// 创建连接connection = DriverManager.getConnection(jdbcUrl, username, password);} catch (Exception e) {LOG.error("get connection occur exception", e);throw new RuntimeException("get connection occur exception", e);}return connection;}
}

总结

完整代码请点击下载自定义Flink SourceFunction定时读取数据库java代码下载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/616442.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

openssl3.2 - 官方demo学习 - 索引贴

文章目录 openssl3.2 - 官方demo学习 - 索引贴概述笔记工程的搭建和调试环境BIOBIO - client-arg.cBIO - client-conf.cBIO - saccept.cBIO - sconnect.cBIO - server-arg.cBIO - server-cmod.cBIO - server-conf.cBIO - 总结certsciphercipher - aesccm.cEND openssl3.2 - 官方…

使用Java连接MongoDB (6.0.12) 报错

报错&#xff1a; Exception in thread "main" com.mongodb.MongoCommandException: Command failed with error 352: Unsupported OP_QUERY command: create. 上图中“The client driver may require an upgrade”说明了“客户端驱动需要进行升级”&#xff0c;解…

数据分析-Pandas如何转换产生新列

数据分析-Pandas如何转换产生新列 时间序列数据在数据分析建模中很常见&#xff0c;例如天气预报&#xff0c;空气状态监测&#xff0c;股票交易等金融场景。此处选择巴黎、伦敦欧洲城市空气质量监测 N O 2 NO_2 NO2​数据作为样例。 python数据分析-数据表读写到pandas 经典…

What does `rpm -ivh` do?

rpm -ivh 安装 并 显示安装进度 (–install–verbose–hash) rpm -ivh /media/cdrom/RedHat/RPMS/samba-3.0.10-1.4E.i386.rpm 安装rpm -ivh --relocate //opt/gaim gaim-1.3.0-1.fc4.i386.rpm 指定安装到 /opt/gaim[Ref] rpm -uvh和-ivh有什么区别以及zabbix 安…

android前台服务:

android前台服务&#xff1a; android-安卓如何开启前台服务&#xff1f;foregroundService的使用方法&#xff0c;什么是前台服务&#xff1f;_foregroundservicetype-CSDN博客

使用BeanShell写入内容到文件【JMeter】

一、前言 ​ 在我们日常工作中&#xff0c;可能会遇到需要将请求返回的数据写入到文件中。在我们使用JMeter进行性能测试时&#xff0c;就经常能够遇到这种情况。要想达到这种目的&#xff0c;我们一般采取BeanShell后置处理器来将内容写入到文件。 二、提取 ​ 在目前大多数的…

基于多智能体点对点转换的分布式模型预测控制

matlab2020正常运行 基于多智能体点对点转换的分布式模型预测控制资源-CSDN文库

Spring MVC 日期转换器

日期转换器 自定义日期转换器 public class DataConvert implements Converter<String, Date> {/**** 配置时间转换类* param date* return*/Overridepublic Date convert(String date) {try {SimpleDateFormat sdf new SimpleDateFormat("yyyy-MM-dd");ret…

对于软件测试的认识和了解

对软件测试的认识&#xff1a; 软件测试要求开发人员避免测试自己开发的程序。从心理学角度讲&#xff0c;这是很有道理的。特别是一个相对复杂的系统&#xff0c;开发人员在刚刚开发完成的时候&#xff0c;尚沉浸于对自己设计的回味之中。此时去测试的话往往会侧重于程序本身的…

CSS3简单运用过渡元素(transition)

CSS3过渡 概念&#xff1a;在CSS3中&#xff0c;我们可以使用transition属性将元素的某一个属性从“一个属性值”在指定的时间内平滑地过渡到“另一个属性值”&#xff0c;从而实现动画效果。 CSS3变形&#xff08;transform)呈现的仅仅是一个结果&#xff0c;而CSS过渡&…

WPS - 表格虚线变成实线解决方案(Office 同上)

1、选中表格区域&#xff0c;在表格中选中需要调整为实线的表格区域 2、点击设置单元格格式&#xff0c;鼠标进行右击并点击设置单元格格式选项 3、选择实线&#xff0c;在单元格格式下的边框&#xff0c;调整到实线 4、设置为实线&#xff0c;即可将表格的虚线设置为实线

AI系统ChatGPT网站系统源码AI绘画详细搭建部署教程,支持GPT语音对话+DALL-E3文生图+GPT-4多模态模型识图理解

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Ch…

【AI视野·今日NLP 自然语言处理论文速览 第七十四期】Wed, 10 Jan 2024

AI视野今日CS.NLP 自然语言处理论文速览 Wed, 10 Jan 2024 Totally 38 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computation and Language Papers Model Editing Can Hurt General Abilities of Large Language Models Authors Jia Chen Gu, Hao Xiang Xu, J…

Qt QGraphicsItem获取鼠标位置对应图像坐标

本次使用了QGraphicsView来加载图像&#xff0c;然后给其设置了一个QGraphicsScene场景&#xff0c;再给场景添加了一个自定义的QGraphicsItem&#xff0c;在其中重写了paint事件&#xff0c;用来重绘图像。 正常情况时&#xff0c;QGraphicsItem上图像的有效区域QRect大小和QG…

基于爬虫和Kettle的豆瓣电影的采集与预处理

一&#xff1a;爬虫 1、爬取的目标 将豆瓣电影网上的电影的基本信息&#xff0c;比如&#xff1a;电影名称、导演、电影类型、国家、上映年份、评分、评论人数爬取出来&#xff0c;并将爬取的结果放入csv文件中&#xff0c;方便存储。 2、网站结构 图1豆瓣网网站结构详…

Polars使用指南(二)

在上一篇文章中&#xff0c;我们介绍了Polars的优势和Polars.Series的常用API&#xff0c;本篇文章我们继续介绍Polars.Series的扩展API。 对于一些特殊的数据类型&#xff0c;如 pl.Array、list、str 等&#xff0c;Polars.Series 提供了基于属性的直接操作API&#xff0c;如…

Web前端 ---- 【Vue3】Proxy响应式原理

目录 前言 安装Vue3项目 安装 Proxy 语法格式 前言 从本文开始进入vue3的学习。本文介绍vue3中的响应式原理&#xff0c;相较于vue2中通过object.defineProperty&#xff08;vue2中的响应式&#xff09;来实现响应式&#xff0c;vue3中换成了Proxy来进行实现。 安装Vue3项目…

Linux---gcc编译

目录 前言 一、gcc编译 二、程序的编译过程 三、gcc查看编译过程 1.预处理阶段 2.编译 3.汇编 4.链接 动静态库链接的内容 动静态库链接的优缺点 5.总结记忆 前言 在前面我们学会使用vim对文件进行编辑&#xff0c;如果是C或者C程序&#xff0c;我们编辑好了内容…

C++多态与虚函数的使用注意

文章目录 什么情况下用多态构造和析构的顺序为什么要把析构函数声明为虚函数为什么不能在构造函数和析构函数中使用虚函数什么情况下用多态 多态是面向对象编程中的一个重要概念,可以提高代码的可扩展性和可维护性。在以下情况下,可以考虑使用多态: 当有一个基类或接口,并…

监督学习 - 决策树(Decision Trees)

什么是机器学习 决策树&#xff08;Decision Trees&#xff09;是一种基于树形结构进行决策的模型&#xff0c;广泛应用于分类和回归任务。它通过对数据集进行递归划分&#xff0c;构建一棵树&#xff0c;每个节点代表一个特征&#xff0c;每个分支代表一个决策规则&#xff0…