目录
一、适合场景
二、开发过程说明
三、功能使用流程
四、代码
1、安装python依赖库
2、完整代码段
一、适合场景
无废话,CSV中有少量不合规数据需要手工处理可利用此方法,适合有点经验的程序员,可以不熟Python,思路还是要有一点
二、开发过程说明
本人不熟Python,chatGPT由于网络问题总是不太好用,利用了文心一言,辅助实现了相关代码
三、功能使用流程
看“main()”函数,
1、修改Mysql数据链接
2、定义好导入的文件和表对,文件都要在变量path目录下,格局需要修改,也可path放空,在map对里面直接写完整的文件名
3、导入
导入的过程:错误的行通过“异常”抓去出来,保存到-error.csv结尾的文件中,以便后续继续处理,原来导入的文件只留下正确的可导入的数据
四、代码
1、安装python依赖库
# 先要运行这个安装依赖的包
pip install pandas sqlalchemy pymysql cryptography
2、完整代码段
# 调整修复 2024-03-29 17:19# 先要运行这个安装依赖的包
# pip install pandas sqlalchemy pymysql cryptographyimport pandas as pd
import re
from sqlalchemy import create_engine
import csv
import osdef delete_line_from_file_and_backup(file_path, line_number, output_file): with open(file_path, 'r') as file: lines = file.readlines() if not os.path.isfile(output_file):isNewFile = Trueelse:isNewFile = False# 先备份记录with open(output_file, mode='a', newline='') as file:if isNewFile:file.writelines(lines[0])print('=========================')print(lines[0])file.seek(os.SEEK_END)file.writelines(lines[line_number])# 删除指定行数的行(注意Python的索引是从0开始的) del lines[line_number]# 将修改后的内容写回文件,可以选择写入新文件或覆盖原文件 with open(file_path, 'w') as file: file.writelines(lines) #取得导入出错的CSV文件的行号
def getErrorLineNo(s):# 使用正则表达式查找行号 line_number_match = re.search(r'in line (\d+)', s) # 如果找到了匹配项,提取数字 if line_number_match: line_number = int(line_number_match.group(1)) #print(line_number) # 输出: 13092 return line_numberelse: # 使用正则表达式查找行号 line_number_match = re.search(r'at row (\d+)', s) # 如果找到了匹配项,提取数字 if line_number_match: line_number = int(line_number_match.group(1)) #print(line_number) # 输出: 13092 return line_numberelse: #print("未找到行号")return 0
# 完成导入
def doImport(engine, path, file_to_table_map):# 循环遍历字典,读取CSV文件并将数据导入到对应的MySQL表中 for csv_file, table_name in file_to_table_map.items():# 循环将所有的导入异常全部抓去出来,保存到外面的CSV文件中# 将剩余的导入到数据库中,有问题的CSV文件,需要手工处理后再导入while True:try:# 使用函数导入CSV文件 print('正在导入数据到表:', table_name, " <--- ", path + csv_file)df = pd.read_csv(path + csv_file, sep='\t') df.to_sql(table_name, engine, if_exists='replace', index=False)print('导入完成:', table_name)break;except Exception as e:print('导入失败:', table_name, e)row_to_delete = getErrorLineNo(str(e))if row_to_delete >= 0:srcFile = path + csv_filebakFile = srcFile + '-error.csv'print('备份&删除&回写:', table_name, path + bakFile)delete_line_from_file_and_backup(srcFile, row_to_delete, bakFile)else:breakdef main():# 创建数据库连接引擎 engine = create_engine('mysql+pymysql://root:123456@localhost:3306/hjyb',) # 格式:'文件名': '表名'file_to_table_map = {# 'tb_a.csv': 'tb_a',# 'tb_b.csv': 'tb_b',# 'tb_c.csv': 'tb_c',# 'tb_d.csv': 'tb_d'}# 测试数据目录path = '/Users/zhengjie/working/xx系统/测试数据/'doImport(engine, path, file_to_table_map)if __name__ == '__main__':main()