.NET Core 使用RabbitMQ

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

RabbitMQ安装

RabbitMQ安装,网上已经有许多教程了,这里简单介绍一下在CentOS下安装RabbitMQ。使用的版本为3.6.12最新版。

1.首先安装erlang

rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

2.然后安装socat

yun install socat

3.最后安装RabbitMQ

rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm

RabbitMQ常用命令

启用Web控制台

rabbitmq-plugins enable rabbitmq_management

开启服务

systemctl start rabbitmq-server.service

停止服务

systemctl stop rabbitmq-server.service

查看服务状态

systemctl status rabbitmq-server.service

查看RabbitMQ状态

rabbitmqctl status

添加用户赋予管理员权限

rabbitmqctl  add_user  username  password
rabbitmqctl  set_user_tags  username  administrator

查看用户列表

rabbitmqctl list_users

删除用户

rabbitmqctl delete_user username

修改用户密码

rabbitmqctl oldPassword Username newPassword

访问Web控制台

http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。

.NET Core 使用RabbitMQ

定义生产者
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory {UserName = "admin",//用户名Password = "admin",//密码HostName = "192.168.157.130"//rabbitmq ip
};
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();//声明一个队列channel.QueueDeclare("hello", false, false, false, null);Console.WriteLine("\nRabbitMQ连接成功,请输入消息,输入exit退出!");
string input;
do{input = Console.ReadLine();  
 var sendBytes = Encoding.UTF8.GetBytes(input);  
  //发布消息channel.BasicPublish("", "hello", null, sendBytes);} while (input.Trim().ToLower()!="exit"); channel.Close(); connection.Close();
定义消费者
            //创建连接工厂ConnectionFactory factory = new ConnectionFactory{UserName = "admin",//用户名Password = "admin",//密码HostName = "192.168.157.130"//rabbitmq ip};          
           //创建连接var connection = factory.CreateConnection();    
          //创建通道var channel = connection.CreateModel();          
          //事件基本消费者EventingBasicConsumer consumer = new EventingBasicConsumer(channel);        
          //接收到消息事件consumer.Received += (ch, ea) =>{                var message = Encoding.UTF8.GetString(ea.Body);Console.WriteLine($"收到消息: {message}");        
           //确认该消息已被消费channel.BasicAck(ea.DeliveryTag, false);};            //启动消费者 设置为手动应答消息channel.BasicConsume("hello", false, consumer);Console.WriteLine("消费者已启动");Console.ReadKey();channel.Dispose();connection.Close();
运行

启动了一个生产者,两个消费者,可以看见两个消费者都能收到消息,消息投递到哪个消费者是由RabbitMQ决定的。

RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

修改一下消费者的代码:

//接收到消息事件
consumer.Received += (ch, ea) => {  
 var message = Encoding.UTF8.GetString(ea.Body);Console.WriteLine($"收到消息: {message}");Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");Thread.Sleep(10000);    //确认该消息已被消费channel.BasicAck(ea.DeliveryTag, false);Console.WriteLine($"已发送回执[{ea.DeliveryTag}]"); };

演示:

从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递。

使用RabbitMQ的Exchange

前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

RabbitMQ提供了四种Exchange模式:direct,fanout,topic,header 。但是 header模式在实际使用中较少,所以这里只介绍前三种模式。

Exchange不是消费者关心的,所以消费者的代码完全不用变,用上面的消费者就行了。
由于避免文章过长,影响阅读,所以只贴了部分代码,但是demo里面是完整可运行的,详细代码请查看demo。

Direct Exchange

所有发送到Direct Exchange的消息被转发到具有指定RouteKey的Queue。

Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//定义一个Direct类型交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
//定义一个队列
channel.QueueDeclare(queueName, false, false, false, null);
//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null);

运行:

Fanout Exchange

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。

为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。

static void Main(string[] args){   
 string exchangeName = "TestFanoutChange";  
  string queueName1 = "hello1";  
   string queueName2 = "hello2";  
     string routeKey = "";    //创建连接工厂ConnectionFactory factory = new ConnectionFactory{UserName = "admin",//用户名Password = "admin",//密码HostName = "192.168.157.130"//rabbitmq ip};    //创建连接var connection = factory.CreateConnection();    //创建通道var channel = connection.CreateModel();    //定义一个Direct类型交换机channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);    //定义队列1channel.QueueDeclare(queueName1, false, false, false, null);  
    //定义队列2channel.QueueDeclare(queueName2, false, false, false, null);  
     //将队列绑定到交换机channel.QueueBind(queueName1, exchangeName, routeKey, null);channel.QueueBind(queueName2, exchangeName, routeKey, null);  
    //生成两个队列的消费者ConsumerGenerator(queueName1);ConsumerGenerator(queueName2);Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");    string input;  
 do{input = Console.ReadLine();    
    var sendBytes = Encoding.UTF8.GetBytes(input);    
       //发布消息channel.BasicPublish(exchangeName, routeKey, null, sendBytes);} while (input.Trim().ToLower() != "exit");channel.Close();connection.Close(); }/// <summary>/// 根据队列名称生成消费者/// </summary>/// <param name="queueName"></param>static void ConsumerGenerator(string queueName){    //创建连接工厂ConnectionFactory factory = new ConnectionFactory{UserName = "admin",//用户名Password = "admin",//密码HostName = "192.168.157.130"//rabbitmq ip};    //创建连接var connection = factory.CreateConnection();    //创建通道var channel = connection.CreateModel();    //事件基本消费者EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    //接收到消息事件consumer.Received += (ch, ea) =>{        var message = Encoding.UTF8.GetString(ea.Body);Console.WriteLine($"Queue:{queueName}收到消息: {message}");        //确认该消息已被消费channel.BasicAck(ea.DeliveryTag, false);};    //启动消费者 设置为手动应答消息channel.BasicConsume(queueName, false, consumer);Console.WriteLine($"Queue:{queueName},消费者已启动"); }

运行:

Topic Exchange

所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上,

Exchange 将路由进行模糊匹配。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”,但是“XiaoChen.” 只会匹配到“XiaoChen.money”。

所以,Topic Exchange 使用非常灵活。

string exchangeName = "TestTopicChange";
string queueName = "hello";
string routeKey = "TestRouteKey.*";
//创建连接工厂

ConnectionFactory factory = new ConnectionFactory {UserName = "admin",//用户名Password = "admin",//密码HostName = "192.168.157.130"//rabbitmq ip};
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//定义一个Direct类型交换机channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);
//定义队列1
channel.QueueDeclare(queueName, false, false, false, null);
//将队列绑定到交换机
channel.QueueBind(queueName, exchangeName, routeKey, null); Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");
string input;do{input = Console.ReadLine();    var sendBytes = Encoding.UTF8.GetBytes(input);    //发布消息channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);} while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close();

运行

Demo下载:DotNetCore.RabbitMQ

相关文章: 

  • .net core 使用Redis的发布订阅

  • RabbitMQ知多少

  • RabbitMQ系列教程之四:路由(Routing)

  • RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)

  • RabbitMQ系列教程之二:工作队列(Work Queues)

  • 如何优雅的使用RabbitMQ

  • .NET 使用 RabbitMQ 图文简介

  • RabbitMQ 高可用集群搭建及电商平台使用经验总结

原文地址:http://www.cnblogs.com/stulzq/p/7551819.html


.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/323490.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nssl1157-简单数学题【约数,换元法】

正题 题目大意 给出N&#xff0c;求所有的T使得 N−12TN−T\frac{N-\frac12T}{N-T}N−TN−21​T​ 是正整数。 解题思路 我们定义xN−TxN-TxN−T 那么TN−xN−(N−T)TN-xN-(N-T)TN−xN−(N−T) 原来的式子边为 N−12(N−x)x\frac{N-\frac12(N-x)}{x}xN−21​(N−x)​ N−12N−…

冒泡法排序

#include<stdio.h> main(){int i,j,t,a[]{2,5,8,6,15,89,55,46};for(i0;i<7;i){for(j0;j<7-i;j){if(a[j]>a[j1]){ta[j1];a[j1]a[j];a[j]t;} }}for(i0;i<8;i){printf("%d ",a[i]);} } 刚开始还以为敲的有bug&#xff0c;数组里有五六位后排序就…

十分钟快速了解 ES6 Promise

转载自 十分钟快速了解 ES6 Promise 什么是Promise Promise最早由社区提出并实现&#xff0c;典型的一些库有Q&#xff0c;when&#xff0c; bluebird等&#xff1b;它们的出现是为了更好地解决JavaScript中异步编程的问题&#xff0c;传统的异步编程最大的特点就是地狱般的回…

揭示.NET Core和.NET Standard

作为.NET家族的最新成员&#xff0c;有很多关于.NET Core和.NET Standard的误解&#xff0c;以及它们于.NET Framework之间的区别。在这篇文章&#xff0c;我会准确的解释他们究竟是什么&#xff0c;并看看何时应选择哪一个。在详细介绍之前&#xff0c;首先查看.NET的结构图&a…

使用wxjava实现发表内容、预览信息以及推送文章

大家好&#xff0c;我是雄雄。 文章目录前言保存草稿的方法获取草稿箱列表根据media_id获取草稿箱信息给指定人发送预览文章推送文章&#xff08;按照标签推送&#xff09;前言 今天分享的内容有如下几个&#xff1a; 保存草稿根绝media_id会哦去草稿箱的信息发表内容&#x…

ssl提高组周六模拟赛【2018.9.22】

前言 这周居然考两道数学&#xff0c;反正是我的弱项。然后第一题考试的时候zz了竟然忘了怎么二分 成绩 只放Rank1∼10Rank\ 1\sim10Rank 1∼10 RankRankRankPersonPersonPersonScoreScoreScoreAAABBBCCC1112015hjw2015hjw2015hjw2102102101001001007070704040402222017mysel…

指针数组(三)

#include<stdio.h>void g(int *pArr,int len){pArr[2]88;pArr[4]88;}void f(){int a[]{1,2,3,5,9},i;g(a,5);for(i0;i<5;i){printf("%d\t",a[i]);}}main(){f();}利用指针改变数组里的内容和内容

腾讯面试题:一条SQL语句执行得很慢的原因有哪些?

最近的招聘季&#xff0c;腾讯有一道面试题&#xff1a;一条SQL语句执行得很慢的原因有哪些&#xff1f; 其实&#xff0c;这个问题可以涉及到 MySQL 的很多核心知识&#xff0c;可以扯出一大堆&#xff0c;就像要考你计算机网络的知识时&#xff0c;问你“输入URL回车之后&am…

spring cloud+dotnet core搭建微服务架构:服务发现(二)

前言上篇文章《手把手教你使用spring clouddotnet core搭建微服务架构&#xff1a;服务治理&#xff08;-&#xff09;》实际上只讲了服务治理中的服务注册&#xff0c;服务与服务之间如何调用呢&#xff1f;传统的方式&#xff0c;服务A调用服务B&#xff0c;那么服务A访问的是…

猿创征文|公众号开发之路——为了研究公众号,我注册了公司

大家好&#xff0c;我是雄雄。 内容先知&#x1f60a;1.前言&#x1f636;2.想法&#x1f913;3.注册及研究&#x1f4aa;4.注册公司现在是北京时间&#xff1a;2022年9月10日23:34&#xff0c;农历八月十五日&#xff0c;2022年的教师节&#xff0c;也是中秋节&#xff0c;祝大…

学习手记(2018.9.15~2018.11.10)——备战NOIP2018

分层图常见方法 二进制压缩 用二进制表示一些东西的状态 形态 就是用第几层表示第几个形态(如第几天这样的) 树不重合点对数量 取下面的更优。 换元法 求一个数时可以不一定要求它&#xff0c;可以通过求和他有关联的式子从而间接的得到他。 数学归纳法 先证明i0i0i0时&…

动态数组(四)

#include<stdio.h>void f(int *q){*q200;}main(){int *p(int *)malloc(sizeof(int));*p10;printf("%d\n",*p);f(p);printf("%d\n",*p);}动态数组应用&#xff1a;&#xff08;记得导入#include<malloc.h>&#xff09; #include<stdio.h>…

Docker 实战笔记

Docker的安装与配置CentOS前提条件目前&#xff0c;CentOS 仅在发行的版本中的内核支持DockerDocker 运行在CentOS 7 上要求系统为64位&#xff0c;系统内核版本在3.10以上。本次测试运行使用的是 centOS 7.2 64位的系统版本。使用代码 uname -r即可查看系统内核版本Docker软件…

JVM性能调优监控工具jps、jstack、jmap、jhat、jstat、hprof使用详解

转载自 JVM性能调优监控工具jps、jstack、jmap、jhat、jstat、hprof使用详解 现实企业级Java开发中&#xff0c;有时候我们会碰到下面这些问题&#xff1a; OutOfMemoryError&#xff0c;内存不足 内存泄露 线程死锁 锁争用&#xff08;Lock Contention&#xff09; Java进…

jeecg微服务中如何实现机构的概念(当前机构只能查到其子机构)

大家好&#xff0c;我是雄雄。 内容先知前言代码实现前言 以前&#xff0c;我们做的国家平台项目&#xff0c;都是采用现在开源的框架来做的&#xff0c;比如若依&#xff0c;bladex&#xff0c;相比之下&#xff0c;若依用的要稍多点儿。因为那时候刚开始接触vue&#xff0c;…

nssl1162-农夫约的假期【中位数,贪心】

正题 题目大意 n∗nn*nn∗n的矩阵&#xff0c;给若干个点&#xff0c;求一个位置让这些点的距离和这个位置的曼哈顿距离和最小。 解题思路 我们可以将x坐标和y坐标分开来处理。 这样就是一个线性问题了。 我们先定义这个位置为所有x的平均数&#xff0c;然后我们考虑将x左右移…

初学结构体(一)

#include<stdio.h>struct Student{int age;float score;char sex;};main(){struct Student st{25,88,M};printf("%d\n",st.age); struct Student *pst&st;pst->age100;printf("%d\n",st.age);printf("%d",pst->age); }构造结构…

.NET Conf 2017后初尝Xamarin Forms 3.0@Linux

对很多.NET粉&#xff0c;.NET Conf 2017的东西估计提前一个月都熟悉了&#xff0c;Xamarin粉估计最大惊喜不是Xamarin Live Player, 也不是Xamarin.Forms混合NativeControl &#xff0c;而是Xamarin.Forms 3.0&#xff0c;因为它融入到 Linux平台. Xamarin.Forms 跨平台&#…

Redis单例、主从模式、sentinel以及集群的配置方式及优缺点对比

转载自 Redis单例、主从模式、sentinel以及集群的配置方式及优缺点对比 redis作为一种高效的缓存框架&#xff0c;使用是非常广泛的&#xff0c;在数据存储上&#xff0c;在运行时其将数据存储在内存中&#xff0c;以实现数据的高效读写&#xff0c;并且根据定制的持久化规则…

微信小程序中使用画布canvas实现动态心电图绘制

大家好&#xff0c;我是雄雄。 内容先知前言效果图实现代码前言 近期&#xff0c;接了个项目&#xff0c;三端&#xff08;小程序、PC、公众号&#xff09;同步开发&#xff0c;PC端没的问题&#xff0c;以前一直做的就是PC端&#xff0c;但是小程序和公众号之前没有做过&…