调用
// Usage: publish a message through the "DrawingOutput" exchange/queue.
MqConfigInfo config = new MqConfigInfo();
config.MQExChange = "DrawingOutput";
config.MQQueueName = "DrawingOutput";
config.MQRoutingKey = "DrawingOutput";

// MqHelper implements IDisposable (it holds a connection and a channel),
// so wrap it in a using block to release them once the message is sent.
using (MqHelper helper = new MqHelper(config))
{
    byte[] body = Encoding.UTF8.GetBytes("98K"); // payload to send
    helper.SendMsg(body);
}
消息队列帮助类MqHelper
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Content;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace LogTest
{
    /// <summary>
    /// Thin helper around a RabbitMQ connection and channel that declares the
    /// configured exchange/queue binding and publishes messages to it.
    /// </summary>
    public class MqHelper : IDisposable
    {
        #region Message queue state

        /// <summary>Connection to the broker; created by the constructor.</summary>
        public IConnection MQConnection { get; set; }

        /// <summary>Channel used for declaring and publishing; created by the constructor.</summary>
        public IModel MQModel { get; set; }

        /// <summary>Configuration this helper was created with.</summary>
        public MqConfigInfo MqConfigInfo { get; set; }

        #endregion

        /// <summary>
        /// Opens a connection and a channel and, when an exchange type is configured,
        /// declares the exchange and the queue and binds them with the routing key.
        /// </summary>
        /// <param name="configInfo">Exchange, queue, routing and connection settings.</param>
        /// <exception cref="ArgumentNullException"><paramref name="configInfo"/> is null.</exception>
        /// <exception cref="Exception">
        /// Connecting or declaring failed; the original error is the inner exception.
        /// </exception>
        public MqHelper(MqConfigInfo configInfo)
        {
            if (configInfo == null)
            {
                throw new ArgumentNullException("configInfo");
            }

            MqConfigInfo = configInfo;

            var connectionFactory = new ConnectionFactory
            {
                UserName = configInfo.MQUserName,
                Password = configInfo.MQPassWord,
                VirtualHost = configInfo.MQVirtualHost,
                RequestedHeartbeat = 0,
                HostName = configInfo.MQHostName,
                Port = configInfo.MQPort
            };

            try
            {
                MQConnection = connectionFactory.CreateConnection();
                MQModel = MQConnection.CreateModel();

                if (configInfo.MQExChangeType != null)
                {
                    MQModel.ExchangeDeclare(configInfo.MQExChange, configInfo.MQExChangeType);
                    // Arguments: durable = true, exclusive = false, autoDelete = false, no extra arguments.
                    MQModel.QueueDeclare(configInfo.MQQueueName, true, false, false, null);
                    MQModel.QueueBind(configInfo.MQQueueName, configInfo.MQExChange, configInfo.MQRoutingKey);
                }
            }
            catch (Exception ex)
            {
                // Release whatever was created before the failure so a half-built
                // helper does not leak its connection or channel.
                try
                {
                    Dispose();
                }
                catch (Exception)
                {
                    // Cleanup errors are ignored on purpose: the original failure is the one to report.
                }

                throw new Exception("MQHelper创建连接失败", ex);
            }
        }

        /// <summary>
        /// Serializes <paramref name="message"/> to JSON, encodes it as UTF-8 and publishes it.
        /// </summary>
        /// <param name="message">Object to serialize as the message body.</param>
        /// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
        public bool SendMsg(object message)
        {
            string json = JsonConvert.SerializeObject(message);
            Publish(Encoding.UTF8.GetBytes(json));
            return true;
        }

        /// <summary>
        /// Publishes the given bytes as the message body without any transformation.
        /// </summary>
        /// <param name="message">Raw message body.</param>
        /// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public bool SendMsg(byte[] message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            Publish(message);
            return true;
        }

        /// <summary>
        /// Encodes <paramref name="message"/> as UTF-8 and publishes it.
        /// </summary>
        /// <param name="message">Text to send as the message body.</param>
        /// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public bool SendMsg(string message)
        {
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            Publish(Encoding.UTF8.GetBytes(message));
            return true;
        }

        /// <summary>
        /// Releases the channel and then the connection. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            // Detach both references first so a repeated call finds nothing left to dispose.
            IModel model = MQModel;
            IConnection connection = MQConnection;
            MQModel = null;
            MQConnection = null;

            try
            {
                if (model != null)
                {
                    model.Dispose();
                }
            }
            finally
            {
                // Runs even when disposing the channel throws, so the connection is never leaked.
                if (connection != null)
                {
                    connection.Dispose();
                }
            }
        }

        /// <summary>
        /// Publishes <paramref name="body"/> to the configured exchange and routing key.
        /// </summary>
        private void Publish(byte[] body)
        {
            if (MQModel == null)
            {
                throw new ObjectDisposedException(typeof(MqHelper).Name);
            }

            IMapMessageBuilder builder = new MapMessageBuilder(MQModel);

            // Fetch the properties once so the delivery mode is set on the very
            // object that is handed to BasicPublish.
            IBasicProperties properties = (IBasicProperties)builder.GetContentHeader();
            if (MqConfigInfo.MQPersistModel)
            {
                properties.DeliveryMode = 2; // 2 = persistent delivery
            }

            // Exceptions propagate unchanged; no catch-and-"throw ex", which would
            // discard the original stack trace.
            MQModel.BasicPublish(MqConfigInfo.MQExChange, MqConfigInfo.MQRoutingKey, properties, body);
        }
    }

    /// <summary>
    /// Message queue configuration.
    /// </summary>
    public class MqConfigInfo
    {
        /// <summary>
        /// Creates a configuration with a "direct" exchange type, persistent messages
        /// and the connection settings this helper has always used.
        /// </summary>
        public MqConfigInfo()
        {
            MQExChangeType = "direct";
            MQPersistModel = true;
            MQHostName = "192.168.1.49";
            MQPort = 5672;
            MQUserName = "guest";
            MQPassWord = "guest";
            MQVirtualHost = "mq_test";
        }

        /// <summary>
        /// Exchange name.
        /// </summary>
        public string MQExChange { get; set; }

        /// <summary>
        /// Exchange type (fanout, direct, topic, headers); defaults to direct.
        /// When null, the helper declares nothing and only publishes.
        /// </summary>
        public string MQExChangeType { get; set; }

        /// <summary>
        /// Routing key.
        /// </summary>
        public string MQRoutingKey { get; set; }

        /// <summary>
        /// Message header. Not currently applied to outgoing messages by MqHelper.
        /// </summary>
        public string MQHeader { get; set; }

        /// <summary>
        /// Whether messages are published as persistent.
        /// </summary>
        public bool MQPersistModel { get; set; }

        /// <summary>
        /// Queue name.
        /// </summary>
        public string MQQueueName { get; set; }

        /// <summary>
        /// Broker host name or IP address.
        /// </summary>
        public string MQHostName { get; set; }

        /// <summary>
        /// Broker port.
        /// </summary>
        public int MQPort { get; set; }

        /// <summary>
        /// User name used to connect to the broker.
        /// </summary>
        public string MQUserName { get; set; }

        /// <summary>
        /// Password used to connect to the broker.
        /// </summary>
        public string MQPassWord { get; set; }

        /// <summary>
        /// Virtual host on the broker.
        /// </summary>
        public string MQVirtualHost { get; set; }
    }
}