如何优雅的使用RabbitMQ

RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具。消息队列的使用场景大概有3种:

1、系统集成,分布式系统的设计。各种子系统通过消息来对接,这种解决方案也逐步发展成一种架构风格,即“通过消息传递的架构”。

2、当系统中的同步处理方式严重影响了吞吐量,比如日志记录。假如需要记录系统中所有的用户行为日志,如果通过同步的方式记录日志势必会影响系统的响应速度,当我们将日志消息发送到消息队列,记录日志的子系统就会通过异步的方式去消费日志消息。

3、系统的高可用性,比如电商的秒杀场景。当某一时刻应用服务器或数据库服务器收到大量请求,将会出现系统宕机。如果能够将请求转发到消息队列,再由服务器去消费这些消息将会使得请求变得平稳,提高系统的可用性。

一、开始使用RabbitMQ

RabbitMQ官网提供了详细的安装步骤,另外官网还提供了RabbitMQ在六种场景的使用教程。其中教程1、3、6将覆盖99%的使用场景,所以正常来说只需要搞清楚这3个教程即可快速上手。

二、简单分析

我们以官方提供的教程1做个简单梳理:该教程展示了Producer如何向一个消息队列(message queue)发送一个消息(message),消息消费者(Consumer)收到该消息后消费该消息。

1、producer端:


var factory = new ConnectionFactory() { HostName = "localhost" };
  using ( var connection = factory.CreateConnection())
  {
      while (Console.ReadLine() != null )
      {
          using ( var channel = connection.CreateModel())
          {
              //创建一个名叫"hello"的消息队列
              channel.QueueDeclare(queue: "hello" ,
                  durable: false ,
                  exclusive: false ,
                  autoDelete: false ,
                  arguments: null );
              var message = "Hello World!" ;
              var body = Encoding.UTF8.GetBytes(message);
              //向该消息队列发送消息message
              channel.BasicPublish(exchange: "" ,
                  routingKey: "hello" ,
                  basicProperties: null ,
                  body: body);
              Console.WriteLine( " [x] Sent {0}" , message);
          }
      }
  }

该段代码非常简单,几乎到了无法精简的地步:创建了一个信道(channel)->创建一个队列->向该队列发送消息。

2、Consumer端


var factory = new ConnectionFactory() { HostName = "localhost" };
  using ( var connection = factory.CreateConnection())
  {
      using ( var channel = connection.CreateModel())
      {
          //创建一个名为"hello"的队列,防止producer端没有创建该队列
          channel.QueueDeclare(queue: "hello" ,
                               durable: false ,
                               exclusive: false ,
                               autoDelete: false ,
                               arguments: null );
          //回调,当consumer收到消息后会执行该函数
          var consumer = new EventingBasicConsumer(channel);
          consumer.Received += (model, ea) =>
          {
              var body = ea.Body;
              var message = Encoding.UTF8.GetString(body);
              Console.WriteLine( " [x] Received {0}" , message);
          };
          //消费队列"hello"中的消息
          channel.BasicConsume(queue: "hello" ,
                               noAck: true ,
                               consumer: consumer);
          Console.WriteLine( " Press [enter] to exit." );
          Console.ReadLine();
      }
  }

该段代码可以理解为:创建信道->创建队列->定义回调函数->消费消息。

该实例描述了Send/Receive模式,可以简单理解为1(producer) VS 1(consumer)的场景;

实例3则描述了Publish/Subscriber模式,即1(producer) VS 多个(consumer);

在以上两个示例中,producer只需要发送消息即可,并不关心consumer的返回结果。实例6则描述了一个RPC调用场景,producer发送消息后还要接收consumer的返回结果,这一场景看起来跟使用消息队列的目的有点相悖。因为使用消息队列的目的之一就是要异步,但是这一场景似乎又将异步变成了同步,不过这一场景也很有用,比如一个用户操作产生了一个消息,应用服务收到该消息后执行了一些逻辑并使得数据库发生了变化,UI会一直等待应用服务的返回结果才刷新页面。

三、 发现抽象

我桌子上放着一本RabbitMQ in Action,另外官网提供的文档也很详细,我感觉在一个月内我就能精通RabbitMQ,到时候简历上又可以写上“精通…”,感觉有点小得意呢... ,但是我知道这并不是使用RabbitMQ的最佳方式。

我们知道合理的抽象可以帮我们隐藏掉一些技术细节,让我们将重心放在核心业务上,比如一个人问你:“大雁塔如何走?”你的回答可能是“小寨往东,一直走两站,右手边”,如果你回答:“右转45度,向前走100米,再转90度…”,对方就会迷失在这些细节中。

消息队列的使用过程中实际隐藏着一种抽象——服务总线(Service Bus)。

我们在回头看第一个例子,这个例子隐含的业务是:ClientA发送一个指令,ClientB收到该指令后做出反应。如果是这样,我们为什么要关心如何创建channel,如何创建一个queue? 我仅仅是要发送一个消息而已。另外这个例子写的其实不够健壮:

没有重试机制:如果ClientB第一次没有执行成功如何对该消息处理?

没有错误处理机制:如果ClientB在重试了N次之后还是异常如何处理该消息?

没有熔断机制;

如何对ClientA做一个schedule(计划安排),比如定时发送等;

没有消息审计机制;

无法对消息的各个状态做追踪;

事物处理等。

服务总线正是这种场景的抽象,并且为我们提供了这些机制,让我们赶快来看个究竟吧。

四、初识MassTransit

MassTransit是.NET平台下的一款开源免费的ESB产品,官网:http://masstransit-project.com/,GitHub 700 star,500 Fork,类似的产品还有NServiceBus,之所以要选用MassTransit是因为他要比NServiceBus轻量级,另外在MassTransit开发之初就选用了RabbitMQ作为消息传输组建;同时我想拿他跟NServiceBus做个比较,看看他们到底有哪些侧重点。

1、新建控制台应用程序:Masstransit.RabbitMQ.GreetingClient

使用MassTransit可以从Nuget中安装:

?

1
Install-Package MassTransit.RabbitMQ

2、创建服务总线,发送一个命令


static void Main( string [] args)
{
     Console.WriteLine( "Press 'Enter' to send a message.To exit, Ctrl + C" );
     var bus = BusCreator.CreateBus();
     var sendToUri = new Uri($ "{RabbitMqConstants.RabbitMqUri}{RabbitMqConstants.GreetingQueue}" );
     while (Console.ReadLine()!= null )
     {
         Task.Run(() => SendCommand(bus, sendToUri)).Wait();
     }
     Console.ReadLine();
}
private static async void SendCommand(IBusControl bus,Uri sendToUri)
{
     var endPoint =await bus.GetSendEndpoint(sendToUri);
     var command = new GreetingCommand()
     {
         Id = Guid.NewGuid(),
         DateTime = DateTime.Now
     };
     await endPoint.Send(command);
     Console.WriteLine($ "send command:id={command.Id},{command.DateTime}" );
}

这一段代码隐藏了众多关于消息队列的细节,将我们的注意力集中在发送消息上,同时ServiceBus提供的API也更接近业务,我们虽然发送的是一个消息,但是在这种场景下体现出来是一个命令,Send(command)这一API描述了我们的意图。

3、服务端接收这一命令

新建一个命令台控制程序:Masstransit.RabbitMQ.GreetingServer


var bus = BusCreator.CreateBus((cfg, host) =>
{
     cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingQueue, e =>
     {
         e.Consumer<GreetingConsumer>();
     });
});

这一代码可以理解为服务端在监听消息,我们在服务端注册了一个名为“GreetingConsumer”的消费者,GreetingConsumer的定义:


public class GreetingConsumer :IConsumer<GreetingCommand>
{
     public async Task Consume(ConsumeContext<GreetingCommand> context)
     {
         await Console.Out.WriteLineAsync($ "receive greeting commmand: {context.Message.Id},{context.Message.DateTime}" );
     }
}

该consumer可以消费类型为GreetingCommand的消息。这一实例几乎隐藏了有关RabbitMQ的技术细节,将代码中心放在了业务中,将这两个控制台应用跑起来试试:

五、实现Publish/Subscribe模式

发布/订阅模式使得基于消息传递的软件架构成为可能,这一能力表现为ClientA发送消息X,ClientB和ClientC都可以订阅消息X。

1、我们在上面的例子中改造一下,当GreetingConsumer收到GreetingCommand后发送一个GreetingEvent:


var greetingEvent = new GreetingEvent()
  {
      Id = context.Message.Id,
      DateTime = DateTime.Now
  };
  await context.Publish(greetingEvent);

2、新建控制台程序Masstransit.RabbitMQ.GreetingEvent.SubscriberA用来订阅GreetingEvent消息:

?


var bus = BusCreator.CreateBus((cfg, host) =>
  {
      cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingEventSubscriberAQueue, e =>
      {
          e.Consumer<GreetingEventConsumer>();
      });
  });
  bus.Start();

定义GreetingEventConsumer:


public class GreetingEventConsumer:IConsumer<Greeting.Message.GreetingEvent>
  {
      public async Task Consume(ConsumeContext<Greeting.Message.GreetingEvent> context)
      {
          await Console.Out.WriteLineAsync($ "receive greeting event: id {context.Message.Id}" );
      }
  }

这一代码跟Masstransit.RabbitMQ.GreetingServer接受一个命令几乎一模一样,唯一的区别在于:

在Send/Receive模式中Client首先要获得对方(Server)的终结点(endpoint),直接向该终结点发送命令。Server方监听自己的终结点并消费命令。

而Publish/Subscribe模式中Client publish一个事件,SubscriberA在自己的终结点(endpointA)监听事件,SubscriberB在自己的终结点(endpointB)监听事件。

3、根据上面的分析再定义一个Masstransit.RabbitMQ.GreetingEvent.SubscriberB

4、将4个控制台应用程序跑起来看看

六、实现RPC模式

这一模式在Masstransit中被称作Request/Response模式,通过IRequestClient<IRequest, IResponse> 接口来实现相关操作。一个相关的例子在官方的github。

 

结束语:本篇文章分析了如何使用Masstransit来抽象业务,避免直接使用具体的消息队列,当然本文提到的众多服务总线机制,如“重试、熔断等”并没有在该文中出现,需要大家进一步去了解该项目。

通过对Masstransit的一些试用和NServiceBus的对比,Masstransit在实际项目中很容易上手并且免费,各种API定义的也非常清晰,但是官方的文档有点过于简单,实际使用中还需要去做深入的研究。作为.NET平台下为数不多的ESB开源产品,其关注程度还是不够,期待大家为开源项目做出贡献。

相关文章: 

  • .Net使用RabbitMQ详解

  • 体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景

  • RabbitMQ消息队列应用

  • RabbitMQ 高可用集群搭建及电商平台使用经验总结

  • .NET 使用 RabbitMQ 图文简介

原文地址:http://www.cnblogs.com/richieyang/p/5492432.html


.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/327123.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

hutool中身份证工具-IdcardUtil

JAVA工具例大全--根据身份编号获取户籍省份 发布于 2020-10-10 | 后端技术 | 浏览&#xff08;61&#xff09; | 评论&#xff08;0&#xff09;开场语(刷新后不一样):人生若只如初见&#xff0c;何事秋风悲画扇。作为一名IT人&#xff0c;你当然也想有自己一片天地&…

[Asp.Net Core轻量级Aop解决方案]AspectCore Project 介绍

AspectCore Project 介绍 什么是AspectCore Project ? AspectCore Project 是适用于Asp.Net Core 平台的轻量级 Aop(Aspect-oriented programming) 解决方案&#xff0c;它更好的遵循Asp.Net Core的模块化开发理念&#xff0c;使用AspectCore可以更容易构建低耦合、易扩展的We…

放松眼球的网站

http://www.spielzeugz.de/html5/liquid-particles-3D/

异步广度优先搜索算法

为什么要异步&#xff1f; CPU的工艺越来越小&#xff0c;Cannon Lake架构的Intel CPU已经达到10nm技术&#xff0c;因此在面积不变的情况下&#xff0c;核心数可以明显提升。单纯的提升主频将造成发热量大、需要的电压大、功耗大的问题。而传统的算法与数据结构是针对单核心单…

开箱即用 - jwt 无状态分布式授权

基于JWT(Json Web Token)的授权方式 JWT 是JSON风格轻量级的授权和身份认证规范&#xff0c;可实现无状态、分布式的Web应用授权&#xff1b; 从客户端请求服务器获取token&#xff0c; 用该token 去访问实现了jwt认证的web服务器。 token 可保存自定义信息&#xff0c;如用户基…

Java类加载器总结

转载自 Java类加载器总结 1.类的加载过程 JVM将类加载过程分为三个步骤&#xff1a;装载&#xff08;Load&#xff09;&#xff0c;链接&#xff08;Link&#xff09;和初始化(Initialize)链接又分为三个步骤&#xff0c;如下图所示&#xff1a; 1) 装载&#xff1a;查找并…

MyBatis-Plus EntityWrapper的使用 wrapper le ge

https://blog.csdn.net/shujuelin/article/details/99568651 MyBatis-Plus EntityWrapper的使用 脚丫先生 2019-08-14 14:43:43 2660 收藏 分类专栏&#xff1a; javaee 版权 调度Airflow 本专刊主要以调度系统Airflow详细讲解(会把工作中对于调度系统的docker容器化部署、…

又踩.NET Core的坑:在同步方法中调用异步方法Wait时发生死锁(deadlock)

之前在将 Memcached 客户端 EnyimMemcached 迁移 .NET Core 时被这个“坑”坑的刻骨铭心&#xff08;详见以下链接&#xff09;&#xff0c;当时以为只是在构造函数中调用异步方法&#xff08;注&#xff1a;这里的异步方法都是指基于Task的&#xff09;才会出线死锁&#xff0…

jvm类加载器以及双亲委派

转载自 jvm类加载器以及双亲委派 首先来了解几个概念&#xff1a; 类加载&#xff1a; 概念&#xff1a;虚拟机把描述类的数据从Class文件加载到内存&#xff0c;并对数据进行校验--转换解析--初始化&#xff0c;最终形成能被java虚拟机直接使用的java类型&#xff0c;就是jvm…

分布式系统搭建:服务发现揭秘

CAP理论 加州大学终身教授与著名计算机科学家Eric Allen Brewer在90年代末提出了CAP理论&#xff0c;理论断言任一个基于网络的分布式系统&#xff0c;最多只能满足“数据一致性”、“可用性”、“分区容错性”三要素中的两个要素。 该理论后被MIT证明可行&#xff0c;故架构师…

SSL / TLS 协议运行机制详解

转载自 SSL / TLS 协议运行机制详解 互联网的通信安全&#xff0c;建立在SSL/TLS协议之上。 本文简要介绍SSL/TLS协议的运行机制。文章的重点是设计思想和运行过程&#xff0c;不涉及具体的实现细节。如果想了解这方面的内容&#xff0c;请参阅RFC文档。 一、作用 不使用SS…

在ASP.NET Core Web API上使用Swagger提供API文档

我在开发自己的博客系统&#xff08;http://daxnet.me&#xff09;时&#xff0c;给自己的RESTful服务增加了基于Swagger的API文档功能。当设置IISExpress的默认启动路由到Swagger的API文档页面后&#xff0c;在IISExpress启动Web API站点后&#xff0c;会自动重定向到API文档页…

一文告诉你 Java RMI 和 RPC 的区别

转载自 一文告诉你 Java RMI 和 RPC 的区别 RPC 远程过程调用 RPC&#xff08;Remote Procedure Call Protocol&#xff09;远程过程调用协议&#xff0c;通过网络从远程计算机上请求调用某种服务。一次RPC调用的过程大概有10步&#xff1a; 1.执行客户端调用语句&#xff…

Java架构师必须知道的 6 大设计原则

转载自 Java架构师必须知道的 6 大设计原则 在软件开发中&#xff0c;前人对软件系统的设计和开发总结了一些原则和模式&#xff0c; 不管用什么语言做开发&#xff0c;都将对我们系统设计和开发提供指导意义。本文主要将总结这些常见的原则&#xff0c;和具体阐述意义。 开发…

Flux --gt; Redux --gt; Redux React 入门 基础实例教程

本文的目的很简单&#xff0c;介绍Redux相关概念用法 及其在React项目中的基本使用 假设你会一些ES6、会一些React、有看过Redux相关的文章&#xff0c;这篇入门小文应该能帮助你理一下相关的知识 一般来说&#xff0c;推荐使用 ES6ReactWebpack 的开发模式&#xff0c;但Webpa…

mybatisplus 强制制空 空覆盖原来的字符串

ApiModelProperty(value "证件照片url") TableField(value "id_photo_url",fill FieldFill.UPDATE) private String idPhotoUrl; 方法一 Data EqualsAndHashCode(callSuper false) Accessors(chain true) TableName("base_party_member") A…

微软开源Visual Studio测试平台VSTest

IT之家1月21日消息 微软在MSDN博客上宣布&#xff0c;开源旗下Visual Studio测试平台VSTest。这一平台是具备高扩展性的单元测试执行框架&#xff0c;能够在不同的核心之间实现并行化&#xff0c;提供进程隔离&#xff0c;并能够整合进Visual Studio。 目前&#xff0c;VSTest能…

线程的状态与调度

当我们使用new关键字新建一个线程&#xff0c;这个时候线程就进入了新建状态&#xff08;New&#xff09;&#xff0c;也就是图中未启动状态&#xff1b;调用start方法启动线程&#xff0c;这个时候就进入了可运行状态&#xff0c;也就是就绪状态&#xff08;Runnable&#xff…

深入JVM系列(三)之类加载、类加载器、双亲委派机制与常见问题

转载自 深入JVM系列&#xff08;三&#xff09;之类加载、类加载器、双亲委派机制与常见问题 一&#xff0e;概述 定义&#xff1a;虚拟机把描述类的数据从Class文件加载到内存&#xff0c;并对数据进行校验、转换解析和初始化&#xff0c;最终形成可以被虚拟机直接使用的java…