// Custom receiver declaration:
// 1) extend Receiver
// 2) override the onStart and onStop methods
package date_10_16_SparkStreaming

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

/**
 * Streaming word-count driver that ingests lines from a socket through
 * the custom receiver [[MyReceiver1]] and prints per-batch word counts.
 */
object MyReceiver {

  def main(args: Array[String]): Unit = {
    // Run locally on all available cores.
    val conf = new SparkConf().setMaster("local[*]").setAppName("wordcount")

    // StreamingContext needs two arguments: a SparkConf and the batch
    // (collection) interval.
    val streamingContext = new StreamingContext(conf, Seconds(5))

    // Collect data from the given host/port via the custom receiver.
    // NOTE(review): host "chun1" and port 9999 are hard-coded — confirm
    // they match the intended data source.
    val socketLineDstream = streamingContext.receiverStream(new MyReceiver1("chun1", 9999))

    // Flatten each line into words, pair each word with 1, and sum the
    // counts within every batch.
    val wordToSumDstream = socketLineDstream
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    wordToSumDstream.print()

    // Start the receiver; the StreamingContext must not be stopped here,
    // otherwise collection ends immediately.
    streamingContext.start()

    // The driver waits on the receiver: as long as the receiver runs,
    // the driver does not stop.
    streamingContext.awaitTermination()
  }
}

// Custom receiver declaration:
// 1) extend Receiver
// 2) override the onStart and onStop methods
/**
 * Custom Spark Streaming receiver that reads UTF-8 text lines from a TCP
 * socket and stores each line into Spark's memory (MEMORY_ONLY).
 *
 * @param host host name of the socket source
 * @param port port of the socket source
 */
class MyReceiver1(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // The open socket; nulled out again in onStop so receive() can be restarted.
  var socket: Socket = null

  /**
   * Blocking read loop: connects to host:port and stores every line into
   * the receiver until the stream ends or the sentinel line "END" arrives.
   */
  def receive(): Unit = {
    socket = new Socket(host, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))

    // BUG FIX: the original used the Java idiom
    //   while ((line = reader.readLine()) != null)
    // but in Scala an assignment evaluates to Unit, and Unit != null is
    // always true — the loop never saw end-of-stream and would eventually
    // call store(null). Read first, then test.
    var line: String = reader.readLine()
    while (line != null) {
      if ("END".equals(line)) {
        // Sentinel value: stop receiving.
        return
      } else {
        // Hand the collected line to Spark for storage/conversion
        // inside the receiver.
        this.store(line)
      }
      line = reader.readLine()
    }
  }

  override def onStart(): Unit = {
    // Receive on a dedicated thread so onStart returns immediately,
    // as required by the Receiver contract.
    new Thread(new Runnable {
      override def run(): Unit = {
        receive()
      }
    }).start()
  }

  override def onStop(): Unit = {
    // Closing the socket unblocks the reader thread stuck in readLine().
    if (socket != null) {
      socket.close()
      socket = null
    }
  }
}