Spark SQL(二)之DataSet操作

一、创建DataSet

使用SparkSession,应用程序可以从现有的RDD,Hive表的或Spark数据源创建DataFrame 。

(1)基于JSON的内容创建一个DataFrame

//hdfs
Dataset<Row> df = spark.read().json("hdfs://master:9000/test.json");//rdd
RDD<String> jsonRDD = ...
Dataset<Row> df = spark.read().json(jsonRDD);//dataset
Dataset<String> jsonDataset = ...
Dataset<Row> df = spark.read().json(dataSet);

(2)基于parquet的内容创建一个DataFrame

//hdfs
Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.parquet");

(3)基于orc的内容创建一个DataFrame

//hdfs
Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.orc");

 (4)基于txt的内容创建一个DataFrame

//hdfs 创建只有value列的数据
Dataset<Row> df = spark.read().txt("hdfs://master:9000/test.txt");

(5)基于cvs的内容创建一个DataFrame

//hdfs
Dataset<Row> df = spark.read().cvs("hdfs://master:9000/test.cvs");

 (6)基于jdbc的内容创建一个DataFrame

Dataset<Row> df1 = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/man").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "man").option("user", "root").option("password","admin").load();
df1.show();Properties properties = new Properties();
properties.put("user", "root");
properties.put("password","admin");
properties.put("driver", "com.mysql.jdbc.Driver");
Dataset<Row> df2 = spark.read().jdbc("jdbc:mysql://localhost:3306/man",  "man", properties);
df2.show();

(7)基于textFile的内容创建一个DataSet

//hdfs
Dataset<String> ds = spark.read().textFile("hdfs://master:9000/test.txt");

(8)rdd创建DataSet

//反射推断StructType
JavaRDD<Person> peopleRDD = ...
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);//编程方式指定StructType
String schemaString = ...
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {String[] attributes = record.split(",");return RowFactory.create(attributes[0], attributes[1].trim());
});
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

 

二、DataSet操作

(1)schema结构

df.printSchema();
StructType type = df.schema();

(2)map一对一映射操作

//dataframe格式转换
Dataset<Row> df1 = df.map(v-> v, RowEncoder.apply(df.schema()));
df1.show();//dataframe格式转换
StructField structField = new StructField("name", DataTypes.StringType, true, null);
StructType structType = new StructType(new StructField[]{structField});
Dataset<Row> df2 = df.map(v-> new GenericRowWithSchema(new Object[]{v.getAs("name")}, structType), RowEncoder.apply(structType));
df2.show();//dataSet格式转换
Dataset<String> dfs =  df.map(v-> v.getAs("name"), Encoders.STRING());
dfs.show();

(3)flatMap一对多映射操作

//dataSet格式转换
Dataset<String> dfs =  df.flatMap(v-> Arrays.asList((String)v.getAs("name")).iterator(), Encoders.STRING());
dfs.show();

(4)filter过滤操作

Dataset<Row>  df1 =  df.filter(new Column("name").$eq$eq$eq("mk"));
Dataset<Row>  df2 =  df.filter(new Column("name").notEqual("mk"));

(5)withColumn加列或者覆盖

Dataset<Row> df1 = df.withColumn("name1", functions.col("name"));
df1.show();
Dataset<Row>  df2 = df.withColumn("name", functions.lit("a"));
df2.show();
Dataset<Row>  df3 = df.withColumn("name", functions.concat(functions.col("name"),  functions.lit("zzz")));
df3.show();

(6)select选择列

Dataset<Row>  df1 = df.select(functions.concat(functions.col("name"),  functions.lit("zzz")).as("name1"));
df1.show();
Dataset<Row>  df2 = df.select(functions.col("name"), functions.concat(functions.col("name"),  functions.lit("zzz")).as("name1"));
df2.show();

(7)selectExpr表达式选择列

Dataset<Row>  df1 = df.selectExpr("name", "'a' as name1");
df1.show();

(8)groupBy agg分组统计

Dataset<Row>  df1 = df.groupBy(functions.col("name")).agg(functions.expr("count(1)").as("c"), functions.expr("max(desc)").as("desc"));
df1.show();

(9)drop删除列

Dataset<Row>  df1 = df.drop("name");
df1.show();

(10)distinct去重

Dataset<Row>  df1 = df.distinct();
df1.show();

(11)dropDuplicates 根据字段去重

Dataset<Row>  df1 = df.dropDuplicates("name");
df1.show();

(12)summary统计count、mean、stddev、min、max、25%、50%、75%,支持统计类型过滤

Dataset<Row>  df1 = df.summary("count");
df1.show();

(13)describe统计count、mean、stddev、min、max,支持列过滤

Dataset<Row>  df1 = df.describe("name");
df1.show();

(14)sort 排序

Dataset<Row>  df1 = df.sort(functions.col("name").asc());
df1.show();

(15)limit 分页

Dataset<Row>  df1 = df.limit(1);
df1.show();

 

三、DataSet连接

(1)join连接

Dataset<Row>  df1 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name")), "left_outer");
df1.show();Dataset<Row>  df2 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name")));
df2.show();

(2)crossJoin笛卡尔连接

Dataset<Row>  df1 = df.as("a").crossJoin(df.as("b"));
df1.show();

 

四、DataSet集合运算

(1)except差集

Dataset<Row>  df1 = df.except(df.filter("name='mk'"));
df1.show();

(2)union并集,根据列位置合并行,列数要一致

Dataset<Row>  df1 = df.union(df.filter("name='mk'"));
df1.show();

(3)unionByName并集,根据列名合并行,不同名报错,列数要一致

Dataset<Row>  df1 = df.unionByName(df.filter("name='mk'"));
df1.show();

(4)intersect交集

Dataset<Row>  df1 = df.intersect(df.filter("name='mk'"));
df1.show();

 

 

五、DataSet分区

repartition(numPartitions:Int):RDD[T]

coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]

两个都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现

假设RDD有N个分区,需要重新划分成M个分区

1、N<M。一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。

2、如果N>M并且N和M相差不多,(假如N是100,M是10)那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将shuff设置为false。

在shuffl为false的情况下,如果M>N时,coalesce为无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。

3、如果N>M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能。

如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以讲shuffle设置为true。

DataSet的coalesce是Repartition shuffle=false的简写方法

Dataset<Row>  df1 = df.coalesce(1);
Dataset<Row>  df2 = df.repartition(1);


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322056.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jzoj6296-投票【期望dp,贪心】

正题 题目大意 nnn个人&#xff0c;第iii投票的概率是pip_ipi​&#xff0c;选择kkk个求最大的平票概率。 解题思路 我们显然要让kkk人中一半投票的概率大&#xff0c;一半投票的概率小。 所以我们可以先进行排序&#xff0c;这样我们发现答案一定是选取一段前缀和一段后缀。…

揽货最短路径解决方案算法 - C# 蚁群优化算法实现

需求为&#xff08;自己编的&#xff0c;非实际项目&#xff09;&#xff1a;某配送中心进行揽货&#xff0c;目标客户数为50个客户&#xff0c;配送中心目前的运力资源如下&#xff1a;现有车辆5台单台运力最大行驶距离200千米单台运力最大载重公斤1吨问&#xff1a;运力怎样走…

Spark SQL(三)之视图与执行SQL

一、视图与sql执行 SparkSession能够以编程方式运行SQL查询并返回结果Dataset<Row> Dataset<Row> df spark.read().json("hdfs://master:9000/test.json"); df.createOrReplaceTempView("man");Dataset<Row> sqlDF spark.sql("…

hdu4965-Fast Matrix Calculation【矩阵乘法】

正题 题目链接:http://acm.hdu.edu.cn/showproblem.php?pid4965 题目大意 给出矩阵A,BA,BA,B&#xff0c;求(AB)n(AB)^n(AB)n。然后对于每个元素%6\% 6%6后取和。 题目大意 我们发现如果直接让AB∗ABAB*ABAB∗AB这样的时间复杂度是n3n^3n3&#xff0c;显然不可过。但是我们…

OIDC在 ASP.NET Core中的应用

我们在《ASP.NET Core项目实战的课程》第一章里面给identity server4做了一个全面的介绍和示例的练习 。如果想完全理解本文所涉及到的话题&#xff0c;你需要了解的背景知识有&#xff1a;什么是OpenId Connect (OIDC)OIDC 对oAuth进行了哪些扩展&#xff1f;Identity Server4…

论文阅读:Blind Super-Resolution Kernel Estimation using an Internal-GAN

这是发表在 2019 年 NIPS 上的一篇文章&#xff0c;那个时候还叫 NIPS&#xff0c;现在已经改名为 NeurIPS 了。文章中的其中一个作者 Michal Irani 是以色 Weizmann Institute of Science (魏茨曼科学研究学院) 的一名教授&#xff0c;对图像纹理的内在统计规律有着很深入的研…

Spark SQL(四)之DataSet与RDD转换

一、创建DataSet DataSet与RDD相似&#xff0c;但是&#xff0c;它们不使用Java序列化或Kryo&#xff0c;而是使用专用的Encoder对对象进行序列化以进行网络处理或传输。虽然编码器和标准序列化都负责将对象转换为字节&#xff0c;但是编码器是动态生成的代码&#xff0c;并使…

P3389-[模板]高斯消元法

正题 题目链接:https://www.luogu.org/problem/P3389 题目大意 给出一个nnn元一次方程组&#xff0c;求解。 解题思路 模板&#xff0c;有什么好说的吗 codecodecode #include<cstdio> #include<cstring> #include<algorithm> #include<cmath> usi…

【ASP.NET Core】处理异常

依照老周的良好作风&#xff0c;开始之前先说点题外话。前面的博文中&#xff0c;老周介绍过自定义 MVC 视图的搜索路径&#xff0c;即向 ViewLocationFormats 列表添加相应的内容&#xff0c;其实&#xff0c;对 Razor Page 模型&#xff0c;也可以向 PageViewLocationFormats…

Spark SQL(五)之数据加载与存储

一、数据加载 &#xff08;1&#xff09;默认数据源&#xff08;parquet&#xff09; 最简单加载数据的方式&#xff0c;所有操作都使用默认数据源&#xff08;parquet&#xff09;。如果指定默认数据源需要配置 spark.sql.sources.default参数。 Dataset<Row> manDF …

树莓派3B上部署运行.net core 2程序

针对Linxu arm处理器如何部署.net core 2的资料很少&#xff0c;网上找到几篇但都写得不够详细&#xff0c;按照他们教程来撞墙了&#xff0c;折磨了几天终于部署成功了&#xff0c;先上一张运行成功的图1.windows系统中&#xff0c;在项目的目录下使用CMD命令运行进行发布dotn…

bzoj3143,P3232-[Hnoi2013]游走【数学期望,高斯消元,贪心】

正题 题目链接: https://www.lydsy.com/JudgeOnline/problem.php?id3143 https://www.luogu.org/problem/P3232 题目大意 一张无向图nnn个点mmm条边&#xff0c;然后给每条边附上1∼m1\sim m1∼m的权值(不能重复)&#xff0c;求1走到nnn的最小期望值。 解题思路 我们可以计…

Spark SQL(六)之加载数据的参数配置

一、配置 忽略损坏的文件、忽略丢失的文件、路径全局过滤器、递归文件查找和修改时间路径过滤器等选项/配置仅在使用基于文件的源&#xff08;parquet&#xff0c;orc&#xff0c;avro&#xff0c;json&#xff0c;csv&#xff0c;txt&#xff09;时才有效。 以下示例中使用的…

bzoj1013,luogu4035-[JSOI2008]球形空间产生器【高斯消元】

正题 题目链接: https://www.lydsy.com/JudgeOnline/problem.php?id3534 https://www.luogu.org/problem/P4035 题目大意 一个nnn维平面的元&#xff0c;给出圆表面上的n1n1n1个坐标&#xff0c;求圆心位置。 解题思路 对于圆心的第iii维位置xix_ixi​有∑j0n(ai,j−xi)2C\…

拥抱.NET Core系列:MemoryCache 初识

MSCache能做什么&#xff1f;绝对过期支持滑动过期支持&#xff08;指定一个时间&#xff0c;TimeSpan&#xff0c;指定时间内有被Get缓存时间则顺延&#xff0c;否则过期&#xff09;过期回调自定义过期MSCache目前最新的正式版是 2.0.0&#xff0c;预览版是2.1.0&#xff0c;…

Spark Structure Streaming(一)之简介

一、Structure Streaming 结构化流是基于Spark SQL引擎构建的可伸缩且容错的流处理引擎。可以像对静态数据进行批处理计算一样&#xff0c;来表示流计算。 当流数据继续到达时&#xff0c;Spark SQL引擎将负责递增地&#xff0c;连续地运行它并更新最终结果。可以在Scala&…

Ocelot中使用Butterfly实践

Ocelot(https://github.com/TomPallister/Ocelot)是一个用.net core实现的API网关&#xff0c;Butterfly(https://github.com/ButterflyAPM/butterfly)是用.net core实现的全程序跟踪&#xff0c;现在&#xff0c;Ocelot中可以使用Butterfly了&#xff0c;关于Ocelot和Butterfl…

jzoj6290-倾斜的线【计算几何,贪心】

正题 题目大意 有nnn个点&#xff0c;将两个点连成线&#xff0c;求斜率最接近PQ\frac{P}{Q}QP​的线。 解题思路 我们有一个结论&#xff1a;若我们对于每一个点做一条斜率为PQ\frac{P}{Q}QP​的线&#xff0c;然后按截距排序&#xff0c;然后答案必定是相邻的点。 证明: 我…

Java 平台调试架构JPDA

转载自 Java-JPDA 概述 JPDA&#xff1a;Java 平台调试架构&#xff08;Java Platform Debugger Architecture&#xff09; 它是 Java 虚拟机为调试和监控虚拟机专门提供的一套接口。 一、JPDA https://docs.oracle.com/javase/8/docs/technotes/guides/jpda/ JPDA 由三个…

jzoj6305-最小值【线段树,dp,双端链表】

正题 题目大意 定义函数f(x)Ax3Bx2CxDf(x)Ax^3Bx^2CxDf(x)Ax3Bx2CxD 然后给出一个序列&#xff0c;要求按顺序分成若干段。对于一段[L..R][L..R][L..R]&#xff0c;贡献为f(min{ai}(i∈[L..R]))f(min\{a_i\}(i\in[L..R]))f(min{ai​}(i∈[L..R])) 然后要求所有段的贡献之和最大…