Spark编程实验五:Spark Structured Streaming编程

目录

一、目的与要求

二、实验内容

三、实验步骤

1、Syslog介绍

2、通过Socket传送Syslog到Spark

3、Syslog日志拆分为DateFrame

4、对Syslog进行查询

四、结果分析与实验体会


一、目的与要求

1、通过实验掌握Structured Streaming的基本编程方法;
2、掌握日志分析的常规操作,包括拆分日志方法和分析场景。

二、实验内容

1、通过Socket传送Syslog到Spark

        日志分析是一个大数据分析中较为常见的场景。在Unix类操作系统里,Syslog广泛被应用于系统或者应用的日志记录中。Syslog通常被记录在本地文件内,也可以被发送给远程Syslog服务器。Syslog日志内一般包括产生日志的时间、主机名、程序模块、进程名、进程ID、严重性和日志内容。

        日志一般会通过Kafka等有容错保障的源发送,本实验为了简化,直接将Syslog通过Socket源发送。新建一个终端,执行如下命令:

$ tail -n+1 -f /var/log/syslog | nc -lk 9988

        “tail -n+1 -f /var/log/syslog”表示从第一行开始打印文件syslog的内容。“-f”表示如果文件有增加则持续输出最新的内容。然后,通过管道把文件内容发送到nc程序(nc程序可以进一步把数据发送给Spark)。

        如果/var/log/syslog内的内容增长速度较慢,可以再新开一个终端(计作“手动发送日志终端”),手动在终端输入如下内容来增加日志信息到/var/log/syslog内:

$ logger ‘I am a test error log message.’

2、对Syslog进行查询

由Spark接收nc程序发送过来的日志信息,然后完成以下任务:

(1)统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。
(2)统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。
(3)输出所有日志内容带error的日志。

三、实验步骤

1、Syslog介绍

        分析日志是一个大数据分析中较为常见的场景。在Unix类操作系统里,Syslog广泛被应用于系统或者应用的日志记录中。Syslog通常被记录在本地文件内,也可以被发送给远程Syslog服务器。Syslog日志内一般包括产生日志的时间、主机名、程序模块、进程名、进程ID、严重性和日志内容。

2、通过Socket传送Syslog到Spark

        日志一般会通过kafka等有容错保障的源发送,本实验为了简化,直接将syslog通过Socket源发送。新开一个终端,命令为“tail终端”,输入

tail -n+1 -f /var/log/syslog | nc -lk 9988

        tail命令加-n+1代表从第一行开始打印文件内容。-f代表如果文件有增加则持续输出最新的内容。通过管道发送到nc命令起的在本地9988上的服务上。
        如果/var/log/syslog内的内容增长速度较慢,可以再新开一个终端,命名为“手动发送log终端”,手动在终端输入

logger ‘I am a test error log message.’

来增加日志信息到/var/log/syslog内。

3、Syslog日志拆分为DateFrame

        Syslog每行的数据类似以下:

Nov 24 13:17:01 spark CRON[18455]: (root) CMD (cd / && run-parts --report /etc/cron.hourly)

        最前面为时间,接着是主机名,进程名,可选的进程ID,冒号后是日志内容。在Spark内,可以使用正则表达式对syslog进行拆分成结构化字段,以下是示例代码:

 # 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段fields = partial(regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$")words = lines.select(to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),fields(idx=2).alias("hostname"),fields(idx=3).alias("tag"),fields(idx=4).alias("content"),)

        to_timestamp(format_string('2018 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),这句是对Syslog格式的一个修正,因为系统默认的Syslog日期是没有年的字段,所以使用format_string函数强制把拆分出来的第一个字段前面加上2019年,再根据to_timestamp格式转换成timestamp字段。在接下来的查询应当以这个timestamp作为事件时间。

4、对Syslog进行查询

由Spark接收nc程序发送过来的日志信息,然后完成以下任务。

(1)统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。

        在新开的终端内输入 vi spark_exercise_testsyslog1.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3from functools import partialfrom pyspark.sql import SparkSession
from pyspark.sql.functions import *if __name__ == "__main__":spark = SparkSession \.builder \.appName("StructuredSyslog") \.getOrCreate()lines = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9988) \.load()# Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)# 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段fields = partial(regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$")words = lines.select(to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),fields(idx=2).alias("hostname"),fields(idx=3).alias("tag"),fields(idx=4).alias("content"),)# (1).  统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。windowedCounts1 = words \.filter("tag = 'CRON'") \.withWatermark("timestamp", "1 minutes") \.groupBy(window('timestamp', "1 hour")) \.count() \.sort(asc('window'))# 开始运行查询并在控制台输出query = windowedCounts1 \.writeStream \.outputMode("complete") \.format("console") \.option('truncate', 'false')\.trigger(processingTime="3 seconds") \.start()query.awaitTermination()

(2)统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。

        在新开的终端内输入 vi spark_exercise_testsyslog2.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3from functools import partialfrom pyspark.sql import SparkSession
from pyspark.sql.functions import *if __name__ == "__main__":spark = SparkSession \.builder \.appName("StructuredSyslog") \.getOrCreate()lines = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9988) \.load()# Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)# 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段fields = partial(regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$")words = lines.select(to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),fields(idx=2).alias("hostname"),fields(idx=3).alias("tag"),fields(idx=4).alias("content"),)# (2).  统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。windowedCounts2 = words \.withWatermark("timestamp", "1 minutes") \.groupBy('tag', window('timestamp', "1 hour")) \.count() \.sort(asc('window'))# 开始运行查询并在控制台输出query = windowedCounts2 \.writeStream \.outputMode("complete") \.format("console") \.option('truncate', 'false')\.trigger(processingTime="3 seconds") \.start()query.awaitTermination()

(3)输出所有日志内容带error的日志。

        在新开的终端内输入 vi spark_exercise_testsyslog3.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3from functools import partialfrom pyspark.sql import SparkSession
from pyspark.sql.functions import *if __name__ == "__main__":spark = SparkSession \.builder \.appName("StructuredSyslog") \.getOrCreate()lines = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9988) \.load()# Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)# 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段fields = partial(regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$")words = lines.select(to_timestamp(format_string('2019 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),fields(idx=2).alias("hostname"),fields(idx=3).alias("tag"),fields(idx=4).alias("content"),)# (3).  输出所有日志内容带error的日志。windowedCounts3 = words \.filter("content like '%error%'")# 开始运行查询并在控制台输出query = windowedCounts3 \.writeStream \.outputMode("update") \.format("console") \.option('truncate', 'false')\.trigger(processingTime="3 seconds") \.start()query.awaitTermination()

四、结果分析与实验体会

        Spark Structured Streaming 是 Spark 提供的用于实时流处理的 API,它提供了一种统一的编程模型,使得批处理和流处理可以共享相同的代码逻辑,让开发者更容易地实现复杂的实时流处理任务。通过对 Structured Streaming 的实验,有以下体会:

  1. 简单易用: Structured Streaming 提供了高级抽象的 DataFrame 和 Dataset API,使得流处理变得类似于静态数据处理,降低了学习成本和编程复杂度。

  2. 容错性强大: Structured Streaming 内置了端到端的 Exactly-Once 语义,能够保证在发生故障时数据处理的准确性,给开发者提供了更可靠的数据处理保障。

  3. 灵活性和扩展性: Structured Streaming 支持丰富的数据源和数据接收器,可以方便地与其他数据存储和处理系统集成,同时也支持自定义数据源和输出操作,满足各种不同场景的需求。

  4. 优化性能: Structured Streaming 内置了优化器和调度器,能够根据任务的特性自动优化执行计划,提升处理性能,同时还可以通过调整配置参数和优化代码来进一步提高性能。

  5. 监控和调试: Structured Streaming 提供了丰富的监控指标和集成的调试工具,帮助开发者实时监控作业运行状态、诊断问题,并进行性能调优。

        通过实验和实践,更深入地理解 Structured Streaming 的特性和工作原理,掌握实时流处理的开发技巧和最佳实践,为构建稳健可靠的实时流处理应用打下坚实基础。

        Syslog 是一种常用的日志标准,它定义了一个网络协议,用于在计算机系统和网络设备之间传递事件消息和警报。通过对 Syslog 的实验,有以下体会:

  1. 灵活性: Syslog 可以用于收集各种类型的事件和日志信息,包括系统日志、安全事件、应用程序消息等等,具有很高的灵活性和可扩展性。

  2. 可靠性: Syslog 提供了可靠的传输和存储机制,确保事件和日志信息不会丢失或损坏,在故障恢复和安全审计方面非常重要。

  3. 标准化: Syslog 是一种通用的日志标准,已经被广泛采用和支持,可以与各种操作系统、应用程序、设备和服务集成,提供了统一的数据格式和接口。

  4. 安全性: Syslog 支持基于 TLS 和 SSL 的加密和身份认证机制,确保传输的信息不会被窃听或篡改,保证了日志传输的安全性。

  5. 可视化: 通过将 Syslog 收集到集中式的日志管理系统中,可以方便地进行搜索、分析和可视化,使日志信息变得更加易于理解和利用。

        通过实验和实践,更深入地了解 Syslog 的工作原理和应用场景,学会如何配置和使用 Syslog,掌握日志收集、存储、分析和可视化的技巧和最佳实践,为构建高效、可靠、安全的日志管理系统打下坚实基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/684500.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

相机图像质量研究(17)常见问题总结:CMOS期间对成像的影响--靶面尺寸

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结:光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结:光学结构对成…

【AIGC】Stable Diffusion的常见错误

Stable Diffusion 在使用过程中可能会遇到各种各样的错误。以下是一些常见的错误以及可能的解决方案: 模型加载错误:可能出现模型文件损坏或缺失的情况。解决方案包括重新下载模型文件,确保文件完整并放置在正确的位置。 依赖项错误&#x…

009集——磁盘详解——电脑数据如何存储在磁盘

很多人也知道数据能够保存是由于设备中有一个叫做「硬盘」的组件存在,但也有很多人不知道硬盘是怎样储存这些数据的。这里给大家讲讲其中的原理。 首先我们要明白的是,计算机中只有0和1,那么我们存入硬盘的数据,实际上也就是一堆0…

Python三级考试笔记

Python三级考试笔记【源源老师】 三级标准 一、 理解编码、数制的基本概念,并且会应用。 1. 能够进行二进制、十进制以及十六进制之间的转换; 2. 理解Python中的数制转换函数。 二、 掌握一维数据的表示和读写方法,能够编写程序处理一维数据…

阿里云幻兽帕鲁服务器手动更新游戏服务端的命令

幻兽帕鲁Windows服务器版手动更新命令: 首先打开服务器桌面的Windows PowerShell工具,找不到就左下角开始菜单,搜索即可。 然后输入下面的命令切换到这个目录: cd C:\steamcmd 接着运行下面的命令: .\steamcmd.ex…

MySQL性能调优篇(7)-MySQL的集群部署和优化

MySQL的集群部署和优化 MySQL是一种常用的关系型数据库管理系统,可以用于存储和管理大量的结构化数据。为了满足高并发和大规模数据存储需求,MySQL的集群部署和优化变得非常重要。本篇博客将介绍MySQL的集群部署方法和一些优化技巧。 一、MySQL集群部署…

2.15作业

1、选择题 1.1、有以下程序 int main() { char a[7]"a0\0a0\0";int i,j; isizeof(a); jstrlen(a); printf("%d %d\n",i,j); } //strlen求出字符串的长度,其实是字符串中字符的个数,不包括\0 程序运行后的输出结果是____C…

【C++】C++11上

C11上 1.C11简介2.统一的列表初始化2.1 {} 初始化2.2 initializer_list 3.变量类型推导3.1auto3.2decltype3.3nullptr 4.范围for循环5.final与override6.智能指针7. STL中一些变化8.右值引用和移动语义8.1左值引用和右值引用8.2左值引用与右值引用比较8.3右值引用使用场景和意义…

【算法设计与分析】搜索旋转排序数组

📝个人主页:五敷有你 🔥系列专栏:算法分析与设计 ⛺️稳中求进,晒太阳 题目 整数数组 nums 按升序排列,数组中的值 互不相同 。 在传递给函数之前,nums 在预先未知的某个下标 k&#xff…

什么是“感知机”?

感知机(神经网络和支持向量机的理论基础) 概念:简单来说,感知机就是一个旨在建立一个线性超平面对线性可分的数据集进行分类的线性模型 分类: 单层感知机多层感知机( Multi-Layer Perceptron&#xff0c…

【lesson55】线程同步

线程同步 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步 例一:我们之前的抢票代码,一个线程把票全抢完了,它错了吗?没错…

【springboot+vue项目(十六)】基于Oauth2的SSO单点登录(三)SpringSecurity+Jwt 整合Aouth2(客户端部分)

要将Spring Security与基于OAuth 2.0的第三方认证系统进行整合,你需要执行以下步骤: 配置Spring Security以使用OAuth 2.0: 添加依赖项:在项目的构建文件(如Maven的pom.xml)中添加适当的依赖项,…

top100-回溯算法专题

回溯算法和深度优先遍历 回溯法采用试错的思想,它尝试分布的去解决一个问题。在分布解决问题的过程中,当它通过尝试发现现有的分布答案不能得到有效的正确的解答的时候,它将取消上一步甚至上级不的计算,再通过其他的可能的分布解答…

Duilib 的WinMain函数学习

之前跑了一个基本例子;接下来准备再做一些; 看着它的WinMain函数里面,有几句不知道需不需要; 它是这样的;从别的示例里面来的; int APIENTRY WinMain(HINSTANCE hInstance, HINSTANCE /*hPrevInstance*/, LPSTR /*lpCmdLine*/, int nCmdShow) {CPaintManagerUI::SetIn…

Qt Creator 继承分文件编写代码流程实现简单案列

Qt Creator 继承分文件流程实现简单案列 打开Qt Creator&#xff0c;新建c项目 添加类 完成之后&#xff0c;会自动生成.h和.cpp文件 一、animal.h文件 主要用来写类&#xff0c;包括成员变量和函数 #ifndef ANIMAL_H #define ANIMAL_H #include <iostream> #inclu…

高效货运 - 华为OD统一考试(C卷)

OD统一考试&#xff08;C卷&#xff09; 分值&#xff1a; 200分 题解&#xff1a; Java / Python / C 题目描述 老李是货运公司承运人&#xff0c;老李的货车额定载货重量为wt&#xff1b;现有两种货物&#xff0c;货物A单件重量为wa&#xff0c;单件运费利润为pa&#xff0c…

【matalab】基于Octave的信号处理与滤波分析案例

一、基于Octave的信号处理与滤波分析案例 GNU Octave是一款开源软件&#xff0c;类似于MATLAB&#xff0c;广泛用于数值计算和信号处理。 一个简单的信号处理与滤波分析案例&#xff0c;说明如何在Octave中生成一个有噪声的信号&#xff0c;并设计一个滤波器来去除噪声。 首…

识别盐构造在预先确定造山带动力学和几何形态方面的重要性和控制作用

近几十年来&#xff0c;理解盐岩的变形已成为许多含盐褶皱冲断带中日益受到关注的研究课题。越来越多的研究指出&#xff0c;继承性正断层及与之相连的盐构造在预先确定造山带动力学和几何形态方面的重要性和控制作用&#xff08;例如&#xff0c;在北石灰岩阿尔卑斯地区有Gran…

从数字孪生到智慧城市:科技引领下的城市未来展望

一、引言 随着科技的飞速发展&#xff0c;数字孪生和智慧城市已成为当今世界城市发展的重要趋势。数字孪生通过建立物理世界的数字模型&#xff0c;为城市管理和规划提供了前所未有的可能性&#xff1b;而智慧城市则借助先进的信息通信技术&#xff0c;使城市运行更加高效、便…

关于 TI Bq40Z551 Cell Swelling Protection的理解

“Cell Swelling Protection”&#xff08;电池膨胀保护&#xff0c;俗称鼓包&#xff09;是指一种保护措施&#xff0c;用于防止充电时电池发生过度膨胀的情况。 当充电电池过度膨胀时&#xff0c;可能会对设备的性能和安全造成威胁&#xff0c;包括电池的寿命缩短、电池损坏…