Python 算法交易实验74 QTV200第二步(改): 数据清洗并写入Mongo

说明

之前第二步是打算进入Clickhouse的,实测下来有一些bug
在这里插入图片描述
可以看到有一些分钟数据重复了。简单分析原因:

  • 1 起异步任务时,还是会有两个任务重复的问题,这个在同步情况下是不会出现的
  • 2 数据库没有upsert模式。clickhouse是最近刚应用的库,我还没有完善其操作模式。

解决思路:

  • 1 既然采用了异步,就没有办法去控制其前置的依赖和顺序,否则就会退回到同步状态。而且从效率上,n次异步IO的cpu开销,可能也只相当于1次的同步开销。可以认为,异步是更轻松,但是更’粗心’的工作状态。所以在设计上,如果每次的操作都是“无害”的,那么就没问题。这里的数据同步任务,最重要的是不重不漏,所以只要能够确保数据不重不漏即可。
  • 2 每次负责crawl的worker不直接操作数据库是对的,这可以避免过多的数据库操作开销。在同步结果的队列中,每个周期执行一次Mongo操作是完全没问题的。同步队列中可以有一些冗余的数据,在整合数据时就删除了。剩余的部分,可以直接采用upsert的方式存入。

结论:使用Mongo作为第一个数据节点的持久化。

反思点:

  • 1 对于数据的集成,可能还是Mongo更合适。因为不必事先定义表结构,而且之前做了一些开发,Mongo的操作方式非常完善。擅长在记录级数据的复杂度操作。
  • 2 clickhouse更适合用于在我的数据系统中直接输出的数据,特别是空间数据,按UCS方式规范。擅长在块级别数据的效率操作。

内容

1 目标数据库准备

采用m4.24086

很巧,QTV102的数据也在这里,所以QTV200的数据可以继续放在这里 。

回顾一下WMongo的操作,有好一阵子没用了。

from Basefuncs import *
# analysis 
target_server = 'm4.24086'
# machine_name = 'm4'
machine_name = get_machine_name()# 在本地建立连接文件,避免每次都向mymeta请求数据。 随主机变化,这里有可能要修改(TryConnectionOnceAndForever)中关于mymeta的连接配置。
try:target_w = from_pickle(target_server)color_print('【Loading target_w】from pickle')
except:w = WMongo('w')target_w = w.TryConnectionOnceAndForever(server_name =target_server, current_machine_name = machine_name)to_pickle(target_w, target_server)

有一些设计是好的,只要给出目标服务器名称,对象就会自动寻找合适的连接方式(local、lan、wan)来完成连接,对应的连接保存为本地文件。之后可以考虑通过GlobalBuffer来简化判断,还有neo4j来存储和管理关系。

进入队列的字段名,不允许有 _msg_id 字段
Wmongo_v9000.012
设置当前连接 local
>>> Switching To Mymeta
设置当前连接 local
在CN001访问mymeta,通用
当前机器的名称: m4
1.当前使用的MongeAgent:http://172.17.0.1:24011/
2.Tier1:meta, Tier2:servers
3.ConnectionHash:e8d1bc791049988d89465d5ce24d993b
4.FilterDict:{'my_server_pkey': 'm4.24086'}
5.Limits:1
6.Sort:
7.Skip:0
>>> Hit Records
当前机器的局网: my.cn001
【I】目标服务的机器:m4, 目标服务的机器局网:my.cn001
【I】采用local方式连接目标主机
Wmongo_v9000.012
设置当前连接 local
获取已有连接
target connection hash: d35632b63b77f17d4d12808fb707cb1f
data save to pickle:  ./m4.24086.pkl

然后就可以通过对象操作了

target_w.cname_recs()
{'data': {'QTV102': {'log_monitor': 264276,'log_sniffer': 792827,'log_worker': 264276,'stats': 5177,'step1_mongo_in': 2895114,'step1_mongo_meta': 2895114,'step1_mongo_out': 2895114},'QTV102_Capital_Data': {'capital_daily': 32787},'QTV102_Model_Signal': {'log_monitor': 264277,'log_sniffer': 792827,'log_worker': 0,'stats': 5218,'step1_mongo_in': 2436678,'step1_mongo_meta': 0,'step1_mongo_out': 16860560},'QTV102_Strategy': {'strategy_online': 58,'trade_orders': 128,'trade_strategy': 23},'QuantData001': {'log_monitor': 264276,'log_sniffer': 792830,'log_worker': 264276,'stats': 5178,'step1_mongo_in': 460297,'step1_mongo_meta': 460297,'step1_mongo_out': 460297},'QuantData_510500': {'log_monitor': 264277,'log_sniffer': 792831,'log_worker': 264278,'stats': 5178,'step1_mongo_in': 657702,'step1_mongo_meta': 657702,'step1_mongo_out': 657702},'SmartQuant_512660': {'log_monitor': 264277,'log_sniffer': 792830,'log_worker': 0,'stats': 5219,'step1_mongo_in': 460297,'step1_mongo_meta': 0,'step1_mongo_out': 450098},'Strategy_512660': {'capitals': 261,'monthly_report': 66,'orders': 130,'slog': 430056,'summary_report': 124,'yearly_report': 8},'test_for_mongo_engine': {'user': 2}},'msg': 'ok','status': True}

很早前随便做的一版,看起来业务效果还是不错的。这部分内容,以后就不必放在mongo,在clickhouse里一个查询就好了。
在这里插入图片描述

仍然(在逻辑上)设置表的结构为:qtv200.market_data,需要的索引有:

  • 1 pid: 主键。这个是确定的主键,对后续的基础操作来说是必须的。
  • 2 UCS(shard、part、block、brick): 管理块级数据的键,在后续的块级任务来说非常重要。
  • 3 code: 业务筛选字段
  • 4 ts: 时间,排序字段

mongo方便之处就在于:当你的逻辑明确了,建立好索引,一切就好了

# 主键 pkey
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'pid')
# UCS
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'shard')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'part')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'block')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'brick')
# 业务
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'code')
# 排序
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'ts')Out[4]: {'data': {'ts_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}

在这里插入图片描述

改造1:修改获取最大最小值的部分 etl_worker

变的简单了,不需要关心数据库里有什么,只要把当前有重复的pid去掉就可以了

...data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']data_df2 =data_df1[keep_cols1].drop_duplicates(['pid'])output_df = data_df2output_data_listofdict = output_df.to_dict(orient='records')output_data_listofdict2 = slice_list_by_batch2(output_data_listofdict, qm.batch_size)for some_data_listofdict in output_data_listofdict2:qm.parrallel_write_msg(target_stream_name, some_data_listofdict)

在脚本里做相应修改

#conda init
conda activate basecd  /home/workers && python3 etl_worker_mongo.py

改造2:修改入库的部分 s2mongo

暂时先以脚本方式执行,不固化到接口中。

现在可以采用一些更好的方式来初始化队列。

from Basefuncs import * 
import logging
from logging.handlers import RotatingFileHandler
def get_logger(name , lpath = '/var/log/' ):logger = logging.getLogger(name)fpath = lpath + name + '.log'handler = RotatingFileHandler(fpath , maxBytes=100*1024*1024, backupCount=10)logger.addHandler(handler)logger.setLevel(logging.INFO)return loggerlogger = get_logger('etf_raw_data')# IO
machine_host = '192.168.0.4'
source_redis_agent_host = f'http://{machine_host}:24118/'stream_cfg = StreamCfg(q_max_len = 1000000, batch_size = 10000, redis_agent_host = source_redis_agent_host)
qm = QManager(**stream_cfg.dict())
# qm.info()# analysis 
target_server = 'm4.24086'
target_w = from_pickle(target_server)
# machine_name = 'm4'
# machine_name = get_machine_name()
# # 在本地建立连接文件,避免每次都向mymeta请求数据。 随主机变化,这里有可能要修改(TryConnectionOnceAndForever)中关于mymeta的连接配置。
# try:
#     target_w = from_pickle(target_server)
#     color_print('【Loading target_w】from pickle')
# except:
#     w = WMongo('w')
#     target_w = w.TryConnectionOnceAndForever(server_name =target_server, current_machine_name = machine_name)
#     to_pickle(target_w, target_server)
# target_w.cname_recs()# Name
ss_name = 'xxx'
t_tier1 = 'xxx'
t_tier2 = 'xxx'keep_cols =['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
num_cols = ['open','close','high','low', 'vol','amt','ts']
# Process 
## 1 read source -- 这里本身也可以用pydantic 
ss_data_list = qm.xrange(ss_name)['data']
if len(ss_data_list):ss_data_df0 = pd.DataFrame(ss_data_list)msg_id_list = list(ss_data_df0['_msg_id'])ss_data_df = ss_data_df0[keep_cols].dropna()for the_col in num_cols:ss_data_df[the_col] = ss_data_df[the_col].apply(float)# 写入mongoresp = target_w.insert_or_update_with_key(tier1 = t_tier1, tier2 = t_tier2, data_listofdict = ss_data_df.to_dict(orient='records'), key_name ='pid')qm.xdel(ss_name,msg_id_list)logger.info(get_time_str1() + 'efl_s2mongo insert recs %s' % len(ss_data_df))
else:logger.info(get_time_str1() + 'efl_s2mongo insert not recs')

以上,规定了几部分。

  • 1 IO部分。队列和数据的handler现在通过pydantic的对象,可以非常简洁的定义。然后约定好入队列和目标数据库表的必要信息。
  • 2 处理。主要就是将需要保留的字段,以及需要转数值的字段明确。然后就是读取,保留,转换,插入,最后删除。

在测试中,就一次的数据反复插了几次,数据是不会重复的。

在这里插入图片描述
对应的日志可以看到一开始插入过n次,后面加入了定时任务,然后就转入运行了

└─ $ cat /var/log/etf_raw_data.log
2024-06-29 18:38:27efl_s2mongo insert recs 12
2024-06-29 18:40:53efl_s2mongo insert recs 12
2024-06-29 18:41:31efl_s2mongo insert recs 12
2024-06-29 18:41:42efl_s2mongo insert recs 12
2024-06-29 18:43:27efl_s2mongo insert recs 12
2024-06-29 18:44:31efl_s2mongo insert recs 12
2024-06-29 18:46:49efl_s2mongo insert recs 12
2024-06-29 18:47:01efl_s2mongo insert not recs
2024-06-29 18:47:01efl_s2mongo insert not recs
2024-06-29 18:47:31efl_s2mongo insert not recs
2024-06-29 18:48:01efl_s2mongo insert not recs
2024-06-29 18:48:31efl_s2mongo insert not recs
2024-06-29 18:49:01efl_s2mongo insert not recs
2024-06-29 18:49:02efl_s2mongo insert not recs
2024-06-29 18:49:31efl_s2mongo insert not recs
2024-06-29 18:49:32efl_s2mongo insert not recs
2024-06-29 18:50:01efl_s2mongo insert not recs
2024-06-29 18:50:02efl_s2mongo insert not recs
2024-06-29 18:50:31efl_s2mongo insert not recs
2024-06-29 18:50:32efl_s2mongo insert not recs
2024-06-29 18:51:01efl_s2mongo insert not recs
2024-06-29 18:51:02efl_s2mongo insert not recs
2024-06-29 18:51:31efl_s2mongo insert not recs
2024-06-29 18:51:32efl_s2mongo insert not recs
关于定时任务

我偷了个懒,就是把这脚本和etl脚本放在一起。这两个任务被绑在一起串行了。主要是懒的再去定一个定时任务。

└─ $ cat exe_qtv200_etl_worker.sh
#!/bin/bash# 记录
# sh /home/test_exe.sh com_info_change_pattern running# 有些情况需要把source替换为 .
# . /root/anaconda3/etc/profile.d/conda.sh
# 激活 base 环境(或你创建的特定环境)
source /root/miniconda3/etc/profile.d/conda.sh#conda init
conda activate basecd  /home/workers && python3 etl_worker_mongo.py
cd  /home/workers && python3 etf_raw_data_s2mongo.py

对于后续其他的etl,每一个还是应该另起一个任务,这样才能利用异步来确保多个etf的数据及时获取。

【调整完毕】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/36644.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql GROUP_CONCAT函数详解

文章目录 概要使用技巧1. 建表、插入数据2.以id分组,把age字段的值拼成一行,逗号分隔(默认)3.以id分组,把age字段的值拼成 一行,分号分隔4.以id分组,把去冗余的age字段的值打印在一行5.以id分组,把age字段的…

算法:链表题目练习

目录 链表的技巧和操作总结 常用技巧: 链表中的常用操作 题目一:反转一个单链表 题目二:链表的中间结点 题目三:返回倒数第k个结点 题目四:合并两个有序链表 题目五:移除链表元素 题目六&#xff…

利用LLM本身训练SoTA embedding模型

今天分享一篇Microsoft公司的一篇文章,Title: Improving Text Embeddings with Large Language Models:使用大语言模型改善文本嵌入。 这篇文章探索了直接利用LLM来做embedding模型,其只需要利用合成数据和少于1000次的训练步骤就能获得高质…

语言模型:文本表征词嵌入技术调研

1 文本表征 文本表征是自然语言处理中的关键部分,尤其在当前大模型快速发展的背景下。由于大模型存在知识有限、处理文本长度有限、保密要求和大模型幻觉等问题,结合外部数据显得尤为重要。 为了便于存储和检索,除了保存纯文本外&#xff0…

Debug 调试代码

我们使用 debug 的目的, 认为就是查看代码的执行过程的。 步骤: 1. 打断点 断点的意义是, debug 运⾏的时候, 代码会在断点处停下来不执行如果是想要查看代码的执行过程, 建议将断点放在第⼀行在代码 和 行号之间 点击,出现的红色圆点 就是断点, 再次点击可以取消 …

Webpack: 构建微前端应用

Module Federation 通常译作“模块联邦”,是 Webpack 5 新引入的一种远程模块动态加载、运行技术。MF 允许我们将原本单个巨大应用按我们理想的方式拆分成多个体积更小、职责更内聚的小应用形式,理想情况下各个应用能够实现独立部署、独立开发(不同应用甚…

apipost的安装和测试添加接口能否正常使用

1.进入官网,点击免费使用(我是windows 64位,选合适自己的配置) 2.开始安装 选仅为我安装——下一步 选择自己的安装目录——点安装 等待 运行——完成 3.apipost一些基本操作——实现添加内容 (1)新建接口…

《人人都是产品经理》:项目一图流

《人人都是产品经理》:项目一图流 项目一图流 项目一图流

FreeSWITCH 1.10.10 简单图形化界面22-JsSIP的demo测试并记录坑

FreeSWITCH 1.10.10 简单图形化界面22-JsSIP的demo测试 00 FreeSWITCH GUI界面预览01、安装FreeSWITCH GUI先看使用手册02. 使用手册在这里0、设置FreeSWITCH账号1、jssip的demo网站2、设置jssip账号并登录3、整理坑3.1 掉线问题3.11 解决3.2 呼叫问题13.21 解决13.3 呼叫问题2…

PAE:从潮流报告中提炼有效产品属性

本文将介绍PAE,一种用于包含 PDF格式的文本和图像的产品属性提取算法。目前大部分的方法侧重于从标题或产品描述中提取属性,或利用现有产品图像中的视觉信息。与之前的工作相比,PAE从潮流趋势报告的PDF文件中提取属性,提取的属性包…

spl实现循环计算

需求 需要对一批数据进行价格计算 这里面的一部分单价来自于历史记录,但是另外一部分的单价,需要边计算边存储 数据库结构 CREATE TABLE tbl_mix_trace_price (lot_id_out varchar(255) DEFAULT NULL COMMENT 产出,lot_id_in varchar(255) DEFAULT NULL…

谈一下MySQL的两阶段提交机制

文章目录 为什么需要两阶段提交?两阶段提交流程?两阶段提交缺点? 为什么需要两阶段提交? 为了保证事务的持久性和一致性,MySQL需要确保redo log和binlog的同步持久化。MySQL通过“两阶段提交”的机制来实现在事务提交…

小迪安全v2023 javaWeb项目

小迪安全v2023 javaWeb项目 文章目录 小迪安全v2023 javaWeb项目1. webgoat靶场1. 环境配置与docker操作 2. jwt令牌1. jwt 第四关 签名没验证空加密2. jwt 第五关 爆破签名密钥3. jwt 第八关 kid参数可控 1. webgoat靶场 1. 环境配置与docker操作 自行下载配置vmware的kali-…

《mysql篇》--查询(进阶)

目录 将查询结果作为插入数据 聚合查询 聚合函数 count sum group by子句 having 联合查询 笛卡尔积 多表查询 join..on实现多表查询 内连接 外连接 自连接 子查询 合并查询 将查询结果作为插入数据 Insert into 表2 select * from 表1//将表1的查询数据插入…

Linux开发讲课20--- QSPI

SPI 是英语 Serial Peripheral interface 的缩写,顾名思义就是串行外围设备接口,一种高速的,全双工,同步的通信总线,并且在芯片的管脚上只占用四根线,节约了芯片的管脚,为 PCB 的布局上节省空间…

Springcloud-消息总线-Bus

1.消息总线在微服务中的应用 BUS- 消息总线-将消息变更发送给所有的服务节点。 在微服务架构的系统中,通常我们会使用消息代理来构建一个Topic,让所有 服务节点监听这个主题,当生产者向topic中发送变更时,这个主题产生的消息会被…

多线程引发的安全问题

前言👀~ 上一章我们介绍了线程的一些基础知识点,例如创建线程、查看线程、中断线程、等待线程等知识点,今天我们讲解多线程下引发的安全问题 线程安全(最复杂也最重要) 产生线程安全问题的原因 锁(重要…

Hive笔记-6

6.2.8 聚合函数 1) 语法 count(*),表示统计所有行数,包含null值; count(某列),表示该列一共有多少行,不包含null值; max(),求最大值,不包含null,除非所有值都是null&a…

不同node版本的切换及其指定版本vue-cli脚手架下载

目录 一.清空本地已安装node.js版本 二.装nvm管理工具 三.安装指定node版本 四.使用nvm命令切换或删除指定node版本 五.在指定node版本下下载指定vue-cli脚手架 一.清空本地已安装node.js版本 1.按健winR弹出窗口,键盘输入cmd,然后敲回车。 2.输入…

win11 + ubuntu linux双系统:开机直接进入windows修复

https://zhuanlan.zhihu.com/p/666702893 这种 双系统直接进入win 的问题,应该属于引导坏了,即grub坏了。 原因:笔记本送修了,没拆掉硬盘,可能引导被售后搞坏了。 在win-磁盘管理中查看分区,linux的分区…