import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from pylab import mpl
mpl.rcParams['font.sans-serif']=['SimHei']
mpl.rcParams['axes.unicode_minus']=False#获取交易日历
import datetime
def get_cal_date(start, end):
    """Return the trading-calendar rows whose date lies in ``[start, end]``.

    The calendar is downloaded with ``ak.tool_trade_date_hist_sina()`` and its
    ``trade_date`` column is converted to pandas timestamps before filtering.

    Args:
        start: First date to keep (inclusive). A ``datetime`` or anything
            ``pd.Timestamp`` accepts, such as ``"2020-01-31"``.
        end: Last date to keep (inclusive); same accepted types as ``start``.

    Returns:
        The calendar DataFrame restricted to rows with
        ``start <= trade_date <= end``.
    """
    # NOTE(review): `ak` is not imported in the visible part of this file --
    # presumably `import akshare as ak`; confirm it is in scope.
    start, end = pd.Timestamp(start), pd.Timestamp(end)
    dates = ak.tool_trade_date_hist_sina()
    # Vectorised parsing; also copes with cells that are already date objects.
    dates['trade_date'] = pd.to_datetime(dates['trade_date'])
    in_range = dates['trade_date'].between(start, end)
    return dates.loc[in_range]
def get_north_money(start, end):
    """Fetch the daily northbound net-inflow series between two dates.

    The data comes from ``ak.stock_hsgt_north_net_flow_in_em(symbol="北上")``.
    Its ``value`` column is cast to float and its ``date`` column is converted
    to pandas timestamps so the frame can be merged with index data on
    ``date``.

    Args:
        start: First date to keep (inclusive), e.g. ``"2015-12-08"``. A
            ``datetime`` is accepted as well.
        end: Last date to keep (inclusive); same accepted types as ``start``.

    Returns:
        The rows of the downloaded frame with ``start <= date <= end``.
    """
    # NOTE(review): `ak` is not imported in the visible part of this file --
    # presumably `import akshare as ak`; confirm it is in scope.
    start, end = pd.Timestamp(start), pd.Timestamp(end)
    # The trading calendar used to be downloaded here but was never used, so
    # that extra network round-trip has been dropped.
    df = ak.stock_hsgt_north_net_flow_in_em(symbol="北上")
    df['value'] = df['value'].astype(float)
    df['date'] = pd.to_datetime(df['date'])
    return df.loc[df['date'].between(start, end)]
def get_index_data(code, start, end):
    """Fetch the daily bars of one index between two dates.

    The bars come from ``ak.stock_zh_index_daily(symbol=code)``. The ``date``
    column is converted to pandas timestamps and the frame is cut down to the
    requested window. The row labels of the downloaded frame are kept as they
    are, so callers that combine several indices should align on ``date``.

    Args:
        code: Index symbol passed straight to akshare, e.g. ``"sh000300"``.
        start: First date to keep (inclusive), e.g. ``"2014-11-17"``. A
            ``datetime`` is accepted as well.
        end: Last date to keep (inclusive); same accepted types as ``start``.

    Returns:
        The rows of the downloaded frame with ``start <= date <= end``.
    """
    # NOTE(review): `ak` is not imported in the visible part of this file --
    # presumably `import akshare as ak`; confirm it is in scope.
    start, end = pd.Timestamp(start), pd.Timestamp(end)
    index_df = ak.stock_zh_index_daily(symbol=code)
    # Vectorised parsing; works whether the cells are strings, dates or
    # timestamps.
    index_df['date'] = pd.to_datetime(index_df['date'])
    return index_df.loc[index_df['date'].between(start, end)]
# Exploratory section: download the closes of the major A-share indices,
# plot their cumulative performance, and correlate their daily returns with
# the northbound net inflow.

# Display name -> exchange-prefixed symbol. main() also reads this table to
# turn a symbol back into a display name.
indexs = {
    '上证综指': 'sh000001',
    '深证成指': 'sz399001',
    '沪深300': 'sh000300',
    '创业板指': 'sz399006',
    '上证50': 'sh000016',
    '中证500': 'sh000905',
    '中小板指': 'sz399005',
    '上证180': 'sh000010',
}
start = '2014-11-17'
end = '2022-08-12'

# Index every close series by trade date before combining. The frames
# returned by get_index_data keep their original row labels, so combining
# them without this step lines the columns up by row position, not by date.
index_data = pd.DataFrame({
    name: get_index_data(code, start, end).set_index('date')['close']
    for name, code in indexs.items()
})
index_data.index.name = 'date'
index_data.tail()

# Cumulative performance: every index rebased to 1 on the first day.
(index_data / index_data.iloc[0]).plot(figsize=(14, 6))
# The year range in the title follows `start`/`end` instead of being
# hard-coded.
plt.title(f'A股指数累积收益率\n {start[:4]}-{end[:4]}', size=15)
plt.show()

# Convert prices to simple daily returns.
all_ret = index_data / index_data.shift(1) - 1
all_ret.tail()

north_data = get_north_money(start, end)
# reset_index() returns a new frame; use its result so that 'date' is a real
# column for the merge.
all_data = pd.merge(all_ret.reset_index(), north_data, on='date')
all_data.rename(columns={'value': '北向资金'}, inplace=True)
all_data.dropna(inplace=True)
# Correlate numeric columns only: move the date out of the way first.
all_data.set_index('date').corr()
def North_Strategy(data, window, stdev_n, cost):
    """Back-test a band-breakout timing rule driven by northbound flows.

    A rolling mean of the ``北向资金`` column is the middle band; the upper
    and lower bands sit ``stdev_n`` rolling standard deviations away from it.
    A flow above the upper band sets ``signal`` to 1 (buy), a flow below the
    lower band sets it to 0 (sell); otherwise ``signal`` is NaN. The position
    is the previous row's signal carried forward, starting flat (0).

    Args:
        data: DataFrame with ``close``, ``open`` and ``北向资金`` columns, one
            row per trading day. It is not modified.
        window: Rolling window length, in rows.
        stdev_n: Band width, in rolling standard deviations.
        cost: Proportional transaction cost charged on each entry and exit.

    Returns:
        A copy of ``data`` without the warm-up rows, extended with ``mid``,
        ``upper``, ``lower``, ``ret``, ``signal``, ``position``,
        ``capital_ret``, ``策略净值`` (strategy net value) and ``指数净值``
        (index net value). The frame is empty when no row survives the
        warm-up.
    """
    df = data.copy().dropna()

    # Middle band and the two bands around it.
    flow = df['北向资金']
    df['mid'] = flow.rolling(window).mean()
    stdev = flow.rolling(window).std()
    df['upper'] = df['mid'] + stdev_n * stdev
    df['lower'] = df['mid'] - stdev_n * stdev
    df['ret'] = df['close'] / df['close'].shift(1) - 1
    df.dropna(inplace=True)

    # Trading signal. The sell test is listed first so that it wins if both
    # tests are true, as it did when the two rules were assigned in sequence.
    flow = df['北向资金']
    df['signal'] = np.select(
        [(flow < df['lower']).to_numpy(), (flow > df['upper']).to_numpy()],
        [0.0, 1.0],
        default=np.nan,
    )

    # A signal acts on the next row; until a new one arrives the previous
    # position is held, and the back-test starts out of the market.
    df['position'] = df['signal'].shift(1).ffill().fillna(0)
    prev_position = df['position'].shift(1)

    # Entry day: bought at the open, so the day earns open -> close.
    # Exit day: sold at the open, so the day earns previous close -> open.
    # The cost is taken off the traded capital, so it always lowers the
    # return. Scaling the return by (1 - cost) instead would shrink losses
    # and charge nothing on a flat fill.
    entry_ret = df['close'] / df['open'] * (1 - cost) - 1
    exit_ret = df['open'] / df['close'].shift(1) * (1 - cost) - 1
    hold_ret = df['ret'] * df['position']
    df['capital_ret'] = np.select(
        [
            (df['position'] > prev_position).to_numpy(),
            (df['position'] < prev_position).to_numpy(),
            (df['position'] == prev_position).to_numpy(),
        ],
        [entry_ret.to_numpy(), exit_ret.to_numpy(), hold_ret.to_numpy()],
        # Only the first row has no previous position; it earns nothing.
        default=0.0,
    )

    # Net value curves of the strategy and of the index itself.
    df['策略净值'] = (df['capital_ret'] + 1.0).cumprod()
    df['指数净值'] = (df['ret'] + 1.0).cumprod()
    return df
def _compound_by_period(daily_ret, period_keys):
    """Compound daily returns within each group of ``period_keys``."""
    compounded = (1.0 + daily_ret).groupby(period_keys).prod() - 1.0
    return compounded.dropna()


def _win_rate(period_ret):
    """Return the share of non-flat periods that ended positive (0.0 if none)."""
    active = int((period_ret != 0).sum())
    if active == 0:
        return 0.0
    return int((period_ret > 0).sum()) / active


def performance(df):
    """Print the back-test statistics of a strategy result frame.

    Reports the yearly, monthly and weekly win rates, total and annualised
    returns, maximum drawdown, alpha, beta and Sharpe ratio. A year is taken
    as 250 trading days and the risk-free rate as 3% per year.

    Args:
        df: DataFrame indexed by date (a ``DatetimeIndex``) with the columns
            ``ret``, ``capital_ret``, ``策略净值`` and ``指数净值``.

    Returns:
        None. The statistics are written to standard output.
    """
    trading_days = 250
    risk_free = 0.03

    daily = df.loc[:, ['ret', 'capital_ret']]
    dates = daily.index

    # Compounded return per calendar year, month and week. Grouping on the
    # calendar period avoids the resample aliases 'A' and 'M', which newer
    # pandas releases deprecate.
    year_ret = _compound_by_period(daily, dates.year)
    month_ret = _compound_by_period(daily, dates.to_period('M'))
    week_ret = _compound_by_period(daily, dates.to_period('W'))

    # Win rate counts only periods in which the strategy return was non-zero.
    year_win_rate = _win_rate(year_ret['capital_ret'])
    month_win_rate = _win_rate(month_ret['capital_ret'])
    week_win_rate = _win_rate(week_ret['capital_ret'])

    # Total return, annualised return and maximum drawdown of both curves.
    nav = df[['策略净值', '指数净值']]
    total_ret = nav.iloc[-1] - 1
    annual_ret = pow(1 + total_ret, trading_days / len(daily)) - 1
    running_peak = nav.cummax()
    max_drawdown = ((running_peak - nav) / running_peak).max()

    beta = df[['capital_ret', 'ret']].cov().iat[0, 1] / df['ret'].var()
    alpha = annual_ret['策略净值'] - annual_ret['指数净值'] * beta

    # Annualise the daily Sharpe ratio with sqrt(trading days per year).
    # Scaling by the sample length would make the ratio grow with the length
    # of the back-test.
    excess = df['capital_ret'] - risk_free / trading_days
    excess_std = excess.std()
    if excess_std > 0:
        sharpe_ratio = np.sqrt(trading_days) * excess.mean() / excess_std
    else:
        sharpe_ratio = float('nan')

    TA1 = round(total_ret['策略净值'] * 100, 2)
    TA2 = round(total_ret['指数净值'] * 100, 2)
    AR1 = round(annual_ret['策略净值'] * 100, 2)
    AR2 = round(annual_ret['指数净值'] * 100, 2)
    MD1 = round(max_drawdown['策略净值'] * 100, 2)
    MD2 = round(max_drawdown['指数净值'] * 100, 2)
    S = round(sharpe_ratio, 2)

    print(f'策略年胜率为:{round(year_win_rate*100,2)}%')
    print(f'策略月胜率为:{round(month_win_rate*100,2)}%')
    print(f'策略周胜率为:{round(week_win_rate*100,2)}%')
    print(f'总收益率: 策略:{TA1}%,沪深300:{TA2}%')
    print(f'年化收益率:策略:{AR1}%, 沪深300:{AR2}%')
    print(f'最大回撤: 策略:{MD1}%, 沪深300:{MD2}%')
    print(f'策略Alpha: {round(alpha,2)}, Beta:{round(beta,2)},夏普比率:{S}')
def plot_performance(df, name):
    """Plot the strategy and index net-value curves with the trade signals.

    Buy signals (``signal == 1``) are drawn as red dots and sell signals
    (``signal == 0``) as green dots, both placed on the index curve.

    Args:
        df: Back-test result with the columns ``策略净值``, ``指数净值`` and
            ``signal``, indexed by date.
        name: Display name of the index; used as the title prefix.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    ax = df[['策略净值', '指数净值']].plot(figsize=(15, 7))

    # One scatter call per signal type instead of one call per row.
    benchmark = df['指数净值']
    buy_points = benchmark[df['signal'] == 1]
    sell_points = benchmark[df['signal'] == 0]
    ax.scatter(buy_points.index, buy_points.to_numpy(), c='r')
    ax.scatter(sell_points.index, sell_points.to_numpy(), c='g')

    ax.set_title(name + '—' + '北向资金择时交易策略回测', size=15)
    ax.set_xlabel('')
    # Hide the right and top borders of the plot.
    ax.spines['right'].set_color('none')
    ax.spines['top'].set_color('none')
    plt.show()
def main(code='sh000300', start='2015-12-08', end='2020-08-12',
         window=252, stdev_n=1.5, cost=0.01):
    """Run the northbound-flow timing back-test for one index and report it.

    Downloads the index bars and the northbound net inflow, joins them on
    ``date``, runs ``North_Strategy``, then prints the statistics and shows
    the chart.

    Args:
        code: Index symbol, e.g. ``"sh000300"``.
        start: First date of the data window, ``"YYYY-MM-DD"``.
        end: Last date of the data window, ``"YYYY-MM-DD"``.
        window: Rolling window length passed to ``North_Strategy``.
        stdev_n: Band width passed to ``North_Strategy``.
        cost: Proportional transaction cost passed to ``North_Strategy``.

    Raises:
        ValueError: If no row is left after the rolling-window warm-up.
    """
    index_df = get_index_data(code, start, end)
    north_data = get_north_money(start, end)

    merged = pd.merge(index_df, north_data, on='date').set_index('date')
    merged = merged.rename(columns={'value': '北向资金'})
    merged = merged[['close', 'open', '北向资金']].dropna()

    df = North_Strategy(merged, window, stdev_n, cost)
    if df.empty:
        raise ValueError(
            f'no data left to back-test {code!r} between {start} and {end} '
            f'with window={window}'
        )

    # Fall back to the raw symbol when it is not one of the named indices,
    # so any symbol can be back-tested.
    name = next(
        (label for label, symbol in indexs.items() if symbol == code),
        code,
    )
    print(f'回测标的:{name}指数')

    # The first date is the first row that survived the warm-up. Both dates
    # are printed in the same compact format.
    startDate = df.index[0].strftime('%Y%m%d')
    endDate = pd.Timestamp(end).strftime('%Y%m%d')
    print(f'回测期间:{startDate}—{endDate}')

    performance(df)
    plot_performance(df, name)
# Run the back-test for symbol 'sh000300' (listed as '沪深300' in `indexs`),
# leaving every other argument of main() at its default.
main(code='sh000300')