大数据Hive中的UDF:自定义数据处理的利器(下)

在上一篇文章中,我们对第一种用户定义函数(UDF)进行了基础介绍。接下来,本文将带您深入了解剩余的两种UDF函数类型。

文章目录

    • 1. UDAF
      • 1.1 简单UDAF
      • 1.2 通用UDAF
    • 2. UDTF
    • 3. 总结

1. UDAF

1.1 简单UDAF

第一种方式是 Simple(简单) 方式,即继承 org.apache.hadoop.hive.ql.exec.UDAF 类,并在派生类中以静态内部类的方式实现 org.apache.hadoop.hive.ql.exec.UDAFEvaluator 接口。这个计算类将负责执行具体的聚合逻辑,具体步骤如下:

a)初始化(init):首先,我们需要实现UDAFEvaluator接口的init方法,用于初始化聚合过程中所需的任何资源或状态。

b)迭代(iterate):接下来,iterate方法将被用来处理传入的数据。此方法将逐个接收数据项,并更新聚合状态。它返回一个布尔值,指示是否继续迭代或停止。

c)部分终止(terminatePartial):在迭代完成后,terminatePartial方法将被调用。它的作用类似于Hadoop中的Combiner,用于返回一个中间聚合结果,以便在多个任务之间进行合并。

d)合并(merge):merge方法用于接收来自terminatePartial的中间结果,并将其合并以形成更接近最终结果的聚合状态。此方法同样返回一个布尔值,指示合并操作是否成功。

e)最终终止(terminate):最后,terminate方法将被用来生成并返回聚合操作的最终结果。

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;// 自定义的UDAF类,用于计算最大值
public class MyMaxUDAF extends UDAF {// 实现UDAFEvaluator接口的静态内部类static public class MaxIntEvaluator implements UDAFEvaluator {// 存放当前聚合操作过程中的最大值private int mMax;// 用于标记聚合数据集是否为空private boolean mEmpty;// 构造方法,用于执行初始化操作public MaxIntEvaluator() {super();init();}// 初始化方法,用于重置聚合状态public void init() {// 初始化最大值为0mMax = 0;// 初始化聚合数据集为空mEmpty = true;}// 迭代处理每一行数据。每次调用处理一行记录public boolean iterate(IntWritable o) {// 检查传入的数据是否为nullif (o != null) {// 如果当前聚合数据集为空,则直接将当前值设置为最大值if (mEmpty) {mMax = o.get();mEmpty = false; // 更新状态,标记聚合数据集不再为空} else {// 聚合数据集不为空时,用当前值和之前的最大值比较,保留较大的那个mMax = Math.max(mMax, o.get());}}return true;}// 输出Map阶段处理结果的方法,返回当前的最大值public IntWritable terminatePartial() {// 如果聚合数据集为空,则返回null;否则,返回当前的最大值return mEmpty ? null : new IntWritable(mMax);}// Combine/Reduce阶段,合并处理结果public boolean merge(IntWritable o) {// 通过调用iterate方法进行合并操作return iterate(o);}// 返回最终的聚集函数结果public IntWritable terminate() {// 如果聚合数据集为空,则返回null;否则,返回最终的最大值return mEmpty ? null : new IntWritable(mMax);}}
}

1.2 通用UDAF

编写简单的UDAF(用户定义聚合函数)相对容易,但这种方法由于依赖Java的反射机制,可能会牺牲一些性能,并且它不支持变长参数等高级特性。相比之下,通用UDAF(Generic UDAF)提供了这些高级特性的支持,虽然它的编写可能不如简单UDAF那样直接明了。
Hive社区推崇使用通用UDAF作为最佳实践,建议采用新的抽象类org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver来替代旧的UDAF接口,并推荐使用org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator抽象类来替换旧的UDAFEvaluator接口。这种新方法不仅提升了性能,还增加了灵活性,使得UDAF的功能更加强大和多样化。

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.io.IntWritable;// 通过继承AbstractGenericUDAFResolver并使用Description注解来定义一个新的UDAF。
@Description(name = "max_int", value = "_FUNC_(int) - Returns the maximum value of the column")
public class MyMaxUDAF2 extends AbstractGenericUDAFResolver {// 聚合函数的求值器内部类,继承自GenericUDAFEvaluator。public static class MaxIntEvaluator extends GenericUDAFEvaluator {// 用于存储输入参数的ObjectInspector。private PrimitiveObjectInspector inputOI;// 用于存储聚合结果。private IntWritable result;// 初始化方法,用于设置聚合函数的参数和返回类型。@Overridepublic ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {super.init(m, parameters);// 确认参数是原始类型并初始化inputOI。inputOI = (PrimitiveObjectInspector) parameters[0];// 设置聚合函数的返回类型为可写的整型。return PrimitiveObjectInspectorFactory.writableIntObjectInspector;}// 创建聚合缓冲区对象的方法。@Overridepublic AggregationBuffer getNewAggregationBuffer() throws HiveException {MaxAggBuffer buffer = new MaxAggBuffer();reset(buffer);return buffer;}// 重置聚合缓冲区对象的方法。@Overridepublic void reset(AggregationBuffer agg) throws HiveException {((MaxAggBuffer) agg).setValue(Integer.MIN_VALUE);}// 迭代方法,用于处理每一行数据。@Overridepublic void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {if (parameters[0] != null) {MaxAggBuffer myagg = (MaxAggBuffer) agg;// 从参数中获取整数值并更新聚合缓冲区中的最大值。int value = PrimitiveObjectInspectorUtils.getInt(parameters[0], inputOI);if (value > myagg.value) {myagg.setValue(value);}}}// 终止部分聚合的方法,通常返回最终聚合结果。@Overridepublic Object terminatePartial(AggregationBuffer agg) throws HiveException {return terminate(agg);}// 合并部分聚合结果的方法。@Overridepublic void merge(AggregationBuffer agg, Object partial) throws HiveException {if (partial != null) {MaxAggBuffer myagg = (MaxAggBuffer) agg;// 从部分聚合结果中获取整数值并更新聚合缓冲区中的最大值。int partialValue = PrimitiveObjectInspectorUtils.getInt(partial, inputOI);if (partialValue > myagg.value) {myagg.setValue(partialValue);}}}// 终止方法,用于返回最终聚合结果。@Overridepublic Object terminate(AggregationBuffer agg) throws HiveException {MaxAggBuffer myagg = (MaxAggBuffer) agg;// 创建IntWritable对象并设置聚合结果,然后返回。result = new IntWritable(myagg.value);return result;}// 聚合缓冲区对象的内部类定义,用于存储聚合过程中的中间状态。static class MaxAggBuffer implements AggregationBuffer {int value; // 聚合缓冲区中的值// 设置聚合缓冲区中的值void setValue(int val) { value = val; }}}
}
特性/UDAF类型简单UDAF通用UDAF
性能依赖反射,性能较低不依赖反射,性能较高
参数灵活性不支持变长参数支持变长参数
易用性编写简单直观编写复杂,功能强大
推荐使用适合简单聚合操作适合复杂聚合逻辑和高性能需求
接口和抽象类旧的UDAF接口和UDAFEvaluator新的AbstractGenericUDAFResolverGenericUDAFEvaluator
功能特性功能有限,实现常见聚合支持复杂迭代逻辑和自定义终止逻辑
应用场景- 快速开发和原型设计
- 实现基本聚合操作,如求和、平均值
- 对性能要求不高的小型项目
- 实现复杂的数据分析和处理
- 大数据量处理,需要高性能
- 需要变长参数支持的复杂查询
- 高级功能实现,如窗口函数、复杂的分组聚合

选择UDAF类型时应根据实际需求和上述特性来决定,以确保既能满足功能需求,又能获得较好的性能表现。

2. UDTF

  • 继承GenericUDTF类的步骤:
    开发自定义的表生成函数(UDTF)时,首先需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF这个抽象类,它为UDTF提供了一个通用的实现框架。

  • 实现initialize()、process()和close()方法:
    为了完成自定义UDTF的功能,需要实现三个核心方法:initialize()用于初始化UDTF,process()用于处理输入数据并生成输出,close()用于执行清理操作。

    - initialize()方法的调用与作用:在UDTF的执行过程中,initialize()方法是首先被调用的。它负责初始化UDTF的状态,并返回关于UDTF返回行的信息,包括返回行的个数和类型。

    • process()方法的执行:initialize()方法执行完成后,接下来会调用process()方法。该方法是UDTF的核心,负责对输入参数进行处理。在process()方法中,可以通过调用forward()方法将处理结果逐行返回。
    • close()方法的清理作用:在UDTF的所有处理工作完成后,最终会调用close()方法。这个方法用于执行必要的清理工作,如释放资源或关闭文件句柄等,确保UDTF在结束时不会留下任何未处理的事务。

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import java.util.ArrayList;
import java.util.List;/*** 自定义一个UDTF,实现将一个由任意分割符分隔的字符串切割成独立的单词。**/
public class LineToWordUDTF extends GenericUDTF {// 用于存储输出单词的集合private ArrayList<String> outList = new ArrayList<String>();/*** initialize方法:当GenericUDTF函数初始化时被调用一次,用于执行一些初始化操作。* 包括:*      1. 判断函数参数个数*      2. 判断函数参数类型*      3. 确定函数返回值类型*/@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {// 1. 定义输出数据的列名和类型List<String> fieldNames = new ArrayList<String>();List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();// 2. 添加输出数据的列名和类型fieldNames.add("lineToWord"); // 输出列名fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); // 输出列类型// 返回输出数据的ObjectInspectorreturn ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);}/*** process方法:自定义UDTF的核心逻辑实现方法。* 代码实现步骤可以分为三部分:*  1. 参数接收*  2. 自定义UDTF核心逻辑*  3. 输出结果*/@Overridepublic void process(Object[] objects) throws HiveException {// 1. 获取原始数据String arg = objects[0].toString(); // 假设第一个参数为要分割的字符串// 2. 获取数据传入的第二个参数,此处为分隔符String splitKey = objects[1].toString(); // 假设第二个参数为分隔符// 3. 将原始数据按照传入的分隔符进行切分String[] fields = arg.split(splitKey); // 分割字符串// 4. 遍历切分后的结果,并写出for(String field : fields) {// 集合为复用的,首先清空集合outList.clear();// 将每个单词添加至集合outList.add(field);// 将集合内容通过forward方法写出,这里假设forward方法可以处理集合forward(outList);}}/*** close方法:当没有其他输入行时,调用该函数。* 可以进行一些资源关闭处理等最终处理。*/@Overridepublic void close() throws HiveException {// 资源清理逻辑,当前示例中无具体实现}}

3. 总结

本文我们详细解析了UDAF和UDTF在Hive中的应用。通过实际代码示例,我们展示了UDAF如何帮助我们深入分析数据,以及UDTF如何简化复杂的数据转换任务。

感谢您的阅读和支持。如果您对UDAF、UDTF或Hive的其他高级功能有疑问,或者想要更深入地讨论,欢迎在文章下留言或直接联系我们。期待我们的下一次分享,一起在大数据的世界里探索新知。

再次感谢,希望您喜欢这次的分享。我们下次见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/14460.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

每日一题《leetcode--382.链表随机结点》

https://leetcode.cn/problems/linked-list-random-node/ 这道题我们首先看到题目中的要求&#xff1a;在单链表中随机选取一个链表中的结点&#xff0c;要使每个结点被选取的概率是一样的。 当我们看到随机这两个字时&#xff0c;应该就会想起rand()这个函数。接着我们把使用这…

自己搭建内网穿透

本文介绍使用最新版frp搭建内网穿透&#xff0c;最新版本的frp在配置上与之前有很大不同&#xff0c;需要使用.toml文件进行配置。其中主要问题出现在toml文件内部。 一、云服务器配置 下载frp sudo apt update sudo apt install wget wget https://github.com/fatedier/frp…

求出这行英文中最后一个单词的长度

【题目描述】蓝宝看到了一行奇怪的英文&#xff0c;这行英文由若干单词组成&#xff0c;每个单词前后用一些字符*隔开请帮助蓝宝求出这行英文中最后一个单词的长度。【输入格式】 输入一行&#xff0c;就就是蓝宝看到的奇怪的英文。 【输出格式】 输出一行&#xff0c;是个整数…

文旅3d仿真数字人形象为游客提供全方位的便捷服务

在AI人工智能与VR虚拟现实技术的双重驱动下&#xff0c;文旅3D数字代言人正以其独特的魅力&#xff0c;频频亮相于各类文旅场景&#xff0c;为游客带来前所未有的个性化服务体验。他们不仅有趣有品&#xff0c;更能言善道&#xff0c;成为文旅业数字化发展的新亮点。 这些文旅3…

我的文章分类合集目录

文章目录 Java相关基础常规问题类Docker类RabbitMQ类分库分表 网络工程相关路由交换、Cisco Packet TracerIP地址 前端相关数据库 Java相关 基础 Java开发规范、项目开发流程 SpringBoot整合MyBatis实现增删改查(简单,详细) SpringBoot整合MybatisPlus&#xff08;详细&#…

【搜索】BFS

#include <iostream> #include <cstring> #include <queue>using namespace std;const int N 110;typedef pair<int, int> PII;int n, m; int g[N][N], d[N][N];//存放地图//存每一个点到起点的距离int bfs() {queue< PII > q;q.push({0, 0});m…

16.js数学方法和进制转换

数学方法 &#xff08;1&#xff09;Math.random() 默认生成0-1的随机数 var resMath.random() console.log(res) &#xff08;2&#xff09;Math.round(数字) 取整&#xff1a;正数-四舍五入 负数-5舍6入 var resMath.round(11)console.log(res) //11var res1Math.round(1…

CentOS7安装内网穿透实现远程推送镜像到本地Docker Registry

文章目录 前言1. 部署Docker Registry2. 本地测试推送镜像3. Linux 安装cpolar4. 配置Docker Registry公网访问地址5. 公网远程推送Docker Registry6. 固定Docker Registry公网地址 前言 本文主要介绍如何部署Docker Registry 本地镜像仓库,简单几步结合cpolar内网穿透工具实现…

网络安全之重发布与路由策略详解

重发布&#xff1b;import &#xff08;路由导入&#xff09; 将不同方式&#xff08;直连、静态、缺省、其他协议&#xff09;的路由器重发布进入RIP&#xff0c;OSPF中。 注意&#xff1a;1、华为中不能将缺省路由重发布进入RUO协议&#xff08;思科也是一样&#xff09;。…

Mac下QT开发环境搭建详细教程

QT Qt是一个跨平台的C应用程序框架&#xff0c;用于开发具有图形用户界面&#xff08;GUI&#xff09;的应用程序&#xff0c;同时也可用于开发非GUI程序&#xff0c;比如控制台工具和服务器。Qt是设计成通用、可移植和高效的&#xff0c;它广泛应用于全球的企业和开发者社区中…

青少年 CTF 练习平台:Misc(一)

前言 当然&#xff0c;我可以更详细地介绍一下青少年CTF练习平台。 青少年CTF练习平台是一个专为青少年设计的网络安全竞赛和训练平台。该平台由思而听&#xff08;山东&#xff09;网络科技有限公司与克拉玛依市思而听网络科技有限公司共同建设&#xff0c;自2018年创建以来…

图论定理汇总(二)

第六章 平面图 (一)、平面图的概念 定义1 如果能把图 G G G画在平面上&#xff0c;使得除顶点外&#xff0c;边与边之间没有交叉&#xff0c;称 G G G可嵌入平面&#xff0c;或称 G G G是可平面图。可平面图 G G G的边不交叉的一种画法&#xff0c;称为 G G G的一种平面嵌入&…

入门四认识HTML

一、HTML介绍 1、Web前端三大核心技术 HTML&#xff1a;负责网页的架构 CSS&#xff1a;负责网页的样式、美化 JS&#xff1a;负责网页的行动 2、什么是HTML HTML是用来描述网页的一种语言。 3、Html标签 单标签<html> 双标签<h>内容</h> 4、标…

spring boot整合j2cache 关闭二级缓存

我们整合了 j2cache 的项目启动 日志会输出 一级缓存 二级缓存 一级是 EhCacheProvider 二级是 SpringRedisProvider 如果 我们不想用二级缓存 在 j2cache.properties 中 加上 j2cache.12-cache-open配置 值为 true/false true是启用二级缓存 false 是不起用 默认 true 所以 …

多输入多输出 | Matlab实现GA-CNN遗传算法优化卷积神经网络多输入多输出预测

多输入多输出 | Matlab实现GA-CNN遗传算法优化卷积神经网络多输入多输出预测 目录 多输入多输出 | Matlab实现GA-CNN遗传算法优化卷积神经网络多输入多输出预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Matlab实现GA-CNN遗传算法优化卷积神经网络多输入多输出预测&…

企业防泄密软件有哪些,哪个排名最好

机密数据的泄密对于企业而言&#xff0c;已成为最大的信息安全威胁之一。近年来企业面对的最大威胁来自于内部&#xff0c;以利益为出发点的互联网信息犯罪及案件&#xff0c;在世界各地不断传出&#xff0c;因此&#xff0c;信息保护与管控将逐渐成为企业信息安全重点部署项目…

VMware 安装Windows 7 SP1

1.下载镜像 迅雷&#xff1a;ed2k://|file|cn_windows_7_enterprise_with_sp1_x64_dvd_u_677685.iso|3265574912|E9DB2607EA3B3540F3FE2E388F8C53C4|/ 2.安装过程 自定义名字&#xff0c;点击【浏览】自定义安装路径 点击【浏览】&#xff0c;选择下载镜像的路径 结束啦~

html+css绘制自定义样式输入框

效果&#xff1a; 代码&#xff1a; html部分&#xff1a; <div class"box"> <div class"newbox"><input type"text" required><div class"name">Username</div></div> </div>css部分 …

投骰子——(随机游戏的控制)

精华点在于&#xff1a;利用封装&#xff0c;函数之间的良好调用&#xff0c;从而清晰明了的解决问题。 #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> # include<stdlib.h> # include<time.h> # include"math.h" # define ARR_LEN 10 # d…

hpc中查看显存占用,等效nvidia-smi

nvidia-smi在hpc中无法使用&#xff0c; 但是可以通过以下方法查看应用程序占用的显存 先执行程序&#xff0c;之后 bjobs输出 可以看到使用的是gpu01节点 之后 ssh gpu01