# 编写了一个根据今天的全量的数据以及昨天全量的数据,自动获取今天增量数据的脚本。
#!/bin/bash
# Derive today's incremental (Added/Updated/Deleted) rows from today's and
# yesterday's full-data Hive tables via the Presto CLI.
# Requires ${BIGDATA}, ${presto_master_host} and ${presto_master_port} to be
# provided by the environment — TODO confirm where these are exported.

hive_db=""
hive_result_tb=""
hive_source_tb=""
hive_source_last_tb=""
hive_pk=""

# Initialize global parameters.
#   $1 - database name
#   $2 - today's full-data (source) table name
#   $3 - yesterday's full-data table name
#   $4 - result table name
#   $5 - primary-key column of the source table
initParam(){
  hive_db=${1}
  hive_source_tb=${2}
  hive_source_last_tb=${3}
  hive_result_tb=${4}
  hive_pk=${5}
  # Qualify the table names with the database.
  hive_result_tb=${hive_db}.${hive_result_tb}
  hive_source_tb=${hive_db}.${hive_source_tb}
  hive_source_last_tb=${hive_db}.${hive_source_last_tb}
  # Aliases used for the primary key of today's / yesterday's table in the SQL.
  hive_cur_tb_pk1=${hive_pk}_pk1
  hive_cur_tb_pk2=${hive_pk}_pk2
  echo "--------------库名 hive_db------------" "${1}"
  echo "--------------原表表名 hive_source_tb------------" "${2}"
  echo "--------------原表表名 hive_source_last_tb------------" "${3}"
  # BUGFIX: these two labels originally repeated "原表表名".
  echo "--------------结果表表名 hive_result_tb------------" "${4}"
  echo "--------------原表主键 hive_pk------------" "${5}"
}

# getHiveFieldList (below) reads the table columns; params: ${hive_db} ${hive_source_tb}
# Fetch the column list of the source table and build the projection, alias
# and null-safe change-filter fragments used by the incremental SQL, then
# hand them to incrementTableData.
# Reads globals set by initParam; sets is_hive_partition_tb /
# hive_partition_field when a "partition key" line is seen in the DESC output.
function getHiveFieldList(){
  echo "------------------------------function getHiveFieldList start----------------------"
  hive_full_tb=${hive_source_tb}
  echo "--------------- [INFO] tableName ${hive_full_tb}"
  # Reset accumulators.
  column=""
  column_varchar=""
  field_list=""
  field_list_varchar=""
  # Both the database and the source table must have been set by initParam.
  if [ -z "${hive_db}" ] || [ -z "${hive_source_tb}" ]; then
    echo "-------------- [ERROR] 参数异常:--hive_db --hive_source_tb 必须同时传参 ----------------"
    exit 255
  fi
  echo "--------------- [INFO] 获取 hive_source_tb ${hive_source_tb}字段 ---------------"
  presto_sql="desc ${hive_full_tb};"
  echo "--------------- [INFO] presto_sql:${presto_sql} ---------------"
  # Describe the table through the Presto CLI.
  table_desc=$("${BIGDATA}"/jar/presto.jar --server "${presto_master_host}:${presto_master_port}" --catalog hive --schema default --user admin --execute "${presto_sql}")
  getHiveFieldListStatusTableDesc=$?
  # BUGFIX: abort when the describe failed instead of silently building an
  # empty column list and running a broken insert later.
  if [ "${getHiveFieldListStatusTableDesc}" -ne 0 ]; then
    echo "-------------- [ERROR] desc ${hive_full_tb} 失败 ----------------"
    exit 255
  fi
  while read -r Column
  do
    # First comma-separated token is the column name (intentional word
    # splitting on the DESC output line); strip surrounding double quotes.
    array_column=(${Column//,/ })
    column=${array_column[0]}
    column=${column//\"/}
    par_result=$(echo "${Column}" | grep "partition key")
    if [[ "${par_result}" != "" ]]; then
      echo "--------------- [INFO] ${hive_full_tb}为分区表 ---------------"
      # Mark the table as partitioned and remember the partition column.
      is_hive_partition_tb=1
      hive_partition_field=${column}
      echo "--------------- [INFO] hive_partition_field: ${hive_partition_field} ---------------"
    elif [[ "${column}" = "${hive_pk}" ]]; then
      # Primary-key column: alias with _pk1 (today) / _pk2 (yesterday).
      echo "两个变量相等"
      columns_tb1=${columns_tb1},"${column} as ${column}_pk1"
      columns_tb2=${columns_tb2},"${column} as ${column}_pk2"
      columns_alias_tb1=${columns_alias_tb1},${column}_pk1
      columns_alias_tb2=${columns_alias_tb2},${column}_pk2
      columns_filter=${columns_filter}" (${column}_pk1!=${column}_pk2 or (${column}_pk1 is null and ${column}_pk2 is not null) or (${column}_pk2 is null and ${column}_pk1 is not null)) or"
      echo "${columns_tb1}"
    else
      # Ordinary column: alias with _tb1/_tb2 suffixes and add a null-safe
      # inequality term to the change filter.
      columns_tb1=${columns_tb1},"${column} as ${column}_tb1"
      columns_tb2=${columns_tb2},"${column} as ${column}_tb2"
      columns_alias_tb1=${columns_alias_tb1},${column}_tb1
      columns_alias_tb2=${columns_alias_tb2},${column}_tb2
      columns_filter=${columns_filter}" (${column}_tb1!=${column}_tb2 or (${column}_tb1 is null and ${column}_tb2 is not null) or (${column}_tb2 is null and ${column}_tb1 is not null)) or"
    fi
  done <<< "${table_desc}"
  # Drop the leading comma of each list, and the leading space plus the
  # trailing "or" of the filter.
  field_list_tb1=${columns_tb1:1}
  field_list_tb2=${columns_tb2:1}
  field_list_alias_tb1=${columns_alias_tb1:1}
  field_list_alias_tb2=${columns_alias_tb2:1}
  columns_filter=${columns_filter::-2}
  columns_filter=${columns_filter:1}
  echo "-----------------------columns_filter-------------------------" "${columns_filter}"
  # BUGFIX: the original captured $? of the preceding echo here, and the
  # empty-field checks below overwrote ...StatusTableDesc instead of
  # ...StatusFieldList. Track the field-extraction status explicitly.
  getHiveFieldListStatusFieldList=0
  if [ -z "${field_list_tb1}" ]; then
    echo "--------------- [ERROR] field_list_tb1 获取字段失败 ----------------"
    getHiveFieldListStatusFieldList=255
  fi
  echo -e "--------------- [INFO] field_list_tb1 获取字段完成: \n${field_list_tb1} ---------------"
  if [ -z "${field_list_tb2}" ]; then
    echo "--------------- [ERROR] field_list_tb2 获取字段失败 ----------------"
    getHiveFieldListStatusFieldList=255
  fi
  echo -e "--------------- [INFO] field_list_tb2 获取字段完成: \n${field_list_tb2} ---------------"
  if [ -z "${field_list_alias_tb1}" ]; then
    echo "--------------- [ERROR] field_list_alias_tb1 获取字段失败 ----------------"
    getHiveFieldListStatusFieldList=255
  fi
  echo -e "--------------- [INFO] field_list_alias_tb1 获取字段完成: \n${field_list_alias_tb1} ---------------"
  if [ -z "${field_list_alias_tb2}" ]; then
    echo "--------------- [ERROR] field_list_alias_tb2 获取字段失败 ----------------"
    getHiveFieldListStatusFieldList=255
  fi
  echo -e "--------------- [INFO] field_list_alias_tb2 获取字段完成: \n${field_list_alias_tb2} ---------------"
  # BUGFIX: abort instead of running the insert with incomplete column lists.
  if [ "${getHiveFieldListStatusFieldList}" -ne 0 ]; then
    exit 255
  fi
  echo "------------------------------function getHiveFieldList end----------------------"
  incrementTableData "${field_list_tb1}" "${field_list_tb2}" "${field_list_alias_tb1}" "${field_list_alias_tb2}" "${columns_filter}"
}

# Build and run the insert that writes A(dded)/U(pdated)/D(eleted) rows into
# the result table.
#   $1 - projection for today's table      $2 - projection for yesterday's table
#   $3 - alias list for today's table      $4 - alias list for yesterday's table
#   $5 - null-safe change filter
function incrementTableData(){
  echo "-------------------function incrementTableData start"
  field_list_tb1=${1}
  field_list_tb2=${2}
  field_list_alias_tb1=${3}
  field_list_alias_tb2=${4}
  columns_filter=${5}
  echo "---------------------function incrementTableData field_list_tb---------" "${field_list_tb1}" "${field_list_tb2}"
  echo "---------------------function incrementTableData field_list_alias_tb---------" "${field_list_alias_tb1}" "${field_list_alias_tb2}"
  echo "---------------------function incrementTableData columns_filter---------" "${columns_filter}"
  execute_sql="set session query_max_run_time='25.00m';
set session hive.insert_existing_partitions_behavior = 'overwrite';
set session use_preferred_write_partitioning = true; /*每个分区一个writer,在分区数多且整体文件体积较小时有奇效*/
insert into ${hive_result_tb}
with resource_label_cur_data as (
  select ${field_list_tb1}
  from ${hive_source_tb}
  where ${hive_pk} is not null
),
resource_label_last_data as (
  select ${field_list_tb2}
  from ${hive_source_last_tb}
  where ${hive_pk} is not null
),
resource_label_full_data as (
  select ${field_list_alias_tb1},
         ${field_list_alias_tb2},
         cur_data.${hive_cur_tb_pk1} AS ${hive_pk}_td,
         last_data.${hive_cur_tb_pk2} AS ${hive_pk}_last_td
  from resource_label_cur_data as cur_data
  full join resource_label_last_data as last_data
    on cur_data.${hive_cur_tb_pk1} = last_data.${hive_cur_tb_pk2}
),
resource_label_with_updata_flag as (
  select
    case
      when ${hive_pk}_td is not null and ${hive_pk}_last_td is not null then 'U'
      when ${hive_pk}_td is null and ${hive_pk}_last_td is not null then 'D'
      else 'A'
    end as UPDATE_FLAG,
    if(${hive_pk}_td is null, ${hive_pk}_last_td, ${hive_pk}_td) as ${hive_pk}
  from resource_label_full_data
  where ${columns_filter}
)
select ${field_list_alias_tb1},
       full_data.UPDATE_FLAG
from resource_label_with_updata_flag as full_data
left join resource_label_cur_data as cur_data
  on full_data.${hive_pk} = cur_data.${hive_cur_tb_pk1};"
  # BUGFIX (above): the 'D' branch originally required BOTH keys to be null,
  # which is impossible after the full join, so deleted rows were flagged
  # 'A'. A deleted row has today's key null and yesterday's key non-null —
  # matching the reference SQL at the bottom of this file.
  echo "-----------------execute_sql-" "${execute_sql}"
  # Run the insert; abort on failure instead of ignoring the exit status.
  "${BIGDATA}"/jar/presto.jar --server "${presto_master_host}:${presto_master_port}" --catalog hive --schema default --user admin --execute "${execute_sql}"
  if [ $? -ne 0 ]; then
    echo "-------------------[ERROR] incrementTableData 执行失败"
    exit 255
  fi
  echo "-------------------function incrementTableData end"
}

# main is defined below.
# Program entry point: wires the concrete table names into the generic logic.
# NOTE: in the original, the stripped newlines fused "customer_resource_id"
# and "getHiveFieldList" into one token, so getHiveFieldList never ran.
main() {
  # db, today's table, yesterday's table, result table, primary key
  initParam "ads_biz" \
            "ads_biz_customer_resource_label_1d_2023_0803_test" \
            "ads_biz_customer_resource_label_1d_2023_0803_last_test" \
            "ads_biz_customer_resource_label_1d_2023_0803_result_test" \
            "customer_resource_id"
  getHiveFieldList
}

# Invoke main.
main
# 参考的sql程序 — reference SQL this script was ported from, kept as a
# quoted no-op here-doc so the trailing text is never executed as shell
# commands (the delimiter is quoted, so ${ds}/${last_1_day} are not expanded).
: <<'REFERENCE_SQL'
select
  t.KEY1,
  t.KEY2,
  '${ds}' as BATCH_DATE,
  t.UPDATE_FLAG,
  if(t.UPDATE_FLAG in ('A', 'U'), factor_name, null) as factor_name
from (
  select
    case
      when isnotnull(KEY1_1) and isnotnull(KEY2_1)
       and isnotnull(KEY1_2) and isnotnull(KEY2_2) then 'U'
      when isnull(KEY1_1) and isnull(KEY2_1) then 'D'
      else 'A'
    end as UPDATE_FLAG,
    if(isnull(KEY1_1) and isnull(KEY2_1), KEY1_2, KEY1_1) as KEY1,
    if(isnull(KEY1_1) and isnull(KEY2_1), KEY2_2, KEY2_1) as KEY2
  from (
    select
      t1.CONTENT_HASH AS CONTENT_HASH_1,
      t2.CONTENT_HASH AS CONTENT_HASH_2,
      t1.KEY1 AS KEY1_1,
      t2.KEY1 AS KEY1_2,
      t1.KEY2 AS KEY2_1,
      t2.KEY2 AS KEY2_2
    from (
      select KEY1, KEY2, hash(factor_name) as CONTENT_HASH
      from table_name
      where ds = '${ds}' and KEY1 is not null and KEY2 is not null
    ) t1
    full join (
      select KEY1, KEY2, hash(factor_name) as CONTENT_HASH
      from table_name
      where ds = '${last_1_day}' and KEY1 is not null and KEY2 is not null
    ) t2
      on cast(t1.KEY1 as string) = cast(t2.KEY1 as string)
     and cast(t1.KEY2 as string) = cast(t2.KEY2 as string)
  ) a
  where a.CONTENT_HASH_1 <> a.CONTENT_HASH_2
     or (isnull(a.CONTENT_HASH_1) and isnotnull(a.CONTENT_HASH_2))
     or (isnull(a.CONTENT_HASH_2) and isnotnull(a.CONTENT_HASH_1))
) t
left join (
  select *
  from table_name
  where ds = '${ds}' and KEY1 is not null and KEY2 is not null
) ta
  on cast(t.KEY1 as string) = cast(ta.KEY1 as string)
 and cast(t.KEY2 as string) = cast(ta.KEY2 as string);
REFERENCE_SQL