spring集成mq
我最近参加了在拉斯维加斯举行的2016年Spring大会 ,很幸运地看到了我在软件世界中长期敬佩的一些人。 我亲自遇到了其中的两个人,他们实际上合并了几年前我与Spring Integration相关的一些次要贡献– Gary Russel和Artem Bilan ,他们启发了我重新审视我已经有一段时间没有使用过的Spring Integration 。
我再一次想起了Spring Integration如何使任何复杂的Enterprise集成方案看起来都很简单。 我很高兴看到基于Spring Integration Java的DSL现在已经完全集成到Spring Integration伞和更高层次的抽象中,例如Spring Cloud Stream(感谢我的好朋友和这个项目的贡献者,此介绍
Soby Chacko ),这使得某些消息驱动的场景更加容易。
在本文中,我只是在回顾与RabbitMQ的一个非常简单的集成方案,在以后的文章中,将使用Spring Cloud Stream重新实现它。
考虑一个场景,其中两个服务之间通过RabbitMQ代理相互通信,其中一个生成某种工作,另一种处理该工作。
制片人
可以使用Spring Integration Java DSL以以下方式在代码中表示工作单元产生/分发部分:
@Configuration
public class WorksOutbound {@Autowiredprivate RabbitConfig rabbitConfig;@Beanpublic IntegrationFlow toOutboundQueueFlow() {return IntegrationFlows.from("worksChannel").transform(Transformers.toJson()).handle(Amqp.outboundAdapter(rabbitConfig.worksRabbitTemplate())).get();}
}
这是非常容易理解的-该流程首先从名为“ worksChannel”的通道中读取一条消息,然后将该消息转换为json,然后使用出站通道适配器将其分发给RabbitMQ交换。 现在,消息如何到达名为“ worksChannel”的通道-我已经通过Messaging网关(Spring Integration世界的入口)配置了消息-
@MessagingGateway
public interface WorkUnitGateway {@Gateway(requestChannel = "worksChannel")void generate(WorkUnit workUnit);}
因此,现在,如果Java客户端想要向Rabbitmq派遣“工作单元”,则调用将如下所示:
WorkUnit sampleWorkUnit = new WorkUnit(UUID.randomUUID().toString(), definition);
workUnitGateway.generate(sampleWorkUnit);
我在这里刷了几件事-特别是Rabbit MQ配置,该配置由工厂运行,但是可以在此处使用
消费者
沿着生产者的思路,消费者流程将从接收来自RabbitMQ队列的消息开始,将其转换为域模型,然后处理该消息,使用Spring Integration Java DSL通过以下方式表示:
@Configuration
public class WorkInbound {@Autowiredprivate RabbitConfig rabbitConfig;@Autowiredprivate ConnectionFactory connectionFactory;@Beanpublic IntegrationFlow inboundFlow() {return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, rabbitConfig.worksQueue()).concurrentConsumers(3)).transform(Transformers.fromJson(WorkUnit.class)).handle("workHandler", "process").get();}
}
代码应该很直观,上面的workHandler是一个简单的Java pojo,看起来像这样,完成了仅记录有效负载的非常重要的工作:
@Service
public class WorkHandler {private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);public void process(WorkUnit workUnit) {LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());}
}
本质上就是这样,如果使用直接的Java和原始RabbitMQ库尝试使用Spring Integration,它将提供相当出色的代码外观。
Spring Cloud Stream使整个设置更加简单,并且将成为以后的主题。
如果您有兴趣尝试一下,我已将整个代码发布在我的github仓库中 。
翻译自: https://www.javacodegeeks.com/2016/08/integrating-rabbit-mq-using-spring-integration-java-dsl.html
spring集成mq