1.首先我们来讲解一下消息队列的作用
比如说我们的订单系统,再客户订单生成了以后,可能会有
快递系统,通知系统,和打印系统需要用到当前订单的详细内容
所以这个时候常规的操作是在A里面通过代码调用B,C ,D系统的接口来通知他们有新订单了
如果此时有个E系统呢
那我们的做法可能只能在A系统中增加代码来通知E系统,但是如果后期我们的E系统又不要了呢,岂不是我们又要在A系统中去除掉这一部分代码所以说这样的代码冗余就很高,对后期的性能也很有影响,因为系统A中通知B,C,D系统还要判断 B,C,D返回值中是否成功,如果没有成功还要子再次请求这样系统性能就非常的低下
所以说我们使用了MQ来解决这个问题
我们A系统秩序通知MQ系统 后面的B,C,D要信息的话直接找MQ系统调用就行
接下来我这里讲解下如何把MQ集成到.net 项目中
生产者端:
using Aliyun.MQ;
using Aliyun.MQ.Model;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;namespace CSDNSign.Controllers
{[Route("api/[controller]")][ApiController]public class MQController : ControllerBase{readonly IFreeSql _sql;private readonly ILogger<MQController> _logger;private const string _endpoint = "*************";// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。private const string _accessKeyId = "*************";// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。private const string _secretAccessKey = "*************";// 所属的Topic。private const string _topicName = "xsw";// Topic所属实例ID,默认实例为空。private const string _instanceId = "*************";private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);static MQProducer producer = _client.GetProducer(_instanceId, _topicName);/// <summary>/// /// </summary>/// <param name="sql"></param>/// <param name="logger"></param>public MQController(IFreeSql sql, ILogger<MQController> logger){_sql = sql;_logger = logger;}#region /// <summary>/// /// </summary>/// <param name="json"></param>/// <param name="key"></param>/// <param name="tag"></param>/// <returns></returns>[HttpPost][Route("TestMQ")]public string TestMQ(string json, string key, string tag){try{TopicMessage sendMsg = new TopicMessage(json);// 设置属性。sendMsg.PutProperty("a", "a");// 设置Key。sendMsg.MessageKey = key;TopicMessage result = producer.PublishMessage(sendMsg);return JsonConvert.SerializeObject(result) ;}catch (Exception ex){return ex.Message.ToString();}}#endregion}
}
其中的endpoint可以在阿里云中接入点中查看
由于我们用的http的方式接受所以我们复制下面的地址
消费者端:
using Aliyun.MQ;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Security.Permissions;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;namespace WPFTest
{/// <summary>/// Interaction logic for MainWindow.xaml/// </summary>public partial class MainWindow : Window{private const string _endpoint = "********";// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。private const string _accessKeyId = "*****";// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。private const string _secretAccessKey = "*****";// 所属的Topic。private const string _topicName = "*****";// Topic所属实例ID,默认实例为空。private const string _instanceId = "*******";private const string _groupId = "*********";private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);private Thread thread;public MainWindow(){InitializeComponent();this.button_Stop.IsEnabled = false;}private void button_Click(object sender, RoutedEventArgs e){thread = new Thread(new ThreadStart(TaskStart));thread.IsBackground = true;thread.Start();this.button_get.IsEnabled = false;this.button_Stop.IsEnabled = true;}/// <summary>/// 开始任务/// </summary>private void TaskStart(){Dispatcher.Invoke(() => this.textBox.Text = "====== 开始获取MQ任务 ====== \n");// 在当前线程循环消费消息,建议多开个几个线程并发消费消息。while (true){try{// 长轮询消费消息。// 长轮询表示如果Topic没有消息,则请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回客户端。List<Message> messages = null;try{messages = consumer.ConsumeMessage(3, // 一次最多消费3条(最多可设置为16条)。3 // 长轮询时间3秒(最多可设置为30秒)。);}catch (Exception exp1){if (exp1 is MessageNotExistException){Dispatcher.Invoke(() => textBox.Text += string.Format(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId + "\n"));continue;}Console.WriteLine(exp1);Thread.Sleep(2000);}if (messages == null){continue;}List<string> handlers = new List<string>();List<string> mqmessage = new List<string>();Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");// 处理业务逻辑。foreach (Message message in messages){Console.WriteLine(message);Console.WriteLine("Property a is:" + message.GetProperty("a"));handlers.Add(message.ReceiptHandle);mqmessage.Add(message.Body);}// Message.nextConsumeTime前若不确认消息消费成功,则消息会被重复消费。// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。try{consumer.AckMessage(handlers);Console.WriteLine("Ack message success:");foreach (string handle in mqmessage){Dispatcher.Invoke(() => textBox.Text += handle + "\n");}Console.WriteLine();return;}catch (Exception exp2){// 某些消息的句柄可能超时,会导致消息消费状态确认不成功。if (exp2 is AckMessageException){AckMessageException ackExp = (AckMessageException)exp2;Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems){Dispatcher.Invoke(() => textBox.Text += string.Format("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage));}}}}catch (Exception ex){Console.WriteLine(ex);Thread.Sleep(2000);}}}private void button_Clean_Click(object sender, RoutedEventArgs e){this.textBox.Text = "";}private void button_Stop_Click(object sender, RoutedEventArgs e){this.button_get.IsEnabled = true;this.button_Stop.IsEnabled = false;thread.Interrupt();// Wait for newThread to end.//thread.Join();}}
}
最后项目效果如下所示: