数据同步底层脚本
日志追踪,关键字提取,任务失败重启策略
mysql_to_hive.sh
#!/bin/bash
#
# mysql_to_hive.sh -- run the Spark job
# com.mingzhi.common.universal.common_mysql_to_hive once per date window.
#
# The range [$11, $12] is cut into windows of interval_day+1 days. For every
# window one spark2-submit run is issued; its combined stdout/stderr is saved
# to <script dir>/log/<hive db>/<window end>/<table>_to_hive.log and searched
# for known failure keywords. A window whose log contains a keyword is
# submitted again after a pause, up to limit_cnts submissions per window.
# The exit status of spark2-submit itself is not consulted.
#
# Positional arguments (forwarded unchanged to the Spark job):
#   $1  mysql host          $2  mysql db           $3  mysql table
#   $4  mysql username      $5  mysql passwd       $6  hive db
#   $7  hive table prefix   $8  format             $9  create_time
#   $10 update_time         $11 query_begin_date   $12 query_end_date
#   $13 hive_tables         $14 condition          $15 dropCols

echo "mysql host is" "$1"
echo "mysql db is" "$2"
echo "mysql table is" "$3"
echo "mysql username is" "$4"
# The password is intentionally masked so it never lands in job logs.
echo "mysql passwd is ******"
echo "hive db is" "$6"
echo "hive table prefix is" "$7"
echo "format is" "$8"
echo "create_time is" "$9"
echo "update_time is" "${10}"
echo "query_begin_date is" "${11}"
echo "query_end_date is" "${12}"
echo "hive_tables is" "${13}"
# Quoted: an unquoted condition containing '*' would be glob-expanded.
echo "condition is" "${14}"
echo "dropCols is" "${15}"

host=$1
db=$2
table=$3
username=$4
passwd=$5
hive_db=$6
hive_table_prefix=$7
format=$8
create_time=$9
update_time=${10}
dt=${11}
dt1=${12}
hive_tables=${13}
condition=${14}
dropCols=${15}

# Submission counter for the current window and its upper bound.
s=0
limit_cnts=10

# Fixed strings whose presence in the job log marks the run as failed.
error_patterns=(
  "SparkContext has been shutdown"
  "Failed to send RPC"
  "java.nio.channels.ClosedChannelException"
  "Marking as slave lost"
  "org.apache.spark.SparkException"
  "Exception in thread"
  "SparkContext was shut down"
  "org.apache.spark.sql.AnalysisException"
  "java.util.concurrent.RejectedExecutionException"
)

# log_has_error FILE -- succeed when FILE contains any failure keyword.
log_has_error() {
  local grep_args=() pattern
  for pattern in "${error_patterns[@]}"; do
    grep_args+=(-e "$pattern")
  done
  grep -qF "${grep_args[@]}" -- "$1"
}

# f BEGIN END -- submit the Spark job for one window, re-submitting while the
# log shows a failure keyword. Exits the whole script with status 1 once the
# counter s exceeds limit_cnts.
f() {
  local query_begin_date=$1
  local query_end_date=$2
  local WORK_DIR LOG_PATH FILE_LOG

  WORK_DIR=$(cd "$(dirname "$0")" && pwd) || exit 1
  echo "WORK_DIR:${WORK_DIR}"
  LOG_PATH="${WORK_DIR}/log/${hive_db}/${query_end_date}"
  echo "LOG_PATH:${LOG_PATH}"
  mkdir -p "${LOG_PATH}"
  FILE_LOG="${LOG_PATH}/${table}_to_hive.log"
  echo "FILE_LOG:${FILE_LOG}"

  # A loop replaces the former self-recursion, which re-entered f while the
  # caller was still reading the very log file the retry truncates via tee.
  while :; do
    s=$((s + 1))
    echo "函数f被调用次数:$s"
    if [ "$s" -gt "$limit_cnts" ]; then
      echo "the cycle times is gt $limit_cnts,exit"
      exit 1
    fi

    /opt/cloudera/parcels/SPARK2/bin/spark2-submit \
      --class com.mingzhi.common.universal.common_mysql_to_hive \
      --master yarn \
      --deploy-mode client \
      --driver-memory 2g \
      --num-executors 3 \
      --executor-memory 3G \
      --executor-cores 3 \
      --conf spark.default.parallelism=200 \
      --conf spark.port.maxRetries=1000 \
      --conf spark.rpc.numRetries=1000000 \
      --conf spark.sql.shuffle.partitions=10 \
      --conf spark.dynamicAllocation.enabled=false \
      --conf spark.rpc.askTimeout=3600 \
      --conf spark.rpc.lookupTimeout=3600 \
      --conf spark.network.timeout=3600 \
      --conf spark.rpc.io.connectionTimeout=3600 \
      /mnt/db_file/jars/common-1.0-SNAPSHOT-jar-with-dependencies.jar \
      "$host" "$db" "$table" "$username" "$passwd" "$hive_db" "$hive_table_prefix" "$format" "$create_time" "$update_time" "$query_begin_date" "$query_end_date" "$hive_tables" "$condition" "$dropCols" 2>&1 | tee "${FILE_LOG}"

    if ! log_has_error "${FILE_LOG}"; then
      return 0
    fi

    echo "SPARK SQL EXECUTION FAILED ......AND JOB FAILED DATE IS 【${query_begin_date},${query_end_date}】......"
    sleep 1m
  done
}

# Each window covers interval_day+1 days (both ends inclusive).
interval_day=5
start_date=${dt}
end_date=${dt1}

# Dates are YYYY-MM-DD, so plain string comparison orders them correctly.
while [[ "$start_date" < "$end_date" || "$start_date" == "$end_date" ]]; do
  # Abort on an unparsable date rather than continuing with empty bounds.
  query_begin_date=$(date -d "$start_date" "+%Y-%m-%d") || exit 1
  query_end_date=$(date -d "+${interval_day} day ${start_date}" +%Y-%m-%d) || exit 1
  start_date=$(date -d "+${interval_day} day +1 day ${start_date}" +%Y-%m-%d) || exit 1

  # Clamp the last window to the requested end date.
  if [[ "$query_end_date" > "$end_date" ]]; then
    query_end_date=$end_date
  fi

  echo "【本次任务开始时间:$query_begin_date,本次任务结束时间:$query_end_date,下一次任务的开始时间:$start_date】"

  # Launch the Spark job for this window.
  sleep 1s
  # Reset the counter per window, as hive_to_mysql.sh and es_to_hive.sh do,
  # so a long backfill is not aborted by retries accumulated earlier.
  s=0
  f "$query_begin_date" "$query_end_date"
done
hive_to_mysql.sh
#!/bin/bash
#
# hive_to_mysql.sh -- run the Spark job
# com.mingzhi.common.universal.hive_to_mysql once per date window.
#
# The range [dt, dt1] is cut into windows of interval_day+1 days. Every
# window is one spark2-submit run whose combined output is saved to
# <script dir>/log/<from_db>/<window end>/<from_tables>_to_mysql.log and
# searched for failure keywords; a failed window is submitted again after a
# pause, up to limit_cnts submissions per window. The exit status of
# spark2-submit itself is not consulted.
#
# Positional arguments:
#   $1  mysql host     $2  from_db     $3  from_tables   $4  to_db
#   $5  to_tables      $6  username    $7  passwd
#   $8  dt        (default: yesterday)
#   $9  dt1       (default: yesterday)
#   $10 savemode  (default: OverWriteByDt)
#   $11 dropCols

mysql_host=$1
from_db=$2
from_tables=$3
to_db=$4
to_tables=$5
username=$6
passwd=$7
dropCols=${11}

echo "mysql host is $1"
echo "from_db is ${2}"
echo "from tables is $3"
echo "to_db is $4"
echo "to_tables is $5"
echo "username is $6"
# The password is intentionally masked so it never lands in job logs.
echo "passwd is ******"
echo "dt is $8"
echo "dt1 is $9"
echo "savemode is ${10}"
echo "dropCols is ${11}"

# Fall back to defaults when the optional arguments are missing or empty.
yesterday=$(date -d "-1 day" +%F)
dt=${8:-$yesterday}
dt1=${9:-$yesterday}
savemode=${10:-OverWriteByDt}

echo '==============================================================================='
echo "final dt is $dt"
echo "final dt1 is $dt1"
echo "final savemode is $savemode"
echo "final dropCols is $dropCols"

#/mnt/db_file/jars/common-1.0-SNAPSHOT-jar-with-dependencies_202201.jar "${mysql_host}" "${from_db}" "${from_tables}" "${to_db}" "${to_tables}" "$username" "${passwd}" "${dt}" "${dt1}" "${savemode}"

# Submission counter for the current window and its upper bound.
s=0
limit_cnts=10

# Fixed strings whose presence in the job log marks the run as failed.
error_patterns=(
  "SparkContext has been shutdown"
  "Failed to send RPC"
  "java.nio.channels.ClosedChannelException"
  "Deadlock found"
  "org.apache.spark.SparkException"
  "Exception in thread"
  "SparkContext was shut down"
  "org.apache.spark.sql.AnalysisException"
)

# log_has_error FILE -- succeed when FILE contains any failure keyword.
log_has_error() {
  local grep_args=() pattern
  for pattern in "${error_patterns[@]}"; do
    grep_args+=(-e "$pattern")
  done
  grep -qF "${grep_args[@]}" -- "$1"
}

# f BEGIN END -- submit the Spark job for one window, re-submitting while the
# log shows a failure keyword. Exits the whole script with status 1 once the
# counter s exceeds limit_cnts.
f() {
  local query_begin_date=$1
  local query_end_date=$2
  local WORK_DIR LOG_PATH FILE_LOG

  WORK_DIR=$(cd "$(dirname "$0")" && pwd) || exit 1
  echo "WORK_DIR:${WORK_DIR}"
  LOG_PATH="${WORK_DIR}/log/${from_db}/${query_end_date}"
  echo "LOG_PATH:${LOG_PATH}"
  mkdir -p "${LOG_PATH}"
  FILE_LOG="${LOG_PATH}/${from_tables}_to_mysql.log"
  echo "FILE_LOG:${FILE_LOG}"

  # A loop replaces the former self-recursion, which re-entered f while the
  # caller was still reading the very log file the retry truncates via tee.
  while :; do
    s=$((s + 1))
    echo "函数f被调用次数:$s"
    if [ "$s" -gt "$limit_cnts" ]; then
      echo "the cycle times is gt $limit_cnts,exit"
      exit 1
    fi

    /opt/cloudera/parcels/SPARK2/bin/spark2-submit \
      --class com.mingzhi.common.universal.hive_to_mysql \
      --master yarn \
      --deploy-mode client \
      --executor-memory 2G \
      --num-executors 2 \
      --executor-cores 4 \
      --conf spark.dynamicAllocation.maxExecutors=3 \
      --conf spark.driver.cores=4 \
      --conf spark.driver.memory=2g \
      /mnt/db_file/jars/common-1.0-SNAPSHOT-jar-with-dependencies.jar \
      "${mysql_host}" "${from_db}" "${from_tables}" "${to_db}" "${to_tables}" "$username" "${passwd}" "${query_begin_date}" "${query_end_date}" "${savemode}" "${dropCols}" 2>&1 | tee "${FILE_LOG}"

    if ! log_has_error "${FILE_LOG}"; then
      return 0
    fi

    echo "SPARK SQL EXECUTION FAILED ......AND JOB FAILED DATE IS 【${query_begin_date},${query_end_date}】......"
    sleep 1m
  done
}

# Each window covers interval_day+1 days (both ends inclusive).
interval_day=4
start_date=${dt}
end_date=${dt1}

# Start dates beginning with 9999 are processed one day per window.
if [[ "$start_date" == "9999"* ]]; then
  interval_day=0
fi

# Dates are YYYY-MM-DD, so plain string comparison orders them correctly.
while [[ "$start_date" < "$end_date" || "$start_date" == "$end_date" ]]; do
  # Abort on an unparsable date rather than continuing with empty bounds.
  query_begin_date=$(date -d "$start_date" "+%Y-%m-%d") || exit 1
  query_end_date=$(date -d "+${interval_day} day ${start_date}" +%Y-%m-%d) || exit 1
  start_date=$(date -d "+${interval_day} day +1 day ${start_date}" +%Y-%m-%d) || exit 1

  # Clamp the last window to the requested end date.
  if [[ "$query_end_date" > "$end_date" ]]; then
    query_end_date=$end_date
  fi

  echo "【本次任务开始时间:$query_begin_date,本次任务结束时间:$query_end_date,下一次任务的开始时间:$start_date】"

  # Launch the Spark job for this window. Stop once the window start has
  # rolled over into year 10000 (reachable only from a 9999 start date).
  if [[ "$query_begin_date" == "10000"* ]]; then
    exit 0
  fi

  # Every window gets a fresh retry budget.
  s=0
  f "$query_begin_date" "$query_end_date"
done
es_to_hive.sh
#!/bin/bash
# Captured with: [root@mz-hadoop-01 import]# cat /root/bin/es_to_hive.sh
#
# es_to_hive.sh -- run the Spark job
# com.mingzhi.common.universal.common_es_to_hive once per date window.
#
# The range [dt, dt1] is walked in windows of interval_day+1 days (one day
# with interval_day=0). Every window is one spark2-submit run whose combined
# output is saved to <script dir>/log/<hive_db>/<window end>/<hive_tables>_to_hive.log
# and searched for failure keywords; a failed window is submitted again after
# a pause, up to cnts_limit submissions per window. Executor memory grows by
# 1G per submission, capped at memory_limit, and is not reset between
# windows. The exit status of spark2-submit itself is not consulted.
#
# Positional arguments (forwarded unchanged to the Spark job):
#   $1 es_host       $2 es_indexes   $3 hive_db   $4 hive_tables
#   $5 create_time   $6 update_time  $7 dt        $8 dt1
#   $9 format        $10 partitions

echo "es_host is" "$1"
echo "es_indexes is" "$2"
echo "hive_db is" "$3"
echo "hive_tables is" "$4"
echo "create_time is" "$5"
echo "update_time is" "$6"
echo "dt is" "$7"
echo "dt1 is" "$8"
echo "format is $9"
echo "partitions is ${10}"

es_host=$1
es_indexes=$2
hive_db=$3
hive_tables=$4
create_time=$5
update_time=$6
dt=$7
dt1=$8
format=$9
partitions=${10}

# Submission counter for the current window and its upper bound.
cnts=0
cnts_limit=5

# Executor memory in GB; incremented before every submission, so the first
# run already uses 2G.
memory=1
memory_limit=4

# Fixed strings whose presence in the job log marks the run as failed.
error_patterns=(
  "SparkContext has been shutdown"
  "Failed to send RPC"
  "java.nio.channels.ClosedChannelException"
  "Marking as slave lost"
  "org.apache.spark.SparkException"
  "Exception in thread"
  "SparkContext was shut down"
  "org.apache.spark.sql.AnalysisException"
  "java.util.concurrent.RejectedExecutionException"
  "java.io.IOException"
)

# log_has_error FILE -- succeed when FILE contains any failure keyword.
log_has_error() {
  local grep_args=() pattern
  for pattern in "${error_patterns[@]}"; do
    grep_args+=(-e "$pattern")
  done
  grep -qF "${grep_args[@]}" -- "$1"
}

# f BEGIN END -- submit the Spark job for one window, re-submitting while the
# log shows a failure keyword. Exits the whole script with status 1 once the
# counter cnts exceeds cnts_limit.
f() {
  local query_begin_date=$1
  local query_end_date=$2
  local WORK_DIR LOG_PATH FILE_LOG

  WORK_DIR=$(cd "$(dirname "$0")" && pwd) || exit 1
  echo "WORK_DIR:${WORK_DIR}"
  LOG_PATH="${WORK_DIR}/log/${hive_db}/${query_end_date}"
  echo "LOG_PATH:${LOG_PATH}"
  mkdir -p "${LOG_PATH}"
  FILE_LOG="${LOG_PATH}/${hive_tables}_to_hive.log"
  echo "FILE_LOG:${FILE_LOG}"

  # A loop replaces the former self-recursion, which re-entered f while the
  # caller was still reading the very log file the retry truncates via tee.
  while :; do
    cnts=$((cnts + 1))
    memory=$((memory + 1))
    if [ "$memory" -gt "$memory_limit" ]; then
      memory=$memory_limit
    fi
    echo "函数f被调用次数cnts:$cnts and memory is $memory G"
    if [ "$cnts" -gt "$cnts_limit" ]; then
      echo "the cycle times is gt $cnts_limit,exit"
      exit 1
    fi

    /opt/cloudera/parcels/SPARK2/bin/spark2-submit \
      --class com.mingzhi.common.universal.common_es_to_hive \
      --master yarn \
      --deploy-mode client \
      --num-executors 2 \
      --executor-memory "${memory}G" \
      --executor-cores 2 \
      --conf spark.default.parallelism=200 \
      --conf spark.port.maxRetries=300 \
      /mnt/db_file/jars/common-1.0-SNAPSHOT-jar-with-dependencies.jar \
      "$es_host" "$es_indexes" "$hive_db" "$hive_tables" "$create_time" "$update_time" "$query_begin_date" "$query_end_date" "$format" "$partitions" 2>&1 | tee "${FILE_LOG}"

    if ! log_has_error "${FILE_LOG}"; then
      return 0
    fi

    echo "SPARK SQL EXECUTION FAILED ......AND JOB FAILED DATE IS 【${query_begin_date},${query_end_date}】......"
    sleep 5s
  done
}

# Each window covers interval_day+1 days (both ends inclusive).
interval_day=0
start_date=${dt}
end_date=${dt1}

# Dates are YYYY-MM-DD, so plain string comparison orders them correctly.
while [[ "$start_date" < "$end_date" || "$start_date" == "$end_date" ]]; do
  # Abort on an unparsable date rather than continuing with empty bounds.
  query_begin_date=$(date -d "$start_date" "+%Y-%m-%d") || exit 1
  query_end_date=$(date -d "+${interval_day} day ${start_date}" +%Y-%m-%d) || exit 1
  start_date=$(date -d "+${interval_day} day +1 day ${start_date}" +%Y-%m-%d) || exit 1

  # Clamp the last window to the requested end date.
  if [[ "$query_end_date" > "$end_date" ]]; then
    query_end_date=$end_date
  fi

  echo "【本次任务开始时间:$query_begin_date,本次任务结束时间:$query_end_date,下一次任务的开始时间:$start_date】"

  # Launch the Spark job for this window with a fresh retry budget.
  cnts=0
  f "${query_begin_date}" "${query_end_date}"
done
数据同步业务脚本
Import
#!/bin/bash
#
# Runs /root/bin/mysql_to_hive.sh for table tbwork_order with Hive database
# paascloud.
#
# Arguments:
#   $1  start date (default: yesterday)
#   $2  end date   (default: yesterday)
#
# Connection settings (wfs_host, wfs_db, wfs_user, wfs_pwd) are expected to
# come from the sourced properties file.
source /root/bin/common_config/db_config.properties

yesterday=$(date -d "-1 day" +%F)
dt=${1:-$yesterday}
dt1=${2:-$yesterday}

# Every expansion is quoted so values containing spaces or glob characters
# reach mysql_to_hive.sh as single, unmodified arguments.
/root/bin/mysql_to_hive.sh "$wfs_host" "$wfs_db" tbwork_order "$wfs_user" "$wfs_pwd" paascloud '' '' 'create_time' 'update_time' "$dt" "$dt1"
[root@mz-hadoop-01 import]# cat wfs_order_list_index.sh
#!/bin/bash
#
# wfs_order_list_index.sh -- runs /root/bin/es_to_hive.sh for the index
# wfs_order_list_index_<year of dt>, with Hive database paascloud and table
# wfs_order_list_index.
#
# Arguments:
#   $1  start date (default: yesterday)
#   $2  end date   (default: yesterday)
#
# wfs_es_host is expected to come from the sourced properties file.
# NOTE(review): the index year is taken from dt only; a dt..dt1 range that
# crosses a year boundary would read a single yearly index -- confirm callers
# never pass such a range.
source /root/bin/common_config/es_config.properties

yesterday=$(date -d "-1 day" +%F)
dt=${1:-$yesterday}
dt1=${2:-$yesterday}

year=${dt:0:4}
cur_day=$yesterday
cur_year=${cur_day:0:4}
echo "year is ${year} and cur_year is ${cur_year}"

# Port 9200 is used when dt falls in yesterday's year, port 9500 otherwise.
# (An earlier version compared against the literal '2023'.)
if [[ "$year" == "$cur_year" ]]; then
  port=9200
else
  port=9500
fi
echo "port is ${port}"

/root/bin/es_to_hive.sh "$wfs_es_host:${port}" "wfs_order_list_index_${year}" paascloud wfs_order_list_index "orderCreateTime" "orderUpdateTime" "$dt" "$dt1" 'parquet' 1
Export
有更新的导出
#!/bin/bash
# Captured with: [root@mz-hadoop-01 tcm]# cat /mnt/db_file/tcm/hive_to_olap_4_tcm_parse.sh
#
# hive_to_olap_4_tcm_parse.sh -- runs /root/bin/hive_to_mysql.sh for one Hive
# table of database tcm, one day at a time.
#
# Arguments:
#   $1  hive table    (required)
#   $2  target table  (required)
#   $3  start date (default: yesterday)
#   $4  end date   (default: yesterday)
#
# When start and end date are equal, only the dates printed by
# /mnt/db_file/tcm/get_changed_dt.sh for that day are processed; otherwise
# every day from start to end (inclusive) is processed.
#
# olap_host, olap_db, olap_user and olap_pwd are expected to come from the
# sourced properties file.
source /root/bin/common_config/db_config.properties

hive_table=$1
target_table=$2

# Without both table names the downstream job would receive empty arguments.
if [[ -z "$hive_table" || -z "$target_table" ]]; then
  echo "usage: $0 <hive_table> <target_table> [dt] [dt1]" >&2
  exit 1
fi

yesterday=$(date -d "-1 day" +%F)
dt=${3:-$yesterday}
dt1=${4:-$yesterday}

echo "起始日期为$dt"
echo "结束日期为$dt1"

# f DATE -- run hive_to_mysql.sh with DATE as both start and end date.
f() {
  local do_date=$1
  echo "===函数执行日期为 $do_date==="
  /root/bin/hive_to_mysql.sh "$olap_host" tcm "$hive_table" "$olap_db" "$target_table" "$olap_user" "$olap_pwd" "$do_date" "$do_date"
}

if [[ "$dt" == "$dt1" ]]; then
  echo "dt = dt1......"
  # Word splitting of the helper output is intentional here; presumably it
  # prints whitespace-separated dates -- verify against get_changed_dt.sh.
  for i in $(/mnt/db_file/tcm/get_changed_dt.sh "$dt"); do
    echo "同步变化的日期======================>$i"
    f "$i"
  done
else
  echo "batch process..."
  start_day=$dt
  end_day=$dt1
  dt=$start_day
  # Loop-invariant upper bound: the day after end_day (exclusive).
  stop_day=$(date -d "+1 day $end_day" +%Y-%m-%d) || exit 1
  while [[ "$dt" < "$stop_day" ]]; do
    echo "批处理===>$dt"
    f "$dt"
    dt=$(date -d "+1 day $dt" +%Y-%m-%d) || exit 1
  done
fi
无更新的导出
直接导出
#!/bin/bash
# Captured with: [root@mz-hadoop-01 export]# cat wfs_ads_order_material_stats_to_olap.sh
#
# wfs_ads_order_material_stats_to_olap.sh -- runs /root/bin/hive_to_mysql.sh
# for table wfs_ads_order_material_stats, from database paascloud to
# database mz_olap.
#
# Arguments:
#   $1  start date (default: yesterday)
#   $2  end date   (default: yesterday)
#
# olap_host, olap_user and olap_pwd are expected to come from the sourced
# properties file. The shebang is explicit because `source` is not available
# in every /bin/sh.
source /root/bin/common_config/db_config.properties
echo "olap_host is :${olap_host}"

yesterday=$(date -d "-1 day" +%F)

dt=${1:-$yesterday}
echo "dt:$dt"

dt1=${2:-$yesterday}

# User and password are quoted (as in the sibling export script) so special
# characters in the credentials are not word-split or glob-expanded.
/root/bin/hive_to_mysql.sh "${olap_host}" paascloud wfs_ads_order_material_stats mz_olap wfs_ads_order_material_stats "$olap_user" "$olap_pwd" "${dt}" "${dt1}"
spark通用计算任务
#!/bin/bash
#
# Generic Spark job launcher: submits class $1 from jar $2 once per day in
# the range [dt, dt1].
#
# Arguments:
#   $1  className  (required)
#   $2  jarPath    (required)
#   $3  start date (default: yesterday)
#   $4  end date   (default: yesterday)
#
# Each run's combined output is saved to
# <script dir>/log/<window end>/<className>.log and searched for failure
# keywords; a failed window is submitted again after a pause, up to
# limit_cnts submissions per window. The exit status of spark2-submit itself
# is not consulted.

# Missing required arguments are reported on stderr with a non-zero status,
# so callers and schedulers can tell the job did not run.
if [[ -z "$1" ]]; then
  echo "need className" >&2
  exit 1
fi
className=$1

if [[ -z "$2" ]]; then
  echo "need jarPath" >&2
  exit 1
fi
jarPath=$2

yesterday=$(date -d "-1 day" +%F)
dt=${3:-$yesterday}
dt1=${4:-$yesterday}

echo "起始日期:$dt,结束日期:$dt1"

# Submission counter for the current window and its upper bound. The bound
# mirrors the sibling sync scripts; without it a deterministic failure would
# be re-submitted forever.
s=0
limit_cnts=10

# Fixed strings whose presence in the job log marks the run as failed.
error_patterns=(
  "SparkContext has been shutdown"
  "Failed to send RPC"
  "java.nio.channels.ClosedChannelException"
  "Marking as slave lost"
  "org.apache.spark.SparkException"
  "Exception in thread"
  "SparkContext was shut down"
  "org.apache.spark.sql.AnalysisException"
)

# log_has_error FILE -- succeed when FILE contains any failure keyword.
log_has_error() {
  local grep_args=() pattern
  for pattern in "${error_patterns[@]}"; do
    grep_args+=(-e "$pattern")
  done
  grep -qF "${grep_args[@]}" -- "$1"
}

# f BEGIN END -- submit the Spark job for one window, re-submitting while the
# log shows a failure keyword. Exits the whole script with status 1 once the
# counter s exceeds limit_cnts.
f() {
  local query_begin_date=$1
  local query_end_date=$2
  local WORK_DIR LOG_PATH FILE_LOG

  WORK_DIR=$(cd "$(dirname "$0")" && pwd) || exit 1
  echo "WORK_DIR:${WORK_DIR}"
  LOG_PATH="${WORK_DIR}/log/${query_end_date}"
  echo "LOG_PATH:${LOG_PATH}"
  mkdir -p "${LOG_PATH}"
  FILE_LOG="${LOG_PATH}/$className.log"
  echo "FILE_LOG:${FILE_LOG}"

  while :; do
    s=$((s + 1))
    if [ "$s" -gt "$limit_cnts" ]; then
      echo "the cycle times is gt $limit_cnts,exit"
      exit 1
    fi

    # The job receives the window start and end. The end date used to be a
    # second copy of the start date; with interval_day=0 both are equal, so
    # the values passed today are unchanged.
    /opt/cloudera/parcels/SPARK2/bin/spark2-submit \
      --class "$className" \
      --master yarn \
      --deploy-mode client \
      --driver-memory 4g \
      --num-executors 2 \
      --executor-memory 4G \
      --executor-cores 4 \
      --conf spark.driver.cores=5 \
      --conf spark.port.maxRetries=1000 \
      --conf spark.rpc.numRetries=1000000 \
      --conf spark.sql.shuffle.partitions=3 \
      --conf spark.dynamicAllocation.enabled=false \
      --conf spark.rpc.askTimeout=3600 \
      --conf spark.rpc.lookupTimeout=3600 \
      --conf spark.network.timeout=3600 \
      --conf spark.rpc.io.connectionTimeout=3600 \
      --conf spark.default.parallelism=50 \
      "$jarPath" \
      "$query_begin_date" "$query_end_date" 2>&1 | tee "${FILE_LOG}"

    if ! log_has_error "${FILE_LOG}"; then
      return 0
    fi

    echo "SPARK SQL EXECUTION FAILED ......AND JOB FAILED DATE IS 【${query_begin_date},${query_end_date}】......"
    sleep 10s
  done
}

# Each window covers interval_day+1 days; 0 means one day per run.
interval_day=0
start_date=${dt}
end_date=${dt1}

# Dates are YYYY-MM-DD, so plain string comparison orders them correctly.
while [[ "$start_date" < "$end_date" || "$start_date" == "$end_date" ]]; do
  # Abort on an unparsable date rather than continuing with empty bounds.
  query_begin_date=$(date -d "$start_date" "+%Y-%m-%d") || exit 1
  query_end_date=$(date -d "+${interval_day} day ${start_date}" +%Y-%m-%d) || exit 1
  start_date=$(date -d "+${interval_day} day +1 day ${start_date}" +%Y-%m-%d) || exit 1

  # Clamp the last window to the requested end date.
  if [[ "$query_end_date" > "$end_date" ]]; then
    query_end_date=$end_date
  fi

  echo "【本次任务开始时间:$query_begin_date,本次任务结束时间:$query_end_date,下一次任务的开始时间:$start_date】"

  # Launch the Spark job for this window with a fresh retry budget.
  s=0
  f "$query_begin_date" "$query_end_date"
done
Spark业务计算任务
spark_job_4_wfs.sh
#!/bin/bash
#
# spark_job_4_wfs.sh -- runs /root/bin/spark_job.sh for a class contained in
# the wfs jar.
#
# Arguments:
#   $1  className  (required)
#   $2  start date (default: yesterday)
#   $3  end date   (default: yesterday)

# A missing class name is reported on stderr with a non-zero status so the
# caller can tell nothing was run.
if [[ -z "$1" ]]; then
  echo "need className" >&2
  exit 1
fi
className=$1

yesterday=$(date -d "-1 day" +%F)
dt=${2:-$yesterday}
dt1=${3:-$yesterday}

# spark_job.sh relies on bash-only syntax ([[ ]]), so it is run with bash
# explicitly instead of whatever /bin/sh happens to be.
bash /root/bin/spark_job.sh "$className" /mnt/db_file/wfs/jar/wfs-1.0-SNAPSHOT-jar-with-dependencies.jar "$dt" "$dt1"
dwd_order_info_abi.sh
[root@mz-hadoop-01 dwd]# cat dwd_order_info_abi.sh
#!/bin/bash
#
# dwd_order_info_abi.sh -- runs /mnt/db_file/wfs/spark_job_4_wfs.sh for the
# class com.mingzhi.wfs.dwd.dwd_order_info_abi.
#
# Arguments:
#   $1  start date (default: yesterday)
#   $2  end date   (default: yesterday)

yesterday=$(date -d "-1 day" +%F)
dt=${1:-$yesterday}
dt1=${2:-$yesterday}

echo "起始日期:$dt,结束日期:$dt1"

# Dates are quoted so they are always passed on as exactly two arguments.
/mnt/db_file/wfs/spark_job_4_wfs.sh com.mingzhi.wfs.dwd.dwd_order_info_abi "$dt" "$dt1"
Hive计算任务
#!/bin/bash
#
# Hive job that rebuilds partitions of ads_order_overall_cube from
# dwd_order_info_abi.
#
# Arguments:
#   $1  start date (default: yesterday)
#   $2  end date   (default: yesterday)
#
# For every day D in [start, end] the file
# /mnt/db_file/wfs/log/D/get_changed_dt_result is read and the query is run
# once for each date listed in it. Hive output is saved to
# <script dir>/log/<date>/ads_order_overall_cube.log; a run whose log
# contains "FAILED" is repeated, up to max_attempts runs per date.
db=paascloud
hive=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/hive

yesterday=$(date -d "-1 day" +%F)
dt=${1:-$yesterday}
dt1=${2:-$yesterday}

echo "起始日期:$dt,结束日期:$dt1"

# Upper bound on runs per date; without it a deterministic failure would be
# re-run forever with no pause.
max_attempts=10

# f DATE -- run the cube query for partition DATE, repeating while the Hive
# log contains "FAILED". Exits the whole script with status 1 when
# max_attempts runs have all failed.
f() {
  local do_date=$1
  local sql WORK_DIR LOG_PATH FILE_NAME log_file
  local attempt=0

  echo "===函数日期为 $do_date==="

  # The double quotes inside the commented-out SQL line are escaped so they
  # do not terminate the surrounding shell string.
  sql="
use $db;
set hive.exec.dynamic.partition.mode=nonstrict;
add jar /mnt/db_file/wfs/jar/udf-1.4.3-SNAPSHOT-jar-with-dependencies.jar;
create temporary function str_distinct as 'com.mingzhi.StringDistinct';
insert overwrite table ads_order_overall_cube partition(dt)
select
corcode_f3,max(sort_f3),
corcode_f2,max(sort_f2),
corcode_f1,max(sort_f1),
orderlargertype,
--ordersecondtype,
--orderthirdlytype,
orderSource,
orderStatus,
count(1) as cnts,str_distinct(concat_ws(',',collect_set(deal_user_ids))) as all_persons,
if(str_distinct(concat_ws(',',collect_set(deal_user_ids)))='' ,0, size(split(str_distinct(concat_ws(',',collect_set(deal_user_ids))),','))) as all_person_cnts,
--regexp_replace(regexp_replace(regexp_replace(lpad(bin(cast(grouping_id() as bigint)),5,'0'),\"0\",\"x\"),\"1\",\"0\"),\"x\",\"1\") as dim
reverse(lpad(bin(cast(GROUPING__ID as bigint)),6,'0')) as dim
,'$do_date'
from dwd_order_info_abi where dt='$do_date'
group by
corcode_f3,
corcode_f2,
corcode_f1,
orderlargertype,
--ordersecondtype,
--orderthirdlytype,
orderSource,
orderStatus
grouping sets(corcode_f2,corcode_f1,(corcode_f2,orderlargertype),(corcode_f2,orderlargertype,orderSource,orderStatus),(corcode_f1,orderlargertype),(corcode_f1,orderlargertype,orderSource,orderStatus) )
;
"

  # Directory containing this script.
  WORK_DIR=$(cd "$(dirname "$0")" && pwd) || exit 1
  LOG_PATH="$WORK_DIR/log/$do_date"
  mkdir -p "$LOG_PATH"
  FILE_NAME="ads_order_overall_cube"
  log_file="$LOG_PATH/${FILE_NAME}.log"

  # A loop replaces the former self-recursion, which re-entered f while the
  # caller was still reading the very log file the retry truncates via tee.
  # tee already streams the Hive output to stdout, so the log is not echoed
  # a second time while it is checked.
  while :; do
    attempt=$((attempt + 1))
    if [ "$attempt" -gt "$max_attempts" ]; then
      echo "the cycle times is gt $max_attempts,exit"
      exit 1
    fi

    "$hive" -e "$sql" 2>&1 | tee "$log_file"

    if ! grep -qF -- "FAILED" "$log_file"; then
      return 0
    fi

    echo "HIVE JOB EXECUTION FAILED AND DATE IS 【${do_date}】......"
  done
}

start_day=$dt
end_day=$dt1
dt=$start_day
# Loop-invariant upper bound: the day after end_day (exclusive).
stop_day=$(date -d "+1 day $end_day" +%Y-%m-%d) || exit 1

while [[ "$dt" < "$stop_day" ]]; do
  # Previously the list came from: /mnt/db_file/wfs/get_changed_dt.sh $dt
  # Word splitting of the file content is intentional (one date per word).
  for i in $(cat "/mnt/db_file/wfs/log/$dt/get_changed_dt_result"); do
    echo "===================执行变化的日期:$i ==========================="
    f "$i"
  done
  dt=$(date -d "+1 day $dt" +%Y-%m-%d) || exit 1
done