如果您曾经需要使用RabbitMQ来串行处理消息,并且有一群监听器来处理消息,那么我所看到的最好方法是在监听器上使用“独占消费者”标志,每个监听器上有1个线程来处理消息。
专用使用者标志可确保只有1个使用者可以从特定队列中读取消息,并且该使用者上的1个线程可确保按顺序处理消息。 但是有一个问题,我待会儿再讲。
让我用基于Spring Boot和Spring Integration的RabbitMQ消息使用者来演示这种行为。
首先,这是用于使用Spring java配置设置队列的配置,请注意,由于这是Spring Boot应用程序,因此在将Spring-amqp库添加到依赖项列表时,它将自动创建RabbitMQ连接工厂:
@Configuration
@Configuration
public class RabbitConfig {@Autowiredprivate ConnectionFactory rabbitConnectionFactory;@Beanpublic Queue sampleQueue() {return new Queue("sample.queue", true, false, false);}}
给定这个示例队列,一个从该队列获取消息并对其进行处理的侦听器如下所示,该流程是使用出色的Spring集成Java DSL库编写的:
@Configuration
public class RabbitInboundFlow {private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class);@Autowiredprivate RabbitConfig rabbitConfig;@Autowiredprivate ConnectionFactory connectionFactory;@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();listenerContainer.setConnectionFactory(this.connectionFactory);listenerContainer.setQueues(this.rabbitConfig.sampleQueue());listenerContainer.setConcurrentConsumers(1);listenerContainer.setExclusive(true);return listenerContainer;}@Beanpublic IntegrationFlow inboundFlow() {return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer())).transform(Transformers.objectToString()).handle((m) -> {logger.info("Processed {}", m.getPayload());}).get();}}
该流程非常简洁地用inboundFlow方法表示,RabbitMQ的消息有效负载从字节数组转换为String,最后只需将消息记录到日志中即可进行处理。
该流程的重要部分是侦听器配置,请注意将使用者设置为独占使用者的标志,并且在该使用者中将线程处理数设置为1。即使仅启动了应用程序的多个实例,该处理数也被设置为1。其中一个监听器将能够连接和处理消息。
现在来看问题,考虑一种情况,消息处理需要一段时间才能完成,并且在消息处理期间会回滚。 如果处理消息的应用程序实例在处理此类消息的过程中被停止,则行为是另一个实例将开始处理队列中的消息,当停止的实例回滚消息时,该回滚然后将邮件传递给新的排他消费者,从而使邮件混乱。
- 如果您有兴趣进一步探索它,可以使用以下github项目来使用此功能:https://github.com/bijukunjummen/test-rabbit-exclusive。
翻译自: https://www.javacodegeeks.com/2014/12/rabbitmq-processing-messages-serially-using-spring-integration-java-dsl.html