SpringBoot 整合 RabbitMQ 实现流量消峰

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了今天的主角还是讲 RabbitMQ。消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC 的调用等等。

以前一直使用的是 ActiveMQ,在实际的生产使用中也出现了一些小问题,在网络查阅了很多的资料后,决定尝试使用 RabbitMQ 来替换 ActiveMQ,RabbitMQ 的高可用性、高性能、灵活性等一些特点吸引了我们,查阅了一些资料整理出此文。

RabbitMQ 介绍

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

相关概念

通常我们谈到队列服务, 会有三个概念:发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

图片

  • 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。

  • 中间即是 RabbitMQ,其中包括了 交换机 和 队列。

  • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。

那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个 RabbitMQ 服务器都有一个默认的虚拟主机“/”。

  • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

  • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

交换机(Exchange)

交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout

  • Direct:direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routingkey, 消息的routingkey 匹配时, 才会被交换器投送到绑定的队列中去.

  • Topic:按规则转发消息(最灵活)

  • Headers:设置 header attribute 参数类型的交换机

  • Fanout:转发消息到所有绑定队列

Direct Exchange

Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

图片

第一个 X - Q1 就有一个 binding key,名字为 orange;X - Q2 就有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。

Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗?- 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。

Topic Exchange

Topic Exchange 转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

在这种交换机模式下:

  • 路由键必须是一串字符,用句号( .) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。

  • 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是 agreements.eu.berlin.#,那么,以agreements.eu.berlin 开头的路由键都是可以的。

具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息。如下:

rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is  RabbitMQ!");

topic 和 direct 类似, 只是匹配上支持了"模式", 在"点分"的 routing_key 形式中, 可以使用两个通配符:

  • *表示一个词.

  • #表示零个或多个词.

Headers Exchange

headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型. 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

Fanout Exchange

Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。

Spring Boot 集成 RabbitMQ

Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了 spring-boot-starter-amqp 项目对消息各种支持。

简单使用

1、配置 Pom 包,主要是添加 spring-boot-starter-amqp 的支持

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置文件

配置 RabbitMQ 的安装地址、端口以及账户信息

spring.application.name=Spring-boot-rabbitmqspring.rabbitmq.host=192.168.0.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

3、队列配置

@Configuration
public class RabbitConfig {@Beanpublic Queue Queue() {return new Queue("hello");}}

4、发送者

rabbitTemplate 是 Spring Boot 提供的默认实现

@component
public class HelloSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String context = "hello " + new Date();System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("hello", context);}}

4、接收者

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver  : " + hello);}}

5、测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {@Autowiredprivate HelloSender helloSender;@Testpublic void hello() throws Exception {helloSender.send();}}

注意,发送者和接收者的 queue name 必须一致,不然不能接收

多对多使用

一个发送者,N 个接收者或者 N 个发送者和 N 个接收者会出现什么情况呢?

一对多发送

对上面的代码进行了小改造,接收端注册了两个 Receiver,Receiver1 和 Receiver2,发送端加入参数计数,接收端打印接收到的参数,下面是测试代码,发送一百条消息,来观察两个接收端的执行效果

@Test
public void oneToMany() throws Exception {for (int i=0;i<100;i++){neoSender.send(i);}
}

结果如下:

Receiver 1: Spring boot test queue ****** 11
Receiver 2: Spring boot test queue ****** 12
Receiver 2: Spring boot test queue ****** 14
Receiver 1: Spring boot test queue ****** 13
Receiver 2: Spring boot test queue ****** 15
Receiver 1: Spring boot test queue ****** 16
Receiver 1: Spring boot test queue ****** 18
Receiver 2: Spring boot test queue ****** 17
Receiver 2: Spring boot test queue ****** 19
Receiver 1: Spring boot test queue ****** 20

根据返回结果得到以下结论

一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中

多对多发送

复制了一份发送者,加入标记,在一百个循环中相互交替发送

@Test
public void manyToMany() throws Exception {for (int i=0;i<100;i++){neoSender.send(i);neoSender2.send(i);}
}

结果如下:

Receiver 1: Spring boot test queue ****** 20
Receiver 2: Spring boot test queue ****** 20
Receiver 1: Spring boot test queue ****** 21
Receiver 2: Spring boot test queue ****** 21
Receiver 1: Spring boot test queue ****** 22
Receiver 2: Spring boot test queue ****** 22
Receiver 1: Spring boot test queue ****** 23
Receiver 2: Spring boot test queue ****** 23
Receiver 1: Spring boot test queue ****** 24
Receiver 2: Spring boot test queue ****** 24
Receiver 1: Spring boot test queue ****** 25
Receiver 2: Spring boot test queue ****** 25

结论:和一对多一样,接收端仍然会均匀接收到消息

高级使用

对象的支持

Spring Boot 以及完美的支持对象的发送和接收,不需要格外的配置。

//发送者
public void send(User user) {System.out.println("Sender object: " + user.toString());this.rabbitTemplate.convertAndSend("object", user);
}...//接收者
@RabbitHandler
public void process(User user) {System.out.println("Receiver object : " + user);
}

结果如下:

Sender object: User{name='test', pass='123456'}
Receiver object : User{name='test', pass='123456'}
Topic Exchange

topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key 自由的绑定不同的队列

首先对 topic 规则配置,这里使用两个队列来测试

@Configuration
public class TopicRabbitConfig {final static String message = "topic.message";final static String messages = "topic.messages";@Beanpublic Queue queueMessage() {return new Queue(TopicRabbitConfig.message);}@Beanpublic Queue queueMessages() {return new Queue(TopicRabbitConfig.messages);}@BeanTopicExchange exchange() {return new TopicExchange("exchange");}@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");}@BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");}
}

使用 queueMessages 同时匹配两个队列,queueMessage 只匹配 "topic.message" 队列

public void send1() {String context = "hi, i am message 1";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);}public void send2() {String context = "hi, i am messages 2";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);}

发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息

Fanout Exchange

Fanout 就是我们熟悉的广播模式或者订阅模式,给 Fanout 交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

Fanout 相关配置

@Configuration
public class FanoutRabbitConfig {@Beanpublic Queue AMessage() {return new Queue("fanout.A");}@Beanpublic Queue BMessage() {return new Queue("fanout.B");}@Beanpublic Queue CMessage() {return new Queue("fanout.C");}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {return BindingBuilder.bind(AMessage).to(fanoutExchange);}@BeanBinding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(BMessage).to(fanoutExchange);}@BeanBinding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(CMessage).to(fanoutExchange);}}

这里使用了 A、B、C 三个队列绑定到 Fanout 交换机上面,发送端的 routing_key 写任何字符都会被忽略:

public void send() {String context = "hi, fanout msg ";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}

结果如下:

Sender : hi, fanout msg 
...
fanout Receiver B: hi, fanout msg 
fanout Receiver A  : hi, fanout msg 
fanout Receiver C: hi, fanout msg 

结果说明,绑定到 fanout 交换机上面的队列都收到了消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/64013.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git-分支(branch)常用命令

分支 我们在做项目开发的时候&#xff0c;无论是软件项目还是其他机械工程项目&#xff0c;我们为了提高效率以及合理的节省时间等等原因&#xff0c;现在都不再是线性进行&#xff0c;而是将一个项目抽离出诸进行线&#xff0c;每一条线在git中我们就叫做分支&#xff0c;bran…

goenv go 环境配置

Golang环境配置 1. goenv工具 goenv 是一个用于管理 Go 语言版本的工具&#xff0c;类似于 Python 的 pyenv 或 Ruby 的 rbenv。不过需要注意的是&#xff0c;goenv 并不是一个官方维护的工具&#xff0c;而是一个社区项目。Go 语言本身已经提供了很好的版本管理工具&#xf…

Electron electron-builder.yml 配置 (自定义包名,用户自定义安装目录...)

electron-builder.yml 配置 # 唯一的应用程序标识符&#xff0c;用于操作系统级别的识别 appId: com.electron.app# 应用程序的名称&#xff0c;显示在用户界面上 productName: 我的应用# 定义构建资源目录&#xff0c;放置图标、证书等资源文件 directories:buildResources: …

springboot425滑雪场管理系统(论文+源码)_kaic

摘要 近年来&#xff0c;信息化管理行业的不断兴起&#xff0c;使得人们的日常生活越来越离不开计算机和互联网技术。首先&#xff0c;根据收集到的用户需求分析&#xff0c;对设计系统有一个初步的认识与了解&#xff0c;确定滑雪场管理系统的总体功能模块。然后&#xff0c;详…

好玩的汇编编译器NASM:一款基于x86架构的汇编与反汇编软件

好玩的汇编编译器NASM This is the project webpage for the Netwide Assembler (NASM), an asssembler for the x86 CPU architecture portable to nearly every modern platform, and with code generation for many platforms old and new. Netwide Assembler&#xff08;…

前端面试准备问题2

1.防抖和节流分别是什么&#xff0c;应用场景 防抖&#xff1a;在事件被触发后&#xff0c;只有在指定的延迟时间内没有再次触发&#xff0c;才执行事件处理函数。 在我的理解中&#xff0c;简单的说就是在一个指定的时间内&#xff0c;仅触发一次&#xff0c;如果有多次重复触…

Java 的基本数据类型与包装类的区别

Java 提供了 8 种基本数据类型&#xff1a;byte、short、int、long、float、double、char 和 boolean。它们是直接存储值的&#xff0c;不是对象。 而包装类&#xff08;如 Integer、Double 等&#xff09;是将基本数据类型封装为对象&#xff0c;提供了更多方法支持&#xff…

RabbitMQ 基本使用方法详解

RabbitMQ 基本使用方法 在你的代码中&#xff0c;涉及到了 RabbitMQ 的基本使用&#xff0c;包括队列定义、交换机的配置、消息的发送与接收等内容。下面我将详细总结 RabbitMQ 的基本使用方法&#xff0c;重点解释如何在 Spring Boot 项目中与 RabbitMQ 集成。 1. 引入依赖 …

【AI知识】有监督学习分类任务之支持向量机

1.支持向量机概念 支持向量机&#xff08;Support Vector Machine, SVM&#xff09; 是一种有监督学习算法&#xff0c;主要用于分类任务&#xff08;也可用于回归任务&#xff0c;即支持向量回归&#xff0c;SVR&#xff09;。SVM的核心思想是找到一个最优的超平面&#xff0…

杭州乘云联合信通院发布《云计算智能化可观测性能力成熟度模型》

原文地址&#xff1a;杭州乘云联合中国信通院等单位正式发布《云计算智能化可观测性能力成熟度模型》标准 2024年12月3日&#xff0c;由全球数字经济大会组委会主办、中国信通院承办的 2024全球数字经济大会 云AI计算创新发展大会&#xff08;2024 Cloud AI Compute Ignite&…

【经典】制造供应链四类策略(MTS、MTO、ATO、ETO)细说

关注作者 制造供应链的牛鞭问题与复杂问题主要是从两个方面解决&#xff0c;一是同步化供应链消减从需求到供应的放大效应&#xff0c;二是供应链细分&#xff0c;针对不同的客户、不同的需求供应的匹配策略来应对复杂性&#xff0c;更好的满足客户并以最低的总成本来实现。 对…

实时日志与发展:Elasticsearch 推出全新专用的 logsdb 索引模式

作者&#xff1a;来自 Elastic Mark Settle, George Kobar 及 Amena Siddiqi Elastic 最新发布的 logsdb 索引模式是专为日志管理优化的功能&#xff0c;旨在提升日志数据的存储效率、查询性能以及整体可用性。这个模式专注于满足现代日志处理需求&#xff0c;提供更高效的日志…

React Image Crop——在React应用中轻松实现图片裁剪功能

React Image Crop是一个用于在React应用程序中裁剪和调整图像的库。它提供了一个简单而强大的界面&#xff0c;允许用户选择和调整裁剪区域&#xff0c;并生成裁剪后的图像。 什么是React Image Crop&#xff1f; React Image Crop是一个开源的React组件&#xff0c;用于在浏览…

Vue.js 中,前端如何处理从后端返回的 Excel 文件流

目的:页面中点击按钮实现下载excel文件 步骤: 向后端发送请求: 使用 axios(或其他 HTTP 客户端)向后端发送请求以获取文件。通常,文件会以 Blob 形式返回。 处理响应数据: 将响应数据(Blob 对象)处理为下载链接,并模拟点击以触发下载。 具体步骤: 1. 安装 axios(…

【HarmonyOS】鸿蒙应用实现手机摇一摇功能

【HarmonyOS】鸿蒙应用实现手机摇一摇功能 一、前言 手机摇一摇功能&#xff0c;是通过获取手机设备&#xff0c;加速度传感器接口&#xff0c;获取其中的数值&#xff0c;进行逻辑判断实现的功能。 在鸿蒙中手机设备传感器ohos.sensor (传感器)的系统API监听有以下&#xf…

微积分复习笔记 Calculus Volume 2 - 3.7 Improper Integrals

3.7 Improper Integrals - Calculus Volume 2 | OpenStax

box 提取

box 提取 import json import os import shutilimport cv2 import numpy as np import pypinyinclass Aaa():passdef pinyin(word):s for i in pypinyin.pinyin(word, stylepypinyin.NORMAL):s .join(i)return s if __name__ __main__:selfAaa()base_dirrE:\data\dao\20241…

ViewModel

ViewMode是MVVM架构模式中VM层对应的类&#xff0c;它的作用是存储界面数据&#xff0c;并和界面发生数据交互。ViewModel能感知生命周期&#xff0c;并且在界面由于配置问题发生重建时候&#xff0c;可以保持当前的数据不变。生命周期如下&#xff1a; ViewMode由ViewModePr…

ESP32-S3模组上跑通ES8388(29)

接前一篇文章:ESP32-S3模组上跑通ES8388(28) 二、利用ESP-ADF操作ES8388 2. 详细解析 上一回解析到了es8388_init函数中的第11段也是最后一段代码,没有解析完,本回继续解析。为了便于理解和回顾,再次贴出该片段,在components\audio_hal\driver\es8388\es8388.c中,如下…

C#—索引器

C#—索引器 索引器&#xff08;Indexer&#xff09;是类中的一个特殊成员&#xff0c;它能够让对象以类似数组的形式来操作&#xff0c;使程序看起来更为直观&#xff0c;更容易编写。索引器与属性类似&#xff0c;在定义索引器时同样会用到 get 和 set 访问器&#xff0c;不同…