合并spark structured streaming处理流式数据产生的小文件

备注:

By 远方时光原创,可转载,不能复制到其他平台

背景:做流批一体,湖仓一体的大数据架构,常见的做法就是

数据源->spark Streaming->ODS(数据湖)->spark streaming->DWD(数据湖)->...

那么数据源->spark Streaming->ODS,以这段为例,在数据源通过spark structured streaming写入ODS在数据湖(Delta Lake)落盘时候必然会产生很多小文件

目的:

为了在批处理spark-sql运行更快,也避免因为小文件而导致报错

影响:

WARNING: Failed to connect to /172.16.xx.xx:9866 for block, add to deadNodes and continue. java.net.SocketException: Too many open files
 

1.小文件在批处理数据IO消耗巨大,程序可能卡死

2.小文件块都有对应的元数据,元数据放在NameNode,导致需要的内存大大增大,增加NameNode压力,这样会限制了集群的扩展。

3.在HDFS或者对象储存中,小文件的读写处理速度要远远小于大文件,(寻址耗时)

解决思路:

事前:

1.避免写入时候产生过多小文件

  • 做好分区partitionBy(年,月,日), 避免小文件过于分散
  • Trigger触发时间可以设置为1分钟,这样会攒一批一写入,避免秒级别写入而产生大量小文件(但是使用spark structured 想要做real-time不能这样,只适合做准实时

2.打开自适应框架的开关

spark.sql.adaptive.enabled true

3.通过spark的coalesce()方法和repartition()方法

val rdd2 = rdd1.coalesce(8, true) //(true表示是否shuffle)
val rdd3 = rdd1.repartition(8)

coalesce:coalesce()方法的作用是返回指定一个新的指定分区的Rdd,如果是生成一个窄依赖的结果,那么可以不发生shuffle,分区的数量发生激烈的变化,计算节点不足,不设置true可能会出错。
repartition:coalesce()方法shuffle为true的情况。

事后(小文件引起已经产生):

1:优化 Delta 表的写入,避免小文件产生

在开源版 Spark 中,每个 executor 向 partition 中写入数据时,都会创建一个表文件进行写入,最终会导致一个 partition 中产生很多的小文件。

Databricks 对 Delta 表的写入过程进行了优化,对每个 partition,使用一个专门的 executor 合并其他 executor 对该 partition 的写入,从而避免了小文件的产生。

该特性由表属性 delta.autoOptimize.optimizeWrite 来控制:

可以在创建表时指定

CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

也可以修改表属性

ALTER TABLE table_name
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

该特性有两个优点:通过减少被写入的表文件数量,提高写数据的吞吐量;避免小文件的产生,提升查询性能。

其缺点也是显而易见的,由于使用了一个 executor 来合并表文件的写入,从而降低了表文件写入的并行度,此外,多引入的一层 executor 需要对写入的数据进行 shuffle,带来额外的开销。因此,在使用该特性时,需要对场景进行评估:

该特性适用的场景:频繁使用 MERGE,UPDATE,DELETE,INSERT INTO,CREATE TABLE AS SELECT 等 SQL 语句的场景;

该特性不适用的场景:写入 TB 级以上数据。

2.自动合并小文件

在流处理场景中,比如流式数据入湖场景下,需要持续的将到达的数据插入到 Delta 表中,每次插入都会创建一个新的表文件用于存储新到达的数据,假设每10s触发一次,那么这样的流处理作业一天产生的表文件数量将达到8640个,且由于流处理作业通常是 long-running 的,运行该流处理作业100天将产生上百万个表文件。这样的 Delta 表,仅元数据的维护就是一个很大的挑战,查询性能更是急剧恶化。

为了解决上述问题,Databricks 提供了小文件自动合并功能,在每次向 Delta 表中写入数据之后,会检查 Delta 表中的表文件数量,如果 Delta 表中的小文件(size < 128MB 的视为小文件)数量达到阈值,则会执行一次小文件合并,将 Delta 表中的小文件合并为一个新的大文件。

该特性由表属性 delta.autoOptimize.autoCompact 控制,和特性 delta.autoOptimize.optimizeWrite 相同,可以在创建表时指定,也可以对已创建的表进行修改。自动合并的阈值由 spark.databricks.delta.autoCompact.minNumFiles 控制,默认为50,即小文件数量达到50会执行表文件合并;合并后产生的文件最大为128MB,如果需要调整合并后的目标文件大小,可以通过调整配置 spark.databricks.delta.autoCompact.maxFileSize 实现。

3:手动合并小文件(我常用,每天定时运行合并分区内小文件,再去处理批任务)

自动小文件合并会在对 Delta 表进行写入,且写入后表中小文件达到阈值时被触发。除了自动合并之外,Databricks 还提供了 Optimize 命令使用户可以手动合并小文件,优化表结构,使得表文件的结构更加紧凑。在实现上 Optimize 使用 bin-packing 算法,该算法不但会合并表中的小文件,且合并后生成的表文件也更均衡(表文件大小相近)。例如,我们要对 Delta 表 student 的表文件进行优化,仅需执行如下命令即可实现:(Optimize 命令不但支持全表小文件的合并,还支持特定的分区的表文件的合并

OPTIMIZE student WHERE date >= '2024-01-01'

附加:

面试官可能会问,我运行optimize合并小文件,但是小文件太多了,直接卡死运行不了程序(某互联网面试题)

回答:

1.首先停掉程序,这里注意deltalake因为有历史版本这个概念,所以不存在运行一半覆盖原来版本情况,可以基于上一个版本重新运行(考点)

2.第二点,大数据思想分而治之“分”,即把复杂的任务分解为若干个“简单的任务”来处理。

​OPTIMIZE student WHERE date > '2024-01-01' and date < '2024-01-02'

因为前面做了partitionby(年月日),那么缩小optimize范围,在遍历这个月的每一天日期,分治处理

3.第三点,大数据思想,自己不行找兄弟,加节点,加计算资源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/704993.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue 实现页面导出A4标准大小的PDF文件,以及处理图片跨域不能正常展示的问题等

效果预览&#xff1a; 代码流程&#xff1a;首先在utils文件夹下创建htmlToPdf的js工具文件&#xff0c;然后在main.js中注册引用 htmlToPdf.js // 导出页面为PDF格式 import html2Canvas from html2canvas import JsPDF from jspdfexport default {install(Vue, options) {V…

hcia datacom课程学习(1):通信基础

1.总体框架 上图为发送方通过互联网传递信息给接收方的过程。 家用路由器会直接集成上图中的四层&#xff08;vlan&#xff0c;DHCP&#xff0c;静态路由&#xff0c;NAT&#xff0c;PPPoE&#xff09;。 2.网络性能指标 &#xff08;1&#xff09;带宽 单位时间内传输的数…

Linux 使用 SSH 传输文件

# 登录 ssh usernameip_address # 复制文件(远程-->本地) scp usernameip_address:/home/username/filename . # 复制文件(本地-->远程) scp filename usernameip_address:/home/username # 复制目录(本地-->远程) scp -r source_dir usernameip_address:/home/use…

独孤思维:这份付出,可以拿一辈子收益

学习&#xff0c;不能贪杯。 做副业&#xff0c;不能什么都做。 什么都学&#xff0c;就什么都学不会。 什么项目都做&#xff0c;就什么都赚不到钱。 这是一定的。 人的精力有限&#xff0c;时间有限&#xff0c;成本有限。 这一辈子&#xff0c;做好一件事就够了。 很…

解析Hadoop三大核心组件:HDFS、MapReduce和YARN

目录 HadoopHadoop的优势 Hadoop的组成HDFS架构设计Yarn架构设计MapReduce架构设计 总结 在大数据时代&#xff0c;Hadoop作为一种开源的分布式计算框架&#xff0c;已经成为处理大规模数据的首选工具。它采用了分布式存储和计算的方式&#xff0c;能够高效地处理海量数据。Had…

Redisson - 实现Java的Redis分布式和可扩展解决方案

Redisson - 实现Java的Redis分布式和可扩展解决方案 引言&#xff1a; 在现代的分布式系统中&#xff0c;缓存和数据存储扮演着至关重要的角色。Redis作为一种高性能的键值存储数据库&#xff0c;被广泛用于缓存、消息队列、实时数据分析等场景。然而&#xff0c;原生的Redis…

pod调度策略 标签管理 资源配额与限额 全局资源配额与限额策略,

打分也是基于可调度节点进行打分资源情况. 指定多个节点,会进行覆盖其之前节点名称 --- kind: Pod apiVersion: v1 metadata:name: myhttp spec:nodeName: node-0001 # 基于节点名称进行调度containers:- name: apacheimage: myos:httpd 基于节点名称的调度策略 标签与调…

数据可视化--了解数据可视化和Excel数据可视化

目录 1.1科学可视化&#xff1a; 可视化是模式、关系、异常 1.2三基色原理&#xff1a; 三基色:红色、绿色和蓝色 1.3Excel数据可视化 1.3.1 excel数据分析-13个图表可视化技巧 1.3.2 excel数据分析-28个常用可视化图表&#xff08;video&#xff09; 1.3.3Excel可视化…

SpringAMQP消息队列

安装RabbitMQ 在linux上安装RabbitMQ,并运行 docker run \-e RABBITMQ_DEFAULT_USERzywzy \-e RABBITMQ_DEFAULT_PASS123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-managementhttp://ip:15672 访问控制台, 用户名zywzy,密码123321 引入…

康复训练day2——2024牛客寒假集训营6

一道很好的构造题&#xff0c;受益匪浅。 链接&#xff1a;F-命运的抉择_2024牛客寒假算法基础集训营6 (nowcoder.com)​​​​​​ 题意&#xff1a; 题解 &#xff08;并查集 思维&#xff09;&#xff1a; 首先将存在1的情况特判掉&#xff0c;我们的数组的元素都是> 2的…

2024-02-26(Spark,kafka)

1.Spark SQL是Spark的一个模块&#xff0c;用于处理海量结构化数据 限定&#xff1a;结构化数据处理 RDD的数据开发中&#xff0c;结构化&#xff0c;非结构化&#xff0c;半结构化数据都能处理。 2.为什么要学习SparkSQL SparkSQL是非常成熟的海量结构化数据处理框架。 学…

在having、select子句中使用子查询

目录 在having子句中使用子查询 统计出部门平均工资高于公司平均工资的部门编号、平均工资、部门人数 在select子句中使用子查询 查询每个员工的编号、姓名、职位、部门名称 Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 在havin…

销售线索获取 如何查找更多的销售线索平台

在进行销售工作时&#xff0c;寻找潜在客户和销售线索是非常重要的。只有及时地发现客户的需求和问题&#xff0c;才能更好地进行销售和提供服务。然而&#xff0c;在如今的市场环境中&#xff0c;客户的信息被广泛地分散在各个渠道和媒介上&#xff0c;如果仅靠人工搜索和整理…

如何优化Node.js应用的性能

随着Node.js在Web开发领域的广泛应用&#xff0c;越来越多的开发者开始关注如何优化Node.js应用的性能。优化Node.js应用的性能可以提升应用的响应速度&#xff0c;降低资源消耗&#xff0c;提升用户体验。在本文中&#xff0c;我们将探讨一些优化Node.js应用性能的方法和技巧。…

Nginx重写功能和反向代理

目录 一、重写功能rewrite 1. ngx_http_rewrite_module模块指令 1.1 if 指令 1.2 return 指令 1.3 set 指令 1.4 break 指令 2. rewrite 指令 3. 防盗链 3.1 实现盗链 3.2 实现防盗链 4. 实用网址 二、反向代理 1. 概述 2. 相关概念 3. 反向代理模块 4. 参数配置…

亿道丨三防平板丨如何从多方面选择合适的三防加固平板?

在如今这个信息爆炸的时代&#xff0c;移动设备已经成为我们生活和工作的必备工具。然而&#xff0c;在一些特殊的场合中&#xff0c;普通的平板电脑可能无法满足需求&#xff0c;比如工厂车间、野外作业、极端天气等环境下。此时&#xff0c;三防平板就成了不二之选。那么&…

SpringCloud-Docker安装与详解

Docker 是一款强大的容器化平台&#xff0c;通过其轻量级的容器技术&#xff0c;使应用程序的开发、部署和管理变得更加便捷和高效。本文将深入探讨 Docker 的安装过程&#xff0c;并详细解析其基本概念、组件及常用命令&#xff0c;以帮助读者充分理解和熟练使用 Docker。企业…

mac安装zookeeper

下载地址&#xff1a; http://archive.apache.org/dist/zookeeper/ 注意&#xff1a;由于Zookeeper从3.5.5版本开始&#xff0c;带有bin名称的包才是我们想要的下载可以直接使用的里面有编译后的二进制的包&#xff0c;而之前的普通的tar.gz的包里面是只是源码的包无法直接使…

Laravel04 eloquent

eloquent 1. eloquent2. 创建eloquent model 以及 取数据 1. eloquent 文档地址&#xff1a; https://learnku.com/docs/laravel/8.x/eloquent/9406 下面是我们&#xff0c;通过laravel的DB类从数据库中获取了post记录&#xff0c;那么有没有可能我们直接获取一个post对象&am…

Ps:索引颜色模式

Ps菜单&#xff1a;图像/模式/索引颜色 Image/Mode/Indexed Color 索引颜色 Indexed Color模式可生成最多 256 种颜色的 8 位图像文件。 这种颜色的限制使得索引颜色模式的图像文件相比于全彩图像&#xff08;如 RGB 颜色模式下的图像&#xff09;具有更小的文件大小&#xff0…