SpringAMQP

目录

利用SpringAMQP实现HelloWorld中的基础消息队列功能:

1.在父工程中引入spring-amqp的依赖

2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

3.在consumer服务中编写消费逻辑,绑定simple.queue这个队列

模拟WorkQueue,实现一个队列绑定多个消费者:

1.在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

2.在consumer服务中定义两个消息监听者,都监听simple.queue队列,消费者1每秒处理50条消息,消费者2每秒处理10条消息

发布和订阅:

发布订阅-Fanout Exchange

利用SpringAMQP演示FanoutExchange的使用

1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定

2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

3.在publisher中编写测试方法,向itcast.fanout发送消息

发布订阅-DirectExchange

1.利用@RabbitListener声明Exchange、Queue、RoutingKey,在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

2.在publisher中编写测试方法,向itcast. direct发送消息

发布订阅-TopicExchange

1.并利用@RabbitListener声明Exchange、Queue、RoutingKey,在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

2.在publisher中编写测试方法,向itcast. topic发送消息

测试发送Object类型消息

​编辑 消息转换器


什么是AMQP?
·应用间消息通信的一种协议,与语言和平台无关。

SpringAMQP如何发送消息?
·引入amqp的starter依赖

·配置RabbitMQ地址
·利用RabbitTemplate的convertAndSend方法


SpringAMQP如何接收消息?

·引入amqp的starter依赖

·配置RabbitMQ地址
·定义类,添加@Component注解
·类中声明方法,添加@RabbitListener注解,方法参数就时消息

注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能


Work模型的使用:
·多个消费者绑定到一个队列,同一条消息只会被一个
消费者处理
·通过设置prefetch来控制消费者预取的消息数量


交换机的作用是什么?

·接收publisher发送的消息
·将消息按照规则路由到与之绑定的队列

·不能缓存消息,路由失败,消息丢失
·FanoutExchange的会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

·Queue
·FanoutExchange

·Binding


描述下Direct交换机与Fanout交换机的差异?

·Fanout交换机将消息路由给每一个与之绑定的队列

·Direct交换机根据RoutingKey判断路由给哪个队列
·如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见注解?

·@Queue
·@Exchange


SpringAMQP中消息的序列化和反序列化是怎么实现的?

·利用MessageConverter实现的,默认是JDK的序列化

·注意发送方与接收方必须使用相同的MessageConverter

SpringAMQP官网:https://spring.io/projects/spring-amqp

什么是AMQP: 

Advanced Message Queuing Protocol,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

什么是SpringAMQP:

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

所有流程的文件相对位置:

利用SpringAMQP实现HelloWorld中的基础消息队列功能:

1.在父工程中引入spring-amqp的依赖

在mq-demo的pom文件导入:

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEASE</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
</project>

2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

1.在publisher服务中编写application.yml,添加mq连接信息:

spring:rabbitmq:host: xxx.xxx.xxx.xxx #RabbitMQ的虚拟机ipport: 5672 #Rabbit的端口username: itcastpassword: 123321virtual-host: / #该username的虚拟主机
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: xxx.xxx.xxx.xxx #RabbitMQ的虚拟机ipport: 5672 #Rabbit的端口username: itcastpassword: 123321virtual-host: / #该username的虚拟主机

 2.在publisher服务中新建一个测试类,编写测试方法:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}

}

package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap;
import java.util.Map;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}}

运行该测试用例,会看到成功运行

此时在RabbitMQ页面中可以看到simple.queue队列有一条数据

点击simple.queue,进入该服务中看,能看到发送的hello,spring amqp

3.在consumer服务中编写消费逻辑,绑定simple.queue这个队列

1.在consumer服务中编写application.yml,添加mq连接信息:

spring:rabbitmq:host: xxx.xxx.xxx.xxx #RabbitMQ的虚拟机ipport: 5672 #Rabbit的端口username: itcastpassword: 123321virtual-host: / #该username的虚拟主机
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: XXX.XXX.XXX.XXX #RabbitMQ的虚拟机ipport: 5672 #Rabbit的端口username: itcastpassword: 123321virtual-host: / #该username的虚拟主机

 2.在consumer服务中新建一个类,编写消费逻辑:

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");}}
package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;
import java.util.Map;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");}}

 运行该代码,可以看到运行成功,并接受到了simple.queue中的消息

并且消息也就没了 

模拟WorkQueue,实现一个队列绑定多个消费者:

1.在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

处理消息的发送,发送50条,并有20毫米的延迟,不然太快了

@Test
public void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__!";for(int i=1;i<=50;i++){rabbitTemplate.convertAndSend(queueName,message + i);Thread.sleep(20);}
}
package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap;
import java.util.Map;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__!";for(int i=1;i<=50;i++){rabbitTemplate.convertAndSend(queueName,message + i);Thread.sleep(20);}}}

2.在consumer服务中定义两个消息监听者,都监听simple.queue队列,消费者1每秒处理50条消息,消费者2每秒处理10条消息

处理消息的接收,用两个接受者,看看处理的情况

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【"+ msg + "】" + LocalTime.now());Thread.sleep(20);
}

        

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2....接收到消息:【"+ msg + "】" + LocalTime.now());Thread.sleep(200);
}
package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;
import java.util.Map;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");}@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【"+ msg + "】" + LocalTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2....接收到消息:【"+ msg + "】" + LocalTime.now());Thread.sleep(200);}}

启动consumer的启动类,再启动publisher的测试类。

此时可以看到,消费者1和消费者2分别处理偶数和奇数的消息。

说明就算消费者2的处理时间是消费者1的十倍,但是它仍然是和消费者1分别拿了一半的消息。

修改consumer的application.yml文件

listener:simple:prefetch: 1 #每次只取一条,等处理完成后再继续取
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.80.129 #RabbitMQ的虚拟机ipport: 5672 #Rabbit的端口username: itcastpassword: 123321virtual-host: / #该username的虚拟主机listener:simple:prefetch: 1 #每次只取一条,等处理完成后再继续取

重新再运行consumer的启动类和publisher的测试类。此时可以看到消费者2不再跟消费者1一人一半消息了。

发布和订阅:

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。

常见exchange类型包括:
1、Fanout:广播
2、Direct:路由

3、Topic:话题

注意: exchange负责消息路由,而不是存储,路由失败则消息丢失

发布订阅-Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue

利用SpringAMQP演示FanoutExchange的使用

1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定

在consumer服务常见一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:

@Configuration
public class FanoutConfig {//交换机名字:itcast.fanout@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//队列名称: fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列1到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//队列名称: fanout.queue2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}//绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {//交换机名字:itcast.fanout@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//队列名称: fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列1到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//队列名称: fanout.queue2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}//绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

启动consumer的启动类

可以在RabbitMQ的网站上看到这两个队列

可以在RabbitMQ的网站上看到该交换机

点击该交换机,可以看到里面绑定了两个队列

2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg)  {System.out.println("消费者接收到fanout.queue1的消息:【"+ msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg)  {System.out.println("消费者接收到fanout.queue2的消息:【"+ msg + "】");
}
package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;
import java.util.Map;@Component
public class SpringRabbitListener {
//    @RabbitListener(queues = "simple.queue")
//    public void listenSimpleQueue(String msg) {
//        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
//    }
////    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue1(String msg) throws InterruptedException {
//        System.out.println("消费者1接收到消息:【"+ msg + "】" + LocalTime.now());
//        Thread.sleep(20);
//    }//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue2(String msg) throws InterruptedException {
//        System.err.println("消费者2....接收到消息:【"+ msg + "】" + LocalTime.now());
//        Thread.sleep(200);
//    }@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg)  {System.out.println("消费者接收到fanout.queue1的消息:【"+ msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg)  {System.out.println("消费者接收到fanout.queue2的消息:【"+ msg + "】");}}

3.在publisher中编写测试方法,向itcast.fanout发送消息

@Test
public void testSendFanoutExchange(){//交换机名称String exchangeName = "itcast.fanout";//消息String message = "hello,everyone!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);
}
package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap;
import java.util.Map;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__!";for(int i=1;i<=50;i++){rabbitTemplate.convertAndSend(queueName,message + i);Thread.sleep(20);}}@Testpublic void testSendFanoutExchange(){//交换机名称String exchangeName = "itcast.fanout";//消息String message = "hello,everyone!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);}}

 运行publisher的测试类和consumer的启动类,可以看到两个队列都收到了

发布订阅-DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

1、每一个Queue都与Exchange设置一个BindingKey

2、发布者发送消息时,指定消息的RoutingKey
3、Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

1.利用@RabbitListener声明Exchange、Queue、RoutingKey,在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","blue"}))
public void ListenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【"+ msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","yellow"}))
public void ListenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【"+ msg + "】");
}
package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;
import java.util.Map;@Component
public class SpringRabbitListener {
//    @RabbitListener(queues = "simple.queue")
//    public void listenSimpleQueue(String msg) {
//        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
//    }
////    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue1(String msg) throws InterruptedException {
//        System.out.println("消费者1接收到消息:【"+ msg + "】" + LocalTime.now());
//        Thread.sleep(20);
//    }//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue2(String msg) throws InterruptedException {
//        System.err.println("消费者2....接收到消息:【"+ msg + "】" + LocalTime.now());
//        Thread.sleep(200);
//    }@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg)  {System.out.println("消费者接收到fanout.queue1的消息:【"+ msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg)  {System.out.println("消费者接收到fanout.queue2的消息:【"+ msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void ListenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【"+ msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void ListenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【"+ msg + "】");}}

运行consumer的启动类,在RabbitMQ的网页中可以看到多出了itcast.direct的交换机

队列也多出两个direct.queue

 可以看到两个队列分别绑定了两个key

2.在publisher中编写测试方法,向itcast. direct发送消息

@Test
public void testSendDirectExchange(){//交换机名称String exchangeName = "itcast.direct";//消息String message = "hello,red!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"red",message);
}
package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap;
import java.util.Map;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__!";for(int i=1;i<=50;i++){rabbitTemplate.convertAndSend(queueName,message + i);Thread.sleep(20);}}@Testpublic void testSendFanoutExchange(){//交换机名称String exchangeName = "itcast.fanout";//消息String message = "hello,everyone!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);}@Testpublic void testSendDirectExchange(){//交换机名称String exchangeName = "itcast.direct";//消息String message = "hello,red!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"red",message);}}

通过修改message的内容,绑定了不同key的队列会收到不同的消息

发布订阅-TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割。

Queue与Exchange指定BindingKey时可以使用通配符:

#:代指0个或多个单词
*:代指一个单词

1.并利用@RabbitListener声明Exchange、Queue、RoutingKey,在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "china.#"
))
public void ListenTopictQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【"+ msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "#.news"
))
public void ListenTopictQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【"+ msg + "】");
}
package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;
import java.util.Map;@Component
public class SpringRabbitListener {
//    @RabbitListener(queues = "simple.queue")
//    public void listenSimpleQueue(String msg) {
//        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
//    }
////    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue1(String msg) throws InterruptedException {
//        System.out.println("消费者1接收到消息:【"+ msg + "】" + LocalTime.now());
//        Thread.sleep(20);
//    }//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue2(String msg) throws InterruptedException {
//        System.err.println("消费者2....接收到消息:【"+ msg + "】" + LocalTime.now());
//        Thread.sleep(200);
//    }@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg)  {System.out.println("消费者接收到fanout.queue1的消息:【"+ msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg)  {System.out.println("消费者接收到fanout.queue2的消息:【"+ msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void ListenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【"+ msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void ListenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【"+ msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void ListenTopictQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【"+ msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void ListenTopictQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【"+ msg + "】");}}

运行后,在RabbitMQ网页中,可以看到,队列中出现了两个topic.queue

在交换机中也出现了itcas.topic

 点击该交换机,可以看到绑定了两个topic.queue,并且绑定了不同的key。

2.在publisher中编写测试方法,向itcast. topic发送消息

@Test
public void testSendTopicExchange(){//交换机名称String exchangeName = "itcast.topic";//消息String message = "天气!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);
}
package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap;
import java.util.Map;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__!";for(int i=1;i<=50;i++){rabbitTemplate.convertAndSend(queueName,message + i);Thread.sleep(20);}}@Testpublic void testSendFanoutExchange(){//交换机名称String exchangeName = "itcast.fanout";//消息String message = "hello,everyone!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);}@Testpublic void testSendDirectExchange(){//交换机名称String exchangeName = "itcast.direct";//消息String message = "hello,red!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"red",message);}@Testpublic void testSendTopicExchange(){//交换机名称String exchangeName = "itcast.topic";//消息String message = "天气!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);}}

可以看到,通过修改不同的message,会由不同的队列接收

测试发送Object类型消息

说明:在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

编写一个队列,在FanoutConfig里写

@Bean
public Queue objectQueue(){return new Queue("object.queue");
}
package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {//交换机名字:itcast.fanout@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//队列名称: fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列1到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//队列名称: fanout.queue2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}//绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}@Beanpublic Queue objectQueue(){return new Queue("object.queue");}
}

 运行consumer的启动类,在RabbitMQ网页中可以看到object.queue的队列

编写消息

@Test
public void testSendObjectQueue(){Map<String,Object> msg = new HashMap<>();msg.put("name","小明");msg.put("age",21);rabbitTemplate.convertAndSend("object.queue",msg);
}
package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.HashMap;
import java.util.Map;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName,message);}@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__!";for(int i=1;i<=50;i++){rabbitTemplate.convertAndSend(queueName,message + i);Thread.sleep(20);}}@Testpublic void testSendFanoutExchange(){//交换机名称String exchangeName = "itcast.fanout";//消息String message = "hello,everyone!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);}@Testpublic void testSendDirectExchange(){//交换机名称String exchangeName = "itcast.direct";//消息String message = "hello,red!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"red",message);}@Testpublic void testSendTopicExchange(){//交换机名称String exchangeName = "itcast.topic";//消息String message = "天气!";//发送消息rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);}@Testpublic void testSendObjectQueue(){Map<String,Object> msg = new HashMap<>();msg.put("name","小明");msg.put("age",21);rabbitTemplate.convertAndSend("object.queue",msg);}}

启动该测试类,在RabbitMQ网页中可以看到object.queue已经有新的消息

点击该队列,看到消息变成了奇怪的序列化

 消息转换器

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter类型的Bean即可。推荐用JSON方式序列化

在mq-demo的pom文件中添加:

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEASE</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency></dependencies>
</project>

在publisher的启动类声明一个bean

@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}
package cn.itcast.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class);}@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

 启动测试类,可以看到该内容是正常的了

在consumer中,同样是声明bean,再写一个监听的函数

@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}
package cn.itcast.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg){System.out.println("接收到object.queue的消息:" + msg);
}
package cn.itcast.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;
import java.util.Map;@Component
public class SpringRabbitListener {
//    @RabbitListener(queues = "simple.queue")
//    public void listenSimpleQueue(String msg) {
//        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
//    }
////    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue1(String msg) throws InterruptedException {
//        System.out.println("消费者1接收到消息:【"+ msg + "】" + LocalTime.now());
//        Thread.sleep(20);
//    }//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue2(String msg) throws InterruptedException {
//        System.err.println("消费者2....接收到消息:【"+ msg + "】" + LocalTime.now());
//        Thread.sleep(200);
//    }@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg)  {System.out.println("消费者接收到fanout.queue1的消息:【"+ msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg)  {System.out.println("消费者接收到fanout.queue2的消息:【"+ msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void ListenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【"+ msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void ListenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【"+ msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void ListenTopictQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【"+ msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void ListenTopictQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【"+ msg + "】");}@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> msg){System.out.println("接收到object.queue的消息:" + msg);}
}

 启动consumer的启动类,可以看到成功接收到消息

代码文件点击下载icon-default.png?t=N7T8https://pan.baidu.com/s/1wXL7uWpLXu7X-OWuUZRUKA?pwd=tnxl 上一篇:RabbitMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/217150.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python Bokeh库详解:交互式数据可视化

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 数据可视化在数据分析和科学研究中起着至关重要的作用。而Bokeh&#xff08;发音为“bo-kay”&#xff09;是一个强大的Python交互式可视化库&#xff0c;提供了丰富的工具和功能&#xff0c;使得创建交互式、吸…

西工大网络空间安全学院计算机系统基础实验二(phase_3,phase_4,phase_5)

大家千万不要着急&#xff0c;不要慌张&#xff0c;即使自己并不了解多少汇编代码的知识&#xff0c;即使自己计基课上的基础知识学的并不扎实&#xff0c;也都不要紧&#xff0c;因为这次计基实验考察的重点并不是基础知识&#xff0c;而是对gdb工具的掌握&#xff0c;以及心细…

基于.NET Core + Quartz.NET+ Vue + IView开箱即用的定时任务UI

前言 定时任务调度应该是平时业务开发中比较常见的需求&#xff0c;比如说微信文章定时发布、定时更新某一个业务状态、定时删除一些冗余数据等等。今天给大家推荐一个基于.NET Core Quartz.NET Vue IView开箱即用的定时任务UI&#xff08;不依赖数据库,只需在界面做简单配…

Spring Boot学习随笔- 集成JSP模板(配置视图解析器)、整合Mybatis(@MapperScan注解的使用)

学习视频&#xff1a;【编程不良人】2021年SpringBoot最新最全教程 第五章、JSP模板集成 5.1 引入JSP依赖 <!--引入jsp解析依赖--> <!--C标签库--> <dependency><groupId>jstl</groupId><artifactId>jstl</artifactId><version&…

第二十一章网络通信

网络程序设计基础 局域网与互联网 为了实现两台计算机的通信&#xff0c;必须用一个网络线路连接两台计算机。如下图所示 网络协议 1.IP协议 IP是Internet Protocol的简称&#xff0c;是一种网络协议。Internet 网络采用的协议是TCP/IP协议&#xff0c;其全称是Transmissio…

java系列-LinkedHashMap

1.插入新节点时&#xff0c;会将该节点加到链表尾部 public class LinkedHashMap<K,V> extends HashMap<K,V> implements Map<K,V>{/*** The head (eldest) of the doubly linked list.*/transient LinkedHashMapEntry<K,V> head;/*** The tail (young…

【小红书运营指南2】小红书自律标签的达人分解

小红书标签的达人分解 写在最前面11.27初步想法达人分析 标签拆解&#xff08;速览版&#xff09;分析应用 思路 相关标签拆解&#xff08;详细版&#xff09;11、职场-职场干货 文化薯&#xff08;创业&#xff0c;也是专注知识付费&#xff0c;可以对标学习&#xff09;笔记画…

用modelbox server启动流程图,暴露Restful接口

背景 假设你已经搭建了modelbox开发容器&#xff0c;能够使用webUI构建流程图。如果没有请参考昇腾npu上构建modelbox webUI开发容器教程。 现在&#xff0c;本文会说明&#xff0c;如何在终端用命令的方式将流程图暴露为服务&#xff0c;并能够在本地用postman访问。 本文参…

华为OD机试三(完全二叉树部分后续遍历、猜密码、五子棋谜)

1. 完全二叉树部分后续遍历 题目表述&#xff1a;输入&#xff1a; 1 2 3 4 5 6 7 输出&#xff1a; 2 3 1 示例代码&#xff1a; # 测试数据 test_data [1,2,3,4,5,6,7] # 找出非叶子节点 new_list [] for i in test_data:left 2 * iright left 1if left in test_data…

使用 Webshell 访问 SQL Server 主机并利用 SSRS

RDS SQL Server提供Webshell功能&#xff0c;用户可以通过Web界面登录RDS SQL Server 实例的操作系统。通过Webshell&#xff0c;用户可以在RDS SQL Server实例上执行命令、上传和下载文件以及执行各种操作。Webshell 提供了一种方便高效的远程管理方法&#xff0c;尤其是在 SS…

如何使用nacos进行配置管理以及代码集成

首先需要在maven的pom文件中引入nacos-config依赖 <!--nacos配置管理依赖--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency> 在项目中添加boo…

Python 全栈体系【四阶】(四)

第四章 机器学习 一、人工智能概述 1. 什么是人工智能 人工智能&#xff08;Artificial Intelligence&#xff09;是计算机科学的一个分支学科&#xff0c;主要研究用计算机模拟人的思考方式和行为方式&#xff0c;从而在某些领域代替人进行工作。 2. 人工智能的学科体系 …

输出网络结构图,mmdetection

控制台输入&#xff1a;python tools/train.py /home/yuan3080/桌面/detection_paper_6/mmdetection-master1/mmdetection-master_yanhuo/work_dirs/lad_r50_paa_r101_fpn_coco_1x/lad_r50_a_r101_fpn_coco_1x.py 这个是输出方法里面的&#xff0c;不是原始方法。 如下所示&a…

分层自动化测试的实战思考!

自动化测试的分层模型 自动化测试的分层模型&#xff0c;我们应该已经很熟悉了&#xff0c;按照分层测试理念&#xff0c;自动化测试的投入产出应该是一个金字塔模型。越是向下&#xff0c;投入/产出比就越高&#xff0c;但开展的难易程度/成本和技术要求就越高&#xff0c;但…

附录C 流水线:基础与中级概念

1. 引言 1.1 什么是流水线&#xff1f; 流水线爱是一种将多条指令重叠执行的实现技术&#xff0c;它利用了一条指令所需的多个操作之间的并行性。&#xff08;指令操作的非原子性和指令类型的多样性&#xff09; 在计算流水线中&#xff0c;每个步骤完成指令的一部分&#x…

Leetcode143 重排链表

重排链表 题解1 线性表 给定一个单链表 L 的头节点 head &#xff0c;单链表 L 表示为&#xff1a; L0 → L1 → … → Ln - 1 → Ln请将其重新排列后变为&#xff1a; L0 → Ln → L1 → Ln - 1 → L2 → Ln - 2 → …不能只是单纯的改变节点内部的值&#xff0c;而是需要实际…

现代物流系统的分析与设计

目 录 引言 3一、系统分析 4 &#xff08;一&#xff09;需求分析 4 &#xff08;二&#xff09;可行性分析 4 二、 总体设计 4 &#xff08;一&#xff09;项目规划 4 &#xff08;二&#xff09;系统功能结构图 5 三、详细设计 6 &#xff08;一&#xff09;系统登录设计 6 …

【技术分享】企业网必不可少的NAT技术

NAT是一种地址转换技术&#xff0c;它可以将IP数据报文头中的IP地址转换为另一个IP地址&#xff0c;并通过转换端口号达到地址重用的目的。NAT作为一种缓解IPv4公网地址枯竭的过渡技术&#xff0c;由于实现简单&#xff0c;得到了广泛应用。 NAT解决了什么问题&#xff1f; 随…

【lesson11】表的约束(4)

文章目录 表的约束的介绍唯一键约束测试建表插入测试建表插入测试建表插入测试修改表插入测试 表的约束的介绍 真正约束字段的是数据类型&#xff0c;但是数据类型约束很单一&#xff0c;需要有一些额外的约束&#xff0c;更好的保证数据的合法性&#xff0c;从业务逻辑角度保…

关于学习计算机的心得与体会

也是隔了一周没有发文了&#xff0c;最近一直在准备期末考试&#xff0c;后来想了很久&#xff0c;学了这么久的计算机&#xff0c;这当中有些收获和失去想和各位正在和我一样在学习计算机的路上的老铁分享一下&#xff0c;希望可以作为你们碰到困难时的良药。先叠个甲&#xf…