java多线程调用nsq消费_spark-streaming连接消费nsq

spark-streaming连接消费nsq

目的

使用 NSQ作为消息流

使用 spark-streaming 进行消费

对数据进行清洗后,保存到hive仓库中

连接方案

1、编写Spark Streaming Custom Receivers(spark-streaming 自定义接收器),详细见文档

2、使用 nsq 官方提供的Java程序连接包 JavaNSQClient ,详细见文档

详细代码

自定义连接器

ReliableNSQReceiver.scala

import com.github.brainlag.nsq.callbacks.NSQMessageCallback

import com.github.brainlag.nsq.lookup.DefaultNSQLookup

import com.github.brainlag.nsq.{NSQConsumer, NSQMessage}

import org.apache.spark.internal.Logging

import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming.receiver.Receiver

class MessageCallbacks(store_fun:String => Unit) extends NSQMessageCallback with Logging {

def message(message: NSQMessage): Unit ={

val s = new String(message.getMessage())

store_fun(s)

message.finished()

}

}

/* 自定义连接器 */

class ReliableNSQReceiver(host: String, port: Int, topic: String, channel: String)

extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

var consumer: NSQConsumer = null

def onStart() {

// 启动通过连接接收数据的线程

new Thread("Socket Receiver") {

override def run() { receive() }

}.start()

}

def onStop() {

logInfo("Stopped receiving")

consumer.close

}

/** 接收数据 */

private def receive() {

try {

val lookup = new DefaultNSQLookup

lookup.addLookupAddress(host, port)

consumer = new NSQConsumer(lookup, topic, channel, new MessageCallbacks(store))

consumer.start

} catch {

case e: java.net.ConnectException =>

restart("Error connecting to " + host + ":" + port, e)

case t: Throwable =>

restart("Error receiving data", t)

}

}

}

使用连接器

import com.google.gson.JsonParser

import org.apache.spark.SparkConf

import org.apache.spark.internal.Logging

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.spark.streaming.dstream.DStream

import org.apache.spark.streaming.{Seconds, StreamingContext}

/*

* 在定义一个 context 之后,您必须执行以下操作.

* 通过创建输入 DStreams 来定义输入源.

* 通过应用转换和输出操作 DStreams 定义流计算(streaming computations).

* 开始接收输入并且使用 streamingContext.start() 来处理数据.

* 使用 streamingContext.awaitTermination() 等待处理被终止(手动或者由于任何错误).

* 使用 streamingContext.stop() 来手动的停止处理.

*/

object ELKStreaming extends Logging{

def main(args: Array[String]): Unit ={

if (args.length < 4) {

System.err.println("Usage: ELKStreaming ")

System.exit(1)

}

logInfo("start ===========>")

StreamingExamples.setStreamingLogLevels()

val sparkConf = new SparkConf().setAppName("ELKStreaming").setMaster("yarn").set("hive.metastore.uris", "thrift://hadoop15.bigdata.org:9083")

// 创建一个批次间隔为10

val ssc = new StreamingContext(sparkConf, Seconds(args(2).toInt))

// 使用自定义的NSQReceiver

val lines = ssc.receiverStream(new ReliableNSQReceiver(args(0), args(1).toInt, "log", "scalatest"))

val hiveStream: DStream[(String, String)] = lines.map(line => prefix_exit(line))

// 将计算后的数据保存到hive中

hiveStream.foreachRDD(rdd => {

// 利用SparkConf来初始化SparkSession。

val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

// 导入隐式转换来将RDD

import sparkSession.implicits._

// 将RDD转换成DF

val df: DataFrame = rdd.toDF("str", "ymd")

// 取出表中的字段

logInfo("df count ===========>"+ df.count)

df.createOrReplaceTempView("spark_logs")

sparkSession.sql("insert into "+args(3)+" partition (ymd) select str,ymd from spark_logs")

})

ssc.start()

ssc.awaitTermination()

}

def prefix_exit(line:String):(String,String) ={

// 对数据进行清洗计算

val obj = new JsonParser().parse(line).getAsJsonObject

val data_str1 = obj.get("recv_timestamp").toString().split("T|Z|\"")

val data_str2 = data_str1(1).split('-')

val data_str3 = data_str2(1)+"/"+data_str2(2)+"/"+data_str2(0)+" "+data_str1(2)+" [I] "+obj.get("index_type").toString().split("\"")(1)+" "+line

val data_str4 = data_str2(0)+data_str2(1)+data_str2(2)

(data_str3.toString(), data_str4.toString())

}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/409025.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[线性代数]Note4--A的LU分解转置-置换-向量空间

继续是线性代数的学习笔记&#xff0c;这次的笔记包含第四、五、六节三节课的内容。 第四节课是介绍A的LU分解。A的LU分解是指将矩阵A分解成一个下三角矩阵和一个上三角矩阵的乘积。其主要应用在数值分析中&#xff0c;用来解线性方程、求反矩阵或者计算行列式。 第五节课是介…

java:自定义数据库连接池

http://idata.blog.51cto.com/4581576/1159243转载于:https://www.cnblogs.com/fengjian/archive/2013/03/22/2975366.html

(转)C结构体之位域(位段)

转载自C结构体之位域(位段) 有些信息在存储时&#xff0c;并不需要占用一个完整的字节&#xff0c; 而只需占几个或一个二进制位。例如在存放一个开关量时&#xff0c;只有0和1 两种状态&#xff0c; 用一位二进位即可。为了节省存储空间&#xff0c;并使处理简便&#xff0c;C…

java为何重复调用方法_通过反射调用Java中的getter:重复调用它的最快方法是什么(在性能和可伸缩性方面)?...

小编典典您可以使用MethodHandle。其Javadoc写道&#xff1a;使用Lookup API中的工厂方法&#xff0c;可以将Core ReflectionAPI对象表示的任何类成员转换为行为等效的方法句柄。例如&#xff0c;可以使用Lookup.unreflect将反射方法转换为方法句柄。生成的方法句柄通常提供对底…

表单提交中get 和post方式的区别

两者的区别需要通过提交表单后才看得出来&#xff0c;主要是在数据发送方式和接收方式上 1.在客户端&#xff0c;Get方式在通过URL提交数据&#xff0c;就是把表单内的元素&#xff0c;转化成url参数提交&#xff0c;比如你有一个页面index.html&#xff0c;这个页面有一个文本…

linux mysql5.7.11_在Linux中以命令行方式安装 MySQL 5.7.11 for Linux Generic 二进制版本

转至: http://www.cnblogs.com/cyberniuniu/p/5273961.htmlMySQL 目前的最新版本是 5.7.11&#xff0c;在 Linux 下提供特定发行版安装包(如 .rpm)以及二进制通用版安装包(.tar.gz)。一般情况下&#xff0c;很多项目都倾向于采用二进制通用安装包形式来进行安装配置&#xff0c…

论文阅读(2)--Picking Deep Filter Responses for Fine-grained Image Recognition

这次阅读的文章是Picking Deep Filter Responses for Fine-grained Image Recognition&#xff0c;这篇文章是来自上海交通大学Xiaopeng Zhang等人的工作&#xff0c;该文章提出了一种对深度网络中的filter进行挑选的方法&#xff0c;基于挑选的filter的结果构建复杂特征表达。…

vc2008使用技巧

开发程序自动以管理员的身份运行&#xff1a; 2008里面自带一个选项&#xff1a;属性-配置属性-连接器-清单文件-Uac执行级别&#xff0c;里面可以选转载于:https://www.cnblogs.com/fwycmengsoft/archive/2013/03/26/2982874.html

论文阅读(3)--SPDA-CNN: Unifying Semantic Part Detection and Abstraction for Fine-grained Recognition

这篇文章是来自罗格斯大学的Han Zhang等人的工作。由题目可知与上一篇文章一样&#xff0c;本文的作者也关注到了富有语义的局部(利用Part&#xff0c;Part&#xff0c;Part&#xff0c;重要事情强调三遍)&#xff0c;作者不满足于CUB-2011数据库提供的head和body的定位结果&am…

从流水中倒推算出销量为多某值的日期

SELECT rq,spid,chkshl, (SELECT SUM(chkshl) AS chkshl FROM spls_ck WHERE T.plh < plh and spidSPH00009425 having SUM(chkshl)<10 ) AS chkshl FROM spls_ck T where spidSPH00009425 order by plh desc 转载于:https://www.cnblogs.com/bingyuw/archive/2013/03/29…

egg.js java 生产数据_eggjs中,自动从数据库直接生成model.

eggjs中,自动从数据库直接生成model.使用sequelize-auto可以自动生成models直接上命令就可以搞定了# 安装必要的库npm install -g sequelize-auto# MySQL/MariaDB 数据库安装对应的库,其他数据库请看文档npm install -g mysql# 从命令行生成modelssequelize-auto -o ./database…

论文阅读(4)--Part-Stacked CNN for Fine-Grained Visual Categorization

这篇文章是来自悉尼科技大学Shaoli Huang等人的工作&#xff0c;与前两篇文章的出发点类似&#xff0c;本篇文章也是在Parts上寻找Fine-Grained的线索&#xff0c;但与前两篇文章相比&#xff0c;在框架中人工的参与更少。同其它Fine-Grained分类任务相似&#xff0c;这篇文章也…

java组件自适应窗口大小_java swing 窗口和控件自适应大小

本文记录java开发CS结构时怎么自适应屏幕大小以及控件跟随frame大小变化大小、位置和字体大小需要注意&#xff1a;1、代码必须放置在其构造方法中。如&#xff1a;我的frame1是我frame.java的名&#xff0c;则代码放置在方法“public Frame1() ”中。2、放在控件初始化后的地方…

剑指offer--二维数组的查找

记录《剑指offer》上的算法题。完整的代码例子可以在我的Github 题目&#xff1a;在一个二维数组中&#xff0c;每一行按照从左到右递增的顺序排序&#xff0c;每一列都按照从上到下递增的顺序排。请完成一个函数&#xff0c;输入这样的一个二维数组和一个整数&#xff0c;判断…