RabbitMQ帮助类的封装
基本部分
/// <summary>
/// Helper that wraps publishing to and consuming from RabbitMQ.
/// The broker connection is held in a static field, so it is shared by every
/// instance of this class; host name and credentials are stored per instance.
/// </summary>
public class RabbitMQInvoker
{
    #region Identy

    // Shared (static) broker connection; created on demand by InitConnection.
    private static IConnection _CurrentConnection;

    // Broker address and credentials captured by the constructor.
    private readonly string _HostName;
    private readonly string _UserName;
    private readonly string _Password;

    #endregion

    /// <summary>
    /// Stores the connection settings. No connection is opened here.
    /// </summary>
    /// <param name="hostName">Broker host name; defaults to "localhost".</param>
    /// <param name="userName">Login user; defaults to "guest".</param>
    /// <param name="password">Login password; defaults to "guest".</param>
    public RabbitMQInvoker(string hostName = "localhost", string userName = "guest", string password = "guest")
    {
        _HostName = hostName;
        _UserName = userName;
        _Password = password;
    }

    // .. .. ..  (remaining members are elided here; they are listed in the sections that follow)
}
初始化链接
#region 初始化链接

    // Guards creation of the shared static connection.
    private static object RabbitMQInvoker_InitLock = new object();

    /// <summary>
    /// Ensures the shared static connection exists and reports itself as open.
    /// When it is missing or not open, a new connection is created from this
    /// instance's host name, user name and password.
    /// </summary>
    private void InitConnection()
    {
        // Cheap check first so the lock is only taken when work may be needed.
        if (_CurrentConnection != null && _CurrentConnection.IsOpen)
        {
            return;
        }

        lock (RabbitMQInvoker_InitLock)
        {
            // Check again: another thread may have connected while we waited for the lock.
            if (_CurrentConnection != null && _CurrentConnection.IsOpen)
            {
                return;
            }

            var connectionFactory = new ConnectionFactory()
            {
                HostName = _HostName,
                Password = _Password,
                UserName = _UserName
            };

            _CurrentConnection = connectionFactory.CreateConnection();
        }
    }

#endregion
初始化交换机
#region 初始化交换机

    // Process-wide record of the exchange declarations and queue bindings that
    // have already been performed, keyed by a string built from their names.
    // It is a plain Dictionary, so every access must happen while holding
    // RabbitMQInvoker_BindQueueLock.
    private static Dictionary<string, bool> RabbitMQInvoker_ExchangeQueue = new Dictionary<string, bool>();
    private static object RabbitMQInvoker_BindQueueLock = new object();

    /// <summary>
    /// Declares the durable fanout exchange <paramref name="exchangeName"/> the
    /// first time it is requested; later calls for the same name do nothing.
    /// </summary>
    /// <param name="exchangeName">Name of the exchange to declare.</param>
    private void InitExchange(string exchangeName)
    {
        string cacheKey = $"InitExchange_ { exchangeName } ";

        // The lookup is done only under the lock: Dictionary<,> does not allow a
        // reader to run concurrently with a writer, so an unlocked ContainsKey
        // could race with the insert below. The lock is cheap compared with the
        // channel that callers open right afterwards.
        lock (RabbitMQInvoker_BindQueueLock)
        {
            if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
            {
                return;
            }

            this.InitConnection();
            using (IModel channel = _CurrentConnection.CreateModel())
            {
                channel.ExchangeDeclare(
                    exchange: exchangeName,
                    type: ExchangeType.Fanout,
                    durable: true,
                    autoDelete: false,
                    arguments: null);
            }

            // Recorded only after the declaration succeeded, so a failure is retried next time.
            RabbitMQInvoker_ExchangeQueue[cacheKey] = true;
        }
    }

    /// <summary>
    /// Declares the durable fanout exchange and the durable queue described by
    /// <paramref name="rabbitMQConsumerModel"/> and binds the queue to the
    /// exchange with an empty routing key. Done once per exchange/queue pair.
    /// </summary>
    /// <param name="rabbitMQConsumerModel">Supplies ExchangeName and QueueName.</param>
    private void InitBindQueue(RabbitMQConsumerModel rabbitMQConsumerModel)
    {
        string cacheKey = $"InitBindQueue_ { rabbitMQConsumerModel.ExchangeName } _ { rabbitMQConsumerModel.QueueName } ";

        // Same rule as InitExchange: the cache is only touched while the lock is held.
        lock (RabbitMQInvoker_BindQueueLock)
        {
            if (RabbitMQInvoker_ExchangeQueue.ContainsKey(cacheKey))
            {
                return;
            }

            this.InitConnection();
            using (IModel channel = _CurrentConnection.CreateModel())
            {
                channel.ExchangeDeclare(
                    exchange: rabbitMQConsumerModel.ExchangeName,
                    type: ExchangeType.Fanout,
                    durable: true,
                    autoDelete: false,
                    arguments: null);
                channel.QueueDeclare(
                    queue: rabbitMQConsumerModel.QueueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
                channel.QueueBind(
                    queue: rabbitMQConsumerModel.QueueName,
                    exchange: rabbitMQConsumerModel.ExchangeName,
                    routingKey: string.Empty,
                    arguments: null);
            }

            RabbitMQInvoker_ExchangeQueue[cacheKey] = true;
        }
    }

#endregion
发送消息
#region 发送消息

    /// <summary>
    /// Publishes <paramref name="message"/> (UTF-8 encoded) to the given exchange
    /// with an empty routing key, inside a channel transaction.
    /// A failure while publishing or committing is written to the console and the
    /// transaction is rolled back; it is not rethrown. Failures while declaring
    /// the exchange, connecting, or opening the channel do propagate.
    /// </summary>
    /// <param name="exchangeName">Target exchange; declared on first use.</param>
    /// <param name="message">Text payload to send.</param>
    public void Send(string exchangeName, string message)
    {
        this.InitExchange(exchangeName);

        // InitExchange only connects when the exchange was not declared yet, so
        // make sure the connection is open. InitConnection performs the
        // null/IsOpen check itself.
        this.InitConnection();

        using (var channel = _CurrentConnection.CreateModel())
        {
            try
            {
                channel.TxSelect();
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(
                    exchange: exchangeName,
                    routingKey: string.Empty,
                    basicProperties: null,
                    body: body);
                channel.TxCommit();
                Console.WriteLine($" [x] Sent { body.Length } ");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.WriteLine($"【 { message } 】发送到Broker失败! { ex.Message } ");

                // The failure may have closed the channel, in which case a rollback
                // would throw out of this handler and hide the error logged above.
                try
                {
                    if (channel.IsOpen)
                    {
                        channel.TxRollback();
                    }
                }
                catch (Exception rollbackEx)
                {
                    Console.WriteLine($"TxRollback failed: {rollbackEx.Message}");
                }
            }
        }
    }

#endregion
接收消息
#region Receive

    /// <summary>
    /// Registers a consumer on the queue described by
    /// <paramref name="rabbitMQConsumerMode"/>. Each delivery is decoded as UTF-8
    /// and passed to <paramref name="func"/>; the message is acknowledged when it
    /// returns true and rejected with requeue when it returns false.
    /// The consumer runs on a background task started with Task.Run.
    /// </summary>
    /// <param name="rabbitMQConsumerMode">Supplies ExchangeName and QueueName.</param>
    /// <param name="func">Message handler; returns true when the message was processed.</param>
    /// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
    public void RegistReciveAction(RabbitMQConsumerModel rabbitMQConsumerMode, Func<string, bool> func)
    {
        // Fail here rather than later on the consumer thread when the first message arrives.
        if (func == null)
        {
            throw new ArgumentNullException(nameof(func));
        }

        this.InitBindQueue(rabbitMQConsumerMode);

        // InitBindQueue skips connecting when the binding is already recorded, so
        // the shared connection may no longer be open. Re-check it on the calling
        // thread, where a connection failure is visible to the caller instead of
        // being lost inside the background task.
        this.InitConnection();

        Task.Run(() =>
        {
            using (var channel = _CurrentConnection.CreateModel())
            {
                var consumer = new EventingBasicConsumer(channel);

                // NOTE(review): arguments are (prefetchSize, prefetchCount, global);
                // 0 presumably means "no limit" - confirm against the client docs.
                channel.BasicQos(0, 0, true);

                consumer.Received += (sender, ea) =>
                {
                    string text = Encoding.UTF8.GetString(ea.Body.ToArray());
                    bool handled = func(text);
                    if (handled)
                    {
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
                    else
                    {
                        // Handler reported failure: hand the message back to the queue.
                        channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                    }
                };

                channel.BasicConsume(queue: rabbitMQConsumerMode.QueueName, autoAck: false, consumer: consumer);
                Console.WriteLine($" Register Consumer To { rabbitMQConsumerMode.ExchangeName } - { rabbitMQConsumerMode.QueueName } ");

                // Blocks this task so the using block does not dispose the channel.
                // NOTE(review): this relies on standard input staying open; if
                // ReadLine returns at once the channel is disposed - confirm the host.
                Console.ReadLine();
                Console.WriteLine($" After Register Consumer To { rabbitMQConsumerMode.ExchangeName } - { rabbitMQConsumerMode.QueueName } ");
            }
        });
    }

#endregion