Spark读写MySQL数据库

Spark读写MySQL数据库

文章目录

    • Spark读写MySQL数据库
      • 一、读取数据库
        • (一)通过RDD的方式读取MySQL数据库
        • (二)通过DataFrame的方式读取MySQL数据库
      • 二、添加数据到MySQL
        • (一)通过RDD的方式插入数据到MySQL
        • (二)通过RDD的方式插入数据到MySQL 2
        • (三)使用DataFrame插入数据到MySQL

一、读取数据库

(一)通过RDD的方式读取MySQL数据库

四要素:驱动、连接地址、账号密码

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.SparkSessionimport java.sql.DriverManager/*** 使用RDD读取MySQL数据库*/
object spark_read_mysql {def main(args: Array[String]): Unit = {//创建SparkSession,作用:连接Sparkval spark = SparkSession.builder().master("local[*]") //指定运行的方式.appName("spark_read_mysql") //程序的名字.getOrCreate()//创建SparkContextval sc = spark.sparkContext//驱动名称val driver = "com.mysql.cj.jdbc.Driver"//连接信息val url = "jdbc:mysql://192.168.80.145:3306/test"//用户名val username = "root"//密码val password = "123456"//具体的SQL查询语句val sql = "select * from t_user where id>=? and id<=?"//查询val rsRDD = new JdbcRDD(sc,()=>{//加载驱动Class.forName(driver)//创建和MySQL数据库的连接DriverManager.getConnection(url,username,password)},//需要执行的SQL语句sql,//查询的开始行1,//查询的结束行20,//运行几个分区执行2,//返回值的处理(将返回值变为RDD的元素),数字从1开始,表示字段的编号rs => (rs.getInt(1),rs.getString(2),rs.getInt(3)))//将RDD的元素打印在终端rsRDD.collect().foreach(println)sc.stop()}
}
(二)通过DataFrame的方式读取MySQL数据库
import org.apache.spark.sql.SparkSession/*** 使用DataFrame读取MySQL数据库*/
object spark_read_mysql2 {def main(args: Array[String]): Unit = {//创建SparkSession,作用:连接Sparkval spark = SparkSession.builder().master("local[*]")//指定运行的方式.appName("spark_read_mysql2")//程序的名字.getOrCreate()//创建DataFrameval jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.80.145:3306/test")//指定连接.option("driver","com.mysql.cj.jdbc.Driver")//指定驱动.option("user","root")//指定连接的用户.option("password","123456")//指定连接的用户的密码.option("dbtable","t_user")//查询的表.load()//加载数据库表//在终端显示DataFrame的内容jdbcDF.show()}
}

二、添加数据到MySQL

(一)通过RDD的方式插入数据到MySQL

每个分区执行一次创建连接和关闭连接

import org.apache.spark.sql.SparkSessionimport java.sql.DriverManager/*** 使用RDD插入数据到MySQL,RDD的每个元素都会执行一次创建连接和关闭连接*/
object spark_write_mysql {def main(args: Array[String]): Unit = {//创建SparkSession,作用:连接Sparkval spark = SparkSession.builder().master("local[*]") //指定运行的方式.appName("spark_write_mysql") //程序的名字.getOrCreate()//创建SparkContextval sc = spark.sparkContext//驱动名称val driver = "com.mysql.cj.jdbc.Driver"//连接信息//?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"//用户名val username = "root"//密码val password = "123456"//创建RDDval rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))//打印RDD的元素//rdd.collect().foreach(println)//通过循环的方式读取RDD的每条元素,将元素插入MySQL;一个元素执行一次创建连接和插入和关闭连接rdd.foreach {case (name,age) =>{//加载驱动Class.forName(driver)//创建和MySQL的链接val conn = DriverManager.getConnection(url,username,password)//添加的SQL语句val sql = "insert into t_user(name,age) values(?,?)"//给SQL语句配置参数val ps = conn.prepareStatement(sql)//根据参数的类型配置参数ps.setString(1,name)ps.setInt(2,age)//执行SQL语句ps.executeUpdate()//关闭连接ps.close()conn.close()}}sc.stop()}
}
(二)通过RDD的方式插入数据到MySQL 2

每个分区执行一次创建连接和关闭连接

import org.apache.spark.sql.SparkSessionimport java.sql.DriverManager/*** 使用RDD插入数据到MySQL,RDD的每个分区执行一次创建连接和关闭连接;推荐*/
object spark_write_mysql2 {def main(args: Array[String]): Unit = {//创建SparkSession,作用:连接Sparkval spark = SparkSession.builder().master("local[*]") //指定运行的方式.appName("spark_write_mysql2") //程序的名字.getOrCreate()//创建SparkContextval sc = spark.sparkContext//驱动名称val driver = "com.mysql.cj.jdbc.Driver"//连接信息//?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"//用户名val username = "root"//密码val password = "123456"//创建RDDval rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))//打印RDD的元素//rdd.collect().foreach(println)//通过循环的方式读取RDD的每个分区,将元素插入MySQL;一个分区执行一次创建连接和关闭连接rdd.foreachPartition {datas =>{//加载驱动Class.forName(driver)//创建和MySQL的链接val conn = DriverManager.getConnection(url,username,password)//添加的SQL语句val sql = "insert into t_user(name,age) values(?,?)"//给SQL语句配置参数val ps = conn.prepareStatement(sql)//根据参数的类型配置参数datas.foreach{case (name,age)=>{ps.setString(1,name)ps.setInt(2,age)//执行SQL语句ps.executeUpdate()}}//关闭连接ps.close()conn.close()}}sc.stop()}
}
(三)使用DataFrame插入数据到MySQL
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}/*** 使用DataFrame插入数据到MySQL*/object spark_write_mysql3 {def main(args: Array[String]): Unit = {//创建SparkSession,作用:连接Sparkval spark = SparkSession.builder().master("local[*]") //指定运行的方式.appName("spark_write_mysql3") //程序的名字.getOrCreate()//1.创建DataFrame//1.1 schemaval schema = StructType(List(StructField("name", StringType,true),StructField("age",IntegerType,true)))//1.2 行rows//1.2.1 创建RDDval dataRDD = spark.sparkContext.parallelize(Array(Array("李四",20),Array("王五",20)))//1.2.2 创建rowsval rows = dataRDD.map(x=>Row(x(0),x(1)))//1.3 拼接表头(schema)和行内容(rows)val df = spark.createDataFrame(rows,schema)//2.通过DataFrame插入数据到MySQL//如果直接使用df.write则会将整个DataFrame的表写入MySQL形成一个新表,需要注意表不能存在//df.write.mode("append"),是以追加的方式将数据写入到已经存在的表中df.write.format("jdbc").option("url", "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8") //指定连接.option("driver", "com.mysql.cj.jdbc.Driver") //指定驱动.option("user", "root") //指定连接的用户.option("password", "123456") //指定连接的用户的密码.option("dbtable", "t_user2") //查询的表.save()//保存数据}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/219729.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

初级数据结构(三)——栈

文中代码源文件已上传&#xff1a;数据结构源码 <-上一篇 初级数据结构&#xff08;二&#xff09;——链表 | 初级数据结构&#xff08;四&#xff09;——队列 下一篇-> 1、栈的特性 1.1、函数栈帧简述 即使是刚入门几天的小白&#xff0c;对栈这个字…

基于YOLOv8深度学习的吸烟/抽烟行为检测系统【python源码+Pyqt5界面+数据集+训练代码】目标检测、深度学习实战

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

基于SSM实现的精品课程网站

一、系统架构 前端&#xff1a;jsp | js | css | jquery | bootstrap 后端&#xff1a;spring | springmvc | mybatis 环境&#xff1a;jdk1.7 | mysql | maven | tomcat 二、代码及数据库 三、功能介绍 01. 登录页 02. web端-首页 03. web端-视频教程 04. web端-资料…

RK3568全国产化多网口板卡带poe供电,支持鸿蒙麒麟系统

信迈XM-3568-01主板采用瑞芯微RK3568四核Cortex-A55 处理器&#xff0c;主频最高可达2.0GHz&#xff0c;效能有大幅提升最高可配8GB内存容量&#xff0c;频率高达1600MHz&#xff1b;支持全链路ECC&#xff0c;让数据更安全可靠配置双千兆自适应RJ45以太网口&#xff0c;并扩展…

Unity | Shader基础知识(第三集:案例<对材质颜色进行干预>)

一、本节介绍 上一集&#xff0c;我们学到&#xff0c;shader的语法格式&#xff0c;这一集&#xff0c;我们要学习第二简单的shader案例&#xff0c;对shader颜色的简单干预。 二、理论介绍 1.获取位置信息 Unity | Shader基础知识&#xff08;什么是shader&#xff09;_u…

stm32---串口使用

### 串口数据发送 #include <string.h> //先引用这个字符串操作库。char str[]" HALLO WORD "&#xff1b; //定义这个数组字符串。HAL_UART_Transmit(&huart2, str, strlen(str), 100); //&huart2,这里他是一个指针&#xff0c;所以要用取地址符…

使用python的socketserver使服务器支持多客户端访问

1 背景 近期在做机器人集群的分布式控制&#xff0c;涉及到了机器人之间的交流工作&#xff0c;其中&#xff0c;每一台机器人都需要与多个机器人进行交流。 考虑使用python的socket来做&#xff0c;但简单测试了一下&#xff0c;socket模块不能方便的实现一个服务器与多客户…

C_9练习题答案

一、单项选择题(本大题共20小题,每小题2分,共40分。在每小题给出的四个备选项中,选出一个正确的答案,并将所选项前的字母填写在答题纸的相应位置上。) C语言程序中,要使用数学库函数(例sqrt、sin等),需要在程序最前面加上包含文件的预处理命令&#xff08;C)。 A. #include &l…

java内部类的使用

什么是内部类&#xff1a;在一个类的内部定义的类称为内部类 为什么用内部类&#xff1a;完成某些功能只在某个具体的外部类中使用&#xff0c;其他地方不需要再使用&#xff0c;这是可以将此功能定义在一个内部类中&#xff0c;而不需要单独创建一个类。 用内部类的好处在内…

在WPF窗口中增加水印效果

** 原理&#xff1a; ** 以Canvas作为水印显示载体&#xff0c;在Canvas中创建若干个TextBlock控件用来显示水印文案&#xff0c;如下图所示 然后以每一个TextBlock的左上角为中心旋转-30&#xff0c;最终效果会是如图红线所示&#xff1a; 为了达到第一行旋转后刚好与窗口…

日常记录软件操作

webstorm下载集成码云步骤 1&#xff1a;file>>>settings>>plugins &#xff0c;在marketplace&#xff0c;下载gitee&#xff1b; 2&#xff1a;重启编辑器 3&#xff1a;file>>>settings>>>version control >>> gitee ,添加对应用…

App防止恶意截屏功能的方法:iOS、Android和鸿蒙系统的实现方案

防止应用被截图是一个比较常见的需求&#xff0c;主要是出于安全考虑。下面将分别为iOS&#xff08;苹果系统&#xff09;、Android&#xff08;安卓系统&#xff09;及HarmonyOS&#xff08;鸿蒙系统&#xff09;提供防止截屏的方法和示例代码。 在企业内部使用的应用中&…

物联网架构之CDH

1、常规初始化操作 三个节点都需要执行 hostnamectl set-hostname cdhmaster yum -y install vim lrzsz net-tools cat >>/etc/hosts<<EOF 192.168.180.210 cdhmaster 192.168.180.200 cdhslave01 192.168.180.190 cdhslave02 EOF systemctl stop firewalld.s…

深入解析Freemarker模板引擎及其在Spring Boot中的高级整合

目录 引言1. Freemarker1.1.什么是Freemarker1.2 Freemarker模板组成部分1.3.优点 2. Spring Boot整合Freemarker2.1 配置2.2 数据类型 3. 案例总结 引言 Freemarker作为一款强大的模板引擎&#xff0c;与Spring Boot的整合能够极大地提升Web应用的开发效率和灵活性。本篇博客…

Java面试题(每天10题)-------连载(47)

目录 Mybatis篇 1、#{}和${}的区别 2、通常一个Xml映射文件&#xff0c;都会写一个Dao接口与之对应&#xff0c;那么这个Dao接口的工作原理是什么&#xff1f;Dao接口中的方法&#xff0c;参数不同时&#xff0c;方法能重载吗&#xff1f; 3、Mybatis是如何让进行分页的&am…

探索 Vim:一个强大的文本编辑器

引言&#xff1a; Vim&#xff08;Vi IMproved&#xff09;是一款备受推崇的文本编辑器&#xff0c;拥有强大的功能和高度可定制性&#xff0c;提供丰富的编辑和编程体验。本文将探讨 Vim 的基本概念、使用技巧以及为用户带来的独特优势。 简介和发展 1. Vim 的简介和历史 V…

axios全局封装取消请求,你可以创建一个 Axios 实例,并为该实例配置默认的 CancelToken

import axios from axios; // Axios 的 CancelToken const CancelToken axios.CancelToken; // 创建一个 Axios 实例 const instance axios.create(); // 用于存储所有的 cancel 函数 const pendingRequests new Set(); // 添加请求配置 instance.interceptors.…

发布jar包到maven中央仓库

1. 环境 在网上找的很多文章中写得都有很多问题&#xff0c;这里记录一下最近一次成功地发布jar包到maven中央仓库的过程。并附带上每一个步骤官方的指导链接。 系统&#xff1a;mac&#xff08;windows系统在下载辅助工具时不太一样&#xff0c;在配置上和mac系统没有区别&…

docker部署go gin框架 Linux环境

目录 文章目的是什么 环境介绍 Linux 环境下 docker 部署 go gin 详细步骤 部署 gin 文章目的是什么 假设我们学习了 go 语言&#xff0c;在 Linux 上安装了 go 相关的程序&#xff0c;也能直接运行&#xff0c;使用以下命令&#xff1a; go run main.go 假如代码是这样的…

算法中的最优化方法课程复习

算法中的最优化方法课程复习 单模函数、拟凸函数、凸函数证明证明一个线性函数与一个凸函数的和也是凸的 梯度线性规划标准形式以及如何标准化标准形式常见标准化方法线性化技巧 单纯形法二次规划无约束优化Nelder-Mead线搜索FR共轭梯度法例题 优化算法的选择、停止准则算法选择…