Python处理Excel文件并与数据库匹配做拼接
需求:Python处理Excel中数据并与数据库交互匹配得到账号信息等其他操作
Python实现
import os
import pandas as pd
import pymssql
import warnings
import time
def extract_broadband_speed(speed):
    """Return everything before the first 'M' in *speed*, followed by 'M'.

    The value is converted with ``str`` before it is inspected. An empty
    string is returned when the value is null (per ``pd.notnull``) or when
    its text contains no capital 'M'.
    """
    if not pd.notnull(speed):
        return ''
    text = str(speed)
    if 'M' not in text:
        return ''
    # Only the part before the first 'M' is kept; anything after it is dropped.
    leading, _, _ = text.partition('M')
    return leading + 'M'
def concatenate_with_dash(row):
    """Build the dash-joined product / work-order label for one row.

    The row is read through ``row.get`` using the keys '产品类型', '工单类型',
    '方式' and '速率提取', so a pandas row or a plain dict both work.

    * When '产品类型' is '宽带' the label is
      ``产品类型-速率提取-方式-工单类型``; otherwise it is ``产品类型-工单类型``.
    * Unless '工单类型' is '改' or '其他', the suffix '机' is appended.

    The parts are joined with a bare '-' and no surrounding spaces. The
    caller rewrites '装机' to '新装' on the result, which only matches when
    the work-order type and the '机' suffix are adjacent.

    Missing values are formatted as-is (for example ``None`` becomes
    'None'); no cleaning is done here.
    """
    product_type = row.get('产品类型')
    workorder_type = row.get('工单类型')
    access_type = row.get('方式')
    broadband_speed = row.get('速率提取')

    # '改' and '其他' are used verbatim; every other type gets the '机' suffix.
    suffix = '' if workorder_type in ['改', '其他'] else '机'

    if product_type == '宽带':
        label = f"{product_type}-{broadband_speed}-{access_type}-{workorder_type}"
    else:
        label = f"{product_type}-{workorder_type}"
    return label + suffix
# Folder that receives the per-product exports and is read back when the
# templates are filled.
_SOURCE_DIR = '源文件/'

# (column in the exported workbook, column in the template workbook).
# A pair is applied only when both columns exist.
_FIELD_MAP = [
    ('施工单编码', '编码'),
    ('施工单编码', 'boss号'),
    ('产品工单类型合并', '工单标题'),
    ('市', '市'),
    ('县', '县'),
    ('接入方式', '接入方式'),
    ('受理时间', '受理时间'),
    ('派单时间', '派单时间'),
    ('归档时间', '归档时间'),
    ('预约上门时间', '前台预约时间'),
    ('处理人', '施工人员'),
    ('宽带速率', '宽带速率'),
    ('宽带套餐资费', '套餐信息'),
    ('开始时间', '预约上门时间'),
    ('区域-修改', '区域'),
    ('是否沿街-修改', '沿街商铺'),
    ('用户品质-NEW', '品质'),
]


def clear_data_in_excel_files(current_directory):
    """Rewrite every .xls/.xlsx file in *current_directory* with headers only.

    Each workbook is read with ``pd.read_excel``, truncated to zero data rows
    and written back to the same path, so only the column labels survive.
    A confirmation line is printed per file and once at the end.
    """
    workbook_names = [
        name for name in os.listdir(current_directory)
        if name.endswith(('.xls', '.xlsx'))
    ]
    for name in workbook_names:
        file_path = os.path.join(current_directory, name)
        # head(0) keeps the column labels but drops every data row.
        header_only = pd.read_excel(file_path).head(0)
        header_only.to_excel(file_path, index=False, header=True)
        print(f"成功清空文件: {name} ")
    print("成功清空所有 Excel 文件的除第一行表头外的数据")


def _load_account_table():
    """Query the account table and return it with '人员名称' renamed to '处理人'."""
    # NOTE(review): connection settings, including the password, are
    # hard-coded here; consider moving them to configuration.
    server = '127.0.0.1'
    database = 'YD'
    username = 'sa'
    password = 'xyz@1234560'
    sql_query = '''SELECT 地市, 人员名称, [账号]FROM [ZHB]'''

    conn = pymssql.connect(server, username, password, database)
    try:
        data = pd.read_sql(sql_query, conn)
    finally:
        # Release the connection even when the query fails.
        conn.close()
    data.rename(columns={'人员名称': '处理人'}, inplace=True)
    return data


def _split_appointment(value, position):
    """Return one side of an 'a ~ b' string, or '' when *value* is not a str."""
    if not isinstance(value, str):
        return ''
    return value.split(' ~ ')[position].strip()


def _enrich_workorders(df0):
    """Add the derived columns to one work-order frame and return it."""
    df0['速率提取'] = df0['速率'].apply(extract_broadband_speed)
    df0['用户品质-NEW'] = df0['速率提取'].apply(
        lambda x: '千兆' if x == '1000M' else '普通品质'
    )
    df0['产品工单类型合并'] = (
        df0.apply(concatenate_with_dash, axis=1).str.replace('装机', '新装')
    )
    df0['区域-修改'] = (
        df0['区域'].fillna('城镇')
        .str.replace('城市', '城镇')
        .str.replace('乡镇', '城镇')
    )
    df0['是否沿街-修改'] = df0['沿街'].apply(
        lambda x: '是' if pd.notnull(x) else '否'
    )
    # '预约上门时间' is split on ' ~ ': first piece is the start, last the end.
    df0['开始时间'] = df0['预约上门时间'].apply(lambda x: _split_appointment(x, 0))
    df0['结束时间'] = df0['预约上门时间'].apply(lambda x: _split_appointment(x, -1))
    return df0


def _export_by_product_type(merged_df):
    """Write the merged rows to one workbook per product group in 源文件/."""
    # to_excel does not create missing parent directories.
    os.makedirs(_SOURCE_DIR, exist_ok=True)

    for idx, (product_type, _) in enumerate(merged_df.groupby('产品类型')):
        print(f"产品类型 {idx + 1} : {product_type} ")

    filtered_data = merged_df[merged_df['产品类型'].isin(['ZW', 'TR'])]
    filtered_data.to_excel("源文件/ZW_TR数据合并.xlsx", index=False)
    print("成功将产品类型为 ZW_TR数据合并.xlsx")

    product_types = ['云', '门铃', '喇叭', 'HM']
    hm_data = merged_df[merged_df['产品类型'].isin(product_types)]
    hm_data.to_excel("源文件/HM_数据.xlsx", index=False)

    other_data = merged_df[
        ~merged_df['产品类型'].isin(['ZW', 'TR', '云', '门铃', '喇叭', 'HM'])
    ]
    for product_type, group_data in other_data.groupby('产品类型'):
        # No spaces in the name: it has to follow the same '<type>_数据.xlsx'
        # pattern as the fixed names above so the template lookup can find it.
        file_name = f"源文件/{product_type}_数据.xlsx"
        group_data.to_excel(file_name, index=False)
        print(f"成功将产品类型为 {product_type} 的数据导出到文件 {file_name} ")
    print("成功将数据库查询结果匹配并拆分业务导出为Excel文件")


def _fill_target_templates(target_folder):
    """Clear the template workbooks, then copy mapped columns into them.

    A template is filled only when it ends with '.xlsx' and a workbook with
    the same file name exists in 源文件/.
    """
    clear_data_in_excel_files(target_folder)
    for file_name in os.listdir(target_folder):
        if not file_name.endswith('.xlsx'):
            continue
        file_path = os.path.join(target_folder, file_name)
        source_file_path = os.path.join('源文件/', file_name)
        if not os.path.isfile(source_file_path):
            continue

        df_source = pd.read_excel(source_file_path)
        df_target = pd.read_excel(file_path)
        for source_col, target_col in _FIELD_MAP:
            if source_col in df_source.columns and target_col in df_target.columns:
                df_target[target_col] = df_source[source_col]

        # Extra column that is only copied for the combined ZW/TR workbook.
        if 'ZW_TR数据合并.xlsx' in source_file_path:
            if 'ZW资费' in df_source.columns and '信息' in df_target.columns:
                df_target['信息'] = df_source['ZW资费']

        df_target.to_excel(file_path, index=False)
        print(f"成功将字段复制到文件 {file_path} 中")


def main():
    """Run the whole pipeline: load, enrich, merge, export, fill templates.

    Steps:
      1. Load the account table from SQL Server.
      2. Read every '.xls' file in the current working directory, add the
         derived columns and count rows per '产品类型'.
      3. Left-merge the rows with the account table on '地市' and '处理人'.
      4. Export the merged rows into per-product workbooks under 源文件/.
      5. Clear the templates in 数据库字段/ and copy mapped columns into them.
      6. Print the per-product counts and timing, then wait for the user.

    Raises:
        FileNotFoundError: if the working directory has no '.xls' file.
    """
    start_time = time.time()
    print("程序开始时间:", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)))
    warnings.filterwarnings('ignore')

    data = _load_account_table()

    current_directory = os.getcwd()
    files = [file for file in os.listdir(current_directory) if file.endswith('.xls')]
    if not files:
        raise FileNotFoundError(f"no .xls input files found in {current_directory}")

    workorder_count = {}
    frames = []
    for file in files:
        df0 = _enrich_workorders(pd.read_excel(os.path.join(current_directory, file)))
        print(f"成功读取文件: {file} ")
        for workorder_type in df0['产品类型']:
            workorder_count[workorder_type] = workorder_count.get(workorder_type, 0) + 1
        frames.append(df0)

    # NOTE(review): rows from all input workbooks are combined before the
    # merge so that no workbook's rows are dropped or overwritten by a later
    # one. With a single input workbook this is the same as merging that
    # workbook directly. Confirm this matches the intended multi-file handling.
    all_workorders = pd.concat(frames, ignore_index=True)
    merged_df = pd.merge(
        all_workorders,
        data[['地市', '处理人', '账号']],
        on=['地市', '处理人'],
        how='left',
    )

    _export_by_product_type(merged_df)
    _fill_target_templates('数据库字段/')

    print("产品类型总数:")
    for workorder_type, count in workorder_count.items():
        print(f" {workorder_type} : {count} ")

    end_time = time.time()
    print("程序结束时间:", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)))
    run_time = end_time - start_time
    print("程序运行耗时:%0.2f" % run_time, "s")
    input("按任意键退出程序")


if __name__ == "__main__":
    main()