Storm的ack机制在项目应用中的坑

正在学习storm的大兄弟们,我又来传道授业解惑了,是不是觉得自己会用ack了。好吧,那就让我开始啪啪打你们脸吧。

先说一下ACK机制:

  为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪。

  这里面涉及到ack/fail的处理,如果一个tuple处理成功是指这个Tuple以及这个Tuple产生的所有Tuple都被成功处理, 会调用spout的ack方法;

  如果失败是指这个Tuple或这个Tuple产生的所有Tuple中的某一个tuple处理失败, 则会调用spout的fail方法;

  在处理tuple的每一个bolt都会通过OutputCollector来告知storm, 当前bolt处理是否成功。

  另外需要注意的,当spout触发fail动作时,不会自动重发失败的tuple,需要我们在spout中重新获取发送失败数据,手动重新再发送一次。

Ack原理
  Storm中有个特殊的task名叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树(因为一个tuple通过spout发出了,经过每一个bolt处理后,会生成一个新的tuple发送出去)。当acker(框架自启动的task)发现一个Tuple树已经处理完成了,它会发送一个消息给产生这个Tuple的那个task。
Acker的跟踪算法是Storm的主要突破之一,对任意大的一个Tuple树,它只需要恒定的20字节就可以进行跟踪。
Acker跟踪算法的原理:acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0,然后每发射一个Tuple或Ack一个Tuple时,这个Tuple的id就要跟这个校验值异或一下,并且把得到的值更新为ack-val的新值。那么假设每个发射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根据ack-val是否为0来判断是否完全处理,如果为0则认为已完全处理。

要实现ack机制:
1,spout发射tuple的时候指定messageId
2,spout要重写BaseRichSpout的fail和ack方法
3,spout对发射的tuple进行缓存(否则spout的fail方法收到acker发来的messsageId,spout也无法获取到发送失败的数据进行重发),看看系统提供的接口,只有msgId这个参数,这里的设计不合理,其实在系统里是有cache整个msg的,只给用户一个messageid,用户如何取得原来的msg貌似需要自己cache,然后用这个msgId去查询,太坑爹了
3,spout根据messageId对于ack的tuple则从缓存队列中删除,对于fail的tuple可以选择重发。
4,设置acker数至少大于0;Config.setNumAckers(conf, ackerParal);

Storm的Bolt有BsicBolt和RichBolt:
  在BasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。
  使用RichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);

由一个tuple产生一个新的tuple称为:anchoring,你发射一个tuple的同时也就完成了一次anchoring。

  ack机制即,spout发送的每一条消息,在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理;在规定的时间内(默认是30秒),没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败,timeout时间可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来设定。或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作

  注意,我开始以为如果继承BaseBasicBolt那么程序抛出异常,也会让spout进行重发,但是我错了,程序直接异常停止了

  这里我以分布式程序入门案例worldcount为例子吧。请看下面大屏幕:没有错我就是那个你们走在路上经常听见的名字刘洋。

  这里spout1-1task发送句子"i am liu yang"给bolt2-2task进行处理,该task把句子切分为单词,根据字段分发到下一个bolt中,bolt2-2,bolt4-4,bolt5-5对每一个单词添加一个后缀1后再发送给下一个bolt进行存储到数据库的操作,这个时候bolt7-7task在存储数据到数据库时失败,向spout发送fail响应,这个时候spout收到消息就会再次发送的该数据。

  好,那么我思考一个问题:spout如何保证再次发送的数据就是之前失败的数据,所以在spout实例中,绝对要定义一个map缓存,缓存发出去的每一条数据,key当然就是messageId,当spout实例收到所有bolt的响应后如果是ack,就会调用我们重写的ack方法,在这个方法里面我们就要根据messageId删除这个key-value,如果spout实例收到所有bolt响应后,发现是faile,则会调用我们重写的fail方法,根据messageId查询到对应的数据再次发送该数据出去。

spout代码如下

public class MySpout extends BaseRichSpout {private static final long serialVersionUID = 5028304756439810609L;// key:messageId,Dataprivate HashMap<String, String> waitAck = new HashMap<String, String>();private SpoutOutputCollector collector;public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("sentence"));}public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}public void nextTuple() {String sentence = "i am liu yang";String messageId = UUID.randomUUID().toString().replaceAll("-", "");waitAck.put(messageId, sentence);//指定messageId,开启ackfail机制collector.emit(new Values(sentence), messageId);}@Overridepublic void ack(Object msgId) {System.out.println("消息处理成功:" + msgId);System.out.println("删除缓存中的数据...");waitAck.remove(msgId);}@Overridepublic void fail(Object msgId) {System.out.println("消息处理失败:" + msgId);System.out.println("重新发送失败的信息...");//重发如果不开启ackfail机制,那么spout的map对象中的该数据不会被删除的。collector.emit(new Values(waitAck.get(msgId)),msgId);}
}

虽然在storm项目中我们的spout源通常来源kafka,而且我们使用storm提供的工具类KafkaSpout类,其实这个类里面就维护者<messageId,Tuple>对的集合。

Storm怎么处理重复的tuple?

  因为Storm要保证tuple的可靠处理,当tuple处理失败或者超时的时候,spout会fail并重新发送该tuple,那么就会有tuple重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。一些可行的策略:

(1)不处理,这也算是种策略。因为实时计算通常并不要求很高的精确度,后续的批处理计算会更正实时计算的误差。

(2)使用第三方集中存储来过滤,比如利用mysql,memcached或者redis根据逻辑主键来去重。

(3)使用bloom filter做过滤,简单高效。

问题一:你们有没有想过如果某一个task节点处理的tuple一直失败,消息一直重发会怎么样?

  我们都知道,spout作为消息的发送源,在没有收到该tuple来至左右bolt的返回信息前,是不会删除的,那么如果消息一直失败,就会导致spout节点存储的tuple数据越来越多,导致内存溢出。

问题二:有没有想过,如果该tuple的众多子tuple中,某一个子tuple处理failed了,但是另外的子tuple仍然会继续执行,如果子tuple都是执行数据存储操作,那么就算整个消息失败,那些生成的子tuple还是会成功执行而不会回滚的。

  这个时候storm的原生api是无法支持这种事务性操作,我们可以使用storm提供的高级api-trident来做到(具体如何我不清楚,目前没有研究它,但是我可以它内部一定是根据分布式协议比如两阶段提交协议等)。向这种业务中要保证事务性功能,我们完全可以根据我们自身的业务来做到,比如这里的入库操作,我们先记录该消息是否已经入库的状态,再入库时查询状态来决定是否给予执行。

问题三:tuple的追踪并不一定要是从spout结点到最后一个bolt,只要是spout开始,可以在任意层次bolt停止追踪做出应答。

Acker task 组件来设置一个topology里面的acker的数量,默认值是一,如果你的topoogy里面的tuple比较多的话,那么请把acker的数量设置多一点,效率会更高一点。

调整可靠性 
acker task是非常轻量级的, 所以一个topology里面不需要很多acker。你可以通过Strom UI(id: -1)来跟踪它的性能。 如果它的吞吐量看起来不正常,那么你就需要多加点acker了。
如果可靠性对你来说不是那么重要 — 你不太在意在一些失败的情况下损失一些数据, 那么你可以通过不跟踪这些tuple树来获取更好的性能。不去跟踪消息的话会使得系统里面的消息数量减少一半, 因为对于每一个tuple都要发送一个ack消息。并且它需要更少的id来保存下游的tuple, 减少带宽占用。
有三种方法可以去掉可靠性。

第一是把Config.TOPOLOGY_ACKERS 设置成 0. 在这种情况下, storm会在spout发射一个tuple之后马上调用spout的ack方法。也就是说这个tuple树不会被跟踪。
第二个方法是在tuple层面去掉可靠性。 你可以在发射tuple的时候不指定messageid来达到不跟粽某个特定的spout tuple的目的。
最后一个方法是如果你对于一个tuple树里面的某一部分到底成不成功不是很关心,那么可以在发射这些tuple的时候unanchor它们。 这样这些tuple就不在tuple树里面, 也就不会被跟踪了。

可靠性配置

有三种方法可以去掉消息的可靠性:

将参数Config.TOPOLOGY_ACKERS设置为0,通过此方法,当Spout发送一个消息的时候,它的ack方法将立刻被调用;

Spout发送一个消息时,不指定此消息的messageID。当需要关闭特定消息可靠性的时候,可以使用此方法;

最后,如果你不在意某个消息派生出来的子孙消息的可靠性,则此消息派生出来的子消息在发送时不要做锚定,即在emit方法中不指定输入消息。因为这些子孙消息没有被锚定在任何tuple tree中,因此他们的失败不会引起任何spout重新发送消息。

如何关闭Ack机制

有2种途径

spout发送数据是不带上msgid

设置acker数等于0

值得注意的一点是Storm调用Ack或者fail的task始终是产生这个tuple的那个task,所以如果一个Spout,被分为很多个task来执行,消息执行的成功失败与否始终会通知最开始发出tuple的那个task。

作为Storm的使用者,有两件事情要做以更好的利用Storm的可靠性特征,首先你在生成一个tuple的时候要通知Storm,其次,完全处理一个tuple之后要通知Storm,这样Storm就可以检测到整个tuple树有没有完成处理,并且通知源Spout处理结果。

1 由于对应的task挂掉了,一个tuple没有被Ack:

Storm的超时机制在超时之后会把这个tuple标记为失败,从而可以重新处理。

2 Acker挂掉了: 在这种情况下,由这个Acker所跟踪的所有spout tuple都会出现超时,也会被重新的处理。

3 Spout 挂掉了:在这种情况下给Spout发送消息的消息源负责重新发送这些消息。

三个基本的机制,保证了Storm的完全分布式,可伸缩的并且高度容错的。

另外Ack机制还常用于限流作用: 为了避免spout发送数据太快,而bolt处理太慢,常常设置pending数,当spout有等于或超过pending数的tuple没有收到ack或fail响应时,跳过执行nextTuple, 从而限制spout发送数据。

通过conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);设置spout pend数。

转发:https://www.cnblogs.com/intsmaze/p/5918087.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/423460.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云开发数据库VS传统数据库丨云开发101

云开发数据库与传统数据库的不同 在小程序云开发中&#xff0c;最核心的便是三大组件&#xff1a;数据库、云存储和云函数&#xff0c;从今天开始&#xff0c;我们将开始隔日更的专栏文章&#xff0c;云开发101&#xff0c;在第一周&#xff0c;我们将从最最核心的数据库开始说…

前端学习(10):HTML语义化

我理解的HTML语义化 经过查看别人博文中的一些描述&#xff0c;我将HTML的语义化总结为&#xff1a; 用最恰当的标签来标记内容。 该如何理解呢&#xff1f;比如需要加入一个标题&#xff0c;这个标题的字体比正文的要大写&#xff0c;还要加粗。能够实现这种效果的方法有很多…

大尾端 小尾端

提到体系结构时&#xff0c;经常遇到大小尾端的概念&#xff0c;这里做个总结。 big endian&#xff1a;大尾端&#xff0c;也称大端&#xff08;高位&#xff09;优先存储。little endian&#xff1a;小尾端&#xff0c;也称小端&#xff08;低位&#xff09;优先存储。如下00…

Storm的BaseBasicBolt源码解析ack机制

我们在学习ack机制的时候&#xff0c;我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。 在BaseBasicBolt中&#xff0c;BasicOutputCollector在emit数据的时候&#xff0c;会自动和输入的tuple相关联&#xff0c;而在execute方法结束的时候那个输入tuple会被自动ack。 在使…

博客中gitalk最新评论的获取 github api使用

博客中&#xff0c;对于网友的评论以及每篇文章的评论数还是很重要的。但是基于静态的页面想要存储动态的评论数据是比较难的&#xff0c;一般博客主题中都内置了评论插件&#xff0c;但是博客主题中对于最新评论的支持显示还是很少的&#xff0c;至少目前我是没怎么发现。博客…

前端学习(11):标题和段落

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>标题和段落</title> </head> <body><h1>我是歌谣</h1><h2>我是歌谣</h2><h3>我是歌谣</h3>&l…

Kafka文件存储机制

Kafka是什么 Kafka是最初由Linkedin公司开发&#xff0c;是一个分布式、分区的、多副本的、多订阅者&#xff0c;基于zookeeper协调的分布式日志系统(也可以当做MQ系统)&#xff0c;常见可以用于web/nginx日志、访问日志&#xff0c;消息服务等等&#xff0c;Linkedin于2010年贡…

用Zend Encoder加密PHP文件和PHP 优化配置

在发布一个你写好的PHP程序时&#xff0c;你是不是担心自已辛苦写出来的成果会被别人占为已有呢&#xff1f;其实我们可以用Zend Encoder为我们的PHP文件加上一层保护壳。 软件版本&#xff1a;2.0.1软件大小&#xff1a;10.2M适用平台&#xff1a;Win9X/2000/XP官方网址&#…

【vue开发】vue插件的install方法

MyPlugin.install function (Vue, options) {// 1. 添加全局方法或属性Vue.myGlobalMethod function () {// 逻辑...}// 2. 添加全局资源Vue.directive(my-directive, {bind (el, binding, vnode, oldVnode) {// 逻辑...}...})// 3. 注入组件Vue.mixin({created: function ()…

笑话

有一次坐公交拿了IC卡排队上车&#xff0c;前面一个人是扔硬币的&#xff0c;我大脑短路跟着把IC卡扔进去了…… ●早上要戴隐形眼镜&#xff0c;结果把盖打开直接把眼镜倒马桶里,然后镇定地倒入新的护理液,准备摘眼镜&#xff0c;半天摘不下来。 ●邻居忘了带钥匙&#xff0c;…

kafkaspot在ack机制下如何保证内存不溢

storm框架中的kafkaspout类实现的是BaseRichSpout&#xff0c;它里面已经重写了fail和ack方法&#xff0c;所以我们的bolt必须实现ack机制&#xff0c;就可以保证消息的重新发送&#xff1b;如果不实现ack机制&#xff0c;那么kafkaspout就无法得到消息的处理响应&#xff0c;就…

云开发0基础训练营第二期热力来袭!

第二期云开发0基础训练营热力来袭&#xff01;课程升级、更佳体验、依旧免费&#xff01;每年的 “金九银十” 都是传说中的学习黄金期&#xff01;这期间在校的小伙伴面临开学季/求职季/考研季挑战&#xff0c;已经步入社会的也即将步入年终前的冲刺阶段。所以&#xff0c;这段…

前端学习(14):相对路径和绝对路径

目录结构 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv"X-UA-Compatible" conte…

discuz数据从godaddy主机中导出的mysql数据乱码变问号???的解决方法

从godaddy主机导出的mysql数据安装在本地电脑上发现原来的中文都变成了问号&#xff1f;godaddy主机中的数据库版本是5.0.67&#xff0c;charsetutf8 collationutf8_general_ci 而我的supesite的版本是gbk的&#xff0c;导出的数据也是 CREATE TABLE cdb_access (……) ENGINE…

jvm的新生代和老年代简介

新生代分为三个区域&#xff0c;一个Eden区和两个Survivor区&#xff0c;它们之间的比例为&#xff08;8&#xff1a;1&#xff1a;1&#xff09;&#xff0c;这个比例也是可以修改的。通常情况下&#xff0c;对象主要分配在新生代的Eden区上&#xff0c;少数情况下也可能会直接…

Python 连接redis密码中特殊字符问题

连接方法&#xff1a; self.pool redis.ConnectionPool.from_url(self.redis_url)opredis redis.Redis(connection_poolself.pool)redis_url redis://:cot$#D4^&1234172.31.26.174:6379/0 直接连redis会报错&#xff0c;报错主要内容&#xff1a; ValueError: invalid l…

浅谈yield

c#1.0使用foreach 语句可以轻松地迭代集合。在c#1.0中&#xff0c;创建枚举器仍需要做大量的工作。c#2.0添加了yield语句&#xff0c;以便于创建枚举器。下面我们浅谈下yield的使用&#xff1a; 1、包含yield语句的方法或属性称为迭代块。迭代块必须声明为返回IEnumerator或IEn…

NIO核心框架介绍

NIO共引入了4个概念&#xff1a; - 缓存区&#xff1a;表示数据存放的容器&#xff0c;提供可读写的数据缓存区&#xff1b; - 字符集&#xff1a;用来对缓存数据进行解码和编码&#xff0c;在字节和Unicode字符之间转换&#xff1b; - 通道&#xff1a;用来接收或发送数据&…

前端学习(16):跳转链接小练习

点击图片实现跳转 目录结构 header.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv&q…

云开发的数据库权限机制解读丨云开发101

在使用云开发进行开发时&#xff0c;数据库权限是一个让不少人困扰的部分&#xff0c;四种数据库权限&#xff0c;到底是什么意思&#xff1f;其各自的权限、应用场景都是什么&#xff1f;大多数人对于这个机制&#xff0c;还是模糊的。为了帮助大家进行更好的开发&#xff0c;…