流计算状态算子灵活开发指南

随着实时数据流处理需求的不断增长,高效、可扩展的流计算框架变得愈发重要。DolphinDB 作为一款高性能分布式时间序列数据库,不仅在数据存储和查询上表现出色,还通过引入面向对象编程(OOP)编程范式,使得开发者能够通过封装、继承、多态等特性,提升代码的灵活性、可维护性和复用性。本文通过两个实际应用案例,详细介绍如何利用 OOP 在 DolphinDB 中开发状态引擎算子,展示 OOP 在流计算中的应用。

1. 关于 OOP

DolphinDB 从 3.00.0 版本开始支持面向对象编程。面向对象编程(Object-Oriented Programming, OOP)是非常重要的编程范式,其通过封装、继承、多态等特性,提升代码的灵活性、可维护性和复用性,提升代码的模块化,实现低耦合高内聚。

在 DolphinDB 中,OOP 可应用于多种场景,例如:

  • 用于开发流计算状态引擎中的状态算子。在未提供 OOP 时,某些状态算子的开发过程中需要使用复杂的高阶函数,不便于理解代码;某些状态算子需要通过状态函数插件进行开发,开发成本过高。而通过 OOP 编程自定义算子则可以使代码结构清晰,容易理解。
  • 在复杂事件处理(CEP)引擎中,可以利用 OOP 定义事件和编写 Monitor

本教程将主要介绍 OOP 在状态引擎中的应用。

2. DolphinDB OOP 编程概要

2.1 类的定义

类的定义格式如下:

class 类名 {属性1 :: 类型1属性2 :: 类型2// ...// 和类名同名的方法成为构造函数,有且只有一个def 类名(arg1, arg2 /*,...*/) {属性1 = arg1属性2 = arg2// ...}// 需要注意,成员变量和成员函数不能同名。def 方法(arg1, arg2 /*, ...*/) {// ...}
}

例如,我们要定义一个 Person 类,包含两个成员变量:name 和 age。其中,name 是字符串,age 是整数。该类还包含了一个构造函数和 name 成员变量的 getter/setter。

定义如下:

class Person {// 变量声明在方法声明之前name :: STRINGage :: INT// 定义构造函数def Person(name_, age_) { // 参数名不能和属性名相同,否则会覆盖属性名name = name_age = age_}def setName(newName) {name = newName}def getName() {return name}
}

2.2 对象的使用

可以通过 object.method() 的形式调用对象的成员函数;通过 object.member 的形式访问对象的属性。需要注意,和 Python 等脚本语言不同,无法直接通过对 object.member 赋值修改成员变量。如果需要为成员变量赋值,需要创建并使用相应的 setter 方法进行赋值。

p = Person("Zhang San", 30)
print(p.getName())
// 调用对象的方法
p.setName("Li Si")
print(p.getName())
// 引用对象的属性
print(p.name)
p.name = "Wang Wu" // 报错:禁止对对象的属性直接赋值

2.3 对象属性类型标注

定义成员变量的格式为:成员变量名 :: 类型标注

其中类型标注可以是:

  • 标量:包括所有的基本类型,如 INT, DOUBLE, STRING,以及时间类型。
  • 向量:如 DOUBLE VECTOR, STRING VECTOR 或者 Array Vector:INT[] VECTOR。
  • 其他形式/类型不限:如果是其他类型(如字典、函数等),或者不希望限定变量类型,可以把类型标注写成 ANY。

例如:

a :: INT
b :: DOUBLE VECTOR
handler :: ANY

2.4 变量解析

方法中使用到的变量的解析顺序:

  1. 方法参数
  2. 对象属性
  3. 共享变量
share table(1:0, `sym`val, [SYMBOL, INT]) as tbl
go
class Test2 {a :: INTb :: DOUBLEdef Test2() {a = 1b = 2.0}def method(b) {print(b) // 解析为函数参数 b;如果需要访问成员变量 b,需要使用 self,见下一小节print(a) // 解析为对象属性 aprint(tbl) // 解析为共享变量 tblprint(qwert) // 变量不存在,报错}
}

2.5 self 语法

通过 “self” 变量在类的方法中获取对象本身,其类似于 Python 中的 self,或者 Java、C++ 中的 this。

def doSomething(a) {// ...
}class A{a :: INTb :: INTdef A() {a = 1b = 2}def createCallback() {return doSomething{self}}def nameShadow(b) {print(b)print(self.b)}
}
a = A()
handler = a.createCallback()
a.nameShadow(3)

3. 应用案例

Reactive State Engine(RSE)是 DolphinDB 中的一个高性能、可扩展的计算框架,专门用于处理实时流数据。RSE 通过状态算子在数据流中捕捉并维护状态,从而实现增量计算和复杂事件处理。下面我们通过两个案例介绍一下如何利用 OOP 来开发状态引擎算子。

3.1 累计求和算子:MyCumSum

在状态引擎内部,cumsum 算子实现了累计求和的功能。这个功能在状态引擎内的实现非常简单,本节不展开说明,仅说明如何利用 OOP 重新实现该算子。

首先,定义一个类 MyCumSum,并将算子的状态定义为类的成员变量。

其次,在类中实现 append 方法,用于实现累计求和的功能。append 的参数是逐行输入的数据,返回的结果将作为计算结果输出。

最后,定义一个状态引擎,并指定 MyCumSum.append() 为引擎的算子。向引擎中输入数据,并查看计算结果。

代码如下:

class MyCumSum {sum :: DOUBLEdef MyCumSum() {sum = 0.0}def append(value) {sum = sum + valuereturn sum}
}inputTable = table(1:0, `sym`val, [SYMBOL, DOUBLE])
result = table(1000:0, `sym`res, [SYMBOL, DOUBLE])rse = createReactiveStateEngine(name="reactiveDemo",metrics = [<MyCumSum().append(val)>],dummyTable=inputTable,outputTable=result,keyColumn="sym")data = table(take(`A, 100) as sym, rand(100.0, 100) as val)
rse.append!(data)select * from data
select * from result

进行一次运行,随机生成出来的输入数据和对应输出为:

可以看到,我们编写的 OOP 算子实现了分组的累加求和,与状态引擎内置的 cumsum 算子的功能一致。

3.2 线性递归

在未提供 OOP 时,状态引擎计算线性递归时需要通过内置函数 stateIterate 实现。使用stateIterate 需要指定用于迭代计算的函数,迭代结果和 X 的关联系数,使用的输入数据列,以及初始化窗口的长度,并最终在指定的输出列中输出结果。stateIterate 的计算规则参考文档:stateIterate 。

实现代码如下:

trade = table(take("A", 6) join take("B", 6) as sym,  1..12 as val0,  take(10, 12) as val1)inputTable = streamTable(1:0, `sym`val0`val1, [SYMBOL, INT, INT])
outputTable = table(100:0, `sym`factor, [STRING, DOUBLE])
engine = createReactiveStateEngine(name="rsTest",metrics=<[stateIterate(val0, val1, 3, msum{, 3}, [0.5, 0.5])]>,dummyTable=inputTable,outputTable=outputTable,keyColumn=["sym"],keepOrder=true)engine.append!(trade)
select * from outputTable

上例中的 stateIterate(val0, val1, 3, msum{, 3}, [0.5, 0.5]) 基于 stateIterate 的计算规则,实现了线性递归,但仅从这一行代码无法理解其中的计算规则,对用户的使用可能造成困扰。如果通过 OOP 改写此函数的实现逻辑,则代码结构会清晰很多:

trade = table(take("A", 6) join take("B", 6) as sym,  1..12 as val0,  take(10, 12) as val1)inputTable = streamTable(1:0, `sym`val0`val1, [SYMBOL, INT, INT])
outputTable = table(100:0, `sym`factor, [STRING, DOUBLE])class MyIterateOperator {movingWindow :: DOUBLE VECTORk :: INTdef	MyIterateOperator() {k = 0movingWindow = double([])}def append(X, initial) {if (k < 3) {k = k + 1movingWindow = movingWindow join initialreturn double(initial)}result = 0.5 * X + 0.5 * sum(movingWindow)movingWindow = movingWindow[1:] join resultreturn double(result)}
}
engine2 = createReactiveStateEngine(name="rsTest2",metrics=<[MyIterateOperator().append(val0, val1)]>,dummyTable=inputTable,outputTable=outputTable,keyColumn=["sym"],keepOrder=true)engine2.append!(trade)
select * from outputTable

可以看到,当窗口长度小于3时,算子直接返回init中的结果;当窗口长度大于等于3时,算子的append方法将长度为3的窗口中数据的和与X中的值做加权平均;最终实现了和stateIterate(val0, val1, 3, msum{, 3}, [0.5, 0.5])相同的功能。

虽然通过 OOP 实现线性递归的代码行数有所增加,但它结构清晰,提高了代码的可读性,同时简化了调试过程。

4. 小结与展望

通过本教程的两个应用案例,我们可以看到如何在 DolphinDB 中利用 OOP(面向对象编程)开发状态引擎算子。这种方式相比直接调用引擎提供的算子,具有可读性强、结构清晰的优点。在当前实现中,DolphinDB的 OOP 仍然采用解释执行的方式,相比原生 C++ 实现速度较慢。

目前,在响应式状态引擎中,一些有状态的高阶函数迭代算子和无状态的自定义函数算子已经可以通过即时编译(JIT)技术优化,用脚本编写的自定义函数算子的性能可以达到原生 C++ 实现的水平;但是,目前状态引擎中使用 OOP 编写的有状态的算子还没有支持 JIT。未来,我们计划使用 JIT 技术进一步优化响应式状态引擎中的 OOP 的应用,直接将类中定义的有状态算子编译为机器码运行,以期望实现在当前较高的开发效率下也不会损失运行时的效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/36211.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

聚焦 HW 行动,构筑重保邮件安全防线

随着信息技术的飞速发展&#xff0c;网络安全已成为国家安全的重要组成部分。HW行动作为国家级网络安全演练&#xff0c;通过模拟实战攻防&#xff0c;检验和提升国家关键信息基础设施的防护能力。 CACTER凭借多年HW防护经验&#xff0c;提供全面的邮件安全防护体系&#xff0…

MySQL详细介绍:开源关系数据库管理系统的魅力

学习总结 1、掌握 JAVA入门到进阶知识(持续写作中……&#xff09; 2、学会Oracle数据库入门到入土用法(创作中……&#xff09; 3、手把手教你开发炫酷的vbs脚本制作(完善中……&#xff09; 4、牛逼哄哄的 IDEA编程利器技巧(编写中……&#xff09; 5、面经吐血整理的 面试技…

Flutter 小技巧之为什么推荐 Widget 使用 const

今天收到这个问题&#xff0c;本来想着简单回复下&#xff0c;但是感觉这个话题又可以稍微展开讲讲&#xff0c;干脆就整理成一篇简单的科普&#xff0c;这样也能更方便清晰地回答这个问题。 聊这个问题之前&#xff0c;我们需要把一个“老生常谈”的概念拿出来说&#xff0c;那…

Open3d 点云投影到 xoy yoz 平面最简单的方式(附python 代码)

最简单的方式&#xff0c;就是直接把原有的点云的数据的 z or x 赋值为0, 然后生成一个新的点云。 filename_model1 r"1.pcd"down 10point_cloud o3d.io.read_point_cloud(filename_model1) point_cloud point_cloud.uniform_down_sample(int(down)) print(降采样…

metasfresh开源ERP系统Windows开发环境配置参考

目录 概述 开发环境 配置过程 后端启动 前端启动 登陆系统 其他 概述 Compiere闭源之后衍生出了Admpiere等若干开源的产品&#xff0c;metasfresh就是其中之一&#xff0c;metasfresh截至发稿时在GitHub上已有64000多次的修改提交&#xff0c;而且仍在维护中&#xff0…

GIS避坑指南!工作中ArcGIS常用的40个小技巧

01图斑的边界线太粗而且无法修改 之前有群友遇到这样一个问题&#xff0c;边界线粗到连图斑都看不见&#xff1a; 查看符号系统&#xff0c;很正常&#xff1a; 究其原因&#xff0c;是地图视图比例的问题&#xff0c;正常情况下&#xff0c;地图的视图比例会随着视图范围自动调…

未来20年人工智能将如何塑造社会

照片由Brian McGowan在Unsplash上拍摄 更多资讯&#xff0c;请访问 2img.ai “人工智能会成为我们的救星还是我们的末日&#xff1f;” 几十年来&#xff0c;这个问题一直困扰着哲学家、科学家和科幻爱好者。 当我们踏上技术革命的边缘时&#xff0c;是时候透过水晶球&#x…

我国氮化硼市场规模逐渐扩大 市场集中度有望不断提升

我国氮化硼市场规模逐渐扩大 市场集中度有望不断提升 氮化硼&#xff08;BN&#xff09;俗称为白石墨&#xff0c;是由硼原子和氮原子所构成的一种晶体材料&#xff0c;在常温条件下多表现为一种棕色或暗红色晶体。氮化硼具有导热性好、硬度大、熔点高、抗化学侵蚀性等优点&…

快来看,错过了今天就要设置为vip文章了----openEuler@2024全球发展展望与战略规划

会议主题&#xff1a;openEuler2024全球发展展望与战略规划 OpenEuler2024项目在2024年成功推出了多个长期支持&#xff08;LTS&#xff09;版本&#xff0c;标志着其在智能技术领域的全新篇章&#xff0c;并致力于构建全球性的开源新生态。以下是该项目的主要内容和成就概览&a…

【日记】软考居然一次过了(620 字)

正文 早上空闲的时候&#xff0c;上 QQ 看了一下&#xff0c;许久不见动静的系统架构设计师群有人说出分了。我想高级都出分了&#xff0c;中级应该也出来了&#xff0c;于是用手机查了一下。看到分数几乎快要泪从中来。为什么软考能一次过&#xff0c;银行从业资格证考了两三…

MST霍尔传感器IC-MH251,MH253,GT3144在卷发器方案中的应用

霍尔传感器驱动卷发器应用 卷发器在我们的日常生活中已经成为了不可或缺的一种生活工具&#xff0c;它时刻可以护理我们的头发&#xff0c;保养我们的发质。霍尔传感器驱动卷发器&#xff0c;那么霍尔传感器是如何运用在卷发器中的呢&#xff1f;霍尔传感器在卷发器中的工作原…

快速阅读参考文献:kimi请求出战!

学境思源&#xff0c;一键生成论文初稿&#xff1a; AcademicIdeas - 学境思源AI论文写作 上篇文章&#xff0c;我们为大家演示了“如何使用kimi创建论文中的流程图”。今天继续为大家介绍“使用kimi快速阅读学术参考文献”。 在学术研究的海洋中&#xff0c;文献阅读是一项基…

Windows环境下安装MySQL数据库的步骤

说明&#xff1a; 由于环境的不同&#xff0c;安装过程中可能会遇到各种各样的问题&#xff0c;不用慌&#xff0c;先根据错误提示搜索&#xff0c;多试一下。 安装前&#xff0c;请先认真看一下&#xff0c;有可能会遇到的几个问题&#xff1a; 1、证书链问题&#xff0c;一般…

【投稿优惠|稳定检索】2024年文化传播、交流与考古学国际会议 (CCEA 2024)

2024年文化传播、交流与考古学国际会议 (CCEA 2024) 2024 International Conference on Cultural Communication, Exchange, and Archaeology 【重要信息】 大会地点&#xff1a;西安 官网地址&#xff1a;http://www.icccea.com 投稿邮箱&#xff1a;iccceasub-conf.com 【注…

阿里1688商家数据采集软件

大镜山阿里1688商家数据采集一款采集阿里巴巴1688.com商家数据的软件&#xff0c;采集的数据包括店铺名称、联系人姓名、手机号码等。 一、大镜山阿里1688商家数据采集特色 — 大镜山阿里1688商家数据采集一款采集阿里巴巴1688.com商家数据的软件&#xff0c;采集的数据包括店…

一键系统重装教程:电脑重装系统,5个方法轻松恢复电脑

在日常使用电脑的过程中&#xff0c;难免会遇到系统故障、运行缓慢或者病毒感染等问题&#xff0c;重装系统成为解决这些问题的有效途径。然而&#xff0c;对于许多小伙伴来说&#xff0c;电脑重装系统似乎是一项复杂且耗时的任务。其实&#xff0c;只要掌握了正确的方法&#…

深圳比创达电子|EMC与EMI测试整改:从问题识别到效果验证

在现代电子设备的研发和生产过程中&#xff0c;电磁兼容性&#xff08;EMC&#xff09;和电磁干扰&#xff08;EMI&#xff09;的问题日益凸显。随着技术的不断进步&#xff0c;电子设备的集成度越来越高&#xff0c;工作频率也逐步提升&#xff0c;这使得电磁环境的复杂性不断…

自研一款共享集群数据库,有多难?

共享集群数据库管理系统是一种单库多实例的多活数据库管理系统&#xff0c;用户连接任意实例都可以访问同一个数据库&#xff0c;具备透明多写、高可用、高性能等特性。共享集群技术因其开发难度高&#xff0c;一直被国外垄断&#xff0c;也被称为数据库领域的“塔尖”技术。 2…

不会还有人没有用过git rebase合并分支吧?一文详解git merge与git rebase区别

文章目录 什么是git merge&#xff1f;使用git merge的场景git merge的示例 什么是git rebase&#xff1f;使用git rebase的场景git rebase的示例 git merge与git rebase的区别如何选择git merge和git rebase&#xff1f;结论 &#x1f389;欢迎来到Java学习路线专栏~探索Java中…

vscode的一些使用问题

vscode使用技巧 1、快捷键&#xff08;1&#xff09;打开命令面板&#xff08;2&#xff09;注释&#xff08;3&#xff09;删除行&#xff08;4&#xff09;上下移动光标&#xff08;5&#xff09;光标回退&#xff08;6&#xff09;复制行&#xff08;7&#xff09;插入空白行…