MQ - RabbitMQ、SpringAMQP --学习笔记

什么是MQ?

MQ 是消息队列(Message Queue)的缩写,它是一种应用程序间异步通信的技术。消息队列允许应用程序或服务间通过发送消息来交换数据,而不是直接调用对方,从而实现解耦、异步处理和负载均衡等目的。

简单来说,消息队列就像是一个邮局,应用程序就像是寄信人和收信人。一个应用程序(寄信人)发送消息(信件)到消息队列(邮局),另一个应用程序(收信人)从队列中取出消息进行处理。这个过程可以是同步的,也可以是完全异步的,意味着发送者和接收者不必同时在线,他们通过消息队列中转消息。

消息队列的主要优势包括:

  1. 解耦:发送者和接收者只依赖于消息队列,而不是直接依赖于对方,降低系统间的耦合度。
  2. 异步处理:发送者可以快速发送消息并继续处理其他任务,无需等待接收者的响应。
  3. 负载均衡:通过调整消费者的数量来处理不同的负载。
  4. 缓冲:在高负载情况下,消息队列可以作为缓冲,防止系统因请求过多而崩溃。
  5. 可扩展性:系统组件可以独立地扩展,提高系统整体的扩展性和弹性。

消息队列广泛应用于微服务架构、分布式系统、大数据处理流程等领域,常见的消息队列实现包括 Kafka、RabbitMQ、ActiveMQ 等。

RabbitMQ

一、安装

以下是基于docker的安装步骤

使用下面的命令即可:

docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network networkname # 网络名\-d \rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口

  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://虚拟机ip:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在命令中已经指定了。

 二、基本构造

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方

  • consumer:消费者,也就是消费消息的一方

  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理,可通过控制台的Queue选项卡进行管理

  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列,可通过控制台的Exchange选项卡进行管理

  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,通过控制台的Admin选项卡进行管理

SpringAMQP

一、概述

我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

二、使用

1、引入依赖

        <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

 2、修改配置

spring:rabbitmq:host: 192.168.88.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123456 # 密码

3、发送消息示例:

    private RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}

4、接收消息示例:


@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

WorkQueues:任务模型。简单来说就是多个消费者绑定到一个队列,共同消费队列中的消息

5、交换机类型

在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机

  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。

  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

 Fanout交换机

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。

在广播模式下,消息发送流程是这样的:

  • 创建一个名为 hmall.fanout的交换机,类型是Fanout

  • 创建两个队列fanout.queue1fanout.queue2,绑定到交换机hmall.fanout

  • consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

  • 在publisher中编写测试方法,向test.fanout发送消息

说白了,就是发送给Fanout交换机的消息,Fanout交换机会将消息转发给所有绑定它的队列(广播)

发送示例:

@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "test.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

接收示例:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

  1. 声明一个名为hmall.direct的交换机

  2. 声明队列direct.queue1,绑定hmall.directbindingKeybludred

  3. 声明队列direct.queue2,绑定hmall.directbindingKeyyellowred

  4. consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

  5. 在publisher中编写测试方法,向hmall.direct发送消息

说白了,就是在绑定队列与交换机时添加了一个RoutingKey(路由key),相当于令牌, 利用这个RoutingKey做身份验证,将消息发送给需要的消费者

发送示例:

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "test.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

接收示例:

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

图示:

假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;

  • china.weather 代表中国的天气消息;

  • japan.news 则代表日本新闻

  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:

    • china.news

    • china.weather

  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:

    • china.news

    • japan.news

 至于收发消息,与DIrect交换机一致

声明队列和交换机

可以在控制台通过图形化界面手动声明(很简单不赘述)

也可以直接在程序中声明,SpringAMQP提供了一个Queue类,用来创建队列:

示例:

    /*** 声明交换机* @return Direct类型交换机    */@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();    //FanoutExchange   topicExchange}/*** 声明队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}

用注解声明:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

配置JSON转换器

默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

首先在publisherconsumer两个服务中都引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/36926.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

零成本打造精品宣传册

​随着互联网的发展&#xff0c;企业和个人对宣传册的需求日益增长&#xff0c;然而&#xff0c;高质量的宣传册制作往往需要不菲的成本。那么&#xff0c;如何零成本打造精品宣传册呢&#xff1f; 一、明确定位和目标群体 在制作宣传册之前&#xff0c;首先要明确其定位和目标…

qt pro文件常用配置

概述 记录一下常用的项目pro文件的一些常用配置 常用配置 QT core gui concurrent#添加concurrent并行处理模块 CONFIG windeployqt#打包部署&#xff0c;项目->构建步骤->Make参数 添加windeployqt&#xff0c;编译自动打包greaterThan(QT_MAJOR_VERSION, 4):…

Kafka入门到精通(三)-Kafka

Kafka简介 Kafka是由Apache软件基金会开发的一个开源流处理平台&#xff0c;由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统&#xff0c;它可以处理消费者在网站中的所有动作流数据。 这种动作&#xff08;网页浏览&#xff0c;搜索和其他用户的行动&#xf…

JeecgBoot新建模块

引言 jeecg-boot设置了demo, system等默认模块。在二次开发中&#xff0c;常常需要进行模块扩展。比如新增一个订单模块或支付模块。如何准确的新增模块&#xff0c;在此文进行记录。 步骤 新建模块 在项目点击右键&#xff0c;新建模块。 如下图。 注意&#xff1a;报名需…

鸿蒙NEXT开发知识:工具常用命令—ohpm config

设置ohpm用户级配置项。 命令格式 ohpm config set <key> <value> ohpm config get <key> ohpm config delete <key> ohpm config list 说明 配置文件中信息以键值对<key> <value>形式存在。 功能描述 ohpm 从命令行和 .ohpmrc 文件中…

Linux命令----wc,uniq,sort的用法

1.wc的用法&#xff1a;wc 命令用于计算文件中的行数、单词数和字节数。 常用选项 -l&#xff1a;只显示行数-w&#xff1a;只显示单词数-c&#xff1a;只显示字节数-m&#xff1a;只显示字符数&#xff08;与 -c 类似&#xff0c;但处理多字节字符&#xff09;-L&#xff1a…

day22--77. 组合+216.组合总和III+17.电话号码的字母组合

一、77. 组合 题目链接&#xff1a;https://leetcode.cn/problems/combinations/ 文章讲解&#xff1a;https://programmercarl.com/0077.%E7%BB%84%E5%90%88.html 视频讲解&#xff1a;https://www.bilibili.com/video/BV1ti4y1L7cv 1.1 初见思路 组合问题用回溯学会使用剪…

SpringBoot:SpringBoot中调用失败如何重试

一、引言 在实际的应用中&#xff0c;我们经常需要调用第三方API来获取数据或执行某些操作。然而&#xff0c;由于网络不稳定、第三方服务异常等原因&#xff0c;API调用可能会失败。为了提高系统的稳定性和可靠性&#xff0c;我们通常会考虑实现重试机制。 Spring Retry为Spri…

基于uni-app与图鸟UI的移动应用模板构建研究

摘要 随着移动互联网技术的迅猛发展&#xff0c;移动端应用已成为企业展示形象、提供服务的重要窗口。本文基于uni-app框架和图鸟UI设计&#xff0c;深入探讨了如何高效构建覆盖多个领域的移动端应用模板。通过对商城、办公、投票、生活服务等多种类型模板的详细介绍&#xff…

Educational Codeforces Round 112 (Rated for Div. 2) C. Coin Rows(构造 + 贪心 + 前缀和)

可以知道爱丽丝的路径是拐两次弯的折线 那么我们知道鲍勃能够选择的位置只有两段黄线中的一段 所以可以求出来第二行的后缀和&#xff0c;然后求出来第一行的前缀行&#xff0c;这样鲍勃在爱丽丝分割之后的情况下就会选择这两者中最大的一段&#xff0c;然而爱丽丝也会阻碍鲍…

Open AI Stream Completion Set Variable Inside Function PHP With Openai-php SDK

题意&#xff1a;使用 OpenAI 的 PHP SDK&#xff08;例如 openai-php&#xff09;来在函数内部设置和完成一个流&#xff08;stream&#xff09;相关的变量 问题背景&#xff1a; How to set variable inside this openai-php sdk function in stream completion ? I am usi…

华为HCIA综合实验(结合前几期所有内容)

第一章 实验目的 &#xff08;1&#xff09;配置Telnet&#xff0c;要求所有网络设备支持远程管理&#xff0c;密码为admin&#xff08;2&#xff09;配置Trunk&#xff0c;交换机之间的链路均为Trunk模式&#xff08;3&#xff09;配置VLAN&#xff0c;在SW2和SW3上创建相关…

Qt6.6编译Qt二维图形编辑器QVGE源码

QVGE是一个开源的多平台QtC编写的图形编辑器&#xff0c;可以用来画网络节点图&#xff0c;或者其他作用。 QVGE可以轻松创建和参数设定的小型到中型图形(1000节点/边缘)&#xff0c;共同的视觉特性的节点和边缘&#xff1a;形状、尺寸、颜色、标签等。定义(用户定义)属性的图表…

深度学习Week18——学习残差网络和ResNet-50算法

文章目录 深度学习Week18——学习残差网络和ResNet-50算法 一、前言 二、我的环境 三、前期工作 1、配置环境 2、导入数据 2.1 加载数据 2.2 配置数据集 2.3 数据可视化 2.4 再次检查数据 四、构建ResNet-50网络模型 五、编译模型 六、训练模型 七、模型评估 八、指定图片预测 …

leetCode.92. 反转链表 II

leetCode.92. 反转链表 II 题目思路 代码 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNode …

【Python数据分析与可视化】:使用【Matplotlib】实现销售数据的全面分析 ——【Matplotlib】数模学习

目录 安装Matplotlib 1.打开PyCharm&#xff1a; 2.打开终端&#xff1a; 3.安装Matplotlib&#xff1a; 4.确认安装&#xff1a; 导入Matplotlib 创建简单的折线图 代码解析&#xff1a; 创建子图 代码解析&#xff1a; 创建柱状图 代码解析&#xff1a; 创建散点…

Vite响应Ajax请求

Vite响应Ajax请求 陈拓 2024/06/20-2024/06/24 1. 概述 http-server、live-server 等常用于本地测试和开发的http服务器不能很好的支持 ES 模块&#xff0c;在测试ES 模块时浏览器控制台经常显示错误&#xff1a; Failed to load module script: Expected a JavaScript modu…

esp8266 GPIO

功能综述 ESP8266 的 16 个通⽤ IO 的管脚位置和名称如下表所示。 管脚功能选择 功能选择寄存器 PERIPHS_IO_MUX_MTDI_U&#xff08;不同的 GPIO&#xff0c;该寄存器不同&#xff09; PIN_FUNC_SELECT(PERIPHS_IO_MUX_MTDI_U,FUNC_GPIO12);PERIPHS_IO_MUX_为前缀。后面的…

基于SpringBoot+Vue的大药房管理系统(带1w+文档)

基于SpringBootVue的大药房管理系统(带1w文档) 本系统主要包括管理员和用户两个用户角色&#xff1b;主要包括&#xff1a;首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;保健品分类管理&#xff0c;药品分类管理&#xff0c;药品信息管理&#xff0c;疫情常识管理…

Flink入门实战详解

Flink入门实战 Flink项目构建 1)基于MavenIdea创建项目&#xff1a; 使用maven进行项目构建&#xff0c;如图1所示。 图-34 构建maven项目 输入项目中的maven的坐标和存储坐标&#xff0c;如图2所示。 图2 maven坐标和存储位置 2)Maven依赖&#xff1a; <properties>…