创建2个工程,生产者和消费者
- 测试点对点和广播
- 发送常见类型的消息
- 例如:文本(String)、对象(Object)
文本转Long
String text = "123";
Long.parseLong(text)
数组转集合
Arrays.asList(array)
引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the "producer" web application (WAR packaging). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gblfy.activemq</groupId>
    <artifactId>producer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>

    <name>producer Maven Webapp</name>
    <url>http://www.gblfy.com</url>

    <properties>
        <!-- Source encoding used by the whole build -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Java source/target level used by the compiler -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- Single place to control the Spring version -->
        <spring.version>4.2.4.RELEASE</spring.version>
    </properties>

    <dependencies>
        <!-- Spring Start -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!-- Spring End -->

        <!--
            NOTE(review): the next five dependencies declare no <version>, and this POM
            shows neither a <parent> nor a <dependencyManagement> section. Maven needs
            the versions to come from one of those; confirm where they are managed.
        -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <!-- JSON serialization / deserialization -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>javassist</groupId>
            <artifactId>javassist</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- ActiveMQ Start -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>
        <!-- ActiveMQ End -->
    </dependencies>

    <build>
        <plugins>
            <!-- Tomcat 7 plugin -->
            <plugin>
                <groupId>org.apache.tomcat.maven</groupId>
                <artifactId>tomcat7-maven-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <!-- HTTP port -->
                    <port>9004</port>
                    <!-- Context path -->
                    <path>/</path>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
生产者
引入配置文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Producer-side JMS wiring: connection factories, JmsTemplate and the destinations to send to. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- The real ConnectionFactory that creates connections; supplied by the JMS vendor (ActiveMQ) -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.135:61616"/>
    </bean>

    <!-- Spring's ConnectionFactory, which manages the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- The target is the real factory that produces JMS connections -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- Spring's JMS helper class, used for sending and receiving messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Refers to the Spring-provided ConnectionFactory defined above -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <!-- Queue destination: import into the Solr index -->
    <bean id="queueSolrDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="pinyougou_queue_solr"/>
    </bean>

    <!-- Queue destination: delete from the Solr index -->
    <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="pinyougou_queue_solr_delete"/>
    </bean>

    <!-- Topic (publish/subscribe) destination: generate the product detail page -->
    <bean id="topicPageDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="pinyougou_topic_page"/>
    </bean>

    <!-- Topic (publish/subscribe) destination: delete the product detail page -->
    <bean id="topicPageDeleteDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="pinyougou_topic_page_delete"/>
    </bean>
</beans>
注:包扫描(<context:component-scan>)的配置,如果spring的配置文件中已经有了,这里可以省略
在需要发送消息的类中,注入2个对象:
@Autowired
private Destination queueSolrDestination;//destination used to send the Solr import message
@Autowired
// Spring JMS helper injected for sending messages
private JmsTemplate jmsTemplate;
假设发送的数据是一个list,list没有实现序列化,而发送的消息类型,如果是对象必须实现序列化
List<TbItem> itemList = goodsService.findItemListByGoodsIdandStatus(ids, status);
建议发送消息类型采用文本类型
利用fastjson工具类,将list转换为json字符串,接收的时候,再转回来是很方便的
// Serialize the list to a JSON string so it can travel as a plain text message.
final String jsonString = JSON.toJSONString(itemList);

// Send the JSON text to the Solr import queue.
jmsTemplate.send(queueSolrDestination, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(jsonString);
    }
});
消费者客户端
引入依赖和生产者一样
引入配置文件applicationContext-jms-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer-side JMS wiring: connection factories, destinations and one listener container per queue. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- The real ConnectionFactory that creates connections; supplied by the JMS vendor (ActiveMQ) -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.135:61616"/>
    </bean>

    <!-- Spring's ConnectionFactory, which manages the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- The target is the real factory that produces JMS connections -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- Queue destination: import into the Solr index -->
    <bean id="queueSolrDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="pinyougou_queue_solr"/>
    </bean>

    <!--
        Listener container for the import queue.
        The messageListener refs in this file are not defined here; they must be
        registered elsewhere (for example by component scanning).
    -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueSolrDestination"/>
        <property name="messageListener" ref="itemSearchListener"/>
    </bean>

    <!-- Queue destination: delete from the Solr index -->
    <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="pinyougou_queue_solr_delete"/>
    </bean>

    <!-- Listener container for the delete queue -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueSolrDeleteDestination"/>
        <property name="messageListener" ref="itemDeleteListener"/>
    </bean>
</beans>
新建一个监听类ItemSearchListener接收文本类型消息
注,这个监听类上加上@Component注解,并且能被spring的包扫描扫描到
下面监听类的包是:package com.pinyougou.search.service.impl;
在此项目spring的配置文件中有扫描包的配置,因此,可以被扫描到
package com.pinyougou.search.service.impl;import java.util.List;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSON;
import com.pinyougou.pojo.TbItem;
import com.pinyougou.search.service.ItemSearchService;
@Component
public class ItemSearchListener implements MessageListener {@Autowiredprivate ItemSearchService itemSearchService;@Overridepublic void onMessage(Message message) {TextMessage textMessage=(TextMessage)message;try {String text = textMessage.getText();//json字符串System.out.println("监听到消息:"+text);List<TbItem> itemList = JSON.parseArray(text, TbItem.class);itemSearchService.importList(itemList);System.out.println("导入到solr索引库");} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}
}
发送对象类型消息
@Autowiredprivate JmsTemplate jmsTemplate;@Autowiredprivate Destination queueTextDestination;/*** 发送文本消息* @param text*/public void sendTextMessage(final String text){jmsTemplate.send(queueTextDestination, new MessageCreator() {public Message createMessage(Session session) throws JMSException {
// return session.createTextMessage(text);return session.createObjectMessage(对象);}});}
新建一个监听接收对象类型消息
package com.pinyougou.search.service.impl;import java.util.Arrays;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import com.pinyougou.search.service.ItemSearchService;
@Component
public class ItemDeleteListener implements MessageListener {@Autowiredprivate ItemSearchService itemSearchService;@Overridepublic void onMessage(Message message) {ObjectMessage objectMessage =(ObjectMessage)message;try {Long[] goodsIds= (Long[]) objectMessage.getObject();System.out.println("监听获取到消息:"+goodsIds);itemSearchService.deleteByGoodsIds(Arrays.asList(goodsIds));System.out.println("执行索引库删除");} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}