文章目录
- 一.搭建SpringBoot环境
- 二.配置
- 1.配置application.yml
- 2.定义RabbitConfig类
- 三.生产端
- 四.消费端
一.搭建SpringBoot环境
我们选择基于Spring-Rabbit去操作RabbitMQ
使用spring-boot-starter-amqp会自动添加spring-rabbit依赖,如下:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>
二.配置
1.配置application.yml
配置连接rabbitmq的参数
server:port: 44000
spring:application:name: test-rabbitmq-producerrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtualHost: /
2.定义RabbitConfig类
定义RabbitConfig类,配置Exchange、Queue、及绑定交换机
本例配置Topic交换机
package com.xuecheng.test.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqConfig {public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";public static final String QUEUE_INFORM_SMS = "queue_inform_sms";public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";public static final String ROUTINGKEY_EMAIL="inform.#.email.#";public static final String ROUTINGKEY_SMS="inform.#.sms.#";//声明交换机@Bean(EXCHANGE_TOPICS_INFORM)public Exchange EXCHANGE_TOPICS_INFORM(){//durable(true) 持久化,mq重启之后交换机还在return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();}//声明QUEUE_INFORM_EMAIL队列@Bean(QUEUE_INFORM_EMAIL)public Queue QUEUE_INFORM_EMAIL(){return new Queue(QUEUE_INFORM_EMAIL);}//声明QUEUE_INFORM_SMS队列@Bean(QUEUE_INFORM_SMS)public Queue QUEUE_INFORM_SMS(){return new Queue(QUEUE_INFORM_SMS);}//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey@Beanpublic Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();}//ROUTINGKEY_SMS队列绑定交换机,指定routingKey@Beanpublic Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();}}
三.生产端
使用RarbbitTemplate发送消息
package com.xuecheng.test.rabbitmq;
import com.xuecheng.test.rabbitmq.config.RabbitmqConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer05_topics_springboot {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testSendByTopics(){
for (int i=0;i<5;i++){
String message = "sms email inform to user"+i;
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message);
System.out.println("Send Message is:'" + message + "'");
}
}
}
四.消费端
创建消费端工程,添加依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency
使用@RabbitListener注解监听队列
package com.xuecheng.test.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import com.xuecheng.test.rabbitmq.config.RabbitmqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ReceiveHandler {
//监听email队列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
public void receive_email(String msg,Message message,Channel channel){
System.out.println(msg);
} /
/监听sms队列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
public void receive_sms(String msg,Message message,Channel channel){
System.out.println(msg);
}
}