文章目录
- 1. 新建Springboot工程
- 2. 引入maven依赖
- 3. ActiveMq配置类
- 4. MQ生产者
- 5. MQ 点对点消费者
- 6. MQ 发布点阅消费者A
- 7. MQ 发布点阅消费者B
- 8. 统一测试类
1. 新建Springboot工程
2. 引入maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.gblfy</groupId><artifactId>springboot-activemq</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>springboot-activemq</name><description>Spring Boot集成Activemq</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.7.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><!--全局编码设置--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--JDK版本--><java.version>1.8</java.version><!--全局版本--><fastjson.version>1.2.58</fastjson.version><lombok.version>1.18.8</lombok.version></properties><dependencies><!--SpringMVC 启动器--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Activemq Start--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId></dependency><!-- Activemq End--><!--数据处理--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!--lombok插件--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><!--maven编译插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
3. ActiveMq配置类
package com.gblfy.activemq.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;import javax.jms.ConnectionFactory;/*** @author gblfy* @ClassNme ActiveMqConfig* @Description Mq配置类* @Date 2019/9/3 18:05* @version1.0*/
@Configuration
public class ActiveMqConfig {// queue模式的ListenerContainer@Beanpublic JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();bean.setConnectionFactory(activeMQConnectionFactory);return bean;}// topic模式的ListenerContainer@Beanpublic JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();bean.setPubSubDomain(true);bean.setConnectionFactory(activeMQConnectionFactory);return bean;}}
4. MQ生产者
package com.gblfy.activemq.producer;import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;import java.io.Serializable;
import java.util.List;
import java.util.Map;/*** @author gblfy* @ClassNme MqProducer* @Description Mq 生产者封装公共类* @Date 2019/9/3 18:05* @version1.0*/
@Service
public class MqProducer {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;/*** 发送字符串消息队列** @param queueName 队列名称* @param message 字符串*/public void sendStringQueue(String queueName, String message) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);}/*** 发送字符串集合消息队列** @param queueName 队列名称* @param list 字符串集合*/public void sendStringListQueue(String queueName, List<String> list) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), list);}/*** 发送Map消息队列** @param queueName* @param headers*/public void sendMapQueue(String queueName, Map<String, Object> headers) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), headers);}/*** 发送对象消息队列** @param queueName 队列名称* @param obj 对象*/public void sendObjQueue(String queueName, Serializable obj) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), obj);}/*** 发送对象集合消息队列** @param queueName 队列名称* @param objList 对象集合*/public void sendObjListQueue(String queueName, List<Serializable> objList) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), objList);}/*** 发送字符串消息主题** @param topicName 主题名称* @param message 字符串*/public void sendStringTopic(String topicName, String message) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), message);}/*** 发送字符串集合消息主题** @param topicName 主题名称* @param list 字符串集合*/public void sendStringListTopic(String topicName, List<String> list) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), list);}/*** 发送Map消息主题** @param queueName* @param headers*/public void sendMapTopic(String queueName, Map<String, Object> headers) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), headers);}/*** 发送对象消息主题** @param topicName 主题名称* @param obj 对象*/public void sendObjTopic(String topicName, Serializable obj) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), obj);}/*** 发送对象集合消息主题** @param topicName 主题名称* @param objList 对象集合*/public void sendObjListTopic(String topicName, List<Serializable> objList) {this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), objList);}}
5. MQ 点对点消费者
package com.gblfy.activemq.consumer;import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;import javax.jms.ObjectMessage;
import java.util.List;
import java.util.Map;/*** @author gblfy* @ClassNme QueueConsumer* @Description Mq点对点消费者* @Date 2019/9/3 18:05* @version1.0*/
@Component
public class QueueConsumer {@JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue")public void receiveStringQueue(String msg) {System.out.println("接收到消息...." + msg);}@JmsListener(destination = "stringListQueue", containerFactory = "jmsListenerContainerQueue")public void receiveStringListQueue(List<String> list) {System.out.println("接收到集合队列消息...." + list);}@JmsListener(destination = "mapQueue", containerFactory = "jmsListenerContainerQueue")public void receiveMapQueue(Map<String, Object> headers) {System.out.println("接收到集合队列消息...." + headers);}@JmsListener(destination = "objQueue", containerFactory = "jmsListenerContainerQueue")public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {System.out.println("接收到对象队列消息...." + objectMessage.getObject());}@JmsListener(destination = "objListQueue", containerFactory = "jmsListenerContainerQueue")public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {System.out.println("接收到的对象队列消息..." + objectMessage.getObject());}
}
6. MQ 发布点阅消费者A
package com.gblfy.activemq.consumer;import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;import javax.jms.ObjectMessage;
import java.util.List;
/*** @author gblfy* @ClassNme ATopicConsumer* @Description Mq订阅者A* @Date 2019/9/3 18:05* @version1.0*/
@Component
public class ATopicConsumer {@JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")public void receiveStringTopic(String msg) {System.out.println("ATopicConsumer接收到消息...." + msg);}@JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")public void receiveStringListTopic(List<String> list) {System.out.println("ATopicConsumer接收到集合主题消息...." + list);}@JmsListener(destination = "mapTopic", containerFactory = "jmsListenerContainerTopic")public void receiveMapTopic(List<String> list) {System.out.println("ATopicConsumer接收到Map主题消息...." + list);}@JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {System.out.println("ATopicConsumer接收到对象主题消息...." + objectMessage.getObject());}@JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {System.out.println("ATopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());}}
7. MQ 发布点阅消费者B
package com.gblfy.activemq.consumer;import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;import javax.jms.ObjectMessage;
import java.util.List;/*** @author gblfy* @ClassNme BTopicConsumer* @Description Mq订阅者B* @Date 2019/9/3 18:05* @version1.0*/
@Component
public class BTopicConsumer {@JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")public void receiveStringTopic(String msg) {System.out.println("BTopicConsumer接收到消息...." + msg);}@JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")public void receiveStringListTopic(List<String> list) {System.out.println("BTopicConsumer接收到集合主题消息...." + list);}@JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {System.out.println("BTopicConsumer接收到对象主题消息...." + objectMessage.getObject());}@JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {System.out.println("BTopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());}}
8. 统一测试类
package com.gblfy.activemq.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;/*** @author gblfy* @ClassNme User* @Description Mq发送接送对象模拟* @Date 2019/9/3 18:05* @version1.0*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {private String id;private String name;private Integer age;
}
package com.gblfy.activemq;import com.gblfy.activemq.producer.MqProducer;
import com.gblfy.activemq.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** @author gblfy* @ClassNme ActivemqdemoApplicationTests* @Description Mq 测试公共类* @Date 2019/9/3 18:05* @version1.0*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqdemoApplicationTests {@Autowiredprivate MqProducer mqProducer;/**************************MQ点对点测试场景***************************//*** 点对点场景 01* 消息类型 String*/@Testpublic void testStringQueue() {for (int i = 1; i <= 100; i++) {System.out.println("第" + i + "次发送字符串队列消息");mqProducer.sendStringQueue("stringQueue", "消息:" + i);}}/*** 点对点场景 02* 消息类型 StringList*/@Testpublic void testStringListQueue() {List<String> idList = new ArrayList<>();idList.add("id1");idList.add("id2");idList.add("id3");System.out.println("正在发送集合队列消息ing......");mqProducer.sendStringListQueue("stringListQueue", idList);}/*** 点对点场景 03* 消息类型 Map<String,Object></String,Object>*/@Testpublic void testMapQueue() {Map<String, Object> map = new HashMap<>();map.put("1", "sxh");map.put("2", "ljy");map.put("3", "sh");map.put("4", "qjj");map.put("5", "ygf");map.put("6", "lxj");map.put("7", "gblfy");System.out.println("正在发送Map队列消息ing......");mqProducer.sendMapQueue("mapQueue", map);}/*** 点对点场景 04* 消息类型 Obj*/@Testpublic void testObjQueue() {System.out.println("正在发送对象队列消息......");mqProducer.sendObjQueue("objQueue", new User("1", "小明", 20));}/*** 点对点场景 05* 消息类型 ObjList*/@Testpublic void testObjListQueue() {System.out.println("正在发送对象集合队列消息......");List<Serializable> userList = new ArrayList<>();userList.add(new User("1", "雨昕", 01));userList.add(new User("2", "刘英", 26));userList.add(new User("3", "振振", 12));mqProducer.sendObjListQueue("objListQueue", userList);}/**************************MQ发布订阅测试场景***************************//*** 发布订阅场景 01* 消息类型 String*/@Testpublic void testStringTopic() {for (int i = 1; i <= 100; i++) {System.out.println("第" + i + "次发送字符串主题消息");mqProducer.sendStringTopic("stringTopic", "消息:" + i);}}/*** 发布订阅场景 02* 消息类型 StringList*/@Testpublic void testStringListTopic() {List<String> idList = new ArrayList<>();idList.add("id1");idList.add("id2");idList.add("id3");System.out.println("正在发送集合主题消息ing......");mqProducer.sendStringListTopic("stringListTopic", idList);}/*** 发布订阅场景 03* 消息类型 Map*/@Testpublic void testMapTopic() {Map<String, Object> map = new HashMap<>();map.put("1", "sxh");map.put("2", "ljy");map.put("3", "sh");map.put("4", "qjj");map.put("5", "ygf");map.put("6", "lxj");map.put("7", "gblfy");System.out.println("正在发送Map队列消息ing......");mqProducer.sendMapTopic("mapQueue", map);}/*** 发布订阅场景 04* 消息类型 Obj*/@Testpublic void testObjTopic() {System.out.println("正在发送对象主题消息......");mqProducer.sendObjTopic("objTopic", new User("1", "小明", 20));}/*** 发布订阅场景 05* 消息类型 ObjList*/@Testpublic void testObjListTopic() {System.out.println("正在发送对象集合主题消息......");List<Serializable> userList = new ArrayList<>();userList.add(new User("1", "雨昕", 01));userList.add(new User("2", "刘英", 26));userList.add(new User("3", "振振", 12));mqProducer.sendObjListTopic("objListTopic", userList);}
}
GitLab地址:https://gitlab.com/gb-heima/springboot-activemq
Git下载方式:
git clone git@gitlab.com:gb-heima/springboot-activemq.git
zip包下载方式:
https://gitlab.com/gb-heima/springboot-activemq/-/archive/master/springboot-activemq-master.zip