利用blink+MQ实现流计算中的超时统计问题

案例与解决方案汇总页:
阿里云实时计算产品案例&解决方案汇总

一. 背景介绍

菜鸟的物流数据本身就有链路复杂、实操节点多、汇总维度多、考核逻辑复杂的特点,对于实时数据的计算存在很大挑战。经过仓配ETL团队的努力,目前仓配实时数据已覆盖了绝大多数场景,但是有这样一类特殊指标:“晚点超时指标”(例如:出库超6小时未揽收的订单量),仍存在实时汇总计算困难。原因在于:流计算是基于消息触发计算的,若没有消息到达到则无法计算,这类指标恰好是要求在指定的超时时间计算出有多少未达到的消息。然而,这类指标对于指导实操有着重要意义,可以告知运营小二当前多少订单积压在哪些作业节点,应该督促哪些实操人员加快作业,这对于物流的时效KPI达成至关重要。

之前的方案是:由产品前端根据用户的请求查询OLAP数据库,由OLAP从明细表出结果。大促期间,用户请求量大,加之数据量大,故对OLAP的明细查询造成了比较大的压力。

二. 解决方案

2.1   问题定义

“超时晚点指标” 是指,一笔订单的两个相邻的实操节点node_n-1 、node_n 的完成时间 time_n-1、time_n,
当满足 : time_n is null  && current_time - time_n-1 > kpi_length 时,time_flag_n 为 true , 该笔订单计入 超时晚点指标的计数。
如下图,有一笔订单其 node_1 为出库节点,时间为time_1 = '2018-06-18 00:00:00' ,运营对出库与揽收之间考核的时长 kpi_length = 6h, 那么当前自然时间 current_time > '2018-06-18 06:00:00' 时,且node_2揽收节点的time_2 为null,则该笔订单的 timeout_flag_2 = true , “出库超6小时未揽收订单量” 加1。由于要求time_2 为null,即要求没有揽收消息下发的情况下让流计算做汇总值更新,这违背了流计算基于消息触发的基本原理,故流计算无法直接算出这种“超时晚点指标”。

决问题的基本思路是:在考核时刻(即 kpi_time = time_n-1+kpi_length )“制造”出一条消息下发给流计算,触发汇总计算。继续上面的例子:在考核时刻“2018-06-18 06:00:00”利用MetaQ定时消息功能“制造”出一条消息下发给流计算汇总任务,触发对该笔订单的 time_out_flag_2 的判断,增加汇总计数。同时,还利用 Blink 的Retraction 机制,当time_2 由null变成有值的时候,Blink 可以对 time_out_flag_2 更新,重新计数。

2.2 方案架构

如上图所示:
Step1:  Blink job1 接收来自上游系统的订单数据,做清洗加工,生成订单明细表:dwd_ord_ri,利用TT下发给Blink job2 和 Blink job3。
Step2:Blink job2 收到 dwd_ord_ri后,对每笔订单算出考核时刻 kpi_time = time_n-1+kpi_length,作为MetaQ消息的“TIMER_DELIVER_MS” 属性,写入MetaQ。MetaQ的定时消息功能,可以根据用户写入的TIMER_DELIVER_MS 在指定时刻下发给消费者,即上图中的Blink job3。
Step3:Blink job3 接收 TT、MetaQ 两个消息源,先做Join,再对time_flag判断,最后做Aggregate计算。同一笔订单,dwd_ord_ri、timing_msg任意一个消息到来,都会触发join,time_flag判断,aggregate重新计算一遍,Blink的Retraction可对结果进行实时更新。

2.3 实现细节

本方案根据物流场景中多种实操节点、多种考核时长的特点,从Blink SQL代码 和 自定义Sink两方面做了特殊设计,从而实现了灵活配置、高效开发。
(1) Blink job2 --- 生成定时消息
关键Blink SQL 代码如下。约定每条record的第一个字段为投递时间列表,即MetaQ向消费者下发消息的时刻List,也就是上面所说的多个考核时刻。第二个字段为保序字段,比如在物流场景中经常以订单code、运单号作为保序主键。该代码实现了对每个出库的物流订单,根据其出库时间,向后延迟6小时(21600000毫秒)、12小时(43200000毫秒)、24小时(86400000毫秒)由MetaQ向消费者下发三个定时消息。

create table metaq_timing_msg
(
deliver_time_list       varchar comment '投递时间列表', -- 约定第一个字段为投递时间list
lg_code           varchar comment '物流订单code', -- 约定第二字段为保序主键
node_name               varchar comment '节点名称',
node_time               varchar comment '节点时间',
)
WITH
(
type = 'custom',
class = 'com.alibaba.xxx.xxx.udf.MetaQTimingMsgSink',
tag = 'store',
topic = 'blink_metaq_delay_msg_test',
producergroup = 'blinktest',
retrytimes = '5',
sleeptime = '1000'
);
insert into metaq_timing_msg
select
concat_ws(',',
cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 21600000) as varchar),  --6小时
cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 43200000) as varchar),  --12小时
cast( (UNIX_TIMESTAMP(store_out_time)*1000 + 86400000) as varchar)   --24小时
)   as deliver_time_list,
lg_code,
'wms'  as node_name,
store_out_time  as node_time
from
(
select
lg_code,
FIRST_VALUE(store_out_time) as store_out_time
from srctable
group by lg_code
)b
where store_out_time is not null  ;

(2) Blink 自定义Sink --- MetaQTimingMsg Sink
Blink的当前版本还不支持 MetaQ的定时消息功能的Sink,故利用 Blink的自定义Sink功能,并结合菜鸟物流数据的特点开发了MetaQTimingMsg Sink。关键代码如下(实现 writeAddRecord 方法)。

@Override
public void writeAddRecord(Row row) throws IOException {
Object deliverTime = row.getField(0);
String[] deliverTimeList = deliverTime.toString().split(",");
for(String dTime:deliverTimeList){String orderCode = row.getField(1).toString();String key = orderCode + "_" + dTime;Message message = newMessage(row, dTime, key);boolean result = sendMessage(message,orderCode);if(!result){LOG.error(orderCode + " : " + dTime + " send failed");}}
}private  Message newMessage(Row row,String deliverMillisec,String key){//Support Varbinary Type Insert Into MetaQMessage message = new Message();message.setKeys(key);message.putUserProperty("TIMER_DELIVER_MS",deliverMillisec);int arity = row.getArity();Object[] values = new Object[arity];for(int i=0;i<arity;i++){values[i]=row.getField(i);}String lineStr=org.apache.commons.lang3.StringUtils.join(values, FIELD_DELIMITER);try {byte[] bytes = lineStr.getBytes(ENCODING);message.setBody(bytes);message.setWaitStoreMsgOK(true);} catch (UnsupportedEncodingException e) {LOG.error("create new message error",e);}return message;
}private boolean sendMessage(Message message,String orderCode){long retryTime = 0;boolean isSendSuccess = true;if(message != null){message.setTopic(topicName);message.setTags(tagName);}SendResult result = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {.... // 针对物流订单code的hash算法return list.get(index.intValue());}                    },orderCode);if(!result.getSendStatus().equals(SendStatus.SEND_OK)){LOG.error("" + orderCode +" write to metaq result is " + result.getSendStatus().toString());isSendSuccess = false;}return  isSendSuccess;
}
}

(3)Blink job3 --- 汇总计算
关键Blink SQL 代码如下,统计了每个仓库的“出库超6小时未揽收物理订单”、“出库超12小时未揽收物理订单”、“出库超24小时未揽收物理订单”的汇总值。代码中使用了“stringLast()”函数处理来自dwd_ord_ri的每条消息,以取得每个物流订单的最新出库揽收情况,利用Blink Retraction机制,更新汇总值。

create view dws_store_view as  
select t1.store_code,max(t1.store_name)       as store_name,count(case when length(trim(t1.store_out_time)) = 19and t1.tms_collect_time is null and NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 21600then t2.lg_code end ) as tms_not_collect_6h_ord_cnt, ---出库超6小时未揽收物流订单量count(case when length(trim(t1.store_out_time)) = 19and t1.tms_collect_time is null and  NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 43200then t2.lg_code end ) as tms_not_collect_12h_ord_cnt,---出库超6小时未揽收物流订单量count(case when length(trim(t1.store_out_time)) = 19and t1.tms_collect_time is null and  NOW()-UNIX_TIMESTAMP(t1.store_out_time,'yyyy-MM-dd HH:mm:ss') >= 86400then t2.lg_code end ) as tms_not_collect_24h_ord_cnt ---出库超6小时未揽收物流订单量
from 
(select lg_code,coalesce(store_code,'-1')   as store_code,store_name,store_out_time,tms_collect_timefrom (select lg_code,max(store_code)          as store_code,max(store_name)          as store_name,stringLast(store_out_time)  as store_out_time,stringLast(tms_collect_time)as tms_collect_time, from dwd_ord_rigroup by lg_code) a     
) t1
left outer join 
(select lg_code,from timing_msg  where node_name = 'wms'group by lg_code
) t2
on t1.lg_code = t2.lg_code
group byt1.store_code;

三.  方案优势

3.1 配置灵活

我们从“Blink SQL 代码” 和“自定义MetaQ” 两个方面设计,用户可以根据具体的业务场景,在Blink SQL的一个view里就能实现多种节点多种考核时间的定时消息生成,而不是针对每一个实操节点的每一种定时指标都要写一个view,这样大大节省了代码量,提升了开发效率。例如对于仓库节点的出库超6小时未揽收、超12小时未揽收、超24小时未揽收,这三个指标利用上述方案,仅需在Blink job2的中metaq_timing_msg的第一个字段deliver_time_list中拼接三个kpi_length,即6小时、12小时、24小时为一个字符串即可,由MetaQTimingMsg Sink自动拆分成三条消息下发给MetaQ。对于不同的节点的考核,仅需在node_name,node_time填写不同的节点名称和节点实操时间即可。

3.2 主键保序

如2.3节所述,自定义的Sink中 实现了MetaQ的 MessageQueueSelector 接口的 select() 方法,同时在Blink SQL 生成的MetaQ消息默认第二个字段为保序主键字段。从而,可以根据用户自定义的主键,保证同一主键的所有消息放在同一个通道内处理,从而保证按主键保序,这对于流计算非常关键,能够实现数据的实时准确性。

3.3 性能优良

让专业的团队做专业的事。个人认为,这种大规模的消息存储、消息下发的任务本就应该交给“消息中间件”来处理,这样既可以做到计算与消息存储分离,也可以方便消息的管理,比如针对不同的实操节点,我们还可以定义不同的MetaQ的tag。
另外,正如2.2节所述,我们对定时消息量做了优化。考虑到一笔订单的属性字段或其他节点更新会下发多条消息,我们利用了Blink的FIRST_VALUE函数,在Blink job2中同一笔订单的的一种考核指标只下发一条定时消息,大大减少了消息量,减轻了Blink的写压力,和MetaQ的存储。

四. 自我介绍

马汶园    阿里巴巴 -菜鸟网络—数据部      数据工程师
菜鸟仓配实时研发核心成员,主导多次仓配大促实时数据研发,对利用Blink的原理与特性解决物流场景问题有深入思考与理解。

 

原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/519625.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Schema整合Quartz_01

文章目录一、实现思路二、第一种实现方式2.1. 新建web项目2.2. 导入依赖2.3. 创建一个job类2.4. 创建配置文件2.5. 配置web.xml2.6.运行web服务&#xff0c;观察Quartz定时任务三、第二种实现方式3.1. 创建job类3.2. 修改spring-config.xml3.3. 运行web服务&#xff0c;观察Qua…

使用Grab的实验平台进行混沌实验编排

背景 对每个用户来说&#xff0c;Grab是一个可以叫车&#xff0c;叫外卖或付款的一个APP。对工程师来说&#xff0c;Grab是一个有许多服务并通过RPC交互的分布式系统&#xff0c;有时也可以叫做微服务架构。在数千台服务器上运行的数百个服务每天都有工程师在上面进行变更。每…

c++ 麦克风 录音 wav_小米有品上线新品,手机麦克风得到史诗级加强

手机里面是自带录音功能的&#xff0c;所以很多朋友都喜欢用手机来记录会议等内容&#xff0c;但是手机自带的麦克风用来录音并不合适&#xff0c;要么声音小&#xff0c;要么录下来的都是杂音&#xff0c;难道非要买一个专用的录音笔或者麦克风吗&#xff1f;其实没有那个必要…

云+X案例展 | 民生类: “中企通信 × TutorABC”共创全球数字教育科技新里程

本案例由中企通信投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。TutorABC荣获IDC 【讯…

ESB接口调用异常汇总

文章目录一、ESB接口前置知识1. ESB接口简述2. 生成的代码组成部分二、常见的异常汇总2.1. 场景1&#xff1a;不能解析某域名2.2. 场景2&#xff1a;调用服务连接超时三、调用服务前异常3.1. 异常描述3.2. CXF相关的jar和jdk的rt.jar中冲突3.3. 异常现象3.4. 异常日志3.5. 异常…

基于实时计算(Flink)与高斯模型构建实时异常检测系统

案例与解决方案汇总页&#xff1a;阿里云实时计算产品案例&解决方案汇总 1. 概述 异常检测&#xff08;anomaly detection&#xff09;指的是对不符合预期模式或数据集&#xff08;英语&#xff1a;dataset&#xff09;中其他项目的项目、事件或观测值的识别。实际应用包括…

hive 中某个字段等于0_快速了解hive

作者丨HappyMint文章选摘&#xff1a;大数据与人工智能这是作者的第7篇文章本文主要针对从事大数据分析和架构相关工作&#xff0c;需要与hive打交道但目前对hive还没有进行深层次了解的小伙伴&#xff0c;希望本文会让你对hive有一个快速的了解。内容主要包括什么是hive、为什…

利用blink CEP实现流计算中的超时统计问题

案例与解决方案汇总页&#xff1a;阿里云实时计算产品案例&解决方案汇总 一. 背景介绍 如<利用blinkMQ实现流计算中的延时统计问题>一文中所描述的场景&#xff0c;我们将其简化为以下案例&#xff1a; 实时流的数据源结构如下&#xff1a; 物流订单号支付时间仓接…

PPT素材网

PPT素材推荐 官网&#xff1a;http://www.1ppt.com/ 背景色采用这个&#xff0c;模板才用这个 简洁微立体创业融资计划书PPT模板免费下载 http://www.1ppt.com/article/33315.html

云+X案例展 | 民生类:中国电信天翼云携手国家天文台打造“大国重器”

本案例由天翼云投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。FAST是由中国科学院国家…

阿里云TSDB在大数据集群监控中的方案与实战

目前大部分的互联网企业基本上都有搭建自己的大数据集群&#xff0c;为了能更好让我们的大数据集群更加高效安全的工作&#xff0c;一个优秀的监控方案是必不可少的&#xff1b;所以今天给大家带来的这篇文章就是讲阿里云TSDB在上海某大型互联网企业中的大数据集群监控方案中的…

linux上java解加密(AES/CBC)异常:java.lang.SecurityException: JCE cannot authenticate the provider BC办法

对接第三方厂商需求时&#xff0c;需要对数据AES256进行解密&#xff0c;由于java本身不支持&#xff0c;需要添加依赖。 文章目录一、版本适配1. 版本对应关系2. maven仓库地址3. maven坐标二、linux jdk策略下载2.1. JDK6 jce2.2. JDK7 jce2.3. JDK8 jce三、linux jdk策略配置…

云+X案例展 | 民生类:易趋云全面提升三德科技管理效能

本案例由深圳蓝云投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。湖南三德科技股份有限…

redis 公网 安全_redis漏洞复现

一、漏洞简介什么是redisredis是一个key-value存储系统。和Memcached类似&#xff0c;它支持存储的value类型相对更多&#xff0c;包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash&#xff08;哈希类型&#xff09;。这些数据类型都支持push/po…

实时计算的最佳实践:基于表格存储和Blink的大数据实时计算

表格存储: 数据存储和数据消费All in one 表格存储&#xff08;Table Store&#xff09;是阿里云自研的NoSQL多模型数据库&#xff0c;提供PB级结构化数据存储、千万TPS以及毫秒级延迟的服务能力。在实时计算场景里&#xff0c;表格存储强大的写入能力和多模型的存储形态&…

关于JDK8采坑JCE加密限制版本问题

文章目录一、解决方案11. 调研2. 常见的异常3. 安全性机制导致的访问https会报错4. 解决方案5. 操作流程6. 移动jar配置策略二、解决方案22.1. 声明2.2. 编辑策略文件2.3. 修改默认属性一、解决方案1 声明&#xff1a;jdk1.8已经经过线上环境使用 1. 调研 JDK8的加密策略存在…

速围观!云+X 案例评选榜单重磅出炉!

2019年11月&#xff0c;CSDN云计算强势开启“云X”案例征集活动&#xff0c;从先进性、拓展性、效益性等三个基本方向出发&#xff0c;深入展现云技术作用行业的突出优势。时隔2个月&#xff0c;通过广泛征集等方式&#xff0c;经过层层筛选&#xff0c;深入挖掘出跨行业、跨生…

uvm 形式验证_UVM基础

uvm_component与uvm_object1.几乎所有的类都派生于uvm_object&#xff0c;包括uvm_component。uvm_component有两大特性是uvm_object所没有的&#xff1a;一是通过在new的时候指定parent参数来形成一种树形的组织结构&#xff1b;二是有phase的自动执行特点。下图是常用的UVM继…

Table Store: 海量结构化数据实时备份实战

Table Store: 海量结构化数据实时备份实战 数据备份简介 在信息技术与数据管理领域&#xff0c;备份是指将文件系统或数据库系统中的数据加以复制&#xff0c;一旦发生灾难或者错误操作时&#xff0c;得以方便而及时地恢复系统的有效数据和正常运作。在实际备份过程中&#xf…

云+X案例展 | 电商零售类:云徙助力良品铺子「双11」

本案例由云徙投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。助力品牌制胜双十一的背后…