MassTransit:是一款.NET的分布式应用程序框架(开源、免费)。通过MassTransit,可以轻松创建利用基于消息的、松耦合异步通信的应用程序和服务,以提高可用性,可靠性和可伸缩性。
MassTransit本身定位轻量级的服务总线,并支持多种传输方式如:RabbitMQ、Azure Service Bus、ActiveMQ、Amazon SQS、Kafka、Azure Event Hub。消息异常处理:重试配置、重新交付、erro管道、死信管道。分布式事务处理:sagas、Courier。容器支持:.NETcore自身的、autofac、castle windsor等、调度支持:Quartz 、hangfire。更多功能参考官网文档。
MassTransit目前已经发布到了第7个版本了,7.0版本新增了对Kafka 的支持,构建仅支持.NET Standard 2.0...其他改动不大。MassTransit社区使用也是很活跃的,对于首次接触的,通过本篇文章(基于rabbitmq)帮你快速入门!
一个应用程序或服务可以使用两种不同的方法来生产消息,主要区别是sent需要指定具体的端点地址,而pub不需要,下面的代码会演示这两种方式。
发布事件(多个接收者)
发送命令(一个接收者)
发布事件(事件消息)
场景假设:在xx项目中,需要与第三方进行交互。比如:订单发货之后,把发货的信息的推送给第三方、把订单的状态变化也推送过去。我们分析下需求,系统要求在发货之后,需要做若干事情。可以解读为,发货这个动作已经发生了,需要做的事情不确定。这不是典型的发布订阅模式嘛!好了,那使用masstransit如何实现呢?
1.创建一个类库项目定义消息体,命名为contract
public interface OrderShipped{public Guid OrderId { get; set; }//订单号}
2.创建一个api项目作为消息的生产方,命名为Delivery,然后安装nuget包:
Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
在Startup类的ConfigureServices中,添加以下配置
services.AddMassTransit(x =>{x.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", hostConfig =>{hostConfig.Username("Amq");//填写你的用户名hostConfig.Password("mq123");//填写你的用户名});});});services.AddMassTransitHostedService();
在ValueController中,进行发布消息
readonly IPublishEndpoint _publishEndpoint;public ValuesController(IPublishEndpoint publishEndpoint){_publishEndpoint = publishEndpoint;}/// <summary>/// 测试发布/// </summary>/// <returns></returns>[HttpPost]public async Task<ActionResult> OrderDelivery(){//其他await _publishEndpoint.Publish<OrderShipped>(new{OrderId = Guid.NewGuid()});return Ok();}
通过使用IPublishEndpoint实例的Publish方法,发布一个事件。
3.创建一个api项目作为消息的消费方,命名为Listener,然后安装nuget包:
Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
在Startup类的ConfigureServices中,添加以下配置
services.AddMassTransit(x =>{x.AddConsumer<DeliveryExpressNotifyConsumer>();x.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", hostConfig =>{hostConfig.Username("Amq");hostConfig.Password("mq123");});config.ReceiveEndpoint("Order.Shipped", e =>{e.ConfigureConsumer<DeliveryExpressNotifyConsumer>(context);});});});services.AddMassTransitHostedService();
实现接口IConsumer即可完成消费逻辑的过程。
public class DeliveryExpressNotifyConsumer : IConsumer<OrderShipped>{ILogger<DeliveryExpressNotifyConsumer> _logger;public DeliveryExpressNotifyConsumer(ILogger<DeliveryExpressNotifyConsumer> logger){_logger = logger;}public async Task Consume(ConsumeContext<OrderShipped> context){_logger.LogInformation("订单id: {Value}已发货,通知第三方", context.Message.OrderId);}}
到此,消息生产方和消费方代码都已经实现了,运行一下,效果如下
发送消息(命令消息)
发送消息适用的场景,常常是一种命令,并且期望消息只被一个接收者或服务实例进行处理。masstransit使用发送消息和发布消息,在消息生产方不同之处,sent消息需要指定目标地址,使用ISendEndpoint的Send方法,消费者代码一样的配置。以下代码演示发送一个创建发货单的指令消息,比较简单直接贴出源码:
1.定义一个消息SubmitShippingOrder
public class SubmitShippingOrder{public Guid OrderId { get; set; }}
2.sent消息
readonly IPublishEndpoint _publishEndpoint;readonly ISendEndpointProvider _sendEndpointProvider;public ValuesController(IPublishEndpoint publishEndpoint, ISendEndpointProvider sendEndpointProvider){_publishEndpoint = publishEndpoint;_sendEndpointProvider = sendEndpointProvider;}/// <summary>/// 测试发送/// </summary>/// <returns></returns>[HttpPost]public async Task<ActionResult> Delivery(){var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("queue:Submit.Shipping.Order")); //获取发送端点await endpoint.Send(new SubmitShippingOrder{OrderId = Guid.NewGuid()});return Ok();}
3.消费
public class SubmitShippingOrderConsumer : IConsumer<SubmitShippingOrder>{ILogger<SubmitShippingOrderConsumer> _logger;public SubmitShippingOrderConsumer(ILogger<SubmitShippingOrderConsumer> logger){_logger = logger;}public async Task Consume(ConsumeContext<SubmitShippingOrder> context){_logger.LogInformation("创建发货单,订单id: {Value}", context.Message.OrderId);}}
运行一下效果:
官网文档:
http://masstransit-project.com/
https://masstransit-project.com/advanced/courier/