使用的Spark2以上版本所以只考虑UnifiedMemoryManager动态内存管理,如图:
1. 内存划分 与 内存计算 与 调参方式
1.1 三部分:Spark内存、用户内存、预留内存
- 预留内存:300MB 固定
- Spark内存和用户内存比例由参数
spark.memory.fraction
(默认0.75) 控制 - 计算公式:
假设:我们在submit提交参数设置executor.memeory
= 10G + 300M (方便计算),我们叫他为系统内存
那么:
Spark内存 = (系统内存 - 预留内存) fraction = (10G + 300M - 300M) * 0.75 = 7.5G*
用户内存 = (系统内存 - 预留内存) (1 - fraction) = 10 G * (1 - 0.75) = 2.5G*
1.2 Spark内存又分为两部分:存储内存、执行内存
- 存储内存、执行内存比例有
spark.memory.storageFraction
(默认0.5) 控制 - 计算公式:
假设:1.1的计算成立
那么:
存储内存 = Spark内存*storageFraction = 7.5G * 0.5 = 3.75G
执行内存 = Spark内存*storageFraction = 7.5G * (1 - 0.5) = 3.75G
2. 内存参数调优
可以看出来实际设置了10G,而执行内存却只有了3.5G,所以需要根据业务来进行参数调整。
2.1 调大spark.memory.storageFraction=0.6
如果程序中有需要使用内存Cache的而不需要太多计算shuffer之类的那么可以增加存储内存,调大spark.memory.storageFraction
参数
2.2 调小spark.memory.storageFraction=0.3
- 如果程序没有cache而又大量shuffer就需要执行内存大一些,调小
spark.memory.storageFraction
参数 - 如果我们程序运行比较慢了,可以使用
Jstat -gc pid
来查看GC情况,当发现GC频繁,就说明执行内存不够,需要调小spark.memory.storageFraction
参数
3. 再说一点:JVM参数调整
- 当
Jstat -gc pid
发现Minor GC频繁,但是Full GC几乎没有,那么就需要调整JVM参数来调大Eden伊甸园区,使用-Xmn
来调整 - 大小设置:
说一个案例:
假设:
1.Spark读取的是HDFS上的文件,HDFS上默认的Block块大小为128M
每个executor有8个core,executor-cores
来设置 - 估算Eden区大小应为128M * 8 大约1G,考虑survivor区
-Xmn
= 4/3 * 1 G 大约1.4GB - 2中是没有压缩的数据,假设数据进行压缩了,还要考虑压缩方式是否可以切片,例如使用Snappy压缩不能切片,每个文件700MB,那么Eden区的设置要把解压后数据膨胀考虑进去,假设膨胀3倍
估算Eden = 700M* 3 * 8 大约 16G
-Xmn
= 4/3 * 16G 大约 21.5GB,才能保证程序可以正常执行,要不然很容易出现OOM,当前还可以设置堆外内存来缓解压力--conf spark.memory.offHeap.size=5g
- 更换压缩方式,尽量使用能够切片的压缩方式,如果不能只能增加存储的文件数来减小每个task读取数据的大小,或者减少Executor core来增大每个core的内存,但是Spark的并行度会降低,执行时间变长,需要自己权衡利弊。
- 注意:使用G1垃圾回收器时,不要使用 -Xmn 选项或 -XX:NewRatio 等其他相关选项显式设置年轻代大小。固定年轻代的大小会覆盖暂停时间目标。可以仅使用-Xms,-Xmx和暂停时间目标-XX:MaxGCPauseMillis ,经过测试如果设置-Xmn会经常出现Full GC,去掉后Full GC为0次,而且总的GC时间大大减少,spark程序卡顿减少。
3.1 -Xmn
-Xmn这里需要多说一下,经过线上执行情况分析,一开始使用的-Xmn=18G,Spark程序执行时间2.2小时,并伴有几十次Full GC;去掉-Xmn后,Spark程序执行时间58分钟,0次Full GC。第二次测试时我将每个executor_core设置为了5,第一次为8,并没有控制变量,因为目地是优化不是测试。
4.对象的使用
为了减少有些数据结构与对象的元数据占用大量空间,尽量使用:
字符串代替对象,基本数据类型(int long)代替字符串,数组代替ArrayList等
String内部是一个char数据,char采用UTF-16编码,每个字符两字节,可以设置JVM参数 -XX:+UseCompressedStrings
采用8位来编码每一个ASCII字符来压缩字符。
下面是我的参数,需要根据自己的程序去修改一些参数值:
# submit参数
spark-submit --master spark://192.168.11.167:7077 \
--class $main --deploy-mode client --driver-memory 25g \
--executor-memory 45g \
--executor-cores 8 \
--total-executor-cores 320 \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.3 \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=5g \
--conf spark.executor.memoryOverhead=5G \
--conf spark.speculation=true \
--conf spark.network.timeout=3000 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:-TieredCompilation -XX:G1HeapRegionSize=16m -XX:InitiatingHeapOccupancyPercent=55 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:-UseCompressedClassPointers -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=256m -XX:ReservedCodeCacheSize=512m -XX:+UseCodeCacheFlushing -XX:ParallelGCThreads=20 -XX:ConcGCThreads=20 -Xms20g -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
--jars $jars xxxx.jar $date1 $max $date2 >> log/$log_file#代码内参数
conf.set("spark.driver.maxResultSize", "8g");
conf.set("spark.serialize", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class[]{ImmutableBytesWritable.class, HyperLogLog.class, HashSet.class, RegisterSet.class, IllegalArgumentException.class, FileCommitProtocol.TaskCommitMessage.class});
//conf.set("spark.kryo.registrationRequired","true"); #开启的话类没加到上面会报错
conf.set("spark.kryoserializer.buffer.mb", "10");
conf.set("spark.shuffle.file.buffer", "128");
conf.set("spark.reducer.maxSizeInFlight", "144");
conf.set("spark.shuffle.io.maxRetries", "50");
conf.set("spark.shuffle.io.retryWait", "5s");