创建RDD
// Creating RDDs: from external storage, or by distributing an in-memory collection.
sc.textFile("本地文件或者HDFS文件所在的路径地址")            // load a local or HDFS text file as RDD[String]
sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), num)       // distribute a collection into `num` partitions
sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8), num)           // makeRDD delegates to parallelize
rdd.getNumPartitions                                      // query the partition count
rdd.partitions.length                                     // same information via the partitions array
集合的操作应用
集合包括list、array、map、collect、iterator
sortBy-排序
// sortBy: sort an RDD by a key-extraction function; second argument = ascending flag.
val res5 = sc.parallelize(List(1, 2, 3, 10, 9, 4, 5, 6, 7, 8))
res5.map(_ * 2).sortBy(x => x).collect          // ascending is the default
res5.map(_ * 2).sortBy(x => x, true).collect    // explicit ascending
res5.map(_ * 2).sortBy(x => x, false).collect   // descending
filter-过滤
// filter: keep elements satisfying a predicate.
val rdd2 = sc.parallelize(Array(10, 1, 11, 101, 2, 3, 4, 5, 6, 7, 8), 1)
// NOTE: sorting by x.toString orders lexicographically (10, 101, 11, 5, ...), not numerically.
rdd2.filter(_ > 4).sortBy(x => x.toString, true).collect
flatMap-压扁
// flatMap: map each element to zero or more outputs and flatten the result.
// Inner flatMap flattens the nested lists, then each string is split into words.
val rdd3 = sc.parallelize(List(List("a b c", "a b b"), List("e f g", "a d e"), List("a c h", "j t rr")))
rdd3.flatMap(_.flatMap(_.split(" "))).collect
union-并集
// union: concatenate two RDDs (does not deduplicate).
val rdd4 = sc.parallelize(List(1, 2, 3))
val rdd5 = sc.parallelize(List(4, 5, 6))
rdd4.union(rdd5).collect
intersection-交集
// intersection: elements present in both RDDs (result is deduplicated).
val rdd10 = sc.parallelize(List(4, 5, 6, 1))
val rdd6 = sc.parallelize(List(4, 5, 6, 1, 1, 14, 5))
rdd10.intersection(rdd6).collect
join-连接
// join / leftOuterJoin / rightOuterJoin on pair RDDs, matched by key.
val rdd7 = sc.parallelize(List(("hello", 1), ("hello", 2)))
val rdd8 = sc.parallelize(List(("hello", 1), ("hello", 2), ("hello", 1)))
rdd7.join(rdd8).collect            // inner join: (K, (V, W))
rdd7.leftOuterJoin(rdd8).collect   // (K, (V, Option[W]))
rdd7.rightOuterJoin(rdd8).collect  // (K, (Option[V], W))
groupByKey-以key分组
// groupByKey: gather all values per key; reduceByKey achieves the same sum more
// efficiently because it combines on the map side before shuffling.
val rdd9 = rdd7 union rdd8
rdd9.groupByKey.collect                                 // (K, Iterable[V])
rdd9.groupByKey.map(x => (x._1, x._2.sum)).collect      // sum per key via grouping
rdd9.reduceByKey(_ + _).collect                         // equivalent result, preferred
reduceByKey-以key分组value相加
// Word count two ways, sorted by descending frequency:
// reduceByKey (map-side combine) vs. groupByKey + sum (shuffles every pair).
sc.textFile("hdfs://input").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2, false).collect
sc.textFile("hdfs://input").flatMap(_.split(" ")).map((_, 1)).groupByKey.map(kv => (kv._1, kv._2.sum)).sortBy(_._2, false).collect
cogroup-组合
// cogroup: for each key, pair the Iterable of values from each RDD: (K, (Iterable[V], Iterable[W])).
val rdd7 = sc.parallelize(List(("hello", 1), ("hello", 2)))
val rdd8 = sc.parallelize(List(("hello", 1), ("hello", 2), ("hello", 1)))
rdd7.cogroup(rdd8).collect
rdd7.cogroup(rdd8).map(p => (p._1, p._2._1.sum + p._2._2.sum)).collect  // total per key across both sides
cartesian-笛卡尔积
// cartesian: every pairing of an element from the first RDD with one from the second.
val rdd7 = sc.parallelize(List(("hello", 1), ("hello", 2)))
val rdd8 = sc.parallelize(List(("hello", 1), ("hello", 2), ("hello", 1)))
rdd7.cartesian(rdd8).collect
mapPartitions
// mapPartitions: transform one whole partition (an Iterator) at a time.
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.mapPartitions((it: Iterator[Int]) => { it.toList.map(x => x * 10).iterator })
mapPartitionsWithIndex
// mapPartitionsWithIndex: like mapPartitions, but the function also receives the partition index.
val func = (index: Int, iter: Iterator[Int]) => {
  iter.map(x => "[partID:" + index + ", val: " + x + "]")
}
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
rdd1.mapPartitionsWithIndex(func).collect
aggregate
// aggregate(zero)(seqOp, combOp): fold each partition with seqOp, then merge
// partition results with combOp. The zero value is applied per partition AND
// again when combining, which is why the second call yields a larger result.
def func1(index: Int, iter: Iterator[(Int)]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
rdd1.mapPartitionsWithIndex(func1).collect   // inspect how elements are partitioned
rdd1.aggregate(0)(math.max(_, _), _ + _)     // max of each partition, then summed
rdd1.aggregate(5)(math.max(_, _), _ + _)     // zero = 5 also participates in the final combine
aggregateByKey
// aggregateByKey(zero)(seqOp, combOp): per-key aggregation; the zero value is
// applied once per key per partition (unlike aggregate, it is not reused in combOp).
val pairRDD = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {
  iter.map(x => "[partID:" + index + ", val: " + x + "]")
}
pairRDD.mapPartitionsWithIndex(func2).collect           // inspect the partition layout
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect    // per-partition max per key, then summed
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect  // zero = 100 dominates every value
pairRDD.aggregateByKey(0)((_ + _), _ + _).collect           // plain sum, same as reduceByKey(_ + _)
checkpoint
// checkpoint: persist the RDD lineage to reliable storage. Marking is lazy —
// the data is only written when an action (count) triggers computation.
sc.setCheckpointDir("hdfs://check")
val rdd = sc.textFile("hdfs://input").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
rdd.checkpoint
rdd.isCheckpointed        // false: nothing materialized yet
rdd.count                 // action triggers the checkpoint write
rdd.isCheckpointed        // true after the action completes
rdd.getCheckpointFile     // path of the checkpoint data
coalesce、repartition
// coalesce(n, shuffle): reduce partitions, optionally without a shuffle.
// repartition(n) is coalesce(n, shuffle = true) and can also increase partitions.
val rdd1 = sc.parallelize(1 to 10, 10)
val rdd2 = rdd1.coalesce(2, false)   // shrink to 2 partitions, no shuffle
rdd2.partitions.length
val rdd3 = rdd2.repartition(5)       // grow back to 5 partitions (full shuffle)
rdd3.getNumPartitions
collectAsMap
// collectAsMap: collect a pair RDD into a driver-side Map.
// Duplicate keys are collapsed — only one value per key survives.
val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd.collectAsMap
val rdd = sc.parallelize(List(("a", 1), ("a", 2)))
rdd.collectAsMap   // one entry for "a"; the duplicate is overwritten
combineByKey
// combineByKey(createCombiner, mergeValue, mergeCombiners): the most general
// per-key aggregation. createCombiner runs once per key per partition.
val rdd1 = sc.textFile("hdfs://input").flatMap(_.split(" ")).map((_, 1))
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)  // word count
rdd2.collect
// createCombiner adds 10 when a key is first seen in a partition, so totals
// grow by 10 * (number of partitions containing the key).
val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
rdd3.collect
// Group animal names by their numeric key.
val rdd4 = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"), 3)
val rdd5 = sc.parallelize(List(1, 1, 2, 2, 2, 1, 2, 2, 2), 3)
val rdd6 = rdd5.zip(rdd4)
// FIX: original read `x : + y`, which is a parse error — the list-append operator is `:+`.
val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n)
rdd7.collect
countByKey
// countByKey: number of ELEMENTS per key (ignores values);
// countByValue: occurrences of each whole (key, value) pair;
// reduceByKey(_ + _): sums the VALUES per key — three different results.
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1.countByKey
rdd1.countByValue
rdd1.reduceByKey(_ + _).collect
filterByRange
// filterByRange(lower, upper): keep pairs whose key lies in the inclusive range.
val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
val rdd2 = rdd1.filterByRange("b", "d")   // keeps keys "c" and "d"
rdd2.collect
flatMapValues
// flatMapValues: flatMap over the value side only, keeping the key with each output.
val a = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
a.flatMapValues(_.split(" ")).collect   // ("a","1"), ("a","2"), ("b","3"), ("b","4")
foldByKey
// foldByKey(zero)(op): like reduceByKey with a zero value; here words keyed by
// their length are concatenated per key.
val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
val rdd3 = rdd2.foldByKey("")(_ + _)
rdd3.collect
foreachPartition
// foreachPartition: run a side-effecting action once per partition.
// println executes on the EXECUTORS, so output appears in executor logs, not the driver.
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreachPartition(x => println(x.reduce(_ + _)))   // prints each partition's sum
keyBy
// keyBy: build a pair RDD whose key is computed from each element.
val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val rdd2 = rdd1.keyBy(_.length)   // (3,"dog"), (6,"salmon"), ...
rdd2.collect
keys、values
// keys / values: project out one side of a pair RDD.
val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.keys.collect     // the word lengths
rdd2.values.collect   // the words themselves