Flink实现数据写入MySQL

 先准备一个文件里面数据有:

a, 1547718199, 1000000
b, 1547718200, 1000000
c, 1547718201, 1000000
d, 1547718202, 1000000
e, 1547718203, 1000000
f, 1547718204, 1000000
g, 1547718205, 1000000
h, 1547718210, 1000000
i, 1547718210, 1000000
j, 1547718210, 1000000

 scala代码:

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._case class SensorReading(name: String, timestamp: Long, salary: Double)object a1 {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//数据源val dataStream: DataStream[String] = env.readTextFile("D:\\wlf.备份24.1.3\\wlf\\ideaProgram\\bbbbbb\\src\\main\\resources\\salary.txt")val stream = dataStream.map(data => {val splited = data.split(",")SensorReading(splited(0), splited(1).trim.toLong, splited(2).trim.toDouble)})stream.addSink( new JDBCSink() )env.execute("  job")}}class JDBCSink() extends RichSinkFunction[SensorReading]{// 定义sql连接、预编译器var conn: Connection = _var insertStmt: PreparedStatement = _var updateStmt: PreparedStatement = _// 初始化,创建连接和预编译语句override def open(parameters: Configuration): Unit = {super.open(parameters)conn = DriverManager.getConnection("jdbc:mysql://bigdata1:3306/flink?serverTimezone=UTC", "root", "123456")insertStmt = conn.prepareStatement("INSERT INTO salary_table (name, salary) VALUES (?,?)")updateStmt = conn.prepareStatement("UPDATE salary_table SET salary = ? WHERE name = ?")}override def invoke(value: SensorReading): Unit = {// 执行更新语句updateStmt.setString(1, value.name)updateStmt.setDouble(2, value.salary)updateStmt.execute()// 如果update没有查到数据,那么执行插入语句if( updateStmt.getUpdateCount == 0 ){insertStmt.setString(1, value.name)insertStmt.setDouble(2, value.salary)insertStmt.execute()}}// 关闭时做清理工作override def close(): Unit = {insertStmt.close()updateStmt.close()conn.close()}
}

MySQL中查看表 :

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/649925.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【QT】文件目录操作

目录 1 文件目录操作相关的类 2 实例概述 2.1 实例功能 2.2 信号发射者信息的获取 3 QCoreApplication类 4 QFile类 5 QFilelnfo类 6 QDir类 7 QTemporaryDir和QTemporaryFiIe 8 QFiIeSystemWatcher类 文件的读写是很多应用程序具有的功能,甚至某些应用程序就是围绕…

内存管理(mmu)/内存分配原理/多级页表

1.为什么要做内存管理? 随着进程对内存需求的扩大,和同时调度的进程增加,内存是比较瓶颈的资源,如何更好的高效的利于存储资源是一个重要问题。 这个内存管理的需求也是慢慢发展而来,早期总线上的master是直接使用物…

Oracle篇—分区索引的重建和管理(第三篇,总共五篇)

☘️博主介绍☘️: ✨又是一天没白过,我是奈斯,DBA一名✨ ✌✌️擅长Oracle、MySQL、SQLserver、Linux,也在积极的扩展IT方向的其他知识面✌✌️ ❣️❣️❣️大佬们都喜欢静静的看文章,并且也会默默的点赞收藏加关注❣…

ES的一些名称和概念总结

概念 先看看ElasticSearch的整体架构: 一个 ES Index 在集群模式下,有多个 Node (节点)组成。每个节点就是 ES 的Instance (实例)。每个节点上会有多个 shard (分片), P1 P2 是主分片, R1 R2…

达梦数据库——记录一次离谱的登录失败报错

好久没更新了哇 前面有整理过一些常见的数据库登录失败问题哈,今天记录一个遇到概率比较小,但碰上了一般不太容易找到原因的登录失败问题。 今天给客户同时初始化了三台服务器数据库,惟独这一台死活登不进去,满脑子问号&#xf…

【论文解读】Object Goal Navigation usingGoal-Oriented Semantic Exploration

论文:https://devendrachaplot.github.io/papers/semantic-exploration.pdf 代码:https://github.com/devendrachaplot/Object-Goal-Navigation 项目: Object Goal Navigation using Goal-Oriented Semantic Exploration example&#xff1…

2、鼠标事件、键盘事件、浏览器事件、监听事件、冒泡事件、默认事件、属性操作

一、鼠标事件 1、单击事件&#xff1a;onclick <body><header id"head">我是头部标签</header> </body> <script> var head document.getElementById("head")head.onclick function () {console.log("我是鼠标单击…

金蝶云星空--写插件不重启IIS热更新简单配置指南

云星空7.5版本&#xff0c;以简单方式配置并测试了热更新的实现方式可行&#xff0c;操作如下&#xff08;7.5外版本没试过&#xff0c;大家可试下&#xff09;&#xff1a; 1、打开WebSite\App_Data\Common.config&#xff0c;修改appSettings&#xff0c;设置IsEnablePlugIn…

go slice 扩容实现

基于 Go 1.19。 go 的切片我们都知道可以自动地进行扩容&#xff0c;具体来说就是在切片的容量容纳不下新的元素的时候&#xff0c; 底层会帮我们为切片的底层数组分配更大的内存空间&#xff0c;然后把旧的切片的底层数组指针指向新的内存中&#xff1a; 目前网上一些关于扩容…

redis源码之:clion搭建cluster环境

cluster集群通常每个node节点都是一主N从的模式&#xff0c;此处为简化环境搭建&#xff0c;所有node节点均只有一个主节点。 在clion环境中&#xff0c;为方便debug&#xff0c;需要通过配置多个cmake application实现redis-server、redis-cli等源码debug模式启动。 一、配置…

费曼学习法 - 理工科的学习利器

费曼学习法是以物理学家理查德费曼&#xff08;Richard Feynman&#xff09;命名的一种高效的学习方法。它旨在帮助你深入理解复杂概念&#xff0c;并能够用简单的语言解释它们。费曼学习法是一个学习框架&#xff0c;能够帮你对给定主题进行深入理解&#xff0c;包含以下4个简…

HTML-表单

表单 概念&#xff1a;一个包含交互的区域&#xff0c;用于收集用户提供的数据。 1.基本结构 示例代码&#xff1a; <form action"https://www.baidu.com/s" target"_blank" method"get"><input type"text" name"wd&q…

完成NAT实验

实验要求&#xff1a; 步骤一&#xff1a;配置vlan vlan b 2 3 interface GigabitEthernet 0/0/2 port link-type access port default vlan 2 interface GigabitEthernet 0/0/3 port link-type access port default vlan 3 interface GigabitEthernet 0/0/1 port link-type…

【PyTorch】使用PyTorch创建卷积神经网络并在CIFAR-10数据集上进行分类

前言 在深度学习的世界中&#xff0c;图像分类任务是一个经典的问题&#xff0c;它涉及到识别给定图像中的对象类别。CIFAR-10数据集是一个常用的基准数据集&#xff0c;包含了10个类别的60000张32x32彩色图像。在本博客中&#xff0c;我们将探讨如何使用PyTorch框架创建一个简…

C#,打印漂亮杨辉三角形(帕斯卡三角形)的源代码

杨辉 Blaise Pascal 这是某些程序员看完会哭的代码。 杨辉三角形&#xff08;Yanghui Triangle&#xff09;&#xff0c;是一种序列数值的三角形几何排列&#xff0c;最早出现于南宋数学家杨辉1261年所著的《详解九章算法》一书。 欧洲学者&#xff0c;最先由帕斯卡&#x…

Windows打开IE浏览器命令最简单的方法

问题场景&#xff1a; 许多插件或特定版本的系统需要使用ie浏览器来访问&#xff0c;window默认的ie浏览器是被禁用的如何快速打开ie浏览器解决问题 目录 问题场景&#xff1a; 测试环境&#xff1a; 检查环境是否支持&#xff1a; 问题解决&#xff1a; 方法一 方法二 方法…

03 SB实战 -微头条之首页门户模块(跳转某页面自动展示所有信息+根据hid查询文章全文并用乐观锁修改阅读量)

1.1 自动展示所有信息 需求描述: 进入新闻首页portal/findAllType, 自动返回所有栏目名称和id 接口描述 url地址&#xff1a;portal/findAllTypes 请求方式&#xff1a;get 请求参数&#xff1a;无 响应数据&#xff1a; 成功 {"code":"200","mes…

hex 尽然可以 设置透明度,透明度参数对比图 已解决

还不知道CSS Color Module Level 4标准早在2014年就推出8位hex和4位hex来支持设置alpha值&#xff0c;以实现hex和rgba的互转。这个办法可比6位HEX转RGBA简洁多了&#xff0c;先来简单解释一下&#xff1a; 8位hex是在6位hex基础上加后两位来表示alpha值&#xff0c;00表示完全…

Hadoop-MapReduce-MRAppMaster启动篇

一、源码下载 下面是hadoop官方源码下载地址&#xff0c;我下载的是hadoop-3.2.4&#xff0c;那就一起来看下吧 Index of /dist/hadoop/core 二、上下文 在上一篇<Hadoop-MapReduce-源码跟读-客户端篇>中已经将到&#xff1a;作业提交到ResourceManager&#xff0c;那…

数据结构——树的合集

目录 文章目录 前言 一.树的表达方式 1.树的概念 2.树的结点 3.树的存储结构 01.双亲表示法 顺序表示形式 优缺点说明 02.孩子表示法 03.孩子兄弟表示法 04.非类存储代码演示 二.二叉树 1.树的特点 2.二叉树 01.定义 02.二叉树的性质 03.满二叉树 04.完全二叉树…