Storm教程3编程接口


Spouts

Spout是Stream的消息产生源,Spout组件的实现可以通过继承BaseRichSpout类或者其他Spout类来完成,也可以通过实现IRichSpout接口来实现。



需要根据情况实现Spout类中重要的几个方法有:

open方法

当一个Task被初始化的时候会调用此open方法。

一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。


示例如下:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this._collector = collector; 
}


getComponentConfiguration方法

此方法用于声明针对当前组件的特殊的Configuration配置。
示例如下:

public Map<String, Object> getComponentConfiguration() { if(!_isDistributed) {Map<String, Object> ret = new HashMap<String, Object>(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3); return ret;10 11 } else {return null;}
}
这里便是设置了Topology中当前Component的线程数量上限。

nextTuple方法

这是Spout类中最重要的一个方法。发射一个Tuple到Topology都是通过这个方法来实现的。
示例如下:

public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"twitter","facebook","google"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)];collector.emit(new Values(word));
}
 这里便是从一个数组中随机选取一个单词作为Tuple,然后通过collector发送到Topology。


declareOutputFields方法

此方法用于声明当前Spout的Tuple发送流。
Stream流的定义是通过OutputFieldsDeclare.declare方法完成的,其中的参数包括了发送的域Fields。
示例如下:

public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));
}

另外,除了上述几个方法之外,还有ack、fail和close方法等;
Storm在监测到一个Tuple被成功处理之后会调用ack方法,处理失败会调用fail方法;
这两个方法在BaseRichSpout等类中已经被隐式的实现了。


——————————————————————————————————————————————


Bolts

Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。

Bolt组件的实现可以通过继承BasicRichBolt类或者IRichBolt接口来完成。



Bolt类需要实现的主要方法有:

prepare方法

此方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。

Bolt中Tuple的发送可以在prepare方法中、execute方法中、cleanup等方法中进行,一般都是些在execute中。

示例如下:

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {this._collector = collector;
}


getComponentConfiguration方法

和Spout类一样,在Bolt中也可以有getComponentConfiguration方法。
 示例如下:

public Map<String, Object> getComponentConfiguration() {Map<String, Object> conf = new HashMap<String, Object>();conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);return conf;
} 
 此例定义了从系统组件“_system”的“_tick”流中发送Tuple到当前Bolt的频率,当系统需要每隔一段时间执行特定的处理时,就可以利用这个系统的组件的特性来完成。


execute方法

这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。

具体的发送也是在execute中通过调用emit方法来完成的。

有两种情况,一种是emit方法中有两个参数,另一个种是有一个参数。

(1)emit有一个参数:此唯一的参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一个新的Tuple树。

(2)emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple是仍然属于同一棵Tuple树,即,如果下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple。保证Tuple处理的可靠性。


declareOutputFields方法

用于声明当前Bolt发送的Tuple中包含的字段,和Spout中类似。
示例如下:

public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("obj","count","actualWindowLengthInSeconds"));
} 
此例说明当前Bolt类发送的Tuple包含了三个字段:"obj", "count", "actualWindowLengthInSeconds"。

——————————————————————————————————————————————


topology



——————————————————————————————————————————————


Topology运行机制

(1)Storm提交后,会把代码首先存放到Nimbus节点的inbox目录下,之后,会把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的stormdist目录中,在此目录中同时还有序列化之后的Topology代码文件;


(2)在设定Topology所关联的Spouts和Bolts时,可以同时设置当前Spout和Bolt的executor数目和task数目,默认情况下,一个Topology的task的总和是和executor的总和一致的。之后,系统根据worker的数目,尽量平均的分配这些task的执行。worker在哪个supervisor节点上运行是由storm本身决定的;


(3)任务分配好之后,Nimbes节点会将任务的信息提交到zookeeper集群,同时在zookeeper集群中会有workerbeats节点,这里存储了当前Topology的所有worker进程的心跳信息;


(4)Supervisor节点会不断的轮询zookeeper集群,在zookeeper的assignments节点中保存了所有Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor通过轮询此节点的内容,来领取自己的任务,启动worker进程运行;


(5)一个Topology运行之后,就会不断的通过Spouts来发送Stream流,通过Bolts来不断的处理接收到的Stream流,Stream流是无界的。


最后一步会不间断的执行,除非手动结束Topology。


Topology中的Stream处理时的方法调用过程如下:




有几点需要说明的地方:

(1)每个组件(Spout或者Bolt)的构造方法和declareOutputFields方法都只被调用一次。

(2)open方法、prepare方法的调用是多次的。入口函数中设定的setSpout或者setBolt里的并行度参数指的是executor的数目,是负责运行组件中的task的线程的数目,此数目是多少,上述的两个方法就会被调用多少次,在每个executor运行的时候调用一次。相当于一个线程的构造方法。

(3)nextTuple方法、execute方法是一直被运行的,nextTuple方法不断的发射Tuple,Bolt的execute不断的接收Tuple进行处理。只有这样不断地运行,才会产生无界的Tuple流,体现实时性。相当于线程的run方法。

(4)在提交了一个topology之后,Storm就会创建spout/bolt实例并进行序列化。之后,将序列化的component发送给所有的任务所在的机器(即Supervisor节 点),在每一个任务上反序列化component。

(5)Spout和Bolt之间、Bolt和Bolt之间的通信,是通过zeroMQ的消息队列实现的。

(6)上图没有列出ack方法和fail方法,在一个Tuple被成功处理之后,需要调用ack方法来标记成功,否则调用fail方法标记失败,重新处理这个Tuple。


终止Topology

通过在Nimbus节点利用如下命令来终止一个Topology的运行:

bin/storm kill topologyName


kill之后,可以通过UI界面查看topology状态,会首先变成KILLED状态,在清理完本地目录和zookeeper集群中的和当前Topology相关的信息之后,此Topology就会彻底消失




本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/387278.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

梳理操作系统概论

1、用一张图总结操作系统的结构、功能特征、采用的技术和提供服务方式等。 2、用一张图描述CPU的工作原理。 3、用一张图描述系统程序与应用程序、特权指令与非特权指令、CPU状态、PSW及中断是如何协同工作的&#xff1f; 转载于:https://www.cnblogs.com/ljgljg/p/10503190.ht…

位置指纹法的实现(KNN)

基本原理 位置指纹法可以看作是分类或回归问题&#xff08;特征是RSS向量&#xff0c;标签是位置&#xff09;&#xff0c;监督式机器学习方法可以从数据中训练出一个从特征到标签的映射关系模型。kNN是一种很简单的监督式机器学习算法&#xff0c;可以用来做分类或回归。 对于…

室内定位系列 ——WiFi位置指纹(译)

摘要 GPS难以解决室内环境下的一些定位问题&#xff0c;大部分室内环境下都存在WiFi&#xff0c;因此利用WiFi进行定位无需额外部署硬件设备&#xff0c;是一个非常节省成本的方法。然而WiFi并不是专门为定位而设计的&#xff0c;传统的基于时间和角度的定位方法并不适用于WiFi…

机器学习02线性回归、多项式回归、正规方程

单变量线性回归&#xff08;Linear Regression with One Variable&#xff09; 预测器表达式&#xff1a; 选择合适的参数&#xff08;parameters&#xff09;θ0 和 θ1&#xff0c;其决定了直线相对于训练集的准确程度。 建模误差&#xff08;modeling error&#xff09;&a…

机器学习03Logistic回归

逻辑回归 &#xff08;Logistic Regression&#xff09; 目前最流行&#xff0c;使用最广泛的一种学习算法。 分类问题&#xff0c;要预测的变量 y 是离散的值。 逻辑回归算法的性质是&#xff1a;它的输出值永远在 0 到 1 之间。 逻辑回归模型的假设是&#xff1a; 其中&a…

CNN理解比较好的文章

什么是卷积神经网络&#xff1f;为什么它们很重要&#xff1f; 卷积神经网络&#xff08;ConvNets 或者 CNNs&#xff09;属于神经网络的范畴&#xff0c;已经在诸如图像识别和分类的领域证明了其高效的能力。卷积神经网络可以成功识别人脸、物体和交通信号&#xff0c;从而为机…

Windows 安装Angular CLI

1、安装nvm npm cnpm nrm&#xff08;onenote笔记上有记录&#xff09; 参考&#xff1a;https://blog.csdn.net/tyro_java/article/details/51232458 提示&#xff1a;如果发现配置完后&#xff0c;出现类似“npm不是内部命令……”等信息。 可采取如下措施进行解决—— 检查环…

机器学习04正则化

正则化&#xff08;Regularization&#xff09; 过拟合问题&#xff08;Overfitting&#xff09;&#xff1a; 如果有非常多的特征&#xff0c;通过学习得到的假设可能能够非常好地适应训练集 &#xff1a;代价函数可能几乎为 0&#xff09;&#xff0c; 但是可能会不能推广到…

Adaboost算法

概述 一句话概述Adaboost算法的话就是&#xff1a;把多个简单的分类器结合起来形成个复杂的分类器。也就是“三个臭皮匠顶一个诸葛亮”的道理。 可能仅看上面这句话还没什么概念&#xff0c;那下面我引用个例子。 如下图所示&#xff1a; 在D1这个数据集中有两类数据“”和“-”…

机器学习05神经网络--表示

神经网络&#xff1a;表示&#xff08;Neural Networks: Representation&#xff09; 如今的神经网络对于许多应用来说是最先进的技术。 对于现代机器学习应用&#xff0c;它是最有效的技术方法。 神经网络模型是许多逻辑单元按照不同层级组织起来的网络&#xff0c; 每一层…

逻辑回归(Logistic Regression, LR)又称为逻辑回归分析,是分类和预测算法中的一种。通过历史数据的表现对未来结果发生的概率进行预测。例如,我们可以将购买的概率设置为因变量,将用户的

逻辑回归(Logistic Regression, LR)又称为逻辑回归分析&#xff0c;是分类和预测算法中的一种。通过历史数据的表现对未来结果发生的概率进行预测。例如&#xff0c;我们可以将购买的概率设置为因变量&#xff0c;将用户的特征属性&#xff0c;例如性别&#xff0c;年龄&#x…

机器学习06神经网络--学习

代价函数 标记方法&#xff1a; 神经网络的训练样本有 m 个 每个包含一组输入 x 和一组输出信号 y L 表示神经网络层数 Sl表示每层的 neuron 个数(SL 表示输出层神经元个数) 将神经网络的分类定义为两种情况&#xff1a; 二类分类&#xff1a;SL1, y0 or 1 表示哪一类&…

Logistic Regression Classifier逻辑回归

Logistic Regression Classifier逻辑回归主要思想就是用最大似然概率方法构建出方程&#xff0c;为最大化方程&#xff0c;利用牛顿梯度上升求解方程参数。 优点&#xff1a;计算代价不高&#xff0c;易于理解和实现。缺点&#xff1a;容易欠拟合&#xff0c;分类精度可能不高…

机器学习07应用机器学习的建议

决定下一步做什么&#xff08;Deciding What to Try Next&#xff09; 确保在设计机器学习系统时&#xff0c;能够选择一条最合适、最正确的道路。 具体来讲&#xff0c;将重点关注的问题是&#xff1a;假如你在开发一个机器学习系统&#xff0c;或者想试着改进一个机器学习…

CSS3--5.颜色属性

HTML5中添加了一些新的颜色的表示方式 1.RGBA&#xff1a;说得简单一点就是在RGB的基础上加进了一个通道Alpha。RGBA在RGB的基础上多了控制alpha透明度的参数。以上R、G、B三个参数&#xff0c;正整数值的取值范围为&#xff1a;0 - 255。百分数值的取值范围为&#xff1a;0.0%…

逻辑回归的通俗解释 逻辑回归的定位

1 逻辑回归的定位 首先&#xff0c;逻辑回归是一种分类&#xff08;Classification&#xff09;算法。比如说&#xff1a; 给定一封邮件&#xff0c;判断是不是垃圾邮件给出一个交易明细数据&#xff0c;判断这个交易是否是欺诈交易给出一个肿瘤检查的结果数据&#xff0c;判断…

机器学习08机器学习系统设计

首先要做什么 一个垃圾邮件分类器算法为例&#xff1a; 为了解决这样一个问题&#xff0c;首先要做的决定是如何选择并表达特征向量 x。 可以选择一个由 100 个最常出现在垃圾邮件中的词所构成的列表&#xff0c;根据这些词是否有在邮件中 出现&#xff0c;来获得我们的特…

数学笔记1——导数1(导数的基本概念)

什么是导数导数是高数中的重要概念&#xff0c;被应用于多种学科。从物理意义上讲&#xff0c;导数就是求解变化率的问题&#xff1b;从几何意义上讲&#xff0c;导数就是求函数在某一点上的切线的斜率。我们熟知的速度公式&#xff1a;v s/t&#xff0c;这求解的是平均速度&a…

python接口自动化(四)--接口测试工具介绍(详解)

简介 “工欲善其事必先利其器”&#xff0c;通过前边几篇文章的介绍&#xff0c;大家大致对接口有了进一步的认识。那么接下来让我们看看接口测试的工具有哪些。 目前&#xff0c;市场上有很多支持接口测试的工具。利用工具进行接口测试&#xff0c;能够提供测试效率。例如&…

机器学习09支持向量机

支持向量机(Support Vector Machines) 在监督学习中&#xff0c;许多学习算法的性能都非常类似&#xff0c;因此&#xff0c;重要的不是你该选择使用学习算法 A 还是学习算法 B&#xff0c;而更重要的是&#xff0c; 应用这些算法时&#xff0c;所创建的大量数据在应用这些算…