文章目录
- 一:说明
- 二:e-car项目配置
- 1 引入activemq依赖
- 2 application启动类配置消息监听
- 3 application.yml配置
- 4 MQConfig.java 配置类
- 5 ecar 项目中的监听
- 6 junit 发送消息
- 三:tcm-chatgpt项目配置
- 5 MQListener.java 监听消息
- 三 测试
- 启动activemq服务
- 队列![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/76d5be042cbb43069dd723da2ebd60c3.png)
- 发布订阅模式
- 四:发送对象消息
- 配置队列名称
- 开启监听
- 测试类
- 接收ObjectMessage的消息
- Map接收的消息
一:说明
1-在两个不同的应用发送和接收消息
2-消息发送应用是 e-car 项目,接收端是 tcm-chatgpt项目,当然,同一个项目也是可以发送和接收的
二:e-car项目配置
1 引入activemq依赖
<!-- 集成 ActiveMQ -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2 application启动类配置消息监听
加上 @EnableJms 注解, 开启JMS
@EnableJms // 开启JMS
@SpringBootApplication(scanBasePackages="com.nrbc.ecar")
@MapperScan("com.nrbc.ecar.dao.mapper")
public class EcarAppClientApplication {public static void main(String[] args) {SpringApplication.run(EcarAppClientApplication.class, args);}
}
3 application.yml配置
- 1 注意: 开启主题策略,默认是关闭 开启主题模式,要设置为true
如果要使用队列模式,pub-sub-domain 要设置为false jms:
pub-sub-domain: false - 同时使用jms的Queue(队列)和Topic(发布订阅),可查看这边文章: 文章链接
spring:# activemq相关配置activemq:broker-url: tcp://localhost:61616user: lipingpassword: lipingpackages:# 配置信任所有的包,这个配置为了支持发送对象消息(如果传递的是对象则需要设置为true,默认是传字符串)trust-all: true# 开启主题策略,默认是关闭 开启主题模式jms:pub-sub-domain: true# 配置activemq队列的名称和主题名称
amq:qname:queueName-1:queueName-1topicName:name-1:topic-prot-1name-2:topic-prot-2
4 MQConfig.java 配置类
/*** 专门配置mq通道的配置类*/
@Slf4j
@Configuration
public class MQConfig {@Value("${amq.topicName.name-1}")private String tpName;@Bean(name = "queueName")Queue queueName() {return new ActiveMQQueue("test_queue");}/*** 主题(发布\订阅模式)通道* @author kazaf* @date 2024/4/24 16:43*/@Bean(name = "topic1")Topic queueFind() {log.info("${amq.topicName.name-1}=" + tpName);return new ActiveMQTopic("topic-model");}@Bean(name = "topic2")Topic topic2() {return new ActiveMQTopic("topic-model2");}@Bean(name = "topic3")Topic topic3() {return new ActiveMQTopic(tpName);}}
5 ecar 项目中的监听
/*** 专门配置mq通道的配置类*/
@Slf4j
@Component
public class MQListener {/*@JmsListener(destination = "test_queue")public void jiant (String message) {System.out.println("监听到消息》:" + message);log.info("监听到消息---》:" + message);}*/@JmsListener(destination = "topic-model")public void reciveTopic(String message) {log.info("11接收主题消息》:"+message);}@JmsListener(destination = "topic-model2")public void topicReceive2(String message) {log.info("topic-2监听到消息---》:" + message);}/*** 主题名称从配置文件中动态获取 */@JmsListener(destination = "${amq.topicName.name-1}")public void topicReceive3(String message) {log.info("topic-3监听到消息---》:" + message);}}
6 junit 发送消息
- 调用类需要注入消息模板,队列名称或者主题名称
也可以编写接口发送,demo随个人习惯
@Slf4j
@SpringBootTest(classes = EcarAppClientApplication.class)
@RunWith(SpringRunner.class)
public class ActivemqTest {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Queue queueName;@Autowiredprivate Topic topic1;@Autowiredprivate Topic topic2;@Autowiredprivate Topic topic3;@Testpublic void queueSender() {String message="我是队列发出的信息";jmsMessagingTemplate.convertAndSend(queueName, message);log.info("queueSender发送--》:"+message);
// TimeUnit.SECONDS.sleep(60);}/*** 发送 主题消息(广播)* @author kazaf* @date 2024/4/24 17:09*/@Testpublic void topicSender() {String message="我是topic-1a";jmsMessagingTemplate.convertAndSend(topic1, message);log.info("topicSender发送--》:"+message);String message2="我是topic2";jmsMessagingTemplate.convertAndSend(topic2, message2);log.info("topic2Sender发送--》:"+message2);String message3="我是topic3发送的";jmsMessagingTemplate.convertAndSend(topic3, message3);log.info("topic3Sender发送--》:"+message3);}
}
三:tcm-chatgpt项目配置
- 1、2、3、5 跟e-car项目一样的配置
5 MQListener.java 监听消息
代码
/*** @Description: mq监听类* @Author: kazaf* @Date: 2024-04-24 9:34*/
@Slf4j
@Component
public class MQListener {@JmsListener(destination = "test_queue")public void onMessage(String message) {System.out.print(message);//获取到消息后可以干一些事情log.info("恰恰监听到的消息》:"+message);}@JmsListener(destination = "topic-model")public void reciveTopic(String message) {log.info("11接收主题消息》:"+message);}@JmsListener(destination = "topic-model2")public void reciveTopic2(String message) {log.info("22接收主题消息》:"+message);}
}
三 测试
启动activemq服务
-
window端直接启动 bin\win64\activemq.bat 批处理文件
-
访问 http://localhost:8161 输入 admin / admin 的默认用户名密码登录(根据自己是否修改过)
-
启动tcm-chatgpt服务
-
启动e-car服务
-
运行 ActivemqTest.java 测试类中的 queue发送消息或者Topic发送消息
队列
发布订阅模式
四:发送对象消息
配置队列名称
/*** 专门配置mq通道的配置类*/
@Slf4j
@Configuration
public class MQConfig {/**** @author kazaf* @date 2024/4/25 10:51*/@Bean(name = "textMessageQueue")Queue textMessageQueue() {return new ActiveMQQueue("textMessage-queue");}@Bean(name = "objMessageQueue")Queue objMessageQueue() {return new ActiveMQQueue("objMessage-queue");}
}
开启监听
@Slf4j
@Component
public class MQListener {/*** 主题名称从配置文件中动态获取*/@JmsListener(destination = "textMessage-queue")public void queueTextMessageReceive(String message) throws JMSException {log.info("queueTextMessageReceive监听到消息---》:" + message);}@JmsListener(destination = "objMessage-queue")public void queueMapMessageReceive(Map<String,Object> map) {log.info("queueObjMessageReceive-Map-监听到消息---》:"+ map.get("name")+"--money="+map.get("money"));}/*** 主题名称从配置文件中动态获取*/@JmsListener(destination = "objMessage-queue")public void queueObjMessageReceive(Message message) throws JMSException {if (message instanceof TextMessage) {log.info("queueObjMessageReceive-Text-监听到消息---》:" + ((TextMessage) message).getText());} else if (message instanceof MapMessage) {MapMessage mapMessage = (MapMessage) message;log.info("queueObjMessageReceive-Map-监听到消息---》:"+ mapMessage.getString("name")+"--money="+mapMessage.getString("money"));} else if (message instanceof ObjectMessage) {ObjectMessage objectMessage = (ObjectMessage) message;User user = (User) objectMessage.getObject();log.info("queueObjMessageReceive-obj-监听到消息---》:" + user);}log.info("queueObjMessageReceive-监听到消息--");}
测试类
@Slf4j
@SpringBootTest(classes = EcarAppClientApplication.class)
@RunWith(SpringRunner.class)
public class ActivemqTest {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate JmsTemplate jmsTemplate;@Autowiredprivate Queue textMessageQueue;@Autowiredprivate Queue objMessageQueue;@Testpublic void senderTextMessage() {jmsTemplate.send(textMessageQueue, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {TextMessage message = session.createTextMessage();message.setText("发送TextMessage消息");return message;}});}@Testpublic void senderObjMessage() {/*User user = new User();user.setUsername("李哥哥");user.setPassword("mima123");jmsMessagingTemplate.convertAndSend(objMessageQueue, user);*/Map<String, Object> map = new HashMap<>();map.put("name", "掐果果");map.put("money", 1231111111.55);jmsMessagingTemplate.convertAndSend(objMessageQueue, map);}
}