根据今天全量的数据以及昨天全量的数据,获取今天增量的数据

编写了一个根据今天的全量的数据以及昨天全量的数据,自动获取今天增量数据的脚本。


#!/bin/bashhive_db=""
hive_result_tb=""
hive_source_tb=""
hive_source_last_tb=""
hive_pk=""initParam(){# 库名hive_db=${1}# 原表表名hive_source_tb=${2}# 原表表名hive_source_last_tb=${3}# 结果表表名hive_result_tb=${4}# 原表主键hive_pk=${5}hive_result_tb=${hive_db}.${hive_result_tb}hive_source_tb=${hive_db}.${hive_source_tb}hive_source_last_tb=${hive_db}.${hive_source_last_tb}hive_cur_tb_pk1=${hive_pk}_pk1hive_cur_tb_pk2=${hive_pk}_pk2echo "--------------库名 hive_db------------"  ${1}echo "--------------原表表名 hive_source_tb------------"  ${2}echo "--------------原表表名 hive_source_last_tb------------"  ${3} echo "--------------原表表名 hive_result_tb------------"  ${4} echo "--------------原表表名 hive_pk------------"  ${5} }#获取表字段,参数:${hive_db} ${hive_tb}
function getHiveFieldList(){echo "------------------------------function getHiveFieldList start----------------------"hive_full_tb=${hive_source_tb}echo "--------------- [INFO] tableName ${hive_full_tb}" #初始化参数column=""column_varchar=""field_list=""field_list_varchar=""#判断参数是否有异常if [ ! ${hive_db} ] || [ ! ${hive_source_tb} ]; thenecho "-------------- [ERROR] 参数异常:--hive_db --hive_source_tb 必须同时传参 ----------------"exit 255fiecho "--------------- [INFO] 获取 hive_source_tb  ${hive_source_tb}字段 ---------------"#查询表数据sql,拼接jsonpresto_sql="desc ${hive_full_tb};"echo "--------------- [INFO] presto_sql:${presto_sql} ---------------"#获取表结构信息table_desc=$(${BIGDATA}/jar/presto.jar --server ${presto_master_host}:${presto_master_port} --catalog hive --schema default --user admin --execute "${presto_sql}")#设置状态status=$?getHiveFieldListStatusTableDesc=${status}while read -r Columndo#拆分成数组array_column=(${Column//,/ })#取数组第一个值column=${array_column[0]}#替换双引号column=${column//\"/}#判断是否为分区表,然后拼接字段par_result=$(echo ${Column} | grep "partition key")if [[ "${par_result}" != "" ]]thenecho "--------------- [INFO] ${hive_full_tb}为分区表 ---------------"#设置是否分区表参数is_hive_partition_tb=1#分区字段hive_partition_field=${column}echo "--------------- [INFO] hive_partition_field: ${hive_partition_field} ---------------"elseif [[ ${column} = ${hive_pk} ]]then  echo "两个变量相等"  columns_tb1=${columns_tb1},"${column} as ${column}_pk1"columns_tb2=${columns_tb2},"${column} as ${column}_pk2"columns_alias_tb1=${columns_alias_tb1},${column}_pk1columns_filter=${columns_filter}" (${column}_pk1!=${column}_pk2 or (${column}_pk1 is null and ${column}_pk2 is not null) or (${column}_pk2 is null and ${column}_pk1 is not null)) or"columns_alias_tb2=${columns_alias_tb2},${column}_pk2echo ${columns_tb1}else#按逗号拼接字段columns_tb1=${columns_tb1},${column}" as ${column}_tb1"columns_tb2=${columns_tb2},${column}" as ${column}_tb2"columns_alias_tb1=${columns_alias_tb1},${column}_tb1columns_alias_tb2=${columns_alias_tb2},${column}_tb2columns_filter=${columns_filter}" (${column}_tb1!=${column}_tb2 or (${column}_tb1 is null and ${column}_tb2 is not null) or (${column}_tb2 is null and ${column}_tb1 is not null)) or" fifidone <<< "${table_desc}"#表字段field_list_tb1=${columns_tb1:1}field_list_tb2=${columns_tb2:1}field_list_alias_tb1=${columns_alias_tb1:1}field_list_alias_tb2=${columns_alias_tb2:1}columns_filter=${columns_filter::-2} columns_filter=${columns_filter:1}echo "-----------------------columns_filter-------------------------"${columns_filter}#设置状态status=$?getHiveFieldListStatusFieldList=${status}#判断获取字段有异常if [ ! ${field_list_tb1} ]; thenecho "--------------- [ERROR] field_list_tb1 获取字段失败 ----------------"getHiveFieldListStatusTableDesc=255fiecho -e "--------------- [INFO] field_list_tb1 获取字段完成: \n${field_list_tb1} ---------------"#判断获取字段有异常if [ ! ${field_list_tb2} ]; thenecho "--------------- [ERROR] field_list_tb2 获取字段失败 ----------------"getHiveFieldListStatusTableDesc=255fiecho -e "--------------- [INFO] field_list_tb2 获取字段完成: \n${field_list_tb2} ---------------"#判断获取字段有异常if [ ! ${field_list_alias_tb1} ]; thenecho "--------------- [ERROR] field_list_alias_tb1 获取字段失败 ----------------"getHiveFieldListStatusTableDesc=255fiecho -e "--------------- [INFO] field_list_alias_tb1 获取字段完成: \n${field_list_alias_tb1} ---------------"#判断获取字段有异常if [ ! ${field_list_alias_tb2} ]; thenecho "--------------- [ERROR] field_list_alias_tb2 获取字段失败 ----------------"getHiveFieldListStatusTableDesc=255fiecho -e "--------------- [INFO] field_list_alias_tb2 获取字段完成: \n${field_list_alias_tb2} ---------------"echo "------------------------------function getHiveFieldList end----------------------"incrementTableData "${field_list_tb1}" "${field_list_tb2}" "${field_list_alias_tb1}" "${field_list_alias_tb2}"  "${columns_filter}" 
}function incrementTableData(){echo "-------------------function incrementTableData start"  #表字段field_list_tb1=${1}field_list_tb2=${2}field_list_alias_tb1=${3}field_list_alias_tb2=${4}columns_filter=${5} echo "---------------------function incrementTableData field_list_tb---------" "${field_list_tb1}" "${field_list_tb2}"echo "---------------------function incrementTableData field_list_alias_tb---------" "${field_list_alias_tb1}" "${field_list_alias_tb2}"echo "---------------------function incrementTableData columns_filter---------" "${columns_filter}" execute_sql="set session query_max_run_time='25.00m';set session hive.insert_existing_partitions_behavior = 'overwrite';set session use_preferred_write_partitioning = true; /*每个分区一个writer,在分区数多且整体文件体积较小时有奇效*/insert into  ${hive_result_tb}with resource_label_cur_data as(select${field_list_tb1}from ${hive_source_tb}where ${hive_pk} is not null),resource_label_last_data as(select${field_list_tb2}from ${hive_source_last_tb}where${hive_pk} is not null),resource_label_full_data as(select${field_list_alias_tb1},${field_list_alias_tb2},cur_data.${hive_cur_tb_pk1} AS ${hive_pk}_td,last_data.${hive_cur_tb_pk2} AS ${hive_pk}_last_tdfrom resource_label_cur_data as cur_datafull join resource_label_last_data as last_dataon cur_data.${hive_cur_tb_pk1} = last_data.${hive_cur_tb_pk2}),resource_label_with_updata_flag as(selectcasewhen ${hive_pk}_td is not nulland ${hive_pk}_last_td is not null then 'U'when ${hive_pk}_td is nulland ${hive_pk}_last_td is null then 'D'else 'A'end as UPDATE_FLAG,if(${hive_pk}_td is null,${hive_pk}_last_td,${hive_pk}_td) as ${hive_pk}from  resource_label_full_datawhere ${columns_filter})select${field_list_alias_tb1},full_data.UPDATE_FLAGfrom resource_label_with_updata_flag as full_dataleft join resource_label_cur_data as cur_data on full_data.${hive_pk}  = cur_data.${hive_cur_tb_pk1};"  echo "-----------------execute_sql-" ${execute_sql}#获取表结构信息table_desc=$(${BIGDATA}/jar/presto.jar --server ${presto_master_host}:${presto_master_port} --catalog hive --schema default --user admin --execute "${execute_sql}")  echo "-------------------function incrementTableData end"  
}# 定义main函数  
main() {  # 初始化参数initParam "ads_biz" "ads_biz_customer_resource_label_1d_2023_0803_test" "ads_biz_customer_resource_label_1d_2023_0803_last_test" "ads_biz_customer_resource_label_1d_2023_0803_result_test" customer_resource_idgetHiveFieldList
}  # 调用main函数  
main 

参考的sql程序

selectt.KEY1,t.KEY2,'${ds}' as BATCH_DATE,t.UPDATE_FLAG,if(t.UPDATE_FLAG in ('A', 'U'),factor_name,null) as factor_name
from(selectcasewhen isnotnull(KEY1_1)and isnotnull(KEY2_1)and isnotnull(KEY1_2)and isnotnull(KEY2_2) then 'U'when isnull(KEY1_1)and isnull(KEY2_1) then 'D'else 'A'end as UPDATE_FLAG,if(isnull(KEY1_1)and isnull(KEY2_1),KEY1_2,KEY1_1) as KEY1,if(isnull(KEY1_1)and isnull(KEY2_1),KEY2_2,KEY2_1) as KEY2from(selectt1.CONTENT_HASH AS CONTENT_HASH_1,t2.CONTENT_HASH AS CONTENT_HASH_2,t1.KEY1 AS KEY1_1,t2.KEY1 AS KEY1_2,t1.KEY2 AS KEY2_1,t2.KEY2 AS KEY2_2from(selectKEY1,KEY2,hash(factor_name) as CONTENT_HASHfromtable_namewhereds = '${ds}'and KEY1 is not nulland KEY2 is not null) t1 fulljoin (selectKEY1,KEY2,hash(factor_name) as CONTENT_HASHfromtable_namewhereds = '${last_1_day}'and KEY1 is not nulland KEY2 is not null) t2 on cast (t1.KEY1 as string) = cast (t2.KEY1 as string)and cast (t1.KEY2 as string) = cast (t2.KEY2 as string)) awherea.CONTENT_HASH_1 <> a.CONTENT_HASH_2or (isnull(a.CONTENT_HASH_1)and isnotnull(a.CONTENT_HASH_2))or (isnull(a.CONTENT_HASH_2)and isnotnull(a.CONTENT_HASH_1))) tleft join (select*fromtable_namewhereds = '${ds}'and KEY1 is not nulland KEY2 is not null) ta on cast (t.KEY1 as string) = cast (ta.KEY1 as string)and cast (t.KEY2 as string) = cast (ta.KEY2 as string);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/24643.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt中JSON的使用

一.前言&#xff1a; JSON是一种轻量级数据交换格式&#xff0c;常用于客户端和服务端的数据交互&#xff0c;不依赖于编程语言&#xff0c;在很多编程语言中都可以使用JSON&#xff0c;比如C&#xff0c;C&#xff0c;Java&#xff0c;Android&#xff0c;Qt。除了JSON&#x…

前端实现给图片添加水印

一、利用Canvas ::: tip 实现步骤: 上传图片&#xff0c;转换为base64格式的数据&#xff0c;利用Image加载图片利用canvas写入图片&#xff0c;然后绘制水印最后通过canvas输出添加水印后的base64数据 ::: 1. 本地读取图像文件渲染到img标签 ::: tip ​ 通过intpt[type…

格力变频空调怎么收氟?

格力变频空调怎么收氟&#xff1f; 随着气温的升高&#xff0c;越来越多的人开始使用格力变频空调来调节室内温度。但是&#xff0c;在使用格力变频空调时&#xff0c;很多人并不知道如何正确收氟。下面&#xff0c;我们将详细介绍一下格力变频空调怎么收氟。 首先&#xff0…

MyCat核心概念、需求案例讲解、环境准备及分片配置

1.MyCat概念介绍 2.MyCat入门需求 2.1 需求分析 2.2 环境准备 输入以下命令检查服务器防火墙状态 dead代表关闭状态&#xff0c;如果不关闭也可以需要开放特定的端口号&#xff01;&#xff01; systemctl status firewalld接着需要在三台服务器上的MySQL上创建三个数据库db0…

企业架构NOSQL数据库之MongoDB

目录 一、背景描述及其方案设计 (一)业务背景描述 &#xff08;二&#xff09;模拟运维设计方案 二、Mongodb介绍 &#xff08;一&#xff09;nosql介绍 &#xff08;二&#xff09;产品特点 1、存储性 2、 效率性 3、结构 三、安装和配置 &#xff08;一&#xff09…

Leetcode-每日一题【剑指 Offer 10- I. 斐波那契数列】

题目 写一个函数&#xff0c;输入 n &#xff0c;求斐波那契&#xff08;Fibonacci&#xff09;数列的第 n 项&#xff08;即 F(N)&#xff09;。斐波那契数列的定义如下&#xff1a; F(0) 0, F(1) 1 F(N) F(N - 1) F(N - 2), 其中 N > 1. 斐波那契数列由 0 和 1 开…

考研C语言进阶题库——更新6-10题

目录 6输入一个字符串&#xff0c;输出其中字母的个数 7用递归求函数值x1,f(x)10,x>1.f(x)f(x-1)2 8所给字符串正序反序连接&#xff0c;形成新串并输出 9输入若干个整数以-1标记为结束输出其中的最大值 10求矩阵的两条对角线之和 6输入一个字符串&#xff0c;输出其中…

[openCV]基于赛道追踪的智能车巡线方案V1

import cv2 as cv import os import numpy as npimport time# 遍历文件夹函数 def getFileList(dir, Filelist, extNone):"""获取文件夹及其子文件夹中文件列表输入 dir&#xff1a;文件夹根目录输入 ext: 扩展名返回&#xff1a; 文件路径列表""&quo…

Golang map 常用方法

文章目录 前言按key排序按value排序统计字符串中元素重复出现次数并将结果排序返回 前言 由于map是无序的&#xff0c;所以排序成 slice 返回&#xff0c;且都使用泛,代码只是示例&#xff0c;省略了错误判断 按key排序 package mainimport "fmt"func example[T c…

C语言刷题------(1)

C语言刷题 博主用的刷题网站&#xff1a;题库 - 蓝桥云课 (lanqiao.cn) 小伙伴们可以去试试&#xff01;&#xff01;&#xff01; First question 题目&#xff1a;成绩统计 题目描述&#xff1a;小蓝给学生们组织了一场考试&#xff0c;卷面总分为 100 分&#xff0c;每…

K3s vs K8s:轻量级对决 - 探索替代方案

在当今云原生应用的领域中&#xff0c;Kubernetes&#xff08;简称K8s&#xff09;已经成为了无可争议的领导者。然而&#xff0c;随着应用规模的不断增长&#xff0c;一些开发者和运维人员开始感受到了K8s的重量级特性所带来的挑战。为了解决这一问题&#xff0c;一个名为K3s的…

深入解析Spring MVC注解:@PathVariable、@ResponseBody和@RequestParam的用法和区别

简介 在Spring MVC框架中&#xff0c;PathVariable、ResponseBody和RequestParam是常用的注解&#xff0c;它们分别用于处理请求的路径变量、响应数据格式和请求参数。本文将深入介绍这些注解的用法&#xff0c;并详细讨论它们之间的区别&#xff0c;以便开发者在构建Web应用程…

【软件测试】一份合格的软件测试简历长什么样?

你可以写一篇出众的软件测试简历并且这篇测试用例能够为你带来面试电话么&#xff1f;如果没有&#xff0c;请继续阅读。我敢肯定&#xff0c;读完这篇文章&#xff0c;你将能够写出一个完美的杀手级别的软件测试和质量保证的简历&#xff0c;这将为你带来面试电话。 你的简历是…

如何在轻量级RTSP服务支持H.264扩展SEI发送接收自定义数据?

为什么开发轻量级RTSP服务&#xff1f; 开发轻量级RTSP服务的目的是为了解决在某些场景下用户或开发者需要单独部署RTSP或RTMP服务的问题。这种服务的优势主要有以下几点&#xff1a; 便利性&#xff1a;通过轻量级RTSP服务&#xff0c;用户无需配置单独的服务器&#xff0c;…

CentOS6如何进入单用户模式

问题&#xff1a;因为挂载有问题&#xff0c;开机启动不了&#xff0c;需要进入单用户模式进入修改fstab挂载文件。 1、Linux系统开机&#xff0c;在3秒内按下啊e&#xff0c;然后跳转到内核界面。 2、再按下e进入如下界面&#xff0c;选择kernel的一项&#xff0c;然后按下e键…

视频安防监控EasyCVR平台海康大华设备国标GB28181告警布防的报文说明

TSINGSEE青犀视频监控综合管理平台EasyCVR基于云边端协同&#xff0c;可支持海量视频的轻量化接入与汇聚管理。平台既具备传统安防视频监控的能力&#xff0c;比如&#xff1a;视频监控直播、云端录像、云存储、录像检索与回看、告警上报、平台级联、云台控制、语音对讲等&…

mysql的索引

在 MySQL 中&#xff0c;可以在建表语句中使用主键索引、唯一索引和普通索引来定义表的索引 主键索引&#xff1a; 主键索引用于唯一标识表中的每一行数据&#xff0c;通常用于加速根据主键值进行数据检索的操作。在建表时&#xff0c;可以通过 PRIMARY KEY 关键字定义主键索引…

深度学习Redis(2):持久化

前言 在上一篇文章中&#xff0c;介绍Redis的内存模型&#xff0c;从这篇文章开始&#xff0c;将依次介绍Redis高可用相关的知识——持久化、复制(及读写分离)、哨兵、以及集群。 本文将先说明上述几种技术分别解决了Redis高可用的什么问题&#xff1b;然后详细介绍Redis的持…

elasticsearch 配置用户名和密码

无密码的其他配置项在&#xff1a;https://blog.csdn.net/Xeon_CC/article/details/132064295 elasticsearch.yml配置文件&#xff1a; xpack.security.enabled: true xpack.security.http.ssl.enabled: true xpack.security.http.ssl.keystore.path: /path/to/elastic-certi…

软件测试分类总结

目录 1.根据源代码可见度划分 1.1黑盒测试 1.2白盒测试 1.3灰盒测试 2.根据开发阶段划分 2.1单元测试 2.2集成测试 2.3系统测试 2.4验收测试 3.按照实施组织划分 3.1α测试 3.2β测试 3.3第三方测试 4.按照是否运行程序划分 4.1静态测试 4.2动态测试 5.根据软件测试工作的…