关于ActiveMQ的消息优先级支持 , 邮件列表上通常会不断出现一些问题,以及有关观察到的行为和“真正支持什么”的好问题? 我希望可以帮助您了解幕后情况以及可以支持的优先级。 详细信息可能会有些麻烦。 如果您对这些细节不感兴趣,请查看ActiveMQ Wiki ,以获取高级概述。
首先,由于ActiveMQ支持JMS 1.1,所以让我们看一下JMS规范对支持“ JMSPriority”的看法:
JMS定义了一个十级优先级值,最低优先级为0,最高优先级为9。 此外,客户应将优先级0-4视为正常优先级,将优先级5-9视为快速优先级。 JMS不需要提供者严格执行消息的优先级排序; 但是,它应该尽力在普通邮件之前传递快速邮件。
ActiveMQ观察到三个不同级别的“优先级”:
- 默认(JMSPriority == 4)
- 高(JMSPriority> 4 && <= 9)
- 低(JMSPriority> 0 && <4)
如果您没有为MessageProducer或单个消息指定优先级(请参阅MessageProducer#send(message,deliveryMode,优先级,timeToLive) ),则ActiveMQ的客户端将默认使用JMSPriority ==4。作为JMS使用者,您可以期望如果生产者未使用优先级或目标上未使用其他形式的选择标准,则按FIFO顺序进行。
规范指出,ActiveMQ还“尽其所能”在“正常”消息之前传递加速消息。 代理使用的消息存储对完成操作有很大的帮助,但是通常,您可以期望代理仅接受JDBC支持的消息存储的严格(0-9)优先级支持。 对于支持KahaDB的消息存储库,仅支持“类别优先级”(“低”,“默认”,“高”,其中每个类别中的优先级并非总是有区别的,即5和9被视为“高”)。 但是,使用正确的设置和消息传递配置文件,即使使用KahaDB,您也可以影响[严格]优先级的排序方式,因此让我们快速看一下。
启用邮件优先级
您可以使用activemq.xml配置文件中的以下设置在队列上启用消息优先级:
<destinationPolicy><policyMap><policyEntries><policyEntry queue='queueName' prioritizeMessages='true' /></policyEntries></policyMap>
</destinationPolicy>
对于queueName
,具有通配符支持,因此您可以在消息层次结构上启用优先级支持。
启用优先级支持后,代理将在其消息游标中使用优先级链表结构,并向KahaDB提示在将消息存储到磁盘时使用优先级类别。 优先级排序的严格程度有不同的级别,但是在最坏的情况下,您可以假定优先级将按类别维护。 以下因素起作用,这些因素控制使用KahaDB存储时优先级排序的严格程度:
- 在队列游标中启用/禁用缓存
- MaxPageInSize,用于从一批存储中分页多少条消息
- 消费者预取
- 过期消息检查
- 代理内存设置
- 持久/非持久消息
下一节将详细介绍KahaDB中为支持优先级而发生的事情,而下一节将探讨代理内存中的情况,并最终将其分发给消费者,并指出与上述不同的因素发挥作用。
KahaDB优先级分类
首先,我们将从如何将消息存储在磁盘上以及如何加载到目标位置开始。 KahaDB(默认消息存储)是基于文件的消息数据库,代理使用该数据库将消息持久存储在“日志”或“日志”中。 代理还通过保持一个单独的“索引”来跟踪日志中包含哪些消息,该“索引”保存有关消息的信息(例如其在日志中的位置,与之关联的目的地,订购等)。 索引还具有消息“优先级”的概念,该概念由三个B + Tree结构实现,每个优先级级别一个(请参见org.apache.activemq.store.kahadb.MessageDatabase中的MessageOrderIndex)。 此实现细节是消息优先级的基础,并且在从存储中删除消息时,对其余的代理有影响。
从商店中检索消息时,将成批完成(maxPageInSize),并且首先检索“ highPriority” BTree中的消息。 当高优先级消息耗尽时,存储将随后提供默认优先级,并随后提供低优先级消息。
您可以像这样设置maxPageInSize:
<policyEntry queue='queueName' prioritizeMessages='true'maxPageSize='500'>
页面大小越大,批处理中的消息数量就越大,每次“快照”一次可以看到的消息越多。 对于进入内存的每个批次,将严格按照商店游标的说明来优先考虑其消息的优先级。 缺点是,如果您的消息量很大,一次输入500条消息可能会耗尽代理内存。 默认设置为200。
消息光标优先级列表
当持久性消息从生产者进入代理时,它们将存储在磁盘上,但也将被缓存在内存中,等待分发给消费者。 这是默认设置,因此无需显式设置。 其背后的想法是能够调度到快速使用方,而不必直接从磁盘检索它(如果使用方变慢,则代理将自动调整自身,以使其在填充后不使用缓存,从而避免OOM)。 这样做的好处是,当对队列使用优先级支持时,用于游标的内部列表将支持严格的优先级(0-9),因此对于当前在内存(在缓存中)的所有消息,它们将按照从高到低的顺序正确排序。 诀窍在于,当缓存中的所有消息都是“低优先级消息”,然后高优先级消息进入代理但由于缓存已满而无法容纳在缓存中时,该消息将消失直接在商店中进行索引,并在“高优先级”索引中进行索引,但是在下一批优先级消息被分页到内存中之前,将无法在低优先级消息之前进行分发。
当NON持久消息进入代理时,它们将不会进入消息存储。 它们将尽可能长时间地保留在内存中,并且仅在内存超过定义的阈值(默认情况下> 70%)时才会推送到磁盘(在临时存储中)。 因此,上面的缓存消息的相同行为适用于非持久消息,即内存中的消息将严格排序(0-9),但是一旦将它们推送到磁盘,则只会观察到类别。
如果禁用游标的缓存(具有以下设置)
<policyEntry queue='queueName' prioritizeMessages='true'useCache='false' />
那么您可以帮助消除上述情况,即当高优先级消息进入时,高速缓存将充满低优先级消息(由于无法分页到内存而卡在磁盘上)。 但是,这样做会降低吞吐量,因为必须先将磁盘中的消息分页到发送给使用者之前,这会减慢调度速度。 但是请注意,这样做时,即使光标中具有优先级列表,您也更有可能看到不遵循“严格”优先级的消息。 但是,它们将正确遵循优先级类别(高,默认,低)。 综上所述,如果禁用缓存,则与启用缓存并填充优先级较低的消息相比,可以更快地发送优先级较高的消息。 但是,仅禁用高速缓存并不能使您获得严格的优先权。
禁用缓存有助于将优先级较高的消息先于优先级较低的消息发送给使用者,但是要使其按预期工作(并且已经使我痛苦),则需要禁用异步message expiry check
。 此过期检查每30秒将邮件分页到内存中,无论它们是否准备好进行分派(默认情况下),并对其执行TTL检查(生存时间),并丢弃那些应过期的邮件。 这种检查有效地将消息带入内存,并且将停滞正常的“页面调度”,足以错过更高优先级的消息。
<policyEntry queue='queueName' prioritizeMessages='true'useCache='false' expireMessagesPeriod='0' >
但是,关闭过期检查将使过期消息在存储区中保留的时间更长,因为唯一的过期检查将在发送之前立即进行,因此请对此进行有根据的决定,以及所有需要修改的ActiveMQ设置。 但是要朝着严格的顺序优先顺序的方向发展,您需要禁用此功能。
最后,消费者的预取在实现“严格订购”方面发挥了作用。 默认情况下,队列使用者的预取设置为1000,这意味着将批量发送1000条消息。 这有助于加快消费者在使用消息时的速度,但是就优先级而言,从本质上讲,它还像消息的缓存一样(如上所述),可能会导致看不到“严格排序”。 如果您的预取中填充了较低优先级的消息,并且代理中出现了一条新的高优先级消息,那么您也不会看到它,直到下一条消息分发给消费者为止。 因此,预取越低,在优先级较低的消息之前看到较高优先级的消息的可能性就越大。 预取为1时,您将始终获得商店游标知道的最高优先级消息。
<policyEntry queue='queueName' prioritizeMessages='true'useCache='false' expireMessagesPeriod='0' queuePrefetch='1' >
客户端消息优先级
ActiveMQ还在消息客户端中内置了优先级支持,并且默认情况下启用了该功能。 这意味着,当将消息发送到您的使用者时(甚至在使用预取功能在您的使用者接收消息之前),它们将被缓存在使用者方,并且默认情况下具有优先级。 这与您是否在代理方使用优先级支持无关。 这可能会影响您在消费者上看到的订购,因此请记住这一点。 要禁用它,请在代理URL上设置以下配置选项,例如tcp://0.0.0.0:61616?jms.messagePrioritySupported=false
但是如上所述,您需要将预取降低到1,以获得获得严格排序的最佳机会。
权衡
因此,最终可以通过KahaDB获得严格排序的消息,但是要进行重大权衡,这并不适用于所有消息传递情况。 您想要优化的快速消息传递吗? 还是要放慢消息传递速度,以实现对优先级的严格排序。 每种情况都不同,应根据具体情况进行评估。 但是,一般而言,您可以依赖类别级别的优先级。
在大队列中对消息进行重新排序并保持高性能是有问题的,大多数Message Queue供应商都做得不好。 ActiveMQ的优先级支持很强,但是正如ActiveMQ Wiki上讨论的描述消息优先级的问题一样,存在另一个很好的选择,即:使用消息选择器并以高优先级的消息最终被首先使用的方式平衡使用者。 这种方法往往会提供更多的灵活性和控制力,但这是另一篇文章。
翻译自: https://www.javacodegeeks.com/2013/04/activemq-message-priorities-how-it-works.html