实验五 Spark Structured Streaming编程实践

一、编写程序

(1). 按照tag分组统计生成的日志数。

在新开的终端内输入 vi spark_exercise_testsyslog1.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3from functools import partialfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import *if __name__ == "__main__":spark = SparkSession \.builder \.appName("Structuredcronlog") \.getOrCreate()lines = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9988) \.load()# Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)# 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段fields = partial(regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$")words = lines.select(unix_timestamp(format_string('2022 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),fields(idx=2).alias("hadooplyf316"),fields(idx=3).alias("tag"),fields(idx=4).alias("content"),)# (1).  按照tag分组统计日志数。windowedCounts1 = words \.groupBy("tag") \.count()# 开始运行查询并在控制台输出query = windowedCounts1 \.writeStream \.outputMode("append") \.format("console") \.option('truncate', 'false')\.trigger(processingTime="3 seconds") \.start()query.awaitTermination()

(2).输出所有日志内容带spark的日志。

在新开的终端内输入 vi spark_exercise_testsyslog3.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3from functools import partialfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import *if __name__ == "__main__":spark = SparkSession \.builder \.appName("Structuredcronlog") \.getOrCreate()lines = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9988) \.load()# Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)# 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段fields = partial(regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$")words = lines.select(unix_timestamp(format_string('2022 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),fields(idx=2).alias("hostname"),fields(idx=3).alias("tag"),fields(idx=4).alias("content"),)# (3).  输出所有日志内容带spark的日志(根据自己模拟的日志内容进行筛选)。windowedCounts3 = words \.filter("content like '%spark%'")# 开始运行查询并在控制台输出query = windowedCounts3 \.writeStream \.outputMode("append") \.format("console") \.option('truncate', 'false')\.trigger(processingTime="3 seconds") \.start()query.awaitTermination()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/8476.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

onlyoffice容器打包成镜像

书接上篇,onlyoffice容器已经更改在本地docker环境中了,之后需要部署到测试环境的docker中,采用容器打包成本地镜像 1、本地docker 查看容器:docker ps 生成镜像:docker commit -p blissful_lichterman 重命名镜像&a…

【大学物理】双语合集听课笔记

7.5 angular momentu(角动量)_哔哩哔哩_bilibili 6.4Energy in Rotation Motion 有质量有速度的物体有动能,是不是很有道理 international system(from French systeme international,acronym,SI)of ineria kg*m^2 转…

pycharm中导入rospy(ModuleNotFoundError: No module named ‘rospy‘)

1. ubuntu安装对应版本ros ubuntu20.04可参考: https://wiki.ros.org/cn/noetic/Installation/Ubuntuhttps://zhuanlan.zhihu.com/p/515361781 2. 安装python3-roslib sudo apt-get install python3-roslib3.在conda环境中安装rospy pip install rospkg pip in…

【Git】Git学习-17:git rebase,且解决合并冲突

学习视频链接:【GeekHour】一小时Git教程_哔哩哔哩_bilibili​编辑https://www.bilibili.com/video/BV1HM411377j/?vd_source95dda35ac10d1ae6785cc7006f365780 理论 git rebase 目标分支:把当前分支的提交,从与目标分支的共同主祖先处断开…

js如何控制一次只加载一张图片,加载完成后再加载下一张

公众号:程序员白特,欢迎一起交流学习~ 原文:https://juejin.cn/post/7340167256267391012 今天看到一个面试题,是关于img图片加载方面的,有必要记录一下。其实关于这个问题,只要知道图片什么时候加载完成就…

分割模型Maskformer系列

maskformer:Per-Pixel Classification is Not All You Need for Semantic Segmentation 论文地址:https://arxiv.org/pdf/2107.06278 1.概述 传统的语义分割方法通常采用逐像素分类(per-pixel classification),而实…

linux安装Redis 7.2.4笔记

一.保姆级安装 1.下载Redis 7.2.4安装包 sudo wget https://download.redis.io/releases/redis-7.2.4.tar.gz2.解压,可以指定 sudo tar -zvxf redis-7.2.4.tar.gz 3.检测并安装 GCC 编译器: yum 是基于 Red Hat 的 Linux 发行版(如 CentOS、…

CSRF漏洞简介

csrf简介 CSRF 全称为跨站请求伪造( Cross-site request forgery ),是一种网络攻击方式,在 CSRF 的攻击场景中攻击者会伪造一个请求(这个请求一般是一个链接),然后欺骗目标用户进行点击&#xf…

Lora基础炼丹学习笔记

1、收集数据集 20-30张人物各个角度、各个姿势的图片 2、图片预处理 裁剪 打标签 裁剪必须也要512 * 512 ,因为sd1.5就是用这个尺寸训练的,可以使用后期处理 打标可以勾选这个,Deepbooru对二次元画风更友好 打标也可以使用wb14-tagger的…

Flink checkpoint 源码分析- Checkpoint snapshot 处理流程

背景 在上一篇博客中我们分析了代码中barrier的是如何流动改的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客 最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 现在我们接着跟踪相应代…

FTTR(光猫)ITMS注册NCE纳管

ITMS注册 TR069交互过程: 1.1. TR069交互—主动连接机制 主动连接机制是指CPE主动发出请求连接事件(事件可以为: 0 BOOTSTRAP; 1 BOOT; PERIODIC等等)给ACS。在连接建立之后才能进行业务处理(通过调用RPC方法实现)。 备注:政企…

2024.5.8

聊天框完善 #include "mywidget.h" #include "ui_mywidget.h"MyWidget::MyWidget(QWidget *parent): QWidget(parent), ui(new Ui::MyWidget) {ui->setupUi(this);//设置窗口大小this->resize(400,560);//设置窗口图标和标题this->setWindowTit…

Android C++ 开发调试 LLDB 工具的使用

文章目录 调试环境准备基础命令Breakpoint CommandsWatchpoint CommandsExamining VariablesEvaluating ExpressionsExamining Thread StateExecutable and Shared Library Query Commands 参考: Android 中在进行 NDK 开发的时候,我们经常需要进行 C 代…

隐式3D形状表示:Occupancy Networks

OccNet 的关键思想是隐式地表示3D形状,而不是显式地表示。与直接编码形状几何信息不同,OccNet 将形状的表面建模为非线性分类器的决策边界。 隐式表示:Occupancy Networks 将 3D 形状表示为非线性分类器函数的决策边界 f θ : R 3 X → [ 0…

2024年颠覆商业模式《本草生活》项目,巧妙三招营销引流裂变套路

2024年颠覆商业模式《本草生活》项目,巧妙三招营销引流裂变套路 文丨微三云营销总监胡佳东,点击上方“关注”,为你分享市场商业模式电商干货。 - 引言:现如今流量枯竭、降本增效、红利不再已是线上营销的常态,互联网…

静态照片怎么合成gif?详细介绍一个方法

我们在各大平台中都能看到各种样式的gif动图。Gif动图其实就是由一帧一帧的静态图片合成的动态效果的gif,想要制作gif动画可以通过使用在线图片合成(https://www.gif5.net/)工具-GIF5工具网,手机、pc均可操作,只需要上…

nestjs 全栈进阶--自定义装饰器

视频教程 20_nest中自定义装饰器_哔哩哔哩_bilibili nest new custom-decorator -p pnpm pnpm start:dev 在Nestjs 中我们使用了大量装饰器 decorator ,所以Nestjs 也允许我们去自定义装饰器。 1. 自定义方法装饰器 nest g decorator aaa --flat 它生产的代码…

详细分析McCabe环路复杂度(附例题)

目录 前言1. 基本知识2. 例题 前言 该知识点常出在408或者软考中,对此此文重点讲讲理论知识以及例题 对于例题平时看到也会更新 1. 基本知识 McCabe环路复杂度是一种用于衡量软件代码复杂性的指标,主要是通过计算代码中的控制流图中的环路数量来衡量…

机房——蓝桥杯十三届2022国赛大学B组真题

问题分析 这题用深搜广搜都能做,不过我更倾向于用广搜,因为广搜能更容易找到目标点。那么是采用结构体存储边还是采用二维数组存储临接矩阵呢?我们注意到n的取值范围为1e5,用二维数组哪怕是bool类型就需要至少1e10Byte的连续空间,这个空间太大…

5V升8.4V2A升压恒压WT3231

5V升8.4V2A升压恒压WT3231 WT3231 是一种高性能直流-直流(DC-DC)转换器,集成了能够承受10A电流和26mΩ低导通电阻的功率MOSFET。该转换器能提供高达12V的稳定输出电压,并具有固定600KHz开关频率,使得小型外部电感和电…