【RabbitMQ】快速入门及基本使用

一、引言

1、、消息队列

Ⅰ、什么是消息队列?

        消息队列是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数。也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。

Ⅱ、Message queue 释义

        服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信) 消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯(异步通信)

Ⅲ、消息队列特点

  1. 异步性:消息队列允许发送者发送消息后继续其它任务,而不需要等待接收者的响应。这种异步性能够提高系统的吞吐量和响应速度。
  2. 解耦性:消息队列将发送者和接收者解耦,使它们不需要知道彼此的存在。发送者只需将消息发送到队列中,而不需要关心谁将接收该消息。接收者通过订阅队列来获取消息,而不需要关心消息的发送者。这种解耦性能够提高系统的可扩展性和可维护性。
  3. 可靠性:消息队列通常具有持久化机制,可以确保消息的可靠性传输。即使在发送者或接收者出现故障的情况下,消息也不会丢失。此外,消息队列还具有消息重试和消息确认等机制,确保消息能够被正确处理。
  4. 消息类型和格式:消息队列中的消息是有类型的,并且具有格式。这使得消息可以被按类型读取,并遵循一定的格式。
  5. 多进程支持:消息队列允许一个或多个进程向它写入或者读取消息。
  6. 数据持久性:当从消息队列中读出消息后,消息队列中对应的数据都会被删除,确保数据不会重复消费。
  7. 分布式特性:通过对消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性。

Ⅳ、好处

  1. 高吞吐量:由于消息的传输速度比普通的文件快,所以能够实现高吞吐量。
  2. 支持异步操作:一个线程在处理完自己的任务之后,可以把结果发送到另一个线程。
  3. 支持并发操作:多个线程可以同时处理消息队列中的消息。
  4. 支持分布式系统:由于消息队列的解耦特性,分布式系统中的组件可以独立扩展。
  5. 提供数据持久化机制:消息队列可以将数据进行持久化,确保数据不会因为处理过程中的失败而丢失。
  6. 提供错误恢复机制:当系统的一部分组件失效时,消息队列可以保证即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  7. 提高响应速度:通过异步处理,消息队列可以显著提高系统的响应速度。
  8. 解耦:在项目启动之初来预测将来项目会碰到什么需求是极其困难的。使用消息队列可以在处理过程中间插入一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
  9. 扩展性:由于消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
  10. 灵活性:使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。
  11. 可恢复性:当体系的一部分组件失效时,不会影响到整个系统。
  12. 送达保证:消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。
  13. 排序保证:在许多情况下,数据处理的顺序都很重要。消息队列能保证数据会按照特定的顺序来处理。
  14. 缓冲:在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行——写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。
  15. 理解数据流:在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。
  16. 异步通信:很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。

2、消息队列相关

Ⅰ、AMQP

        一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议 消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式.

技术选型

3、什么是RabbitMQ

        RabbitMQ是一个开源的消息队列系统,使用Erlang语言开发,基于AMQP(高级消息队列协议)实现。它最初起源于金融系统,用于在分布式系统中存储和转发消息。RabbitMQ的主要特性包括易用性、扩展性、高可用性以及可靠性。消息队列(MQ)是一种应用程序对应用程序的通信方法,应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。在事件驱动(发布-订阅)架构中,RabbitMQ扮演着Broker的角色。

  • Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程.
  • Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue.
  • Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue.
  • ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic.
  • Message Queue:消息队列,用于存储还未被消费者消费的消息.
  • Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等.body是真正需要发送的数据内 容.
  • BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来.

二、快速入门

1、Docker安装部署RabbitMQ

【注意】获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面

docker pull rabbitmq:management

--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)

-e:指定环境变量:
       RABBITMQ_DEFAULT_VHOST:默认虚拟机名
        RABBITMQ_DEFAULT_USER:默认的用户名
        RABBITMQ_DEFAULT_PASS:默认用户名的密码

docker run -d \
--name 容器名\
-p 5672:5672 -p 15672:15672 \
-v /home/rabbitmq:/var/lib/rabbitmq \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management

容器启动后,查看容器日志

docker logs 容器名

开启防火墙

systemctl start firewalld

开放端口

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent

更新防火墙规则

firewall-cmd --reload

Ⅰ、配置用户

进入管理后台

密码:admin

账号:admin

http://ip:15672

创建用户

点进用户进行分配

2、springboot连接配置

Ⅰ、新建项目

 新建一个空项目,里面新建两个spring boot模块,并且导入依赖

接收者(publisher)消费者(consumer)

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

Ⅱ、配置yml

spring:rabbitmq:host: 0.0.0.0 #虚拟机开启的IP地址username: spring #创建的用户password: 123456 #用户的密码port: 5672virtual-host: my_vhost 

Ⅲ、案例一

接收一个string类型

生产者

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@SuppressWarnings("all")
public class RabbitConfig {@Beanpublic Queue firstQueue() {// 创建一个名为firstQueue的队列return new Queue("firstQueue");}
}

使用Controller 

@RestController
@SuppressWarnings("all")
public class SenderController {// 自动装配rabbitTemplate@Autowiredprivate AmqpTemplate rabbitTemplate;// 发送消息到firstQueue队列@RequestMapping("/sendFirst")public String sendFirst() {// 将消息转换并发送到firstQueue队列rabbitTemplate.convertAndSend("firstQueue", "Hello World");return "🐉";}
}

访问http://localhost:8888/sendFirst

可以看到消息队列中有了一个

消费者

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "firstQueue")
public class Receiver {@RabbitHandlerpublic void process(String msg) {log.warn("接收到:" + msg);}
}

运行之后可以看到我们发送的消息

Ⅳ、案例二

我们接受一个实体类

生产者(publisher)

新建一个实体类User 实现接口Serializable

@SuppressWarnings("all")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
private String username;
private String userpwd;
}

RabbitConfig 类里面添加一个队列

    @Beanpublic Queue secondQueue() {// 创建一个名为 secondQueue 的队列return new Queue("secondQueue");}

完整代码 RabbitConfig 

@Configuration
@SuppressWarnings("all")
public class RabbitConfig {@Beanpublic Queue firstQueue() {// 创建一个名为firstQueue的队列return new Queue("firstQueue");}@Beanpublic Queue secondQueue() {// 创建一个名为 secondQueue 的队列return new Queue("secondQueue");}
}

在Controller 层添加方法

        // 自动装配ObjectMapper@Autowiredprivate ObjectMapper objectMapper;
// 发送消息到secondQueue队列@RequestMapping("/send2")public String send2() throws JsonProcessingException {// 创建一个User对象User wfzldr = new User("wfzldr", "1234567890");// 将User对象转换为json字符串String json = objectMapper.writeValueAsString(wfzldr);// 将消息转换并发送到firstQueue队列// 将消息转换并发送到secondQueue队列rabbitTemplate.convertAndSend("secondQueue", json);return "🐉";}

完整代码SenderController

@RestController
@SuppressWarnings("all")
public class SenderController {// 自动装配rabbitTemplate@Autowiredprivate AmqpTemplate rabbitTemplate;// 自动装配ObjectMapper@Autowiredprivate ObjectMapper objectMapper;// 发送消息到firstQueue队列@RequestMapping("/sendFirst")public String sendFirst() {// 将消息转换并发送到firstQueue队列rabbitTemplate.convertAndSend("firstQueue", "Hello World");return "🐉";}// 发送消息到secondQueue队列@RequestMapping("/send2")public String send2() throws JsonProcessingException {// 创建一个User对象User wfzldr = new User("wfzldr", "1234567890");// 将User对象转换为json字符串String json = objectMapper.writeValueAsString(wfzldr);// 将消息转换并发送到firstQueue队列// 将消息转换并发送到secondQueue队列rabbitTemplate.convertAndSend("secondQueue", json);return "🐉";}
}

消费者(consumer)

新建一个接受实体的Receiver

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "secondQueue")
public class EntityReceiver {@Autowiredprivate ObjectMapper objectMapper;@RabbitHandlerpublic void process(String json) throws JsonProcessingException {User user = objectMapper.readValue(json, User.class);log.warn("接收到:" + user);}
}

我破门也需要在消费者里添加实体

@SuppressWarnings("all")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
private String username;
private String userpwd;
}

运行两个方法

 

3、创建公共模块

在项目里面新建一个publi的公共模块,在里面写公共的实体类

在公共模块的pom.xml文件里面把打包方式war改成 jar 

在对应的消费者或者生产者里面引入对应的模块即可

        <!--        引入公共模块--><dependency><groupId>org.example</groupId><artifactId>public</artifactId><version>1.0-SNAPSHOT</version></dependency>

我的分享就到这里,欢迎大家在评论区留言!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/642550.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PySimpleGUI:让spin支持循环

需求 自己用PySimpleGUI写了个小工具&#xff0c;但是发现它的spin不支持循环。 Tkinter本身的Spinbox有wrap这个开关可以觉得是否支持循环&#xff0c;但是没看到PySimpleGUI也支持这个特性。 代码实现 所谓spin的循环&#xff0c;是指当值变换到最大最小值时&#xff0c;可…

移动开发行业——鸿蒙OS NEXT开出繁花

1月18日&#xff0c;华为宣布HarmonyOS NEXT开发者预览版开放申请&#xff0c;根据官方注解&#xff0c;这个版本的鸿蒙系统有个更通俗易懂的名字——“星河版”&#xff0c;也被称为“纯血”鸿蒙。 根据官方解释&#xff0c;之所以取名星河版&#xff0c;寓意鸿蒙OS NEXT就像…

Screen 简介

目录 1. screen 简介2. screen 基本命令 1. screen 简介 screen 是一个在 Unix 和类 Unix 系统上的 终端复用 工具。它允许用户在单个终端窗口中运行多个终端会话&#xff0c;并提供了一些其他功能&#xff0c;如会话断开后的恢复和远程连接的分离。以下是 screen 的一些主要特…

【C语言】扫雷游戏完整代码实现

目录 1.game.h 2.game.c 3.progress.c 4.运行结果 1.game.h #define _CRT_SECURE_NO_WARNINGS#include <string.h> #include <stdio.h> #include <time.h> #include<stdlib.h>#define ROW 9 #define COL 9 #define ROWS 11 #define COLS 11 #defin…

SpringBoot 中配置处理

1、简介 本文介绍一些 springboot 配置管理相关的内容&#xff0c;如何自定义配置&#xff0c;导入配置&#xff0c;配置绑定和基于 profiles 的配置。 2、springboot 配置管理 2.1、自定义配置类 在 spring 中使用 Configuration 注解定义配置类&#xff0c;在 springboot 中…

进程线程知识

一 初识linux线程 1 线程由来 我们之前说创建一个进程&#xff0c;要创建进程控制块pcb&#xff0c;进程地址空间&#xff0c;页表&#xff0c;而且我之前的博客中都有意无意的说明这个pcb是描述进程的&#xff0c;是os用来管理进程的&#xff0c;而有了线程后&#xff0c;就要…

Python基础第八篇(Python异常处理,模块与包)

文章目录 一、了解异常二、捕获异常&#xff08;1&#xff09;.异常案例代码&#xff08;2&#xff09;.读出结果 三、异常的传递&#xff08;1&#xff09;.异常传递案例代码&#xff08;2&#xff09;.读出结果 四、Python模块&#xff08;1&#xff09;.模块的导入&#xff…

大模型+自动驾驶

论文&#xff1a;https://arxiv.org/pdf/2401.08045.pdf 大型基础模型的兴起&#xff0c;它们基于广泛的数据集进行训练&#xff0c;正在彻底改变人工智能领域的面貌。例如SAM、DALL-E2和GPT-4这样的模型通过提取复杂的模式&#xff0c;并在不同任务中有效地执行&#xff0c;从…

k8s使用ingress实现应用的灰度发布升级

v1是1.14.0版本nginx ,实操时候升级到v2是1.20.0版本nginx&#xff0c;来测试灰度发布实现过程 一、方案&#xff1a;使用ingress实现应用的灰度发布 1、服务端&#xff1a;正常版本v1&#xff0c;灰度升级版本v2 2、客户端&#xff1a;带有请求头versionv2标识的请求访问版…

【MySQL】一文总结MVCC多版本并发控制

目录 MVCC 介绍当前读和快照读当前读快照读 MVCC 原理解析隐式字段Undo Log版本链Read ViewRead View 可见性原则 RC 和 RR 下的 Read ViewRC 下的 Read ViewRR 下的 Read View小结RR 级别下能否防止幻读总结 MVCC 介绍 在当今高度并发的数据库环境中&#xff0c;有效的并发控…

超级管理员权限绕过windows登录windows命令

文章目录 一、设置超级管理员权限二、绕过windows登录界面三、windows命令 一、设置超级管理员权限 查看本机的用户列表&#xff1a;net user激活超级用户&#xff1a;net user administrator /active:yes为超级用户设置密码&#xff1a;net user administrator 123(password)…

Redis 高可用之主从复制

1、简介 在 Redis 中&#xff0c;主从复制就是多个节点进行数据同步&#xff0c;在这些节点中&#xff0c;有 Master 和 slave 两个角色&#xff0c;Master 以写为主&#xff0c;slave 以读为主&#xff0c;当 Master 数据变化的时候&#xff0c;会自动将新的数据同步到其他的 …

StableDiffusion新版汉化

新旧版不同&#xff0c;这里以新版为例&#xff0c;用的是带链接&#xff0c;可以更新的方法。 步骤&#xff1a; 1.找到这个位置&#xff0c;依次点击&#xff0c;注意选项。 2.点击加载&#xff0c;等待刷新。 ctrlF搜索 zh_CN Localization 右边点击install&#xff0c…

外包干了4个月,技术退步明显.......

先说一下自己的情况&#xff0c;大专生&#xff0c;18年通过校招进入武汉某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落! 而我已经在一个企业干了四年的功能测…

GAMES101-Assignment8

一、总览 1.1 连接绳子的约束 在rope.cpp 中, 实现Rope 类的构造函数。这个构造函数应该可以创建一个 新的绳子(Rope) 对象&#xff0c;该对象从start 开始&#xff0c;end 结束&#xff0c;包含num_nodes 个节点。也就是如下图所示&#xff1a; 每个结点都有质量&#xff…

【论文阅读】Automated Runtime-Aware Scheduling for Multi-Tenant DNN Inference on GPU

该论文发布在 ICCAD’21 会议。该会议是EDA领域的顶级会议。 基本信息 AuthorHardwareProblemPerspectiveAlgorithm/StrategyImprovment/AchievementFuxun YuGPUResource under-utilization ContentionSW SchedulingOperator-level schedulingML-based scheduling auto-searc…

Rustdesk 中VP8 / VP9 / AV1 是什么?

环境&#xff1a; Rustdesk1.1.9 VP8 / VP9 / AV1 问题描述&#xff1a; VP8 / VP9 / AV1 是什么&#xff1f; 解决方案&#xff1a; 1.VP8、VP9和AV1是视频编解码器&#xff0c;用于压缩和解压缩视频数据。它们是由Google和Alliance for Open Media&#xff08;AOM&#…

011 变量

变量的定义 注意事项 作用域 类变量&#xff1a;定义在类中的变量&#xff0c;类的整个生命周期内可用实例变量&#xff1a;定义在类中的变量&#xff0c;类的实例可以使用局部变量&#xff1a;定义在方法中的变量&#xff0c;仅在方法内可以使用 class Dog{// 类变量static …

机器学习 | 深入理解并掌握核心概念

在如今数字化时代的浪潮下&#xff0c;机器学习已经成为人工智能领域的璀璨明星。它像一面魔镜&#xff0c;赋予计算机系统学习和改进的能力&#xff0c;让机器能够从海量数据中提取规律、预测未来&#xff0c;甚至做出智能决策。本 专栏 将带您踏上机器学习的奇妙之旅&#xf…

题解:CF1920E. Counting Binary Strings

题解&#xff1a;CF1920E. Counting Binary Strings 题意简述 题目链接&#xff1a;Problem - E - Codeforces。 洛谷翻译&#xff1a;Counting Binary Strings - 洛谷。 思路解析 假设我们有一个01串str&#xff08;设里面有z个“1”&#xff09;&#xff0c;我们要求它里…