本文是我们名为“ Spring Integration for EAI ”的学院课程的一部分。
在本课程中,向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们。 接下来,您将深入研究Spring Integration的基础知识,例如通道,转换器和适配器。 在这里查看 !
目录
- 1.简介 2.准备环境 3. JMS适配器:接收
- 3.1。 入站通道适配器:活动接收 3.2。 入站通道适配器:无源接收
4. JMS适配器:发送 5.使用网关 6.消息转换 7. JMS支持的消息通道 8.动态目标解析 9. AMQP集成 - 9.1。 安装 9.2。 演示应用
1.简介
本教程重点介绍如何将应用程序与Spring Integration和JMS消息传递集成。 为此,我将首先向您展示如何安装Active MQ,它将是本教程中的代理。 下一部分将显示使用Spring Integration JMS通道适配器发送和接收JMS消息的示例。 在这些示例之后,我们将看到一些通过配置消息转换和目标解析来自定义这些调用的方法。
本教程的最后一部分简要介绍了如何将Spring Integration与AMQP协议一起使用。 它将完成RabbitMQ的安装,最后给出一个基本的消息传递示例。
本教程由以下部分组成:
- 介绍
- 准备环境
- JMS适配器:接收
- JMS适配器:发送
- 使用网关
- 讯息转换
- JMS支持的消息通道
- 动态目的地解析
- AMQP集成
2.准备环境
如果要通过JMS发送消息,则首先需要一个代理。 本教程中包含的示例是通过Active MQ(一种开源消息传递代理)执行的。 在本节中,我将帮助您安装服务器并实现一个简单的Spring应用程序,以测试它是否已正确设置。 该说明基于Windows系统。 如果您已经安装了服务器,则跳过此部分。
第一步是从Apache.org下载Apache MQ服务器。 下载完成后,只需将其解压缩到您选择的文件夹中即可。
要启动服务器,你只需要执行其位于Apache的ActiveMQ的-5.9.0 \ bin文件夹中文件的ActiveMQ。
好的,服务器正在运行。 现在我们只需要实现该应用程序。 我们将创建一个生产者,一个使用者,一个spring配置文件和一个测试。
制片人
您可以使用任何Java类代替我的TicketOrder
对象。
public class JmsProducer {@Autowired@Qualifier("jmsTemplate")private JmsTemplate jmsTemplate;public void convertAndSendMessage(TicketOrder order) {jmsTemplate.convertAndSend(order);}public void convertAndSendMessage(String destination, TicketOrder order) {jmsTemplate.convertAndSend(destination, order);}
}
消费者
public class SyncConsumer {@Autowiredprivate JmsTemplate jmsTemplate;public TicketOrder receive() {return (TicketOrder) jmsTemplate.receiveAndConvert("test.sync.queue");}
}
Spring配置文件
<bean id="consumer" class="xpadro.spring.integration.consumer.SyncConsumer"/>
<bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/><!-- Infrastructure -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616" />
</bean><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="connectionFactory"/>
</bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="cachingConnectionFactory"/><property name="defaultDestination" ref="syncTestQueue"/>
</bean><!-- Destinations -->
<bean id="syncTestQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="test.sync.queue"/>
</bean>
考试
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestJmsConfig {@Autowiredprivate JmsProducer producer;@Autowiredprivate SyncConsumer consumer;@Testpublic void testReceiving() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());//Sends the message to the jmsTemplate's default destinationproducer.convertAndSendMessage(order);Thread.sleep(2000);TicketOrder receivedOrder = consumer.receive();assertNotNull(receivedOrder);assertEquals(1, receivedOrder.getFilmId());assertEquals(5, receivedOrder.getQuantity());}
}
如果测试通过,则说明所有设置正确。 现在,我们可以转到下一部分。
3. JMS适配器:接收
Spring Integration提供了多个适配器和网关来接收来自JMS队列或主题的消息。 下面简要讨论这些适配器:
- 入站通道适配器 :它在内部使用JmsTemplate主动从JMS队列或主题接收消息。
- 消息驱动通道适配器 :内部使用Spring MessageListener容器被动接收消息。
入站通道适配器:活动接收
本节说明如何使用上一节中介绍的第一个适配器。
JMS入站通道适配器主动轮询队列以从中检索消息。 由于它使用轮询器,因此您必须在Spring配置文件中对其进行定义。 适配器检索到消息后,它将通过指定的消息通道发送到消息传递系统中。 然后,我们可以使用端点(如转换器,过滤器等)来处理消息,也可以将其发送给服务激活器。
本示例从JMS队列检索票单消息并将其发送到服务激活器,服务激活器将对其进行处理并确认订单。 通过将订单发送到某种存储库来确认该订单,该存储库具有包含所有已注册订单的简单列表。
我们使用与“ 2准备环境”部分中相同的生产者:
<bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/><!-- Infrastructure -->
<!-- Connection factory and jmsTemplate configuration -->
<!-- as seen in the second section --><!-- Destinations -->
<bean id="toIntQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="int.sync.queue"/>
</bean>
测试将使用生产者将消息发送到“ toIntQueue”。 现在,我们将设置Spring Integration配置:
Integration-jms.xml
<context:component-scan base-package="xpadro.spring.integration"/><int-jms:inbound-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel"/><int:channel id="jmsChannel"/><int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/><int:poller id="poller" default="true" fixed-delay="1000"/>
JMS入站通道适配器将使用定义的轮询器从“ toIntQueue”中检索消息。 您必须为适配器配置轮询器,否则它将抛出运行时异常。 在这种情况下,我们定义了一个默认的轮询器。 这意味着任何需要轮询的端点都将使用此轮询器。 如果未配置默认轮询器,则需要为每个主动检索消息的端点定义一个特定的轮询器。
消费者
服务激活器只是一个bean(通过组件扫描自动检测到):
@Component("ticketProcessor")
public class TicketProcessor {private static final Logger logger = LoggerFactory.getLogger(TicketProcessor.class);private static final String ERROR_INVALID_ID = "Order ID is invalid";@Autowiredprivate OrderRepository repository;public void processOrder(TicketOrder order) {logger.info("Processing order {}", order.getFilmId());if (isInvalidOrder(order)) {logger.info("Error while processing order [{}]", ERROR_INVALID_ID);throw new InvalidOrderException(ERROR_INVALID_ID);}float amount = 5.95f * order.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);repository.confirmOrder(confirmation);}private boolean isInvalidOrder(TicketOrder order) {if (order.getFilmId() == -1) {return true;}return false;}
}
在前面的代码片段中, processOrder
方法接收一个TicketOrder
对象并直接对其进行处理。 但是,您可以改为定义消息 <?>或Message <TicketOrder>以便接收消息。 这样,您将可以访问消息的有效负载及其标题。
还要注意,该方法返回void。 由于消息流在此处结束,因此我们不需要返回任何内容。 如果需要,您还可以定义服务适配器的回复通道并返回确认。 此外,例如,我们随后将向该回复通道订阅端点或网关,以便将确认发送到另一个JMS队列,将其发送到Web服务或将其存储到数据库。
最后,让我们看一下测试以了解如何执行所有测试:
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsConfig {@Autowiredprivate JmsProducer producer;@Autowiredprivate OrderRepository repository;@Testpublic void testSendToIntegration() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());//Sends the message to the jmsTemplate's default destinationproducer.convertAndSendMessage("int.sync.queue", order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("123", conf.getId());}
}
我已将Thread.sleep
为四秒钟,以等待消息发送。 我们本可以使用while循环来检查是否已收到消息,直到达到超时为止。
入站通道适配器:无源接收
JMS接收部分的第二部分使用消息驱动的通道适配器。 这样,消息一旦发送到队列,便会立即传递到适配器,而无需使用轮询器。 这是我们向其订阅者传递消息的消息通道。
该示例与上一节中看到的示例非常相似。 我将仅显示在配置中所做的更改。
我从上一个示例更改的唯一内容是spring集成配置:
<context:component-scan base-package="xpadro.spring.integration"/><int-jms:message-driven-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel" /><int:channel id="jmsChannel"/><int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/>
我删除了轮询器,并更改了消息驱动通道适配器的JMS入站适配器。 而已; 适配器将被动地接收消息并将其传递到jmsChannel
。
请考虑到消息侦听器适配器至少需要以下组合之一:
- 消息侦听器容器。
- 连接工厂和目的地。
在我们的示例中,我们使用了第二个选项。 目标在适配器配置中指定,连接工厂在jms-config文件中定义,该文件也由测试导入。
4. JMS适配器:发送
在上一节中,我们已经了解了如何接收外部系统发送到JMS队列的消息。 本节向您展示出站通道适配器,使您可以在系统之外发送JMS消息。
与入站适配器相比,出站适配器只有一种类型。 该适配器在内部使用JmsTemplate
发送消息,并且为了配置此适配器,您将需要指定以下至少一项:
- 一个JmsTemplate。
- 连接工厂和目的地。
与入站示例一样,我们使用第二个选项将消息发送到JMS队列。 配置如下:
对于此示例,我们将为jms配置(jms-config.xml)创建一个新队列。 这是我们的Spring Integration应用程序将消息发送到的位置:
<bean id="toJmsQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="to.jms.queue"/>
</bean>
好的,现在我们使用JMS出站适配器配置集成配置:
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int-jms:outbound-channel-adapter id="jmsAdapter" channel="requestChannel" destination="toJmsQueue"/>
我们正在使用网关作为邮件系统的入口。 测试将使用此接口发送新的TicketOrder
对象。 网关将接收消息并将其放入requestChannel
通道。 由于它是直接通道 ,它将被发送到JMS出站通道适配器。
适配器收到一个Spring Integration消息。 然后,它可以通过两种方式发送消息:
- 将消息转换为JMS消息。 这是通过将适配器的属性“ extract-payload”设置为true(默认值)来完成的。 这是我们在示例中使用的选项。
- 按原样发送消息,即Spring Integration消息。 您可以通过将“ extract-payload”属性设置为false来完成此操作。
该决定取决于期望您的消息的系统类型。 如果另一个应用程序是Spring Integration应用程序,则可以使用第二种方法。 否则,请使用默认值。 在我们的示例中,另一端有一个简单的Spring JMS应用程序。 因此,我们必须选择第一个选项。
继续我们的示例,现在我们看一下测试,该测试使用网关接口发送消息,并使用自定义使用者接收消息。 在此测试中,使用者将扮演一个JMS应用程序的角色,该应用程序使用jmsTemplate
从JMS队列中检索它:
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-out-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsOutboundConfig {@Autowiredprivate SyncConsumer consumer;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);TicketOrder receivedOrder = consumer.receive("to.jms.queue");assertNotNull(receivedOrder);assertEquals(1, receivedOrder.getFilmId());assertEquals(5, receivedOrder.getQuantity());}
}
5.使用网关
除了通道适配器之外,Spring Integration还提供了入站和出站网关。 您可能还记得以前的教程,网关提供了与外部系统的双向通信,这意味着发送和接收或接收和回复操作。 在这种情况下,它允许请求或重试操作。
在本节中,我们将看到一个使用JMS出站网关的示例。 网关将向队列发送JMS消息,并等待答复。 如果未发送回任何答复,则网关将抛出MessageTimeoutException 。
Spring Integration配置
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway id="inGateway" default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" request-channel="requestChannel" reply-channel="jmsReplyChannel"/><int:channel id="jmsReplyChannel"/><int:service-activator method="registerOrderConfirmation" input-channel="jmsReplyChannel" ref="ticketProcessor"/>
流程如下:
- 包装在Spring Integration Message中的TicketOrder将通过“ inGateway”网关进入消息传递系统。
- 网关会将消息放入“ requestChannel”通道。
- 通道将消息发送到其订阅的端点JMS出站网关。
- JMS出站网关提取消息的有效负载,并将其包装为JMS消息。
- 网关发送消息并等待答复。
- 当答复到来时,网关以包装在JMS消息中的TicketConfirmation形式,将获得有效负载并将其包装到Spring Integration消息中。
- 该消息将发送到“ jmsReplyChannel”通道,服务激活器(TicketProcessor)将在该通道中处理该消息并将其注册到我们的OrderRepository。
订单处理器非常简单。 它收到TicketConfirmation并将其添加到票证存储库:
@Component("ticketProcessor")
public class TicketProcessor {@Autowiredprivate OrderRepository repository;public void registerOrderConfirmation(TicketConfirmation confirmation) {repository.confirmOrder(confirmation);}
}
考试
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsOutGatewayConfig {@Autowiredprivate OrderRepository repository;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("321", conf.getId());}
}
外部系统
为了完全理解该示例,我将向您展示将消息传递到JMS队列时发生的情况。
侦听Spring Integration发送消息的队列,有一个侦听器asyncConsumer
:
<bean id="toAsyncJmsQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="to.async.jms.queue"/>
</bean><!-- Listeners -->
<jms:listener-container connection-factory="connectionFactory"><jms:listener destination="to.async.jms.queue" ref="asyncConsumer"/>
</jms:listener-container>
侦听器接收到该消息,并使用票证确认创建新消息并进行回复。 注意,我们必须将回复消息的相关性ID设置为与请求消息相同的值。 这将使客户知道我们正在响应哪个消息。 另外,我们将目标设置为请求消息中配置的回复通道。
@Component("asyncConsumer")
public class AsyncConsumer implements MessageListener {@Autowiredprivate JmsTemplate template;@Overridepublic void onMessage(Message order) {final Message msgOrder = order;TicketOrder orderObject;try {orderObject = (TicketOrder) ((ObjectMessage) order).getObject();} catch (JMSException e) {throw JmsUtils.convertJmsAccessException(e);}float amount = 5.95f * orderObject.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("321", orderObject.getFilmId(), orderObject.getOrderDate(), orderObject.getQuantity(), amount);try {template.convertAndSend(msgOrder.getJMSReplyTo(), confirmation, new MessagePostProcessor() {public Message postProcessMessage(Message message) throws JMSException {message.setJMSCorrelationID(msgOrder.getJMSCorrelationID());return message;}});} catch (JmsException | JMSException e) {throw JmsUtils.convertJmsAccessException((JMSException) e);}}
}
6.消息转换
消息通道适配器和网关都使用消息转换器将传入消息转换为Java类型,或者采用相反的方式。 转换器必须实现MessageConverter接口:
public interface MessageConverter {<P> Message<P> toMessage(Object object);<P> Object fromMessage(Message<P> message);}
Spring Integration带有MessageConverter
接口的两种实现:
MapMessageConverter
它的fromMessage
方法使用两个键创建一个新的HashMap:
- 有效负载:值为消息的有效负载(
message.getPayload
)。 - 标头:该值是另一个HashMap,具有来自原始消息的所有标头。
“ toMessage”方法期望一个具有相同结构(有效负载和标头键)的Map实例,并构造一个Spring Integration消息。
SimpleMessageConverter
这是适配器和网关使用的默认转换器。 您可以从源代码中看到它与对象之间的转换:
public Message<?> toMessage(Object object) throws Exception {if (object == null) {return null;}if (object instanceof Message<?>) {return (Message<?>) object;}return MessageBuilder.withPayload(object).build();
}public Object fromMessage(Message<?> message) throws Exception {return (message != null) ? message.getPayload() : null;
}
无论如何,如果需要自己的实现,则可以在通道适配器或网关配置中指定自定义转换器。 例如,使用网关:
<int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" request-channel="requestChannel" reply-channel="jmsReplyChannel" message-converter="myConverter"/>
只要记住您的转换器应该实现MessageConverter:
@Component("myConverter")
public class MyConverter implements MessageConverter {
7. JMS支持的消息通道
通道适配器和网关用于与外部系统进行通信。 JMS支持的消息通道用于在同一应用程序内的使用者和生产者之间发送和接收JMS消息。 尽管在这种情况下我们仍然可以使用通道适配器,但是使用JMS通道要简单得多。 与集成消息通道的区别在于,JMS通道将使用JMS代理发送消息。 这意味着消息将不仅仅存储在内存通道中。 相反,它将被发送到JMS提供程序,从而也可以使用事务。 如果使用事务,它将按以下方式工作:
- 如果回滚事务,则将消息发送到JMS支持的通道的生产者将不会编写该消息。
- 如果事务回滚,订阅JMS支持的通道的使用者将不会从该通道中删除消息。
对于此功能,Spring Integration提供了两个渠道:点对点和发布/订阅渠道。 它们配置如下:
点对点直接渠道
<int-jms:channel id="jmsChannel" queue="myQueue"/>
发布/订阅频道
<int-jms:publish-subscribe-channel id="jmsChannel" topic="myTopic"/>
在下面的示例中,我们可以看到一个简单的应用程序,其中有两个端点使用JMS支持的通道相互通信。
组态
发送到消息传递系统( TicketOrder
对象)的消息到达服务激活器(票证处理器)。 然后,该处理器将订单( sendJMS
)发送到JMS支持的消息。 订阅此通道,有一个相同的处理器将接收消息( receiveJms
),对其进行处理以创建TicketConfirmation
并将其注册到票证存储库:
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int:service-activator method="sendJms" input-channel="requestChannel" output-channel="jmsChannel" ref="ticketJmsProcessor"/><int-jms:channel id="jmsChannel" queue="syncTestQueue"/><int:service-activator method="receiveJms" input-channel="jmsChannel" ref="ticketJmsProcessor"/>
处理器
实现两种方法: sendJms
和receiveJms
:
@Component("ticketJmsProcessor")
public class TicketJmsProcessor {private static final Logger logger = LoggerFactory.getLogger(TicketJmsProcessor.class);@Autowiredprivate OrderRepository repository;public TicketOrder sendJms(TicketOrder order) {logger.info("Sending order {}", order.getFilmId());return order;}public void receiveJms(TicketOrder order) {logger.info("Processing order {}", order.getFilmId());float amount = 5.95f * order.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);repository.confirmOrder(confirmation);}
}
考试
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsToJmsConfig {@Autowiredprivate OrderRepository repository;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("123", conf.getId());}
}
JMS支持的通道提供了不同的可能性,例如配置队列名称而不是队列引用或使用目标解析器:
<int-jms:channel id="jmsChannel" queue-name="myQueue"destination-resolver="myDestinationResolver"/>
8.动态目标解析
目标解析器是一个类,它允许我们将目标名称解析为JMS目标。 任何目标解析器都必须实现以下接口:
public interface DestinationResolver {Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)throws JMSException;
}
可以在JMS通道适配器,JMS网关和JMS支持的通道上指定目标解析器。 如果您未明确配置目标解析器,Spring将使用默认实现,即DynamicDestinationResolver 。 下面将解释该解析器作为Spring提供的其他实现:
- DynamicDestinationResolver :通过使用标准JMS Session.createTopic和Session.createQueue方法将目标名称解析为动态目标。
- BeanFactoryDestinationResolver :它将在Spring上下文中查找名称类似于提供的目标名称的bean,并期望其类型为javax.jms.Destination 。 如果找不到,它将抛出DestinationResolutionException 。
- JndiDestinationResolver :它将假定目标名称是JNDI位置。
如果我们不想使用默认的动态解析器,则可以实现自定义解析器,并在所需的端点中对其进行配置。 例如,以下JMS支持的通道使用不同的实现:
<int-jms:channel id="jmsChannel" queue-name="myQueue"destination-resolver="myDestinationResolver"/>
9. AMQP集成
安装
要安装并启动RabbitMQ服务器,您只需要遵循以下步骤即可。 如果您已经安装了服务器,则跳过此部分。
- 第一步是安装RabbitMQ服务器所需的erlang。 转到以下URL,下载系统版本并安装它:
- http://www.erlang.org/download.html
- 下一步是下载并安装RabbitMQ。 如果要使用与本教程相同的版本,请下载版本3.2.4。
- http://www.rabbitmq.com/
- 现在,打开命令提示符。 如果您是Windows用户,则可以通过单击开始菜单并在RabbitMQ文件夹中选择RabbitMQ命令提示符直接进入。
- 激活管理插件
> rabbitmq-plugins enable rabbitmq_management
- 启动服务器
> rabbitmq-server.bat
好的,现在我们将测试RabbitMQ是否已正确安装。 转到http:// localhost:15672并使用“ guest”作为用户名和密码登录。 如果使用的是3.0之前的版本,则端口为55672。
如果您看到网络用户界面,则一切就绪。
演示应用
为了将AMQP与Spring Integration结合使用,我们需要在pom.xml文件中添加以下依赖项:
SpringAMQP(适用于RabbitMQ)
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.3.1.RELEASE</version> </dependency>
Spring Integration AMQP端点
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-amqp</artifactId><version>3.0.2.RELEASE</version> </dependency>
现在,我们将创建一个新的配置文件amqp-config.xml,其中将包含RabbitMQ配置(例如我们在本教程先前使用的JMS的jms-config)。
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><rabbit:connection-factory id="connectionFactory" /><rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /><rabbit:admin connection-factory="connectionFactory" /><rabbit:queue name="rabbit.queue" /><rabbit:direct-exchange name="rabbit.exchange"><rabbit:bindings><rabbit:binding queue="rabbit.queue" key="rabbit.key.binding" /></rabbit:bindings></rabbit:direct-exchange> </beans>
下一个文件是Spring Integration文件,其中包含通道和通道适配器:
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:int="http://www.springframework.org/schema/integration"xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsdhttp://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd"><context:component-scan base-package="xpadro.spring.integration.amqp"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.amqp.service.AMQPService"/><int:channel id="requestChannel"/><int-amqp:outbound-channel-adapterchannel="requestChannel" amqp-template="amqpTemplate" exchange-name="rabbit.exchange"routing-key="rabbit.key.binding"/><int-amqp:inbound-channel-adapter channel="responseChannel"queue-names="rabbit.queue" connection-factory="connectionFactory" /><int:channel id="responseChannel"/><int:service-activator ref="amqpProcessor" method="process" input-channel="responseChannel"/></beans>
流程如下:
- 测试应用程序向网关发送一条消息,该消息将是一个简单的String。
- 从网关,它将通过“ requestChannel”通道到达出站通道适配器。
- 出站通道适配器将消息发送到“ rabbit.queue”队列。
- 订阅此“ rabbit.queue”队列,我们已经配置了入站通道适配器。 它将接收发送到队列的消息。
- 该消息通过“ responseChannel”通道发送到服务激活器。
- 服务激活器仅打印消息。
用作消息传递系统入口的网关包含一个方法:
public interface AMQPService {@Gatewaypublic void sendMessage(String message); }
服务激活器amqpProcessor非常简单。 它收到一条消息并打印其有效负载:
@Component("amqpProcessor") public class AmqpProcessor {public void process(Message<String> msg) {System.out.println("Message received: "+msg.getPayload());} }
为了完成该示例,以下是通过调用网关包装的服务来启动流的应用程序:
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/amqp-config.xml","/xpadro/spring/integration/test/int-amqp-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestIntegrationAMQPConfig {@Autowiredprivate AMQPService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {String msg = "hello";service.sendMessage(msg);Thread.sleep(2000);} }
翻译自: https://www.javacodegeeks.com/2015/09/enterprise-messaging.html