文章目录
- 1. 大数据项目结构
- 2. 类说明
- 2.1 公共接口类
- 2.2 TaskNameEnum指定每个任务的名称
- 2.3 TaskRunner中编写任务的业务逻辑
- 3. 任务执行脚本
每个公司内部都有一套自己的架子,一般新人来了就直接在已有的架子上开发业务。
以下仅仅作为记录下自己使用的架子,不作为任何推荐,也不认为这样的组织结构就是好用的。
1. 大数据项目结构
项目的整体组织结构
目录 | 说明 |
---|---|
annotation | 自定义注解Runner和Task。 |
app | 用来放整个项目的各个任务。 test1和test2是具体开发的业务任务。 |
base | BaseRunner和BaseTask是两个基础类 |
enums | 用来定义任务的别名 |
FeatureContextApp | 主类在目录中的位置保持不变,如果移动,会影响扫描task和Runner |
2. 类说明
2.1 公共接口类
package com.king.ml.baseimport com.king.ml.enums.TaskNameEnum
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime
import org.springframework.util.StopWatchimport scala.util.{Failure, Success, Try}trait BaseTask extends Logging with Serializable {def taskName: TaskNameEnum.Valuedef initConf(sparkConf: SparkConf = new SparkConf()): SparkConf = sparkConfvar runtime: StopWatch = _def around(implicit spark: SparkSession, currDate: DateTime = DateTime.now): Unit = {beforeTry {Class.forName(spark.conf.get("task.runner")).newInstance().asInstanceOf[BaseRunner].run} match {case Success(_) => aftercase Failure(_) => afterThrowException}}private def before(implicit spark: SparkSession, currDate: DateTime): Unit = {val taskName = spark.conf.get("task.runner")println("开始执行任务 ...["+taskName+"]")runtime = new StopWatch(taskName)runtime.start(taskName)}private def after(implicit spark: SparkSession, currDate: DateTime): Unit = {val taskName = spark.conf.get("task.runner")runtime.stop()println("任务执行结束 ...["+ taskName+"],共耗时:" + runtime.getTotalTimeSeconds +"秒")}private def afterThrowException(implicit spark: SparkSession, currDate: DateTime): Unit = {val taskName = spark.conf.get("task.runner")runtime.stop()println("任务执行异常 ...[" + taskName + "],共耗时:" + runtime.getTotalTimeSeconds + "秒")}
}
通过一个公共的接口记录每个任务执行的具体日志信息。
2.2 TaskNameEnum指定每个任务的名称
object TaskNameEnum extends Enumeration {def getEnumType(source:String):TaskNameEnum.Value = {val values =TaskNameEnum.values.toList.filter(_.toString.toUpperCase == source.toUpperCase)values.length match {case 1 => values.headcase _ => throw new IllegalArgumentException("该任务不存在")}}val Test1 = Value("ods.ods_test1")val Test2 = Value("ods.ods_test2")}
这里的Test1和Test2表示任务的名称。
2.3 TaskRunner中编写任务的业务逻辑
package com.king.ml.app.test1import com.king.ml.annotation.Runner
import com.king.ml.base.BaseRunner
import com.king.ml.enums.TaskNameEnum
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime@Runner
class Test1TaskRunner extends BaseRunner{override def taskName: TaskNameEnum.Value = TaskNameEnum.Test1override def run(implicit spark: SparkSession, currDate: DateTime): Unit = {val cnt = spark.table("ods.ods_test1").count()println("===>总记录数为:")println("===>" + cnt)}
}
3. 任务执行脚本
在执行脚本中,任务主程序名不需要改变,只需要给任务传参枚举中任务名的值即可。
spark-submit \
--name 'test-ml' \
--master yarn \
--deploy-mode client \
--conf spark.port.maxRetries=100 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.executor.memoryOverhead=5120 \
--queue root.production \
--driver-memory 2g --num-executors 2 --executor-memory 2g --executor-cores 1 \
--class com.king.ml.app.FeatureContextApp \
./ml/ml-demo.jar "ods.ods_test1"