Apache Flink CEP 实战

本文根据Apache Flink 实战&进阶篇系列直播课程整理而成,由哈啰出行大数据实时平台资深开发刘博分享。通过一些简单的实际例子,从概念原理,到如何使用,再到功能的扩展,希望能够给打算使用或者已经使用的同学一些帮助。

主要的内容分为如下三个部分:

  1. Flink CEP概念以及使用场景。
  2. 如何使用Flink CEP。
  3. 如何扩展Flink CEP。

Flink CEP 概念以及使用场景

什么是 CEP

CEP的意思是复杂事件处理,例如:起床-->洗漱-->吃饭-->上班等一系列串联起来的事件流形成的模式称为CEP。如果发现某一次起床后没有刷牙洗脸亦或是吃饭就直接上班,就可以把这种非正常的事件流匹配出来进行分析,看看今天是不是起晚了。

下图中列出了几个例子:

  • 第一个是异常行为检测的例子:假设车辆维修的场景中,当一辆车出现故障时,这辆车会被送往维修点维修,然后被重新投放到市场运行。如果这辆车被投放到市场之后还未被使用就又被报障了,那么就有可能之前的维修是无效的。
  • 第二个是策略营销的例子:假设打车的场景中,用户在APP上规划了一个行程订单,如果这个行程在下单之后超过一定的时间还没有被司机接单的话,那么就需要将这个订单输出到下游做相关的策略调整。 
  • 第三个是运维监控的例子:通常运维会监控服务器的CPU、网络IO等指标超过阈值时产生相应的告警。但是在实际使用中,后台服务的重启、网络抖动等情况都会造成瞬间的流量毛刺,对非关键链路可以忽略这些毛刺而只对频繁发生的异常进行告警以减少误报。

Flink CEP 应用场景

  • 风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
  • 策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
  • 运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。

Flink CEP原理

Flink CEP内部是用NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为起始状态中间状态最终状态三种,边分为takeignoreproceed三种。

  • take:必须存在一个条件判断,当到来的消息满足take边条件判断时,把这个消息放入结果集,将状态转移到下一状态。
  • ignore:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。 
  • proceed:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。举个例子,当用户购买商品时,如果购买前有一个咨询客服的行为,需要把咨询客服行为和购买行为两个消息一起放到结果集中向下游输出;如果购买前没有咨询客服的行为,只需把购买行为放到结果集中向下游输出就可以了。 也就是说,如果有咨询客服的行为,就存在咨询客服状态的上的消息保存,如果没有咨询客服的行为,就不存在咨询客服状态的上的消息保存,咨询客服状态是由一条proceed边和下游的购买状态相连。

下面以一个打车的例子来展示状态是如何流转的,规则见下图所示。

以乘客制定行程作为开始,匹配乘客的下单事件,如果这个订单超时还没有被司机接单的话,就把行程事件和下单事件作为结果集往下游输出。

假如消息到来顺序为:行程-->其他-->下单-->其他。

状态流转如下:

  1. 开始时状态处于行程状态,即等待用户制定行程。

  1. 当收到行程事件时,匹配行程状态的条件,把行程事件放到结果集中,通过take边将状态往下转移到下单状态

  1. 由于下单状态上有一条ignore边,所以可以忽略收到的其他事件,直到收到下单事件时将其匹配,放入结果集中,并且将当前状态往下转移到超时未接单状态。这时候结果集当中有两个事件:制定行程事件和下单事件。 

  1. 超时未接单状态时,如果来了一些其他事件,同样可以被ignore边忽略,直到超时事件的触发,将状态往下转移到最终状态,这时候整个模式匹配成功,最终将结果集中的制定行程事件和下单事件输出到下游。

上面是一个匹配成功的例子,如果是不成功的例子会怎么样?

假如当状态处于超时未接单状态时,收到了一个接单事件,那么就不符合超时未被接单的触发条件,此时整个模式匹配失败,之前放入结果集中的行程事件和下单事件会被清理。

Flink CEP程序开发

本节将详细介绍Flink CEP的程序结构以及API。

Flink CEP 程序结构

主要分为两部分:定义事件模式和匹配结果处理。

官方示例如下:

DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getId() == 42;}}).next("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {@Overridepublic boolean filter(SubEvent subEvent) {return subEvent.getVolume() >= 10.0;}}).followedBy("end").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getName().equals("end");}});PatternStream<Event> patternStream = CEP.pattern(input, pattern);DataStream<Alert> result = patternStream.select(new PatternProcessFunction<Event, Alert>() {@Overridepublic void select(Map<String, List<Event>> pattern,Context ctx,Collector<Alert> out) throws Exception {out.collect(createAlertFrom(pattern));}});

程序结构分为三部分:首先需要定义一个模式(Pattern),即第2行代码所示,接着把定义好的模式绑定在DataStream上(第25行),最后就可以在具有CEP功能的DataStream上将匹配的结果进行处理(第27行)。 

下面对关键部分做详细讲解:

定义模式:上面示例中,分为了三步,首先匹配一个ID为42的事件,接着匹配一个体积大于等于10的事件,最后等待收到一个name等于end的事件。 
匹配结果输出:此部分,需要重点注意select函数(第30行,注:本文基于Flink 1.7版本)里边的Map类型的pattern参数,Key是一个pattern的name,它的取值是模式定义中的Begin节点start,或者是接下来next里面的middle,或者是第三个步骤的end。后面的map中的value是每一步发生的匹配事件。因在每一步中是可以使用循环属性的,可以匹配发生多次,所以map中的value是匹配发生多次的所有事件的一个集合。

Flink CEP构成

上图中,蓝色方框代表的是一个个单独的模式;浅黄色的椭圆代表的是这个模式上可以添加的属性,包括模式可以发生的循环次数,或者这个模式是贪婪的还是可选的;橘色的椭圆代表的是模式间的关系,定义了多个模式之间是怎么样串联起来的。通过定义模式,添加相应的属性,将多个模式串联起来三步,就可以构成了一个完整的Flink CEP程序。

定义模式

下面是示例代码:

pattern.next("start").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getId() == 42;}}
)

定义模式主要有如下5个部分组成:

pattern:前一个模式
next/followedBy/...:开始一个新的模式
start:模式名称
where:模式的内容
filter:核心处理逻辑

模式的属性

接下来介绍一下怎样设置模式的属性。模式的属性主要分为循环属性可选属性

循环属性可以定义模式匹配发生固定次数(times),匹配发生一次以上(oneOrMore),匹配发生多次以上。(timesOrMore)。

可选属性可以设置模式是贪婪的(greedy),即匹配最长的串,或设置为可选的(optional),有则匹配,无则忽略。

模式的有效期

由于模式的匹配事件存放在状态中进行管理,所以需要设置一个全局的有效期(within)。 若不指定有效期,匹配事件会一直保存在状态中不会被清除。至于有效期能开多大,要依据具体使用场景和数据量来衡量,关键要看匹配的事件有多少,随着匹配的事件增多,新到达的消息遍历之前的匹配事件会增加CPU、内存的消耗,并且随着状态变大,数据倾斜也会越来越严重。

模式间的联系

主要分为三种:严格连续性(next/notNext),宽松连续性(followedBy/notFollowedBy),和非确定宽松连续性(followedByAny)。

三种模式匹配的差别见下表所示:

模式&数据流严格连续性宽松连续性非确定宽松连续性
Pattern(A B) Streaming('a','c','b1','b2')不匹配匹配 输出:a,b1匹配 输出:a,b1 a,b2

总结如下:

  • 严格连续性:需要消息的顺序到达与模式完全一致。
  • 宽松连续性:允许忽略不匹配的事件。
  • 非确定宽松连性:不仅可以忽略不匹配的事件,也可以忽略已经匹配的事件。

多模式组合

除了前面提到的模式定义和模式间的联系,还可以把相连的多个模式组合在一起看成一个模式组,类似于视图,可以在这个模式视图上进行相关操作。

上图这个例子里面,首先匹配了一个登录事件,然后接下来匹配浏览,下单,购买这三个事件反复发生三次的用户。 

如果没有模式组的话,代码里面浏览,下单,购买要写三次。有了模式组,只需把浏览,下单,购买这三个事件当做一个模式组,把相应的属性加上times(3)就可以了。

处理结果

处理匹配的结果主要有四个接口: PatternFlatSelectFunction,PatternSelectFunction,PatternFlatTimeoutFunction和PatternTimeoutFunction。

从名字上可以看出,输出可以分为两类:select和flatSelect指定输出一条还是多条,timeoutFunction和不带timeout的Function指定可不可以对超时事件进行旁路输出。 

下图是输出的综合示例代码:

状态存储优化

当一个事件到来时,如果这个事件同时符合多个输出的结果集,那么这个事件是如何保存的?

Flink CEP通过Dewey计数法在多个结果集中共享同一个事件副本,以实现对事件副本进行资源共享。

Flink CEP的扩展

本章主要介绍一些Flink CEP的扩展,讲述如何做到超时机制的精确管理,以及规则的动态加载与更新。

超时触发机制扩展

原生Flink CEP中超时触发的功能可以通过within+outputtag结合来实现,但是在复杂的场景下处理存在问题,如下图所示,在下单事件后还有一个预付款事件,想要得到下单并且预付款后超时未被接单的订单,该如何表示呢? 

参照下单后超时未被接单的做法,把下单并且预付款后超时未被接单规则表示为下单.followedBy(预付款).followedBy(接单).within(time),那么这样实现会存在问题吗?

这种做法的计算结果是会存在脏数据的,因为这个规则不仅匹配到了下单并且预付款后超时未被接单的订单(想要的结果),同样还匹配到了只有下单行为后超时未被接单的订单(脏数据,没有预付款)。原因是因为超时within是控制在整个规则上,而不是某一个状态节点上,所以不论当前的状态是处在哪个状态节点,超时后都会被旁路输出。

那么就需要考虑能否通过时间来直接对状态转移做到精确的控制,而不是通过规则超时这种曲线救国的方式。 于是乎,在通过消息触发状态的转移之外,需要增加通过时间触发状态的转移的支持。要实现此功能,需要在原来的状态以及状态转移中,增加时间属性的概念。如下图所示,通过wait算子来得到waiting状态,然后在waiting状态上设置一个十秒的时间属性以定义一个十秒的时间窗口。

wait算子对应NFA中的ignore状态,将在没有到达时间窗口结束时间时自旋,在ComputationState中记录wait的开始时间,在NFA的doProcess中,将到来的数据与waiting状态处理,如果到了waiting的结束时间,则进行状态转移。

上图中红色方框中为waiting状态设置了两条ignore边:

1.waitingStatus.addIgnore(lastSink,waitingCondition),waitingCondition中的逻辑是获取当前的时间(支持事件时间),判断有没有超过设置的waiting阈值,如果超过就把状态向后转移。
2.waitingStatus.addIgnore(waitingCondition),waitingCondition中如果未达到设置的waiting阈值,就会自旋在当前的waiting状态不变。

规则动态注入

线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就会导致重启时间延长,期间就会造成一些想要处理的异常行为不能及时发现。

那么要怎么样做到规则的动态更新和加载呢?

梳理一下整体架构,Flink CEP是运行在Flink Job里的,而规则库是放在外部存储中的。首先,需要在运行的Job中能及时发现外部存储中规则的变化,即需要在Job中提供访问外部库的能力。 其次,需要将规则库中变更的规则动态加载到CEP中,即把外部规则的描述解析成Flink CEP所能识别的pattern结构体。最后,把生成的pattern转化成NFA,替换历史NFA,这样对新到来的消息,就会使用新的规则进行匹配。

下图就是一个支持将外部规则动态注入、更新的接口。

这个接口里面主要实现了四个方法:

  • initialize:初始化方法,进行外部库连接的初始化。
  • inject:和外部数据库交互的主要方法,监听外部库变化,获取最新的规则并通过Groovy动态加载,返回pattern。
  • getPeriod:设置轮巡周期,在一些比较简单的实时性要求不高的场景,可以采用轮巡的方式,定期对外部数据库进行检测。
  • getNfaKeySelector:和动态更新无关,用来支持一个流对应多个规则组。

历史匹配结果清理

新规则动态加载到Flink CEP的Job中,替换掉原来的NFA之后,还需要对历史匹配的结果集进行清理。在AbstractKeyedCEPPatternOperator中实现刷新NFA,注意,历史状态是否需要清理和业务相关:

  1. 修改的逻辑对规则中事件的匹配没有影响,保留历史结果集中的状态。
  2. 修改的逻辑影响到了之前匹配的部分,需要将之前匹配的结果集中的状态数据清除,防止错误的输出。

总结

使用Flink CEP,熟知其原理是很重要的,特别是NFA的状态转移流程,然后再去看源码中的状态图的构建就会很清晰了。

双12来袭!500元淘宝红包、iPhone11等你拿。
https://www.aliyun.com/1212/2019/home?utm_content=g_1000092611

原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/517220.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

图神经网络(AliGraph)在阿里巴巴的发展与应用

背景 为什么做GNN? 在大数据的背景下&#xff0c;利用高速计算机去发现数据中的规律似乎是最有效的手段。为了让机器计算的有目的性&#xff0c;需要将人的知识作为输入。我们先后经历了专家系统、经典机器学习、深度学习三个阶段&#xff0c;输入的知识由具体到抽象&#xf…

实用!五款新型 Linux 命令行工具

作者 | Ricardo Gerardi译者 | 弯月&#xff0c;责编 | 屠敏出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;在Linux/Unix系统的日常使用中&#xff0c;我们需要使用很多命令行工具来完成工作&#xff0c;以及理解和管理我们的系统&#xff0c;例如使用du来监视磁盘…

从零开始入门 K8s | 手把手带你理解 etcd

导读&#xff1a;etcd 是用于共享配置和服务发现的分布式、一致性的 KV 存储系统。本文从 etcd 项目发展所经历的几个重要时刻开始&#xff0c;为大家介绍了 etcd 的总体架构及其设计中的基本原理。希望能够帮助大家更好的理解和使用 etcd。 一、etcd 项目的发展历程 etcd 诞生…

minio 单机安装、部署 centos7环境

文章目录一、默认模式下载运行1. 下载2. 访问minio控制台3. 创建目录3. 上传文件二、企业自定义模式2.1. 指定用户密码2.2. 配置目录2.3. 控制台端口2.4. 启动2.5. minio控制台登录2.6. 效果图一、默认模式下载运行 1. 下载 官网地址&#xff1a;https://docs.min.io/docs/ …

Kubernetes 日志查询分析实践

准备工作 为了完成后续的相关操作&#xff0c;我们需要准备一个 K8s 集群&#xff0c;操作步骤如下&#xff1a; 登陆容器服务控制台。创建一个标准托管集群&#xff08;杭州区域&#xff09;&#xff0c;在向导中勾选上【使用 EIP 暴露 API Server】 和【使用日志服务】。集…

“编程能力差,90%是输在这点上!”谷歌AI开发专家:逆袭并没那么难!

Google 人工智能开发者专家彭靖田老师说——超90%的程序员在初学Python 人工智能时&#xff0c;都会遇到下面3个问题&#xff1a;1.想入门人工智能&#xff0c;但不知从何学起&#xff0c;也不知道该选择什么方向...2.Python语法、机器学习/深度学习框架、算法都能看懂&#xf…

这群程序员疯了!他们想成为IT界最会带货的男人

随着网红主播越来越火&#xff0c;通过直播带货种草的形式也成了今年双12的热点。 不过&#xff0c;网红主播带货早已见怪不怪&#xff0c;但你们见过程序员直播带货吗!? 近日&#xff0c;趁着阿里云双12年末采购节&#xff0c;阿里云邀请了一波程序员GG来为大家直播带货&am…

Minio Docker 单机安装(二种模式) linux

文章目录一、默认单机启动1. docker安装启动2. minio 镜像拉取和启动3. minio登录二、minio纠删码模式2.1. 简述2.2. 启动2.3. minio登录2.4. 总览2.5. 上传文件测试一、默认单机启动 1. docker安装启动 # 在线安装docker yum install docker# 启动docker systemctl start do…

阿里巴巴 Service Mesh 落地的架构与挑战

导读&#xff1a;云原生已成为整个阿里巴巴经济体构建面向未来的技术基础设施&#xff0c;Service Mesh 作为云原生的关键技术之一&#xff0c;顺利完成在 双11 核心应用严苛而复杂场景下的落地验证。本文作者将与大家分享在完成这一目标过程中我们所面临和克服的挑战。 部署架…

看完这篇 HashMap ,和面试官扯皮就没问题了

来源 | Java 建设者责编 | Carol封图 | CSDN 下载自视觉中国&#xff08;如果你没有时间细抠本文&#xff0c;可以直接看 HashMap 概述&#xff0c;能让你对 HashMap 有个大致的了解&#xff09;HashMap 是 Map 接口的实现&#xff0c;HashMap 允许空的 key-value 键值对&#…

除了快,5G 有哪些关键技术?

阿里妹导读&#xff1a;5G不仅仅只是网速更快&#xff0c;更多的是生活方式的颠覆&#xff0c;对各行各业都会起到催化作用。5G里不仅仅只有大带宽&#xff0c;而是会有很多与B端用户&#xff08;企业&#xff09;相结合的点。接下来&#xff0c;跟阿里大文娱的梓烁一起了解5G的…

聚水潭是如何基于AnalyticDB for PostgreSQL 构筑海量实时数仓平台的

聚水潭数据仓库业务介绍 上海聚水潭网络科技有限公司成立于2014年。聚水潭创建之初&#xff0c;以电商SaaS ERP切入市场&#xff0c;凭借出色的产品和服务&#xff0c;快速获得市场领先地位。随着客户需求的不断变化&#xff0c;如今聚水潭已经发展成为以SaaS ERP为核心&#…

sqlite3的编译和使用

编译环境准备 这个是nw官网的环境搭建教程&#xff0c;一般需要python2.7、visual studio 2013、node-gyp、 node-pre-gyp&#xff0c;环境这块的文章很多自己可以百度http://docs.nwjs.io/en/latest/For Users/Advanced/Use Native%2 0Node%20Modules/ 编译好之后&#xff0…

阿里巴巴的 Kubernetes 应用管理实践经验与教训

导读&#xff1a;本文整理自孙健波在 ArchSummit 大会 2019 北京站演讲稿记录。首先介绍了阿里巴巴基于 Kubernetes 项目进行大规模应用实践过程中遇到的问题&#xff1b;随后会逐一介绍解决这些问题的现有实践及其本身存在的局限性&#xff1b;最后会介绍阿里巴巴目前正在进行…

Minio 分布式集群部署

文章目录一、分布式存储可靠性常用方法1. 概述2. 冗余3. 校验二、分布式Minio优势2.1. 数据保护2.2. 高可用2.3.一致性三、运行分布式Minio3.1. 启动方案简述3.2. 案例说明3.3. 制作分布式启动脚本3.4. 制作伪分布式启动脚本3.5. 登录minio四、分布式Minio负载均衡4.1. nginx安…

数据分析:为什么说Python比Excel更简单高效 ?

日本最大的证券公司之一野村证券首席数字官马修汉普森&#xff0c;在Quant Conference上发表讲话&#xff1a;“用Excel的人越来越少&#xff0c;大家都在用Python。”甚至直接说&#xff1a;“Python已经取代了Excel。”事实上&#xff0c;为了追求更高的效率和质量&#xff0…

快速搭建 Serverless 在线图片处理应用

简介 首先介绍下在本文出现的几个比较重要的概念&#xff1a; 函数计算&#xff08;Function Compute&#xff09;&#xff1a;函数计算是一个事件驱动的服务&#xff0c;通过函数计算&#xff0c;用户无需管理服务器等运行情况&#xff0c;只需编写代码并上传。函数计算准备计…

如何在 PyFlink 1.10 中自定义 Python UDF?

我们知道 PyFlink 是在 Apache Flink 1.9 版新增的&#xff0c;那么在 Apache Flink 1.10 中 Python UDF 功能支持的速度是否能够满足用户的急切需求呢&#xff1f; Python UDF 的发展趋势 直观的判断&#xff0c;PyFlink Python UDF 的功能也可以如上图一样能够迅速从幼苗变成…

Node.js从零开发Web Server博客项目笔记

代码运行流程 首先开启服务器&#xff0c;在npm run dev的时候运行了bin目录下的www.js文件&#xff0c;启动http服务 当前端进行访问的时候&#xff0c;经过app.js文件 App.js是整个项目的入口文件&#xff0c;首先判断这个用户在http的header头中带了那些验证的信息&#…

如何度过二十多岁这段又穷又迷茫的岁月?

我们在后台常常会收到读者的留言我马上毕业了&#xff0c;但是现在很迷茫&#xff0c;不知道学校里学的&#xff0c;能不能真正的适应工作...我工作两三年&#xff0c;还是不知道怎么规划自己的技术成长路线&#xff0c;不知道该学什么来提升自己的竞争力...人生需要长线的经营…