聊一聊UDF/UDTF/UDAF是什么,开发要点及如何使用?

背景介绍

UDF来源于Hive,Hive可以允许用户编写自己定义的函数UDF,然后在查询中进行使用。星环Inceptor中的UDF开发规范与Hive相同,目前有3种UDF:

A. UDF--以单个数据行为参数,输出单个数据行;

UDF(User Defined Function),即用户自定义函数,能结合SQL语句一起使用,更好地表达复杂的业务逻辑,一般以单个数据行为参数,输出单个数据行;比如数学函数、字符串函数、时间函数、拼接函数

B. UDTF: 以一个数据行为参数,输出多个数据行为一个表作为输出;

UDTF(User Defined Table Function),即用户自定义表函数,它与UDF类似。区别在于UDF只能实现一对一,而它用来实现多(行/列)对多(行/列)数据的处理逻辑。一般以一个数据行为参数,输出多个数据行为一个表作为输出,如lateral、view、explore;

C. UDAF: 以多个数据行为参数,输出一个数据行;

UDAF(User Defined Aggregate Function)用户自定义聚合函数,是由用户自主定义的,用法同如MAX、MIN和SUM已定义的聚合函数一样的处理函数。而且,不同于只能处理标量数据的系统定义的聚合函数,UDAF的可以接受并处理更广泛的数据类型,如用对象类型、隐式类型或者LOB存储的多媒体数据。由于UDAF也属于聚合函数中的一种,同样也需要与GROUPBY结合使用。

一般UDAF以多个数据行为参数,接收多个数据行,并输出一个数据行,比如COUNT、MAX;

UDF、UDTF、UDAF的开发要点及使用DEMO

星环Quark计算引擎中内置了很多函数,同时支持用户自行扩展,按规则添加后即可在sql执行过程中使用,目前支持UDF、UDTF、UDAF三种类型,一般UDF应用场景较多,后面将着重介绍UDF的开发与使用。UDAF及UDTF将主要介绍开发要点以及Demo示例。

Quark的UDF接口兼容开源Hive的UDF接口,用户可以参考开源Hive的UDF手册,或者直接把开源Hive的UDF迁移到Quark上。

UDF

Quark数据类型

Quark类型

Java原始类型

Java包装类

hadoop.hive.ioWritable

tinyintbyteByteByteWritable
smallintshortShortShortWritable
intintIntegerIntWritable
bigintlongLongLongWritable
string-StringText
charcharCharacterHiveCharWritable
booleanbooleanBooleanBooleanWritable
floatfloatFloatFloatWritable
double doubleDoubleDoubleWritable
decimal-BigDecimalHiveDecimalWritable
date-DateDateWritable
array-ListArrayListWritable
Map<K,V>-Map<K.V>HashMapWritable

UDF函数

Quark 提供了两个实现 UDF 的方式:

第一种:继承 UDF 类
  • 优点:实现简单;支持Quark的基本类型、数组和Map;支持函数重载。
  • 缺点:逻辑较为简单,只适合用于实现简单的函数
第二种:继承 GenericUDF 类
  • 优点:支持任意长度、任意类型的参数;可以根据参数个数和类型实现不同的逻辑;资源消耗更低;可以实现初始化和关闭资源的逻辑(initialize、close)。
  • 缺点:实现比继承UDF要复杂一些

一般在以下几种场景下考虑使用GenericUDF:

  • 传参情况复杂,比如某UDF要传参数有多种数量或多种类型的情况,在UDF中支持这种场景我们需要实现N个不同的evaluate()方法分别对应N种场景的传参,在GenericUDF我们只需在一个方法内加上判断逻辑,对不同的输入路由到不同的处理逻辑上即可。还有比如某UDF参数既要支持String list参数,也要支持Integer list参数。你可能认为我们只要继续多重载方法就好了,但是Java不支持同一个方法重载参数只有泛型类型不一样,所以该场景只能用GenericUDF。
  • 需要传非Writable的或复杂数据类型作为参数。比如嵌套数据结构,传入Map的key-value中的value为list数据类型,或者比如数据域数量不确定的Struct结构,都更适合使用GenericUDF在运行时捕获数据的内部构造。
  • 该UDF被大量、高频地使用,所以从收益上考虑,会尽可能地优化一切可以优化的地方,则GenericUDF相比UDF在operator中避免了多次反射转化的资源消耗(后面会细讲),更适合被考虑。
  • 该UDF函数功能未来预期的重构、扩展场景较多,需要做得足够可扩展,则GenericUDF在这方面更优秀。

pom文件的依赖导入

UDF开发依赖<dependency><groupId>org.apache.hive</groupId><artifactId>inceptor-exec</artifactId><version>xxx</version>
</dependency>

继承示例

1.继承 UDF 类

该方式实现简单,只需新建一个类继承org.apache.hadoop.hive.ql.exec.UDF;

继承UDF类必须实现evaluate方法且返回值类型不能为 void,支持定义多个evaluate方法不同参数列表用于处理不同类型数据;

可通过完善@Description展示UDF用法 UDF样例。

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.Description;@Description(name="my_plus",value="my_plus() - if string, do concat; if integer, do plus",extended = "Example : \n    >select my_plus('a', 'b');\n    >ab\n    >select my_plus(3, 5);\n    >8"
)
/*** 实现UDF函数,若字符串执行拼接,int类型执行加法运算。*/
public class AddUDF extends UDF {/*** 编写一个函数,要求如下:* 1. 函数名必须为 evaluate* 2. 参数和返回值类型可以为:Java基本类型、Java包装类、org.apache.hadoop.io.Writable等类型、List、Map* 3. 函数一定要有返回值,不能为 void*/public String evaluate(String... parameters) {if (parameters == null || parameters.length == 0) {return null;}StringBuilder sb = new StringBuilder();for (String param : parameters) {sb.append(param);}return sb.toString();}/*** 支持函数重载*/public int evaluate(IntWritable... parameters) {if (parameters == null || parameters.length == 0) {return 0;}long sum = 0;for (IntWritable currentNum : parameters) {sum = Math.addExact(sum, currentNum.get());}return (int) sum;}
}
2.继承 GenericUDF 类

GenericUDF相比与UDF功能更丰富,支持所有参数类型,实现起来也更加复杂。org.apache.hadoop.hive.ql.udf.generic.GenericUDF API提供了一个通用的接口将任何数据类型的对象当作泛型Object去调用和输出,参数类型由ObjectInspector封装;参数Writable类由DeferredObject封装,使用时简单类型可直接从Writable获取,复杂类型可由ObjectInspector解析。

Java的ObjectInspector类,用于帮助Quark了解复杂对象的内部架构,通过创建特定的ObjectInspector对象替代创建具体类对象,在内存中储存某类对象的信息。在UDF中,ObjectInspector用于帮助Hive引擎将HQL转成MR Job时确定输入和输出的数据类型。Hive语句会生成MapReduce Job执行,所以使用的是Hadoop数据格式,不是编写UDF的Java的数据类型,比如Java的int在Hadoop为IntWritable,String在Hadoop为Text格式,所以我们需要将UDF内的Java数据类型转成正确的Hadoop数据类型以支持Hive将HQL生成MapReduce Job。

继承 GenericUDF 后,必须实现以下三个方法:

public class MyCountUDF extends GenericUDF {private PrimitiveObjectInspector.PrimitiveCategory[] inputType;private transient ObjectInspectorConverters.Converter intConverter;private transient ObjectInspectorConverters.Converter longConverter;// 初始化@Overridepublic ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {}// DeferredObject封装实际参数的对应Writable类@Overridepublic Object evaluate(DeferredObject[] deferredObjects) throws HiveException {}// 函数信息@Overridepublic String getDisplayString(String[] strings) {}
}

initialize()方法只在 GenericUDF 初始化时被调用一次,执行一些初始化操作,包括:参数个数检查;参数类型检查与转换;确定返回值类型。

a. 参数个数检查;

可通过 arguments 数组的长度来判断函数参数的个数:

//  检查该记录是否传过来正确的参数数量,arguments的长度不为2时,则抛出异常if (arguments.length != 2) {throw new UDFArgumentLengthException("arrayContainsExample only takes 2 arguments: List<T>, T");}
b. 参数类型检查与转换;

针对该UDF的每个参数,initialize()方法都会收到一个对应的ObjectInspector参数,通过遍历ObjectInspector数组检查每个参数类型,根据参数类型构造ObjectInspectorConverters.Converter,用于将Hive传递的参数类型转换为对应的Writable封装对象ObjectInspector,供后续统一处理。

ObjectInspector内部有一个枚举类 Category,代表了当前 ObjectInspector 的类型。

public interface ObjectInspector extends Cloneable {public static enum Category {PRIMITIVE, // Hive原始类型LIST, // Hive数组MAP, // Hive MapSTRUCT, // 结构体UNION // 联合体};
}

Quark原始类型又细分了多种子类型,PrimitiveObjectInspector 实现了 ObjectInspector,可以更加具体的表示对应的Hive原始类型。

public interface PrimitiveObjectInspector extends ObjectInspector {/*** The primitive types supported by Quark.*/public static enum PrimitiveCategory {VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,UNKNOWN};
}

参数类型检查与转换示例:

for (int i = 0; i < length; i++) {       // 遍历每个参数ObjectInspector currentOI = arguments[i];ObjectInspector.Category type = currentOI.getCategory();     // 获取参数类型if (type != ObjectInspector.Category.PRIMITIVE) {         // 检查参数类型throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);}PrimitiveObjectInspector.PrimitiveCategory primitiveType =((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();inputType[i] = primitiveType;switch (primitiveType) {        // 参数类型转换case INT:if (intConverter == null) {ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);}break;case LONG:if (longConverter == null) {ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);}break;default:throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);}
}
c. 确定函数返回值类型

initialize() 需要 return 一个 ObjectInspector 实例,用于表示自定义UDF返回值类型。initialize() 的返回值决定了 evaluate() 的返回值类型。创建ObjectInspector时,不要用new的方式创建,应该用工厂模式去创建以保证相同类型的ObjectInspector只有一个实例,且同一个ObjectInspector可以在代码中多处被使用。

// 自定义UDF返回值类型为Long
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
完整的 initialize() 函数
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {int length = arguments.length;inputType = new PrimitiveObjectInspector.PrimitiveCategory[length];for (int i = 0; i < length; i++) {ObjectInspector currentOI = arguments[i];ObjectInspector.Category type = currentOI.getCategory();if (type != ObjectInspector.Category.PRIMITIVE) {throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);}PrimitiveObjectInspector.PrimitiveCategory primitiveType =((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();inputType[i] = primitiveType;switch (primitiveType) {case INT:if (intConverter == null) {ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);}break;case LONG:if (longConverter == null) {ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);}break;default:throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);}}return PrimitiveObjectInspectorFactory.writableLongObjectInspector;}

evaluate()方法是GenericUDF的核心方法,自定义UDF的实现逻辑。代码实现步骤可以分为三部分:参数接收;自定义UDF核心逻辑;返回处理结果。

第一步:参数接收

evaluate() 的参数就是 自定义UDF 的参数。

/*** Evaluate the GenericUDF with the arguments.** @param arguments*          The arguments as DeferedObject, use DeferedObject.get() to get the*          actual argument Object. The Objects can be inspected by the*          ObjectInspectors passed in the initialize call.* @return The*/
public abstract Object evaluate(DeferredObject[] arguments)throws HiveException;

通过源码注释可知,DeferedObject.get() 可以获取参数的值。

/*** A Defered Object allows us to do lazy-evaluation and short-circuiting.* GenericUDF use DeferedObject to pass arguments.*/
public static interface DeferredObject {void prepare(int version) throws HiveException;Object get() throws HiveException;
};

再看看 DeferredObject 的源码,DeferedObject.get() 返回的是 Object,传入的参数不同,会是不同的Java类型。

第二步:自定义UDF核心逻辑

这一部分根据实际项目需求自行编写。

第三步:返回处理结果

这一步和 initialize() 的返回值一一对应,基本类型返回值有两种:Writable类型 和 Java包装类型:

  • 在 initialize 指定的返回值类型为 Writable类型 时,在 evaluate() 中 return 的就应该是对应的 Writable实例。
  • 在 initialize 指定的返回值类型为 Java包装类型 时,在 evaluate() 中 return 的就应该是对应的 Java包装类实例。

evalute()示例

@Overridepublic Object evaluate(DeferredObject[] deferredObjects) throws HiveException {LongWritable out = new LongWritable();for (int i = 0; i < deferredObjects.length; i++) {PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i];Object param = deferredObjects[i].get();switch (type) {case INT:Object intObject = intConverter.convert(param);out.set(Math.addExact(out.get(), ((IntWritable) intObject).get()));break;case LONG:Object longObject = longConverter.convert(param);out.set(Math.addExact(out.get(), ((LongWritable) longObject).get()));break;default:throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type);}}return out;}

getDisplayString() 返回的是 explain 时展示的信息。这里不能return null,否则可能在运行时抛出空指针异常。

@Override
public String getDisplayString(String[] strings) {return "my_count(" + Joiner.on(", ").join(strings) + ")";
}
自定义GenericUDF完整示例
@Description(name="my_count",value="my_count(...) - count int or long type numbers",extended = "Example :\n    >select my_count(3, 5);\n    >8\n    >select my_count(3, 5, 25);\n    >33"
)
public class MyCountUDF extends GenericUDF {private PrimitiveObjectInspector.PrimitiveCategory[] inputType;private transient ObjectInspectorConverters.Converter intConverter;private transient ObjectInspectorConverters.Converter longConverter;@Overridepublic ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {int length = objectInspectors.length;inputType = new PrimitiveObjectInspector.PrimitiveCategory[length];for (int i = 0; i < length; i++) {ObjectInspector currentOI = objectInspectors[i];ObjectInspector.Category type = currentOI.getCategory();if (type != ObjectInspector.Category.PRIMITIVE) {throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);}PrimitiveObjectInspector.PrimitiveCategory primitiveType =((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();inputType[i] = primitiveType;switch (primitiveType) {case INT:if (intConverter == null) {ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);}break;case LONG:if (longConverter == null) {ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);}break;default:throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);}}return PrimitiveObjectInspectorFactory.writableLongObjectInspector;}@Overridepublic Object evaluate(DeferredObject[] deferredObjects) throws HiveException {LongWritable out = new LongWritable();for (int i = 0; i < deferredObjects.length; i++) {PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i];Object param = deferredObjects[i].get();switch (type) {case INT:Object intObject = intConverter.convert(param);out.set(Math.addExact(out.get(), ((IntWritable) intObject).get()));break;case LONG:Object longObject = longConverter.convert(param);out.set(Math.addExact(out.get(), ((LongWritable) longObject).get()));break;default:throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type);}}return out;}@Overridepublic String getDisplayString(String[] strings) {return "my_count(" + Joiner.on(", ").join(strings) + ")";}
}

UDTF

UDTF函数作用都是输入一行数据,将该行数据拆分、并返回多行数据。不同的UDTF函数只是拆分的原理不同、作用的数据格式不同而已。

适用场景

  1. 流应用中对数据处理,如:字符串解析,hyperbase数据删除,时间段去重,时间段统计
  2. 数仓数集应用中需要将单行转换为多行,inceptor内置多种UDTF,如:explode,inline,json_tuple等

注意:返回UDTF结果的同时查询其他对象,须引用关键字 LATERAL VIEW

UDTF开发要点

1. 实现UDTF函数需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF

2. 然后重写/实现initialize, process, close三个方法

A. initialize初始化验证,返回字段名和字段类型

initialize初始化:UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型,名称)。initialize针对任务调一次, 作用是定义输出字段的列名、和输出字段的数据类型。

initialize方法示例
@Override/*** 返回数据类型:StructObjectInspector* 定义输出数据的列名、和数据类型。*/public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {List<String> fieldNames = new ArrayList<String>(); //fieldNames为输出的字段名fieldNames.add("world");List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); //类型,列输出类型fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);}
B. 初始化完成后,调用process方法,对传入的参数进行处理,通过forword()方法把结果返回

process:初始化完成后,会调用process方法,对传入的参数进行处理,可以通过forword()方法把结果写出。process传入一行数据写出去多次,传入一行数据输出多行数据,如:mapreduce单词计数。process针对每行数据调用一次该方法。在initialize初始化的时候,定义输出字段的数据类型是集合,调用forward()将数据写入到一个缓冲区,写入缓冲区的数据也要是集合。

process方法示例
//数据的集合private List<String> dataList = new ArrayList<String>();/*** process(Object[] objects) 参数是一个数组,但是hive中的explode函数接受的是一个,一进多出* @param args* @throws HiveException*/public void process(Object[] args) throws HiveException {//我们现在的需求是传入一个数据,在传入一个分割符//1.获取数据String data = args[0].toString();//2.获取分割符String splitKey = args[1].toString();//3.切分数据,得到一个数组String[] words = data.split(splitKey);//4.想把words里面的数据全部写出去。类似在map方法中,通过context.write方法// 定义是集合、写出去是一个string,类型不匹配,写出也要写出一个集合for (String word : words) {//5.将数据放置集合,EG:传入"hello,world,hdfs"---->写出需要写n次,hello\worlddataList.clear();//清空数据集合dataList.add(word);//5.写出数据的操作forward(dataList);}}
C. 最后调用close()方法进行清理工作

最后close()方法调用,对需要清理的方法进行清理,close()方法针对整个任务调一次

UDTF DEMO

下面UDTF 实现的是字符串的分拆,多行输出

package io.transwarp.udtf;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
public class SplitUDF extends GenericUDTF{@Overridepublic void close() throws HiveException {// TODO Auto-generated method stub}@Overridepublic StructObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {// TODO Auto-generated method stubif(arg0.length != 1){throw new UDFArgumentLengthException("SplitString only takes one argument");}if(arg0[0].getCategory() != ObjectInspector.Category.PRIMITIVE){throw new UDFArgumentException("SplitString only takes string as a parameter");}ArrayList<String> fieldNames = new ArrayList<>();ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();fieldNames.add("col1");fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);fieldNames.add("col2");fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);}@Overridepublic void process(Object[] arg0) throws HiveException {// TODO Auto-generated method stubString input = arg0[0].toString();String[] inputSplits = input.split("#");for (int i = 0; i < inputSplits.length; i++) {try {String[] result = inputSplits[i].split(":");forward(result);} catch (Exception e) {continue;}}}
}

执行效果如下:

如何使用UDTF

将UDTF打包后,放在inceptor server 所在节点之上(建议不要放在/user/lib/hive/lib/下),之后在连接inceptor执行以下命令,生成临时函数(server有效,重启inceptor失效)

add jar /tmp/timestampUDF.jar
drop temporary function timestamp_ms;
create temporary function timestamp_ms as 'io.transwarp.udf.ToTimestamp';select date, timestamp_ms(date) from table1;

 UDAF

正如前面所说,UDAF是由用户自主定义的,虽然UDAF的使用可以方便对数据的运算处理,但是使用它的数量建议不要过多,因为UDAF的数量增长和性能下降成线性关系。另外,如果存在大量的嵌套UDAF,系统的性能也会降低,建议用户在可能的情况下写一个没有嵌套或者嵌套较少的UDAF实现相同功能来提高性能。

UDAF开发要点

1. 用户的UDAF必须继承了org.apache.hadoop.hive.ql.exec.UDAF;

2. 用户的UDAF必须包含至少一个实现了org.apache.hadoop.hive.ql.exec的静态类,诸如常见的实现了 UDAFEvaluator。

3. 一个计算函数必须实现的5个方法的具体含义如下:

  • - init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。
  • - iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true。
  • - terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。
  • - merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。
  • - terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。

UDAF DEMO

下面的UDAF DEMO目标是实现找到最大值功能,以表中某一字段为参数,返回最大值。

package udaf.transwarp.io;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;//UDAF是输入多个数据行,产生一个数据行
//用户自定义的UDAF必须是继承了UDAF,且内部包含多个实现了exec的静态类
public class MaxiNumber extends UDAF{public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator{//最终结果private IntWritable result;//负责初始化计算函数并设置它的内部状态,result是存放最终结果的@Overridepublic void init() {result=null;}//每次对一个新值进行聚集计算都会调用iterate方法public boolean iterate(IntWritable value){if(value==null)return false;if(result==null)result=new IntWritable(value.get());elseresult.set(Math.max(result.get(), value.get()));return true;}//Hive需要部分聚集结果的时候会调用该方法//会返回一个封装了聚集计算当前状态的对象public IntWritable terminatePartial(){return result;}//合并两个部分聚集值会调用这个方法public boolean merge(IntWritable other){return iterate(other);}//Hive需要最终聚集结果时候会调用该方法public IntWritable terminate(){return result;}}
}

UDF 的打包与使用

操作前提

将开发好自定义UDF函数的项目打包成jar包,注意:jar 包中的自定义UDF 类名,不能和现有UDF 类,在包名+类名上,完全相同

部署方式

常见的UDF部署方式有以下三种:

  • 把UDF固化到image里,重新打image(推荐);
  • 其次是通过创建临时UDF(add jar + temporary function)的方式;
  • 创建永久UDF(hdfs jar+permanent function)的方式(可行,但不是很推荐);

方式一 固化UDF

  • 视频示例(仅作示范,详情查看下方文字)

此方式的核心逻辑是把UDF jar包放到image的/usr/lib/inceptor/下面,重新制作image。具体步骤如下:

以更换inceptor中的inceptor_2.10-1.1.0-transwarp-6.1.0.jar为例:

1. 进入inceptor image

docker run -it <inceptor_image_id> bash

2. 打开另一个terminal

3. 替换container中的jar包

docker cp <jar包名称> <container_id>:/usr/lib/inceptor/ <jar包名称>

image.png

4. commit修改记录

docker commit <container_id> REPOSITORY:TAG

5. 打开manager管理页面重新启动inceptor服务

6.重启完成后即可查看quark server的pod下/usr/lib/inceptor/是否有新增的jar包

方式二 创建临时UDF

  • 视频示例(仅作示范,详情查看下方文字)

1. 查看已存在jar包

LIST JAR;

2. 添加jar包

ADD JAR[S] <local_path>;
// Local_path是jar包所在Inceptor server节点的路径。

3. 创建临时UDF

CREATE TEMPORARY FUNCTION [<db_name>.]<function_name> AS <class_name>;

临时UDF在Inceptor重启后失效。如果需要更新临时UDF,需要重启Inceptor重新创建该临时UDF。

示例:

4. 验证临时UDF

SELECT [<db_name>.]<function_name>() FROM SYSTEM.DUAL;

5. 删除临时UDF

DROP TEMPORARY FUNCTION <if exists> <function_name>;

方式三 创建永久UDF

建议优先选取前两种方式,此方式虽然可行但不推荐,故仅介绍基础命令,暂无视频提供。

1. 查看已存在jar包

LIST JAR;

2. 添加jar包

ADD JAR[S] <local_or_hdfs_path>;
//Local_path是Inceptor server节点的路径。保证hive用户对jar所在的目录有读权限。

3. 创建永久UDF

CREATE PERMANENT FUNCTION [<db_name>.]<function_name> AS <class_name>;

如果Inceptor不在local mode,那么资源的地址也必须是非本地URI,比如HDFS地址。

4. 验证永久UDF

SELECT [<db_name>.]<function_name>() FROM SYSTEM.DUAL;

5. 删除永久UDF

DROP PERMANENT FUNCTION <if exists> <function_name>;

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/36546.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

打破生态「孤岛」,Catizen将开启Telegram小游戏2.0时代?

Catizen&#xff1a;引领Telegram x TON生态的顶级猫咪链游 在区块链游戏领域&#xff0c;吸引玩家的首要因素往往是游戏的趣味性。然而&#xff0c;仅靠趣味性无法评估一个项目的长期价值和发展潜力。真正能在区块链游戏市场中取得长久成功的项目&#xff0c;无一例外都依靠扎…

【消息队列】RabbitMQ集群原理与搭建

目录 前言1、集群搭建1.1、安装RabbitMQ1.1.1、前置要求1.1.2、安装Erlang环境①创建yum库配置文件②加入配置内容③更新yum库④正式安装Erlang 1.1.3、安装RabbitMQ1.1.4、RabbitMQ基础配置1.1.5、收尾工作 1.2、克隆VMWare虚拟机1.2.1、目标1.2.2、克隆虚拟机1.2.3、给新机设…

智能充电桩网关,构建高效充电网络

近年来我国新能源汽车的增长速度出现明显的上升趋势&#xff0c;但是其充电桩的发展还比较缓慢。目前在充电桩系统设计期间仍存在一些问题&#xff0c;主要表现在充电设施短缺、充电难等问题&#xff0c;这些问题的发生均会在一定程度上限制新能源汽车的发展&#xff0c;这就需…

navicat Premium发布lite免费版本了

Navicat Premium发布lite免费版本了&#xff0c;下面是完整功能对比链接 Navicat Premium 功能列表 | Navicat 免费版本下载链接如下&#xff1a; Navicat | 免费下载 Navicat Premium Lite 开发功能完全够用&#xff0c;点赞。 dbeaver该如何应对。

振弦采集仪在大型工程安全监测中的应用探索

振弦采集仪在大型工程安全监测中的应用探索 振弦采集仪是一种用于监测结构振动和变形的设备&#xff0c;它通过采集振弦信号来分析结构的动态特性。在大型工程安全监测中&#xff0c;振弦采集仪具有重要的应用价值&#xff0c;可以帮助工程师和监测人员实时了解结构的状况&…

如何在线上快速定位bug(干货)

想必有许多人都想我刚进公司一样不会快速定位线上bug吧&#xff0c;不会快速定位bug会大大降低我们的开发效率&#xff0c;随之而来的就是工作质量下降、业绩下滑。 我总结了一些我常用的线上定位技巧&#xff0c;希望能帮助到大家&#xff01; 我这里以使用阿里云日志分析作…

Attention步骤

一个典型的Attention思想包括三部分&#xff1a;Qquery、Kkey、Vvalue。 Q是query&#xff0c;是输入的信息&#xff1b;key和value成组出现&#xff0c;通常是原始文本等已有的信息&#xff1b;通过计算Q与K之间的相关性a&#xff0c;得出不同的K对输出的重要程度&#xff1b;…

2021年12月电子学会青少年软件编程 中小学生Python编程等级考试三级真题解析(选择题)

2021年12月Python编程等级考试三级真题解析 选择题&#xff08;共25题&#xff0c;每题2分&#xff0c;共50分&#xff09; 1、小明在学习计算机时&#xff0c;学习到了一个十六进制数101,这个十六进制数对应的十进制数的数值是 A、65 B、66 C、256 D、257 答案&#xff…

为什么javaer认为后台系统一定要用java开发?

在开始前刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「java的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“666”之后私信回复“666”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;公司有两个开发团队&#xf…

4年突破20亿,今麦郎如何持续策划凉白开极致产品力?

范总在方便面市场拥有30年的丰富经验&#xff0c;并曾创造过奇迹。1994年&#xff0c;他从冰糖生意进入方便面行业&#xff0c;创立今麦郎的前身华龙集团。当时&#xff0c;方便面市场已经进入红海阶段&#xff0c;市场上有上千家企业&#xff0c;康师傅和统一占据了80%的市场份…

计算机视觉-期末复习-简答/名词解释/综合设计

目录 第一讲--计算机/机器视觉概述 名词解释 简答 第二讲--图像处理概述 名词解释 简答 第三讲没划重点习题 第四讲--特征提取与选择 名词解释 简答 综合题 第五讲--不变特征 名词解释 简答 第六讲--物体分类与检测 简答 综合题 第七讲--视觉注意机制 简答 …

三角洲行动卡顿严重?这样快速解决三角洲行动国服卡顿问题

三角洲行动官方精心设计的游戏地图和敌人布局&#xff0c;加上“曼德尔砖”等目标导向性道具的引入&#xff0c;更是为玩家之间的竞技和争夺增添了无数的变数。每一次的争夺都如同是一场智慧与勇气的较量&#xff0c;让人热血沸腾&#xff0c;无法自拔。在这个战场上&#xff0…

第六篇:精通Docker Compose:打造高效的多容器应用环境

精通Docker Compose&#xff1a;打造高效的多容器应用环境 1. 引言 1.1 目的与重要性 在现代软件开发中&#xff0c;随着应用程序的复杂性不断增加&#xff0c;传统的单一容器部署方式已无法满足需求。Docker Compose作为一种强大的工具&#xff0c;专门用于定义和运行多容器…

用户中心项目全流程

企业做项目流程 需求分析 > 设计&#xff08;概要设计 、 详细设计&#xff09; > 技术选型 >初始化项目 / 引入需要的技术 > 写个小demo > 写代码 &#xff08;实现业务逻辑&#xff09; > 测试&#xff08;单元测试&#xff09;> 代码提交 / 代码评审 …

ClickHouse-Keeper安装使用

1.rpm 安装 clickhouse-keeper rpm -ivh clickhouse-keeper-23.8.11.28.x86_64.rpm 2.修改keeper的配置文件 vi /etc/clickhouse-keeper/keeper_config.xml修改部分参数 1.可修改日志等存储路径 2.增加监听配置 <listen_host>0.0.0.0</listen_host> 3.server_id…

HarmonyOS Next开发学习手册——层叠布局 (Stack)

概述 层叠布局&#xff08;StackLayout&#xff09;用于在屏幕上预留一块区域来显示组件中的元素&#xff0c;提供元素可以重叠的布局。层叠布局通过 Stack 容器组件实现位置的固定定位与层叠&#xff0c;容器中的子元素依次入栈&#xff0c;后一个子元素覆盖前一个子元素&…

【Spring】SpringCloudAlibaba学习笔记

Nacos Nacos是一个更易于构建云原生应用的动态服务发现/服务配置和服务管理平台核心功能: 服务注册: Nacos Client会通过发送REST请求向Nacos Server注册自己的服务, 提供自己的元数据, 如ip地址/端口等信息; Nacos Server收到注册请求后, 就会把这些信息存储在Map中服务心跳:…

Java毕业设计 基于SSM vue药店管理系统小程序 微信小程序

Java毕业设计 基于SSM vue药店管理系统小程序 微信小程序 SSM 药店管理系统小程序 功能介绍 用户 登录 注册 首页 药品信息 药品详情 加入购物车 立即购买 收藏 购物车 立即下单 新增收货地址 我的收藏管理 用户充值 我的订单 留言板 管理员 登录 个人中心 修改密码 个人信息…

分布式并行最短路径

此前我 “自然而然” 做了两个小算法&#xff0c;最短路径 和 最小生成树&#xff0c;我喜欢大自然的第一性原理&#xff0c;最小作用量&#xff0c;梯度下降&#xff0c;爆炸&#xff0c;河水泛滥&#xff0c;本质上都是一回事。 大自然另一风格是分布式并行&#xff0c;没外…

Java使用poi生成word文档的简单实例

Java使用poi生成word文档的简单实例 生成的效果如下&#xff1a; 用到的poi的简单的知识 新建一个word对象 //新建文件 XWPFDocument document new XWPFDocument();新建段落以及文字样式 //创建段落 XWPFParagraph paragraph document.createParagraph(); paragraph.se…