RabbitMQ的确认机制
生产者确认
public class ProductionMessageConfirm
{public static void Send(){ConnectionFactory factory = new ConnectionFactory();factory.HostName = "localhost";//RabbitMQ服务在本地运行factory.UserName = "guest";//用户名factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()){//创建通道channelusing (var channel = connection.CreateModel()){Console.WriteLine("生产者准备就绪....");channel.QueueDeclare(queue: "ConfirmSelectQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);//声明交换机exchangchannel.ExchangeDeclare(exchange: "ConfirmSelectQueueExchange", type: ExchangeType.Direct, durable: true,autoDelete: false, arguments: null);//绑定exchange和queuechannel.QueueBind(queue: "ConfirmSelectQueue", exchange: "ConfirmSelectQueueExchange", routingKey: "ConfirmSelectKey");string message = "";//发送消息//在控制台输入消息,按enter键发送消息while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)){message = Console.ReadLine();var body = Encoding.UTF8.GetBytes(message);try{//开启消息确认模式channel.ConfirmSelect();//基于当前这个channel 开启消息确认机制 //发送消息channel.BasicPublish(exchange: "ConfirmSelectQueueExchange", routingKey: "ConfirmSelectKey", basicProperties: null, body: body);//事务提交 //调用WaitForConfirms 方法来判断如果消息正常发送,就返回true,否则就返回falseif (channel.WaitForConfirms()) //如果一条消息或多消息都确认发送{Console.WriteLine($"【{message}】发送到Broke成功!");}else{//可以记录个日志,重试一下;}//通过当前信道,调用WaitForConfirmsOrDie,如果所有消息发送成功 就正常执行;如果有消息发送失败;就抛出异常;//channel.WaitForConfirmsOrDie();//如果所有消息发送成功 就正常执行;如果有消息发送失败;就抛出异常;}catch (Exception){Console.WriteLine($"【{message}】发送到Broker失败!");//就应该通知管理员// 重新试一下}}Console.Read();}}}
}
生产者事务版确认
public class ProductionMessageTx
{public static void Send(){ConnectionFactory factory = new ConnectionFactory();factory.HostName = "localhost";//RabbitMQ服务在本地运行factory.UserName = "guest";//用户名factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()){//创建通道channelusing (var channel = connection.CreateModel()){Console.WriteLine("生产者准备就绪....");channel.QueueDeclare(queue: "MessageTxQueue01", durable: true, exclusive: false, autoDelete: false, arguments: null);channel.QueueDeclare(queue: "MessageTxQueue02", durable: true, exclusive: false, autoDelete: false, arguments: null);//声明交换机exchangchannel.ExchangeDeclare(exchange: "MessageTxQueueExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);//绑定exchange和queuechannel.QueueBind(queue: "MessageTxQueue01", exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey01");channel.QueueBind(queue: "MessageTxQueue02", exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey02");string message = "";//发送消息//在控制台输入消息,按enter键发送消息while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)){message = Console.ReadLine();var body = Encoding.UTF8.GetBytes(message);try{//通过当前信道调用TxSelect方法开启事务机制---RabbitMQ AMQP协议,强事务保证;channel.TxSelect(); //事务是协议支持的//发送消息//同时给多个队列发送消息;要么都成功;要么都失败;//执行下面这两句话,都不会吧消息发送到队列中去;channel.BasicPublish(exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey01", basicProperties: null, body: body);channel.BasicPublish(exchange: "MessageTxQueueExchange", routingKey: "MessageTxKey02", basicProperties: null, body: body);//事务提交channel.TxCommit(); //只有事务提交成功以后,才会真正的写入到队列里面去//如果失败---抛出异常;Console.WriteLine($"【{message}】发送到Broke成功!");}catch (Exception ex){Console.WriteLine($"【{message}】发送到Broker失败!");channel.TxRollback(); //事务回滚 或者重试一下;throw;}}Console.Read();}}}
}
消费者确认
自动确认
public class ConsumerMessageConfirm
{public static void Consumption(){var factory = new ConnectionFactory();factory.HostName = "localhost";//RabbitMQ服务在本地运行factory.UserName = "guest";//用户名factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()){using (IModel channel = connection.CreateModel()){#region EventingBasicConsumer//定义消费者 var consumer = new EventingBasicConsumer(channel);int i = 1;consumer.Received += (model, ea) =>{if (i == 11){throw new Exception("");}i++;Console.WriteLine(Encoding.UTF8.GetString(ea.Body.ToArray()));};Console.WriteLine("消费者准备就绪....");//处理消息 //autoAck: true 自动确认:消息队列的消息直接推送给当前这个消费者;//照单全收,消费是否正常消费,RabbitMQ不管;channel.BasicConsume(queue: "PersistenceQueue", autoAck: true, consumer: consumer);Console.ReadKey();#endregion}}}
}
消费后确认
public class ConsumptionACKConfirm
{public static void Consumption(){var factory = new ConnectionFactory();factory.HostName = "localhost";//RabbitMQ服务在本地运行factory.UserName = "guest";//用户名factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()){using (IModel channel = connection.CreateModel()){#region EventingBasicConsumer//定义消费者 var consumer = new EventingBasicConsumer(channel);int i = 0;consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.ToArray());//如果在这里处理消息的手,异常了呢? //Console.WriteLine($"接收到消息:{message}"); ; if (i < 50){//手动确认 消息正常消费 告诉Broker:你可以把当前这条消息删除掉了channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);Console.WriteLine(message);}else{//否定:告诉Broker,这个消息我没有正常消费; requeue: true:重新写入到队列里去; false:你还是删除掉;channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);}i++;};Console.WriteLine("消费者准备就绪....");{//处理消息 //autoAck: true 自动确认; //channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: true, consumer: consumer);}{//处理消息 //autoAck: false 显示确认; channel.BasicConsume(queue: "PersistenceQueue", autoAck: false, consumer: consumer);}Console.ReadKey();#endregion}}}
}