import pandas as pd
import numpy as np
import re
import psycopg2
from sqlalchemy import create_engine
1. 连接数据库,下载所需数据
def download_sqlfile(sql_file):"""连接数据库,下载所需数据"""# 建立数据库连接con = psycopg2.connect(database="warehouse", user="XXXX", password="XXXXX", host="XX.XX.cn", port="5432")# 调用游标对象cur = con.cursor()# 执行语句,下载数据sql_command = "select * from %s"%sql_file # # 获取数据# 由于数据库首次运行容易丢失连接,需两次重复运行try:data = pd.read_sql(sql_command, con,params=(sql_file,))print('连接数据库成功')except:print('首次连接数据库失败,尝试第二次连接数据库')data = pd.read_sql(sql_command, con,params=(sql_file,))print('连接数据库成功')# 关闭数据库连接关闭游标cur.close()con.close()return data
读取示例
# 读取10月数据
sql_file_10 = 'staging_sy_data.bt_51job_2023102008_20231016143722'
df_10 = download_sqlfile(sql_file_10)
2. 数据上传到数据库
# 数据上传到数据库
def upload_table(table_name, table):"""将本地Dataframe存储到数据库中"""# 数据库连接信息 database="warehouse", user="XX", password="XXX", host="XX.XX.cn", port="5432")db_connection = {'host': 'XX.XX.cn','port': '5432','dbname': 'warehouse','user': 'XXX','password': 'XXXXX'}# 创建数据库引擎engine = create_engine('postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}'.format(**db_connection))# 将本地Dataframe存储到数据库中with engine.connect() as conn:print(bool(conn))table.to_sql(name = table_name, con = engine, schema = 'staging_data_sy') # 注意此处to_sql需要使用sqlalchemy 包的 create_engine;数据库链接只能用engine不能使用# table.to_sql(table_name, engine, if_exists='replace', index=False)print('表格上传成功')return None
数据上传示例:
# 将data存储到数据库中
table_name = 'bt_51job_chengdu_202310'
try:upload_table(table_name, df_chengdu)
except:print('第一次上传失败,正在进行第二次上传...')upload_table(table_name, df_chengdu)