1. Spark configuration files
spark-defaults.conf
spark.yarn.historyServer.address = xiemeng-01:18080
spark.history.ui.port=18080
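The history server only has something to show if completed applications also write event logs; the two properties below usually accompany the settings above (the log directory here is an assumption and should match spark.history.fs.logDirectory):
# assumed values, not part of the original notes
spark.eventLog.enabled = true
spark.eventLog.dir = hdfs://xiemeng-01:8020/spark-logs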
hive-site.xml
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>hdfs://mycluster/hive/warehouse</value>
    </property>
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>
    <property>
        <name>datanucleus.schema.autoCreateAll</name>
        <value>true</value>
    </property>
    <!-- HiveServer2 username -->
    <property>
        <name>beeline.hs2.connection.user</name>
        <value>hive2</value>
    </property>
    <!-- HiveServer2 password -->
    <property>
        <name>beeline.hs2.connection.password</name>
        <value>hive2</value>
    </property>
    <!-- HiveServer2 host and port -->
    <property>
        <name>beeline.hs2.connection.hosts</name>
        <value>xiemeng-01:10000</value>
    </property>
</configuration>
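With this hive-site.xml on Spark's classpath (e.g. copied into $SPARK_HOME/conf), Spark SQL can talk to the metastore directly. A minimal sketch, assuming a table that already exists in the warehouse:
import org.apache.spark.sql.SparkSession

object HiveReadTest {
  def main(args: Array[String]): Unit = {
    // enableHiveSupport() makes the session use the metastore configured in hive-site.xml
    val spark = SparkSession.builder()
      .appName("hive-read")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("show databases").show()
    // "student" is a placeholder table name, not one defined elsewhere in these notes
    spark.sql("select * from student limit 10").show()

    spark.stop()
  }
}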
2. Spark commands
export JAVA_HOME=/home/xiemeng/software/jdk1.8.0_251
export SPARK_HOME=/home/xiemeng/software/spark
export SPARK_WORKER_CORES=2
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=xiemeng-01,xiemeng-02,xiemeng-03
-Dspark.deploy.zookeeper.dir=/spark"
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://xiemeng-01:8020
-Dspark.history.retainedApplications=30"

spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
spark-examples_2.12-3.0.1.jar

spark-submit --class WordCountTest \
--master yarn \
--deploy-mode cluster \
spark-review-demo-1.0-SNAPSHOT.jar \
10
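The WordCountTest class submitted above is not included in these notes; a minimal sketch of what such a job might look like (the paths are placeholders, and the trailing 10 argument is simply ignored here):
import org.apache.spark.{SparkConf, SparkContext}

object WordCountTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountTest")
    val sc = new SparkContext(conf)

    // input/output paths are assumptions; the real job may read them from args
    val counts = sc.textFile("hdfs://mycluster/input/words.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    counts.saveAsTextFile("hdfs://mycluster/output/wordcount")
    sc.stop()
  }
}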
1. Spark RDD and DataFrame
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark SQL test class
 *
 * @author xiemeng
 * @since 2020-11-30
 */
object DFTest {

  case class People(name: String, age: Int, address: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    val res = sc.makeRDD(List(People("zhangsan", 20, "陕西省"), People("wangwu", 10, "北京市")))
    val spark = SparkSession.builder().appName("test").getOrCreate()

    // First way to create a DataFrame: read a JSON file
    val df2 = spark.read.json("data/student.json")
    // df2.show()

    // Second way to create a DataFrame: from an RDD of case classes
    val df = spark.createDataFrame(res)
    // df.show()
    df.createTempView("student")
    // println("SQL query results:")
    // df.select("name").show()
    // spark.sql("select name,age from student").show()

    import spark.implicits._
    val numRDD = sc.makeRDD(List(("zhangsan", 20, "男"), ("wangwu", 10, "男")))
    val numDF = numRDD.toDF("name", "age", "sex")
    // numDF.show()

    val peopleRDD: RDD[People] = numRDD.map { p => People(p._1, p._2, null) }
    val peopleDF = peopleRDD.toDF()
    // peopleDF.show()

    val peopleRow: RDD[Row] = peopleDF.rdd
    peopleRow.foreach(r => println(r.getString(0) + "\t" + r.getInt(1) + "\t" + r.getString(2)))

    // Create a Dataset from the DataFrame
    val ds = df.as[People]
    // ds.show()

    // Create a Dataset directly from the data
    // spark.createDataset(res).show()
    val peopleDS = numRDD.map { case (name, age, address) => People(name, age, address) }.toDS()
    peopleDS.show()
    // spark.stop()

    spark.udf.register("addName", (name: String) => "Name:" + name)
    spark.sql("select addName(name),age from student").show()
  }
}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable.ListBuffer

object HotWordSearch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("test").getOrCreate()
    val filePath = "file:///D:\\code\\spark-start-demo\\src\\main\\resources\\input\\txt\\hotwordsearch.txt"

    // ((date, keyword), user)
    val hotDateTuple = spark.sparkContext.textFile(filePath)
      .map(_.split(","))
      .map(line => {
        val date = line(0)
        val user = line(1)
        val hot = line(2)
        ((date, hot), user)
      })

    val groupRDD = hotDateTuple.groupByKey()

    // count distinct users (UV) per (date, keyword)
    val uvRDD = groupRDD.map(line => {
      val distinctUser = new ListBuffer[String]
      val dateAndKeyword = line._1
      val users = line._2.iterator
      while (users.hasNext) {
        val user = users.next()
        if (!distinctUser.contains(user)) {
          distinctUser += user
        }
      }
      (dateAndKeyword, distinctUser.size)
    })

    val rowDatas = uvRDD.map(line => Row(line._1._1, line._1._2, line._2))

    val structType = StructType(List(
      StructField("date", StringType),
      StructField("keyword", StringType),
      StructField("uv", IntegerType)))

    val dataFrame = spark.createDataFrame(rowDatas, structType)
    dataFrame.createTempView("date_keyword_uv")

    // top 3 keywords by UV per date
    spark.sql(
      """
        |select date,keyword,uv
        |from (
        |  select date,
        |         keyword,
        |         uv,
        |         row_number() over (partition by date order by uv desc) as rank
        |  from date_keyword_uv
        |) t
        |where t.rank <= 3
      """.stripMargin).show()
  }
}
package com.example.demo

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class StudentBuffer(var sum: BigInt, var count: Int)

case class Stu(name: String, age: BigInt)

class UserAvg extends Aggregator[Stu, StudentBuffer, Double] {

  override def zero: StudentBuffer = StudentBuffer(0L, 0)

  /**
   * Aggregate one input row into the buffer.
   */
  override def reduce(b: StudentBuffer, a: Stu): StudentBuffer = {
    b.sum = b.sum + a.age
    b.count = b.count + 1
    b
  }

  /**
   * Merge two buffers.
   */
  override def merge(b1: StudentBuffer, b2: StudentBuffer): StudentBuffer = {
    b1.sum = b1.sum + b2.sum
    b1.count = b1.count + b2.count
    b1
  }

  /**
   * Compute the final result.
   */
  override def finish(reduction: StudentBuffer): Double = reduction.sum.toDouble / reduction.count

  override def bufferEncoder: Encoder[StudentBuffer] = Encoders.product

  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
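The Aggregator above is only defined, never used. A typed Aggregator is applied to a Dataset through toColumn; a minimal sketch, with sample data that is not part of the original notes:
package com.example.demo

import org.apache.spark.sql.SparkSession

object UserAvgTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("typed-udaf").getOrCreate()
    import spark.implicits._

    // sample rows; Stu and UserAvg come from the class above
    val studentDS = Seq(Stu("zhangsan", 20), Stu("wangwu", 10)).toDS()

    // apply the typed Aggregator as a column over the whole Dataset
    studentDS.select(new UserAvg().toColumn.name("avg_age")).show()

    spark.stop()
  }
}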
Read CSV
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

object ReadCsv {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("csv")
      .config("spark.sql.crossJoin.enabled", "true")
      .getOrCreate()

    val schema: StructType = new StructType()
      .add(StructField("a0", StringType, nullable = true))
      .add(StructField("a1", DoubleType, nullable = true))
    print(schema)

    val df = spark.read.format("csv")
      .option("header", true)
      .option("inferSchema", true)
      .option("sep", ",")
      .schema(schema)
      .load("file:///D:\\code\\spark-start-demo\\src\\main\\resources\\input\\csv")
    df.show()

    df.createOrReplaceTempView("t1")
    // percentile_approx approximates the linear-interpolation percentile:
    // position p = q * (n - 1); e.g. for values 1..100 and q = 0.95,
    // p = 94.05, result = (1 - 0.05) * 95 + 0.05 * 96 = 95.05
    spark.sql("select a0, percentile_approx(a1,0.95,200) from t1 group by a0").show()
  }
}
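The scratch arithmetic in the comments is the linear-interpolation rule an exact percentile uses; a small standalone sketch of that rule (percentile_approx itself only approximates it):
object PercentileSketch {
  // exact percentile over a sorted sequence:
  // p = q * (n - 1), i = floor(p), d = p - i, result = (1 - d) * v(i) + d * v(i + 1)
  def percentile(sorted: IndexedSeq[Double], q: Double): Double = {
    val p = q * (sorted.length - 1)
    val i = p.toInt
    val d = p - i
    if (i + 1 < sorted.length) (1 - d) * sorted(i) + d * sorted(i + 1) else sorted(i)
  }

  def main(args: Array[String]): Unit = {
    val values = (1 to 100).map(_.toDouble)
    // p = 0.95 * 99 = 94.05, so the result interpolates between 95 and 96:
    // (1 - 0.05) * 95 + 0.05 * 96 = 95.05
    println(percentile(values, 0.95))
  }
}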
Read a database (JDBC)
import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object JdbcSink {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc").master("local").getOrCreate()
    val url = "jdbc:mysql://192.168.64.1:3306/student?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT"
    val driver = "com.mysql.cj.jdbc.Driver"
    val user = "root"

    writeData(spark, driver, url, user)

    val rowDatas = readData(spark, url, driver)
    val jdbcDF = readData2(spark, url, driver)
    jdbcDF.show()
    // val jdbcDF2 = readData3(spark, url, driver)
    // jdbcDF2.show()
  }

  def readData(spark: SparkSession, url: String, driver: String): DataFrame = {
    spark.read.format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("dbtable", "student")
      .option("user", "root")
      .option("password", "123456")
      .load()
  }

  def readData2(spark: SparkSession, url: String, driver: String): DataFrame = {
    val map = Map(
      "url" -> url,
      "driver" -> driver,
      "user" -> "root",
      "password" -> "123456",
      "dbtable" -> "student")
    spark.read.format("jdbc").options(map).load()
  }

  def readData3(spark: SparkSession, url: String, driver: String): DataFrame = {
    val properties = new Properties()
    properties.put("driver", driver)
    properties.put("user", "root")
    properties.put("password", "123456")
    spark.read.jdbc(url, "student", properties)
  }

  def writeData(spark: SparkSession, driver: String, url: String, user: String): Unit = {
    val list = Array("周瑜,20,1", "诸葛亮,30,0")
    val structType = StructType(List(
      StructField("name", StringType),
      StructField("age", IntegerType),
      StructField("sex", StringType)))
    val rowDatas = spark.sparkContext.parallelize(list).map(_.split(","))
    val studentRDD: RDD[Row] = rowDatas.map(line => Row(line(0).toString, line(1).toInt, line(2).toString))
    val dataFrame = spark.createDataFrame(studentRDD, structType)
    dataFrame.write.mode("append").format("jdbc")
      .option("url", url)
      .option("user", user)
      .option("password", "123456")
      .option("driver", driver)
      .option("dbtable", "student")
      .save()
  }
}
Read Kafka
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamKafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(1))
    // updateStateByKey requires a checkpoint directory
    ssc.checkpoint("file:///D:\\code\\spark-start-demo\\spark-warehouse")

    val kafkaTopics = Array("test")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.64.128:9092,192.168.64.130:9092,192.168.64.131:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val inputStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](kafkaTopics, kafkaParams))

    val lineDstream = inputStream.map(record => (record.key(), record.value()))
    val word = lineDstream.map(_._2).flatMap(_.split(" "))
    val pair = word.map(word => (word, 1))
    val result: DStream[(String, Int)] = pair.updateStateByKey(updateFunc)
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // running count per key: add this batch's counts to the previous state
  def updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.fold(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }
}
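Since enable.auto.commit is set to false above, nothing ever commits the consumed offsets. One common option (a sketch, not part of the original notes) is to commit them back to Kafka after each batch:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

object KafkaOffsetCommit {
  // call this on the same inputStream built in StreamKafka.main, before ssc.start()
  def commitAfterEachBatch(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
    stream.foreachRDD { rdd =>
      // offset ranges are only available on the direct Kafka RDD itself
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // asynchronously commit the offsets once the batch has been handled
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
  }
}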
Read HBase
package com.example.demo

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, TableInputFormat}
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object HbaseReadTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[8]")
      .setAppName("Test")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    val hbaseConf: Configuration = getHbaseConf()
    val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    // putData(sc)
    // addHadoopJobConf(sc)
    doBulkLoad(sc)

    hbaseRDD.foreach {
      case (_, result) =>
        val key = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes()))
        val address = Bytes.toString(result.getValue("info".getBytes(), "address".getBytes()))
        val age = Bytes.toString(result.getValue("info".getBytes(), "age".getBytes()))
        println(s"row key:\t $key, name:\t $name, address:\t $address, age:\t $age")
    }
  }

  def mkData(sc: SparkContext, arr: Array[String]): RDD[String] = {
    sc.makeRDD(arr)
  }

  /**
   * Objects that cannot be serialized should be annotated with @transient.
   */
  def putData(@transient sc: SparkContext): Unit = {
    val arr = Array[String]("003,王五,山东,20", "004,赵六,陕西,23")
    val rowDataRDD: RDD[String] = mkData(sc, arr)
    rowDataRDD.foreach(rowData => {
      val hbaseConf: Configuration = getHbaseConf()
      val connection = ConnectionFactory.createConnection(hbaseConf)
      val table = connection.getTable(TableName.valueOf("student"))
      val rows = rowData.split(",")
      val put = new Put(Bytes.toBytes(rows(0)))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(rows(1)))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes(rows(2)))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(rows(3)))
      table.put(put)
    })
  }

  def fetchHbaseData(sc: SparkContext): Unit = {
    val hbaseConf = getHbaseConf()
    val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    hbaseRDD.foreach {
      case (_, result) =>
        val key = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes()))
        val address = Bytes.toString(result.getValue("info".getBytes(), "address".getBytes()))
        val age = Bytes.toString(result.getValue("info".getBytes(), "age".getBytes()))
        println(s"row key:\t $key, name:\t $name, address:\t $address, age:\t $age")
    }
  }

  def getHbaseConf() = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "192.168.64.128")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "student")
    hbaseConf
  }

  def addHadoopJobConf(@transient sc: SparkContext): Unit = {
    val rowDataRDD = mkData(sc, Array[String]("005,ab,西安,20", "006,ac,西安,20"))
    val jobConf = new JobConf()
    jobConf.set("hbase.zookeeper.quorum", "192.168.64.128")
    jobConf.set("hbase.zookeeper.property.clientPort", "2181")
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")
    val resultRDD = rowDataRDD.map(line => line.split(",")).map(rows => {
      val put = new Put(Bytes.toBytes(rows(0)))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(rows(1)))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes(rows(2)))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(rows(3)))
      (new ImmutableBytesWritable, put)
    })
    resultRDD.saveAsHadoopDataset(jobConf)
  }

  def doBulkLoad(sc: SparkContext): Unit = {
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", "hdfs://192.168.64.128:9870")
    val fileSystem = FileSystem.get(hadoopConf)
    val hbaseConf = HBaseConfiguration.create(hadoopConf)
    hbaseConf.set("hbase.zookeeper.quorum", "192.168.64.128")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    val connection = ConnectionFactory.createConnection(hbaseConf)
    val tableName = TableName.valueOf("student")
    val table = connection.getTable(tableName)
    val admin = connection.getAdmin
    if (!admin.tableExists(tableName)) {
      val tableDescriptor = new HTableDescriptor(tableName)
      val columnDescriptor = new HColumnDescriptor("info")
      tableDescriptor.addFamily(columnDescriptor)
      admin.createTable(tableDescriptor)
    }
    val path = "hdfs://192.168.64.128:9876/hbase"
    if (fileSystem.exists(new Path(path))) {
      fileSystem.delete(new Path(path), true)
    }
    val rowDataRDD = sc.makeRDD(Array[String](
      "rowKey:001,name:wangwu",
      "rowKey:008,address:陕西",
      "rowKey:001,age:20"))
    val keyValues = rowDataRDD.map(_.split(",")).map(arr => {
      val rowKey = arr(0).split(":")(1)
      val qualify = arr(1).split(":")(0)
      val value = arr(1).split(":")(1)
      val kv = new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes("info"), Bytes.toBytes(qualify), Bytes.toBytes(value))
      (new ImmutableBytesWritable(), kv)
    })
    keyValues.saveAsNewAPIHadoopFile(path,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      hbaseConf)
    val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
    val regionLocator = connection.getRegionLocator(tableName)
    bulkLoader.doBulkLoad(new Path(path), admin, table, regionLocator)
  }
}
Custom partitioner
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

class MyPartitioner(partitions: Int) extends Partitioner {

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val project = key.toString
    if (project.equals("chinese")) 0
    else if (project.equals("math")) 1
    else 2
  }
}

object CustomPartitionerTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    val arr = Array("chinese,333", "math,22", "english,23")
    val data = sc.makeRDD(arr).map(line => {
      (line.split(",")(0), line.split(",")(1))
    })
    data.partitionBy(new MyPartitioner(3)).saveAsTextFile("/output/partitioner")
  }
}
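To confirm which partition each key actually lands in, the partitioned RDD can be inspected with mapPartitionsWithIndex before writing it out; a small sketch, not part of the original notes:
import org.apache.spark.{SparkConf, SparkContext}

object PartitionInspect {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("inspect").setMaster("local"))
    val data = sc.makeRDD(Array("chinese,333", "math,22", "english,23"))
      .map(line => (line.split(",")(0), line.split(",")(1)))
      .partitionBy(new MyPartitioner(3))

    // tag every record with the index of the partition it ended up in
    data.mapPartitionsWithIndex((idx, it) => it.map(kv => (idx, kv)))
      .collect()
      .foreach(println) // expected: chinese -> 0, math -> 1, english -> 2

    sc.stop()
  }
}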
2. Spark SQL UDF
package com.example.demo.udf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructType}

private[udf] class AVGFunc extends UserDefinedAggregateFunction {

  /**
   * Input schema.
   */
  override def inputSchema: StructType = new StructType().add("age", LongType)

  /**
   * Buffer schema.
   */
  override def bufferSchema: StructType = new StructType().add("sum", LongType).add("count", LongType)

  /**
   * Output type.
   */
  override def dataType: DataType = DoubleType

  /**
   * Whether the function is deterministic.
   */
  override def deterministic: Boolean = true

  /**
   * Initialize the buffer.
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  /**
   * Update the buffer with one input row.
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    buffer(1) = buffer.getLong(1) + 1
  }

  /**
   * Merge two buffers.
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble / buffer.getLong(1)
}
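AVGFunc is never registered in these notes. With the older UserDefinedAggregateFunction API it would typically be hooked up as below; a sketch (the test object sits in the same package because the class is private[udf], and the sample rows are assumptions):
package com.example.demo.udf

import org.apache.spark.sql.SparkSession

object AVGFuncTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("udaf").getOrCreate()
    import spark.implicits._

    // sample rows; any view with a bigint age column works
    Seq(("zhangsan", 20L), ("wangwu", 10L)).toDF("name", "age").createTempView("student")

    // register the UserDefinedAggregateFunction under a SQL name
    spark.udf.register("avg_age", new AVGFunc())
    spark.sql("select avg_age(age) from student").show()

    spark.stop()
  }
}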
package com.example.demo.udf

import org.apache.hadoop.hive.ql.exec.UDFArgumentException
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

class LenFunction extends GenericUDF {

  override def initialize(args: Array[ObjectInspector]): ObjectInspector = {
    if (args.length == 0) {
      throw new UDFArgumentException("no argument supplied")
    }
    PrimitiveObjectInspectorFactory.writableIntObjectInspector
  }

  override def evaluate(args: Array[GenericUDF.DeferredObject]): AnyRef = {
    val inspector = PrimitiveObjectInspectorFactory.writableIntObjectInspector
    // check the raw value before calling toString, otherwise a null argument would NPE
    val value = args(0).get()
    if (value == null) {
      return inspector.create(0)
    }
    inspector.create(value.toString.length)
  }

  override def getDisplayString(strings: Array[String]): String = ""
}
import org.apache.spark.sql.SparkSession

object HiveFunctionTest {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("test")
      .master("yarn")
      .enableHiveSupport()
      .getOrCreate()

    session.sql(
      """
        |CREATE TEMPORARY FUNCTION c_test AS 'com.example.demo.udf.DIVFunction'
      """.stripMargin)
    session.sql("select c_test(3,4)").show()
  }
}
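DIVFunction itself is not included in these notes; the same registration pattern works for the LenFunction Hive UDF defined above, provided the class is on the classpath. A sketch:
import org.apache.spark.sql.SparkSession

object LenFunctionTest {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("len-udf")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    // register the GenericUDF defined earlier as a temporary Hive function
    session.sql("CREATE TEMPORARY FUNCTION len_test AS 'com.example.demo.udf.LenFunction'")
    session.sql("select len_test('hello')").show() // expected: 5

    session.stop()
  }
}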
3. Spark Streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingBlackListFilter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[8]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // (user, flag) pairs; flag == true means the user is blacklisted
    val blackList = Array(("tom", true), ("leo", true))
    val blackRDD = ssc.sparkContext.makeRDD(blackList)

    val logDStream = ssc.socketTextStream("192.168.64.128", 9000)
    // each log line is assumed to look like "<timestamp> <user> ...", keyed by the user field
    val logTupleStream = logDStream.map(line => (line.split(" ")(1), line))

    val resultDStream = logTupleStream.transform(rdd => {
      val leftOuterRDD = rdd.leftOuterJoin(blackRDD)
      // keep only lines whose user is not flagged in the blacklist
      val filterRDD = leftOuterRDD.filter(line => !line._2._2.getOrElse(false))
      filterRDD.map(line => line._2._1)
    })
    resultDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
4. Spark tuning
5. Maven dependencies
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.demo</groupId>
    <artifactId>spark-start-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <hadoop.version>3.2.0</hadoop.version>
        <hbase.version>2.2.6</hbase.version>
        <spark.version>3.0.1</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <version>0.8</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.20</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.12.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>