storm消息可靠机制(ack)的原理和使用

关于storm的基础,参照我这篇文章:流式计算storm
关于并发和并行,参照我这篇文章:并发和并行
关于storm的并行度解释,参照我这篇文章:storm的并行度解释
关于storm的流分组策略,参照我这篇文章:storm的流分组策略
关于storm的消息可靠机制,参照我这篇文章:storm的消息可靠机制

storm的消息可靠机制可以确保spout发出的每条tuple消息都会被完整的处理;
主要是由spout和bolt共同完成的.
本文主要讨论storm的消息可靠机制的原理和使用

storm的可靠机制,是storm的一大亮点,那么他是如何实现的呢?
先看效果:1.spout每发一条消息,就新建一个唯一的msgId(比如UUID),然后将这条消息和这个唯一id存在map中;2.每个bolt在处理tuple后,emit的时候带上tulpe,成功,就调用ack方法,代表成功,失败就调用fail方法,代表失败;这样编写代码后,你会发现,失败的消息spout会重新发送,效果就出来了
实现原理:原理很简单,使用了异或的知识点.我们知道,任意两个相同的数字,异或的结果都是0.例如:1^1=0现在请跟着我的思路想:1.首先想象有个服务,叫ack,他的主要作用就是判断每条tuple信息是否都成功处理2.每个spout发送和接收成功,都要给ack发送一个数字,最后由ack计算,判断整条链路是否成功处理3.spout作为发送方,假设他要给3个bolt发送消息,分别是bolt1,bolt2,bolt3;4.假设这3个bolt最后都发给bolt4;5.假设本次要处理的消息叫做root_id;6.开始发送了;7.spout给bolt1发送消息<root_id,1>8.spout给bolt2发送消息<root_id,2>9.spout给bolt3发送消息<root_id,3>10.发送完spout再给ack发送1^2^311.bolt1收到<root_id,1>,处理成功再给bolt4发送<root_id,4>;12.发送完bolt1再给ack发送1^4,处理不成功就不发送了;13.bolt2收到<root_id,2>,处理成功再给bolt4发送<root_id,5>;14.发送完bolt2再给ack发送2^5,处理不成功就不发送了;15.bolt3收到<root_id,3>,处理成功再给bolt4发送<root_id,6>;16.发送完bolt3再给ack发送3^6,处理不成功就不发送了;17.bolt4收到前3个bolt的消息,<root_id,4>,<root_id,5>,<root_id,6>,处理成功后分别给ack发送4,5,6,处理不成功就不发送了;18.我们站在ack的角度来看,对于root_id这条消息来说,如果所有spout和bolt都成功,那么应该会收到:1^2^3,1^4,2^5,3^6,4,5,6;19.将所有收到的数字异或操作,即:1^2^3^1^4^2^5^3^6^4^5^6,由于相同数字异或结果为0,即上面的式子的结果就是0,任意少收到哪个值,最终的结果都不会为0;20.如果ack最终计算的结果是0,那么就代表这个消息root_id处理成功了21.如果ack最终计算结果不为0,那么就代表这个消息root_id处理失败了

如何使用

举个项目中的例子:
spout中:

    这个类 extends BaseRichSpoutprivate OutputCollector collector;private ConcurrentHashMap<UUID, Values> pending;@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {this.collector = collector;this.pending = new ConcurrentHashMap<>();}@Overridepublic void nextTuple() {//具体业务...Values value = new Values("要传的业务数据");UUID msgId = UUID.randomUUID();this.pending.put(msgId, value);this.collector.emit(value, msgId);}@Overridepublic void ack(Object msgId) {//收到成功消息,就删除这条msgIdthis.pending.remove(msgId);}@Overridepublic void fail(Object msgId) {//收到失败消息就重新发送一遍//一般成熟的做法是会再记录个失败次数,不会一直失败重发的this.collector.emit(this.pending.get(msgId), msgId);}

bolt中:

    这个类 extends BaseRichBoltprivate OutputCollector collector;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {this.collector=collector;}@Overridepublic void execute(Tuple tuple) {try {//具体业务...//注意,这里发送的时候,一定要带上tuplethis.collector.emit(tuple,new Values("业务数据"));collector.ack(tuple);} catch (Exception e) {collector.fail(tuple);e.printStackTrace();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/499985.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JS--Console.log()详解

对于JavaScript程序的调试&#xff0c;相比于alert()&#xff0c;使用console.log()是一种更好的方式&#xff0c;原因在于&#xff1a;alert()函数会阻断JavaScript程序的执行&#xff0c;从而造成副作用&#xff1b;而console.log()仅在控制台中打印相关信息&#xff0c;因此…

订单单量监控v2

前段时间做了一个订单单量监控的项目,已经投入使用了,现在总结一下 前期的想法参考这篇文章 整体使用了storm实时计算框架和redis数据库,还有kafka消息队列 先上效果图,我们可以后期将数据展示出来,明显发现某天00点有单量突变的情况,明显是促销活动导致单量增加了 而后面的报…

AFNetworking 3.0迁移指南

AFNetworking是一款在OS X和iOS下都令人喜爱的网络库。为了迎合iOS新版本的升级, AFNetworking在3.0版本中删除了基于 NSURLConnection API的所有支持。如果你的项目以前使用过这些API&#xff0c;建议您立即升级到基于 NSURLSession 的API的AFNetworking的版本。本指南将引导您…

想法记录---实时计算的TopN的实现

TopN就是找出时间段内出现频率最高的n个 TopN的计算是个老生常谈的话题,比如微博的热搜,都是隔段时间就统计一次TopN 现在想做一个实时计算的TopN. 先说说离线计算的TopN,再说实时TopN 离线TopN 离线TopN一般出现在大数据的应用场景,使用hadoop的map reduce,网上有很多案例 …

iOS中的MVC设计模式

一、MVC概述模型&#xff0d;视图&#xff0d;控制器&#xff08;MVC&#xff09;是Xerox PARC在二十世纪八十年代为编程语言Smalltalk&#xff0d;80发明的一种软件设计模式&#xff0c;已被广泛使用。后来被推荐为Oracle旗下Sun公司Java EE平台的设计模式&#xff0c;并且受到…

java 时间的相关转换操作

关于时间的操作,写了个util 主要用于预测的时候,时间段的确定 预测的时间段,需要明确的规定范围,由于业务的促销都是整点开始,所以我们的预测时间段,也是从整点开始 所以就要将时间分为多个整段,比如一分钟一段,那么就是2018-06-18 00:00:00 到2018-06-18 00:01:00 就是一段 那…

iOS-MVVM-模式介绍

一、MVVM概述 MVVM 到底是什么&#xff1f;我们首先看一下MVC架构&#xff1a;我们看到的是一个典型的 MVC 设置。Model 呈现数据&#xff0c;View 呈现用户界面&#xff0c;而 View Controller 调节它两者之间的交互。Cool&#xff01;稍微考虑一下&#xff0c;虽然 View 和 …

[数据库]---mysql数据库 使用binlog+canal或binlake进行数据库的复制

前言 在进行冷热分离的时候&#xff0c;需要将数据实时的复制在历史数据库中&#xff0c;我们使用的是binlogcanal的思想,将每次数据库数据的变更转换成消息发出来,然后再操作这些消息达到数据复制的 在京东,实现同样功能的组件&#xff0c;叫binlake 接下来详细说下: 1.Binl…

加密算法概念简介--MD5、SHA、DES、3DES、AES、RSA、ECC

MD5MD5即Message-Digest Algorithm 5&#xff08;信息-摘要算法5&#xff09;&#xff0c;用于确保信息传输完整一致。是计算机广泛使用的杂凑算法之一&#xff08;又译摘要算法、哈希算法&#xff09;&#xff0c;主流编程语言普遍已有MD5实现。将数据&#xff08;如汉字&…

MAC下配置ZSH

MAC下面的终端是神器。而且苹果非常贴心的为我们准备好了ZSH。 可惜ZSH不是很好用&#xff0c;需要配合一些插件和模板&#xff1a;oh-my-zsh将bash切换为zsh chsh -s /bin/zsh其实还可以用which来定位&#xff08;特别是ubuntu的童鞋&#xff09; chsh -s which zsh 直接用zsh…

搭建: canal部署与实例运行

搭建&#xff1a; canal部署与实例运行 数据库读log同步用 详见下面

MAC下使用OpenSSL生成私钥和公钥

MAC OS自带了OpenSSL,直接在命令行里使用OPENSSL就可以。打开命令行工具&#xff0c;然后输入 openssl打开openssl&#xff0c;接着只要三句命令就可以搞定。1、打开Terminal--cd 到指定文件夹&#xff0c;如桌面Mac:~/Desktop $ openssl2、第一句命令&#xff1a;生成私钥&…

idea插件开发(01)---最简单的helloworld版,不需要知道原理,先跟我做一个最简单的弹框插件

前言 用了那么多idea插件,也想自己做一个插件,下面就是入门版本 你不需要先知道所有的概念,先跟着我的步骤做一个小;例子,后面再说原理 相关概念看后面一篇 本次以windos系统为例 开始 1.你得安装一个环境,供idea插件的开发使用 下载地址: https://www.jetbrains.com/idea/…

苹果封装的对称加密和非对称加密API

一、信息摘要算法5&#xff1a;MD51.系统库位置&#xff1a;<CommonCrypto/CommonHMAC.h>。2.非加密算法&#xff0c;属于哈希散列&#xff0c;不可逆&#xff0c;用于检验数据完整性。二、安全散列(哈希)算法SHA&#xff1a; 1.包含的散列算法&#xff1a;SHA-1&#xf…

RSA算法原理

一、RSA算法数论&#xff1a;将两个大素数相乘十分容易&#xff0c;但是想要对其乘积进行因式分解却极其困难&#xff0c;因此可以将乘积公开作为加密密钥。二、RSA算法涉及三个参数&#xff0c;n、e1、e2。三、公私钥生成过程1、首先&#xff0c;需要两个大质数&#xff0c;p和…

idea插件开发(02)---相关概念介绍

上一篇已经介绍了idea插件开发最简单的一个例子 本篇来说说相关概念,部分概念是网上抄的,网友的不同说法,但大致都是同一个意思 1.idea介绍 idea整个组件结构是基于PicoContainer的,他是一个嵌入式的Ioc容器,有点像Spring的容器 我们做的idea插件,作为扩展功能组件,可以添加到…

ECC椭圆曲线加密算法原理

比特币使用椭圆曲线算法生成公钥和私钥&#xff0c;选择的是secp256k1曲线。与RSA&#xff08;Ron Rivest&#xff0c;Adi Shamir&#xff0c;Len Adleman三位天才的名字&#xff09;一样&#xff0c;ECC&#xff08;Elliptic Curves Cryptography&#xff0c;椭圆曲线加密&…

mysql报错 DuplicateKeyException分析与解决

在做数据库同步的时候,发现一个错误,mysql报错如下: org.springframework.dao.DuplicateKeyException: ### Error updating database. Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException:XXX ### The error may involve com.jd.medicine.b2…

[运维]---linux机器一般监控用到的概念记录

发布在linux上的机器,一般我们需要监测各项数据来证实服务器是没问题的, 如果出现问题,我们也可以通过以下指标找到问题的方向 容器指标硬件指标磁盘指标系统指标网络指标 容器指标 线程数 -当前容器内线程总数&#xff08;平均到每个核的线程数&#xff09;进程数 -当前容器…

SHA算法原理

一、SHA1算法简介 安全哈希算法&#xff08;Secure Hash Algorithm&#xff09;主要适用于数字签名标准&#xff08;Digital Signature Standard DSS&#xff09;里面定义的数字签名算法&#xff08;Digital Signature Algorithm DSA&#xff09;。对于长度小于2^64位的消息&am…