SparkStreaming_window_sparksql_reids

1.5 window

滚动窗口+滑动窗口

window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

  1. 红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。

  2. 这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。

所以基于窗口的操作,需要指定2个参数:

window length - The duration of the window (3 in the figure)

slide interval - The interval at which the window-based operation is performed (2 in the figure).

  1. 窗口大小,个人感觉是一段时间内数据的容器。

  2. 滑动间隔,就是我们可以理解的cron表达式吧。

案例实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
/*** 统计,截止到目前为止出现的每一个key的次数* window窗口操作,每个多长M时间,通过过往N长时间内产生的数据* M就是滑动长度sliding interval* N就是窗口长度window length*/
object Demo05_WCWithWindow {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("WordCountUpdateStateByKey").setMaster("local[*]")val batchInterval = 2val duration = Seconds(batchInterval)val ssc = new StreamingContext(conf, duration)val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)val pairs:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1))
​val ret:DStream[(String, Int)] = pairs.reduceByKeyAndWindow(_+_,windowDuration = Seconds(batchInterval * 3),slideDuration = Seconds(batchInterval * 2))
​ret.print()
​ssc.start()ssc.awaitTermination()}
}

1.6 SparkSQL和SparkStreaming的整合案例

Spark最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。

案例:top3的商品排序: 最新的top3

这里就是基于updatestateByKey,统计截止到目前为止的不同品类下的商品销量top3

代码实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
/*** SparkStreaming整合SparkSQL的案例之,热门品类top3排行* 输入数据格式:* id brand category* 1 huwei watch* 2 huawei phone**/
object Demo06_SQLWithStreaming {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("StreamingIntegerationSQL").setMaster("local[*]")val batchInterval = 2val duration = Seconds(batchInterval)val spark = SparkSession.builder().config(conf).getOrCreate()val ssc = new StreamingContext(spark.sparkContext, duration)ssc.checkpoint("/Users/liyadong/data/sparkdata/streamingdata/chk-1")val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)//001 mi moblieval pairs:DStream[(String, Int)] = lines.map(line => {val fields = line.split("\\s+")if(fields == null || fields.length != 3) {("", -1)} else {val brand = fields(1)val category = fields(2)(s"${category}_${brand}", 1)}}).filter(t => t._2 != -1)
​val usb:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc)
​usb.foreachRDD((rdd, bTime) => {if(!rdd.isEmpty()) {//category_brand countimport spark.implicits._val df = rdd.map{case (cb, count) => {val category = cb.substring(0, cb.indexOf("_"))val brand = cb.substring(cb.indexOf("_") + 1)(category, brand, count)}}.toDF("category", "brand", "sales")
​df.createOrReplaceTempView("tmp_category_brand_sales")val sql ="""|select|  t.category,|  t.brand,|  t.sales,|  t.rank|from (|  select|    category,|    brand,|    sales,|    row_number() over(partition by category order by sales desc) rank|  from tmp_category_brand_sales|) t|where t.rank < 4|;""".stripMarginspark.sql(sql).show()}})
​ssc.start()ssc.awaitTermination()}
​def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] = {Option(seq.sum + option.getOrElse(0))}
}

1.7 SparkStreaming整合Reids

//将实时结果写入Redis中
dStream.foreachRDD((w,c)=>{val jedis = new Jedis("192.168.10.101", 6379)   //抽到公共地方即可jedis.auth("root")jedis.set(w.toString(),c.toString())  //一个key对应多个值,可以考虑hset
})

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/585337.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

m3u8网络视频文件下载方法

在windows下&#xff0c;使用命令行cmd的命令下载m3u8视频文件并保存为mp4文件。 1.下载ffmpeg&#xff0c;访问FFmpeg官方网站&#xff1a;https://www.ffmpeg.org/进行下载 ffmpeg下载&#xff0c;安装&#xff0c;操作说明 https://blog.csdn.net/m0_53157282/article/det…

下载和安装AD14 - Altium Designer 14.3.20.54863

这个版本应该还支持XP 系统[doge]&#xff0c;总之就是想安装一下&#xff0c;没什么特别的意义。 下载 资源来自毛子网站&#xff1a;https://rutracker.net/forum/viewtopic.php?t5140739&#xff0c;带上个网页翻译插件就行。要用磁力链接下载&#xff0c;推荐用qbittorr…

RabbitMQ之快速入门、上手

前言 学习一样新技术、新框架&#xff0c;最重要的是学习其思想、原理。即原理性思维。 如果是因为工作原因&#xff0c;需要快速上手RabbitMQ&#xff0c;本篇或许适合你。 核心概念 Connection&#xff1a;publisher&#xff0f;consumer 和 broker 之间的 TCP 连接Channel…

Android 理解Context

文章目录 Android 理解ContextContext是什么Activity能直接new吗&#xff1f; Context结构和源码一个程序有几个ContextContext的作用Context作用域获取ContextgetApplication()和getApplicationContext()区别Context引起的内存泄露错误的单例模式View持有Activity应用正确使用…

八数码问题

八数码问题 在3x3的棋盘&#xff0c;摆有八个棋子&#xff0c;每个棋子上标有1至8的某一数字&#xff0c;不同棋子上标的数 字不相同。棋盘上还有一个空格&#xff0c;与空格相邻的棋子可以移到空格中。要求解决的问题 是:给出一个初始状态和一个目标状态&#xff0c;找出一一种…

详解—数据结构—<常用排序>基本实现和代码分析

目录 一.排序的概念及其运用 1.1排序的概念 1.2排序运用​编辑 1.3 常见的排序算法​编辑 二.常见排序算法的实现 2.1 插入排序 2.1.1基本思想&#xff1a; 2.1.2直接插入排序&#xff1a; 2.1.3 希尔排序( 缩小增量排序 ) 2.2 选择排序 2.2.1基本思想&#xff1a; …

日志记录、跟踪和指标

我的新书《Android App开发入门与实战》已于2020年8月由人民邮电出版社出版&#xff0c;欢迎购买。点击进入详情 日志记录、跟踪和指标是系统可观察性的三大支柱。 下图显示了它们的定义和典型架构。 记录 日志记录系统中的离散事件。例如&#xff0c;我们可以将传入请求或对…

论文阅读——UniRepLKNet

UniRepLKNet: A Universal Perception Large-Kernel ConvNet for Audio, Video, Point Cloud, Time-Series and Image Recognition 当我们将一个33的conv添加到一个小卷积核ConvNet中时&#xff0c;我们预计它会同时产生三种效果——1&#xff09;使感受野更大&#xff0c;2&am…

Python之自然语言处理库snowNLP

一、介绍 SnowNLP是一个python写的类库&#xff0c;可以方便的处理中文文本内容&#xff0c;是受到了TextBlob的启发而写的&#xff0c;由于现在大部分的自然语言处理库基本都是针对英文的&#xff0c;于是写了一个方便处理中文的类库&#xff0c;并且和TextBlob不同的是&…

elasticsearch 笔记三:查询建议介绍、Suggester、自动完成

一、查询建议介绍 1. 查询建议是什么&#xff1f; 查询建议&#xff0c;为用户提供良好的使用体验。主要包括&#xff1a; 拼写检查&#xff1b; 自动建议查询词&#xff08;自动补全&#xff09; 拼写检查如图&#xff1a; 自动建议查询词&#xff08;自动补全&#xff09;…

Rust之构建命令行程序(二):读取文件

开发环境 Windows 10Rust 1.74.1 VS Code 1.85.1 项目工程 这次创建了新的工程minigrep. 读取文件 现在&#xff0c;我们将添加读取file_path参数中指定的文件的功能。首先&#xff0c;我们需要一个样本文件来测试它:我们将使用一个包含少量文本的文件&#xff0c;多行包含一…

【图像分类】【深度学习】【轻量级网络】【Pytorch版本】ShuffleNet_V2模型算法详解

【图像分类】【深度学习】【轻量级网络】【Pytorch版本】ShuffleNet_V2模型算法详解 文章目录 【图像分类】【深度学习】【轻量级网络】【Pytorch版本】ShuffleNet_V2模型算法详解前言ShuffleNet_V2讲解四条实用指导思想G1:相等的通道宽度可以降低存储访问成本G2:大量的分组卷积…

【Shell编程练习】监控内存和磁盘容量,小于给定值时报警

系列文章目录 输出Hello World 通过位置变量创建 Linux 系统账户及密码 系列文章目录分析代码实现运行结果 分析 对于磁盘容量&#xff0c;可以使用df命令查看指定指定分区的磁盘使用情况。比如 然后我们需要从这段输出中提取我们想要的信息。在这里就是Available字段的值。…

【wargames】bandit0~9关wp

第1关直接ssh连接&#xff0c;获得密码NH2SXQwcBdpmTEzi3bvBHMM9H66vVXjL&#xff0c;用这个密码连接第2关 第2关&#xff0c;连接之后查看 存在特殊字符的文件 因为使用 - 作为参数是指 STDIN/STDOUT 即 dev/stdin 或 dev/stdout 。所以如果你想打开这种类型的文件&#xff0…

数据结构--二叉搜索树的实现

目录 1.二叉搜索树的概念 2.二叉搜索树的操作 二叉搜索树的插入 中序遍历(常用于排序) 二叉搜索树的查找 二叉搜索树的删除 完整二叉树代码&#xff1a; 二叉搜索树的应用 key/value搜索模型整体代码 1.二叉搜索树的概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一…

基于JAVA的考研专业课程管理系统 开源项目

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 考研高校模块2.3 高校教师管理模块2.4 考研专业模块2.5 考研政策模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 考研高校表3.2.2 高校教师表3.2.3 考研专业表3.2.4 考研政策表 四、系统展示五、核…

SAP CO系统配置-获利能力分析-(机器人制造项目实例)

创建经营组织 配置路径 IMG菜单路径:企业结构>定义>控制>创建经营组织 事务代码 KEP8 屏幕截图: 维护特性 配置路径

nodejs+vue+ElementUi农产品团购销售系统zto2c

目标是为了完成小区团购平台的设计和实现&#xff0c;在疫情当下的环境&#xff0c;方便小区业主购入生活所需&#xff0c;减小居民的生活压力 采用B/S模式架构系统&#xff0c;开发简单&#xff0c;只需要连接网络即可登录本系统&#xff0c;不需要安装任何客户端。开发工具采…

Python/R/GUI/BI类型常用数据可视化工具

什么是数据可视化工具&#xff1f; 数据可视化工具是指旨在可视化数据的所有形式的软件。它们处理数据输入&#xff0c;将其转换为用户可以根据自己的需求进行定制的视觉效果。 不同的工具可以包含不同的功能&#xff0c;但最基本的是&#xff0c;数据可视化工具提供输入数据集…

CDN:内容分发的高速公路(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…