企业讯息

本文是我们名为“ Spring Integration for EAI ”的学院课程的一部分。

在本课程中,向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们。 接下来,您将深入研究Spring Integration的基础知识,例如通道,转换器和适配器。 在这里查看 !

目录

1.简介 2.准备环境 3. JMS适配器:接收
3.1。 入站通道适配器:活动接收 3.2。 入站通道适配器:无源接收
4. JMS适配器:发送 5.使用网关 6.消息转换 7. JMS支持的消息通道 8.动态目标解析 9. AMQP集成
9.1。 安装 9.2。 演示应用

1.简介

本教程重点介绍如何将应用程序与Spring Integration和JMS消息传递集成。 为此,我将首先向您展示如何安装Active MQ,它将是本教程中的代理。 下一部分将显示使用Spring Integration JMS通道适配器发送和接收JMS消息的示例。 在这些示例之后,我们将看到一些通过配置消息转换和目标解析来自定义这些调用的方法。

本教程的最后一部分简要介绍了如何将Spring Integration与AMQP协议一起使用。 它将完成RabbitMQ的安装,最后给出一个基本的消息传递示例。

本教程由以下部分组成:

  1. 介绍
  2. 准备环境
  3. JMS适配器:接收
  4. JMS适配器:发送
  5. 使用网关
  6. 讯息转换
  7. JMS支持的消息通道
  8. 动态目的地解析
  9. AMQP集成

2.准备环境

如果要通过JMS发送消息,则首先需要一个代理。 本教程中包含的示例是通过Active MQ(一种开源消息传递代理)执行的。 在本节中,我将帮助您安装服务器并实现一个简单的Spring应用程序,以测试它是否已正确设置。 该说明基于Windows系统。 如果您已经安装了服务器,则跳过此部分。

第一步是从Apache.org下载Apache MQ服务器。 下载完成后,只需将其解压缩到您选择的文件夹中即可。

要启动服务器,你只需要执行其位于Apache的ActiveMQ的-5.9.0 \ bin文件夹中文件的ActiveMQ。

图1

图1

好的,服务器正在运行。 现在我们只需要实现该应用程序。 我们将创建一个生产者,一个使用者,一个spring配置文件和一个测试。

制片人

您可以使用任何Java类代替我的TicketOrder对象。

public class JmsProducer {@Autowired@Qualifier("jmsTemplate")private JmsTemplate jmsTemplate;public void convertAndSendMessage(TicketOrder order) {jmsTemplate.convertAndSend(order);}public void convertAndSendMessage(String destination, TicketOrder order) {jmsTemplate.convertAndSend(destination, order);}
}

消费者

public class SyncConsumer {@Autowiredprivate JmsTemplate jmsTemplate;public TicketOrder receive() {return (TicketOrder) jmsTemplate.receiveAndConvert("test.sync.queue");}
}

Spring配置文件

<bean id="consumer" class="xpadro.spring.integration.consumer.SyncConsumer"/>
<bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/><!-- Infrastructure -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616" />
</bean><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="connectionFactory"/>
</bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="cachingConnectionFactory"/><property name="defaultDestination" ref="syncTestQueue"/>
</bean><!-- Destinations -->
<bean id="syncTestQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="test.sync.queue"/>
</bean>

考试

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestJmsConfig {@Autowiredprivate JmsProducer producer;@Autowiredprivate SyncConsumer consumer;@Testpublic void testReceiving() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());//Sends the message to the jmsTemplate's default destinationproducer.convertAndSendMessage(order);Thread.sleep(2000);TicketOrder receivedOrder = consumer.receive();assertNotNull(receivedOrder);assertEquals(1, receivedOrder.getFilmId());assertEquals(5, receivedOrder.getQuantity());}
}

如果测试通过,则说明所有设置正确。 现在,我们可以转到下一部分。

3. JMS适配器:接收

Spring Integration提供了多个适配器和网关来接收来自JMS队列或主题的消息。 下面简要讨论这些适配器:

  • 入站通道适配器 :它在内部使用JmsTemplate主动从JMS队列或主题接收消息。
  • 消息驱动通道适配器 :内部使用Spring MessageListener容器被动接收消息。

入站通道适配器:活动接收

本节说明如何使用上一节中介绍的第一个适配器。

JMS入站通道适配器主动轮询队列以从中检索消息。 由于它使用轮询器,因此您必须在Spring配置文件中对其进行定义。 适配器检索到消息后,它将通过指定的消息通道发送到消息传递系统中。 然后,我们可以使用端点(如转换器,过滤器等)来处理消息,也可以将其发送给服务激活器。

本示例从JMS队列检索票单消息并将其发送到服务激活器,服务激活器将对其进行处理并确认订单。 通过将订单发送到某种存储库来确认该订单,该存储库具有包含所有已注册订单的简单列表。

我们使用与“ 2准备环境”部分中相同的生产者:

<bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/><!-- Infrastructure -->
<!-- Connection factory and jmsTemplate configuration -->
<!-- as seen in the second section --><!-- Destinations -->
<bean id="toIntQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="int.sync.queue"/>
</bean>

测试将使用生产者将消息发送到“ toIntQueue”。 现在,我们将设置Spring Integration配置:

Integration-jms.xml

<context:component-scan base-package="xpadro.spring.integration"/><int-jms:inbound-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel"/><int:channel id="jmsChannel"/><int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/><int:poller id="poller" default="true" fixed-delay="1000"/>

JMS入站通道适配器将使用定义的轮询器从“ toIntQueue”中检索消息。 您必须为适配器配置轮询器,否则它将抛出运行时异常。 在这种情况下,我们定义了一个默认的轮询器。 这意味着任何需要轮询的端点都将使用此轮询器。 如果未配置默认轮询器,则需要为每个主动检索消息的端点定义一个特定的轮询器。

消费者

服务激活器只是一个bean(通过组件扫描自动检测到):

@Component("ticketProcessor")
public class TicketProcessor {private static final Logger logger = LoggerFactory.getLogger(TicketProcessor.class);private static final String ERROR_INVALID_ID = "Order ID is invalid";@Autowiredprivate OrderRepository repository;public void processOrder(TicketOrder order) {logger.info("Processing order {}", order.getFilmId());if (isInvalidOrder(order)) {logger.info("Error while processing order [{}]", ERROR_INVALID_ID);throw new InvalidOrderException(ERROR_INVALID_ID);}float amount = 5.95f * order.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);repository.confirmOrder(confirmation);}private boolean isInvalidOrder(TicketOrder order) {if (order.getFilmId() == -1) {return true;}return false;}
}

在前面的代码片段中, processOrder方法接收一个TicketOrder对象并直接对其进行处理。 但是,您可以改为定义消息 <?>或Message <TicketOrder>以便接收消息。 这样,您将可以访问消息的有效负载及其标题。

还要注意,该方法返回void。 由于消息流在此处结束,因此我们不需要返回任何内容。 如果需要,您还可以定义服务适配器的回复通道并返回确认。 此外,例如,我们随后将向该回复通道订阅端点或网关,以便将确认发送到另一个JMS队列,将其发送到Web服务或将其存储到数据库。

最后,让我们看一下测试以了解如何执行所有测试:

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsConfig {@Autowiredprivate JmsProducer producer;@Autowiredprivate OrderRepository repository;@Testpublic void testSendToIntegration() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());//Sends the message to the jmsTemplate's default destinationproducer.convertAndSendMessage("int.sync.queue", order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("123", conf.getId());}
}

我已将Thread.sleep为四秒钟,以等待消息发送。 我们本可以使用while循环来检查是否已收到消息,直到达到超时为止。

入站通道适配器:无源接收

JMS接收部分的第二部分使用消息驱动的通道适配器。 这样,消息一旦发送到队列,便会立即传递到适配器,而无需使用轮询器。 这是我们向其订阅者传递消息的消息通道。

该示例与上一节中看到的示例非常相似。 我将仅显示在配置中所做的更改。

我从上一个示例更改的唯一内容是spring集成配置:

<context:component-scan base-package="xpadro.spring.integration"/><int-jms:message-driven-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel" /><int:channel id="jmsChannel"/><int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/>

我删除了轮询器,并更改了消息驱动通道适配器的JMS入站适配器。 而已; 适配器将被动地接收消息并将其传递到jmsChannel

请考虑到消息侦听器适配器至少需要以下组合之一:

  • 消息侦听器容器。
  • 连接工厂和目的地。

在我们的示例中,我们使用了第二个选项。 目标在适配器配置中指定,连接工厂在jms-config文件中定义,该文件也由测试导入。

4. JMS适配器:发送

在上一节中,我们已经了解了如何接收外部系统发送到JMS队列的消息。 本节向您展示出站通道适配器,使您可以在系统之外发送JMS消息。

与入站适配器相比,出站适配器只有一种类型。 该适配器在内部使用JmsTemplate发送消息,并且为了配置此适配器,您将需要指定以下至少一项:

  • 一个JmsTemplate。
  • 连接工厂和目的地。

与入站示例一样,我们使用第二个选项将消息发送到JMS队列。 配置如下:

对于此示例,我们将为jms配置(jms-config.xml)创建一个新队列。 这是我们的Spring Integration应用程序将消息发送到的位置:

<bean id="toJmsQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="to.jms.queue"/>
</bean>

好的,现在我们使用JMS出站适配器配置集成配置:

<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int-jms:outbound-channel-adapter id="jmsAdapter" channel="requestChannel" destination="toJmsQueue"/>

我们正在使用网关作为邮件系统的入口。 测试将使用此接口发送新的TicketOrder对象。 网关将接收消息并将其放入requestChannel通道。 由于它是直接通道 ,它将被发送到JMS出站通道适配器。

适配器收到一个Spring Integration消息。 然后,它可以通过两种方式发送消息:

  • 将消息转换为JMS消息。 这是通过将适配器的属性“ extract-payload”设置为true(默认值)来完成的。 这是我们在示例中使用的选项。
  • 按原样发送消息,即Spring Integration消息。 您可以通过将“ extract-payload”属性设置为false来完成此操作。

该决定取决于期望您的消息的系统类型。 如果另一个应用程序是Spring Integration应用程序,则可以使用第二种方法。 否则,请使用默认值。 在我们的示例中,另一端有一个简单的Spring JMS应用程序。 因此,我们必须选择第一个选项。

继续我们的示例,现在我们看一下测试,该测试使用网关接口发送消息,并使用自定义使用者接收消息。 在此测试中,使用者将扮演一个JMS应用程序的角色,该应用程序使用jmsTemplate从JMS队列中检索它:

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-out-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsOutboundConfig {@Autowiredprivate SyncConsumer consumer;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);TicketOrder receivedOrder = consumer.receive("to.jms.queue");assertNotNull(receivedOrder);assertEquals(1, receivedOrder.getFilmId());assertEquals(5, receivedOrder.getQuantity());}
}

5.使用网关

除了通道适配器之外,Spring Integration还提供了入站和出站网关。 您可能还记得以前的教程,网关提供了与外部系统的双向通信,这意味着发送和接收或接收和回复操作。 在这种情况下,它允许请求或重试操作。

在本节中,我们将看到一个使用JMS出站网关的示例。 网关将向队列发送JMS消息,并等待答复。 如果未发送回任何答复,则网关将抛出MessageTimeoutException 。

Spring Integration配置

<context:component-scan base-package="xpadro.spring.integration"/><int:gateway id="inGateway" default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" request-channel="requestChannel" reply-channel="jmsReplyChannel"/><int:channel id="jmsReplyChannel"/><int:service-activator method="registerOrderConfirmation" input-channel="jmsReplyChannel" ref="ticketProcessor"/>

流程如下:

  1. 包装在Spring Integration Message中的TicketOrder将通过“ inGateway”网关进入消息传递系统。
  2. 网关会将消息放入“ requestChannel”通道。
  3. 通道将消息发送到其订阅的端点JMS出站网关。
  4. JMS出站网关提取消息的有效负载,并将其包装为JMS消息。
  5. 网关发送消息并等待答复。
  6. 当答复到来时,网关以包装在JMS消息中的TicketConfirmation形式,将获得有效负载并将其包装到Spring Integration消息中。
  7. 该消息将发送到“ jmsReplyChannel”通道,服务激活器(TicketProcessor)将在该通道中处理该消息并将其注册到我们的OrderRepository。

订单处理器非常简单。 它收到TicketConfirmation并将其添加到票证存储库:

@Component("ticketProcessor")
public class TicketProcessor {@Autowiredprivate OrderRepository repository;public void registerOrderConfirmation(TicketConfirmation confirmation) {repository.confirmOrder(confirmation);}
}

考试

@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsOutGatewayConfig {@Autowiredprivate OrderRepository repository;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("321", conf.getId());}
}

外部系统

为了完全理解该示例,我将向您展示将消息传递到JMS队列时发生的情况。

侦听Spring Integration发送消息的队列,有一个侦听器asyncConsumer

<bean id="toAsyncJmsQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="to.async.jms.queue"/>
</bean><!-- Listeners -->
<jms:listener-container connection-factory="connectionFactory"><jms:listener destination="to.async.jms.queue" ref="asyncConsumer"/>
</jms:listener-container>

侦听器接收到该消息,并使用票证确认创建新消息并进行回复。 注意,我们必须将回复消息的相关性ID设置为与请求消息相同的值。 这将使客户知道我们正在响应哪个消息。 另外,我们将目标设置为请求消息中配置的回复通道。

@Component("asyncConsumer")
public class AsyncConsumer implements MessageListener {@Autowiredprivate JmsTemplate template;@Overridepublic void onMessage(Message order) {final Message msgOrder = order;TicketOrder orderObject;try {orderObject = (TicketOrder) ((ObjectMessage) order).getObject();} catch (JMSException e) {throw JmsUtils.convertJmsAccessException(e);}float amount = 5.95f * orderObject.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("321", orderObject.getFilmId(), orderObject.getOrderDate(), orderObject.getQuantity(), amount);try {template.convertAndSend(msgOrder.getJMSReplyTo(), confirmation, new MessagePostProcessor() {public Message postProcessMessage(Message message) throws JMSException {message.setJMSCorrelationID(msgOrder.getJMSCorrelationID());return message;}});} catch (JmsException | JMSException e) {throw JmsUtils.convertJmsAccessException((JMSException) e);}}
}

6.消息转换

消息通道适配器和网关都使用消息转换器将传入消息转换为Java类型,或者采用相反的方式。 转换器必须实现MessageConverter接口:

public interface MessageConverter {<P> Message<P> toMessage(Object object);<P> Object fromMessage(Message<P> message);}

Spring Integration带有MessageConverter接口的两种实现:

MapMessageConverter

它的fromMessage方法使用两个键创建一个新的HashMap:

  • 有效负载:值为消息的有效负载( message.getPayload )。
  • 标头:该值是另一个HashMap,具有来自原始消息的所有标头。

“ toMessage”方法期望一个具有相同结构(有效负载和标头键)的Map实例,并构造一个Spring Integration消息。

SimpleMessageConverter

这是适配器和网关使用的默认转换器。 您可以从源代码中看到它与对象之间的转换:

public Message<?> toMessage(Object object) throws Exception {if (object == null) {return null;}if (object instanceof Message<?>) {return (Message<?>) object;}return MessageBuilder.withPayload(object).build();
}public Object fromMessage(Message<?> message) throws Exception {return (message != null) ? message.getPayload() : null;
}

无论如何,如果需要自己的实现,则可以在通道适配器或网关配置中指定自定义转换器。 例如,使用网关:

<int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" request-channel="requestChannel" reply-channel="jmsReplyChannel" message-converter="myConverter"/>

只要记住您的转换器应该实现MessageConverter:

@Component("myConverter")
public class MyConverter implements MessageConverter {

7. JMS支持的消息通道

通道适配器和网关用于与外部系统进行通信。 JMS支持的消息通道用于在同一应用程序内的使用者和生产者之间发送和接收JMS消息。 尽管在这种情况下我们仍然可以使用通道适配器,但是使用JMS通道要简单得多。 与集成消息通道的区别在于,JMS通道将使用JMS代理发送消息。 这意味着消息将不仅仅存储在内存通道中。 相反,它将被发送到JMS提供程序,从而也可以使用事务。 如果使用事务,它将按以下方式工作:

  • 如果回滚事务,则将消息发送到JMS支持的通道的生产者将不会编写该消息。
  • 如果事务回滚,订阅JMS支持的通道的使用者将不会从该通道中删除消息。

对于此功能,Spring Integration提供了两个渠道:点对点和发布/订阅渠道。 它们配置如下:

点对点直接渠道

<int-jms:channel id="jmsChannel" queue="myQueue"/>

发布/订阅频道

<int-jms:publish-subscribe-channel id="jmsChannel" topic="myTopic"/>

在下面的示例中,我们可以看到一个简单的应用程序,其中有两个端点使用JMS支持的通道相互通信。

组态

发送到消息传递系统( TicketOrder对象)的消息到达服务激活器(票证处理器)。 然后,该处理器将订单( sendJMS )发送到JMS支持的消息。 订阅此通道,有一个相同的处理器将接收消息( receiveJms ),对其进行处理以创建TicketConfirmation并将其注册到票证存储库:

<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int:service-activator method="sendJms" input-channel="requestChannel" output-channel="jmsChannel" ref="ticketJmsProcessor"/><int-jms:channel id="jmsChannel" queue="syncTestQueue"/><int:service-activator method="receiveJms" input-channel="jmsChannel" ref="ticketJmsProcessor"/>

处理器

实现两种方法: sendJmsreceiveJms

@Component("ticketJmsProcessor")
public class TicketJmsProcessor {private static final Logger logger = LoggerFactory.getLogger(TicketJmsProcessor.class);@Autowiredprivate OrderRepository repository;public TicketOrder sendJms(TicketOrder order) {logger.info("Sending order {}", order.getFilmId());return order;}public void receiveJms(TicketOrder order) {logger.info("Processing order {}", order.getFilmId());float amount = 5.95f * order.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);repository.confirmOrder(confirmation);}
}

考试

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsToJmsConfig {@Autowiredprivate OrderRepository repository;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("123", conf.getId());}
}

JMS支持的通道提供了不同的可能性,例如配置队列名称而不是队列引用或使用目标解析器:

<int-jms:channel id="jmsChannel" queue-name="myQueue"destination-resolver="myDestinationResolver"/>

8.动态目标解析

目标解析器是一个类,它允许我们将目标名称解析为JMS目标。 任何目标解析器都必须实现以下接口:

public interface DestinationResolver {Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)throws JMSException;
}

可以在JMS通道适配器,JMS网关和JMS支持的通道上指定目标解析器。 如果您未明确配置目标解析器,Spring将使用默认实现,即DynamicDestinationResolver 。 下面将解释该解析器作为Spring提供的其他实现:

  • DynamicDestinationResolver :通过使用标准JMS Session.createTopic和Session.createQueue方法将目标名称解析为动态目标。
  • BeanFactoryDe​​stinationResolver :它将在Spring上下文中查找名称类似于提供的目标名称的bean,并期望其类型为javax.jms.Destination 。 如果找不到,它将抛出DestinationResolutionException 。
  • JndiDestinationResolver :它将假定目标名称是JNDI位置。

如果我们不想使用默认的动态解析器​​,则可以实现自定义解析器,并在所需的端点中对其进行配置。 例如,以下JMS支持的通道使用不同的实现:

<int-jms:channel id="jmsChannel" queue-name="myQueue"destination-resolver="myDestinationResolver"/>

9. AMQP集成

安装

要安装并启动RabbitMQ服务器,您只需要遵循以下步骤即可。 如果您已经安装了服务器,则跳过此部分。

  1. 第一步是安装RabbitMQ服务器所需的erlang。 转到以下URL,下载系统版本并安装它:
  • http://www.erlang.org/download.html
  • 下一步是下载并安装RabbitMQ。 如果要使用与本教程相同的版本,请下载版本3.2.4。
    • http://www.rabbitmq.com/
  • 现在,打开命令提示符。 如果您是Windows用户,则可以通过单击开始菜单并在RabbitMQ文件夹中选择RabbitMQ命令提示符直接进入。
  • 激活管理插件
  • > rabbitmq-plugins enable rabbitmq_management
  • 启动服务器
  • > rabbitmq-server.bat

    好的,现在我们将测试RabbitMQ是否已正确安装。 转到http:// localhost:15672并使用“ guest”作为用户名和密码登录。 如果使用的是3.0之前的版本,则端口为55672。

    如果您看到网络用户界面,则一切就绪。

    演示应用

    为了将AMQP与Spring Integration结合使用,我们需要在pom.xml文件中添加以下依赖项:

    SpringAMQP(适用于RabbitMQ)

    <dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.3.1.RELEASE</version>
    </dependency>

    Spring Integration AMQP端点

    <dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-amqp</artifactId><version>3.0.2.RELEASE</version>
    </dependency>

    现在,我们将创建一个新的配置文件amqp-config.xml,其中将包含RabbitMQ配置(例如我们在本教程先前使用的JMS的jms-config)。

    <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><rabbit:connection-factory id="connectionFactory" /><rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /><rabbit:admin connection-factory="connectionFactory" /><rabbit:queue name="rabbit.queue" /><rabbit:direct-exchange name="rabbit.exchange"><rabbit:bindings><rabbit:binding queue="rabbit.queue" key="rabbit.key.binding" /></rabbit:bindings></rabbit:direct-exchange>
    </beans>

    下一个文件是Spring Integration文件,其中包含通道和通道适配器:

    <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:int="http://www.springframework.org/schema/integration"xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsdhttp://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd"><context:component-scan base-package="xpadro.spring.integration.amqp"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.amqp.service.AMQPService"/><int:channel id="requestChannel"/><int-amqp:outbound-channel-adapterchannel="requestChannel" amqp-template="amqpTemplate" exchange-name="rabbit.exchange"routing-key="rabbit.key.binding"/><int-amqp:inbound-channel-adapter channel="responseChannel"queue-names="rabbit.queue" connection-factory="connectionFactory" /><int:channel id="responseChannel"/><int:service-activator ref="amqpProcessor" method="process" input-channel="responseChannel"/></beans>

    流程如下:

    1. 测试应用程序向网关发送一条消息,该消息将是一个简单的String。
    2. 从网关,它将通过“ requestChannel”通道到达出站通道适配器。
    3. 出站通道适配器将消息发送到“ rabbit.queue”队列。
    4. 订阅此“ rabbit.queue”队列,我们​​已经配置了入站通道适配器。 它将接收发送到队列的消息。
    5. 该消息通过“ responseChannel”通道发送到服务激活器。
    6. 服务激活器仅打印消息。

    用作消息传递系统入口的网关包含一个方法:

    public interface AMQPService {@Gatewaypublic void sendMessage(String message);
    }

    服务激活器amqpProcessor非常简单。 它收到一条消息并打印其有效负载:

    @Component("amqpProcessor")
    public class AmqpProcessor {public void process(Message<String> msg) {System.out.println("Message received: "+msg.getPayload());}
    }

    为了完成该示例,以下是通过调用网关包装的服务来启动流的应用程序:

    @ContextConfiguration(locations = {"/xpadro/spring/integration/test/amqp-config.xml","/xpadro/spring/integration/test/int-amqp-config.xml"})
    @RunWith(SpringJUnit4ClassRunner.class)
    public class TestIntegrationAMQPConfig {@Autowiredprivate AMQPService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {String msg = "hello";service.sendMessage(msg);Thread.sleep(2000);}
    }

    翻译自: https://www.javacodegeeks.com/2015/09/enterprise-messaging.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/356791.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring MVC 学习笔记一 HelloWorld

Spring MVC 学习笔记一 HelloWorld Spring MVC 的使用可以按照以下步骤进行&#xff08;使用Eclipse&#xff09;&#xff1a; 加入JAR包在web.xml中配置DispatcherServlet加入Spring MVC的配置文件编写处理请求的处理器&#xff0c;并添加对应注解编写视图下面按照国际惯例先来…

php 避免xss_PHP防止XSS注入

我们在做网站的时候&#xff0c;经常有input提交&#xff0c;通常前端对input中的内容不做判断&#xff0c;只做不为空等简单的操作。但是&#xff0c;有的input中会提交一些javascript或者html,会给网站造成一定的危害。为此&#xff0c;防止XSS注入的任务交给了后端&#xff…

全字符微信名 php,PHP方法处理微信昵称特殊符号过滤

我们在通过PHP获取微信昵称&#xff0c;并且存于数据库的时候&#xff0c;由于一些昵称带有特殊符号&#xff0c;所以存不进去&#xff0c;这时候我们可以通过下面的方式来处理。方法二protected function removeEmoji($clean_text) {// Match Emoticons$regexEmoticons /[\x{…

[转载]我的PMP复习备考经验谈(下篇)——一本关于PMP备考的小指南

原文地址&#xff1a;我的PMP复习备考经验谈(下篇)——一本关于PMP备考的小指南作者&#xff1a;羽少宸PMP复习备考经验谈&#xff08;下篇&#xff09;——PMP备考小指南 总结经验&#xff0c;展望未来&#xff0c;以此纪念PMP复习备考时光 继上篇&#xff0c;猛击直达四、如何…

php项目私有化部署保护代码,ThinkPHP项目安全配置解决方案

前言:ThinkPHP MVC框架越来被开发者接受,众多的开发者选择了这个框架&#xff0c;也有很多的优秀项目使用的ThinkPHP框架。最近整理了一下ThinkPHP项目的一些安全配置。可能并不适用全部项目&#xff0c;大家可以适当的使用如下的安全配置。前置知识:web容器和各类组件的版本&a…

大数据分析 es hive_使用Hive和iReport进行大数据分析

大数据分析 es hive每个JJ Abrams的电视连续剧疑犯追踪从主要人物芬奇先生一个下列叙述情节开始&#xff1a;“ 你是被监视。 政府拥有一个秘密系统-每天每天每小时都会对您进行监视的机器。 我知道是因为...我建造了它。 “当然&#xff0c;我们的技术人员知道得更多。 庞大的…

java+jsp+网页制作,java+jsp+mysql网页制作总结(2)

错误&#xff1a;url通过get传递时汉字出错解决&#xff1a;url通过get传递时汉字会乱码&#xff0c;1.String name1request.getParameter("name");String name new String(name1.getBytes("ISO-8859-1"),"gbk");2.通过post传递参数错误&#x…

Redis聚类

本文是我们学院课程的一部分&#xff0c;标题为Redis NoSQL键值存储 。 这是Redis的速成班。 您将学习如何安装Redis并启动服务器。 此外&#xff0c;您将在Redis命令行中乱七八糟。 接下来是更高级的主题&#xff0c;例如复制&#xff0c;分片和集群&#xff0c;同时还介绍了…

域策略禁用usb

文档及模板可在 http://pan.baidu.com/s/1qYTcjTy 下载 pro_usb_users.adm 此模板可禁用到 指定盘符&#xff0c;针对用户策略 pro_usb_computers.adm 此模板 针对计算机&#xff0c;一般只要它就好了。 可以从 3 个方面下手 adm 配置 文件。注册表usb驱动其实 adm配置文件&…

Redis分片

本文是我们学院课程的一部分&#xff0c;标题为Redis NoSQL键值存储 。 这是Redis的速成班。 您将学习如何安装Redis并启动服务器。 此外&#xff0c;您将在Redis命令行中乱七八糟。 接下来是更高级的主题&#xff0c;例如复制&#xff0c;分片和集群&#xff0c;同时还介绍了…

Openstack入坑指南

什么是云计算 概念 云计算是一种基于互联网的计算方式&#xff0c;通过这种方式&#xff0c;共享的软硬件资源和信息&#xff0c;可以按需求提供给计算机和其他设备。用户不需要了解”云“中的基础设施细节&#xff0c;不必具有相应的专业知识&#xff0c;也无需直接控制。云计…

MATLAB如何用循环分割,利用Matlab进行分割提取浮游生物

我试图从扫描图像中提取浮游生物.大纲也不错,但是,现在我不知道如何提取图像,因此可以单独保存每个浮游生物.我尝试使用标签,但是有很多噪音,它标出了每一个规格.我想知道是否有更好的方法来做到这一点.这是我的代码&#xff1a;I imread(plankton_2.jpg);figure, imshow(I), …

第一个Python程序

在E:\Python 下新建一个hello.py文件&#xff0c;里面的内容是print(hello world) 进入命令提示窗格&#xff0c;输入E: 点击回车 输入 cd python 点击回车 输入Python hello.py 结果如图 转载于:https://www.cnblogs.com/lgqboke/p/5882049.html

20145219 《信息安全系统设计基础》第01周学习总结

20145219 《信息安全系统设计基础》第01周学习总结 教材学习内容总结 别出心裁的Linux命令学习法 1、Ubuntu快捷键 CTRLALTT:打开终端&#xff1b;CTRLSHIFTT&#xff1a;新建标签页&#xff1b;ALT数字N&#xff1a;终端中切换到第N个标签页&#xff1b;Tab:终端中命令补全&…

拉盖尔多项式 matlab,类氢原子的定态波函数

&nbsp&nbsp&nbsp&nbsp&nbsp&nbsp&nbsp预备知识 球坐标系中的定态薛定谔方程&#xff0c;原子单位制本文使用原子单位制&#xff0e;类氢原子(hydrogen-like atom)被定义为原子核有 $Z$ 个质子(核电荷为 $Ze$)有一个核外电子的原子/离子&#xff0…

再论EM算法的收敛性和K-Means的收敛性

标签&#xff08;空格分隔&#xff09;&#xff1a; 机器学习 &#xff08;最近被一波波的笔试面试淹没了&#xff0c;但是在有两次面试时被问到了同一个问题&#xff1a;K-Means算法的收敛性。在网上查阅了很多资料&#xff0c;并没有看到很清晰的解释&#xff0c;所以希望可以…

杰尔·地狱

什么是JAR地狱&#xff1f; &#xff08;或者是classpath地狱&#xff1f;还是依赖地狱&#xff1f;&#xff09;在考虑使用Maven或OSGi等现代开发工具时&#xff0c;哪些方面仍然有意义&#xff1f; 有趣的是&#xff0c;似乎没有对这些问题的结构化答案&#xff08;即&#…

matlab实验符号计算答案,实验五matlab符号计算

实验五matlab符号计算 实验 5 符号计算 教师评分班级 学号 姓名实验日期 2014 年 6 月 17 日 星期 二 第 1 至 2 节课实验地点实验目的1. 掌握定义符号对象的办法2. 掌握符号表达式的运算法则以及符号矩阵运算3. 掌握求符号函数极限及导数的方法4. 掌握求符号函数定积分和不定积…

Java学习笔记之:Java String类

一、引言 字符串广泛应用在Java编程中&#xff0c;在Java中字符串属于对象&#xff0c;Java提供了String类来创建和操作字符串。 创建字符串最简单的方式如下: String str "Hello world!"; String类型是特殊的引用类型&#xff0c;我们也可以通过实例化的方式来创建 …

WildFly 10 CR 2发布– Java EE 7,Java 8,Hibernate 5,JavaScript支持热重载

昨天&#xff0c;WildFly团队发布了最新版本的WildFly 10 。 CR2很可能是预计于十月份发布最终版本之前的最后一个版本。 即使主要支持的Java EE规范是7&#xff0c;WildFly 8和WildFly 9仍具有许多新功能&#xff0c;该版本现在具有三个服务器版本&#xff0c;实现了Java EE 7…