Spark---创建DataFrame的方式

1、读取json格式的文件创建DataFrame

注意:

1、可以两种方式读取json格式的文件。

2、df.show()默认显示前20行数据。

3、DataFrame原生API可以操作DataFrame。

4、注册成临时表时,表中的列默认按ascii顺序显示列。

df.createTempView("mytable")
df.createOrReplaceTempView("mytable")
df.createGlobalTempView("mytable")
df.createOrReplaceGlobalTempView("mytable")
Session.sql("select * from global_temp.mytable").show()

5、DataFrame是一个Row类型的RDD,df.rdd()/df.javaRdd()。

java

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("jsonfile");
SparkContext sc = new SparkContext(conf);//创建sqlContext
SQLContext sqlContext = new SQLContext(sc);/*** DataFrame的底层是一个一个的RDD  RDD的泛型是Row类型。* 以下两种方式都可以读取json格式的文件*/
DataFrame df = sqlContext.read().format("json").load("sparksql/json");
// DataFrame df2 = sqlContext.read().json("sparksql/json.txt");
// df2.show();/*** DataFrame转换成RDD*/
RDD<Row> rdd = df.rdd();
/*** 显示 DataFrame中的内容,默认显示前20行。如果现实多行要指定多少行show(行数)* 注意:当有多个列时,显示的列先后顺序是按列的ascii码先后显示。*/
// df.show();
/*** 树形的形式显示schema信息*/
df.printSchema();
/*** dataFram自带的API 操作DataFrame*///select name from table// df.select("name").show();//select name age+10 as addage from tabledf.select(df.col("name"),df.col("age").plus(10).alias("addage")).show();//select name ,age from table where age>19df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show();//select count(*) from table group by agedf.groupBy(df.col("age")).count().show();/*** 将DataFrame注册成临时的一张表,这张表临时注册到内存中,是逻辑上的表,不会雾化到磁盘*/df.registerTempTable("jtable");DataFrame sql = sqlContext.sql("select age,count(1) from jtable group by age");DataFrame sql2 = sqlContext.sql("select * from jtable");sc.stop();

scala:

1.val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
2.// val frame: DataFrame = session.read.json("./data/json")
3.val frame = session.read.format("json").load("./data/json")
4.frame.show(100)
5.frame.printSchema()
6.
7./**
8.* DataFrame API 操作
9.*/
10.//select name ,age from table
11.frame.select("name","age").show(100)
12.
13.//select name,age + 10 as addage from table
14.frame.select(frame.col("name"),frame.col("age").plus(10).as("addage")).show(100)
15.
16.//select name,age from table where age >= 19
17.frame.select("name","age").where(frame.col("age").>=(19)).show(100)
18.frame.filter("age>=19").show(100)
19.
20.//select name ,age from table order by name asc ,age desc
21.import session.implicits._
22.frame.sort($"name".asc,frame.col("age").desc).show(100)
23.
24.//select name ,age from table where age is not null
25.frame.filter("age is not null").show()
26.
27./**
28.* 创建临时表
29.*/
30.frame.createTempView("mytable")
31.session.sql("select name ,age from mytable where age >= 19").show()
32.frame.createOrReplaceTempView("mytable")
33.frame.createGlobalTempView("mytable")
34.frame.createOrReplaceGlobalTempView("mytable")
35.
36./**
37.* dataFrame 转换成RDD
38.*/
39.val rdd: RDD[Row] = frame.rdd
40.rdd.foreach(row=>{
41.  val name = row.getAs[String]("name")
42.  val age = row.getAs[Long]("age")
43.  println(s"name is $name ,age is $age")
44.})

2、通过json格式的RDD创建DataFrame

java:

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("jsonRDD");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList("{\"name\":\"zhangsan\",\"age\":\"18\"}","{\"name\":\"lisi\",\"age\":\"19\"}","{\"name\":\"wangwu\",\"age\":\"20\"}"
));
JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList(
"{\"name\":\"zhangsan\",\"score\":\"100\"}",
"{\"name\":\"lisi\",\"score\":\"200\"}",
"{\"name\":\"wangwu\",\"score\":\"300\"}"
));DataFrame namedf = sqlContext.read().json(nameRDD);
DataFrame scoredf = sqlContext.read().json(scoreRDD);
namedf.registerTempTable("name");
scoredf.registerTempTable("score");DataFrame result = sqlContext.sql("select name.name,name.age,score.score from name,score where name.name = score.name");
result.show();sc.stop();

scala:

1.val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
2.val jsonList = List[String](
3.  "{'name':'zhangsan','age':'18'}",
4.  "{'name':'lisi','age':'19'}",
5.  "{'name':'wangwu','age':'20'}",
6.  "{'name':'maliu','age':'21'}",
7.  "{'name':'tainqi','age':'22'}"
8.)
9.
10.import session.implicits._
11.val jsds: Dataset[String] = jsonList.toDS()
12.val df = session.read.json(jsds)
13.df.show()
14.
15./**
16.* Spark 1.6
17.*/
18.val jsRDD: RDD[String] = session.sparkContext.parallelize(jsonList)
19.val frame: DataFrame = session.read.json(jsRDD)
20.frame.show()

3、非json格式的RDD创建DataFrame

1)、通过反射的方式将非json格式的RDD转换成DataFrame(不建议使用)

  • 自定义类要可序列化
  • 自定义类的访问级别是Public
  • RDD转成DataFrame后会根据映射将字段按Assci码排序
  • 将DataFrame转换成RDD时获取字段两种方式,一种是df.getInt(0)下标获取(不推荐使用),另一种是df.getAs(“列名”)获取(推荐使用)
/**
* 注意:
* 1.自定义类必须是可序列化的
* 2.自定义类访问级别必须是Public
* 3.RDD转成DataFrame会把自定义类中字段的名称按assci码排序
*/
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("RDD");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("sparksql/person.txt");
JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Person call(String s) throws Exception {Person p = new Person();p.setId(s.split(",")[0]);p.setName(s.split(",")[1]);p.setAge(Integer.valueOf(s.split(",")[2]));return p;}
});
/**
* 传入进去Person.class的时候,sqlContext是通过反射的方式创建DataFrame
* 在底层通过反射的方式获得Person的所有field,结合RDD本身,就生成了DataFrame
*/
DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
df.show();
df.registerTempTable("person");
sqlContext.sql("select  name from person where id = 2").show();/**
* 将DataFrame转成JavaRDD
* 注意:
* 1.可以使用row.getInt(0),row.getString(1)...通过下标获取返回Row类型的数据,但是要注意列顺序问题---不常用
* 2.可以使用row.getAs("列名")来获取对应的列值。
* 
*/
JavaRDD<Row> javaRDD = df.javaRDD();
JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Person call(Row row) throws Exception {Person p = new Person();//p.setId(row.getString(1));//p.setName(row.getString(2));//p.setAge(row.getInt(0));p.setId((String)row.getAs("id"));p.setName((String)row.getAs("name"));p.setAge((Integer)row.getAs("age"));return p;}
});
map.foreach(new VoidFunction<Person>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic void call(Person t) throws Exception {System.out.println(t);}
});sc.stop();

scala:

1.case class MyPerson(id:Int,name:String,age:Int,score:Double)
2.
3.object Test {
4.  def main(args: Array[String]): Unit = {
5.    val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
6.    val peopleInfo: RDD[String] = session.sparkContext.textFile("./data/people.txt")
7.    val personRDD : RDD[MyPerson] = peopleInfo.map(info =>{
8.MyPerson(info.split(",")(0).toInt,info.split(",")(1),info.split(",")(2).toInt,info.split(",")(3).toDouble)
9.    })
10.    import session.implicits._
11.    val ds = personRDD.toDS()
12.    ds.createTempView("mytable")
13.    session.sql("select * from mytable ").show()
14.  }
15.}

2)、动态创建Schema将非json格式的RDD转换成DataFrame

java:

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("rddStruct");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("./sparksql/person.txt");
/*** 转换成Row类型的RDD*/
JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Row call(String s) throws Exception {return RowFactory.create(String.valueOf(s.split(",")[0]),String.valueOf(s.split(",")[1]),Integer.valueOf(s.split(",")[2]));}
});
/*** 动态构建DataFrame中的元数据,一般来说这里的字段可以来源自字符串,也可以来源于外部数据库*/
List<StructField> asList =Arrays.asList(DataTypes.createStructField("id", DataTypes.StringType, true),DataTypes.createStructField("name", DataTypes.StringType, true),DataTypes.createStructField("age", DataTypes.IntegerType, true)
);StructType schema = DataTypes.createStructType(asList);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);df.show();
sc.stop();

scala:

1.val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
2.val peopleInfo: RDD[String] = session.sparkContext.textFile("./data/people.txt")
3.
4.val rowRDD: RDD[Row] = peopleInfo.map(info => {
5.  val id = info.split(",")(0).toInt
6.  val name = info.split(",")(1)
7.  val age = info.split(",")(2).toInt
8.  val score = info.split(",")(3).toDouble
9.  Row(id, name, age, score)
10.})
11.val structType: StructType = StructType(Array[StructField](
12.  StructField("id", IntegerType),
13.  StructField("name", StringType),
14.  StructField("age", IntegerType),
15.  StructField("score", DoubleType)
16.))
17.val frame: DataFrame = session.createDataFrame(rowRDD,structType)
18.frame.createTempView("mytable")
19.session.sql("select * from mytable ").show()

4、读取parquet文件创建DataFrame

注意:

  • 可以将DataFrame存储成parquet文件。保存成parquet文件的方式有两种
df.write().mode(SaveMode.Overwrite)format("parquet").save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
  • SaveMode指定文件保存时的模式。

Overwrite:覆盖

Append:追加

ErrorIfExists:如果存在就报错

Ignore:如果存在就忽略

java:

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("parquet");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> jsonRDD = sc.textFile("sparksql/json");
DataFrame df = sqlContext.read().json(jsonRDD);
/*** 将DataFrame保存成parquet文件,SaveMode指定存储文件时的保存模式* 保存成parquet文件有以下两种方式:*/
df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
df.show();
/*** 加载parquet文件成DataFrame	* 加载parquet文件有以下两种方式:	*/DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet");
load = sqlContext.read().parquet("./sparksql/parquet");
load.show();sc.stop();

scala:

1.val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
2.val frame: DataFrame = session.read.json("./data/json")
3.frame.show()
4.frame.write.mode(SaveMode.Overwrite).parquet("./data/parquet")
5.
6.val df: DataFrame = session.read.format("parquet").load("./data/parquet")
7.df.createTempView("mytable")
8.session.sql("select count(*) from mytable ").show()

5、读取JDBC中的数据创建DataFrame(MySql为例)

两种方式创建DataFrame

java:

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("mysql");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
/*** 第一种方式读取MySql数据库表,加载为DataFrame*/
Map<String, String> options = new HashMap<String,String>();
options.put("url", "jdbc:mysql://192.168.179.4:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "123456");
options.put("dbtable", "person");
DataFrame person = sqlContext.read().format("jdbc").options(options).load();
person.show();
person.registerTempTable("person");
/*** 第二种方式读取MySql数据表加载为DataFrame*/
DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "123456");
reader.option("dbtable", "score");
DataFrame score = reader.load();
score.show();
score.registerTempTable("score");DataFrame result = 
sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name");
result.show();
/*** 将DataFrame结果保存到Mysql中*/
Properties properties = new Properties();
properties.setProperty("user", "root");
properties.setProperty("password", "123456");
result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties);sc.stop();

scala:

1.val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
2.
3.val prop = new Properties()
4.prop.setProperty("user","root")
5.prop.setProperty("password","123456")
6./**
7.* 第一种方式
8.*/
9.val df1 = session.read.jdbc("jdbc:mysql://192.168.179.14:3306/spark","person",prop)
10.df1.show()
11.df1.createTempView("person")
12.
13./**
14.* 第二种方式
15.*/
16.val map = Map[String,String](
17. "url" -> "jdbc:mysql://192.168.179.14:3306/spark",
18. "driver " -> "com.mysql.jdbc.Driver",
19. "user" -> "root",
20. "password" -> "123456",
21. "dbtable" -> "score"
22.)
23.val df2 = session.read.format("jdbc").options(map).load()
24.df2.show()
25.
26./**
27.* 第三种方式
28.*/
29.val df3 = session.read.format("jdbc")
30. .option("url", "jdbc:mysql://192.168.179.14:3306/spark")
31. .option("driver", "com.mysql.jdbc.Driver")
32. .option("user", "root")
33. .option("password", "123456")
34. .option("dbtable", "score")
35. .load()
36.df3.show()
37.df3.createTempView("score")
38.
39.val result = session.sql("select person.id,person.name,person.age,score.score from person ,score where person.id = score.id")
40.
41.result.show()
42.//将结果保存到mysql中
43.result.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.14:3306/spark","result",prop)
44.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/183909.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《HelloGitHub》第 92 期

兴趣是最好的老师&#xff0c;HelloGitHub 让你对编程感兴趣&#xff01; 简介 HelloGitHub 分享 GitHub 上有趣、入门级的开源项目。 https://github.com/521xueweihan/HelloGitHub 这里有实战项目、入门教程、黑科技、开源书籍、大厂开源项目等&#xff0c;涵盖多种编程语言 …

enote笔记法之附录1——“语法词”(即“关联词”)(ver0.24)

enote笔记法之附录1——“语法词”&#xff08;即“关联词”&#xff09;&#xff08;ver0.24&#xff09; 最上面的是截屏的完整版&#xff0c;分割线下面的是纯文字版本&#xff1a; 作者姓名&#xff08;本人的真实姓名&#xff09;&#xff1a;胡佳吉 居住地&#xff1…

定向网络攻击主要风险检查表

一、互联网入口攻击 二、内部网络横向攻击 三、集权类系统风险和要求 软件开发全资料获取&#xff1a;点我获取 定向网络攻击主要风险检查表分类测评项风险描述检查项检查结果整改建议互联网入口攻击应用网站安全漏洞应用系统和网站存在高风险安全漏洞&#xff0c;可能直接被…

推荐一款好用的BMP转PNG工具BMP2PNG

推荐一款好用的BMP转PNG工具BMP2PNG 自己写的一个BMP转PNG工具BMP2PNG 写这个工具是因为要使用传奇的部分素材在COCOS2DX使用&#xff0c; 但是COCOS2DX不支持BMP 如果直接将BMP转换到PNG的话&#xff0c;网上找到的工具都不支持透明色转换。难道要用PS一个一个抠图吗&#xf…

k8s-daemonset、job、cronjob控制器 6

Daemonset控制器&#xff08;一个节点部署一个&#xff09; 、 创建Daemonset控制器 控制节点上不能进行部署&#xff0c;有污点 解决方式&#xff1a; 扩容节点&#xff0c;token值过期的解决方法&#xff1a; 回收pod job控制器 需要使用perl镜像&#xff0c;仓库没有&…

Java 中的 Switch 是如何支持 String 的?为什么不支持 long ?

引言 在Java中&#xff0c;switch语句是一种用于根据表达式的值选择执行不同代码块的流程控制语句。最初&#xff0c;switch语句仅支持基本数据类型&#xff0c;如int、char等&#xff0c;但自从Java 7版本开始&#xff0c;它还开始支持String类型。这个改变为开发者提供了更多…

OSCP系列靶场-Esay-1

总结 getwebshell : ftp可匿名登录 → 发现隐藏文件夹 → 发现ssh密钥 → 猜解ssh用户名 → ssh密钥登录 提 权 思 路 : 发现suid权限文件 → cpulimit提权 准备工作 启动VPN 获取攻击机IP → 192.168.45.191 启动靶机 获取目标机器IP → 192.168.179.130 信息收集-端口扫…

论文阅读——SEEM

arxiv: 分割模型向比较灵活的分割的趋势的转变&#xff1a;封闭到开放&#xff0c;通用到特定、one-shot到交互式。From closed-set to open-vocabulary segmentation&#xff0c;From generic to referring segmentation&#xff0c;From one-shot to interactive segmentati…

Java结合源码-字符串

1、简介 字符串是一系列字符串的序列。在Java语言中字符串是用一对对双引号“”&#xff0c;括起来的字符系列。例如“Hello”&#xff0c;“你好”。从数组的角度来说&#xff0c;字符串可以是看成是一个个字符组成的数组。 2、字符串分类 程序中用到的字符串可以分成两大类…

深入解析CPU工作原理与细节

计算机是现代社会中不可或缺的工具&#xff0c;而CPU&#xff08;中央处理器&#xff09;则是计算机的核心组件。CPU负责执行指令和控制计算机的各种操作&#xff0c;它的性能直接影响着计算机的速度和效率。 1. CPU的基本结构 CPU通常由以下几个主要组成部分构成&#xff1a…

IDEA中springboot 提示 java: 找不到符号 符号: 变量 log

在以下位置加上该配置"-Djps.track.ap.dependenciesfalse" 然后重新启动项目&#xff0c;到此问题解决&#xff01;&#xff01;&#xff01;

SpringCloud原理】OpenFeign之FeignClient动态代理生成原理

大家好&#xff0c;前面我已经剖析了OpenFeign的动态代理生成原理和Ribbon的运行原理&#xff0c;这篇文章来继续剖析SpringCloud组件原理&#xff0c;来看一看OpenFeign是如何基于Ribbon来实现负载均衡的&#xff0c;两组件是如何协同工作的。 一、Feign动态代理调用实现rpc流…

Vue3 中el-tree-select使用中遇到的一些问题

<el-tree-selectv-model"userFormParams.deptId":data"deptTree.data"placeholder"请选择"filterableautocomplete"off"aria-autocomplete"none":render-after-expand"false"></el-tree-select> 1、…

指纹芯片的工作原理及应用领域详解

指纹芯片是一种利用指纹识别技术的电子设备,可以通过扫描人体指纹的纹理特征,将其转化为数字化信息并进行存储和识别。指纹芯片广泛应用于各个领域,包括智能手机、银行和金融、门禁系统、身份验证等,因其高度准确、快速便捷的特点,得到了广大用户的青睐。 指纹芯片的原理是基于…

【MYSQL】表的基本查询

目录 前言 一、Create&#xff08;增&#xff09; 1.单行数据 全列插入 2.多行数据 指定列插入 3.插入否则更新 4.替换 二、Retrieve&#xff08;查&#xff09; 1.select列 1.1全列查询 1.2指定列查询 1.3查询字段为表达式 1.4为查询结果指定别名 1.5结果去重 …

2948. 交换得到字典序最小的数组 (分组排序)

Problem: 2948. 交换得到字典序最小的数组 文章目录 题目思路Code 题目 给你一个下标从 0 开始的 正整数 数组 nums 和一个 正整数 limit 。 在一次操作中&#xff0c;你可以选择任意两个下标 i 和 j&#xff0c;如果 满足 |nums[i] - nums[j]| < limit &#xff0c;则交换…

基于webserver的工业数据采集项目源代码

通过浏览器&#xff0c;实现Modbus Slave端数据采集和设备控制 数据采集函数 #include <stdio.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <netinet/ip.h> #include <arpa/inet.h> #include …

[node] Node.js的Web 模块

[node] Node.js的Web 模块 什么是 Web 服务器&#xff1f;Web的应用架构http使用方式使用 Node 创建 Web 服务器使用 Node 创建 Web 客户端 什么是 Web 服务器&#xff1f; Web服务器一般指网站服务器&#xff0c;是指驻留于因特网上某种类型计算机的程序&#xff0c;Web服务器…

算法通关村-----超大规模数据场景的问题

对20GB文件进行排序 问题描述 假设有一个20GB的文件&#xff0c;每行一个字符串&#xff0c;请说明如何对这个文件进行排序 问题分析 20GB的文件很难一次加载到内存中&#xff0c;可以采用分块策略&#xff0c;先使块内有序&#xff0c;在使块间有序。 实现思路 按照给定…

【重点文章】将Java程序打包成exe文件,无Java环境也可以运行(解决各种疑难杂症)

文章目录 一、将Java程序打成jar包二、将Jar打成exe三、加壳改造成安装包 编译器为IDEA 一、将Java程序打成jar包 2. 3. 你打的包一般会出现在根目录下面的out文件夹下面  当然你也可以用maven的package功能打包&#xff0c;效果是一样的   二、将Jar打成exe 使用工具e…