flink 事件处理 CEP 详解

简述

Apache Flink CEP(Complex Event Processing,复杂事件处理)是一个基于Flink Runtime构建的复杂事件处理库,它允许用户定义复杂的模式来检测和分析事件流中的复杂事件。

  • **复杂事件处理(CEP):**一种基于动态环境中事件流的分析技术,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列。
  • **Flink CEP:**Apache Flink提供的一个专门用于复杂事件处理的库,它允许用户通过定义模式(Pattern)来匹配和检测事件流中的复杂事件。

主要组件功能

  • **Event Stream:**输入的数据流,通常来自传感器、日志、消息队列等实时数据源。
  • **Pattern定义:**使用Flink CEP提供的Pattern API来定义复杂事件的匹配规则。
  • **Pattern检测:**Flink CEP引擎实时读取数据流,并尝试将流中的事件与定义的模式进行匹配。
  • **生成Alert:**当检测到满足条件的复杂事件时,触发相应的操作,如发送警报通知、控制系统等。

应用场景

  • **金融交易:**用于监测金融交易中的异常情况,如突然的大额转账、不合理的交易行为等。
  • **物联网:**监测物联网设备中的异常情况,如异常温度、湿度等。
  • **零售业:**监测零售业中的实时销售情况,如销售额、库存情况等。
  • **广告业:**监测广告业中的实时用户行为,如用户的点击行为、浏览行为等。
  • **游戏业:**监测游戏中的实时用户行为,如用户的游戏操作、游戏分数等。

开始

  1. 依赖到项目的pom.xml文件中
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep</artifactId><version>1.19.0</version>
</dependency>
  1. CEP程序
DataStream<Event> input = ...;Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(SimpleCondition.of(event -> event.getId() == 42)).next("middle").subtype(SubEvent.class).where(SimpleCondition.of(subEvent -> subEvent.getVolume() >= 10.0)).followedBy("end").where(SimpleCondition.of(event -> event.getName().equals("end")));PatternStream<Event> patternStream = CEP.pattern(input, pattern);DataStream<Alert> result = patternStream.process(new PatternProcessFunction<Event, Alert>() {@Overridepublic void processMatch(Map<String, List<Event>> pattern,Context ctx,Collector<Alert> out) throws Exception {out.collect(createAlertFrom(pattern));}});

DataStream中的事件,如果你想在上面进行模式匹配的话,必须实现合适的 equals()和hashCode()方法, 因为FlinkCEP使用它们来比较和匹配事件。

模式API

Apache Flink CEP(Complex Event Processing)库提供了一套强大的API来定义复杂事件的匹配模式。这些API允许你构建复杂的事件序列,其中事件可以按照特定的顺序、时间间隔、逻辑关系等进行匹配。
基础模式

  • Pattern.begin(“name”): 开始定义一个模式,并为模式的起始部分命名。
  • where(…): 定义一个条件,它必须被满足才能继续匹配。
  • next(“name”): 定义一个模式中的下一个事件,并为其命名。
  • oneOrMore(): 匹配一个或多个事件。
  • times(n): 精确匹配n个事件。
  • timesOrMore(n): 匹配n个或更多事件。
  • optional(): 定义一个可选的事件部分。
  • within(Time): 定义整个模式必须在给定的时间窗口内完成匹配。

单个模式

单个模式(或单例模式)是指只接受一个事件的模式。默认情况下,如果没有指定循环量词(如oneOrMore(), times()等),模式都是单例的。
在以下示例中,start模式是一个单例模式,它只接受一个满足特定条件的事件。

Pattern<Event, ?> startPattern = Pattern.<Event>begin("start")  .where(new SimpleCondition<Event>() {  @Override  public boolean filter(Event value) throws Exception {  // 定义事件必须满足的条件  return value.getId() == 42;  }  });

单个模式可以有一个或多个条件,这些条件用于确定哪些事件应该被接受。在上面的示例中,where方法用于指定条件。虽然单个模式默认只接受一个事件,但可以通过使用循环量词将其转换为循环模式。
以下示例展示了如何将start模式转换为接受一个或多个事件的循环模式。

// 使用oneOrMore()转换为循环模式  
Pattern<Event, ?> startPatternRepeated = Pattern.<Event>begin("start")  .oneOrMore()  .where(new SimpleCondition<Event>() {  @Override  public boolean filter(Event value) throws Exception {  // 定义事件必须满足的条件  return value.getId() == 42;  }  });

量词

量词用于指定模式中某个元素应该出现的次数。这些量词帮助用户定义更复杂的模式,比如连续出现的事件、特定次数的事件等。
常用的量词

  1. next()
  • 描述:用于定义模式中的下一个事件。
  • 用法:Pattern.begin(“start”).next(“next”)
  • 注意:默认情况下,next() 表示一个事件必须出现一次。
  1. oneOrMore()
  • 描述:表示模式中的某个元素应该出现一次或多次。
  • 用法:Pattern.begin(“start”).oneOrMore()
  • 示例:Pattern.begin(“a”).oneOrMore().where(new SimpleCondition() {…})
  1. times(int count)
  • 描述:指定模式中的某个元素应该出现的精确次数。
  • 用法:Pattern.begin(“a”).times(3)
  • 示例:Pattern.begin(“a”).times(3).where(new SimpleCondition() {…})
  1. timesOrMore(int count)
  • 描述:指定模式中的某个元素应该出现至少指定次数。
  • 用法:Pattern.begin(“a”).timesOrMore(3)
  • 示例:Pattern.begin(“a”).timesOrMore(3).where(new SimpleCondition() {…})
  1. optional()
  • 描述:表示模式中的某个元素是可选的,即它可能出现也可能不出现。
  • 用法:Pattern.begin(“a”).optional()
  • 示例:Pattern.begin(“start”).next(“a”).optional().next(“b”)
  1. consecutive()
  • 描述:通常与oneOrMore()或times()结合使用,以确保指定的事件连续出现。
  • 用法:Pattern.begin(“a”).consecutive().oneOrMore()
  • 示例:Pattern.begin(“start”).next(“a”).consecutive().times(3)
  1. greedy()
  • 描述:用于在具有相同名称的模式元素之间选择最长的匹配序列(与strictContiguity()相对)。
  • 用法:Pattern.begin(“a”).greedy()
  • 注意:greedy() 通常与循环量词一起使用,以确保在处理具有重叠匹配的情况时选择最长的匹配序列。
  1. strictContiguity()
  • 描述:确保具有相同名称的模式元素是连续的,即它们之间没有遗漏的事件。
  • 用法:Pattern.begin(“a”).strictContiguity()
  • 注意:默认情况下,如果未指定strictContiguity(),则 Flink CEP 会尝试找到最长的匹配序列(类似于greedy()),但允许在具有相同名称的模式元素之间有遗漏的事件。

条件

条件(Conditions)是用于过滤事件的关键部分,确保只有满足特定条件的事件才会被包含在复杂事件模式中。Flink CEP 提供了多种方式来定义这些条件,使得用户能够灵活地定义模式匹配所需的事件属性。

  1. SimpleCondition
    SimpleCondition 是一个接口,需要实现 filter 方法来定义条件。filter 方法接收一个事件作为参数,并返回一个布尔值来表示该事件是否满足条件。
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")  .where(new SimpleCondition<Event>() {  @Override  public boolean filter(Event value) throws Exception {  // 定义条件  return value.getId() == 42;  }  });
  1. IterativeCondition
    IterativeCondition 用于在模式匹配过程中考虑多个连续事件的状态。它有两个方法:filter 和 iterate。filter 方法用于过滤单个事件,而 iterate 方法用于在连续事件上迭代并更新状态。
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")  .where(new IterativeCondition<Event, String>() {  @Override  public boolean filter(Event value, Context<String> ctx) throws Exception {  // 过滤条件  // ...  return true; // 示例,始终返回 true  }  @Override  public boolean iterate(Event value, Context<String> ctx) throws Exception {  // 迭代条件  // ...  return true; // 示例,始终返回 true  }  });
  1. Lambda 表达式
    可以使用 Lambda 表达式来简化 SimpleCondition 的定义。
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")  .where(value -> value.getId() == 42);
  1. 组合条件
    可以使用逻辑操作符(如 and, or)来组合多个条件。Flink CEP 提供了 PatternFlatMapFunction 和 PatternProcessFunction,允许你以更复杂的方式组合和处理条件。
pattern.where(SimpleCondition.of(value -> ... /*一些判断条件*/)).or(SimpleCondition.of(value -> ... /*一些判断条件*/));

组合模式

组合模式(Composite Patterns)允许将多个简单的模式组合成一个更复杂的模式。这可以通过使用 PatternSelectFunction、PatternFlatMapFunction、PatternProcessFunction 等来实现,这些函数允许根据输入事件动态地选择或生成新的模式。

  1. PatternSelectFunction
    PatternSelectFunction 允许基于当前的事件和/或之前的匹配结果来选择不同的模式分支。它返回一个 Pattern<T, ?> 对象,该对象定义了给定事件应如何继续匹配。
PatternSelectFunction<Event> selector = new PatternSelectFunction<Event>() {  @Override  public Pattern<Event, ?> select(Map<String, List<Event>> pattern) throws Exception {  // 根据 pattern 中的事件选择或生成新的模式  // 例如,如果某个事件满足某个条件,则选择一个分支模式  // 否则,选择另一个分支模式  // ...  return somePattern; // 返回选择的或新生成的 Pattern 对象  }  
};  Pattern<Event, ?> pattern = Pattern.<Event>begin("start")  .then(selector);
  1. PatternFlatMapFunction
    PatternFlatMapFunction 类似于 PatternSelectFunction,但它可以基于当前的事件和/或之前的匹配结果生成多个新的模式。这允许根据输入动态地扩展匹配的可能性。
PatternFlatMapFunction<Event, Event> flatMapper = new PatternFlatMapFunction<Event, Event>() {  @Override  public void flatMap(Map<String, List<Event>> pattern, Collector<Pattern<Event, ?>> out) throws Exception {  // 根据 pattern 中的事件生成多个新的 Pattern 对象  // ...  out.collect(somePattern1); // 输出一个或多个 Pattern 对象  out.collect(somePattern2);  // ...  }  
};  Pattern<Event, ?> pattern = Pattern.<Event>begin("start")  .then(flatMapper);
  1. PatternProcessFunction
    PatternProcessFunction 提供了更高级的功能,允许你在匹配过程中访问完整的模式上下文(包括之前和当前的事件),并可以基于这些上下文信息执行任意的操作。这个函数不仅限于生成新的模式,还可以用于发出警告、执行副作用操作等。
PatternProcessFunction<Event, ?> processor = new PatternProcessFunction<Event, ?>() {  @Override  public void processMatch(Map<String, List<Event>> match, Context<Event> ctx, Collector<OUT> out) throws Exception {  // 处理完整的模式匹配结果  // 可以基于 match 中的事件执行任意操作  // ...  // 如果需要,可以将结果发送到输出流中  // out.collect(...);  }  // ... 其他可选的方法,如 onTimer、onEvent 等  
};  Pattern<Event, ?> pattern = Pattern.<Event>begin("start")  // 定义其他模式部分...  .process(processor);

示例:使用 PatternSelectFunction 选择分支
假设有一个 Event 类型,其中包含一个 type 字段,想要根据 type 字段的值选择不同的分支模式:

PatternSelectFunction<Event> selector = new PatternSelectFunction<Event>() {  @Override  public Pattern<Event, ?> select(Map<String, List<Event>> pattern) throws Exception {  Event lastEvent = pattern.get("start").get(pattern.get("start").size() - 1);  if ("A".equals(lastEvent.getType())) {  return Pattern.<Event>after(pattern.get("start").get(0))  .where(event -> "B".equals(event.getType()))  .name("A-to-B");  } else if ("C".equals(lastEvent.getType())) {  return Pattern.<Event>after(pattern.get("start").get(0))  .where(event -> "D".equals(event.getType()))  .name("C-to-D");  } else {  // 默认分支或错误处理...  return null; // 或者抛出异常等  }  }  
};  Pattern<Event, ?> pattern = Pattern.<Event>begin("start")  .then(selector);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/846857.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Dinky FlinkSQL Doris读取写入

Dinky运行前开启全局变量&#xff0c;以支持使用&#xff1a; sink.sink.label-prefix ${idUtil.simpleUUID()} Mysql同步Doris - testMysqlCdcDoris&#xff1a; EXECUTE CDCSOURCE demo_doris WITH (connector mysql-cdc,hostname 172.xxx,port 3306,username xxx,pas…

gorm的find和scan使用

在 GORM 中&#xff0c;.Find() 和 .Scan() 都可以用于检索数据库记录&#xff0c;但它们之间存在一些差异&#xff0c;并不完全等同于彼此。 使用例子 Find 方法的使用例子 查找单一记录&#xff1a; var result models.MyModel db.Where(“id ?”, 1).Find(&result…

Spring 源码:深度解析AOP源码配置解析

文章目录 一、 解析AOP配置的入口1.1 从XML配置到AOP Namespace的解析流程1.2 分析注解驱动的AOP配置解析流程 二、AOP配置解析的核心流程2.1 ConfigBeanDefinitionParser 类2.2 parse()2.3 parseAdvisor()2.4 parseAspect()2.5 parsePointcut()2.6 createAdvisorBeanDefinitio…

算法每日一题(python,2024.05.29) day.11

题目来源&#xff08;力扣. - 力扣&#xff08;LeetCode&#xff09;&#xff0c;简单&#xff09; 解题思路&#xff1a; 法一&#xff1a;切片函数法 直接用python中的切片函数直接解决 法二&#xff1a;交换法 从俩头开始交换字符串的数字&#xff0c;若为奇数&#xff…

具有激情的技术管理者才是优秀的领导者

与其他类型的管理者相比&#xff0c;技术管理者更要具有激情&#xff0c;有激情的技术领导者才能影响和感染团队成员&#xff0c;实现团队的目标。 激情能够带领团队走出阴霾。在所有人都觉得没有希望而选择放弃的时候&#xff0c;有激情的管理者能够带领团队面对困难&#xf…

GITLAB常见问题总结

Troubleshooting GitLab Pages administration (FREE SELF) 原文地址 stage: Plan group: Knowledge info: To determine the technical writer assigned to the Stage/Group associated with this page, see https://about.gitlab.com/handbook/product/ux/technical-writing/…

STM8单片机变频器设计

变频调速技术是现代电力传动技术的重要发展方向,而作为变频调速系统的核心—变频器的性能也越来越成为调速性能优劣的决定因素,除了变频器本身制造工艺的“先天”条件外,对变频器采用什么样的控制方式也是非常重要的。随着电力电子技术、微电子技术、计算机网络等高新技术的…

Kompas AI:智能生活的开启者

引言 在现代社会&#xff0c;**人工智能&#xff08;AI&#xff09;**已经深刻地影响了我们的生活和工作。无论是智能家居、自动驾驶&#xff0c;还是医疗诊断&#xff0c;AI的应用无处不在。而在众多AI平台中&#xff0c;Kompas AI 作为一个先进的对话式AI平台&#xff0c;通过…

R语言数据分析-针对芬兰污染指数的分析与考察

1. 研究背景及意义 近年来&#xff0c;随着我国科技和经济高速发展&#xff0c;人们生活质量也随之显著提高。但是&#xff0c; 环境污染问题也日趋严重&#xff0c;给人们的生活质量和社会生产的各个方面都造成了许多不 利的影响。空气污染作为环境污染主要方面&#xff0c;更…

【漏洞复现】海康威视综合安防管理平台 orgManage/v1/orgs/download 任意文件读取漏洞

0x01 产品简介 海康威视综合安防管理平台是一套“集成化”、“智能化”的平台,通过接入视频监控、一卡通、停车场、报警检测等系统的设备。海康威视集成化综合管理软件平台,可以对接入的视频监控点集中管理,实现统一部署、统一配置、统一管理和统一调度。 0x02 漏洞概述 海康…

7-8 矩阵字符

给定一个仅包含小写字母的字符串S,用这些字符恰好排成一个n行m列的矩阵(m≥n)&#xff0c;请找出所有符合要求的矩阵中最接近于正方形的那个矩阵。然后从第一列开始&#xff0c;逐列按照从上到下的顺序输出矩阵中的字符。 例如&#xff1a;S "abcdefgh"。按要求m≥…

动态规划-求买卖股票所能获得的最大收益(hard)

一、问题描述 二、解题思路 1.先看有哪几个可变参数&#xff1a; (1).当前第几天 nowday(范围&#xff1a;0->n-1) (2).剩余交易次数 restTime(范围&#xff1a;k->0) (3).当天可买入还是可卖出 isnowHold(0 表示当前未持有可买入&#xff0c;1 表示当前持有可卖出) 2.…

WIN10环境下xposed环境搭建

禁止拿来干坏事&#xff0c;仅做学习为目的 环境需求 1.夜神模拟器7.1 2.Android stdio 2022.3.1 3. Adb环境配置 具体实现 1.安装xposed 打开可一键安装&#xff0c;重启 2.连接虚拟机 adb connect 127.0.0.1:620013.打开as,进入project 4.在lib下添加准备好的jar包 …

AD软件底层丝印反转

快捷键VB&#xff0c;翻转后底部视图所有显示就正常了&#xff0c;当底层确认之后再按VB就回到正常状态。 否则你就看到一个镜像的丝印。 快捷键VB后 注意&#xff0c;经过VB反转BOTTOM后TOP层的丝印变镜像翻转了。 设计完毕后调整过来即可。

高级优化理论与方法(十四)

高级优化理论与方法&#xff08;十四&#xff09; Non-linear Constrained OptimizationKKT-Theorem(FONC)SONCDefinitionSOSCExample 1Example 2 Convex Optimization ProblemsDefinitionLemmaTheoremLemmaExampleTheorem TheoremExample DefinitionTheoremLemmaCorollaryLemm…

查询语言:ClickHouse的SQL基础与特点

1.背景介绍 查询语言&#xff1a;ClickHouse的SQL基础与特点 作者&#xff1a;禅与计算机程序设計艺術 1. 背景介绍 1.1 ClickHouse简介 ClickHouse是Yandex开源的一个高性能分布式 column-oriented DBSMS (Column-based Distributed SQL Management System)&#xff0c;它…

怎么下载 jar 包

一、在Maven仓库里面下载 Maven仓库 网址&#xff1a;https://mvnrepository.com/ 二、搜索需要的 jar 包&#xff08;以 druid 为例&#xff09; 三、找到 druid jar包&#xff0c;点进去 四、找到自己需要的版本&#xff0c;点进去 五、 点 jar 下载

【漏洞复现】SpringBlade tenant/list SQL 注入漏洞

0x01 产品简介 SpringBlade ,是一个由商业级项目升级优化而来的 SpringCloud 分布式微服务架构、SpingBoot 单体式微服务架构并存的综合型项目。 0x02 漏洞概述 SpringBlade 后台框架 /api/blade-system/tenantist路径存在SQL注入漏洞&#xff0c;攻击者除了可以利用 SQL 注…

【深度学习在计算机视觉中的应用:塑造机器的视觉智能】

文章目录 前言深度学习在计算机视觉中的关键应用图像分类示例&#xff1a;使用卷积神经网络分析代码结论 前言 计算机视觉是一个模仿人类视觉感知能力的领域&#xff0c;它使计算机能够从图像和视频中识别、处理和理解视觉信息。深度学习的兴起极大地推动了计算机视觉技术的发…

关于网络编程

目录 1、InetAdress类 2、Socket套接字 3、UDP数据报套接字编程 &#xff08;1&#xff09;DatagramSocket 类 &#xff08;2&#xff09;DatagramPacket类 &#xff08;3&#xff09;处理无连接问题 UdpEchoServer.java UdpEchoClient.java 4、TCP流套接字编程 &…