Spark Streaming整合logstash + Kafka wordCount

1、安装logstash,直接解压即可

测试logstash是否可以正常运行

bin/logstash -e 'input { stdin { } } output { stdout {codec => rubydebug } }'

只获取消息

bin/logstash -e 'input { stdin { } } output { stdout {codec => plain { format => "%{message}" } } }'

2、编写logstash配置文件
2、1在logstash目录下创建conf目录
2、2在conf目录下创建文件logstash.conf,内容如下

input {
file {
type => "logs"
path => "/home/hadoop/logs/*.log"
discover_interval => 10
start_position => "beginning" 
}
}output {
kafka {
codec => plain {
format => "%{message}"
}
topic_id => "spark"	
}
}

logstash input: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
logstash output: https://www.elastic.co/guide/en/logstash/current/output-plugins.html

3、启动logstash采集数据

bin/logstash -f conf/logstash.conf

4、代码

 

package bigdata.sparkimport org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}/*** Created by Administrator on 2017/4/28.*/
object SparkStreamDemo {def main(args: Array[String]) {val conf = new SparkConf()conf.setAppName("spark_streaming")conf.setMaster("local[*]")val sc = new SparkContext(conf)sc.setCheckpointDir("D:/checkpoints")sc.setLogLevel("ERROR")val ssc = new StreamingContext(sc, Seconds(5))val topics = Map("spark" -> 2)val lines = KafkaUtils.createStream(ssc, "m1:2181,m2:2181,m3:2181", "spark", topics).map(_._2)val ds1 = lines.flatMap(_.split(" ")).map((_, 1))val ds2 = ds1.updateStateByKey[Int]((x:Seq[Int], y:Option[Int]) => {Some(x.sum + y.getOrElse(0))})ds2.print()ssc.start()ssc.awaitTermination()}
}

  

 

转载于:https://www.cnblogs.com/heml/p/6796131.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/372287.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

js 变量类型

变量类型分为:基础类型和引用类型 基础类型:boolean, string, number, null, undefined, symbol 引用类型: array, object typeof: 判断变量的类型instanceof:判断某个对象是否是另外一个对象的实例主要还是理解这两个判断的不同之处&#xf…

python 相对导入_python 相对导入与绝对导入

Python 相对导入与绝对导入Python | Jul 21, 2016 | pythonPython 相对导入与绝对导入,这两个概念是相对于包内导入而言的。包内导入即是包内的模块导入包内部的模块。Python import 的搜索路径1.在当前目录下搜索该模块2.在环境变量 sys.path 中指定的路径列表中依…

具有Java Kickstart的MongoDB

NoSQL数据库由于其可伸缩性而变得越来越流行。 适当使用时 NoSQL数据库可以提供真正的好处。 MongoDB是使用C 编写的高度可扩展的开源NoSQL数据库。 1.安装MongoDB 您可以根据所使用的操作系统,按照MongoDB官方网站上的说明安装MongoDB,而不会遇到很多麻…

Linux Shell——函数的使用

文/一介书生&#xff0c;一枚码农。 scripts are for lazy people. 函数是存在内存里的一组代码的命名的元素。函数创建于脚本运行环境之中&#xff0c;并且可以执行。 函数的语法结构为&#xff1a; function <function-name> {<code to execute> } 创建函数不需要…

FFmpeg学习2:解码数据结构及函数总结

在上一篇文章中&#xff0c;对FFmpeg的视频解码过程做了一个总结。由于才接触FFmpeg&#xff0c;还是挺陌生的&#xff0c;这里就解码过程再做一个总结。本文的总结分为以下两个部分&#xff1a; 数据读取&#xff0c;主要关注在解码过程中所用到的FFmpeg中的结构体。解码过程中…

python1~10阶乘while_Python3基础 while 阶乘

?python : 3.7.0OS : Ubuntu 18.04.1 LTSIDE : PyCharm 2018.2.4conda : 4.5.11type setting : Markdown?code"""Author : 行初心Date : 18-9-24Blog : www.cnblogs.com/xingchuxinGitHub : github.com/GratefulHeartCoder"""def main():count…

JavaFX 2 GameTutorial第4部分

介绍 这是与JavaFX 2游戏教程相关的六个部分系列的第四部分。 如果您错过了第1部分 &#xff0c; 第2部分或第3部分 &#xff0c;我建议您在开始本教程之前仔细阅读它们。 回顾一下&#xff0c;在第3部分中&#xff0c;我为您提供了许多经典街机风格游戏和所使用的不同输入设备…

关于ListView的作业

原生布局并未多做修改 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android" xmlns:tools"http://schemas.android.com/tools" android:id"id/activity_m…

Java 7的类型推断

每个优秀的程序员都喜欢编写简洁但有效且经过优化的代码。 类型推断是JDK 7中引入的一种方法&#xff0c;它肯定会为您带来更少键入的好处。 您以以下方式使用Java代码已有很长时间了。 但是&#xff0c;在初始化Collections的特定实现时&#xff0c;您是否曾经想到过代码重复&…

python实现胶囊网络_胶囊网络 -- Capsule Networks

胶囊网络是 vector in vector out的结构&#xff0c;最后对每个不同的类别&#xff0c;输出不一个向量&#xff0c;向量的模长表示属于该类别的概率。例如&#xff0c;在数字识别中&#xff0c;两个数字虽然重叠在一起&#xff0c;Capsule中的两个向量能完整表达两个数字的特征…

基变换与过渡矩阵

取定线性空间的一组基&#xff0c;任何一组向量可以表示为基向量的线性组合&#xff0c;且是同构映射。两个线性空间是同构。 不同的基向量&#xff0c;基向量之间的过渡矩阵 取线性空间的两组基任一向量可以表示为这两组向量的线性组合将一组基向量表示为另外基向量的线性组合…

bootstrap的滚动监听

<!DOCTYPE html> <html lang"zh-cn"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1,maximum-scale1, user-scalableno"><title>下拉菜单和滚动监…

java构造函数_JAVA的构造函数是怎么写的。万分感谢。路过的请不要嘲笑%_%

展开全部JAVA的构造函数是&#xff1a;SetLocal EnableDelayedExpansionset classpath.for %%c in (lib\*.jar) do set classpath!32313133353236313431303231363533e59b9ee7ad9431333431363030classpath!;%%cset classpath%classpath%;./classes;java com.ham.server.Server。…

在Spring中使用Redis

随着NoSQL解决方案在许多问题上越来越受欢迎&#xff0c;现代项目越来越多地考虑使用一些&#xff08;或几种&#xff09;NoSQL代替&#xff08;或并排&#xff09;传统RDBMS。 我已经在本 &#xff0c; 本和本文章中介绍了我在MongoDB上的经验。 在本文中&#xff0c;我想对Re…

C# 中winform的一些属性设置

1 窗体的大小固定住&#xff0c;不能调整其大小 窗体FormBorderStyle 属性设置为 FixedSingle; MaximizeBox 属性设置为false; MinimizeBox 属性设置为 false; 2. 在状态栏中无图标显示 设置为fase即可。 3. 设置窗体的启动位置 方法1&#xff0c; 用代码控制 this.Location …

LiveBos---按钮成下拉

转载于:https://www.cnblogs.com/luhanzhen/p/6802779.html

Solr:创建拼写检查器

在上一篇文章中&#xff0c;我谈到了Solr Spellchecker的工作原理&#xff0c;然后向您展示了其性能的一些测试结果。 现在&#xff0c;我们将看到另一种拼写检查方法。 与其他方法一样&#xff0c;此方法使用两步过程。 相当快速的“候选单词”选择&#xff0c;然后对这些单词…

linux修改机器名称

1 使用hostname命令&#xff1a;hostname 新机器名称 2 修改vi /etc/sysconfig/network # cat /etc/sysconfig/network NETWORKINGyes HOSTNAMElocalhost.localdomain 注意&#xff1a;左侧都必须大写&#xff0c;等号附件没有空格。 查看机器名称使用hostname命令 转载于:h…

java property_property在Java中的用法

展开全部在项目中经常用到各种配置文件62616964757a686964616fe78988e69d8331333337623561&#xff0c;有.properties的&#xff0c;也有.xml格式的都可以通过java.utils.Property类进行处理。1. 读取.properties文件File pFile new File("test.properties");FileIn…

Django 和 html

下面是对应的形式&#xff0c;自定义的forms 转载于:https://www.cnblogs.com/kilen/p/6804047.html