c#发送讯息_企业讯息

c#发送讯息

本文是我们名为“ EAI的Spring集成 ”的学院课程的一部分。

在本课程中,向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们。 接下来,您将深入研究Spring Integration的基础知识,例如通道,转换器和适配器。 在这里查看 !

目录

1.简介 2.准备环境 3. JMS适配器:接收
3.1。 入站通道适配器:活动接收 3.2。 入站通道适配器:被动接收
4. JMS适配器:发送 5.使用网关 6.消息转换 7. JMS支持的消息通道 8.动态目标解析 9. AMQP集成
9.1。 安装 9.2。 演示应用

1.简介

本教程重点介绍如何将我们的应用程序与Spring Integration和JMS消息传递集成。 为此,我将首先向您展示如何安装Active MQ,这将是本教程中的代理。 下一部分将显示使用Spring Integration JMS通道适配器发送和接收JMS消息的示例。 在这些示例之后,我们将看到一些通过配置消息转换和目标解析来自定义这些调用的方法。

本教程的最后一部分简要介绍了如何将Spring Integration与AMQP协议一起使用。 它将完成RabbitMQ的安装,最后给出一个基本的消息传递示例。

本教程由以下部分组成:

  1. 介绍
  2. 准备环境
  3. JMS适配器:接收
  4. JMS适配器:发送
  5. 使用网关
  6. 讯息转换
  7. JMS支持的消息通道
  8. 动态目的地解析
  9. AMQP集成

2.准备环境

如果要通过JMS发送消息,则首先需要一个代理。 本教程中包含的示例是通过Active MQ(一种开源消息传递代理)执行的。 在本节中,我将帮助您安装服务器并实现一个简单的Spring应用程序,以测试其是否已正确设置。 该说明基于Windows系统。 如果您已经安装了服务器,则跳过此部分。

第一步是从Apache.org下载Apache MQ服务器。 下载完成后,只需将其解压缩到您选择的文件夹中即可。

要启动服务器,你只需要执行其位于Apache的ActiveMQ的-5.9.0 \ bin文件夹中文件的ActiveMQ。

图1

图1

好的,服务器正在运行。 现在我们只需要实现该应用程序。 我们将创建一个生产者,一个使用者,一个spring配置文件和一个测试。

制片人

您可以使用任何Java类代替我的TicketOrder对象。

public class JmsProducer {@Autowired@Qualifier("jmsTemplate")private JmsTemplate jmsTemplate;public void convertAndSendMessage(TicketOrder order) {jmsTemplate.convertAndSend(order);}public void convertAndSendMessage(String destination, TicketOrder order) {jmsTemplate.convertAndSend(destination, order);}
}

消费者

public class SyncConsumer {@Autowiredprivate JmsTemplate jmsTemplate;public TicketOrder receive() {return (TicketOrder) jmsTemplate.receiveAndConvert("test.sync.queue");}
}

Spring配置文件

<bean id="consumer" class="xpadro.spring.integration.consumer.SyncConsumer"/>
<bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/><!-- Infrastructure -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616" />
</bean><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="connectionFactory"/>
</bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="cachingConnectionFactory"/><property name="defaultDestination" ref="syncTestQueue"/>
</bean><!-- Destinations -->
<bean id="syncTestQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="test.sync.queue"/>
</bean>

考试

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestJmsConfig {@Autowiredprivate JmsProducer producer;@Autowiredprivate SyncConsumer consumer;@Testpublic void testReceiving() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());//Sends the message to the jmsTemplate's default destinationproducer.convertAndSendMessage(order);Thread.sleep(2000);TicketOrder receivedOrder = consumer.receive();assertNotNull(receivedOrder);assertEquals(1, receivedOrder.getFilmId());assertEquals(5, receivedOrder.getQuantity());}
}

如果测试通过,则说明所有设置正确。 现在,我们可以转到下一部分。

3. JMS适配器:接收

Spring Integration提供了多个适配器和网关来接收来自JMS队列或主题的消息。 下面简要讨论这些适配器:

  • 入站通道适配器 :它在内部使用JmsTemplate主动接收来自JMS队列或主题的消息。
  • 消息驱动通道适配器 :它内部使用Spring MessageListener容器被动接收消息。

入站通道适配器:活动接收

本节说明如何使用上一节中介绍的第一个适配器。

JMS入站通道适配器主动轮询队列以从中检索消息。 由于它使用轮询器,因此您必须在Spring配置文件中对其进行定义。 适配器检索到消息后,它将通过指定的消息通道发送到消息传递系统中。 然后,我们可以使用端点(如转换器,过滤器等)来处理消息,也可以将其发送给服务激活器。

本示例从JMS队列检索票单消息并将其发送到服务激活器,服务激活器将对其进行处理并确认订单。 通过将订单发送到某种存储库来确认该订单,该存储库具有包含所有已注册订单的简单列表。

我们正在使用与“ 2准备环境”部分中相同的生产者:

<bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/><!-- Infrastructure -->
<!-- Connection factory and jmsTemplate configuration -->
<!-- as seen in the second section --><!-- Destinations -->
<bean id="toIntQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="int.sync.queue"/>
</bean>

该测试将使用生产者将消息发送到“ toIntQueue”。 现在,我们将设置Spring Integration配置:

Integration-jms.xml

<context:component-scan base-package="xpadro.spring.integration"/><int-jms:inbound-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel"/><int:channel id="jmsChannel"/><int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/><int:poller id="poller" default="true" fixed-delay="1000"/>

JMS入站通道适配器将使用定义的轮询器从“ toIntQueue”中检索消息。 您必须为适配器配置轮询器,否则它将抛出运行时异常。 在这种情况下,我们定义了默认轮询器。 这意味着任何需要轮询的端点都将使用此轮询器。 如果未配置默认轮询器,则需要为每个主动检索消息的端点定义一个特定的轮询器。

消费者

服务激活器只是一个bean(通过组件扫描自动检测到):

@Component("ticketProcessor")
public class TicketProcessor {private static final Logger logger = LoggerFactory.getLogger(TicketProcessor.class);private static final String ERROR_INVALID_ID = "Order ID is invalid";@Autowiredprivate OrderRepository repository;public void processOrder(TicketOrder order) {logger.info("Processing order {}", order.getFilmId());if (isInvalidOrder(order)) {logger.info("Error while processing order [{}]", ERROR_INVALID_ID);throw new InvalidOrderException(ERROR_INVALID_ID);}float amount = 5.95f * order.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);repository.confirmOrder(confirmation);}private boolean isInvalidOrder(TicketOrder order) {if (order.getFilmId() == -1) {return true;}return false;}
}

在前面的代码片段中, processOrder方法接收一个TicketOrder对象并直接对其进行处理。 但是,您可以改为定义消息 <?>或Message <TicketOrder>以便接收消息。 这样,您就可以访问消息的有效负载及其标题。

还要注意,该方法返回void。 由于消息流在此处结束,因此我们不需要返回任何内容。 如果需要,您还可以定义服务适配器的回复通道并返回确认。 另外,例如,我们随后将向该回复通道订阅端点或网关,以便将确认发送到另一个JMS队列,将其发送到Web服务或将其存储到数据库。

最后,让我们看一下测试以了解如何执行所有测试:

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsConfig {@Autowiredprivate JmsProducer producer;@Autowiredprivate OrderRepository repository;@Testpublic void testSendToIntegration() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());//Sends the message to the jmsTemplate's default destinationproducer.convertAndSendMessage("int.sync.queue", order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("123", conf.getId());}
}

我已将Thread.sleep为四秒钟,以等待消息发送。 我们本可以使用while循环来检查是否已收到消息,直到达到超时为止。

入站通道适配器:被动接收

JMS接收部分的第二部分使用消息驱动的通道适配器。 这样,消息一旦发送到队列,便会立即传递到适配器,而无需使用轮询器。 这是我们向其订阅者传递消息的消息通道。

该示例与上一节中看到的示例非常相似。 我将仅显示配置中所做的更改。

我从上一个示例更改的唯一内容是spring集成配置:

<context:component-scan base-package="xpadro.spring.integration"/><int-jms:message-driven-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel" /><int:channel id="jmsChannel"/><int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/>

我删除了轮询器,并更改了消息驱动通道适配器的JMS入站适配器。 而已; 适配器将被动接收消息并将其传递到jmsChannel

请考虑到消息侦听器适配器至少需要以下组合之一:

  • 消息侦听器容器。
  • 连接工厂和目的地。

在我们的示例中,我们使用了第二个选项。 目标在适配器配置中指定,连接工厂在jms-config文件中定义,该文件也由测试导入。

4. JMS适配器:发送

在上一节中,我们已经了解了如何接收外部系统发送到JMS队列的消息。 本部分向您显示出站通道适配器,使您可以在系统之外发送JMS消息。

与入站适配器相比,出站适配器只有一种类型。 该适配器内部使用JmsTemplate发送消息,并且为了配置此适配器,您将需要指定以下至少一项:

  • 一个JmsTemplate。
  • 连接工厂和目的地。

与入站示例一样,我们使用第二个选项将消息发送到JMS队列。 配置如下:

对于此示例,我们将为jms配置(jms-config.xml)创建一个新队列。 这是我们的Spring Integration应用程序将消息发送到的位置:

<bean id="toJmsQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="to.jms.queue"/>
</bean>

好的,现在我们使用JMS出站适配器配置集成配置:

<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int-jms:outbound-channel-adapter id="jmsAdapter" channel="requestChannel" destination="toJmsQueue"/>

我们正在使用网关作为邮件系统的入口。 测试将使用此接口发送新的TicketOrder对象。 网关将接收到消息并将其放入requestChannel通道。 由于它是直接通道 ,它将被发送到JMS出站通道适配器。

适配器收到一个Spring Integration消息。 然后,它可以通过两种方式发送消息:

  • 将消息转换为JMS消息。 这是通过将适配器的属性“ extract-payload”设置为true(默认值)来完成的。 这是我们在示例中使用的选项。
  • 按原样发送消息,即Spring Integration消息。 您可以通过将“ extract-payload”属性设置为false来完成此操作。

该决定取决于期望您的消息的系统类型。 如果另一个应用程序是Spring Integration应用程序,则可以使用第二种方法。 否则,请使用默认值。 在我们的示例中,另一端有一个简单的Spring JMS应用程序。 因此,我们必须选择第一个选项。

继续我们的示例,现在我们看一下测试,该测试使用网关接口发送消息,并使用自定义使用者接收消息。 在此测试中,使用者将扮演一个JMS应用程序的角色,该应用程序使用jmsTemplate从JMS队列中检索它:

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-out-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsOutboundConfig {@Autowiredprivate SyncConsumer consumer;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);TicketOrder receivedOrder = consumer.receive("to.jms.queue");assertNotNull(receivedOrder);assertEquals(1, receivedOrder.getFilmId());assertEquals(5, receivedOrder.getQuantity());}
}

5.使用网关

除了通道适配器之外,Spring Integration还提供了入站和出站网关。 您可能还记得以前的教程,网关提供了与外部系统的双向通信,这意味着发送和接收或接收和回复操作。 在这种情况下,它允许请求或重试操作。

在本节中,我们将看到一个使用JMS出站网关的示例。 网关会将JMS消息发送到队列,然后等待答复。 如果未发送回任何答复,则网关将抛出MessageTimeoutException 。

Spring Integration配置

<context:component-scan base-package="xpadro.spring.integration"/><int:gateway id="inGateway" default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" request-channel="requestChannel" reply-channel="jmsReplyChannel"/><int:channel id="jmsReplyChannel"/><int:service-activator method="registerOrderConfirmation" input-channel="jmsReplyChannel" ref="ticketProcessor"/>

流程如下:

  1. 包装在Spring Integration Message中的TicketOrder将通过“ inGateway”网关进入消息传递系统。
  2. 网关会将消息放入“ requestChannel”通道。
  3. 通道将消息发送到其订阅的端点JMS出站网关。
  4. JMS出站网关提取消息的有效负载,并将其包装为JMS消息。
  5. 网关发送消息并等待答复。
  6. 当答复到来时,网关以包装在JMS消息中的TicketConfirmation形式,将获得有效负载并将其包装到Spring Integration消息中。
  7. 该消息将发送到“ jmsReplyChannel”通道,服务激活器(TicketProcessor)将在该通道中处理该消息并将其注册到我们的OrderRepository。

订单处理器非常简单。 它收到TicketConfirmation并将其添加到票证存储库:

@Component("ticketProcessor")
public class TicketProcessor {@Autowiredprivate OrderRepository repository;public void registerOrderConfirmation(TicketConfirmation confirmation) {repository.confirmOrder(confirmation);}
}

考试

@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsOutGatewayConfig {@Autowiredprivate OrderRepository repository;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("321", conf.getId());}
}

外部系统

为了完全理解该示例,我将向您展示将消息传递到JMS队列时发生的情况。

侦听Spring Integration发送消息的队列,有一个侦听器asyncConsumer

<bean id="toAsyncJmsQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="to.async.jms.queue"/>
</bean><!-- Listeners -->
<jms:listener-container connection-factory="connectionFactory"><jms:listener destination="to.async.jms.queue" ref="asyncConsumer"/>
</jms:listener-container>

侦听器接收到该消息,并使用票证确认创建新消息并进行回复。 注意,我们必须将回复消息的相关性ID设置为与请求消息相同的值。 这将使客户知道我们正在响应哪个消息。 另外,我们将目的地设置为请求消息中配置的回复通道。

@Component("asyncConsumer")
public class AsyncConsumer implements MessageListener {@Autowiredprivate JmsTemplate template;@Overridepublic void onMessage(Message order) {final Message msgOrder = order;TicketOrder orderObject;try {orderObject = (TicketOrder) ((ObjectMessage) order).getObject();} catch (JMSException e) {throw JmsUtils.convertJmsAccessException(e);}float amount = 5.95f * orderObject.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("321", orderObject.getFilmId(), orderObject.getOrderDate(), orderObject.getQuantity(), amount);try {template.convertAndSend(msgOrder.getJMSReplyTo(), confirmation, new MessagePostProcessor() {public Message postProcessMessage(Message message) throws JMSException {message.setJMSCorrelationID(msgOrder.getJMSCorrelationID());return message;}});} catch (JmsException | JMSException e) {throw JmsUtils.convertJmsAccessException((JMSException) e);}}
}

6.消息转换

消息通道适配器和网关都使用消息转换器将传入消息转换为Java类型,或者采用相反的方式。 转换器必须实现MessageConverter接口:

public interface MessageConverter {<P> Message<P> toMessage(Object object);<P> Object fromMessage(Message<P> message);}

Spring Integration带有MessageConverter接口的两种实现:

MapMessageConverter

它的fromMessage方法使用两个键创建一个新的HashMap:

  • 有效负载:该值是消息的有效负载( message.getPayload )。
  • 标头:值是另一个HashMap,其中包含来自原始消息的所有标头。

“ toMessage”方法期望一个具有相同结构(有效负载和标头键)的Map实例,并构造一个Spring Integration消息。

SimpleMessageConverter

这是适配器和网关使用的默认转换器。 您可以从源代码中看到它与对象之间的转换:

public Message<?> toMessage(Object object) throws Exception {if (object == null) {return null;}if (object instanceof Message<?>) {return (Message<?>) object;}return MessageBuilder.withPayload(object).build();
}public Object fromMessage(Message<?> message) throws Exception {return (message != null) ? message.getPayload() : null;
}

无论如何,如果需要自己的实现,则可以在通道适配器或网关配置中指定自定义转换器。 例如,使用网关:

<int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" request-channel="requestChannel" reply-channel="jmsReplyChannel" message-converter="myConverter"/>

只要记住您的转换器应该实现MessageConverter:

@Component("myConverter")
public class MyConverter implements MessageConverter {

7. JMS支持的消息通道

通道适配器和网关用于与外部系统进行通信。 JMS支持的消息通道用于在同一应用程序内的使用者和生产者之间发送和接收JMS消息。 尽管在这种情况下我们仍然可以使用通道适配器,但是使用JMS通道要简单得多。 与集成消息通道的区别在于,JMS通道将使用JMS代理发送消息。 这意味着消息将不仅仅存储在内存通道中。 相反,它将被发送到JMS提供者,从而也可以使用事务。 如果使用事务,它将按以下方式工作:

  • 如果回滚事务,则将消息发送到JMS支持的通道的生产者将不会编写该消息。
  • 如果事务回滚,订阅JMS支持的通道的使用者将不会从中删除消息。

对于此功能,Spring Integration提供了两个渠道:点对点和发布/订阅渠道。 它们配置如下:

点对点直接渠道

<int-jms:channel id="jmsChannel" queue="myQueue"/>

发布/订阅频道

<int-jms:publish-subscribe-channel id="jmsChannel" topic="myTopic"/>

在以下示例中,我们可以看到一个简单的应用程序,其中有两个端点使用JMS支持的通道相互通信。

组态

发送到消息传递系统( TicketOrder对象)的消息到达服务激活器(票证处理器)。 然后,该处理器将订单( sendJMS )发送到JMS支持的消息。 订阅此通道,有一个相同的处理器将接收消息( receiveJms ),对其进行处理以创建TicketConfirmation并将其注册到票证存储库:

<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/><int:channel id="requestChannel"/><int:service-activator method="sendJms" input-channel="requestChannel" output-channel="jmsChannel" ref="ticketJmsProcessor"/><int-jms:channel id="jmsChannel" queue="syncTestQueue"/><int:service-activator method="receiveJms" input-channel="jmsChannel" ref="ticketJmsProcessor"/>

处理器

实现两种方法: sendJmsreceiveJms

@Component("ticketJmsProcessor")
public class TicketJmsProcessor {private static final Logger logger = LoggerFactory.getLogger(TicketJmsProcessor.class);@Autowiredprivate OrderRepository repository;public TicketOrder sendJms(TicketOrder order) {logger.info("Sending order {}", order.getFilmId());return order;}public void receiveJms(TicketOrder order) {logger.info("Processing order {}", order.getFilmId());float amount = 5.95f * order.getQuantity();TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount);repository.confirmOrder(confirmation);}
}

考试

@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml","/xpadro/spring/integration/test/int-jms-jms-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestIntegrationJmsToJmsConfig {@Autowiredprivate OrderRepository repository;@Autowiredprivate TicketService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {TicketOrder order = new TicketOrder(1, 5, new Date());service.sendOrder(order);Thread.sleep(4000);assertEquals(1, repository.getConfirmations().size());assertNotNull(repository.getConfirmations().get(0));TicketConfirmation conf = repository.getConfirmations().get(0);assertEquals("123", conf.getId());}
}

JMS支持的通道提供了不同的可能性,例如配置队列名称而不是队列引用或使用目标解析器:

<int-jms:channel id="jmsChannel" queue-name="myQueue"destination-resolver="myDestinationResolver"/>

8.动态目标解析

目标解析器是一个类,它允许我们将目标名称解析为JMS目标。 任何目标解析器都必须实现以下接口:

public interface DestinationResolver {Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)throws JMSException;
}

可以在JMS通道适配器,JMS网关和JMS支持的通道上指定目标解析器。 如果您未明确配置目标解析器,Spring将使用默认实现,即DynamicDestinationResolver 。 下面将解释该解析器作为Spring提供的其他实现:

  • DynamicDestinationResolver :通过使用标准JMS Session.createTopic和Session.createQueue方法将目标名称解析为动态目标。
  • BeanFactoryDe​​stinationResolver :它将在Spring上下文中查找具有类似提供的目标名称的名称的bean,并期望其类型为javax.jms.Destination 。 如果找不到它,它将抛出DestinationResolutionException 。
  • JndiDestinationResolver :它将假定目标名称是JNDI位置。

如果我们不想使用默认的动态解析器​​,则可以实现自定义解析器,并在所需的端点中对其进行配置。 例如,以下JMS支持的通道使用不同的实现:

<int-jms:channel id="jmsChannel" queue-name="myQueue"destination-resolver="myDestinationResolver"/>

9. AMQP集成

安装

要安装和启动RabbitMQ服务器,您只需要按照以下步骤操作即可。 如果您已经安装了服务器,则跳过此部分。

  1. 第一步是安装RabbitMQ服务器所需的erlang。 转到以下URL,下载系统版本并安装它:
  • http://www.erlang.org/download.html
  • 下一步是下载并安装RabbitMQ。 如果要使用与本教程相同的版本,请下载版本3.2.4。
    • http://www.rabbitmq.com/
  • 现在,打开命令提示符。 如果您是Windows用户,则可以通过单击开始菜单并在RabbitMQ文件夹中选择RabbitMQ命令提示符直接进入。
  • 激活管理插件
  • > rabbitmq-plugins enable rabbitmq_management
  • 启动服务器
  • > rabbitmq-server.bat

    好的,现在我们将测试RabbitMQ是否已正确安装。 转到http:// localhost:15672并使用“ guest”作为用户名和密码登录。 如果使用的是3.0之前的版本,则端口为55672。

    如果您看到网络用户界面,则一切就绪。

    演示应用

    为了将AMQP与Spring Integration结合使用,我们需要在pom.xml文件中添加以下依赖项:

    SpringAMQP(适用于RabbitMQ)

    <dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.3.1.RELEASE</version>
    </dependency>

    Spring Integration AMQP端点

    <dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-amqp</artifactId><version>3.0.2.RELEASE</version>
    </dependency>

    现在,我们将创建一个新的配置文件amqp-config.xml,其中将包含RabbitMQ配置(例如我们在本教程先前使用的JMS的jms-config)。

    <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><rabbit:connection-factory id="connectionFactory" /><rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /><rabbit:admin connection-factory="connectionFactory" /><rabbit:queue name="rabbit.queue" /><rabbit:direct-exchange name="rabbit.exchange"><rabbit:bindings><rabbit:binding queue="rabbit.queue" key="rabbit.key.binding" /></rabbit:bindings></rabbit:direct-exchange>
    </beans>

    下一个文件是Spring Integration文件,其中包含通道和通道适配器:

    <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:int="http://www.springframework.org/schema/integration"xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsdhttp://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd"><context:component-scan base-package="xpadro.spring.integration.amqp"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.amqp.service.AMQPService"/><int:channel id="requestChannel"/><int-amqp:outbound-channel-adapterchannel="requestChannel" amqp-template="amqpTemplate" exchange-name="rabbit.exchange"routing-key="rabbit.key.binding"/><int-amqp:inbound-channel-adapter channel="responseChannel"queue-names="rabbit.queue" connection-factory="connectionFactory" /><int:channel id="responseChannel"/><int:service-activator ref="amqpProcessor" method="process" input-channel="responseChannel"/></beans>

    流程如下:

    1. 测试应用程序向网关发送一条消息,该消息将是一个简单的String。
    2. 从网关,它将通过“ requestChannel”通道到达出站通道适配器。
    3. 出站通道适配器将消息发送到“ rabbit.queue”队列。
    4. 订阅此“ rabbit.queue”队列,我们​​已经配置了入站通道适配器。 它将接收发送到队列的消息。
    5. 该消息通过“ responseChannel”通道发送到服务激活器。
    6. 服务激活器仅打印消息。

    充当消息传递系统入口点的网关包含一个方法:

    public interface AMQPService {@Gatewaypublic void sendMessage(String message);
    }

    服务激活器amqpProcessor非常简单。 它收到一条消息并打印其有效负载:

    @Component("amqpProcessor")
    public class AmqpProcessor {public void process(Message<String> msg) {System.out.println("Message received: "+msg.getPayload());}
    }

    为了完成该示例,以下是通过调用网关包装的服务来启动流的应用程序:

    @ContextConfiguration(locations = {"/xpadro/spring/integration/test/amqp-config.xml","/xpadro/spring/integration/test/int-amqp-config.xml"})
    @RunWith(SpringJUnit4ClassRunner.class)
    public class TestIntegrationAMQPConfig {@Autowiredprivate AMQPService service;@Testpublic void testSendToJms() throws InterruptedException, RemoteException {String msg = "hello";service.sendMessage(msg);Thread.sleep(2000);}
    }

    翻译自: https://www.javacodegeeks.com/2015/09/enterprise-messaging.html

    c#发送讯息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/336906.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql触发器中case语句_一个很好的触发器例子(case when)

CREATE OR REPLACE TRIGGER TR_CGD1BEFORE UPDATE OR INSERT OR DELETE ON BB_MJ_CGD1_TBFOR EACH ROWDECLAREV_COUNT NUMBER;BEGINCASEWHEN UPDATING OR INSERTING THENIF :NEW.DJZT 结束 THEN:NEW.DJZT : 结束;ELSESELECT COUNT(*)INTO V_COUNTFROM BB_MJ_KCRKD2_TB RKD2WHE…

java cr_WildFly 10 CR 2发布– Java EE 7,Java 8,Hibernate 5,JavaScript支持热重载

java cr昨天&#xff0c;WildFly团队发布了最新版本的WildFly 10 。 CR2很可能是预计于十月份发布最终版本之前的最后一个版本。 即使主要支持的Java EE规范是7&#xff0c;WildFly 8和WildFly 9也提供了许多新功能&#xff0c;而WildFly 9和WildFly 9现在制作了三个服务器版本…

python位置参数ppt_如何在Python中使用一个或多个相同的位置参数?

介绍..如果我们正在编写一个对两个数字执行算术运算的程序&#xff0c;则可以将它们定义为两个位置参数。但是由于它们是相同种类的/ python数据类型的参数&#xff0c;因此使用nargs选项告诉argparse您确实需要两种相同的类型可能更有意义。怎么做..1.让我们编写一个程序来减去…

java设计模式教程_Java设计模式教程

java设计模式教程课程大纲 架构和计算机科学中的设计模式是记录特定专业领域中设计问题的解决方案的正式方法。 这个想法是由建筑师Christopher Alexander在建筑领域引入的&#xff0c;并已被修改为包括计算机科学在内的其他各个学科。 设计模式是针对软件设计中给定上下文中常…

python剑指offer面试题_剑指Offer(Python语言)面试题38

面试题38&#xff1a;字符串的排列题目:输入一个字符串&#xff0c;打印出该字符串中字符的所有排列。例如&#xff0c;输入字符串abc,则打印出由字符a,b,c所能排列出来的所有字符串abc,acb&#xff0c;bac,bca和cba。# -*- coding:utf-8 -*-class Solution:def Permutation(se…

侬娜·杰尔_杰尔·地狱

侬娜杰尔什么是JAR地狱&#xff1f; &#xff08;或者是classpath地狱&#xff1f;还是依赖地狱&#xff1f;&#xff09;在考虑使用Maven或OSGi等现代开发工具时&#xff0c;哪些方面仍然有意义&#xff1f; 有趣的是&#xff0c;似乎没有对这些问题的结构化答案&#xff08;…

cmake 安装mysql5.6_CMAKE安装MYSQL 5.6.10

mysql5.6.10不支持configure安装了&#xff0c;提供了CMAKE安装方式 #sudo groupadd mysql #sudo useradd mysql -g mysql #sudo mkdir -p /home/mysql/data #sudo mkdir /usr/local/mysql #sudo mkdir /var/log/mysql #sudo chown -R mysql:mysql /home/mysql/data #sudo cho…

java 多线程变量可见性_Java多线程:易变变量,事前关联和内存一致性

java 多线程变量可见性什么是volatile变量&#xff1f; volatile是Java中的关键字。 您不能将其用作变量或方法名称。 期。 我们什么时候应该使用它&#xff1f; 哈哈&#xff0c;对不起&#xff0c;没办法。 当我们在多线程环境中与多个线程共享变量时&#xff0c;通常使用v…

Mysql运行在内核空间_思考mysql内核之初级系列6—innodb文件管理 | 学步园

在上一篇里面&#xff0c;bingxi和alex思考了information_schema&#xff0c;这个一直在innodb外围打转。没有进入到innodb的内部。在后续的文章中&#xff0c;以innodb的为主&#xff0c;逐个思考。Bingxi和alex今天了解了fil文件管理。对应的文件为&#xff1a;D:/mysql-5.1.…

pcl_openmap_OpenMap教程第2部分–使用MapHandler构建基本地图应用程序–第1部分

pcl_openmap1.简介 在第一个教程中&#xff0c;我们创建了一个基本的OpenMap GIS应用程序&#xff0c;该应用程序在JFrame中显示一个从文件系统加载的具有一个形状图层的地图。 该教程基于com.bbn.openmap.app.example.SimpleMap 。 在该教程中&#xff0c;我们使用了以下OpenM…

mysql7.5安装教程_CentOS7.5下yum安装MySQL8图文教程

卸载MariaDB1.列出所有安装的MariaDB rpm 包rpm -qa | grep mariadb2.强制卸载rpm -e --nodeps mariadb-libs-5.5.60-1.el7_5.x86_64安装MySQL1. 环境CentOS7.52. 获取MySQL最新版 rpm包yum仓库下载MySQLyum localinstall https://repo.mysql.com//mysql80-community-release-e…

字符串url获取参数_如何从URL查询字符串获取示例参数或将其附加到URL查询字符串(示例)?...

字符串url获取参数让我们剖析几个简单的用例&#xff0c;并查看视图参数的工作原理&#xff08;视图参数名称不是强制性的&#xff0c;以匹配通过URL查询字符串传递的请求参数&#xff0c;但在本文中&#xff0c;我们将重点讨论这种情况&#xff09;&#xff1a; 情况1 在inde…

mysql和sqlserver分页的区别_关于SQLServer和MySQL 查询分页语句区别

首先来定义几个要用到的参数(例子)t_user数据表int currentPage ; //当前页int pageRecord ; //每页显示记录数关于SqlServer数据库分页SQL语句为:String sql "select top "pageRecord " * from t_user where id not in (select top "(currentPage-1)*pag…

java 微型数据库_Java 9代码工具:使用Java微型基准测试工具的实践会话

java 微型数据库用肉眼看&#xff0c;基准测试似乎只是确定执行某些代码需要花费多长时间的简单问题。 但是&#xff0c;通常情况下&#xff0c;这是幼稚的方法。 提供具有准确和可重复结果的有意义的基准并非易事。 在本文中&#xff0c;我们将向您介绍OpenJDK代码工具项目&a…

mysql快速随机_MySQL随机取数据最高效的方法

mysql随机取数据最高效率的方法发现在SQL语句里有一个 ORDER BY rand() 这样的一个语句&#xff0c;这个说是用着方便&#xff0c;但是效率实在是太低了&#xff0c;于是我用了以下的方法来优化&#xff0c;就是用JOIN表的方法来达到这个取随机数据行的方法&#xff0c;你可以用…

部署被测软件应用和中间件_使用FlexDeploy对融合中间件应用程序进行自动化软件测试...

部署被测软件应用和中间件自动化软件测试是任何软件组织都必须执行的强制性活动之一&#xff0c;以保证其产品质量。 但是&#xff0c;此过程通常变得相当复杂&#xff0c;尤其是涉及由多个不同部分组成的现代复杂系统的自动化测试时。 所有这些部分都基于不同的技术&#xff0…

python batch_size_python 實現動態 batch size,多張圖片如何堆疊轉成指針

前陣子有發問&#xff0c;關於 python 動態 batch size 如何實現&#xff0c;目前解決之前問題現在遇到的問題是當我把兩張圖片直接用 numpy concat 堆疊在一起 進行 acl.util.numpy_to_ptr 轉換成指針進行推理後&#xff0c;得到的結果只有第一張圖片是對的&#xff0c;第二張…

投行数据_投行对Java的二十大核心访谈问答

投行数据这是在金融领域&#xff08;主要是在大型投资银行&#xff09;共享Java核心访谈问题和答案的新系列。 在JP Morgan&#xff0c;Morgan Stanley&#xff0c;Barclays或Goldman Sachs上会问许多这些Java面试问题。 银行主要从多线程 &#xff0c; 集合 &#xff0c;序列化…

php中mysql_fetch_row_php中的mysql_fetch_row,mysql_fetch_array,mysql_fetch_object

1.mysql_fetch_rowmysql_fetch_row&#xff0c;这个函数是从结果集中取一行作为枚举数据&#xff0c;从和指定的结果标识关联的结果集中取得一行数据并作为数组返回。每个结果的列储存在一个数组的单元中&#xff0c;偏移量从 0 开始。 注意&#xff0c;这里是从0开始偏移&…

primefaces_通过OmniFaces缓存组件以编程方式缓存PrimeFaces图表

primefaces在这篇文章中&#xff0c;您将看到如何结合PrimeFaces和OmniFaces获得可缓存的图表。 为了使事情变得简单&#xff0c;我们将使用PrimeFaces 折线图。 对于这种图表&#xff0c;我们可以在页面中使用<p&#xff1a;chart />标签和一个简单的托管bean。 因此&am…