spark自定义函数实现

场景:由于系统函数无法满足实际开发需求,需要通过自定义函数来实现

示例:


package sparkimport org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}object TestSparkUdf {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("student").master("local[2]").getOrCreate()import spark.implicits._val rdd2 = spark.sparkContext.makeRDD(Array(Student2(18, "one"), Student2(20, "two")))rdd2.toDF().registerTempTable("student")spark.udf.register("myupper", myUpper _)val df = spark.sql("select myupper(name) from student")df.show()
//    +-----------------+
//    |UDF:myupper(name)|
//    +-----------------+
//    |              ONE|
//    |              TWO|
//    +-----------------+spark.udf.register("myavg", new myAvg())val df2 = spark.sql("select myavg(age) from student")df2.show()
//    +----------+
//    |myavg(age)|
//    +----------+
//    |        19|
//    +----------+spark.stop()}//udf函数 一对一def myUpper(str: String): String = str.toUpperCase()}
//case class Student(id: String, name:String)class myAvg extends UserDefinedAggregateFunction {//输入数据的结构override def inputSchema: StructType = StructType(Array(StructField("age", LongType)))//缓冲区的数据结构override def bufferSchema: StructType = StructType(Array(StructField("total", LongType), StructField("count", LongType)))//函数计算结果的数据类型override def dataType: DataType = LongType//函数的稳定性override def deterministic: Boolean = true//缓冲区的初始化override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0L;buffer(1) = 0L;}//新数据过来,如何更新缓冲区override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer.update(0, buffer.getLong(0) + input.getLong(0))buffer.update(1, buffer.getLong(1) + 1)}//多个缓冲区数据合并override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))}//计算操作结果override def evaluate(buffer: Row): Any = {buffer.getLong(0) / buffer.getLong(1)}
}case class Student2(age: Long, name: String)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/13175.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Helm安装集群整理

这里写目录标题 1.添加nfs存储2.安装redis集群3.安装neo4j集群4.安装clickhouse集群5. 安装zookeeper集群6. 安装es集群7. 安装openebs8.安装radondb 1.添加nfs存储 项目地址:https://github.com/kubernetes-sigs/nfs-subdir-external-provisioner helm install nfs…

【35分钟掌握金融风控策略29】贷中模型调额调价策略

目录 贷中客户风险管理和客户运营体系 用信审批策略 用信审批策略决策流与策略类型 贷中预警策略 对存量客户进行风险评级 基于客户的风险评级为客户匹配相应的风险缓释措施和建议 调额策略 基于定额策略的调额策略 基于客户在贷中的风险表现的调额策略 调价策略 存…

【计算机毕业设计】springboot城市公交运营管理系统

二十一世纪我们的社会进入了信息时代, 信息管理系统的建立,大大提高了人们信息化水平。传统的管理方式对时间、地点的限制太多,而在线管理系统刚好能满足这些需求,在线管理系统突破了传统管理方式的局限性。于是本文针对这一需求设…

P6023 走路

走路 题目背景 小 W 下载了一款运动软件。 题目描述 小 W 准备在接下来的 m m m 天中锻炼,由于他不能走得太多以至于累死(怎么可能呢),所以他这 m m m 天最多一共只能走 n n n 步。 这个运动软件为了激励小 W 走路&#xf…

【校园生活小程序_超详细部署】

校园生活小程序 1 完整小程序源码2 运行环境3 初次运行3.1 启动后端程序3.1.1 导入项目,找到项目的pom.xml文件,点击ok进行打开。3.1.2 创建数据库并插入内容 3.1.3 配置项目结构信息3.1.4 配置Tomcat服务器3.1.5 正式启动后端项目3.1.6出现BUG3.1.7 解决…

Android实践:查看Activity信息

问题:本地Android SDK的monitor无法正常运行,看不了进程相关信息,确认当前显示Activity十分不便 解决办法:使用adb shell指令可以快速查看 命令: adb shell dumpsys activity activities 这个命令用于获取Android设…

vscode-调试js文件

vscode 调试时报错: "launch.json" 找不到 Node.js 二进制文件“node”: 路径不存在。请确保 Node.js 已安装且位于你的路径中,或者在 launch.json 中设置 "runtimeExecutable" 在launch.json中加入即可 "runtimeExecutable&q…

Rust 标准库的结构及其模块路径

在 Rust 中,标准库提供了一组核心功能,以帮助开发者执行常见的编程任务。当使用这些功能时,我们需要通过特定的模块路径来引用它们。下面,我们将详细介绍 Rust 标准库的结构,并提供相应的 use 路径。 Rust 标准库模块…

Linux的常用指令 和 基础知识穿插巩固(巩固知识必看)

目录 前言 ls ls 扩展知识 ls -l ls -a ls -al cd cd 目录名 cd .. cd ~ cd - pwd 扩展知识 路径 / cp [选项] “源文件名” “目标文件名” mv [选项] “源文件名” “目标文件名” rm 作用 用法 ./"可执行程序名" mkdir rmdir touch m…

【YashanDB知识库】ycm纳管主机安装YCM-AGENT时报错“任务提交失败,无法连接主机”

问题现象 执行安装 ycm-agent 命令纳管主机时报错 问题的风险及影响 会导致 ycm-agent 纳管不成功,YCM 无法监控主机和数据库 问题影响的版本 yashandb-cloud-manager-23.2.1.100-linux-aarch64.tar 问题发生原因 因为 10.149.223.121 对 ycm 的主机没有开放端…

AI 情感聊天机器人之旅 —— 多轮对话存在的问题与数据积累

在 QA、逻辑推理等领域,多跳问答比单跳问答难得多。在聊天机器人场景中亦是如此,模型需要结合历史对话和用户当前的输入内容生成合适的响应。然而,现有的指令数据大都是单轮或者两轮的对话(截止这篇文章落笔的日期 2023-09-10&…

瑞芯微RK3588驱动设计之DVP并口摄像头2

dts配置看瑞芯微RK3588驱动配置之DVP并口摄像头1_rockchip 调试dvp设备 直接显示摄像头数据-CSDN博客 这里看看驱动的具体实现,以gc2145为例。 gc2145的驱动源码如下: // SPDX-License-Identifier: GPL-2.0 /** GC2145 CMOS Image Sensor driver*** C…

高斯分布应用;高斯分布和高斯核有什么;正态分布的具体应用举例说明

目录 高斯分布应用 高斯分布和高斯核有什么 正态分布的具体应用举例说明 高斯分布应用

串,数组和广义表

2.1.求next和nextval的实现 代码&#xff1a; int next_one(char *str, int len) {int result 1;if(len 1 || len 0) return len;for (size_t i 1; i < len; i){ if(compare(str, strlen-i, i)) {result i1;//break;}}return result; }int next(char *str, int *…

MySQL-索引的增删改

1、索引的分类 从功能逻辑上划分&#xff1a; 普通索引 &#xff1a;创建索引时不加任何限制条件&#xff0c;只是用来提高查询效率。可以创建在任何数据类型中&#xff0c;其值是否唯一和非空由字段本身的完整性约束条件决定。唯一索引&#xff1a;使用UNIQUE参数可以设置索引…

nodeJs用ffmpeg直播推流到rtmp服务器上

总结 最近在写直播项目 目前比较重要的点就是推拉流 自己也去了解了一下 ffmpeg FFmpeg 是一个开源项目&#xff0c;它提供了一个跨平台的命令行工具&#xff0c;以及一系列用于处理音频和视频数据的库。FFmpeg 能够执行多种任务&#xff0c;包括解封装、转封装、视频和音频…

国际化日期(inti)

我们可以使用国际化API自动的格式化数字或者日期&#xff0c;并且格式化日期或数字的时候是按照各个国家的习惯来进行格式化的&#xff0c;非常的简单&#xff1b; const now new Date(); labelDate.textContent new Intl.DateTimeFormat(zh-CN).format(now);比如说这是按照…

DC-DC转换效率的影响因素和优化方向

一. 定义 DC-DC转换效率的定义是输入与输出功率之比&#xff1a; η P O U T P I N P O U T P O U T P L O S S η\frac{P_{OUT}}{P_{IN}}\frac{P_{OUT}}{P_{OUT}P_{LOSS}} ηPIN​POUT​​POUT​PLOSS​POUT​​ 其中POUT代表输出功率&#xff0c;PIN代表输入功率&#x…

ADS FEM 仿真设置

1、EM Simulator 选择FEM。 2、在layout界面打开的EM功能&#xff0c;这里不需要操作。 3、Partitioning 不需要操作。 4、没有叠层的话需要新建&#xff0c;过孔可以在叠层处右键添加。 5、端口需要设置GND layer。 6、设置仿真频率。 7、Output plan。 8、Options 设置 介质…

网络学习(三)|Feign与RPC在微服务架构中的应用对比

文章目录 一、概述二、设计理念与实现方式三、协议与传输层四、应用场景与性能考量五、性能与效率六、结论七、其他Feign与HTTP的关系 在构建分布式系统和微服务架构时&#xff0c;选择合适的服务间通信技术至关重要。Feign和RPC&#xff08;Remote Procedure Call&#xff09;…