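Our team recently ran into a question: is Microsoft's Azure Service Bus Queue reliable, or can it lose messages?
Here is the background.
Our product is a SaaS product, and cross-module messaging goes through the Azure message queue (Service Bus Queue) precisely to prevent message loss. Yet we hit a problem: one module sent a message to the queue, and another module never received it. The send raised no exception, so people began to suspect that Azure Service Bus Queue was unreliable and had lost some of our messages.
According to the official statement, messages are not lost 99.5% of the time.
But I doubted we would be that unlucky: we are still in the testing phase and our message volume is small. So, at a colleague's suggestion, I wrote a test program to find out whether Service Bus Queue loses messages at all, or loses them easily.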
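1. Test program overview
Principle: send a fixed number of messages to the queue and check whether all of them can be received. If they can, the queue can be considered basically reliable, and the problem lies on our side.
Procedure:
First create a queue. The program uses the Azure .NET SDK to send messages to the queue and receive messages from it. After receiving a message, it calls a method to delete that message from the queue; a receive counts as successful only if the delete succeeds.
When the main program runs, it starts two threads.
Thread 1 keeps sending messages to the queue. The total is fixed, here 10000 messages. The SDK's Send method has no return value to report success, so a send that throws no exception is counted as successful.
Thread 2 keeps receiving messages from the queue. Once a message has been fetched locally, it is deleted from the queue. A message that is fetched and successfully deleted counts as received, and the counter is incremented by 1.
Logging:
Log4net is used to write the log.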
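The test described above compares only the two counts. As a possible refinement, here is a minimal sketch that compares the message bodies themselves, so that a mismatch would name the missing message instead of only showing that the totals differ. The class DeliveryAudit and the method FindMissing are hypothetical names made up for this illustration; they are not part of the test program or of the Azure SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration; not part of the original test program.
public static class DeliveryAudit
{
    // Returns the bodies that were sent but never received.
    public static List<string> FindMissing(IEnumerable<string> sent, IEnumerable<string> received)
    {
        var receivedSet = new HashSet<string>(received);
        return sent.Where(body => !receivedSet.Contains(body)).ToList();
    }

    public static void Main()
    {
        // In the real test these lists would hold the GUID bodies logged by each thread.
        var sent = new List<string> { "a", "b", "c" };
        var received = new List<string> { "c", "a" };

        foreach (string body in FindMissing(sent, received))
        {
            Console.WriteLine("missing: " + body); // prints "missing: b"
        }
    }
}
```

Since each message body in the test is a fresh GUID, the bodies could serve as the keys for this comparison.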
2. Implementation
The class ServiceBusQueueHandler wraps the .NET SDK calls for sending and receiving messages.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class ServiceBusQueueHandler
{
    public static readonly log4net.ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

    public ServiceBusQueueHandler()
    {
        /* For most scenarios, it is recommended that you keep Mode set to Auto.
         * This indicates that your application will attempt to use TCP to connect to the Windows Azure Service Bus,
         * but will use HTTP if unable to do so. In general, this allows your connection to be more efficient.
         * However, if TCP is always unavailable to your application,
         * you can save some time on your connection if you globally set the mode to HTTP. */
        ServiceBusEnvironment.SystemConnectivity.Mode = ConnectivityMode.AutoDetect;
    }

    // Send a single message. Returns true if Send threw no exception.
    public bool SendMessage(string strMessageBody, QueueClient client, int idelayTime = 0)
    {
        //log.Debug("=>SendMessage");
        bool bRet = false;
        try
        {
            BrokeredMessage message = new BrokeredMessage(strMessageBody);
            DateTime utcEnqueueTime = DateTime.UtcNow.AddSeconds(idelayTime);
            //log.Debug(string.Format("DateTime.UtcNow = {0}", DateTime.UtcNow.ToString()));
            //log.Debug(string.Format("EnqueuedTimeUtc = {0}", utcEnqueueTime.ToString()));
            // Set the time at which this message becomes visible.
            message.ScheduledEnqueueTimeUtc = utcEnqueueTime;
            // http://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.queueclient.send.aspx
            client.Send(message);
            log.Debug(string.Format("Success send! Send Time = {0}, Body = {1}", DateTime.UtcNow.ToString(), message.GetBody<string>()));
            bRet = true;
        }
        catch (TimeoutException e)
        {
            // Thrown when the operation times out. The timeout period is initialized through MessagingFactorySettings;
            // you may need to increase OperationTimeout to avoid this exception if the timeout value is relatively low.
            log.Debug(string.Format("TimeoutException: {0}", e.Message));
            return bRet;
        }
        catch (ArgumentException e)
        {
            // Thrown when the BrokeredMessage is null.
            log.Debug(string.Format("ArgumentException: {0}", e.Message));
            return bRet;
        }
        catch (InvalidOperationException e)
        {
            // Thrown if the message has already been sent once by a QueueClient or MessageSender.
            log.Debug(string.Format("InvalidOperationException: {0}", e.Message));
            return bRet;
        }
        catch (OperationCanceledException e)
        {
            // Thrown if the client entity has been closed or aborted.
            log.Debug(string.Format("OperationCanceledException: {0}", e.Message));
            return bRet;
        }
        catch (UnauthorizedAccessException e)
        {
            // Thrown if there is an I/O or security error.
            log.Debug(string.Format("UnauthorizedAccessException: {0}", e.Message));
            return bRet;
        }
        catch (SerializationException e)
        {
            // Thrown when an error occurs during serialization or deserialization.
            log.Debug(string.Format("SerializationException: {0}", e.Message));
            return bRet;
        }
        catch (MessagingEntityNotFoundException e)
        {
            // Thrown if the queue does not exist.
            log.Debug(string.Format("MessagingEntityNotFoundException: {0}", e.Message));
            return bRet;
        }
        catch (MessagingException e)
        {
            log.Debug(string.Format("MessagingException: {0}", e.Message));
            if (e.IsTransient)
            {
                // e.IsTransient indicates whether the exception is transient.
                // Check this property to determine if the operation should be retried.
                HandleTransientErrors(e);
            }
            return bRet;
        }
        catch (Exception e)
        {
            log.Debug(string.Format("Exception: {0}", e.Message));
            return bRet;
        }
        //log.Debug("<=SendMessage");
        return bRet;
    }

    // Send a batch of messages. The maximum size of the batch is the same as
    // the maximum size of a single message (currently 256 KB).
    public bool SendMessages(List<string> arrayMessages, QueueClient client, int idelayTime = 0)
    {
        //log.Debug("=>SendMessage");
        bool bRet = false;
        int i = 0;

        // Prepare the data.
        List<BrokeredMessage> arrayBrokedMessages = new List<BrokeredMessage>();
        DateTime utcEnqueueTime = DateTime.UtcNow.AddSeconds(idelayTime);
        log.Debug(string.Format("DateTime.UtcNow = {0}", DateTime.UtcNow.ToString()));
        log.Debug(string.Format("EnqueuedTimeUtc = {0}", utcEnqueueTime.ToString()));
        foreach (string strMessageBody in arrayMessages)
        {
            BrokeredMessage message = new BrokeredMessage(strMessageBody);
            // The id of the message must be assigned.
            // Note: the counter restarts on every call, so these ids are unique only within one batch.
            message.MessageId = "Message_" + (++i).ToString();
            message.ScheduledEnqueueTimeUtc = utcEnqueueTime;
            arrayBrokedMessages.Add(message);
        }

        // Send the messages.
        try
        {
            client.SendBatch(arrayBrokedMessages);
            log.Debug(string.Format("Success send batch messages!"));
            bRet = true;
        }
        catch (TimeoutException e)
        {
            // Thrown when the operation times out. The timeout period is initialized through MessagingFactorySettings;
            // you may need to increase OperationTimeout to avoid this exception if the timeout value is relatively low.
            log.Debug(string.Format("TimeoutException: {0}", e.Message));
            return bRet;
        }
        catch (ArgumentException e)
        {
            // Thrown when the BrokeredMessage is null.
            log.Debug(string.Format("ArgumentException: {0}", e.Message));
            return bRet;
        }
        catch (InvalidOperationException e)
        {
            // Thrown if the message has already been sent once by a QueueClient or MessageSender.
            log.Debug(string.Format("InvalidOperationException: {0}", e.Message));
            return bRet;
        }
        catch (OperationCanceledException e)
        {
            // Thrown if the client entity has been closed or aborted.
            log.Debug(string.Format("OperationCanceledException: {0}", e.Message));
            return bRet;
        }
        catch (UnauthorizedAccessException e)
        {
            // Thrown if there is an I/O or security error.
            log.Debug(string.Format("UnauthorizedAccessException: {0}", e.Message));
            return bRet;
        }
        catch (SerializationException e)
        {
            // Thrown when an error occurs during serialization or deserialization.
            log.Debug(string.Format("SerializationException: {0}", e.Message));
            return bRet;
        }
        catch (MessagingEntityNotFoundException e)
        {
            // Thrown if the queue does not exist.
            log.Debug(string.Format("MessagingEntityNotFoundException: {0}", e.Message));
            return bRet;
        }
        catch (MessagingException e)
        {
            log.Debug(string.Format("MessagingException: {0}", e.Message));
            if (e.IsTransient)
            {
                // e.IsTransient indicates whether the exception is transient.
                // Check this property to determine if the operation should be retried.
                HandleTransientErrors(e);
            }
            return bRet;
        }
        catch (Exception e)
        {
            log.Debug(string.Format("Exception: {0}", e.Message));
            return bRet;
        }
        log.Debug("<=SendMessage");
        return bRet;
    }

    // Get messages from a queue. Returns null if an exception occurred.
    // iWaitTimeout: the time span (in seconds) that the server will wait for the message batch to arrive before it times out.
    public List<BrokeredMessage> GetMessages(int iMaxNumMsg, int iWaitTimeout, QueueClient client)
    {
        //log.Debug("=>ReceiveMessages");
        List<BrokeredMessage> list = new List<BrokeredMessage>();
        try
        {
            // Receive a batch of messages from the queue.
            list = client.ReceiveBatch(iMaxNumMsg, TimeSpan.FromSeconds(iWaitTimeout)).ToList<BrokeredMessage>();
        }
        catch (MessagingException e)
        {
            log.Debug(string.Format("ReceiveMessages, MessagingException: {0}", e.Message));
            if (e.IsTransient)
            {
                // e.IsTransient indicates whether the exception is transient.
                // Check this property to determine if the operation should be retried.
                HandleTransientErrors(e);
            }
            return null;
        }
        catch (Exception e)
        {
            log.Debug(string.Format("ReceiveMessages, Exception: {0}", e.Message));
            return null;
        }
        //subClient.Close();
        //log.Debug("<=ReceiveMessages");
        return list;
    }

    // Delete a received message from the queue by completing it.
    public bool DeleteMessage(BrokeredMessage message)
    {
        //log.Debug("=>DeleteMessage");
        bool bRet = false;
        try
        {
            message.Complete();
            bRet = true;
            log.Debug(string.Format("Delete Message Successfully"));
        }
        catch (Exception e)
        {
            log.Debug(e.Message);
            return bRet;
        }
        //log.Debug("<=DeleteMessage");
        return bRet;
    }

    private void HandleTransientErrors(MessagingException e)
    {
        // On a transient error, back off for 2 seconds; the caller decides whether to retry.
        log.Debug(e.Message);
        log.Debug("Transient error happened! Will retry in 2 seconds");
        Thread.Sleep(2000);
    }
}
The Main method and the implementations of thread 1 and thread 2:
// These methods belong to the Program class; log, connectionString and queueName
// are static members of that class and are not shown here.

// Sends a fixed number of messages to the queue.
public static void SendMessageToQueue()
{
    int sendMessageNum = 10000;
    log.Debug(string.Format("=> SendMessageToQueue, send message number = {0}", sendMessageNum));

    // Prepare the handler and the client.
    ServiceBusQueueHandler handler = new ServiceBusQueueHandler();
    QueueClient client = QueueClient.CreateFromConnectionString(connectionString, queueName);

    // Number of messages sent successfully.
    int count = 0;
    for (int i = 0; i < sendMessageNum; i++)
    {
        // Send one message whose body is a fresh GUID, visible after 10 seconds.
        string strMessageBody = System.Guid.NewGuid().ToString();
        bool bRet = handler.SendMessage(strMessageBody, client, 10);
        if (bRet)
        {
            count++;
        }
        // Wait 2s, then send the next message.
        Thread.Sleep(2000);
    }
    log.Debug(string.Format("<= SendMessageToQueue, success sent message number = {0}", count));
}

// Receives messages until 30 receive attempts have come back empty.
public static void ReceiveMessageFromQueue()
{
    log.Debug("=> ReceiveMessageFromQueue");

    // Prepare the handler and the client.
    ServiceBusQueueHandler handler = new ServiceBusQueueHandler();
    QueueClient client = QueueClient.CreateFromConnectionString(connectionString, queueName);

    // Number of messages received successfully.
    int count = 0;
    // If we get no message for 1 hour (60 * 60 = 30 * 120 seconds), we assume the queue holds no more messages.
    int failCount = 0;
    while (failCount < 30)
    {
        List<BrokeredMessage> list = handler.GetMessages(10, 120, client);
        // GetMessages returns null when an exception occurred; treat that as an empty attempt.
        if (list != null && list.Count > 0)
        {
            foreach (BrokeredMessage e in list)
            {
                log.Debug(string.Format("Received 1 Message, Time = {0}, Message Body = {1}", DateTime.UtcNow.ToString(), e.GetBody<string>()));
                // Delete the message; only a successful delete counts as a receive.
                bool bRet = handler.DeleteMessage(e);
                if (bRet)
                {
                    count++;
                }
            }
            log.Debug(string.Format("Current Count Number = {0}", count));
        }
        else
        {
            failCount++;
            log.Debug(string.Format("Didn't Receive any Message this time, fail count number = {0}", failCount));
        }
        // Wait 1s, then poll again.
        Thread.Sleep(1000);
    }
    log.Debug(string.Format("<= ReceiveMessageFromQueue, success received message number = {0}", count));
}

static void Main(string[] args)
{
    log4net.GlobalContext.Properties["LogName"] = "TestServiceBus.log";
    log4net.Config.XmlConfigurator.Configure();
    Console.WriteLine("Start");

    Thread threadSendMessage = new Thread(SendMessageToQueue);
    Thread threadReceMessage = new Thread(ReceiveMessageFromQueue);
    threadSendMessage.Start();
    threadReceMessage.Start();

    //Console.WriteLine("Stop");
    Console.ReadLine();
}
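One small detail: thread 1 sends only 10000 messages, while thread 2 keeps receiving. Once an hour's worth of receive attempts (30 attempts with a 120-second timeout each) has come back empty, we assume the queue will hold no more messages and stop receiving.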
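One thing to note about this stop condition: the failure counter in the receive loop is never reset, so it counts empty attempts cumulatively, not consecutively. A minimal sketch of a variant that stops only after a run of consecutive empty polls is shown below. IdleDrain and DrainUntilIdle are hypothetical names for this illustration, and the receive call is replaced by a plain delegate so the sketch runs without the Azure SDK.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; the delegate stands in for a real receive call.
public static class IdleDrain
{
    // Polls until maxEmptyPolls consecutive polls return nothing.
    // Returns the total number of items seen.
    public static int DrainUntilIdle(Func<IList<string>> receiveBatch, int maxEmptyPolls)
    {
        int received = 0;
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls)
        {
            IList<string> batch = receiveBatch();
            if (batch != null && batch.Count > 0)
            {
                received += batch.Count;
                emptyPolls = 0; // reset: only consecutive empty polls count
            }
            else
            {
                emptyPolls++;
            }
        }
        return received;
    }

    public static void Main()
    {
        // Fake source: two batches separated by one empty poll, then silence.
        var batches = new Queue<IList<string>>(new IList<string>[]
        {
            new List<string> { "m1", "m2" },
            new List<string>(),
            new List<string> { "m3" }
        });
        Func<IList<string>> source =
            () => batches.Count > 0 ? batches.Dequeue() : new List<string>();

        Console.WriteLine(DrainUntilIdle(source, 2)); // prints 3
    }
}
```

In the test as run, the difference probably did not matter, because the sender produced a message every 2 seconds and each receive waited up to 120 seconds, so empty attempts should have been rare while the sender was active.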
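3. Test results
According to the log, the program ran for nearly 8 hours. The final results were:
10000 messages sent successfully
2015-04-30 15:01:49,576 [3] DEBUG TestServiceBus.Program <= SendMessageToQueue, success sent message number = 10000
10000 messages received successfully
2015-04-30 15:02:03,638 [4] DEBUG TestServiceBus.Program Current Count Number = 10000
So, judging from this test alone, Service Bus Queue did not lose any messages. For the messaging problem our team ran into, I suggest we start by checking our own code for the fault instead of blaming Service Bus Queue.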
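To put the result in perspective, here is a rough calculation under a loudly stated assumption: that the 99.5% figure means each message is lost independently with probability 0.5%. That reading is my assumption for the sake of the arithmetic, not something the official statement is known to say. Under it, the chance that all 10000 messages arrive is 0.995 raised to the power 10000. LossOdds and ProbabilityAllDelivered are hypothetical names for this sketch.

```csharp
using System;

// Hypothetical back-of-the-envelope check; assumes independent per-message loss.
public static class LossOdds
{
    // Probability that all n messages arrive if each one is lost
    // independently with probability lossRate.
    public static double ProbabilityAllDelivered(double lossRate, int n)
    {
        return Math.Pow(1.0 - lossRate, n);
    }

    public static void Main()
    {
        // 10000 messages at an assumed 0.5% loss rate per message.
        double p = ProbabilityAllDelivered(0.005, 10000);
        Console.WriteLine(p.ToString("E2"));
    }
}
```

The value is on the order of 1e-22, so a clean run of 10000 out of 10000 would be essentially impossible if the queue really dropped 0.5% of messages at random. Under that assumption, the test result argues strongly against random loss inside the queue.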
---------------------------------------------------------------
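Update, May 5, 2015: we finally found the cause of the lost messages, and the problem was indeed on our side. When we sent messages, the message id could repeat, which could cause messages to be lost. (When duplicate detection is enabled on a queue, Service Bus discards a message whose MessageId repeats one it has already seen within the detection window.) The message id should be unique.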
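To illustrate why a repeated message id can look like a lost message with no exception on the sending side, here is a toy in-memory model. It is not the Service Bus implementation. It assumes the queue discards any message whose id it has already seen, and it ignores the time window that real duplicate detection uses. DedupQueue is a hypothetical class made up for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Toy model for illustration only; NOT how Service Bus is implemented.
public class DedupQueue
{
    private readonly HashSet<string> seenIds = new HashSet<string>();
    private readonly Queue<string> bodies = new Queue<string>();

    // Accepts every send without error, but silently drops a repeated id.
    public void Send(string messageId, string body)
    {
        // HashSet.Add returns false when the id is already present.
        if (seenIds.Add(messageId))
        {
            bodies.Enqueue(body);
        }
    }

    public int Count
    {
        get { return bodies.Count; }
    }

    public static void Main()
    {
        var queue = new DedupQueue();
        queue.Send("Message_1", "first");
        queue.Send("Message_1", "second");              // same id: dropped, no exception
        queue.Send(Guid.NewGuid().ToString(), "third"); // unique id: kept

        Console.WriteLine(queue.Count); // prints 2
    }
}
```

From the sender's point of view all three sends succeed, yet the receiver only ever sees two messages. Generating each id from a fresh GUID, as the last send does, is one simple way to keep ids unique.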
I hope this serves as a starting point for discussion. Thanks :-)
Kevin Song
May 2, 2015