测试业务需要:
现有A系统作为下游数据系统,上游系统有A1,A2,A3...
需要将A1,A2,A3...的数据达到某条件后(比如:A1系统销售单提交出库成功)自动触发MQ然后再经过数据清洗落到A系统,并将清洗后数据通过特定规则汇总在A系统报表中
现在需要QA同学验证的功能是:
A系统存储数据清洗后的库表(为切片表)有几十个,且前置系统较多,测试工作量也较多
需要核对清洗后A存库数据是否准确
清洗规则:(1)直接取数 (2)拼接取数 (3)映射取数
直接取数字段在2系统表中字段命名规则一致
so,以下测试工具是针对直接取数规则来开发,以便于测试
代码实现步骤:
(1)将表字段,来源系统表和切片表 数据库链接信息,查询字段 作为变量
将这些信息填入input.xlsx 作为入参
(2)读取表字段,根据来源系统表 数据库链接信息,查询字段
查询来源库表,将查询出字段值存储outfbi.xlsx
(3)读取表字段,根据切片表 数据库链接信息,查询字段
查询切片库表,将查询出字段值存储outods.xlsx
(4)对比outfbi.xlsx,outods.xlsx的字段值
对比后生成result.xlsx文件,新增列校验结果
核对字段值一致校验结果为Success,否则为Fail
代码如下:
入参文件见附件
DbcheckApi.py
import os
import pymysql
import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import PatternFill
import datetime
import ast"""测试数据路径管理"""
SCRIPTS_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
GENERATECASE_DIR = os.path.join(SCRIPTS_DIR, "dbcheck")
inputDATAS_DIR = os.path.join(GENERATECASE_DIR, "inputdata")
outDATAS_DIR = os.path.join(GENERATECASE_DIR, "outdata")class DbcheckApi():def __init__(self,data):self.inputexcel=dataworkbook = load_workbook(filename=self.inputexcel)sheet = workbook['数据源']# 读取来源表-连接信息sourcedb_connection_info = ast.literal_eval(sheet['B3'].value)odsdb_connection_info = ast.literal_eval(sheet['B4'].value)source_db = sheet['C3'].value.strip()ods_db = sheet['C4'].value.strip()source_queryby = sheet['D3'].value.strip()ods_queryby = sheet['D4'].value.strip()print(sourcedb_connection_info)print(odsdb_connection_info)print(source_db)print(ods_db)print(source_queryby)print(ods_queryby)self.sourcedb = sourcedb_connection_infoself.odsdb = odsdb_connection_infoself.source_db = source_dbself.ods_db = ods_dbself.source_queryby = source_querybyself.ods_queryby = ods_querybydef source_select_db(self):host = self.sourcedb.get('host')port = self.sourcedb.get('port')user = self.sourcedb.get('user')passwd = self.sourcedb.get('passwd')db = self.sourcedb.get('db')if not host or not port or not user or not passwd or not db:error_msg = "连接信息不完整"return {"code": -1, "msg": error_msg, "data": ""}cnnfbi = pymysql.connect(host=host,port=port,user=user,passwd=passwd,db=db)cursor = cnnfbi.cursor()try:# 读取Excel文件df = pd.read_excel(self.inputexcel, sheet_name='Sheet1')# 获取第1列,从第2行开始读取的字段名fields = df.iloc[1:, 0].tolist()print(fields)# 构建查询SQL语句sql = "SELECT {} FROM {} WHERE {}".format(', '.join(fields), self.source_db, self.source_queryby)print(sql)# 执行查询语句cursor.execute(sql)except pymysql.err.OperationalError as e:error_msg = str(e)if "Unknown column" in error_msg:column_name = error_msg.split("'")[1]msg={"code": -1, "msg": f"列字段 {column_name} 在 "+self.source_db+" 表结构中不存在,请检查!", "data": ""}print(msg)return {"code": -1, "msg": f"列字段 {column_name} 在 "+self.source_db+" 表结构中不存在,请检查!", "data": ""}else:return {"code": -1, "msg": error_msg, "data": ""}print(error_msg)# 获取查询结果result = cursor.fetchall()# 关闭游标和连接cursor.close()cnnfbi.close()# 检查查询结果是否为空if not result:return {"code": -1, "msg": f"查询无数据,请检查sql: {sql}", "data": ""}else:# 将结果转换为DataFrame对象df = pd.DataFrame(result, columns=fields)odskey=self.source_db+'表-字段'odsvalue=self.source_db+'表-字段值'# 创建新的DataFrame对象,将字段和对应值放在两列df_new = pd.DataFrame({odskey: fields, odsvalue: df.iloc[0].values})outexcel = os.path.join(outDATAS_DIR, 'outputfbi.xlsx')# 导出结果到Excel文件df_new.to_excel(outexcel, index=False)def ods_select_db(self):host = self.odsdb.get('host')port = self.odsdb.get('port')user = self.odsdb.get('user')passwd = self.odsdb.get('passwd')db = self.odsdb.get('db')if not host or not port or not user or not passwd or not db:raise ValueError("连接信息不完整")cnnfbi = pymysql.connect(host=host,port=port,user=user,passwd=passwd,db=db)cursor = cnnfbi.cursor()try:# 读取Excel文件df = pd.read_excel(self.inputexcel, sheet_name='Sheet1')# 获取第1列,从第2行开始读取的字段名fields = df.iloc[1:, 0].tolist()print(fields)# 构建查询SQL语句sql = "SELECT {} FROM {} WHERE {}".format(', '.join(fields), self.ods_db, self.ods_queryby)print(sql)# 执行查询语句cursor.execute(sql)except pymysql.err.OperationalError as e:error_msg = str(e)if "Unknown column" in error_msg:column_name = error_msg.split("'")[1]return {"code": -1, "msg": f"列 {column_name} 不存在"+self.ods_db+" 表结构中,请检查!", "data": ""}else:return {"code": -1, "msg": error_msg, "data": ""}# 获取查询结果result = cursor.fetchall()# 关闭游标和连接cursor.close()cnnfbi.close()# 将结果转换为DataFrame对象df = pd.DataFrame(result, columns=fields)# 创建新的DataFrame对象,将字段和对应值放在两列odskey=self.ods_db+'表-字段'odsvalue=self.ods_db+'表-字段值'df_new = pd.DataFrame({odskey: fields, odsvalue: df.iloc[0].values})# 导出结果到Excel文件outexcel = os.path.join(outDATAS_DIR, 'outputfms.xlsx')df_new.to_excel(outexcel, index=False)def check_order(self):self.source_select_db()self.ods_select_db()outputfbi = os.path.join(outDATAS_DIR, 'outputfbi.xlsx')outputfms = os.path.join(outDATAS_DIR, 'outputfms.xlsx')df_a = pd.read_excel(outputfbi)df_b = pd.read_excel(outputfms)# 创建新的DataFrame对象用于存储C表的数据df_c = pd.DataFrame()# 将A表的列写入C表for col in df_a.columns:df_c[col] = df_a[col]# 将B表的列���入C表for col in df_b.columns:df_c[col] = df_b[col]odsvalue=self.ods_db+'表-字段值'fbivalue=self.source_db+'表-字段值'# 比对A2和B2列的值,如果不一致,则在第5列写入"校验失败"df_c['校验结果'] = ''for i in range(len(df_c)):if pd.notnull(df_c.at[i, fbivalue]) and pd.notnull(df_c.at[i, odsvalue]):fbivalue_rounded = df_c.at[i, fbivalue]odsvalue_rounded = df_c.at[i, odsvalue]if isinstance(fbivalue_rounded, (int, float)):fbivalue_rounded = round(fbivalue_rounded, 3)elif isinstance(fbivalue_rounded, datetime.datetime):fbivalue_rounded = round(fbivalue_rounded.timestamp(), 3)else:try:fbivalue_rounded = round(float(fbivalue_rounded), 3)except ValueError:passif isinstance(odsvalue_rounded, (int, float)):odsvalue_rounded = round(odsvalue_rounded, 3)elif isinstance(odsvalue_rounded, datetime.datetime):odsvalue_rounded = round(odsvalue_rounded.timestamp(), 3)else:try:odsvalue_rounded = round(float(odsvalue_rounded), 3)except ValueError:passif fbivalue_rounded != odsvalue_rounded:df_c.at[i, '校验结果'] = 'Fail'else:df_c.at[i, '校验结果'] = 'Success'# 将结果写入到C.xlsx文件df_c.to_excel('checkhead_result.xlsx', index=False)# 打开C.xlsx文件并设置背景色book = load_workbook('checkhead_result.xlsx')writer = pd.ExcelWriter('checkhead_result.xlsx', engine='openpyxl')writer.book = book# 获取C.xlsx的工作表sheet_name = 'Sheet1'ws = writer.book[sheet_name]# 设置背景色为红色red_fill = PatternFill(start_color='FFFF0000', end_color='FFFF0000', fill_type='solid')# 遍历校验结果列,将不一致的单元格设置为红色背景for row in ws.iter_rows(min_row=2, min_col=len(df_c.columns), max_row=len(df_c), max_col=len(df_c.columns)):for cell in row:if cell.value == 'Fail':cell.fill = red_fill# 保存Excel文件writer.save()writer.close()if __name__ == '__main__':inputexcel = os.path.join(inputDATAS_DIR, 'input.xlsx')DbcheckApi(inputexcel).check_order()