探索Flink动态CEP:杭州银行的实战案例

本文撰写自杭州银行大数据工程师唐占峰、欧阳武林老师。将介绍 Flink 动态 CEP的定义与核心概念、应用场景、并深入探讨其技术实现并介绍使用方式。主要分为以下几个内容:

  1. Flink动态CEP简介

  2. Flink动态CEP的应用场景

  3. Flink动态CEP的技术实现

  4. Flink动态CEP的使用方式

  5. 杭州银行应用实践

Tips:点击「阅读原文」跳转阿里云实时计算 Flink~

金融行业大数据技术正在进入成熟期,数据的实时性在金融的实时监控和分析交易数据以识别洗钱行为、欺诈行为、和确保合规性是至关重要的。随着业务环境的快速变化,传统的静态规则引擎已经无法满足这些需求,因为它们在规则变更时需要重启服务,这会导致服务中断和延迟响应。我们引入由 Flink 发展过来的 Flink 动态 CEP 作为行内的动态规则引擎,它能够在不中断服务的情况下动态更新规则,适应不断变化的业务需求。

CEP 是复杂事件处理 Complex Event Processing 的缩写,而 Flink CEP 则是基于 Flink 实现的复杂事件处理库,它可以识别出数据流中符合特定模式(Pattern)的事件序列,并允许用户作出针对性处理。Flink 动态 CEP,作为 Flink CEP 的高级功能,进一步扩展了这一能力,它支持在不重启服务的情况下动态更新规则。这种动态性不仅提高了系统的灵活性和响应速度,还大大降低了维护成本和复杂性。

01

Flink 动态 CEP 简介

1. Flink 动态 CEP 的定义和核心概念

Flink 动态 CEP 是 Apache Flink 流处理框架的一个高级功能,它允许通过DataStream(数据流)作业方式运行支持动态规则更新的 Flink CEP 作业,对 数据流进行动态的捕获、清洗和分析。Flink 动态 CEP 做到了基于 Flink 全托管快速构建动态加载最新规则来处理上游的数据流,让用户有机会实时掌握数据中重要的高阶特征。

关键概念

①pattern(模式):模式是规则,也是定义规则的方式。一个模式可以是单例或者循环模式,单例只接受一个事件,循环模式可以接受多个事件。用户可以使用pattern 来识别匹配到的事件。多个 pattern 可以组成复杂模式,我们把由多个pattern 组成的复杂模式序列称为 patternProcessor(模式处理器)。

②事件流:事件流可以来自异构上游,可以是 kafka 数据,也可以是数据库表数据(如交易流水类的实时事件流)。当 Flink 动态 CEP 作业启动后,遇到实际输入事件流,Flink 会尝试识别定义的 patternProcessor 并进行动态匹配,最终得到匹配结果。

③动态匹配:Flink 动态 CEP 会实时识别事件流变化,并不断发送给下游算子,下游算子接收到发送的事件进行解析和反序列化后生成真正使用的 patternProcessor,根据最新的 patternProcessor 定义的规则进行动态匹配。

2. Flink 动态 CEP 解决的问题

Flink CEP 是一种规则引擎,是通过设置规则模式来匹配事件的。而频繁变化的交易、记账场景要求我们对初始规则进行调整或者对规则进行新增。例如一个 CEP 作业初始规则是转账用户在一分钟内连续进行3次转账后将其认为是风险操作。而在特殊场景,预期转账次数会多一点,一分钟3次的转账次数阈值可能不合适,在当前开源 Flink CEP 实现下,没法做到使用户无感的转换,只能重新编写 Java 代码,然后重启作业,以使最新的规则生效。这样的操作带来时间成本较高和重启作业代价高的问题。因为要走一遍完整的代码开发和打包上线流程对于对时间延迟敏感程度高的银行风控领域是难以接受的,且规则引擎里通常会维护很多不同的规则,如果简单的规则修改都需要较长的时间窗口,会影响其他人的使用,维护起来也比较困难。Flink 动态 CEP 很好的降低了传统规则引擎较高的时间成本并做到无需重启作业就能丝滑更新规则,以下是 Flink 动态 CEP 解决的主要问题:

①动态规则更新:传统规则引擎在规则变更时需要重新部署和启动作业,这会导致服务中断,影响系统的实时性和可用性。而 Flink 动态 CEP 允许在不中断服务的情况下动态加载和更新 CEP 规则,这意味着可以在运行时修改模式匹配逻辑,而无需重启整个 Flink 作业。

②多规则支持:在静态场景下使用多条规则时,传统 Flink CEP 需要创建多个 CepOperator(CEP算子),这会导致数据的额外拷贝,增加处理开销。Flink动态 CEP 支持在一个 Operator(算子)中处理多条规则,减少了数据拷贝,提高了处理效率。

③参数化 Condition 支持:Flink 动态 CEP 支持在 Json 格式规则描述中定义参数化的 Condition,提高了自定义 Condition 的拓展性,解决了动态添加新的 Condition 类实现的需求。

02

Flink 动态 CEP 的应用场景

Flink 动态 CEP 就像是一个智能监控系统,它不仅能在线识别风险行为(比如洗钱或欺诈),还能为实时营销助力,为业务赋能。和金融领域相关的应用场景如下:

1. 反洗钱

Flink 动态 CEP 可以监控银行账户的交易活动,识别出类似洗钱的行为。例如,可以设置规则来识别短时间内频繁的大额存款和取款行为,或者识别出与洗钱交易相关的账户之间的资金流动,从而触发反洗钱调查。也可以结合大数据技术和机器学习技术构建洗钱风险监测模型,更准确地识别可疑交易和潜在洗钱风险客户。还可以运用 Flink 动态 CEP 的流式计算技术实时分析处理客户的全链路交易信息,结合知识图谱、实时智能等技术,构建起全行级别反洗钱领域客户关系网络图,深入融合可疑交易特征,动态完整展现资金流转全貌。

2. 反欺诈

国内电信网络诈骗非常的猖獗,金融领域的反欺诈系统对电信网络诈骗案件能起到非常关键的作用,能及时阻断欺诈案件中的资金流动减少用户资金损失。反欺诈系统对系统本身分布式、实时性、规则灵活、复杂规则匹配能力要求非常高,而 Flink 动态 CEP 在 Flink 的分布式、实时性的特性基础上,增加复杂规则匹配和规则动态配置能力,为反欺诈系统提供一种很好的解决方案。

3. 实时营销

在金融客户申请信用卡的时候,客户通常需要完成填充基本信息、个人身份信息认证等多个步骤完成信用卡的申请。用户在多步骤申请信用卡的过程中,有可能会因为各种原因在其中的任意一个环节退出、失败或超时。针对这种情况,利用客户行为日志作为数据源,Flink 动态 CEP 可以利用多种规则对各个环节客户的行为数据做规则匹配、计算。并可以根据输出结果做多种营销策略的输出,如推送客户优惠券、推送消息给客户经理及时联系客户来提高营销效率,为业务赋能。

03

Flink 动态 CEP 的技术实现

根据以上背景并基于阿里在社区提出的 FLIP-200 方案,ververica-cep 开源demo,数据架构研发团队在部门内实现了一版 Flink 动态 CEP 的支持。下面详细介绍我们是如何实现的。

在 Flink 动态 CEP 中我们复用了 Flink 的 OperatorCoordinator(算子协调器)机制,用它来负责协调FLink作业中的各个 operator(算子)。OperatorCoordinator 在 JobManager 中运行,会给 TaskManager 的 Operator 发送事件,我们实现的 DynamicCEPOperatorCoordinator(动态 CEP 算子协调器)是 OperatorCoordinator 的实现类,它是 JobManager 中运行的线程,负责调用 PatternProcessorDiscoverer(模式处理器探查器)接口拿到最新的 PatternProcessor。Flink 动态 CEP 的整体架构图如下所示:

28e9f5eb864f2dda2f6a927c58f8f682.png

上图展示的是从数据库中读取序列化后的 PatternProcessor 的过程。可以看到OperatorCoordinator 会调用 PatternProcessorDiscoverer 接口从数据库中拿到最新的且序列化后的 PatternProcessor,拿到后它会发送给和它关联的DynamicCEPOp(动态cep算子)。DynamicCEPOp 接收到发送的事件进行解析和反序列化后,最终生成要使用的 PatternProcessor 并构造相应的NFA(非确定有限状态机)。之后即可使用新构造的NFA来处理上游发生的事件,并最终输出到下游。基于这样的方式,可以做到不停机的规则更新,且只有 OperatorCoordinator 和规则数据库交互,可以减少对数据库的访问,并利用Flink 的特性保证下游 sub_task 中使用规则的一致性。

了解了 Flink 动态CEP获取规则的流程,接下来要构建FlinkCEP作业,最重要的方法,就是构建 CEP.dynamicPatterns(),阿里云实时计算 Flink 版已经定义了CEP.dynamicPatterns()Api,该 API 定义代码如下:

public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(DataStream<T> input,PatternProcessorDiscovererFactory<T> patternProcessorDiscovererFactory,TimeBehaviour timeBehaviour,TypeInformation<R> outTypeInfo)

该方法入参说明如下:

 参数  说明  
 DataStream<T>  input  输入事件流  
PatternProcessorDiscovererFactory<T>  patternProcessorDiscovererFactory  工厂对象,负责构造一个探查器(PatternProcessorDiscoverer),探查器负责获取最新规则,即构造一个PatternProcessor接口  
 TimeBehaviour  TimeBehaviour  描述FlinkCEP作业如何处理事件的时间属性。参数取值如下:ProcessingTime:代表按照ProcessingTime处理事件  EventTime:代表按照Event Time处理事件  
 TypeInformation<R>  OutTypeInfo  描述输出流的类型信息  

dynamicPatterns() 方法中 input、OutTypeInfo 分别定义输入和输出流,TimeBehaviour 定义时间属性,这里不需要多做介绍,PatternProcessorDiscovererFactory<T>接口负责构造探查器 PatternProcessorDiscoverer 以拿到最新 PatternProcessor,在实现Flink动态CEP功能中起到关键作用,故本文着重对 patternProcessor、 PatternProcessorDiscoverer 两个接口及其实现类和负责拿到 PatternProcessor 并发送给下游算子的 DynamicCEPOperatorCoordinator 的代码进行详细。

1. patternProcessor接口及其实现

public  interface PatternProcessor<IN> extends Serializable, Versioned{String getId();default Long getTimestamp(){return Long.MIN_VALUE;}Pattern<IN,?> getPattern(ClassLoader classLoader);PatternProcessFunction<IN,?> getPatternProcessFunction();
}

PatternProcessor 接口用于完整定义CEP中的一条规则。一个PatternProcessor 实现类包含一个确定的模式(Pattern)用于描述如何去匹配事件、一个 PatternProcessFunction 用于描述怎么处理一个匹配事件。除此之外还包括id和 version(可选)等用于标识 PatternProcessFunction 的信息属性。因此一个PatternProcessor既包含规则本身,也指明了规则触发时,Flink 作业如何做出响应。

@PublicEvolving
public class DefaultPatternProcessor<T> implements PatternProcessor<T> {/** The ID of the pattern processor. */private final String id;/** The version of the pattern processor. */private final Integer version;/** The pattern of the pattern processor. */private final String patternStr;private final @Nullable PatternProcessFunction<T, ?> patternProcessFunction;public DefaultPatternProcessor(final String id,final Integer version,final String pattern,final @Nullable PatternProcessFunction<T, ?> patternProcessFunction,final ClassLoader userCodeClassLoader) {this.id = checkNotNull(id);this.version = checkNotNull(version);this.patternStr = checkNotNull(pattern);this.patternProcessFunction = patternProcessFunction;}@Overridepublic String toString() {return "DefaultPatternProcessor{"+ "id='"+ id+ '\''+ ", version="+ version+ ", pattern="+ patternStr+ ", patternProcessFunction="+ patternProcessFunction+ '}';}@Overridepublic String getId() {return id;}@Overridepublic int getVersion() {return version;}@Overridepublic Pattern<T, ?> getPattern(ClassLoader classLoader) {try {return (Pattern<T, ?>) CepJsonUtils.convertJSONStringToPattern(patternStr, classLoader);} catch (Exception e) {throw new RuntimeException(e);}}@Overridepublic PatternProcessFunction<T,?> getPatternProcessFunction(){return patternProcessFunction;}
}

DefaultPatternProcessor 类是 PatternProcessor 的默认实现,它接收 id, version, pattern 字符串, PatternProcessFunction 和 ClassLoader 作为参数。并使用 checkNotNull 确保除了 patternProcessFunction 外的参数不为 null。它的 getPattern 方法中包括转换json字符串到CEP能识别的 pattern 的方法 convertJSONStringToPattern(),我们重写了 convertJSONStringToPattern() 方法,接受入参为我们指定的 classloader (类加载器)如下所示:

public static Pattern<?, ?> convertJSONStringToPattern(String jsonString, ClassLoader userCodeClassLoader) throws Exception {if (userCodeClassLoader == null) {LOG.warn("The given userCodeClassLoader is null. Will try to use ContextClassLoader of current thread.");return convertJSONStringToPattern(jsonString);}GraphSpec deserializedGraphSpec = objectMapper.readValue(jsonString, GraphSpec.class);return deserializedGraphSpec.toPattern(userCodeClassLoader);
}

它的核心方法 toPattern() 涉及到 GraphSpec 类和方法本身,GraphSpec 类是Flink 中用于描述 Pattern 序列化和反序列化的工具,它用于处理由节点 (Nodes) 和边 (Edges) 组成的图形结构。这里的节点可以是单独的 Pattern 或者是嵌套的 GraphSpec,边则定义了节点之间的关系和数据流的方向,这和数据库中存储的规则Dag紧密相关,这里不做过多解释,具体来看 toPattern() 方法的实现:

public Pattern<?, ?> toPattern(final ClassLoader classLoader) throws Exception {// Construct cache of nodes and edges for later usefinal Map<String, NodeSpec> nodeCache = new HashMap<>();for (NodeSpec node : nodes) {nodeCache.put(node.getName(), node);}final Map<String, EdgeSpec> edgeCache = new HashMap<>();for (EdgeSpec edgeSpec : edges) {edgeCache.put(edgeSpec.getSource(), edgeSpec);}String currentNodeName = findBeginPatternName();Pattern<?, ?> prevPattern = null;String prevNodeName = null;while (currentNodeName != null) {NodeSpec currentNodeSpec = nodeCache.get(currentNodeName);EdgeSpec edgeToCurrentNode = edgeCache.get(prevNodeName);Pattern<?, ?> currentPattern =currentNodeSpec.toPattern(prevPattern,afterMatchStrategy.toAfterMatchSkipStrategy(),prevNodeName == null? ConsumingStrategy.STRICT: edgeToCurrentNode.getType(),classLoader);if (currentNodeSpec instanceof GraphSpec) {ConsumingStrategy strategy =prevNodeName == null? ConsumingStrategy.STRICT: edgeToCurrentNode.getType();prevPattern =buildGroupPattern(strategy, currentPattern, prevPattern, prevNodeName == null);} else {prevPattern = currentPattern;}prevNodeName = currentNodeName;currentNodeName =edgeCache.get(currentNodeName) == null? null: edgeCache.get(currentNodeName).getTarget();}// Add window semanticsif (window != null && prevPattern != null) {prevPattern.within(this.window.getTime(), this.window.getType());}return prevPattern;
}

toPattern方法是 GraphSpec 类中的核心方法之一,它负责将 GraphSpec 对象序列化信息反序列化回 Pattern 对象。它的内部逻辑包含几个步骤:

①构建节点和边缓存:创建 nodeCache 和 edgeCache 映射,分别存储NodeSpec和 EdgeSpec 实例。这有助于在后续处理中快速查找和使用节点和边的信息

②确定开始节点:初始化 currentNodeName 变量,它表示当前处理的节点名称。这个值通过调用 findBeginPatternName() 方法获得,该方法确保从图中的开始节点开始处理。

③构建 Pattern 迭代:

使用循环迭代所有节点,从开始节点开始,根据边的信息向前构建模式。在每次迭代中:从 nodeCache 获取当前节点的 NodeSpec。从 edgeCache 获取从上一个节点到当前节点的 EdgeSpec(如果存在)。使用 NodeSpec 和 EdgeSpec 构建或更新当前的 Pattern。这涉及到根据消耗策略(ConsumingStrategy)来使用不同的 Pattern 方法,如 Pattern.begin(), Pattern.next(),Pattern.followedBy(), 或 Pattern.followedByAny()。最后更新 prevPattern 和 prevNodeName 为下一个迭代做准备。最终返回构建完成的Pattern对象。

以上详细介绍了 patternProcessor 接口实现和其中的关键方法,描述了可用的Pattern 构建过程。下面介绍 PatternProcessorDiscoverer 接口及其实现。

2. PatternProcessorDiscoverer接口及其实现  

public abstract interface PatternProcessorDiscoverer<T> extends Closeable
{public abstract void discoverPatternProcessorUpdates(PatternProcessorManager<T> paramPatternProcessorManager);
}

PatternProcessorDiscoverer 接口用于描述如何发现 Processor。

我们基于阿里云默认周期性扫描外部存储的抽象类periodicPatternProcessorDiscoverer,提供了一个用于从支持 JDBC 协议的数据库中拉取最新规则的实现:JDBCPeriodicPatternProcessorDiscoverer

public class JDBCPeriodicPatternProcessorDiscoverer<T>extends PeriodicPatternProcessorDiscoverer<T> {private static final Logger LOG =LoggerFactory.getLogger(JDBCPeriodicPatternProcessorDiscoverer.class);private final String tableName;private final String userName;private final String password;private final String jdbcUrl;private final String tenant;private final List<PatternProcessor<T>> initialPatternProcessors;private final ClassLoader userCodeClassLoader;private Connection connection;private Statement statement;private ResultSet resultSet;private Map<String, Tuple4<String, Integer, String, String>> latestPatternProcessors = new ConcurrentHashMap<>();/*** Creates a new using the given initial {@link PatternProcessor} and the time interval how* often to check the pattern processor updates.> *** @param jdbcUrl The JDBC url of the database.> * @param jdbcDriver The JDBC driver of the database.> * @param initialPatternProcessors The list of the initial {@link PatternProcessor}.> * @param intervalMillis Time interval in milliseconds how often to check updates.>*/public JDBCPeriodicPatternProcessorDiscoverer(final String jdbcUrl,final String jdbcDriver,final String tableName,final String userName,final String password,@Nullable final String tenant,final ClassLoader userCodeClassLoader,@Nullable final List<PatternProcessor<T>> initialPatternProcessors,@Nullable final Long intervalMillis)throws Exception {super(intervalMillis);this.tableName = requireNonNull(tableName);this.initialPatternProcessors = initialPatternProcessors;this.userCodeClassLoader = userCodeClassLoader;this.userName = userName;this.password = password;this.jdbcUrl = jdbcUrl;this.tenant = tenant;Class.forName(requireNonNull(jdbcDriver));this.connection = DriverManager.getConnection(requireNonNull(jdbcUrl), userName, password);this.statement = this.connection.createStatement();}

JDBCPeriodicPatternProcessorDiscoverer 包括两个关键方法 arePatternProcessorsUpdated() 和 getLatestPatternProcessors(),分别用于判断 PatternProcessors 是否被更新和获取最新的 PatternProcessors。

@Override
public boolean arePatternProcessorsUpdated() throws SQLException {if (latestPatternProcessors == null&& !CollectionUtil.isNullOrEmpty(initialPatternProcessors)) {return true;}LOG.info("Start check is pattern processor updated.");if (statement == null) {try {this.connection = DriverManager.getConnection(requireNonNull(jdbcUrl), userName, password);this.statement = this.connection.createStatement();} catch (SQLException e) {LOG.error("Connect to database error!", e);throw e;}}try {String sql = buildQuerySql();LOG.info("Statement execute sql is {}", sql);resultSet = statement.executeQuery(sql);Map<String, Tuple4<String, Integer, String, String>> currentPatternProcessors = new ConcurrentHashMap<>();while (resultSet.next()) {LOG.debug("check getLatestPatternProcessors start :{}", resultSet.getString(1));String id = resultSet.getString("id");if (currentPatternProcessors.containsKey(id)&& currentPatternProcessors.get(id).f1 >= resultSet.getInt("version")) {continue;}currentPatternProcessors.put(id,new Tuple4<>(requireNonNull(resultSet.getString("id")),resultSet.getInt("version"),requireNonNull(resultSet.getString("pattern")),resultSet.getString("function")));}if (latestPatternProcessors == null|| isPatternProcessorUpdated(currentPatternProcessors)) {LOG.debug("latest pattern processors size is {}", currentPatternProcessors.size());latestPatternProcessors = currentPatternProcessors;return true;} else {return false;}} catch (SQLException e) {LOG.error("Pattern processor discoverer failed to check rule changes, will recreate connection.", e);try {statement.close();connection.close();connection = DriverManager.getConnection(requireNonNull(this.jdbcUrl), this.userName, this.password);statement = connection.createStatement();} catch (SQLException ex) {LOG.error("Connect pattern processor discovery database error.", ex);throw new RuntimeException("Cannot recreate connection to database.");}}return false;
}

arePatternProcessorsUpdated() 用于检查数据库中存储的模式处理器是否发生了更新,它首先会检查是否有尚未处理的初始模式处理器列表(initialPatternProcessors),如果存在未被处理的 PatternProcessor,则直接返回true。接着建立数据库连接,调用 buildQuerySql() 来执行 sql,用于从 tableName 指定的表中获取所有或特定租户 (tenant) 的模式处理器信息。然后处理sql的执行结果,对每一个 currentPatternProcessors,检查是否已存在或版本是否更旧。如果存在更旧的版本则跳过,否则更新 currentPatternProcessors 映射。如果 latestPatternProcessors 为空或存在更新,则用 currentPatternProcessors 更新 latestPatternProcessors,并返回 true,表示有更新。

@Override
public List<PatternProcessor<T>> getLatestPatternProcessors() throws Exception {LOG.debug("Start convert pattern processors to default pattern processor.");return latestPatternProcessors.values().stream().map(patternProcessor -> {try {String patternStr = patternProcessor.f2;GraphSpec graphSpec =CepJsonUtils.convertJSONStringToGraphSpec(patternStr);LOG.debug("Latest pattern processor is {}",CepJsonUtils.convertGraphSpecToJSONString(graphSpec));PatternProcessFunction<T, ?> patternProcessFunction = null;String id = patternProcessor.f0;int version = patternProcessor.f1;if (!StringUtils.isNullOrWhitespaceOnly(patternProcessor.f3)) {patternProcessFunction =(PatternProcessFunction<T, ?>)this.userCodeClassLoader.loadClass(patternProcessor.f3).getConstructor(String.class, int.class, String.class).newInstance(id, version, tenant);}return new DefaultPatternProcessor<>(patternProcessor.f0,patternProcessor.f1,patternStr,patternProcessFunction,this.userCodeClassLoader);} catch (Exception e) {LOG.error("Get the latest pattern processors of the discoverer failure. - ", e);e.printStackTrace();}return null;}).filter(pre -> pre != null).collect(Collectors.toList());
}

getLatestPatternProcessors() 方法涉及从数据库获取最新 PatternProcessors的过程,利用 StreamAPI,将存储在 ConcurrentHashMap 中的模式处理器信息转换为 PatternProcessor 列表。这里涉及到实例化的过程:根据模式处理器信息中的类名(patternProcessor.f3),通过类加载器加载并实例化自定义的 PatternProcessFunction。如果类名不为空或非空字符串,将其转换为对应的 Java 类,并调用构造函数,传入处理器的 id、version 和租户 tenant 信息。使用上述信息,创建一个 DefaultPatternProcessor 实例,封装模式字符串、自定义的处理器函数、类加载器等信息,最后返回一个PatternProcessor 列表,其中包含了从数据库中获取的所有模式处理器的最新实例。这些实例可以被 Flink 的 CEP 功能直接使用,以处理复杂事件模式匹配。

3. PatternProcessorDiscoverer接口及其实现

接下来介绍 DynamicCepOperatorCoordinator(动态CEP算子协调器),它承担着调用上文 PatternProcessorDiscoverer 接口从数据库中拿到最新的且序列化后的 PatternProcessor,并发送给和它关联的 DynamicCEPOp 的任务如下所示:

public class DynamicCepOperatorCoordinator<T> implements OperatorCoordinator {private static final Logger LOG =LoggerFactory.getLogger(DynamicCepOperatorCoordinator.class);private final DynamicCepOperatorCoordinatorContext cepCoordinatorContext;private final PatternProcessorDiscovererFactory discovererFactory;private final String operatorName;private boolean started;private volatile boolean closed;public DynamicCepOperatorCoordinator(String operatorName, PatternProcessorDiscovererFactory discovererFactory, DynamicCepOperatorCoordinatorContext context) {this.cepCoordinatorContext = context;this.discovererFactory = discovererFactory;this.operatorName = operatorName;this.started = false;this.closed = false;}@Overridepublic void start() throws Exception {Preconditions.checkState(!started, "Dynamic Cep Operator Coordinator Started!");LOG.info("Starting Coordinator for {}:{}", this.getClass().getSimpleName(), operatorName);cepCoordinatorContext.runInCoordinatorThreadWithFixedRate(()->{if (discovererFactory instanceof PeriodicPatternProcessorDiscovererFactory) {try {PeriodicPatternProcessorDiscoverer patternProcessorDiscoverer =(PeriodicPatternProcessorDiscoverer) discovererFactory.createPatternProcessorDiscoverer(cepCoordinatorContext.getUserCodeClassloader());boolean updated = patternProcessorDiscoverer.arePatternProcessorsUpdated();if (updated && started) {Set<Integer> subtasks = cepCoordinatorContext.getSubtasks();if (!patternProcessorDiscoverer.getLatestPatternProcessors().isEmpty()) {UpdatePatternProcessorEvent updatePatternProcessorEvent =new UpdatePatternProcessorEvent(patternProcessorDiscoverer.getLatestPatternProcessors());subtasks.forEach(subtaskId -> {cepCoordinatorContext.sendEventToOperator(subtaskId, updatePatternProcessorEvent);});}}} catch (Exception e) {LOG.error("Starting Coordinator failed", e);}}});started = true;}@Overridepublic void close() throws Exception {closed = true;cepCoordinatorContext.close();}@Overridepublic void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {LOG.info("Received event {} from operator {}.", event, subtask);}@Overridepublic void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
//        cepCoordinatorContext.runInCoordinatorThread(() -> {LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", operatorName, checkpointId);try {resultFuture.complete("Dynamic cep".getBytes(StandardCharsets.UTF_8));} catch (Throwable e) {ExceptionUtils.rethrowIfFatalErrorOrOOM(e);resultFuture.completeExceptionally(new CompletionException(String.format("Failed to checkpoint for dynamic cep %s",operatorName),e));}}@Overridepublic void notifyCheckpointComplete(long checkpointId) {}@Overridepublic void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {}@Overridepublic void subtaskReset(int subtask, long checkpointId) {}@Overridepublic void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {cepCoordinatorContext.subtaskNotReady(subtask);}@Overridepublic void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {cepCoordinatorContext.subtaskReady(gateway);}
}

下面只介绍它的关键方法start(),用于负责初始化和激活协调器的运行流程:

start() 方法调用 cepCoordinatorContext.runInCoordinatorThreadWithFixedRate 来安排一个周期性执行的任务。这个方法将在框架的协调器线程中执行一个 lambda 表达式定义的任务,定期检查模式处理器更新。在这里我们定义的时间是10s,也就是每10s检查和执行一次 patternProcessors 的更新逻辑。然后构建UpdatePatternProcessorEvent,由 cepCoordinatorContext 来广播它给下游算子。需要注意的是,DynamicCepOperatorCoordinator 是 jobmanager 运行的线程,和 taskmanager 中 PatternProcessor 的产生过程是异步的。

04

Flink 动态 CEP 的使用方式

本章介绍如何编写 Flink 动态 CEP 作业,具体操作流程如下(以Kafka源为例):

1. 连接数据源(数据源也可以是来自数据库,配置不同的连接器即可)

public static void main(String[] args) throws Exception {
// Set up the streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Classloader initialfinal ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// Process args
// Build Kafka source with new Source API based on FLIP-27Properties prop =new Properties();prop.setProperty("security.protocol","SASL_PLAINTEXT");prop.setProperty("sasl.mechanism","SCRAM-SHA-256");prop.setProperty("sasl.jaas.config","org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule" +" required username=\"100670\" password=\"000000000\";");KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder().setBootstrapServers("123.4.50.105:9292,123.4.60.106:9292,123.4.50.107:9292").setTopics("cep_test1").setGroupId("test").setStartingOffsets(OffsetsInitializer.earliest()).setProperties(prop).setValueOnlyDeserializer((new KafkaJsonDeserializer())).build();env.setParallelism(1);DataStream<Event> input = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "source");
// keyBy userId and productionId
// Notes, only events with the same key will be processd to see if there is a matchKeyedStream<Event, Tuple2<String, String>> keyedStream =input.keyBy(new KeySelector<Event, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> getKey(Event value) throws Exception {return Tuple2.of(value.getName(), value.getName());}});

①初始化执行环境

②Kafka 源配置,并将事件流 Event 根据 name 字段进行 keyby

2. 构建动态规则匹配

long time = 1000;
SingleOutputStreamOperator<String> output = CEP.dynamicPatterns(keyedStream,new JDBCPeriodicPatternProcessorDiscovererFactory<>("jdbc:mysql//123.45.6.789:3306/cep_demo_db","com.mysql.cj.jdbc.Driver","rds_demo","riskcollateral","riskcollateral",null,null,timer),TimeBehaviour.ProcessingTime,TypeInformation.of(new TypeHint<String>()){}));output.addSink(new PrintSinkFunction<>().name("cep"));env.excute("CEPDemo");}
}

3. 构建并运行

我们使用 Streampark 作为 Flink 作业的运维管控平台,根据以下步骤创建 Flink jar 包作业:

①添加jar包资源:

02d301498115fbbfeeec74fa21e3996f.png

②添加作业:

847d2819a9aa21892bbf84ecdff6015a.png

③添加作业相关配置:

84b4e0476974b0a66d1bd52c59707623.png

④发布及启动作业:

7789d032824da2123783cafcbe1eb030.png

847d666775508007dc6a175e6c1cd249.png

4. 插入规则

①建表 rds_demo 用于存储 cep 规则:

22fd317ca9cdda7e247d18f23ccbb261.png

②插入动态更新规则:

将表示 Pattern 的 JSON 字符串与 id、version、function 类名一起插入 rds_demo 表中(阿里云实时计算Flink版定义了一套 JSON 格式的规则描述,详情请参加阿里云文档——动态 CEP 中规则的 JSON 格式定义):

 id   version   pattern   function  
 1   1   {"name":"end","quantifier":{"consumingStrategy}...   xxxpackage.dynamic.cep.core.DemoPatternProcessFunction    

将 pattern 的 JSON 字符串解析后,展示如下:

{"name": "end","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"],"times": null,"untilCondition": null},"condition": null,"nodes": [{"name": "end","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"],"times": null,"untilCondition": null},"condition": {"className": "xxxpackage.dynamic.cep.core.EndCondition","type": "CLASS"},"type": "ATOMIC"},{"name": "start","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["LOOPING"],"times": null,"untilCondition": null},"type": "ATOMIC"}],"edges": [{"source": "start","target": "end","type": "SKIP_TILL_NEXT"}],"window": null,"afterMatchStrategy": {"type": "SKIP_PAST_LAST_EVENT","patternName": null},"type": "COMPOSITE","version": 1
}

这段 JSON 规则描述了一个复合模式 (COMPOSITE),它由两个原子节点(ATOMIC)组成:“start”和“end”。

这个模式目的是匹配一个特定的事件序列,其中“start”节点匹配 action 等于0的输入事件,而“end”节点匹配“xxxpackage.dynamic.cep.core.EndCondition”这个类定义的事件,这个条件由开发者定义,例如:

public class EndCondition extends SimpleCondition<Event> {@Overridepublic boolean filter(Event value) throws Exception {return value.getAction() != 1;}
}

这个 EndCondition 用于检查事件的 action 属性是否不等于1.如果事件的 action 属性不等于1,那么 filter 方法将返回 true,表示事件满足 end 节点的条件。

结合起来,这个模式的匹配的事件序列满足:“start”节点匹配所有 action 等于0的事件,一旦遇到一个 action 不等于1的事件,“end”节点的条件被满足,整个模式匹配完成。

function 字段用 DemoPatternProcessFunction 类的全路径加类名指定,记录了匹配到记录以后的处理方法如下:

public class DemoPatternProcessFunction<IN> extends PatternProcessFunction<IN, String> {String id;int version;String tenant;public DemoPatternProcessFunction(String id, int version, String tenant) {this.id = id;this.version = version;this.tenant = tenant;}@Overridepublic void processMatch(final Map<String, List<IN>> match, final Context ctx, final Collector<String> out) {StringBuilder sb = new StringBuilder();sb.append("A match for Pattern of (id, version): (").append(id).append(", ").append(version).append(") is found. The event sequence: ").append("\n");for (Map.Entry<String, List<IN>> entry : match.entrySet()) {sb.append(entry.getKey()).append(": ").append(entry.getValue().get(0).toString()).append("\n");}out.collect(sb.toString());}
}

这个处理方法是如果 PatternProcessor 匹配到一个事件序列,processMatch 方法将生成对应的描述性字符串,并由下游算子通过 Collector 将其输出。

5. 输入事件流

假如有一个事件序列如下:

private static void sendEvents(Producer<String, String> producer, String topic) {ObjectMapper objectMapper = new ObjectMapper();Event[] events = {new Event("ken", 1, 1, 0, 1662022777000L),new Event("ken", 2, 1, 0, 1662022778000L),new Event("ken", 3, 1, 1, 1662022779000L),new Event("ken", 4, 1, 2, 1662022780000L),new Event("ken", 5, 1, 1, 1662022780000L)};while (true) {try {for (Event event : events) {String json = objectMapper.writeValueAsString(event);ProducerRecord<String, String> record = new ProducerRecord<>(topic, json);producer.send(record, (metadata, exception) -> {if (exception != null) {LOG.error("Failed to send data to Kafka: ", exception);} else {System.out.println(metadata.topic());LOG.info("Data sent successfully to topic {} at offset {}",metadata.topic(), metadata.offset());}});}} catch (Exception e) {LOG.error("Error while sending events to Kafka: ", e);}}
}

我们往 Kafka Topic 插入 events,我们将会观察到 “start” 节点会匹配前两个事件,因为它们的 action 属性为0。第四个事件 action 不等于1,因此“end”节点的条件被满足,模式匹配完成。第五个事件不会影响已经完成的模式匹配。

05

杭州银行应用实践

杭州银行在我们开发的 Flink 动态 CEP 规则引擎下,也有实际的业务场景落地和应用,如事件中心-行为序列事件模块。

事件中心是以用户行为埋点数据作为数据源,对他们进行处理和分析,并输出结果辅助业务决策的平台。其中行为序列事件模块应用了行内开发的 Flink 动态 CEP 技术。事件中心-行为序列事件模块如下:

新增一个行为序列事件,填好基础信息后,用户可在行为序列配置里可以新增事件或事件组,并配置事件过期时间。

0ace84bfe155477ed215aba89f5dea5c.jpeg

一个行为序列事件模板如下:

1648494f3010dd37113ac3fed0387d19.jpeg

如下图所示,1-5原子事件表示某用户的埋点行为序列,作为 Flink 动态 CEP 的输入流 event 按照埋点顺序进入动态规则匹配,而匹配的规则是事件过期时间,这里为 20分钟。例如某输入流在 20分钟内还未完成全部五个原子事件,而只完成到事件4,这样则视为模式匹配完成,匹配到的事件为事件1到事件4,可以通过配置输出流输出自定义的规则匹配结果(如用户名字、错误原因、用户手机号码等)到 kafka、rocketMQ 等消息队列。如此,就能给业务更有价值的数据支持,做针对性的用户推荐。

4c95a1b51b522890b9410c925b9caf6b.jpeg

Flink 动态 CEP 在事件中心实践中的优势体现在,修改或新增规则或事件序列,完全无需启停服务,只需直接编辑并保存。web 端修改会同步修改数据库中保存的规则,然后选择上线,动态规则转换就完成了。

55ffdc53daa7c37dc01d4bd4c8da4e72.jpeg

【参考文献】

[1]阿里云开发者社区.(2023−02−10).Flink CEP 新特性进展与在实时风控场景的落地.阿里云开发者社区.https://developer.aliyun.com/article/1157197

[2]阿里云帮助中心. (2023-11-07). Flink 动态 CEP 快速入门_实时计算 Flink版(Flink). 阿里云帮助中心. https://help.aliyun.com/zh/flink/getting-started/getting-started-with-dynamic-flink-cep

[3]Apache Flink. (2022-09-16). FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP). Apache Flink. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195730308

[4]Apache Flink. (v1.15.4). FlinkCEP-Flink的复杂事件处理 . https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/libs/cep/

[5]https://github.com/RealtimeCompute/ververica-cep-demohttps://github.com/RealtimeCompute/ververica-cep-demo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/65146.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

打造高效租赁小程序让交易更便捷

内容概要 在如今节奏飞快的商业世界里&#xff0c;租赁小程序如同一只聪明的小狐狸&#xff0c;迅速突围而出&#xff0c;成为商家与消费者之间的桥梁。它不仅简化了交易流程&#xff0c;还在某种程度上将传统租赁模式带入了互联网时代。越来越多的企业意识到&#xff0c;这种…

【MinIO系列】MinIO Client (mc) 完全指南

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

Jmeter录制https请求

jmeter 5.5版本&#xff0c;chrome浏览器 1、首先添加Test Plan-Thread Group-HTTP(S) Test Script Recorder 2、设置HTTP(S) Test Script Recorder界面的Port&#xff08;监听端口&#xff0c;设置浏览器代理时需要与这里保持一致&#xff09;、HTPS Domains&#xff08;录制…

前端最新Vue2+Vue3基础入门到实战项目全套教程,自学前端vue就选黑马程序员,一套全通关!

Vue 快速上手 Vue概念 Vue 是一个用于构建用户界面的渐进式框架 构建用户界面&#xff1a;基于数据渲染出用户看到的页面 渐进式&#xff1a;循序渐进 框架&#xff1a;一套完整的项目解决方案 Vue 的两种使用方式: ① Vue 核心包开发 场景:局部 模块改造 ② Vue 核心包 &am…

基于Spring Boot的高校请假管理系统

一、系统背景与意义 随着高校规模的扩大和学生数量的增加&#xff0c;传统的请假管理方式已经难以满足高校管理的需求。人工请假流程繁琐、耗时长&#xff0c;且容易出现信息错误或遗漏。因此&#xff0c;开发一套基于Spring Boot的高校请假管理系统具有重要意义&#xff0c;它…

Gate.io 平台通证 GT:持续赋能与销毁、财富效应显著

在瞬息万变的加密市场中&#xff0c;每一轮牛熊转换都在加速 CEX 市场的一轮又一轮洗牌&#xff0c;这也使得该赛道的格局始终处于动态的变化。而在本轮牛市中&#xff0c;CEX 赛道也正在从最初的三大领衔变成了多强角逐&#xff0c;而 Gate.io 作为创立 11 余年的老牌交易平台…

WebRTC音视频同步原理与实现详解(下)

WebRTC音视频同步原理与实现详解&#xff08;上&#xff09; 第四章、音视频同步实现详解 4.1 音视频同步标准 音视频做到什么程度才算是同步呢&#xff1f; 关于音画同步, 业界有3个标准&#xff1a; 1&#xff09;ITU-R BT.1359&#xff08;1998&#xff09;&#xff1a…

1.系统学习-线性回归

系统学习-线性回归 前言线性回归介绍误差函数梯度下降梯度下降示例 回归问题常见的评价函数1. MAE, mean absolutely error2. MSE, mean squared error3. R square &#xff08;决定系数或R方&#xff09; 机器学习建模流程模型正则化拓展阅读作业 链接: 2.系统学习-逻辑回归 …

Oracle 日常巡检

1. 检查服务器状态 1.1. CPU使用情况 1.1.1. top top 命令是 Linux 和 Unix 系统中用于显示实时系统状态的工具&#xff0c;特别是对于监控 CPU 和内存的使用非常有用。 在命令行中输入 top&#xff0c;top 会显示一个实时更新的界面&#xff0c;其中包含系统的关键指标&am…

熊军出席ACDU·中国行南京站,详解SQL管理之道

12月21日&#xff0c;2024 ACDU中国行在南京圆满收官&#xff0c;本次活动分为三个篇章——回顾历史、立足当下、展望未来&#xff0c;为线上线下与会观众呈现了一场跨越时空的技术盛宴&#xff0c;吸引了众多业内人士的关注。云和恩墨副总经理熊军出席此次活动并发表了主题演讲…

如何在网页端使用 IDE 高效地阅读 GitHub 源码?

如何在网页端使用 IDE 高效地阅读 GitHub 源码&#xff1f; 前言什么是 GitHub1s&#xff1f;使用 GitHub1s 阅读 browser-use 项目源码步骤 1: 打开 GitHub 项目页面步骤 2: 修改 URL 使用 GitHub1s步骤 3: 浏览文件结构步骤 4: 使用代码高亮和智能补全功能步骤 5: 快速跳转和…

3D布展平台主要有哪些功能?有什么特点?

3D布展平台是一种利用3D技术和虚拟现实&#xff08;VR&#xff09;技术&#xff0c;为用户提供线上虚拟展览和展示服务的平台。这些平台通常允许用户创建、设计和发布3D虚拟展厅&#xff0c;从而提供沉浸式的展览体验。以下是对3D布展平台的详细介绍&#xff1a; 一、主要功能 …

TowardsDataScience 博客中文翻译 2018~2024(一百二十三)

TowardsDataScience 博客中文翻译 2018~2024&#xff08;一百二十三&#xff09; 引言 从 2018 年到 2024 年&#xff0c;数据科学的进展超越了许多技术领域的速度。Towards Data Science 博客依然是这个领域的关键平台&#xff0c;记录了从基础工具到前沿技术的多方面发展。…

Docker Build 命令详解:在 Ubuntu 上构建 Docker 镜像教程

简介 Docker 通过提供轻量级、可移植和高效的解决方案&#xff0c;彻底改变了软件开发和部署。docker build 命令是 Docker 镜像创建过程的核心。本文将探讨 docker build 命令、其语法、用法以及优化 Docker 构建的最佳实践。本教程的目标是手把手教你如何在 Linux 服务器上使…

Springboot应用开发:配置类整理

目录 编写目的 一、线程池 1.1 setCorePoolSize 1.2 setMaxPoolSize 1.3 setQueueCapacity 1.4 setKeepAliveSeconds 1.5 setThreadNamePrefix 1.6 setRejectedExecutionHandler 1.7 示例代码 二、Durid数据库连接池 2.1 ServletRegistrationBean 2.2 FilterRegist…

【Spring】深入解析 Spring 原理:Bean 的多方面剖析(源码阅读)

&#x1f525;个人主页&#xff1a; 中草药 &#x1f525;专栏&#xff1a;【Java】登神长阶 史诗般的Java成神之路 一、Bean的作用域 在 Java Spring 框架中&#xff0c;Bean 的作用域是一个关键概念&#xff0c;它决定了 Bean 的生命周期和实例化方式&#xff0c;对应用的性…

Excel 列名称转换问题 Swift 解答

文章目录 摘要描述题解答案Swift 实现代码&#xff1a;题解代码分析示例测试及结果 时间复杂度空间复杂度总结未来展望参考资料 摘要 本篇文章将通过 Swift 编程语言解答一个常见的算法问题&#xff1a;给定一个整数 columnNumber&#xff0c;将其转换为 Excel 表中的列名称。…

基于艾伦方差的频率稳定性分析

某个授时系统通过串口或网口采集时间间隔计数器、频率计数器、相位噪声分析仪设备的重要信息,用于评估和分析频率源的频率稳定度,确保测量的准确性和可靠性。 数据处理: 读取保存在文件中的时间间隔计数器测量的时差数据,计算时间稳定度(用TDEV表示)并保存。TDEV包括秒稳…

秒鲨后端之MyBatis【1】环境的搭建和核心配置文件详解

​ 别忘了请点个赞收藏关注支持一下博主喵&#xff01;&#xff01;&#xff01;! ! ! Mybatis简介 MyBatis历史 MyBatis最初是Apache的一个开源项目iBatis, 2010年6月这个项目由Apache Software Foundation迁移到了Google Code。随着开发团队转投Google Code旗下&#xff…

虚幻引擎结构之ULevel

在虚幻引擎中&#xff0c;场景的组织和管理是通过子关卡&#xff08;Sublevel&#xff09;来实现的。这种设计不仅提高了资源管理的灵活性&#xff0c;还优化了游戏性能&#xff0c;特别是在处理大型复杂场景时。 1. 场景划分模式 虚幻引擎采用基于子关卡的场景划分模式。每个…