解决Spark流处理产生的小文件问题

做流批一体,湖仓一体的大数据架构,常见的做法就是:

数据源->spark Streaming->ODS(数据湖)->spark streaming->DWD(数据湖)->...

那么数据源->spark Streaming->ODS,以这段为例,在数据源通过spark structured streaming写入ODS在数据湖(Delta Lake)落盘时候必然会产生很多小文件。

图片

1、目的

为了在批处理spark-sql运行更快,也避免因为小文件而导致报错。

2、影响

WARNING: Failed to connect to /172.16.xx.xx:9866 for block, add to deadNodes and continue. java.net.SocketException: 
Too many open files
  1. 小文件在批处理数据IO消耗巨大,程序可能卡死。

  2. 小文件块都有对应的元数据,元数据放在NameNode,导致需要的内存大大增大,增加NameNode压力,这样会限制了集群的扩展。

  3. 在HDFS或者对象储存中,小文件的读写处理速度要远远小于大文件,(寻址耗时)。

3、解决思路

3.1、事前

(1)避免写入时候产生过多小文件 做好分区partitionBy(年,月,日), 避免小文件过于分散 Trigger触发时间可以设置为1分钟,这样会攒一批一写入,避免秒级别写入而产生大量小文件(但是使用spark structured 想要做real-time不能这样,只适合做准实时)

(2)打开自适应框架的开关

spark.sql.adaptive.enabled true

(3)通过spark的coalesce()方法和repartition()方法

val rdd2 = rdd1.coalesce(8, true) //(true表示是否shuffle)
val rdd3 = rdd1.repartition(8)

coalesce:coalesce()方法的作用是返回指定一个新的指定分区的Rdd,如果是生成一个窄依赖的结果,那么可以不发生shuffle,分区的数量发生激烈的变化,计算节点不足,不设置true可能会出错。

repartition:coalesce()方法shuffle为true的情况。

3.2、事后(小文件引起已经产生)

(1)优化 Delta 表的写入,避免小文件产生 在开源版 Spark 中,每个 executor 向 partition 中写入数据时,都会创建一个表文件进行写入,最终会导致一个 partition 中产生很多的小文件。

Databricks 对 Delta 表的写入过程进行了优化,对每个 partition,使用一个专门的 executor 合并其他 executor 对该 partition 的写入,从而避免了小文件的产生。

图片

该特性由表属性 delta.autoOptimize.optimizeWrite 来控制:可以在创建表时指定

CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

也可以修改表属性

ALTER TABLE table_name
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

该特性有两个优点:

(1)通过减少被写入的表文件数量,提高写数据的吞吐量;

(2)避免小文件的产生,提升查询性能。

缺点:

其缺点也是显而易见的,由于使用了一个 executor 来合并表文件的写入,从而降低了表文件写入的并行度,此外,多引入的一层 executor 需要对写入的数据进行 shuffle,带来额外的开销。因此,在使用该特性时,需要对场景进行评估。

场景:

该特性适用的场景:频繁使用 MERGE,UPDATE,DELETE,INSERT INTO,CREATE TABLE AS SELECT 等 SQL 语句的场景;

该特性不适用的场景:写入 TB 级以上数据。

(2)自动合并小文件

在流处理场景中,比如流式数据入湖场景下,需要持续的将到达的数据插入到 Delta 表中,每次插入都会创建一个新的表文件用于存储新到达的数据,假设每10s触发一次,那么这样的流处理作业一天产生的表文件数量将达到8640个,且由于流处理作业通常是 long-running 的,运行该流处理作业100天将产生上百万个表文件。这样的 Delta 表,仅元数据的维护就是一个很大的挑战,查询性能更是急剧恶化。

为了解决上述问题,Databricks 提供了小文件自动合并功能,在每次向 Delta 表中写入数据之后,会检查 Delta 表中的表文件数量,如果 Delta 表中的小文件(size < 128MB 的视为小文件)数量达到阈值,则会执行一次小文件合并,将 Delta 表中的小文件合并为一个新的大文件。

该特性由表属性 delta.autoOptimize.autoCompact 控制,和特性 delta.autoOptimize.optimizeWrite 相同,可以在创建表时指定,也可以对已创建的表进行修改。自动合并的阈值由 spark.databricks.delta.autoCompact.minNumFiles 控制,默认为50,即小文件数量达到50会执行表文件合并;

合并后产生的文件最大为128MB,如果需要调整合并后的目标文件大小,可以通过调整配置 spark.databricks.delta.autoCompact.maxFileSize 实现。

(3)手动合并小文件(我常用,每天定时运行合并分区内小文件,再去处理批任务)

自动小文件合并会在对 Delta 表进行写入,且写入后表中小文件达到阈值时被触发。除了自动合并之外,Databricks 还提供了 Optimize 命令使用户可以手动合并小文件,优化表结构,使得表文件的结构更加紧凑。

在实现上 Optimize 使用 bin-packing 算法,该算法不但会合并表中的小文件,且合并后生成的表文件也更均衡(表文件大小相近)。例如,我们要对 Delta 表 student 的表文件进行优化,仅需执行如下命令即可实现:(Optimize 命令不但支持全表小文件的合并,还支持特定的分区的表文件的合并)。

OPTIMIZE student WHERE date >= '2024-01-01'

4、附加

面试官可能会问,我运行optimize合并小文件,但是小文件太多了,直接卡死运行不了程序(某互联网面试题)

(1)首先停掉程序,这里注意deltalake因为有历史版本这个概念,所以不存在运行一半覆盖原来版本情况,可以基于上一个版本重新运行(考点)。

(2)第二点,大数据思想分而治之,“分”,即把复杂的任务分解为若干个“简单的任务”来处理。

OPTIMIZE student WHERE date > '2024-01-01' and date < '2024-01-02'

因为前面做了partitionby(年月日),那么缩小optimize范围,在遍历这个月的每一天日期,分治处理。

(3)第三点,大数据思想,自己不行找兄弟,加节点,加计算资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/23757.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Selenium+java环境配置】(超详细教程常见问题解决)

Seleniumjava环境配置 windows电脑环境搭建-chrome浏览器1. 下载chrome浏览器2. 查看chrome浏览器版本3. 下载chrome浏览器驱动4.配置系统环境变量PATH 验证环境是否搭建成功1. 创建java项目&#xff0c;添加pom文件中添加依赖2. 编写代码运行 常见问题&解决办法1.访问失败…

移动端 UI 风格,魅力无限

移动端 UI 风格&#xff0c;打造极致体验

Django分页

1、在视图函数文件中引入‘分页器’ from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger 2、给原来的罗列信息函数&#xff0c;添加分页功能&#xff0c;即按照页码&#xff0c;只返回部分信息。 login_required def article_list(request):article…

【因果推断python】21_匹配2

目录 匹配估计器 匹配估计器 子分类估计器在实践中用得不多&#xff08;我们很快就会明白为什么&#xff0c;主要是因为维度诅咒这个原因&#xff09;&#xff0c;但它让我们很好地、直观地了解了因果推理估计器应该做什么&#xff0c;以及它应该如何控制混淆因素。这使我们能…

Yuan 2.0-M32 是一个基于 Yuan 2.0 架构的双语混合专家 (MoE) 语言模型,旨在以更少的参数和计算量实现更高的准确率

主要创新点&#xff1a; 注意力路由器 (Attention Router): 提出了一种新的路由器网络&#xff0c;考虑了专家之间的相关性&#xff0c;从而提高了模型的准确率。高效计算&#xff1a; 使用 MoE 架构&#xff0c;40B 总参数中仅有 3.7B 激活参数&#xff0c;训练计算消耗仅为同…

大模型创新企业集结!百度智能云千帆AI加速器Demo Day启动

新一轮技术革命风暴席卷而来&#xff0c;为创业带来源源不断的创新动力。过去一年&#xff0c;在金融、制造、交通、政务等领域&#xff0c;大模型正从理论到落地应用&#xff0c;逐步改变着行业的运作模式&#xff0c;成为推动行业创新和转型的关键力量。 针对生态伙伴、创业…

IDEA破解后的配置

以下所有操作都要求进入全局setting而不是某一个项目的setting 进入全局Setting File→close project 进入欢迎页面 低版本 然后点击Setting 关闭自动更新 不关闭有可能会破解失败 Appearance & Behavior->System Settings->Updates下取消Automatically chec…

debian系统apt 国内安装源

debian系统apt 国内安装源&#xff1a; 国内阿里镜像源&#xff1a; deb http://mirrors.aliyun.com/debian stable main non-free contrib deb-src http://mirrors.aliyun.com/debian stable main non-free contrib 打开源文件位置&#xff1a;/etc/apt/sources.list,原来的内…

eNSP学习——RIP路由协议的汇总

目录 主要命令 原理概述 实验目的 实验内容 实验拓扑 实验编址 实验步骤 1、基本配置 2、配置RIPv1协议 3、配置RIPv2自动汇总 4、配置RIPv2手动汇总 需要eNSP各种配置命令的点击链接自取&#xff1a;华为&#xff45;NSP各种设备配置命令大全PDF版_ensp配置命令大全…

蓝桥杯物联网竞赛 比赛总结

CUBEMX配置建议&#xff1a; 对于CUBEMX配置来说stm32l071kbu6的引脚不算太多&#xff0c;功能模块相对的也不多&#xff0c;所以我建议直接熟练到能将所有模块烂熟于心&#xff0c;不用看原理图就能熟练配置下来&#xff0c;因为国赛看原理图去配置太花费时间 我建议学习的时…

小程序 UI 风格,赏心悦目

小程序 UI 风格&#xff0c;赏心悦目

【云原生】Kubernetes----RBAC用户资源权限

目录 引言 一、Kubernetes安全机制概述 二、认证机制 &#xff08;一&#xff09;认证方式 1.HTTPS证书认证 1.1 证书颁发 1.2 config文件 1.3 认证类型 1.4 Service Account 1.4.1 作用 1.4.2 包含内容 1.4.3 与Secret的关系 2.Bearer Tokens 3.基本认证 三、鉴…

Java Web学习笔记17——Vue快速入门

什么是Vue&#xff1f; Vue是一套前端框架&#xff0c;免除原生JavaScript中的DOM操作&#xff0c;简化书写。 基于MVVM&#xff08;Model-View-ViewModel&#xff09;思想&#xff0c;实现数据的双向绑定&#xff0c;将编程的关注点放在数据上。 官网&#xff1a;https://v…

俯视角2D_玩家角色架构

玩家控制 玩家角色蓝图的精灵旋转和摄像机旋转角 1.因为是俯视角的游戏&#xff0c;因此相机和角色的精灵图需要调整为-90 ## 玩家输入 增强输入的映射 为玩家控制器引用增强输入的映射 在游戏模式中应用该玩家控制器 在玩家蓝图中应用输入映射并编写移动逻辑,(需要注意的是…

python-小游戏-弹球对决

python-小游戏-弹球对决 需要安装pygame 代码—game-Pong.py import pygame import random# Initialize pygame pygame.init()# Set up the screen WIDTH 600 HEIGHT 400 BALL_RADIUS 20 PAD_WIDTH 10 PAD_HEIGHT 80 WHITE (255, 255, 255) PURPLE (128, 0, 128) RED…

策略模式的理解和运用

在之前的小游戏项目中&#xff0c;处理websocket长连接请求的时候&#xff0c;需要根据传递数据包的不同类型&#xff0c;进行不同的处理。为了实现这个场景&#xff0c;比较简单的方法就是使用if-else或者switch-case语句&#xff0c;根据条件进行判断。但是这导致了项目代码复…

AI驱动下,需要重新审视比亚迪在电子制造领域的“新神话”?

自4月22日创下新低后&#xff0c;比亚迪电子&#xff08;00285.HK&#xff09;之后趋势走强&#xff0c;截至6月5日收盘&#xff0c;比亚迪电子股价一度突破年内最高价位37.35港元/股&#xff0c;最终收盘36.75港元/股。 区间29个交易日涨超55&#xff05;&#xff0c;远远优于…

OneDrive空间清理及文件历史版本查询

点击OneDrive图标 点击“在线查看” 点击“设置” 点击“OneDrive设置” 点击“其他设置” 点击“存储标准” 点击“文档” 选择需要操作的文件&#xff0c;点击“历史版本记录” 需要清理空间&#xff0c;可删除历史版本&#xff0c;需要使用历史版本&#xff0c;可还原历史版…

数据报表统计实现

目录 一&#xff1a;背景 二&#xff1a;实现过程 一&#xff1a;背景 最近需要开发一个数据统计的功能&#xff0c;主要是按照各种维度统计客户的数据&#xff0c;一般是按照日期来展示数量和变化情况。下面我们来梳理下实现的过程。 二&#xff1a;实现过程 1&#xff1a…

Postgresql中json和jsonb类型区别

在我们的业务开发中&#xff0c;可能会因为特殊【历史&#xff0c;偷懒&#xff0c;防止表连接】经常会有JSON或者JSONArray类的数据存储到某列中&#xff0c;这个时候再PG数据库中有两种数据格式可以直接一对多或者一对一的映射对象。所以我们也可能会经常用到这类格式数据&am…