Flink,spark对比

三:az 如何调度Spark、Flink,MR 任务
首先,使用java编写一个spark任务,定义一个类,它有main方法,里面写好逻辑,sparkConf 和JavaSparkContext 获取上下文,然后打成一个jar包,创建一个sh文件,使用spark提交任务的spark-submit 命令,指定jar包和对应的类名,和运行的参数,然后在job 文件里面指定sh 脚本,接着dependencies指定好依赖就行。最终打包成一个zip包上传。

如果是提交flink任务呢,也是定义一个类,在main方法里,Flink 流批任务只需要分别使用StreamExecutionEnvironment或者ExecutionEnvironment获取对应的执行环境,然后获取到DataStream 或者DataSet, 然后进行一系列的转换,最终达成一个jar 包,它是使用/bin/flink run 去提交任务的,后面的参数指定和spark 大同小异 ,az 也大同小异

MR 如何提交任务呢,肯定要编写Mapper和Reducer的实现处理类,然后有个主类,获取到Hadoop 的Configuration 的对应环境配置,获取到job 指定输入输出以及Mapper以及reducer类,然后打包成一个jar包,使用hadoop jar xx.jar 提交任务。

四:简单介绍下Flink
那就对比下Flink,Spark,MapReduce
Flink ,大数据分布式处理框架,从流处理开始,打造流批一体的框架,用于对无界和有界的数据流进行有状态计算,提供了诸多高级api供用户开发分布式任务,提供了数据分布,容错机制,资源管理和调度等功能

4.1: 首先从编程模型来看,MR的基础就是一条record,spark 就是RDD,rdd就是一批数据,而Flink 是DataStream 和 DataSet,这两个也是一批数据;从这个最开始的编程模型的输入来看就知道spark以及Flink 比 MR 快,后续的数据转换spark和Flink 都有丰富的算子(transform和collect 算子,flink是operator chain),而MR就很局限了,要自己定义
4.2:从数据流转的介质来看,MR会落盘,就是那个Map阶段的结尾会落盘,涉及到磁盘I/O,比较耗费时间;其实Flink 和Spark 也会进行数据的落盘,但是他们和mr的最大的本质不同就是他们可以把数据放在内存中,最后再落盘,而MR一定会落盘;

4.3:算子方面,flink是dataset api,DataStream API, table api, sql;而spark 是 RDD, DataSet, DataFrame, sparkSql;Flink 的核心引擎是runTime,spark的是SparkCore

五:Flink 和sparkStreaming 的区别
5.1: 一个实时,一个微批
5.2: 一个使用StreamingExecutionEnvironment, 一个使用JavaStreamingContext;
5.3: 一个DataStream, 一个是Dstream 的流数据
5.4: 任务调度来说,一个是会依次创建StreamGraph, JobGraph, ExecutionGraph,JobManager 调度ExecutionGraph;而另一个是 创建DstreamGraph, JobGenerator, 和JobScheduler
5.5: 时间机制方面,一个是有数据时间,摄入时间和处理时间;而sparkStreaming 是只有处理时间
5.6: 容错方面,Flink 有分布式快照,使用两阶段提交协议可以做到只有一次处理,而sparkStreaming 也有checkpoint ,能恢复数据,但是做不到恰好一次处理,可能会重复。

六:Flink 和spark的checkpoint 的异同点
6.1: checkpoint 说白了都是为了持久化数据的,Flink 是保存比如某个数据的状态,说白了就是会动态变化的值,比如用户的订单总额就是用户订单数据的状态,而spark 是保存RDD的数据到hdfs,截断RDD,防止数据异常中断,可以恢复;不过都是把内存中的数据持久化到外部的系统中,这里一般是hdfs,持久化嘛
6.2: checkpoint的触发方式不一样,Flink 的checkpoint 是由jobManager 定时触发的,如果配置了的话;而Spark是需要在代码中手动触发的
6.3: checkpoint 的触发机制不一样,Flink的checkpoint 说白了有两个阶段,预提交阶段和提交阶段,预提交阶段会做三个事,如下所示:
6.3.1: 进行checkpoint, 比如记录了用户1和2的订单金额分别是200和300
6.3.2: 写WAL 日志,就是用户1和2又有新的动作,由增加了订单金额100和50(这个可以认为是状态)
6.3.3: 锁定资源,告诉外部系统,用户1和2的订单总金额分别是300和350,但是让外部系统知道,并不是立马更新
如果上述有任何一步失败,我们都会滚到上个checkpoint,然后接下来就是提交阶段,会做两个事:
6.3.4: 把checkpoint 的状态提交
6.3.5: 外部系统更新对应的订单总金额300和350

如果是spark的checkpoint ,则直接把数据存储到hdfs了,没有啥特殊的。

7:Flink 和Spark的集群规模
Flink on yarn,一般是10台;cpu核数是36;内存是128G;
spark on yarn,是200台,pb级别的数据,cpu 核数是36,内存是128G

8:Flink 和spark, yarn 的集群角色
8.1:说明
Flink 是有client,jobManager 以及taskManger;client 是提交任务的作用,并且接收结果返回;而JobManager 接收提交任务,进行任务调度,故障恢复,容错管理;管理tm;
spark 也是有driver,master 以及 worker,和flink的一一对应,此外还有个executor 和 clusterManager
yarn 则是有ResourceManager(整体资源的管理), NoderManager(管理节点上的资源), ApplicationMaster(一个应用程序的管理者),Container(实际运行程序的容器)以及Client

9:flink 以及 spark 还有Mr 提交任务到yarn上的流程对比
9.1:Flink 提交任务流程如下,Flink 支持三种模式,session 模式,perJob模式和Application 模式,前面两者都相当于spark的yarn-cleint 模式,一个是共享资源,一个独享资源;而Application 模式是相当于spark的yarn-cluster 模式,客户端在yarn上,生产环境使用application模式,如下所示:
在这里插入图片描述
这里的ResourceManager 是flink 自己的,不是yarn的

9.2:spark 在yarn上有yarn-client 模式和yarn-cluster 模式之分,一般我们使用yarn-cluster 模式,这个最主要的点就是driver 是在客户端还是yarn上,这里的applicationMaster 就可以理解为Driver,生产环境如下:
在这里插入图片描述
10. Flink 的TaskSlot
它的目的是为了控制一个taskManager 能运行多少个task,所以对资源进行了分配,划分成不同的slot,一般和cpu是1:1 的关系,所以一个算子分布在不同的taskManger 上面,在一个tm的并行度和slot是一比一的关系,那么全局的并行度就是我们自己设置的并行度了,不过我们在考虑的时候就是考虑单个tm里面的并行度好点;slot 做了内存隔离,没有做cpu的隔离。

11:Flink 和spark的常用算子比较
FLink 独有的算子,keyBy, process, window
spark 独有的,mapPartition, repartition,colease, union ; transformation 和 action 算子

12.Flink 分区策略
GlobalPartitioner; ShufflePartitioner, RebalancePartitioner; RescalePartitioner(根据上下游算子的并行度分发数据), BrodcastPartitioner,ForwardPartitioner(上下游算子并行度一致);KeyGroupStreamPartitioner(Hash分区),CustomPartitioner(自定义分区策略)
Flink的默认分区数就是等于并行度

spark的默认分区数等于cpu的核数,也可以使用repartition,

13:Flink 和Spark的编程流转区别
Flink 流式这边一直返回的会是DataStream, 批返回的是DataSet的数据集
而Spark这边流失返回的会是Dstream以及衍生类的数据集,而批返回的则是RDD以及衍生类的数据集

14: Spark 和Flink 的序列化
为什么这两者都要实现自己的序列化框架呢,因为Java的序列化存储密度低,分布式计算的话内存要用在刀刃上,所以他们实现了自己的序列化框架,Spark 是使用了KyroSerializer 序列化,Flink的序列化的基本类是TypeInfomation.

15: Spark 和flink的反压机制
spark.streaming.backpressure.enabled, sparkStreaming 动态调整,
Flink 手动调整,看并行度,算子处理情况。

16:flink 和spark 数据在内存的抽象
16.1: 就是java对象 --StreamRecord–Buffer–memorySegment–Byte数组
16.2 RDD在缓存到内存之前,partition中record对象实例在堆内other内存区域中的不连续空间中存储。RDD的缓存过程中, 不连续存储空间内的partition被转换为连续存储空间的Block对象,并在Storage内存区域存储,此过程被称作为Unroll(展开)。

17: Spark 和Flink以及Hive 调优
都是从三个方面来说,
分别是资源调优,代码性能调优,业务调优
17.1: 对于spark 和Flink 来说,资源调优方面,可以使得单个executor 或者taskManager 可以使用的内存和cpu最大的话就尽量可以配置最大,先说spark;
17.1.1: spark一般调整的就是num-executors ,相当于flink的tm的个数;executro-memory, executor-cores,以及driver-memory 分别相当于tm的内存,tm的slot 个数,jm的内存;spark.default.parallelism 也相当于flink的并行度,spark.storage.memoryFraction 是用来持久化RDD的那部分内存,一般是executor-memory 堆内内存的60%的50%;spark.shuffle.memoryFraction就是用来shuffle的内存,和刚刚的一样,占有堆内内存的60%的50%;所以实际生产看看到底哪个用的多一点,就多给点

17.1.2: 在资源参数这里,hive需要调整的无非也是内存和cpu这方面,如下所示:
mapreduce.map.java.opts, map 阶段的jvm进程的堆内存;
mapreduce.map.memory.mb,map阶段的jvm 进程的堆内存和堆外内存的和;
mapreduce.reduce.java.opts,reduce 阶段的jvm进程的堆内存;
mapreduce.reduce.memory.mb,reduce 阶段的 的jvm 进程的堆内存和堆外内存的和;
mapreduce.map/reduce.cpu.vcores, map 和reduce 阶段可用的cpu 的个数;当给大点

但是hive中的map和reduce 的task的数量取决于总文件的个数和每个文件数的大小,一般是每个文件数的大小起作用,如下所示:
mapred.min/max.split.size,就是可以分割文件的最小和最大文件大小,但是map的task数量还不是由这个决定的,还是由多个因素决定的,看下图
在这里插入图片描述
因为hadoop系统中dfs.block.size 一般是128M,所以如果我们没有设置上述的最小和最大的话,就是默认按照128去分割,如果要提高task数量,要么提高mapred.map.tasks的数量,要么增大mapred.min.split.size 的大小,到256M也可以。

那么reduce的task的数量呢?
reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, mapred.reduce.tasks);
所以最直接的办法是通过mapred.reduce.tasks = 10 来设定就可以,当然设定太小了执行时间会长,所以要居中;太大的话则小文件过多,也不好。

17.2: 算子性能调优
17.2.1: spark算子性能调优
spark.sql.adaptive.enabled 默认为false 自适应执行框架的开关
spark.sql.adaptive.skewedJoin.enabled 默认为 false 倾斜处理开关
spark.hadoop.mapreduce.input.fileinputformat.split.minsize 是用于聚合input的小文件,用于控制每个mapTask的输入文件,防止小文件过多时候,产生太多的task
spark.sql.autoBroadcastJoinThreshold 用于控制在spark sql中使用BroadcastJoin时候表的大小阈值,适当增大可以让一些表走BroadcastJoin,提升性能,但是如果设置太大又会造成driver内存压力
用 reduceByKey( ) 和 aggregrateByKey( ) 来取代 groupByKey,因为前者会进行预聚合
操作数据库建义采用foreachPartition( ) ,资源可以的情况下使用mapPartitions 代替map
数据复用使用persist
减少数据碎片使用 coalesce( )进行重分区
spark.shuffle.file.buffer参数是调节map端缓冲区大小,单位是kb,减少磁盘溢写次数;
spark.reducer.maxSizeInFlight 参数是调节shuffle的时候reduce端的缓冲区大小,单位是MB
spark.shuffle.io.maxRetries reduce端拉取重试次数,以及拉取失败等待间隔,spark.shuffle.io.retryWait,单位是s,比如60s
spark.shuffle.sort.bypassMergeThreshold, 如果确实不需要排序操纵,那就调大sortByPass的阈值,调大到400等,默认是200

17.2.2: Hive 性能调优
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 自动合并小文件
set hive.merge.mapredfiles = true; 设置reduce 端对输出文件的合并
set hive.archive.enabled=true; 使用hadoop archive 文件对小文件归档
set hive.mapred.mode=strict 开启严格模式;不允许对分区表查询where不带分区,order by 必须加上limit,不允许笛卡尔积等;
set hive.exec.parallel=true; //打开任务并行执行
set mapred.job.reuse.jvm.num.tasks=10 设置jvm重用
set hive.map.aggr=true; set hive.groupby.skewindata = true; 进行数据负载均衡,数据倾斜优化
set hive.fetch.task.conversion=more; 可以减少不必要的走mapreduce 任务
set hive.auto.convert.join = true; 开启map join

17.2.3: Flink 性能调优
算子方面暂无,主要是资源和倾斜方面,要改代码

17.3: 业务代码调优
最典型的问题,数据倾斜怎么办?
hive只能是自己可以通过刚刚那个skew_in_data 去均衡,那么flink 和spark呢?
17.3.1: spark和flink 数据倾斜处理
17.3.1.1: 碰到大量空值的或者就是某个大量值的,加上随机字符串,均匀shuffle
17.3…1.2: 把聚合的步骤往前放,放到hive或者mapreudce 里面去做
17.3.1.3: 过滤掉少数导致倾斜的key
17.3.1.4: 提高shuffle操作的并行度,增加并行处理能力
17.3.1.5: 两阶段聚合,局部聚合+全局聚合,对于倾斜的key打上随机浅醉,聚合后再去掉再聚合,这个适合聚合算子,不适合join
17.3.1.6: Reduce join 换成MapJoin
17.3.1.7: 倾斜key 拆分join,打上随机前缀,然后后续不倾斜的扩容和它join,最终过滤掉前缀得到正确结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/42216.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构——二叉树相关题目

1.寻找二叉树中数值为x的节点 //寻找二叉树中数值为x的节点 BTNode* TreeFind(BTNode* root, BTDataType x)//传过来二叉树的地址和根的地址,以及需要查找的数据 {if (root Null){return Null;}//首先需要先判断这个树是否为空,如果为空直接返回空if (…

【JavaWeb程序设计】JSP实现购物车功能

目录 一、结合之前所学的相关技术,编写代码实现以下购物车功能 1. 我实现的功能运行截图如下 (1)商品列表页面home.jsp (2)登录账号页面/未登录点击结账页面 (3)重新登录页面(记…

昇思25天学习打卡营第18天|ShuffleNet图像分类

一、简介: ShuffleNetV1是旷视科技提出的一种计算高效的CNN模型,和MobileNet, SqueezeNet等一样主要应用在移动端,所以模型的设计目标就是利用有限的计算资源来达到最好的模型精度。ShuffleNetV1的设计核心是引入了两种操作:Poin…

如何在centos7安装Docker

在centOS7中我们可以使用火山引擎镜像源镜像安装Docker,以下是具体的安装步骤。 step 1: 安装必要的一些系统工具 sudo yum install -y yum-utils Step 2: 添加软件源信息 sudo yum-config-manager --add-repo https://mirrors.ivolces.com/docker/linux/centos/docker-ce.r…

力扣双指针算法题目:二叉树的层序遍历(BFS)

目录 1.题目 2.思路解析 3.代码 1.题目 . - 力扣(LeetCode) 2.思路解析 对二叉树进行层序遍历,顾名思义,就是按每一层的顺序对二叉树一层一层地进行遍历 思路如下 从第一层开始,先将二叉树地头放入队列q&#xff0…

2007-2022年中国各企业数字化转型与供应链效率

企业数字化转型与供应链效率是现代企业管理和发展的两个关键方面。以下是对中国各企业数字化转型与供应链效率数据的介绍: 数据简介 企业数字化转型:指企业通过采用数字技术与创新方法,改造业务流程、组织结构和产品服务,以提升…

UCOS-III 系统移植

1. 移植前准备 1.1 源码下载 UCOS-III Kernel Source: https://github.com/weston-embedded/uC-OS3.git Micriμm CPU Source : https://github.com/weston-embedded/uC-CPU.git Micriμm Lib Source: https://github.com/weston-embedded…

多方SQL计算场景下,如何达成双方共识,确认多方计算作业的安全性

安全多方计算在SQL场景下的限制 随着MPC、隐私计算等概念的流行, 诸多政府机构、金融企业开始考虑参与到多方计算的场景中, 扩展数据的应用价值。 以下面这个场景为例, 银行可能希望获取水电局和税务局的数据,来综合计算得到各…

DolphinScheduler-3.1.9 资源中心实践

前言 目前DolphinScheduler最新的稳定版本是 3.1.9 ,基于此做些探索,逐渐深化学习路径,以便于加深理解。 3.2.1 是最新的版本。目前的稳定版本是 3.1.9 基础环境:Hadoop3.3, Java 8, Python3, MacOS14.2.1 一、本地伪分布式安装…

学习笔记——动态路由——IS-IS中间系统到中间系统(开销)

四、IS-IS开销 1、IS-IS 开销简介 在IS-IS协议刚面世时,互联网网络结构还非常简单,因此IS-IS早期的版本中只使用了6bit来描述链路开销,链路开销的取值范围是1-63。一条路由的开销范围只有10bit,取值范围是0-1023。 随着计…

前端实现无缝自动滚动动画

1. 前言: 前端使用HTMLCSS实现一个无缝滚动的列表效果 示例图: 2. 源码 html部分源码: <!--* Author: wangZhiyu <w3209605851163.com>* Date: 2024-07-05 23:33:20* LastEditTime: 2024-07-05 23:49:09* LastEditors: wangZhiyu <w3209605851163.com>* File…

【ubuntu】安装(升级)显卡驱动,黑屏|双屏无法使用问题解决方法

every blog every motto: You can do more than you think. https://blog.csdn.net/weixin_39190382?typeblog 0. 前言 ubuntu 安装(升级)显卡驱动&#xff0c;黑屏|双屏无法使用问题解决方法 由于项目需要&#xff0c;对显卡驱动进行升级。升级完就黑屏。。。。&#xff0…

Fast R-CNN(论文阅读)

论文名&#xff1a;Fast R-CNN 论文作者&#xff1a;Ross Girshick 期刊/会议名&#xff1a;ICCV 2015 发表时间&#xff1a;2015-9 ​论文地址&#xff1a;https://arxiv.org/pdf/1504.08083 源码&#xff1a;https://github.com/rbgirshick/fast-rcnn 摘要 这篇论文提出了一…

BAT-致敬精简

什么是bat bat是windows的批处理程序&#xff0c;可以批量完成一些操作&#xff0c;方便快速。 往往我们可以出通过 winR键来打开指令窗口&#xff0c;这里输入的就是bat指令 这里就是bat界面 节约时间就是珍爱生命--你能想象以下2分钟的操作&#xff0c;bat只需要1秒钟 我…

考虑数据库粒度的设计-提升效率

目录 概要 场景 设计思路 小结 概要 公开的资料显示&#xff0c;数据库粒度是&#xff1a;“在数据库领域&#xff0c;特别是数据仓库的设计中&#xff0c;粒度是一个核心概念&#xff0c;它直接影响到数据分析的准确性和存储效率。粒度的设定涉及到数据的详细程度和精度&…

【JVM基础篇】Java的四种垃圾回收算法介绍

文章目录 垃圾回收算法垃圾回收算法的历史和分类垃圾回收算法的评价标准标记清除算法优缺点 复制算法优缺点 标记整理算法&#xff08;标记压缩算法&#xff09;优缺点 分代垃圾回收算法&#xff08;常用&#xff09;JVM参数设置使用Arthas查看内存分区垃圾回收执行流程分代GC算…

【SpringBoot】IDEA查看spring bean的依赖关系

前因&#xff1a;在研究springcloud config组件时&#xff0c;我发现config-server包下的EnvironmentController可以响应客户端的请求&#xff0c;但EnvironmentController并不在启动类所在的包路径下&#xff0c;所以我推测它是作为某个Bean方法在生效&#xff0c;寻找bean的依…

DAY1: 实习前期准备

文章目录 VS Code安装的插件C/CCMakeGitHub CopilotRemote-SSH收获 VS Code 下载链接&#xff1a;https://code.visualstudio.com 安装的插件 C/C 是什么&#xff1a;C/C IntelliSense, debugging, and code browsing. 为什么&#xff1a;初步了解如何在VS Code里使用C输出…

关于小爱同学自定义指令执行

1.前言 之前买了小爱同学音响&#xff0c;一直想让其让我的生活变得更智能&#xff0c;编写一些程序来完成一些自动化任务&#xff0c;但是经过搜索发现&#xff0c;官方开发者平台不能用了&#xff0c;寻找api阶段浪费了我很长时间。最后在github 开源项目发现了俩个比较关键…

13.SQL注入-宽字节

SQL注入-宽字节 含义&#xff1a; MySQL是用的PHP语言&#xff0c;然后PHP有addslashes()等函数&#xff0c;这类函数会自动过滤 ’ ‘’ null 等这些敏感字符&#xff0c;将它们转义成’ ‘’ \null&#xff1b;然后宽字节字符集比如GBK它会自动把两个字节的字符识别为一个汉…