- SparkConf方式
- SparkConf方式能够直接将属性值传递到SparkContext;
- SparkConf能够对某些通用属性直接配置,如master使用setMaster,appname使用setAppName;
- 也能够使用set()方法对属性进行键-值对配置,如set("spark.executor.memory", "1g") 。
- 命令行參数方式
- 这样的方式是在使用spark-submit或spark-shell提交应用程序的时候。用命令行參数提交;
- 这样的方式能够比較灵活的配置各个应用程序的执行环境;
- 能够通过spark-submit --help 或 spark-shell –help显示出属性的完整列表。
- 文件配置方式
- 该方式是将属性配置项以键值对方式写入文本文件里,一个配置项占一行;
- 该文件默觉得conf/spark-defaults.conf。spark-submit在提交应用程序的时候会检查是否存在该文件,有的话就将相关的属性配置加载;
- 该文件能够在 spark-submit的命令參数--properties-file定义不同的位置。
- 优先权
- SparkConf方式 > 命令行參数方式 >文件配置方式
- 查看Spark属性配置
- 通过应用程序的webUI(地址http://<driver>:4040)能够查看Spark属性配置,从而检查属性配置是否正确。
- 仅仅是显示通过上面三种方式显式指定的属性配置。对于其它属性能够假定使用默认配置;
- 对于大多数内部控制属性,系统已经提供了合理的默认配置。
属性名称 | 默认 | 含义 |
spark.app.name | 无 | 应用程序名称 |
spark.master | 无 | 要连接的群集管理器 |
spark.executor.memory | 512 m | 每一个executor使用的内存总量 |
spark.serializer | org.apache.spark.serializer. JavaSerializer | 在网络数据传送或缓存时使用的序化器,默认的序化器是Java序化器,尽管这样的序化器对不论什么Java对象能够使用,兼容性好,可是处理速度相当的慢。假设要追求处理速度的话,建议使用org.apache.spark.serializer.KryoSerializer序化器。当然也能够随意是定义为org.apache.spark.Serializer 子类的序化器。 |
spark.kryo.registrator | 无 | 假设要使用 Kryo序化器。须要创建一个继承KryoRegistrator的类并设置系统属性spark.kryo.registrator指向该类。 |
spark.local.dir | /tmp | 用于暂存空间的文件夹。该文件夹用于保存map输出文件或者转储RDD。 该文件夹位于快速的本地磁盘上,或者位于使用逗号分隔的多个不同磁盘上的文件夹。注意: 在Spark 1.0 及更高版本号这属性将被 群集管理器配置的环境变量 SPARK_LOCAL_DIRS (Standalone、Mesos) 或 LOCAL_DIRS (YARN) 取代。 |
spark.logConf | false | SparkContext 启动时记录有效 SparkConf信息。 |
属性名称 | 默认 | 含义 |
spark.executor.memory | 512 m | 分配给每一个executor进程总内存(使用类似512m、2g格式) |
spark.executor.extraJavaOptions | 无 | 要传递给executor的额外 JVM 选项,注意不能使用它来设置Spark属性或堆大小设置。 |
spark.executor.extraClassPath | 无 | 追加到executor类路径中的附加类路径,主要为了兼容旧版本号的Spark。通常不须要用户设置。 |
spark.executor.extraLibraryPath | 无 | 启动executor JVM 时要用到的特殊库路径。 |
spark.files.userClassPathFirst | false | executor在载入类的时候是否优先使用用户自己定义的JAR包,而不是Spark带有的JAR包。此功能能够用于解决Spark依赖包和用户依赖包之间的冲突。 眼下,该属性仅仅是一项试验功能。 |
属性名称 | 默认 | 含义 |
spark.shuffle.consolidateFiles | false | 假设设置为true,在shuffle时就合并中间文件,对于有大量Reduce任务的shuffle来说。合并文件能够提高文件系统性能。假设使用的是ext4 或 xfs 文件系统。建议设置为true;对于ext3。因为文件系统的限制,设置为true反而会使内核>8的机器减少性能。 |
spark.shuffle.spill | true | 假设设置为true,在shuffle期间通过溢出数据到磁盘来减少了内存使用总量。溢出阈值是由spark.shuffle.memoryFraction指定的。 |
spark.shuffle.spill.compress | true | 是否压缩在shuffle期间溢出的数据,假设压缩将使用spark.io.compression.codec。 |
spark.shuffle.compress | true | 是否压缩map输出文件,压缩将使用spark.io.compression.codec。 |
spark.shuffle.file.buffer.kb | 100 | 每一个shuffle的文件输出流内存缓冲区的大小,以KB为单位。 这些缓冲区能够降低磁盘寻道的次数,也降低创建shuffle中间文件时的系统调用。 |
spark.reducer.maxMbInFlight | 48 | 每一个reduce任务同一时候获取map输出的最大大小 (以兆字节为单位)。因为每一个map输出都须要一个缓冲区来接收它。这代表着每一个 reduce 任务有固定的内存开销。所以要设置小点,除非有非常大内存。 |
属性名称 | 默认 | 含义 |
spark.ui.port | 4040 | 应用程序webUI的port |
spark.ui.retainedStages | 1000 | 在GC之前webUI保留的stage数量 |
spark.ui.killEnabled | true | 同意在webUI将stage和对应的job杀死 |
spark.eventLog.enabled | false | 是否记录Spark事件,用于应用程序在完毕后重构webUI。 |
spark.eventLog.compress | false | 是否压缩记录Spark事件,前提spark.eventLog.enabled为true。 |
spark.eventLog.dir | file:///tmp/spark-events | 假设spark.eventLog.enabled为 true,该属性为记录spark事件的根文件夹。在此根文件夹中,Spark为每一个应用程序创建分文件夹。并将应用程序的事件记录到在此文件夹中。 用户能够将此属性设置为HDFS文件夹,以便history server读取历史记录文件。 |
属性名称 | 默认 | 含义 |
spark.broadcast.compress | true | 是否在发送之前压缩广播变量。 |
spark.rdd.compress | false | 是否压缩序化的RDD分区 ,能够节省大量空间。但会消耗一些额外的CPU时间。 |
spark.io.compression.codec | org.apache.spark.io. LZFCompressionCodec | 用于压缩内部数据如 RDD 分区和shuffle输出的编码解码器。Spark提供两个编解码器: org.apache.spark.io.LZFCompressionCodec和org.apache.spark.io.SnappyCompressionCodec。当中,Snappy提供更高速的压缩和解压缩,而LZF提供了更好的压缩比。 |
spark.io.compression.snappy .block.size | 32768 | 使用Snappy编码解码器时,编码解码器使用的块大小 (以字节为单位) 。 |
spark.closure.serializer | org.apache.spark.serializer. JavaSerializer | 用于闭包的序化器,眼下仅仅有支持Java序化器。 |
spark.serializer. objectStreamReset | 10000 | 使用 org.apache.spark.serializer.JavaSerializer序化时。序化器缓存对象以防止写入冗余数据,这时停止这些对象的垃圾收集。通过调用重置序化器,刷新该信息就能够收集旧对象。若要关闭这重定期重置功能将其设置为< = 0 。默认情况下每10000个对象将重置序化器。 |
spark.kryo.referenceTracking | true | 当使用Kryo序化数据时。是否跟踪对同一对象的引用。假设你的对象图有回路或者同一对象有多个副本。有必要设置为true。其它情况下能够禁用以提高性能。 |
spark.kryoserializer.buffer.mb | 2 | 在Kryo 里同意的最大对象大小(Kryo会创建一个缓冲区,至少和序化的最大单个对象一样大) 。 假设Kryo 出现缓冲区限制超出异常报错,添加这个值。注意。每一个worker的每一个core仅仅有一个缓冲区。 |
属性名称 | 默认 | 含义 |
spark.default.parallelism | 本地模式: 本地机器内核数 Mesos精细模式: 8 其它: 全部executor的core总数 或者2,以较大者为准 | 假设用户没设置,系统使用集群中执行shuffle操作的默认任务数(groupByKey、 reduceByKey等)。 |
spark.broadcast.factory | org.apache.spark.broadcast. HttpBroadcastFactory | 广播的实现类 |
spark.broadcast.blockSize | 4096 | TorrentBroadcastFactory块大小(以kb为单位)。太大值在广播时减少并行性 (使速度变慢)。太小值, BlockManager性能可能会受到冲击。 |
spark.files.overwrite | false | 通过 SparkContext.addFile() 加入的文件在目标中已经存在而且内容不匹配时,是否覆盖目标文件。 |
spark.files.fetchTimeout | false | 在获取由driver通过SparkContext.addFile() 加入的文件时,是否使用通信时间超时。 |
spark.storage.memoryFraction | 0.6 | Java堆用于cache的比例 |
spark.tachyonStore.baseDir | System.getProperty("java.io.tmpdir") | 用于存储RDD的techyon文件夹,tachyon文件系统的URL由spark.tachyonStore.url设置。 也能够是逗号分隔的多个techyon文件夹。 |
spark.storage. memoryMapThreshold | 8192 | 以字节为单位的块大小,用于磁盘读取一个块大小进行内存映射。这能够防止Spark在内存映射时使用非常小块,普通情况下,对块进行内存映射的开销接近或低于操作系统的页大小。 |
spark.tachyonStore.url | tachyon://localhost:19998 | 基于techyon文件的URL。 |
spark.cleaner.ttl | 无限 | spark记录不论什么元数据(stages生成、task生成等)的持续时间。定期清理能够确保将超期的元数据遗忘。这在执行长时间任务是非常实用的,如执行24/7的sparkstreaming任务。注意RDD持久化在内存中的超期数据也会被清理。 |
属性名称 | 默认 | 含义 |
spark.driver.host | 本地主机名 | 执行driver的主机名或 IP 地址。 |
spark.driver.port | 随机 | driver侦听的port。 |
spark.akka.frameSize | 10 | 以MB为单位的driver和executor之间通信信息的大小,设置值越大。driver能够接受更大的计算结果。 |
spark.akka.threads | 4 | 用于通信的actor线程数,在大型集群中拥有很多其它CPU内核的driver能够添加actor线程数。 |
spark.akka.timeout | 100 | 以秒为单位的Spark节点之间通信超时时间。 |
spark.akka.heartbeat.pauses | 600 | 以下3个參数是用于设置akka自带的故障探測器,设置非常大值的话。能够停用故障探測器。假设想启用故障探測器。以秒为单位设置这3个參数。 一般是在特殊须要的情况下开启故障探測器。一个敏感的故障探測器有助于恶意的executor的定位,而对于因为GC暂停或网络滞后引起的情况下,不须要开启故障探測器;另外故障探測器的开启会导致因为心跳信息的频繁交换而引起的网络泛滥。 |
spark.akka.failure-detector.threshold | 300.0 | 相应AKKA的akka.remote.transport-failure-detector.threshold |
spark.akka.heartbeat.interval | 1000 | 心跳间隔时间 |
属性名称 | 默认 | 含义 |
spark.task.cpus | 1 | 为每一个任务分配的内核数。 |
spark.task.maxFailures | 4 | job放弃task前该task的失败次数,该值>=1 |
spark.scheduler.mode | FIFO | SparkContext对job进行调度所採用的模式。 对于多用户可採用FAIR模式。 |
spark.cores.max | 未设置 | 当应用程序执行在Standalone集群或者粗粒度共享模式Mesos集群时,应用程序向集群请求的最大CPU内核总数(不是指每台机器,而是整个集群)。 假设不设置,对于Standalone集群将使用spark.deploy.defaultCores中数值,而Mesos将使用集群中可用的内核。 |
spark.mesos.coarse | false | 假设设置为true。在Mesos集群中执行时使用粗粒度共享模式。 |
spark.speculation | false | 下面几个參数是关于Spark猜測运行机制的相关參数。此參数设定是否使用猜測运行机制,假设设置为true则spark使用猜測运行机制,对于Stage中拖后腿的Task在其它节点中又一次启动,并将最先完毕的Task的计算结果最为终于结果。 |
spark.speculation.interval | 100 | Spark多长时间进行检查task执行状态用以猜測。以毫秒为单位。 |
spark.speculation.quantile | 0.75 | 猜測启动前。Stage必需要完毕总Task的百分比。 |
spark.speculation.multiplier | 1.5 | 比已完毕Task的执行速度中位数慢多少倍才启用猜測 |
spark.locality.wait | 3000 | 下面几个參数是关于Spark数据本地性的相关參数。 本參数是以毫秒为单位启动本地数据task的等待时间,假设超出就启动下一本地优先级别的task。 该设置相同能够应用到各优先级别的本地性之间(本地进程 -> 本地节点 -> 本地机架 -> 随意节点 ),当然,也能够通过spark.locality.wait.node等參数设置不同优先级别的本地性。 |
spark.locality.wait.process | spark.locality.wait | 本地进程级别的本地等待时间 |
spark.locality.wait.node | spark.locality.wait | 本地节点级别的本地等待时间 |
spark.locality.wait.rack | spark.locality.wait | 本地机架级别的本地等待时间 |
spark.scheduler.revive.interval | 1000 | 复活又一次获取资源的Task的最长时间间隔(毫秒),发生在Task由于本地资源不足而将资源分配给其它Task执行后进入等待时间,假设这个等待时间内又一次获取足够的资源就继续计算。 |
属性名称 | 默认 | 含义 |
spark.authenticate | false | Spark是否启用内部身份验证。 |
spark.authenticate.secret | 无 | 设置Spark用于组件之间进行身份验证的密钥。假设不是YARN上执行而且spark.authenticate为true时,须要设置密钥。 |
spark.core.connection. auth.wait.timeout | 30 | Spark用于组件时间进行身份认证的超时时间。 |
spark.ui.filters | 无 | Spark web UI 要使用的以逗号分隔的筛选器名称列表。筛选器要符合javax servlet Filter标准,每一个筛选器的參数能够通过设置java系统属性来指定: spark.<class name of filter>.params='param1=value1,param2=value2' 比如: -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing' |
spark.ui.acls.enable | false | Spark webUI存取权限是否启用。 假设启用。在用户浏览web界面的时候会检查用户是否有訪问权限。 |
spark.ui.view.acls | 空 | 以逗号分隔Spark webUI訪问用户的列表。默认情况下仅仅有启动Spark job的用户才有訪问权限。 |
属性名称 | 默认 | 含义 |
spark.streaming.blockInterval | 200 | 在时间间隔内(毫秒)Spark Streaming接收器将接收数据合并成数据块并存储在Spark。 |
spark.streaming.unpersist | true | 假设设置为true,强迫将SparkStreaming持久化的RDD数据从Spark内存中清理,相同的,SparkStreaming接收的原始输入数据也会自己主动被清理;假设设置为false,则同意原始输入数据和持久化的RDD数据可被外部的Streaming应用程序訪问,由于这些数据不会自己主动清理。 |
- SPARK_MASTER_OPTS 配置master使用的属性
- SPARK_WORKER_OPTS 配置worker使用的属性
- SPARK_DAEMON_JAVA_OPTS 配置master和work都使用的属性
export SPARK_MASTER_OPTS="-Dx1=y1 -Dx2=y2"
# - 当中x代表属性,y代表属性值
属性名称 | 默认 | 含义 |
spark.deploy.spreadOut | true | Standalone集群管理器是否自由选择节点还是固定到尽可能少的节点,前者会有更好的数据本地性。后者对于计算密集型工作负载更有效 |
spark.deploy.defaultCores | 无限 | 假设没有设置spark.cores.max,该參数设置Standalone集群分配给应用程序的最大内核数,假设不设置,应用程序获取全部的有效内核。 注意在一个共享的集群中。设置一个低值防止攫取了全部的内核,影响他人的使用。 |
spark.worker.timeout | 60 | master由于没有收到心跳信息而觉得worker丢失的时间(秒) |
属性名称 | 默认 | 含义 |
spark.worker.cleanup.enabled | false | 是否定期清理worker的应用程序工作文件夹。仅仅适用于Standalone模式,不适用于YARN模式。清理的时候将无视应用程序是否在执行。 |
spark.worker.cleanup.interval | 1800 | 清理worker本地过期的应用程序工作文件夹的时间间隔(秒) |
spark.worker.cleanup.appDataTtl | 7*24*3600 | worker保留应用程序工作文件夹的有效时间。 该时间由磁盘空间、应用程序日志、应用程序的jar包以及应用程序的提交频率来设定。 |
属性名称 | 含义 |
spark.deploy.recoveryMode | 以下3个參数是用于配置zookeeper模式的master HA。
|
spark.deploy.zookeeper.url | zookeeper集群URL |
spark.deploy.zookeeper.dir | zooKeeper保存恢复状态的文件夹。缺省为/spark |
spark.deploy.recoveryMode | 设成FILESYSTEM启用master单节点恢复模式,缺省值为NONE |
spark.deploy.recoveryDirectory | Spark保存恢复状态的文件夹 |
属性名称 | 默认 | 含义 |
spark.yarn.applicationMaster.waitTries | 10 | RM等待Spark AppMaster启动次数,也就是SparkContext初始化次数。超过这个数值,启动失败。 |
spark.yarn.submit.file.replication | 3 | 应用程序上载到HDFS的文件的复制因子 |
spark.yarn.preserve.staging.files | false | 设置为true,在job结束后,将stage相关的文件保留而不是删除。 |
spark.yarn.scheduler.heartbeat.interval-ms | 5000 | Spark AppMaster发送心跳信息给YARN RM的时间间隔 |
spark.yarn.max.executor.failures | 2倍于executor数 | 导致应用程序宣告失败的最大executor失败数 |
spark.yarn.historyServer.address | 无 | Spark history server的地址(不要加http://)。这个地址会在Spark应用程序完毕后提交给YARN RM,然后RM将信息从RM UI写到history server UI上。 |