Spark---DataFrame存储、Spark UDF函数、UDAF函数

四、DataFrame存储+Spark UDF函数

1、储存DataFrame

1)、将DataFrame存储为parquet文件

2)、将DataFrame存储到JDBC数据库

3)、将DataFrame存储到Hive表

2、UDF:用户自定义函数

可以自定义类实现UDFX接口

java:

SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("udf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Row call(String s) throws Exception {
return RowFactory.create(s);}
});List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD,schema);
df.registerTempTable("user");/*** 根据UDF函数参数的个数来决定是实现哪一个UDF  UDF1,UDF2。。。。UDF1xxx*/
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(String t1) throws Exception {return t1.length();}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();//sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
//
//	/**
//	 * 
//	 */
//	private static final long serialVersionUID = 1L;
//
//	@Override
//	public Integer call(String t1, Integer t2) throws Exception {
//return t1.length()+t2;
//	}
//} ,DataTypes.IntegerType );
//sqlContext.sql("select name ,StrLen(name,10) as length from user").show();sc.stop();	

scala:

1.val spark = SparkSession.builder().master("local").appName("UDF").getOrCreate()
2.val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhaoliu", "tianqi")
3.import spark.implicits._
4.val nameDF: DataFrame = nameList.toDF("name")
5.nameDF.createOrReplaceTempView("students")
6.nameDF.show()
7.
8.spark.udf.register("STRLEN",(name:String)=>{
9.name.length
10.})
11.spark.sql("select name ,STRLEN(name) as length from students order by length desc").show(100)

五、UDAF函数

1、UDAF:用户自定义聚合函数

1)、实现UDAF函数如果要自定义类要继承

UserDefinedAggregateFunction类

2)、UDAF原理图

java:

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("udaf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu","zhangsan","zhangsan","lisi"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Row call(String s) throws Exception {return RowFactory.create(s);}
});List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/*** 注册一个UDAF函数,实现统计相同值得个数* 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的*/
sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {/*** */private static final long serialVersionUID = 1L;/*** 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑* buffer.getInt(0)获取的是上一次聚合后的值* 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合 * 大聚和发生在reduce端.* 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算*/@Overridepublic void update(MutableAggregationBuffer buffer, Row arg1) {buffer.update(0, buffer.getInt(0)+1);}/*** 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来* buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值       * buffer2.getInt(0) : 这次计算传入进来的update的结果* 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作*/@Overridepublic void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));}/*** 指定输入字段的字段及类型*/@Overridepublic StructType inputSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));}/*** 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果*/@Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0, 0);}/*** 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果*/@Overridepublic Object evaluate(Row row) {return row.getInt(0);}@Overridepublic boolean deterministic() {//设置为truereturn true;}/*** 指定UDAF函数计算后返回的结果类型*/@Overridepublic DataType dataType() {return DataTypes.IntegerType;}/*** 在进行聚合操作的时候所要处理的数据的结果的类型*/@Overridepublic StructType bufferSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)));}});sqlContext.sql("select name ,StringCount(name) from user group by name").show();sc.stop();

scala:

1.class MyCount extends UserDefinedAggregateFunction{
2.  //输入数据的类型
3.  override def inputSchema: StructType =    StructType(List[StructField](StructField("xx",StringType,true)))
4.
5.  //在聚合过程中处理的数据类型
6.  override def bufferSchema: StructType =   StructType(List[StructField](StructField("xx",IntegerType,true)))
7.
8.  //最终返回值的类型,与evaluate返回的值保持一致
9.  override def dataType: DataType = IntegerType
10.
11.  //多次运行数据是否一致
12.  override def deterministic: Boolean = true
13.
14.  //每个分区中每组key 对应的初始值
15.  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0,0)
16.
17.  //每个分区中,每个分组内进行聚合操作
18.  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
19.    buffer.update(0,buffer.getInt(0) + 1)
20.  }
21.
22.  //不同的分区中相同的key的数据进行聚合
23.  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
24.    buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0))
25.  }
26.
27.  //聚合之后,每个分组最终返回的值,类型要和dataType 一致
28.  override def evaluate(buffer: Row): Any = buffer.getInt(0)
29.}
30.
31.object Test {
32.  def main(args: Array[String]): Unit = {
33.    val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
34.    val list = List[String]("zhangsan","lisi","wangwu","zhangsan","lisi","zhangsan")
35.
36.    import session.implicits._
37.    val frame = list.toDF("name")
38.    frame.createTempView("mytable")
39.
40.    session.udf.register("MyCount",new MyCount())
41.
42.    val result = session.sql("select name,MyCount(name) from mytable group by name")
43.    result.show()
44.
45.  }
46.}
47.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/207019.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

案例062:基于微信小程序的健身房私教预约系统

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

模块式雨水调蓄池施工简单,无需大型机械,可实现当天开挖当天回填

模块式雨水调蓄池的施工过程非常简单&#xff0c;无需大型机械和繁琐的施工工艺。在施工过程中&#xff0c;只需要进行简单的开挖和回填即可&#xff0c;而且可以在当天完成。这种施工方式不仅节省了施工时间和成本&#xff0c;还可以避免因大型机械和繁琐工艺引起的安全隐患。…

MIT_线性代数笔记: 复习一

目录 问题一问题二问题三问题四 本讲为考前复习课&#xff0c;考试范围就是 Axb 这个单元&#xff0c;重点是长方形矩阵&#xff0c;与此相关的概念包括零空间、左零空间、秩、向量空间、子空间&#xff0c;特别是四个基本子空间。当矩阵为可逆的方阵时&#xff0c;很多性质是一…

二叉树的层次遍历

102. 二叉树的层序遍历 - 力扣&#xff08;LeetCode&#xff09; 题目描述 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 样例输入 示例 1&#xff1a; 输入&#xff1a;root [3…

php研究课题

对于PHP这门语言而言&#xff0c;可以研究的课题有很多&#xff0c;以下是可能的课题方向和对应的内容&#xff1a; PHP语言基础研究 PHP语言特性和基本语法PHP的数据类型、变量、运算符和表达式PHP的流程控制语句PHP的函数和引用PHP的面向对象编程和设计模式 PHP与Web开发 …

harmony开发之Text组件的使用

TextInput、TextArea是输入框组件&#xff0c;通常用于响应用户的输入操作&#xff0c;比如评论区的输入、聊天框的输入、表格的输入等&#xff0c;也可以结合其它组件构建功能页面&#xff0c;例如登录注册页面。 图片来源黑马程序员 Text组件的使用&#xff1a; 文本显示组…

flutter学习-day1-环境搭建和启动第一个项目

&#x1f4da; 目录 SDK 下载配置环境变量安装 flutter搭建 Android 环境SDK 和依赖升级IDE 配置与使用 Android Studio 配置与使用VS Code 配置与使用 真机调试 本文学习和引用自《Flutter实战第二版》&#xff1a;作者&#xff1a;杜文 1. SDK下载 前置需要操作系统 window …

Spring Cloud + Vue前后端分离-第4章 使用Vue cli 4搭建管理控台

Spring Cloud Vue前后端分离-第4章 使用Vue cli 4搭建管理控台 4-1 使用vue cli创建admin项目 Vue 简介 Vue作者尤雨溪在google工作时&#xff0c;最早只想研究angular的数据绑定功能&#xff0c;后面觉得这个小功能很好用&#xff0c;有前景&#xff0c;就再扩展&#xff…

[MySQL] MySQL复合查询(多表查询、子查询)

前面我们学习了MySQL简单的单表查询。但是我们发现&#xff0c;在很多情况下单表查询并不能很好的满足我们的查询需求。本篇文章会重点讲解MySQL中的多表查询、子查询和一些复杂查询。希望本篇文章会对你有所帮助。 文章目录 一、基本查询回顾 二、多表查询 2、1 笛卡尔积 2、2…

机器学习笔记 - 基于深度学习计算视频中演员的出镜时间

一、基本步骤 这里是使用动画片猫和老鼠进行计算,基本流程如下: 1、导入并读取视频,从中提取帧,并将其另存为图像 2、标记一些图像以训练模型(别担心,我已经为你做好了) 3、根据训练数据构建我们的模型 4、对剩余图像进行预测 5、计算汤姆和杰瑞的屏幕时间 二、基础环境…

教师未来发展前景如何

作为一名教师&#xff0c;我对未来发展的前景也感到有些迷茫。 不过教育行业仍然是一个稳定的职业&#xff0c;但是随着社会的变化和科技的发展&#xff0c;传统的教学模式已经逐渐被在线教育、人工智能等新型教学方式所取代。这使得教师的角色和职责也在发生变化&#xff0c;需…

matplot绘图时图像太大报错但能保存

matplot绘图时&#xff0c;图像太大&#xff0c;可能在jupyter里面报错&#xff0c;但是图像可以保存。 报错&#xff1a;Image size of 12237479x675 pixels is too large. It must be less than 2^16 in each direction. 在这里插入图片描述

Linux中用bash写脚本

本章主要介绍如何使用bash 了解通配符了解变量了解返回值和数值运算判断语句 grep的用法是“grep 关键字 file”&#xff0c;意思是从file中过滤出含有关键字的行 例如&#xff0c;grep root /var/log/messages&#xff0c;意思是从/var/log/messages 中过滤出含有root 的行…

SpringIOC第二课,@Bean用法,DI详解,常见面试题Autowired VS Resource

一、回顾 但是我们之前MVC时候&#xff0c;在页面上&#xff0c;为什只用Controller,不用其他的呢&#xff1f; 用其他的好使吗&#xff1f;(我们可以在这里看到&#xff0c;出现404的字样&#xff09; Service ResponseBody public class TestController {RequestMapping(&quo…

kubernetes安装kubesphere

前置默认都安装了k8s&#xff0c;且k8s都正常 1、nfs文件系统 1.1、安装nfs-server # 在每个机器。 yum install -y nfs-utils# 在master 执行以下命令 echo "/nfs/data/ *(insecure,rw,sync,no_root_squash)" > /etc/exports# 执行以下命令&#xff0c;启动 …

数字化和数智化一字之差,究竟有何异同点?

在2023杭州云栖大会的一展台内&#xff0c;桌子上放着一颗番茄和一个蛋糕&#xff0c;一旁的机器人手臂融入“通义千问”大模型技术后&#xff0c;变得会“思考”&#xff1a;不仅能描述“看”到了什么&#xff0c;还能确认抓取的是番茄而不是蛋糕。 “传统的机械臂通常都只能基…

Post Quantum Fuzzy Stealth Signatures and Applications

目录 笔记后续的研究方向摘要引言贡献模块化框架模糊构造实施适用于FIDO Post Quantum Fuzzy Stealth Signatures and Applications CCS 2023 笔记 后续的研究方向 摘要 自比特币问世以来&#xff0c;基于区块链的加密货币中的私人支付一直是学术和工业研究的主题。隐形地址…

cmd命令 常用的命令

网络工作为常年公司里的背锅侠&#xff0c;不得不集齐十八般武艺很难甩锅。像cmd命令这种好用又好上手的技术&#xff0c;就是网络工程师上班常备技能。 只要按下快捷键 winR&#xff0c;输入cmd回车&#xff0c;然后输入cmd命令。 像我自己&#xff0c;我就经常用cmd命令检测…

在UBUNTU上使用Qemu和systemd-nspawn搭建RISC-V轻量级用户模式开发环境

参考链接 使用Qemu和systemd-nspawn搭建RISC-V轻量级用户模式开发环境 - 知乎 安装Qemu sudo apt updatesudo apt -y install qemu-user-binfmt qemu-user-static systemd-container sudo apt -y install zstd 配置环境 RISCV_FILEarchriscv-2023-10-09.tar.zstwget -c ht…

浪潮信息KeyarchOS——保卫数字未来的安全防御利器

浪潮信息KeyarchOS——保卫数字未来的安全防御利器 前言 众所周知&#xff0c;目前流行的操作系统有10余种&#xff0c;每一款操作系统都有自己的特点。作为使用者&#xff0c;我们该如何选择操作系统。如果你偏重操作系统的安全可信和稳定高效&#xff0c;我推荐你使用浪潮信…