今天给大家一个利用qmt_trader交易策略,我现在实盘使用的系统是自己开发的,只需要把qmt_trader当中第三方库使用就可以,源代码开源开源直接下载 量化系统--开源强大的qmt交易系统,提供源代码
参考教程使用,下载当第三方库使用就开源
第一步登录qmt选择选择极简模式,独立交易也可以
把文件qmt_trader当中第三方库使用就可以
修改去qmt路径账户,参数就可以,在源代码后面修改
这个当然只是简单例子,实盘才是真的复杂,实盘很多源代码
我们测试改成True,测试一下程序,不然可能没有符合的
运行策略
测试程序没有问题test改成False进入实盘模式
运行程序
这里改成14和交易是一模一样的时间
进入24小时交易,3秒检查一次可以自己该
下单结果,当前时间不能委托
不懂的可以直接问我,备注入群可以家人我的量化学习群
当然也可以找我开qmt,我提供专业的服务
源代码
from qmt_trader.qmt_trader import qmt_trader
from qmt_trader.qmt_data import qmt_data
from qmt_trader.trader_tool import tdx_indicator
import schedule
from datetime import datetime
import time
import pandas as pd
class my_trader:
def __init__(self,path= r'D:/国金QMT交易端模拟/userdata_mini',
session_id = 123456,account='55011917',account_type='STOCK',
is_slippage=True,slippage=0.01,test=True):
'''
实时分钟T0策略利用实盘交易框架2.0
均线金叉买入死差卖出
'''
self.path=path
self.session_id=session_id
self.account=account
self.account_type=account_type
self.is_slippage=is_slippage
self.slippage=slippage
self.test=test
#避免下单失败多次下单
#买入记录
self.buy_stock_log=[]
#卖出记录
self.sell_stock_log=[]
#买入的目标金额
self.buy_max_value=10000
#卖出的目标金额
self.sell_max_value=0
#股票列表
self.code_list=['159937','159980','159985','159981']
#5分,特别提醒这个参数和获取数据的速度有关系默认是3秒一次数据,
# 如果是3秒5分钟就等于3*20*5,short_line=100,这个我后面检验一下
self.short_line=5
#10分钟
self.long_line=10
self.trader=qmt_trader(path=self.path,account=self.account,account_type=self.account_type,
is_slippage=self.is_slippage,slippage=self.slippage)
self.data=qmt_data()
#调整股票代码
self.stock_list=[]
for stock in self.code_list:
self.stock_list.append(self.trader.adjust_stock(stock=stock))
#订阅一分钟的数据,需要更快的话可以订阅tick数据
for stock in self.stock_list:
self.data.subscribe_quote(stock_code=stock,period='1m',start_time='20240101',
end_time='20500101',count=-1)
def connact(self):
'''
链接qmt
'''
try:
self.trader.connect()
print(self.trader.balance())
print(self.trader.position())
return True
except Exception as e:
print("运行错误:",e)
print('{}连接失败'.format('qmt'))
return False
def trader_func(self):
'''
交易函数
'''
#检查是否是交易时间
if self.trader.check_is_trader_date_1(trader_time=4,start_date=9,end_date=14,start_mi=0,jhjj='否'):
#读取订阅数据
df=self.data.get_market_data_ex(stock_list=self.stock_list,period='1m',
start_time='20240101',end_time='20500101',count=-1)
#解析数据
for stock in self.stock_list:
data=pd.DataFrame()
hist=df[stock]
data['date']=hist.index
data['close']=hist['close'].tolist()
data['short_line']=data['close'].rolling(self.short_line).mean()
data['long_line']=data['close'].rolling(self.long_line).mean()
#测试函数
data['test']=data['short_line']>data['long_line']
#金叉
if self.test:
#测试交易
gold_fork=data['test'].tolist()[-1]
else:
gold_fork=tdx_indicator.CROSS_UP(S1=data['short_line'],S2=data['long_line'])[-1]
#死叉
if self.test:
dead_fork=data['test'].tolist()[-1]
else:
dead_fork=tdx_indicator.CROSS_DOWN(S1=data['short_line'],S2=data['long_line'])[-1]
#买入
if gold_fork==True:
#买入
stock=self.trader.adjust_stock(stock=stock)
price=self.data.get_full_tick(code_list=[stock])[stock]['lastPrice']
stock=stock[:6]
trader_type,trader_amount,price=self.trader.order_target_value(stock=stock,price=price,value=self.buy_max_value)
if trader_type=='buy' and trader_amount>=10:
if stock not in self.buy_stock_log:
self.trader.buy(security=stock,amount=trader_amount,price=price)
print('{} 金叉 买入 股票{} 数量{} 价格{}'.format(datetime.now(),stock,trader_amount,price))
self.buy_stock_log.append(stock)
else:
print('{}已经买入'.format(stock))
elif trader_type=='sell' and trader_amount>=10:
self.trader.sell(security=stock,amount=trader_amount,price=price)
print('持有买多了平部分{} 卖出 股票{} 数量{} 价格{}'.format(datetime.now(),stock,trader_amount,price))
else:
print('{} 触发金叉{}执行买入不了'.format(datetime.now(),stock))
else:
print('{} 没有触发金叉{}'.format(datetime.now(),stock))
if dead_fork==True:
#卖出
stock=self.trader.adjust_stock(stock=stock)
price=self.data.get_full_tick(code_list=[stock])[stock]['lastPrice']
stock=stock[:6]
trader_type,trader_amount,price=self.trader.order_target_value(stock=stock,price=price,value=self.sell_max_value)
if trader_type=='sell' and trader_amount>=10:
if stock not in self.sell_stock_log:
self.trader.sell(security=stock,amount=trader_amount,price=price)
print('{} 死叉 卖出 股票{} 数量{} 价格{}'.format(datetime.now(),stock,trader_amount,price))
self.sell_stock_log.append(stock)
else:
print('{}已经卖出'.format(stock))
else:
print('{} 触发死叉{}执行卖出不了'.format(datetime.now(),stock))
else:
print('{} 没有触发死叉{}'.format(datetime.now(),stock))
else:
print('{}目前不少交易时间'.format(datetime.now()))
if __name__=='__main__':
trader=my_trader(path= r'D:/国金QMT交易端模拟/userdata_mini',
session_id = 123456,account='55011917',account_type='STOCK',
is_slippage=True,slippage=0.01,test=False)
trader.connact()
#3秒
schedule.every(0.05).minutes.do(trader.trader_func)
while True:
schedule.run_pending()
time.sleep(1)