Spark Streaming 概述及入门案例

一、介绍

1. 不同的数据处理

  • 从数据处理的方式:
    • 流式数据处理(Streaming)
    • 批量数据处理(Batch)
  • 从数据处理的延迟:
    • 实时数据处理(毫秒级别)
    • 离线数据处理(小时或天级别)

2. 简介

  • SparkStreaming 是一个准实时(秒或分钟级别)、微批量的数据处理框架
  • SparkStreaming 支持的很多数据输入源,如: Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等。数据输入后可以用 Spark 的高度抽象原语,如: map、 reduce、 join、 window 等进行运算。结果能保存在很多地方,如 HDFS,数据库等
  • SparkStreaming 使用离散化流 (discretized stream) 作为抽象表示,称为 DStream,它是对 RDD 在实时数据处理场景的一种封装

3. 特点

  • 易用
  • 容错
  • 易整合到 Spark 体系

二、基本架构

在这里插入图片描述

1. 背压机制

  • Spark 1.5 以前版本:通过设置静态配制参数 spark.streaming.receiver.maxRate 来限制 Receiver 的数据接收速率,来解决生产和消费速率不对等造成的内存溢出等问题,但当数据生产和数据消费的能力都高于 maxRate 时会造成资源利用率下降等问题
  • Spark 1.5 版本及以后版本:为了动态控制数据接收速率来适配集群数据处理能力,引入了背压机制 (Spark Streaming Backpressure),即根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率
  • 通过属性 spark.streaming.backpressure.enabled 来配置启用 backpressure 机制,默认值为 false,即不启用

三、入门 WordCount 案例

需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数

1. 引入依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version>
</dependency>

2. 代码实现

object SparkStreamingWC {def main(args: Array[String]): Unit = {// 1.创建 SparkStreaming 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")/*创建 StreamingContext 对象需要传递两个参数1.SparkConf:配置对象2.Duration:批处理的周期,即数据采集周期,单位为毫秒,内置有 Seconds/Minute 等对象 */val ssc = new StreamingContext(conf, Seconds(3))// 2.逻辑处理val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)val words = line.flatMap(_.split(" "))val wordAsOne = words.map((_, 1))val wordCount: DStream[(String, Int)] = wordAsOne.reduceByKey(_ + _)wordCount.print()// 3.运行采集器并等待关闭/*采集器是一个长期运行的任务,所以不能关闭 ssc,也不能让 main 方法执行完毕*/ssc.start()ssc.awaitTermination()}
}

3. 测试

  • 打开 cmd 命令窗口,执行 nc -lp 9999 命令(Linux 下为 nc -lk 999)
  • 运行程序 main 方法
  • 在窗口中输入测试字符串(以空格分隔),观察程序命令行输出结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/22330.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在Red Hat Enterprise Linux 9上使用Docker快速安装并部署RocketMQ

在Red Hat Enterprise Linux 9上快速安装和部署RocketMQ可以按照以下步骤进行&#xff1a; 1. 安装Docker 首先&#xff0c;确保Docker已经安装在你的系统上。 更新系统包并安装依赖项&#xff1a; sudo yum update -y sudo yum install -y yum-utils device-mapper-persiste…

2024年5月份面试总结

2024年5月份找工作/面试总结&#xff1a; 本人前段时间写了刚过完年后的一个月内找工作的情况&#xff0c;请查看https://blog.csdn.net/zgaoq/article/details/136236788?spm1001.2014.3001.5501 但是后续写的总结被和谐了&#xff0c;不知道这篇文章能不能发出来。 1、5月份…

系统架构设计师【第19章】: 大数据架构设计理论与实践 (核心总结)

文章目录 19.1 传统数据处理系统存在的问题19.2 大数据处理系统架构分析19.2.1 大数据处理系统面临挑战19.2.2 大数据处理系统架构特征 19.3 Lambda架构19.3.1 Lambda架构对大数据处理系统的理解19.3.2 Lambda架构应用场景19.3.3 Lambda架构介绍19.3.4  Lambda架构的实…

数据库的换行符到前端不展示了

是这样的原本数据库中的数据都是带有\n换行符的但是页面却一直不展示 解决办法 <el-drawer title"预览" :visible.sync"drawer" :with-header"false"><div v-for"(item, index) in cardArray" :key"index"><…

如何将 Vue 应用程序部署到 Cloudflare Pages

在现代 Web 开发中&#xff0c;Vue.js 已经成为了一个非常受欢迎的前端框架。它的简洁、高效和灵活性使得开发人员可以轻松构建出色的用户界面和交互体验。而 Cloudflare Pages 提供了一个简单而强大的方式来托管和部署静态网站和应用程序。本文将介绍如何将 Vue 应用程序部署到…

Android常见内存泄漏场景总结

一、非静态内部类造成的内存泄漏 造成原因&#xff1a;非静态内部类默认会持有外部类的引用&#xff0c;如果内部类的生命周期超过了外部类就会造成内存泄漏。 场景&#xff1a;当Activity销毁后&#xff0c;由于内部类中存在异步耗时任务还在执行&#xff0c;导致Activity实…

[补题记录]Leetcode 3.无重复字符的最长子串

传送门&#xff1a;无重复字符的最长子串 Problem/题意 给一个由英文、数字、符号、空格组成的字符串&#xff0c;找出其中不含有重复字符的最长子串的长度。 比如&#xff1a;abb 包含了重复字符 b&#xff1b;abc 没有包含重复字符。 注意是子串&#xff0c;不是子序列。 …

内网安全:横向传递攻击(PTH || PTK || PTT 哈希票据传递)

内网安全&#xff1a;横向传递攻击. 横向移动就是在拿下对方一台主机后&#xff0c;以拿下的那台主机作为跳板&#xff0c;对内网的其他主机再进行后面渗透&#xff0c;利用既有的资源尝试获取更多的凭据、更高的权限&#xff0c;一步一步拿下更多的主机&#xff0c;进而达到控…

CodeMirror 创建标签计算编辑器

在日常开发中对于一些数据计算场景可能会遇到标签计算的需求&#xff0c;下面关于如何使用CodeMirror实现标签计算编辑功能。 1&#xff0c;结果图 2&#xff0c;主体代码逻辑 大家只需要复制粘贴主要codeMirror使用逻辑即可 <template><el-dialogref"dialogRe…

抖店商家疑惑,自然流量突然下滑,为什么呢?

大家好&#xff0c;我是喷火龙。 很多的抖店商家会遇到一种情况&#xff0c;那就是自己店铺的流量好好的&#xff0c;不知道怎么的就突然没流量了&#xff0c;各方面的数据都断崖式的下降。 为什么会这样呢&#xff1f;原因有以下几点&#xff0c;大家可以检查一下&#xff0…

低代码和零代码软件时代质量管理(QM)和质量管理系统(QMS)

【前言】 质量控制过程的目的是为了确保产品的制造标准得到保持和改进。质量控制过程使公司能够满足客户的期望&#xff0c;同时确保产品质量的一致水平。采用这些标准创造了一种公司文化&#xff0c;鼓励所有员工努力实现高质量的生产标准。低代码和零代码软件可以成为质量控…

【网络通信层】华为云连接MQTT设备

本文介绍华为云设备连接到设备的操作。 目录 一、在华为云创建设备 二、连接MQTT 三、通信 一、在华为云创建设备 现在华为云上可以免费使用部分受限服务&#xff0c;包括免费创建自己的设备连接。 首先&#xff0c;登录华为云平台共建智能世界云底座-华为云 (huaweicl…

徐州服务器机柜租用的好处

随着服务器的广泛应用&#xff0c;越来越多的企业都选择服务器托管和租用等服务&#xff0c;在选择服务器租用之前我们还需要进行机柜租用&#xff0c;便于放置所适用的服务器&#xff0c;那么企业选择徐州服务器机柜租用的好处有哪些呢&#xff1f; 选择徐州服务器机柜租用&am…

Qt Window Dialog 无标题栏 ,无边框,可拖动

1.效果&#xff1a; 2. 主要实现步骤&#xff1a; 设置窗口 flag&#xff1a; this->setWindowFlags(Qt::FramelessWindowHint | Qt::WindowStaysOnTopHint); 创建变量存储位置 QPoint m_dragPosition; 对鼠标左键按下和移动事件做处理 void DraggableDialog::mousePre…

Java 集合中的组内平均值计算

在Java开发中&#xff0c;集合&#xff08;Collection&#xff09;是一个重要的数据结构&#xff0c;广泛应用于各种场景。计算集合中的组内平均值是一个常见的操作&#xff0c;尤其是在数据分析、统计和处理时更为重要。本文将深入探讨如何使用Java来计算集合中的组内平均值&a…

Web 页面性能衡量指标-以用户为中心的效果指标

Web 页面性能衡量指标-以用户为中心的性能指标 以用户为中心的性能指标是理解和改进站点体验的关键点 一、以用户为中心的性能指标 1. 指标是用来干啥的&#xff1f; 指标是用来衡量性能和用户体验的 2. 指标类型 感知加载速度&#xff1a;网页可以多快地加载网页中的所有…

如何在vs code中安装JavaFX

目录 下载JavaFX 配置vs code工程 编写测试代码 下载JavaFX 网站链接:https://openjfx.io 选择如下的版本

从1.0到4.0,看看你公司的费控模式是第几代?

早在2021年9月&#xff0c;艾媒咨询在《2021H1企业费控报销服务专题研究报告》中&#xff0c;第一次对企业费用管控模式的进化历程进行了清晰的划代&#xff1a;1.0手工模式、2.0网报模式、3.0移动报销模式、4.0智能费控模式。 2022年&#xff0c;在《中国企业费用管理发展白皮…

vr样板房实景漫游展示制作解决了地产商难题

家具和软装销售中&#xff0c;如何直观展示产品优势一直是老板们的难题。口头描述往往难以让客户真正感受到产品的独特之处&#xff0c;这不仅影响了销售效果&#xff0c;也增加了沟通的难度。但现在&#xff0c;我们有了全新的解决方案——样板房VR全景编辑软件! 样板房VR全景…

精打细算:可燃气体报警器检验收费的合理规划与管理

随着工业化的快速发展&#xff0c;可燃气体报警器已经成为各类工业场所不可或缺的安全设备。 它的主要功能是在可燃气体浓度超标时发出警报&#xff0c;有效预防和减少火灾、爆炸等安全事故的发生。 然而&#xff0c;为了确保报警器能够持续、准确地发挥作用&#xff0c;定期…