Python 算法交易实验73 QTV200第二步: 数据清洗并写入ClickHouse

说明

先检查一下昨天启动的worker是否正常工作,然后做一些简单的清洗,存入clickhouse。

内容

1 检查数据

from Basefuncs import * 
# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):some_dt_str1   =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# 测试队列声明
qm = QManager(redis_agent_host = 'http://192.168.0.4:xx/',redis_connection_hash = None,q_max_len= 1000000, batch_size=10000)
qm.info()
target_stream_name = 'xxx'
qm.stream_len(target_stream_name)
2804

获取数据(使用单worker,模式比较简单且性能足够)

data = qm.xrange(target_stream_name)['data']
data_df = pd.DataFrame(data)
keep_cols = ['rec_id', 'data_dt','open', 'close','high','low','vol', 'amt', 'data_source','code','market']
data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])# 第一次操作,把之前无关的数据删掉
data_df1 = data_df1[data_df1['data_dt'] >='2024-06-24 00:00:00']

在这里插入图片描述
向clickhouse发起query,请求每个etf的最大时间,之后要使得新增的数据大于这个时间,另外目标表的字段形如
在这里插入图片描述
这是之前做的设计,因为隔的时间有点久都有点忘了。不过这个设计是合理的,后面会看到。

要做的转换也很简单:

  • 1 将时间字符转为时间戳
  • 2 从日期中分解出shard、part、block和brick

转换段

import timedata_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)
data_df1['block'] =data_df1['brick'].apply(lambda x: x[:x.rfind('.')])
data_df1['part'] =data_df1['block'].apply(lambda x: x[:x.rfind('.')])
data_df1['shard'] =data_df1['part'].apply(lambda x: x[:x.rfind('.')])data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
data_df2 =data_df1[keep_cols1]

在这里插入图片描述

今天就到这里吧,明晚接着写。

Go on …

昨天疏忽了,数据不应该直接存库,而是应该整理好之后送到队列。然后由默认的worker将数据搬到clickhouse.

2 存数规则

第二步的输入队列BUFF.xxxstream_in,输出队列BUFF.xxx.stream_out
第一次需要确保对应数据表的存在。clickhouse对数值的要求比较严格,为了避免麻烦,统一设置成Float32。(这样可以用统一的同步worker)。另外clickhouse不支持删除数据,这点倒是比较特别。
在这里插入图片描述
但可以支持全部删除数据(保留数据结构) TRUNCATE table market_data_v2

create_table_sql = '''
CREATE TABLE market_data_v2
(data_dt String,open Float32,close Float32,high Float32,low Float32,vol Float32,amt Float32,brick String,block String,part String,shard String,code String,ts Float32,pid String
)
ENGINE = MergeTree
ORDER BY (ts )
'''click_para = gb.getx('sp_global.buffer.lan.xxx.xxx.para')
chc = CHClient(**click_para)
chc._exe_sql(create_table_sql)
chc._exe_sql('show tables')
[('market_data',), ('market_data_v2',)]

etl_worker.py

# 0 记录日志
import logging
from logging.handlers import RotatingFileHandlerlogger = logging.getLogger('MyLogger')
handler = RotatingFileHandler('/var/log/workers.log', maxBytes=1024*1024*100, backupCount=5)
logger.addHandler(handler)
logger.setLevel(logging.INFO)# ---------------------------------------- 设置日志from Basefuncs import * 
def tuple_list2dict(tuple_list):"""将包含三个元素的tuple列表转换为字典。参数:tuple_list (List[Tuple[K, V1, V2]]): 包含键和两个值的tuple的列表。返回:Dict[K, Tuple[V1, V2]]: 转换后的字典,其中值是包含两个元素的tuple。"""return {key:value1 for key, value1 in tuple_list}# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):some_dt_str1   =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# ---------------------------------------- 基本函数# 测试队列声明
qm = QManager(redis_agent_host = 'http://192.168.0.4:xx/',redis_connection_hash = None,q_max_len= 1000000, batch_size=10000)
qm.info()
source_stream_name ='stream_in'
target_stream_name ='stream_out'
source_stream_len =  qm.stream_len(source_stream_name)
target_stream_len = qm.stream_len(target_stream_name)
print('source',source_stream_len)
print('target', target_stream_len)
# qm.ensure_group(target_stream_name)
cur_dt_str = get_time_str1()
if source_stream_len:is_source_recs = True
else:is_source_recs = Falselogger.info('%s %s source No Recs' %(cur_dt_str,'etl_worker'))
# 获取数据(使用单worker,模式比较简单且性能足够)# ---------------------------------------- 队列取数,有数据才执行下面
if is_source_recs:# ---------------------------------------- 取数,取出消息列表和需要的列# worker 30 秒启动一次data = qm.xrange(source_stream_name)['data']data_df = pd.DataFrame(data)msg_id_list = list(data_df['_msg_id'])keep_cols = ['rec_id', 'data_dt','open', 'close','high','low','vol', 'amt', 'data_source','code','market']data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])# 第一次操作,把之前无关的数据删掉# data_df1 = data_df1[data_df1['data_dt'] >='2024-06-24 00:00:00']import timedata_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)data_df1['block'] =data_df1['brick'].apply(lambda x: x[:x.rfind('.')])data_df1['part'] =data_df1['block'].apply(lambda x: x[:x.rfind('.')])data_df1['shard'] =data_df1['part'].apply(lambda x: x[:x.rfind('.')])data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']data_df2 =data_df1[keep_cols1]# ------------------------------------- 获取当前数据库已有的数据# 获取各code最大值click_para = {'database': 'xx','host': '192.168.0.4','name': 'xx','password': 'xx','port': xxx,'user': 'xx'}chc = CHClient(**click_para)'''这个 SQL 语句的作用是按照 `code` 分组,并为每个 `code` 找到对应的最新日期(`data_dt`),这个最新日期是基于 `ts` 字段的最大值来确定的。`argMax` 函数在这里用于找到每个分组中 `ts` 值最大时对应的 `data_dt` 值。具体来说,`argMax(data_dt, ts)` 会返回每个 `code` 分组中使得 `ts` 达到最大值的 `data_dt` 值。这意味着对于每个 `code`,查询会找到 `ts` 字段的最大值,并返回对应的 `data_dt` 值,即每个 `code` 的最新数据日期。最终,这个查询会返回一个结果集,其中包含每个 `code` 以及对应的最新数据日期(`last_data_dt`)。这对于分析每个代码的最新市场数据非常有用。'''latest_sql = '''SELECTcode,argMax(data_dt, ts) AS last_data_dtFROMmarket_data_v2GROUP BYcode'''# 更新时latest_date_tuple_list = chc._exe_sql(latest_sql)latest_date_dict = tuple_list2dict(latest_date_tuple_list)# ------------------------------------- 使用时间进行过滤# 筛选新数据data_df2['existed_dt'] = data_df2['code'].map(latest_date_dict).fillna('')output_sel = data_df2['data_dt'] > data_df2['existed_dt']output_df = data_df2[output_sel][keep_cols1]output_data_listofdict = output_df.to_dict(orient='records')output_data_listofdict2 = slice_list_by_batch2(output_data_listofdict, qm.batch_size)for some_data_listofdict in output_data_listofdict2:qm.parrallel_write_msg(target_stream_name, some_data_listofdict)del_msg = qm.xdel(source_stream_name, msg_id_list)logger.info('%s %s del source %s Recs' %(cur_dt_str,'etl_worker',del_msg['data'] ))

将该脚本发布为任务,30秒执行一次同步。

exe_qtv200_etl_worker.sh

#!/bin/bash# 记录
# sh /home/test_exe.sh com_info_change_pattern running# 有些情况需要把source替换为 .
# . /root/anaconda3/etc/profile.d/conda.sh
# 激活 base 环境(或你创建的特定环境)
source /root/miniconda3/etc/profile.d/conda.sh#conda init
conda activate basecd  /home/workers && python3 etl_worker.py

存数成功,后续就自动运行了。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/35573.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【LeetCode】七、树、堆、图

文章目录 1、树结构2、二叉树3、二叉树的遍历4、堆结构(Heap)5、堆化6、图 1、树结构 节点、根节点、叶子节点: 高度、深度、层三者的示意图: 2、二叉树 相比其他树,二叉树即每个节点最多两个孩子(两个分…

Linux高级编程——进程

1.进程的含义? 进程是一个程序执行的过程,会去分配内存资源,cpu的调度 PID, 进程标识符 当前工作路径 chdir umask 0002 进程打开的文件列表 文件IO中有提到 (类似于标准输入 标准输出的编号,系统给0,1&#xf…

【UE5.3】笔记5-蓝图类

什么是蓝图类:其实就是C类,只不过是UE封装好的且可以直接拖出来可视化使用。 如何创建蓝图类?蓝图类有哪些? 蓝图类分为基于关卡的,基于Actor的,基于组件Component的。 基于关卡的蓝图类 一个关卡只能有…

涉案财物管理系统|DW-S405系统实现涉案财物科学化管理

随着社会的不断发展,犯罪形式日益复杂,涉案财物的种类和数量也不断增加。传统的涉案财物管理方式已经无法满足现代执法办案的需求。因此,建立一套科学、高效、规范的警用涉案财物管理系统成为公安机关亟待解决的问题。 涉案财物管理系统DW-S…

最近在读《谁说菜鸟不会数据分析 SPSS篇》pdf分享

谁说菜鸟不会数据分析 SPSS篇 《谁说菜鸟不会数据分析(SPSS篇)》继续采用职场三人行的方式来构建内容,细致梳理了准专业数据分析的常见问题,并且挑选出企业实践中最容易碰到的案例,以最轻松直白的方式来讲好数据分析的…

【Android面试八股文】你来说一说Looper、Handler、线程间的关系,一个线程可以有几个Looper可以对应几个Handler?

文章目录 Looper相关Handler相关总结结论:一个线程可以只能创建一个Looper,但是可以创建任意多个handler 对象。 Looper相关 Looper的创建是通过在线程中执行Looper.prepare()方法创建,那么这个方法到底做了什么呢?请看下面的代码: public static void prepare() {prepa…

智能卡与存储卡的静电保护方案

SIM卡和存储卡,如SD卡、MMC卡等,在现代电子设备中扮演着核心角色,存储着用户的身份信息、个人数据和多媒体文件,是设备正常运行的基石。然而,静电放电(Electrostatic Discharge, ESD)对这些卡片…

Spring相关面试题(二)

13.Spring IOC容器的加载过程 四个形态 概念态---<Bean信息>---> 定义态----BeanDefinition---->纯净态----DI---->成熟态 对应Bean的生命周期 实例化 --> 属性注入 ---> 实例化 概念态---->定义态 实例化一个ApplicationContext的对象 调用bean…

51单片机看门狗定时器配置

测试环境 单片机型号&#xff1a;STC8G1K08-38I-TSSOP20&#xff0c;其他型号请自行测试&#xff1b; IDE&#xff1a;KEIL C51&#xff1b; 寄存器配置及主要代码 手册中关于看门狗的寄存器描述如下&#xff1a; 启动看门狗&#xff0c;需将B5位EN_WDT置1即可&#xff0c;…

postgresql.conf配置详解

postgresql.conf配置详解 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; postgresql.conf是PostgreSQL数据库的主要配置文件之一&#xff0c;它包含了各种配置…

ScheduledThreadPoolExecutor和时间轮算法比较

最近项目中需要用到超时操作&#xff0c;对于不是特别优秀的timer和DelayQueue没有看。 Timer 是单线程模式。如果某个 TimerTask 执行时间很久&#xff0c;会影响其他任务的调度。Timer 的任务调度是基于系统绝对时间的&#xff0c;如果系统时间不正确&#xff0c;可能会出现…

调和映照理论简介

调和映照理论 调和映照理论&#xff08;Harmonic Mapping Theory&#xff09;是数学中的一个重要分支&#xff0c;研究调和映照&#xff08;Harmonic Mapping&#xff09;及其性质。调和映照是指保持某种特定性质&#xff08;通常是调和性&#xff09;的映射&#xff0c;它在几…

Golang的Work Stealing机制

Go的运行时系统使用了一种名为Work Stealing&#xff08;工作窃取&#xff09;的调度策略来分配Goroutine到可用线程&#xff08;称为M&#xff0c;即Machine&#xff09;上执行。这样可以最大化CPU使用率&#xff0c;减少任务调度的开销。在这种机制下&#xff0c;任务队列和调…

STL中的迭代器模式:将算法与数据结构分离

目录 1.概述 2.容器类 2.1.序列容器 2.2.关联容器 2.3.容器适配器 2.4.数组 3.迭代器 4.重用标准迭代器 5.总结 1.概述 在之前&#xff0c;我们讲了迭代器设计模式&#xff0c;分析了它的结构、角色以及优缺点&#xff1a; 设计模式之迭代器模式-CSDN博客 在 STL 中&a…

Open AI限制来袭?用上这个工具轻松破局!

【导语】近日&#xff0c;AI领域掀起了一场不小的波澜。Open AI宣布&#xff0c;从7月9日起&#xff0c;将对部分地区的开发者实施API调用限制。这一消息对于许多依赖Open AI技术的国内初创团队来说&#xff0c;无疑是一个沉重的打击。 对于这些团队而言&#xff0c;Open AI的A…

FITC-胰岛素的荧光特性与稳定性-星戈瑞

在生物医学研究领域&#xff0c;荧光标记技术是一种实验手段&#xff0c;能够实现对生物分子的可视化追踪和定量分析。其中&#xff0c;FITC-胰岛素作为一种结合了荧光素异硫氰酸酯&#xff08;FITC&#xff09;与胰岛素的荧光标记物&#xff0c;在糖尿病研究、药物开发以及细胞…

关于摄像头模组中滤光片的介绍

1、问题背景 红外截止滤光片&#xff08;IR CUT Filter&#xff09;是应用在摄像头模组中非常重要的一个器件&#xff0c;因人眼与 coms sensor 对光线各波长的响应不同&#xff0c; 人眼看不到红外光&#xff0c;但 sensor 能感应到&#xff08;如下图是某sensor在各波长下的…

使用 SwiftUI 为 macOS 创建类似于 App Store Connect 的选择器

文章目录 前言创建选择器组件使用选择器组件总结前言 最近,我一直在为我的应用开发一个全新的界面,它可以让你查看 TestFlight 上所有可用的构建,并允许你将它们添加到测试群组中。 作为这项工作的一部分,我需要创建一个组件,允许用户从特定构建中添加和删除测试群组。我…

IOS Swift 从入门到精通:从 JSON 文件加载数据

文章目录 常见问题解答数据模型JSON 数据验证 JSON解码 JSON编写 FAQRow 代码添加状态栏背景模糊将内容添加到 FAQView常见问题解答数据模型 此 FAQ 模型符合Decodable,因为我们需要将 JSON 数据解码为 SwiftUI 数据。它还将符合 Identifiable ,因此我们稍后可以在 ForEach …

Flutter学习目录

学习Dart语言 官网&#xff1a;https://dart.cn/ 快速入门&#xff1a;Dart 语言开发文档&#xff08;dart.cn/guides&#xff09; 学习Flutter Flutter生命周期 点击跳转Flutter更换主题 点击跳转StatelessWidget和StatefulWidget的区别 点击跳转学习Flutter中新的Navigato…