说明
所有的基于时间处理和运行的程序将以同样的节奏同步和执行
TT(TimeTraveller)是一个新的设计,它最初会服务与量化过程的大量任务管理:分散开发、协同运行。但是很显然,TT的功能将远不止于此,它将服务大量的,基于时间游走特性的各种任务。
内容
1 概念
以量化场景作为假设,讨论所需要的功能,以及TT在其中所起的作用。
- 1 数据获取。程序需要在每个时隙启动,从接口或者网页获取新的数据,然后落库。
- 2 数据流转。这个不在这里讨论,假设数据流转的结构和渠道已经固定,会有一个程序来专门确保流转。
- 3 数据特征。程序在每个时隙启动,回顾过去n个时隙(brick, block)的内容,生成当前时隙新的特征。
- 4 数据决策。程序在每个时隙启动,回顾过去时隙的内容,生成当前时隙的决策。
- 5 过程跟随。程序在每个时隙检查检查采取的行动,给出当前时隙的新建议。
- 6 决策评估。在每个时隙评估情况,并作出参数改变。
- …
以上的每一个过程都是 一个相对独立且复杂的过程,如果只是case by case的开发,最后会发现情况及其混乱,难以维护。
所以,是否可以有一种机制或工具,可以基本上统一以上所有的过程?
时隙 :假设,程序只会在每个时隙启动并处理一次,正常情况下,最新的决策、行动、数据、信息可能会落后最实时的时隙1~10个。这主要是由流程的深度决定的,同时也考虑了几乎无限逻辑复杂度情况下的可同步单位。这个单位目前来看是1分钟。
时间特性:
- 1 顺序性。时间会按照严格统一的顺序向前推进,不可能跨时隙跳跃。这意味着,要处理的数据顺序和步骤是完全确定的。同时,站在某个时隙时不可能观察到未来时隙 ,技术上说,某个程序在t时隙启动,只能看到<t的时隙数据。
- 2 单向性。只可能从小到大的执行。
【Method】GoBack: 某个过程在执行过程中出错,在T时刻时发现,并且知道在T’时刻(T’<T)是正常的,那么我们回到T’状态并重新执行的过程叫GoBack。一个GoBack过程可以是幂等,也可以是分叉,这个由过程本身决定。当引入了生成机制,那么GoBack过程不是幂等的,结果可能产生变化。
【Method】Run : 执行过程,每次TT实例总是从当前时隙,执行到最新时隙。
【Attr】LastRunSlot : 上一次执行运行的时间。
【Attr】LastDataSlot : 上一次处理到的数据时隙(含)
【Attr】LatestDataSlot : 最新的数据时隙,这是查询当前数据源得到的。
【Attr】t : 当前执行时的时隙
2 前提条件
TT执行时只考虑一个固定机制,这极大的简化了大量工作的同步和协同。但这是建立在一系列机制之上的。
- 1 DataFlow: 这是由Flask-APScheduler-Celery + MongoEngine为基础组成的异步任务调度任务系统系统,主要是完成IO密集操作的并行操作,通过协程可以在不额外增加CPU开销的情况下大幅提升处理能力。这部分对应着以前的sniffers,逻辑上,sniffer主要就是嗅探各种数据变化,然后进行数据传递。 消息队列(Redis Stream、RabbitMQ)、主库(Mongo)、分析库(ClickHouse)构成了数据流的数据库核心。
- 2 GFGoLite:这个本身是一个全局的、无状态的函数服务。与TT相关的是UCS规范相关的实现,当TT需要追溯时间数据时,可能需要通过UCS对象(背后是GFGoLite)操作。
- 3 GlobalBuffer: 首先通过较为严格的tiers命名方法,确保数据始终可以使用kv的方式存取。一方面是程序(对象)本身的状态信息需要暂存(以便加载时可以恢复执行),另一方面是程序依赖的时间数据,需要载入(预载入)以便快速处理。
3 设计
先考虑几个常见的应用场景。
3.1 回测
对于标的A,开发了策略S,我们需要对S进行回测,以确保其效果,特别是对于OOB的表现。
回测会指定一套规则集,然后执行一个时间区间,从头开始执行到尾然后停止。
- 1 顺序加载数据(Read)。指定了开始和结尾之后,通过UCS可以获取brick_list,从而精确获取需要执行的每一个brick。brick数据是结构化的,存在于clickhouse中,加载速度会非常快。(RedisOrClickHouse)
- 2 执行规则。回测过程读取的全部是结构化数据,不包含向量。向量是中间数据(这可能会导致一些delay,所以数据处理和特征生成一般在slot的前半部分执行,而决策在后半部分执行,这样如果调度得当的话,还是有可能只落后一个时隙的)。
- 3 回写数据。单次执行完毕后,会有对应的行动数据需要写入数据库。数据可以分为两部分,一部分是明细数据,需要存入数据库,作为进一步分析,这部分数据推入RedisStream(元数据也顺带推入,作为checkpoint)。;另一部分是运行时元数据,这部分只要缓存在Redis里即可。任何时刻,只要从checkpoint恢复,都可以退回到那个时刻重新开始。
回测时,数据以拉为主,数据在缓存中存在1小时,或者一天,然后自动删除
3.2 运行时
与回测不同,运行时会存在很多空转情况,甚至出现依赖错误。
运行时,一般会处在等待最新数据流入,然后处理的情况。有几个问题是需要考虑的:
- 1 网络连接:【偶发失败、挂起、超时】这不是可选项,而是必选项。网络连接表现为偶发中断,连接完全挂起,超时等。
- 2 数据源更新:【推和拉】部分数据源未更新,全部数据源未更新。数据更多是采取服务端主动推送到缓存中,而不是程序直接去数据库取。
- 3 处理逻辑:分为导入依赖、数据预处理、数据处理和数据后处理几部分。
服务主动将数据推到缓存中,确保服务中存在的缓存数据是最新的。
这里要做的完善的话应该结合类似prometheus之类的工具去做。但是这个目前我没搞,所以会考虑一种折中的方式去实现这个。比较明确的是,在运行上可以用状态机的方法来控制。
-
正常 Normal :程序启动后,按照既定计划完成了数据更新,逻辑计算和结果保存。
- Success
- 滞后 Lag :虽然程序完成了执行,但是上一个时间点和当前运行时间点的差值大于阈值,认为数据出现了滞后。
- 缓慢 Slow : 程序出现了缓慢执行的情况
-
错误 Error: 程序遇到了无法执行的问题
- 读入错误 Read
- 连接错误 Connection
- 超时 OutOfTime
- 处理错误 Processing
- 依赖 Dependency
- 主逻辑 Logic
- 写入错误 Write
- 连接错误 Connection
- 超时 OutOfTime
- 读入错误 Read
3.3 设计
- 1 TT的初始化分为全新初始化和断点续传(checkpoint)两种
- 2 TT的运行分为固定周期执行(fixed-run)和嗅探执行(sniff-run)两种模式。前者一次性检查数据完整性,然后执行;后者使用状态机管理一般运行时状态。
- 3 TT采用Tiers方法进行精确编号,每一个TT实例将会一直使用这个编号。
- 4 TT需要GLobalBuffer作为默认的元数据保存方法。
- 5 TT需要QManager作为默认的数据保存方法。保存数据日志到stream,然后由一个任务来解析这些日志 ,可以考虑存到clickhouse(之前一般觉得mongo比较合适)
- 6 TT需要UCS作为brick的推算方法。这意味着GFGoLite重启会有影响,所以TT需要考虑UCS挂掉一会的情况。
- 7 TT将会使用前端管理,名称的统一命名将会在元数据表中记录。这意味着TT将使用MongoEnige和Mongo集群(mymeta)
一个简单的hypo如下
from typing import List, Optional,Dict
from pydantic import BaseModel# 测试
from Basefuncs import QManager,UCS,GlobalBuffer# Naming ... 先确保命名通过 - 通过MongoEngine进行操作,正确的结果将写入GlobalBuffer# 假设每个实例是从一个名称开始的,所以会先从GlobalBuffer中读取配置来初始化# 这个类的目的是构造通用的处理对象
import requests as req
class TimeTraveller(BaseModel):__version__ = 1.0meta : Dict = {}tiers_name: str = 'tier1.tier2.tier3.option_tier1.option_tier2'qmanager_redis_agent_host: str = 'http://172.17.0.1:24021/'qmanager_batch_size: int = 1000qmanager_q_max_len: int = 100000ucs_gfgo_lite_server: str = 'http://172.17.0.1:24090/'# global buffer同样需要先设置名称 | 空间名称要和这个叠加起来globalbuffer_server: str = 'http://172.17.0.1:24088/'globalbuffer_space_name: str = 'tem_test.test'# 这里可以增加校验 | 例如给到空间的一些解释 # __space_description# 执行初始化校验def __init__(self, **para):super().__init__(**para)print('当前版本:', self.__version__)# 在初始化时对QManager、UCS和GlobalBuffer进行校验def _check_init_parts(self):pass# 读取数据部分: 用户自定义部分,有时候可以为空def read_part(self):pass# 处理数据部分:用户自定义,通常不会为空def process_part(self):pass# 输出数据部分:用户自定义,通常不会为空def write_part(self):pass
4 实现
- 1 在开始之前,先要建立tt的专属消息管道,例如stream_tt_outcome_in和 stream_tt_outcome_out。
- 2 创建一个实例,先按规则构造一个名称;并随之创造对应的redis_var空间。
- 3 假设策略就是SMA,构造对应的函数
- 4 先进行历史区间的静态回测
- 5 再进行最新的运行时测试