对接中泰极速行情 | DolphinDB XTP 插件使用教程

XTP 是中泰证券推出的高性能交易平台,专为专业投资者提供高速行情及交易系统,旨在提供优质便捷的市场接入通道。目前支持股票、基金、ETF、债券、期权等多个市场,可满足不同投资者需求。

基于 XTP 官方 C++ SDK,DolphinDB 开发了 XTP 插件,提供便捷高效的方式将 XTP 实时行情数据导入 DolphinDB,进行流计算处理及持久化存储。

本文介绍如何使用该插件实现节点启动时自动订阅 XTP 行情数据并存储至 DolphinDB 分布式数据库。全部代码需在 2.00.11 或更高版本的 DolphinDB 服务器及插件上运行,目前仅支持 Linux 系统。

1. DolphinDB XTP 行情插件介绍

DolphinDB XTP 行情插件支持通过参数配置的方式选择使用 TCP 或 UDP 的方式进行连接,插件通过调用 XTP API 实现行情订阅以及数据接收功能,具体实现严格参照 XTP 官方 API 文档:xtp-中泰证券。

XTP 行情插件目前支持行情源类型如下:

快照逐笔成交逐笔委托订单簿
指数支持
股票支持支持支持支持
基金支持支持支持支持
债券支持支持支持支持
期权支持

表1-1 XTP 行情支持概览

该插件旨在提供高效便捷的方式接入 XTP 行情数据,并与 DolphinDB 无缝集成,支持对行情数据进行流式计算、分析、加工和持久化存储。使用者可根据需求灵活订阅不同市场及数据类型的行情,满足各种应用场景。

2. 基本使用介绍

2.1 安装插件

XTP 行情插件暂未正式发布,试用请联系 DolphinDB 小助手获取插件压缩包:添加 dolphindb1 或私信 DolphinDB 知乎官方账号。

获取插件压缩包后,请将 XTP 行情插件解压至 xtp 目录,其中包括:

  • libPluginXTP.so
  • libxtpquoteapi.so
  • PluginXTP.txt
  • README.md

将 xtp 文件夹及其目录下内容移至 /YOUR_DOLPHINDB_PATH/server/plugins/ 下即可完成安装:

mv ./xtp /YOUR_DOLPHINDB_PATH/server/plugins/

2.2 加载插件

在使用插件之前,我们需要通过 loadPlugin 函数加载插件:

loadPlugin("xtp")

插件加载成功,则返回 XTP 行情插件所提供的所有函数:

图2-1 插件成功加载返回图

此外,需要注意,如果重复执行 loadPlugin 加载插件,会抛出模块已经被使用的错误提示,因为节点启动后,只允许加载一次 XTP 插件,即可在任意会话中调用该插件提供的函数。错误提示如下:

The module [XTP] is already in use.

可以通过 try-cach 语句捕获这个错误,避免因为插件已加载而中断后续脚本代码的执行:

try{loadPlugin("xtp")}catch(ex){print(ex)}

此外,节点重启需重新加载插件。

3. 通过 XTP 行情插件将实时行情数据写入分布式数据库

本章将介绍如何通过 XTP 行情插件,订阅交易所实时行情数据并写入分布式数据库落盘保存。下图以现货快照数据和逐笔委托数据为例,展示了从订阅到入库的完成流程:

图3-1 XTP 实时行情入库示意图

  • 通过 XTP 插件订阅逐笔数据写入 DolphinDB actualMarketDataStream 和 entrustStream 两个持久化流数据表。持久化流数据表是具备发布订阅功能的内存表。
  • 订阅持久化流数据表,将数据写入DolphinDB分布式数据库。分布式数据库将数据持久化存储到磁盘上。

注意:请勿直接将行情数据写入分布式数据库,因为分布式数据库不适合高频逐条写入。建议利用流数据表及其发布订阅功能实现微批处理,以提高写入吞吐量。

下面分步骤介绍关键的 DolphinDB 代码实现,完整脚本见附录。

3.1 清理环境(可选)

为确保示例脚本可重复执行,提供了流环境清理脚本。由于相同流数据表名和订阅无法重复定义,因此需先取消相关订阅并清除所需流数据表。

try {xtpConn = XTP::getHandle("xtpConn")XTP::closeXTPConnection(xtpConn)
}
catch (ex) {print(ex)
}
go
// 取消订阅
try { unsubscribeTable(tableName="actualMarketDataStream", actionName="actualMarketDataAction") } catch(ex) { print(ex) }
try { unsubscribeTable(tableName="entrustStream", actionName="entrustAction") } catch(ex) { print(ex) }
try { unsubscribeTable(tableName="tradeStream", actionName="tradeAction") } catch(ex) { print(ex) }
try { unsubscribeTable(tableName="stateStream", actionName="stateAction") } catch(ex) { print(ex) }
try { unsubscribeTable(tableName="orderBookStream", actionName="orderBookAction") } catch(ex) { print(ex) }
try { unsubscribeTable(tableName="indexMarketDataStream", actionName="indexMarketDataAction") } catch(ex) { print(ex) }
try { unsubscribeTable(tableName="optionMarketDataStream", actionName="optionMarketDataAction") } catch(ex) { print(ex) }
try { unsubscribeTable(tableName="bondMarketDataStream", actionName="bondMarketDataAction") } catch(ex) { print(ex) }
go
// 取消流表
try { dropStreamTable(tableName="actualMarketDataStream") } catch(ex) { print(ex) }
try { dropStreamTable(tableName="entrustStream") } catch(ex) { print(ex) }
try { dropStreamTable(tableName="tradeStream") } catch(ex) { print(ex) }
try { dropStreamTable(tableName="stateStream") } catch(ex) { print(ex) }
try { dropStreamTable(tableName="orderBookStream") } catch(ex) { print(ex) }
try { dropStreamTable(tableName="indexMarketDataStream") } catch(ex) { print(ex) }
try { dropStreamTable(tableName="optionMarketDataStream") } catch(ex) { print(ex) }
try { dropStreamTable(tableName="bondMarketDataStream") } catch(ex) { print(ex) }

3.2 创建库表

运行以下语句创建数据库和分区表,包括:

  • 创建数据库
  • 通过 XTP::getSchema 方法获取表结构
  • 创建分区表

根据数据频率与常用场景,将 actualMarketData,entrust,trade,state,orderBook 数据放在同一库内,并分别建表。另有 indexMarketData,optionMarketData,bondMarketData 的快照数据,单独建库建表。分区规则参考:《基于 DolphinDB 存储金融数据的分区方案最佳实践》。

login("admin", "123456")
// 现货快照 + 逐笔成交 + 逐笔委托 + 逐笔状态 + 订单簿
// 现货快照包含:股票、基金ETF、可转债的快照数据
// 创建数据库
if (!existsDatabase("dfs://XTP.actual")) {dbVALUE = database(, VALUE, 2023.01.01..2023.01.02)dbHASH = database(, HASH, [SYMBOL, 50])db = database("dfs://XTP.actual", COMPO, [dbVALUE, dbHASH], , `TSDB)
}
else {db = database("dfs://XTP.actual")
}
// 获取表结构
actualSchema = table(1:0, XTP::getSchema(`actualMarketData)['name'], XTP::getSchema(`actualMarketData)['typeString'])
entrustSchema = table(1:0, XTP::getSchema(`entrust)['name'], XTP::getSchema(`entrust)['typeString'])
tradeSchema = table(1:0, XTP::getSchema(`trade)['name'], XTP::getSchema(`trade)['typeString'])
stateSchema = table(1:0, XTP::getSchema(`state)['name'], XTP::getSchema(`state)['typeString'])
orderBookSchema = table(1:0, XTP::getSchema(`orderBook)['name'], XTP::getSchema(`orderBook)['typeString'])
// 创建分区表
if (existsTable("dfs://XTP.actual", "actualMarketData")) {actualPt = loadTable("dfs://XTP.actual", "actualMarketData")
}
else {actualPt = db.createPartitionedTable(actualSchema, "actualMarketData", `dataTime`ticker, {dataTime: "delta"}, `ticker`dataTime)
}
if (existsTable("dfs://XTP.actual", "entrust")) {entrustPt = loadTable("dfs://XTP.actual", "entrust")
}
else {entrustPt = db.createPartitionedTable(entrustSchema, "entrust", `dataTime`ticker, {seq: "delta", dataTime: "delta", entrustSeq: "delta"}, `ticker`dataTime)
}
if (existsTable("dfs://XTP.actual", "trade")) {tradePt = loadTable("dfs://XTP.actual", "trade")
}
else {tradePt = db.createPartitionedTable(tradeSchema, "trade", `dataTime`ticker, {seq: "delta", dataTime: "delta", tradeSeq: "delta"}, `ticker`dataTime)
}
if (existsTable("dfs://XTP.actual", "state")) {statePt = loadTable("dfs://XTP.actual", "state")
}
else {statePt = db.createPartitionedTable(stateSchema, "state", `dataTime`ticker, {seq: "delta", dataTime: "delta"}, `ticker`dataTime)
}
if (existsTable("dfs://XTP.actual", "orderBook")) {orderBookPt = loadTable("dfs://XTP.actual", "orderBook")
}
else {orderBookPt = db.createPartitionedTable(orderBookSchema, "orderBook", `dataTime`ticker, {dataTime: "delta"}, `ticker`dataTime)
}// 指数快照
if (!existsDatabase("dfs://XTP.index")) {dbVALUE = database(, VALUE, 2023.01.01..2023.01.02)dbHASH = database(, HASH, [SYMBOL, 5])db = database("dfs://XTP.index", COMPO, [dbVALUE, dbHASH], , `TSDB)
}
else {db = database("dfs://XTP.index")
}
indexSchema = table(1:0, XTP::getSchema(`indexMarketData)['name'], XTP::getSchema(`indexMarketData)['typeString'])
if (existsTable("dfs://XTP.index", "indexMarketData")) {indexPt = loadTable("dfs://XTP.index", "indexMarketData")
}
else {indexPt = db.createPartitionedTable(indexSchema, "indexMarketData", `dataTime`ticker, {dataTime: "delta"}, `ticker`dataTime)
}// 期权快照
if (!existsDatabase("dfs://XTP.option")) {dbVALUE = database(, VALUE, 2023.01.01..2023.01.02)dbHASH = database(, HASH, [SYMBOL, 5])db = database("dfs://XTP.option", COMPO, [dbVALUE, dbHASH], , `TSDB)
}
else {db = database("dfs://XTP.option")
}
optionSchema = table(1:0, XTP::getSchema(`optionMarketData)['name'], XTP::getSchema(`optionMarketData)['typeString'])
if (existsTable("dfs://XTP.option", "optionMarketData")) {optionPt = loadTable("dfs://XTP.option", "optionMarketData")
}
else {optionPt = db.createPartitionedTable(optionSchema, "optionMarketData", `dataTime`ticker, {dataTime: "delta"}, `ticker`dataTime)
}// 债券快照
if (!existsDatabase("dfs://XTP.bond")) {dbVALUE = database(, VALUE, 2023.01.01..2023.01.02)dbHASH = database(, HASH, [SYMBOL, 20])db = database("dfs://XTP.bond", COMPO, [dbVALUE, dbHASH], , `TSDB)
}
else {db = database("dfs://XTP.bond")
}
bondSchema = table(1:0, XTP::getSchema(`bondMarketData)['name'], XTP::getSchema(`bondMarketData)['typeString'])
if (existsTable("dfs://XTP.bond", "bondMarketData")) {bondPt = loadTable("dfs://XTP.bond", "bondMarketData")
}
else {bondPt = db.createPartitionedTable(bondSchema, "bondMarketData", `dataTime`ticker, {dataTime: "delta"}, `ticker`dataTime)
}

3.3 建立订阅消费关系

运行以下脚本建立订阅消费关系,包括:

  • 获取表结构
  • 建立持久化流表
  • 定义入库函数
  • 建立订阅消费关系

注意,采用持久化流表来进行处理,需要节点启动之前在配置文件中(单节点:dolohindb.cfg,集群:cluster.cfg)配置参数 persistenceDir ,配置参考功能配置。

// 现货快照 + 逐笔成交 + 逐笔委托 + 逐笔状态 + 订单簿
// 获取表结构
actualSchema = streamTable(1:0, XTP::getSchema(`actualMarketData)['name'], XTP::getSchema(`actualMarketData)['typeString'])
entrustSchema = streamTable(1:0, XTP::getSchema(`entrust)['name'], XTP::getSchema(`entrust)['typeString'])
tradeSchema = streamTable(1:0, XTP::getSchema(`trade)['name'], XTP::getSchema(`trade)['typeString'])
stateSchema = streamTable(1:0, XTP::getSchema(`state)['name'], XTP::getSchema(`state)['typeString'])
orderBookSchema = streamTable(1:0, XTP::getSchema(`orderBook)['name'], XTP::getSchema(`orderBook)['typeString'])
// 建立持久化流表
enableTableShareAndPersistence(table=actualSchema, tableName=`actualMarketDataStream, cacheSize=100000, preCache=1000)
enableTableShareAndPersistence(table=entrustSchema, tableName=`entrustStream, cacheSize=100000, preCache=1000)
enableTableShareAndPersistence(table=tradeSchema, tableName=`tradeStream, cacheSize=100000, preCache=1000)
enableTableShareAndPersistence(table=stateSchema, tableName=`stateStream, cacheSize=100000, preCache=1000)
enableTableShareAndPersistence(table=orderBookSchema, tableName=`orderBookStream, cacheSize=100000, preCache=1000)
go
// 定义入库函数
actualHandler = append!{loadTable("dfs://XTP.actual", "actualMarketData"), }
entrustHandler = append!{loadTable("dfs://XTP.actual", "entrust"), }
tradeHandler = append!{loadTable("dfs://XTP.actual", "trade"), }
stateHandler = append!{loadTable("dfs://XTP.actual", "state"), }
orderBookHandler = append!{loadTable("dfs://XTP.actual", "orderBook"), }
// 订阅
subscribeTable(tableName=`actualMarketDataStream, actionName=`actualMarketDataAction, offset=0, handler=actualHandler, msgAsTable=true, batchSize=1000, throttle=0.1)
subscribeTable(tableName=`entrustStream, actionName=`entrustAction, offset=0, handler=entrustHandler, msgAsTable=true, batchSize=1000, throttle=0.1)
subscribeTable(tableName=`tradeStream, actionName=`tradeAction, offset=0, handler=tradeHandler, msgAsTable=true, batchSize=1000, throttle=0.1)
subscribeTable(tableName=`stateStream, actionName=`stateAction, offset=0, handler=stateHandler, msgAsTable=true, batchSize=1000, throttle=0.1)
subscribeTable(tableName=`orderBookStream, actionName=`orderBookAction, offset=0, handler=orderBookHandler, msgAsTable=true, batchSize=1000, throttle=0.1)// 指数快照
indexSchema = streamTable(1:0, XTP::getSchema(`indexMarketData)['name'], XTP::getSchema(`indexMarketData)['typeString'])
enableTableShareAndPersistence(table=indexSchema, tableName=`indexMarketDataStream, cacheSize=100000, preCache=1000)
go
indexHandler = append!{loadTable("dfs://XTP.index", "indexMarketData"), }
subscribeTable(tableName=`indexMarketDataStream, actionName=`indexMarketDataAction, offset=0, handler=indexHandler, msgAsTable=true, batchSize=1000, throttle=0.1)// 期权快照
optionSchema = streamTable(1:0, XTP::getSchema(`optionMarketData)['name'], XTP::getSchema(`optionMarketData)['typeString'])
enableTableShareAndPersistence(table=optionSchema, tableName=`optionMarketDataStream, cacheSize=100000, preCache=1000)
go
optionHandler = append!{loadTable("dfs://XTP.option", "optionMarketData"), }
subscribeTable(tableName=`optionMarketDataStream, actionName=`optionMarketDataAction, offset=0, handler=optionHandler, msgAsTable=true, batchSize=1000, throttle=0.1)// 债券快照
bondSchema = streamTable(1:0, XTP::getSchema(`bondMarketData)['name'], XTP::getSchema(`bondMarketData)['typeString'])
enableTableShareAndPersistence(table=bondSchema, tableName=`bondMarketDataStream, cacheSize=100000, preCache=1000)
go
bondHandler = append!{loadTable("dfs://XTP.bond", "bondMarketData"), }
subscribeTable(tableName=`bondMarketDataStream, actionName=`bondMarketDataAction, offset=0, handler=bondHandler, msgAsTable=true, batchSize=1000, throttle=0.1)

3.4 订阅 XTP 行情实时写入分布式表

运行以下脚本订阅XTP实时行情并写入分布式表,包括:

  • XTP 账号的信息配置与登录
  • XTP 快照行情接入
  • XTP 逐笔行情接入
  • XTP 订单簿行情接入

注意,XTP 本身的设计要求按照行情类别来进行订阅,因此若无特殊需求则直接按照如下代码进行接入即可。具体规则可以访问:xtp-中泰证券

try { XTP::setGlobalConfig(1, "./plugins/xtp/", 3) } catch(ex) { print(ex) }
go
// 创建连接
xtpConn = XTP::createXTPConnection("xtpConn")
// XTP 账户信息配置
xtpConfig = dict(STRING, ANY);
xtpConfig["ip"] = "111.111.111.111";
xtpConfig["port"] = 1111;
xtpConfig["user"] = "11111111111";
xtpConfig["password"] = "11111111111";  // 没有密码的情况下,请与user输入相同内容
xtpConfig["protocalType"] = 1;    //1 是 TCP 2 是 UDP, 测试环境只有TCP
xtpConfig["heartBeatInterval"] = 60;
go
// 登录XTP!
XTP::login(xtpConn, xtpConfig)// 接入快照行情
tableDict = dict(STRING, ANY);
tableDict["indexTable"] = indexMarketDataStream
tableDict["optionTable"] = optionMarketDataStream
tableDict["actualTable"] = actualMarketDataStream
tableDict["bondTable"] = bondMarketDataStream
go
XTP::subscribe(xtpConn, 1, 4, , tableDict)  // 1 for 快照, 4 for 全市场,第四个参数不填,表示接入所有标的
// 接入逐笔行情
tableDict = dict(STRING, ANY);
tableDict["entrustTable"] = entrustStream
tableDict["tradeTable"] = tradeStream
tableDict["statusTable"] = stateStream
go
XTP::subscribe(xtpConn, 2, 4, , tableDict)  // 2 for 逐笔,4 for 沪深两市,第三个参数不填,表示接入所有股票
// 接入订单簿
tableDict = dict(STRING, ANY);
tableDict["orderBookTable"] = orderBookStream
go
XTP::subscribe(xtpConn, 3, 4, , tableDict)  // 3 for 订单簿,4 for 沪深两市,第三个参数不填,表示接入所有股票

运行完成后,XTP 插件会接受 XTP 实时行情,将行情写入持久化流表,订阅关系将消费流表中内容,并将数据写入分区表落盘保存。

至此,我们已完成 XTP 实时行情的接入全流程!

3.5 通过 XTP::getStatus 查看行情接收情况

XTP 行情插件提供 XTP::getStatus 方法以供用户查看行情接入情况:

xtpConn = XTP::getHandle("xtpConn")
XTP::getStatus(xtpConn)

此处以逐笔数据为例,用户可以根据该方法,观察到对应类别行情数据的开始时间、结束时间、最后处理的消息时间、已处理数据条数、最后错误信息、失败行情条数和最后失败行情时间戳的内容。

图3-2 查看行情接入状况

4.节点启动自动订阅 XTP 实时行情

如果您希望在节点启动时自动订阅 XTP 实时行情,可以通过配置 startup.dos 脚本实现。DolphinDB 系统的启动流程如下:

图4-1 DolphinDB 系统启动流程图

1. 系统初始化脚本(dolphindb.dos

    • 该脚本是必需的,默认加载版本发布目录中的 dolphindb.dos
    • 不建议修改该脚本,因为版本升级时需要用新版本发布包中的系统初始化脚本覆盖。

2. 用户启动脚本(startup.dos

    • 该脚本通过配置参数 startup 后才会执行。
    • 单节点模式在 dolphindb.cfg 中配置,集群模式在 cluster.cfg 中配置,可配置绝对路径或相对路径。
    • 若配置了相对路径或未指定目录,系统会依次搜索本地节点的 home 目录、工作目录和可执行文件所在目录。
    • 配置示例:startup=/DolphinDB/server/startup.dos

将附件中的代码添加到 /DolphinDB/server 目录的 startup.dos 文件中,并在相应的配置文件中配置参数 startup,即可实现节点启动时自动订阅。

3. 定时任务脚本(postStart.dos

    • DolphinDB 中通过 scheduleJob 函数定义的定时任务会持久化。
    • 重启节点时,系统先执行用户启动脚本,然后在初始化定时任务模块时完成持久化定时任务的加载。
    • 完成上述步骤后,系统会执行定时任务脚本,此时可以调用 scheduleJob 函数定义新的定时任务。
    • 本教程未使用该功能,无需开启该配置项。1.30.15 和 2.00.3 版本开始支持配置 postStart.dos 实现节点启动自动执行定时任务脚本。

注意

  • XTP 的账户信息需要根据实际环境进行修改。

5. 附录

  • 详细启动脚本配置可以参考官网文档教程:启动脚本教程。
  • 关于节点启动时自动订阅处理业务的部署可以参考官网文档教程:节点启动时的流计算自动订阅教程。
  • startup.dos 启动脚本(账户信息需要根据用户实际情况进行修改)。
// 加载插件
login(`admin, `123456)
try{loadPlugin("./plugins/xtp/PluginXTP.txt")}catch(ex){print(ex)}
go// 清理环境
def cleanEnvironment() {try {xtpConn = XTP::getHandle("xtpConn")XTP::closeXTPConnection(xtpConn)}catch (ex) {print(ex)}go// 取消订阅try { unsubscribeTable(tableName="actualMarketDataStream", actionName="actualMarketDataAction") } catch(ex) { print(ex) }try { unsubscribeTable(tableName="entrustStream", actionName="entrustAction") } catch(ex) { print(ex) }try { unsubscribeTable(tableName="tradeStream", actionName="tradeAction") } catch(ex) { print(ex) }try { unsubscribeTable(tableName="stateStream", actionName="stateAction") } catch(ex) { print(ex) }try { unsubscribeTable(tableName="orderBookStream", actionName="orderBookAction") } catch(ex) { print(ex) }try { unsubscribeTable(tableName="indexMarketDataStream", actionName="indexMarketDataAction") } catch(ex) { print(ex) }try { unsubscribeTable(tableName="optionMarketDataStream", actionName="optionMarketDataAction") } catch(ex) { print(ex) }try { unsubscribeTable(tableName="bondMarketDataStream", actionName="bondMarketDataAction") } catch(ex) { print(ex) }go// 取消流表try { dropStreamTable(tableName="actualMarketDataStream") } catch(ex) { print(ex) }try { dropStreamTable(tableName="entrustStream") } catch(ex) { print(ex) }try { dropStreamTable(tableName="tradeStream") } catch(ex) { print(ex) }try { dropStreamTable(tableName="stateStream") } catch(ex) { print(ex) }try { dropStreamTable(tableName="orderBookStream") } catch(ex) { print(ex) }try { dropStreamTable(tableName="indexMarketDataStream") } catch(ex) { print(ex) }try { dropStreamTable(tableName="optionMarketDataStream") } catch(ex) { print(ex) }try { dropStreamTable(tableName="bondMarketDataStream") } catch(ex) { print(ex) }
}// 创建库表
def createDbAndPt() {// 现货快照 + 逐笔成交 + 逐笔委托 + 逐笔状态 + 订单簿// 现货快照包含:股票、基金ETF、可转债的快照数据// 创建数据库if (!existsDatabase("dfs://XTP.actual")) {dbVALUE = database(, VALUE, 2023.01.01..2023.01.02)dbHASH = database(, HASH, [SYMBOL, 50])db = database("dfs://XTP.actual", COMPO, [dbVALUE, dbHASH], , `TSDB)}else {db = database("dfs://XTP.actual")}// 获取表结构actualSchema = table(1:0, XTP::getSchema(`actualMarketData)['name'], XTP::getSchema(`actualMarketData)['typeString'])entrustSchema = table(1:0, XTP::getSchema(`entrust)['name'], XTP::getSchema(`entrust)['typeString'])tradeSchema = table(1:0, XTP::getSchema(`trade)['name'], XTP::getSchema(`trade)['typeString'])stateSchema = table(1:0, XTP::getSchema(`state)['name'], XTP::getSchema(`state)['typeString'])orderBookSchema = table(1:0, XTP::getSchema(`orderBook)['name'], XTP::getSchema(`orderBook)['typeString'])// 创建分区表if (existsTable("dfs://XTP.actual", "actualMarketData")) {actualPt = loadTable("dfs://XTP.actual", "actualMarketData")}else {actualPt = db.createPartitionedTable(actualSchema, "actualMarketData", `dataTime`ticker, {dataTime: "delta"}, `ticker`dataTime)}if (existsTable("dfs://XTP.actual", "entrust")) {entrustPt = loadTable("dfs://XTP.actual", "entrust")}else {entrustPt = db.createPartitionedTable(entrustSchema, "entrust", `dataTime`ticker, {seq: "delta", dataTime: "delta", entrustSeq: "delta"}, `ticker`dataTime)}if (existsTable("dfs://XTP.actual", "trade")) {tradePt = loadTable("dfs://XTP.actual", "trade")}else {tradePt = db.createPartitionedTable(tradeSchema, "trade", `dataTime`ticker, {seq: "delta", dataTime: "delta", tradeSeq: "delta"}, `ticker`dataTime)}if (existsTable("dfs://XTP.actual", "state")) {statePt = loadTable("dfs://XTP.actual", "state")}else {statePt = db.createPartitionedTable(stateSchema, "state", `dataTime`ticker, {seq: "delta", dataTime: "delta"}, `ticker`dataTime)}if (existsTable("dfs://XTP.actual", "orderBook")) {orderBookPt = loadTable("dfs://XTP.actual", "orderBook")}else {orderBookPt = db.createPartitionedTable(orderBookSchema, "orderBook", `dataTime`ticker, {dataTime: "delta"}, `ticker`dataTime)}// 指数快照if (!existsDatabase("dfs://XTP.index")) {dbVALUE = database(, VALUE, 2023.01.01..2023.01.02)dbHASH = database(, HASH, [SYMBOL, 5])db = database("dfs://XTP.index", COMPO, [dbVALUE, dbHASH], , `TSDB)}else {db = database("dfs://XTP.index")}indexSchema = table(1:0, XTP::getSchema(`indexMarketData)['name'], XTP::getSchema(`indexMarketData)['typeString'])if (existsTable("dfs://XTP.index", "indexMarketData")) {indexPt = loadTable("dfs://XTP.index", "indexMarketData")}else {indexPt = db.createPartitionedTable(indexSchema, "indexMarketData", `dataTime`ticker, {dataTime: "delta"}, `ticker`dataTime)}// 期权快照if (!existsDatabase("dfs://XTP.option")) {dbVALUE = database(, VALUE, 2023.01.01..2023.01.02)dbHASH = database(, HASH, [SYMBOL, 5])db = database("dfs://XTP.option", COMPO, [dbVALUE, dbHASH], , `TSDB)}else {db = database("dfs://XTP.option")}optionSchema = table(1:0, XTP::getSchema(`optionMarketData)['name'], XTP::getSchema(`optionMarketData)['typeString'])if (existsTable("dfs://XTP.option", "optionMarketData")) {optionPt = loadTable("dfs://XTP.option", "optionMarketData")}else {optionPt = db.createPartitionedTable(optionSchema, "optionMarketData", `dataTime`ticker, {dataTime: "delta"}, `ticker`dataTime)}// 债券快照if (!existsDatabase("dfs://XTP.bond")) {dbVALUE = database(, VALUE, 2023.01.01..2023.01.02)dbHASH = database(, HASH, [SYMBOL, 20])db = database("dfs://XTP.bond", COMPO, [dbVALUE, dbHASH], , `TSDB)}else {db = database("dfs://XTP.bond")}bondSchema = table(1:0, XTP::getSchema(`bondMarketData)['name'], XTP::getSchema(`bondMarketData)['typeString'])if (existsTable("dfs://XTP.bond", "bondMarketData")) {bondPt = loadTable("dfs://XTP.bond", "bondMarketData")}else {bondPt = db.createPartitionedTable(bondSchema, "bondMarketData", `dataTime`ticker, {dataTime: "delta"}, `ticker`dataTime)}
}def createStreamTableAndSubscribe() {// 现货快照 + 逐笔成交 + 逐笔委托 + 逐笔状态 + 订单簿// 获取表结构actualSchema = streamTable(1:0, XTP::getSchema(`actualMarketData)['name'], XTP::getSchema(`actualMarketData)['typeString'])entrustSchema = streamTable(1:0, XTP::getSchema(`entrust)['name'], XTP::getSchema(`entrust)['typeString'])tradeSchema = streamTable(1:0, XTP::getSchema(`trade)['name'], XTP::getSchema(`trade)['typeString'])stateSchema = streamTable(1:0, XTP::getSchema(`state)['name'], XTP::getSchema(`state)['typeString'])orderBookSchema = streamTable(1:0, XTP::getSchema(`orderBook)['name'], XTP::getSchema(`orderBook)['typeString'])// 建立持久化流表enableTableShareAndPersistence(table=actualSchema, tableName=`actualMarketDataStream, cacheSize=100000, preCache=1000)enableTableShareAndPersistence(table=entrustSchema, tableName=`entrustStream, cacheSize=100000, preCache=1000)enableTableShareAndPersistence(table=tradeSchema, tableName=`tradeStream, cacheSize=100000, preCache=1000)enableTableShareAndPersistence(table=stateSchema, tableName=`stateStream, cacheSize=100000, preCache=1000)enableTableShareAndPersistence(table=orderBookSchema, tableName=`orderBookStream, cacheSize=100000, preCache=1000)go// 定义入库函数actualHandler = append!{loadTable("dfs://XTP.actual", "actualMarketData"), }entrustHandler = append!{loadTable("dfs://XTP.actual", "entrust"), }tradeHandler = append!{loadTable("dfs://XTP.actual", "trade"), }stateHandler = append!{loadTable("dfs://XTP.actual", "state"), }orderBookHandler = append!{loadTable("dfs://XTP.actual", "orderBook"), }// 订阅subscribeTable(tableName=`actualMarketDataStream, actionName=`actualMarketDataAction, offset=0, handler=actualHandler, msgAsTable=true, batchSize=1000, throttle=0.1)subscribeTable(tableName=`entrustStream, actionName=`entrustAction, offset=0, handler=entrustHandler, msgAsTable=true, batchSize=1000, throttle=0.1)subscribeTable(tableName=`tradeStream, actionName=`tradeAction, offset=0, handler=tradeHandler, msgAsTable=true, batchSize=1000, throttle=0.1)subscribeTable(tableName=`stateStream, actionName=`stateAction, offset=0, handler=stateHandler, msgAsTable=true, batchSize=1000, throttle=0.1)subscribeTable(tableName=`orderBookStream, actionName=`orderBookAction, offset=0, handler=orderBookHandler, msgAsTable=true, batchSize=1000, throttle=0.1)// 指数快照indexSchema = streamTable(1:0, XTP::getSchema(`indexMarketData)['name'], XTP::getSchema(`indexMarketData)['typeString'])enableTableShareAndPersistence(table=indexSchema, tableName=`indexMarketDataStream, cacheSize=100000, preCache=1000)goindexHandler = append!{loadTable("dfs://XTP.index", "indexMarketData"), }subscribeTable(tableName=`indexMarketDataStream, actionName=`indexMarketDataAction, offset=0, handler=indexHandler, msgAsTable=true, batchSize=1000, throttle=0.1)// 期权快照optionSchema = streamTable(1:0, XTP::getSchema(`optionMarketData)['name'], XTP::getSchema(`optionMarketData)['typeString'])enableTableShareAndPersistence(table=optionSchema, tableName=`optionMarketDataStream, cacheSize=100000, preCache=1000)gooptionHandler = append!{loadTable("dfs://XTP.option", "optionMarketData"), }subscribeTable(tableName=`optionMarketDataStream, actionName=`optionMarketDataAction, offset=0, handler=optionHandler, msgAsTable=true, batchSize=1000, throttle=0.1)// 债券快照bondSchema = streamTable(1:0, XTP::getSchema(`bondMarketData)['name'], XTP::getSchema(`bondMarketData)['typeString'])enableTableShareAndPersistence(table=bondSchema, tableName=`bondMarketDataStream, cacheSize=100000, preCache=1000)gobondHandler = append!{loadTable("dfs://XTP.bond", "bondMarketData"), }subscribeTable(tableName=`bondMarketDataStream, actionName=`bondMarketDataAction, offset=0, handler=bondHandler, msgAsTable=true, batchSize=1000, throttle=0.1)
}def connectToXTP(xtpConfig) {try { XTP::setGlobalConfig(1, "./plugins/xtp/", 3) } catch(ex) { print(ex) }go// 创建连接xtpConn = XTP::createXTPConnection("xtpConn")// 登录XTP!XTP::login(xtpConn, xtpConfig)return xtpConn
}def subscribeXTP(xtpConn, indexMarketDataStream, optionMarketDataStream, actualMarketDataStream, bondMarketDataStream, entrustStream, tradeStream, stateStream, orderBookStream) {// 接入快照行情tableDict = dict(STRING, ANY);tableDict["indexTable"] = indexMarketDataStreamtableDict["optionTable"] = optionMarketDataStreamtableDict["actualTable"] = actualMarketDataStreamtableDict["bondTable"] = bondMarketDataStreamgoXTP::subscribe(xtpConn, 1, 4, , tableDict)  // 1 for 快照, 4 for 全市场,第四个参数不填,表示接入所有标的// 接入逐笔行情tableDict = dict(STRING, ANY);tableDict["entrustTable"] = entrustStreamtableDict["tradeTable"] = tradeStreamtableDict["statusTable"] = stateStreamgoXTP::subscribe(xtpConn, 2, 4, , tableDict)  // 2 for 逐笔,4 for 沪深两市,第三个参数不填,表示接入所有股票// 接入订单簿tableDict = dict(STRING, ANY);tableDict["orderBookTable"] = orderBookStreamgoXTP::subscribe(xtpConn, 3, 4, , tableDict)  // 3 for 订单簿,4 for 沪深两市,第三个参数不填,表示接入所有股票
}// XTP 账户信息配置
xtpConfig = dict(STRING, ANY);
xtpConfig["ip"] = "111.111.111.111";
xtpConfig["port"] = 1111;
xtpConfig["user"] = "11111111111";
xtpConfig["password"] = "11111111111";  // 没有密码的情况下,请与user输入相同内容
xtpConfig["protocalType"] = 1;    //1 是 TCP 2 是 UDP, 测试环境只有TCP
xtpConfig["heartBeatInterval"] = 60;
gocleanEnvironment()
createDbAndPt()
createStreamTableAndSubscribe()
xtpConn = connectToXTP(xtpConfig)
go  // 分段执行
subscribeXTP(xtpConn, indexMarketDataStream, optionMarketDataStream, actualMarketDataStream, bondMarketDataStream, entrustStream, tradeStream, stateStream, orderBookStream)
writeLog("Subsribe to XTP market data successfully!")xtpConn = XTP::getHandle("xtpConn")
XTP::getStatus(xtpConn)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/784778.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用hping3网络工具构造TCP/IP数据包和进行DDos攻击

1 概述 hping3是一个强大的命令行工具,用于生成、发送和解析TCP/IP协议的数据包。它是开源的网络安全工具,由Salvatore Sanfilippo开发,主要应用于网络审计、安全测试和故障排查等领域。hping3不仅可以作为普通的网络连通性检测工具&#xf…

【蓝桥杯第十三届省赛B】(部分详解)

九进制转十进制 #include <iostream> #include<math.h> using namespace std; int main() {cout << 2*pow(9,3)0*pow(9,2)2*pow(9,1)2*pow(9,0) << endl;return 0; }顺子日期 #include <iostream> using namespace std; int main() {// 请在此…

ROS2从入门到精通1-2:详解ROS2服务通信机制与自定义服务

目录 0 专栏介绍1 服务通信模型2 服务模型实现(C)3 服务模型实现(Python)4 自定义服务5 话题、服务通信的异同 0 专栏介绍 本专栏旨在通过对ROS2的系统学习&#xff0c;掌握ROS2底层基本分布式原理&#xff0c;并具有机器人建模和应用ROS2进行实际项目的开发和调试的工程能力。…

智慧公厕是什么?智慧公厕的主要功能、特点?

智慧公厕&#xff0c;顾名思义&#xff0c;是指应用了智能科技的公共厕所&#xff0c;旨在提供更加便捷、舒适、智能化的卫生服务。相比传统的公厕&#xff0c;智慧公厕不仅拥有更加智能化的设备&#xff0c;还配备了远程监控与管理系统&#xff0c;以及节能环保技术&#xff0…

如何选择优质的外贸网站建设公司?

在当今数字化时代&#xff0c;外贸行业越来越重视在线渠道的发展&#xff0c;而外贸网站建设作为企业对外联系的重要窗口&#xff0c;扮演着至关重要的角色。选择一家优质的外贸网站建设公司&#xff0c;不仅能帮助企业提升品牌形象&#xff0c;还能有效扩大海外市场。那么&…

微信小程序【从入门到精通】——服务器的数据交互

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;开发者-曼亿点 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 曼亿点 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a…

linux C:变量、运算符

linux C 文章目录 变量运算符 一、变量 [存储类型] 数据类型 标识符 值 标识符&#xff1a;由数字、字母、下划线组成的序列&#xff0c;不能以数字开头。 数据类型&#xff1a;基本数据类型构造类型 存储类型&#xff1a;auto static…

Linux(CentOS7)配置系统服务以及开机自启动

目录 前言 两种方式 /etc/systemd/system/ 进入 /etc/systemd/system/ 文件夹 创建 nginx.service 文件 重新加载 systemd 配置文件 ​编辑 配置开机自启 /etc/init.d/ 进入 /etc/init.d/ 文件夹 创建 mysql 文件 编写脚本内容 添加/删除系统服务 配置开机自启 …

【MySQL笔记】SELECT COUNT(*) 的时候,加不加where条件有差别吗?

文章目录 前言实验结论 前言 这部分很多帖子都只在问题里罗列下&#xff0c;好像也没详细解答 其实就是跟InnoDB优先走二级索引的优化有关&#xff0c;前面也提到了”优化的前提是查询语句中不包含where条件和group by条件“ 还不太了解这个优化的朋友可以看上一篇帖子 实验 …

编曲知识13:弦乐技法应用 合成器应用 声场摆位

弦乐技法 技法分类 Sustain(长音)类: Legato、Port、Gliss、Tremolo、Trills Staccato(短音)类: Staccato、Pizzicato、Spiccato Legato:连奏 Port:滑音 Gliss:慢速滑音 Tremolo:震音 Trills:颤音 Staccato:顿弓 Pizzicato:拨奏 Spiccato:跳弓 长音类技法 主…

从0到1:兼职招聘小程序开发笔记(一)

可行性分析 兼职招聘小程序&#xff1a;为雇主和求职者提供便利的平台&#xff0c;旨在帮助雇主招聘兼职员工&#xff0c;并让求职者寻找合适的兼职工作。提供简单、快捷的方式来匹配兼职岗位和候选人&#xff0c;节省了招聘和求职的时间和精力。其主要功能模块包括&#xff1…

C练习题(1)

变种水仙花&#xff08;来自牛课网&#xff09; 题目 变种水仙花数 - Lily Number&#xff1a;把任意的数字&#xff0c;从中间拆分成两个数字&#xff0c;比如1461 可以拆分成&#xff08;1和461&#xff09;,&#xff08;14和61&#xff09;,&#xff08;146和1),如果所有拆…

力扣刷题Days29-128.最长连续数列(js)

目录 1&#xff0c;题目 2&#xff0c;代码 2.1自己实现 2.2哈希表 3&#xff0c;学习与收获 枚举思想&#xff1a; 遍历的核心逻辑 碎碎念 本题 先是想到利用数组排序&#xff0c;从而简化遍历处理逻辑&#xff0c;再在提交错误提醒的情况下&#xff0c;考虑到数组中存…

基于FreeRTOS系统的STM32简易遥控器设计

项目说明 该项目是一个基于FreeRTOS系统的Stm32遥控器设计。使用该项目主要是自己学习FreeRTOS的使用&#xff0c;以及模块化编程的思想。这个项目应该长期会有更新。 项目开源 github:https://github.com/snqx-lqh/Stm32RemoteControl gitee:https://gitee.com/snqx-lqh/S…

Github 2024-04-01 开源项目月报 Top20

根据Github Trendings的统计,本月(2024-04-01统计)共有20个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目9TypeScript项目2非开发语言项目2Jupyter Notebook项目2HTML项目1CSS项目1C#项目1Shell项目1Lua项目1JavaScript项目1C项目1Java项目…

了解 LoadRunner 性能测试软件及其基础使用

目录 一、了解LoadRunner 1、什么是Loadrunner&#xff1f; 2、Loadrunner包括什么组件&#xff1f; &#xff08;1&#xff09;前台组件 &#xff08;2&#xff09;后台组件 二、LoadRunner三大组件 1、VuGen&#xff08;虚拟用户脚本生成器&#xff09; &#xff08;…

详细分析Mysql中的STR_TO_DATE基本知识(全)

目录 前言1. 基本知识2. Demo3. 实战Demo4. Sql彩蛋4.1 LPAD函数4.2 SUBSTRING_INDEX函数 5. Java彩蛋 前言 对于该知识点&#xff0c;主要因为数据库类型为String&#xff08;类似2024-03-26&#xff09;&#xff0c;放置于后端操作后&#xff0c;需要自定义比较&#xff0c;…

【QT+QGIS跨平台编译】054:【exiv2lib_int+Qt跨平台编译】(一套代码、一套框架,跨平台编译)

点击查看专栏目录 文章目录 一、exiv2lib_int介绍二、文件下载三、文件分析四、pro文件五、编译实践一、exiv2lib_int介绍 exiv2lib_int是 exiv2 这个开源的图像元数据库中的一个组件。 Exiv2是一个开源的C++库,用于读取、编辑和写入图片和视频文件的元数据。它可以处理各种类…

选择Six Sigma咨询公司,看准这几点不踩坑!

在如今的市场环境中&#xff0c;Six Sigma作为一种追求卓越的管理方法&#xff0c;受到了越来越多企业的青睐。然而&#xff0c;面对众多的Six Sigma咨询公司&#xff0c;企业往往感到困惑&#xff1a;究竟哪家公司更适合自己&#xff1f;今天&#xff0c;我们就来聊聊如何挑选…

ES2024即将发布!5个可能大火的JS新方法

文章目录 01&#xff1a;Promise.withResolvers02&#xff1a;Object.groupBy()03&#xff1a;Temporal04&#xff1a;Records 和 Tuples05&#xff1a;装饰器&#xff08;Decorators&#xff09;其他 ECMAScript 2024&#xff08;ES15&#xff09; 即将发布&#xff08;2024年…