linux流式访问日志,流式实时日志分析系统的实现原理

我们知道网站用户访问流量是不间断的,基于网站的访问日志,即 Web log 分析是典型的流式实时计算应用场景。比如百度统计,它可以做流量分析、来源分析、网站分析、转化分析。另外还有特定场景分析,比如安全分析,用来识别 CC 攻击、 SQL 注入分析、脱库等。这个项目就简单实现一个类似于百度分析的系统。

实验原理

百度统计(tongji.baidu.com)是百度推出的一款免费的专业网站流量分析工具,能够告诉用户访客是如何找到并浏览用户的网站的,以及在网站上浏览了哪些页面。这些信息可以帮助用户改善访客在其网站上的使用体验,不断提升网站的投资回报率。

百度统计提供了几十种图形化报告,包括:趋势分析、来源分析、页面分析、访客分析、定制分析等多种统计分析服务。

这里我们参考百度统计的功能,基于 Spark Streaming 简单实现一个分析系统,使之包括以下分析功能。流量分析。一段时间内用户网站的流量变化趋势,针对不同的 IP 对用户网站的流量进行细分。常见指标是总 PV 和各 IP 的PV。

来源分析。各种搜索引擎来源给用户网站带来的流量情况,需要精确到具体搜索引擎、具体关键词。通过来源分析,用户可以及时了解哪种类型的来源为其带来了更多访客。常见指标是搜索引擎、关键词和终端类型的 PV 。

网站分析。各个页面的访问情况,包括及时了解哪些页面最吸引访客以及哪些页面最容易导致访客流失,从而帮助用户更有针对性地改善网站质量。常见指标是各页面的 PV 。

1 日志实时采集

Web log 一般在 HTTP 服务器收集,比如 Nginx access 日志文件。一个典型的方案是 Nginx 日志文件 + Flume + Kafka + Spark Streaming,如下所述:接收服务器用 Nginx ,根据负载可以部署多台,数据落地至本地日志文件;

每个 Nginx 节点上部署 Flume ,使用 tail -f 实时读取 Nginx 日志,发送至 KafKa 集群;

Spark Streaming 程序实时消费 Kafka 集群上的数据,实时分析,输出;

结果写入 MySQL 数据库。

当然,还可以进一步优化,比如 CGI 程序直接发日志消息到 Kafka ,节省了写访问日志的磁盘开销。这里主要专注 Spark Streaming 的应用,所以我们不做详细论述。

2 流式分析系统实现

我们简单模拟一下数据收集和发送的环节,用一个 Python 脚本随机生成 Nginx 访问日志,并通过脚本的方式自动上传至 HDFS ,然后移动至指定目录。 Spark Streaming 程序监控 HDFS 目录,自动处理新的文件。

生成 Nginx 日志的 Python 代码如下,保存为文件 sample_web_log.py 。#!/usr/bin/env python# -*- coding: utf-8 -*-import random

import timeclass WebLogGeneration(object):

# 类属性,由所有类的对象共享

site_url_base = "http://www.xxx.com/"

# 基本构造函数

def __init__(self):        #  前面7条是IE,所以大概浏览器类型70%为IE ,接入类型上,20%为移动设备,分别是7和8条,5% 为空

#  https://github.com/mssola/user_agent/blob/master/all_test.go

self.user_agent_dist = {0.0:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",                                0.1:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",                                0.2:"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)",                                0.3:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",                                0.4:"Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko",                                0.5:"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0",                                0.6:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",                                0.7:"Mozilla/5.0 (iPhone; CPU iPhone OS 7_0_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53",                                0.8:"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19",                                0.9:"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36",                                1:" ",}        self.ip_slice_list = [10, 29, 30, 46, 55, 63, 72, 87, 98,132,156,124,167,143,187,168,190,201,202,214,215,222]        self.url_path_list = ["login.php","view.php","list.php","upload.php","admin/login.php","edit.php","index.html"]        self.http_refer = [ "http://www.baidu.com/s?wd={query}","http://www.google.cn/search?q={query}","http://www.sogou.com/web?query={query}","http://one.cn.yahoo.com/s?p={query}","http://cn.bing.com/search?q={query}"]        self.search_keyword = ["spark","hadoop","hive","spark mlib","spark sql"]    def sample_ip(self):

slice = random.sample(self.ip_slice_list, 4) #从ip_slice_list中随机获取4个元素,作为一个片断返回

return  ".".join([str(item) for item in slice])  #  todo

def sample_url(self):        return  random.sample(self.url_path_list,1)[0]    def sample_user_agent(self):

dist_uppon = random.uniform(0, 1)        return self.user_agent_dist[float('%0.1f' % dist_uppon)]    # 主要搜索引擎referrer参数

def sample_refer(self):        if random.uniform(0, 1) > 0.2:  # 只有20% 流量有refer

return "-"

refer_str=random.sample(self.http_refer,1)

query_str=random.sample(self.search_keyword,1)        return refer_str[0].format(query=query_str[0])    def sample_one_log(self,count = 3):

time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())        while count >1:

query_log = "{ip} - - [{local_time}] \"GET /{url} HTTP/1.1\" 200 0 \"{refer}\" \"{user_agent}\" \"-\"".format(ip=self.sample_ip(),local_time=time_str,url=self.sample_url(),refer=self.sample_refer(),user_agent=self.sample_user_agent())

print query_log

count = count -1if __name__ == "__main__":

web_log_gene = WebLogGeneration()    #while True:

#    time.sleep(random.uniform(0, 3))

web_log_gene.sample_one_log(random.uniform(10, 100))

这是一条日志的示例,为一行形式,各字段间用空格分隔,字符串类型的值用双引号包围:46.202.124.63 - - [2015-11-26 09:54:27] "GET /view.php HTTP/1.1" 200 0 "http://www.google.cn/search?q=hadoop" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)" "-"

然后需要一个简单的脚本来调用上面的脚本以随机生成日志,上传至 HDFS ,然后移动到目标目录:#!/bin/bash

# HDFS命令 HDFS="/usr/local/myhadoop/hadoop-2.6.0/bin/hadoop fs"# Streaming程序监听的目录,注意跟后面Streaming程序的配置要保持一致 streaming_dir=”/spark/streaming”

# 清空旧数据 $HDFS -rm "${streaming_dir}"'/tmp/*' > /dev/null 2>&1

$HDFS -rm "${streaming_dir}"'/*'     > /dev/null 2>&1

# 一直运行 while [ 1 ]; do

./sample_web_log.py > test.log

# 给日志文件加上时间戳,避免重名

tmplog="access.`date +'%s'`.log"

# 先放在临时目录,再move至Streaming程序监控的目录下,确保原子性

# 临时目录用的是监控目录的子目录,因为子目录不会被监控

$HDFS -put test.log ${streaming_dir}/tmp/$tmplog

$HDFS -mv           ${streaming_dir}/tmp/$tmplog ${streaming_dir}/

echo "`date +"%F %T"` put $tmplog to HDFS succeed"

sleep 1done

Spark Streaming 程序代码如下所示,可以在 bin/spark-shell 交互式环境下运行,如果要以 Spark 程序的方式运行,按注释中的说明调整一下 StreamingContext 的生成方式即可。启动 bin/spark-shell 时,为了避免因 DEBUG 日志信息太多而影响观察输出,可以将 DEBUG 日志重定向至文件,屏幕上只显示主要输出,方法是./bin/spark-shell 2>spark-shell-debug.log:// 导入类import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}// 设计计算的周期,单位秒val batch = 10/*

* 这是bin/spark-shell交互式模式下创建StreamingContext的方法

* 非交互式请使用下面的方法来创建

*/val ssc = new StreamingContext(sc, Seconds(batch))/*

// 非交互式下创建StreamingContext的方法

val conf = new SparkConf().setAppName("NginxAnay")

val ssc = new StreamingContext(conf, Seconds(batch))

*//*

* 创建输入DStream,是文本文件目录类型

* 本地模式下也可以使用本地文件系统的目录,比如 file:///home/spark/streaming

*/val lines = ssc.textFileStream("hdfs:///spark/streaming")/*

* 下面是统计各项指标,调试时可以只进行部分统计,方便观察结果

*/// 1. 总PVlines.count().print()// 2. 各IP的PV,按PV倒序//   空格分隔的第一个字段就是IPlines.map(line => {(line.split(" ")(0), 1)}).reduceByKey(_ + _).transform(rdd => {

rdd.map(ip_pv => (ip_pv._2, ip_pv._1)).

sortByKey(false).

map(ip_pv => (ip_pv._2, ip_pv._1))

}).print()// 3. 搜索引擎PVval refer = lines.map(_.split("\"")(3))// 先输出搜索引擎和查询关键词,避免统计搜索关键词时重复计算// 输出(host, query_keys)val searchEnginInfo = refer.map(r => {

val f = r.split('/')

val searchEngines = Map(        "www.google.cn" -> "q",        "www.yahoo.com" -> "p",        "cn.bing.com" -> "q",        "www.baidu.com" -> "wd",        "www.sogou.com" -> "query"

)    if (f.length > 2) {

val host = f(2)        if (searchEngines.contains(host)) {

val query = r.split('?')(1)            if (query.length > 0) {

val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)                if (arr_search_q.length > 0)

(host, arr_search_q(0).split('=')(1))                else

(host, "")

} else {

(host, "")

}

} else

("", "")

} else

("", "")

})// 输出搜索引擎PVsearchEnginInfo.filter(_._1.length > 0).map(p => {(p._1, 1)}).reduceByKey(_ + _).print()// 4. 关键词PVsearchEnginInfo.filter(_._2.length > 0).map(p => {(p._2, 1)}).reduceByKey(_ + _).print()// 5. 终端类型PVlines.map(_.split("\"")(5)).map(agent => {

val types = Seq("iPhone", "Android")    var r = "Default"

for (t 

r = t

}

(r, 1)

}).reduceByKey(_ + _).print()// 6. 各页面PVlines.map(line => {(line.split("\"")(1).split(" ")(1), 1)}).reduceByKey(_ + _).print()// 启动计算,等待执行结束(出错或Ctrl-C退出)ssc.start()

ssc.awaitTermination()

打开两个终端,一个调用上面的 bash 脚本模拟提交日志,一个在交互式环境下运行上面的 Streaming 程序。你可以看到各项指标的输出,比如某个批次下的输出为(依次对应上面的 6 个计算项):

总PV-------------------------------------------

Time: 1448533850000 ms

-------------------------------------------

44374

各IP的PV,按PV倒序------------------------------------------- Time: 1448533850000 ms

------------------------------------------- (72.63.87.30,30)

(63.72.46.55,30)

(98.30.63.10,29)

(72.55.63.46,29)

(63.29.10.30,29)

(29.30.63.46,29)

(55.10.98.87,27)

(46.29.98.30,27)

(72.46.63.30,27)

(87.29.55.10,26)

搜索引擎PV------------------------------------------- Time: 1448533850000 ms ------------------------------------------- (cn.bing.com,1745)

(www.baidu.com,1773)

(www.google.cn,1793)

(www.sogou.com,1845)

关键词PV-------------------------------------------

Time: 1448533850000 ms

-------------------------------------------

(spark,1426)

(hadoop,1455)

(spark sql,1429)

(spark mlib,1426)

(hive,1420)

终端类型PV-------------------------------------------

Time: 1448533850000 ms

-------------------------------------------

(Android,4281)

(Default,35745)

(iPhone,4348)

各页面PV-------------------------------------------

Time: 1448533850000 ms

-------------------------------------------

(/edit.php,6435)

(/admin/login.php,6271)

(/login.php,6320)

(/upload.php,6278)

(/list.php,6411)

(/index.html,6309)

(/view.php,6350)

查看数据更直观的做法是用图形来展示,常见做法是将结果写入外部 DB ,然后通过一些图形化报表展示系统展示出来。比如对于终端类型,我们可以用饼图展示。

对于连续的数据,我们也可以用拆线图来展示趋势。比如某页面的PV。

除了常规的每个固定周期进行一次统计,我们还可以对连续多个周期的数据进行统计。以统计总 PV 为例,上面的示例是每 10 秒统计一次,可能还需要每分钟统计一次,相当于 6 个 10 秒的周期。我们可以利用窗口方法实现,不同的代码如下:// 窗口方法必须配置checkpint,可以这样配置: ssc.checkpoint("hdfs:///spark/checkpoint")

// 这是常规每10秒一个周期的PV统计 lines.count().print()

// 这是每分钟(连续多个周期)一次的PV统计 lines.countByWindow(Seconds(batch*6), Seconds(batch*6)).print()

使用相同的办法运行程序之后,我们首先会看到连续 6 次 10 秒周期的 PV 统计输出:-------------------------------------------

Time: 1448535090000 ms

-------------------------------------------

1101

-------------------------------------------

Time: 1448535100000 ms

-------------------------------------------

816

-------------------------------------------

Time: 1448535110000 ms

-------------------------------------------

892

-------------------------------------------

Time: 1448535120000 ms

-------------------------------------------

708

-------------------------------------------

Time: 1448535130000 ms

-------------------------------------------

881

-------------------------------------------

Time: 1448535140000 ms

-------------------------------------------

872

在这之后,有一个 1 分钟周期的 PV 统计输出,它的值刚好是上面 6 次计算结果的总和:-------------------------------------------

Time: 1448535140000 ms

-------------------------------------------

5270

最后

以上内容截选自实验楼教程 【流式实时日志分析系统——《Spark 最佳实践》】,教程主要是教你开发一个类似百度统计的系统,文章主要截选了其实验原理部分,后面还有具体的开发部分:开发准备准备生成日志的Python代码;

启动Spark Shell;

实验步骤创建日志目录;

通过 bash 脚本生成日志;

在 Spark Streaming 中进行日志分析;

开始生成日志并查看结果;

作者:实验楼

链接:https://www.jianshu.com/p/241bec487619

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/534572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux无法切换shell,linux shell的切换

查看系统可用shell种类:(一般是bash shell)➜ ~ chsh -l/bin/sh/bin/bash/sbin/nologin/bin/dash/bin/zsh修改当前的shell:[rootGIT ~]# chsh -l/bin/sh/bin/bash/sbin/nologin/bin/dash/bin/zsh[rootGIT ~]# chshChanging shell for root.New shell [/bin/bash]: /…

64位ubuntu arm-linux-gcc,在ubuntu 64位的机器上执行arm-linux-gcc提示 no such file or directory【转】...

解压好了arm-linuxg-gcc 放到了$PATH路径下, 无论怎么执行都提示说: no such file or directory,可明明有这个文件的.N遍之后, 执行了 file arm-Linux-gcc发现这个命令是32位的, 需要安装兼容包,于是apt-get install libc6:i386 libgcc1:i386 gcc-4.6-base:i386:ia32-libslibst…

c语言for循环26个英文字母,菜鸟求助,写一个随机输出26个英文字母的程序

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼#include #include // 用srand、rand函数了#include // 用time函数了#define LEN 32// 产生min~max的随机数 (包含min和max)// rand函数产生0 ~ RAND_MAX 的随机数// 一般上不同编译器要求 RAND_MAX 的值(至少)为 32767#define RAN…

C语言case字句有什么作用,switch case 语句的使用规则

【规则1-21】按字母或数字顺序排列各条case语句。如果所有的case语句没有明显的重要性差别,那就按A-B-C或1-2-3等顺序排列case语句。这样做的话,你可以很容易的找到某条case语句。比如:switch(variable){case A://program codebreak;case B:/…

c语言怎样用格式化文件存储,如何用格式化的方式读写文件

对格式会来说,C语言的格式读写文件是很有要求的,在前面我们已经讲解了如何去进行字符的输入输出,但事实真相,数据的类型是很丰富的,而且大家已经熟悉了用printf和scanf函数进行格式化的输入输出,他们是向终…

输出26个英文字母c语言,菜鸟求助,写一个随机输出26个英文字母的程序

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼#include #include // 用srand、rand函数了#include // 用time函数了#define LEN 32// 产生min~max的随机数 (包含min和max)// rand函数产生0 ~ RAND_MAX 的随机数// 一般上不同编译器要求 RAND_MAX 的值(至少)为 32767#define RAN…

二阶矩阵乘法C语言,c语言矩阵相乘

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼程序清单#include&nbspint&nbspmain(void){&nbsp&nbsp&nbsp&nbsp&nbsp&nbsp&nbsp&nbspchar&nbsp&nbsp&nbsp&nbspa[2][3];&nbsp&nbsp&nbsp&nbsp&nbsp…

c语言dll创建线程,教大家写一个远程线程的DLL注入,其实还是蛮简单的……………………...

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼然后新建一个win32 application 的工程 新建c source file 写入:#include#includeint WINAPI WinMain(HINSTANCE hInstance,HINSTANCE hPrevInstance,LPSTR lpCmdLine,int nShowCmd){char DllName[MAX_PATH]"C:\\P…

linux下qq怎么截图,ubuntu 12.04使用QQ截图安装教程

相信用过linux系统的朋友都知道,linux下的截图软件是在不咋的。虽然系统本身有带截图工具,但是却苦于没有办法在截下来的图片上作画圈、写文字说明等动作。应该有不少朋友也是从windows系统下转到linux下做开发的,不知道大家对QQ截图这个软件…

android对象申明,Kotlin中的对象表达式和对象声明的具体使用

Kotlin的对象表达式与Java中的匿名内部类的主要区别:匿名内部类只能指定一个父类型,但对象表达式可以指定0~N个肤类型。一、对象表达式对象表达式的语法格式如下:object [: 0~N个父类型]{//对象表达式的类体部分}对象表达式还有如下规则&…

android+联系服务器时间,android配置时间服务器+亚洲主要的授时服务器

我们知道,Android是通过连接指定的ntpServer来获取网络时间,而不同的服务器带来的延迟也不尽相同,通常情况下,我们自然是期望尽快获取时间,那么我们就需要对ntpServer有一定的筛选,尤其是在选择默认的ntpSe…

android 获取设备的mac地址,Android编程获取设备MAC地址的实现方法

本文实例讲述了Android编程获取设备MAC地址的实现方法。分享给大家供大家参考,具体如下:/*** 获取设备的mac地址** param ac* param callback* 成功获取到mac地址之后会回调此方法*/public static void getMacAddress(final Activity ac, final SimpleCa…

android 资源如何下沉,关于Android业务模块下沉的一些实践及总结

此文已由作者徐铭阳授权网易云社区发布。欢迎访问前言最近在做需求过程中,一些类似学校选择、城市选择等业务相关模块想单独抽离出来,遇到一些诸如模块管理、通信方面的问题来背景最近有一个需求是学校列表,没错,就是我们平时总见…

android sqlite存储数据,Android之SQLite数据存储

关于SQLite的出生长大和壮大,这里就略去了,只记几点比较重要的用法:SQLite所支持的数据类型:SQLite,SQLite3支持 NULL、INTEGER、REAL(浮点数字)、TEXT(字符串文本)和BLOB(二进制对象)数据类型,虽然它支持的…

android gridview滚动条位置,Android GridView滚动到指定位置

当一个列表项目很多,并且每个项目可以进入到其它Activity或者Fragment时,保存之前列表的位置是一个比较不错的功能,今天研究了一下怎么保存浏览位置,发现GridView和它的父类中有4个相关的方法:public void smoothScrol…

android studio 跨进程,Android IPC机制(三)在Android Studio中使用AIDL实现跨进程方法调用...

本文首发于微信公众号「后厂技术官」在上一篇文章Android IPC机制(二)用Messenger进行进程间通信中我们介绍了使用Messenger来进行进程间通信的方法,但是我们能发现Messenger是以串行的方式来处理客户端发来的信息,如果有大量的消息发到服务端&#xff0…

nubia ui 5.0 android,流畅度爆棚 搭Android 5.0系统新机一览

近期各品牌新机都不少,而且90%以上都是Android系统的手机,可见安卓手机的主导地位仍在上升。而在系统层次,Android 5.0已经逐步开始普及,近期上市新机百分百均采用了这一系统,值得一提的是定制不再“深度”&#xff0c…

signature=4d4ce610ff2d4a5f2093452c24b70492,Reading Chromatin Signatures

摘要:The article cites a study which uses a combination of chromatin immunoprecipitation and microarray analysis to explore the histone modifications, transcription-factor binding and nucleosome density in 30 megabytes of human genome. It states…

html 百分比正方形,css实现未知宽度的正方形需求

今天群里有哥们问了一下,百分比宽度的正方形如何用css实现。其实就是不定宽的正方形如何用css实现。第一个方法利用图片的等比例缩放,用base64写一个1*1的透明png图片,宽度100%,这样容器就自动被撑成一个正方形,demo如…

html引用本地图片不能是桌面的,Img标签与本地文件:/// URL不显示在Microsoft Edge Web浏览器...

在我的桌面应用程序中,我创建了一个临时HTML文件(旨在让用户打印报告),然后通过默认显示网页浏览器。这个HTML文件保存在一个临时文件夹,例如:C:/Users/UserName/AppData/Local/TempImg标签与本地文件:/// URL不显示在…