Flink CEP(二) 运行源码解析

通过DemoApp学习一下,CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。

1. Pattern的定义

Pattern<Tuple3<String, Long, String>,?> pattern = Pattern.<Tuple3<String, Long, String>>begin("begin").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("success");}}).followedByAny("middle").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("fail");}}).followedBy("end").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)throws Exception {return value.f2.equals("end");}});

 在执行中,我们可以看到pattern的几个属性,进入Pattern类中查看。

public class Pattern<T, F extends T> {/** Name of the pattern. */private final String name;/** Previous pattern. */private final Pattern<T, ? extends T> previous;/** The condition an event has to satisfy to be considered a matched. */private IterativeCondition<F> condition;/** Window length in which the pattern match has to occur. */private final Map<WithinType, Time> windowTimes = new HashMap<>();/*** A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}.*/private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);/** The condition an event has to satisfy to stop collecting events into looping state. */private IterativeCondition<F> untilCondition;/** Applicable to a {@code times} pattern, and holds the number of times it has to appear. */private Times times;private final AfterMatchSkipStrategy afterMatchSkipStrategy;
}

可以看到每一个Pattern都会存在以下属性:

  • Name:Pattern的Name
  • previous:之前的Pattern
  • condition:Pattern的匹配逻辑
  • windowTimes:限制窗口的时长
  • Quantifier:Pattern的属性,包括配置Pattern的模式可以发生的循环次数,或者这个模式是贪婪的还是可选的。
    • /*** A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.** <ol>*   <li>Single*   <li>Looping*   <li>Times* </ol>** <p>Each {@link Pattern} can be optional and have a {@link ConsumingStrategy}. Looping and Times* also hava an additional inner consuming strategy that is applied between accepted events in the* pattern.*/
      public class Quantifier {private final EnumSet<QuantifierProperty> properties;private final ConsumingStrategy consumingStrategy;private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT;
      }
  • untilCondition:Pattern的循环匹配的结束条件
  • times:连续匹配次数
  • afterMatchSkipStrategy:匹配后的跳过策略

2.PatternStream的构建

        对Pattern定义完成,会通过PatternStreamBuilder,将1中定义好的Pattern应用到输入流中,返回对应的PatternStream。

    static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {return new PatternStreamBuilder<>(inputStream, pattern, TimeBehaviour.EventTime, null, null);}PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));}

继续执行代码,进入Select()。

    public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction,final TypeInformation<R> outTypeInfo) {final PatternProcessFunction<T, R> processFunction =fromSelect(builder.clean(patternSelectFunction)).build();return process(processFunction, outTypeInfo);}

进入process可以看到PatternStream.select会调用builder.build函数。

    public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction,final TypeInformation<R> outTypeInfo) {return builder.build(outTypeInfo, builder.clean(patternProcessFunction));}

在build函数中会完成NFAFactory的定义,随后构建CepOperator。inputstream随之运行CepOperator即pattern定义的处理逻辑,并返回结果流PatternStream。

    <OUT, K> SingleOutputStreamOperator<OUT> build(final TypeInformation<OUT> outTypeInfo,final PatternProcessFunction<IN, OUT> processFunction) {checkNotNull(outTypeInfo);checkNotNull(processFunction);final TypeSerializer<IN> inputSerializer =inputStream.getType().createSerializer(inputStream.getExecutionConfig());final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;final NFACompiler.NFAFactory<IN> nfaFactory =NFACompiler.compileFactory(pattern, timeoutHandling);CepOperator<IN, K, OUT> operator = new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);final SingleOutputStreamOperator<OUT> patternStream;if (inputStream instanceof KeyedStream) {KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);} else {KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();patternStream =inputStream.keyBy(keySelector).transform("GlobalCepOperator", outTypeInfo, operator).forceNonParallel();}return patternStream;}

3.CepOperator的执行

        初始化。

    @Overridepublic void open() throws Exception {super.open();timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}

 可以看到,nfaFactory.createNFA();会解析pattern组合,并为每一个pattern创建一个state。

CepOperator会在processElement中处理流中的每条数据。

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {if (isProcessingTime) {if (comparator == null) {// there can be no out of order elements in processing timeNFAState nfaState = getNFAState();long timestamp = getProcessingTimeService().getCurrentProcessingTime();advanceTime(nfaState, timestamp);processEvent(nfaState, element.getValue(), timestamp);updateNFA(nfaState);} else {long currentTime = timerService.currentProcessingTime();bufferEvent(element.getValue(), currentTime);}} else {long timestamp = element.getTimestamp();IN value = element.getValue();// In event-time processing we assume correctness of the watermark.// Events with timestamp smaller than or equal with the last seen watermark are// considered late.// Late events are put in a dedicated side output, if the user has specified one.if (timestamp > timerService.currentWatermark()) {// we have an event with a valid timestamp, so// we buffer it until we receive the proper watermark.bufferEvent(value, timestamp);} else if (lateDataOutputTag != null) {output.collect(lateDataOutputTag, element);} else {numLateRecordsDropped.inc();}}}

        可以看到,如果使用的是处理时间,需要先对数据根据当前处理时间将乱序的数据做一次处理,保证数据的有序。

        如果使用的事件时间,如果事件时间戳小于等于watermark会被认为是迟到数据。

        正常数据会先被缓存起来,等待处理。

    private void bufferEvent(IN event, long currentTime) throws Exception {List<IN> elementsForTimestamp = elementQueueState.get(currentTime);if (elementsForTimestamp == null) {elementsForTimestamp = new ArrayList<>();registerTimer(currentTime);}elementsForTimestamp.add(event);elementQueueState.put(currentTime, elementsForTimestamp);}

       elementQueueState 会以时间戳为key保存对应的数据。在onEventTime()函数中通过processEvent中处理缓存的匹配数据。

    @Overridepublic void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {// 1) get the queue of pending elements for the key and the corresponding NFA,// 2) process the pending elements in event time order and custom comparator if exists//		by feeding them in the NFA// 3) advance the time to the current watermark, so that expired patterns are discarded.// 4) update the stored state for the key, by only storing the new NFA and MapState iff they//		have state to be used later.// 5) update the last seen watermark.// STEP 1PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();NFAState nfaState = getNFAState();// STEP 2while (!sortedTimestamps.isEmpty()&& sortedTimestamps.peek() <= timerService.currentWatermark()) {long timestamp = sortedTimestamps.poll();advanceTime(nfaState, timestamp);// 对事件按时间进行排序try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {elements.forEachOrdered(event -> {try {processEvent(nfaState, event, timestamp);} catch (Exception e) {throw new RuntimeException(e);}});}elementQueueState.remove(timestamp);}// STEP 3advanceTime(nfaState, timerService.currentWatermark());// STEP 4updateNFA(nfaState);}
   private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {Collection<Map<String, List<IN>>> patterns =nfa.process(sharedBufferAccessor,nfaState,event,timestamp,afterMatchSkipStrategy,cepTimerService);if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {registerTimer(timestamp + nfa.getWindowTime());}processMatchedSequences(patterns, timestamp);}}private void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {PatternProcessFunction<IN, OUT> function = getUserFunction();setTimestamp(timestamp);for (Map<String, List<IN>> matchingSequence : matchingSequences) {function.processMatch(matchingSequence, context, collector);}}

        nfa.process()最后会调用doProcess进行处理。

        computer

         可以看到每来一个新的Event,就会从上一个数据停留的状态开始遍历。判断新事件Event匹配之前已经匹配过的哪个状态,并为其版本号+1

前5条数据是success->fail->fail->success->fail,我们可以观察到partialMatches的变化如下:

success事件到达,因为之前没有事件,所以当前停留的状态是 begin。success匹配,预期会停留在middle状态

 fail事件到达,可以看到上面的success事件停留在了middle状态,并且begin的版本+1.

判断这个fail事件可以匹配后续的patern,状态从middle转移到end。存在newComputationStates中。最终更新到partialMatch中。

 第二个fail事件到达,只能匹配之前的middle状态,所以partialMatch中会新增一个end状态,并且middle的版本+1;

 

 最后如果状态到达终态,输出到potentialMatches中存储。

打印结果,可以看到每个事件都会试图去匹配所有的历史状态,nfa会存储所有匹配上的历史状态,直到到达终态。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/13770.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据分析-关于指标和指标体系

一、电商指标体系 二、指标体系的作用 三、统计学中基本的分析手段

移远通信推出新一代高算力智能模组SG885G-WF,为工业和消费级IoT应用带来全新性能标杆

2023年7月24日&#xff0c;全球领先的物联网整体解决方案供应商移远通信宣布&#xff0c;正式推出其新一代旗舰级安卓智能模组SG885G-WF。该智能模组具有高达48 TOPS 的AI综合算力、强大性能及丰富的多媒体功能&#xff0c;非常适用于需要高处理能力和多媒体功能的工业和消费者…

如何在win10环境下配置强化学习gym库(使用vscode)

我是通过anacondavscode完成的gym库的使用&#xff0c;只是把案例跑起来了&#xff0c;具体步骤如下&#xff1a; 1、安装anaconda,参考链接&#xff1a;https://www.jianshu.com/p/2f3be7781451 我其实就是生安装的&#xff0c;也没有去配置环境啥的&#xff0c;就是下载安…

FANUC机器人SRVO-217故障报警原因分析及参考解决办法

FANUC机器人SRVO-217故障报警原因分析及参考解决办法 如下图所示,示教器提示:SRVO-217紧急停止电路板未找到, 查阅手册可以看到以下的报警说明: 故障原因: 通电时未能识别紧急停止电路板或者增设的安全I/O装置。连接有多个安全I/O装置的系统中,在报警信息的最后,会显示发…

进程_PCB 的理解

目录 一. PCB 的概念 1. 为什么需要PCB 2. PCB的属性 二. task struct 1. task struct 介绍 2. 查看进程指令 3. PID 4. PPID 父进程是什么&#xff1f; 为什么要有父进程&#xff1f; 5. fork 创建子进程 1) fork 后的现象 为什么会打印两次&#xff1f; 2) 的返…

自动驾驶感知系统--惯性导航定位系统

惯性导航定位 惯性是所有质量体本身的基本属性&#xff0c;所以建立在牛顿定律基础上的惯性导航系统&#xff08;Inertial Navigation System,INS&#xff09;(简称惯导系统)不与外界发生任何光电联系&#xff0c;仅靠系统本身就能对车辆进行连续的三维定位和三维定向。卫星导…

Linux学成之路(基础篇0(二十三)MySQL服务(主从MySQL服务和读写分离——补充)

目录 一、MySQL Replication概述 优点 异步复制&#xff08;Asynchronous repication&#xff09; 全同步复制&#xff08;Fully synchronous replication&#xff09; 半同步复制&#xff08;Semisynchronous replication&#xff09; 三、MySQL支持的复制 四、部署主从…

手写vuex

vuex 基本用法 vuex是作为插件使用&#xff0c;Vue.use(vuex) 最后注册给new Vue的是一个new Vuex.Store实例 // store.js import Vue from vue import Vuex from vuexVue.use(Vuex) // Vue.use注册插件 // new Vuex.Store实例 export default new Vuex.Store({state: {},gette…

【树链剖分+MST】CF609E

Problem - E - Codeforces 题意&#xff1a; 思路&#xff1a; 先把全局的MST求出来&#xff0c;然后对于一条边&#xff0c;如果它本来就在MST中&#xff0c;说明代价就是MST的权值和&#xff0c;否则它加入MST中&#xff0c;此时MST形成了环&#xff0c;我们把环中最大的那…

docker安装rabbitmq

1&#xff0c;拉取rabbitmq容器 docker pull rabbitmq 2&#xff0c;下载完以后启动容器 先查看自己的容器id&#xff1a; [rootch ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE rabbitmq latest bcef1e…

Power BI-网关设置与云端报表定时刷新(一)

网关的工作原理 网关是将本地数据传输至云端的桥梁&#xff0c;不仅Power BI能使用&#xff0c;其他微软软件也能够使用。 我们发布在云上的报表&#xff0c;发布后是静态的&#xff0c;不会自动刷新。需要通过网关设置定时刷新。 安装与设置 1.登录到Powerbi 在线服务–设置…

实战项目——基于多设计模式下的同步异步日志系统

系列文章目录 1.项目介绍 2.相关技术补充 3.日志系统框架 4.代码设计 5.功能测试 6.性能测试 文章目录 目录 系列文章目录 1.项目介绍 2.相关技术补充 3.日志系统框架 4.代码设计 5.功能测试 6.性能测试 文章目录 前言 一、项目介绍 二、开发环境 三、核心技…

Ubuntu更改虚拟机网段(改成桥接模式无法连接网络)

因为工作需要&#xff0c;一开始在安装vmware和虚拟机时&#xff0c;是用的Nat网络。 现在需要修改虚拟机网段&#xff0c;把ip设置成和Windows端同一网段&#xff0c;我们就要去使用桥接模式。 环境&#xff1a; Windows10、Ubuntu20.04虚拟机编辑里打开虚拟网络编辑器&#…

7.Docker-compose

文章目录 Docker-compose概念Docker-compose部署YAML文件格式和编写注意事项注意数据结构对象映射序列属组布尔值序列的映射映射的映射JSON格式文本换行锚点和引用 Docker compose配置常用字段docker compose常用命令Docker Compose 文件结构docker compose部署apachedocker co…

Ubuntu Server版 之 mysql 系列(-),安装、远程连接,mysql 创建用户、授权等

Ubuntu 分 桌面版 和 服务版 桌面版 &#xff1a;有额外的简易界面 服务版&#xff1a;是纯黑框的。没有任何UI界面的可言 安装mysql 安装位置 一般按照的位置存放在 /usr/bin 中 sudo apt-get install mysql-server退出程序或应用 exit 或 Ctrl D 查看mysql的状态 servic…

springboot参数校验

springboot参数传递 PathVariableRequestParamRequestBody JSR303 jsr303 &#xff1a; 也称 bean validation 规范&#xff0c;用于java bean 验证的标准API&#xff0c;&#xff0c;他定义了一组注解&#xff0c;可以在javabean 的属性上声明验证规则 JSR&#xff1a; ja…

【Android安全】Embedded Trace Microcell模块

ETM: Embedded Trace Macrocell, hardware unit responsible to generate hardware instruction trace. ETM模块用于在硬件层面实现instruction trace&#xff0c;可用于辅助逆向分析。 使用教程&#xff1a; https://mcuoneclipse.com/2016/11/05/tutorial-getting-etm-inst…

鸿鹄协助管理华为云与炎凰Ichiban

炎凰对华为云的需求 在炎凰日常的开发中&#xff0c;对于服务器上的需求&#xff0c;我们基本都是采用云服务。目前我们主要选择的是华为云&#xff0c;华为云的云主机比较稳定&#xff0c;提供的云主机配置也比较多样&#xff0c;非常适合对于不同场景硬件配置的需求&#xff…

Java IO,BIO、NIO、AIO

操作系统中的 I/O 以上是 Java 对操作系统的各种 IO 模型的封装&#xff0c;【文件的输入、输出】在文件处理时&#xff0c;其实依赖操作系统层面的 IO 操作实现的。【把磁盘的数据读到内存种】操作系统中的 IO 有 5 种&#xff1a; 阻塞、 非阻塞、【轮询】 异步、 IO复…

NLP From Scratch: 生成名称与字符级RNN

NLP From Scratch: 生成名称与字符级RNN 这是我们关于“NLP From Scratch”的三个教程中的第二个。 在<cite>第一个教程< / intermediate / char_rnn_classification_tutorial ></cite> 中&#xff0c;我们使用了 RNN 将名称分类为来源语言。 这次&#xff…