说明:SpringAMQP(官网:https://spring.io/projects/spring-amqp)是基于RabbitMQ封装的一套模板,并利用了SpringBoot对其实现了自动装配,使用起来非常方便。安装和原始使用参考:http://t.csdn.cn/51qyD
基础操作
创建两个模块,一个用于发送消息(sender),一个用于接收消息(receiver),两个模块拥有共同的父模块
第一步:添加依赖
在父模块的pom.xml文件中,添加依赖,如下:
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEASE</version><relativePath/></parent><dependencies><!--lombok依赖,用于生成set、get、toString方法--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
第二步:创建配置文件
配置文件(application.yml)内容如下,两个模块的内容一样
spring:rabbitmq:# MQ ip地址host: XXX.XXX.XXX.XXX# MQ的端口号port: 5672# 虚拟主机 每个用户单独对应一个 不同用户之间无法访问彼此的虚拟主机virtual-host: /# 用户名username: root# 密码password: 123456
第三步:创建Listener类
在接收方,创建监听类,用来接收消息,如下:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RabbitListenerDemo {@RabbitListener(queues = "demo.queue")public void listenDemoQueueMessage(String msg){System.out.println("msg = " + msg);}
}
第四步:编写发送端代码
在发送方的测试类中,写测试代码,发消息给接收方,其中RunWith()注解用于构建程序运行的上下文环境;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SenderTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSender() {rabbitTemplate.convertAndSend("demo.queue","hello rabbit mq!");}
}
第五步:启动
先启动接收方(这是因为,如果队列在RabbitMQ管理平台上不存在的话,先启动发送方会造成消息丢失,而先启动接收方,RabbitMQ会根据队列名先创建出队列),再启动发送方;
可以看到,测试完成,接收方可以接收到消息
工作队列
实际的业务情况是一个发送方,可能会有多个接收方来接收,而且接收方处理效率可能各不相同。这样,接收方的代码可以写成这样,使用线程休眠模拟接收方执行的效率,再设置变量用于统计各个接收方执行的次数:
(RabbitListenerDemo.java)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RabbitListenerDemo {private static int count1 = 0;private static int count2 = 0;private static int count3 = 0;@RabbitListener(queues = "demo.queue")public void listenDemoQueueMessage1(String msg) throws InterruptedException {System.out.println("msg1 = " + msg + "======= count1 =" + (++count1));Thread.sleep(10);}@RabbitListener(queues = "demo.queue")public void listenDemoQueueMessage2(String msg) throws InterruptedException {System.out.println("msg2 = " + msg + "======= count2 =" + (++count2));Thread.sleep(20);}@RabbitListener(queues = "demo.queue")public void listenDemoQueueMessage3(String msg) throws InterruptedException {System.out.println("msg3 = " + msg + "======= count3 =" + (++count3));Thread.sleep(50);}
}
(SenderTest:循环发送200次,休眠10毫秒)
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SenderTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSender() throws InterruptedException {for (int i = 0; i < 200; i++) {rabbitTemplate.convertAndSend("demo.queue","hello rabbit mq!======>" + i);Thread.sleep(10);}}
}
启动,可以看到执行效率最低的3号,也和1号、2号接收到了等量的消息量,
这是因为RabbitMQ有默认的分配策略,使每个接收方都可以接收到等量的消息量,而不是处理越快的处理越多。可以在接收方的配置文件中,添加这个配置,表示每个接收方只能一个消息一个消息处理(可以推测默认是先按照接收方数量,把请求都平均分配好之后,再让它们各自处理的);
spring:rabbitmq:listener:simple:prefetch: 1
重启测试,可以看到,达到了“能者多劳”的效果
发布/订阅
发布/订阅,是指在消息发给队列前,对消息所绑定的队列信息做判断,然后按照绑定的队列对消息进行分发;
根据分发的情况,可分为以下三种:
-
广播(Fanout):消息分发给所有队列;
-
路由(Direct):消息只分发给拥有关键字(RoutingKey)的队列;
-
主题(Topic):消息只分发给符合条件的队列;
Fanout(广播)
创建一个广播配置类,用于绑定队列与广播交换机(FanoutExchange);
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 广播配置类*/
@Configuration
public class FanoutConfig {/*** 声明交换机* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("essay.fanout");}/*** 生成第一个队列* @return*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机** @return*/@Beanpublic Binding bindingQueue1(){return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}/*** 生成第二个队列* @return*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机** @return*/@Beanpublic Binding bindingQueue2(){return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}
}
接收方代码
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg){System.out.println("接收者1接收到了消息:" + msg);}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg){System.out.println("接收者2接收到了消息:" + msg);}
发送方代码:消息并不直接发送给队列,而是发送个交换机;
@Testpublic void fanoutExchangeTest(){// 第二个参数是routeKey(路由转发关键字)不能不加,可以为空字符串rabbitTemplate.convertAndSend("essay.fanout","", "hello everyone!");}
测试结果,每个队列都接收到了消息,并发给各自的接收方
Direct(路由)
在接收方的接收方法上,创建对应的队列、路由交换机,并设置routeKey(路由关键字),接收者1号(group1, group2),接收者2号(group1, group3)
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "essay.direct",type = ExchangeTypes.DIRECT),key = {"group1", "group2"}))public void listenDirectQueue1(String msg){System.out.println("接收者1号接收到了消息:" + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "essay.direct",type = ExchangeTypes.DIRECT),key = {"group1", "group3"}))public void listenDirectQueue2(String msg){System.out.println("接收者2号接收到了消息:" + msg);}
发送方发送消息,routeKey = group1
rabbitTemplate.convertAndSend("essay.direct","group1", "hello group1!");
发送方发送消息,routeKey = group2
rabbitTemplate.convertAndSend("essay.direct","group2", "hello group2!");
只有接收者1号拥有group2,故只有接收者1号接收到消息
Topic(主题)
与路由类似,不同的是ExchangeTypes的类型和key的组成,key由通配符和关键字组成
-
#:表示一个或多个字符;
-
*:表示一个字符;
如下面的三个key分别表示:
-
group.#:表示以“group”开头的消息都发过来;
-
#.class:表示以“class”结尾的消息都发过来;
-
*.person:表示两个字符,并以“person”结尾的消息都发过来;
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),key = "group.#"))public void listenTopicQueue1(String msg){System.out.println("接收者1号接收到了消息:" + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),key = "#.class"))public void listenTopicQueue2(String msg){System.out.println("接收者2号接收到了消息:" + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue3"),exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),key = "*.person"))public void listenTopicQueue3(String msg){System.out.println("接收者3号接收到了消息:" + msg);}
发送方测试
// 1号接收rabbitTemplate.convertAndSend("essay.topic","group.b.c.d", "hello NO.1!");// 2号接收rabbitTemplate.convertAndSend("essay.topic","b.c.d.class", "hello NO.2!");// 3号接收rabbitTemplate.convertAndSend("essay.topic","b.person", "hello NO.3!");
启动,测试结果如下,可以看到达到了预期结果
总结
RabbitMQ是一门异步通信的技术,SpringAMQP是基于RabbitMQ的模版,可以省去原始操作RabbitMQ的繁琐(建立连接、设置连接参数、创建通道、创建队列、发送消息/接收消息)。
另外,可以使用SpringAMQP建立工作队列、发布/订阅等模式,其中工作队列可设置spring.rabbitmq.listener.simple.prefetch=1
,达到“能者多劳”的效果;
而发布/订阅模式又分为广播、路由和主题,广播模式需要手动建立队列和路由交换机的关联,路由与主题的区别在于路由交换机的类型和路由关键字的格式。