Spring整合JMS——基于ActiveMQ实现(一)

Spring整合JMS——基于ActiveMQ实现(一)

1.1     JMS简介

       JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

1.2     Spring整合JMS

       对JMS做了一个简要介绍之后,接下来就讲一下Spring整合JMS的具体过程。JMS只是一个标准,真正在使用它的时候我们需要有它的具体实现,这里我们就使用Apache的activeMQ来作为它的实现。所使用的依赖利用Maven来进行管理,具体依赖如下:

 

Xml代码 复制代码 收藏代码
  1. <dependencies>  
  2.         <dependency>  
  3.             <groupId>junit</groupId>  
  4.             <artifactId>junit</artifactId>  
  5.             <version>4.10</version>  
  6.             <scope>test</scope>  
  7.         </dependency>  
  8.         <dependency>  
  9.             <groupId>org.springframework</groupId>  
  10.             <artifactId>spring-context</artifactId>  
  11.             <version>${spring-version}</version>  
  12.         </dependency>  
  13.         <dependency>  
  14.             <groupId>org.springframework</groupId>  
  15.             <artifactId>spring-jms</artifactId>  
  16.             <version>${spring-version}</version>  
  17.         </dependency>  
  18.         <dependency>  
  19.             <groupId>org.springframework</groupId>  
  20.             <artifactId>spring-test</artifactId>  
  21.             <version>${spring-version}</version>  
  22.         </dependency>  
  23.         <dependency>  
  24.             <groupId>javax.annotation</groupId>  
  25.             <artifactId>jsr250-api</artifactId>  
  26.             <version>1.0</version>  
  27.         </dependency>  
  28.         <dependency>  
  29.             <groupId>org.apache.activemq</groupId>  
  30.             <artifactId>activemq-core</artifactId>  
  31.             <version>5.7.0</version>  
  32.         </dependency>  
  33. </dependencies>  

<dependencies>        <dependency>            <groupId>junit</groupId>            <artifactId>junit</artifactId>            <version>4.10</version>            <scope>test</scope>        </dependency>        <dependency>            <groupId>org.springframework</groupId>            <artifactId>spring-context</artifactId>            <version>${spring-version}</version>        </dependency>        <dependency>            <groupId>org.springframework</groupId>            <artifactId>spring-jms</artifactId>            <version>${spring-version}</version>        </dependency>        <dependency>            <groupId>org.springframework</groupId>            <artifactId>spring-test</artifactId>            <version>${spring-version}</version>        </dependency>        <dependency>            <groupId>javax.annotation</groupId>            <artifactId>jsr250-api</artifactId>            <version>1.0</version>        </dependency>        <dependency>            <groupId>org.apache.activemq</groupId>            <artifactId>activemq-core</artifactId>            <version>5.7.0</version>        </dependency></dependencies>

 

1.2.1  activeMQ准备

       既然是使用的apache的activeMQ作为JMS的实现,那么首先我们应该到apache官网上下载activeMQ(http://activemq.apache.org/download.html),进行解压后运行其bin目录下面的activemq.bat文件启动activeMQ。

1.2.2配置ConnectionFactory

       ConnectionFactory是用于产生到JMS服务器的链接的,Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。CachingConnectionFactory继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。这里我们使用SingleConnectionFactory来作为示例。

Xml代码 复制代码 收藏代码
  1. <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>  

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>

 

       这样就定义好产生JMS服务器链接的ConnectionFactory了吗?答案是非也。Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正产生到JMS服务器链接的ConnectionFactory还得是由JMS服务厂商提供,并且需要把它注入到Spring提供的ConnectionFactory中。我们这里使用的是ActiveMQ实现的JMS,所以在我们这里真正的可以产生Connection的就应该是由ActiveMQ提供的ConnectionFactory。所以定义一个ConnectionFactory的完整代码应该如下所示:

Xml代码 复制代码 收藏代码
  1. <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
  2. <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
  3.     <property name="brokerURL" value="tcp://localhost:61616"/>  
  4. </bean>  
  5.   
  6. <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
  7. <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
  8.     <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
  9.     <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
  10. </bean>  

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">        <property name="brokerURL" value="tcp://localhost:61616"/>    </bean>        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>    </bean>

  

1.2.3配置生产者

配置好ConnectionFactory之后我们就需要配置生产者。生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现的,所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发,为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象。

Xml代码 复制代码 收藏代码
  1. <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
  2. <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  3.     <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
  4.     <property name="connectionFactory" ref="connectionFactory"/>  
  5. </bean>  

    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->        <property name="connectionFactory" ref="connectionFactory"/>    </bean>

 

       在真正利用JmsTemplate进行消息发送的时候,我们需要知道消息发送的目的地,即destination。在Jms中有一个用来表示目的地的Destination接口,它里面没有任何方法定义,只是用来做一个标识而已。当我们在使用JmsTemplate进行消息发送时没有指定destination的时候将使用默认的Destination。默认Destination可以通过在定义jmsTemplate bean对象时通过属性defaultDestination或defaultDestinationName来进行注入,defaultDestinationName对应的就是一个普通字符串。在ActiveMQ中实现了两种类型的Destination,一个是点对点的ActiveMQQueue,另一个就是支持订阅/发布模式的ActiveMQTopic。在定义这两种类型的Destination时我们都可以通过一个name属性来进行构造,如:

 

 

 

 

Xml代码 复制代码 收藏代码
  1. <!--这个是队列目的地,点对点的-->  
  2. <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
  3.     <constructor-arg>  
  4.         <value>queue</value>  
  5.     </constructor-arg>  
  6. </bean>  
  7. <!--这个是主题目的地,一对多的-->  
  8. <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">  
  9.     <constructor-arg value="topic"/>  
  10. </bean>  

    <!--这个是队列目的地,点对点的-->    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg>            <value>queue</value>        </constructor-arg>    </bean>    <!--这个是主题目的地,一对多的-->    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">        <constructor-arg value="topic"/>    </bean>

 

 

 

       假设我们定义了一个ProducerService,里面有一个向Destination发送纯文本消息的方法sendMessage,那么我们的代码就大概是这个样子:

 

 

 

 

Java代码 复制代码 收藏代码
  1. package com.tiantian.springintejms.service.impl;   
  2.     
  3. import javax.annotation.Resource;   
  4. import javax.jms.Destination;   
  5. import javax.jms.JMSException;   
  6. import javax.jms.Message;   
  7. import javax.jms.Session;   
  8.     
  9. import org.springframework.jms.core.JmsTemplate;   
  10. import org.springframework.jms.core.MessageCreator;   
  11. import org.springframework.stereotype.Component;   
  12.     
  13. import com.tiantian.springintejms.service.ProducerService;   
  14.     
  15. @Component  
  16. public class ProducerServiceImpl implements ProducerService {   
  17.     
  18.     private JmsTemplate jmsTemplate;   
  19.        
  20.     public void sendMessage(Destination destination, final String message) {   
  21.         System.out.println("---------------生产者发送消息-----------------");   
  22.         System.out.println("---------------生产者发了一个消息:" + message);   
  23.         jmsTemplate.send(destination, new MessageCreator() {   
  24.             public Message createMessage(Session session) throws JMSException {   
  25.                 return session.createTextMessage(message);   
  26.             }   
  27.         });   
  28.     }    
  29.   
  30.     public JmsTemplate getJmsTemplate() {   
  31.         returnjmsTemplate;   
  32.     }    
  33.   
  34.     @Resource  
  35.     public void setJmsTemplate(JmsTemplate jmsTemplate) {   
  36.         this.jmsTemplate = jmsTemplate;   
  37.     }   
  38.     
  39. }  

package com.tiantian.springintejms.service.impl; import javax.annotation.Resource;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session; import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Component; import com.tiantian.springintejms.service.ProducerService; @Componentpublic class ProducerServiceImpl implements ProducerService {     private JmsTemplate jmsTemplate;        public void sendMessage(Destination destination, final String message) {        System.out.println("---------------生产者发送消息-----------------");        System.out.println("---------------生产者发了一个消息:" + message);        jmsTemplate.send(destination, new MessageCreator() {            public Message createMessage(Session session) throws JMSException {                return session.createTextMessage(message);            }        });    }     public JmsTemplate getJmsTemplate() {        returnjmsTemplate;    }     @Resource    public void setJmsTemplate(JmsTemplate jmsTemplate) {        this.jmsTemplate = jmsTemplate;    } }

 

 

 

       我们可以看到在sendMessage方法体里面我们是通过jmsTemplate来发送消息到对应的Destination的。到此,我们生成一个简单的文本消息并把它发送到指定目的地Destination的生产者就配置好了。

1.2.4配置消费者

生产者往指定目的地Destination发送消息后,接下来就是消费者对指定目的地的消息进行消费了。那么消费者是如何知道有生产者发送消息到指定目的地Destination了呢?这是通过Spring为我们封装的消息监听容器MessageListenerContainer实现的,它负责接收信息,并把接收到的信息分发给真正的MessageListener进行处理。每个消费者对应每个目的地都需要有对应的MessageListenerContainer。对于消息监听容器而言,除了要知道监听哪个目的地之外,还需要知道到哪里去监听,也就是说它还需要知道去监听哪个JMS服务器,这是通过在配置MessageConnectionFactory的时候往里面注入一个ConnectionFactory来实现的。所以我们在配置一个MessageListenerContainer的时候有三个属性必须指定,一个是表示从哪里监听的ConnectionFactory;一个是表示监听什么的Destination;一个是接收到消息以后进行消息处理的MessageListener。Spring一共为我们提供了两种类型的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。

SimpleMessageListenerContainer会在一开始的时候就创建一个会话session和消费者Consumer,并且会使用标准的JMS MessageConsumer.setMessageListener()方法注册监听器让JMS提供者调用监听器的回调函数。它不会动态的适应运行时需要和参与外部的事务管理。兼容性方面,它非常接近于独立的JMS规范,但一般不兼容Java EE的JMS限制。

大多数情况下我们还是使用的DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer会动态的适应运行时需要,并且能够参与外部的事务管理。它很好的平衡了对JMS提供者要求低、先进功能如事务参与和兼容Java EE环境。

定义处理消息的MessageListener

       要定义处理消息的MessageListener我们只需要实现JMS规范中的MessageListener接口就可以了。MessageListener接口中只有一个方法onMessage方法,当接收到消息的时候会自动调用该方法。

 

 

 

 

Java代码 复制代码 收藏代码
  1. package com.tiantian.springintejms.listener;   
  2.     
  3. import javax.jms.JMSException;   
  4. import javax.jms.Message;   
  5. import javax.jms.MessageListener;   
  6. import javax.jms.TextMessage;   
  7.     
  8. public class ConsumerMessageListener implements MessageListener {   
  9.     
  10.     public void onMessage(Message message) {   
  11.         //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage  
  12.         TextMessage textMsg = (TextMessage) message;   
  13.         System.out.println("接收到一个纯文本消息。");   
  14.         try {   
  15.             System.out.println("消息内容是:" + textMsg.getText());   
  16.         } catch (JMSException e) {   
  17.             e.printStackTrace();   
  18.         }   
  19.     }   
  20.     
  21. }  

package com.tiantian.springintejms.listener; import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage; public class ConsumerMessageListener implements MessageListener {     public void onMessage(Message message) {        //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage        TextMessage textMsg = (TextMessage) message;        System.out.println("接收到一个纯文本消息。");        try {            System.out.println("消息内容是:" + textMsg.getText());        } catch (JMSException e) {            e.printStackTrace();        }    } }

 

  

 

       有了MessageListener之后我们就可以在Spring的配置文件中配置一个消息监听容器了。

Xml代码 复制代码 收藏代码
  1. <!--这个是队列目的地-->  
  2. <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
  3.     <constructor-arg>  
  4.         <value>queue</value>  
  5.     </constructor-arg>  
  6. </bean>  
  7. <!-- 消息监听器 -->  
  8. <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>       
  9.   
  10. <!-- 消息监听容器 -->  
  11. <bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  12.     <property name="connectionFactory" ref="connectionFactory" />  
  13.     <property name="destination" ref="queueDestination" />  
  14.     <property name="messageListener" ref="consumerMessageListener" />  
  15. </bean>  

    <!--这个是队列目的地-->    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg>            <value>queue</value>        </constructor-arg>    </bean>    <!-- 消息监听器 -->    <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>        <!-- 消息监听容器 -->    <bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory" />        <property name="destination" ref="queueDestination" />        <property name="messageListener" ref="consumerMessageListener" />    </bean>

 

 

       我们可以看到我们定义了一个名叫queue的ActiveMQQueue目的地,我们的监听器就是监听了发送到这个目的地的消息。

       至此我们的生成者和消费者都配置完成了,这也就意味着我们的整合已经完成了。这个时候完整的Spring的配置文件应该是这样的:

Xml代码 复制代码 收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
  4.     xmlns:jms="http://www.springframework.org/schema/jms"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans   
  6.      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  7.      http://www.springframework.org/schema/context   
  8.      http://www.springframework.org/schema/context/spring-context-3.0.xsd   
  9.     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  10.     http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">  
  11.     
  12.     <context:component-scan base-package="com.tiantian" />  
  13.     
  14.     <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
  15.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  16.         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
  17.         <property name="connectionFactory" ref="connectionFactory"/>  
  18.     </bean>  
  19.        
  20.     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
  21.     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
  22.         <property name="brokerURL" value="tcp://localhost:61616"/>  
  23.     </bean>  
  24.        
  25.     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
  26.     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
  27.         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
  28.         <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
  29.     </bean>  
  30.        
  31.     <!--这个是队列目的地-->  
  32.     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
  33.         <constructor-arg>  
  34.             <value>queue</value>  
  35.         </constructor-arg>  
  36.     </bean>  
  37.     <!-- 消息监听器 -->  
  38.     <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>  
  39.     <!-- 消息监听容器 -->  
  40.     <bean id="jmsContainer"  
  41.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  42.         <property name="connectionFactory" ref="connectionFactory" />  
  43.         <property name="destination" ref="queueDestination" />  
  44.         <property name="messageListener" ref="consumerMessageListener" />  
  45.     </bean>  
  46. </beans>  

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"    xmlns:jms="http://www.springframework.org/schema/jms"    xsi:schemaLocation="http://www.springframework.org/schema/beans     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd     http://www.springframework.org/schema/context     http://www.springframework.org/schema/context/spring-context-3.0.xsd    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">     <context:component-scan base-package="com.tiantian" />     <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->        <property name="connectionFactory" ref="connectionFactory"/>    </bean>        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">        <property name="brokerURL" value="tcp://localhost:61616"/>    </bean>        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>    </bean>        <!--这个是队列目的地-->    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg>            <value>queue</value>        </constructor-arg>    </bean>    <!-- 消息监听器 -->    <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>    <!-- 消息监听容器 -->    <bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory" />        <property name="destination" ref="queueDestination" />        <property name="messageListener" ref="consumerMessageListener" />    </bean></beans>

 

 

       接着我们来测试一下,看看我们的整合是否真的成功了,测试代码如下:

 

 

 

 

Java代码 复制代码 收藏代码
  1. package com.tiantian.springintejms.test;   
  2.     
  3. import javax.jms.Destination;   
  4.     
  5. import org.junit.Test;   
  6. import org.junit.runner.RunWith;   
  7. import org.springframework.beans.factory.annotation.Autowired;   
  8. import org.springframework.beans.factory.annotation.Qualifier;   
  9. import org.springframework.test.context.ContextConfiguration;   
  10. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;   
  11. import com.tiantian.springintejms.service.ProducerService;   
  12.     
  13. @RunWith(SpringJUnit4ClassRunner.class)   
  14. @ContextConfiguration("/applicationContext.xml")   
  15. public class ProducerConsumerTest {   
  16.     
  17.     @Autowired  
  18.     private ProducerService producerService;   
  19.     @Autowired  
  20.     @Qualifier("queueDestination")   
  21.     private Destination destination;   
  22.        
  23.     @Test  
  24.     public void testSend() {   
  25.         for (int i=0; i<2; i++) {   
  26.             producerService.sendMessage(destination, "你好,生产者!这是消息:" + (i+1));   
  27.         }   
  28.     }   
  29.        
  30. }  

package com.tiantian.springintejms.test; import javax.jms.Destination; import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import com.tiantian.springintejms.service.ProducerService; @RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/applicationContext.xml")public class ProducerConsumerTest {     @Autowired    private ProducerService producerService;    @Autowired    @Qualifier("queueDestination")    private Destination destination;        @Test    public void testSend() {        for (int i=0; i<2; i++) {            producerService.sendMessage(destination, "你好,生产者!这是消息:" + (i+1));        }    }    }

 

 

 

       在上面的测试代码中我们利用生产者发送了两个消息,正常来说,消费者应该可以接收到这两个消息。运行测试代码后控制台输出如下:

 

 

       看,控制台已经进行了正确的输出,这说明我们的整合确实是已经成功了。

 

Spring整合JMS(二)——消息监听器

 

3.1     消息监听器MessageListener

       在Spring整合JMS的应用中我们在定义消息监听器的时候一共可以定义三种类型的消息监听器,分别是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。下面就分别来介绍一下这几种类型的区别。

3.1.1  MessageListener

MessageListener是最原始的消息监听器,它是JMS规范中定义的一个接口。其中定义了一个用于处理接收到的消息的onMessage方法,该方法只接收一个Message参数。我们前面在讲配置消费者的时候用的消息监听器就是MessageListener,代码如下:

 

Java代码 复制代码 收藏代码
  1. import javax.jms.JMSException;   
  2. import javax.jms.Message;   
  3. import javax.jms.MessageListener;   
  4. import javax.jms.TextMessage;   
  5.     
  6. public class ConsumerMessageListener implements MessageListener {   
  7.     
  8.     public void onMessage(Message message) {   
  9.         //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage  
  10.         TextMessage textMsg = (TextMessage) message;   
  11.         System.out.println("接收到一个纯文本消息。");   
  12.         try {   
  13.             System.out.println("消息内容是:" + textMsg.getText());   
  14.         } catch (JMSException e) {   
  15.             e.printStackTrace();   
  16.         }   
  17.     }   
  18.     
  19. }  
import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage; public class ConsumerMessageListener implements MessageListener {     public void onMessage(Message message) {        //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage        TextMessage textMsg = (TextMessage) message;        System.out.println("接收到一个纯文本消息。");        try {            System.out.println("消息内容是:" + textMsg.getText());        } catch (JMSException e) {            e.printStackTrace();        }    } }

  

3.1.2  SessionAwareMessageListener

SessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS MessageListener。MessageListener的设计只是纯粹用来接收消息的,假如我们在使用MessageListener处理接收到的消息时我们需要发送一个消息通知对方我们已经收到这个消息了,那么这个时候我们就需要在代码里面去重新获取一个Connection或Session。SessionAwareMessageListener的设计就是为了方便我们在接收到消息后发送一个回复的消息,它同样为我们提供了一个处理接收到的消息的onMessage方法,但是这个方法可以同时接收两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的Session对象。先来看一段代码:

Java代码 复制代码 收藏代码
  1. package com.tiantian.springintejms.listener;   
  2.     
  3. import javax.jms.Destination;   
  4. import javax.jms.JMSException;   
  5. import javax.jms.Message;   
  6. import javax.jms.MessageProducer;   
  7. import javax.jms.Session;   
  8. import javax.jms.TextMessage;   
  9.     
  10. import org.springframework.jms.listener.SessionAwareMessageListener;   
  11.     
  12. public class ConsumerSessionAwareMessageListener implements  
  13.         SessionAwareMessageListener<TextMessage> {   
  14.     
  15.     private Destination destination;   
  16.        
  17.     public void onMessage(TextMessage message, Session session) throws JMSException {   
  18.         System.out.println("收到一条消息");   
  19.         System.out.println("消息内容是:" + message.getText());   
  20.         MessageProducer producer = session.createProducer(destination);   
  21.         Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。");   
  22.         producer.send(textMessage);   
  23.     }   
  24.     
  25.     public Destination getDestination() {   
  26.         returndestination;   
  27.     }   
  28.     
  29.     public void setDestination(Destination destination) {   
  30.         this.destination = destination;   
  31.     }   
  32.     
  33. }  
package com.tiantian.springintejms.listener; import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage; import org.springframework.jms.listener.SessionAwareMessageListener; public class ConsumerSessionAwareMessageListener implements        SessionAwareMessageListener<TextMessage> {     private Destination destination;        public void onMessage(TextMessage message, Session session) throws JMSException {        System.out.println("收到一条消息");        System.out.println("消息内容是:" + message.getText());        MessageProducer producer = session.createProducer(destination);        Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。");        producer.send(textMessage);    }     public Destination getDestination() {        returndestination;    }     public void setDestination(Destination destination) {        this.destination = destination;    } }

 

       在上面代码中我们定义了一个SessionAwareMessageListener,在这个Listener中我们在接收到了一个消息之后,利用对应的Session创建了一个到destination的生产者和对应的消息,然后利用创建好的生产者发送对应的消息。

       接着我们在Spring的配置文件中配置该消息监听器将处理来自一个叫sessionAwareQueue的目的地的消息,并且往该MessageListener中通过set方法注入其属性destination的值为queueDestination。这样当我们的SessionAwareMessageListener接收到消息之后就会往queueDestination发送一个消息。

Xml代码 复制代码 收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
  4.     xmlns:jms="http://www.springframework.org/schema/jms"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans   
  6.      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  7.      http://www.springframework.org/schema/context   
  8.      http://www.springframework.org/schema/context/spring-context-3.0.xsd   
  9.     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  10.     http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">  
  11.     
  12.     <context:component-scan base-package="com.tiantian" />    
  13.     <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
  14.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  15.         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
  16.         <property name="connectionFactory" ref="connectionFactory"/>  
  17.     </bean>  
  18.        
  19.     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
  20.     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
  21.         <property name="brokerURL" value="tcp://localhost:61616"/>  
  22.     </bean>  
  23.        
  24.     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
  25.     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
  26.         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
  27.         <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
  28.     </bean>  
  29.        
  30.     <!--这个是队列目的地-->  
  31.     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
  32.         <constructor-arg>  
  33.             <value>queue</value>  
  34.         </constructor-arg>  
  35.     </bean>  
  36.     <!--这个是sessionAwareQueue目的地-->  
  37.     <bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">  
  38.         <constructor-arg>  
  39.             <value>sessionAwareQueue</value>  
  40.         </constructor-arg>  
  41.     </bean>  
  42.     <!-- 消息监听器 -->  
  43.     <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>  
  44.     <!-- 可以获取session的MessageListener -->  
  45.     <bean id="consumerSessionAwareMessageListener" class="com.tiantian.springintejms.listener.ConsumerSessionAwareMessageListener">  
  46.         <property name="destination" ref="queueDestination"/>  
  47.     </bean>  
  48.     <!-- 消息监听容器 -->  
  49.     <bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  50.         <property name="connectionFactory" ref="connectionFactory" />  
  51.         <property name="destination" ref="queueDestination" />  
  52.         <property name="messageListener" ref="consumerMessageListener" />  
  53.     </bean>  
  54.        
  55.     <bean id="sessionAwareListenerContainer"  
  56.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  57.         <property name="connectionFactory" ref="connectionFactory" />  
  58.         <property name="destination" ref="sessionAwareQueue" />  
  59.         <property name="messageListener" ref="consumerSessionAwareMessageListener" />  
  60.     </bean>  
  61. </beans>  
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"    xmlns:jms="http://www.springframework.org/schema/jms"    xsi:schemaLocation="http://www.springframework.org/schema/beans     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd     http://www.springframework.org/schema/context     http://www.springframework.org/schema/context/spring-context-3.0.xsd    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">     <context:component-scan base-package="com.tiantian" />     <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->        <property name="connectionFactory" ref="connectionFactory"/>    </bean>        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">        <property name="brokerURL" value="tcp://localhost:61616"/>    </bean>        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>    </bean>        <!--这个是队列目的地-->    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg>            <value>queue</value>        </constructor-arg>    </bean>    <!--这个是sessionAwareQueue目的地-->    <bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg>            <value>sessionAwareQueue</value>        </constructor-arg>    </bean>    <!-- 消息监听器 -->    <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>    <!-- 可以获取session的MessageListener -->    <bean id="consumerSessionAwareMessageListener" class="com.tiantian.springintejms.listener.ConsumerSessionAwareMessageListener">        <property name="destination" ref="queueDestination"/>    </bean>    <!-- 消息监听容器 -->    <bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory" />        <property name="destination" ref="queueDestination" />        <property name="messageListener" ref="consumerMessageListener" />    </bean>        <bean id="sessionAwareListenerContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory" />        <property name="destination" ref="sessionAwareQueue" />        <property name="messageListener" ref="consumerSessionAwareMessageListener" />    </bean></beans>

 

 

       接着我们来做一个测试,测试代码如下:

Java代码 复制代码 收藏代码
  1. @RunWith(SpringJUnit4ClassRunner.class)   
  2. @ContextConfiguration("/applicationContext.xml")   
  3. public class ProducerConsumerTest {   
  4.     
  5.     @Autowired  
  6.     private ProducerService producerService;   
  7.     @Autowired  
  8.     @Qualifier("sessionAwareQueue")   
  9.     private Destination sessionAwareQueue;   
  10.        
  11.     @Test  
  12.     public void testSessionAwareMessageListener() {   
  13.         producerService.sendMessage(sessionAwareQueue, "测试SessionAwareMessageListener");   
  14.     }   
  15.        
  16. }  
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/applicationContext.xml")public class ProducerConsumerTest {     @Autowired    private ProducerService producerService;    @Autowired    @Qualifier("sessionAwareQueue")    private Destination sessionAwareQueue;        @Test    public void testSessionAwareMessageListener() {        producerService.sendMessage(sessionAwareQueue, "测试SessionAwareMessageListener");    }    }

 

       在上述测试代码中,我们通过前面定义好的生产者往我们定义好的SessionAwareMessageListener监听的sessionAwareQueue发送了一个消息。程序运行之后控制台输出如下:



 

 

       这说明我们已经成功的往sessionAwareQueue发送了一条纯文本消息,消息会被ConsumerSessionAwareMessageListener的onMessage方法进行处理,在onMessage方法中ConsumerSessionAwareMessageListener就是简单的把接收到的纯文本信息的内容打印出来了,之后再往queueDestination发送了一个纯文本消息,消息内容是“ConsumerSessionAwareMessageListener…”,该消息随后就被ConsumerMessageListener处理了,根据我们的定义,在ConsumerMessageListener中也只是简单的打印了一下接收到的消息内容。

3.1.3  MessageListenerAdapter

MessageListenerAdapter类实现了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的Java类进行处理。

       MessageListenerAdapter会把接收到的消息做如下转换:

       TextMessage转换为String对象;

       BytesMessage转换为byte数组;

       MapMessage转换为Map对象;

       ObjectMessage转换为对应的Serializable对象。

       既然前面说了MessageListenerAdapter会把接收到的消息做一个类型转换,然后利用反射把它交给真正的目标处理器——一个普通的Java类进行处理(如果真正的目标处理器是一个MessageListener或者是一个SessionAwareMessageListener,那么Spring将直接使用接收到的Message对象作为参数调用它们的onMessage方法,而不会再利用反射去进行调用),那么我们在定义一个MessageListenerAdapter的时候就需要为它指定这样一个目标类。这个目标类我们可以通过MessageListenerAdapter的构造方法参数指定,如:

Xml代码 复制代码 收藏代码
  1. <!-- 消息监听适配器 -->  
  2.     <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
  3.         <constructor-arg>  
  4.             <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>  
  5.         </constructor-arg>  
  6.     </bean>  
<!-- 消息监听适配器 -->    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">        <constructor-arg>            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>        </constructor-arg>    </bean>

 

 

       也可以通过它的delegate属性来指定,如:

Xml代码 复制代码 收藏代码
  1. <!-- 消息监听适配器 -->  
  2.     <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
  3.         <property name="delegate">  
  4.             <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>  
  5.         </property>  
  6.         <property name="defaultListenerMethod" value="receiveMessage"/>  
  7.     </bean>  
<!-- 消息监听适配器 -->    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">        <property name="delegate">            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>        </property>        <property name="defaultListenerMethod" value="receiveMessage"/>    </bean>

 

 

       前面说了如果我们指定的这个目标处理器是一个MessageListener或者是一个SessionAwareMessageListener的时候Spring将直接利用接收到的Message对象作为方法参数调用它们的onMessage方法。但是如果指定的目标处理器是一个普通的Java类时Spring将利用Message进行了类型转换之后的对象作为参数通过反射去调用真正的目标处理器的处理方法,那么Spring是如何知道该调用哪个方法呢?这是通过MessageListenerAdapter的defaultListenerMethod属性来决定的,当我们没有指定该属性时,Spring会默认调用目标处理器的handleMessage方法。

       接下来我们来看一个示例,假设我们有一个普通的Java类ConsumerListener,其对应有两个方法,handleMessage和receiveMessage,其代码如下:

Java代码 复制代码 收藏代码
  1. package com.tiantian.springintejms.listener;   
  2.     
  3. public class ConsumerListener {   
  4.     
  5.     public void handleMessage(String message) {   
  6.         System.out.println("ConsumerListener通过handleMessage接收到一个纯文本消息,消息内容是:" + message);   
  7.     }   
  8.        
  9.     public void receiveMessage(String message) {   
  10.         System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息,消息内容是:" + message);   
  11.     }   
  12.        
  13. }  
package com.tiantian.springintejms.listener; public class ConsumerListener {     public void handleMessage(String message) {        System.out.println("ConsumerListener通过handleMessage接收到一个纯文本消息,消息内容是:" + message);    }        public void receiveMessage(String message) {        System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息,消息内容是:" + message);    }    }

 

       假设我们要把它作为一个消息监听器来监听发送到adapterQueue的消息,这个时候我们就可以定义一个对应的MessageListenerAdapter来把它当做一个MessageListener使用。

Xml代码 复制代码 收藏代码
  1. <!-- 消息监听适配器 -->  
  2. <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
  3.     <property name="delegate">  
  4.         <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>  
  5.     </property>  
  6.     <property name="defaultListenerMethod" value="receiveMessage"/>  
  7. </bean>  
    <!-- 消息监听适配器 -->    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">        <property name="delegate">            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>        </property>        <property name="defaultListenerMethod" value="receiveMessage"/>    </bean>

 

       当然,有了MessageListener之后我们还需要配置其对应的MessageListenerContainer,这里配置如下:

Xml代码 复制代码 收藏代码
  1. <!-- 消息监听适配器对应的监听容器 -->  
  2. <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  3.     <property name="connectionFactory" ref="connectionFactory"/>  
  4.     <property name="destination" ref="adapterQueue"/>  
  5.     <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter来作为消息监听器 -->  
  6. </bean>  
    <!-- 消息监听适配器对应的监听容器 -->    <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory"/>        <property name="destination" ref="adapterQueue"/>        <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter来作为消息监听器 -->    </bean>

 

       在上面的MessageListenerAdapter中我们指定了其defaultListenerMethod属性的值为receiveMessage,所以当MessageListenerAdapter接收到消息之后会自动的调用我们指定的ConsumerListener的receiveMessage方法。

       针对于上述代码我们定义测试代码如下:

Java代码 复制代码 收藏代码
  1. package com.tiantian.springintejms.test;   
  2.     
  3. import javax.jms.Destination;   
  4.     
  5. import org.junit.Test;   
  6. import org.junit.runner.RunWith;   
  7. import org.springframework.beans.factory.annotation.Autowired;   
  8. import org.springframework.beans.factory.annotation.Qualifier;   
  9. import org.springframework.test.context.ContextConfiguration;   
  10. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;   
  11.     
  12. import com.tiantian.springintejms.service.ProducerService;   
  13.     
  14. @RunWith(SpringJUnit4ClassRunner.class)   
  15. @ContextConfiguration("/applicationContext.xml")   
  16. public class ProducerConsumerTest {   
  17.   
  18.     @Autowired  
  19.     @Qualifier("adapterQueue")   
  20.     private Destination adapterQueue;   
  21.   
  22.     @Test  
  23.     public void testMessageListenerAdapter() {   
  24.         producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter");   
  25.     }   
  26.        
  27. }  
package com.tiantian.springintejms.test; import javax.jms.Destination; import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.tiantian.springintejms.service.ProducerService; @RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/applicationContext.xml")public class ProducerConsumerTest {    @Autowired    @Qualifier("adapterQueue")    private Destination adapterQueue;    @Test    public void testMessageListenerAdapter() {        producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter");    }    }

 

       这时候我们会看到控制台输出如下:



        如果我们指定MessageListenerAdapter的defaultListenerMethod属性,那么在运行上述代码时控制台会输出如下结果:



        MessageListenerAdapter除了会自动的把一个普通Java类当做MessageListener来处理接收到的消息之外,其另外一个主要的功能是可以自动的发送返回消息

     当我们用于处理接收到的消息的方法的返回值不为空的时候,Spring会自动将它封装为一个JMS Message,然后自动进行回复。那么这个时候这个回复消息将发送到哪里呢?这主要有两种方式可以指定。
       第一,可以通过发送的Message的setJMSReplyTo方法指定该消息对应的回复消息的目的地。这里我们把我们的生产者发送消息的代码做一下修改,在发送消息之前先指定该消息对应的回复目的地为一个叫responseQueue的队列目的地,具体代码如下所示:

Java代码 复制代码 收藏代码
  1. package com.tiantian.springintejms.service.impl;   
  2.     
  3. import javax.jms.Destination;   
  4. import javax.jms.JMSException;   
  5. import javax.jms.Message;   
  6. import javax.jms.Session;   
  7. import javax.jms.TextMessage;   
  8.     
  9. import org.springframework.beans.factory.annotation.Autowired;   
  10. import org.springframework.beans.factory.annotation.Qualifier;   
  11. import org.springframework.jms.core.JmsTemplate;   
  12. import org.springframework.jms.core.MessageCreator;   
  13. import org.springframework.stereotype.Component;   
  14.     
  15. import com.tiantian.springintejms.service.ProducerService;   
  16.     
  17. @Component  
  18. public class ProducerServiceImpl implements ProducerService {    
  19.   
  20.     @Autowired  
  21.     private JmsTemplate jmsTemplate;   
  22.   
  23.     @Autowired  
  24.     @Qualifier("responseQueue")   
  25.     private Destination responseDestination;   
  26.        
  27.     public void sendMessage(Destination destination, final String message) {   
  28.         System.out.println("---------------生产者发送消息-----------------");   
  29.         System.out.println("---------------生产者发了一个消息:" + message);   
  30.         jmsTemplate.send(destination, new MessageCreator() {   
  31.             public Message createMessage(Session session) throws JMSException {   
  32.                 TextMessage textMessage = session.createTextMessage(message);   
  33.                 textMessage.setJMSReplyTo(responseDestination);   
  34.                 return textMessage;   
  35.             }   
  36.         });   
  37.     }   
  38.     
  39. }  
package com.tiantian.springintejms.service.impl; import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import javax.jms.TextMessage; import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Component; import com.tiantian.springintejms.service.ProducerService; @Componentpublic class ProducerServiceImpl implements ProducerService {     @Autowired    private JmsTemplate jmsTemplate;    @Autowired    @Qualifier("responseQueue")    private Destination responseDestination;        public void sendMessage(Destination destination, final String message) {        System.out.println("---------------生产者发送消息-----------------");        System.out.println("---------------生产者发了一个消息:" + message);        jmsTemplate.send(destination, new MessageCreator() {            public Message createMessage(Session session) throws JMSException {                TextMessage textMessage = session.createTextMessage(message);                textMessage.setJMSReplyTo(responseDestination);                return textMessage;            }        });    } }

 

       接着定义一个叫responseQueue的队列目的地及其对应的消息监听器和监听容器。

Xml代码 复制代码 收藏代码
  1. <!-- 用于测试消息回复的 -->  
  2. <bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">  
  3.     <constructor-arg>  
  4.         <value>responseQueue</value>  
  5.     </constructor-arg>  
  6. </bean>  
  7.   
  8. <!-- responseQueue对应的监听器 -->  
  9. <bean id="responseQueueListener" class="com.tiantian.springintejms.listener.ResponseQueueListener"/>  
  10.   
  11. <!-- responseQueue对应的监听容器 -->  
  12. <bean id="responseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  13.     <property name="connectionFactory" ref="connectionFactory"/>  
  14.     <property name="destination" ref="responseQueue"/>  
  15.     <property name="messageListener" ref="responseQueueListener"/>  
  16. </bean>  
    <!-- 用于测试消息回复的 -->    <bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg>            <value>responseQueue</value>        </constructor-arg>    </bean>    <!-- responseQueue对应的监听器 -->    <bean id="responseQueueListener" class="com.tiantian.springintejms.listener.ResponseQueueListener"/>    <!-- responseQueue对应的监听容器 -->    <bean id="responseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory"/>        <property name="destination" ref="responseQueue"/>        <property name="messageListener" ref="responseQueueListener"/>    </bean>

  

       ResponseQueueListener的定义如下所示:

Java代码 复制代码 收藏代码
  1. public class ResponseQueueListener implements MessageListener {   
  2.     
  3.     public void onMessage(Message message) {   
  4.         if (message instanceof TextMessage) {   
  5.             TextMessage textMessage = (TextMessage) message;   
  6.             try {   
  7.                 System.out.println("接收到发送到responseQueue的一个文本消息,内容是:" + textMessage.getText());   
  8.             } catch (JMSException e) {   
  9.                 e.printStackTrace();   
  10.             }   
  11.         }   
  12.     }   
  13.     
  14. }  
public class ResponseQueueListener implements MessageListener {     public void onMessage(Message message) {        if (message instanceof TextMessage) {            TextMessage textMessage = (TextMessage) message;            try {                System.out.println("接收到发送到responseQueue的一个文本消息,内容是:" + textMessage.getText());            } catch (JMSException e) {                e.printStackTrace();            }        }    } }

 

       接着我们运行我们的测试代码,利用生产者往我们定义好的MessageListenerAdapter负责处理的adapterQueue目的地发送一个消息。测试代码如下所示:

Java代码 复制代码 收藏代码
  1. @RunWith(SpringJUnit4ClassRunner.class)   
  2. @ContextConfiguration("/applicationContext.xml")   
  3. public class ProducerConsumerTest {   
  4.     
  5.     @Autowired  
  6.     private ProducerService producerService;   
  7.   
  8.     @Qualifier("adapterQueue")   
  9.     @Autowired  
  10.     private Destination adapterQueue;      
  11.   
  12.     @Test  
  13.     public void testMessageListenerAdapter() {   
  14.         producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter");   
  15.     }   
  16.        
  17. }  
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/applicationContext.xml")public class ProducerConsumerTest {     @Autowired    private ProducerService producerService;    @Qualifier("adapterQueue")    @Autowired    private Destination adapterQueue;       @Test    public void testMessageListenerAdapter() {        producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter");    }    }

 

       运行上述测试代码之后,控制台输出如下:

 



        这说明我们的生产者发送消息被MessageListenerAdapter处理之后,MessageListenerAdapter确实把监听器的返回内容封装成一个Message往原Message通过setJMSReplyTo方法指定的回复目的地发送了一个消息。对于MessageListenerAdapter对应的监听器处理方法返回的是一个null值或者返回类型是void的情况,MessageListenerAdapter是不会自动进行消息的回复的,有兴趣的网友可以自己测试一下。

       第二,通过MessageListenerAdapter的defaultResponseDestination属性来指定。这里我们也来做一个测试,首先维持生产者发送消息的代码不变,即发送消息前不通过Message的setJMSReplyTo方法指定消息的回复目的地;接着我们在定义MessageListenerAdapter的时候通过其defaultResponseDestination属性指定其默认的回复目的地是“defaultResponseQueue”,并定义defaultResponseQueue对应的消息监听器和消息监听容器。

Xml代码 复制代码 收藏代码
  1. <!-- 消息监听适配器 -->  
  2. <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
  3.     <!-- <constructor-arg>  
  4.         <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>  
  5.     </constructor-arg> -->  
  6.     <property name="delegate">  
  7.         <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>  
  8.     </property>  
  9.     <property name="defaultListenerMethod" value="receiveMessage"/>  
  10.     <property name="defaultResponseDestination" ref="defaultResponseQueue"/>  
  11. </bean>  
  12.   
  13. <!-- 消息监听适配器对应的监听容器 -->  
  14. <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  15.     <property name="connectionFactory" ref="connectionFactory"/>  
  16.     <property name="destination" ref="adapterQueue"/>  
  17.     <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter来作为消息监听器 -->  
  18. </bean>  
  19.   
  20. !-- 默认的消息回复队列 -->  
  21. <bean id="defaultResponseQueue" class="org.apache.activemq.command.ActiveMQQueue">  
  22.     <constructor-arg>  
  23.         <value>defaultResponseQueue</value>  
  24.     </constructor-arg>  
  25. </bean>  
  26.   
  27. <!-- defaultResponseQueue对应的监听器 -->  
  28. <bean id="defaultResponseQueueListener" class="com.tiantian.springintejms.listener.DefaultResponseQueueListener"/>  
  29.   
  30. <!-- defaultResponseQueue对应的监听容器 -->  
  31. <bean id="defaultResponseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  32.     <property name="connectionFactory" ref="connectionFactory"/>  
  33.     <property name="destination" ref="defaultResponseQueue"/>  
  34.     <property name="messageListener" ref="defaultResponseQueueListener"/>  
  35. </bean>  
    <!-- 消息监听适配器 -->    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">        <!-- <constructor-arg>            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>        </constructor-arg> -->        <property name="delegate">            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>        </property>        <property name="defaultListenerMethod" value="receiveMessage"/>        <property name="defaultResponseDestination" ref="defaultResponseQueue"/>    </bean>    <!-- 消息监听适配器对应的监听容器 -->    <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory"/>        <property name="destination" ref="adapterQueue"/>        <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter来作为消息监听器 -->    </bean><!-- 默认的消息回复队列 -->    <bean id="defaultResponseQueue" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg>            <value>defaultResponseQueue</value>        </constructor-arg>    </bean>    <!-- defaultResponseQueue对应的监听器 -->    <bean id="defaultResponseQueueListener" class="com.tiantian.springintejms.listener.DefaultResponseQueueListener"/>    <!-- defaultResponseQueue对应的监听容器 -->    <bean id="defaultResponseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory"/>        <property name="destination" ref="defaultResponseQueue"/>        <property name="messageListener" ref="defaultResponseQueueListener"/>    </bean>

 

       DefaultResponseQueueListener的代码如下所示:

Java代码 复制代码 收藏代码
  1. package com.tiantian.springintejms.listener;   
  2.     
  3. import javax.jms.JMSException;   
  4. import javax.jms.Message;   
  5. import javax.jms.MessageListener;   
  6. import javax.jms.TextMessage;   
  7.     
  8. public class DefaultResponseQueueListener implements MessageListener {   
  9.     
  10.     public void onMessage(Message message) {   
  11.         if (message instanceof TextMessage) {   
  12.             TextMessage textMessage = (TextMessage) message;   
  13.             try {   
  14.                 System.out.println("DefaultResponseQueueListener接收到发送到defaultResponseQueue的一个文本消息,内容是:" + textMessage.getText());   
  15.             } catch (JMSException e) {   
  16.                 e.printStackTrace();   
  17.             }   
  18.         }   
  19.     }   
  20.     
  21. }  
package com.tiantian.springintejms.listener; import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage; public class DefaultResponseQueueListener implements MessageListener {     public void onMessage(Message message) {        if (message instanceof TextMessage) {            TextMessage textMessage = (TextMessage) message;            try {                System.out.println("DefaultResponseQueueListener接收到发送到defaultResponseQueue的一个文本消息,内容是:" + textMessage.getText());            } catch (JMSException e) {                e.printStackTrace();            }        }    } }

 

       这时候运行如下测试代码:

Java代码 复制代码 收藏代码
  1. @Test  
  2. public void testMessageListenerAdapter() {   
  3.     producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter");   
  4. }  
    @Test    public void testMessageListenerAdapter() {        producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter");    }

 

       控制台将输出如下内容:

 



        这说明MessageListenerAdapter会自动把真正的消息处理器返回的非空内容封装成一个Message发送回复消息到通过defaultResponseDestination属性指定的默认消息回复目的地。

       既然我们可以通过两种方式来指定MessageListenerAdapter自动发送回复消息的目的地,那么当我们两种方式都指定了而且它们的目的地还不一样的时候会怎么发送呢?是两个都发还是只发其中的一个呢?关于这部分的测试我这里就不赘述了,有兴趣的网友可以自己进行。这里我可以直接的告诉大家,当两种方式都指定了消息的回复目的地的时候使用发送消息的setJMSReplyTo方法指定的目的地将具有较高的优先级,MessageListenerAdapter将只往该方法指定的消息回复目的地发送回复消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/387889.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CentOS7+CDH5.14.0安装全流程记录,图文详解全程实测-8CDH5安装和集群配置

Cloudera Manager Server和Agent都启动以后&#xff0c;就可以进行CDH5的安装配置了。 准备文件 从 http://archive.cloudera.com/cdh5/parcels/中下载CDH5.14.0的相关文件 把CDH5需要的安装文件放到主节点上&#xff0c;新建目录为/opt/cloudera/parcel-repo把我们之前下载的…

node.js安装部署测试

&#xff08;一&#xff09;安装配置&#xff1a; 1&#xff1a;从nodejs.org下载需要的版本 2&#xff1a;直接安装&#xff0c;默认设置 &#xff0c;默认安装在c:\program files\nodejs下。 3&#xff1a;更改npm安装模块的默认目录 &#xff08;默认目录在安装目录下的node…

社群系统ThinkSNS+ V2.2-V2.3升级教程

WARNING本升级指南仅适用于 2.2 版本升级至 2.3 版本&#xff0c;如果你并非 2.2 版本&#xff0c;请查看其他升级指南&#xff0c;Plus 程序不允许跨版本升级&#xff01;#更新代码预计耗时&#xff1a; 2 小时这是你自我操作的步骤&#xff0c;确认将你的 2.2 版本代码升级到…

activemq部署安装

一、架构和技术介绍 1、简介 ActiveMQ 是Apache出品&#xff0c;最流行的&#xff0c;能力强劲的开源消息总线。完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现 2、activemq的特性 1. 多种语言和协议编写客户端。语言: Java, C, C, C#, Ruby, Perl, Python, PHP。应用协议: …

主串与模式串的匹配

主串与模式串的匹配 &#xff08;1&#xff09;BF算法&#xff1a; BF算法比较简单直观&#xff0c;其匹配原理是主串S.ch[i]和模式串T.ch[j]比较&#xff0c;若相等&#xff0c;则i和j分别指示串中的下一个位置&#xff0c;继续比较后续字符&#xff0c;若不相等&#xff0c;从…

什么是 DDoS 攻击?

欢迎访问网易云社区&#xff0c;了解更多网易技术产品运营经验。 全称Distributed Denial of Service&#xff0c;中文意思为“分布式拒绝服务”&#xff0c;就是利用大量合法的分布式服务器对目标发送请求&#xff0c;从而导致正常合法用户无法获得服务。通俗点讲就是利用网络…

nginx 并发过十万

一般来说nginx 配置文件中对优化比较有作用的为以下几项&#xff1a; worker_processes 8; nginx 进程数&#xff0c;建议按照cpu 数目来指定&#xff0c;一般为它的倍数。 worker_cpu_affinity 00000001 00000010 00000100 00001000 00010000 00100000 01000000 10000000; 为每…

神经网络使用情景

神经网络使用情景 人脸&#xff0f;图像识别语音搜索文本到语音&#xff08;转录&#xff09;垃圾邮件筛选&#xff08;异常情况探测&#xff09;欺诈探测推荐系统&#xff08;客户关系管理、广告技术、避免用户流失&#xff09;回归分析 为何选择Deeplearning4j&#xff1f; …

GitHub常用命令及使用

GitHub使用介绍 摘要&#xff1a; 常用命令&#xff1a; git init 新建一个空的仓库git status 查看状态git add . 添加文件git commit -m 注释 提交添加的文件并备注说明git remote add origin gitgithub.com:jinzhaogit/git.git 连接远程仓库git push -u origin master 将本地…

deeplearning4j

deeplearning4j 是基于java的深度学习库&#xff0c;当然&#xff0c;它有许多特点&#xff0c;但暂时还没学那么深入&#xff0c;所以就不做介绍了 需要学习dl4j&#xff0c;无从下手&#xff0c;就想着先看看官网的examples&#xff0c;于是&#xff0c;下载了examples程序&a…

推理编程_答案集编程的知识表示和推理

推理编程Read about the difference between declarative and imperative programming and learn from code examples (Answer Set Programming, Python and C).了解声明式和命令式编程之间的区别&#xff0c;并从代码示例(答案集编程&#xff0c;Python和C)中学习。 介绍 (In…

python安装包

由于Google、YouTube等大型公司的推广&#xff0c;Python编程语言越来越受欢迎&#xff0c;很多编程爱好者&#xff0c;也将Python做为了首先的编程语言。 今天我们就来讲一下&#xff0c;学习的第一步&#xff0c;安装Python IDLE编辑器&#xff0c;也它的调试和使用。 第一步…

104 权限 sudo 解压缩

主要内容:https://www.cnblogs.com/pyyu/articles/9355477.html 1 查看系统版本信息: #查看系统版本信息 cat /etc/redhat-release CentOS Linux release 7.4.1708 (Core) #查看内核版本号 uname -r 3.10.0-693.el7.x86_64 #查看系统多少位 uname -m x86_64 #查看内核所有信息…

Cloud Native 介绍

为什么80%的码农都做不了架构师&#xff1f;>>> 背景 Cloud Native表面看起来比较容易理解&#xff0c;但是细思好像又有些模糊不清&#xff1a;Cloud Native和Cloud关系是啥&#xff1f;它用来解决什么问题&#xff1f;它是一个新技术还是一个新的方法&#xff1f…

餐厅数据分析报告_如何使用数据科学选择理想的餐厅设计场所

餐厅数据分析报告空间数据科学 (Spatial Data Science) Designing any product requires a lot of analysis and research. It is also true for designing any building. Before we begin to design any building, we collect information about the location where we are de…

PCB genesis 大孔扩孔(不用G84命令)实现方法

PCB钻孔时,当钻刀>6.3mm时,超出钻孔范围,钻孔工序是没有这么大的钻刀,当这种情况,工程CAM会都采用G84命令用小孔扩孔的方式制作, 在这里介绍一种如果不用G84命令,用程序实现将大孔生成小孔钻孔达到扩孔的目的。 一.我们先了解一下G84命令扩孔 孔尺寸大小 孔密度 连一篇文章有…

图像识别中的深度学习

来源&#xff1a;《中国计算机学会通讯》第8期《专题》 作者&#xff1a;王晓刚 深度学习发展历史 深度学习是近十年来人工智能领域取得的重要突破。它在语音识别、自然语言处理、计算机视觉、图像与视频分析、多媒体等诸多领域的应用取得了巨大成功。现有的深度学习模型属于神…

多个css样式合并到一个“目录”css文件中

执行访问jsp后发现没有效果 同样的代码&#xff0c;在html中效果对比如下&#xff1a; 具体原因&#xff1a;不清楚&#xff0c;暂时记着~~~在jsp中不支持import这种css样式的引用 转载于:https://www.cnblogs.com/mangwusuozhi/p/10050108.html

方差,协方差 、统计学的基本概念

一、统计学的基本概念 统计学里最基本的概念就是样本的均值、方差、标准差。首先&#xff0c;我们给定一个含有n个样本的集合&#xff0c;下面给出这些概念的公式描述&#xff1a; 均值&#xff1a; 标准差&#xff1a; 方差&#xff1a; 均值描述的是样本集合的中间点&#xf…

Python 主成分分析PCA

Python 主成分分析PCA 主成分分析&#xff08;PCA&#xff09;是一种基于变量协方差矩阵对数据进行压缩降维、去噪的有效方法&#xff0c;PCA的思想是将n维特征映射到k维上&#xff08;k<n&#xff09;&#xff0c;这k维特征称为主元&#xff0c;是旧特征的线性组合&#xf…