一、问题引出
object TestRDDPersist {
  def main(args: Array[String]): Unit = {
    // Local mode, one worker thread per available core.
    val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List("hello world", "hello spark"))
    val flatRdd = rdd.flatMap(_.split(" "))

    // The println marks every evaluation of this map step. Since mapRdd is
    // NOT cached, each action below re-runs the whole lineage, so the marker
    // prints again for the second action.
    val mapRdd = flatRdd.map(word => {
      println("@@@@@@@@@@")
      (word, 1)
    })

    // Action 1: word count.
    val reduceRdd = mapRdd.reduceByKey(_ + _)
    reduceRdd.collect().foreach(println)

    println("**********")

    // Action 2: reuses mapRdd, triggering a full recomputation from the source.
    val groupRdd = mapRdd.groupByKey()
    groupRdd.collect().foreach(println)
  }
}
二、RDD Cache
object TestRDDPersist {
  def main(args: Array[String]): Unit = {
    // Local mode, one worker thread per available core.
    val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List("hello world", "hello spark"))
    val flatRdd = rdd.flatMap(_.split(" "))

    // The println marks every evaluation of this map step.
    val mapRdd = flatRdd.map(word => {
      println("@@@@@@@@@@")
      (word, 1)
    })

    // Cache mapRdd (default level is MEMORY_ONLY), so the second action
    // reads the cached partitions instead of recomputing the lineage —
    // the marker prints only for the first action.
    mapRdd.persist()

    // Action 1: word count; materializes and caches mapRdd.
    val reduceRdd = mapRdd.reduceByKey(_ + _)
    reduceRdd.collect().foreach(println)

    println("**********")

    // Action 2: served from the cache.
    val groupRdd = mapRdd.groupByKey()
    groupRdd.collect().foreach(println)
  }
}
object StorageLevel {
  // Constructor flags: (useDisk, useMemory, useOffHeap, deserialized[, replication]).
  // "_2" suffix means each partition is replicated on two nodes;
  // "_SER" levels store partitions in serialized (compact) form.
  val NONE                  = new StorageLevel(false, false, false, false)
  val DISK_ONLY             = new StorageLevel(true,  false, false, false)
  val DISK_ONLY_2           = new StorageLevel(true,  false, false, false, 2)
  val MEMORY_ONLY           = new StorageLevel(false, true,  false, true)
  val MEMORY_ONLY_2         = new StorageLevel(false, true,  false, true,  2)
  val MEMORY_ONLY_SER       = new StorageLevel(false, true,  false, false)
  val MEMORY_ONLY_SER_2     = new StorageLevel(false, true,  false, false, 2)
  val MEMORY_AND_DISK       = new StorageLevel(true,  true,  false, true)
  val MEMORY_AND_DISK_2     = new StorageLevel(true,  true,  false, true,  2)
  val MEMORY_AND_DISK_SER   = new StorageLevel(true,  true,  false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true,  true,  false, false, 2)
  val OFF_HEAP              = new StorageLevel(true,  true,  true,  false, 1)
}
三、RDD CheckPoint
object TestRDDPersist {
  def main(args: Array[String]): Unit = {
    // Local mode, one worker thread per available core.
    val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
    val sc = new SparkContext(conf)

    // Checkpoint data goes to this directory (use a fault-tolerant
    // filesystem such as HDFS in production).
    sc.setCheckpointDir("checkpoint")

    val rdd = sc.makeRDD(List("hello world", "hello spark"))
    val flatRdd = rdd.flatMap(_.split(" "))

    // The println marks every evaluation of this map step.
    val mapRdd = flatRdd.map(word => {
      println("@@@@@@@@@@")
      (word, 1)
    })

    // Checkpointing truncates the lineage and persists mapRdd to the
    // checkpoint directory. Note: checkpoint launches an extra job that
    // recomputes the RDD from the source, so in practice one usually
    // calls cache() first; it is omitted here to show the raw behavior.
    mapRdd.checkpoint()

    // Action 1: word count; also triggers the checkpoint job.
    val reduceRdd = mapRdd.reduceByKey(_ + _)
    reduceRdd.collect().foreach(println)

    println("**********")

    // Action 2: reads mapRdd back from the checkpoint data.
    val groupRdd = mapRdd.groupByKey()
    groupRdd.collect().foreach(println)
  }
}
四、缓存和检查点区别
1. 血缘关系:cache 和 persist 会在原有的血缘关系中添加新的依赖,一旦数据出错可以从头读取数据;checkpoint 检查点会切断原有的血缘关系,重新建立新的血缘关系,相当于改变了数据源。
2. 存储位置:cache 是将数据临时存储在 JVM 堆内存中,性能较高,但安全性低;persist 可以指定存储级别,将数据临时存储在磁盘文件中,涉及磁盘 IO,性能较低,作业执行完毕后临时文件会被删除;checkpoint 是将数据长久地存储在分布式文件系统中,安全性较高,但涉及 IO 且会独立开启一个作业从数据源开始获取数据,所以性能较低。
3. 因此一般在 checkpoint 前先进行 cache,这样 checkpoint 的 job 只需从缓存中读取数据即可,可以提高性能。