FlinkCEP - Flink的复杂事件处理详解

目录

Pattern API

单独模式(Individual Patterns)

量词(Quantifiers)

条件(Conditions)

迭代条件

简单条件

组合条件

停止条件

组合模式(Combining Patterns)

模式组(Groups of patterns)

常用API

begin(#name)

begin(#pattern_sequence)

next(#name)

next(#pattern_sequence)

followedBy(#name)

followedBy(#pattern_sequence)

followedByAny(#name)

followedByAny(#pattern_sequence)

notNext()

notFollowedBy()

within(time)

发现模式(Detecting Patterns)

从模式中选择(selecting from Patterns)

案例


        FlinkCEP是在Flink之上实现的复杂事件处理(CEP)库。它允许在无尽的事件流中检测事件模式,使您有机会掌握数据中重要的内容。FlinkCEP类似字符串中,使用正则表达式检测感兴趣的字符串。

        如果您想直接开始,可以设置Flink程序并将FlinkCEP依赖项添加到项目的pom.xml中。Java代码:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep</artifactId><version>1.17.1</version>
</dependency>

Scala代码:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep-scala_2.12</artifactId><version>1.17.1</version>
</dependency>

 2.12为scala版本,1.71.1为flink版本。

        FlinkCEP不是二进制发行版的一部分。

        要对数据流中的事件应用模式匹配,必须实现合适的equals()和hashCode()方法,因为FlinkCEP使用它们来比较和匹配事件。


Pattern API

        模式API允许定义想要从输入流中提取的复杂模式序列。

        每个复杂模式序列(complex pattern sequence)由多个简单模式( simple patterns)组成,所谓简单模式指的是寻找单个事件的模式。从现在开始,我们将这些简单模式称为模式(patterns),而我们在流中搜索的最终的复杂模式序列称为模式序列(pattern sequence)。可以将模式序列看作是这样的模式图:其中根据用户指定的条件从一个模式转换到下一个模式,例如条件event.getName().equals("end")。一个匹配(match)是一个输入事件的序列,它通过一系列有效的模式转换访问复杂模式图的所有模式。

        下面就是一个简单模式API代码:

        Pattern<Event, Event> pattern = Pattern.<Event>begin("a-pattern");

代码中定义了一个模式,该模式名称为a-pattern,模式总是从begin开始,这是一个简单模式。

        下面代码展示了一个复杂模式序列API代码:

        Pattern<Event, Event> pattern = Pattern.<Event>begin("a-pattern").next("b-pattern");

代码中定义了一个复杂模式序列,它由两个简单模式组成。一个是名称为a-pattern的begin模式,另一个是名称为b-pattern的next模式。

单独模式(Individual Patterns)

        实际上,单独模式就是简单模式。这里只是为了和后文组合模式、模式组的概念区分,单独模式表示模式是单独的。一个单独模式(Pattern)可以是单例模式(singleton),也可以是循环模式(looping pattern)。单独模式和循环模式的区别在于:单例模式接受单个事件,而循环模式可以接受多个事件。下面举一个Java中正在表达式的例子。下面是4个模式匹配符号:

a b+ c? d

其含义是:

(1)a表示:一个a,

(2)b+表示:1个或多个b,

(2)c?表示:0个或1个c,

(3)d表示:一个d。

        它们都是简单模式。a, c?,d三者接受单个事件匹配是单例模式,而b+需要接受多个事件匹配是循环模式。默认情况下,模式是单例模式。b+中的+是一个量词(Quantifiers),表示个数,例如1个或多个,2到3个,可以使用量词将单例模式转换为循环模式,例如b+就是单例模式b后面跟了一个量词转换为了循环模式。

        每个模式都可以有一个或多个条件(Conditions),它根据这些条件接受事件。例如上面四个简单模式本身就携带了条件,例如a,它表示接受的事件字符必须是a这个字符,其他字符都将被过滤掉。

        综上所述,单独模式的基本构成:模式id,量词,条件。模式id用于区分不同模式。

        下面介绍FlinkCEP中,怎么指定量词和条件。

量词(Quantifiers)

        在FlinkCEP中,可以使用以下方法指定循环模式:

(1)pattern.oneOrMore(),用于期望给定事件出现一次或多次的模式,类似前面提到的b+,

(2)pattern.times(#ofTimes),用于期望特定类型事件出现特定次数的模式,例如4个a,

(3)pattern.times(#fromTimes,#toTimes),用于期望给定类型事件的特定最小出现次数和最大出现次数的模式,例如2-4,表示出现2到4次。

(4)可以使用pattern.optional()方法将所有模式(循环或非循环)设置为可选的,例如:pattern.times(2, 4).optional()表示pattern.times(2,4)要么出现要么不出现,也就是说pattern要么出现2到4次,要么不会出现。

(5)可以使用pattern.greedy()方法将循环模式设置为贪婪模式,例如有一个字符串"aaab",模式为a+,匹配到a,aa,aaa都算是匹配成功,所谓贪婪模式是尽可能的多匹配,也就是匹配的最终结果为aaa,模式默认是贪婪模式。

        先定义一个简单模式start,

Pattern<Event, Event> start = Pattern.<Event>begin("a-pattern");

Pattern的签名如下:

public class Pattern<T, F extends T>

 T是一个泛型,表示事件的类型,F是T的子类型。可以如下使用量词:

// expecting 4 occurrences
start.times(4);// expecting 0 or 4 occurrences
start.times(4).optional();// expecting 2, 3 or 4 occurrences
start.times(2, 4);// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy();// expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional();// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy();// expecting 1 or more occurrences
start.oneOrMore();// expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy();// expecting 0 or more occurrences
start.oneOrMore().optional();// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy();// expecting 2 or more occurrences
start.timesOrMore(2);// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy();// expecting 0, 2 or more occurrences
start.timesOrMore(2).optional()// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy();

如果仅仅只有量词,没有条件,它将匹配任何一个事件。 例如

Pattern<Event, Event> pattern = Pattern.<Event>begin("a-pattern").oneOrMore();

定义了一个名称为a-pattern的单独模式,量词oneOrMore(),表示a-pattern模式至少出现1次。如果去匹配如下的事件流:

abc

假设每个字符表示一个事件对象Event。最终匹配到如下7个结果:

a,b,c,ab,ac,bc,abc

通常这并不是我们想要的结果。 

条件(Conditions)

        对于每个模式,可以指定一个传入事件必须满足的条件,以便被“接受”到模式中,例如,它的值应该大于5,或者大于先前接受事件的平均值。可以通过如下方法指定事件属性的条件

(1)pattern.where(IterativeCondition<F> condition)

(2)pattern.or(IterativeCondition<F> condition)

(3)pattern.until(IterativeCondition<F> condition)。

        where、or、until方法的参数是迭代条件IterativeCondition,该类是一个抽象类,抽象类SimpleCondition扩展自IterativeCondition,所以参数也可以是简单条件SimpleCondition

迭代条件

        迭代条件是最一般的条件类型。这就是如何根据先前接受的事件的属性或其中一个子集的统计信息指定接受后续事件的条件。

        下面是一个迭代条件的代码,该条件接受名为“a-pattern”的模式的下一个事件,如果其名称为a,并且该模式先前接受的事件的容量加上当前事件的容量的总和不超过5.0。

        Pattern<Event, Event> pattern = Pattern.<Event>begin("a-pattern").where(new IterativeCondition<Event>() {@Overridepublic boolean filter(Event value, Context<Event> ctx) throws Exception {if (!"a".equals(value.getName())) {return false;}double sum = value.getVolume();for (Event event : ctx.getEventsForPattern("a-pattern")) {sum += event.getVolume();}return Double.compare(sum, 5.0) < 0;}})

如下定义事件流: 

package com.leboop.cep;import java.sql.Timestamp;/*** Description TODO.* Date 2024/8/20 8:22** @author leb* @version 2.0*/
public class Event {private Integer id;private String name;private Double volume;private Long timestamp;public Event() {}public Event(Integer id, String name, Double volume) {this.id = id;this.name = name;this.volume = volume;}public Event(Integer id, String name, Double volume, Long timestamp) {this.id = id;this.name = name;this.volume = volume;this.timestamp = timestamp;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Double getVolume() {return volume;}public void setVolume(Double volume) {this.volume = volume;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}public Long getTimestamp() {return timestamp;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;Event event = (Event) o;return name != null ? name.equals(event.name) : event.name == null;}@Overridepublic int hashCode() {return name != null ? name.hashCode() : 0;}@Overridepublic String toString() {return "Event{" +"id=" + id +", name='" + name + '\'' +", volume=" + volume +", timestamp=" + timestamp +'}';}
}
        // 输入流DataStream<Event> inputStream = env.fromElements(new Event(1, "w", 1.0, 1000L),new Event(2, "a", 5.0, 2000L),new Event(3, "b", 1.0, 3000L),new Event(4, "b", 1.0, 4000L),new Event(5, "d", 1.0, 5000L),new Event(6, "f", 1.0, 6000L),new Event(7, "a", 1.0, 7000L),new Event(8, "b", 1.0, 8000L),new Event(9, "c", 1.0, 9000L),new Event(10, "d", 1.0, 10000L),new Event(11, "d", 1.0, 11000L),new Event(12, "f", 1.0, 12000L));

程序运行成功后,将匹配到id为7个事件:

new Event(7, "a", 1.0, 7000L)

因为a-pattern模式匹配到该事件时,先前未接受任何事件,所以sum=1.0。

id为2的事件:

new Event(2, "a", 5.0, 2000L)

不满足当前事件容量和该模式已经接受的事件容量总和小于5。

简单条件

        简单条件扩展了前面提到的IterativeCondition类,并仅基于事件本身的属性来决定是否接受事件。例如:

        Pattern<Event, Event> pattern = Pattern.<Event>begin("a-pattern").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return "a".equals(value.getName());}});

 代码中定义了一个名为a-pattern的模式,通过简单条件SimpleCondition,匹配事件名称为a的事件。

        从简单条件SimpleCondition和迭代条件IterativeCondition的匿名内部类的filter方法可以看到,迭代条件提供了Context参数,可以对先前接受的事件进行操作。

组合条件

        可以通过顺序调用where()来任意组合条件。最终结果将是各个条件的结果的逻辑与(AND)。要使用OR组合条件,可以使用OR()方法,如下所示。 

pattern.where(SimpleCondition.of(value -> ... /*some condition*/)).where(SimpleCondition.of(value -> ... /*some condition*/));
pattern.where(SimpleCondition.of(value -> ... /*some condition*/)).or(SimpleCondition.of(value -> ... /*some condition*/));
停止条件

        在循环模式oneOrMore()和oneOrMore().optional()的情况下,可以指定停止条件(stop condition),例如:接受值大于5的事件,直到值的总和小于50。
为了更好地理解它,请看下面的例子。
(1)类似“(a+ until b)”的模式,一个或多个a直到b,
(2)一个传入事件序列"a1" "c" "a2" "b" "a3",这里a1,a2,a3都表示同一个事件a,只是为了区分,
(3)输出结果:{a1 a2} ,{a1}, {a2}, {a3}。
正如您所看到的,由于停止条件,{a1 a2 a3}或{a2 a3}没有输出。

        指定循环模式的停止条件。这意味着,如果出现与给定条件匹配的事件,则模式将不再接受任何事件。

pattern.oneOrMore().until(new IterativeCondition<Event>() {@Overridepublic boolean filter(Event value, Context ctx) throws Exception {return ...; // alternative condition}
});

        对于单独模式,可以总结如下:

        单独模式可以分为单例模式和循环模式,单例模式接受单个事件,而循环模式可以接受多个事件。单例模式可以通过添加量词变为循环模式。

        单独模式的基本构成:模式id,量词,条件。

组合模式(Combining Patterns)

        既然已经知道了单独模式是什么样子的,那么现在是时候看看如何将它们组合成一个完整的模式序列。

        模式序列必须从一个初始模式开始,如下所示:

Pattern<Event, ?> start = Pattern.<Event>begin("start");

这其实就是上面的简单模式或者单独模式。 

        接下来,可以通过指定模式序列之间所需的邻接条件(contiguity conditions),向模式序列添加更多模式。FlinkCEP支持事件之间的以下邻接形式:

(1)严格邻接(Strict Contiguity):期望所有匹配事件严格地一个接一个地出现,中间没有任何不匹配的事件。
(2)宽松邻接(Relaxed Contiguity):忽略在匹配事件之间出现的非匹配事件。
(3)非确定性宽松邻接(Non-Deterministic Relaxed Contiguity):进一步放宽邻接性,允许忽略一些匹配事件的额外匹配。

要在连续的模式之间应用它们,可以使用:
(1)next()

        适用于严格邻接,例如当前事件名称为a,下一个事件名称为b。

(2)followedBy()

        适用于宽松邻接,例如当前事件名称为a,后面跟谁事件名称为b,事件名称为b的事件不一定是下一个,可能是后面某个位置出现。

(3)followedByAny()

        适用于非确定性宽松邻接。

(1)notNext()

        如果您不希望一个事件类型直接跟随另一个事件类型。例如当前事件名称为a,下一个事件名称不为b。与next()相反。

(2)notFollowedBy()

        如果您不希望某个事件类型介于其他两个事件类型之间。与follwowedBy()相反。

注:如果时间间隔没有通过withIn()定义,则模式序列不能以notFollowedBy()结束。这是因为如果没有通过withIn()方法限制时间间隔,而模式序列以notFollowedBy()结束,则该模式会一直匹配输入的事件流永远不会终止。

        下面展示了如何添加模式生成模式序列:

// strict contiguity
Pattern<Event, ?> strict = start.next("middle").where(SimpleCondition.of(event -> "a".equals(event.getName()))
);// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);// NOT pattern with strict contiguity
Pattern<Event, ?> strictNot = start.notNext("not").where(...);// NOT pattern with relaxed contiguity
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);

        代码中的模式strict、relaxed、nonDetermin、strictNot、relaxedNot都是由两个简单模式组合而成的模式序列。例如strict模式序列中,start是一个简单模式,next("middle")也是一个简单模式,next方法中middle参数是该简单模式的名称,where()方法指定了邻接条件,例如这里要求下一个事件名称为a。

例如上文提到的Java正则表达式

a b+ c? d

可以使用FlinkCEP Pattern API代码如下定义: 

Pattern<Event, Event> pattern = Pattern.<Event>begin("a-pattern").where(SimpleCondition.of(event -> "a".equals(event.getName()))).next("b-pattern").oneOrMore().where(SimpleCondition.of(event -> "b".equals(event.getName()))).next("c-pattern").optional().where(SimpleCondition.of(event -> "c".equals(event.getName()))).next("d-pattern").where(SimpleCondition.of(event -> "d".equals(event.getName())));

模式组(Groups of patterns)

        还可以将模式序列定义为begin()、followedBy()、followedByAny()和next()的条件。模式序列将被视为逻辑上的匹配条件,并将返回一个GroupPattern,并且可以对GroupPattern应用oneOrMore(), times(#ofTimes), times(#fromTimes, #toTimes), optional(), consecutive(), allowCombinations()。

        模式组和组合模式区别在于,前者是将一个模式嵌套在另一个模式中,而后者是在一个模式后面天剑另一个模式。

Pattern<Event, ?> start = Pattern.begin(Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);// strict contiguity
Pattern<Event, ?> strict = start.next(Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();

常用API

begin(#name)

        定义起始模式。

Pattern<Event, ?> start = Pattern.<Event>begin("start");

begin(#pattern_sequence)

        定义起始模式,传入的是一个模式序列。

Pattern<Event, ?> start = Pattern.<Event>begin(Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);

next(#name)

        追加一个新模式。匹配事件必须直接紧跟前一个匹配事件(严格的连续性)。

Pattern<Event, ?> next = start.next("middle");

next(#pattern_sequence)

        追加一个新模式。匹配事件序列必须直接紧跟前一个匹配事件(严格的连续性)。传入参数是一个模式序列。

Pattern<Event, ?> next = start.next(Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);

followedBy(#name)

        追加一个新模式。在匹配事件和前一个匹配事件之间可以有其他事件(宽松的连续性),与next(#name)相比,followedBy(#name)不要求紧跟着,中间可以出现其他事件。

Pattern<Event, ?> followedBy = start.followedBy("middle");

followedBy(#pattern_sequence)

      追加一个新模式。在匹配事件和前一个匹配事件之间可以有其他事件(宽松的连续性),与next(#name)相比,followedBy(#name)不要求紧跟着,中间可以出现其他事件。传入参数是一个模式序列。

Pattern<Event, ?> followedBy = start.followedBy(Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);

followedByAny(#name)

        追加一个新模式。在匹配事件和前一个匹配事件之间可能发生其他事件,并且将为每个备选匹配事件提供备选匹配(非确定性放松连续性)。例如如下模式:

A followedByAny B

去匹配序列:

A C B1 B2

可以匹配到A B1(C被忽略)和A B2(尽管B1符合条件,但是也能被忽略掉,这是followedByAny和followedBy的不同之处)。如果是A followedBy B仅仅能匹配出A B1。

Pattern<Event, ?> followedByAny = start.followedByAny("middle");

followedByAny(#pattern_sequence)

        与followedByAny(#name)类似,不同的是这里参数传入的是一个模式序列。

Pattern<Event, ?> next = start.followedByAny(Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);

notNext()

        追加一个新的否定模式。匹配(负)事件必须直接紧跟前一个匹配事件(严格连续性),以便丢弃部分匹配。

Pattern<Event, ?> notNext = start.notNext("not");

notFollowedBy()

  追加一个新的否定模式。部分匹配事件序列将被丢弃,即使在匹配(负)事件和前一个匹配事件(宽松的连续性)之间发生其他事件。

Pattern<Event, ?> notFollowedBy = start.notFollowedBy("not");

within(time)

  定义与模式匹配的事件序列的最大时间间隔。如果一个未完成的事件序列超过这个时间,它将被丢弃。

pattern.within(Time.seconds(10));

发现模式(Detecting Patterns)

        指定要查找的模式序列之后,就可以将其应用于输入流,以检测潜在的匹配。要根据模式序列运行事件流,必须创建PatternStream。给定一个输入流input,一个模式pattern和一个可选的比较器comparator,比较器用于在EventTime或同一时刻到达的情况下对具有相同时间戳的事件进行排序,可以如下创建PatternStream:

DataStream<Event> input = ...;
Pattern<Event, ?> pattern = ...;
EventComparator<Event> comparator = ...; // optionalPatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

通过调用CEP类的静态方法pattern创建PatternStream: 

public class CEP {
/*** Creates a {@link PatternStream} from an input data stream and a pattern.** @param input DataStream containing the input events* @param pattern Pattern specification which shall be detected* @param <T> Type of the input events* @return Resulting pattern stream*/public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern){return new PatternStream<>(input, pattern);}......
}

输入流可以是键控的,也可以是非键控的,这取决于应用场景。


从模式中选择(selecting from Patterns)

        一旦获得了PatternStream,就可以对检测到的事件序列应用转换。建议这样做的方法是通过PatternProcessFunction。该方法的源码如下:

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.cep.functions;import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.cep.time.TimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.util.List;
import java.util.Map;/*** It is called with a map of detected events which are identified by their names. The names are* defined by the {@link org.apache.flink.cep.pattern.Pattern} specifying the sought-after pattern.* This is the preferred way to process found matches.** <pre>{@code* PatternStream<IN> pattern = ...** DataStream<OUT> result = pattern.process(new MyPatternProcessFunction());* }</pre>** @param <IN> type of incoming elements* @param <OUT> type of produced elements based on found matches*/
@PublicEvolving
public abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction {/*** Generates resulting elements given a map of detected pattern events. The events are* identified by their specified names.** <p>{@link PatternProcessFunction.Context#timestamp()} in this case returns the time of the* last element that was assigned to the match, resulting in this partial match being finished.** @param match map containing the found pattern. Events are identified by their names.* @param ctx enables access to time features and emitting results through side outputs* @param out Collector used to output the generated elements* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public abstract void processMatch(final Map<String, List<IN>> match, final Context ctx, final Collector<OUT> out)throws Exception;/*** Gives access to time related characteristics as well as enables emitting elements to side* outputs.*/public interface Context extends TimeContext {/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/<X> void output(final OutputTag<X> outputTag, final X value);}
}

        PatternProcessFunction有一个processMatch方法,每个匹配事件序列都会调用该方法。它以Map<String, List< IN >>的形式接收匹配,其中Map的键是模式序列中每个模式的名称,值是该模式的所有可接受事件的列表,IN是一个泛型,表示输入元素的类型。给定模式的事件按时间戳排序。为每个模式返回可接受事件列表List<IN>的原因是,当使用循环模式,例如oneToMany()和times()时,一个给定模式可以接受多个事件。

        如下代码对创建的PatternStream进行了处理:

        PatternStream<Event> patternStream = CEP.pattern(watermarks, pattern);SingleOutputStreamOperator<String> process = patternStream.process(new PatternProcessFunction<Event, String>() {@Overridepublic void processMatch(Map<String, List<Event>> map,Context context,Collector<String> collector) throws Exception {System.out.println("map的大小" + map.size());for (Map.Entry<String, List<Event>> entry : map.entrySet()) {String key = entry.getKey();List<Event> list = entry.getValue();for (Event event : list) {collector.collect("key=" + key + " | event=" + event);}}}});

首先输出了匹配上的模式的大小map.size(),也就是模式序列中匹配上的简单模式的个数。 之后输出每个模式名称,以及匹配上的事件序列。


案例

        如下定义Event对象:

package com.leboop.cep;/*** Description TODO.* Date 2024/8/20 8:22** @author leb* @version 2.0*/
public class Event {private Integer id;private String name;private Double volume;private Long timestamp;public Event() {}public Event(Integer id, String name, Double volume) {this.id = id;this.name = name;this.volume = volume;}public Event(Integer id, String name, Double volume, Long timestamp) {this.id = id;this.name = name;this.volume = volume;this.timestamp = timestamp;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Double getVolume() {return volume;}public void setVolume(Double volume) {this.volume = volume;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}public Long getTimestamp() {return timestamp;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;Event event = (Event) o;return name != null ? name.equals(event.name) : event.name == null;}@Overridepublic int hashCode() {return name != null ? name.hashCode() : 0;}@Overridepublic String toString() {return "Event{" +"id=" + id +", name='" + name + '\'' +", volume=" + volume +", timestamp=" + timestamp +'}';}
}

Event需要是一个POJO对象,如果模式匹配时使用比较器,还要求Event实现equals和hashCode方法。Event定义了4个属性:

(1)事件的唯一标识id

(2)事件的名称name

(3) 事件的容量volume

(4)事件时间timestamp。

        主方法如下:

package com.leboop.cep;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.util.List;
import java.util.Map;/*** Description TODO.* Date 2024/8/20 8:25** @author leb* @version 2.0*/
public class QuantifierPattern {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 输入流DataStream<Event> inputStream = env.fromElements(new Event(1, "w", 1.0, 1000L),new Event(2, "a", 5.0, 2000L),new Event(3, "b", 1.0, 3000L),new Event(4, "b", 1.0, 4000L),new Event(5, "d", 1.0, 5000L),new Event(6, "f", 1.0, 6000L),new Event(7, "a", 1.0, 7000L),new Event(8, "b", 1.0, 8000L),new Event(9, "c", 1.0, 9000L),new Event(10, "d", 1.0, 10000L),new Event(11, "d", 1.0, 11000L),new Event(12, "f", 1.0, 12000L));inputStream.print("inputStream");SingleOutputStreamOperator<Event> watermarks = inputStream.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}}));Pattern<Event, Event> pattern = Pattern.<Event>begin("a-pattern").where(SimpleCondition.of(event -> "a".equals(event.getName()))).next("b-pattern").oneOrMore().where(SimpleCondition.of(event -> "b".equals(event.getName()))).next("c-pattern").optional().where(SimpleCondition.of(event -> "c".equals(event.getName()))).next("d-pattern").where(SimpleCondition.of(event -> "d".equals(event.getName()))).within(Time.seconds(5));PatternStream<Event> patternStream = CEP.pattern(watermarks, pattern);SingleOutputStreamOperator<String> process = patternStream.process(new PatternProcessFunction<Event, String>() {@Overridepublic void processMatch(Map<String, List<Event>> map,Context context,Collector<String> collector) throws Exception {System.out.println("map的大小" + map.size());for (Map.Entry<String, List<Event>> entry : map.entrySet()) {String key = entry.getKey();List<Event> list = entry.getValue();for (Event event : list) {collector.collect("key=" + key + " | event=" + event);}}}});process.print("process");env.execute();}
}

代码中

(1)定义了流式执行环境env;

(2)并行度设置为1,为了演示方便;

(3)env.fromElements模拟了12个组成的事件流;

(4)assignTimestampsAndWatermarks方法为从事件流中提取了事件时间,并分配了有序流的watermark;

程序运行成功后,输出结果如下:

inputStream:8> Event{id=1, name='w', volume=1.0, timestamp=1000}
inputStream:5> Event{id=6, name='f', volume=1.0, timestamp=6000}
inputStream:6> Event{id=7, name='a', volume=1.0, timestamp=7000}
inputStream:4> Event{id=5, name='d', volume=1.0, timestamp=5000}
inputStream:3> Event{id=4, name='b', volume=1.0, timestamp=4000}
inputStream:3> Event{id=12, name='f', volume=1.0, timestamp=12000}
inputStream:7> Event{id=8, name='b', volume=1.0, timestamp=8000}
inputStream:8> Event{id=9, name='c', volume=1.0, timestamp=9000}
inputStream:2> Event{id=3, name='b', volume=1.0, timestamp=3000}
inputStream:1> Event{id=2, name='a', volume=5.0, timestamp=2000}
inputStream:2> Event{id=11, name='d', volume=1.0, timestamp=11000}
inputStream:1> Event{id=10, name='d', volume=1.0, timestamp=10000}
map的大小3
map的大小4
process:4> key=a-pattern | event=Event{id=2, name='a', volume=5.0, timestamp=2000}
process:5> key=b-pattern | event=Event{id=3, name='b', volume=1.0, timestamp=3000}
process:8> key=a-pattern | event=Event{id=7, name='a', volume=1.0, timestamp=7000}
process:1> key=b-pattern | event=Event{id=8, name='b', volume=1.0, timestamp=8000}
process:3> key=d-pattern | event=Event{id=10, name='d', volume=1.0, timestamp=10000}
process:2> key=c-pattern | event=Event{id=9, name='c', volume=1.0, timestamp=9000}
process:6> key=b-pattern | event=Event{id=4, name='b', volume=1.0, timestamp=4000}
process:7> key=d-pattern | event=Event{id=5, name='d', volume=1.0, timestamp=5000}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/52369.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网上商城|基于SprinBoot+vue的分布式架构网上商城系统(源码+数据库+文档)

分布式架构网上商城系统 目录 基于SprinBootvue的分布式架构网上商城系统 一、前言 二、系统设计 三、系统功能设计 5.1系统功能模块 5.2管理员功能模块 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主介绍…

微信小程序flex-grow无效

在微信小程序中使用 Flexbox 布局时&#xff0c;如果遇到 flex-grow 属性无效的情况&#xff0c;可能是由几个不同的原因导致的。以下是一些可能的原因和解决方案&#xff1a; 1. 检查 Flex 容器 确保你的父元素&#xff08;即 Flex 容器&#xff09;已经正确设置了 display:…

每期一个小窍门 golang中 CGO_ENABLED 含义

转载 CGO_ENABLED是一个环境变量&#xff0c;用于控制Go编译器是否启用CGO&#xff08;C语言调用Go函数&#xff09;功能。当CGO_ENABLED0时&#xff0c;表示禁用CGO功能。 编译时使用CGO_ENABLED0会导致编译速度较慢的原因有以下几点&#xff1a; 编译器需要额外的时间来检…

时间继电器和定时器

一、概述 1.时间继电器是可以在设定的定时周期内或周期后闭合或断开触点的元器件。 2.时间继电器上可设定的定时周期数量有限&#xff0c;多为一个或两个。定时时长从0.02s至300h(根据产品型号范围不同)。 3.定时器可以理解为一台钟表&#xff0c;它在某个时间点上闭合(断开…

PostgreSQL11 | 事务处理与并发控制

PostgreSQL11 | 事务处理与并发控制 本文章代码已在pgsql11.22版本上运行且通过&#xff0c;展示页由pgAdmin8.4版本提供&#xff0c;本文章第一次采用md文档&#xff0c;效果比csdn官方富文本编辑器好用&#xff0c;以后的文章都将采用md文档 事务管理简介 事物是pgsql中的…

三种相机模型总结(针孔、鱼眼、全景)

相机标定 文章目录 相机标定前言 前言 我们最常见的投影模型Perspective Projection Model描述的就是针孔相机的成像原理。从上面的图根据相似三角形可以得出 参考链接 https://zhuanlan.zhihu.com/p/540969207 相机标定之张正友标定法数学原理详解&#xff08;含python源码&a…

上线eleme项目

&#xff08;一&#xff09;搭建主从从数据库 主服务器master 首先下载mysql57安装包&#xff0c;然后解压 复制改目录到/usr/local底下并且改个名字 cp -r mysql-5.7.44-linux-glibc2.12-x86_64 /usr/local/mysql 删掉/etc/my.cnf 这个会影响mysql57的启动 rm -rf /etc…

解读vue3源码-响应式篇3 effect副作用函数

提示&#xff1a;看到我 请让我滚去学习 文章目录 前言effect问题拓展分支切换与 cleanup嵌套的 effect 与 effect 栈解决在副作用函数中同时读取和操作同一属性时无限循环 effect函数实现computed-api 实现图解在这里插入图片描述 总结 前言 什么是副作用函数&#xff1f; 在…

SCYC 56901传感器SCYC 56901模块面价

SCYC 56901传感器SCYC 56901模块面价 SCYC 56901传感器SCYC 56901模块面价 SCYC 56901传感器SCYC 56901模块面价 SCYC 56901传感器SCYC 56901模块引脚线 SCYC 56901传感器SCYC 56901模块说明书 SCYC 56901传感器SCYC 56901模块电路图 SCYC 56901温度传感器是早开发&#…

iPhone 手机使用技巧:iPhone 数据恢复软件

无论是由于意外删除、系统崩溃还是软件更新&#xff0c;丢失 iPhone 上的数据都是一场噩梦。从珍贵的照片到重要的工作文件&#xff0c;这种损失可能会让人感到毁灭性。值得庆幸的是&#xff0c;几个 iPhone 数据恢复软件选项可以帮助您找回丢失的文件。这些工具提供不同的功能…

神经网络——非线性激活

1 非线性激活 1.1 几种常见的非线性激活&#xff1a; ReLU (Rectified Linear Unit)线性整流函数 Sigmoid 1.2代码实战&#xff1a; 1.2.1 ReLU import torch from torch import nn from torch.nn import ReLUinputtorch.tensor([[1,-0.5],[-1,3]])inputtorch.reshape(…

刷题DAY18

排序 题目&#xff1a;输入10个大小不同的整数&#xff0c;将它们从小到大排序后输出&#xff0c;并给出现每个元素在原来序列中的位置 输入&#xff1a;输入数据有一行&#xff0c;包含10个整数&#xff0c;用空格分开 输出&#xff1a;输出数据有两行&#xff0c;第一行为…

【计算机网络】名词解释--网络专有名词详解

在网络通信中&#xff0c;有许多专业术语和概念&#xff0c;它们共同构成了网络通信的基础。以下是一些常见的网络术语及其定义和相互之间的关系&#xff1a; 一、网络基础 1.1 电路交换&#xff1a;电路交换是一种在数据传输前建立专用通信路径的通信方式。在通信开始前&…

HTML+CSS+JavaScript制作动态七夕表白网页(含音乐+自定义文字)

源码介绍 这篇博客就享下前端代码如何实现HTMLCSSJavaScript制作七夕表白网页(含音乐自定义文字)。记事本打开源码文件可以进行内容文字之类的修改&#xff0c;双击html文件可以本地运行效果&#xff0c;也可以上传到服务器里面&#xff0c;重定向这个界面 源码效果 源码下载…

Django后端架构开发:缓存机制,接口缓存、文件缓存、数据库缓存与Memcached缓存

深入探讨Django后端架构中的缓存机制&#xff1a;接口缓存、文件缓存、数据库缓存与Memcached缓存 目录 &#x1f31f; 缓存接口数据的实现✨ Django文件缓存的应用⚡ 关系型数据库缓存的策略&#x1f4a0; Memcached缓存的配置与优化 &#x1f31f; 缓存接口数据的实现 在D…

前端开发工程师面试整理-ES6+的新特性

ES6(ECMAScript 2015)及后续版本引入了许多新特性,极大地增强了JavaScript的功能和开发体验。以下是一些主要的新特性: 变量声明 1. let 和 const: ● let 声明块作用域变量。 ● const 声明常量,值不能重新赋值。 ● 示例:

从行为面试问题(behavioral questions)看中美程序员差异。

中美程序员在职场中的工作状态和职能、福利等有很大区别&#xff0c;从面试中的BQ轮就可见一斑。 中美程序员的面试轮差异&#xff1f; 国内的面试轮在不同公司间差异很大&#xff0c;但总体的问题类型包含笔试面试&#xff08;算法题、概念题、项目深挖、职业目标、职场文化…

混合现实UI优化:利用物理环境的直接交互

随着虚拟现实(VR)和混合现实(MR)技术的发展,用户界面(UI)的设计变得越来越重要,尤其是在需要适应多种物理环境的情况下。本文将介绍一种名为 InteractionAdapt 的用户界面优化方法,它专为VR环境中的工作空间适配而设计,能够有效利用物理环境,为用户提供更加灵活和个…

【仿真与实物设计】基于51单片机设计的打地鼠游戏机——程序源码原理图proteus仿真图PCB设计文档演示视频元件清单等(文末工程资料下载)

基于51单片机设计的打地鼠游戏机 演示视频: 基于51单片机设计的打地鼠游戏机 功能描述:使用 51单片机为核心制作一个打地鼠游戏机。按下启动开关,8盏LED流水点亮并闪烁2次,随即开始播放游戏音乐,直到开始选择模式。选择的模式在数码管上显示,该游戏机共有两个模式,分别…

layui table表单 checkbox选中一个其它也要选中

当我们选中其中一个商品的时候同类型的商品状态也要跟着改变 所以要在表单加载完成后去监听checkbox ,done:function (res) {console.log(详情表格数据,res)tableDetailList res.data;// 监听表格复选框选择table.on(checkbox( INST_SELECTORS.instLayFilters.unpaidTableDe…