SparkStreaming_window_sparksql_reids

1.5 window

滚动窗口+滑动窗口

window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

  1. 红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。

  2. 这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。

所以基于窗口的操作,需要指定2个参数:

window length - The duration of the window (3 in the figure)

slide interval - The interval at which the window-based operation is performed (2 in the figure).

  1. 窗口大小,个人感觉是一段时间内数据的容器。

  2. 滑动间隔,就是我们可以理解的cron表达式吧。

案例实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
/*** 统计,截止到目前为止出现的每一个key的次数* window窗口操作,每个多长M时间,通过过往N长时间内产生的数据* M就是滑动长度sliding interval* N就是窗口长度window length*/
object Demo05_WCWithWindow {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("WordCountUpdateStateByKey").setMaster("local[*]")val batchInterval = 2val duration = Seconds(batchInterval)val ssc = new StreamingContext(conf, duration)val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)val pairs:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1))
​val ret:DStream[(String, Int)] = pairs.reduceByKeyAndWindow(_+_,windowDuration = Seconds(batchInterval * 3),slideDuration = Seconds(batchInterval * 2))
​ret.print()
​ssc.start()ssc.awaitTermination()}
}

1.6 SparkSQL和SparkStreaming的整合案例

Spark最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。

案例:top3的商品排序: 最新的top3

这里就是基于updatestateByKey,统计截止到目前为止的不同品类下的商品销量top3

代码实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
/*** SparkStreaming整合SparkSQL的案例之,热门品类top3排行* 输入数据格式:* id brand category* 1 huwei watch* 2 huawei phone**/
object Demo06_SQLWithStreaming {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("StreamingIntegerationSQL").setMaster("local[*]")val batchInterval = 2val duration = Seconds(batchInterval)val spark = SparkSession.builder().config(conf).getOrCreate()val ssc = new StreamingContext(spark.sparkContext, duration)ssc.checkpoint("/Users/liyadong/data/sparkdata/streamingdata/chk-1")val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)//001 mi moblieval pairs:DStream[(String, Int)] = lines.map(line => {val fields = line.split("\\s+")if(fields == null || fields.length != 3) {("", -1)} else {val brand = fields(1)val category = fields(2)(s"${category}_${brand}", 1)}}).filter(t => t._2 != -1)
​val usb:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc)
​usb.foreachRDD((rdd, bTime) => {if(!rdd.isEmpty()) {//category_brand countimport spark.implicits._val df = rdd.map{case (cb, count) => {val category = cb.substring(0, cb.indexOf("_"))val brand = cb.substring(cb.indexOf("_") + 1)(category, brand, count)}}.toDF("category", "brand", "sales")
​df.createOrReplaceTempView("tmp_category_brand_sales")val sql ="""|select|  t.category,|  t.brand,|  t.sales,|  t.rank|from (|  select|    category,|    brand,|    sales,|    row_number() over(partition by category order by sales desc) rank|  from tmp_category_brand_sales|) t|where t.rank < 4|;""".stripMarginspark.sql(sql).show()}})
​ssc.start()ssc.awaitTermination()}
​def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] = {Option(seq.sum + option.getOrElse(0))}
}

1.7 SparkStreaming整合Reids

//将实时结果写入Redis中
dStream.foreachRDD((w,c)=>{val jedis = new Jedis("192.168.10.101", 6379)   //抽到公共地方即可jedis.auth("root")jedis.set(w.toString(),c.toString())  //一个key对应多个值,可以考虑hset
})

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/585337.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

m3u8网络视频文件下载方法

在windows下&#xff0c;使用命令行cmd的命令下载m3u8视频文件并保存为mp4文件。 1.下载ffmpeg&#xff0c;访问FFmpeg官方网站&#xff1a;https://www.ffmpeg.org/进行下载 ffmpeg下载&#xff0c;安装&#xff0c;操作说明 https://blog.csdn.net/m0_53157282/article/det…

下载和安装AD14 - Altium Designer 14.3.20.54863

这个版本应该还支持XP 系统[doge]&#xff0c;总之就是想安装一下&#xff0c;没什么特别的意义。 下载 资源来自毛子网站&#xff1a;https://rutracker.net/forum/viewtopic.php?t5140739&#xff0c;带上个网页翻译插件就行。要用磁力链接下载&#xff0c;推荐用qbittorr…

RabbitMQ之快速入门、上手

前言 学习一样新技术、新框架&#xff0c;最重要的是学习其思想、原理。即原理性思维。 如果是因为工作原因&#xff0c;需要快速上手RabbitMQ&#xff0c;本篇或许适合你。 核心概念 Connection&#xff1a;publisher&#xff0f;consumer 和 broker 之间的 TCP 连接Channel…

Android 理解Context

文章目录 Android 理解ContextContext是什么Activity能直接new吗&#xff1f; Context结构和源码一个程序有几个ContextContext的作用Context作用域获取ContextgetApplication()和getApplicationContext()区别Context引起的内存泄露错误的单例模式View持有Activity应用正确使用…

Springboot2+mybatisplus+多数据源更换mysql数据库为pgsql

前提:Springboot2+mybatisplus+多数据源mysql,现在需要把数据源2更换为pgsql 一、pom文件修改 增加pgsql驱动 <postgresql.version>42.7.1</postgresql.version> <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql --><dependency&…

八数码问题

八数码问题 在3x3的棋盘&#xff0c;摆有八个棋子&#xff0c;每个棋子上标有1至8的某一数字&#xff0c;不同棋子上标的数 字不相同。棋盘上还有一个空格&#xff0c;与空格相邻的棋子可以移到空格中。要求解决的问题 是:给出一个初始状态和一个目标状态&#xff0c;找出一一种…

centos 防火墙 设置 LTS

centos 防火墙 设置 LTS https://blog.csdn.net/m0_58805648/article/details/130671008

详解—数据结构—<常用排序>基本实现和代码分析

目录 一.排序的概念及其运用 1.1排序的概念 1.2排序运用​编辑 1.3 常见的排序算法​编辑 二.常见排序算法的实现 2.1 插入排序 2.1.1基本思想&#xff1a; 2.1.2直接插入排序&#xff1a; 2.1.3 希尔排序( 缩小增量排序 ) 2.2 选择排序 2.2.1基本思想&#xff1a; …

日志记录、跟踪和指标

我的新书《Android App开发入门与实战》已于2020年8月由人民邮电出版社出版&#xff0c;欢迎购买。点击进入详情 日志记录、跟踪和指标是系统可观察性的三大支柱。 下图显示了它们的定义和典型架构。 记录 日志记录系统中的离散事件。例如&#xff0c;我们可以将传入请求或对…

CentOS 8 上安装 Python 3.10.12

以下是在 CentOS 8 上安装 Python 3.10.12 的全流程&#xff0c;包括下载、编译和安装。请在执行这些步骤之前确保您具有足够的权限。 安装编译依赖项&#xff1a; sudo dnf install -y gcc openssl-devel bzip2-devel libffi-devel zlib-devel readline-devel sqlite-devel下载…

论文阅读——UniRepLKNet

UniRepLKNet: A Universal Perception Large-Kernel ConvNet for Audio, Video, Point Cloud, Time-Series and Image Recognition 当我们将一个33的conv添加到一个小卷积核ConvNet中时&#xff0c;我们预计它会同时产生三种效果——1&#xff09;使感受野更大&#xff0c;2&am…

Linux:多文件编辑

多文件编辑 1.使用vim编辑多个文件 编辑多个文件有两种形式&#xff0c;一种是在进入vim前使用的参数就是多个文件。另一种就是进入vim后再编辑其他的文件。 同时创建两个新文件并编辑 $ vim 1.txt 2.txt默认进入1.txt文件的编辑界面 命令行模式下输入:n编辑2.txt文件&…

从C到C++1

一.思想过渡 前言&#xff1a;明确地说&#xff0c;学了C语言就相当于学了 C 的一半&#xff0c;从C语言转向 C 时&#xff0c;不需要再从头开始&#xff0c;接着C语言往下学就可以&#xff0c;所以我强烈建议先学C语言再学 C。 1.面向过程与面向对象 ​ 从“学院派”的角度来…

Python之自然语言处理库snowNLP

一、介绍 SnowNLP是一个python写的类库&#xff0c;可以方便的处理中文文本内容&#xff0c;是受到了TextBlob的启发而写的&#xff0c;由于现在大部分的自然语言处理库基本都是针对英文的&#xff0c;于是写了一个方便处理中文的类库&#xff0c;并且和TextBlob不同的是&…

在ubuntu上挂载QNX 镜像

步骤 1&#xff0c;将QNX imge转换成android sparse镜像 这个QNX镜像可以是直接从QNX分区读取得到或者你的刷机包中的镜像&#xff1a; rootubuntu:~/workspace/$ file qnx_img.img qnx_img.img: DOS/MBR boot sector使用python tools/mksparse.py $镜像文件 转换为android …

elasticsearch 笔记三:查询建议介绍、Suggester、自动完成

一、查询建议介绍 1. 查询建议是什么&#xff1f; 查询建议&#xff0c;为用户提供良好的使用体验。主要包括&#xff1a; 拼写检查&#xff1b; 自动建议查询词&#xff08;自动补全&#xff09; 拼写检查如图&#xff1a; 自动建议查询词&#xff08;自动补全&#xff09;…

Rust之构建命令行程序(二):读取文件

开发环境 Windows 10Rust 1.74.1 VS Code 1.85.1 项目工程 这次创建了新的工程minigrep. 读取文件 现在&#xff0c;我们将添加读取file_path参数中指定的文件的功能。首先&#xff0c;我们需要一个样本文件来测试它:我们将使用一个包含少量文本的文件&#xff0c;多行包含一…

使用Python实现Linux惠尔顿上网认证客户端

在本文中&#xff0c;我们将展示如何使用Python编写一个简单的脚本来实现Linux下的惠尔顿上网认证。以下是我们需要的参数和值&#xff1a; wholeton_host: 惠尔顿服务器地址&#xff0c;例如 192.168.10.10wholeton_user: 用户名&#xff0c;例如 AABBCCwholeton_pass: 密码&…

【图像分类】【深度学习】【轻量级网络】【Pytorch版本】ShuffleNet_V2模型算法详解

【图像分类】【深度学习】【轻量级网络】【Pytorch版本】ShuffleNet_V2模型算法详解 文章目录 【图像分类】【深度学习】【轻量级网络】【Pytorch版本】ShuffleNet_V2模型算法详解前言ShuffleNet_V2讲解四条实用指导思想G1:相等的通道宽度可以降低存储访问成本G2:大量的分组卷积…

uboot学习及内核更换_incomplete

官方文档 在前面 文章目录 uboot常见命令学习环境变量网络控制台uboot标准启动其他 升级uboot或内核bin和uimg以及booti和bootm的区别制作uImage更换内核更换uboot后续计划 uboot常见命令学习 环境变量 Environment Variables环境变量 autostart 如果值为yes&#xff0c;则会…