头歌:SparkSQL简单使用

第1关:SparkSQL初识
 

任务描述


本关任务:编写一个sparksql基础程序。

相关知识


为了完成本关任务,你需要掌握:1. 什么是SparkSQL 2. 什么是SparkSession。  

什么是SparkSQL


Spark SQL是用来操作结构化和半结构化数据的接口。
当每条存储记录共用已知的字段集合,数据符合此条件时,Spark SQL就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,Spark SQL提供了以下三大功能:
(1) Spark SQL可以从各种结构化数据源(例如JSON、Parquet等)中读取数据。

(2) Spark SQL不仅支持在Spark程序内使用SQL语句进行数据查询,也支持从类似商业智能软件Tableau这样的外部工具中通过标准数据库连接器(JDBC/ODBC)连接sparkSQL进行查询。

(3) 当在Spark程序内使用Spark SQL时,Spark SQL支持SQL与常规的Python/Java/Scala代码高度整合,包括连接RDD与SQL表、公开的自定义SQL函数接口等。

什么是SparkSession


Spark中所有功能的入口点都是SparkSession类。要创建基本的SparkSession,只需使用SparkSession.builder()。

import  org.apache.spark.sql.SparkSession;
SparkSession  spark  =  SparkSession .builder().appName("Java Spark SQL基本示例").master("local").config("spark.some.config.option" , "some-value").getOrCreate();//打印spark版本号System.out.println(spark.version());


编程要求


请仔细阅读右侧代码,根据方法内的提示,在Begin - End区域内进行代码补充,具体任务如下:

打印spark的版本号。
测试说明
补充完代码后,点击测评,平台会对你编写的代码进行测试,当你的结果与预期输出一致时,即为通过。

package com.educoder.bigData.sparksql;import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;public class Test1 {public static void main(String[] args) throws AnalysisException {/********* Begin *********/SparkSession  spark  =  SparkSession   .builder()  .appName("Java Spark SQL基本示例")  .master("local")  .config("spark.some.config.option" , "some-value")  .getOrCreate();  //打印spark版本号  System.out.println(spark.version());  /********* End *********/}}

第2关:Dataset创建及使用
 

任务描述


本关任务:创建Dataset并使用

相关知识


为了完成本关任务,你需要掌握:

什么是Dataset;
Dataset如何创建 ;
Dataset如何操作数据。
什么是Dataset
在Spark2.0版本以后,DataFrame API将会和Dataset  API合并,统一数据处理API。故实训中的Dateset和DataFrame可看成一个概念。
Dataset和RDD一样,也是Spark的一种弹性分布式数据集,它是一个由列组成的数据集,概念上等同于关系型数据库中的一张表,但在底层具有更丰富的优化。Dataset可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。有人肯定会问,已经有了弹性分布式数据集RDD,为什么还要引入Dataset呢?因为在Spark中,我们可以像在关系型数据库中使用SQL操作数据库表一样,使用Spark SQL操作Dataset。这让熟悉关系型数据库SQL人员也能轻松掌握。

上图直观地体现了 Dataset 和 RDD 的区别。左侧的 RDD[Person] 虽然以 Person 为类型参数,但 Spark 框架本身不了解 Person 类的内部结构。而右侧的 Dataset 却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。Dataset 除了提供了比 RDD 更丰富的算子以外,更重要的特点能提升执行效率、减少数据读取以及执行计划的优化等。

Dataset 上可用的操作分为转换和操作。转换是产生新 Dataset 的转换,动作是触发计算和返回结果的转换。示例转换包括map,filter,select和aggregate(groupBy)。示例操作将数据计数,显示或写入文件系统。

Dataset 是“懒惰的”,即只有在调用动作时才会触发计算。在内部,Dataset 表示描述生成数据所需计算的逻辑计划。调用操作时,Spark 的查询优化器会优化逻辑计划,并以并行和分布式方式生成有效执行的物理计划。要探索逻辑计划以及优化的物理计划,请使用explain函数。

要有效地支持特定于域的对象,需要使用编码器。编码器将域特定类型T映射到 Spark 的内部类型系统。例如,给定一个具有两个字段的  Person,name(string)和age(int),编码器用于告诉 Spark 在运行时生成代码以将 Person 对象序列化为二进制结构。该二进制结构通常具有低得多的存储器占用面积以及针对数据处理(例如,柱状格式)的效率进行优化。要了解数据的内部二进制表示,请使用模式函数。

Dataset如何创建
通常有两种方法来创建Dataset。最常见的方法是使用SparkSession上提供的读取功能将Spark指向存储系统上的某些文件。

//创建泛型的Dataset
Dataset<Row> df = spark.read().json("people.json");
//创建Person类型的DatasetDataset<Person> people = spark.read().json("people.json").as(Encoders.bean(Person.class));
//以表格形式显示前20行Dataset
df.show();
people.show();


也可以通过现有数据集上的转换来创建Dataset。 例如,以下内容通过对现有数据集应用过滤器来创建新Dataset:

   Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING));


Dataset如何操作数据
Dataset操作数据有两种方式:API方式处理数据和以编程方式处理数据。

API方式处理数据


Dataset操作也可以通过以下定义的各种特定于域的语言(DSL)函数进行无类型操作:Dataset(类),列和函数。 这些操作与R或Python中的数据框抽象中可用的操作非常相似。
要从数据集中选择列,请在在Java中使用col 。

  

 Column ageCol = people.col("age");


请注意,Column类型也可以通过其各种功能进行操作。

import static org.apache.spark.sql.functions.col;
以树格式
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// 仅选择“名称”列
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+
// 选择所有人,但将年龄增加1 
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+
// 选择年龄超过21 
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// 计数按年龄的人
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+


以编程方式处理数据
SparkSession支持让应用程序以编程方式运行SQL查询并返回结果。

//读取json,并将Dataset,并注册为SQL临时视图
sparkSession.read().json("people.json").createOrReplaceTempView("people");
//以表格形式显示前20行Dataset
sparkSession.sql("select * from people").show();
// + ---- + ------- + 
// | 年龄| 名称| 
// + ---- + ------- + 
// | null | Michael | 
// | 30 | 安迪| 
// | 19 | 贾斯汀| 
// + ---- + ------- +


编程要求


根据提示,在右侧编辑器补充代码,读取people.json文件,过滤age为23的数据,并以表格形式显示前20行Dataset。

people.json文件内容如下:

{"age":21,"name":"张三", "salary":"3000"}
{"age":22,"name":"李四", "salary":"4500"}
{"age":23,"name":"王五", "salary":"7500"}

package com.educoder.bigData.sparksql;import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;public class Test2 {public static void main(String[] args) throws AnalysisException {SparkSession  spark  =  SparkSession .builder().appName("test1").master("local").config("spark.some.config.option" , "some-value").getOrCreate();/********* Begin *********/spark.read().json("people.json").createOrReplaceTempView("people");  spark.sql("select * from people where age != '23'").show();/********* End *********/}}

第3关:Dataset自定义函数

 

任务描述


本关任务:编写Dataset自定义函数。

相关知识


为了完成本关任务,你需要掌握:

UserDefinedAggregateFunction介绍;
如何使用。
UserDefinedAggregateFunction
UserDefinedAggregateFunction是实现用户定义的聚合函数基础类,用户实现自定义无类型聚合函数必须扩展UserDefinedAggregateFunction 抽象类,相关方法如下:

方法及方法返回    描述


StructType bufferSchema()    StructType表示聚合缓冲区中值的数据类型。
DataType dataType()    UserDefinedAggregateFunction的返回值的数据类型
boolean deterministic()    如果此函数是确定性的,则返回true
Object evaluate(Row buffer)    根据给定的聚合缓冲区计算此UserDefinedAggregateFunction的最终结果
void initialize(MutableAggregationBuffer buffer)    初始化给定的聚合缓冲区
StructType inputSchema()    StructType表示此聚合函数的输入参数的数据类型。
void merge(MutableAggregationBuffer buffer1, Row buffer2)    合并两个聚合缓冲区并将更新的缓冲区值存储回buffer1
void update(MutableAggregationBuffer buffer, Row input)    使用来自输入的新输入数据更新给定的聚合缓冲区
如何使用
我们以计算员工薪水平均值的例子来说:
首先在用户自定义函数的构造函数中,定义聚合函数的输入参数的数据类型和聚合缓冲区中值的数据类型。

//定义员工薪水的输入参数类型为LongType
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
//定义员工薪水总数、员工个数的参数类型
List<StructField> bufferFields = new ArrayList<StructField>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);


对聚合缓冲区中值设置初始值。

@Overridepublic void initialize(MutableAggregationBuffer buffer) {// TODO Auto-generated method stubbuffer.update(0, 0L);buffer.update(1, 0L);}


把自定义函数的输入薪水数据转化为定义的聚合缓冲区的值(薪水总数、员工个数),并更新。

@Override
public void update(MutableAggregationBuffer buffer, Row input) {if (!input.isNullAt(0)) {long updatedSum = buffer.getLong(0) + input.getLong(0);long updatedCount = buffer.getLong(1) + 1;buffer.update(0, updatedSum);buffer.update(1, updatedCount);}
}


把多个聚合缓冲区的值进行合并。

@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {// TODO Auto-generated method stublong mergedSum = buffer1.getLong(0) + buffer2.getLong(0);long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);buffer1.update(0, mergedSum);buffer1.update(1, mergedCount);
}


最后通过聚合缓冲区的值计算输出结果。

@Override
public Object evaluate(Row buffer) {// TODO Auto-generated method stubreturn ((double) buffer.getLong(0)) / buffer.getLong(1);
}


就此自定义函数就开发完了,通过SparkSession的udf()方法会返回注册用户定义函数的方法集合UDFRegistration
通过UDFRegistration调用register方法进行自定义函数注册,使用如下:

// 注册自定义函数myAverage
spark.udf().register("myAverage", new MyAverage());
//读取json文件
spark.read().json("people.json").createOrReplaceTempView("people");
//使用自定义函数计算薪水平均值
spark.sql("SELECT myAverage(salary) as average_salary FROM people").show();
// +--------------+
// |average_salary|
// +--------------+
// |        5000|
// +--------------+


编程要求


请仔细阅读右侧代码,根据方法内的提示,在Begin - End区域内进行代码补充,编写自定义函数类MyAverage,用来计算用户薪水平均值,平台已提供了最后的实现:

spark.udf().register("myAverage", new MyAverage());
spark.read().json("people.json").createOrReplaceTempView("people");
spark.sql("SELECT myAverage(salary) as average_salary FROM people").show();

测试说明


补充完代码后,点击测评,平台会对你编写的代码进行测试,当你的结果与预期输出一致时,即为通过。

package com.educoder.bigData.sparksql;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class MyAverage extends UserDefinedAggregateFunction {
private static final long serialVersionUID = 1L;
private StructType inputSchema;
private StructType bufferSchema;
public MyAverage() {
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<StructField>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);
}
@Override
public StructType bufferSchema() {
// TODO Auto-generated method stub
return bufferSchema;
}
@Override
public DataType dataType() {
// TODO Auto-generated method stub
return DataTypes.DoubleType;
}
@Override
public boolean deterministic() {
// TODO Auto-generated method stub
return true;
}
@Override
public Object evaluate(Row buffer) {
// TODO Auto-generated method stub
return ((double) buffer.getLong(0)) / buffer.getLong(1);
}
@Override
public void initialize(MutableAggregationBuffer buffer) {
// TODO Auto-generated method stub
buffer.update(0, 0L);
buffer.update(1, 0L);
}
@Override
public StructType inputSchema() {
// TODO Auto-generated method stub
return inputSchema;
}
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
// TODO Auto-generated method stub
long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
buffer1.update(0, mergedSum);
buffer1.update(1, mergedCount);
}
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
long updatedSum = buffer.getLong(0) + input.getLong(0);
long updatedCount = buffer.getLong(1) + 1;
buffer.update(0, updatedSum);
buffer.update(1, updatedCount);
}
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/5586.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI家居设备的未来:智能家庭的下一个大步

&#x1f512;目录 ☂️智能家居设备的发展和AI技术的作用 ❤️AI技术实现智能家居设备的自动化控制和智能化交互的依赖 AI家居设备的未来应用场景 &#x1f4a3;智能家庭在未来的发展和应用前景 &#x1f4a5;智能家居设备的发展和AI技术的作用 智能家居设备的发展和AI技术的…

Liunx发布tomcat项目

Liunx在Tomcat发布JavaWeb项目 1.问题2.下载JDK3.下载Tomcat4.Tomcat本地JavaWeb项目打war包、解压、发布5.重启Tomcat,查看项目 1.问题 1.JDK 与 Tomcat 版本需匹配&#xff0c;否则页面不能正确显示 报错相关&#xff1a;Caused by: java.lang.ClassNotFoundException: java…

优化NGINX性能:使用NGINX_THREADS提高并发处理能力

目录标题 1. 什么是NGINX_THREADS&#xff1f;2. 配置NGINX_THREADS3. 使用NGINX_THREADS处理耗时操作4. 性能调优5. 结论 NGINX作为一个高性能的HTTP和反向代理服务器&#xff0c;在处理高并发请求时表现出色。但随着互联网应用对性能要求的不断提高&#xff0c;深入了解和优化…

DA14531如何配置SDK工程路径

1 前言 当我们获取一个DA14531的工成样列&#xff0c;有时是不包括SDK中的公共部分的代码&#xff0c;只有用户项目代码。而想拷贝到自己的SDK中&#xff0c;如&#xff1a;F:\A02_Work\A01_Dailog\DA145xx_SDK\6.0.18.1182.1\projects\User\ble_app_sensor&#xff0c;打开工…

高颜值管理系统界面,我敢保证你肯定看不够,看了又看。

有不少老铁&#xff0c;还坚持10年前的老思路&#xff0c;总觉得B端管理系统颜值不颜值不重要&#xff0c;关键是好用就行&#xff0c;这就犯了二元论的错误。 谁说高颜值的管理系统&#xff0c;就不好用了呢&#xff1f;高颜值和易用性冲突吗&#xff1f;我看未必吧。看看大厂…

羊大师:羊奶与传统牛奶在健康方面的差异

羊大师&#xff1a;羊奶与传统牛奶在健康方面的差异 羊奶与传统牛奶在健康方面的差异主要体现在以下几个方面&#xff1a; 营养成分&#xff1a;羊奶与传统牛奶在营养成分上有所不同。羊奶中含有更多的维生素A、碳水化合物、烟酸以及磷&#xff0c;而牛奶在钙、蛋白质和锌的含…

[高质量]2024五一数学建模A题保奖思路+代码(后续会更新)

你的点赞收藏是我继续更新的最大动力&#xff0c;可点击文末卡片获取更多资料 你是否在寻找数学建模比赛的突破点&#xff1f; 作为经验丰富的数学建模团队&#xff0c;我们将为你带来2024 年华东杯&#xff08;A题&#xff09;的全面解析包。这个解决方案包不仅包括完整的代…

数组模拟双链表-java

通过数组来模拟双链表&#xff0c;并执行一些插入和删除的功能。 目录 一、问题描述 二、模拟思路 1.变量解释 2.数组初始化 3.在下标是k的结点后面插入一个结点 4.删除下标为k的结点 5.基本功能解释 三、代码如下 1.代码如下&#xff1a; 2.读入数据&#xff1a; 3…

【海博】雅思考什么?

文章目录 听力 阅读 写作 口语 参考资料 听力 听力考试分为四个部分&#xff0c;每个部分有10道题&#xff0c;每题1分。问题的排列顺序和答案在听力材料中出现的顺序一致。在播放听力材料过程中&#xff0c;考生会获得读题、写下答案以及检查答案的时间。考生应在播放听力…

Memory augment is All You Need for image restoration 论文翻译

目录 一.介绍 二.实际工作 A.图像阴影去除 B.图像去雨 C.存储模块的开发 三.网络结构 A.内存扩充 B.损失函数设计 四.实验 A.与最先进方法的比较 B.MemoryNet消融研究 五.结论 CVPR2023 MemoryNet 记忆增强是图像恢复所需要的一切 论文地址https://arxiv.org/abs/…

代码随想录算法训练营DAY47|C++动态规划Part8|198.打家劫舍、213.打家劫舍II、198.打家劫舍III

文章目录 198.打家劫舍思路CPP代码 213.打家劫舍II解决环的问题思路总结CPP代码 198.打家劫舍III思路递归三部曲——确定参数和返回值递归三部曲——确定终止条件递归三部曲——确定单层遍历的逻辑 打印dp数组CPP代码 198.打家劫舍 力扣题目链接 文章讲解&#xff1a;198.打家劫…

SVN--基本原理与使用(超详细)

目录 一、SVN概述二、SVN服务端软件安装三、SVN服务端配置四、SVN客户端软件安装与使用五、SVN三大指令六、SVN图标集与忽略功能6.1 图标集6.2 忽略功能 七、SVN版本回退八、SVN版本冲突九、SVN配置多仓库与权限控制9.1 配置多仓库9.2 权限控制 十、服务配置与管理十一、模拟真…

Linux 第十七章

&#x1f436;博主主页&#xff1a;ᰔᩚ. 一怀明月ꦿ ❤️‍&#x1f525;专栏系列&#xff1a;线性代数&#xff0c;C初学者入门训练&#xff0c;题解C&#xff0c;C的使用文章&#xff0c;「初学」C&#xff0c;linux &#x1f525;座右铭&#xff1a;“不要等到什么都没有了…

刷机维修进阶教程-----VIVO机型 修复基带 更改参数 实例步骤操作解析

写到前面: 任何参数修改和刷写分区都有风险。严重会导致无基带 无信号等故障。操作前都需要谨慎,养成备份关键数据分区的习惯。以便出现问题可以恢复。不管是修复基带分区还是更改相关参数,在操作前都有可以恢复原分区数据与刷机救砖的能力在操作。今天以vivo机型来做演示。…

bun 换源 国内阿里源 npmmirror 加速下载

Github https://github.com/oven-sh/bun 版本号 bun 1.1.5 windows 安装 bun 如果本机有 nodejs 环境, 可以 npm install -g bun 安装 ( 官方把 exe 已经传到了 npm 仓库, 走的国内 npm 镜像, 下载速度会很快) 没有 nodejs, 可以用 powershell 脚本安装 具体操作 全局 …

力扣刷题 63.不同路径 II

题干 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish”&#xff09;。 现在考虑网格中有障碍物。那么从左上角到…

Pytorch GPU版本安装

一、背景 记录一下安装Pytorch GPU版本过程。 由于手残&#xff0c;卸载了电脑上的显卡驱动&#xff0c;现在我连显卡类型是啥都不知道了。 总体思路&#xff1a;安装显卡驱动->安装cuda->安装pytorch库 二、安装显卡驱动 2.1 查看本地显卡型号 通过「DirectX 诊断工具…

Leetcode 145:二叉树的后序遍历(迭代法)

给你一棵二叉树的根节点 root &#xff0c;返回其节点值的 后序遍历 。 思路&#xff1a; 迭代法的思路是&#xff0c;使用栈&#xff0c;一层一层的将树节点遍历入栈。 比如下面这个树&#xff0c;使用迭代法&#xff0c;1&#xff09;第一层&#xff0c;让根节点入栈。2&a…

顺序表??

1.引入&#xff1a;首先需要了解数据结构的作用&#xff1a;是计算机存储、组织数据的方式。就比如之前学的数组&#xff0c;就是一种基础的数据结构。通过数组&#xff0c;就可以去维护很多个同类型的数据&#xff0c;实现增加删除等的功能&#xff0c;这样就可以有效管理这些…

嵌入式学习63-C++

知识零碎&#xff1a; newmalloc friend 友元函数 …