流式计算storm核心组件介绍以及入门案例---跟着就能在本地跑起来的storm项目

关于storm的基础,参照我这篇文章:流式计算storm
关于并发和并行,参照我这篇文章:并发和并行
关于storm的并行度解释,参照我这篇文章:storm的并行度解释
关于storm的流分组策略,参照我这篇文章:storm的流分组策略
关于storm的消息可靠机制,参照我这篇文章:storm的消息可靠机制

storm简介

Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。Storm简单,可以使用任何编程语言。

storm核心组件

1.Nimbus

相当于storm的master,负责资源分配和任务调度,一个普通的storm集群只有一个nimbus(京东是对nimbus做了集群,加入了选举等概念,防止nimbus突然挂掉)

2.Supervisor

相当于storm的slave,负责接收Nimbus分配的任务,管理和启动所有的Worker

3.Worker

一个Worker就是一个jvm进程,对应一个Topology程序,可以有多个Executor

4.Executor

一个Executor就是一个线程,默认对应一个task,也可以设置成对应多个task

5.Task

一个Task是一个实例(spot/bolt),有多少个task就会new多少个bolt,task是storm中进行计算的最小的运行单位

6.Topology

拓扑结构,一个计算任务场景对应一个拓扑结构,拓扑结构中对声明spout和bolt直接的关系

7.Spout

是拓扑结构中的数据来源,可以向多个bolt发送数据,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源

8.Bolt

真正的数据处理部分,一个bolt可以发给多个bolt,多个bolt也可以发给一个bolt

9.Component

Spout和Bolt都是Component,Storm定义了一个名叫IComponent的总接口
全家谱如下:绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关的
这里写图片描述

spout和bolt的关系:

这里写图片描述

整体的topology结构:

这里写图片描述

storm使用zookeeper来协调集群中的多个节点,但并不是用zookeeper来传递消息
zookeeper可以看这个
Nimbus和Supervisor都是无状态的,他们的心跳都由zookeeper协调

storm优点

1.使用简单,容易上手
2.可扩展,可以调整正在运行的topologies的并行度
3.容错,可靠,当工作节点宕了,storm会尝试重启另一个,而且Nimbus和Supervisors都是无状态的,死掉重启都不影响
4.无数据丢失,Storm的抽象组件确保了数据至少处理一次,即使使用消息队列系统失败时,也能确保消息被处理
5.支持多种编程语言,Storm用Thrift定义和提交topologies.由于Thrift能被任何一种编程语言使用,因此,topologies也能被任何一种编程语言定义和使用。
6.容易部署和操作
7.高性能,低延迟

storm入门案例 ( 实时统计单次个数 )

首先导入maven依赖

		<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.0.4</version></dependency>

1.先写一个Spout,确定数据源,实际应用中一般是接入kafka等消息,入门案例使用随机字符串代替

/*** 向后端发射tuple数据流* @author soul**/
public class SentenceSpout extends BaseRichSpout {//BaseRichSpout是ISpout接口和IComponent接口的简单实现,接口对用不到的方法提供了默认的实现private SpoutOutputCollector collector;private String[] sentences = {"my name is soul","im a boy","i have a dog","my dog has fleas","my girl friend is beautiful"};private int index=0;/*** open()方法中是ISpout接口中定义,在Spout组件初始化时被调用。* open()接受三个参数:一个包含Storm配置的Map,一个TopologyContext对象,提供了topology中组件的信息,SpoutOutputCollector对象提供发射tuple的方法。* 在这个例子中,我们不需要执行初始化,只是简单的存储在一个SpoutOutputCollector实例变量。*/@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {// TODO Auto-generated method stubthis.collector = collector;}/*** nextTuple()方法是任何Spout实现的核心。* Storm调用这个方法,向输出的collector发出tuple。* 在这里,我们只是发出当前索引的句子,并增加该索引准备发射下一个句子。*/@Overridepublic void nextTuple() {//collector.emit(new Values("hello world this is a test"));// TODO Auto-generated method stubthis.collector.emit(new Values(sentences[index]));index++;if (index>=sentences.length) {index=0;}Utils.sleep(1000);}/*** declareOutputFields是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口* 用于告诉Storm流组件将会发出那些数据流,每个流的tuple将包含的字段*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("sentence"));//告诉组件发出数据流包含sentence字段}}

2.写第一个bolt,将Spout传过来的Tuple拆成一个个的单次,循环发给下一个bolt

/*** 订阅sentence spout发射的tuple流,实现分割单词* @author soul**/
public class SplitSentenceBolt extends BaseRichBolt {//BaseRichBolt是IComponent和IBolt接口的实现//继承这个类,就不用去实现本例不关心的方法private OutputCollector collector;/*** prepare()方法类似于ISpout 的open()方法。* 这个方法在blot初始化时调用,可以用来准备bolt用到的资源,比如数据库连接。* 本例子和SentenceSpout类一样,SplitSentenceBolt类不需要太多额外的初始化,* 所以prepare()方法只保存OutputCollector对象的引用。*/@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collector=collector;}/*** SplitSentenceBolt核心功能是在类IBolt定义execute()方法,这个方法是IBolt接口中定义。* 每次Bolt从流接收一个订阅的tuple,都会调用这个方法。* 本例中,收到的元组中查找“sentence”的值,* 并将该值拆分成单个的词,然后按单词发出新的tuple。*/@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString sentence = input.getStringByField("sentence");String[] words = sentence.split(" ");for (String word : words) {this.collector.emit(new Values(word));//向下一个bolt发射数据}}/*** plitSentenceBolt类定义一个元组流,每个包含一个字段(“word”)。*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("word"));}}

3.再写一个bolt,一方面接收上个bolt传过来的单次,另一方面将相同单次出现的次数记录下来,并将现在的结果传给下个bolt

/*** 订阅 split sentence bolt的输出流,实现单词计数,并发送当前计数给下一个bolt* @author soul**/
public class WordCountBolt extends BaseRichBolt {private OutputCollector collector;//存储单词和对应的计数private HashMap<String, Long> counts = null;//注:不可序列化对象需在prepare中实例化/*** 大部分实例变量通常是在prepare()中进行实例化,这个设计模式是由topology的部署方式决定的* 因为在部署拓扑时,组件spout和bolt是在网络上发送的序列化的实例变量。* 如果spout或bolt有任何non-serializable实例变量在序列化之前被实例化(例如,在构造函数中创建)* 会抛出NotSerializableException并且拓扑将无法发布。* 本例中因为HashMap 是可序列化的,所以可以安全地在构造函数中实例化。* 但是,通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行复制和实例化* 而在prepare()方法中对不可序列化的对象进行实例化。*/@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collector = collector;this.counts = new HashMap<String, Long>();}/*** 在execute()方法中,我们查找的收到的单词的计数(如果不存在,初始化为0)* 然后增加计数并存储,发出一个新的词和当前计数组成的二元组。* 发射计数作为流允许拓扑的其他bolt订阅和执行额外的处理。*/@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString word = input.getStringByField("word");Long count = this.counts.get(word);if (count == null) {count = 0L;//如果不存在,初始化为0}count++;//增加计数this.counts.put(word, count);//存储计数this.collector.emit(new Values(word,count));}/****/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub//声明一个输出流,其中tuple包括了单词和对应的计数,向后发射//其他bolt可以订阅这个数据流进一步处理declarer.declare(new Fields("word","count"));}}

4.再写一个bolt,接收上个bolt传过来的单次统计结果,在控制台打印.实际最后一步一般会将数据结果存在非关系型数据库中,比如存入HBase或者Redis中

/*** 生成一份报告* @author soul**/
public class ReportBolt extends BaseRichBolt {private HashMap<String, Long> counts = null;//保存单词和对应的计数@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.counts = new HashMap<String, Long>();}@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString word = input.getStringByField("word");Long count = input.getLongByField("count");this.counts.put(word, count);//实时输出System.out.println("结果:"+this.counts);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub//这里是末端bolt,不需要发射数据流,这里无需定义}/*** cleanup是IBolt接口中定义* Storm在终止一个bolt之前会调用这个方法* 本例我们利用cleanup()方法在topology关闭时输出最终的计数结果* 通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或数据库连接* 但是当Storm拓扑在一个集群上运行,IBolt.cleanup()方法不能保证执行(这里是开发模式,生产环境不要这样做)。*/@Overridepublic void cleanup(){System.out.println("---------- FINAL COUNTS -----------");ArrayList<String> keys = new ArrayList<String>();keys.addAll(this.counts.keySet());Collections.sort(keys);for(String key : keys){System.out.println(key + " : " + this.counts.get(key));}System.out.println("----------------------------");}}

5.写拓扑结构,将前面四步的Spout和Bolt组成一个拓扑结构,直接运行主方法就能看到结果,这个是Storm的本地模式,将提交的方法稍作修改,就可以变成集群模式,实际都是集群模式,将这些代码打成jar包传到Nimbus上,运行在集群中

/*** 实现单词计数topology**/
public class App
{private static final String SENTENCE_SPOUT_ID = "sentence-spout";private static final String SPLIT_BOLT_ID = "split-bolt";private static final String COUNT_BOLT_ID = "count-bolt";private static final String REPORT_BOLT_ID = "report-bolt";private static final String TOPOLOGY_NAME = "word-count-topology";public static void main( String[] args ) //throws Exception{//System.out.println( "Hello World!" );//实例化spout和boltSentenceSpout spout = new SentenceSpout();SplitSentenceBolt splitBolt = new SplitSentenceBolt();WordCountBolt countBolt = new WordCountBolt();ReportBolt reportBolt = new ReportBolt();TopologyBuilder builder = new TopologyBuilder();//创建了一个TopologyBuilder实例//TopologyBuilder提供流式风格的API来定义topology组件之间的数据流//builder.setSpout(SENTENCE_SPOUT_ID, spout);//注册一个sentence spout//设置两个Executeor(线程),默认一个builder.setSpout(SENTENCE_SPOUT_ID, spout,2);// SentenceSpout --> SplitSentenceBolt//注册一个bolt并订阅sentence发射出的数据流,shuffleGrouping方法告诉Storm要将SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例//builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);//SplitSentenceBolt单词分割器设置4个Task,2个Executeor(线程)builder.setBolt(SPLIT_BOLT_ID, splitBolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);// SplitSentenceBolt --> WordCountBolt//fieldsGrouping将含有特定数据的tuple路由到特殊的bolt实例中//这里fieldsGrouping()方法保证所有“word”字段相同的tuuple会被路由到同一个WordCountBolt实例中//builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));//WordCountBolt单词计数器设置4个Executeor(线程)builder.setBolt(COUNT_BOLT_ID, countBolt,4).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));// WordCountBolt --> ReportBolt//globalGrouping是把WordCountBolt发射的所有tuple路由到唯一的ReportBoltbuilder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);Config config = new Config();//Config类是一个HashMap<String,Object>的子类,用来配置topology运行时的行为//设置worker数量//config.setNumWorkers(2);LocalCluster cluster = new LocalCluster();//本地提交cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());Utils.sleep(10000);cluster.killTopology(TOPOLOGY_NAME);cluster.shutdown();}
}

storm与其他流式计算框架的对比

1.Spark Streaming
在处理前按时间间隔预先将其切分为一段一段的批处理作业.
Spark针对持续性数据流的抽象称为DStream(DiscretizedStream),
一个DStream是一个微批处理(micro-batching)的RDD(弹性分布式数据集),
而RDD则是一种分布式数据集,能够以两种方式并行运作,分别是任意函数和滑动窗口数据的转换。

2.Flink
针对流数据和批数据的分布式处理引擎
原生的流处理系统,
其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已
Flink 会把所有任务当成流来处理

3.Storm
原生的流处理系统,可以做到毫秒级处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/499998.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nginx使用gzip压缩文件---lz77算法---Haffman编码

为了提高页面的响应速度,可以从设置 nginx 的 gzip 和缓存这2方面入手,而为ttf,js,css等文件开启 gzip 和缓存能大大减少带宽的消耗. HTTP 的内容编码机制 Accept-Encoding 和 Content-Encoding 是 HTTP 中用来对[采用何种编码格式传输正文]进行协定的一对头部字段. 它的工作…

Javascript模块化编程

随着网站逐渐变成"互联网应用程序"&#xff0c;嵌入网页的Javascript代码越来越庞大&#xff0c;越来越复杂。 网页越来越像桌面程序&#xff0c;需要一个团队分工协作、进度管理、单元测试等等......开发者不得不使用软件工程的方法&#xff0c;管理网页的业务逻辑。…

zookeeper基础整理

zookeeper简述 ZooKeeper是一个分布式的&#xff0c;开放源码的分布式应用程序协调服务&#xff0c;是Google的Chubby一个开源的实现&#xff0c;是Hadoop和Hbase的重要组件 ZooKeeper 使用 Java 所编写&#xff0c;但是支持 Java 和 C 两种编程语言。 提供的功能包括&#xf…

JS模块化编程require.js简介

一、为什么要用require.js&#xff1f; 最早的时候&#xff0c;所有Javascript代码都写在一个文件里面&#xff0c;只要加载这一个文件就够了。后来&#xff0c;代码越来越多&#xff0c;一个文件不够了&#xff0c;必须分成多个文件&#xff0c;依次加载。下面的网页代码&…

CSS position属性

目前几乎所有主流的浏览器都支持position属性&#xff08;"inherit"除外&#xff0c;"inherit"不支持所有包括IE8和之前版本IE浏览器&#xff0c;IE9、IE10还没测试过&#xff09;&#xff0c;以下是w3school对position五个值的解释&#xff1a; 其中absol…

storm的并行度的解释--- ( 看完就能理解 )

关于storm的基础,参照我这篇文章:流式计算storm 关于并发和并行,参照我这篇文章:并发和并行 关于storm的并行度解释,参照我这篇文章:storm的并行度解释 关于storm的流分组策略,参照我这篇文章:storm的流分组策略 关于storm的消息可靠机制,参照我这篇文章:storm的消息可靠机…

JS--Console.log()详解

对于JavaScript程序的调试&#xff0c;相比于alert()&#xff0c;使用console.log()是一种更好的方式&#xff0c;原因在于&#xff1a;alert()函数会阻断JavaScript程序的执行&#xff0c;从而造成副作用&#xff1b;而console.log()仅在控制台中打印相关信息&#xff0c;因此…

订单单量监控v2

前段时间做了一个订单单量监控的项目,已经投入使用了,现在总结一下 前期的想法参考这篇文章 整体使用了storm实时计算框架和redis数据库,还有kafka消息队列 先上效果图,我们可以后期将数据展示出来,明显发现某天00点有单量突变的情况,明显是促销活动导致单量增加了 而后面的报…

iOS中的MVC设计模式

一、MVC概述模型&#xff0d;视图&#xff0d;控制器&#xff08;MVC&#xff09;是Xerox PARC在二十世纪八十年代为编程语言Smalltalk&#xff0d;80发明的一种软件设计模式&#xff0c;已被广泛使用。后来被推荐为Oracle旗下Sun公司Java EE平台的设计模式&#xff0c;并且受到…

iOS-MVVM-模式介绍

一、MVVM概述 MVVM 到底是什么&#xff1f;我们首先看一下MVC架构&#xff1a;我们看到的是一个典型的 MVC 设置。Model 呈现数据&#xff0c;View 呈现用户界面&#xff0c;而 View Controller 调节它两者之间的交互。Cool&#xff01;稍微考虑一下&#xff0c;虽然 View 和 …

[数据库]---mysql数据库 使用binlog+canal或binlake进行数据库的复制

前言 在进行冷热分离的时候&#xff0c;需要将数据实时的复制在历史数据库中&#xff0c;我们使用的是binlogcanal的思想,将每次数据库数据的变更转换成消息发出来,然后再操作这些消息达到数据复制的 在京东,实现同样功能的组件&#xff0c;叫binlake 接下来详细说下: 1.Binl…

MAC下配置ZSH

MAC下面的终端是神器。而且苹果非常贴心的为我们准备好了ZSH。 可惜ZSH不是很好用&#xff0c;需要配合一些插件和模板&#xff1a;oh-my-zsh将bash切换为zsh chsh -s /bin/zsh其实还可以用which来定位&#xff08;特别是ubuntu的童鞋&#xff09; chsh -s which zsh 直接用zsh…

MAC下使用OpenSSL生成私钥和公钥

MAC OS自带了OpenSSL,直接在命令行里使用OPENSSL就可以。打开命令行工具&#xff0c;然后输入 openssl打开openssl&#xff0c;接着只要三句命令就可以搞定。1、打开Terminal--cd 到指定文件夹&#xff0c;如桌面Mac:~/Desktop $ openssl2、第一句命令&#xff1a;生成私钥&…

idea插件开发(01)---最简单的helloworld版,不需要知道原理,先跟我做一个最简单的弹框插件

前言 用了那么多idea插件,也想自己做一个插件,下面就是入门版本 你不需要先知道所有的概念,先跟着我的步骤做一个小;例子,后面再说原理 相关概念看后面一篇 本次以windos系统为例 开始 1.你得安装一个环境,供idea插件的开发使用 下载地址: https://www.jetbrains.com/idea/…

苹果封装的对称加密和非对称加密API

一、信息摘要算法5&#xff1a;MD51.系统库位置&#xff1a;<CommonCrypto/CommonHMAC.h>。2.非加密算法&#xff0c;属于哈希散列&#xff0c;不可逆&#xff0c;用于检验数据完整性。二、安全散列(哈希)算法SHA&#xff1a; 1.包含的散列算法&#xff1a;SHA-1&#xf…

ECC椭圆曲线加密算法原理

比特币使用椭圆曲线算法生成公钥和私钥&#xff0c;选择的是secp256k1曲线。与RSA&#xff08;Ron Rivest&#xff0c;Adi Shamir&#xff0c;Len Adleman三位天才的名字&#xff09;一样&#xff0c;ECC&#xff08;Elliptic Curves Cryptography&#xff0c;椭圆曲线加密&…

AES加密算法原理

一、摘要 AES&#xff08;The Advanced Encryption Standard&#xff09;是美国国家标准与技术研究所用于加密电子数据的规范&#xff0c;在2002年5月26日建立。它被预期能成为人们公认的加密包括金融、电信和政府数字信息的方法。AES 是一个新的可以用于保护电子数据的加密算法…

Base64编码解码原理

一. Base64编码由来 为什么会有Base64编码呢&#xff1f;因为有些网络传送渠道并不支持所有的字节&#xff0c;例如传统的邮件只支持可见字符的传送&#xff0c;像ASCII码的控制字符就不能通过邮件传送。这样用途就受到了很大的限制&#xff0c;比如图片二进制流的每个字节不可…

eclipse mat 打开dump文件,明明大小1G,打开后却只有不到100M.其他的去哪了

eclipse mat 打开dump文件,明明大小1G,打开后却只有不到100M.其他的去哪了 Used heap dump 显示的大小远小于dump文件大小 解决: window-->Preferences-->memory Analuzer-->勾选keep unreachable objects-->apply and close

MVP模式介绍

一、概述MVP 是从经典的模式MVC演变而来&#xff0c;它们的基本思想有相通的地方&#xff1a;Controller/Presenter负责逻辑的处理&#xff0c;Model提供数据&#xff0c;View负责显示。二、MVC和MVP的区别作为一种新的模式&#xff0c;MVP与MVC有着一个重大的区别&#xff1a;…