Spark-streaming(一)

Spark-Streaming概述

Spark Streaming 用于流式数据的处理。

和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。

DStream 是随时间推移而收到的数据的序列。

Spark-Streaming的特点:易用、容错、易整合到spark体系。

Spark-Streaming架构

DStream实操

案例:词频统计

idea中运行

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object wordcount {def main(args: Array[String]): Unit = {// 创建 SparkConf 对象,设置运行模式为本地多线程,应用名为 streamingval sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")// 创建 StreamingContext 对象,设置批处理间隔为 3 秒val ssc = new StreamingContext(sparkConf, Seconds(3))// 从指定的主机和端口接收文本流数据val lineStreams = ssc.socketTextStream("node01", 9999)// 将每行文本拆分为单词val wordStreams = lineStreams.flatMap(_.split(" "))// 为每个单词映射为 (单词, 1) 的键值对val wordAndOneStreams = wordStreams.map((_, 1))// 按单词进行分组并对每个单词的计数进行累加val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)// 打印每个批次中每个单词的计数结果wordAndCountStreams.print()// 启动流式计算ssc.start()// 等待计算终止ssc.awaitTermination()}
}

在虚拟机中输入: nc -lk 9999   并输入数据

结果:

解析:

对数据的操作也是按照 RDD 为单位来进行的

计算过程由 Spark Engine 来完成

DStream 创建

RDD队列

案例:

循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount

代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutableobject RDD {def main(args: Array[String]): Unit = {// 创建 SparkConf 对象,设置运行模式为本地多线程,应用名为 RDDStreamval sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")// 创建 StreamingContext 对象,设置批处理间隔为 4 秒val ssc = new StreamingContext(sparkConf, Seconds(4))// 创建一个可变队列,用于存储 RDDval rddQueue = new mutable.Queue[RDD[Int]]()// 从队列中创建输入流,oneAtATime 为 false 表示可以同时处理多个 RDDval inputStream = ssc.queueStream(rddQueue, oneAtATime = false)// 将输入流中的每个元素映射为 (元素, 1) 的键值对val mappedStream = inputStream.map((_, 1))// 按键对键值对进行聚合,统计每个键的出现次数val reducedStream = mappedStream.reduceByKey(_ + _)// 打印每个批次中每个键的计数结果reducedStream.print()// 启动流式计算ssc.start()// 循环 5 次,每次向队列中添加一个 RDD,并休眠 2 秒for (i <- 1 to 5) {rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}// 等待计算终止ssc.awaitTermination()}
}

运行结果:

自定义数据源

自定义数据源

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsetsimport org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiverimport scala.util.control.NonFatalclass CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit = {new Thread("Socket Receiver") {override def run(): Unit = {receive()}}.start()}def receive(): Unit = {var socket: Socket = nullvar reader: BufferedReader = nulltry {socket = new Socket(host, port)reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))var input: String = reader.readLine()while (!isStopped() && input != null) {store(input)input = reader.readLine()}} catch {case NonFatal(e) =>restart("Error receiving data", e)} finally {if (reader != null) {try {reader.close()} catch {case NonFatal(e) =>println(s"Error closing reader: ${e.getMessage}")}}if (socket != null) {try {socket.close()} catch {case NonFatal(e) =>println(s"Error closing socket: ${e.getMessage}")}}}restart("Restarting receiver")}override def onStop(): Unit = {}
}    

使用自定义的数据源采集数据

object sparkConf {def main(args: Array[String]): Unit = {try {// 创建 SparkConf 对象,设置运行模式为本地多线程,应用名为 streamval sparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")// 创建 StreamingContext 对象,设置批处理间隔为 5 秒val ssc = new StreamingContext(sparkConf, Seconds(5))// 使用自定义 Receiver 创建输入流val lineStream = ssc.receiverStream(new CustomerReceiver("node01", 9999))// 将每行文本拆分为单词val wordStream = lineStream.flatMap(_.split(" "))// 为每个单词映射为 (单词, 1) 的键值对val wordAndOneStream = wordStream.map((_, 1))// 按单词进行分组并对每个单词的计数进行累加val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)// 打印每个批次中每个单词的计数结果wordAndCountStream.print()// 启动流式计算ssc.start()// 等待计算终止ssc.awaitTermination()
}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/902786.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CS144 Lab 6 实战记录:构建 IP 路由器

1 实验背景与目标 在 CS144 的 Lab 6 中&#xff0c;我们需要在之前实现的 NetworkInterface&#xff08;Lab 5&#xff09;基础上构建一个完整的 IP 路由器。路由器的主要任务是根据路由表将接收到的 IP 数据报转发到正确的网络接口&#xff0c;并发送给正确的下一跳&#xf…

【网络安全】社会工程学策略

1. 社会工程学简介 社会工程攻击是威胁行为者常用的攻击方式。这是因为&#xff0c;诱骗人们提供访问权限、信息或金钱通常比利用软件或网络漏洞更容易。 您可能还记得&#xff0c;社会工程学是一种利用人为错误来获取私人信息、访问权限或贵重物品的操纵技术。它是一个涵盖性…

【含文档+PPT+源码】基于SpringBoot的开放实验管理平台设计与实现

项目介绍 本课程演示的是一款基于SpringBoot的开放实验管理平台设计与实现&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 1.包含&#xff1a;项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套系统…

鸿蒙NEXT开发定位工具类 (WGS-84坐标系)(ArkTs)

import geoLocationManager from ohos.geoLocationManager; import { BusinessError, Callback } from ohos.base; import { LogUtil } from ./LogUtil; import { PermissionUtil } from ./PermissionUtil; import { map, mapCommon } from kit.MapKit; /*** 定位工具类 (WGS-8…

SSM从入门到上手-全面讲解SSM框架的使用.

一、SSM框架整合 将Spring、Spring MVC和MyBatis结合在一起&#xff0c;形成一个高效且易于维护的Web应用程序架构。具体整合的方式如下&#xff1a; Spring管理Bean&#xff1a;Spring负责管理所有的Java对象&#xff0c;包括Service层、DAO层等。通过Spring的IoC容器进行依赖…

学员答题pk知识竞赛小程序怎么做

制作学员答题PK知识竞赛小程序&#xff0c;主要有以下步骤&#xff1a; 一、规划设计 明确需求&#xff1a;确定小程序的使用场景是校园知识竞赛、培训机构考核还是企业内部培训等。答题功能&#xff0c;规定答题的具体规则&#xff0c;包括题目类型&#xff08;单选、多选、…

视频分析设备平台EasyCVR视频技术驱动下,监控上墙全组件解析与组网应用方案

随着数字化进程的加速推进&#xff0c;视频监控技术在工业、商业、社区等诸多领域得到了广泛应用。尽管不同场景对监控功能的具体需求存在差异&#xff0c;但底层硬件架构具有显著的共性特征。实际部署中&#xff0c;仅需依据网络环境等实际情况&#xff0c;灵活调整设备的连接…

idea使用docker插件一键部署项目

一、首先保证我们电脑上已经安装了docker docker -v查看docker版本&#xff0c;如果不能识别&#xff0c;需要先下载docker destop&#xff0c;在官网下载正常安装即可。 安装成功就可以使用docker 命令了 二、idea下载docker插件并配置docker参数 我是通过tcp连接docker服务…

SQL Tuning Advisor

什么是SQL Tuning Advisor STA可以用来优化那些已经被发现的高负载SQL. 默认情况下, Oracle数据库在自动维护窗口中自动认证那些有问题的SQL并且执行优化建议&#xff0c;找寻提升高负载SQL执行计划性能的方法. ** 如何查看自动优化维护窗口产生的报告? ** SQL> set ser…

uniapp-商城-31-shop页面中的 我的订单

前面的章节讲了很多关于页面 布局 的知识。 现在来看看其他栏目&#xff0c;我的订单页面。 1 页面样式图 基本的样式包含shop页面 我的订单 点击我的订单&#xff0c;跳转到订单页面 点击订单的每一条订单&#xff0c;跳转到订单详情 2、创建订单页面 2.1 创建sub页面文件…

深入探讨JavaScript性能瓶颈与优化实战指南

JavaScript作为现代Web开发的核心语言,其性能直接影响用户体验与业务指标。随着2025年前端应用的复杂性持续增加,性能优化已成为开发者必须掌握的核心技能。本文将从性能瓶颈分析、优化策略、工具使用三个维度,结合实战案例,系统梳理JavaScript性能优化的关键路径。 一、Ja…

基于AI与drawio的图表生成技术及其在学术研究中的应用前景分析

一、研究背景与冲突 在当今数字化时代&#xff0c;学术研究与信息传播的方式发生了深刻变革。随着数据量的爆炸式增长以及研究内容的日益复杂&#xff0c;高效、精准地呈现研究成果变得至关重要。图表作为一种直观、简洁且信息承载量大的表达方式&#xff0c;在学术研究中扮演着…

uniapp 仿小红书轮播图效果

通过对小红书的轮播图分析&#xff0c;可得出以下总结&#xff1a; 1.单张图片时容器根据图片像素定高 2.多图时轮播图容器高度以首图为锚点 3.比首图长则固高左右留白 4.比首图短则固宽上下留白 代码如下&#xff1a; <template><view> <!--轮播--><s…

【ORACLE】记录一些ORACLE的merge into语句的BUG

【ORACLE】记录一些ORACLE的merge into语句的BUG 一、自相矛盾-DML重启动行为差异,违反acid原则 发现版本&#xff1a;10g ~ 23ai 这个用例在我之前的文章里有提过&#xff0c;ORACLE和PG系关于并发事务行为有一个非常大的差异&#xff0c;就是ORACLE在某些并发冲突的场景下会…

2025上海车展:光峰科技全球首发“灵境”智能车载光学系统

当AI为光赋予思想&#xff0c;汽车将会变成什么样&#xff1f;深圳光峰科技为您揭晓答案。 2025年4月23日&#xff0c;在刚刚开幕的“2025上海车展”上&#xff0c;全球领先的激光核心器件公司光峰科技举办了主题为“AI光影盛宴&#xff0c;智享未来出行”的媒体发布会&#x…

密码学的hash函数,哈希碰撞, collision resistance, BTC用到的SHA-256简介

密码学中的哈希函数、哈希碰撞、抗碰撞性&#xff08;collision resistance&#xff09;以及比特币中使用的 SHA-256 的简明介绍&#xff1a; &#x1f9e9; 一、哈希函数&#xff08;Hash Function&#xff09; 定义&#xff1a; 哈希函数是一种将任意长度的输入&#xff08;…

unity TEngine学习4

上一篇我们学习了UI部分&#xff0c;这一篇我们学习其他部分&#xff0c;按照老规矩还是先打开官方文档 ResourceModule 在官方文档里介绍了当前加载的设置&#xff0c;但是我们是小白看不懂&#xff0c;那就不管他内部怎么实现的&#xff0c;我们主要看下面的代码给的方法&am…

【AI训练环境搭建】在IDE(Pycharm或VSCode)上使用WSL2+Ubuntu22.04+Conda+Tensorflow+GPU进行机器学习训练

本次实践将在IDE&#xff08;Pycharm或VSCode&#xff09;上使用WSL2Ubuntu22.04TensorflowGPU进行机器学习训练。基本原理是在IDE中拉起WSL2中的Python解释器&#xff0c;并运行Python程序。要运行CondaTensorflowGPU你可能需要进行以下准备工作。 1. 此示例中将使用一个mnis…

【华为OD机试真题E卷】521、 机器人可活动的最大网格点数目 | 机试真题+思路参考+代码解析(E卷复用)(C++)

文章目录 一、题目题目描述输入输出样例1 一、代码与思路&#x1f9e0;C语言思路✅C代码 一、题目 参考链接&#xff1a;https://sars2025.blog.csdn.net/article/details/141748083 题目描述 现有一个机器人口&#xff0c;可放置于MxN的网格中任意位置&#xff0c;每个网格包…

windows端远程控制ubuntu运行脚本程序并转发ubuntu端脚本输出的网页

背景 对于一些只能在ubuntu上运行的脚本&#xff0c;并且这个脚本会在ubuntu上通过网页展示运行结果。我们希望可以使用windows远程操控ubuntu&#xff0c;在windows上查看网页内容。 方法 start cmd.exe /k "sshpass -p passwd ssh namexxx.xxx.xxx.xxx "cd /hom…