Flink CEP实现10秒内连续登录失败用户分析

1、什么是CEP?

Flink CEP即 Flink Complex Event Processing,是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型,即对于无界流中的各种数据(称为事件),提供一种组合匹配的功能。

在这里插入图片描述
上图中,以不同形状代表一个DataStream中不同属性的事件。以一个圆圈和一个三角组成一个Pattern后,就可以快速过滤出原来的DataStream中符合规律的数据。举个例子,比如很多网站需要对恶意登录的用户进行屏蔽,如果用户连续三次输入错误的密码,那就要锁定当前用户。在这个场景下,所有用户的登录行为就构成了一个无界的数据流DataStream。而连续三次登录失败就是一个匹配模型Pattern。CEP编程模型的功能就是从用户登录行为这个无界数据流DataStream中,找出符合这个匹配模Pattern的所有数据。这种场景下,使用我们前面介绍的各种DataStream API其实也是可以实现的,不过相对就麻烦很多。而CEP编程模型则提供了非常简单灵活的功能实现方式。

2、代码实现

2.1 引入maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.roy</groupId><artifactId>FlinkDemo</artifactId><version>1.0</version><properties><flink.version>1.12.5</flink.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><log4j.version>2.12.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- CEP主要是下面这个依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.14</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.1.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

2.2 基本流程

//1、获取原始事件流
DataStream<Event> input = ......; 
//2、定义匹配器
Pattern<Event,?> pattern = .......; 
//3、获取匹配流
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
//4、将匹配流中的数据处理形成结果数据流
DataStream<Result> resultStream = patternStream.process(new PatternProcessFunction<Event, Result>() {@Overridepublic void processMatch(Map<String, List<Event>> pattern,Context ctx,Collector<Result> out) throws Exception {}
});

2.3 完整代码

注意:代码运行前,先启动2.4 nlk socket服务

package com.roy.flink.project.userlogin;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.List;
import java.util.Map;/*** @desc 十秒内连续登录失败的用户分析。使用Flink CEP进行快速模式匹配*/
public class MyUserLoginAna {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// //BoundedOutOfOrdernessWatermarks定时提交Watermark的间隔env.getConfig().setAutoWatermarkInterval(1000L);// 使用Socket测试env.setParallelism(1);// 1、获取原始事件流(10.86.97.206改为实际地址)final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206",7777);final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {@Overridepublic UserLoginRecord map(String s) throws Exception {final String[] splitVal = s.split(",");return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))// 主要针对乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间.withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime()));// 2、定义匹配器// 2.1:10秒内出现3次登录失败的记录(不一定连续)// Flink CEP定义消息匹配器。
//        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
//            @Override
//            public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
//                return 1 == userLoginRecord.getLoginRes();
//            }
//        }).times(3).within(Time.seconds(10));// 2.2:连续三次登录失败。next表示连续匹配。 不连续匹配使用followByfinal Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("two").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("three").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).within(Time.seconds(10));// 3、获取匹配流final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);final MyProcessFunction myProcessFunction = new MyProcessFunction();// 4、将匹配流中的数据处理成结果数据流final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);badUserStream.print("badUser");env.execute("UserLoginAna");}// mainpublic static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord,UserLoginRecord>{@Overridepublic void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {// 针对2.1 连续3次登录失败
//            final List<UserLoginRecord> records = match.get("start");
//            for(UserLoginRecord record : records){
//                out.collect(record);
//            }// 针对2.2 非连续3次登录失败final List<UserLoginRecord> records = match.get("three");for(UserLoginRecord record : records){out.collect(record);}}// processMarch}// MyProcessFunction
}

UserLoginRecord对象,如下:


public class UserLoginRecord {private String userId;private int loginRes; // 0-成功, 1-失败private long loginTime;public UserLoginRecord() {}public UserLoginRecord(String userId, int loginRes, long loginTime) {this.userId = userId;this.loginRes = loginRes;this.loginTime = loginTime;}@Overridepublic String toString() {return "UserLoginRecord{" +"userId='" + userId + '\'' +", loginRes=" + loginRes +", loginTime=" + loginTime +'}';}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public int getLoginRes() {return loginRes;}public void setLoginRes(int loginRes) {this.loginRes = loginRes;}public long getLoginTime() {return loginTime;}public void setLoginTime(long loginTime) {this.loginTime = loginTime;}
}

2.4 nlk模拟socket服务端

在这里插入图片描述

2.5 IDEA控制台打印

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/657924.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Keepalived + DR 集群

目录 1、Keepalive VRRP 说明 故障切换 工作原理 核心组件 2、Keepalived DR 集群 拓扑规划 前期准备 配置 Httpd 服务 配置 Nginx 服务 配置 LVS 主 node_01 配置 LVS 从 node_02 测试 LVS 集群 测试主备切换 3、Keepalived 脑裂现象 4、Keepalived 心态检测 …

平安健康与中航健康时尚集团携手并进,共创会员制健康管理美好未来

近日&#xff0c;深圳市中航健康时尚集团股份有限公司(以下简称“中航健康时尚”)与平安健康正式达成战略合作。平安健康总裁吴军、中航健康时尚董事长王岚等领导出席签约仪式&#xff0c;就此次战略合作展开深入交流。 据了解&#xff0c;中航健康时尚集团创建于1995年&#x…

内存泄漏的原因及排查方法

&#x1f9d1;‍&#x1f393; 个人主页&#xff1a;《爱蹦跶的大A阿》 &#x1f525;当前正在更新专栏&#xff1a;《VUE》 、《JavaScript保姆级教程》、《krpano》、《krpano中文文档》 ​ ​ ✨ 前言 随着网页应用的逐渐复杂化,内存管理也变得越来越重要。内存泄漏不仅会…

YUDIAN(宇电)温控器参数笔记(二)

没想到啊&#xff0c;时隔3年&#xff0c;我又用到了这个温控器&#xff0c;又来更新一下&#xff0c;因为我刚好要做一个简易的控温系统&#xff0c;类似于恒温水槽。 这个系统大概就是&#xff1a; 温控器用pt100测温&#xff0c;作为输入&#xff0c;输入连接到一个ssr上&a…

Start gtkmm 4 Programming (range controls)_

文章目录 基础解析 Chapter 7. Range Widgets https://gtkmm.org/en/documentation.htmlhttps://gnome.pages.gitlab.gnome.org/gtkmm-documentation/index.html 基础 容器: 容器小部件与其他小部件一样&#xff0c;派生自Gtk::Widget.例如Gtk::Grid可以容纳许多子小部件&…

vue3教程,如何手动获取后端数据(入门到精通3,新人必学篇)

概述&#xff1a;没有后端数据的前端&#xff0c;就失去了灵魂&#xff0c;由于本人没有写后端数据&#xff0c;所有调用黑马的&#xff0c;往下看相信对你会有收获的。 目录 第一步&#xff1a;安装axios 第二步&#xff1a;编写后端访问地址 第三步&#xff1a;编写具体的…

如何更新github上fork的项目(需要一定git基础)

如何更新Fork的项目(需要一定git基础) 前言&#xff1a;本文记录一下自己在github上fork了大佬的开源博客项目https://github.com/tangly1024/NotionNext&#xff0c;如何使用git克隆以及自定义开发和同步合并原项目更新迭代内容的的步骤 如何更新fork的项目(进阶版) 首先你…

解决:ModuleNotFoundError: No module named ‘selenium’

解决&#xff1a;ModuleNotFoundError: No module named ‘selenium’ 文章目录 解决&#xff1a;ModuleNotFoundError: No module named selenium背景报错问题报错翻译报错位置代码报错原因解决方法方法一&#xff0c;直接安装方法二&#xff0c;手动下载安装方法三&#xff0…

数字图像处理(实践篇)三十七 OpenCV-Python 使用SIFT和BFmatcher对两个输入图像的关键点进行匹配实践

目录 一 涉及的函数 二 实践 三 报错处理 使用SIFT(尺度不变特征变换)算法

幻兽帕鲁服务器Palworld游戏怎么更新?

自建幻兽帕鲁服务器进入Palworld游戏提示“您正尝试加入的比赛正在运行不兼容的游戏版本&#xff0c;请尝试升级游戏版本”什么原因&#xff1f;这是由于你的客户端和幻兽帕鲁服务器版本不匹配&#xff0c;如何解决&#xff1f;更新幻兽帕鲁服务器即可解决。阿里云百科aliyunba…

git操作之本地代码修改后想回退成当前最新版本

这张图很关键&#xff0c;取自https://www.cnblogs.com/cblx/p/12467083.html 我们的vscode就是workspace&#xff0c;我们提交代码需要三步&#xff0c;add&#xff0c;commit&#xff0c;push&#xff0c;其中我们想拉取代码有两种方式&#xff0c;git pull或者git fetch/cl…

《【Python】如何设置现代 Python 日志记录 | Python 基础教程 | Python 冷知识 | 十分钟高手系列》学习笔记

《【Python】如何设置现代 Python 日志记录 | Python 基础》 2 PUT ALL HANDLERS/FILTERS ON THE ROOT&#xff1a;扁平化的设计有助于简化维护成本 5 STORE CONFIG IN JSON OR YAML FILE&#xff1a;使用配置文件可以将配置和代码解耦&#xff0c;减少代码量 日志设置示例 7 …

ubuntu 22安装配置并好安全加固后,普通用户一直登录不上

现象 ubuntu 22安装配置并好安全加固后&#xff0c;普通用户一直登录不上 排查报错 查看日志/var/log/auth.log发现报错 Jan 30 15:49:57 aiv-O-E-M sshd[62570]: PAM unable to dlopen(pam_tally2.so): /lib/security/pam_tally2.so: cannot open shared object file: No …

管理的四种风格

前言 管理的四种风格,一般的领导大概就是这几种管理模式,告知,辅导,参与,授权,还有就是乱搞式(神经病模式)。 一、告知式 告知式是指组织通过正式、明确的渠道,将信息传达给员工。这种方式通常用于传递基本的规章制度、工作流程、政策文件等。告知式的作用在于确保员…

体验 AutoGen Studio - 微软推出的友好多智能体协作框架

体验 AutoGen Studio - 微软推出的友好多智能体协作框架 - 知乎 最近分别体验了CrewAI、MetaGPT v0.6、Autogen Studio&#xff0c;了解了AI Agent 相关的知识。 它们的区别 可能有人要问&#xff1a;AutoGen我知道&#xff0c;那Autogen Studio是什么&#xff1f; https://g…

pandas绘制饼图:百分比、定制标签、关闭图例、支持中文

matplotlib绘制饼图 import matplotlib.pyplot as pltplt.rc(font, family=SimHei, size=13) size = [25, 15

C++_list

目录 一、模拟实现list 1、list的基本结构 2、迭代器封装 2.1 正向迭代器 2.2 反向迭代器 3、指定位置插入 4、指定位置删除 5、结语 前言&#xff1a; list是STL(标准模板库)中的八大容器之一&#xff0c;而STL属于C标准库的一部分&#xff0c;因此在C中可以直接使用…

npm 淘宝镜像正式到期

由于node安装插件是从国外服务器下载&#xff0c;如果没有“特殊手法”&#xff0c;就可能会遇到下载速度慢、或其它异常问题。 所以如果npm的服务器在中国就好了&#xff0c;于是我们乐于分享的淘宝团队干了这事。你可以用此只读的淘宝服务代替官方版本&#xff0c;且同步频率…

AsyncLocal是如何实现在Thread直接传值的?

一&#xff1a;背景 1. 讲故事 这个问题的由来是在.NET高级调试训练营第十期分享ThreadStatic底层玩法的时候&#xff0c;有朋友提出了AsyncLocal是如何实现的&#xff0c;虽然做了口头上的表述&#xff0c;但总还是会不具体&#xff0c;所以觉得有必要用文字图表的方式来系统…

百度智能小程序开发平台:SEO关键词推广优化 带完整的搭建教程

移动互联网的普及&#xff0c;小程序成为了众多企业和开发者关注的焦点。百度智能小程序开发平台为开发者提供了一站式的解决方案&#xff0c;帮助企业快速搭建并推广自己的小程序。本文将重点介绍百度智能小程序开发平台的SEO关键词推广优化功能&#xff0c;并带完整的搭建教程…