Flink实现实时异常登陆监控(两秒内多次登陆失败进行异常行为标记)

Flink实现异常登陆监控(两秒内多次登陆失败进行异常行为标记)

在大数据处理领域,Apache Flink 是一个流行的开源流处理框架,能够高效处理实时数据流。在这篇博客中,我们将展示如何使用 Apache Flink 从 MySQL 中读取数据并进行实时异常监控处理,最终将结果写回到 MySQL 数据库中的err_login表中。

项目概述

我们的示例程序将会执行以下任务:

从 MySQL 数据库读取用户登录数据。
过滤出特定状态的登录记录。
对这些记录进行时间窗口处理。
将处理结果写回 MySQL 数据库。

依赖环境

在开始之前,请确保你已经安装了以下环境:
pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>EastMoney</artifactId><version>1.0-SNAPSHOT</version><repositories><repository><id>central</id><name>Maven Central Repository</name><url>https://repo.maven.apache.org/maven2</url></repository></repositories><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.14.6</version></dependency><!-- Apache Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.14.6</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency></dependencies></project>

MySQL 数据库

CREATE TABLE `login_detail` (`id` int NOT NULL AUTO_INCREMENT,`username` varchar(255) DEFAULT NULL,`password` varchar(255) DEFAULT NULL,`time` varchar(255) DEFAULT NULL,`status` int DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=127 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci CREATE TABLE `err_login` (`id` int NOT NULL AUTO_INCREMENT,`username` varchar(255) DEFAULT NULL,`status` int DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=74 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci 

1. 数据模型定义

首先,我们定义了一个简单的 User case class,用于表示从 MySQL 中读取的用户数据。

case class User(id: Int, username: String, password: String, time: String, status: Int)

2.自定义 MySQL 数据源

我们实现了一个自定义的 RichSourceFunction,从 MySQL 数据库中读取数据。该函数会不断地查询数据库,并将新数据发送到 Flink 流中。

class MySQLInsertSource(jdbcUrl: String, username: String, password: String, tableName: String) extends RichSourceFunction[User] {@volatile private var isRunning = trueprivate var connection: Connection = _private var lastMaxTime: String = _override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {super.open(parameters)connection = DriverManager.getConnection(jdbcUrl, username, password)// Initial loadval statement = connection.createStatement()val resultSet = statement.executeQuery(s"SELECT * FROM $tableName")while (resultSet.next()) {val user = User(resultSet.getInt("id"),resultSet.getString("username"),resultSet.getString("password"),resultSet.getString("time"),resultSet.getInt("status"))// Update lastMaxTimeif (lastMaxTime == null || user.time > lastMaxTime) {lastMaxTime = user.time}}}override def run(ctx: SourceFunction.SourceContext[User]): Unit = {val statement = connection.createStatement()while (isRunning) {val query = s"SELECT * FROM $tableName WHERE time > '$lastMaxTime'"val resultSet = statement.executeQuery(query)while (resultSet.next()) {val user = User(resultSet.getInt("id"),resultSet.getString("username"),resultSet.getString("password"),resultSet.getString("time"),resultSet.getInt("status"))ctx.collect(user)// Update lastMaxTimeif (user.time > lastMaxTime) {lastMaxTime = user.time}}Thread.sleep(2000) // sleep for 2 seconds}}override def cancel(): Unit = {isRunning = falseif (connection != null) {connection.close()}}
}

变量声明:
isRunning: 用于控制数据源是否继续运行。
connection: 用于连接 MySQL 数据库的 Connection 对象。
lastMaxTime: 记录上次读取数据的最大时间戳,用于增量查询。
open 方法:在数据源启动时初始化数据库连接并进行初始加载,读取全部数据,更新 lastMaxTime。
run 方法:在数据源运行时不断查询数据库,获取新数据并发送到 Flink 流中。每隔2秒执行一次查询,并更新 lastMaxTime。
cancel 方法:在数据源取消时关闭数据库连接。

3. 时间戳分配器和水位线

为了确保事件按时间顺序处理,我们为数据流分配时间戳并生成水位线。

val userStreamWithTimestamps = userStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[User](Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner[User] {override def extractTimestamp(element: User, recordTimestamp: Long): Long = {val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")val date = format.parse(element.time)date.getTime}}))

WatermarkStrategy:定义了水位线生成策略。forBoundedOutOfOrderness 表示允许事件在1秒的乱序范围内到达。

SerializableTimestampAssigner:定义了时间戳提取器,从 User 对象的 time 字段提取时间戳。

4. 数据过滤和窗口处理

我们过滤出 status 为 0 的记录,并对这些记录进行2秒的窗口处理。

val filteredStream = userStreamWithTimestamps.filter(_.status == 0)val windowedStream = filteredStream.keyBy(_.username).timeWindow(Time.seconds(2)).process(new WriteToDatabaseFunction(jdbcUrl, username, password))

过滤:filter 操作保留 status 为 0 的记录。(0为登陆失败)
窗口处理:对每个 username 进行2秒的时间窗口处理,并使用自定义的 WriteToDatabaseFunction 进行处理。

5. 窗口处理函数

我们实现了一个 ProcessWindowFunction,在窗口结束时将获取到的异常登陆用户写入 MySQL 数据库。

class WriteToDatabaseFunction(url: String, username: String, password: String) extends ProcessWindowFunction[User, String, String, TimeWindow] {val insertSql = "INSERT INTO err_login (username, status) VALUES (?, ?)"override def process(key: String, context: Context, elements: Iterable[User], out: Collector[String]): Unit = {val allStatusOne = elements.forall(_.status == 0)if (allStatusOne) {out.collect(s"Username: $key had status 1 for 2 seconds")val connection = DriverManager.getConnection(url, username, password)val preparedStatement = connection.prepareStatement(insertSql)try {for (user <- elements) {preparedStatement.setString(1, user.username)preparedStatement.setInt(2, user.status)preparedStatement.addBatch()}preparedStatement.executeBatch()} finally {preparedStatement.close()connection.close()}}}
}

变量声明:insertSql 为插入错误登录记录的 SQL 语句。
process 方法:
检查窗口内的所有记录 status 是否都为 0。
如果是,打印日志并将记录写入 err_login 表中。
使用批量插入提高效率。

6. 主函数

最后,我们将所有部分组装在一起,并执行 Flink 作业。

object FlinkMySQLExample {val jdbcUrl = "jdbc:mysql://localhost:3306/big_data"val username = "root"val password = "12345678"val tableName = "login_detail"def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval mySQLSource = new MySQLInsertSource(jdbcUrl, username, password, tableName)val userStream = env.addSource(mySQLSource)userStream.print()val userStreamWithTimestamps = userStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[User](Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner[User] {override def extractTimestamp(element: User, recordTimestamp: Long): Long = {val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")val date = format.parse(element.time)date.getTime}}))val filteredStream = userStreamWithTimestamps.filter(_.status == 0)val windowedStream = filteredStream.keyBy(_.username).timeWindow(Time.seconds(2)).process(new WriteToDatabaseFunction(jdbcUrl, username, password))windowedStream.print()env.execute("Flink MySQL Example")}
}

主函数:
获取 Flink 的执行环境。
添加自定义数据源 MySQLInsertSource,从 MySQL 数据库中读取数据。
将数据流赋予时间戳和水位线。
过滤出 status 为 0 的记录。
对过滤后的记录进行2秒的窗口处理,并将结果写入 MySQL 数据库。
执行 Flink 作业。
在这里插入图片描述
在这里插入图片描述

7.总结

这段代码展示了如何使用 Apache Flink 处理实时数据流,并与 MySQL 数据库进行交互。通过自定义数据源、时间戳和水位线分配、窗口处理和自定义窗口函数,我们可以构建强大的流处理应用程序。

如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于MapReduce, MySQL, python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/19901.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

科研数据分析常见问题

许多使用SPSSAU进行初次科研数据分析的同学&#xff0c;可能对数据分析方法的深层原理和研究思路缺乏全面的把握。因此&#xff0c;当导师针对数据研究方法提出具体问题时&#xff0c;他们可能会感到些许困惑或难以立即给出满意的答复。鉴于此&#xff0c;SPSSAU汇总了一些常见…

【工具】创客贴会员|创客贴截止2024年6月所有AI功能效果实测(热门推荐和图片编辑部分)

上一篇&#xff1a;【工具】创客贴会员&#xff5c;万字测评&#xff01;前沿设计网站创客贴的 AI 文生图效果测评 上一篇写的时候只测了文生图&#xff0c;因为百度那边活动没和创客贴接洽好&#xff0c;他们不清楚创客贴的AI和其他会员功能分开了&#xff0c;导致只有10次体…

virtualbox中ubuntu22.04网络配置

第一&#xff1a;添加两个网卡&#xff0c;网卡1是NAT方式&#xff0c;网卡2是仅主机模式&#xff08;两个顺序不能颠倒&#xff09; 第二步&#xff1a;启动ifconfig查看网络

搭载昇腾310NPU的Orange Pi AIpro开箱体验以及深度学习样例测试

Orange Pi AIpro开箱体验以及样例测试 随着人工智能和物联网技术的快速发展&#xff0c;单板计算机&#xff08;Single Board Computer, SBC&#xff09;在创客和开发者社区中越来越受到欢迎。我最近入手了一款高性能的单板计算机——Orange Pi AIpro。 在入手此款AI开发板之…

探索 Ollama: 你的本地 AI 助手

本期推荐的开源项目是 Ollama&#xff0c;它是一款本地大模型运行工具&#xff0c;可以帮助用户轻松下载和运行各种大型语言模型&#xff08;LLM&#xff09;&#xff0c;而无需将数据上传到云端。以下是关于 Ollama 的介绍以及安装和使用教程&#xff1a; Ollama 是什么&#…

VB.net 进行CAD二次开发(二)

利用参考文献2&#xff0c;添加面板 执行treeControl New UCTreeView()时报一个错误&#xff1a; 用户代码未处理 System.ArgumentException HResult-2147024809 Message控件不支持透明的背景色。 SourceSystem.Windows.Forms StackTrace: 在 System.Windows…

Spring事务管理进阶-rollbackFor propagation

黑马程序员JavaWeb开发教程 文章目录 一、rollbackFor二、propagation2.1 事务传播行为2.2 场景 一、rollbackFor 默认情况下&#xff0c;只有初选RuntimeException才会回滚异常。roolbackFor属性用于控制出现何种异常类型&#xff0c;回滚事务。 二、propagation 用来配置事…

网络安全基础技术扫盲篇 — 名词解释之“数据包“

用通俗易懂的话说&#xff1a; 数据包就像是一个信封。当你写信给某个人时&#xff0c;你将内容写在一张纸上&#xff0c;然后将纸叠起来并放入信封中&#xff0c;就形成了一个完整要发送的数据内容。信封上有发件人和收件人的详细地址&#xff0c;还有一些其他必要的信息&…

Java项目:93 springboot学生评奖评优管理系统的设计与实现

作者主页&#xff1a;源码空间codegym 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 本学生评奖评优管理系统有管理员和教师和学生。 管理员功能有个人中心&#xff0c;学生管理&#xff0c;教师管理&#xff0c;院系信息管理&a…

规则引擎Drools,基于mysql实现动态加载部署

文章目录 一、使用1、参考资料2、引包3、创建规则实体类4、实现drools动态规则5、模拟数据库&#xff0c;实现规则的CRUD6、创建控制层7、测试规则的动态添加&#xff08;1&#xff09;添加规则&#xff08;2&#xff09;修改规则&#xff08;3&#xff09;删除规则 8、模拟2个…

蓝桥杯单片机-国赛7——第十四届主观题代码参考

0.编程心得 本题中&#xff0c;要求测距能达到250cm&#xff0c;因此pca必须配置为0x01&#xff0c;但直接用会死机&#xff0c;因此需要使用CH作为判断量。 【iic的at24c02记录】&#xff1a; 读设备地址&#xff1a;0xA1 写设备地址&#xff1a;0xA0 非应答信号&#xff1…

【Linux】Linux基本指令3

目录 1.date指令 2.cal指令 3.find指令&#xff1a;&#xff08;灰常重要&#xff09; -name 4.grep指令——行文本过滤工具 5.zip/unzip指令&#xff1a; 6.tar指令&#xff08;重要&#xff09;&#xff1a;打包/解包&#xff0c;不打开它&#xff0c;直接看内容 7.bc…

Dinky DorisCDC 整库同步到 Doris

doris flinkcdc语法参考 Flink Doris Connector - Apache Doris 参考&#xff1a; Doris Flink DolphinScheduler Dinky 构建开源数据平台_dinky dolphinscheduler flink-CSDN博客

2024年,抖音小店618十大爆款预测!商家抓紧时间上架!

哈喽~我是电商月月 做电商的玩家都知道&#xff0c;一但到了换季或者是节日大促的时候&#xff0c;销量高&#xff0c;是最容易爆单的阶段 而提前上架一些热卖产品&#xff0c;爆单的几率在自己的店铺机会就越大 而最近的一个大型活动&#xff0c;就是618了&#xff0c;抖音…

只出现一次的数字II ---- 位运算

题目链接 题目: 分析: 对于只出现一次的数字, 他的任意一个bit位, 可能是0或1对于其余出现3次的数字, 假设有3n个数, 那么他们的任意一个bit相加的和可能是3n个0或3n个1那么对于数组中的全部数字的任意一个bit位之和共有三种情况: 3n个1 1 3n13n个0 1 13n个1 0 3n3n个0…

华为认证学习笔记:生成树

以太网交换网络中为了进行链路备份&#xff0c;提高网络可靠性&#xff0c;通常会使用冗余链路。但是使用冗余链路会在交换网络上产生环路&#xff0c;引发广播风暴以及MAC地址表不稳定等故障现象&#xff0c;从而导致用户通信质量较差&#xff0c;甚至通信中断。为解决交换网络…

如何限制上网行为?上网行为管控软件有什么功能?

上网行为的管理与限制对于保障企业安全、提高员工工作效率以及保护孩子健康成长都显得尤为重要。 上网行为管控软件作为一种专业的工具&#xff0c;在这方面发挥着不可替代的作用。 本文将探讨如何限制上网行为&#xff0c;并介绍上网行为管控软件的主要功能。 一、如何限制上…

C++入门——类和对象【3】(6)

前言 本节是C类和对象中的最后一节&#xff0c;学完本节内容并且能够掌握之前所学的所有内容的话&#xff0c;C就可以说是入门了&#xff0c;那我们废话不多说&#xff0c;正式进入今天的学习 1. 再谈构造函数 1.1 引入 我们在栈的背景下来看 栈的代码&#xff1a; ​type…

【技术实操】银河高级服务器操作系统实例分享,达梦数据库服务器 oom 问题分析

1. 服务器环境以及配置 【 机型】 处理器&#xff1a; HUAWEIKunpeng 920 5220 内存&#xff1a; 400518528 kB 主板型号&#xff1a; Chaoqiang K620 series 整机类型/架构&#xff1a; ARM BIOS 版本&#xff1a; KL4.41.028.TF.220224.R 固件版本&#xff1a; KL4.41…

AI能否代替ACE

什么是ACE ? 申请ACE需要以下条件: 1.发表与oracle相关的技术博客 2.参与Oracle相关的技术大会 3.对Oracle社区做出贡献。 这正好是AI应用的场景吗? 在一个群里有个群友质疑AI落地,以及应用领域? Kelvin:我一直在迷茫&#xff0c;学不好。这么多有趣AI 问题&…