Spark SQL 中UDF的讲解
User Define Function, 用户自定义函数,简称UDF,存在与很多组件中。
在使用Sparksql的人都遇到了Sparksql所支持的函数太少了的难处,除了最基本的函数,Sparksql所能支撑的函数很少,肯定不能满足正常的项目使用,UDF可以解决问题。SparkSQL中的UDF相当于是1进1出,UDAF相当于是多进一出,类似于聚合函数。
开窗函数一般分组取topn时常用。
可以理解为自己定义函数,来获取自己想要的结果!
案例借鉴于网络!
需求:计算文本中每一个单词的长度!
代码:
Scala版本:
package com.bynear.Scalaimport org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext}object UDF {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SparkSQL_UDF").setMaster("local")val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)val names = Array("刘亦菲", "张柏芝", "冯提模","陈一发儿")val nameRDD = sc.parallelize(names, 5)val nameRowRDD = nameRDD.map(name => Row(name))val structType = StructType(Array(StructField("name", StringType, true)))val namesDF = sqlContext.createDataFrame(nameRowRDD, structType)namesDF.registerTempTable("names")sqlContext.udf.register("strLen", (str: String) => str.length)sqlContext.sql("select name,strLen(name) as length from names").show()sqlContext.sql("select name,strLen(name) as length from names").collect().foreach(println)} }运行结果:
+----+------+
|name|length|
+----+------+
| 刘亦菲| 3|
| 张柏芝| 3|
| 冯提模| 3|
|陈一发儿| 4|
+----+------+
|name|length|
+----+------+
| 刘亦菲| 3|
| 张柏芝| 3|
| 冯提模| 3|
|陈一发儿| 4|
+----+------+
[刘亦菲,3]
[张柏芝,3]
[冯提模,3]
[陈一发儿,4]
[张柏芝,3]
[冯提模,3]
[陈一发儿,4]
Java版本:
package com.bynear.spark_sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.api.java.UDF1; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.ArrayList; public class JavaUDF {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("JavaUDF").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc.sc()); ArrayList<String> names = new ArrayList<String>(); names.add("刘亦菲"); names.add("张柏芝"); names.add("冯提模"); names.add("陈一发儿"); JavaRDD<String> nameRDD = sc.parallelize(names); JavaRDD<Row> nameRowRDD = nameRDD.map(new Function<String, Row>() {@Override public Row call(String line) throws Exception {return RowFactory.create(String.valueOf(line)); }}); /** * 使用动态编程方式,将RDD转换为Dataframe */ ArrayList<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); StructType structType = DataTypes.createStructType(fields); DataFrame nameDF = sqlContext.createDataFrame(nameRowRDD, structType); /** * 注册临时表 */ nameDF.registerTempTable("user"); /** * 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1,UDF2, 表明包含几个参数传入 * UDF1<String, Integer> 表示 传入参数 String 输出参数为 Integer * call方法为 自定义的函数! * DataTypes.IntegerType 必须与输出参数的类型一致即 Integer */ sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {@Override public Integer call(String s) throws Exception {return s.length(); }}, DataTypes.IntegerType); /** * select name ,StrLen(name) as length from user * 在临时表user中 查找name StrLen(name) == name的长度 * StrLen(name) as length 表示将获取到的name的长度 例如15 15作为一列 as length 列名为 length */ sqlContext.sql("select name ,StrLen(name) as length from user").show(); Row[] rows = sqlContext.sql("select name ,StrLen(name) as length from user").collect(); for (Row row : rows) {System.out.println(row); }sc.close(); } }输出结果:同上!
Java版本中,主要之一到UDFX 方法,以及传入参数的个数类型,以及输出类型,最终要的是文本最后的
DataTypes.IntegerType 类型要与输出类型相同!