SpringAMQP 快速入门

SpringAMQP 快速入门

    • 1. 创建项目
    • 2. 快速入门
      • 2.2.1 消息发送
      • 2.2.2 消息接收
    • 3. 交换机
      • 3.1 Fanout Exchange(扇出交换机)
        • 3.1.1 创建队列与交换机
        • 3.1.2 消息接收
        • 3.1.3 消息发送
      • 3.2 Direct Exchange(直连交换机)
        • 3.2.1 创建交换机与队列 绑定 Routing Key
        • 3.2.2 消息接收
        • 3.1.3 消息发送
      • 3.3 Topic Exchange(主题交换机)
        • 3.3.1 创建交换机与队列 绑定 Routing Key
        • 3.3.2 消息接收
        • 3.3.3 消息发送
    • 4. 声明队列和交换机
      • 4.1 声明队列
      • 4.2 声明交换机
      • 4.3 声明绑定关系
      • 4.4 测试
      • 4.5 基于注解声明
    • 5. 消息转换器
      • 5.1 测试默认消息转换器
      • 5.2 配置JSON转换器
      • 5.3 消息接收

Spring AMQP(Advanced Message Queuing Protocol)是 Spring 框架的一个模块,用于简化在基于消息的应用程序中使用消息队列的开发。它建立在 AMQP 协议之上,提供了与消息中间件(如 RabbitMQ)集成的便捷方式。

以下是 Spring AMQP 的主要特点和概念:

  1. 简化的消息生产者和消费者: Spring AMQP 提供了简单的模板(AmqpTemplate)用于发送和接收消息,大大简化了消息生产者和消费者的开发。
  2. 声明式的消息监听器容器: Spring AMQP 允许使用注解声明消息监听器,而不需要手动编写消息消费者。通过 @RabbitListener 注解,可以将一个方法标识为消息监听器,以便在接收到消息时自动调用该方法。
  3. 消息转换: Spring AMQP 提供了灵活的消息转换机制,可以将消息从一种格式转换为另一种格式,以便与不同类型的消息队列进行集成。
  4. 声明式的队列、交换机和绑定: 使用 Spring AMQP,可以通过注解声明式地定义队列、交换机和绑定关系,而不需要在代码中显式创建这些对象。
  5. 事务支持: Spring AMQP 支持事务,可以在消息发送和接收过程中使用事务来确保消息的可靠性。
  6. 异常处理: 提供了丰富的异常体系,方便开发者处理在消息处理过程中可能发生的异常情况。
  7. 集成 Spring Boot: Spring AMQP 很好地集成到 Spring Boot 中,通过简单的配置即可快速搭建基于消息的应用。
  8. 并发性: 允许配置消息监听器容器的并发性,以便同时处理多个消息。

1. 创建项目

选择 AMQP 依赖

在这里插入图片描述

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.hzy</groupId><artifactId>SpringAMQP-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>SpringAMQP-demo</name><description>SpringAMQP-demo</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

在SpringAMQP-demo中新建两个子模块 publisherconsumer

在这里插入图片描述

2. 快速入门

前面在消息队列 - RabbitMQ这篇博客中添加了一个 test 用户和 /test 虚拟主机,现在在 /test 中创建一个队列

在这里插入图片描述

2.2.1 消息发送

配置 publisherapplication.yml 文件

spring:rabbitmq:host: 192.168.193.40port: 5672username: testpassword: testvirtual-host: /test

然后在publisher中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testQueue1() {// 队列名称String queueName = "test.queue1";// 消息String message = "hello";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

成功发送到 test.queue1

在这里插入图片描述

2.2.2 消息接收

consumerapplication.yml中添加配置:

spring:rabbitmq:host: 192.168.193.40port: 5672username: testpassword: testvirtual-host: /test

新建一个类SpringRabbitListener 监听队列 test.queue1

@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 方法体中接收的就是消息体的内容@RabbitListener(queues = "test.queue1")public void listenTestQueueMessage(String msg) {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

启动服务后就接收到了消息

在这里插入图片描述

队列中的消息被消费

在这里插入图片描述

3. 交换机

3.1 Fanout Exchange(扇出交换机)

扇出交换机将消息广播到与交换机绑定的所有队列,忽略路由键。适用于广播消息给多个消费者的场景,不关心消息的具体内容。

3.1.1 创建队列与交换机

创建队列

在这里插入图片描述
创建交换机

在这里插入图片描述

绑定队列到交换机

在这里插入图片描述

3.1.2 消息接收

SpringRabbitListener中添加两个方法分别监听 fanout.queue1 和 fanout.queue2

    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1Message(String msg) {System.out.println("spring 消费者接收 fanout.queue1 消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void  listenFanoutQueue2Message(String msg) {System.out.println("spring 消费者接收 fanout.queue2 消息:【" + msg + "】");}
3.1.3 消息发送

SpringAmqpTest 中添加消息发送方法

    @Testpublic void SendFanoutExchange() {// 交换机名称String exchangeName = "test.fanout";// 消息String message = "hello, test.fanout";rabbitTemplate.convertAndSend(exchangeName, "", message);}

成功接收到消息

在这里插入图片描述

3.2 Direct Exchange(直连交换机)

直连交换机是最简单的交换机类型,它将消息路由到与消息中的路由键完全匹配的队列中,在消息生产者指定的路由键和队列的绑定键完全相同时,消息将被发送到相应的队列。

3.2.1 创建交换机与队列 绑定 Routing Key

创建队列

在这里插入图片描述

创建交换机

在这里插入图片描述

绑定 Routing Key

在这里插入图片描述

3.2.2 消息接收

SpringRabbitListener中添加两个方法分别监听 fanout.queue1 和 fanout.queue2

    @RabbitListener(queues = "direct.queue1")public void listenDirectQueue1Message(String msg) {System.out.println("spring 消费者1接收 direct.queue1 消息:【" + msg + "】");}@RabbitListener(queues = "direct.queue2")public void  listenFanoutQueue2Message(String msg) {System.out.println("spring 消费者2接收 direct.queue2 消息:【" + msg + "】");}
3.1.3 消息发送

SpringAmqpTest 中添加消息发送方法

    @Testpublic void SendDirectExchange() {// 交换机名称String exchangeName = "test.direct";// 消息String message = "hello, red";rabbitTemplate.convertAndSend(exchangeName, "red", message);}

Routing Key 为 red时两个队列都能收到消息

在这里插入图片描述

修改 Routing Key 为 blue

    @Testpublic void SendDirectExchange() {// 交换机名称String exchangeName = "test.direct";// 消息String message = "hello, blue";rabbitTemplate.convertAndSend(exchangeName, "blue", message);}

只有 direct.queue2 能收到消息

在这里插入图片描述

3.3 Topic Exchange(主题交换机)

Topic ExchangeDirect Exchange 类似区别在于使用直连交换机时,消息的路由键(Routing Key)需要与队列绑定时指定的路由键完全匹配。使用主题交换机时,消息的路由键可以使用通配符进行模式匹配,支持更灵活的消息路由规则。

路由键中可以使用 *(匹配一个单词)和 #(匹配零个或多个单词)通配符。

适用于需要根据一定的模式匹配将消息路由到不同队列的场景,可以处理更复杂的消息路由需求。

3.3.1 创建交换机与队列 绑定 Routing Key

创建队列

在这里插入图片描述

创建交换机

在这里插入图片描述

绑定 Routing Key

在这里插入图片描述

3.3.2 消息接收

SpringRabbitListener中添加两个方法分别监听 topic.queue1 和 topic.queue2

    @RabbitListener(queues = "topic.queue1")public void listenTopicQueue1Message(String msg) {System.out.println("spring 消费者1接收 topic.queue1 消息:【" + msg + "】");}@RabbitListener(queues = "topic.queue2")public void  listenTopicQueue2Message(String msg) {System.out.println("spring 消费者2接收 topic.queue2 消息:【" + msg + "】");}
3.3.3 消息发送

SpringAmqpTest 中添加消息发送方法

    @Testpublic void SendTopicExchange() {// 交换机名称String exchangeName = "test.topic";// 消息String message = "邮件通知";rabbitTemplate.convertAndSend(exchangeName, "mail.notices", message);message = "微信通知";rabbitTemplate.convertAndSend(exchangeName, "wechat.notices", message);message = "今日新闻";rabbitTemplate.convertAndSend(exchangeName, "today.news", message);}

在这里插入图片描述

4. 声明队列和交换机

在 Spring AMQP 中,声明队列和交换机是连接到 RabbitMQ 之前的重要步骤。这些声明定义了你的消息传递系统的基础架构,包括队列和交换机的名称、类型以及与其相关的其他属性。

之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

Spring AMQP 提供了用来声明队列、交换机、及其绑定关系的的类:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBulider构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

4.1 声明队列

创建 一个配置类 FanoutConfiguration

@Configuration
public class FanoutConfiguration {
}

SpringAMQP 提供了 Queue类用来创建队列

在这里插入图片描述

在配置类中添加方法

    @Beanpublic Queue fanoutQueue3(){// durable() 持久化队列QueueBuilder durable = QueueBuilder.durable("fanout.queue3");return durable.build();}

4.2 声明交换机

SpringAMQP提供了一个Exchange接口,来表示所有不同类型的交换机:

在这里插入图片描述

在配置类中添加方法

    @Beanpublic FanoutExchange fanoutExchange(){ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange("test.fanout2");return exchangeBuilder.build();}

4.3 声明绑定关系

SpringAMQP 提供了 Binding 类 来绑定队列于交换机

在这里插入图片描述

在配置类中添加方法

    @Beanpublic Binding fanoutBinding3(FanoutExchange fanoutExchange,Queue fanoutQueue3){return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}

4.4 测试

启动服务,查看控制台可以看到队列、交换机、和绑定关系都成功创建。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

4.5 基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,每添加一个队列、交换机、绑定关系都要写一个@Bean方法。Spring还提供了基于注解方式来声明。

使用注解方式声明 Fanout交换机与队列

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "fanout.queue3", durable = "true"),exchange = @Exchange(value = "test.fanout2", type = "fanout")))public void listenFanoutQueue3Message(String msg) {System.out.println("消费者 接收 fanout.queue3 消息:【" + msg + "】");}

使用注解方式声明Direct交换机与队列

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct.queue1", durable = "true"),exchange = @Exchange(name = "test.direct", type = "direct"),key = {"red","blue"}          ))public void listenDirectQueue1Message(String msg) {System.out.println("spring 消费者1接收 direct.queue1 消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct.queue2", durable = "true"),exchange = @Exchange(name = "test.direct", type = "direct"),key = {"red","yellow"}))public void listenDirectQueue2Message(String msg) {System.out.println("spring 消费者2接收 direct.queue2 消息:【" + msg + "】");}

使用注解方式声明Topic交换机与队列

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.queue1", durable = "true"),exchange = @Exchange(name = "test.topic", type = ExchangeTypes.TOPIC),key = "#.notices"))public void listenTopicQueue1Message(String msg) {System.out.println("spring 消费者1接收 topic.queue1 消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.queue2", durable = "true"),//  type = ExchangeTypes.TOPIC 或者 "topic" 默认 "direct"exchange = @Exchange(name = "test.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2Message(String msg) {System.out.println("spring 消费者2接收 topic.queue2 消息:【" + msg + "】");}

可以看到使用@RabbitListener注解的方式比 @Bean 方式简单很多

5. 消息转换器

5.1 测试默认消息转换器

发送一个map集合

    @Testpublic void testSendMapQueue1() {// 队列名称String queueName = "test.queue1";// 消息Map<String,String> map = new HashMap<>();map.put("name","zs");// 发送消息rabbitTemplate.convertAndSend(queueName, map);}

在控制台查看消息

在这里插入图片描述

可以看到默认使用的序列化方式是JDK序列化,众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

使用我们需要使用可读性更高更轻量级的序列化方式:JSON

5.2 配置JSON转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

publisherconsumer两个服务中都引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

再测试一次在控制台中查看

![外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传](https://img-

5.3 消息接收

    @RabbitListener(queues = "test.queue1")public void listenTestQueueMessage(Map<String, String> msg) {System.out.println("消费者接收到test.queue1消息:【" + msg + "】");}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/207260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Validate 验证规则详解

前言: 以前小编发过一篇Validate 验证规则 如何使用的&#xff0c;没有去将Validate 验证规则的原理应用场景&#xff0c;这篇文章来完善一下。 不知道如何使用的朋友可以点击下面传送门 传送门 讲解: Validate 验证规则通常指的是在 Web 开发中&#xff0c;使用验证器&…

【开源】基于Vue.js的智慧社区业务综合平台

文末获取源码&#xff0c;项目编号&#xff1a; S 077 。 \color{red}{文末获取源码&#xff0c;项目编号&#xff1a;S077。} 文末获取源码&#xff0c;项目编号&#xff1a;S077。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 业务类型模块2.2 基础业务模块2.3 预…

精选Axure原型设计模板,RP原型组件库(PC端移动端元件库及Axure函数及运算符说明)

好的原型组件会大大的提高产品经理的工作效率&#xff0c;现精选了一批Axure 8的原型设计模板&#xff0c;包含了原型设计的常用元素和AxureRP 8函数及运算符的说明文档&#xff0c;及各种设备模板框架。 分享给大家可以共同学习&#xff0c;文末可下载完整原型组件包~&#x…

苹果手机ios系统安装了一个免签应用书签webclip描述文件该如何卸载?

随着移动应用的普及&#xff0c;越来越多的用户开始关注到苹果免签的应用。相比于需要通过 App Store 审核和签名的应用&#xff0c;免签应用无需经过苹果的审核过程&#xff0c;可以直接安装和使用。那么&#xff0c;苹果免签应用是如何制作的呢&#xff1f;本文将介绍制作苹果…

SQL进阶 | CASE表达式

本文所有案例基于《SQL进阶教程》实现。 概述 SQL中的CASE表达式是一种通用的条件表达式&#xff0c;类似于其他语言中的if/else语句。它用于在SQL语句中实现条件逻辑。CASE表达式以WHEN子句开始&#xff0c;后面跟着一个或多个WHEN条件&#xff0c;每个WHEN条件后面跟着一个TH…

C++相关闲碎记录(3)

1、reference wrapper 例如声明如下的模板&#xff1a; template <typename T> void foo(T val); 如果调用使用&#xff1a; int x; foo(std::ref(x)); T变成int&&#xff0c;而使用调用 int x; foo(std::cref(x)); T变成const int&。 这个特性被C标准库用…

fijkplayer flutter 直播流播放

fijkplayer flutter 直播流播放 fijkplayer 是 ijkplayer 的 Flutter 封装&#xff0c; 是一款支持 android 和 iOS 的 Flutter 媒体播放器插件&#xff0c; 由 ijkplayer 底层驱动。 通过纹理&#xff08;Texture&#xff09;接入播放器视频渲染到 Flutter 中。 前言 目前使用…

PostgreSQL 技术内幕(十二) CloudberryDB 并行化查询之路

随着数据驱动的应用日益增多&#xff0c;数据查询和分析的量级和时效性要求也在不断提升&#xff0c;对数据库的查询性能提出了更高的要求。为了满足这一需求&#xff0c;数据库引擎不断经历创新&#xff0c;其中并行执行引擎是性能提升的重要手段之一&#xff0c;逐渐成为数据…

One-to-Few Label Assignment for End-to-End Dense Detection阅读笔记

One-to-Few Label Assignment for End-to-End Dense Detection阅读笔记 Abstract 一对一&#xff08;o2o&#xff09;标签分配对基于变换器的端到端检测起着关键作用&#xff0c;最近已经被引入到全卷积检测器中&#xff0c;用于端到端密集检测。然而&#xff0c;o2o可能因为…

elasticsearch 内网下如何以离线的方式上传任意的huggingFace上的NLP模型(国内避坑指南)

es自2020年的8.x版本以来&#xff0c;就提供了机器学习的能力。我们可以使用es官方提供的工具eland&#xff0c;将hugging face上的NLP模型&#xff0c;上传到es集群中。利用es的机器学习模块&#xff0c;来运维部署管理模型。配合es的管道处理&#xff0c;来更加便捷的处理数据…

吴恩达《机器学习》12-1:优化目标

在机器学习的旅程中&#xff0c;我们已经接触了多种学习算法。在监督学习中&#xff0c;选择使用算法 A 还是算法 B 的重要性逐渐减弱&#xff0c;而更关键的是如何在应用这些算法时优化目标。这包括设计特征、选择正则化参数等因素&#xff0c;这些在不同水平的实践者之间可能…

UG NX二次开发(C#)-求曲线在某一点处的法矢和切矢

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 1、前言2、在UG NX中创建一个曲线3、直接放代码4、测试案例1、前言 最近确实有点忙了,好久没更新博客了。今天恰好有时间,就更新下,还请家人们见谅。 今天我们讲一下如何获取一条曲线上某一条曲…

注意力机制的快速学习

注意力机制的快速学习 注意力机制 将焦点聚焦在比较重要的事物上 我&#xff08;查询对象Q&#xff09;&#xff0c;这张图&#xff08;被查询对象V&#xff09; 我看一张图&#xff0c;第一眼&#xff0c;就会判断那些东西对我而言比较重要&#xff0c;那些对于我不重要&…

Pytorch从零开始实战12

Pytorch从零开始实战——DenseNet算法实战 本系列来源于365天深度学习训练营 原作者K同学 文章目录 Pytorch从零开始实战——DenseNet算法实战环境准备数据集模型选择开始训练可视化总结 环境准备 本文基于Jupyter notebook&#xff0c;使用Python3.8&#xff0c;Pytorch2.…

DevEco Studio 运行项目有时会自动出现.js和.map文件

运行的时候报错了&#xff0c;发现多了.js和.map&#xff0c;而且还不是一个&#xff0c;很多个。 通过查询&#xff0c;好像是之前已知问题了&#xff0c;给的建议是手动删除(一个一个删)&#xff0c;而且有的评论还说&#xff0c;一周出现了3次&#xff0c;太可怕了。 搜的过…

【网络编程】-- 02 端口、通信协议

网络编程 3 端口 端口表示计算机上的一个程序的进程 不同的进程有不同的端口号&#xff01;用来区分不同的软件进程 被规定总共0~65535 TCP,UDP&#xff1a;65535 * 2 在同一协议下&#xff0c;端口号不可以冲突占用 端口分类&#xff1a; 公有端口&#xff1a;0~1023 HT…

亚信安慧AntDB数据库中级培训ACP上线,中国移动总部首批客户认证通过

近日&#xff0c;亚信安慧AntDB数据库ACP&#xff08;AntDB Certified Professional&#xff09;中级培训课程于官网上线。在中国移动总部客户运维团队、现场项目部伙伴和AntDB数据库成员的协同组织下&#xff0c;首批中级认证学员顺利完成相关课程的培训&#xff0c;并获得Ant…

自然语言处理22-基于本地知识库的快速问答系统,利用大模型的中文训练集为知识库

大家好,我是微学AI,今天给大家介绍一下自然语言处理22-基于本地知识库的快速问答系统,利用大模型的中文训练集为知识库。我们的快速问答系统是基于本地知识库和大模型的最新技术,它利用了经过训练的中文大模型,该模型使用了包括alpaca_gpt4_data的开源数据集。 一、本地…

C //例10.3 从键盘读入若干个字符串,对它们按字母大小的顺序排序,然后把排好序的字符串送到磁盘文件中保存。

C程序设计 &#xff08;第四版&#xff09; 谭浩强 例10.3 例10.3 从键盘读入若干个字符串&#xff0c;对它们按字母大小的顺序排序&#xff0c;然后把排好序的字符串送到磁盘文件中保存。 IDE工具&#xff1a;VS2010 Note: 使用不同的IDE工具可能有部分差异。 代码块 方法…