目录
前置:
准备
代码:数据库交互部分
代码:生成前复权 日、周、月、季、年数据
前置:
1 未复权日数据:获取方法请查看 https://blog.csdn.net/m0_37967652/article/details/146435589 (数据库使用 PostgreSQL);如需将日数据更新到最新,请查看 https://blog.csdn.net/m0_37967652/article/details/146988667
2 权息数据,下载 t_exdividend.sql 文件
通过网盘分享的文件:t_exdividend.sql
链接: https://pan.baidu.com/s/17B1EiHcEYByfWSICqX1KNQ?pwd=4abg 提取码: 4abg
在命令行 postgresql安装目录的bin目录下执行
psql -U postgres -h 127.0.0.1 -p 5432 -d db_stock -f E:/temp005/t_exdividend.sql
E:/temp005/t_exdividend.sql 改成自己的文件目录
准备
1 从通达信中获取当前A股股票代码,存储到 stock_ticker.txt 文件(一行一个股票代码),并放在运行脚本的当前目录下(代码读取的路径是 ./stock_ticker.txt)
2 准备一个空目录(即代码中的 pre_dir,示例为 E:/temp006/,请改成自己的目录),并在其中创建 day、week、month、quarter、year 五个子目录
3 安装包
pip install pandas
pip install psycopg2
代码:数据库交互部分
这部分代码存储到 utils包目录下的 postgresql_utils01.py文件中
import psycopg2
import pandas as pd


def connect_db():
    """Open a connection to the local ``db_stock`` PostgreSQL database.

    The connection settings (database, user, empty password, host, port)
    are hard-coded below.

    Returns:
        A psycopg2 connection, or ``None`` when connecting fails; in that
        case the error is printed instead of being raised.
    """
    try:
        conn = psycopg2.connect(
            database='db_stock',
            user='postgres',
            password='',
            host='127.0.0.1',
            port=5432,
        )
    except Exception as e:
        print(f'connection failed。{e}')
        return None
    return conn


def _fetch_by_tickers(sql_str: str, ticker_list: list) -> list:
    """Run ``sql_str`` with the tickers bound to its single ``in %s`` slot.

    Args:
        sql_str: Query containing exactly one ``%s`` placeholder.
        ticker_list: Ticker codes to filter on.

    Returns:
        All rows returned by the query; ``[]`` for an empty ticker list.

    Raises:
        RuntimeError: If no database connection could be opened.
    """
    tickers = tuple(ticker_list)
    if not tickers:
        # An empty tuple would render as "in ()", which is invalid SQL, and
        # no row could match anyway, so skip the round-trip.
        return []
    conn = connect_db()
    if conn is None:
        raise RuntimeError('database connection is not available')
    try:
        cur = conn.cursor()
        try:
            # The driver quotes each ticker; values are never spliced into
            # the SQL text.
            cur.execute(sql_str, (tickers,))
            return cur.fetchall()
        finally:
            cur.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()


def query_multi_stock_daily(ticker_list:list)->list:
    """Fetch the ``t_stock_daily`` rows for the given tickers.

    Args:
        ticker_list: Ticker codes to look up.

    Returns:
        A list of row tuples whose first element is the ticker, followed by
        the columns named in the ``select`` list below, in that order.

    Raises:
        RuntimeError: If no database connection could be opened.
    """
    sql_str = (
        "select ticker,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,"
        "turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,"
        "marketValue,chgPct,PE,PE1,PB,isOpen,vwap "
        "from t_stock_daily where ticker in %s;"
    )
    return _fetch_by_tickers(sql_str, ticker_list)


def query_multi_exdiv(ticker_list:list)->list:
    """Fetch the ``t_exdividend`` rows for the given tickers.

    Args:
        ticker_list: Ticker codes to look up.

    Returns:
        A list of row tuples whose first element is the ticker, followed by
        the columns named in the ``select`` list below, in that order.

    Raises:
        RuntimeError: If no database connection could be opened.
    """
    sql_str = (
        "select ticker,exDate,perShareTransRadio,perCashDiv,allotmentRatio,"
        "allotmentPrice from t_exdividend where ticker in %s;"
    )
    return _fetch_by_tickers(sql_str, ticker_list)
代码:生成前复权 日、周、月、季、年数据
from concurrent.futures import ThreadPoolExecutor
import os
import pandas as pd
from utils import postgresql_utils01
'''
股票日数据使用
'''
# Number of tickers fetched from the database per query pair.
_BATCH_SIZE = 10

# Column names for the values that follow the ticker in each query row.
_DAILY_COLS = (
    'tradeDate', 'openPrice', 'highestPrice', 'lowestPrice', 'closePrice',
    'turnoverVol', 'turnoverValue', 'dealAmount', 'turnoverRate',
    'negMarketValue', 'marketValue', 'chgPct', 'PE', 'PE1', 'PB', 'isOpen',
    'vwap',
)
_EXDIV_COLS = (
    'exDate', 'perShareTransRadio', 'perCashDiv', 'allotmentRatio',
    'allotmentPrice',
)

# Price columns rewritten by the ex-dividend adjustment.
_PRICE_COLS = ('closePrice', 'openPrice', 'highestPrice', 'lowestPrice')

# Columns summed over each resampling period.
_SUM_COLS = ('turnoverVol', 'turnoverValue', 'dealAmount', 'turnoverRate')

# Columns kept in the week/month/quarter/year CSV files.
_PERIOD_OUTPUT_COLS = [
    'tradeDate', 'openPrice', 'highestPrice', 'lowestPrice', 'closePrice',
    'turnoverVol', 'turnoverValue', 'dealAmount', 'turnoverRate',
    'negMarketValue', 'marketValue',
]

# (output sub-directory, pandas resample rule) for each aggregated period.
_PERIOD_RULES = (
    ('week', 'W-FRI'),
    ('month', 'ME'),
    ('quarter', 'QE'),
    ('year', 'YE'),
)


def _rows_to_frames(rows, columns):
    """Build a ``{ticker: DataFrame}`` dict from query rows.

    ``row[0]`` is the ticker; the remaining values become the DataFrame
    columns named by ``columns``.
    NOTE(review): this presumably relies on each remaining value being a
    per-day sequence (one row per ticker) - confirm against the table schema.
    A later row for the same ticker replaces the earlier one.
    """
    return {row[0]: pd.DataFrame(data=dict(zip(columns, row[1:]))) for row in rows}


def _apply_ex_dividend_adjustment(daily, ex):
    """Rewrite the prices in ``daily`` that precede each ex-date in ``ex``.

    For every ex-dividend record, in ascending date order, each price dated
    before the ex-date becomes ``price * a + b``. Both frames are modified
    in place.
    """
    share_factor = 1 + ex['perShareTransRadio'] + ex['allotmentRatio']
    ex['a'] = 1 / share_factor
    ex['b'] = (ex['allotmentRatio'] * ex['allotmentPrice'] - ex['perCashDiv']) / share_factor
    ex['o_date'] = pd.to_datetime(ex['exDate'])
    ex.sort_values(by='o_date', ascending=True, inplace=True)
    for _, row in ex.iterrows():
        # Compare against the parsed timestamp rather than the raw exDate
        # value so the mask does not depend on how the database returns dates.
        before_ex = daily['o_date'] < row['o_date']
        for col in _PRICE_COLS:
            daily.loc[before_ex, col] = daily[col] * row['a'] + row['b']


def _aggregate_period(df, rule):
    """Resample the daily frame ``df`` on ``o_date`` with the given rule.

    Each period takes the last row's values, except: open is the first
    open, low/high are the period min/max, and the volume, value, deal and
    turnover-rate columns are summed. Only the period output columns are
    returned, indexed by ``o_date``.
    NOTE(review): periods without any daily row still yield a row (empty
    values, zero sums); confirm whether consumers want those dropped.
    """
    grouped = df.resample(rule, on='o_date')
    out = grouped.last()
    # Aggregate only the column needed instead of the whole frame.
    out['openPrice'] = grouped['openPrice'].first()
    out['lowestPrice'] = grouped['lowestPrice'].min()
    out['highestPrice'] = grouped['highestPrice'].max()
    for col in _SUM_COLS:
        out[col] = grouped[col].sum()
    return out.loc[:, _PERIOD_OUTPUT_COLS].copy()


def _export_ticker(pre_dir, ticker, df):
    """Write the day CSV and the week/month/quarter/year CSVs for a ticker."""
    df.to_csv(os.path.join(pre_dir, 'day', ticker + '.csv'), encoding='utf-8', index=False)
    for sub_dir, rule in _PERIOD_RULES:
        period_df = _aggregate_period(df, rule)
        period_df.to_csv(os.path.join(pre_dir, sub_dir, ticker + '.csv'), encoding='utf-8')


def output_daiy_caculate(thread_num:int, stock_ticker_list:list, pre_dir:str=r'E:/temp006/'):
    """Export adjusted day/week/month/quarter/year CSV files for tickers.

    Tickers are processed in batches of ten: the daily and ex-dividend rows
    are queried, rows with ``isOpen != 1`` are dropped, prices before each
    ex-date are adjusted, and the results are written below ``pre_dir``.

    Args:
        thread_num: Label used in the progress messages.
        stock_ticker_list: Ticker codes handled by this worker.
        pre_dir: Output root; the ``day``, ``week``, ``month``, ``quarter``
            and ``year`` sub-directories are created when missing.

    Any exception is printed and ends this worker's remaining batches; it is
    not re-raised.
    """
    print(f'thread {thread_num}, {len(stock_ticker_list)}')
    try:
        if stock_ticker_list:
            for sub_dir in ('day',) + tuple(name for name, _ in _PERIOD_RULES):
                os.makedirs(os.path.join(pre_dir, sub_dir), exist_ok=True)
        # Stepping by the batch size never produces an empty batch, so no
        # query is issued when the list length is a multiple of the size.
        for start in range(0, len(stock_ticker_list), _BATCH_SIZE):
            node_ticker_list = stock_ticker_list[start:start + _BATCH_SIZE]
            daily_res = postgresql_utils01.query_multi_stock_daily(node_ticker_list)
            exdiv_res = postgresql_utils01.query_multi_exdiv(node_ticker_list)
            df_d_dict = _rows_to_frames(daily_res, _DAILY_COLS)
            df_ex_dict = _rows_to_frames(exdiv_res, _EXDIV_COLS)
            for ticker, daily in df_d_dict.items():
                daily = daily.loc[daily['isOpen'] == 1].copy()
                daily['o_date'] = pd.to_datetime(daily['tradeDate'])
                daily.sort_values(by='o_date', ascending=True, inplace=True)
                # Tickers without ex-dividend records are exported unadjusted.
                if ticker in df_ex_dict:
                    _apply_ex_dividend_adjustment(daily, df_ex_dict[ticker])
                _export_ticker(pre_dir, ticker, daily)
    except Exception as e:
        print(f"{thread_num} error {e}")
    finally:
        print(f"{thread_num} finished")
    print(f'{thread_num} ending...')


def start_execute(ticker_file:str='./stock_ticker.txt', thread_count:int=5):
    """Read the ticker list and fan the work out over a thread pool.

    Args:
        ticker_file: Text file with one ticker code per line.
        thread_count: Number of slices the ticker list is split into; the
            last slice also receives the remainder.
    """
    # utf-8-sig reads files with or without a BOM; blank lines and
    # surrounding whitespace (e.g. '\r') would otherwise become bogus tickers.
    with open(ticker_file, mode='r', encoding='utf-8-sig') as fr:
        stock_ticker_list = [line.strip() for line in fr if line.strip()]
    print(len(stock_ticker_list))
    thread_count = max(1, thread_count)
    interval = len(stock_ticker_list) // thread_count
    if interval == 0:
        # Fewer tickers than workers: hand everything to a single worker.
        thread_count = 1
    params_list = []
    thread_num_list = []
    for i in range(thread_count):
        if i == thread_count - 1:
            pre_list = stock_ticker_list[i * interval:]
        else:
            pre_list = stock_ticker_list[i * interval:(i + 1) * interval]
        thread_num_list.append(i)
        params_list.append(pre_list)
    with ThreadPoolExecutor() as executor:
        # Results are not consumed; each worker reports its own errors.
        executor.map(output_daiy_caculate, thread_num_list, params_list)
        print('线程池任务分配完毕')


if __name__ == '__main__':
    start_execute()