Spark之 使用SparkSql操作mysql和DataFrame的Scala实现

通过读取文件转换成DataFrame数据写入到mysql中

package com.zy.sparksqlimport java.util.Propertiesimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}/*** 通过读取文件转换成DataFrame数据写入到mysql中*/
object SparkSqlToMysql {def main(args: Array[String]): Unit = {//创建sparkSessionval sparkSession: SparkSession = SparkSession.builder().appName("SparkSqlToMysql").master("local").getOrCreate()//读取数据val sc: SparkContext = sparkSession.sparkContextval fileRDD: RDD[String] = sc.textFile("D:\\person.txt")//切分val lineRDD: RDD[Array[String]] = fileRDD.map(_.split(","))//关联  通过StructType指定schema将rdd转换成DataFrameval rowRDD: RDD[Row] = lineRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))val schema = (new StructType).add("id", IntegerType, true).add("name", StringType, true).add("age", IntegerType, true)//根据rdd和schema创建DataFrameval personDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)//将df注册成表personDF.createOrReplaceTempView("person")//操作表val resultDF: DataFrame = sparkSession.sql("select * from person order by age desc")//将数据存到mysql中//创建properties对象 设置连接mysql的信息val prop: Properties = new Properties()prop.setProperty("user", "root")prop.setProperty("password", "root")/** mode方法可以指定数据插入模式* overwrite:覆盖,覆盖表中已经存在的数据,如果表不存在它会事先帮你创建* append:追加,向表中追加数据,如果表不存在它会事先帮你创建* ignore:忽略,表示如果表事先存在,就不进行任何操作* error :如果表存在就报错,它是默认选项*/resultDF.write.mode("error").jdbc("jdbc:mysql://192.168.44.31:3306/spark", "person", prop)sparkSession.stop()}
}

 

从mysql中读取数据到DataFrame中

package com.zy.sparksqlimport java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SparkSession}/*** 从mysql中读取数据到DataFrame中*/
object DataFromMysql {def main(args: Array[String]): Unit = {//创建sparkSessionval sparkSession: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local").getOrCreate()//创建properties对象 设置连接mysql的信息val prop: Properties = new Properties()prop.setProperty("user", "root")prop.setProperty("password", "root")//读取mysql数据val mysqlDF: DataFrame = sparkSession.read.jdbc("jdbc:mysql://192.168.44.31:3306/spark", "person", prop)mysqlDF.show()sparkSession.stop()}
}

 

转载于:https://www.cnblogs.com/blazeZzz/p/9851154.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/453456.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

web服务器之iis,apache,tomcat三者之间的比较

IIS-Apache-Tomcat的区别 IIS与Tomcat的区别 IIS是微软公司的Web服务器。主要支持ASP语言环境. Tomcat是Java Servlet 2.2和JavaServer Pages 1.1技术的标准实现,是基于Apache许可证下开发的SJP语言环境容器,严格得说不能算是一个WEB服务器,而是Apache服务适配器。 …

iOS CAGradientLayer颜色渐变

Gradient:本身就是梯度的意思,所以在这里就是作为渐变色来理解 CAGradientLayer用于处理渐变色的层结构CAGradientLayer的渐变色可以做隐式动画大部分情况下,CAGradientLayer时和CAShapeLayer配合使用,CAShapeLayer这里就不介绍了CAGradientL…

编程要养成的好习惯

1.- DRY: Don’t repeat yourself. DRY 是一个最简单的法则,也是最容易被理解的。但它也可能是最难被应用的(因为要做到这样,我们需要在泛型设计上做相当的努力,这并不是一件容易的事)。它意味着,当我们在…

flink整合java,Flink使用SideOutPut替换Split实现分流

基于apache flink的流处理实时模型44元包邮(需用券)去购买 >以前的数据分析项目(版本1.4.2),对从Kafka读取的原始数据流,调用split接口实现分流.新项目决定使用Flink 1.7.2,使用split接口进行分流的时候,发现接口被标记为depra…

虚机中访问外网;NAT中的POSTROUTING是怎么搞的?

看下docker中是怎么配置的网络 在虚机中访问外网:设定了qemu,在主机上添加路由:sudo iptables -t nat -I POSTROUTING -s 192.168.1.110 -j SNAT --to-source 192.168.0.108 设置了这句话就可以访问外网了。 设置了两个虚拟机: ta…

Fragment结合ViewPager之懒加载

什么是懒加载?为什么要用懒加载?### 1、什么是懒加载 懒加载就是当ViewPager和Fragment结合在一起使用时,Fragment呈现在用户面前时才加载数据,当其从未被呈现在用户面前时,不会执行加载数据的代码。这就是我所理解的懒…

WCF和webservice的区别

微软论坛的斑竹回答如下: 脑内:果然是高大上啊 1.WebService:严格来说是行业标准,不是技术,使用XML扩展标记语言来表示数据(这个是夸语言和平台的关键)。微 软的Web服务实现称为ASP.NET Web Ser…

职场不得不明白的十大定律

帕金森定律 美国著名历史学家诺斯古德•帕金森通过长期调查研究,写了一本名叫《帕金森定律》的书,他在书中阐述了机构人员膨胀的原因及后果:一个不称职的官员,可能有三条出路。第一是申请退职,把位子让给能干的人&am…

php控制器教程,laravel基础教程 -- 控制器

HTTP 控制器简介控制器允许你将相应的路由业务逻辑封装在控制器类中进行有效的管理,这样你不必将所有的路由逻辑集中到routes.php文件中,导致代码的臃肿与难以维护。所有的控制器类都被存储在app/Http/Controllers目录中.基本控制器一个基本的控制器应该…

org.apache.jasper.JasperException: Unable to compile class for JSP:

报错信息: org.apache.jasper.JasperException: Unable to compile class for JSP: An error occurred at line: 1 in the generated java file The type java.io.ObjectInputStream cannot be resolved. It is indirectly referenced from required .class filesSt…

i++和++i

关于自增自减运算,很多书籍没有把问题讲清楚,在C语言里是这样的: 1.后置运算:k表示先运算,后自加。 意思是遇到k了,我先把当前的k的值拿来参加运算,后面再去管它的自加。 那么,“后面”后到什么…

什么样的项目经历会让面试官眼前一亮

很多同学都问过我类似的问题: 咱们《C语言也能干大事》中讲的自己动手写windows优化大师、自己动手写计算器等东西只是写着玩的小玩具而已,这些能用来以后找工作时写到简历中的作品吗?看别人的简历写的“图书管理系统”、“教务选课系统”多有…

matlab采样频谱,Matlab对采样数据进行频谱分析

使用Matlab对采样数据进行频谱分析1、采样数据导入Matlab采样数据的导入至少有三种方法。第一就是手动将数据整理成Matlab支持的格式,这种方法仅适用于数据量比较小的采样。第二种方法是使用Matlab的可视化交互操作,具体操作步骤为:File --&g…

链表和顺序表的一些区别

顺序表与链表是非常基本的数据结构,它们可以被统称为线性表。 线性表(Linear List)是由 n(n≥0)个数据元素(结点)a[0],a[1],a[2]…,a[n-1] 组成的有限序列。…

ANCS推送简介

总体原理 ANCS通过蓝牙BLE 4.0实现,仅支持iPhone 4S及以上且系统版本在IOS 7以上的手机,同时在外设端需要支持蓝牙4.0协议。 1、外设端进行广播,手机打开蓝牙,搜索外设,连接外设,之后进行绑定(这…

好记性不如烂笔头,记录几个常用的Linux操作

作者:老王Shell公共函数库Linux系统里有一些公共的Shell函数库可供使用,最重要的是/etc/rc.d/init.d/functions,在/etc/init.d目录下有很多脚本都用到了这个函数库,里面提供了很多有用的方法,比如:killproc…

用matlab简单电路模型,基于MATLAB的电路模型仿真应用

基于MATLAB的电路模型仿真应用实验指导书一、实验目的1、掌握采用M文件及SIMULINK对电路进行仿真的方法。2、熟悉POWERSYSTEM BLOCKSET 模块集的调用、设置方法。3.进一步熟悉M脚本文件编写的方法和技巧。二、实验原理1、通过M文件实现电路仿真的一般仿真步骤为&…

春节期间小游戏同时在线人数最高达2800万人/小时

微信官方发布2018年春节期间微信数据报告:除夕至初五,总共有2,297亿条微信消息,28亿条微信朋友圈成功发出,音视频通话总时长175亿乙分钟。其中,90后用广的消息发送量占总量的42.5%,80后用户25.9%&#xff0…

C语言中* 和

&x是对x变量取地址,也就是返回的是x的地址。 int *i;这里面的*说明变量i是一个指针,存的是一个地址。 而 *i整体代表的是一个数值,例如可以int *i 5 这里整体的*i代表的是5,而i代表的是这个值存储的地址

餐馆的故事-浅析职责链模式

我们在餐馆吃饭的时候,一般都是在拿到菜单后,选择喜欢的菜,然后通知服务员。服务员会将我们的定单交给大厨,大厨可能会亲自去做这道菜,也可能安排给小厨来做,总之,我们不用担心他们没有人做菜&a…