关于storm的基础,参照我这篇文章:流式计算storm
关于并发和并行,参照我这篇文章:并发和并行
关于storm的并行度解释,参照我这篇文章:storm的并行度解释
关于storm的流分组策略,参照我这篇文章:storm的流分组策略
关于storm的消息可靠机制,参照我这篇文章:storm的消息可靠机制
storm简介
Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。Storm简单,可以使用任何编程语言。
storm核心组件
1.Nimbus
相当于storm的master,负责资源分配和任务调度,一个普通的storm集群只有一个nimbus(京东是对nimbus做了集群,加入了选举等概念,防止nimbus突然挂掉)
2.Supervisor
相当于storm的slave,负责接收Nimbus分配的任务,管理和启动所有的Worker
3.Worker
一个Worker就是一个jvm进程,对应一个Topology程序,可以有多个Executor
4.Executor
一个Executor就是一个线程,默认对应一个task,也可以设置成对应多个task
5.Task
一个Task是一个实例(spot/bolt),有多少个task就会new多少个bolt,task是storm中进行计算的最小的运行单位
6.Topology
拓扑结构,一个计算任务场景对应一个拓扑结构,拓扑结构中对声明spout和bolt直接的关系
7.Spout
是拓扑结构中的数据来源,可以向多个bolt发送数据,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源
8.Bolt
真正的数据处理部分,一个bolt可以发给多个bolt,多个bolt也可以发给一个bolt
9.Component
Spout和Bolt都是Component,Storm定义了一个名叫IComponent的总接口
全家谱如下:绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关的
spout和bolt的关系:
整体的topology结构:
storm使用zookeeper来协调集群中的多个节点,但并不是用zookeeper来传递消息
zookeeper可以看这个
Nimbus和Supervisor都是无状态的,他们的心跳都由zookeeper协调
storm优点
1.使用简单,容易上手
2.可扩展,可以调整正在运行的topologies的并行度
3.容错,可靠,当工作节点宕了,storm会尝试重启另一个,而且Nimbus和Supervisors都是无状态的,死掉重启都不影响
4.无数据丢失,Storm的抽象组件确保了数据至少处理一次,即使使用消息队列系统失败时,也能确保消息被处理
5.支持多种编程语言,Storm用Thrift定义和提交topologies.由于Thrift能被任何一种编程语言使用,因此,topologies也能被任何一种编程语言定义和使用。
6.容易部署和操作
7.高性能,低延迟
storm入门案例 ( 实时统计单次个数 )
首先导入maven依赖
<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.0.4</version></dependency>
1.先写一个Spout,确定数据源,实际应用中一般是接入kafka等消息,入门案例使用随机字符串代替
/*** 向后端发射tuple数据流* @author soul**/
public class SentenceSpout extends BaseRichSpout {//BaseRichSpout是ISpout接口和IComponent接口的简单实现,接口对用不到的方法提供了默认的实现private SpoutOutputCollector collector;private String[] sentences = {"my name is soul","im a boy","i have a dog","my dog has fleas","my girl friend is beautiful"};private int index=0;/*** open()方法中是ISpout接口中定义,在Spout组件初始化时被调用。* open()接受三个参数:一个包含Storm配置的Map,一个TopologyContext对象,提供了topology中组件的信息,SpoutOutputCollector对象提供发射tuple的方法。* 在这个例子中,我们不需要执行初始化,只是简单的存储在一个SpoutOutputCollector实例变量。*/@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {// TODO Auto-generated method stubthis.collector = collector;}/*** nextTuple()方法是任何Spout实现的核心。* Storm调用这个方法,向输出的collector发出tuple。* 在这里,我们只是发出当前索引的句子,并增加该索引准备发射下一个句子。*/@Overridepublic void nextTuple() {//collector.emit(new Values("hello world this is a test"));// TODO Auto-generated method stubthis.collector.emit(new Values(sentences[index]));index++;if (index>=sentences.length) {index=0;}Utils.sleep(1000);}/*** declareOutputFields是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口* 用于告诉Storm流组件将会发出那些数据流,每个流的tuple将包含的字段*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("sentence"));//告诉组件发出数据流包含sentence字段}}
2.写第一个bolt,将Spout传过来的Tuple拆成一个个的单次,循环发给下一个bolt
/*** 订阅sentence spout发射的tuple流,实现分割单词* @author soul**/
public class SplitSentenceBolt extends BaseRichBolt {//BaseRichBolt是IComponent和IBolt接口的实现//继承这个类,就不用去实现本例不关心的方法private OutputCollector collector;/*** prepare()方法类似于ISpout 的open()方法。* 这个方法在blot初始化时调用,可以用来准备bolt用到的资源,比如数据库连接。* 本例子和SentenceSpout类一样,SplitSentenceBolt类不需要太多额外的初始化,* 所以prepare()方法只保存OutputCollector对象的引用。*/@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collector=collector;}/*** SplitSentenceBolt核心功能是在类IBolt定义execute()方法,这个方法是IBolt接口中定义。* 每次Bolt从流接收一个订阅的tuple,都会调用这个方法。* 本例中,收到的元组中查找“sentence”的值,* 并将该值拆分成单个的词,然后按单词发出新的tuple。*/@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString sentence = input.getStringByField("sentence");String[] words = sentence.split(" ");for (String word : words) {this.collector.emit(new Values(word));//向下一个bolt发射数据}}/*** plitSentenceBolt类定义一个元组流,每个包含一个字段(“word”)。*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("word"));}}
3.再写一个bolt,一方面接收上个bolt传过来的单次,另一方面将相同单次出现的次数记录下来,并将现在的结果传给下个bolt
/*** 订阅 split sentence bolt的输出流,实现单词计数,并发送当前计数给下一个bolt* @author soul**/
public class WordCountBolt extends BaseRichBolt {private OutputCollector collector;//存储单词和对应的计数private HashMap<String, Long> counts = null;//注:不可序列化对象需在prepare中实例化/*** 大部分实例变量通常是在prepare()中进行实例化,这个设计模式是由topology的部署方式决定的* 因为在部署拓扑时,组件spout和bolt是在网络上发送的序列化的实例变量。* 如果spout或bolt有任何non-serializable实例变量在序列化之前被实例化(例如,在构造函数中创建)* 会抛出NotSerializableException并且拓扑将无法发布。* 本例中因为HashMap 是可序列化的,所以可以安全地在构造函数中实例化。* 但是,通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行复制和实例化* 而在prepare()方法中对不可序列化的对象进行实例化。*/@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collector = collector;this.counts = new HashMap<String, Long>();}/*** 在execute()方法中,我们查找的收到的单词的计数(如果不存在,初始化为0)* 然后增加计数并存储,发出一个新的词和当前计数组成的二元组。* 发射计数作为流允许拓扑的其他bolt订阅和执行额外的处理。*/@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString word = input.getStringByField("word");Long count = this.counts.get(word);if (count == null) {count = 0L;//如果不存在,初始化为0}count++;//增加计数this.counts.put(word, count);//存储计数this.collector.emit(new Values(word,count));}/****/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub//声明一个输出流,其中tuple包括了单词和对应的计数,向后发射//其他bolt可以订阅这个数据流进一步处理declarer.declare(new Fields("word","count"));}}
4.再写一个bolt,接收上个bolt传过来的单次统计结果,在控制台打印.实际最后一步一般会将数据结果存在非关系型数据库中,比如存入HBase或者Redis中
/*** 生成一份报告* @author soul**/
public class ReportBolt extends BaseRichBolt {private HashMap<String, Long> counts = null;//保存单词和对应的计数@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.counts = new HashMap<String, Long>();}@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubString word = input.getStringByField("word");Long count = input.getLongByField("count");this.counts.put(word, count);//实时输出System.out.println("结果:"+this.counts);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub//这里是末端bolt,不需要发射数据流,这里无需定义}/*** cleanup是IBolt接口中定义* Storm在终止一个bolt之前会调用这个方法* 本例我们利用cleanup()方法在topology关闭时输出最终的计数结果* 通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或数据库连接* 但是当Storm拓扑在一个集群上运行,IBolt.cleanup()方法不能保证执行(这里是开发模式,生产环境不要这样做)。*/@Overridepublic void cleanup(){System.out.println("---------- FINAL COUNTS -----------");ArrayList<String> keys = new ArrayList<String>();keys.addAll(this.counts.keySet());Collections.sort(keys);for(String key : keys){System.out.println(key + " : " + this.counts.get(key));}System.out.println("----------------------------");}}
5.写拓扑结构,将前面四步的Spout和Bolt组成一个拓扑结构,直接运行主方法就能看到结果,这个是Storm的本地模式,将提交的方法稍作修改,就可以变成集群模式,实际都是集群模式,将这些代码打成jar包传到Nimbus上,运行在集群中
/*** 实现单词计数topology**/
public class App
{private static final String SENTENCE_SPOUT_ID = "sentence-spout";private static final String SPLIT_BOLT_ID = "split-bolt";private static final String COUNT_BOLT_ID = "count-bolt";private static final String REPORT_BOLT_ID = "report-bolt";private static final String TOPOLOGY_NAME = "word-count-topology";public static void main( String[] args ) //throws Exception{//System.out.println( "Hello World!" );//实例化spout和boltSentenceSpout spout = new SentenceSpout();SplitSentenceBolt splitBolt = new SplitSentenceBolt();WordCountBolt countBolt = new WordCountBolt();ReportBolt reportBolt = new ReportBolt();TopologyBuilder builder = new TopologyBuilder();//创建了一个TopologyBuilder实例//TopologyBuilder提供流式风格的API来定义topology组件之间的数据流//builder.setSpout(SENTENCE_SPOUT_ID, spout);//注册一个sentence spout//设置两个Executeor(线程),默认一个builder.setSpout(SENTENCE_SPOUT_ID, spout,2);// SentenceSpout --> SplitSentenceBolt//注册一个bolt并订阅sentence发射出的数据流,shuffleGrouping方法告诉Storm要将SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例//builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);//SplitSentenceBolt单词分割器设置4个Task,2个Executeor(线程)builder.setBolt(SPLIT_BOLT_ID, splitBolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);// SplitSentenceBolt --> WordCountBolt//fieldsGrouping将含有特定数据的tuple路由到特殊的bolt实例中//这里fieldsGrouping()方法保证所有“word”字段相同的tuuple会被路由到同一个WordCountBolt实例中//builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));//WordCountBolt单词计数器设置4个Executeor(线程)builder.setBolt(COUNT_BOLT_ID, countBolt,4).fieldsGrouping( SPLIT_BOLT_ID, new Fields("word"));// WordCountBolt --> ReportBolt//globalGrouping是把WordCountBolt发射的所有tuple路由到唯一的ReportBoltbuilder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);Config config = new Config();//Config类是一个HashMap<String,Object>的子类,用来配置topology运行时的行为//设置worker数量//config.setNumWorkers(2);LocalCluster cluster = new LocalCluster();//本地提交cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());Utils.sleep(10000);cluster.killTopology(TOPOLOGY_NAME);cluster.shutdown();}
}
storm与其他流式计算框架的对比
1.Spark Streaming
在处理前按时间间隔预先将其切分为一段一段的批处理作业.
Spark针对持续性数据流的抽象称为DStream(DiscretizedStream),
一个DStream是一个微批处理(micro-batching)的RDD(弹性分布式数据集),
而RDD则是一种分布式数据集,能够以两种方式并行运作,分别是任意函数和滑动窗口数据的转换。
2.Flink
针对流数据和批数据的分布式处理引擎
原生的流处理系统,
其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已
Flink 会把所有任务当成流来处理
3.Storm
原生的流处理系统,可以做到毫秒级处理