【Microsoft Azure学习之旅】测试消息队列(Service Bus Queue)是否会丢消息

  组里最近遇到一个问题,微软的Azure Service Bus Queue是否可靠?是否会出现丢失消息的情况?

  具体缘由如下,

  由于开发的产品是SaaS产品,为防止消息丢失,跨Module消息传递使用的是微软Azure消息队列(Service Bus Queue),但是出现一个问题,一个Module向Queue里发送消息,但另一个Module没有取到该消息。因为消息发送过程中并未有异常。所以大家怀疑,是否Azure Service Bus Queue不可靠,丢失了我们的一些消息

  官方的说法是,99.5%的概率消息不会丢失。

  但我想应该没有那么凑巧,毕竟我们的消息量还在测试阶段,没有那么大,不会那么凑巧碰上。所以索性根据同事的建议,写一个测试程序来确定Service Bus Queue是否会或者容易丢失消息。

  

一. 测试程序简介

  原理:向消息队列(Queue)中发送一定量的消息,看能否全部取到。如可全部取到,则可认为消息队列基本可靠,问题出在我们自己身上。

  过程:

  首先建立一个消息队列(Queue),程序使用Azure .Net SDK实现向Queue发送和接受消息(接收到消息后会调用方法在Queue中删除此消息,删除成功,则视为接收成功)。

  主程序执行后,会启动两个线程,

  线程1负责不断向Queue中发送消息(总量一定,假定共发送10000条,由于SDK中Send方法无返回值告知是否发送成功,如果发送过程中无异常抛出,则视为成功发送)。

  线程2负责不断地从Queue中取消息,取到消息到本地后,即删除在Queue中的此消息。取到消息并成功删除视为成功取到消息,计数器+1。

  日志模块:

  使用Log4net记录日志  

 

二. 代码实现

  Class ServiceBusQueueHandler负责封装.Net SDK的发送,接收消息。

class ServiceBusQueueHandler{public static readonly log4net.ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);public ServiceBusQueueHandler(){/* For most scenarios, it is recommended that you keep Mode to Auto. * This indicates that your application will attempt to use TCP to connect to the Windows Azure Service Bus, * but will use HTTP if unable to do so. In general, this allows your connection to be more efficient. * However, if TCP is always unavailable to your application, * you can save some time on your connection if you globally set the mode to HTTP.*/ServiceBusEnvironment.SystemConnectivity.Mode = ConnectivityMode.AutoDetect;}//Send Messagepublic bool SendMessage(string strMessageBody, QueueClient client, int idelayTime = 0){//log.Debug("=>SendMessage");bool bRet = false;try{BrokeredMessage message = new BrokeredMessage(strMessageBody);DateTime utcEnqueueTime = DateTime.UtcNow.AddSeconds(idelayTime);//log.Debug(string.Format("DateTime.UtcNow = {0}", DateTime.UtcNow.ToString()));//log.Debug(string.Format("EnqueuedTimeUtc = {0}", utcEnqueueTime.ToString()));//set the time when this message will be visiablemessage.ScheduledEnqueueTimeUtc = utcEnqueueTime;//http://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.queueclient.send.aspx
                client.Send(message);log.Debug(string.Format("Success send! Send Time = {0}, Body = {1}", DateTime.UtcNow.ToString(), message.GetBody<string>()));bRet = true;}catch (TimeoutException e){//Thrown when operation times out. Timeout period is initialized through the MessagingFactorySettings may need to increase the value of OperationTimeout to avoid this exception if timeout value is relatively low.log.Debug(string.Format("TimeoutException: {0}", e.Message));return bRet;}catch (ArgumentException e){//Thrown when the BrokeredMessage is null.log.Debug(string.Format("ArgumentException: {0}", e.Message));return bRet;}catch (InvalidOperationException e){//Thrown if the message has already been sent by a QueueClient or MessageSender once already.log.Debug(string.Format("InvalidOperationException: {0}", e.Message));return bRet;}catch (OperationCanceledException e){//Thrown if the client entity has been closed or aborted.log.Debug(string.Format("OperationCanceledException: {0}", e.Message));return bRet;}catch (UnauthorizedAccessException e){//Thrown if there is an I/O or security error.log.Debug(string.Format("UnauthorizedAccessException: {0}", e.Message));return bRet;}catch (SerializationException e){//Thrown when an error occurs during serialization or deserialization.log.Debug(string.Format("SerializationException: {0}", e.Message));return bRet;}catch (MessagingEntityNotFoundException e){//Thrown if the queue does not exist.log.Debug(string.Format("MessagingEntityNotFoundException: {0}", e.Message));return bRet;}catch (MessagingException e){log.Debug(string.Format("MessagingException: {0}", e.Message));if (e.IsTransient){//e.IsTransient: Gets a value indicating whether the exception is transient. Check this property to determine if the operation should be retried.
                    HandleTransientErrors(e);}return bRet;}catch (Exception e){log.Debug(string.Format("Exception: {0}", e.Message));return bRet;}//log.Debug("<=SendMessage");return bRet;}//SendMessages, the maximum size of the batch is the same as the maximum size of a single message (currently 256 Kb).public bool SendMessages(List<string> arrayMessages, QueueClient client, int idelayTime = 0){//log.Debug("=>SendMessage");bool bRet = false;int i = 0;//prepare dataList<BrokeredMessage> arrayBrokedMessages = new List<BrokeredMessage>();DateTime utcEnqueueTime = DateTime.UtcNow.AddSeconds(idelayTime);log.Debug(string.Format("DateTime.UtcNow = {0}", DateTime.UtcNow.ToString()));log.Debug(string.Format("EnqueuedTimeUtc = {0}", utcEnqueueTime.ToString()));foreach (string strMessageBody in arrayMessages){BrokeredMessage message = new BrokeredMessage(strMessageBody);// The Id of message must be assigned message.MessageId = "Message_" + (++i).ToString();message.ScheduledEnqueueTimeUtc = utcEnqueueTime;arrayBrokedMessages.Add(message);}//send messagestry{client.SendBatch(arrayBrokedMessages);log.Debug(string.Format("Success send batch messages!"));bRet = true;}catch (TimeoutException e){//Thrown when operation times out. Timeout period is initialized through the MessagingFactorySettings may need to increase the value of OperationTimeout to avoid this exception if timeout value is relatively low.log.Debug(string.Format("TimeoutException: {0}", e.Message));return bRet;}catch (ArgumentException e){//Thrown when the BrokeredMessage is null.log.Debug(string.Format("ArgumentException: {0}", e.Message));return bRet;}catch (InvalidOperationException e){//Thrown if the message has already been sent by a QueueClient or MessageSender once already.log.Debug(string.Format("InvalidOperationException: {0}", e.Message));return bRet;}catch (OperationCanceledException e){//Thrown if the client entity has been closed or aborted.log.Debug(string.Format("OperationCanceledException: {0}", e.Message));return bRet;}catch (UnauthorizedAccessException e){//Thrown if there is an I/O or security error.log.Debug(string.Format("UnauthorizedAccessException: {0}", e.Message));return bRet;}catch (SerializationException e){//Thrown when an error occurs during serialization or deserialization.log.Debug(string.Format("SerializationException: {0}", e.Message));return bRet;}catch (MessagingEntityNotFoundException e){//Thrown if the queue does not exist.log.Debug(string.Format("MessagingEntityNotFoundException: {0}", e.Message));return bRet;}catch (MessagingException e){log.Debug(string.Format("MessagingException: {0}", e.Message));if (e.IsTransient){//e.IsTransient: Gets a value indicating whether the exception is transient. Check this property to determine if the operation should be retried.
                    HandleTransientErrors(e);}return bRet;}catch (Exception e){log.Debug(string.Format("Exception: {0}", e.Message));return bRet;}log.Debug("<=SendMessage");return bRet;}//get messages from a queue//iWaitTimeout: The time span that the server will wait for the message batch to arrive before it times out.public List<BrokeredMessage> GetMessages(int iMaxNumMsg, int iWaitTimeout, QueueClient client){//log.Debug("=>ReceiveMessages");
List<BrokeredMessage> list = new List<BrokeredMessage>();try{//receive messages from Agent Subscriptionlist = client.ReceiveBatch(iMaxNumMsg, TimeSpan.FromSeconds(iWaitTimeout)).ToList<BrokeredMessage>();}catch (MessagingException e){log.Debug(string.Format("ReceiveMessages, MessagingException: {0}", e.Message));if (e.IsTransient){//e.IsTransient: Gets a value indicating whether the exception is transient. Check this property to determine if the operation should be retried.
                    HandleTransientErrors(e);}return null;}catch (Exception e){log.Debug(string.Format("ReceiveMessages, Exception: {0}", e.Message));return null;}//subClient.Close();//log.Debug("<=ReceiveMessages");return list;}public bool DeleteMessage(BrokeredMessage message){//log.Debug("=>DeleteMessage");bool bRet = false;try{message.Complete();bRet = true;log.Debug(string.Format("Delete Message Successfully"));}catch (Exception e){log.Debug(e.Message);return bRet;}//log.Debug("<=DeleteMessage");return bRet;}private void HandleTransientErrors(MessagingException e){//If transient error/exception, let's back-off for 2 seconds and retry
            log.Debug(e.Message);log.Debug("Transient error happened! Will retry in 2 seconds");Thread.Sleep(2000);}}

 

  Main方法以及线程1,线程2的实现。

//this function is used to send a number of messages to a queuepublic static void SendMessageToQueue(){int sendMessageNum = 10000;log.Debug(string.Format("=> SendMessageToQueue, send message number = {0}", sendMessageNum));//prepare the handler, clientServiceBusQueueHandler handler = new ServiceBusQueueHandler();QueueClient client = QueueClient.CreateFromConnectionString(connectionString, queueName);//the message num which is sent successfullyint count = 0;for (int i = 0; i < sendMessageNum; i++){//send a messagestring strMessageBody = System.Guid.NewGuid().ToString();bool bRet = handler.SendMessage(strMessageBody, client, 10);if (bRet){count++;}//wait 2s, then send next messageThread.Sleep(2000);}log.Debug(string.Format("<= SendMessageToQueue, success sent message number = {0}", count));}public static void ReceiveMessageFromQueue(){log.Debug("=> ReceiveMessageFromQueue");//prepare the handler, clientServiceBusQueueHandler handler = new ServiceBusQueueHandler();QueueClient client = QueueClient.CreateFromConnectionString(connectionString, queueName);//the message num which is received successfullyint count = 0;//if we can't get message in 1 hour(60 * 60 =  30 * 120), we think there are no more messages in the queueint failCount = 0;while (failCount < 30){List<BrokeredMessage> list = handler.GetMessages(10, 120, client);if (list.Count > 0){foreach (BrokeredMessage e in list){log.Debug(string.Format("Received 1 Message, Time = {0}, Message Body = {1}", DateTime.UtcNow.ToString(), e.GetBody<string>()));//delete messagebool bRet = handler.DeleteMessage(e);if (bRet){count++;}}log.Debug(string.Format("Current Count Number = {0}", count));}else{failCount++;log.Debug(string.Format("Didn't Receive any Message this time, fail count number = {0}", failCount));}//wait 10s, then send next messageThread.Sleep(1000);}log.Debug(string.Format("<= ReceiveMessageFromQueue, success received message number = {0}", count));}static void Main(string[] args){log4net.GlobalContext.Properties["LogName"] = "TestServiceBus.log";log4net.Config.XmlConfigurator.Configure();Console.WriteLine("Start");Thread threadSendMessage = new Thread(SendMessageToQueue);Thread threadReceMessage = new Thread(ReceiveMessageFromQueue);threadSendMessage.Start();threadReceMessage.Start();//Console.WriteLine("Stop");
            Console.ReadLine();}

  当然,这里有一个小地方,因为线程1只会发送10000条消息,线程2一直在接收,但当一个小时内没有接收到消息时,则可认为队列中不会再有消息,则停止接收。

 

三. 测试结果

  从Log来看,程序跑了将近8个小时,最后结果如下:

  成功发送10000条消息

2015-04-30 15:01:49,576 [3] DEBUG TestServiceBus.Program <= SendMessageToQueue, success sent message number = 10000

  成功接收10000条消息

2015-04-30 15:02:03,638 [4] DEBUG TestServiceBus.Program Current Count Number = 10000

  所以仅从此次测试结果来看,Service Bus Queue并未丢失消息。所以组里遇到消息的问题,建议还是从自己代码入手检查问题,是否我们自己出了问题,而非Service Bus Queue。

 

---------------------------------------------------------------

2015年5月5日更新:最终找到Service Bus丢失消息的原因,问题果然出在我们自己这边,发消息时,message id有重复的可能,导致可能会丢信。message id应唯一。

 

  抛砖引玉,谢谢:-)

 

  Kevin Song

  2015年5月2日

  

 

转载于:https://www.cnblogs.com/KevinSong/p/Azure.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/544926.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用SharedPreferences存储和读取数据

转&#xff1a;http://www.worlduc.com/blog2012.aspx?bid19403392 1、任务目标 &#xff08;1&#xff09;掌握Android中SharedPreferences的使用方法。 2、任务陈述 &#xff08;1&#xff09;运行后&#xff0c;显示如下界面&#xff0c;可以写入和读取SharedPreferences中…

Zookeeper 的 5 大核心知识点!

1 ZooKeeper简介ZooKeeper 是一个开源的分布式协调框架&#xff0c;它的定位是为分布式应用提供一致性服务&#xff0c;是整个大数据体系的管理员。ZooKeeper 会封装好复杂易出错的关键服务&#xff0c;将高效、稳定、易用的服务提供给用户使用。如果上面的官方言语你不太理解&…

PHP | 计算字符串中的单词总数

Given a string and we have to count the total number of words in it. 给定一个字符串&#xff0c;我们必须计算其中的单词总数。 str_word_count() function str_word_count()函数 To find the total number of words in a string, we can use str_word_count() function…

parted分区介绍

简介: 当硬盘或raid后,硬盘大于2T的时候,可以使用parted进行分区; 使用parted的前提是操作系统已经安装部署完成; 大于2T的硬盘在安装部署阶段可以使用raid的虚拟磁盘技术分区,如分出100G安装系统,剩余的在安装系统后,使用parted进行分区; 1.parted非交互式分区: …

SharedPreferences详解

我们在开发软件的时候,常需要向用户提供软件参数设置功能,例如我们常用的微信,用户可以设置是否允许陌生人添加自己为好友.对于软件配置参数的保存,如果是在window下通常我们会采用ini文件进行保存.如果是J2EE下面,我们会采用properties属性文件或者xml进行保存.在我们的Androi…

【视频版】最新版Swagger 3升级指南和新功能体验!

作者 | 王磊来源 | Java中文社群&#xff08;ID&#xff1a;javacn666&#xff09;转载请联系授权&#xff08;微信ID&#xff1a;GG_Stone&#xff09;Swagger 3.0 发布已经有一段时间了&#xff0c;它于 2020.7 月 发布&#xff0c;但目前市面上使用的主流版本还是 Swagger 2…

java treemap_Java TreeMap pollFirstEntry()方法与示例

java treemapTreeMap类pollFirstEntry()方法 (TreeMap Class pollFirstEntry() method) pollFirstEntry() method is available in java.util package. pollFirstEntry()方法在java.util包中可用。 pollFirstEntry() method is used to return and then remove the entry (key-…

各大厂面试高频的面试题新鲜出炉,你能答上几道?

关于生产环境如何配置线程数&#xff0c;还是要根据业务来进行区分&#xff0c;我们时常会听到什么IO密集型、CPU密集型任务...那么这里提一个问题&#xff1a;大家知道什么样的任务或者代码会被认定为IO/CPU密集&#xff1f;又是用什么样的标准来认定IO/CPU密集&#xff1f;如…

c/c++如何获取数组的长度

2019独角兽企业重金招聘Python工程师标准>>> C、C中没有提供 直接获取数组长度的函数&#xff0c;对于存放字符串的字符数组提供了一个strlen函数获取长度&#xff0c;那么对于其他类型的数组如何获取他们的长度呢&#xff1f;其中一种方法是使 用sizeof(array) / s…

JSP JAVA 自定义 错误页面(404,505,500)

当网站页面找不到或者服务器内部出错的时候&#xff0c;我们不想让用户看到默认的那张 404&#xff0c;500 的错误页面&#xff0c;那要是想自己做一张 找不到页面的页面改怎么做呢&#xff1f;在 web .xml 文件中 加入下面的语句就能达到这个效果<error-page><error-…

【送给读者】全新苹果 AirPods,包邮送一套!

为回馈长期以来科创人读者对本栏目的关注支持&#xff0c;本周小编联合了计算机领域八位高质量原创号主一起为大家送出一套 全新苹果AirPods 2代。以下推荐的公号原创率都很高&#xff0c;均为个人IP号&#xff0c;有些小伙伴应该已经关注部分公号。本次抽奖采用第三方抽奖小程…

java 方法 示例_Java扫描仪的hasNextBoolean()方法与示例

java 方法 示例扫描器类的hasNextBoolean()方法 (Scanner Class hasNextBoolean() method) hasNextBoolean() method is available in java.util package. hasNextBoolean()方法在java.util包中可用。 hasNextBoolean() method is used to check whether this Scanners next in…

进程控制(kill)

为什么80%的码农都做不了架构师&#xff1f;>>> kill&#xff1a;终止进程&#xff08;或传送信号到某进程&#xff09; kill [options] [process_ids] kill命令可以发送信号给进程&#xff0c;可以终止&#xff08;terminate&#xff09;&#xff08;默认操作&a…

oracle怎样修改表名、列名、字段类型、添加表列、删除表列

ALTER TABLE SCOTT.TEST RENAME TO TEST1--修改表名 ALTER TABLE SCOTT.TEST RENAME COLUMN NAME TO NAME1 --修改表列名 ALTER TABLE SCOTT.TEST MODIFY NAME1 NUMBER(20) --修改字段类型 ALTER TABLE SCOTT.TEST ADD ADDRESS VARCHAR2(40) --添加表列 ALTER TABLE SCOTT.TEST…

TextArea里Placeholder换行问题

转&#xff1a;http://www.tuicool.com/articles/feYVNf 页面上使用TextArea控件时&#xff0c;会时不时的想给个提示&#xff0c;比如按照一定方式操作之类的。 正常情况下&#xff0c;会使用Placeholder&#xff0c;但这样的提示是不会换行的&#xff0c;无论是用\r\n&…

printstream_Java PrintStream clearError()方法与示例

printstreamPrintStream类clearError()方法 (PrintStream Class clearError() method) clearError() method is available in java.io package. clearError()方法在java.io包中可用。 clearError() method is used to clear the internal error state of this PrintStream. cle…

uniq用法详解

uniquniq命令可以去除排序过的文件中的重复行&#xff0c;因此uniq经常和sort合用。也就是说&#xff0c;为了使uniq起作用&#xff0c;所有的重复行必须是相邻的。uniq语法[rootwww ~]# uniq [-icu]选项与参数&#xff1a;-i &#xff1a;忽略大小写字符的不同&#xff1b;-…

Swagger增强神器:Knife4j!用它轻松实现接口搜索、Word下载、接口过滤...

视频版内容&#xff1a;Swagger 是开发中最常用的框架之一了&#xff0c;但 Swagger 本身又有很多不完善的地方&#xff0c;比如&#xff0c;在众多的接口中查询某一个接口&#xff0c;又或者是把所有的接口导出成 Word 格式等&#xff0c;都无法在 Swagger 中实现。有人可能会…

tohexstring方法_Java Long类toHexString()方法的示例

tohexstring方法长类toHexString()方法 (Long class toHexString() method) toHexString() method is available in java.lang package. toHexString()方法在java.lang包中可用。 toHexString() method is used to represent a hexadecimal string of the given parameter [val…

关于显示和隐藏DIV标签

document.getElementById("DIV的ID").style.display"none";//隐藏 document.getElementById("DIV的ID").style.display"block";//显示