Python 算法交易实验76 QTV200日常推进

说明

最近实在太忙, 没太有空推进这个项目,我想还是尽量抽一点点时间推进具体的工程,然后更多的还是用碎片化的时间从整体上对qtv200进行设计完善。有些结构的问题其实是需要理清的,例如:

  • 1 要先基于原始数据进行描述性分析:采用「二阶导」来描述变化趋势
  • 2 先模式后模型:基于易于解释的数据生成模式,这样模式本身也是易于理解的
  • 3 UCS time loop: 按照统一时间轴进行统一的调度。这个非常有用,算是固化下来了。Python一些可能用的到的函数系列130 UCS-Time Brick

内容

1 增加源数据

增加ETF的行情数据跟踪

按照之前设计的方式,增加若干标的的数据跟踪。

按照ETF和执行秒数的差异,进行任务的参数化。本身其实没啥关系,本着不给别人服务器带来不必要的负担,按秒数分开会非常均匀。

...# 任务19:执行脚本-qtv200 159845 get 
task019 = {}
task019['machine'] = 'm4'
task019['task_id'] = 'task019'
task019['description'] = '执行脚本,在周一到周五,上午9点到下午4点执行,获取 159845 的数据。在秒1执行'
task019['pid'] = '.'.join([task019['machine'],task019['task_id']  ])
task019['job_name'] = 'make_a_request' # 这个是对flask-aps来说的
task019['set_to_status'] = 'running'
task019['running_status'] = ''
task019['start_dt'] = '2024-05-01 00:00:00'
task019['end_dt'] = '2099-06-01 00:00:00'
task019['task_kwargs'] = {'para_dict': {'url':'http://172.17.0.1:24104/exe_sh/','json_data':{'the_cmd': 'bash /home/exe_etf_crawl_worker.sh 159845'}}}
task019['interval_para'] ={'second':'7','day_of_week':'0-4','hour':'9-16'}
task019 = TaskTable(**task019)
task019.save()# 任务20:执行脚本-qtv200 512690 get 
task020 = {}
task020['machine'] = 'm4'
task020['task_id'] = 'task020'
task020['description'] = '执行脚本,在周一到周五,上午9点到下午4点执行,获取 512690 的数据。在秒1执行'
task020['pid'] = '.'.join([task020['machine'],task020['task_id']  ])
task020['job_name'] = 'make_a_request' # 这个是对flask-aps来说的
task020['set_to_status'] = 'running'
task020['running_status'] = ''
task020['start_dt'] = '2024-05-01 00:00:00'
task020['end_dt'] = '2099-06-01 00:00:00'
task020['task_kwargs'] = {'para_dict': {'url':'http://172.17.0.1:24104/exe_sh/','json_data':{'the_cmd': 'bash /home/exe_etf_crawl_worker.sh 512690'}}}
task020['interval_para'] ={'second':'8','day_of_week':'0-4','hour':'9-16'}
task020 = TaskTable(**task020)
task020.save()

获取任务并进行发布

def exe_a_task(the_task_obj):the_task_fsm = FlaskAPSTask(transitions = transitions, wflask=wf, task_id = the_task_obj.task_id)the_task_fsm.action_rule(the_task_obj)current_task_status = the_task_fsm.get_a_task_status(task_id = the_task_obj.task_id)return the_task_obj.update(set__update_time=get_time_str1(), set__running_status =current_task_status)tasks = TaskTable.objects(machine='m4')for the_task_obj in  tasks:exe_a_task(the_task_obj)

这样就好了,周一就会自动更新。

2 存量数据

存量数据我从ricequant拿,这里其实涉及到不同数据源的融合问题。行情数据较为简单,我的方法是全量数据抓取后,按同样的方式处理,然后抽几条校验一下。

这不算是严格的数据校验,而严格的方法是我目前没法花时间去研究的。按以往的经验来来看,问题不大。

import pandas as pd
start_date = '2000-01-01'
end_date = '2099-12-31'# 真实取的是分钟
df = get_price('510300.XSHG',start_date=start_date,end_date=end_date,frequency='1m')
df1 = df.reset_index()
df1.to_csv('510300_all_data_20240706.csv', index=False)

在这里插入图片描述
可以看到 数据下载后进行转换。
一种比较好的方式是用pydantic。这相当于定义了一个数据模型,并尽可能的按设计进行校验和转换。

from typing import List, Tuple, Optional
from pydantic import BaseModel, Field, field_validator# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):some_dt_str1   =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')return '.'.join(some_dt_str1.split('.')[:4])'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# Layer1(Dict) 
class MarketData(BaseModel):data_dt:stropen: floatclose:floathigh:floatlow:floatvol:floatamt:float@propertydef brick(self):return dt_str2ucs_blockname(self.data_dt)@propertydef block(self):return self.brick[:self.brick.rfind('.')]@propertydef part(self):return self.block[:self.block.rfind('.')]@propertydef shard(self):return self.part[:self.part.rfind('.')]def dict(self):data = {}data['data_dt']  = self.data_dtdata['open']  = self.opendata['close']  = self.closedata['high']  = self.highdata['low']  = self.lowdata['vol']  = self.voldata['amt']  = self.amtdata['brick']  = self.brickdata['block']  = self.blockdata['part']  = self.partdata['shard']  = self.shardreturn datamd = MarketData(**{'data_dt': '2012-05-28 09:31:00','open': 2.1121,'close': 2.1105,'high': 2.1146,'low': 2.1096,'vol': 50330003.0,'amt': 128372828.0})md.dict()
{'data_dt': '2012-05-28 09:31:00','open': 2.1121,'close': 2.1105,'high': 2.1146,'low': 2.1096,'vol': 50330003.0,'amt': 128372828.0,'brick': '2012.05.28.09','block': '2012.05.28','part': '2012.05','shard': '2012'}

不过之前没有按照这种方式实现,所以现在要确认一下当时爬取时的格式,然后输入队列即可。

之前crawl的核心字段映射与转换。

字段变量
行情data_dt,open,close,high,low,vol,amt
分类data_source, code , market
idrec_id
变换vol将转为股的单位
    if is_query_data:# ak的变量字典映射ak_dict = {}ak_dict['时间'] = 'data_dt'ak_dict['开盘'] = 'open'ak_dict['收盘'] = 'close'ak_dict['最高'] = 'high'ak_dict['最低'] = 'low'ak_dict['成交量'] = 'vol'ak_dict['成交额'] = 'amt'keep_cols = ['data_dt','open','close','high','low','vol','amt']cols = list(df.columns)new_cols = [ak_dict.get(x) or x for x in cols ]df.columns = new_colsdf1 = df[keep_cols]df1['data_source'] = 'AK'df1['code'] = etf_codedf1['market'] = 'SH'df1['rec_id'] = df1['data_source'] + '_' + df1['market'] + '_' + df1['code'].apply(str) \+ '_' + df1['data_dt']# 调整股和手vol_approximal = df1['amt'] / df1['close']maybe_wrong = (vol_approximal / df1['vol']) > 10if maybe_wrong.sum() > 0:df1['vol'] = df1['vol'] * 100

抽取一条数据比对

AK的数据14:58分的close是3.461, vol 是4116900
在这里插入图片描述
RQ的数据也是这样的,数据一致
在这里插入图片描述
数据的唯一主键pid由etf 和时间戳共同构成,所以不会出现重复数据(即使数据源不同)。

所以接下来,将数据推入队列即可(稍微注意的是,入口队列长度为10万,应对增量是足够的),存量可以控制下断点续传

整理之后的代码

from Basefuncs import *# 1 read
df  = pd.read_csv('510300_all_20230911.csv')
etf_code = '510300'# 2 rename
df['data_dt'] = df['datetime'].apply(str)
df['open'] = df['open']
df['close'] = df['close']
df['high'] = df['high']
df['low'] = df['low']
df['vol'] = df['volume']
df['amt'] = df['total_turnover']keep_cols = ['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt']df1 = df[keep_cols]# 3 adjust vol
vol_approximal = df1['amt'] / df1['close']
maybe_wrong = (vol_approximal/df1['vol']) >10# 4 othercols 
df1['data_source'] = 'RQ'
df1['code'] = etf_code
df1['market'] = 'SH'
df1['rec_id'] = df1['data_source'] + '_' + df1['market'] + '_' + df1['code'].apply(str) \+ '_' + df1['data_dt']# 5 Q Operation
qm = QManager(redis_agent_host = 'http://192.168.0.4:24118/',redis_connection_hash = None)
stream_name = 'BUFF.andy.calnet.qtv200.stream_in'## data slice
data_listofdict = df1.to_dict(orient='records')
data_listofdict2 = slice_list_by_batch2(data_listofdict, 10000)start_id = 0# run until complete
for i, some_listofdict in enumerate(data_listofdict2):if i >= start_id:print('current index : ', i )tem_resp = qm.parrallel_write_msg(stream_name, some_listofdict)if tem_resp['status'] is False:breakstart_id = i 

这样就完成了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/42240.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浪潮信息元脑服务器支持英特尔®至强®6能效核处理器 展现强劲性能

如今,服务器作为数字经济的核心基础设施,正面临着前所未有的挑战和机遇。作为服务器领域的领军企业,浪潮信息始终站在行业前沿,不断推陈出新,以满足客户日益增长的需求。近日,浪潮信息再次展现技术实力&…

基于GWO-CNN-BiLSTM数据回归预测(多输入单输出)-灰狼优化算法优化CNN-BiLSTM

基于GWO-CNN-BiLSTM数据回归预测(多输入单输出)-灰狼优化算法优化CNN-BiLSTM 1.数据均为Excel数据,直接替换数据就可以运行程序。 2.所有程序都经过验证,保证程序可以运行。 3.具有良好的编程习惯,程序均包含简要注释。 获取方式 https:/…

筛选Github上的一些优质项目

每个项目旁都有标签说明其特点,如今日热捧、多模态、收入生成、机器人、大型语言模型等。 项目涵盖了不同的编程语言和领域,包括人工智能、语言模型、网页数据采集、聊天机器人、语音合成、AI 代理工具集、语音转录、大型语言模型、DevOps、本地文件共享…

Matplotlib 学习

知识点 1.plot():用于绘制线图和 散点图scatter() 函数:plot() 函数可以接受许多可选参数,用于控制图形的外观,例如:颜色: colorblue 控制线条的颜色。线型: linestyle-- 控制线条的样式,例如虚线。标记…

YoloV8改进策略:Block改进|轻量实时的重参数结构|最新改进|即插即用(全网首发)

摘要 本文使用重参数的Block替换YoloV8中的Bottleneck,GFLOPs从165降到了116,降低了三分之一;同时,map50-95从0.937涨到了0.947。 改进方法简单,只做简单的替换就行,即插即用,非常推荐&#xf…

C++_STL---list

list的相关介绍 list是可以在常数范围内在任意位置进行插入和删除的序列式容器,并且该容器可以前后双向迭代。 list的底层是带头双向循环链表结构,链表中每个元素存储在互不相关的独立节点中,在节点中通过指针指向其前一个元素和后一个元素。…

IDEA与通义灵码的智能编程之旅

1 概述 本文主要介绍在IDEA中如何安装和使用通义灵码来助力软件编程,从而提高编程效率,创造更大的个人同企业价值。 2 安装通义灵码 2.1 打开IDEA插件市场 点击IDEA的设置按钮,下拉选择Plugins,如下: 2.2 搜索通义灵码 在搜索框中输入“通义灵码”,如下: 2.3 安…

使用ifconfig命令获取当前服务器的内网IP地址

如何使用ifconfig命令获取当前服务器的内网IP地址呢? ifconfig eth0 | grep inet | awk {print $2}

什么是五级流水?银行眼中的“好流水”,到底是什么样的?

无论是按揭买房还是日常贷款,银行流水都是绕不开的一环。规划好你的流水,不仅能让你在申请贷款时更有底气,还可能帮你省下不少冤枉钱。今天,咱们就来一场深度剖析,聊聊如何在按揭贷款、个人经营抵押贷款前,…

代码随想录 数组部分+代码可在本地编译器运行

代码随想录 数组部分,代码可在本地编译器运行 文章目录 数组理论基础704.二分查找题目:思路二分法第一种写法二分法第二种写法 代码 27.移除元素题目:思路-双指针法代码 977.有序数组的平方题目思路-双指针代码 209.长度最小的子数组题目&am…

ChatGPT4深度解析:探索智能对话新境界

大模型chatgpt4分析功能初探 目录 1、探测目的 2、目标变量分析 3、特征缺失率处理 4、特征描述性分析 5、异常值分析 6、相关性分析 7、高阶特征挖掘 1、探测目的 1、分析chat4的数据分析能力,提高部门人效 2、给数据挖掘提供思路 3、原始数据&#xf…

科研绘图系列:R语言径向柱状图(Radial Bar Chart)

介绍 径向柱状图(Radial Bar Chart),又称为雷达图或蜘蛛网图(Spider Chart),是一种在极坐标系中绘制的柱状图。这种图表的特点是将数据点沿着一个或多个从中心向外延伸的轴来展示,这些轴通常围绕着一个中心点均匀分布。 特点: 极坐标系统:数据点不是在直角坐标系中展…

【后端面试题】【中间件】【NoSQL】MongoDB查询优化3(拆分、嵌入文档,操作系统)

拆分大文档 很常见的一种优化手段,在一些特定的业务场景中,会有一些很大的文档,这些文档有很多字段,而且有一些特定的字段还特别的大。可以考虑拆分这些文档 大文档对MongoDB的性能影响还是很大的,就我个人经验而言&…

ASCII码对照表【2024年汇总】

🍺ASCII相关文章汇总如下🍺: 🎈ASCII码对照表(255个ascii字符汇总)🎈🎈ASCII码对照表(Unicode 字符集列表)🎈🎈ASCII码对照表&#x…

Day05-04-持续集成总结

Day05-04-持续集成总结 1. 持续集成2. 代码上线目标项目 1. 持续集成 git 基本使用, 拉取代码,上传代码,分支操作,tag标签 gitlab 用户 用户组 项目 , 备份,https,优化. jenkins 工具平台,运维核心, 自由风格工程,maven风格项目,流水线项目, 流水线(pipeline) mavenpom.xmlta…

【瑞数补环境实战】某网站Cookie补环境与后缀分析还原

文章目录 1. 写在前面2. 特征分析3. 接口分析3. 补JS环境4. 补后缀参数 【🏠作者主页】:吴秋霖 【💼作者介绍】:擅长爬虫与JS加密逆向分析!Python领域优质创作者、CSDN博客专家、阿里云博客专家、华为云享专家。一路走…

二分查找2

1. 山脉数组的峰顶索引&#xff08;852&#xff09; 题目描述&#xff1a; 算法原理&#xff1a; 根据题意我们可以将数组分为两个部分&#xff0c;一个部分是arr[mid-1]<arr[mid]&#xff0c;另一个部分为arr[mid-1]>arr[mid]&#xff0c;此时不难发现我们可以将二分…

Flink,spark对比

三&#xff1a;az 如何调度Spark、Flink&#xff0c;MR 任务 首先&#xff0c;使用java编写一个spark任务&#xff0c;定义一个类&#xff0c;它有main方法&#xff0c;里面写好逻辑&#xff0c;sparkConf 和JavaSparkContext 获取上下文&#xff0c;然后打成一个jar包&#xf…

数据结构——二叉树相关题目

1.寻找二叉树中数值为x的节点 //寻找二叉树中数值为x的节点 BTNode* TreeFind(BTNode* root, BTDataType x)//传过来二叉树的地址和根的地址&#xff0c;以及需要查找的数据 {if (root Null){return Null;}//首先需要先判断这个树是否为空&#xff0c;如果为空直接返回空if (…

【JavaWeb程序设计】JSP实现购物车功能

目录 一、结合之前所学的相关技术&#xff0c;编写代码实现以下购物车功能 1. 我实现的功能运行截图如下 &#xff08;1&#xff09;商品列表页面home.jsp &#xff08;2&#xff09;登录账号页面/未登录点击结账页面 &#xff08;3&#xff09;重新登录页面&#xff08;记…