【RabbitMQ与SpringBoot集成测试收发消息】
- 一、环境说明
- 二、实验步骤
- 三、小结
一、环境说明
- 安装环境:虚拟机VMWare + Centos7.6 + Maven3.6.3 + JDK1.8
- RabbitMQ版本:rabbitmq-server-3.8.8-1.el7.noarch.rpm
- 编程工具Idea + 运行JDK为17
二、实验步骤
-
在RabbitMQ的UI界面或命令行上
创建新的Virtual Host
,取名为vhtest02
,如下图所示:
-
使用Idea的
Spring Initializr
创建生产者工程springrabbitmqtest
,坐标如下:
-
配置application.properties,可参考添加如下内容:
spring.rabbitmq.host=192.168.36.132 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=vhtest02 spring.rabbitmq.username=sujiangming spring.rabbitmq.password=openGauss@1234
根据你自己的环境改成你自己的ip、port、virtual-host、用户名和密码。
-
编写生产者配置类,用于创建Exchange、Queue以及将两者绑定在一起,代码如下:
类名为:com.rabbitmq.springboot.config.RabbitMqConfig
,代码如下所示:package com.rabbitmq.springboot.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitMqConfig {//创建交换机@Bean(name = "topicExchange")public TopicExchange topicExchange(){return new TopicExchange("springboot_topic_exchange");}//创建队列@Bean(name = "topicQueueSpringBoot")public Queue topicQueue(){return QueueBuilder.durable("springboot_topic_queue").build();}//队列绑定交换机@Beanpublic Binding bindingExchangeTopicQueue(@Qualifier("topicQueueSpringBoot") Queue queue,@Qualifier("topicExchange")Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("log.#").noargs();} }
-
修改
com.rabbitmq.springboot.SpringRabbitMqTestApplicationTests
类,添加注解和测试方法,具体代码如下:package com.rabbitmq.springboot;import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class) @SpringBootTest class SpringRabbitMqTestApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage() throws InterruptedException {//convertAndSend(交换机名称,路由key,消息内容)int temp = 0;while (true){rabbitTemplate.convertAndSend("springboot_topic_exchange","log.info","发送了info消息" + temp);rabbitTemplate.convertAndSend("springboot_topic_exchange","log.error","发送了error消息"+ temp);rabbitTemplate.convertAndSend("springboot_topic_exchange","log.warning","发送了warning消息"+ temp);temp++;Thread.sleep(2000);}}@Testvoid contextLoads() {}}
该程序会一直运行,因为我加了while(true),模拟用户一直产生数据。
-
运行测试:运行
com.rabbitmq.springboot.SpringRabbitMqTestApplicationTests
类中的方法testSendMessage()
,正常运行会看到如下内容:
-
编写消费者工程,具体创建工程如步骤2所示;
-
修改application.properties,如步骤3所示,可直接复步骤3内容即可;
-
创建监听类:
com.rabbitmq.consumer.listener.MessageListener
,用于监听某个队列的消息,一旦监听到有数据,立马进行消费,代码如下:package com.rabbitmq.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/*** 消息监听器*/ @Component public class MessageListener {/*** 监听某个队列的消息* @param msg 接收到的消息*/@RabbitListener(queues = "springboot_topic_queue")public void topicListener(String msg){System.out.println("接收到消息:" + msg);} }
-
修改启动类:
SpringbootRabbitmqConsumerApplication
,在其类上面添加注解@ComponentScan("com.rabbitmq.consumer.*")
,如不添加该注解运行会自动退出,修改好如下图所示:
-
运行测试:运行
SpringbootRabbitmqConsumerApplication
类,正常情况下会看到如下内容:
三、小结
本文参考了来自网络上的资料,
如有侵权,请及时联系博主进行删除
。本文仅是博主本人在学习过程中作为学习笔记使用,常言道:好记性不如烂笔头。如本文对您有所帮助,请您动动发财的手指给博主点个赞
,谢谢您的阅读~~~