龙华区住房和建设局官方网站/收录查询api

龙华区住房和建设局官方网站,收录查询api,外网浏览入口,汕尾网站建设SparkStreaming调优 一 、要点 4.1 SparkStreaming运行原理 深入理解 4.2 调优策略 4.2.1 调整BlockReceiver的数量 案例演示: object MultiReceiverNetworkWordCount {def main(args: Array[String]) {val sparkConf new SparkConf().setAppName("Networ…

SparkStreaming调优

一 、要点

4.1 SparkStreaming运行原理

在这里插入图片描述

深入理解

在这里插入图片描述

4.2 调优策略

4.2.1 调整BlockReceiver的数量

在这里插入图片描述

案例演示:

object MultiReceiverNetworkWordCount {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("NetworkWordCount")val sc = new SparkContext(sparkConf)// Create the context with a 1 second batch sizeval ssc = new StreamingContext(sc, Seconds(5))//创建多个接收器(ReceiverInputDStream),这个接收器接收一台机器上的某个端口通过socket发送过来的数据并处理val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)val lines = lines1.union(lines2)lines.repartition(100)//处理的逻辑,就是简单的进行word countval words = lines.repartition(100).flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))//将结果输出到控制台wordCounts.print()//启动Streaming处理流ssc.start()//等待Streaming程序终止ssc.awaitTermination()ssc.stop(false)}
}
⭐️4.2.2 调整Block的数量

batchInterval : 触发批处理的时间间隔
blockInterval :将接收到的数据生成Block的时间间隔,spark.streaming.blockInterval(默认是200ms),那么,BlockRDD的分区数 = batchInterval / blockInterval,即一个Block就是RDD的一个分区,就是一个task
比如,batchInterval是2秒,而blockInterval是200ms,那么task数为10,如果task的数量太少,比一个executor的core数还少的话,那么可以减少blockInterval,blockInterval最好不要小于50ms,太小的话导致task数太多,那么launch task的时间久多了

4.2.3 调整Receiver的接受速率

pps:permits per second 每秒允许接受的数据量(QPS -> queries per second)
Spark Streaming默认的PPS是没有限制的,可以通过参数spark.streaming.receiver.maxRate来控制,默认是Long.Maxvalue

⭐️4.2.3 调整数据处理的并行度

BlockRDD的分区数

a. 通过Receiver接受数据的特点决定

b. 也可以自己通过repartition设置

ShuffleRDD的分区数

a. 默认的分区数为spark.default.parallelism(core的大小)

b. 通过我们自己设置决定

val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))
4.2.4 数据的序列化

SparkStreaming两种需要序列化的数据:
a. 输入的数据:默认是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存储在executor上的内存中
b. 缓存的数据:默认是以StorageLevel.MEMORY_ONLY_SER的形式存储的内存中
使用Kryo序列化机制,比Java序列化机制性能好

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
4.2.5 内存调优
(1)需要内存大小

和transformation的类型有关,如果使用的是updateStateByKey,Window这样的算子,那么内存就要设置得偏大

(2)数据存储级别

如果把接收到的数据设置的存储级别是MEMORY_DISK这种级别,也就是说如果内存不够可以把数据存储到磁盘上,其实性能还是不好的,性能最好的就是所有的数据都在内存里面,所以如果在资源允许的情况下,把内存调大一点,让所有的数据都存在内存里面。

4.2.6 Outout性能
(1)MySQL,HBase

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

(2)Kafka(0.8版本)

虽然现在的Kafka的版本已经到2.x版本了,但是很多公司因为历史遗留的原因,公司里面还是会有0.8x的Kafka。比如本人公司里面有两个Kafka集群,一个是0.8x的kafka,一个是1.x的Kafka。开发的时候有时候需要我们使用SparkStreaming做实时的ETL,然后再把数据打回Kafka,0.8版本的kafka默认是没有批量提交的功能的。本人公司里面一个真实的案例,一位同学写的SparkStreaming程序将数据处理完了以后通过ForeachRDD把数据写回到0.8Kafka。但是数据处理得很慢,经常会收到延时告警。最终发现他把数据写到Kafka的时候是一条数据一条数据提交的性能很差。最终手动实现了批量提交的功能。从此再也没有收到过告警。

4.2.7 Backpressure(压力反馈)

在这里插入图片描述
在这里插入图片描述

Feedback Loop : 动态使得Streaming app从unstable状态回到stable状态

在这里插入图片描述

从Spark1.5版本开始:spark.streaming.backpressure.enabled = true

4.2.8 Elastic Scaling(资源动态分配)

动态分配资源:

批处理动态的决定这个application中需要多少个Executors:

  1. 当一个Executor空闲的时候,将这个Executor杀掉
  2. 当task太多的时候,动态的启动Executors

Streaming分配Executor的原则是比对 process time / batchInterval 的比率

在这里插入图片描述

如果延迟了,那么就自动增加资源

在这里插入图片描述

在这里插入图片描述

从Spark2.0有这个功能:: spark.streaming.dynamicAllocation.enabled = true

⭐️4.2.8 数据倾斜调优(重要)

因为SparkStreaming的底层就是RDD,之前SparkCore的所有的数据倾斜的调优策略(见Spark之数据倾斜调优)都适合于SparkStreaming,需要灵活掌握,在实际开发的工作当中用得频率较高。

二 、总结

面试问题:你在工作当中有SparkStreaming调优过项目吗?怎么调优的?效果怎么样?

  1. 比如举foreachRDD的例子
  2. 比如举个数据倾斜的例子
  3. 用Xmind整理调优的策略

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/71601.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Gauss数据库omm用户无法连接处理

确保gauss数据库服务已经打开 重启gauss服务 gs_om -t restart 连接gauss gsql -d postgres -p 26000 -r 结果发现 查看数据库运行情况 gs_om -t status --detail 我们可以看到 cluster_state 的值是 Unavailable 不可用 那么问题大概率是出现在了这里 然后我们再查看一…

计算机毕业设计Python+DeepSeek-R1大模型医疗问答系统 知识图谱健康膳食推荐系统 食谱推荐系统 医疗大数据(源码+LW文档+PPT+讲解)

温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…

数字体验推荐TOP8提升用户参与

数字内容体验推荐核心优势 在数字化竞争日益激烈的市场环境中,数字内容体验的差异化优势已成为企业突围的关键。通过智能算法驱动的个性化推荐系统,能够精准捕捉用户行为轨迹与兴趣偏好,实现内容与受众的动态匹配。这种技术不仅显著提升页面…

redis数据迁移教程(使用RedisShake实现不停机迁移十分便捷)

1.我的场景 需要把本地的redis数据上传到阿里云服务器上面,服务器上redis并没有开aof持久化,但是将rdb文件上传至服务器后每次重启redis,rdb文件会被覆盖导致无法同同步数据,最终决定使用RedisShake 2.RedisShake介绍 什么是 RedisShake​ RedisShake 是一个用于处理和迁移…

C语言_数据结构总结4:不带头结点的单链表

纯C语言代码,不涉及C 0. 结点结构 typedef int ElemType; typedef struct LNode { ElemType data; //数据域 struct LNode* next; //指针域 }LNode, * LinkList; 1. 初始化 不带头结点的初始化,即只需将头指针初始化为NULL即可 void Init…

【数据集】社区天气资讯网络CoWIN-香港小时尺度气象数据(含MATLAB处理代码)

社区天气资讯网络CoWIN-香港小时尺度气象数据 数据概述气象变量说明数据提取(MATLAB全代码)输出WRF所需站点气温数据参考数据概述 官网-Community Weather Information Network (CoWIN) data policy CoWIN 提供 2010 - 2024 年 的数据下载,每年数据均可单独下载。下载数据…

尚硅谷爬虫note15

一、当当网 1. 保存数据 数据交给pipelines保存 items中的类名: DemoNddwItem class DemoNddwItem(scrapy.Item): 变量名 类名() book DemoNddwItem(src src, name name, price price)导入: from 项目名.items import 类…

LVGL直接解码png图片的方法

通过把png文件解码为.C文件,再放到工程中的供使用,这种方式随时速度快(应为已经解码,代码中只要直接加载图片数据显示出来即可),但是不够灵活,适用于哪些简单又不经常需要更换UI的场景下使用。如…

Apache XTable:在数据湖仓一体中推进数据互作性

Apache XTable 通过以多种开放表格式提供对数据的访问,在增强互作性方面迈出了一大步。移动数据很困难,在过去,这意味着在为数据湖仓一体选择开放表格式时,您被锁定在该选择中。一个令人兴奋的项目当在数据堆栈的这一层引入互作性…

anolis8.9-k8s1.32-node-二进制部署

一、系统 # cat /etc/anolis-release Anolis OS release 8.9 # uname -r 5.10.134-18.an8.x86_64 二、从master上拷贝dockers及cri-docker相关文件 # groupadd docker # mkdir /etc/docker# scp -P 4033 root192.168.7.201:/etc/systemd/system/containerd.service /etc/s…

《AJAX:前端异步交互的魔法指南》

什么是AJAX AJAX(Asynchronous JavaScript and XML,异步 JavaScript 和 XML) 是一种用于创建异步网页应用的技术,允许网页在不重新加载整个页面的情况下,与服务器交换数据并局部更新页面内容。尽管名称中包含 XML&…

利用 requestrepo 工具验证 XML外部实体注入漏洞

1. 前言 在数字化浪潮席卷的当下,网络安全的重要性愈发凸显。应用程序在便捷生活与工作的同时,也可能暗藏安全风险。XXE(XML外部实体)漏洞作为其中的典型代表,攻击者一旦利用它,便能窃取敏感信息、掌控服务…

Spring 无法解决循环依赖的 5 种场景

一、构造器注入引发的循环依赖 1. 问题复现 Component public class ServiceA {private final ServiceB serviceB;Autowiredpublic ServiceA(ServiceB serviceB) { // 构造器注入this.serviceB serviceB;} }Component public class ServiceB {private final ServiceA servic…

Core Vision Kit(基础视觉服务)

文章目录 一、Core Vision Kit简介场景介绍约束与限制二、通用文字识别三、人脸检测一、Core Vision Kit简介 Core Vision Kit(基础视觉服务)是机器视觉相关的基础能力,例如通用文字识别(即OCR,Optical Character Recognition,也称为光学字符识别)、人脸检测、人脸比对…

第TR3周:Pytorch复现Transformer

🍨 本文为🔗365天深度学习训练营中的学习记录博客 🍖 原作者:K同学啊 Transformer通过自注意力机制,改变了序列建模的方式,成为AI领域的基础架构 编码器:理解输入,提取上下文特征…

FreeRTOS 任务间通信机制:队列、信号量、事件标志组详解与实验

1. FreeRTOS 消息队列 1.1 简介 ​ 队列是 任务间通信的主要形式,可用于在任务之间以及中断与任务之间传递消息。队列在 FreeRTOS 中具有以下关键特点: 队列默认采用 先进先出 FIFO 方式,也可以使用 xQueueSendToFront()实现 LIFO。FreeRT…

【虚拟化】Docker Desktop 架构简介

在阅读前您需要了解 docker 架构:Docker architecture WSL 技术:什么是 WSL 2 1.Hyper-V backend 我们知道,Docker Desktop 最开始的架构的后端是采用的 Hyper-V。 Docker daemon (dockerd) 运行在一个 Linux distro (LinuxKit build) 中&…

Unity光照之Halo组件

简介 Halo 组件 是一种用于在游戏中创建光晕效果的工具,主要用于模拟光源周围的发光区域(如太阳、灯泡等)或物体表面的光线反射扩散效果。 核心功能 1.光晕生成 Halo 组件会在光源或物体的周围生成一个圆形光晕,模拟光线在空气…

Flink深入浅出之01:应用场景、基本架构、部署模式

Flink 1️⃣ 一 、知识要点 📖 1. Flink简介 Apache Flink — Stateful Computations over Data StreamsApache Flink 是一个分布式大数据处理引擎,可对有界数据流和无界数据流进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以…

os-copilot安装和使用体验测评

简介: OS Copilot是阿里云基于大模型构建的Linux系统智能助手,支持自然语言问答、命令执行和系统运维调优。本文介绍其产品优势、功能及使用方法,并分享个人开发者在云服务器资源管理中的实际应用体验。通过-t/-f/管道功能,OS Cop…