前言
生产者发送消息到了队列,队列推送数据给了消费者,这里存在一些问题需要思考下
生产者如何确保消息一定投递到了队列中
RabbitMQ 丢失了消息(下文暂不涉及这块)
队列如何确保消费者收到了消息呢
生产者可靠发送
执行流程
当生产者将消息发送出去后,如果不进行特殊设置,默认情况下,发送消息操作后是没有返回任何消息给生产者的,这时生产者是不知道消息有没有真的到达了RabbitMQ,消息可能在到达RabbitMQ前已经丢了。
对于这种情况,有两种解决方案来应对,
事务机制,该机制是AMQP协议层面提供的解决方案
发送方确认机制,这是RabbitMQ提供的解决方案
事务机制
可以将当前使用的channel设置成事务模式。借助如下与事务机制相关的三个方法来完成事务操作。
channel.TxSelect() 启动事务模式
channel.TxCommit() 提交事务
channel.TxRollback() 回滚事务
TXCommit
事务模式下,时序图中发布消息前后增加了一点动作,设置信道为事务模式与提交事务。如果TXCommit()调用成功,则说明事务提交成功,消息一定到达了RabbitMQ中。
TXRollback
当事务提交执行之前,由于RabbitMQ异常崩溃或其他原因抛出异常,则可以捕获异常,再执行TXRollback(),完成事务回滚。
生产者代码
在原有Basic代码基础上增加事务操作,此处先注释掉与事务有关的代码(16、34和40行),并且在发送消息后设置异常模拟
var connFactory = new ConnectionFactory
{HostName = "xxx.xxx.xxx.xxx",Port = 5672,UserName = "rabbitmqdemo",Password = "rabbitmqdemo@test",VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{using (var channel = conn.CreateModel()){var queueName = "helloworld_tx";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);//channel.TxSelect(); while (true){Console.WriteLine("消息内容(exit退出):");var message = Console.ReadLine();if (message.Trim().ToLower() == "exit"){break;}try{var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);if (message.Trim().ToLower() == "9527"){throw new Exception("触发异常");}//channel.TxCommit(); Console.WriteLine("消息内容发送完毕:" + message);}catch (Exception ex){Console.WriteLine("消息发送出现异常:" + ex.Message);//channel.TxRollback(); }}}
}
消费端代码保持不变(仅改动下队列名称)
因生产者已经将消息发送到了队列中,后触发了异常,实则已经和消息能不能到达队列无关了,在消费端也就正常消费了。
取消16、34和40行的注释后,在事务模式下,当再次触发异常后,能够看见,消费端并没有收到消息(这不是说消费端那边出现异常了,而是消息没有到达队列中)。发送消息与其他操作在同一事务下提供了一致性。
局限性
事务模式下,只有收到了Broker的TX.Commit-OK才算提交成功,否则便可以捕获异常执行事务回滚,所以能够解决生产者和RabbitMQ之间消息确认的问题。但是使用事务机制下有一个缺点,在一条消息发送之后会使得发送方阻塞,以等待RabbitMQ的回应,之后才能发送下一条消息,严重降低RabbitMQ的性能与吞吐量,一般不使用。
确认机制
生产者将channel设置为确认模式(confirm),在该信道上发送的消息都会分配一个唯一Id(从1开始),当消息进入到队列中后,RabbitMQ会发送一个ack(Acknowledgement)命令(Basic.Ack)给生产者,这样确保生产者知道这条消息是真的到了。
相比于事务机制下的阻塞,发送方确认机制下是异步的,发布一条消息后,可以等待信道确认的同时发送下一条消息,当信道收到确认时,生产者可以通过回调方法来处理该消息。当RabbitMQ自身内部错误导致消息丢失,会发送nack(Negative Acknowledgement)命令(Basic.Nack),生产者可以在回调方法中处理该命令,诸如重试,记录等操作,所有消息在确认模式下,都会被ack或是nack一次。
核心方法
channel.ConfirmSelect 设置信道为确认模式(Confirm),收到Confirm.Select-OK命令表示同意将当前信道设置为Confirm模式。
channel.WaitForConfirms 信道等待Broker回执确认信息
三种模式
生产者实现confirm有三种方式,取决于confirm的时机
同步确认模式
单条确认模式:发送完消息后等待确认
批量确认模式:发送完一批消息后等待确认
异步确认模式:发送完消息后,提供一个回调方法,Broker回执了一条或多条后由生产者回调执行,异步等待确认
单条确认模式
生产者通过调用channel.ConfirmSelect()将信道设置为确认模式(Confirm)。发送消息到RabbitMQ,当消息进入到队列后,发送Basic.Ack给生产者,信道等待获取确认信息,执行回调逻辑。
var connFactory = new ConnectionFactory
{HostName = "xxx.xxx.xxx.xxx",Port = 5672,UserName = "rabbitmqdemo",Password = "rabbitmqdemo@test",VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{using (var channel = conn.CreateModel()){var queueName = "helloworld_producerack_singleconfirm";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.ConfirmSelect(); while (true){Console.WriteLine("消息内容(exit退出):");var message = Console.ReadLine();if (message.Trim().ToLower() == "exit"){break;}var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);Console.WriteLine("消息内容发送完毕:" + message);Console.WriteLine(channel.WaitForConfirms(new TimeSpan(0, 0, 1)) ? "消息发送成功" : "消息发送失败");}}
}
消息发送完毕,同步等待获取到Broker的回执确认消息,这样来判断是否ack或是nack了,如此可确定生产者消息是否真的到达了Broker。
批量确认模式
信道开启confirm模式,与单条发送后确认不同,批量发送多条之后再调用waitForConfirms确认,这样发送多条之后才会等待一次确认消息,极大提升confirm效率,但问题在于出现返回basic.nack或者超时的情况时,生产者需要将这批消息全部重发,则会带来明显的重复消息数量,并且当消息经常丢失时,批量confirm性能应该是不升反降的。
var connFactory = new ConnectionFactory
{HostName = "xxx.xxx.xxx.xxx",Port = 5672,UserName = "rabbitmqdemo",Password = "rabbitmqdemo@test",VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{using (var channel = conn.CreateModel()){var queueName = "helloworld_producerack_batchconfirm";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.ConfirmSelect();while (true){for (int i = 0; i < 3; i++){Console.WriteLine("消息内容(exit退出):");var message = Console.ReadLine();if (message.Trim().ToLower() == "exit"){break;}var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);Console.WriteLine("消息内容发送完毕:" + message);}Console.WriteLine(channel.WaitForConfirms() ? "所有消息内容发送完毕" : "存在消息发送失败");}}
}
此处对于批量,我设置三个为一组,三个执行完毕后,获得Broker的回执确认消息。
异步确认模式
异步模式下,额外增加回调方法来处理,Channel对象提供的BasicAcks,BasicNacks两个回调事件
// // Summary: // Signalled when a Basic.Nack command arrives from the broker. event EventHandler<BasicNackEventArgs> BasicNacks;// // Summary: // Signalled when a Basic.Ack command arrives from the broker. event EventHandler<BasicAckEventArgs> BasicAcks;
BasicAckEventArg和BasicNackEventArgs都包含deliveryTag(当前Chanel在发送消息时给的消息序号)。
var connFactory = new ConnectionFactory
{HostName = "xxx.xxx.xxx.xxx",Port = 5672,UserName = "rabbitmqdemo",Password = "rabbitmqdemo@test",VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{using (var channel = conn.CreateModel()){var queueName = "helloworld_producerack_asyncconfirm";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.ConfirmSelect();channel.BasicAcks += new EventHandler<BasicAckEventArgs>((o, basic) =>{Console.WriteLine($"调用ack回调方法: DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple }");});channel.BasicNacks += new EventHandler<BasicNackEventArgs>((o, basic) =>{Console.WriteLine($"调用Nacks回调方法; DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple }");});while (true){Console.WriteLine("消息内容(exit退出):");var message = Console.ReadLine();if (message.Trim().ToLower() == "exit"){break;}var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);Console.WriteLine("消息内容发送完毕:" + message);}}
}
生产者发送消息,生产者收到回执确认消息,调用回调方法
在此基础上,我们可以为channel维护一个unconfirm的消息序号集合,用来控制nack回调下的失败重试。具体实现为
当发布一条消息时,从channel获取当前的唯一的Id存入集合中。
当回调一次BasicAcks事件方法时,unconfirm集合删掉相应的一条或多条记录。
// ...channel.ConfirmSelect();var _pendingDeliveryTags = new LinkedList<ulong>();
channel.BasicAcks += new EventHandler<BasicAckEventArgs>((o, basic) =>
{Console.WriteLine($"调用ack回调方法: DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple }");if (_pendingDeliveryTags.Count > 0){return;}if (!basic.Multiple){_pendingDeliveryTags.Remove(basic.DeliveryTag);return;}while (_pendingDeliveryTags.First.Value < basic.DeliveryTag){_pendingDeliveryTags.RemoveFirst();}if (_pendingDeliveryTags.First.Value == basic.DeliveryTag){_pendingDeliveryTags.RemoveFirst();}
});
channel.BasicNacks += new EventHandler<BasicNackEventArgs>((o, basic) =>
{Console.WriteLine($"调用Nacks回调方法; DeliveryTag: {basic.DeliveryTag};Multiple: { basic.Multiple }"); if (_pendingDeliveryTags.Count > 0){ return;} if (!basic.Multiple){_pendingDeliveryTags.Remove(basic.DeliveryTag); return;} while (_pendingDeliveryTags.First.Value < basic.DeliveryTag){_pendingDeliveryTags.RemoveFirst();} if (_pendingDeliveryTags.First.Value == basic.DeliveryTag){_pendingDeliveryTags.RemoveFirst();}//Todo 执行消息重发});for (int i = 0; i < 5000; i++)
{var message = "第" + (i + 1) + "条消息";var body = Encoding.UTF8.GetBytes(message);var nextSeqNo = channel.NextPublishSeqNo;channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);_pendingDeliveryTags.AddLast(nextSeqNo);Console.WriteLine("消息内容发送完毕:" + message);
}
异步模式下,当生产者持续向RabbitMQ发送消息,可能生产者端并不会收到100个ack消息 ,也许收到1个,或者2个,或N个ack消息,这些ack消息的multiple都为true,ack消息中的deliveryTag的值的含义也不是一样的,当multiple为true,deliveryTag代表的值是一批的id,像如下图中的4905,则代表是4904和4905两个唯一Id值。而当multiple为false时,则deliveryTag就仅代表一个唯一的Id值。
从性能方面比对这几种方式,批量和异步确认更胜一筹。可从代码复杂方面来讲,事务机制或是单条确认模式都简单,异步确认模式反而是更复杂。而当遇到批量消息需要重发场景时,批量确认模式却又会存在性能不升反降。因此,也无绝对的优劣,适合的才是最好的。
消费者可靠接收
生产者将消息可靠传递到了RabbitMQ,RabbitMQ将消息推送给消费者,消费者端如何确保消息可靠接收了呢? RabbitMQ提供了消息确认机制,消费者在订阅队列时,可以指定autoAck参数。
AutoAck参数
当autoAck为false时,RabbitMQ会等待消费者显示的回复确认信号后再从内存(或磁盘)中移除消息(先标记再删除)。
当autoAck为true时,RabbitMQ会把消息发送出去的消息自动确认再从内存(或磁盘)中删除,消费者是否接收到了消息不管。
RabbitMQ的Web管理平台上可以看到当前队列是否AutoAck。当AutoAck设置为false时,面板中展示Ack required为勾选。同时队列中的消息分成了两部分:
一部分是等待投递给消费者的消息数 Ready;
一部分是已经投递给消费者,但是未收到消费者确认信号的消息数 Unacked。
如果 RabbitMQ 服务器端一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。
RabbitMQ 不会为未确认的消息设置过期时间,判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开,这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久
核心方法
BasicAck 用于确认当前消
BasicNack 用于拒绝当前消息,可以执行批量拒绝
BasicReject 用于拒绝当前消息,仅拒绝一条消息
手动确认
生产者端将100条消息存入队列中,可以通过Web面板看到已有Total/Ready都是100条。
消费者端设置消费能力Qos为5,同时channel.BasicConsume 设置autoAck为false
var connFactory = new ConnectionFactory
{HostName = "xxx.xxx.xxx.xxx",Port = 5672,UserName = "rabbitmqdemo",Password = "rabbitmqdemo@test",VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{using (var channel = conn.CreateModel()){var queueName = "helloworld_consumerack";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{Thread.Sleep(10000);var message = ea.Body;Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);Console.ReadKey();}
}
运行代码,在web面板中可以开始看到待确认数为5,即RabbitMQ按照消费者最大负载能力推送消息,同时等待消费者手动确认。
这样,能够可靠的确定消费者着实接收到了消息。当消费者因处理业务发生异常下或是因网络不稳定、服务器异常等,消费者没有反馈Ack,这样消息一直会处在队列中,这样就确保了有其他消费者(也许还是自身)有机会执行重试,确保业务正常,仅仅当RabbitMQ收到消费者的Ack反馈后才会删除。
自动确认
RabbitMQ消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能。
生产者发送100条消息到RabbitMQ中等待消费。
消费者端设置自动确认Ack,接收消息中设置了当读取到第50条消息时抛出异常。
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{Thread.Sleep(1000);var message = ea.Body;Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));if (Encoding.UTF8.GetString(message.ToArray()).Contains("50")){Console.WriteLine("异常");throw new Exception("模拟异常");}((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
};channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
当运行程序,消息全部会推送到消费者中(basicQos在AutoAck下无效),出现异常时,RabbitMQ中也不会留有消息,这样一来,消息就丢失了。
这种不安全的模式,使用场景受限,消息易丢失。同时,由于自动确认下,设置Qos无效,当消息数量过多时,RabbitMQ发送到消费者端的消息太快,数量太多,又可能造成消费者端消息挤压而耗尽内存导致进程终止。
2022-08-23,望技术有成后能回来看见自己的脚步