kafkaspot在ack机制下如何保证内存不溢

storm框架中的kafkaspout类实现的是BaseRichSpout,它里面已经重写了fail和ack方法,所以我们的bolt必须实现ack机制,就可以保证消息的重新发送;如果不实现ack机制,那么kafkaspout就无法得到消息的处理响应,就会在超时以后再次发送消息,导致消息的重复发送。

 

但是回想一下我们自己写一个spout类实现BaseRichSpout并让他具备消息重发,那么我们是会在我们的spout类里面定义一个map集合,并以msgId作为key。

public class MySpout extends BaseRichSpout {private static final long serialVersionUID = 5028304756439810609L;// key:messageId,Dataprivate HashMap<String, String> waitAck = new HashMap<String, String>();private SpoutOutputCollector collector;public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("sentence"));}public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}public void nextTuple() {String sentence = "the cow jumped over the moon";String messageId = UUID.randomUUID().toString().replaceAll("-", "");waitAck.put(messageId, sentence);//指定messageId,开启ackfail机制collector.emit(new Values(sentence), messageId);}@Overridepublic void ack(Object msgId) {System.out.println("消息处理成功:" + msgId);System.out.println("删除缓存中的数据...");waitAck.remove(msgId);}@Overridepublic void fail(Object msgId) {System.out.println("消息处理失败:" + msgId);System.out.println("重新发送失败的信息...");//重发如果不开启ackfail机制,那么spout的map对象中的该数据不会被删除的,而且下游collector.emit(new Values(waitAck.get(msgId)),msgId);}
}

那么kafkaspout会不会也是这样还保存这已发送未收到bolt响应的消息呢?如果这样,如果消息处理不断失败,不断重发,消息不断积累在kafkaspout节点上,kafkaspout端会不就会出现内存溢出?

 

其实并没有,回想kafka的原理,Kafka会为每一个consumergroup保留一些metadata信息–当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。也就是说,kafkaspot在消费kafka的数据是,通过offset读取到消息并发送给bolt后,kafkaspot只是保存者当前的offset值。

当失败或成功根据msgId查询offset值,然后再去kafka消费该数据来确保消息的重新发送。

 

那么虽然offset数据小,但是当offset的数据量上去了还是会内存溢出的?

其实并没有,kafkaspout发现缓存的数据超过限制了,会把某端的数据清理掉的。

kafkaspot中发送数据的代码

collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));

可以看到msgID里面包装了offset参数。

它不缓存已经发送出去的数据信息。

 

当他接收到来至bolt的响应后,会从接收到的msgId中得到offset。以下是从源码中折取的关键代码:

public void ack(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;PartitionManager m = _coordinator.getManager(id.partition);if (m != null) {m.ack(id.offset);}}m.ack(id.offset);public void ack(Long offset) {_pending.remove(offset);//处理成功移除offsetnumberAcked++;}public void fail(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;PartitionManager m = _coordinator.getManager(id.partition);if (m != null) {m.fail(id.offset);}}m.fail(id.offset);public void fail(Long offset) {failed.add(offset);//处理失败添加offsetnumberFailed++;}SortedSet<Long> _pending = new TreeSet<Long>();SortedSet<Long> failed = new TreeSet<Long>();

关于kafkaspot的源码解析大家可以看这边博客:http://www.cnblogs.com/cruze/p/4241181.html

源码解析中涉及了很多kafka的概念,所以仅仅理解kafka的概念想完全理解kafkaspot源码是很难的,如果不理解kafka概念,那么就只需要在理解storm的ack机制上明白kafkaspot做了上面的两件事就可以了。

转发:http://www.cnblogs.com/intsmaze/p/5947078.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/423449.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云开发0基础训练营第二期热力来袭!

第二期云开发0基础训练营热力来袭&#xff01;课程升级、更佳体验、依旧免费&#xff01;每年的 “金九银十” 都是传说中的学习黄金期&#xff01;这期间在校的小伙伴面临开学季/求职季/考研季挑战&#xff0c;已经步入社会的也即将步入年终前的冲刺阶段。所以&#xff0c;这段…

前端学习(14):相对路径和绝对路径

目录结构 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv"X-UA-Compatible" conte…

discuz数据从godaddy主机中导出的mysql数据乱码变问号???的解决方法

从godaddy主机导出的mysql数据安装在本地电脑上发现原来的中文都变成了问号&#xff1f;godaddy主机中的数据库版本是5.0.67&#xff0c;charsetutf8 collationutf8_general_ci 而我的supesite的版本是gbk的&#xff0c;导出的数据也是 CREATE TABLE cdb_access (……) ENGINE…

jvm的新生代和老年代简介

新生代分为三个区域&#xff0c;一个Eden区和两个Survivor区&#xff0c;它们之间的比例为&#xff08;8&#xff1a;1&#xff1a;1&#xff09;&#xff0c;这个比例也是可以修改的。通常情况下&#xff0c;对象主要分配在新生代的Eden区上&#xff0c;少数情况下也可能会直接…

Python 连接redis密码中特殊字符问题

连接方法&#xff1a; self.pool redis.ConnectionPool.from_url(self.redis_url)opredis redis.Redis(connection_poolself.pool)redis_url redis://:cot$#D4^&1234172.31.26.174:6379/0 直接连redis会报错&#xff0c;报错主要内容&#xff1a; ValueError: invalid l…

浅谈yield

c#1.0使用foreach 语句可以轻松地迭代集合。在c#1.0中&#xff0c;创建枚举器仍需要做大量的工作。c#2.0添加了yield语句&#xff0c;以便于创建枚举器。下面我们浅谈下yield的使用&#xff1a; 1、包含yield语句的方法或属性称为迭代块。迭代块必须声明为返回IEnumerator或IEn…

NIO核心框架介绍

NIO共引入了4个概念&#xff1a; - 缓存区&#xff1a;表示数据存放的容器&#xff0c;提供可读写的数据缓存区&#xff1b; - 字符集&#xff1a;用来对缓存数据进行解码和编码&#xff0c;在字节和Unicode字符之间转换&#xff1b; - 通道&#xff1a;用来接收或发送数据&…

前端学习(16):跳转链接小练习

点击图片实现跳转 目录结构 header.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv&q…

云开发的数据库权限机制解读丨云开发101

在使用云开发进行开发时&#xff0c;数据库权限是一个让不少人困扰的部分&#xff0c;四种数据库权限&#xff0c;到底是什么意思&#xff1f;其各自的权限、应用场景都是什么&#xff1f;大多数人对于这个机制&#xff0c;还是模糊的。为了帮助大家进行更好的开发&#xff0c;…

jremind V0.1.3.0添加透明

这次就把弹出消息的窗口改成了透明&#xff0c;本来说是做鼠标穿透的&#xff0c;给做成了透明&#xff0c;先用着&#xff01;有时间了再改了。 转载于:https://www.cnblogs.com/joypen/archive/2009/05/11/1693125.html

Java中的NIO非阻塞编程

在JDK1.4以前&#xff0c;Java的IO操作集中在java.io这个包中&#xff0c;是基于流的阻塞API。对于大多数应用来说&#xff0c;这样的API使用很方便&#xff0c;然而&#xff0c;一些对性能要求较高的应用&#xff0c;尤其是服务端应用&#xff0c;往往需要一个更为有效的方式来…

Nginx----基础

静态资源服务 通过本地文件系统提供服务&#xff1a;对css&#xff0c;js文件&#xff0c;图片等静态文件 反向代理服务 缓存&#xff1a;将一些数据经常不变的&#xff0c;缓存到Nginx中&#xff0c;直接给用户提供服务 负载均衡 api服务 OpenResty 数据库的服务比较简单&…

c#开发IE控件

c#开发IE控件主要是对BHO对象是使用,但是我们知道BHO是一个COM对象,而在.NET下开发基于COM的应用,总觉得不是很简单,这里有受控代码与COM的调用,我查找了下,国内并没有此类信息,下面是译稿,翻译的不好,欢迎指出.介绍:我们在浏览Internet信息的时候,往往需要增强用户浏览信息的,…

JVM直接内存

概述 直接内存并不是虚拟机运行时数据区的一部分&#xff0c;也不是Java 虚拟机规范中农定义的内存区域。在JDK1.4 中新加入了NIO(New Input/Output)类&#xff0c;引入了一种基于通道(Channel)与缓冲区&#xff08;Buffer&#xff09;的I/O 方式&#xff0c;它可以使用native…

英语----专业单词

directive (滴 ‘ruai k tive) n&#xff1a;指令determine 【di tε:min】 (de ter min) v&#xff1a;确定 词根&#xff1a;term 边界 distributions (dis tree biu tion)v&#xff1a;分配 C&#xff1a;发行版 exact (衣 ge za k t) adj&#xff1a;确切的 ex&#xff1a…

js遍历table

js遍历table var tableObj document.getElementById("tableName");var str "";for(var i0;i<tableObj.rows.length;i){for(varj0;j<tableObj.rows[i].cells.length;j){//str tableObj.rows[i].cells[j].innerHTML" ";for(var z0;z<…

深入理解JVM—JVM内存模型

我们知道&#xff0c;计算机CPU和内存的交互是最频繁的&#xff0c;内存是我们的高速缓存区&#xff0c;用户磁盘和CPU的交互&#xff0c;而CPU运转速度越来越快&#xff0c;磁盘远远跟不上CPU的读写速度&#xff0c;才设计了内存&#xff0c;用户缓冲用户IO等待导致CPU的等待成…

Nginx----进阶

用Nginx搭建一个静态的web资源服务器/动静分离 简单使用 1、可以在安装的nginx目录新建自己的目录zy&#xff08;和conf在一个目录下&#xff0c;也就是和html目录在一个目录下&#xff0c;注意如果使用/zy&#xff0c;那么zy目录需要创建在linux根目录&#xff09;&#xff0c…

ASP.NET页面生命周期描述

ASP.NET页面生命周期描述2008-09-12 09:25在以前写个一篇关于ASP.NET页面生命周期的草稿,最近又看了看ASP.NET&#xff0c;做个补充,看看页面初始过程到底是怎么样的 下面是ASP.NET页面初始的过程: 1. Page_Init(); 2. Load ViewState; 3. Load Postback data; 4. Page_Load();…

小程序·云开发的HTTP API调用丨实战

小程序云开发之httpApi调用。 小程序云开发之httpApi调用&#xff08;返回“47001处理”&#xff09; 技术栈 采用 nodejs express 搭建web服务器&#xff0c;采用 axios 请求第三方 httpApi nodejsexpressaxios 项目结构 通过应用生成器工具 express-generator 可以快速创建一…