Python 算法交易实验74 QTV200第二步(改): 数据清洗并写入Mongo

说明

之前第二步是打算进入Clickhouse的,实测下来有一些bug
在这里插入图片描述
可以看到有一些分钟数据重复了。简单分析原因:

  • 1 起异步任务时,还是会有两个任务重复的问题,这个在同步情况下是不会出现的
  • 2 数据库没有upsert模式。clickhouse是最近刚应用的库,我还没有完善其操作模式。

解决思路:

  • 1 既然采用了异步,就没有办法去控制其前置的依赖和顺序,否则就会退回到同步状态。而且从效率上,n次异步IO的cpu开销,可能也只相当于1次的同步开销。可以认为,异步是更轻松,但是更’粗心’的工作状态。所以在设计上,如果每次的操作都是“无害”的,那么就没问题。这里的数据同步任务,最重要的是不重不漏,所以只要能够确保数据不重不漏即可。
  • 2 每次负责crawl的worker不直接操作数据库是对的,这可以避免过多的数据库操作开销。在同步结果的队列中,每个周期执行一次Mongo操作是完全没问题的。同步队列中可以有一些冗余的数据,在整合数据时就删除了。剩余的部分,可以直接采用upsert的方式存入。

结论:使用Mongo作为第一个数据节点的持久化。

反思点:

  • 1 对于数据的集成,可能还是Mongo更合适。因为不必事先定义表结构,而且之前做了一些开发,Mongo的操作方式非常完善。擅长在记录级数据的复杂度操作。
  • 2 clickhouse更适合用于在我的数据系统中直接输出的数据,特别是空间数据,按UCS方式规范。擅长在块级别数据的效率操作。

内容

1 目标数据库准备

采用m4.24086

很巧,QTV102的数据也在这里,所以QTV200的数据可以继续放在这里 。

回顾一下WMongo的操作,有好一阵子没用了。

from Basefuncs import *
# analysis 
target_server = 'm4.24086'
# machine_name = 'm4'
machine_name = get_machine_name()# 在本地建立连接文件,避免每次都向mymeta请求数据。 随主机变化,这里有可能要修改(TryConnectionOnceAndForever)中关于mymeta的连接配置。
try:target_w = from_pickle(target_server)color_print('【Loading target_w】from pickle')
except:w = WMongo('w')target_w = w.TryConnectionOnceAndForever(server_name =target_server, current_machine_name = machine_name)to_pickle(target_w, target_server)

有一些设计是好的,只要给出目标服务器名称,对象就会自动寻找合适的连接方式(local、lan、wan)来完成连接,对应的连接保存为本地文件。之后可以考虑通过GlobalBuffer来简化判断,还有neo4j来存储和管理关系。

进入队列的字段名,不允许有 _msg_id 字段
Wmongo_v9000.012
设置当前连接 local
>>> Switching To Mymeta
设置当前连接 local
在CN001访问mymeta,通用
当前机器的名称: m4
1.当前使用的MongeAgent:http://172.17.0.1:24011/
2.Tier1:meta, Tier2:servers
3.ConnectionHash:e8d1bc791049988d89465d5ce24d993b
4.FilterDict:{'my_server_pkey': 'm4.24086'}
5.Limits:1
6.Sort:
7.Skip:0
>>> Hit Records
当前机器的局网: my.cn001
【I】目标服务的机器:m4, 目标服务的机器局网:my.cn001
【I】采用local方式连接目标主机
Wmongo_v9000.012
设置当前连接 local
获取已有连接
target connection hash: d35632b63b77f17d4d12808fb707cb1f
data save to pickle:  ./m4.24086.pkl

然后就可以通过对象操作了

target_w.cname_recs()
{'data': {'QTV102': {'log_monitor': 264276,'log_sniffer': 792827,'log_worker': 264276,'stats': 5177,'step1_mongo_in': 2895114,'step1_mongo_meta': 2895114,'step1_mongo_out': 2895114},'QTV102_Capital_Data': {'capital_daily': 32787},'QTV102_Model_Signal': {'log_monitor': 264277,'log_sniffer': 792827,'log_worker': 0,'stats': 5218,'step1_mongo_in': 2436678,'step1_mongo_meta': 0,'step1_mongo_out': 16860560},'QTV102_Strategy': {'strategy_online': 58,'trade_orders': 128,'trade_strategy': 23},'QuantData001': {'log_monitor': 264276,'log_sniffer': 792830,'log_worker': 264276,'stats': 5178,'step1_mongo_in': 460297,'step1_mongo_meta': 460297,'step1_mongo_out': 460297},'QuantData_510500': {'log_monitor': 264277,'log_sniffer': 792831,'log_worker': 264278,'stats': 5178,'step1_mongo_in': 657702,'step1_mongo_meta': 657702,'step1_mongo_out': 657702},'SmartQuant_512660': {'log_monitor': 264277,'log_sniffer': 792830,'log_worker': 0,'stats': 5219,'step1_mongo_in': 460297,'step1_mongo_meta': 0,'step1_mongo_out': 450098},'Strategy_512660': {'capitals': 261,'monthly_report': 66,'orders': 130,'slog': 430056,'summary_report': 124,'yearly_report': 8},'test_for_mongo_engine': {'user': 2}},'msg': 'ok','status': True}

很早前随便做的一版,看起来业务效果还是不错的。这部分内容,以后就不必放在mongo,在clickhouse里一个查询就好了。
在这里插入图片描述

仍然(在逻辑上)设置表的结构为:qtv200.market_data,需要的索引有:

  • 1 pid: 主键。这个是确定的主键,对后续的基础操作来说是必须的。
  • 2 UCS(shard、part、block、brick): 管理块级数据的键,在后续的块级任务来说非常重要。
  • 3 code: 业务筛选字段
  • 4 ts: 时间,排序字段

mongo方便之处就在于:当你的逻辑明确了,建立好索引,一切就好了

# 主键 pkey
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'pid')
# UCS
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'shard')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'part')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'block')
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'brick')
# 业务
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'code')
# 排序
target_w.set_a_index(tier1 = 'qtv200' ,tier2 = 'market_data', idx_var = 'ts')Out[4]: {'data': {'ts_1': 'Not Existed and Created'}, 'msg': 'ok', 'status': True}

在这里插入图片描述

改造1:修改获取最大最小值的部分 etl_worker

变的简单了,不需要关心数据库里有什么,只要把当前有重复的pid去掉就可以了

...data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']data_df2 =data_df1[keep_cols1].drop_duplicates(['pid'])output_df = data_df2output_data_listofdict = output_df.to_dict(orient='records')output_data_listofdict2 = slice_list_by_batch2(output_data_listofdict, qm.batch_size)for some_data_listofdict in output_data_listofdict2:qm.parrallel_write_msg(target_stream_name, some_data_listofdict)

在脚本里做相应修改

#conda init
conda activate basecd  /home/workers && python3 etl_worker_mongo.py

改造2:修改入库的部分 s2mongo

暂时先以脚本方式执行,不固化到接口中。

现在可以采用一些更好的方式来初始化队列。

from Basefuncs import * 
import logging
from logging.handlers import RotatingFileHandler
def get_logger(name , lpath = '/var/log/' ):logger = logging.getLogger(name)fpath = lpath + name + '.log'handler = RotatingFileHandler(fpath , maxBytes=100*1024*1024, backupCount=10)logger.addHandler(handler)logger.setLevel(logging.INFO)return loggerlogger = get_logger('etf_raw_data')# IO
machine_host = '192.168.0.4'
source_redis_agent_host = f'http://{machine_host}:24118/'stream_cfg = StreamCfg(q_max_len = 1000000, batch_size = 10000, redis_agent_host = source_redis_agent_host)
qm = QManager(**stream_cfg.dict())
# qm.info()# analysis 
target_server = 'm4.24086'
target_w = from_pickle(target_server)
# machine_name = 'm4'
# machine_name = get_machine_name()
# # 在本地建立连接文件,避免每次都向mymeta请求数据。 随主机变化,这里有可能要修改(TryConnectionOnceAndForever)中关于mymeta的连接配置。
# try:
#     target_w = from_pickle(target_server)
#     color_print('【Loading target_w】from pickle')
# except:
#     w = WMongo('w')
#     target_w = w.TryConnectionOnceAndForever(server_name =target_server, current_machine_name = machine_name)
#     to_pickle(target_w, target_server)
# target_w.cname_recs()# Name
ss_name = 'xxx'
t_tier1 = 'xxx'
t_tier2 = 'xxx'keep_cols =['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
num_cols = ['open','close','high','low', 'vol','amt','ts']
# Process 
## 1 read source -- 这里本身也可以用pydantic 
ss_data_list = qm.xrange(ss_name)['data']
if len(ss_data_list):ss_data_df0 = pd.DataFrame(ss_data_list)msg_id_list = list(ss_data_df0['_msg_id'])ss_data_df = ss_data_df0[keep_cols].dropna()for the_col in num_cols:ss_data_df[the_col] = ss_data_df[the_col].apply(float)# 写入mongoresp = target_w.insert_or_update_with_key(tier1 = t_tier1, tier2 = t_tier2, data_listofdict = ss_data_df.to_dict(orient='records'), key_name ='pid')qm.xdel(ss_name,msg_id_list)logger.info(get_time_str1() + 'efl_s2mongo insert recs %s' % len(ss_data_df))
else:logger.info(get_time_str1() + 'efl_s2mongo insert not recs')

以上,规定了几部分。

  • 1 IO部分。队列和数据的handler现在通过pydantic的对象,可以非常简洁的定义。然后约定好入队列和目标数据库表的必要信息。
  • 2 处理。主要就是将需要保留的字段,以及需要转数值的字段明确。然后就是读取,保留,转换,插入,最后删除。

在测试中,就一次的数据反复插了几次,数据是不会重复的。

在这里插入图片描述
对应的日志可以看到一开始插入过n次,后面加入了定时任务,然后就转入运行了

└─ $ cat /var/log/etf_raw_data.log
2024-06-29 18:38:27efl_s2mongo insert recs 12
2024-06-29 18:40:53efl_s2mongo insert recs 12
2024-06-29 18:41:31efl_s2mongo insert recs 12
2024-06-29 18:41:42efl_s2mongo insert recs 12
2024-06-29 18:43:27efl_s2mongo insert recs 12
2024-06-29 18:44:31efl_s2mongo insert recs 12
2024-06-29 18:46:49efl_s2mongo insert recs 12
2024-06-29 18:47:01efl_s2mongo insert not recs
2024-06-29 18:47:01efl_s2mongo insert not recs
2024-06-29 18:47:31efl_s2mongo insert not recs
2024-06-29 18:48:01efl_s2mongo insert not recs
2024-06-29 18:48:31efl_s2mongo insert not recs
2024-06-29 18:49:01efl_s2mongo insert not recs
2024-06-29 18:49:02efl_s2mongo insert not recs
2024-06-29 18:49:31efl_s2mongo insert not recs
2024-06-29 18:49:32efl_s2mongo insert not recs
2024-06-29 18:50:01efl_s2mongo insert not recs
2024-06-29 18:50:02efl_s2mongo insert not recs
2024-06-29 18:50:31efl_s2mongo insert not recs
2024-06-29 18:50:32efl_s2mongo insert not recs
2024-06-29 18:51:01efl_s2mongo insert not recs
2024-06-29 18:51:02efl_s2mongo insert not recs
2024-06-29 18:51:31efl_s2mongo insert not recs
2024-06-29 18:51:32efl_s2mongo insert not recs
关于定时任务

我偷了个懒,就是把这脚本和etl脚本放在一起。这两个任务被绑在一起串行了。主要是懒的再去定一个定时任务。

└─ $ cat exe_qtv200_etl_worker.sh
#!/bin/bash# 记录
# sh /home/test_exe.sh com_info_change_pattern running# 有些情况需要把source替换为 .
# . /root/anaconda3/etc/profile.d/conda.sh
# 激活 base 环境(或你创建的特定环境)
source /root/miniconda3/etc/profile.d/conda.sh#conda init
conda activate basecd  /home/workers && python3 etl_worker_mongo.py
cd  /home/workers && python3 etf_raw_data_s2mongo.py

对于后续其他的etl,每一个还是应该另起一个任务,这样才能利用异步来确保多个etf的数据及时获取。

【调整完毕】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/36644.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MT1568 学生成绩

题目 有3个学生,每个学生有3门课的成绩,从键盘输入数据,包括学号、姓名、三门课成绩,学号整型,姓名字符型,成绩实型,计算3门课程总平均成绩,以及平均分最高的学生信息。不考虑非法成…

【webrtc】webrtc 与h265

参考 webrtc 支持H265(三) 总结_webrtc h265-CSDN博客 enable-chromium-hevc-hardware-decoding/README.zh_CN.md at main StaZhu/enable-chromium-hevc-hardware-decoding GitHub 《WebRTC系列》实战 Web 端支持 h265 硬解_webrtc-streamer h265编码-CSDN博客 webrtc分块…

mysql GROUP_CONCAT函数详解

文章目录 概要使用技巧1. 建表、插入数据2.以id分组,把age字段的值拼成一行,逗号分隔(默认)3.以id分组,把age字段的值拼成 一行,分号分隔4.以id分组,把去冗余的age字段的值打印在一行5.以id分组,把age字段的…

算法:链表题目练习

目录 链表的技巧和操作总结 常用技巧: 链表中的常用操作 题目一:反转一个单链表 题目二:链表的中间结点 题目三:返回倒数第k个结点 题目四:合并两个有序链表 题目五:移除链表元素 题目六&#xff…

利用LLM本身训练SoTA embedding模型

今天分享一篇Microsoft公司的一篇文章,Title: Improving Text Embeddings with Large Language Models:使用大语言模型改善文本嵌入。 这篇文章探索了直接利用LLM来做embedding模型,其只需要利用合成数据和少于1000次的训练步骤就能获得高质…

语言模型:文本表征词嵌入技术调研

1 文本表征 文本表征是自然语言处理中的关键部分,尤其在当前大模型快速发展的背景下。由于大模型存在知识有限、处理文本长度有限、保密要求和大模型幻觉等问题,结合外部数据显得尤为重要。 为了便于存储和检索,除了保存纯文本外&#xff0…

Debug 调试代码

我们使用 debug 的目的, 认为就是查看代码的执行过程的。 步骤: 1. 打断点 断点的意义是, debug 运⾏的时候, 代码会在断点处停下来不执行如果是想要查看代码的执行过程, 建议将断点放在第⼀行在代码 和 行号之间 点击,出现的红色圆点 就是断点, 再次点击可以取消 …

Webpack: 构建微前端应用

Module Federation 通常译作“模块联邦”,是 Webpack 5 新引入的一种远程模块动态加载、运行技术。MF 允许我们将原本单个巨大应用按我们理想的方式拆分成多个体积更小、职责更内聚的小应用形式,理想情况下各个应用能够实现独立部署、独立开发(不同应用甚…

高项-信息系统工程知识要点

1、五大软件架构风格 数据流风格:批处理序列、管道/过滤器 ◆调用/返回风格:主程序X 子程序、数据抽象、面向对象、层次结构 ◆独立构件风格:进程通信、事件驱动 虚拟机风格:解释器、基于规则的系统 仓库风格:数据库系…

apipost的安装和测试添加接口能否正常使用

1.进入官网,点击免费使用(我是windows 64位,选合适自己的配置) 2.开始安装 选仅为我安装——下一步 选择自己的安装目录——点安装 等待 运行——完成 3.apipost一些基本操作——实现添加内容 (1)新建接口…

《人人都是产品经理》:项目一图流

《人人都是产品经理》:项目一图流 项目一图流 项目一图流

【Spring】springSecurity

1、概述 1.1定义和用途 Spring Security为基于Spring的企业应用系统提供声明式的安全访问控制解决方案。它提供了一组可以在Spring应用上下文中配置的Bean,充分利用了Spring IoC、DI(控制反转和依赖注入)和AOP(面向切面编程&…

FreeSWITCH 1.10.10 简单图形化界面22-JsSIP的demo测试并记录坑

FreeSWITCH 1.10.10 简单图形化界面22-JsSIP的demo测试 00 FreeSWITCH GUI界面预览01、安装FreeSWITCH GUI先看使用手册02. 使用手册在这里0、设置FreeSWITCH账号1、jssip的demo网站2、设置jssip账号并登录3、整理坑3.1 掉线问题3.11 解决3.2 呼叫问题13.21 解决13.3 呼叫问题2…

PAE:从潮流报告中提炼有效产品属性

本文将介绍PAE,一种用于包含 PDF格式的文本和图像的产品属性提取算法。目前大部分的方法侧重于从标题或产品描述中提取属性,或利用现有产品图像中的视觉信息。与之前的工作相比,PAE从潮流趋势报告的PDF文件中提取属性,提取的属性包…

ML307R OpenCPU HTTP使用

一、函数介绍 二、示例代码 三、代码下载地址 一、函数介绍 具体函数可以参考cm_http.h文件,这里给出几个我用到的函数 1、创建客户端实例 /*** @brief 创建客户端实例** @param [in] url 服务器地址(服务器地址url需要填写完整,例如(服务器url仅为格式示…

spl实现循环计算

需求 需要对一批数据进行价格计算 这里面的一部分单价来自于历史记录,但是另外一部分的单价,需要边计算边存储 数据库结构 CREATE TABLE tbl_mix_trace_price (lot_id_out varchar(255) DEFAULT NULL COMMENT 产出,lot_id_in varchar(255) DEFAULT NULL…

谈一下MySQL的两阶段提交机制

文章目录 为什么需要两阶段提交?两阶段提交流程?两阶段提交缺点? 为什么需要两阶段提交? 为了保证事务的持久性和一致性,MySQL需要确保redo log和binlog的同步持久化。MySQL通过“两阶段提交”的机制来实现在事务提交…

洛谷U420301题解

题解 方法一枚举 时间复杂度 O ( n 2 ) O(n^2) O(n2) 主要代码 int main() {int n,t;cin >> n;for(int i 1;i < n;i) cin >> a[i];cin >> t;for(int i 1;i < n;i){for(int j i 1;j < n;j){if(a[i] a[j] t){cout << "Yes"…

定个小目标之刷LeetCode热题(34)

15. 三数之和 给你一个整数数组 nums &#xff0c;判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k &#xff0c;同时还满足 nums[i] nums[j] nums[k] 0 。请 你返回所有和为 0 且不重复的三元组。 注意&#xff1a;答案中不可以包含重复的三…

小迪安全v2023 javaWeb项目

小迪安全v2023 javaWeb项目 文章目录 小迪安全v2023 javaWeb项目1. webgoat靶场1. 环境配置与docker操作 2. jwt令牌1. jwt 第四关 签名没验证空加密2. jwt 第五关 爆破签名密钥3. jwt 第八关 kid参数可控 1. webgoat靶场 1. 环境配置与docker操作 自行下载配置vmware的kali-…