MapReduce 实践题:Web 访问日志分析与异常检测

文章目录

    • 作业描述
      • MapReduce 实践题:Web 访问日志分析与异常检测
        • 题目背景
        • 数据集说明
        • 任务要求
        • 输入数据示例
        • 输出数据示例
        • 实现步骤
      • 解题思路
      • 1. 数据预处理
      • 2. 访问统计
      • 3. 异常检测
      • 4. 主方法
      • 5. 结果输出

作业描述

MapReduce 实践题:Web 访问日志分析与异常检测

题目背景

你被要求设计和实现一个基于 MapReduce 的大规模 Web 访问日志分析与异常检测系统。该系统的目标是从每日数百万条访问日志中提取有用的信息,并检测出潜在的异常访问行为。访问日志文件格式如下:

127.0.0.1 - - [10/Oct/2021:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 1043
192.168.0.1 - - [10/Oct/2021:13:56:12 -0700] "POST /login HTTP/1.1" 200 2326
...
数据集说明
  • IP 地址:例如,127.0.0.1
  • 时间戳:例如,[10/Oct/2021:13:55:36 -0700]
  • 请求方法:例如,"GET""POST"
  • 请求 URL:例如,"/index.html"
  • HTTP 响应码:例如,200404500
  • 响应大小:例如,1043
任务要求
  1. 数据预处理
    • 解析每条日志记录,提取以下字段:IP 地址、请求时间、请求方法、请求 URL、HTTP 响应码、响应大小。
    • 将解析后的数据格式化为结构化格式(例如,JSON)。
  2. 访问统计
    • 统计每个 IP 地址在一天中的访问次数。
    • 统计每个请求 URL 在一天中的访问次数。
  3. 异常检测
    • 检测异常高的访问频率:对于每个 IP 地址,计算访问次数的平均值和标准差,标记访问次数超过均值加三倍标准差的 IP 地址。
    • 检测潜在的恶意请求:检测 HTTP 响应码为 4xx 和 5xx 的请求,统计每个 IP 地址的异常请求次数,并标记异常请求次数占总请求次数比例超过 20% 的 IP 地址。
  4. 结果输出
    • 输出访问统计结果:每个 IP 地址的访问次数,每个请求 URL 的访问次数。
    • 输出异常检测结果:异常高访问频率的 IP 地址及其访问次数,潜在的恶意请求 IP 地址及其异常请求次数和总请求次数的比例。
输入数据示例
127.0.0.1 - - [10/Oct/2021:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 1043
192.168.0.1 - - [10/Oct/2021:13:56:12 -0700] "POST /login HTTP/1.1" 200 2326
...
输出数据示例

访问统计结果

IP访问次数:
127.0.0.1  150
192.168.0.1  200URL访问次数:
/index.html  300
/login  400

异常检测结果

异常高访问频率 IP:
192.168.0.1  1200潜在恶意请求 IP:
127.0.0.1  50  25.0%
实现步骤
  1. 数据预处理 Mapper
    • 解析日志记录,提取必要字段并输出结构化数据。
  2. 访问统计 Mapper 和 Reducer
    • Mapper:统计每个 IP 地址和每个 URL 的访问次数。
    • Reducer:汇总每个 IP 地址和每个 URL 的访问次数。
  3. 异常检测 Mapper 和 Reducer
    • Mapper:计算每个 IP 地址的访问次数,检测 HTTP 响应码为 4xx 和 5xx 的请求。
    • Reducer:计算每个 IP 地址访问次数的均值和标准差,标记异常高访问频率的 IP 地址;统计每个 IP 地址的异常请求次数并计算异常请求比例,标记潜在的恶意请求 IP 地址。

解题思路

  1. 数据预处理 Mapper:解析日志记录,提取必要字段并输出结构化数据。
  2. 访问统计 Mapper 和 Reducer:统计每个 IP 地址和每个 URL 的访问次数。
  3. 异常检测 Mapper 和 Reducer:计算每个 IP 地址的访问次数,检测 HTTP 响应码为 4xx 和 5xx 的请求。
  4. 主方法:设置三个 MapReduce 作业:数据预处理、访问统计和异常检测。

1. 数据预处理

PreprocessMapper

package org.example.mapreduce.t1;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author 撕得失败的标签* @version 1.0* @description: 数据预处理* @date 2024/6/22 22:35*/public class PreprocessMapper extends Mapper<LongWritable, Text, Text, Text> {/*** @description:* 1. 解析每条日志记录,提取以下字段:IP 地址、请求时间、请求方法、请求 URL、HTTP 响应码、响应大小。* 2. 将解析后的数据格式化为结构化格式(例如,JSON)。* @author 撕得失败的标签* @date 2024/6/23 11:09*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] strings = line.split(" ");if (strings.length == 10) {// 提取匹配到的字段String ipAddress = strings[0];String timestamp = strings[3] + " " + strings[4];String requestMethod = strings[5];String requestUrl = strings[6];String httpStatusCode = strings[8];String responseSize = strings[9];context.write(new Text(ipAddress), new Text(timestamp + "," + requestMethod + "," + requestUrl + "," + httpStatusCode + "," + responseSize));}}
}

2. 访问统计

AccessStatistics

package org.example.mapreduce.t1;import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;import java.io.IOException;/*** @author 撕得失败的标签* @version 1.0* @description: 访问统计* @date 2024/6/22 22:55*/
public class AccessStatistics {/*** @description:* 1. 统计每个 IP 地址在一天中的访问次数。* 2. 统计每个请求 URL 在一天中的访问次数。* @author 撕得失败的标签* @date 2024/6/23 11:08*/public static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws InterruptedException, IOException {String line = value.toString();String[] strings = line.split(" ");// 统计一天的,以 20/Jun/2024 为例if (strings[3].contains("20/Jun/2024")) {// IPcontext.write(new Text(strings[0]), new LongWritable(1));// URLcontext.write(new Text(strings[6]), new LongWritable(1));}}}public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {long sum = 0;for (LongWritable value : values) {sum += value.get();}context.write(key, new LongWritable(sum));}}
}

3. 异常检测

AnomalyDetection

package org.example.mapreduce.t1;import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;import java.io.IOException;
import java.util.HashMap;/*** @author 撕得失败的标签* @version 1.0* @description: 异常检测* @date 2024/6/23 11:08*/
public class AnomalyDetection {/*** @description:* 1. 检测异常高的访问频率:对于每个 IP 地址,计算访问次数的平均值和标准差,标记访问次数超过均值加三倍标准差的 IP 地址。* 2. 检测潜在的恶意请求:检测 HTTP 响应码为 4xx 和 5xx 的请求,*    统计每个 IP 地址的异常请求次数,*    并标记异常请求次数占总请求次数比例超过 20% 的 IP 地址。* @author 撕得失败的标签* @date 2024/6/23 11:08*/public static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] strings = value.toString().split(" ");String ip = strings[0];context.write(new Text(ip), new LongWritable(1));String httpStatusCode = strings[8];if (httpStatusCode.startsWith("4") || httpStatusCode.startsWith("5")) {String anomaly = "+" + ip;context.write(new Text(anomaly), new LongWritable(1));}}}public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {private final HashMap<String, Long> ipToCount = new HashMap<String, Long>();private final HashMap<String, Long> ipToAnomalyCount = new HashMap<String, Long>();@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {long sum = 0;for (LongWritable value : values) {sum += value.get();}
//            context.write(key, new LongWritable(sum));String ip = key.toString();if (ip.startsWith("+")) {ip = ip.substring(1);ipToAnomalyCount.put(ip, sum);}ipToCount.put(ip, sum);}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {// 实现异常检测的逻辑long sum = 0;for (String k : ipToCount.keySet()) {sum += ipToCount.get(k);}double avg = (double) (sum / ipToCount.size());double std = 0;for (String k : ipToCount.keySet()) {std += Math.pow(ipToCount.get(k) - avg, 2);}// 异常高访问频率 IPfor (String k : ipToCount.keySet()) {if (ipToCount.get(k) > avg + 3 * std) {context.write(new Text(k), new LongWritable(ipToCount.get(k)));}}// 潜在恶意请求 IPfor (String k : ipToAnomalyCount.keySet()) {double anomaly = (double) ipToAnomalyCount.get(k) / ipToCount.get(k);if (anomaly > 0.2) {context.write(new Text(k + "\t" + String.format("%.1f", anomaly * 100) + "%"), new LongWritable(ipToAnomalyCount.get(k)));}}}}
}

4. 主方法

Main

package org.example.mapreduce.t1;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.LinkedList;
import java.util.List;/*** @author 撕得失败的标签* @version 1.0* @description: 主方法* @date 2024/6/22 22:34*/
public class Main {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 创建配置信息Configuration conf = new Configuration();conf.set("fs.default.name","hdfs://hadoop102:9000");// 1. 数据预处理 PreprocessMapperJob preprocessJob = Job.getInstance(conf, "preprocess job");preprocessJob.setJarByClass(Main.class);preprocessJob.setMapperClass(PreprocessMapper.class);preprocessJob.setOutputKeyClass(Text.class);preprocessJob.setOutputValueClass(Text.class);FileInputFormat.addInputPath(preprocessJob, new Path("/m1"));FileSystem fs = FileSystem.get(conf);Path outPath = new Path("/t1/preprocess");if(fs.exists(outPath)) {fs.delete(outPath, true);}FileOutputFormat.setOutputPath(preprocessJob, outPath);preprocessJob.waitForCompletion(true);// 2. 访问统计 AccessStatisticsJob accessStatisticsJob = Job.getInstance(conf, "access statistics job");accessStatisticsJob.setJarByClass(Main.class);accessStatisticsJob.setMapperClass(AccessStatistics.Map.class);accessStatisticsJob.setReducerClass(AccessStatistics.Reduce.class);accessStatisticsJob.setOutputKeyClass(Text.class);accessStatisticsJob.setOutputValueClass(LongWritable.class);FileInputFormat.addInputPath(accessStatisticsJob, new Path("/m1"));FileSystem fs1 = FileSystem.get(conf);Path outPath1 = new Path("/t1/statistics");if(fs1.exists(outPath1)) {fs1.delete(outPath1, true);}FileOutputFormat.setOutputPath(accessStatisticsJob, outPath1);accessStatisticsJob.waitForCompletion(true);// 3. 异常检测 AnomalyDetectionJob anomalyDetectionJob = Job.getInstance(conf, "anomaly detection job");anomalyDetectionJob.setJarByClass(Main.class);anomalyDetectionJob.setMapperClass(AnomalyDetection.Map.class);anomalyDetectionJob.setReducerClass(AnomalyDetection.Reduce.class);anomalyDetectionJob.setOutputKeyClass(Text.class);anomalyDetectionJob.setOutputValueClass(LongWritable.class);FileInputFormat.addInputPath(anomalyDetectionJob, new Path("/m1"));FileSystem fs2 = FileSystem.get(conf);Path outPath2 = new Path("/t1/anomaly");if(fs2.exists(outPath2)) {fs2.delete(outPath2, true);}FileOutputFormat.setOutputPath(anomalyDetectionJob, outPath2);anomalyDetectionJob.waitForCompletion(true);// 4. 输出结果 Output// 访问统计结果:FileSystem fs3 = FileSystem.get(conf);Path outPath3 = new Path("/t1/statistics/part-r-00000");BufferedReader br = new BufferedReader(new InputStreamReader(fs3.open(outPath3)));List<String> ip = new LinkedList<String>();List<String> url = new LinkedList<String>();String line;while ((line = br.readLine()) != null) {if (line.startsWith("/")) {url.add(line);} else {ip.add(line);}}// IP访问次数:System.out.println("\nIP访问次数:");for (String s : ip) {System.out.println(s);}// URL访问次数:System.out.println("\nURL访问次数:");for (String s : url) {System.out.println(s);}// 异常检测结果:FileSystem fs4 = FileSystem.get(conf);Path outPath4 = new Path("/t1/anomaly/part-r-00000");BufferedReader br1 = new BufferedReader(new InputStreamReader(fs4.open(outPath4)));List<String> potential = new LinkedList<String>();List<String> anomaly = new LinkedList<String>();String line1;while ((line1 = br1.readLine()) != null) {String[] strings = line1.split("\t");if (strings.length == 2) {anomaly.add(line1);} else {potential.add(line1);}}// 异常高访问频率 IP:System.out.println("\n异常高访问频率 IP:");if (anomaly.size() == 0) {System.out.println("无");} else {for (String s : anomaly) {System.out.println(s);}}// 潜在恶意请求 IP:System.out.println("\n潜在异常高访问频率 IP:");if (potential.size() == 0) {System.out.println("无");} else {for (String s : potential) {String[] strings = s.split("\t");System.out.println(strings[0] + "\t" + strings[2] + "\t" + strings[1]);}}}
}

5. 结果输出

IP访问次数:
10.0.0.1	334003
10.0.0.2	334350
10.0.0.3	333056
10.0.0.4	333947
10.0.0.5	333263
127.0.0.1	332347
127.0.0.2	333025
127.0.0.3	332450
127.0.0.4	333005
127.0.0.5	333428
192.168.0.1	334054
192.168.0.2	332883
192.168.0.3	333681
192.168.0.4	333133
192.168.0.5	333375URL访问次数:
/cart	713975
/checkout	713453
/contact	715382
/home.html	712570
/index.html	715544
/login	714255
/products	714821异常高访问频率 IP:
无潜在异常高访问频率 IP:
192.168.0.2	222498	66.8%
192.168.0.1	222165	66.5%
127.0.0.5	221778	66.5%
192.168.0.4	222096	66.7%
127.0.0.4	222156	66.7%
192.168.0.3	222227	66.6%
192.168.0.5	222070	66.6%
10.0.0.4	222243	66.6%
10.0.0.3	221966	66.6%
10.0.0.5	222347	66.7%
10.0.0.2	222664	66.6%
10.0.0.1	222493	66.6%
127.0.0.3	221464	66.6%
127.0.0.2	222197	66.7%
127.0.0.1	221702	66.7%Process finished with exit code 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/32656.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql启动时遇到:本地计算机上的MySQL服务启动后停止

1.问题重述&#xff1a; 今早启动数据库时发现无法启动&#xff0c;报错&#xff1a;本地计算机 上的 MySQL服务启动后停止。某些服务在未由其他服务或程序使用时将自动停止。 2.解决方案&#xff1a; 1.数据备份&#xff1a; 2.在bin目录下&#xff0c;命令行中输入 mysqld …

【IPython 使用技巧整理】

IPython 使用技巧整理 IPython 是一个交互式 Python 解释器&#xff0c;比标准 Python 解释器提供了更加强大的功能和更友好的使用体验。它为数据科学、机器学习和科学计算提供了强大的工具&#xff0c;是 Python 开发人员不可或缺的工具之一。本文将深入探讨 IPython 的各种使…

Java中的多线程编程实用指南

Java中的多线程编程实用指南 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们来探讨一下Java中多线程编程的实用指南。 在当今软件开发的世界中&#x…

python3.9安装pysal库

一、安装geopandas 下载地址&#xff1a;https://download.csdn.net/download/xiangfengl/89330615 然后逐个按顺序安装 1.安装gdal pip install GDAL-3.3.3-cp39-cp39-win_amd64.whl 2.安装Fiona pip install Fiona-1.8.20-cp39-cp39-win_amd64.whl 3.安装shapely pip …

Python面试宝典:云服务概览以及Python在云计算服务中的应用相关面试笔试题(1000加面试笔试题助你轻松捕获大厂Offer)

Python面试宝典:1000加python面试题助你轻松捕获大厂Offer【第二部分:Python高级特性:第二十六章:Python与云计算:第一节:云服务概览以及Python在云计算服务中的应用】 第二十六章:Python与云计算第一节:云服务概览以及Python在云计算服务中的应用1. 云计算服务的开发与…

版本控制工具-git的基本使用

目录 前言一、git简介二、git工作流程三、git基本命令3.1 创建本地仓库3.2 将工作区内容提交到本地仓库3.3 将本地仓库内容推送到远程仓库 前言 本篇文章介绍git的一般工作流程 一、git简介 Git是一个开源的分布式版本控制软件&#xff0c;常用于项目的版本管理 Git是Linux …

196.每日一题:检测大写字母(力扣)

代码解决 class Solution { public:bool detectCapitalUse(string word) {int capitalCount 0;int n word.size();// 统计大写字母的数量for (char c : word) {if (isupper(c)) {capitalCount;}}// 检查是否满足三种情况之一if (capitalCount n) {// 全部字母都是大写return…

等保测评:全面保障信息系统安全的必要举措

等保测评&#xff08;信息安全等级保护测评&#xff09;是评估信息系统安全等级的重要过程&#xff0c;旨在确保信息系统能够抵御各种安全威胁&#xff0c;保障信息的机密性、完整性和可用性。以下是一篇关于等保测评的文章&#xff0c;内容清晰、分点表示&#xff0c;并参考了…

2024.06.23 刷题日记

〇、前言 今天重点刷了回溯&#xff0c;以及常见的题目。 46. 全排列 给定一个不含重复数字的数组 nums &#xff0c;返回其 所有可能的全排列 。你可以 按任意顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[1,2,3],[1,3,2],[2,1,3…

Node.js是什么(基础篇)

前言 Node.js是一个基于Chrome V8 JavaScript引擎的开源、跨平台JavaScript运行时环境&#xff0c;主要用于开发服务器端应用程序。它的特点是非阻塞I/O模型&#xff0c;使其在处理高并发请求时表现出色。 一、Node JS到底是什么 1、Node JS是什么 Node.js不是一种独立的编程…

C/C++ - 编码规范(USNA版)

[IC210] Resources/C Programming Guide and Tips 所有提交的评分作业&#xff08;作业、项目、实验、考试&#xff09;都必须使用本风格指南。本指南的目的不是限制你的编程&#xff0c;而是为你的程序建立统一的风格格式。 * 这将有助于你调试和维护程序。 * 有助于他人&am…

USB - USB在消费领域的应用

Switching in USB Consumer Applications 通用串行总线&#xff08;USB&#xff09;已成为满足终端设备之间日益增长的快速数据传输需求的主流接口--例如&#xff0c;在个人电脑和便携式设备&#xff08;如手机、数码相机和个人媒体播放器&#xff09;之间下载和上传数据。 The…

IP地址解析省份区域信息

背景 最近工作需要根据IP地址&#xff0c;解析通话所在省份。因此&#xff0c;在网上找了相关方案&#xff0c;作为存档记录下来。 在线接口 不做阐述。因为&#xff0c;一是网上可以很轻松的找到&#xff0c;没有必要多说。二是开发在内网中&#xff0c;多数不会让连接外网…

Go WebSocket入门+千万级别弹幕系统架构设计

Go实现WebSocket&#xff08;千万级别弹幕系统架构设计&#xff09; 1 websocket简介(基于HTTP协议的长连接) 使用WebSocket可以轻松的维持服务器端长连接&#xff0c;其次WebSocket是架构在HTTP协议之上的&#xff0c;并且也可以使用HTTPS方式&#xff0c;因此WebSocket是可靠…

QT事件处理系统之一:父子组件之间的事件传播机制

1、一些解释 /*1. 事件传播(非常重要):GUI编程当中, 事件的传播是[基于组件层面]的, 并不是依靠类继承机制!类的继承:QWidget <- QPushButton <- FirstButton <- SecondButton基于组件(父子组件):QWidget <- QPushButton(FirstButton/SecondButton);虽然…

国产化操作系统杂谈

目录 操作系统国产化背景国产化操作系统名录优秀操作系统介绍1.深度Linux&#xff08;deepin&#xff09;2.FydeOS3.AliOS&#xff08;openAliOS&#xff09;4.openEuler5.红旗Linux6. startOS 总结 操作系统国产化背景 官方的说法是为了打破长期以来国外对中国的操作系统的垄…

OGG几何内核开发-复杂装配模型读取、显示、分析

OGG几何内核读取STEP模型文件的API有STEPCAFControl_Reader、STEPControl_Reader。 STEPCAFControl_Reader使用很复杂&#xff0c;但可以展示装配树&#xff0c;有利于模型的详细分析。 本文演示了《插件化算法研究平台V2》的OCC几何模型插件的部分功能&#xff1a;显示装配树…

四川汇聚荣科技有限公司靠谱吗?

在如今这个信息爆炸的时代&#xff0c;了解一家公司是否靠谱对于消费者和合作伙伴来说至关重要。四川汇聚荣科技有限公司作为一家位于中国西部地区的企业&#xff0c;自然也受到了人们的关注。那么&#xff0c;这家公司究竟如何呢?接下来&#xff0c;我们将从多个角度进行深入…

Python应用开发——30天学习Streamlit Python包进行APP的构建(8)

st.table 显示静态表格。 这与 st.dataframe 的不同之处在于,这里的表格是静态的:其全部内容直接显示在页面上。 Function signature[source]st.table(data=None) Parametersdata (pandas.DataFrame, pandas.Styler, pyarrow.Table, numpy.ndarray, pyspark.sql.DataFrame,…

Mysql数据库约束的概述 , 逐渐约束 , 主键自增 , 表关系的概念和外键 ,多表关系约束介绍和使用

约束和表设计 1、DQL查询语句-limit语句(掌握) 目标 能够掌握limit语句的使用 讲解 作用&#xff1a; LIMIT是限制的意思&#xff0c;所以LIMIT的作用就是限制查询记录的条数。 LIMIT语句格式: select * from 表名 limit offset, row_count; mysql中limit的用法&#…