对于队列,无论是实现为JMS ,数据库表(即Ruby的Delayed :: Job用于队列的什么),甚至是Amazon的SQS ,用于评估队列状态的最常见指标是其长度。 从本质上讲,可以基于在任何给定时间队列中驻留多少消息来得出效率度量。 如果只有几条消息,则表明队列正在高效地运行。 如果有很多消息,则说明效率低下,必须发出警报。
但是,如果您处于持续繁忙的环境中,并且队列突然出现快速填充的趋势,那该怎么办? 如果你有足够的工人已经在运行来处理突发,你需要更多的火起来?
您可以解雇更多的工人,但这样做可能会花费您。 也就是说,您可能必须设置新的工作程序实例,例如Heroku工作程序dynos或AWS AMI,这最终将使您花费大量的金钱。 有时,这些工作人员实例需要一些时间来启动,当它们开始运行时,活动爆发已经结束,队列又恢复了正常–最初可用的工作人员充分地处理了负载。
事实证明,队列的长度是一个滞后指标。 您浪费了不必要的资源。 错误的警报!
如果您已经有足够的能力来处理队列中的消息大量涌入,那么监视队列的长度就不会有太大帮助。 实际上,这是一个误导性指标,可能会导致您采取不必要的操作。
因此,当已经有足够的工人在场时,队列的长度并不表示系统的效率 。 相反,在高容量环境中意味着某些事情的度量标准是消息在队列中驻留的时间 。 这是一个可行的指标:如果消息被卡在队列中等待处理,那么您需要更多的处理器!
Moo超过队列长度,让队列等待时间
默认情况下, Amazon的SQS不提供查询消息已在队列中保留多长时间的功能。 因此, 我写了Moo 。
Moo为客户端提供了一个接口,该接口可用于获取队列度量标准中的消息时间并对其采取措施。 这是通过使用时间戳扩展SQS消息来完成的。 然后,当从SQS队列中弹出消息时,将检查该时间戳。 如果超过阈值差,则调用回调。
Moo的用户会发现它的用法类似于Ahoy! ,它是AWS Java SDK之上的面向异步回调的外观。 实际上,Moo使用Ahoy! 在下面,带有附加功能,即附加“最大排队时间”异步回调。
Moo支持多个队列时间阈值,并且设置最大队列时间的方法如下:
为队列中的时间添加最大阈值
//adds a 1 second max threshold
sqs.addQueueWaitTimeCallback(1000, new QueueWaitTimeCallback() {public void onThresholdExceeded(long waitTime) {//waitTime is the actual time in queue//do something... like fire off a web hook, etc}
});
请注意, addQueueWaitTimeCallback
方法在队列值和相伴的QueueWaitTimeCallback
回调实现中花费的最长时间为毫秒。 如果超过最大阈值,则在消息接收期间将异步调用onThresholdExceeded
方法;否则,将被onThresholdExceeded
。 此外, onThresholdExceeded
将接收实际队列等待时间作为参数。
告诉我Mo
要启动Moo实例,您有多种选择,包括配置AWS的AmazonSQS
实例或仅传递密钥,机密和队列名称,如下所示:
为队列中的时间添加最大阈值
SQS sqs = new SQS(System.getProperty("key"), System.getProperty("secret"), System.getProperty("queue"));
接下来,您可以将零附加到许多QueueWaitTimeCallback
实例,如下所示:
为队列中的时间添加最大阈值
sqs.addQueueWaitTimeCallback(600000, new QueueWaitTimeCallback() {public void onThresholdExceeded(long actualWaitTime) {//do something -- fire off SNS message?}
});
在这种情况下,如果消息在队列中的时间超过10分钟,我将添加一个要调用的回调。 注意,这些QueueWaitTimeCallback
回调由队列读取器实例触发。 因此,例如, QueueWaitTimeCallback
当然可以启动其自身的更多实例。
这是一个示例JSON文档,您可能希望将其放入SQS队列中:
为队列中的时间添加最大阈值
{ "employees":[{ "firstName":"John", "lastName":"Doe" },{ "firstName":"Anna", "lastName":"Smith" },{ "firstName":"Peter", "lastName":"Jones" }
]}
发送和接收此消息与使用Ahoy!时完全一样。 例如,要发送消息,只需将String
传递给send
方法:
为队列中的时间添加最大阈值
sqs.send(json, new SendCallback() {public void onSend(String messageId) {//messageId is from SQS}
});
注意, send
方法带有一个可选的SendCallback
。
通过receive
方法接收消息,该方法需要一个强制的ReceiveCallback
– 对于从队列中接收到的每个消息,该回调将被异步调用。 每个实例将接收放置在队列中的消息以及消息的SQS ID。
为队列中的时间添加最大阈值
sqs.receive(new ReceiveCallback() {public void onReceive(String messageId, String message) {//do something w/the message -- in this case it's JSON}
});
注意,如果在收到消息后,Moo注意到消息在队列中等待的时间超过为关联的QueueWaitTimeCallback
配置的最大队列等待时间阈值,则Moo将调用它。 注意,Moo可以调用多个实例。 因此,您可以建立一条链来随着时间的增加采取各种行动。
请记住,队列的长度通常是一个滞后指标。 实际含义的度量标准是消息在队列中的停留时间。 这是一个可行的指标, Moo使您能够对此做点事情! 你能挖一下它么?
翻译自: https://www.javacodegeeks.com/2013/10/all-other-metrics-are-useless.html