使用EasyNetQ组件操作RabbitMQ消息队列服务

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现,是实现消息队列应用的一个中间件,消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。EasyNetQ则是基于官方.NET组件RabbitMQ.Client 的又一层封装,使用起来更加方便。本篇随笔主要大概介绍下RabbitMQ的基础知识和环境的准备,以及使用EasyNetQ的相关开发调用。

1、RabbitMQ基础知识

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ的特点强大的应用程序消息传递;使用方便;运行在所有主要操作系统上;支持大量开发人员平台;开源商业支持。消息队列的模式有两种模式:P2P(Point to Point),P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。Publish/Subscribe(Pub/Sub),包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

 EasyNetQ 的目标是提供一个使.NET中的RabbitMQ尽可能简单的库。在EasyNetQ中消息应由.NET类型表示,消息应通过其.NET类型进行路由。EasyNetQ按消息类型进行路由。发布消息时,EasyNetQ会检查其类型,并根据类型名称,命名空间和装配体给出一个路由密钥。在消费方面,用户订阅类型。订阅类型后,该类型的消息将路由到订户。默认情况下,EasyNetQ使用Newtonsoft.Json库将.NET类型序列化为JSON。这具有消息是人类可读的优点,因此您可以使用RabbitMQ管理应用程序等工具来调试消息问题。

   EasyNetQ是在RabbitMQ.Client库之上提供服务的组件集合。这些操作可以像序列化,错误处理,线程编组,连接管理等。它们由mini-IoC容器组成。您可以轻松地用自己的实现替换任何组件。因此,如果您希望XML序列化而不是内置的JSON,只需编写一个ISerializer的实现并将其注册到容器。以下是官方提供的一个结构图,这个结构图可以很好的解析该组件的结构:

 

2、RabbitMQ的环境准备

本处主要介绍在Windows系统中安装RabbitMQ。

 1. 下载安装erlang 

      下载地址 http://www.erlang.org/downloads(根据操作系统选择32还64位)  

  2. 下载安装rabbitmq-server

     下载地址 http://www.rabbitmq.com/install-windows.html

下载后获得两个安装文件,按照顺序安装即可

 安装erlang环境后,一般会添加了一个ERLANG_HOME的系统变量,指向erlang的安装目录路径,如下所示(一般都添加了,确认下

 

安装RabbitMQ后,在程序里面可以看到

 我们使用它的命令行来启动RabbitMQ的服务

查看安装是否成功命令 :rabbitmqctl status

安装成功,在浏览器中输入 http://127.0.0.1:15672/,可以看到如下界面,使用默认的账号密码均为guest登陆进行管理

 guest 账号是管理员账号,可以添加Exchanges,Queues,Admin。但我们一般不使用guest账号,可以选择用命令来添加账号和权限,也可以使用管理界面进行添加相应的内容。

例如我添加相应的用户账号

一般我们还需要添加虚拟机,默认的虚拟机为/,我这里添加了一个虚拟机myvhost。

然后绑定账号到虚拟机上即可。

 

 3、EasyNetQ组件的使用

EasyNetQ组件的使用方式比较简单,跟很多组件都类似,例如:建立连接,进行操作做等等,对于EasyNetQ组件也是如此。

在.NET中使用EasyNetQ,要求至少基于 .NET4.5的框架基础上进行开发,可以直接在VS项目上使用NuGet的程序包进行添加EasyNetQ的引用。

一般添加引用后,至少包含了下面图示的几个引用DLL。

 

  1)创建连接:

使用EasyNetQ连接RabbitMQ,是在应用程序启动时创建一个IBus对象,并且,在应用程序关闭时释放该对象。

RabbitMQ连接是基于IBus接口的,当IBus中的方法被调用,连接才会开启。创建一个IBus对象的方法如下:

var bus = RabbitHutch.CreateBus(“host=myServer;virtualHost=myVirtualHost;username=admin;password=123456”);

与RabbitMQ服务器的延迟连接由IBus接口表示,创建连接的方式连接字符串由格式为key = value的键/值对组成,每一个用分号(;)分隔。

  • host,host=localhost 或者host =192.168.1.102或者host=my.rabbitmq.com,如果用到集群配置的话,那么可以用逗号将服务地址隔开,例如host=a.com,b.com,c.com
  • virtualHost,虚拟主机,默认为'/'
  • username,用户登录名
  • password,用户登录密码
  • requestedHeartbeat,心跳设置,默认是10秒
  • prefetchcount,默认是50
  • pubisherConfirms,默认为false
  • persistentMessages,消息持久化,默认为true
  • product,产品名
  • platform,平台
  • timeout,默认为10秒

一般我们在代码里面测试的话,简化连接代码如下所示。

 //初始化bus对象bus = RabbitHutch.CreateBus("host=localhost");

 

   2关闭连接:

bus.Dispose();

   要关闭连接,只需简单地处理总线,这将关闭EasyNetQ使用的连接,渠道,消费者和所有其他资源。

如果我们在Winform窗体里面初始化一个IBus对象,那么在窗体关闭的时候,关闭这个接口即可。

        private void FrmPublisher_FormClosed(object sender, FormClosedEventArgs e){//关闭IBus接口if(bus != null){bus.Dispose();}}

 

   3发布消息:

EasyNetQ支持最简单的消息模式是发布和订阅。发布消息后,任意消费者可以订阅该消息,也可以多个消费者订阅。并且不需要额外配置。首先,如上文中需要先创建一个IBus对象,然后,在创建一个可序列化的.NET对象。调用Publish方法即可。

var message = new MyMessage { Text = "Hello Rabbit" };
bus.Publish(message);

 

 4订阅消息:

EasyNetQ提供了消息订阅,当调用Subscribe方法时候,EasyNetQ会创建一个用于接收消息的队列,不过与消息发布不同的是,消息订阅增加了一个参数,subscribe_id.代码如下:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

第一个参数是订阅id,另外一个是delegate参数,用于处理接收到的消息。这里要注意的是,subscribe_id参数很重要,假如开发者用同一个subscribeid订阅了同一种消息类型两次或者多次,RabbitMQ会以轮训的方式给每个订阅的队列发送消息。接收到之后,其他队列就接收不到该消息。如果用不同的subscribeid订阅同一种消息类型,那么生成的每一个队列都会收到该消息。

需要注意的是,在收到消息处理消息时候,不要占用太多的时间,会影响消息的处理效率,所以,遇到占用长时间的处理方法,最好用异步处理。

为了测试发布和订阅消息,我们可以建立几个不同的项目来进行测试,如发布放在一个Winform项目,订阅放在一个Winform项目,另外一个项目放置共享的消息对象定义,如下所示。

定义消息对象类如下所示。

    /// <summary>/// 定义的MQ消息类型/// </summary>public class TextMessage{public string Text { get; set; }}

然后在发布消息的Winform项目上创建一个处理的窗体,并添加如下代码。

namespace MyRabbitMQ.Publisher
{/// <summary>/// 测试RabbitMQ消息队列的发布/// </summary>public partial class FrmPublisher : DevExpress.XtraEditors.XtraForm{//构建一个IBus公用接口对象private IBus bus = null;public FrmPublisher(){InitializeComponent();//初始化bus对象bus = RabbitHutch.CreateBus("host=localhost");//对指定消息类型进行回应bus.Respond<MyRequest, MyResponse>(request => new MyResponse { Text = "Responding to: "+ request.Text});//收到消息后输出到控制台上显示bus.Receive("my.queue", x => x.Add<MyMessage>(message => Console.WriteLine(message.ToJson())).Add<MyOtherMessage>(message => Console.WriteLine(message.ToJson())));}

发布消息的处理代码,如下代码所示。

        private void btnSend_Click(object sender, EventArgs e){if (bus != null){bus.Publish(new TextMessage{Text = this.txtContent.Text});}}

然后在创建一个类似窗体,用来订阅消息的处理窗体,如下所示代码和窗体。

namespace MyRabbitMQ.Subcriber
{   /// <summary>/// 测试RabbitMQ消息队列的订阅/// </summary>public partial class FrmSubcriber : DevExpress.XtraEditors.XtraForm{//构建一个IBus公用接口对象private IBus bus = null;public FrmSubcriber(){InitializeComponent();//初始化bus对象bus = RabbitHutch.CreateBus("host=localhost");if(bus != null){//订阅一个消息,并对接收到的消息进行处理,展示在控件上bus.Subscribe<TextMessage>("test", (msg) =>{StringBuilder sb = new StringBuilder();sb.AppendLine(msg.Text + "," + DateTime.Now.ToString());sb.AppendLine(this.txtContent.Text);this.txtContent.Invoke(new MethodInvoker(delegate(){this.txtContent.Text = sb.ToString();}));});}//使用消息发送接口发送消息bus.Send("my.queue", new MyMessage { Text = "Hello Widgets!" });bus.Send("my.queue", new MyOtherMessage { Text = "Hello wuhuacong!" });}

发送请求获取响应的代码如下所示。

        private void btnRequest_Click(object sender, EventArgs e){//定义请求消息的对象var request = new MyRequest(){Text = string.Format("请求消息,{0}", DateTime.Now)};//异步获取请求消息的结果并进行处理,展示应答消息在窗体中的var task = bus.RequestAsync<MyRequest, MyResponse>(request);task.ContinueWith(response =>{StringBuilder sb = new StringBuilder();sb.AppendLine(response.Result.Text);sb.AppendLine(this.txtContent.Text);this.txtContent.Invoke(new MethodInvoker(delegate(){this.txtContent.Text = sb.ToString();}));});}

 

两个项目联合进行测试如下界面所示。

 

发布者多次发送消息的情况下,订阅者中,会进行消息的轮训处理,也就是进行均匀分配。

 

  5)消息发送(Send)和接收(Receive)

与Publish/Subscribe略有不同的是,Send/Receive 可以自己定义队列名称。

//发送端代码
bus.Send("my.queue", new MyMessage{ Text = "Hello Widgets!" });//接收端代码
bus.Receive<MyMessage>("my.queue", message => Console.WriteLine("MyMessage: {0}", message.Text));

并且,也可以在同一个队列上发送不同的消息类型,Receive方法可以这么写:

bus.Receive("my.queue", x => x.Add<MyMessage>(message => deliveredMyMessage = message).Add<MyOtherMessage>(message => deliveredMyOtherMessage = message));

如果消息到达队列,但是没有发现相应消息类型的处理时,EasyNetQ会发送一条消息到error队列,并且,带上一个异常信息:No handler found for message type <message type>。与Subscribe类型,如果在同一个队列,同一个消息类型,多次调用Receive方法时,消息会通过轮询的形式发送给每个Receive端。

 

   6)远程过程调用:

var request = new TestRequestMessage {Text = "Hello from the client! "};
bus.Request<TestRequestMessage, TestResponseMessage>(request, response => Console.WriteLine("Got response: '{0}'", response.Text));

   7RPC服务器:

bus.Respond<TestRequestMessage, TestResponseMessage>(request => new TestResponseMessage{ Text = request.Text + " all done!" });

   8记录器:

var logger = new MyLogger() ;
var bus = RabbitHutch.CreateBus(“my connection string”, x => x.Register<IEasyNetQLogger>(_ => logger));

   9路由:

Publish方法,可以加一个topic参数。

bus.Publish(message, "X.A");

 消息订阅方可以通过路由来过滤相应的消息。

  * 匹配一个字符

  #匹配0个或者多个字符

  所以 X.A.2 会匹配到 "#", "X.#", "*.A.*" 但不会匹配 "X.B.*" 或者 "A". 当消息订阅需要用到topic时候,需要调用Subscribe的重载方法

bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*"));
bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));

上述这种方式,会将消息轮询发送给两个订阅者,如果只需要一个订阅者的话,可以这么调用:

bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));

RabbitMQ具有非常好的功能,基于主题的路由,允许订阅者基于多个标准过滤消息。*(星号)匹配一个字。#(哈希)匹配为零个或多个单词。

 RabbitMQ的应用场景,一般在快速处理订单,以及异步的多任务处理中可以得到很好的体现,下面是几个应用场景。

邮件和短消息的处理

订单的解耦处理

RabbitMQ的服务器架构

 

3、RabbitMQ查询状态出现错误的处理

安装成功之后使用rabbitmqctl status命令之后出现如下错误。

Status of node rabbit@WUHUACONG ...
Error: unable to perform an operation on node 'rabbit@WUHUACONG'. Please see diagnostics information and suggestions below.Most common reasons for this are:* Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)* CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server)* Target node is not runningIn addition to the diagnostics info below:* See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more* Consult server logs on node rabbit@WUHUACONGDIAGNOSTICS
===========attempted to contact: [rabbit@WUHUACONG]rabbit@WUHUACONG:* connected to epmd (port 4369) on WUHUACONG* epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic* TCP connection succeeded but Erlang distribution failed* Authentication failed (rejected by the remote node), please check the Erlang cookieCurrent node details:* node name: rabbitmqcli100@WUHUACONG* effective user's home directory: C:\Users\Administrator* Erlang cookie hash: RgaUM2coc+rxIhJrfLS7Jw==

这个问题出现比较常见,主要原因是两个目录的.erlang.cookie文件内容不一样。

要确保.erlang.cookie文件的一致性,不知道什么原因导致了C:\Users\{UserName}\.erlang.cookie和默认情况下C:\WINDOWS\System32\config\systemprofile\.erlang.cookie不一致了,将Windows目录下的拷贝到用户目录下就可以了。

反正无论如何,两个地址的Cookie内容一致就可以了,然后重启下RabbitMQ服务器即可正常运行,并可以正常获取它的状态信息。

 

转载于:https://www.cnblogs.com/wuhuacong/p/8927096.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/251877.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

context-param和init-param的区别

http://www.cnblogs.com/hzj-/articles/1689836.html 转载于:https://www.cnblogs.com/wangc04/p/7501054.html

TensorFlow 1.12.2 发布,修复 GIF 构造安全漏洞

开发四年只会写业务代码&#xff0c;分布式高并发都不会还做程序员&#xff1f; TensorFlow 1.12.2 发布了&#xff0c;此处本修复了一个潜在的安全漏洞&#xff1a; 精心设计的 GIF 图像可以在解码过程中产生空指针解引用更新说明&#xff1a; https://github.com/tensorflo…

【教程】如何在标签打印工具TFORMer Designer中自定义布局?

TEC-IT的在线标签生成器TFORMer Designer提供标签打印服务&#xff0c;并提供即用型行业标签模板作为Web服务。使用此软件&#xff0c;您可以在几秒钟内创建您自己的标签和表格或在工业和物流业中使用即时可用的模板。TFORMer Designer的最新更新现在允许使用自定义标签布局。 …

对象变为指定格式的数组

拿到的对象的格式&#xff08;一个对象里面都好多属性&#xff09; 想要转换成的数据格式&#xff08;一个数组里面有好多个对象&#xff0c;每个对象有一个id和name的属性&#xff09; 如何处理的 selectionChange(val) { // 列表选择var dynamicTags1 [];var arr[]for(var i…

bootstrapValidator remote 验证问题

1 加载jQuery和bootstrap.min.js 后引入bootstrapValidator.min.js字段验证之remote 远程验证(类似ajax验证)&#xff0c;返回值必须是 {"valid":true}{"valid":false} true表示 验证通过 false 表示验证不通过。 当添加remote 验证后&#xff0c;验证通过…

世界顶级的程序员们告诉你:这些书都是你应该读的

在很早之前就想整理一份来自经验丰富的顶级程序员推荐阅读的书籍清单&#xff0c;全栈工程师Dmitry Shvetsov整理了Bob叔以及Jeff Atwood and DHH等世界知名程序员曾经在博客中推荐过的书单&#xff0c;下面我们就一起来看看深受大神们青睐的书籍都是哪些?世界顶级的程序员们告…

《20170911-构建之法:现代软件工程-阅读笔记》

第一章&#xff1a; 介绍软件工程和软件的关系&#xff0c;软件程序软件工程。 软件工程是把系统的、有序的、可量化的方法应用到软件的开发、运营和维护上的过程。 计算机科学这一学术领域可以分为以下这些偏理论的领域&#xff1a; 1.计算机理论 2.信息和编码理论 3.算法和数…

mysql学习(2)索引的本质

2019独角兽企业重金招聘Python工程师标准>>> 问题&#xff1a;SQL查询慢怎么办&#xff1f; 优化手段&#xff0c;加索引。 索引是帮助MYSQL高效的获取数据的排好序的数据结构。 问题&#xff1a;索引结构为什么使用Btree而不使用二叉树&#xff0c;红黑树或者HASH结…

bzoj4245: [ONTAK2015]OR-XOR

一道很有意思的题目。 先求一次前缀和&#xff0c;可以发现答案是 (sum[0] xor sum[x1])or(sum[x1] xor sum[x2])or(sum[x2] xor sum[x3])or……or(sum[m-1] xor sum[n]) 然后其实&#xff08;a xor b&#xff09;or b a or b 那么sum[0]0,可以把柿子变成 sum[x1] or sum[x2] o…

移动端常见的一些兼容性问题

1、安卓浏览器看背景图片&#xff0c;有些设备会模糊。 是devicePixelRatio作怪&#xff0c;因为手机分辨率太小&#xff0c;如果按照分辨率来显示网页&#xff0c;这样字会非常小&#xff0c;所以苹果当初就把iPhone 4的960*640分辨率&#xff0c;在网页里只显示了480*320&…

go-变量

这次我们学习一下golang语言 gitee: go-study 定义 定义的变量或者函数必须用到(pakeage内的全局除外) var a int // 默认为0 var b string //默认为"" fmt.Printf("%d %q\n",a, s) 复制代码直接定义可以不写类型(int..)go会自行判断 var a, b 3, 4 var …

CSS3:CSS3 文本效果

ylbtech-CSS3&#xff1a;CSS3 文本效果1.返回顶部 1、CSS3 文本效果 CSS3 文本效果 CSS3中包含几个新的文本特征。 在本章中您将了解以下文本属性&#xff1a; text-shadowbox-shadowtext-overflowword-wrapword-break浏览器支持 属性 text-shadow4.010.03.54.09.5box-sha…

洛谷 P2296 寻找道路

题目描述 在有向图G 中&#xff0c;每条边的长度均为1 &#xff0c;现给定起点和终点&#xff0c;请你在图中找一条从起点到终点的路径&#xff0c;该路径满足以下条件&#xff1a; 1 &#xff0e;路径上的所有点的出边所指向的点都直接或间接与终点连通。 2 &#xff0e;在满足…

Feature Preprocessing on Kaggle

刚入手data science, 想着自己玩一玩kaggle&#xff0c;玩了新手Titanic和House Price的 项目, 觉得基本的baseline还是可以写出来&#xff0c;但是具体到一些细节&#xff0c;以至于到能拿到的出手的成绩还是需要理论分析的。 本文旨在介绍kaggle比赛到各种原理与技巧&#xf…

十天冲刺-04

昨天&#xff1a;完成了日历界面的部署&#xff0c;并且能够获取到选中的日期 今天&#xff1a;完成根据日期查找消费记录功能 问题&#xff1a;日历界面占用屏幕太多&#xff0c;后期会进行调整转载于:https://www.cnblogs.com/liujinxin123/p/10760254.html

构建Spring Boot程序有用的文章

构建Spring Boot程序有用的文章&#xff1a; http://www.jb51.net/article/111546.htm转载于:https://www.cnblogs.com/xiandedanteng/p/7508334.html

如果您遇到文件或数据库问题,如何重置Joomla

2019独角兽企业重金招聘Python工程师标准>>> 如果您遇到Joomla站点的问题&#xff0c;那么重新安装其核心文件和数据库可能是最佳解决方案。 了解问题 这种方法无法解决您的所有问题。但它主要适用于由Joomla核心引起的问题。 运行Joomla核心更新后&#xff0c;这些…

数组初始化 和 vector初始化

int result[256] {0}; 整个数组都初始化为0 vector<int> B(length,1); 整个vector初始化为1 如果你定义的vector是这样定义的&#xff1a; vector<int> B; 去初始化&#xff0c;千万不要用&#xff1a; for(int i 0;i < length;i)B[i] 1; 这样会数组越界&…

Genymotion模拟器拖入文件报An error occured while deploying the file的错误

今天需要用到资源文件&#xff0c;需要将资源文件拖拽到sd卡中&#xff0c;但老是出现这个问题&#xff1a; 资源文件拖不进去genymotion。查看了sd的DownLoad目录&#xff0c;确实没有成功拖拽进去。 遇到这种问题的&#xff0c;我按下面的思路排查问题&#xff1a; Genymotio…

激光炸弹(BZOJ1218)

激光炸弹&#xff08;BZOJ1218&#xff09; 一种新型的激光炸弹&#xff0c;可以摧毁一个边长为R的正方形内的所有的目标。现在地图上有n(N<10000)个目标&#xff0c;用整数Xi,Yi(其值在[0,5000])表示目标在地图上的位置&#xff0c;每个目标都有一个价值。激光炸弹的投放是…