利用blink CEP实现流计算中的超时统计问题

案例与解决方案汇总页:
阿里云实时计算产品案例&解决方案汇总

一. 背景介绍

如<利用blink+MQ实现流计算中的延时统计问题>一文中所描述的场景,我们将其简化为以下案例:
实时流的数据源结构如下:

物流订单号支付时间仓接单时间仓出库时间
LP12018-08-01 08:00  
LP12018-08-01 08:002018-08-01 09:00 
LP22018-08-01 09:10 
LP22018-08-01 09:102018-08-01 09:50 
LP22018-08-01 09:102018-08-01 09:50​2018-08-01 12:00

我们期望通过以上数据源,按照支付日期统计,每个仓库的仓接单量、仓出库量、仓接单超2H未出库单量、仓接单超6H未出库单量。可以看出,其中LP1仓接单时间是2018-08-01 09:00,但一直到2018-08-01 12:00点之前,一直都没有出库,LP1满足仓接单超2H未出库的行为。

该场景的难点就在于:订单未出库。而对于TT中的源消息流,订单未出库,TT就不会下发新的消息,不下发新的消息,blink就无法被触发计算。而针对上述的场景,对于LP1,我们需要在仓接单时间是2018-08-01 09:00+2H,也就是2018-08-01 11:00的之后,就要知道LP1已经仓接单但超2H未出库了。

二. 解决方案

本文主要是利用blink CEP来实现上述场景,具体实现步骤如下所述。
第一步:在source DDL中定义event_timestamp,并定义sink,如下:

----定义source
create table sourcett_dwd_ri
(lg_order_code                  varchar comment '物流订单号',ded_pay_time                   varchar comment '支付时间',store_code                     varchar comment '仓库编码',store_name                     varchar comment '仓库名称',wms_create_time                varchar comment '仓接单时间',wms_consign_create_time        varchar comment '仓出库时间',evtstamp as case when coalesce(wms_create_time, '') <> ''then to_timestamp(wms_create_time, 'yyyy-MM-dd HH:mm:ss')else to_timestamp('1970-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss')end   --构造event_timestamp,如果源表本身带有消息的occur_time,可直接选择occur_time作为event_timestamp,WATERMARK FOR evtstamp AS withOffset(evtstamp, 10000)  --设置延迟10秒处理
)
with
(type='tt',topic='dwd_ri',accessKey='xxxxxx',accessId='xxxxxx',lengthCheck='PAD',nullValues='\\N|'
);----定义sink
create table sink_hybrid_blink_cep
(ded_pay_date                   varchar comment '支付日期',store_code                     varchar comment '仓库编码',store_name                     varchar comment '仓库名称',wms_create_ord_cnt             bigint  comment '仓接单量',wms_confirm_ord_cnt            bigint  comment '仓出库量',wmsin_nowmsout_2h_ord_cnt      bigint  comment '仓接单超2小时未出库单量',wmsin_nowmsout_6h_ord_cnt      bigint  comment '仓接单超6小时未出库单量'    ,sub_partition                  bigint  comment '二级分区(支付日期)',PRIMARY KEY (ded_pay_date, store_code, sub_partition)
)
with
(type='PetaData',url = 'xxxxxx',tableName='blink_cep',userName='xxxxxx',password='xxxxxx',bufferSize='30000',batchSize='3000',batchWriteTimeoutMs='15000'
);

第二步:根据blink CEP的标准语义进行改写,如下:

create view blink_cep_v1
as
select   '仓接单-仓出库超时' as timeout_type,lg_order_code,wms_create_time as start_time,wms_consign_create_time as end_time
from     source_dwd_csn_whc_lgt_fl_ord_ri
MATCH_RECOGNIZE
(PARTITION BY lg_order_codeORDER BY     evtstampMEASURESe1.wms_create_time         as wms_create_time,e2.wms_consign_create_time as wms_consign_create_timeONE ROW PER MATCH WITH TIMEOUT ROWS  --重要,必须设置延迟也下发AFTER MATCH SKIP TO NEXT ROWPATTERN (e1 -> e2) WITHIN INTERVAL '6' HOUREMIT TIMEOUT (INTERVAL '2' HOUR, INTERVAL '6' HOUR)DEFINEe1 as e1.wms_create_time is not null and e1.wms_consign_create_time is null,e2 as e2.wms_create_time is not null and e2.wms_consign_create_time is not null
)
where    wms_create_time is not null      --重要,可以大大减少进入CEP的消息量
and      wms_consign_create_time is null  --重要,可以大大减少进入CEP的消息量
;

第三步:根据blink的执行机制,我们通过源实时流sourcett_dwd_ri与超时消息流blink_cep_v1关联,来触发blink对超时消息进行聚合操作,如下:

create view blink_cep_v2
as
select   a.lg_order_code                       as lg_order_code,last_value(a.store_code             ) as store_code,last_value(a.store_name             ) as store_name,last_value(a.ded_pay_time           ) as ded_pay_time,last_value(a.wms_create_time        ) as wms_create_time,last_value(a.real_wms_confirm_time  ) as real_wms_confirm_time,last_value(case when coalesce(a.wms_create_time, '') <> ''and  coalesce(a.real_wms_confirm_time, '') = '' and  now() - unix_timestamp(a.wms_create_time,'yyyy-MM-dd HH:mm:ss') >= 7200then 'Y' else 'N' end) as flag_01,last_value(case when coalesce(a.wms_create_time, '') <> ''and  coalesce(a.real_wms_confirm_time, '') = '' and  now() - unix_timestamp(a.wms_create_time,'yyyy-MM-dd HH:mm:ss') >= 21600then 'Y' else 'N' end) as flag_02
from(select   lg_order_code                       as lg_order_code,last_value(store_code             ) as store_code,last_value(store_name             ) as store_name,last_value(ded_pay_time           ) as ded_pay_time,last_value(wms_create_time        ) as wms_create_time,last_value(wms_consign_create_time) as real_wms_confirm_timefrom     sourcett_dwd_rigroup by lg_order_code) a
left outer join(select   lg_order_code,count(*) as cntfrom     blink_cep_v1group by lg_order_code) b
on       a.lg_order_code = b.lg_order_code
group by a.lg_order_code
;insert into sink_hybrid_blink_cep
select   regexp_replace(substring(a.ded_pay_time, 1, 10), '-', '') as ded_pay_date,a.store_code,max(a.store_name)        as store_name,count(case when coalesce(a.wms_create_time, '') <> '' then a.lg_order_code end) as wmsin_ord_cnt,count(case when coalesce(a.real_wms_confirm_time, '') <> '' then a.lg_order_code end) as wmsout_ord_cnt,count(case when a.flag_01 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_2h_ord_cnt,count(case when a.flag_02 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_6h_ord_cnt,cast(regexp_replace(SUBSTRING(ded_pay_time, 1, 10), '-', '') as bigint) as sub_partition
from     blink_cep_v2 as t1
where    coalesce(lg_cancel_time, '') = ''
and      coalesce(ded_pay_time, '') <> ''
group by regexp_replace(substring(ded_pay_time, 1, 10), '-', ''),a.store_code
;

三. 问题拓展

  1. blink CEP的参数比较多,要完全看懂,着实需要一些时间,但CEP的强大是毋庸置疑的。CEP不仅可以解决物流场景中的超时统计问题,风控中的很多场景也是信手拈来。这里有一个风控中的场景,通过上述物流案例的用法,我们是否能推敲出这个场景的用法呢?
    风控案例测试数据如下:
刷卡时间银行卡ID刷卡地点
2018-04-13 12:00:001WW
2018-04-13 12:05:001WW1
2018-04-13 12:10:001WW2
2018-04-13 12:20:001WW

我们认为,当一张银行卡在10min之内,在不同的地点被刷卡大于等于两次,我们就期望对消费者出发预警机制。

  1. blink CEP是万能的么?答案是否定的,当消息乱序程度比较高的时候,实时性和准确性就成了一对矛盾的存在。要想实时性比较高,必然要求设置的offset越小越好,但offset设置比较小,就直接可能导致很多eventtime<watermark-offset的消息,直接被丢弃,准确性很难保证。比如,在CP回传物流详情的时候,经常回传的时间跟实操的时间差异很大(实操时间是10点,但回传时间是15点),如果以实操时间作为eventtime,可能就会导致这种差异很大的消息被直接丢掉,无法进入CEP,进而无法触发CEP后续的计算,在使用CEP的过程中,应该注意这一点。

四. 作者简介

花名:缘桥,来自菜鸟-CTO-数据部-仓配数据研发,主要负责菜鸟仓配业务的离线和实时数据仓库建设以及创新数据技术和工具的探索和应用。

 

原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/519616.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PPT素材网

PPT素材推荐 官网&#xff1a;http://www.1ppt.com/ 背景色采用这个&#xff0c;模板才用这个 简洁微立体创业融资计划书PPT模板免费下载 http://www.1ppt.com/article/33315.html

云+X案例展 | 民生类:中国电信天翼云携手国家天文台打造“大国重器”

本案例由天翼云投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。FAST是由中国科学院国家…

阿里云TSDB在大数据集群监控中的方案与实战

目前大部分的互联网企业基本上都有搭建自己的大数据集群&#xff0c;为了能更好让我们的大数据集群更加高效安全的工作&#xff0c;一个优秀的监控方案是必不可少的&#xff1b;所以今天给大家带来的这篇文章就是讲阿里云TSDB在上海某大型互联网企业中的大数据集群监控方案中的…

linux上java解加密(AES/CBC)异常:java.lang.SecurityException: JCE cannot authenticate the provider BC办法

对接第三方厂商需求时&#xff0c;需要对数据AES256进行解密&#xff0c;由于java本身不支持&#xff0c;需要添加依赖。 文章目录一、版本适配1. 版本对应关系2. maven仓库地址3. maven坐标二、linux jdk策略下载2.1. JDK6 jce2.2. JDK7 jce2.3. JDK8 jce三、linux jdk策略配置…

云+X案例展 | 民生类:易趋云全面提升三德科技管理效能

本案例由深圳蓝云投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。湖南三德科技股份有限…

redis 公网 安全_redis漏洞复现

一、漏洞简介什么是redisredis是一个key-value存储系统。和Memcached类似&#xff0c;它支持存储的value类型相对更多&#xff0c;包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash&#xff08;哈希类型&#xff09;。这些数据类型都支持push/po…

实时计算的最佳实践:基于表格存储和Blink的大数据实时计算

表格存储: 数据存储和数据消费All in one 表格存储&#xff08;Table Store&#xff09;是阿里云自研的NoSQL多模型数据库&#xff0c;提供PB级结构化数据存储、千万TPS以及毫秒级延迟的服务能力。在实时计算场景里&#xff0c;表格存储强大的写入能力和多模型的存储形态&…

关于JDK8采坑JCE加密限制版本问题

文章目录一、解决方案11. 调研2. 常见的异常3. 安全性机制导致的访问https会报错4. 解决方案5. 操作流程6. 移动jar配置策略二、解决方案22.1. 声明2.2. 编辑策略文件2.3. 修改默认属性一、解决方案1 声明&#xff1a;jdk1.8已经经过线上环境使用 1. 调研 JDK8的加密策略存在…

速围观!云+X 案例评选榜单重磅出炉!

2019年11月&#xff0c;CSDN云计算强势开启“云X”案例征集活动&#xff0c;从先进性、拓展性、效益性等三个基本方向出发&#xff0c;深入展现云技术作用行业的突出优势。时隔2个月&#xff0c;通过广泛征集等方式&#xff0c;经过层层筛选&#xff0c;深入挖掘出跨行业、跨生…

uvm 形式验证_UVM基础

uvm_component与uvm_object1.几乎所有的类都派生于uvm_object&#xff0c;包括uvm_component。uvm_component有两大特性是uvm_object所没有的&#xff1a;一是通过在new的时候指定parent参数来形成一种树形的组织结构&#xff1b;二是有phase的自动执行特点。下图是常用的UVM继…

Table Store: 海量结构化数据实时备份实战

Table Store: 海量结构化数据实时备份实战 数据备份简介 在信息技术与数据管理领域&#xff0c;备份是指将文件系统或数据库系统中的数据加以复制&#xff0c;一旦发生灾难或者错误操作时&#xff0c;得以方便而及时地恢复系统的有效数据和正常运作。在实际备份过程中&#xf…

云+X案例展 | 电商零售类:云徙助力良品铺子「双11」

本案例由云徙投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。助力品牌制胜双十一的背后…

同样做前端,为何差距越来越大?

阿里妹导读&#xff1a;前端应用越来越复杂&#xff0c;技术框架不断变化&#xff0c;如何成为一位优秀的前端工程师&#xff0c;应对更大的挑战&#xff1f;今天&#xff0c;阿里前端技术专家会影结合实际工作经验&#xff0c;沉淀了五项重要方法&#xff0c;希望能对你的职业…

云+X案例展 | 民生类:必创科技助力打造智慧城市

本案例由必创科技投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。每当夜幕降临&#xf…

刚刚,阿里宣布开源Flutter应用框架Fish Redux!

3月5日&#xff0c;闲鱼宣布在GitHub上开源Fish Redux&#xff0c;Fish Redux是一个基于 Redux 数据管理的组装式 flutter 应用框架&#xff0c; 特别适用于构建中大型的复杂应用&#xff0c;它最显著的特征是 函数式的编程模型、可预测的状态管理、可插拔的组件体系、最佳的性…

谈谈结构体部分成员排序(重载的利用/sort)

涉及知识点&#xff1a; 1.重载运算符的知识 2.sort函数的使用 3.高精度排序 sort函数的用法&#xff1f; 通过面向百度GOOGLE编程的我&#xff0c;得知&#xff0c;sort的用法 sort函数详解&#xff08;史上最完整QAQ&#xff09; - AlvinZH - 博客园 对于数组而言&…

qt最大化和还原实现_研究进展 | 水生所关于细菌异化型硝酸盐还原成铵与反硝化脱氮两种途径抉择的分子调控机制研究取得进展...

在无氧和缺氧条件下&#xff0c;许多细菌可利用硝酸根和亚硝酸根作为电子受体进行无氧呼吸&#xff0c;包括异化型硝酸盐还原成铵(dissimilatory nitrate reduction (DNR) to ammonia&#xff0c;DNRA)和反硝化脱氮(denitrification)两种相互竞争的DNR途径&#xff0c;在氮元素…

中国科学院院士徐宗本:人工智能的基石是数学

来源&#xff1a;科学网 “人工智能的基石是数学&#xff0c;没有数学基础科学的支持&#xff0c;人工智能很难行稳致远。” 近日&#xff0c;由联合国教科文组织和中国工程院联合主办的联合国教科文组织国际工程科技知识中心2019国际高端研讨会上&#xff0c;中国科学院院士、…

MySQL运维实战 之 PHP访问MySQL你使用对了吗

大家都知道&#xff0c;slow query系统做的好不好&#xff0c;直接决定了解决slow query的效率问题 一个数据库管理平台&#xff0c;拥有一个好的slow query系统&#xff0c;基本上就拥有了解锁性能问题的钥匙 但是今天主要分享的并不是平台&#xff0c;而是在平台中看到的奇…

通过阿里云K8S Ingress Controller实现路由配置的动态更新

简介 在Kubernetes集群中&#xff0c;Ingress作为集群内服务对外暴露的访问接入点&#xff0c;其几乎承载着集群内服务访问的所有流量。我们知道&#xff0c;Nginx Ingress Controller是Kubernetes社区很重要的一个子项目&#xff0c;其内部主要依托于高性能的负载均衡软件Ngi…