RabbitMQ入门教程——发布/订阅

什么是发布订阅

发布订阅是一种设计模式定义了一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有的订阅者对象,使他们能够自动更新自己的状态。

为了描述这种模式,我们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。在我们的这个日志系统中,所有正在运行的接收方程序都会接受消息。我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接受者(receiver)把日志输出到屏幕上。最终,日志消息被广播给所有的接受者(receivers)。

Exchanges

RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,生产者只能发送消息给到exchange,exchange比较简单,一边从生产者就收消息,一边把消息推送到队列中。exchange必须清楚的知道消息应该按照什么规则路由到对应的队列中,而具体使用那种路由算法是由exchange type决定的。AMQP协议提供了四种交换机类型:

Name(交换机类型)

Default pre-declared names(预声明的默认名称)

Direct exchange(直连交换机)

(Empty string) and amq.direct

Fanout exchange(扇型交换机)

amq.fanout

Topic exchange(主题交换机)

amq.topic

Headers exchange(头交换机)

amq.match (and amq.headers in RabbitMQ)

除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

  • Name
  • Durability (消息代理重启后,交换机是否还存在)
  • Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
  • Arguments(依赖代理本身)

    交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。

    本文中具体讲解下以下两种交换机:直连交换机(前面几个例子中使用的交换机类型),扇形交换机(本文中要使用的交换机类型)

    直连交换机

    直连交换机(direct exchange)可以使用消息携带的路由键(routing key)将消息投递给对应的队列中。用来处理消息的单播路由(unicast routing),也可以处理多播路由。

    那么它具体是如何工作的呢

    • 将一个队列绑定到某个交换机上,同时给该绑定指定一个路由键(routing key)
    • 当一个携带路由键为R的消息被发送到直连交换机时,交换机会把它路由给绑定值同样为R的队列。

    直连交换机经常用来循环分发任务给多个工作者,当这样做时,一定要明白,这时消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)中。

    直连交换机图例:

    扇形交换机

    扇形交换机(funout exchange)将消息路由给绑定到它身上的所有队列,不关心所绑定的路由键(routing key)。扇形交换机用来处理消息的广播路由(broadcast routing)。

    由于扇形交换机投递消息到所有绑定他的队列,以下几个场景比较适合使用扇形交换机:

    • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
    • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
    • 分发系统使用它来广播各种状态和配置更新
    • 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,因此XMPP可能会是个更好的选择)

    扇形交换机图例

    创建exchange

     

                        channel.ExchangeDeclare(exchange: "log_exchange", //exchange 名称

                            type: ExchangeType.Fanout, //exchange 类型

                            durable: false,

                            autoDelete: false,

                            arguments: null);

     

    临时队列

    之前的几个示例中我们在为每一个声名的队列都指定了一个名字,因为我们希望consumer指向正确的队列。当我们希望在生产者和消费者之间共享队列时,为队列命名就非常的重要了。

    不过我们要实现的日志系统只是想要得到所有的消息,而且只对当前正在传递的消息感兴趣,并不关心队列的名称,所以为了满足我们的需求,要做两件事情:

    无论什么时间连接到RabbitMQ我们都需要一个新的空的队列。为了达到目的我们可以使用随机数创建队列,或让服务器给我们提供一个随机的名称。

    一旦消费者与RabbitMQ断开,消费者所接受的队列都应该被自动删除。

    创建临时队列

     

                        //创建一个未命名的新的消息队列,

                        QueueDeclareOk queue = channel.QueueDeclare(queue: "", //队列名称,为空时有系统自动分配

                            durable: false,

                            exclusive: false,

                            autoDelete: true,//自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

                            arguments: null);

                        //

                        //queue = channel.QueueDeclare();

     

    绑定

    我们已经创建了一个扇型交换机(fanout)和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。交换器和队列之间的联系我们称之为绑定(binding)

    创建交换机与队列的关系

     

    //扇形交换机(funout exchange)将消息路由给绑定到它身上的所有队列,不关心所绑定的路由键(routing key

                        //fanout exchange不需要指定routing key 指定了也没用

                        //通过绑定告诉exchange 需要发送消息到哪些消息队列

                        channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);

     

    完整代码:

    生产者  Pub_SubProducer.cs

     

    using System;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading.Tasks;

    using RabbitMQ.Client;

     

    namespace RabbitMQProducer

    {

        public class Pub_SubProducer

        {

            const string EXCHANGE_NAME = "log_exchange";

            const string ROUTING_KEY = "";

     

            //直接发送消息到交换机

            public static void Publish()

            {

                var factory = new ConnectionFactory()

                {

                    HostName = "127.0.0.1"

                };

                using (var connection = factory.CreateConnection())

                {

                    using (IModel channel = connection.CreateModel())

                    {

                        channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名称

                            type: ExchangeType.Fanout, //exchange 类型

                            durable: false,

                            autoDelete: false,

                            arguments: null);

     

                        Parallel.For(1, 100, item =>

                        {

                            string message = $"日志内容{DateTime.Now.ToString()}";

                            channel.BasicPublish(exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, basicProperties: null, body: Encoding.UTF8.GetBytes(message));

                            Console.WriteLine(message);

                        });

     

                        Console.WriteLine(" Press [enter] to exit.");

                        Console.ReadLine();

                    }

                }

            }

        }

    }

     

    消费者 Pub_SubConsumer.cs

     

    using RabbitMQ.Client;

    using System;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading.Tasks;

    using RabbitMQ.Client.Events;

    using System.IO;

     

    namespace RabbitMQConsumer

    {

        public class Pub_SubConsumer

        {

            const string EXCHANGE_NAME = "log_exchange";

            const string ROUTING_KEY = "";

            //输出到屏幕

            public static void Subscribe()

            {

                var factory = new ConnectionFactory()

                {

                    HostName = "127.0.0.1"

                };

                using (var connection = factory.CreateConnection())

                {

                    using (IModel channel = connection.CreateModel())

                    {

                        channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名称

                            type: ExchangeType.Fanout, //exchange 类型

                            durable: false,

                            autoDelete: false,

                            arguments: null);

     

                        //创建一个未命名的新的消息队列,

                        QueueDeclareOk queue = channel.QueueDeclare(queue: "", //队列名称,为空时有系统自动分配

                            durable: false,

                            exclusive: false,

                            autoDelete: true,//自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

                            arguments: null);

                        //

                        //queue = channel.QueueDeclare();

     

                        string queueName = queue.QueueName;

                        //扇形交换机(funout exchange)将消息路由给绑定到它身上的所有队列,不关心所绑定的路由键(routing key

                        //fanout exchange不需要指定routing key 指定了也没用

                        //通过绑定告诉exchange 需要发送消息到哪些消息队列

                        channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);

     

                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

                        consumer.Received += (sender, args) =>

                        {

                            string message = Encoding.UTF8.GetString(args.Body);

                            Console.WriteLine(message);

                        };

     

                        channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);

     

                        Console.WriteLine(" Press [enter] to exit.");

                        Console.ReadLine();

                    }

                }

            }

     

            /// <summary>

            /// 输出到文件

            /// </summary>

            public static void SubscribeFile()

            {

                var factory = new ConnectionFactory()

                {

                    HostName = "127.0.0.1"

                };

                using (var connection = factory.CreateConnection())

                {

                    using (IModel channel = connection.CreateModel())

                    {

                        channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名称

                            type: ExchangeType.Fanout, //exchange 类型

                            durable: false,

                            autoDelete: false,

                            arguments: null);

     

                        //创建一个未命名的新的消息队列,

                        QueueDeclareOk queue = channel.QueueDeclare(queue: "", //队列名称,为空时有系统自动分配

                            durable: false,

                            exclusive: false,

                            autoDelete: true,//自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

                            arguments: null);

                        //

                        //queue = channel.QueueDeclare();

     

                        string queueName = queue.QueueName;

                        //扇形交换机(funout exchange)将消息路由给绑定到它身上的所有队列,不关心所绑定的路由键(routing key

                        //fanout exchange不需要指定routing key 指定了也没用

                        //通过绑定告诉exchange 需要发送消息到哪些消息队列

                        channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);

     

                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

                        consumer.Received += (sender, args) =>

                        {

                            string message = Encoding.UTF8.GetString(args.Body);

     

                            //写入日志到txt文件

                            using (StreamWriter writer = new StreamWriter(@"c:\log\log.txt", true, Encoding.UTF8))

                            {

                                writer.WriteLine(message);

                                writer.Close();

                            }

     

                            Console.WriteLine(message);

                        };

     

                        channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);

     

                        Console.WriteLine(" Press [enter] to exit.");

                        Console.ReadLine();

                    }

                }

            }

        }

    }

     

    运行以上实例代码发现,每个订阅者实例 都能得到相同的内容。

转载于:https://www.cnblogs.com/AlvinLee/p/6150680.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/402547.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux C 学习---函数指针

我们经常会听到这样的说法&#xff0c;不懂得函数指针就不是真正的C语言高手。我们不管这句话对与否&#xff0c;但是它都从侧面反应出了函数指针的重要性&#xff0c;所以我们还是有必要掌握对函数指针的使用。先来看看函数指针的定义吧。 函数是由执行语句组成的指令序列或者…

CSS3与页面布局学习笔记(六)——CSS3新特性(阴影、动画、渐变、变形( transform)、透明、伪元素等)...

一、阴影 1.1、文字阴影 text-shadow<length>①&#xff1a; 第1个长度值用来设置对象的阴影水平偏移值。可以为负值 <length>②&#xff1a; 第2个长度值用来设置对象的阴影垂直偏移值。可以为负值 <length>③&#xff1a; 如果提供了第3个长度值则用来设置…

解决表字段使用关键字导致Mybatis Generator生成代码异常的解决方案

From: http://blog.itfsw.com/2017/05/23/jiejue-biao-ziduan-shiyong-guanjianzi-daozhi-mybatis-generator-shengcheng-daima-yichang-de-jiejue-fangan/ 在某个项目中遇到这么一个问题&#xff0c;因为原始表结构中某些字段定义使用了MySQL的关键字如match等&#xff0c;在…

Linux C编程---指针数组简析(二维数组、多级指针)

讲到指针和数组&#xff0c;先给大家看一道例题&#xff1a; 题目&#xff1a;填空练习&#xff08;指向指针的指针&#xff09; 1.程序分析&#xff1a;      2.程序源代码&#xff1a; main() { char *s[]{"man","woman","girl","bo…

20169210《Linux内核原理与分析》第十二周作业

Return-to-libc 攻击实验 缓冲区溢出的常用攻击方法是用 shellcode 的地址来覆盖漏洞程序的返回地址&#xff0c;使得漏洞程序去执行存放在栈中 shellcode。为了阻止这种类型的攻击&#xff0c;一些操作系统使得系统管理员具有使栈不可执行的能力。这样的话&#xff0c;一旦程序…

判断android图片是否硬解码(方法)

2019独角兽企业重金招聘Python工程师标准>>> 在oncreate方面的setContentView(R.layout.main); 前面&#xff0c;添加如下代码&#xff1a; getWindow().setFlags(WindowManager.LayoutParams.FLAG_HARDWARE_ACCELERATED, WindowManager.LayoutParams.FLAG_HAR…

Linux C 编程技巧--利用有限状态机模型编程

我们知道&#xff0c;一般编写程序时都要画出流程图&#xff0c;按照流程图结构来编程&#xff0c;如果编写一个比较繁琐&#xff0c;容易思维混乱的程序时&#xff0c;我们可以利用有限状态机模型画出一个状态转移图&#xff0c;这样便可以利用画出的逻辑图来编写程序&#xf…

linux远程登录三种方式telnet,ssh,vnc

linux远程连接三种方式telnet&#xff0c;ssh&#xff0c;vnctelnet和ssh服务只能实现基于字符界面的远程控制&#xff0c;如果要基于图形界面进行远程控制&#xff0c;可以借助免费的VNC来完成。一、telnet连接1.首先进入终端&#xff0c;查看是否安装了telnet服务。linux默认…

大数据之Yarn——Capacity调度器概念以及配置

试想一下&#xff0c;你现在所在的公司有一个hadoop的集群。但是A项目组经常做一些定时的BI报表&#xff0c;B项目组则经常使用一些软件做一些临时需求。那么他们肯定会遇到同时提交任务的场景&#xff0c;这个时候到底如何分配资源满足这两个任务呢&#xff1f;是先执行A的任务…

C/C++经典面试题

面试题1&#xff1a;变量的声明和定义有什么区别 为变量分配地址和存储空间的称为定义&#xff0c;不分配地址的称为声明。一个变量可以在多个地方声明&#xff0c;但只能在一个地方定义。加入extern修饰的是变量的声明&#xff0c;说明此变量将在文件以外或在文件后面部分定义…

Java跳出多重循环

From: https://www.cnblogs.com/fastfn/p/9777067.html 场景&#xff1a;很多的时候需要做到跳出多重循环&#xff0c;而在Java中虽然后goto关键字&#xff0c;但是是保留字&#xff0c;并没有启用。而在处理分支结构的if...else,switch...case,好像都达不到想要的效果。 作为…

java基础集合简介Map(三)下

From: https://www.cnblogs.com/douyu2580860/p/8358768.html --Map接口简介 今天来看一看map集合&#xff0c;map映射接口&#xff0c;用于存放键值对&#xff0c;<key,value>&#xff0c;通过key来查找value,顾名思义key不能为空&#xff0c;唯一且不重复&#xff0c;不…

从getmemery()函数看内存管理、函数传参等一系列问题

在C 面试题目中&#xff0c;会经常出现getmemery()函数的改错题&#xff0c;比如下面这道题&#xff0c; 例一&#xff1a;代码如下&#xff1a; [cpp] view plaincopy #include <stdio.h> char *getmemery() { char p[] "hello world!"; …

Java中array、List、Set互相转换

From: https://www.cnblogs.com/yysbolg/p/9977365.html 数组转List String[] staffs new String[]{"A", "B", "C"}; List staffsList Arrays.asList(staffs);//注意: Arrays.asList() 返回一个受指定数组决定的固定大小的列表。所以不能做 a…

Apache Shiro 使用手册(三)Shiro 授权

授权即访问控制&#xff0c;它将判断用户在应用程序中对资源是否拥有相应的访问权限。 如&#xff0c;判断一个用户有查看页面的权限&#xff0c;编辑数据的权限&#xff0c;拥有某一按钮的权限&#xff0c;以及是否拥有打印的权限等等。 一、授权的三要素授权有着三个核心元素…

UVa 10026 - Shoemaker's Problem

题目大意&#xff1a;鞋匠有n个任务&#xff0c;第i个任务要花费ti天&#xff0c;同时第i个任务每耽误一天要有fi的罚金。求完成所有任务的最小罚金。 虽然知道是贪心&#xff0c;可是并不确定如何作贪心选择&#xff0c;只好“取经”了...假如有两个任务i和j&#xff0c;先做i…

在VS2012中实现Ext JS的智能提示太简单了

Visual Studio 2012太强大了&#xff0c;居然能自己会去提取Ext JS的类的属性和方法&#xff0c;从而实现只能提示。下面就来介绍一下实现这个功能。在Visual Studio 2012中随便创建一个Web项目&#xff0c;我创建了一个空的Web项目&#xff0c;目录结构如下图所示&#xff1a;…

mybatis 查询之神坑

先看一个示例&#xff1a; 数据表数据&#xff1a; mybatis类和查询语句&#xff1a; 1. 当UserInfoMap中所有字段(包含association)都为NULL的话&#xff0c;getUserInfo的返回结果是个null&#xff0c;即使查询的记录存在&#xff01;运行结果如下&#xff1a; 2019-06-26 …

微软万圣节文件

为什么80%的码农都做不了架构师&#xff1f;>>> http://www.aka.org.cn/Docs/halloween/halloweenDoc.html 微软万圣节文件 圣节文件在微软以外被用作称呼一系列来源可靠的备忘录&#xff0c;内容是微软总部用来对付开源软件&#xff08;特别是Linux&#xff09;的…

linux C 学习 简单字符串逆序输出

看了下网上的字符串逆序输出&#xff0c;都相对复杂&#xff0c;下面给一个简单的字符串逆序输出小程序实现: [cpp] view plaincopy #include <stdio.h> #include <stdlib.h> #include <string.h> int main() { int i; int n; …