报错java.lang.OutOfMemoryError: GC overhead limit exceeded
HDFS上有一些每天增长的文件,使用Snappy压缩,突然某天OOM了
1.原因:
因为snappy不能split切片,也就会导致一个文件将会由一个task来读取,读取后解压,数据又会膨胀好多倍,如果文件数太大而且你的并行度也挺大,就会导致大量full gc,最终OOM
为了程序能快速跑起来,只好将最后入HDFS前reparation(500),修改为1000,增加文件数,减少每个文件数据量。
最好是能够修改压缩方式,一个spark程序的输出会作为另一个spark程序的输入使用不能切片的压缩方式是肯定不行的,所以后面还是将他们修改成lzo了
lzo压缩配置过程祥见https://blog.csdn.net/weixin_43736084/article/details/122873460
我自己的理解是如果HDFS上的Snappy压缩文件需要Spark等计算框架取读取并且需要切片,那么就要手动将文件大小控制在128MB附近,以使计算效率最大化,但是解压后的数据仍然翻大约3倍
下面记录一下,找问题,修复问题的过程,以及Spark参数调优的过程,很漫长,太菜了。
HDFS上的文件
可以看到一个文件快700MB了,一共500个文件。
当使用Spark读取后做一系列计算就开始报错OOM,用Jstat -gc pid 1000
查看gc情况,发现当执行到stage1的textFile读取文件时就会卡住然后一直进行full GC,最终OOM。
1.以为是数据倾斜,因为总是某个stage某个task执行时卡住
对数据进行抽样查看,发现并没有数据倾斜,基本上没有重复数据
rdd.sample(0.1,false).countByKey().forEach((k,v) ->System.out.println(k+"---"+v));
2.可能内存不够用,或者参数调整的不对,开始调参
**方向:1.加内存、2.堆外内存、3.调JVM参数、4.调整缓存和执行参数比例、5.增加核数增加并行度,减少每个task处理的数据量、6、调整代码增加shuffer时的分区数、6.调整代码先聚和之类的
因为代码比较多,划分的stage比较多,一时间通过WEB UI没看出来是当前stage中哪个算子出现的问题,一直以为是reduceBykey的时候报的错,导致方向找错了,所以怎么调整都是错的
收获就是对调参的了解更熟练了。
1.一开始就以为是执行内存不足所以我将fraction调为0.8,storageFraction调为0.2,不断的增大执行内存,都无济于事。
--conf spark.memory.fraction=0.8
--conf spark.memory.storageFraction=0.3
2.调整代码提前过滤数据之类的全部尝试了,没用。
3.期间我修改了读取文件时的分区数竟然没想到是切片问题,发现不管怎么调整都只有500,当时还在疑惑为啥啊,┭┮﹏┭┮
3.后来看WEB UI中task执行情况,其实卡住的task一直在读取数据,input Size 项是在不断增加的,下面是后来我修改后读取的大小,一直读到700MB,过程很慢,而且如果core数大也就是并行度大的话,我这里320个task并行度,每一个要读取700MB,并且是需要解压的,Snappy解压后3G左右,查看task的errlog会发现日志出现具体我记不清了,大概就是读取xxxx文件3G,通过内存spill到了磁盘多少多少之类的,看到一个task读了这么大,才忽然想起来snappy是不能split切片的
就会导致一个文件使用一个task读取全部数据并解压,最终OOM
也可能我当时参数调整的不好,后面学习并总结了一下内存调参,详见:
https://blog.csdn.net/weixin_43736084/article/details/121541393
参数调整:
# --为了好看换行了
spark-submit --master spark://11.172.54.167:7077
--class $main --deploy-mode client --driver-memory 16g
--executor-memory 25g
--executor-cores 8
--total-executor-cores 320
--conf spark.memory.fraction=0.8
--conf spark.memory.storageFraction=0.3
--conf spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=5g
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:-TieredCompilation -XX:G1HeapRegionSize=16m -XX:InitiatingHeapOccupancyPercent=55 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:-UseCompressedClassPointers -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=256m -XX:ReservedCodeCacheSize=512m -XX:+UseCodeCacheFlushing -XX:ParallelGCThreads=20 -XX:ConcGCThreads=20 -Xms20g -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
--jars $jars xxxx.jar $date1 $max $date2 >> log/$log_file#代码内参数
conf.set("spark.driver.maxResultSize", "8g");
conf.set("spark.serialize", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class[]{ImmutableBytesWritable.class, HyperLogLog.class, HashSet.class, RegisterSet.class, IllegalArgumentException.class, FileCommitProtocol.TaskCommitMessage.class});
//conf.set("spark.kryo.registrationRequired","true"); #开启的话类没加到上面会报错
conf.set("spark.kryoserializer.buffer.mb", "10");
conf.set("spark.shuffle.file.buffer", "128");
conf.set("spark.reducer.maxSizeInFlight", "144");
conf.set("spark.shuffle.io.maxRetries", "50");
conf.set("spark.shuffle.io.retryWait", "5s");