hedfs和hive数据迁移后校验脚本

先谈论校验方法,本人腾讯云大数据工程师。

1、hdfs的校验

这个通常就是distcp校验,hdfs通过distcp迁移到另一个集群,怎么校验你的对不对。

有人会说,默认会有校验CRC校验。我们关闭了,为什么关闭?全量迁移,如果当前表再写数据,开自动校验就会失败。数据量大(PB级)迁移流程是先迁移全量,后面在定时补最近几天增量,再找个时间点,进行业务割接

那么怎么知道你迁移的hdfs是否有问题呢?

2个文件,一个是脚本,一个是需要校验的目录

data_checksum.py

# -*- coding: utf-8 -*-
# @Time    : 2025/1/16 22:52
# @Author  : fly-wlx
# @Email   : xxx@163.com
# @File    : data_compare.py
# @Software: PyCharmimport subprocess#output_file = 'data_checksum_result.txt'
def load_file_paths_from_conf(conf_file):file_list = []with open(conf_file, 'r') as file:lines = file.readlines()for line in lines:path = line.strip()if path and not path.startswith('#'):  # 跳过空行和注释full_path = f"{path}"file_list.append(full_path)return file_list#def write_sizes_to_file(filepath,source_namenode,source_checksum,target_namenode,target_checksum,status, output_file):
#    with open(output_file, 'w') as file:
#file.write(f"{source_namenode}/{filepath},{source_checksum},{target_namenode}/{filepath},{target_checksum},{status}\n")def write_sizes_to_file(source_path, src_info, destination_path, target_info, status,output_file):with open(output_file, 'a') as file:file.write(f"{source_path},{src_info},{destination_path}, {target_info}, {status}\n")
def run_hadoop_command(command):"""运行 Hadoop 命令并返回输出"""try:result = subprocess.check_output(command, shell=True, text=True)return result.strip()except subprocess.CalledProcessError as e:print(f"Command failed: {e}")return Nonedef get_hdfs_count(hdfs_filepath):"""获取 HDFS 路径的文件和目录统计信息"""command = f"hadoop fs -count {hdfs_filepath}"output = run_hadoop_command(command)if output:parts = output.split()if len(parts) >= 3:dir_count, file_count, content_size = parts[-3:]return dir_count, file_count, content_sizereturn None, None, Nonedef get_hdfs_size(hdfs_filepath):"""获取 HDFS 路径的总文件大小"""command = f"hadoop fs -du -s {hdfs_filepath}"output = run_hadoop_command(command)if output:parts = output.split()if len(parts) >= 1:return parts[0]return Nonedef validate_hdfs_data(source_namenode, target_namenode,filepath):output_file = 'data_checksum_result.txt'source_path=f"{source_namenode}/{filepath}"destination_path = f"{target_namenode}/{filepath}""""校验 HDFS 源路径和目标路径的数据一致性"""print("Fetching source path statistics...")src_dir_count, src_file_count, src_content_size = get_hdfs_count(source_path)src_total_size = get_hdfs_size(source_path)print("Fetching destination path statistics...")dest_dir_count, dest_file_count, dest_content_size = get_hdfs_count(destination_path)dest_total_size = get_hdfs_size(destination_path)src_info={}src_info["src_dir_count"] = src_dir_countsrc_info["src_file_count"] = src_file_count#src_info["src_content_size"] = src_content_sizesrc_info["src_total_size"] = src_total_sizetarget_info = {}target_info["src_dir_count"] = dest_dir_counttarget_info["src_file_count"] = dest_file_count#target_info["src_content_size"] = dest_content_sizetarget_info["src_total_size"] = dest_total_sizeprint("\nValidation Results:")if (src_dir_count == dest_dir_count andsrc_file_count == dest_file_count and# src_content_size == dest_content_size andsrc_total_size == dest_total_size):print("✅ Source and destination paths are consistent!")write_sizes_to_file(source_path, src_info, destination_path,target_info, 0,output_file)else:print("❌ Source and destination paths are inconsistent!")write_sizes_to_file(source_path, src_info, destination_path, target_info, 1,output_file)#print(f"Source: DIR_COUNT={src_dir_count}, FILE_COUNT={src_file_count}, CONTENT_SIZE={src_content_size}, TOTAL_SIZE={src_total_size}")#print(f"Destination: DIR_COUNT={dest_dir_count}, FILE_COUNT={dest_file_count}, CONTENT_SIZE={dest_content_size}, TOTAL_SIZE={dest_total_size}")# 设置源路径和目标路径
#source_path = "hdfs://namenode1:8020/"
#destination_path = "hdfs://namenode2:8020/path/to/destination"
# 定义源和目标集群的 namenode 地址
source_namenode = "hdfs://10.xx.xx.6:8020"
target_namenode= "hdfs://10.xx.xx.106:4007"def main():# 配置文件路径和输出文件路径conf_file = 'distcp_paths.conf'# 定义源和目标集群的 namenode 地址# 设置源路径和目标路径#source_namenode = "hdfs://source-namenode:8020"#target_namenode = "hdfs://target-namenode:8020"# 文件列表file_paths = load_file_paths_from_conf(conf_file)# 对每个目录进行校验for filepath in file_paths:validate_hdfs_data(source_namenode, target_namenode, filepath)if __name__ == "__main__":main()# 执行校验
#validate_hdfs_data(source_path, destination_path)

distcp_paths.conf

/apps/hive/warehouse/xx.db/dws_ixx_features
/apps/hive/warehouse/xx.db/dwd_xx_df

用法

直接python3 data_checksum.py(需要改为自己的)

他会实时打印对比结果,并且将结果生成到一个文件中(data_checksum_result.txt)

2、hive文件内容比对

最终客户要的是任务的数据对得上,而不是管你迁移怎么样,所以验证任务的方式:两边同时跑同多个Hive任务流的任务,查看表数据内容是否一致。(因为跑出来的hdfs的文件大小由于mapreduce原因,肯定是不一致的,校验实际数据一致就行了)

方法是先对比表字段,然后对比count数,然后将每行拼起来对比md5

涉及3个文件,单检测脚本,批量入口脚本,需要批量检测的表文件

check_script.sh

#!/bin/bash
#owner:clark.shi
#date:2025/1/22
#背景:用于hive从源端任务和目标端任务,两边跑完结果表的内容校验(因为mapreduce和小文件不同,所以要用数据内容校验)
#     --用trino(presto)会更好,因为可以跨集群使用,目前客户因为资源情况没装,此为使用hive引擎,将数据放到本地进行比对#输入:源端表,目标表,分区名,分区值
#$0是脚本本身,最低从1开始#限制脚本运行内存大小,30gb
#ulimit -v 30485760#---注意,要保证,2个表的字段顺序是一样的(md5是根据顺序拼接的)
echo "================"
echo "注意"
echo "要保证,2个表的字段顺序是一样的(md5是根据顺序拼接的)"
echo "要保证,这2个表是存在的"
echo "要保证,双端是可以互相访问"
echo "要保证,2个hive集群的MD5算法相同"
echo "禁止表,一个分区数据量超过本地磁盘,此脚本会写入本地磁盘(双端数据),对比后删除"
echo "注意,如果分区字段是数字不用加引号,如果是字符串需要加引号,搜partition_value,这里分区是int如20250122是没有引号"
echo "================"a_table=$1
b_table=$2
partition_column=$3
partition_value=$4if [ $# -ne 4 ]; thenecho "错误:必须输入 4 个参数,源端表,目标表,分区名,分区值"exit 1
fi#------------函数check_value() {# 第一个参数是布尔值,第二个参数是要 echo 的内容local value=$1local message=$2# 检查第一个参数的值if [ "$value" == "false" ]; thenecho "校验失败:$message" >> rs.txtexit fi
}#-----------函数结束echo "需要对比表的数据内容是$a_table和$b_table--,需要对比分区$partition_column是$partition_value--"sleep 2
echo "===============开始校验============="
#todo改成自己的,kerbers互信认证(也可以用ldap)
`kinit -kt /root/s_xx_tbds.keytab s_xx_tbds@TBDS-V12X10CS`#校验字段类型
echo "1.开始校验字段类型"#todo这里要改成自己的beeline -u "jdbc:hive2://10.xx.xx.4:10001/XXdatabase;principal=hive/tbds-10-xx-xx-4.hadooppdt.xxjin.srv@TBDS-V12X10CS;transportMode=http;httpPath=cliservice" -e "DESCRIBE $b_table" > 1_a_column.txtbeeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" -e "DESCRIBE $a_table" > 1_b_column.txtif diff 1_a_column.txt 1_b_column.txt > /dev/null; thenecho "表结构一致"elseecho "表结构不一致"check_value false "$a_table和$b_table字段类型不一致"fi echo "------------1.表字段,校验完毕,通过-------------"#校验count数
echo "2.开始count校验"beeline -u "jdbc:hive2://10.xx.xx.4:10001/XXdatabase;principal=hive/tbds-10-xx-xx-4.hadooppdt.xxjin.srv@TBDS-V12X10CS;transportMode=http;httpPath=cliservice" -e "select count(*) from $b_table where $partition_column=$partition_value" > 2_a_count.txtbeeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" -e "select count(*) from $a_table where $partition_column=$partition_value" > 2_b_count.txtif diff 2_a_count.txt 2_b_count.txt > /dev/null; thenecho "数据行一致"elseecho "数据行不一致"check_value false "$a_table和$b_table的数据行不一致"fiecho "------------2.数据行,校验完毕,通过-------------"#拼接每一行的值,作为唯一值,创建2个临时表
echo "3.生成每条数据唯一标识"#1.获取表列名#使用awk,去除第一行字段名,,删除#字号以及他后面的内容(一般是分区的描述),根据分隔符|取第一列数据,去掉空的行beeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" --outputformat=dsv -e "DESCRIBE $a_table" |awk 'NR > 1' |awk '!/^#/ {print} /^#/ {exit}'|awk 'BEGIN {FS="|"} {print $1}'|awk 'NF > 0' > 3_table_field_name.txt#2.拼接表列名,生成md5的表 (第一步已经检测过双方的表结构了,这里用同一个拼接字段即可)# 使用 while 循环逐行读取文件内容name_fields=""while IFS= read -r line; doif [ -z "$name_fields" ]; thenname_fields="$line"elsename_fields="$name_fields,$line"fidone < "3_table_field_name.txt"echo "$name_fields"#将每行数据进行拼接,并且生成含一个字段的md5表md5_sql="SELECT distinct(MD5(CONCAT($name_fields))) AS md5_value "a_md5_sql="$md5_sql from (select * from dim_user_profile_df where $partition_column=$partition_value  limit 100)a;"b_md5_sql="$md5_sql from $a_table where $partition_column=$partition_value;"echo "a表的sql是:$a_md5_sql"echo "b表的sql是:$b_md5_sql"#源端是生产环境,这里做了特殊处理,源端就取100条(没使用order by rand(),客户主要是检测函数,order by 会占用他们集群资源)beeline -u "jdbc:hive2://10.xx.xx.4:10001/XXdatabase;principal=hive/tbds-10-xx-xx-4.hadooppdt.xxjin.srv@TBDS-V12X10CS;transportMode=http;httpPath=cliservice" --outputformat=dsv -e "$a_md5_sql" > 4_a_md5_data.txtbeeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" --outputformat=dsv -e "$b_md5_sql" > 4_b_md5_data.txt#3.(由于不是同集群,需要下载到本地,再进行导入--如果耗费资源时长太长,再导入到hive,否则直接shell脚本搞定)# 设置large_file和small_file的路径large_file="4_b_md5_data.txt"small_file="4_a_md5_data.txt"# 遍历small_file中的每一行while IFS= read -r line; do# 检查line是否存在于large_file中if grep -qxF "$line" "$large_file"; then# 如果line存在于large_file中,输出1#echo "1"a=1else# 如果line不存在于large_file中,输出2echo "2"check_value false "$a_table和$b_table抽样存在数据内容不一致"fidone < "$small_file"echo echo "------------3.数据内容,校验完毕,通过-------------"
#抽样核对md5(取数据时已抽样,否则数据太大容易跑挂生产环境) 

input_file.txt需要校验的表文件

源端表名,目标端表名,分区字段(写1级分区就可以),分区值

ods_xxnfo_di ods_xxnfo_dii dt 20250106

ods_asxx_log_di ods_asxx_log_dii dt 20250106

ods_xxog_di ods_xxog_di dt 20250106

dwd_xxx dwd_xxx dt 20250106

run.sh

#!/bin/bash# 设置文件路径
input_file="input_file.txt"# 遍历文件中的每一行
while IFS= read -r line; do# 调用另一个脚本并传递当前行的参数echo $line./check_script.sh $line# 在每次执行完后间隔一小段时间,避免系统过载(可选)sleep 1
done < "$input_file"

使用方法

sh run.sh(需要把check_scripe和run里的内容改成自己的哈)

他会把不通过的,生成一个rs.txt

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/67077.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

61,【1】BUUCTF WEB BUU XSS COURSE 11

进入靶场 左边是吐槽&#xff0c;右边是登录&#xff0c;先登录试试 admin 123456 admiin# 123456 admin"# 123456 不玩了&#xff0c;先去回顾下xss 回顾完就很尴尬了&#xff0c;我居然用SQL的知识去做xss的题 重来 吐槽这里有一个输入框&#xff0c;容易出现存储型…

海外问卷调查如何影响企业的经营?在品牌建设中有何指导意义?

市场调查的定义&#xff1a;通过科学的方法&#xff0c;有目的地、系统地搜集整理一些市场信息&#xff0c;其目的在于了解当下市场现状和发展前景&#xff0c;为企业生产和品牌打造提供一些科学的指导意见&#xff0c;这是任何大企业、中小企业、初创企业都必须重视的一个重要…

STM32新建不同工程的方式

新建工程的方式 1. 安装开发工具 MDK5 / keil52. CMSIS 标准3. 新建工程3.1 寄存器版工程3.2 标准库版工程3.3 HAL/LL库版工程3.4 HAL库、LL库、标准库和寄存器对比3.5 库开发和寄存器的关系 4. STM32CubeMX工具的作用 1. 安装开发工具 MDK5 / keil5 MDK5 由两个部分组成&#…

idea maven本地有jar包,但还要从远程下载

idea 中&#xff0c;java 工程执行 maven reimport&#xff0c;报jar报无法下载。 我奇了个怪&#xff0c;我明明在本地仓库有啊&#xff0c;你非得从远程下载&#xff1f; 我从供应商那里拿来的&#xff0c;远程当然没有了。 这太奇葩了吧&#xff0c;折腾好久不行。 后来…

250125-package

1. 定义 包就是文件夹&#xff0c;作用是在大型项目中&#xff0c;避免不同人的编写的java文件出现同名进而导致报错&#xff1b;想象一个场景&#xff0c;在一个根目录中&#xff0c;每一个人都有自己的一个java文件夹&#xff0c;他可以将自己编写的文件放在该文件夹里&…

系统思考—动态问题分析

“不是解决问题&#xff0c;而是根本改变它的方式&#xff0c;才能真正创造持久的成功。”——彼得德鲁克 在很多情况下&#xff0c;企业面对问题时&#xff0c;总会急于寻找解决方案&#xff0c;但这些方案往往只是暂时的“应急措施”。它们看似有效&#xff0c;却难以从根本…

系统架构设计师教材:信息系统及信息安全

信息系统 信息系统的5个基本功能&#xff1a;输入、存储、处理、输出和控制。信息系统的生命周期分为4个阶段&#xff0c;即产生阶段、开发阶段、运行阶段和消亡阶段。 信息系统建设原则 1. 高层管理人员介入原则&#xff1a;只有高层管理人员才能知道企业究竟需要什么样的信…

Golang Gin系列-5:数据模型和数据库

在这篇Gin教程的博客中&#xff0c;我们将探索如何将模型和数据库与Gin框架无缝集成&#xff0c;使你能够构建健壮且可扩展的web应用程序。通过利用流行的库并遵循最佳实践&#xff0c;你将学习如何定义模型、建立数据库连接、执行CRUD操作以及确保基于gin的项目中的数据完整性…

Moretl FileSync增量文件采集工具

永久免费: <下载> <使用说明> 我们希望Moretl FileSync是一款通用性很好的文件日志采集工具,解决工厂环境下,通过共享目录采集文件,SMB协议存在的安全性,兼容性的问题. 同时,我们发现工厂设备日志一般为增量,为方便MES,QMS等后端系统直接使用数据,我们推出了增量采…

SWPU 2022 新生赛--web题

奇妙的MD5 进入靶场 然我们输入一个特殊的字符串&#xff0c;然后我到处翻了翻&#xff0c;发现有提示 在MD5中有两个特殊的字符串 0e215962017 //MD5加密后弱比较等于自身 ffifdyop //MD5加密后变成万能密码 这里明显就是万能密码了 输入之后就来到了这个页…

PyQt6医疗多模态大语言模型(MLLM)实用系统框架构建初探(上.文章部分)

一、引言 1.1 研究背景与意义 在数字化时代,医疗行业正经历着深刻的变革,智能化技术的应用为其带来了前所未有的发展机遇。随着医疗数据的指数级增长,传统的医疗诊断和治疗方式逐渐难以满足现代医疗的需求。据统计,全球医疗数据量预计每年以 48% 的速度增长,到 2025 年将…

怎么样把pdf转成图片模式(不能复制文字)

贵但好用的wps&#xff0c; 转换——转为图片型pdf —————————————————————————————————————————— 转换前&#xff1a; 转换后&#xff1a; 肉眼可见&#xff0c;模糊了&#xff0c;且不能复制。 其他免费办法&#xff0c;参考&…

C# OpenCV机器视觉:利用CNN实现快速模板匹配

在一个阳光灿烂的周末&#xff0c;阿强正瘫在沙发上&#xff0c;百无聊赖地换着电视频道。突然&#xff0c;一则新闻吸引了他的注意&#xff1a;某博物馆里一幅珍贵的古画离奇失踪&#xff0c;警方怀疑是被一伙狡猾的盗贼偷走了&#xff0c;现场只留下一些模糊不清的监控画面&a…

智能电动汽车系列 --- 智能汽车向车载软件转型

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 简单,单纯,喜欢独处,独来独往,不易合同频过着接地气的生活,除了生存温饱问题之外,没有什么过多的欲望,表面看起来很高冷,内心热情,如果你身…

YOLOv8改进,YOLOv8检测头融合DynamicHead,并添加小目标检测层(四头检测),适合目标检测、分割等,全网独发

摘要 作者提出一种新的检测头,称为“动态头”,旨在将尺度感知、空间感知和任务感知统一在一起。如果我们将骨干网络的输出(即检测头的输入)视为一个三维张量,其维度为级别 空间 通道,这样的统一检测头可以看作是一个注意力学习问题,直观的解决方案是对该张量进行全自…

GitLab配置免密登录和常用命令

SSH 免密登录 Windows免密登录 删除现有Key 访问目录&#xff1a;C:\Users\Administrator\ .ssh&#xff0c;删除公钥&#xff1a;id_rsa.pub &#xff0c;私钥&#xff1a;id_rsa 2.生成.ssh 秘钥 运行命令生成.ssh 秘钥目录&#xff08; ssh-keygen -t rsa -C xxxxxx126.…

VUE的安装

要用vue必须要先安装nodejs nodejs的安装及环境配置 1.下载安装包 下载地址&#xff1a; https://nodejs.org/zh-cn/download/ 2.安装程序 下载完成后&#xff0c;双击安装包开始安装 ①点击next ②点同意、next ③默认路径是C:\Program Files\nodejs\&#xff0c;可修改…

chrome插件:网页图片高清下载

前置条件&#xff1a; 安装有chrome谷歌浏览器的电脑 使用步骤&#xff1a; 1.打开chrome扩展插件 2.点击管理扩展程序 3.加载已解压的扩展程序 4.选择对应文件夹 5.成功后会出现一个扩展小程序 6.点击对应小程序 7.输入需要访问的网址&#xff0c;点击扩展插件即可进行图片…

[操作系统] 进程地址空间管理

虚拟地址空间的初始化 缺页中断 缺页中断的概念 缺页中断&#xff08;Page Fault Interrupt&#xff09; 是指当程序访问的虚拟地址在页表中不存在有效映射&#xff08;即该页未加载到内存中&#xff09;时&#xff0c;CPU 会发出一个中断信号&#xff0c;请求操作系统加载所…

HTML5 Web Worker 的使用与实践

引言 在现代 Web 开发中&#xff0c;用户体验是至关重要的。如果页面在执行复杂计算或处理大量数据时变得卡顿或无响应&#xff0c;用户很可能会流失。HTML5 引入了 Web Worker&#xff0c;它允许我们在后台运行 JavaScript 代码&#xff0c;从而避免阻塞主线程&#xff0c;保…