Storm的BaseBasicBolt源码解析ack机制

我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。
在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。
在使用BaseRichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);

那么我们来看看BasicBolt的源码是不是这样的,不能因为看到别人的帖子说是这样的,我们就这样任务,以讹传讹,我们要To see is to believe。

 

为了方便看源代码,我先上我们的继承类:

public class SplitSentenceBolt extends BaseBasicBolt {  public void prepare(Map stormConf, TopologyContext context) {super.prepare(stormConf, context);}//5:执行我们自己的逻辑处理方法,接收传入的参数。public void execute(Tuple input, BasicOutputCollector collector) {String sentence = (String)input.getValueByField("sentence");String[] words = sentence.split(" ");for (String word : words) {word = word.trim();word = word.toLowerCase();collector.emit(new Values(word,1));//这个地方就是调用OutputCollector的包装类,来发消息}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word","num"));}
}

通过打断点,我们发现,bolt的task会创建这个类下面会标准执行顺序

public class BasicBoltExecutor implements IRichBolt {public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);    private IBasicBolt _bolt;private transient BasicOutputCollector _collector;//1:创建该对象,然后把我们写的SplitSentenceBolt对象赋给父类IBasicBolt。public BasicBoltExecutor(IBasicBolt bolt) {_bolt = bolt;}public void declareOutputFields(OutputFieldsDeclarer declarer) {_bolt.declareOutputFields(declarer);//这里就是调用SplitSentenceBolt对象的方法了。}//2:给BasicOutputCollector _collector字段赋值,BasicOutputCollector就是对OutputCollector类的包装。public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {_bolt.prepare(stormConf, context);_collector = new BasicOutputCollector(collector);}//3:然后程序执行该方法,input的值source: spout1:4, stream: default, id: {}, [+ - * % /]public void execute(Tuple input) {_collector.setContext(input);//把接收到的tuple值设置给BasicOutputCollector中inputTuple字段。try {_bolt.execute(input, _collector);//这个地方是调用我们实现类SplitSentenceBolt的ececute方法。_collector.getOutputter().ack(input);//这个地方就是响应} catch(FailedException e) {if(e instanceof ReportedFailedException) {_collector.reportError(e);}_collector.getOutputter().fail(input);//这个地方就是响应}}public void cleanup() {_bolt.cleanup();}public Map<String, Object> getComponentConfiguration() {return _bolt.getComponentConfiguration();}
}
public class BasicOutputCollector implements IBasicOutputCollector {private OutputCollector out;private Tuple inputTuple;public BasicOutputCollector(OutputCollector out) {this.out = out;}//4:把收到的tuple数据赋值给inputTuple,这个时候BasicOutputCollector对象的字段都具有值了。public void setContext(Tuple inputTuple) {this.inputTuple = inputTuple;}//6:这里我们发送新的(转换后的)tuple数据,看他内部的调用,其实他也会发送一个anchor tuple来保持tracker链路,
而这个anchor tuple就是bolt接收到转换前的源tuple数据。public List<Integer> emit(List<Object> tuple) { return emit(Utils.DEFAULT_STREAM_ID, tuple);  }public List<Integer> emit(String streamId, List<Object> tuple) {return out.emit(streamId, inputTuple, tuple);}public void emitDirect(int taskId, String streamId, List<Object> tuple) {out.emitDirect(taskId, streamId, inputTuple, tuple);}public void emitDirect(int taskId, List<Object> tuple) {emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);}protected IOutputCollector getOutputter() {return out;}public void reportError(Throwable t) {out.reportError(t);}
}

 这里大家不要纠结bolt的启动时从哪里开始的,我后面会讲的,这里我们关注的是,BasicBoltExecutor对象创建后的执行过程,以这我们来看执行的过程。在BasicBoltExecutor的execute方法中,我们看到了ack和fail方法会被自动调用的,当我们的程序抛出异常则会执行fail方法的。

转发:http://www.cnblogs.com/intsmaze/p/5924873.html

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/423456.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

博客中gitalk最新评论的获取 github api使用

博客中&#xff0c;对于网友的评论以及每篇文章的评论数还是很重要的。但是基于静态的页面想要存储动态的评论数据是比较难的&#xff0c;一般博客主题中都内置了评论插件&#xff0c;但是博客主题中对于最新评论的支持显示还是很少的&#xff0c;至少目前我是没怎么发现。博客…

前端学习(11):标题和段落

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>标题和段落</title> </head> <body><h1>我是歌谣</h1><h2>我是歌谣</h2><h3>我是歌谣</h3>&l…

Kafka文件存储机制

Kafka是什么 Kafka是最初由Linkedin公司开发&#xff0c;是一个分布式、分区的、多副本的、多订阅者&#xff0c;基于zookeeper协调的分布式日志系统(也可以当做MQ系统)&#xff0c;常见可以用于web/nginx日志、访问日志&#xff0c;消息服务等等&#xff0c;Linkedin于2010年贡…

用Zend Encoder加密PHP文件和PHP 优化配置

在发布一个你写好的PHP程序时&#xff0c;你是不是担心自已辛苦写出来的成果会被别人占为已有呢&#xff1f;其实我们可以用Zend Encoder为我们的PHP文件加上一层保护壳。 软件版本&#xff1a;2.0.1软件大小&#xff1a;10.2M适用平台&#xff1a;Win9X/2000/XP官方网址&#…

【vue开发】vue插件的install方法

MyPlugin.install function (Vue, options) {// 1. 添加全局方法或属性Vue.myGlobalMethod function () {// 逻辑...}// 2. 添加全局资源Vue.directive(my-directive, {bind (el, binding, vnode, oldVnode) {// 逻辑...}...})// 3. 注入组件Vue.mixin({created: function ()…

笑话

有一次坐公交拿了IC卡排队上车&#xff0c;前面一个人是扔硬币的&#xff0c;我大脑短路跟着把IC卡扔进去了…… ●早上要戴隐形眼镜&#xff0c;结果把盖打开直接把眼镜倒马桶里,然后镇定地倒入新的护理液,准备摘眼镜&#xff0c;半天摘不下来。 ●邻居忘了带钥匙&#xff0c;…

kafkaspot在ack机制下如何保证内存不溢

storm框架中的kafkaspout类实现的是BaseRichSpout&#xff0c;它里面已经重写了fail和ack方法&#xff0c;所以我们的bolt必须实现ack机制&#xff0c;就可以保证消息的重新发送&#xff1b;如果不实现ack机制&#xff0c;那么kafkaspout就无法得到消息的处理响应&#xff0c;就…

云开发0基础训练营第二期热力来袭!

第二期云开发0基础训练营热力来袭&#xff01;课程升级、更佳体验、依旧免费&#xff01;每年的 “金九银十” 都是传说中的学习黄金期&#xff01;这期间在校的小伙伴面临开学季/求职季/考研季挑战&#xff0c;已经步入社会的也即将步入年终前的冲刺阶段。所以&#xff0c;这段…

前端学习(14):相对路径和绝对路径

目录结构 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv"X-UA-Compatible" conte…

discuz数据从godaddy主机中导出的mysql数据乱码变问号???的解决方法

从godaddy主机导出的mysql数据安装在本地电脑上发现原来的中文都变成了问号&#xff1f;godaddy主机中的数据库版本是5.0.67&#xff0c;charsetutf8 collationutf8_general_ci 而我的supesite的版本是gbk的&#xff0c;导出的数据也是 CREATE TABLE cdb_access (……) ENGINE…

jvm的新生代和老年代简介

新生代分为三个区域&#xff0c;一个Eden区和两个Survivor区&#xff0c;它们之间的比例为&#xff08;8&#xff1a;1&#xff1a;1&#xff09;&#xff0c;这个比例也是可以修改的。通常情况下&#xff0c;对象主要分配在新生代的Eden区上&#xff0c;少数情况下也可能会直接…

Python 连接redis密码中特殊字符问题

连接方法&#xff1a; self.pool redis.ConnectionPool.from_url(self.redis_url)opredis redis.Redis(connection_poolself.pool)redis_url redis://:cot$#D4^&1234172.31.26.174:6379/0 直接连redis会报错&#xff0c;报错主要内容&#xff1a; ValueError: invalid l…

浅谈yield

c#1.0使用foreach 语句可以轻松地迭代集合。在c#1.0中&#xff0c;创建枚举器仍需要做大量的工作。c#2.0添加了yield语句&#xff0c;以便于创建枚举器。下面我们浅谈下yield的使用&#xff1a; 1、包含yield语句的方法或属性称为迭代块。迭代块必须声明为返回IEnumerator或IEn…

NIO核心框架介绍

NIO共引入了4个概念&#xff1a; - 缓存区&#xff1a;表示数据存放的容器&#xff0c;提供可读写的数据缓存区&#xff1b; - 字符集&#xff1a;用来对缓存数据进行解码和编码&#xff0c;在字节和Unicode字符之间转换&#xff1b; - 通道&#xff1a;用来接收或发送数据&…

前端学习(16):跳转链接小练习

点击图片实现跳转 目录结构 header.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv&q…

云开发的数据库权限机制解读丨云开发101

在使用云开发进行开发时&#xff0c;数据库权限是一个让不少人困扰的部分&#xff0c;四种数据库权限&#xff0c;到底是什么意思&#xff1f;其各自的权限、应用场景都是什么&#xff1f;大多数人对于这个机制&#xff0c;还是模糊的。为了帮助大家进行更好的开发&#xff0c;…

jremind V0.1.3.0添加透明

这次就把弹出消息的窗口改成了透明&#xff0c;本来说是做鼠标穿透的&#xff0c;给做成了透明&#xff0c;先用着&#xff01;有时间了再改了。 转载于:https://www.cnblogs.com/joypen/archive/2009/05/11/1693125.html

Java中的NIO非阻塞编程

在JDK1.4以前&#xff0c;Java的IO操作集中在java.io这个包中&#xff0c;是基于流的阻塞API。对于大多数应用来说&#xff0c;这样的API使用很方便&#xff0c;然而&#xff0c;一些对性能要求较高的应用&#xff0c;尤其是服务端应用&#xff0c;往往需要一个更为有效的方式来…

Nginx----基础

静态资源服务 通过本地文件系统提供服务&#xff1a;对css&#xff0c;js文件&#xff0c;图片等静态文件 反向代理服务 缓存&#xff1a;将一些数据经常不变的&#xff0c;缓存到Nginx中&#xff0c;直接给用户提供服务 负载均衡 api服务 OpenResty 数据库的服务比较简单&…

c#开发IE控件

c#开发IE控件主要是对BHO对象是使用,但是我们知道BHO是一个COM对象,而在.NET下开发基于COM的应用,总觉得不是很简单,这里有受控代码与COM的调用,我查找了下,国内并没有此类信息,下面是译稿,翻译的不好,欢迎指出.介绍:我们在浏览Internet信息的时候,往往需要增强用户浏览信息的,…