RabbitMQ的基本使用,进行实例案例的消息队列

目录

一、介绍

1. 概述

2. 作用

3. 工作原理

二、RabbitMQ安装部署

1. 安装

2. 部署

3. 增加用户

三、实现案例

1. 项目创建

2. 项目配置

3. 生产者代码

4. 消费者代码

四、测试

每篇一获


一、介绍

1. 概述

RabbitMQ 是一种开源的消息代理和队列服务器,用于通过简单和可扩展的方式在分布式系统中传递消息。它实现了高级消息队列协议(AMQP)。

  • 服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信)
  • 消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送  给另一端,称为延迟消息通讯(异步通信)
  • 一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议
  • 消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式.

以下是关于RabbitMQ的一些详细信息:

  1. 消息代理:RabbitMQ是一个消息代理,它接受并转发消息。你可以把它想象成一个邮局:当你把邮件放在邮筒里时,你可以确定邮差最终会把邮件送到你的收件人。在这个比喻中,RabbitMQ是邮筒、邮局和邮差。

  2. 可靠性:RabbitMQ支持消息持久化、传递确认、发布者确认和高可用性。

  3. 灵活的路由:RabbitMQ提供了多种消息路由模式,包括点对点、发布/订阅和路由模式。

  4. 集群:多个RabbitMQ服务器可以组成一个集群,形成一个高可用、负载均衡的系统。

  5. 多协议支持:RabbitMQ支持多种消息队列协议,包括AMQP、STOMP、MQTT等。

  6. 客户端支持:RabbitMQ为多种编程语言提供了客户端库,包括Java、.NET、Python、Ruby、PHP等。

  7. 管理界面:RabbitMQ提供了一个易于使用的用户界面,用于管理和监控你的RabbitMQ服务器。

  8. 跟踪:如果你需要查看消息传递的详细信息,RabbitMQ提供了消息跟踪功能。

  9. 插件机制:RabbitMQ支持通过插件扩展其核心功能。

2. 作用

RabbitMQ的主要作用和优势如下:

  1. 解耦:在系统设计中,组件之间的高度耦合是非常不利的。RabbitMQ作为消息队列中间件,可以有效地解耦系统,使得系统组件之间不直接通信,只通过消息队列来交换信息。

  2. 异步通信:RabbitMQ提供了异步处理的能力。当一个操作需要大量时间时,可以将该操作作为一个消息发送到队列,然后立即返回。这样,用户不需要等待这个操作完成,提高了系统的响应性能。

  3. 缓冲:RabbitMQ可以在处理高负载的情况下起到缓冲的作用。当消息的产生速度超过处理速度时,RabbitMQ可以暂存这些消息,等待处理程序准备好后再进行处理。

  4. 可靠性:RabbitMQ提供了消息持久化、传递确认、发布者确认等机制,确保消息不会丢失。

  5. 路由能力:RabbitMQ提供了灵活的消息路由能力,如点对点、发布/订阅等模式,满足不同的业务需求。

  6. 扩展性:RabbitMQ支持集群,可以通过增加更多的RabbitMQ节点来提高系统的处理能力。

  7. 跨平台和语言无关:RabbitMQ提供了多种语言的客户端,如Java、.NET、Python等,可以在不同的平台和语言之间进行通信。

  8. 监控:RabbitMQ提供了管理界面,可以方便地监控和管理消息队列的状态。

RabbitMQ作为一个消息队列中间件,可以帮助我们构建高效、可靠、可扩展的分布式系统。

3. 工作原理

RabbitMQ的工作原理主要基于生产者-消费者模型和消息队列。以下是其基本的工作流程:

  1. 生产者:生产者是创建消息的应用程序。它创建消息并发送到RabbitMQ。

  2. 队列:队列是RabbitMQ的内部结构,用于存储消息。多个生产者可以发送消息到一个队列,多个消费者可以从一个队列中获取消息。

  3. 交换器:生产者发送消息到交换器(Exchange),然后交换器根据一定的规则(路由键)将消息路由到一个或多个队列。RabbitMQ提供了几种类型的交换器,如直接交换器、主题交换器、头交换器和扇出交换器。

  4. 消费者:消费者是接收消息的应用程序。消费者连接到RabbitMQ并订阅一个队列,当新的消息到达队列时,RabbitMQ会将消息推送给消费者,或者消费者可以主动从队列中拉取消息。

  5. 消息确认:当消费者处理完一个消息后,它需要向RabbitMQ发送一个确认,告诉RabbitMQ这个消息已经被处理,可以从队列中删除。如果消费者处理消息时发生错误,它可以发送一个拒绝,告诉RabbitMQ这个消息没有被正确处理。

  6. 持久化:为了防止消息丢失,RabbitMQ提供了消息持久化的功能。生产者在发送消息时可以设置消息为持久化,RabbitMQ会将这些消息存储到磁盘,即使RabbitMQ服务器重启,这些消息也不会丢失。

通过这种方式,RabbitMQ可以在分布式系统中实现消息的可靠传递。 

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言.

  • Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程.
  • Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue.
  • Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue.
  • ExchangeType:交换机类型决定了路由消息行为,
  • RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic.
  • Message Queue:消息队列,用于存储还未被消费者消费的消息. Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等.body是真正需要发送的数据内 容.
  • BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来.

二、RabbitMQ安装部署

1. 安装

在虚拟机中下载RabbitMQ的镜像

命令:

docker pull rabbitmq:management 

2. 部署

查看防火墙列表的端口是否开启

5672(RabbitMQ的用户端口)和15672(RabbitMQ的管理员端口)

命令:

firewall-cmd --zone=public --list-ports

开放端口5672:

firewall-cmd --zone=public --add-port=5672/tcp --permanent 

开放端口15672:

firewall-cmd --zone=public --add-port=15672/tcp --permanent

更新防火墙端口:

firewall-cmd --reload

创建并运行RabbitMQ的容器:

docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management 

--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)

-e:指定环境变量:

RABBITMQ_DEFAULT_VHOST:默认虚拟机名

RABBITMQ_DEFAULT_USER:默认的用户名

RABBITMQ_DEFAULT_PASS:默认用户名的密码

最后是运用RabbitMQ镜像;

3. 增加用户

使用虚拟机IP和RabbitMQ的管理员端口登入后台管理:

如图所示增加用户:

点击创建的用户,在点击设置应用

之后退出,登入创建的用户

三、实现案例

在实现案例的时候,虚拟机的RabbitMQ容器不用停止运行,虚拟机不用关闭。

1. 项目创建

打开我们的开放工具,创建项目,来实现生产者-消费者的消息队列:

根据如图创建项目:

父项目

生产者(publisher)在父项目中

创建消费者(consumer)在父项目中:

2. 项目配置

在生产者(publisher)项目中配置yml文件:

server:port: 9949
spring:rabbitmq:host: 192.***.***.***username: Junpassword: 123456port: 5672virtual-host: my_vhost

server.port:配置生产者的端口

host:配置虚拟机的IP(这里需要根据自己的虚拟机IP进行填写)

username:配置我们在RabbitMQ中创建的用户名称

password:配置我们在RabbitMQ中创建的用户密码

port:配置RabbitMQ的用户端口

virtual-host: 配置默认虚拟机名(my_vhost)

消费者(consumer)项目中配置yml文件:

server:port: 8848
spring:rabbitmq:host: 192.168.211.129username: Junpassword: 123456port: 5672virtual-host: my_vhost

server.port:配置消费者的端口

host:配置虚拟机的IP(这里需要根据自己的虚拟机IP进行填写)

username:配置我们在RabbitMQ中创建的用户名称

password:配置我们在RabbitMQ中创建的用户密码

port:配置RabbitMQ的用户端口

virtual-host: 配置默认虚拟机名(my_vhost)

3. 生产者代码

在生产者中创建一个配置类,使用@Configuration注解的类表示这个类包含了一个或多个@Bean注解的方法,这些方法将会被Spring容器调用,其返回值将被添加到Spring的应用上下文中,作为一个bean供其他部分使用。

这个配置类的名字是RabbitConfig(可以自己修改),它的主要作用是配置RabbitMQ的队列

RabbitConfig:

package com.cloudjun.publisher;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@SuppressWarnings("all")
public class RabbitConfig {// 创建队列@Beanpublic Queue messageQueue() {return new Queue("messageQueue");}@Beanpublic Queue messageUser() {return new Queue("messageUser");}}

创建实体对象来作为传输信息内容:

User:
 

package com.cloudjun.publisher;import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;import java.io.Serializable;@SuppressWarnings("all")
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {private String username;private String userpwd;}

创建一个控制器类,使用@RestController注解的类表示这个类是一个控制器,它可以处理HTTP请求。

这个控制器类的名字是TestController,它的主要作用是处理HTTP请求,并通过RabbitMQ发送消息。

TestController

package com.cloudjun.publisher;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author CloudJun*/
@RestController
public class TestController {@Autowiredprivate AmqpTemplate template;@Autowiredprivate ObjectMapper objectMapper;@RequestMapping("test01")public String test01(){// 发送消息到名为messageQueue的队列// 这里的messageQueue是RabbitMQ中定义的队列名称// 这里的"Hello World!"是发送的消息内容template.convertAndSend("messageQueue", "HelloWorld!");return "💖";}@RequestMapping("test02")public String test02() throws Exception {// 发送消息到名为messageQueue的队列// 这里的messageQueue是RabbitMQ中定义的队列名称User user = new User("Jun", "123456");// 序列化对象转换为JSON字符串String json = objectMapper.writeValueAsString(user);template.convertAndSend("messageUser", json);return "💖";}}

类及代码说明:

在这个类中,使用了@Autowired注解来自动注入AmqpTemplateObjectMapper对象。AmqpTemplate是Spring提供的一个操作RabbitMQ的工具,可以用来发送和接收消息。ObjectMapper是Jackson库提供的一个工具,可以用来将对象转换为JSON字符串,或者将JSON字符串转换为对象。

  1. test01方法发送了一个字符串"Hello World!"到名为messageQueue的队列。
  2. test02方法创建了一个User对象,然后使用ObjectMapper将这个对象转换为JSON字符串,然后发送这个JSON字符串到名为messageUser的队列。

4. 消费者代码

创建实体对象来作为接收生产者的信息内容:

User:

package com.cloudjun.consumer;import lombok.*;import java.io.Serializable;@SuppressWarnings("all")
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User implements Serializable {private String username;private String userpwd;}

创建消息消费者类,使用@Component注解的类表示这个类是一个组件,它会被Spring管理。

这个消息消费者类的名字是Receiver(名称可以直接修改),它的主要作用是接收并处理RabbitMQ的消息

Receiver:

package com.cloudjun.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "messageQueue")
public class Receiver {@RabbitHandlerpublic void messageMsg(String msg) {log.warn("接收到:" + msg);}}

类及代码说明:

在这个类中,使用了@Slf4j注解来启用日志,使用了@RabbitListener注解来监听名为messageQueue的队列,这个队列是在前面的RabbitConfig配置类中定义的。

这个类定义了一个名为process的方法,这个方法使用了@RabbitHandler注解,表示这个方法是处理消息的方法。当messageQueue队列中有新的消息时,这个方法会被调用,方法的参数msg就是接收到的消息。

process方法中,使用了log.warn来打印接收到的消息,这样我们就可以在日志中看到接收到的消息。

总的来说,这个类的作用是接收并处理RabbitMQ的消息,并将接收到的消息打印在日志中。

在创建一个消息消费者类,这个消息消费者类的名字是PojoReceiver,它的主要作用是接收并处理RabbitMQ的消息

PojoReceiver:

package com.cloudjun.consumer;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "messageUser")
public class PojoReceiver {@Autowiredprivate ObjectMapper objectMapper;@RabbitHandlerpublic void messageUser(String json) throws Exception {User user = objectMapper.readValue(json, User.class);// 处理user对象log.warn("接收到:" + user);}}

类及代码说明:

这个类定义了一个名为process的方法,这个方法使用了@RabbitHandler注解,表示这个方法是处理消息的方法。当messageUser队列中有新的消息时,这个方法会被调用,方法的参数json就是接收到的消息。

不同于前面的Receiver类,这个类在接收到消息后,会使用ObjectMapper将消息从JSON字符串转换为User对象。这样,我们就可以在处理消息时,直接操作User对象,而不需要手动解析JSON字符串。

process方法中,使用了log.warn来打印接收到的User对象,这样我们就可以在日志中看到接收到的消息。

总的来说,这个类的作用是接收并处理RabbitMQ的消息,并将接收到的消息从JSON字符串转换为User对象,然后将User对象打印在日志中。 

四、测试

启动两个项目,在浏览器中访问生产者的配置路径,并且在消费者中看看是否可以查看到,生产者转递过来的信息。

方法一:

方法二:

每篇一获

学习RabbitMQ的基本使用后,你可以从以下几个方面受益:

  1. 可靠性:通过使用RabbitMQ消息队列技术,可以确保消息的可靠性,即使在消息处理过程中出现故障,也可以确保消息不会丢失。

  2. 异步处理:使用RabbitMQ可以实现异步处理,将消息发送到队列中,然后再异步处理它们。这样可以加速应用程序的响应时间,提高系统的吞吐量。

  3. 解耦合:使用RabbitMQ可以实现应用程序之间的解耦合,例如一个应用程序可以发送消息到一个队列中,而另一个应用程序可以从该队列中接收并处理消息。这样可以降低应用程序之间的依赖性,提高系统的可维护性和可扩展性。

  4. 伸缩性:使用RabbitMQ可以轻松地水平扩展消息处理能力,通过添加更多的消费者来实现更高的吞吐量。

  5. 可视化管理:RabbitMQ提供了一个易于使用的Web管理界面,可以监控和管理RabbitMQ服务器,包括队列、交换机、绑定等等。

总之,学习RabbitMQ可以帮助你更好地理解消息队列的概念和实现方式,并且可以应用到实际项目中,提高应用程序的可靠性、响应时间和可维护性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/634365.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Web文件上传功能简述

文章目录 正文简单文件上传文件写入 总结 正文 在日常项目开发过程中,文件上传是一个非常常见的功能,当然正规项目都有专门的文件服务器保存上传的文件,实际只需要保存文件路径链接到数据库中即可,但在小型项目中可能没有专门的文…

Oracle命令大全

文章目录 1. SQL*Plus命令(用于连接与管理Oracle数据库)2. SQL数据定义语言(DDL)命令3. SQL数据操作语言(DML)命令4. PL/SQL程序块5. 系统用户管理6. 数据备份与恢复相关命令1. SQL*Plus命令(用…

西门子燃烧控制器维修LMV37.410A2WH

西门子燃烧控制器维修范围包括: LMV系列燃烧器控制系统维修 LMV5系列控制器维修 AZL系列显示操作单元维修 QRI系列火焰探测器维修 SQM4系列电动执行机构维修 AZL系列或其他控制系统维修或设置燃烧器的启停,燃料,运行模式,运行…

基于网络爬虫的微博热点分析,包括文本分析和主题分析

基于Python的网络爬虫的微博热点分析是一项技术上具有挑战性的任务。我们使用requests库来获取微博热点数据,并使用pandas对数据进行处理和分析。为了更好地理解微博热点话题,我们采用LDA主题分析方法,结合jieba分词工具将文本分割成有意义的…

配置zabbix平台对数据库以及主从状态的监控

引言:明人不说暗话,今天分享下配置zabbix平台对数据库以及主从状态的监控 准备好zabbix监控平台(zabbix-server端)例10.12.153.235 db1客户端(zabbix-agent)例10.12.153.73 1.安装Zabbix存储库 # rpm -Uv…

PDF.js - 免费开源的 JavaScript 读取、显示 PDF 文档的工具库,由 Mozilla 开发并且持续维护

最近新项目需要处理 PDF,研究了 PDf.js 之后觉得很不错,于是写篇文章推荐给大家。 PDF.js 的功能和它的名字一样简单,是一个使用 HTML5 技术来让前端网页支持读取、解析和显示 PDF 文档的 JS 工具库。这个项目由大名鼎鼎的 Mozilla 组织开发…

RabbitMQ安装和使用

简介 RabbitMQ是一套开源(MPL)的消息队列服务软件,是由LShift提供的一个Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的Erlang写成。所有主要的编程语言均有与代理接口通讯的客户端库…

SQL Server中数据表的增删查改

文章目录 一、增二、查三、改四、删除 一、增 进行增删查改的前提需要在指定数据库中创建数据表,对这块不大理解的可以先看看前面几期文章: 创建数据库 创建数据表 use StudentManageDB go insert into Students (StudentName,Gender,Birthday,Age,Stu…

HarmonyOS【应用服务开发】在模块中添加Ability

Ability是应用/服务所具备的能力的抽象,一个Module可以包含一个或多个Ability。应用/服务先后提供了两种应用模型: FA(Feature Ability)模型: API 7开始支持的模型,已经不再主推。Stage模型:AP…

C# new Thread和Task.Run,多线程(Thread和Task)

一、开启多线程-new Thread的使用 示例一 Thread thread25yi new Thread(new ThreadStart(obj.MethodTimer1)); thread25yi.Start(); void MethodTimer1() { while (true) { Console.WriteLine(DateTime.Now.ToString() "_" thread25yi.CurrentThread.Managed…

杂记 | 在Linux上使用Docker-compose安装单机版Milvus向量数据库并配置访问控制和可视化面板(Attu)

文章目录 01 Milvus向量数据库简介02 安装前的准备03 安装3.1 创建milvus工作目录3.2 下载并编辑docker-compose.yml3.3 下载milvus.yml文件3.4 启动milvus 04 访问可视化面板并修改密码 01 Milvus向量数据库简介 Milvus是一款开源的向量数据库,它专为AI应用设计&a…

Docker(三)使用 Docker 镜像:从仓库获取镜像;管理本地主机上的镜像;介绍镜像实现的基本原理

作者主页: 正函数的个人主页 文章收录专栏: Docker 欢迎大家点赞 👍 收藏 ⭐ 加关注哦! 使用 Docker 镜像 在之前的介绍中,我们知道镜像是 Docker 的三大组件之一。 Docker 运行容器前需要本地存在对应的镜像&#x…

IaC基础设施即代码:Terraform 使用 dynamic动态内联块 创建docker资源

目录 一、实验 1.环境 2.Terraform查看版本 3.Linux主机安装Docker 4.Terraform使用本地编译(In-house)的Providers 5.Docker-CE 开启远程API 6. Linux主机拉取镜像 7.Terraform 使用 dynamic动态内联块 创建资源 二、问题 1.Terraform 计划资源…

精品基于Uniapp+springboot校园学校趣事管理系统app

《[含文档PPT源码等]精品基于Uniappspringboot趣事管理系统app》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程、包运行成功! 软件开发环境及开发工具: 开发语言:Java 后台框架:springboot、ssm 安卓…

Unity vs Godot :哪个游戏引擎更适合你?

Unity vs Godot :哪个游戏引擎更适合你? 游戏引擎的选择对开发过程和最终产品质量有着重大影响。近年来,Godot和Unity这两款引擎受到广泛关注。本文将从多个维度对两者进行比较,以期为开发者提供正确的选择建议。 Godot和Unity都有…

Backtrader 文档学习-Indicators混合时间框架

Backtrader 文档学习-Indicators混合时间周期 1.不同时间周期 如果数据源在Cerebro引擎中具有不同的时间范围和不同的长度,指示器将会终止。 比如:data0是日线,data1是月线 。 pivotpoint btind.PivotPoint(self.data1) sellsignal self…

IMDB电影评论的情感分析——paddle

项目地址:IMDB电影评论的情感分析 - 飞桨AI Studio星河社区 (baidu.com) 1. 实验介绍 1.1 实验目的 理解并掌握循环神经网络的基础知识点,包括模型的时序结构、模型的前向传播、反向传播等掌握长短时记忆网络LSTM和门控循环单元网络GRU的设计原理熟悉如…

Halcon基于形变的模板匹配

Halcon基于形变的模板匹配 形变分为两种,一种是基于目标局部的形变,另一种是由于透视关系而产生的形变。基于形变的模板匹配也是一种基于形状的匹配方法,但不同的是,其返回结果中不仅包括轻微形变的形状.形变的位置和参数&#x…

Node.js基础知识点(五)- http

一.request 请求事件处理函数 var http require(http)var server http.createServer() request 请求事件处理函数,需要接收两个参数: Request 请求对象 请求对象可以用来获取客户端的一些请求信息,例如请求路径 Response 响应对象 响应对…

神器yakit之web fuzzer功能

前言 yakit并不像burp一样单独设置爆破模块,但是yakit也是可以爆破的,并且更好用(个人感觉)。 手工测试场景中需要渗透人员对报文进行反复的发送畸形或者特定的payload进行查看服务器的反馈并以此来进行下一步的判断。 Fuzz标签便…