Consuming NSQ with Spark Streaming
Goal
Use NSQ as the message stream.
Consume it with Spark Streaming.
Clean the data, then save it into the Hive warehouse.
Approach
1. Write a Spark Streaming custom receiver; see the Spark documentation on Custom Receivers for details.
2. Use JavaNSQClient, the official Java client library for NSQ; see its documentation for details.
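For reference, a minimal sketch of the build dependencies this setup needs, assuming sbt; the coordinates and version numbers below are assumptions and should be checked against each project's releases.

// build.sbt (sketch; versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark"     %% "spark-streaming" % "2.4.0" % "provided",
  "org.apache.spark"     %% "spark-sql"       % "2.4.0" % "provided",
  "org.apache.spark"     %% "spark-hive"      % "2.4.0" % "provided",
  "com.github.brainlag"  %  "nsq-client"      % "1.0.0.RC4",
  "com.google.code.gson" %  "gson"            % "2.8.5"
)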
Code
Custom receiver
ReliableNSQReceiver.scala
import com.github.brainlag.nsq.callbacks.NSQMessageCallback
import com.github.brainlag.nsq.lookup.DefaultNSQLookup
import com.github.brainlag.nsq.{NSQConsumer, NSQMessage}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/* Callback handed to the NSQ consumer; invoked once per message */
class MessageCallbacks(store_fun: String => Unit) extends NSQMessageCallback with Logging {

  def message(message: NSQMessage): Unit = {
    // Pass the message body to Spark, then ack it back to NSQ
    val s = new String(message.getMessage())
    store_fun(s)
    message.finished()
  }
}
/* Custom receiver */
class ReliableNSQReceiver(host: String, port: Int, topic: String, channel: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  var consumer: NSQConsumer = null

  def onStart() {
    // Start a thread that receives data over the NSQ connection
    new Thread("NSQ Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    logInfo("Stopped receiving")
    if (consumer != null) consumer.close()
  }
  /** Receive data */
  private def receive() {
    try {
      val lookup = new DefaultNSQLookup
      lookup.addLookupAddress(host, port)
      consumer = new NSQConsumer(lookup, topic, channel, new MessageCallbacks(store))
      consumer.start()
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}
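One caveat: despite the class name, the single-record store(String) call only buffers data in memory asynchronously, so acking the NSQ message immediately afterwards does not by itself guarantee at-least-once delivery. Per the Spark custom-receiver guide, the multi-record store variants block until the data has been saved inside Spark. A minimal sketch of a stricter callback under that approach (the class name BlockingCallback is hypothetical):

import scala.collection.mutable.ArrayBuffer

/* Sketch: ack to NSQ only after Spark has accepted the block. */
class BlockingCallback(receiver: Receiver[String]) extends NSQMessageCallback {
  def message(message: NSQMessage): Unit = {
    val record = new String(message.getMessage())
    // store(ArrayBuffer) blocks until the block is stored inside Spark,
    // so it is safe to finish the NSQ message afterwards.
    receiver.store(ArrayBuffer(record))
    message.finished()
  }
}

It would be wired in from inside the receiver with new NSQConsumer(lookup, topic, channel, new BlockingCallback(this)).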
Using the receiver
import com.google.gson.JsonParser
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
 * After defining a context, you must do the following:
 * 1. Define the input sources by creating input DStreams.
 * 2. Define the streaming computation by applying transformations and output operations to the DStreams.
 * 3. Start receiving and processing data with streamingContext.start().
 * 4. Wait for processing to stop (manually, or due to an error) with streamingContext.awaitTermination().
 * 5. Stop processing manually with streamingContext.stop().
 */
object ELKStreaming extends Logging {

  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: ELKStreaming <lookup-host> <lookup-port> <batch-interval-seconds> <hive-table>")
      System.exit(1)
    }
    logInfo("start ===========>")
    // Helper from the Spark examples that sets a reasonable streaming log level
    StreamingExamples.setStreamingLogLevels()

    val sparkConf = new SparkConf()
      .setAppName("ELKStreaming")
      .setMaster("yarn")
      .set("hive.metastore.uris", "thrift://hadoop15.bigdata.org:9083")

    // Create a StreamingContext; the batch interval is taken from the command line
    val ssc = new StreamingContext(sparkConf, Seconds(args(2).toInt))

    // Use the custom NSQ receiver
    val lines = ssc.receiverStream(new ReliableNSQReceiver(args(0), args(1).toInt, "log", "scalatest"))
    val hiveStream: DStream[(String, String)] = lines.map(line => prefix_exit(line))

    // Save each cleaned batch into Hive
    hiveStream.foreachRDD(rdd => {
      // Initialize (or reuse) a SparkSession from the same SparkConf
      val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
      // Import the implicits that enable RDD-to-DataFrame conversion
      import sparkSession.implicits._
      // Convert the RDD to a DataFrame
      val df: DataFrame = rdd.toDF("str", "ymd")
      logInfo("df count ===========>" + df.count)
      df.createOrReplaceTempView("spark_logs")
      sparkSession.sql("insert into " + args(3) + " partition (ymd) select str, ymd from spark_logs")
    })

    ssc.start()
    ssc.awaitTermination()
  }
  /** Clean one raw log line: returns (rebuilt line, yyyyMMdd partition value) */
  def prefix_exit(line: String): (String, String) = {
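    // Hypothetical input, to illustrate the transformation (field values assumed):
    //   {"recv_timestamp":"2019-05-20T12:34:56Z","index_type":"nginx",...}
    // would yield:
    //   _1 = "05/20/2019 12:34:56 [I] nginx <the raw line>"
    //   _2 = "20190520"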
    val obj = new JsonParser().parse(line).getAsJsonObject
    // Split the ISO 8601 recv_timestamp into date and time parts
    val data_str1 = obj.get("recv_timestamp").toString().split("T|Z|\"")
    val data_str2 = data_str1(1).split('-')
    // Rebuild the line as "MM/dd/yyyy HH:mm:ss [I] <index_type> <raw line>"
    val data_str3 = data_str2(1) + "/" + data_str2(2) + "/" + data_str2(0) + " " + data_str1(2) + " [I] " + obj.get("index_type").toString().split("\"")(1) + " " + line
    // Partition value in yyyyMMdd form
    val data_str4 = data_str2(0) + data_str2(1) + data_str2(2)
    (data_str3, data_str4)
  }
}
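Note that the dynamic-partition insert above assumes the target Hive table already exists and that dynamic partitioning is allowed. A minimal one-time setup sketch, assuming a table whose layout matches the two columns produced here; the table name elk_logs is a placeholder for whatever is passed as args(3):

// One-time setup, e.g. from a spark-shell with Hive support enabled
sparkSession.sql("set hive.exec.dynamic.partition=true")
sparkSession.sql("set hive.exec.dynamic.partition.mode=nonstrict")
sparkSession.sql(
  """CREATE TABLE IF NOT EXISTS elk_logs (str STRING)
    |PARTITIONED BY (ymd STRING)
    |STORED AS TEXTFILE""".stripMargin)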