最近在GitHub上创建了一个新工程,收集个人在数据工程工作的小工具集合,命名为data_dragon (数据一条龙)。取这个名字的是希望这些脚本或代码能够复用,端到端地减少临时数据处理的时间。
最近因为工作上的一些变化,写作节奏有点被打乱,已经有2个月没有更新文章了。这次刚好将最近创建的小工程做个介绍。首先这个工程都是些零散的代码脚本,目前上传了3个,有Python,有Bash shell。后期,可能还有用sed写的HLR上的IMSI处理脚本,Java写的Hive UDF……总之,就是一些在实际工作中为了避免重复劳动的临时代码。
放在线上,就是为了方便以后遇到相同的问题可以重用,该工程的代码地址是https://github.com/camash/data_dragon。
工程中,每一个文件夹都是一个独立的小工具,用于解决一个独立的问题。目前,已经上传的三个脚本分别用于“快速生成Azkaban任务调度的DAG”,“hive同步数据到ES索引”以及“传输大量SFTP文件以及检查”。以下,分别对上传的三个脚本做简要介绍。
generate_azkaban_flow
作用
在Excel中配置任务依赖关系,然后使用shell脚本快速生成Azkaban的job文件。
使用方法
- 创建文件 在Excel或者其它表格软件中,按如下结构创建
编号 | 任务名称 | 任务调用脚本 | 依赖 |
---|---|---|---|
000 | start_kettle | ||
001 | test | /home/hadoop/test/kettle/all/test.sh resource | 000_start_kettle |
002 | end_job | /home/hadoop/test/kettle/all/end_job.sh | 000_start_kettle, 001_test |
- 执行转换 复制到文本文件中,保存为tsv文件,比如test.txt。然后执行shell脚本。
bash ./gen_azkaban_flow.sh test.txt
执行之后在文件所在路径内会生成以编号_任务名称.job
的Azkaban任务文件。文件数量等同于tsv文件中的行数。
ls -1 *.job
000_start_kettle.job
001_test.job
002_end_job.job
- job文件内容
主要包含执行项和依赖项,依赖项就是最终生成任务DAG的边。同时,这个脚本中默认会给执行命令加入执行日期参数,若不需要可以通过修改shell命令实现。
$ cat 002_end_job.job
type=command
dependencies=000_start_kettle, 001_test
command=/bin/bash /home/hadoop/test/kettle/all/end_job.sh '${azkaban.flow.start.year}${azkaban.flow.start.month}${azkaban.flow.start.day}'
使用总结
可以通过先在表格中规则的梳理任务流,避免了任务太多时直接写job文件容易遗漏的情况。梳理完成之后,使用该脚本一次性生成所有的job,秒秒钟完成。
hive_to_elasticsearch
作用
将Hive表中的数据导入到Elasticsearch的索引中。
使用方法
脚本是通过Python3编写的,因此使用Python3调用即可。
python3 hive_records_to_es.py
其中,Hive地址和Elasticsearch的地址放在connection.cfg
文件中,样例如下:
[hive]
host = 192.168.1.4
port = 10000
user = hadoop
[es]
es_url_1 = http://192.168.1.6:7200/
es_url_2 = http://192.168.1.7:7200/
es_url_3 = http://192.168.1.8:7200/
另外,表名和索引名在脚本中是静态赋值,后期需要动态传入。
scp_copy_and_check
作用
从远程SFTP同步文件指定文件夹下的所有文件至本地的指定路径。在传输前后,可以对源和目标系统上的文件数量进行校验(也可以支持其它校验和方式)。同时在传输前,对源的文件数量可以设置一定阈值,数量过少直接报异常退出程序。
使用方法
脚本是使用bash shell进行编写的,需要传入日期参数来确认文件夹的路径,参数格式为YYYYMMDD
。若不传入参数,则会取执行日期做为默认参数。调用方式如下:
bash scp_log_file.sh 20200916