使用Blink SQL+UDAF实现差值聚合计算

本案例根据某电网公司的真实业务需求,通过Blink SQL+UDAF实现实时流上的差值聚合计算,通过本案例,让读者熟悉UDAF编写,并理解UDAF中的方法调用关系和顺序。
感谢@军长在实现过程中的指导。笔者水平有限,若有纰漏,请批评指出。

一、客户需求

电网公司每天采集各个用户的电表数据(格式如下表),其中data_date为电表数据上报时间,cons_id为电表id,r1为电表度数,其他字段与计算逻辑无关,可忽略。为了后续演示方便,仅输入cons_id=100000002的数据。

no(string)data_date(string)cons_id(string)org_no(string)r1(double)
101201907161000000023540113.76
101201907171000000023540114.12
101201907181000000023540116.59
101201907191000000023540118.89

表1:输入数据
电网公司希望通过实时计算(Blink)对电表数据处理后,每天得到每个电表最近两天(当天和前一天)的差值数据,结果类似如下表:

cons_id(string)data_date(string)subDegreeR1(double)
100000002201907170.36
100000002201907182.47
100000002201907192.3

表2:期望的输出数据

二、需求分析

根据客户的需求,比较容易得到两种解决方案:1、通过over窗口(2 rows over window)开窗进行差值聚合;2、通过hop窗口(sliding=1天,size=2天)进行差值聚合。
over窗口和hop窗口均是Blink支持的标准窗口,使用起来非常简单。本需求的最大难点在于差值聚合,Blink支持SUM、MAX、MIN、AVG等内置的聚合函数,但没有满足业务需求的差值聚合函数,因此需要通过自定义聚合函数(UDAF)来实现。

三、UDAF开发

实时计算自定义函数开发搭建环境请参考UDX概述,在此不再赘述。本案例使用Blink2.2.7版本,下面简要描述关键代码的编写。
完整代码(为了方便上传,使用了txt格式):SubtractionUdaf.txt
1、在com.alibaba.blink.sql.udx.SubtractionUdaf包中创建一个继承AggregateFunction类的SubtractionUdaf类。

public class SubtractionUdaf extends AggregateFunction<Double, SubtractionUdaf.Accum> 

其中Double是UDAF输出的类型,在本案例中为相邻两天的电表差值度数。SubtractionUdaf.Accum是内部自定义的accumulator数据结构。
2、定义accumulator数据结构,用户保存UDAF的状态。

    public static class Accum {private long currentTime;//最新度数的上报时间private double oldDegree;//前一次度数private double newDegree;//当前最新度数private long num;   //accumulator中已经计算的record数量,主要用于mergeprivate List<Tuple2<Double, Long>> listInput;//缓存所有的输入,主要用于retract}

3、实现createAccumulator方法,初始化UDAF的accumulator

    //初始化udaf的accumulatorpublic SubtractionUdaf.Accum createAccumulator() {SubtractionUdaf.Accum acc = new SubtractionUdaf.Accum();acc.currentTime = 0;acc.oldDegree = 0.0;acc.newDegree = 0.0;acc.num = 0;acc.listInput = new ArrayList<Tuple2<Double, Long>>();return acc;}

4、实现getValue方法,用于通过存放状态的accumulator计算UDAF的结果,本案例需求是计算新旧数据两者的差值。

    public Double getValue(SubtractionUdaf.Accum accumulator) {return accumulator.newDegree - accumulator.oldDegree;}

5、实现accumulate方法,用于根据输入数据更新UDAF存放状态的accumulator。考虑到数据可能乱序以及可能的retract,数据数据包括了对应的度数iValue,还包括上报度数的时间(构造的事件时间ts)。

    public void accumulate(SubtractionUdaf.Accum accumulator, double iValue, long ts) {System.out.println("method : accumulate" );accumulator.listInput.add(Tuple2.of(Double.valueOf(iValue),Long.valueOf(ts)));Collections.sort(accumulator.listInput,this.comparator);//按照时间排序accumulator.num ++;if(accumulator.listInput.size() == 1){accumulator.newDegree = iValue;accumulator.oldDegree = 0.0;accumulator.currentTime = ts;}else {//处理可能存在的数据乱序问题accumulator.newDegree = accumulator.listInput.get(0).f0;accumulator.currentTime = accumulator.listInput.get(0).f1;accumulator.oldDegree = accumulator.listInput.get(1).f0;}}

其中accumulator为UDAF的状态,iValue和ts为实际的输入数据。
注意需要处理可能存在的输入数据乱序问题。
6、实现retract方法,用于在某些优化场景下(如使用over窗口)对retract的数据进行处理。

    public void retract(SubtractionUdaf.Accum accumulator, double iValue, long ts) throws Exception{if(accumulator.listInput.contains(Tuple2.of(iValue, ts))){if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 0){//retract的是最新值accumulator.listInput.remove(0);accumulator.num--;if(accumulator.listInput.isEmpty()){accumulator.currentTime = 0;accumulator.oldDegree = 0.0;accumulator.newDegree = 0.0;}else if(accumulator.listInput.size() == 1) {accumulator.currentTime = accumulator.listInput.get(0).f1;accumulator.newDegree = accumulator.listInput.get(0).f0;accumulator.oldDegree = 0.0;}else{accumulator.currentTime = accumulator.listInput.get(0).f1;accumulator.newDegree = accumulator.listInput.get(0).f0;accumulator.oldDegree = accumulator.listInput.get(1).f0;}} else if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 1){//retract的是次新值accumulator.listInput.remove(1);accumulator.num--;if(accumulator.listInput.size() == 1){accumulator.oldDegree = 0.0;}else {accumulator.oldDegree = accumulator.listInput.get(1).f0;}}else {//retract的是其他值accumulator.listInput.remove(Tuple2.of(iValue, ts));accumulator.num--;}}else {throw new Exception("Cannot retract a unexist record : iValue = "+ iValue + "timestamp = "+ ts);}}

需要考虑retract的是最新的数据还是次新的数据,需要不同的逻辑处理。
7、实现merge方法,用于某些优化场景(如使用hop窗口)。

    public void merge(SubtractionUdaf.Accum accumulator, Iterable<SubtractionUdaf.Accum> its) {int i = 0;System.out.println("method : merge" );System.out.println("accumulator : "+ accumulator.newDegree);System.out.println("accumulator : "+ accumulator.currentTime);for (SubtractionUdaf.Accum entry : its) {if(accumulator.currentTime < entry.currentTime){if(entry.num > 1){accumulator.currentTime = entry.currentTime;accumulator.oldDegree = entry.oldDegree;accumulator.newDegree = entry.newDegree;accumulator.num += entry.num;accumulator.listInput.addAll(entry.listInput);}else if(entry.num == 1){accumulator.currentTime = entry.currentTime;accumulator.oldDegree = accumulator.newDegree;accumulator.newDegree = entry.newDegree;accumulator.num ++;accumulator.listInput.addAll(entry.listInput);}}else{if(accumulator.num > 1){accumulator.num += entry.num;accumulator.listInput.addAll(entry.listInput);}else if(accumulator.num == 1){accumulator.oldDegree = entry.newDegree;accumulator.num += entry.num;accumulator.listInput.addAll(entry.listInput);}else if(accumulator.num == 0){accumulator.currentTime = entry.currentTime;accumulator.oldDegree = entry.oldDegree;accumulator.newDegree = entry.newDegree;accumulator.num = entry.num;accumulator.listInput.addAll(entry.listInput);}}Collections.sort(accumulator.listInput,this.comparator);System.out.println("merge : "+i);System.out.println("newDegree : "+entry.newDegree);System.out.println("oldDegree = "+entry.oldDegree);System.out.println("currentTime : "+entry.currentTime);}}

需要考虑merge的是否是比当前新的数据,需要不同的处理逻辑。
8、其他方面,考虑到需要对输入度数按照事件时间排序,在open方法中实例化了自定义的Comparator类,对accumulator数据结构中的inputList按事件时间的降序排序。

    public void open(FunctionContext context) throws Exception {//定义record的先后顺序,用于listInput的排序,时间越新的record在list中越前面this.comparator = new Comparator<Tuple2<Double, Long>>() {public int compare( Tuple2<Double, Long> o1, Tuple2<Double, Long> o2) {if (Long.valueOf(o1.f1) < Long.valueOf(o2.f1)) {return 1;} else if (Long.valueOf(o1.f1) > Long.valueOf(o2.f1)) {return -1;}else {return 0;}}};}

请参考[使用IntelliJ IDEA开发自定义函数]()完成UDAF编译、打包,并参考UDX概述完成资源的上传和引用。

四、SQL开发及测试结果

(一)over窗口

SQL代码如下,语法检查、上线、启动作业(选择当前启动位点)。并将表1数据上传至datahub。

CREATE FUNCTION OverWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';CREATE TABLE input_dh_e_mp_read_curve (`no`                  VARCHAR,data_date             VARCHAR,cons_id               VARCHAR,org_no                VARCHAR,r1                    DOUBLE,ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss'),WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (type = 'datahub',endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',project = 'jszc_datahub',topic = 'input_dh_e_mp_read_curve'
);
CREATE TABLE data_out(cons_id varchar,data_date varchar,subDegreeR1 DOUBLE
)with(type = 'print'
);INSERT into data_out    
SELECTcons_id,last_value(data_date) OVER (PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date,OverWindowSubtractionUdaf(r1,unix_timestamp(ts)) OVER (PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date
FROM input_dh_e_mp_read_curve

由于使用了print connector,从对应的sink的taskmanager.out日志中可以查看到输出如下(已忽略其他debug日志):

task-1> (+)100000002,20190716,13.76
task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

对比期望输出(表2),20190717和20190718两个窗口的数据均正确,表明业务逻辑正确,但此输出与期望输出有少许差异:
(1)20190716输出为13.76,这是因为第一个over窗口只有一条数据导致的,这种数据可以在业务层过滤掉;
(2)20190719的数据没有输出,这是因为我们设置了watermark,测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。

(二)hop窗口

SQL代码如下:语法检查、上线、启动作业(选择当前启动位点)。并将表1数据上传至datahub。

CREATE FUNCTION HopWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';CREATE TABLE input_dh_e_mp_read_curve (`no`                  VARCHAR,data_date             VARCHAR,cons_id               VARCHAR,org_no                VARCHAR,r1                    DOUBLE,ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss'),WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (type = 'datahub',endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',project = 'jszc_datahub',topic = 'input_dh_e_mp_read_curve'
);
CREATE TABLE data_out(cons_id varchar,data_date varchar,subDegreeR1 DOUBLE
)with(type = 'print'
);
INSERT into data_out    
SELECTcons_id,DATE_FORMAT(HOP_end(ts, INTERVAL '1' day,INTERVAL '2' day), 'yyyyMMdd'),HopWindowSubtractionUdaf(r1,unix_timestamp(ts))
FROM input_dh_e_mp_read_curve
group by hop(ts, INTERVAL '1' day,INTERVAL '2' day),cons_id;

由于使用了print connector,从对应的sink的taskmanager.out日志中可以查看到输出如下(已忽略其他debug日志):

task-1> (+)100000002,20190716,13.76
task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

对比期望输出(表2),20190717和20190718两个窗口的数据均正确,表明业务逻辑正确,但此输出与期望输出有少许差异:
(1)20190716输出为13.76,这是因为第一个hop窗口只有一条数据导致的,这种数据可以在业务层过滤掉;
(2)20190719的数据没有输出,这是因为我们设置了watermark,测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。

五、几点思考

1、关于UDAF内部方法的调用关系和顺序

UDAF中主要有createAccumulator、getValue、accumulate、retract和merge方法,其调用关系和顺序并不是完全确定,而是与Blink底层优化、Blink版本、开窗类型(如hop还是over窗口)等相关。
比较确定的是一次正常(没有failover)的作业,createAccumulator方法只在作业启动时调用一次,accumulate方法在每条数据输入时调用一次,在触发数据输出时会调用一次getValue(并不代表只调用一次)。
而retract方法和merge方法则跟具体的优化方式或开窗类型有关,本案例中over窗口调用retract方法而不调用merge方法,hop窗口调用merge方法而不调用retract方法。
大家可以增加日志,观察这几个方法的调用顺序,还是蛮有意思的。

2、如何知道需要实现UDAF中的哪些方法

UDAF中必须实现createAccumulator、getValue、accumulate方法,可选择实现retract和merge方法。
一般情况下,可先实现createAccumulator、getValue、accumulate三个方法,然后编写SQL后进行语法检查,SQL编译器会提示是否需要retract或merge方法。
比如,如果没有实现retract方法,在使用over窗口时,语法检查会报类似如下错误:

org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'retract' which is public, not abstract and (in case of table functions) not static.

比如,如果没有实现merge方法,在使用over窗口时,语法检查会报类似如下错误:

org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.

3、本案例存在优化空间的地方

(1)本案例没有考虑数据缺失的问题,比如因为某种原因(网络问题、数据采集问题等)缺少20190717的数据。这种情况下会是什么样的结果?大家可以自行测试下;
(2)本案例使用了一个List,然后通过Collections.sort方法进行排序,这不是很优的方法,如果用优先级队列(priority queue)性能应该会更好;

原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/516141.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VBA - 字典实例集锦

目录 前言1. 按接收日期批量不重复编号2. 用字典实现3个ComboBox关联的三级下拉3. 填表4. 二级字典嵌套-字典统计不重复计次5. 三级字典嵌套-根据机型汇总统计数量6. 二级字典嵌套-汇总调研数据7. 字典法去重7.1 对表格中的一列数据去重7.2 对一维数组去重8. 将字典输出到工作表…

如何在SQL Server 2019中添加数据敏感度分类的命令

作者 | Jordan Sanders翻译 | 火火酱。责编 | 晋兆雨头图 | CSDN付费下载于视觉中国为了确保数据库安全性和完整性&#xff0c;数据库管理员日常需要运行多种操作。因此&#xff0c;无论在何种情况下&#xff0c;我们都不能忽视数据库中敏感数据的重要性。在本文中&#xff0c;…

支撑数千家天猫商家CRM业务,数云高弹性数据库如何做

“数据&#xff0c;已经渗透到当今每一个行业和业务职能领域&#xff0c;成为重要的生产因素。人们对于海量数据的挖掘和运用&#xff0c;预示着新一波生产率增长和消费者盈余浪潮的到来”。-----麦肯锡 基于互联网和大数据和时代背景&#xff0c;用户在互联网上留下更多的印记…

vue中forEach循环的使用

//data为集合 data.forEach(function(item, index) {//item 就是当日按循环到的对象//index是循环的索引&#xff0c;从0开始 })//定义班次详细数组var bcglxiangxiList new Array();//定义班次详细对象var bcxiangxi {};//循环传递的参数bcglXiangXiListParam.forEach(ele…

全国高速恢复收费!阿里云:自由流“3大特色能力”使能智慧之路

5月6日&#xff0c;全国高速公路正式恢复收费。ETC普及优化程度、高速自由流收费模式等成为热点话题。随着取消省界高速公路收费站和全国ETC的普及&#xff0c;极大提升高速公路网通行效率&#xff0c;有效降低物流运输成本&#xff0c;减少收费站“堵车”现象&#xff0c;高速…

JavaScript 中,break , continue , return 的区别

break , continue , return 的区别 break &#xff1a;结束当前的循环体&#xff08;如 for、while&#xff09;continue &#xff1a;跳出本次循环&#xff0c;继续执行下次循环&#xff08;如 for、while&#xff09;return &#xff1a;不仅可以退出循环&#xff0c;还能够…

移动云11.11,钜惠High不停!

一年一度的双十一又来啦全民购物车已开起来&#xff01;然而复杂的优惠规则催生一大批通宵达旦看直播的定金人和尾款人对这些套路&#xff0c;移动云统统Say NO作为一朵有服务温度的“云”&#xff0c;让优惠简单点让用户买想买的东西&#xff0c;省该省的钱五重惊喜&#xff0…

设计大数据量表结构

上篇文章讲解了传统数据库的一些设计注意点。 本篇为第二篇&#xff0c;在大数据量的情况下&#xff0c;如何去提前设计这个表结构&#xff0c;来达到一个比较好的效果。对于团队&#xff0c;对于后续的维护和扩展都带来更大的便利。 自增id 自增id还是可以有&#xff0c;但是…

去除字符串最后一位的几种方法

1.使用slice方法 /*** slice(start,end)* start 要截取的字符串的起始下标 如果为负数从后面开始算起 -1指的是字符串的最后一位* end 要截取的字符串的结尾下标 如果为负数从后面开始算起 -1指的是字符串的最后一位* start 和 end 都是下标*/let str "122889," str…

VBA中的字符串处理

目录 1 VBA中的字符串2 VBA中处理字符串的函数2.1 比较字符串2.1.1 比较运算符2.1.2 StrComp函数2.2 转换字符串2.2.1 StrConv函数2.2.2 Str函数2.2.3 CStr函数2.3 创建字符串2.3.1 Space函数2.3.2 String函数2.4 获取字符串的长度2.5 格式化字符串2.6 查找字符串2.6.1 InStr函…

使用Blink CEP实现差值聚合计算

使用Blink SQLUDAF实现差值聚合计算介绍了如何使用Blink SQLUDAF实现实时流上的差值聚合计算&#xff0c;后来在与付典就业务需求和具体实现方式进行探讨时&#xff0c;付典提出通过CEP实现的思路和方法。 本文介绍通过CEP实现实时流上的差值聚合计算。 感谢付典在实现过程中的…

企业微信小程序_授权登录接口获取用户userid

文章目录一、前置知识1. 阅读 企业微信小程序开发文档2. 企业微信小程序登录流程3. 微信小程序区别二、前端部分2.1. 调用登录接口2.2. 请求后端接口2.3. 项目源码三、后端部分3.1. yml配置3.2. 获取用户信息接口3.3. 获取token3.4. 工具类3.5. vo对象四、调试部分4.1. 模式切换…

我输给了一个 25 岁的男人

未来的你&#xff1a;小伙计你好&#xff0c;我是 10 年后的你&#xff0c;刚穿越回来&#xff0c;还是热乎的。现在的你&#xff1a;Are you sure?大哥&#xff0c;从你那憔悴的神色里可以看出日夜颠倒的作息和毫无爱情发酵的灵魂&#xff0c;随便喊个人来看看&#xff0c;咱…

阿里云公共DNS安全传输服务介绍(DoH/DoT)

概述 阿里公共DNS致力于为广大的互联网用户提供快速、稳定和安全的DNS解析。然而传统的DNS查询和应答采用UDP和TCP明文传输&#xff0c;存在网络监听、DNS劫持、中间设备干扰的风险&#xff1a; 网络监听风险&#xff1a;即便用户采用HTTPs加密的方式访问站点&#xff0c;DNS…

Excel VBA - 文件及目录操作

目录 一. 文件处理1.1 Name 语句1.2 FileCopy 语句1.3 Kill 语句1.4 GetAttr 函数1.5 SetAttr 语句1.6 FileLen 函数1.7 FileDateTime 函数二. 目录处理2.1 CurDir 函数2.1 ChDir 语句2.3 ChDrive 语句2.4 Dir 函数2.5 MkDir 语句2.6 RmDir 语句三. 文件读写3.1 Open 语句3.2 C…

企业微信小程序_集成微信小程序插件_地图选点插件

官网文档&#xff1a; https://lbs.qq.com/miniProgram/plugin/pluginGuide/locationPicker 具体操作参考官网文档即可&#xff0c;讲的很详细

一文教你如何在生产环境中在Kubernetes上部署Jaeger

作者 | Dotan Horovit翻译 | 火火酱~责编 | 晋兆雨出品 | CSDN云计算日志、指标和跟踪是“可观察性”领域的三大支柱。最近几个月&#xff0c;随着OpenTelemetry标准化以及Jaeger开源项目从CNCF孵化项目中顺利毕业&#xff0c;分布式跟踪领域出现了很多创新。根据DevOps Pulse…

一篇讲透如何理解数据库并发控制

01数据库并发控制的作用 1.1 事务的概念 在介绍并发控制前&#xff0c;首先需要了解事务。数据库提供了增删改查等几种基础操作&#xff0c;用户可以灵活地组合这几种操作&#xff0c;实现复杂的语义。在很多场景下&#xff0c;用户希望一组操作可以做为一个整体一起生效&…

工作簿长时间空闲时自动关闭

目录 1. 对关闭时间和关闭工作薄进行设置2. 利用 OnTime 方法对执行程序进行设置3. 对工作薄的变化进行设置1. 对关闭时间和关闭工作薄进行设置 在新建模块中声明一个时间作为全局的变量,该变量将是每次工作薄发生改变后延续的时间,另外再设置一个工作薄关闭的过程。 代码如…

打钱!我的数据库被黑客勒索了!

来源 | 小白学黑客责编 | 晋兆雨头图 | 付费下载于视觉中国数据库失陷昨天晚上&#xff0c;读者群里一位小伙伴发消息说自己的数据库被黑了&#xff0c;搞安全的我自然是立刻来了兴趣&#xff0c;加班加点开始分析起来&#xff0c;不知道的还以为我要熬夜等剁手节呢。这位小伙伴…