[九]RabbitMQ-客户端源码之Consumer

在[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。在用户使用时可以简单的采用QueueingConsumer或者采用DefaultConsumer来重写某些方法。

这里先来看下消费者客户端的关键代码:

        QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicQos(32);channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer)while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [X] Received '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}

可以看到QueueingConsumer作为channel.basicConsume的回调函数,之后再进行处理。

在AMQConnection中有关MainLoop的主线程,专门用来”第一线”的处理Broker发送回客户端从帧。当Basic.Consume/.ConsumeOk开启消费模式之后,Broker主动的向客户端发送Basic.Delivery帧,MainLoop线程一步步的调用,最后到ChannelN的processAsync()方法中有:

if (method instanceof Basic.Deliver) {processDelivery(command, (Basic.Deliver) method);return true;
} 

之后调用processDelivery方法:

protected void processDelivery(Command command, Basic.Deliver method) {Basic.Deliver m = method;Consumer callback = _consumers.get(m.getConsumerTag());if (callback == null) {if (defaultConsumer == null) {throw new IllegalStateException("Unsolicited delivery -" + " see Channel.setDefaultConsumer to handle this" + " case.");}else {callback = defaultConsumer;}}Envelope envelope = new Envelope(m.getDeliveryTag(), m.getRedelivered(),m.getExchange(),m.getRoutingKey());try {this.dispatcher.handleDelivery(callback, m.getConsumerTag(),envelope, (BasicProperties) command.getContentHeader(),command.getContentBody());} catch (Throwable ex) {getConnection().getExceptionHandler().handleConsumerException(this, ex,callback,m.getConsumerTag(), "handleDelivery");}
}

这个方法首先根据consumerTag从ChannelN中的_consumer这个HashMap中获取相应的Consumer回调函数,然后调用这个回调函数的handleDeliver()方法进行处理,这里有些同学会有疑问,明明是调用ConsumerDispatcher dispatcher的handleDeliver()方法,其实这里只是包了一层皮,ConsumerDispatcher的handleDeliver()方法就是调用了Consumer的handleDeliver()方法。

我们接下去看看QueueingConsumer这个实现Consumer接口的类是怎么处理的:

@Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException
{checkShutdown();this._queue.add(new Delivery(envelope, properties, body));
}

这里的queue就是一个LinkedBlockingQueue,客户端程序通过调用nextDelivery()方法来获取数据:

public Delivery nextDelivery()throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
{return handle(_queue.take());
}private Delivery handle(Delivery delivery) {if (delivery == POISON ||delivery == null && (_shutdown != null || _cancelled != null)) {if (delivery == POISON) {_queue.add(POISON);if (_shutdown == null && _cancelled == null) {throw new IllegalStateException("POISON in queue, but null _shutdown and null _cancelled. " +"This should never happen, please report as a BUG");}}if (null != _shutdown)throw Utility.fixStackTrace(_shutdown);if (null != _cancelled)throw Utility.fixStackTrace(_cancelled);}return delivery;
}

这个nextDelivery方法说白就是一个LinkedBlockingQueue的take()操作,也就是一个可能会阻塞等待的操作。


附:本系列全集

  1. [Conclusion]RabbitMQ-客户端源码之总结
  2. [一]RabbitMQ-客户端源码之ConnectionFactory
  3. [二]RabbitMQ-客户端源码之AMQConnection
  4. [三]RabbitMQ-客户端源码之ChannelManager
  5. [四]RabbitMQ-客户端源码之Frame
  6. [五]RabbitMQ-客户端源码之AMQChannel
  7. [六]RabbitMQ-客户端源码之AMQCommand
  8. [七]RabbitMQ-客户端源码之AMQPImpl+Method
  9. [八]RabbitMQ-客户端源码之ChannelN
  10. [九]RabbitMQ-客户端源码之Consumer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/290871.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Performance Metrics(性能指标1)

Performance Metrics(性能指标) 在我们开始旅行本书之前,我必须先了解本书的性能指标和希望优化后的结果,在第二章中,我们探索更多的性能检测工具和性能指标,可是,您得会使用这些工具和明白这些性能指标的意义。 由于业…

ExtJs 4.x Ajax简单封装

/*** Ajax 请求*/ Ext.define("SinoCloudAjaxRequestClass", {constructor : function () {var me this;var viewport me.getViewPort();if(viewport){window.sinoCloudAjaxRequestClassLoadingMak new Ext.LoadMask(viewport, {msg:"处理中..."});}},r…

可能是.NET领域性能最好的对象映射框架——Mapster

我之前文章提到过 MediatR 的作者 Jimmy Bogard,他也是大名鼎鼎的对象映射框架 AutoMapper 的作者。AutoMapper 的功能强大,在 .NET 领域的开发者中有非常高的知名度和使用率。而今天老衣要提的是另外一款高性能对象映射框架:Mapster——它轻…

Android studio之Unknown run configuration type AndroidRunConfigurationType解决办法

1、问题 我也就是只是一开始点击了File->invalidate cachas / restart -> invalidate and restart 在Android studio里面运行之前正常的安卓项目,报下错误Unknown run configuration type AndroidRunConfigurationTyp 2、解决办法 原因:是因为插件…

Delphi XE5实现减少编译出来的程序体积

本文章介绍了Delphi XE5实现减少编译出来的程序体积,一般情况下,编译出来的文件会比较大,对于发布来说,比较不方便,经过查询,找到了两个减少体积的办法1:关闭DEBUG信息,通过下面的步…

超级计算机适用于科学计算,中国科学院

中科院合肥物质科学研究院物质科学计算中心超算用户使用规章为加强物质科学计算中心(中科院超级计算环境合肥分中心)的运行管理,合理和科学地使用超算资源,发挥超算平台在科研工作中的作用,特制订此规章。1. 用户应自觉遵守国家的各项法律规定…

[cocos2d]修改富文本文本和高度

1.local richTable { {text , color cc.c3b(173,118,15)}, {custom , color ItemMacro[index].color, param id} } 2.item:setContentSize(50,20)转载于:https://www.cnblogs.com/Faiz-room/p/6727072.html

CodeForces 546B

题目链接:http://acm.hust.edu.cn/vjudge/contest/view.action?cid82659#problem/C 解题思路:先对输入的数据放入a数组里面存储,再将a数组用sort进行排序,从第二个数开始判断,是否比第一个大,如果大&#…

Avalonia跨平台入门第一篇

作为一枚屌丝程序员来说最大的爱好就是撸代码,有时候根本停不下来(沉迷工作,无法自拔);因为一直都是WPF开发,后面也摸索了一下Xamarin的东西;这不又看到其他人又在搞什么跨平台;我也是手也很痒痒;就像刚开始摸索Xamarin一样,想又不知如何下手;这不再次迈出了第一步去摸索Avalon…

linux之用route命令看简单路由信息

1、我们在linux上简单看路由信息使用下面命令 route -n

三角形带优化库nvtrisrip的使用

nvtrisrip是NVIDIA提供的一个开源优化库,这个库可以将三角形顶点索引数组转换为三角形带索引数组。可以极大的提高渲染速度。NVIDIA这个库的官方地址是:http://www.nvidia.com/object/nvtristrip_library.html不过这里代码不全也不够新,推荐从…

angular-ui-tab-scroll

2019独角兽企业重金招聘Python工程师标准>>> A scrollable tab plugin intended for scrolling UI Bootstrap tabset. 功能介绍:http://npm.taobao.org/package/angular-ui-tab-scroll 下载地址:https://github.com/VersifitTechnologies/ang…

调用带有 out 参数的方法时检查弃元参数

前言C# 支持弃元,弃元是应用程序代码中故意未使用的占位符变量。弃元将意图传达给编译器和读取代码的其他人:你打算忽略表达式的结果。通过为变量分配下划线(_)作为其名称,可以指示变量是弃元变量。例如下列代码:if (DateTime.Try…

007-网站的搭建

昨天在极客学院的视频引导下,我成功的模拟量本地建站和利用虚拟主机建站。 我用的虚拟主机是阿里云提供的虚拟主机,域名是从万网购买的,网站模板是wordpress。 先通过虚拟主机把网站搭建好,再买域名,将域名解析到网站上…

潍坊学院计算机系崔玲玲,人工免疫算法在引水工程中的应用.pdf

人工免疫算法在引水工程中的应用.pdf第 14卷第2期 潍坊学院学报 Vo1.14No.22014年 4月 JournalofWeifangUniversity Apr.2014人工免疫算法在引水工程中的应用崔玲玲 ,王林叶 ,陈志银(1-潍坊学院,山东 潍坊 …

Android之Unable to execute dex: Multiple dex files define 解决方法

1、问题 运行Android项目,出现Unable to execute dex: Multiple dex files define 这个错误 2、原因 代码里面引用的jar包和项目里面的类冲突了,一般比如,我写了这个项目,然后把这个项目打成jar包,然后再导入这个项目…

为什么?

为什么80%的码农都做不了架构师?>>> 为什么总有那么多的难以忘怀?或许这是前世我们欠下的债吧为什么总觉得别人家的好?却忽视了身边最真实的温暖为什么总是固执的坚持着虚幻的前景?因为就算再小的梦想也有实现的权利为…

抽象类和接口类的区别

2019独角兽企业重金招聘Python工程师标准>>> 一、 抽象类abstract class 1 .抽象类是指在 class 前加了 abstract 关键字且存在抽象方法(在类方法 function 关键字前加了 abstract 关键字)的类。 2 .抽象类不能被直接实…

浅谈C#字符串构建利器StringBuilder

前言在日常的开发中StringBuilder大家肯定都有用过,甚至用的很多。毕竟大家都知道一个不成文的规范,当需要高频的大量的构建字符串的时候StringBuilder的性能是要高于直接对字符串进行拼接的,因为直接使用或都会产生一个新的String实例&#…

高效时间管理

P1 高效时间管理P2课程主要内容时间管理概述高效时间管理策略性的目标设定设定优先顺序规划与组织时间管理工具消除时间杀手高效日程管理如何为领导制定日程表P3昨天是一张已被注销的支票明天是一张尚未到期的支票今天则是随时可运用的现金,请善用它!P4高…