SparkStreaming--scala

文章目录

  • 第1关:QueueStream
    • 代码
  • 第2关:File Streams
    • 代码


第1关:QueueStream

任务描述
本关任务:编写一个清洗QueueStream数据的SparkStreaming程序。

相关知识
为了完成本关任务,你需要掌握:1.如何使用SparkStreaming,2.如何使用 SparkStreaming读取QueueStream。

SparkStreaming 的开发步骤
初始化SparkConf并设置相关参数
val conf = new SparkConf().setMaster(master).setAppName(appName)

说明:

appName 是应用程序在集群 UI 上显示的名称。

master 是Spark,Mesos或YARN集群的URL,或在本地模式下运行使用 local[*]

初始化JavaStreamingContext并设置处理批次的时间
val ssc = new StreamingContext(conf, Seconds(1))

设置数据源
例如:

val inputStream = ssc.queueStream(rddQueue)

批次数据处理(使用相关算子完成相应的操作)
算子 含义
map(func) 通过将源DStream的每个元素传递给函数func来返回一个新的DStream
flatMap(func) 与map类似,但每个输入项可以映射到0个或更多输出项。
filter(func) 通过仅选择func返回true的源DStream的记录来返回新的DStream
repartition(numPartitions) 通过创建更多或更少的分区来更改此DStream中的并行度级别
union(otherStream) 返回一个新的DStream,它包含源DStream和otherDStream中元素的并集
count() 通过计算源DStream的每个RDD中的元素数量,返回单元素RDD的新DStream
reduce(func) 通过使用函数func(它接受两个参数并返回一个)聚合源DStream的每个RDD中的元素,返回单元素RDD的新DStream 。该函数应该是关联的和可交换的,以便可以并行计算
countByValue() 当在类型为K的元素的DStream上调用时,返回(K,Long)对的新DStream,其中每个键的值是其在源DStream的每个RDD中的频率
reduceByKey(func,[ numTasks ]) 当在(K,V)对的DStream上调用时,返回(K,V)对的新DStream,其中使用给定的reduce函数聚合每个键的值。注意:默认情况下,这使用Spark的默认并行任务数(本地模式为2,在群集模式下,数量由config属性确定spark.default.parallelism进行分组。您可以传递可选numTasks参数来设置不同数量的任务
join(otherStream, [numTasks]) 当在(K,V)和(K,W)对的两个DStream上调用时,返回(K,(V,W))对的新DStream与每个键的所有元素对
cogroup(otherStream, [numTasks]) 当在(K,V)和(K,W)对的DStream上调用时,返回(K,Seq [V],Seq [W])元组的新DStream
transform(func) 通过将RDD-to-RDD函数应用于源DStream的每个RDD来返回新的DStream。这可以用于在DStream上执行任意RDD操作
updateStateByKey(func) 返回一个新的“状态”DStream,其中通过在键的先前状态和键的新值上应用给定函数来更新每个键的状态
foreachRDD(func) 最通用的输出运算符,它将函数func应用于从流生成的每个RDD。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,这将强制计算流式RDD。
启动流计算
ssc.start();

等待处理停止
ssc.awaitTermination();

QueueStream
QueueStream(队列流):推入队列的每个RDD将被视为DStream中的一批数据,并像流一样处理。

编程要求
在右侧编辑器补充代码,完成以下需求:

将时间戳转换成规定格式的时间形式(格式为:yyyy-MM-dd HH:mm:ss )

提取数据中的起始URL(切割符为空格)

拼接结果数据,格式如下:

Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl: https://search.yahoo.com/search?p=反叛的鲁鲁修,statusCode:200
将最终结果写入Mysql数据库
测试说明
平台将对你编写的代码进行评测:

预期输出:

1 Ip:100.143.124.29,visitTime:2017-10-27 14:58:05,startUrl:www/1,targetUrl:https://www.baidu.com/s?wd=反叛的鲁鲁修,statusCode:404
2 Ip:30.132.167.100,visitTime:2018-12-02 11:29:39,startUrl:www/4,targetUrl:-,statusCode:302
3 Ip:30.156.187.132,visitTime:2016-05-17 17:18:56,startUrl:www/2,targetUrl:-,statusCode:200
4 Ip:29.100.10.30,visitTime:2016-10-12 01:25:47,startUrl:www/3,targetUrl:http://cn.bing.com/search?q=游戏人生,statusCode:302
5 Ip:132.187.167.143,visitTime:2017-01-08 23:21:09,startUrl:pianhua/130,targetUrl:-,statusCode:200
6 Ip:143.187.100.10,visitTime:2016-09-21 19:27:39,startUrl:www/1,targetUrl:-,statusCode:302
7 Ip:10.100.124.30,visitTime:2018-09-16 02:49:35,startUrl:www/4,targetUrl:http://cn.bing.com/search?q=来自新世界,statusCode:200
8 Ip:29.10.143.187,visitTime:2017-09-29 15:49:09,startUrl:www/1,targetUrl:-,statusCode:404
9 Ip:29.187.132.100,visitTime:2018-11-27 05:43:17,startUrl:www/1,targetUrl:-,statusCode:200
10 Ip:187.167.124.132,visitTime:2016-01-28 13:34:33,startUrl:www/6,targetUrl:-,statusCode:200
开始你的任务吧,祝你成功!

代码

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutableobject QueueStream {def main(args: Array[String]) {val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()val conf = new SparkConf().setMaster("local[2]").setAppName("queueStream")/********** Begin **********///1.初始化StreamingContext,设置时间间隔为1sval ssc = new StreamingContext(conf, Seconds(1))//2.对接队列流val inputStream = ssc.queueStream(rddQueue)/**** 数据格式如下:*      100.143.124.29,1509116285000,'GET www/1 HTTP/1.0',https://www.baidu.com/s?wd=反叛的鲁鲁修,404* 数据从左往右分别代表:用户IP、访问时间戳、起始URL及相关信息(访问方式,起始URL,http版本)、目标URL、状态码*** 原始数据的切割符为逗号,(英文逗号)** 需求:*      1.将时间戳转换成规定时间(格式为:yyyy-MM-dd HH:mm:ss )*      2.提取数据中的起始URL(切割符为空格)*      3.拼接结果数据,格式如下:* Ip:124.132.29.10,visitTime:2019-04-22 11:08:33,startUrl:www/2,targetUrl:https://search.yahoo.com/search?p=反叛的鲁鲁修,statusCode:200*      4.将最终结果写入 mysql 数据库, 调用DBUtils.add(line)即可, line:String*///3.获取队列流中的数据,进行清洗、转换(按照上面的需求)val data = inputStream.map(data=>{val dataliat = data.split(',')val ip = dataliat(0)val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")val lt = dataliat(1).toLongval date = new Date(lt)val visitTime = simpleDateFormat.format(date)val startUrl = dataliat(2).split(' ')(1)val targetUrl= dataliat(3)val statusCode = dataliat(4)val result = "Ip:" + ip + ",visitTime:" + visitTime + ",startUrl:" + startUrl + ",targetUrl:" + targetUrl + ",statusCode:" + statusCoderesult})//4.将最终结果写入 mysql 数据库, 调用DBUtils.add(line)即可, line:Stringdata.foreachRDD(rdd => {rdd.foreachPartition(it => {it.foreach(line => {DBUtils.add(line)})})})//5.启动SparkStreamingssc.start()/********** End **********/DBUtils.addQueue(ssc, rddQueue)}
}

在这里插入图片描述

第2关:File Streams

任务描述
本关任务:编写一个清洗File Streams数据的SparkStreaming程序。

相关知识
为了完成本关任务,你需要掌握:1. 文件流,2. SparkStreaming的编程流程。

文件流
文件流(File Streams):从与HDFS API兼容的任何文件系统上的文件读取数据

通过文件流创建Dstream:

val lines=streamingContext.fileStreamKeyClass,ValueClass, InputFormatClass

对于简单的文本文件,有一种更简单的方法:

val lines=streamingContext.textFileStream(dataDirectory)

Spark Streaming将监视目录dataDirectory并处理在该目录中创建的任何文件(不支持嵌套目录中写入的文件)。

注意:

文件必须具有相同的数据格式。

文件移动到该目录后,不能在添加新数据,即使添加也不会读取新数据。

只会监听目录下在程序启动后新增的文件,不会去处理历史上已经存在的文件。

说明:文件流不需要运行receiver,因此不需要分配core

SparkStreaming编程流程
设置SparkConf相关参数
val conf = new SparkConf().setMaster(master).setAppName(appName)

初始化StreamingContext
val ssc = new StreamingContext(conf, Seconds(1))
Seconds(1)表示每一秒处理一个批次;

设置数据源创建Dstream
val lines = ssc.textFileStream(dataDirectory)

通过将转换和输出操作应用于DStream来定义流式计算
比如flatmap,map,foreachRDD,updateStateByKey等等;

启动流计算
ssc.start();

等待处理停止
ssc.awaitTermination();

编程要求
在右侧编辑器中补全代码,要求如下:

/root/step11_fils下有两个文件,文件内容分别为:
hadoop hadoop hadoop hadoop hadoop hadoop hadoop hadoop spark spark
hello hello hello hello hello hello hello hello study study
要求清洗数据并实时统计单词个数,并将最终结果导入MySQL
step表结构:

列名 数据类型 长度 非空
word varchar 255 √
count int 255 √
测试说明
平台会对你编写的代码进行测试:

预期输出:

hadoop 8
spark 2
hello 8
study 2

代码

package com.sanyiqiimport java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object SparkStreaming {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("edu").setMaster("local")/********** Begin **********///1.初始化StreamingContext,设置时间间隔为1sval ssc = new StreamingContext(conf, Seconds(1))//2.设置文件流,监控目录/root/step11_filsval lines = ssc.textFileStream("/root/step11_fils")/* *数据格式如下:hadoop hadoop spark spark*切割符为空格*需求:*累加各个批次单词出现的次数*将结果导入Mysql*判断MySQL表中是否存在即将要插入的单词,不存在就直接插入,存在则把先前出现的次数与本次出现的次数相加后插入*库名用educoder,表名用step,单词字段名用word,出现次数字段用count*///3.对数据进行清洗转换val wordcount = lines.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)//4.将结果导入MySQLwordcount.foreachRDD(rdd => {rdd.foreachPartition(f = eachPartition => {val connection: Connection = createConnection()eachPartition.foreach(f = record => {val querySql = "SELECT t.count FROM step t WHERE t.word = '" + record._1 + "'"val queryResultSet: ResultSet = connection.createStatement().executeQuery(querySql)val hasNext = queryResultSet.next()print("MySQL had word:" + record._1 + " already  :  " + hasNext)if (!hasNext){val insertSql = "insert into step(word,count) values('" + record._1 + "'," + record._2 + ")"connection.createStatement().execute(insertSql)} else {val newWordCount = queryResultSet.getInt("count") + record._2val updateSql = "UPDATE step SET count = " + newWordCount + " where word = '" + record._1 + "'"connection.createStatement().execute(updateSql)}})connection.close()})})//5.启动SparkStreamingssc.start()/********** End **********/Thread.sleep(15000)ssc.awaitTermination()ssc.stop()}/***获取mysql连接*@return*/def createConnection(): Connection ={Class.forName("com.mysql.jdbc.Driver")DriverManager.getConnection("jdbc:mysql://localhost:3306/educoder","root","123123")}
}

在这里插入图片描述


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/46385.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OrangePi AI Pro 实测:感受 AI 应用的独特魅力与强大性能

OrangePi AiPro介绍和初始化配置 小寒有话说一、OrangePi AiPro介绍1. 主板详情2. 开发配置3. 镜像烧录4. 设备连接5. WiFi连接6. NVMe SSD的安装和挂载7. 更新下载源并下载必要的软件8. 扩展内存 二、Jupyter Lab AI测评应用案例1. 获取Jupyter Lab 网址链接2. 图像提取文字3.…

帕金森病患者应该如何进行日常锻炼以提高生活质量?

帕金森病患者的日常锻炼建议 帕金森病患者进行日常锻炼对于改善症状、维持肌肉功能和延缓疾病进展至关重要。以下是一些具体的锻炼建议: 选择适合的运动类型:帕金森病患者应选择低冲击、有氧的活动,如散步、骑自行车、游泳和太极拳等。这些运…

【qt】考试系统项目

话不多说,先一睹芳颜 咱们的账号,题库和答案都是通过文件获取的. 话不多说,直接开干 目录 一.登录窗口1.界面设计2.邮箱验证3.登录验证 二.题库窗口1.考试计时2.布局管理器3.题库显示4.按钮布局5.计算分数 三.窗口交互四.完整代码五.结语 一.登录窗口 1.界面设计 这里添加背…

从信息化、数字化、智能化到企业大模型应用

新时代背景下,数字经济发展速度之快、辐射范围之广、影响程度之深前所未有,5G、大数据、云计算、人工智能、区块链等技术加速创新,全域融入经济社会、民生服务全过程,成为资源要素重组、经济结构重塑、竞争格局重构的关键力量。千…

Visual Studio 安装程序无法执行修复或更新

一.问题场景 出现问题的场景:当你的VS已经安装但是无法在工具中下载新组件或者卸载了当时一直无法安装。 二.问题原因 如果计算机上的 Visual Studio 实例已损坏,则可能会出现此问题。 三.解决方法 如果之前尝试修复或更新 Visual Studio 失败&…

浅谈RLHF---人类反馈强化学习

浅谈RLHF(人类反馈强化学习) RLHF(Reinforcement Learning fromHuman Feedback)人类反馈强化学习 RLHF是[Reinforcement Learning from Human Feedback的缩写,即从人类反馈中进行强化学习。这是一种结合了机器学习中…

51单片机6(P0P1P2P3结构框架图)

一、GPIO结构框架图与工作原理 1、接下来我们介绍一下这个GPIO结构框图和工作原理,我们使用51单片机的GPIO分为了P0,P1,P2,P3这四组端口,下面我们就分别来介绍这四组端口它的一个内部结构,只有了解了内部的…

[PM]原型与交互设计

原型分类 1.草图原型 手绘图稿, 规划的早期,整理思路会使用 2.低保真原型 简单交互, 无需配色, 黑白灰为主, 产品规划和评审阶段使用 标准化的低保真原型是高保真原型的基础 3.高保真原型 复杂交互, 一般用于公开演示, 产品先产出低保真原型, 设计师根据原型产出设计稿 低保…

Vue3学习体验(一)

搭建工程 使用vue-cli脚手架创建vue3工程 vue create vue3-app-vue-cliVue-cli官网:https://cli.vuejs.org/zh/guide/installation.html 使用vite搭建vue3工程 npm init表示临时的下载vite应用来创建vue3工程,工程名称为vue3-app-vite npm init vit…

mount挂载

1)Vmvare挂载光驱设备 安装光驱设备后,可以看到设备文件。 ls /dev/sr0 ll /dev/cdrom虽然设备是以文件的形式出现的,但和一般的文件不一样。 2)mount挂载 目录是目录,设备是设备,mount挂载可以让目录成…

数据结构——查找(线性表的查找与树表的查找)

目录 1.查找 1.查找的基本概念 1.在哪里找? 2.什么查找? 3.查找成功与否? 4.查找的目的是什么? 5.查找表怎么分类? 6.如何评价查找算法? 7.查找的过程中我们要研究什么? 2.线性表…

Spring webflux基础核心技术

一、 用操作符转换响应式流 1 、 映射响应式流元素 转换序列的最自然方式是将每个元素映射到一个新值。 Flux 和 Mono 给出了 map 操作符&#xff0c;具有 map(Function<T&#xff0c;R>) 签名的方法可用于逐个处理元素。 当操作符将元素的类型从 T 转变为 R 时&#xf…

基于conda包的环境创建、激活、管理与删除

Anaconda是一个免费、易于安装的包管理器、环境管理器和 Python 发行版&#xff0c;支持平台包括Windows、macOS 和 Linux。下载安装地址&#xff1a;Download Anaconda Distribution | Anaconda 很多不同的项目可能需要使用不同的环境。例如某个项目需要使用pytorch1.6&#x…

SAP 消息输出 - Adobe Form

目录 1 安装链接 2 前台配置 - Fiori app 2.1 维护表单模板 (maintain form templates) 2.2 管理微标 (manage logos) 2.3 管理文本 (manage texts) 3 后台配置 3.1 定义表单输出规则 3.2 分配表单模板 SAP 消息输出&#xff0c;不仅是企业内部用来记录关键业务操作也是…

【学习笔记】无人机(UAV)在3GPP系统中的增强支持(一)-3GPP TR 22.829 V17.1.0技术报告

本文是3GPP TR 22.829 V17.1.0技术报告&#xff0c;专注于无人机&#xff08;UAV&#xff09;在3GPP系统中的增强支持。文章提出了多个无人机应用场景&#xff0c;分析了相应的能力要求&#xff0c;并建议了新的服务级别要求和关键性能指标&#xff08;KPIs&#xff09;。 下载…

算法导论 总结索引 | 第五部分 第十八章:B树

1、B 树是 为磁盘或其他直接存取的辅助存储设备 而设计的一种平衡搜索树。B 树类似于红黑树&#xff0c;在降低磁盘 I/O 操作次数方面要更好一些。许多数据库系统 使用 B 树 或者 B 树 的变种来存储信息 2、B 树与红黑树的不同之处 在于 B 树的结点 可以有很多孩子&#xff0c…

STM32-寄存器点灯案例详解

本文以PA1引脚点亮LED灯为案例&#xff0c;解析了STM32寄存器操作的配置过程&#xff0c;以及从手册查询方法和寄存器配置步骤。 一、概念 1.十六进制和二进制之间相互转换关系 首先&#xff0c;需要了解十六进制和二进制之间的基本转换方法。十六进制是一种基数为16的数制&…

制作显卡版docker并配置TensorTR环境

感谢阅读 相关概念docker准备下载一个自己电脑cuda匹配的docker镜像拉取以及启动镜像安装cudaTensorRT部署教程 相关概念 TensorRT是可以在NVIDIA各种GPU硬件平台下运行的一个模型推理框架&#xff0c;支持C和Python推理。即我们利用Pytorch&#xff0c;Tensorflow或者其它框架…

frameworks 之FallbackHome

frameworks 之FallbackHome FallbackHome 启动启动 Activity 流程创建进程ActivityThrad 与 AMS启动真正的 Launcher mActivityManagerService 创建后会启动 FallbackHome 再启动桌面程序。因为此时还没解锁&#xff0c;桌面又涉及很多其他应用程序相关&#xff0c;所以要等待用…

【Python】数据处理(mongodb、布隆过滤器、索引)

数据 数据预处理 df pd.read_csv(file_path, encodingANSI) csv的编码方式一定要用 ANSI。要不然会出现各种报错 import pandas as pd from datetime import datetime# 读取CSV文件 file_path book_douban.csv df pd.read_csv(file_path, encodingANSI)# 定义一个函数来…