利用flask + pymysql监测数据同步中的数据是否完整

一、背景

        最近项目搞重构,将原有的系统拆分成了多个子系统。但是有数据表需要在不同系统中数据,同时为了解决项目性能最了一个很简单的方案,就是公共数据存在每个系统之中。

二、分析

        分析这些表,这些表相比源数据表,表结构完全相同,除了名称加了MD前缀。但数据却要相同。这里我们可以从几个维度去进行比对:

        a、表结构

        b、表数据量

        c、表主键最大id

        d、表数据

三、编码

因最开始是以脚本的形式来写,所有没做配置文件。

1、数据库相关

数据库连接信息

# coding=utf8"""
数据库配置信息
"""mysql_data = {"CRM": {"host": "127.0.0.1","user": "user_name","password": "********","db": "db_name","port": 3306},"PLM": {"host": "127.0.0.1","user": "user_name","password": "********","db": "db_name","port": 3306},"MES": {"host": "127.0.0.1","user": "user_name","password": ********","db": "db_name","port": 3306},"SCM": {"host": "127.0.0.1","user": "user_name","password": "********","db": "db_name","port": 3306},"UC": {"host": "127.0.0.1","user": "user_name","password": "********","db": "db_name","port": 3306},"GDM": {"host": "127.0.0.1","user": "user_name","password": "********","db": "db_name","port": 3306},"ESM": {"host": "127.0.0.1","user": "user_name","password": "********","db": "db_name","port": 3306},"LOCAL": {"host": "127.0.0.1","user": "user_name","password": "********","db": "db_name","port": 3306},"78": {'host': '127.0.0.1',"user": 'user_name','password': '********',"db": "db_name","port": 3306},"PLM-PRE": {'host': '127.0.0.1',"user": 'user_name','password': '********',"db": "db_name","port": 3306}
}

数据库连接池

# coding=utf8
import pymysql
from pymysql import err"""
获取crm的数据到plm
1、项目相关数据
2、合同相关数据
"""class MSP_Connect:def __init__(self, mysql_info: dict):try:self.host = mysql_info['host']self.user = mysql_info['user']self.password = mysql_info['password']self.db = mysql_info['db']self.con = pymysql.connect(host=self.host,user=self.user,password=self.password,db=self.db,charset="utf8")self.cur = self.con.cursor()except KeyError:print("mysql_info键名错误,key值分别【host, user, password, db】")except err.OperationalError as e:if e.args[0] == 1045:print("MySql密码错误")else:print("发生了其它连接错误")# 执行sqldef execute(self, sql, values=None):if values == None:self.cur.execute(sql)return self.cur.fetchall()return self.cur.execute(sql, values)# 执行多条sqldef execute_many(self, sql, values):return self.cur.executemany(sql, values)# 提交事务def commit(self):self.con.commit()# 释放资源def close(self):self.cur.close()self.con.close()

2、检查处理

# coding=utf8
import pymysql
import os
from datetime import datetime, timedelta
from msp_script.database.connect_config import MSP_Connect
from msp_script.database.db_info import mysql_data
from openpyxl import Workbook, load_workbookdef check_msp_table_data(target_db_name, file_name=None):""":description: 检查数据同步:param target_db_name: 目标数据:param file_name: 文件名称:return:"""print('*******************************************     [' + target_db_name + ']     *******************************************')delete_excel_file()target_db = MSP_Connect(mysql_data[target_db_name])write_result = []# 1、获取目标系统中的主数据表print('1、获取目标系统中的主数据表')md_tables = get_table_index_by_md(target_db)print('\t主数据表为' + str(list(table[1] for table in md_tables)))info = []# 2、对比源数据、目标数据库表结构print('2、对比源数据、目标数据库表结构')for table in md_tables:target_table_name = table[1]  # 获取当前检查的数据表名词source_table_info = get_source_table_name(target_table_name)  # 获取原数据表名称以及在哪个系统source_table_name = source_table_info[0]source_db_name = source_table_info[1]if source_db_name in ('CRM', 'PLM', 'MES', 'SCM', 'GDM', 'ESM'):source_db = MSP_Connect(mysql_data[source_db_name])# 检查表及表数量check_rusult = check_table_info(source_db, source_table_name, target_db, target_table_name)check_rusult.append(target_db_name)check_rusult.append(source_db_name)write_result.append(check_rusult)if check_rusult[8] != "" and check_rusult[8] != None:info.append(check_rusult[8])source_db.close()target_db.close()# 写入报告print('3、写入比对结果到Excel')write_excel(write_result, file_name)return infodef get_source_table_name(target_table_name):"""通过目标表名称获取源数据表名称:param target_table_name: 目标数据库名称:return: 源数据表名词,系统的简称"""try:source_table_name = target_table_name.split('_', 2)[2]sys_name = target_table_name.split('_', 2)[1].upper()return source_table_name, sys_nameexcept Exception as e:print("[" + target_table_name + "] 可能是非主数据表")def get_table_index_by_md(target_db: MSP_Connect):""":description: 获取目标数据库中的主数据表:param target_db: 目标数据库连接对象:return:"""get_md_table_sql = """SELECTTABLE_SCHEMA AS '数据库名',TABLE_NAME AS '表名',TABLE_ROWS AS '行数',DATA_LENGTH AS '数据大小',INDEX_LENGTH AS '索引大小',CREATE_TIME AS '创建时间',table_comment AS '表描述' FROMinformation_schema.TABLES WHERE TABLE_NAME LIKE 'md_%'"""result = target_db.execute(get_md_table_sql, None)return result# 检验msp数据同步
def check_table_info(source_db: MSP_Connect, source_table_name, target_db: MSP_Connect, target_table_name):""":param source_db: 源数据库连接对象:param source_table_name: 源数据表:param target_db: 目标数据库连接对象:param target_table_name: 目标数据表:return:"""# 获取表字段source_sql = "SHOW COLUMNS FROM `" + source_table_name + "`"target_sql = "SHOW COLUMNS FROM `" + target_table_name + "`"source_result = source_db.execute(source_sql, None)target_result = target_db.execute(target_sql, None)# 获取表数量source_count_sql = "select count(*) from `" + source_table_name + "`"target_count_sql = "select count(*) from `" + target_table_name + "`"source_count = source_db.execute(source_count_sql, None)target_count = target_db.execute(target_count_sql, None)# 获取最大idsource_max_id_sql = "select max(id) from `" + source_table_name + "`"target_max_id_sql = "select max(id) from `" + target_table_name + "`"source_max_id = source_db.execute(source_max_id_sql, None)target_max_id = target_db.execute(target_max_id_sql, None)if source_result == target_result:flag = '相同'else:flag = '不同'msg = "目标系统表【" + target_table_name + "】与源系统表【" + source_table_name + "】表结构【 " + flag + " 】最大值分别为【" + str(target_max_id[0][0]) + ", " + str(source_max_id[0][0]) + "】, 数量分别为【" + str(target_count[0][0]) +", "+ str(source_count[0][0]) +"】"# print(msg)result = Falseremark = ''check_data_result = check_data(source_db, source_table_name, target_db, target_table_name)if result == False:if source_result != target_result:remark = '目标表【' + target_table_name + '】与源系统表【' + source_table_name + '】表结构不一致'print('\t' + remark)elif source_count != target_count:remark = '目标表【' + target_table_name + '】与源系统表【' + source_table_name + '】数据量不一致,数据量差额为:' + str(abs(source_count[0][0] - target_count[0][0])) + '条'print('\t' + remark)elif source_max_id != target_max_id:remark = '目标表【' + target_table_name + '】与源系统表【' + source_table_name + '】表最大值不一致'print('\t' + remark)elif check_data_result[0] == False and len(check_data_result) == 2:remark = '目标表【' + target_table_name + '】与源系统表【' + source_table_name + '】最近的100条数据不相同,ID为:[' + check_data_result[1] + ']'print('\t' + remark)elif check_data_result[0] == False and len(check_data_result) == 3:remark = check_data_result[1]print('\t' + remark)else:result = Truereturn [flag,target_table_name,source_table_name,target_count[0][0],source_count[0][0],target_max_id[0][0],source_max_id[0][0],result,remark]# 将比对结果写入到Excel
def write_excel(write_result: list, file_name):""":param write_result: 写入的数据:param file_name: 文件名称无后缀:return:"""parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))file_dir = os.path.join(parent_dir, 'file')file_path = os.path.join(file_dir, file_name) + '.xlsx'if os.path.exists(file_path):work_book = load_workbook(file_path)work_sheet = work_book.activemax_row = work_sheet.max_rowelse:work_book = Workbook()work_sheet = work_book.activeheaders = ['序号', '目标系统', '目标系统表', '源系统', '源系统表', '表结构是否相同', '目标系统表数量', '原系统表数量', '目标系统最大值', '原系统最大值', '比对结果', '备注']for col, headers in enumerate(headers, start=1):work_sheet.cell(row=1, column=col, value=headers)max_row = 2for num, result in enumerate(write_result, start=max_row):raw = []raw.append(num - 1)     # 序号raw.append(result[9])   # 目标系统raw.append(result[1])   # 目标系统表raw.append(result[10])  # 源系统raw.append(result[2])   # 源系统表raw.append(result[0])   # 表结构是否相同raw.append(result[3])   # 目标系统表数量raw.append(result[4])   # 原系统表数量raw.append(result[5])   # 目标系统最大值raw.append(result[6])   # 原系统最大值raw.append(result[7])   # 比对结果raw.append(result[8])   # 备注work_sheet.append(raw)work_book.save(parent_dir + "/file/" + file_name + '.xlsx')# 删除前一天产生的Excel文件
def delete_excel_file():format_time = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')parent_path = os.path.abspath(os.path.join(os.getcwd(), os.pardir)) + '/file'for file in os.listdir(parent_path):if file.endswith('.xlsx') and "数据比对结果_" + format_time in file:os.remove(os.path.join(parent_path, file))def check_data(source_db: MSP_Connect, source_table_name, target_db: MSP_Connect, target_table_name):""":param source_db: 源数据库连接:param source_table_name: 源表:param target_db: 目标数据库连接:param target_table_name: 目标表:return: 错误信息"""res = []try:source_sql = "select * from `" + source_table_name + "` order by update_at desc, id desc limit 0, 100"target_sql = "select * from `" + target_table_name + "` order by update_at desc, id desc limit 0, 100"source_result = source_db.execute(source_sql)target_result = target_db.execute(target_sql)# return True if source_result == target_result else Falseif target_result == source_result:res.append(True)return reselse:ids = ''for result in target_result:if result not in source_result:ids += (str(result[0]) + ',')res.append(False)res.append(ids[:-1])return resexcept pymysql.err.OperationalError as e:if e.args[0] == 1054:res.append(False)res.append("源系统表【" + source_table_name + "】表缺少update_at字段")res.append('error')print("\t源系统表【" + source_table_name + '】表缺少update_at字段')return resdef check_data_custom(source_db, target_db: list, tables: list):""":param source_db: 原数据库别名:param target_db: 目标数据库别名:param tables: 需要检查的的表名:return:"""result = []for db in target_db:tar_db = MSP_Connect(mysql_data[db.upper()])dict = {}list = []for table in tables:res = check_table_info(MSP_Connect(mysql_data[source_db]), table, tar_db, 'md_' + source_db.lower() + '_' + table)if res[7] == False:list.append({# 'res': res[7],'mark': res[8],'table': table})dict[db.upper()] = listresult.append(dict) if dict else Nonetar_db.close()return resultif __name__ == '__main__':# db_name = ['PLM', 'CRM', 'GDM', 'MES', 'SCM', 'ESM']db_name = ['PLM', 'CRM']# format_time = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))format_time = datetime.now().strftime('%Y%m%d%H%M%S')file_name = '数据比对结果_' + format_time# print(file_name)for name in db_name:print()print('****************************************************************************************')print(name + "系统数据开始检验...")check_msp_table_data(name, file_name)

3、api

# coding=utf8
import loggingfrom flask import Flask, jsonify, request, send_file
from msp_script.public.check_msp_syn import check_msp_table_data, check_data_custom
from datetime import datetime, timedelta
import osapp = Flask(__name__)
app.debug = True@app.route('/msp/api/check_syn_data', methods=['GET'])
def check_syn_data():""":param target_db_name 目标数据库名称,必要参数:param file_name 报告文件名称,必要参数"""target_db_name = request.args.get('target_db_name')file_name = request.args.get('file_name')info = {}if file_name == None or file_name == '':info['code'] = 500info['message'] = '[file_name]文件名称不能为空!'return jsonify(info)else:file_name = file_name + '_' + datetime.now().strftime('%Y%m%d%H%M%S')if target_db_name == None or target_db_name == '':info['code'] = 500info['message'] = '[target_db_name]数据库别名不能为空!'return jsonify(info)if target_db_name.__contains__(','):db_name = target_db_name.split(',')for db in db_name:if db.upper() not in ('CRM', 'PLM', 'MES', 'SCM', 'ESM', 'GDM'):info['code'] = 500info['message'] = '数据库别名只能是[CRM, PLM, MES, SCM, ESM, GDM]!'return jsonify(info)for name in db_name:data = check_msp_table_data(name.upper(), file_name)info[name] = datainfo['code'] = 200info['file'] = '/msp/api/download/' + file_name + '.xlsx'return jsonify(info)elif target_db_name.upper() not in ('CRM', 'PLM', 'MES', 'SCM', 'ESM', 'GDM'):info['code'] = 500info['message'] = '数据库别名只能是[CRM, PLM, MES, SCM, ESM, GDM]!'return jsonify(info)else:data = check_msp_table_data(target_db_name.upper(), file_name)info['code'] = 200info['file'] = '/msp/api/download/' + file_name + '.xlsx'info[target_db_name] = datareturn jsonify(info)@app.route('/msp/api/download/<string:file_name>', methods=['GET'])
def download_file(file_name):""":param file_name: 文件名称:param suffix: 文件后缀:return:"""info = {}if file_name == "" or file_name == None:info['code'] = 500info['message'] = '文件名称不能为空!'return jsonify(info)else:parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))file_dir = os.path.join(parent_dir, 'file')file_path = os.path.join(file_dir, file_name)if os.path.exists(file_path) and os.path.isfile(file_path):return send_file(file_path, as_attachment=True)else:info['code'] = 500info['message'] = '文件不存在!'return jsonify(info)@app.route('/msp/api/check_table_data', methods=['POST'])
def check_tables():""":param source_db 源数据库名称,必要参数:param target_db 目标数据库名称,必要参数:param tables 需要检查的源数据表名称,必要参数"""source_db = request.json.get('source_db').upper()target_db = [db.upper() for db in request.json.get('target_db')]tables = request.json.get('tables')result = {}if source_db == '' or source_db == None:result['code'] = 500result['message'] = 'source_db原数据库名称不能为空'return jsonify(result)elif source_db not in ['CRM', 'PLM', 'MES', 'SCM', 'GDM', 'ESM']:result['code'] = 500result['message'] = '原数据库名称[' + source_db + ']不存在'return jsonify(result)if target_db == '' or target_db == None or len(target_db) == 0:result['code'] = 500result['message'] = 'target_db目标数据库名称不能为空'return jsonify(result)else:for t_db in target_db:if t_db not in ['CRM', 'PLM', 'MES', 'SCM', 'GDM', 'ESM']:result['code'] = 500result['message'] = '目标数据库名称[' + t_db + ']不存在'return jsonify(result)if source_db in target_db:result['code'] = 500result['message'] = 'source_db源数据库' + source_db + '不允许同时存在目标数据库列表' + str(target_db) + '中'return jsonify(result)if tables == '' or tables == None or len(tables) == 0:result['code'] = 500result['message'] = 'tables需要验证的表不能为空'return jsonify(result)else:res = check_data_custom(source_db, target_db, tables)result['code'] = 200result['message'] = '成功'result['source_db'] = source_dbresult['target_db'] = target_dbresult['tables'] = tablesresult['no_match_data'] = resreturn jsonify(result)@app.route("/demo", methods=['GET'])
def demo():dict = {"a": 1,"v": 2}return jsonify(dict)if __name__ == '__main__':app.run()

四、测试

这里一共提供三个接口

1、全量检查数据

/msp/api/check_syn_data   GET

参数:target_db_name   需要检查的数据库名称

           file_name    执行完输出的报告名称

2、自定义检查系统表数据

/msp/api/check_table_data   POST

参数:source_db  源数据库名称

           target_db   目标数据库名称

           tables   需要检查的表

3、下载检查结果

/msp/api/download/<文件名称>    GET

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/27921.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

行为型-观察者模式(Observer)

观察者模式&#xff08;Observer Pattern&#xff09;是一种软件设计模式&#xff0c;主要用于在对象之间建立一对多的依赖关系&#xff0c;以便当一个对象的状态发生改变时&#xff0c;其所有依赖的对象都会得到通知并自动更新。观察者模式属于行为型模式。以下是关于观察者模…

lvgl手势事件判断为点击事件问题

lvgl手势事件判断为点击事件问题处理方法 1.打开文件lvgl\src\core\lv_indev.c 2. 修改函数 static void indev_proc_release(_lv_indev_proc_t * proc)2.1 由原来的 /*** Process the released state of LV_INDEV_TYPE_POINTER input devices* @param proc pointer to an …

学习笔记——交通安全分析05

目录 前言 当天学习笔记整理 交通行为、心理与安全 结束语 前言 #随着上一轮SPSS学习完成之后&#xff0c;本人又开始了新教材《交通安全分析》的学习 #整理过程不易&#xff0c;喜欢UP就点个免费的关注趴 #本期内容接上一期04笔记 当天学习笔记整理 交通行为、心理与…

关于Flutter doctor里两个警告的消除

在终端里输入 flutter doctor 是我们检查flutter是否配置好的方法。 在输出信息里常碰到两个警告 ! Warning: dart on your path resolves to /usr/local/Cellar/dart/2.18.6/libexec/bin/dart,which is not inside your current Flutter SDK checkout at /Users/dengpeng/fl…

三运放仪表放大器通过设置单个电阻器的值来调整增益

从公式 1 中可以看出&#xff0c;我们可以通过调整单个电阻器 R G的值来调整仪表放大器的差分增益。这很重要&#xff0c;因为与电路中的其他电阻器不同&#xff0c; RG的值不需要与任何其他电阻器匹配。 例如&#xff0c;如果我们尝试通过更改 R 5的值来设置增益&#xff0c;…

TGI模型- 同期群-评论文本

用户偏好分析 TGI 1.1 用户偏好分析介绍 要分析的目标&#xff0c;在目标群体中的均值 和 全部群体里的均值进行比较&#xff0c; 差的越多说明 目标群体偏好越明显 TGI&#xff08;Target Group Index&#xff0c;目标群体指数&#xff09;用于反映目标群体在特定研究范围内…

在自己的电脑上搭建我的世界Java版服务器

很多朋友&#xff0c;喜欢玩Minecraft&#xff0c;也希望搭建一个服务器&#xff0c;用于和小伙伴联机&#xff1b; 并且&#xff0c;拥有服务器后&#xff0c;即使所有玩家都下线&#xff0c;“世界”依旧在运行&#xff0c;玩家可以随时参与其中&#xff0c;说不定一上线&am…

有状态服务和无状态服务

有状态服务和无状态服务在不同的业务场景下有不同的应用&#xff0c;以下是一些常见的例子&#xff1a; 有状态服务的场景&#xff1a; 用户会话管理&#xff1a; 用户登录后&#xff0c;会话信息&#xff08;如用户ID、权限、购物车内容等&#xff09;需要在服务端保持状态。…

一种新的一维时间序列信号盲解卷积算法(以旋转机械故障诊断为例,MATLAB环境)

一种新的一维时间序列信号盲解卷积算法&#xff08;以旋转机械故障诊断为例&#xff0c;MATLAB环境&#xff09;&#xff0c;可作为深度学习信号前处理过程&#xff0c;水个SCI不是问题。 机械设备的状态信号中往往蕴含着大量的设备异常信息。如何从繁多的机械状态信号中提取足…

CCAA质量管理【学习笔记】​​ 备考知识点笔记(五)质量设计方法与工具

第五节 质量设计方法与工具 1 任 务 分 解 法 1.1 概念 任务分解法&#xff0c;又称工作分解结构 (Work Breakdown Structure, 简 称 WBS) 。WBS 指以可交付成果为 导向&#xff0c;对项目团队为实现项目目标并完成规定的可交付成果而执行的工作所进行的层次分解。W…

程序员基本功之git的使用

阿里网盘-资料链接 文章目录 git分布式版本控制工具git的基本概念开发过程中的问题常见的版本控制工具git分布式版本控制工具特点git系统所定制的若干目标git的工作流程图 GIT的安装和常用命令**创建本地git仓库步骤****本地git仓库的使用**git log详解 git分布式版本控制工具…

SpringCloud跨服务远程调用

随着项目的使用者越来越多&#xff0c;项目承担的压力也会越来越大&#xff0c;为了让我们的项目能服务更多的使用者&#xff0c;我们不得不需要把我们的单体项目拆分成多个微服务&#xff0c;就比如把一个商城系统拆分成用户系统&#xff0c;商品系统&#xff0c;订单系统&…

MQTT TCP HTTP 协议对比

目录 1. 类型与用途 2. 通信模式与特性 3. 优缺点 4. 使用场景 MQTT、TCP和HTTP在类型、用途、通信模式、特性以及使用场景等方面存在显著的区别&#xff0c;以下是详细的阐述&#xff1a; 1. 类型与用途 MQTT&#xff1a;MQTT是一种消息传输协议&#xff0c;主要适用于物…

增加软件投入的重要性:提升自动化程度与用户界面设计的价值

一、引言 在许多项目中&#xff0c;硬件系统通常占据了大量预算&#xff0c;而对软件的投入相对较少。这种不平衡往往导致软件自动化程度低、操作不便、界面简陋&#xff0c;过多的人工干预不仅降低了工作效率&#xff0c;还影响了用户体验。特别是对于一些国家项目&#xff0…

matlab:数字信号处理过程(趋势+峰值+包络+滤波+FFT)仿真

数字信号处理 消除趋势找峰值求包络平滑fftclc;clear;close all; figure(1);clf; Fs = 200;%采样频率 T = 1/Fs; %采样周期 N = 300;

「C系列」C 结构体

文章目录 一、C 结构体1. 定义结构体2. 声明结构体变量3. 初始化结构体变量4. 访问结构体成员5. 结构体数组6. 结构体指针7. 结构体嵌套 二、C 如何使用结构体1. 定义结构体类型2. 声明结构体变量3. 初始化结构体变量4. 访问结构体成员5. 结构体指针6. 在函数中使用结构体7. 注…

docker run的复杂使用

sudo docker run -dit --namenmediacross1.x -v $HOME:$HOME -v /tmp/.X11-unix:/tmp/.X11-unix -v /dev/dri:/dev/dri --privileged --networkhost -e DISPLAYunix$DISPLAY -w $HOME nova:nmediacross1.x这个Docker命令相当复杂&#xff0c;包含了许多选项和参数&#xff0c;…

记录一次CTF图片拼图安装工具montage+gaps成功步骤以及踩坑全过程

安装图片拼接工具montage&#xff1a; 1.安装 使用pip install montage无法安装montage工具的师傅可以尝试下面的方法 #Debian apt-get install graphicsmagick-imagemagick-compat#Ubuntu apt-get install graphicsmagick-imagemagick-compat#Alpine apk add imagemagick6#…

安全相关的一些基础知识(持续更新)

目录 1. TRNG真随机数生成 2. 对称加密和非对称加密及其区别 3. Hash算法&#xff08;摘要算法&#xff09; 4. HTTPS、TLS、SSL、HTTP区别和关系 HTTPS的基本原理 5. PSS 1. TRNG真随机数生成 True Random Number Generator 在真随机数的生成里&#xff0c;把随机数的生…

编译rk3568 Android,rk3568_r-user和rk3568_r-userdebug的区别

在 Android 开发环境中,lunch 命令用于选择编译目标设备和构建类型。选择不同的构建类型(如 user 和 userdebug)会影响编译结果的特性和用途。以下是 rk3568_r-user 和 rk3568_r-userdebug 之间的主要区别: user 构建类型 (rk3568_r-user): 目标用户:最终用户(消费者)。…