一、自定义函数的实现方式
1.创建临时函数
(1)创建maven项目,并加入依赖
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
(2)编写函数的实现逻辑的代码
(3)打成jar包并上传至Linux虚拟机(带依赖打包)
(4)在hive shell中,使用 add jar 路径
将jar包作为资源添加到hive环境中
add jar jar包路径;
(5)使用jar包资源注册一个临时函数,fxxx1是函数名,'MyUDF'是主类名
create temporary function fxxx1 as 'MyUDF';
(6)可以使用show functions 查看函数是否被创建
2.创建永久函数
(1)将jar上传HDFS:
hadoop fs -put jar包路径
(2)在hive命令行中创建永久函数:
create function 函数名 as '主类名路径' using jar 'hdfs:/bigdata29/jars/hive-1.0-jar-with-dependencies.jar(hdfs上的jar包路径)';
(3)删除永久函数
drop function 函数名;
3.实现UDF自定义函数
(旧版本方式):
import org.apache.hadoop.hive.ql.exec.UDF;/*旧版本实现UDF自定义函数的时候,需要将自己的类继承UDF类*/
public class MyUDFDemo1 extends UDF {/*实现evaluate函数将来hive调用自定义函数的时候,实际上调用的是该类中的evaluate函数evaluate函数的参数就是将来sql语句传入的列值evaluate函数的返回值就是自定义函数的返回值需求:传入一个字符串,返回一个新的字符串举例:SMITH --> 数加:SMITHselect xxx(ename) from emp;*/public String evaluate(String str) {// 返回字符串前缀"数加:"加上输入参数strreturn "数加:" + str;}public String evaluate(int number) {// 返回字符串前缀"数加:"加上输入参数strif(number>=90 && number<=100){return "优秀";}else if(number>=70 && number<90){return "良好";}else if(number>=60){return "及格";}else {return "不及格";}}}
(新版本方式):
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;public class MyGenericUDFDemo1 extends GenericUDF {/*这个方法主要是对自定义的UDF函数进行初始化,目的是指定调用完函数返回的值的类型需求:传入一个字符串,返回一个新的字符串举例:SMITH --> 数加:SMITH*/@Overridepublic ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {//使用PrimitiveObjectInspectorFactory工厂类,获取String类型的ObjectInspectorreturn PrimitiveObjectInspectorFactory.javaStringObjectInspector;}/*该方法是自定义UDF的核心方法,目的是实现自定义UDF的逻辑是在initialize方法之后执行的arguments将来会有多个参数,但是UDF函数只有一个参数,所以arguments[0]就是传入的第一个参数*/@Overridepublic Object evaluate(DeferredObject[] arguments) throws HiveException { //xiaohu("smith") xiaohu(60)String output = ""; //arguments[0]获取第一个参数值,通过get()方法获取其中的参数具体值Object o = arguments[0].get(); // Object o = 100if (o != null) { //如果将来调用自定义函数不传值的话,则o为null,需要加入判断,防止空指针异常if (o instanceof Text) {output = "数加:" + o;}else if(o instanceof IntWritable){ // hive中传入整数的时候,底层默认是IntWritable类型的int score = ((IntWritable) o).get();if(score>=90 && score<=100){output = "优秀";}else if(score>=70 && score<90){output = "良好";}else if(score>=60){output = "及格";}else {output = "不及格";}}}return output;}@Overridepublic String getDisplayString(String[] children) {return "这是我们自己使用新版本写法自定义的UDF函数";}
}
4.实现UDTF自定义函数
案例1:
将一行数据 M1001#xiaohu#S324231212,lkd#M1002#S2543412432,S21312312412#M1003#bfy
变为以下形式(三行):
1001 xiaohu 324231212
1002 lkd 2543412432
1003 bfy 21312312412
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import java.util.ArrayList;/*编写UDTF的类需要自己自定义的类继承自GenericUDTF*/
public class MyUDTFDemo1 extends GenericUDTF {/*public StructObjectInspector initialize(StructObjectInspector argOIs)指定输出的列的名称以及列的类型M1001#xiaohu#S324231212,sp#M1002#S2543412432,S21312312412#M1003#dyj1001 xiaohu 324231212*/@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {//创建一个List集合存储结果列的名字ArrayList<String> colNames = new ArrayList<>();//创建一个集合存储每一列的数据类型ArrayList<ObjectInspector> colTypes = new ArrayList<>();//向集合中添加元素,设置列的名字以及类型colNames.add("id");colTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);colNames.add("name");colTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);colNames.add("cardId");colTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);//返回一个对象,该对象封装了列的名字以及列的类型return ObjectInspectorFactory.getStandardStructObjectInspector(colNames, colTypes);}/*process方法主要是在调用函数的时候,底层会进行调用处理传入的列数据M1001#xiaohu#S324231212,sp#M1002#S2543412432,S21312312412#M1003#dyjargs: 接收函数调用时传入的列数据,从索引0开始*/@Overridepublic void process(Object[] args) throws HiveException {//创建一个数组,存储每一列的数据//因为结果一行数据中有3列,需要创建一个长度为3的数组,用来存储我们处理过的一行三列数据String[] rows = new String[3];//args[0]是传入的第一个列数据String col = args[0].toString();String[] infos = col.split(",");//遍历切分后的数组,得到每一个用户信息for (String info : infos) {String[] strings = info.split("#");for (String i : strings) {if(i.startsWith("M")){ // M1001rows[0] = i.substring(1);}else if(i.startsWith("S")){rows[2] = i.substring(1);}else {rows[1] = i;}}//forward(Object o) 调用概述方法,将输出的一行数据,封装成一个数组,传入到forward方法中,给hive后续处理forward(rows);}}@Overridepublic void close() throws HiveException {//这里一般是用作释放在initialize方法中创建的资源}
}
案例二:
字段:id,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12 共13列
数据:
a,1,2,3,4,5,6,7,8,9,10,11,12
b,11,12,13,14,15,16,17,18,19,20,21,22
c,21,22,23,24,25,26,27,28,29,30,31,32
转成3列:id,hours,value
例如:
a,1,2,3,4,5,6,7,8,9,10,11,12
a,0时,1
a,2时,2
a,4时,3
a,6时,4
......
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import java.util.ArrayList;/*输入:a,1,2,3,4,5,6,7,8,9,10,11,12输出:a 0时 1a 2时 2a 4时 3..a 24时 12*/
public class MyUDTFDemo3 extends GenericUDTF {@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {//创建一个List集合存储结果列的名字ArrayList<String> colNames = new ArrayList<>();//创建一个集合存储每一列的数据类型ArrayList<ObjectInspector> colTypes = new ArrayList<>();//向集合中添加元素,设置列的名字以及类型colNames.add("id");colTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);colNames.add("time");colTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);colNames.add("value");colTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);//返回一个对象,该对象封装了列的名字以及列的类型return ObjectInspectorFactory.getStandardStructObjectInspector(colNames, colTypes);}/*输入:a,1,2,3,4,5,6,7,8,9,10,11,12输出:a 0时 1a 2时 2a 4时 3..a 24时 12*/@Overridepublic void process(Object[] args) throws HiveException { //作用在每一行上的//创建一个数组,存储一行三列的数据String[] rows = new String[3];//因为接收了13列的数据,args的长度是13 索引最大值是12//第一列是监测点编号String id = args[0].toString();rows[0] = id;for (int i = 1, j = 0; i < args.length; i++, j += 2) {rows[1] = j + "时";rows[2] = args[i].toString();forward(rows);}}@Overridepublic void close() throws HiveException {}
}