RabbitMQ 原文译03--发布和订阅

发布/订阅

在之前的案例中我们创建了一个工作队列,这个工作队列的实现思想就是一个把每一个任务平均分配给每一个执行者,在这个篇文章我们会做一些不一样的东西,把一个消息发送给多个消费者,这种模式就被称作"发布/订阅".

为了说明这个模式,我们将要创建一个简单的日志系统,一个负责发布消息,另外一个负责接收打印他们.

在我们的日志系统中,每一个运行中的接收者副本将都会获得消息,这种方式可以让我们在运行一个接收者直接把消息保存在磁盘的同时,另外一个消费者可以把消息打印到屏幕上.

本质上,发布一个日志消息将会广播给所有的接收者

交换机(Exchanges)

在之前的文章中,我们接受和发送消息都是通过一个队列来完成了,现在是时候引入RabbitMQ的全部工作模型了.

让我们快速回忆一下之前涉及到的模型

--生产者(发布者),是一个负责发送消息的用户应用程序.

--队列,负责存储消息

--消费者(接收者),负责接收消息的用户程序.

RabbitMQ的核心思想是生产者永远不会直接把消息发送给队列,事实上生产者甚至经常不知道一个发出去的消息是否可以有队列去接收它.

相应的,生产者只能消息发送给交换机,交换机的工作机制非常简单,一方面它从生产者那里接收到消息,另一方面它会把消息发送给相应的队列上.交换机必须要知道怎么处理接收到的消息,它应该被放入一个特殊的队列吗?它是否应该被放入多个队列?或者它是否需要被忽略.

处理这工作的方式是通过交换机类型来实现的.

这里有几个可用的交换机类型:direct,topic,headers,fanout 我们将会关注最后一个(fanout),让我们创建一个fanout的交换机,名字叫做'logs'

channel.ExchangeDeclare("logs", "fanout");

这个fanout的交换机功能非常简单(你也许已经从名字中猜到了他的方式),把接收到的消息广播给所有已知的队列,这个这是我们的日志系统需要的.

列出RabbitMQ已添加的交换机:

cmd:rabbitmqctl list_exchanges

 

无命名的交换机:在之前的案例中我们对于交换机一无所知,但是仍然可以把消息发送到队列上,这是因为我们使用的是一个默认的交互机,名字为空(""),回顾一下我们之前发送消息的方式

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",routingKey: "hello", basicProperties: null,body: body);

第一个参数就是交换机的名称,空字符串表示默认的无命名的交换机:消息通过存在的RoutingKey被发送到队列上.

现在我们发送命名的交换机代替:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null,  body: body);

临时队列

在之前的案例中,我们使用的队列是一个指定了名字的队列(记得hello 和task_queue 吗),给一个队名命名是严格的,我们需要执行者连接的同样的队列来工作,当你想在生产者和消费者之间共享队列的时候指定一个队列名是非常重要的.但是我们的日志系统则不在此列,

我们想要监听到所有的日志消息,而不仅仅是他们的子集,我们也仅仅对当前正在流转的消息感兴趣,而不是老的消息,结局这个问题我们需要2件事情.

首先,无论何时我们连接到队列,我们都需要一个新鲜的,空的队列,为了实现这个目标我们可以每次创建一个随机名称的队列,或者更加便捷的方式--让服务为我们的队列随机命名.

第二,一旦我们断开到消费者到队列的连接,我们需要自动删除队列.

在.Net客户端,我们使用无参的queueDeclare()方法来创建一个随机命名的非持久的,自动删除的排他队列.

var queueName = channel.QueueDeclare().QueueName;

queueName就是一个随机的队列名,如:amq.gen-JzTY20BRgKO-HjmUJj0wLg.

绑定

我们已经创建了一个fanout的交换机和一个队列,现在我们需要告诉我们交换机发送消息到我们的队列,交换机和队列之间的关系叫做绑定.

channel.QueueBind(queue: queueName,exchange: "logs", routingKey: "");

从现在开始logs 交换机将会把消息放入我们的队列当中.

列出队列cmd: rabbitmqctl list_bindings

汇总

负责发送消息的生产者可之前案例基本上是一样的,最大的不同是我们将消息发送到了我们的命名队列logs上而不是默认的队列上,发送的时候我们需要使用routingKey,但是它的值是被fanout交换机忽略的.

EmitLog.cs

class EmitLog
{public static void Main(string[] args){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "logs", type: "fanout");var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);Console.WriteLine(" [x] Sent {0}", message);}Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}private static string GetMessage(string[] args){return ((args.Length > 0)? string.Join(" ", args): "info: Hello World!");}
}

正如你看到的,我们在建立连接之后创建了一个队列,这一步是必须的,因为发送到一个不存在的交换机是不被允许的。

当队列还没有绑定到交换机是发送的消息将会丢失,但是这对我们日志系统来说没有问题,当没有消费者监听时我们可以安全的忽略这个消息。

ReceiveLogs.cs:

class ReceiveLogs
{public static void Main(){var factory = new ConnectionFactory() { HostName = "localhost" };using(var connection = factory.CreateConnection())using(var channel = connection.CreateModel()){channel.ExchangeDeclare(exchange: "logs", type: "fanout");var queueName = channel.QueueDeclare().QueueName;channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");Console.WriteLine(" [*] Waiting for logs.");var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body);Console.WriteLine(" [x] {0}", message);};channel.BasicConsume(queue: queueName,noAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}
}

同时运行两个receive,可以看到两个接收端可以同时接收到一个消息。

转载于:https://www.cnblogs.com/grayguo/p/5356070.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/573764.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

html富文本编辑器插件_vue中使用vuequilleditor富文本编辑器

点击上方“小姚同学技术栈”快速关注我哟!vue-quill-editor是一个基于quill、适用于vue的富文本编辑器开源项目,支持服务端渲染和单页应用。目前项目热度还算可以,如果不考虑使用markdown,vue-quill-editor是一个比较好的选择。本…

二元函数图像生成器_GAN生成图像综述

点击上方“CVer”,选择加"星标"或“置顶”重磅干货,第一时间送达作者:YTimo(PKU EECS) 研究方向:深度学习,计算机视觉本文转载自:SIGAI摘要生成对抗网络(Generative adversarial network, GAN)…

设计模式之禅读书笔记

》设计原则《 》Single Responsibility Principle(单一职责原则)类只有一个修改的原因。 ●类的复杂性降低,实现什么职责都有明确的定义。 ●可读性高 ●可维护性高 ●变更引起的风险降低。 PS:基本不可能实现 》里氏替换原则&…

mysql mysql_set_charset_SQL注入攻击之 mysql_set_charset [转]

本文转载地址:http://hi.baidu.com/cuttinger/blog/item/e9a93901934755147bec2cb0.html1。老话题,mysql_real_escape_string单引号,大多数情况下,防止sql注入攻击足够了。$mysql mysql_connect("host","user&quo…

idea导入maven项目依赖报错_解决Maven依赖冲突的好帮手,这款IDEA插件了解一下?

1、何为依赖冲突Maven是个很好用的依赖管理工具,但是再好的东西也不是完美的。Maven的依赖机制会导致Jar包的冲突。举个例子,现在你的项目中,使用了两个Jar包,分别是A和B。现在A需要依赖另一个Jar包C,B也需要依赖C。但…

java线程创建方式_Java创建线程安全的方法

原文链接 译者:秦建平 校对:方腾飞首先来看一个问题:下面这个方法是线程安全的吗?如何才能让这个方法变成线程安全的?public class MyCount {private static int counter 0;public static int getCount(){return coun…

win7 能下node什么版本_微软从未公开的win10版本,3GB+极度精简,老爷机有救了

在windows家族中,最好用的就是win7和XP系统。堪称经典,而且还是发展最成功的系统版本。前几天韩博士也发布一篇关于XP系统的文章,评论区引发极大争论。大家众说纷纭,觉得XP系统是顺畅,但是很多软件硬件都不支持&#x…

【Swift学习】Swift编程之旅(一)

学习一门新语言最经典的例子就是输出“Hello World!” print("Hello World!")  swift就是这样来输出的。 如果你使用过其他语言,那么看上去是非常的熟悉吧。但比一些c要简单的多吧 1、不需要导入一些单独的库,比如输入/输出或字符…

孔夫子二手书采集

文章目录 项目演示软件采集单本数据网页搜索数据对比 使用场景概述部分核心逻辑Vb工程图数据导入与读取下拉框选择参数设置线程 使用方法下载软件授权导入文件预览处理后的数据 项目结构附件说明 项目演示 操作视频详见演示视频,以下为图文演示 软件采集单本数据 …

为什么用redis做缓存而不是mybatis自带的缓存_如何用Java设计一个本地缓存,涨姿势了...

最近在看Mybatis的源码,刚好看到缓存这一块,Mybatis提供了一级缓存和二级缓存;一级缓存相对来说比较简单,功能比较齐全的是二级缓存,基本上满足了一个缓存该有的功能。当然如果拿来和专门的缓存框架如ehcache来对比可能…

process 类 java_编写可执行jar——java的Process类的使用(二)

你知道怎么在控制台使用ping吗?那你知道怎么在java中使用ping吗?1.批处理文件批处理文件大家一定不陌生。接触最多的应该就是tomcat中的start.bat或者start.sh了。bat是在windows环境下运行的批处理文件,sh则是linux的shell脚本。2.adb指令安…

2782: [HNOI2006]最短母串

2782: [HNOI2006]最短母串 Time Limit: 1 Sec Memory Limit: 128 MBSubmit: 3 Solved: 2[Submit][Status][Web Board]Description 给定n个字符串(S1,S2,„,Sn),要求找到一个最短的字符串T,使得这n个字符串(S1,S2,„,…

c java 内部类_java程序中能否在内部类当中再定义一个内部类?

展开全部我被你的想62616964757a686964616fe78988e69d8331333363386664法震撼了,哈哈.亏你想的出来...这么弄代码不好理解,Java看起来醒目,也是Java中的一个规范!可以吗?必须可以..看代码演示...声明下,我也第一次,多次嵌套,看你想法后去试验下是可行的我用的两种办法!不多说看…

数据库分区分表以及读写分离

谈谈怎么实现Oracle数据库分区表 Oracle数据库分区是作为Oracle数据库性能优化的一种重要的手段和方法,做手头的项目以前,只聆听过分区的大名,感觉特神秘,看见某某高手在讨论会上夸夸其谈时,真是骂自己学艺不精&#x…

JSP Workshop

http://www.cnblogs.com/ITtangtang/p/4126395.html 发现http://www.tutorialspoint.com/里的资料很全也很不错啊! 资料:http://www.tutorialspoint.com/jsp/jsp_tutorial.pdf 另外,http://www.runoob.com/jsp/jsp-tutorial.html 中关于JSP…

h5页面不可 移动_H5营销|为什么H5适合于微信营销

随着互联网技术的不断发展,更新在移动互联网时代,网络营销也开始越来越新颖化,而微信H5就是其中的佼佼者。它的出现满足了用户视觉上的审美要求,并且可以使营销方式变得更加的美观整洁,那么这里就有一个问题。为什么微…

亚太地区数学建模优秀论文_数学建模美赛强势来袭!

01美赛,即美国大学生数学建模竞赛(MCM/ICM)又要来啦!赛题内容涉及经济、管理、环境、资源、生态、医学、安全、未来科技等众多领域。竞赛要求三人(本科生)为一组,在四天时间内,就指定的问题完成从建立模型、求解、验证到论文撰写的…

《软件调试》读书笔记:第13章 硬错误和蓝屏

会话管理器进程SMSS.exe是系统启动后的第一个用户态进程,负责启动和监护windows子系统进程:CSRSS.exe和登陆管理进程:WinLogonSMSS.exe从注册表中查询子系统exe文件的位置,并且启动它 CSRSS是windows子系统进程,自NT4以…

信息安全技术网络安全等级保护定级指南_行业标准 |报业网络安全等级保护定级参考指南V2.0发布,明确保护对象、定级要求...

近期,中国新闻技术工作者联合会正式发布《报业网络安全等级保护定级参考指南V2.0》。该指南由中国新闻技术工作者联合会组织网络安全领域的专家、报业技术专家以及业务专家经过多次调研、学习、探讨后,在原《报业网络安全等级保护定级参考指南V1.0》的基…

数学作图工具_科研论文作图系列-从PPT到AI (一)

导语:之前的推送中,小编给大家介绍过几款科研作图软件,包括统计分析软件Origin和Prism,图像处理软件ImageJ等等。从本期开始,小编将和大家一起继续学习科研论文作图。重点介绍图像的处理和排版,用到的工具主…