import os
import zipfile
import shutil
import tempfile

import pandas as pd


def merge_csv_files(zip_folder, output_folder, *, encodings=("utf-8", "latin1")):
    """Merge same-named CSV files found in the zip archives of a folder.

    Every ``.zip`` file directly inside *zip_folder* is extracted into its
    own scratch directory created under *output_folder*.  Each ``.csv`` file
    at the top level of the extracted content is loaded and appended to the
    file with the same name in *output_folder*, so CSV files that share a
    name across archives end up concatenated in a single output file.

    Args:
        zip_folder: Directory scanned (non-recursively) for ``.zip`` files.
        output_folder: Directory that receives the merged CSV files.  It is
            created when missing.
        encodings: Encodings tried in order when reading a CSV file; the
            next one is used only when the previous one raises
            ``UnicodeDecodeError``.

    Raises:
        UnicodeDecodeError: If none of *encodings* can decode a CSV file.
        ValueError: If *encodings* is empty.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Sorted so the row order of the merged files is reproducible;
    # os.listdir() returns entries in arbitrary order.
    for zip_file in sorted(os.listdir(zip_folder)):
        if not zip_file.endswith('.zip'):
            continue
        zip_path = os.path.join(zip_folder, zip_file)

        # A fresh, uniquely named scratch directory per archive: it always
        # exists (even for an archive without entries) and can never collide
        # with a user folder that happens to be called "temp".
        temp_folder = tempfile.mkdtemp(prefix='temp_', dir=output_folder)
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_folder)

            for csv_file in sorted(os.listdir(temp_folder)):
                if not csv_file.endswith('.csv'):
                    continue
                csv_path = os.path.join(temp_folder, csv_file)
                df = _read_csv_with_fallback(csv_path, encodings)
                merge_csv(df, csv_file, output_folder)
        finally:
            # Remove the scratch directory even when reading or merging
            # fails, so leftovers never leak into the next archive.
            clean_temp_folder(temp_folder)


def _read_csv_with_fallback(csv_path, encodings):
    """Read *csv_path* with the first encoding in *encodings* that decodes it."""
    last_error = None
    for encoding in encodings:
        try:
            return pd.read_csv(csv_path, encoding=encoding)
        except UnicodeDecodeError as err:
            last_error = err
    if last_error is None:
        raise ValueError("encodings must contain at least one encoding")
    raise last_error


def merge_csv(df, csv_file, output_folder):
    """Append *df* to the CSV named like *csv_file* inside *output_folder*.

    When the output file already exists its rows are kept and the rows of
    *df* are added after them; otherwise the file is created from *df*.
    The output is always written as UTF-8 without the index column.

    Args:
        df: Data frame holding the rows to add.
        csv_file: Name (or path) of the source CSV; only its base name is
            used to pick the output file.
        output_folder: Directory containing the merged CSV files.
    """
    output_path = os.path.join(output_folder, os.path.basename(csv_file))
    if os.path.exists(output_path):
        existing_df = pd.read_csv(output_path, encoding='utf-8')
        df = pd.concat([existing_df, df], ignore_index=True)
    df.to_csv(output_path, index=False, encoding='utf-8')


def clean_temp_folder(temp_folder):
    """Delete *temp_folder* together with everything it contains.

    ``shutil.rmtree`` is used instead of a hand-written recursive walk so
    that symbolic links are unlinked rather than followed and entries that
    are neither regular files nor directories are removed as well.

    Raises:
        FileNotFoundError: If *temp_folder* does not exist.
    """
    shutil.rmtree(temp_folder)


# Input and output folders
zip_folder_path = '/home/philtell/data/'
output_folder_path = '/home/philtell/data/test'

# Run the merge over every archive found in the input folder.
merge_csv_files(
    zip_folder=zip_folder_path,
    output_folder=output_folder_path,
)
# Enhanced version: reads GBK-encoded (Chinese) CSV files and keeps only the rows whose seventh column equals "离线".
import os
import zipfile
import shutil
import tempfile

import pandas as pd


def merge_csv_files(zip_folder, output_folder, *, encoding='gbk',
                    filter_column=6, filter_value="离线"):
    """Merge the filtered rows of same-named CSV files from zip archives.

    Every ``.zip`` file directly inside *zip_folder* is extracted into its
    own scratch directory created under *output_folder*.  Each ``.csv`` file
    at the top level of the extracted content is read with *encoding*,
    reduced to the rows whose column at position *filter_column* equals
    *filter_value*, and appended to the file with the same name in
    *output_folder*.

    Args:
        zip_folder: Directory scanned (non-recursively) for ``.zip`` files.
        output_folder: Directory that receives the merged CSV files.  It is
            created when missing.
        encoding: Text encoding of the CSV files inside the archives.
        filter_column: Zero-based position of the column that is compared
            against *filter_value* (the default ``6`` is the seventh column).
        filter_value: Value a row must have in the filter column to be kept.

    Raises:
        IndexError: If a CSV file has no column at *filter_column*.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Sorted so the row order of the merged files is reproducible;
    # os.listdir() returns entries in arbitrary order.
    for zip_file in sorted(os.listdir(zip_folder)):
        if not zip_file.endswith('.zip'):
            continue
        zip_path = os.path.join(zip_folder, zip_file)

        # A fresh, uniquely named scratch directory per archive: it always
        # exists (even for an archive without entries) and can never collide
        # with a user folder that happens to be called "temp".
        temp_folder = tempfile.mkdtemp(prefix='temp_', dir=output_folder)
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_folder)

            for csv_file in sorted(os.listdir(temp_folder)):
                if not csv_file.endswith('.csv'):
                    continue
                csv_path = os.path.join(temp_folder, csv_file)
                df = pd.read_csv(csv_path, encoding=encoding)
                # Keep only the rows whose filter column matches.
                df = df[df.iloc[:, filter_column] == filter_value]
                merge_csv(df, csv_file, output_folder)
        finally:
            # Remove the scratch directory even when reading or merging
            # fails, so leftovers never leak into the next archive.
            clean_temp_folder(temp_folder)


def merge_csv(df, csv_file, output_folder):
    """Append *df* to the CSV named like *csv_file* inside *output_folder*.

    When the output file already exists its rows are kept and the rows of
    *df* are added after them; otherwise the file is created from *df*.
    The output is always written as UTF-8 without the index column.

    Args:
        df: Data frame holding the rows to add.
        csv_file: Name (or path) of the source CSV; only its base name is
            used to pick the output file.
        output_folder: Directory containing the merged CSV files.
    """
    output_path = os.path.join(output_folder, os.path.basename(csv_file))
    if os.path.exists(output_path):
        existing_df = pd.read_csv(output_path, encoding='utf-8')
        df = pd.concat([existing_df, df], ignore_index=True)
    df.to_csv(output_path, index=False, encoding='utf-8')


def clean_temp_folder(temp_folder):
    """Delete *temp_folder* together with everything it contains.

    ``shutil.rmtree`` is used instead of a hand-written recursive walk so
    that symbolic links are unlinked rather than followed and entries that
    are neither regular files nor directories are removed as well.

    Raises:
        FileNotFoundError: If *temp_folder* does not exist.
    """
    shutil.rmtree(temp_folder)


# Input and output folders
zip_folder_path = '/home/philtell/data/'
output_folder_path = '/home/philtell/data/test2'

# Run the merge over every archive found in the input folder.
merge_csv_files(
    zip_folder=zip_folder_path,
    output_folder=output_folder_path,
)