RabbitMQ快速实战

目录

什么是消息队列?

消息队列的优势

应用解耦

异步提速

削峰填谷

总结

主流MQ产品特点比较

Rabbitmq快速上手

创建用户admin

Exchange和Queue

Connection和Channel

RabbitMQ中的核心概念总结


什么是消息队列?

       MQ全称Message Queue消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。       

       消息队列是一种在应用程序之间传递消息的技术。它提供了一种异步通信模式,允许应用程序在不同的时间处理消息。消息队列通常用于解耦应用程序,以便它们可以独立地扩展和修改。在消息队列中,消息发送者将消息发送到队列中,然后消息接收者从队列中接收消息。这种模式允许消息接收者按照自己的节奏处理消息,而不必等待消息发送者处理完消息。常见的消息队列包括RabbitMQ、Kafka和ActiveMQ等。

     

         

消息队列的优势

应用解耦

              

       假如用户访问订单系统,而订单系统跟其他系统是强耦合的,如图如果库存系统挂了,那么整个订单系统也都不能用了。   如果这种情况还想要增加新的XX系统进来,那么就只能修改源代码来完成。系统的耦合性越高,容错性就越低,可维护性就越低。

                 

       通过引入MQ做到应用解耦,库存系统出现异常可以等库存系统恢复后去MQ中拿消息,此时不影响别的系统调用,如果还要加入新的系统比如XX系统,那么只需XX系统去MQ中拿取消息进行处理即可。使用MQ可以提升容错性和可维护性。

异步提速

             

 原先用户请求订单系统,需要等到订单系顺序调用其他系统无误后返回,比较耗时。

       

        现在通过引入MQ,订单系统只需要把信息发送到MQ中即可,相当于完成了之前顺序请求其他系统的步骤,时间成本大大减低。

削峰填谷
             

以上场景中激增请求会打垮系统,造成服务不可用。

             

 通过将激增请求先放到MQ当前,然后系统再根据自身情况拉取请求来消费

总结

MQ优势

应用解耦:提高系统容错性和可维护性

异步提速:提升用户体验和系统吞吐量

削峰填谷:提高系统稳定性

MQ劣势

系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

系统复杂度提高:MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息不丢失等情况?


主流MQ产品特点比较

       


Rabbitmq快速上手

百度网盘链接(windows版):https://pan.baidu.com/s/1yEHQvd0VrqJTYmkZvDPf4Q 
提取码:kzzr 

安装完成后,访问本地http://localhost:15672/,账号密码都是guest

        

​       登录控制台后上方就能看到RabbitMQ的主要功能。其中Overview是概述,主要展示RabbitMQ服务的一些整体运行情况。后面Conections、Channels、Exchanges和Queues就是RabbitMQ的核心功能。最后的Admin则是一些管理功能。

创建用户admin

 创建用户对应的虚拟机,以下分配了/和/mirror虚拟机,通过左下方set可以添加虚拟机给用户(前提虚拟机已经创建)。

Exchange和Queue

创建一个队列

Virtual host:                                     虚拟机名称,自己选择                 
Type:          创建队列的类型,有三种Classic经典队列、Quorum仲裁队列、Stream流队列
Name:                                     队列名称
Durability:       是否持久(都是针对没有处理过的消息)化Durable会存到硬盘当中,服务重启队列中的消息还是会保留。Transient消息发过来之后,服务重启则消息丢失。            
Autodelete:                                     是否自动删除    
Arguments:                                   配置队列的多个参数

       有了队列后,我们就可以在这个队列上收发消息,在队列列表中点击刚刚创建的队列,然后就可以看到以下队列功能。

        RabbitMQ中的消息都是通过Queue队列传递的,这个Queue其实就是一个典型的FIFO的队列数据结构。而Exchange交换机则是用来辅助进行消息分发的。Exchange与Queue之间会建立一种绑定关系,通过绑定关系,Exchange交换机里发送的消息就可以分发到不同的Queue上。

      进入Exchanges菜单,可以看到针对每个虚拟机,RabbitMQ都预先创建了多个Exchange交换机。我们选择/mirror下的amq.direct交换机,与test队列做绑定。

               

 发送一条消息,在队列中接收消息

       Exchange交换机既然可以绑定一个队列,当然也可以绑定更多的队列。而Exchange的作用,就是将发送到Exchange的消息转发到绑定的队列上。在具体使用时,通常只有消息生产者需要与Exchange打交道。而消费者,则并不需要与Exchange打交道,只要从Queue中消费消息就可以了。

​        另外,Exchange并不只是简单的将消息全部转发给Queue,在实际使用中,Exchange与Queue之间可以建立不同类型的绑定关系,然后通过一些不同的策略,选择将消息转发到哪些Queue上。这时候,Messaage上几个没有用上的参数,像Routing Key ,Headers,Properties这些参数就能派上用场了。

​         在这个过程中,我们都是通过页面操作完成的消息发送与接收。在实际应用时,其实就是通过RabbitMQ提供的客户端API来完成这些功能。但是整个执行的过程,其实跟页面操作是相同的。


Connection和Channel

​       这两个概念实际上是跟客户端应用的对应关系。一个Connection可以理解为一个客户端应用。而一个应用可以创建多个Channel,用来与RabbitMQ进行交互。

1、创建一个Maven项目,在pom.xml中引入RabbitMQ客户端的依赖:

        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>

2、然后就可以创建一个消费者实例,尝试从RabbitMQ上的test1这个队列上拉取消息。

public class FirstConsumer {private static final String HOST_NAME="127.0.0.1";private static final int HOST_PORT=5672;private static final String QUEUE_NAME="test2";public static final String USER_NAME="admin";public static final String PASSWORD="123456";public static final String VIRTUAL_HOST="/mirror";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST_NAME);factory.setPort(HOST_PORT);factory.setUsername(USER_NAME);factory.setPassword(PASSWORD);factory.setVirtualHost(VIRTUAL_HOST);Connection connection = factory.newConnection();Channel channel = connection.createChannel();/*** 声明一个对列。几个参数依次为: 队列名,durable是否实例化;exclusive:是否独占;autoDelete:是否自动删除;arguments:参数* 这几个参数跟创建队列的页面是一致的。* 如果Broker上没有队列,那么就会自动创建队列。* 但是如果Broker上已经由了这个队列。那么队列的属性必须匹配,否则会报错。*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//每个worker同时最多只处理一个消息channel.basicQos(1);//回调函数,处理接收到的消息Consumer myconsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {System.out.println("========================");String routingKey = envelope.getRoutingKey();System.out.println("routingKey >"+routingKey);String contentType = properties.getContentType();System.out.println("contentType >"+contentType);long deliveryTag = envelope.getDeliveryTag();System.out.println("deliveryTag >"+deliveryTag);System.out.println("content:"+new String(body,"UTF-8"));// (process the message components here ...)channel.basicAck(deliveryTag, false);}};//从test1队列接收消息channel.basicConsume(QUEUE_NAME, myconsumer);}
}

虽然我们这次测试只用到了消费者,在此也将生产者代码补充

 

public class FirstProducer {private static final String HOST_NAME="127.0.0.1";private static final int HOST_PORT=5672;private static final String QUEUE_NAME="test2";public static final String USER_NAME="admin";public static final String PASSWORD="admin";public static final String VIRTUAL_HOST="/mirror";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST_NAME);factory.setPort(HOST_PORT);factory.setUsername(USER_NAME);factory.setPassword(PASSWORD);factory.setVirtualHost(VIRTUAL_HOST);Connection connection = factory.newConnection();Channel channel = connection.createChannel();/*** 声明一个对列。几个参数依次为: 队列名,durable是否实例化;exclusive:是否独占;autoDelete:是否自动删除;arguments:参数* 这几个参数跟创建队列的页面是一致的。* 如果Broker上没有队列,那么就会自动创建队列。* 但是如果Broker上已经由了这个队列。那么队列的属性必须匹配,否则会报错。*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);String message = "message";channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());channel.close();connection.close();}
}

         执行完消费者应用程序后,就会在RabbitMQ上新创建一个test2的队列(如果你之前没有创建过的话),并且启动一个消费者,处理test2队列上的消息。这时,我们可以从管理平台页面上往test2队列发送一条消息,这个消费者程序就会及时消费消息。

可以看到Connection和Channel已经建立完成。

现在通过test2队列发送消息,我们客户端消费者会进行消费。

控制台打印出了消息内容


RabbitMQ中的核心概念总结

1、服务主机Broker

  ​     一个搭建RabbitMQ Server的服务器称为Broker。这个并不是RabbitMQ特有的概念,但是却是几乎所有MQ产品通用的一个概念。未来如果需要搭建集群,就需要通过这些Broker来构建。

2、虚拟主机 virtual host

​        RabbitMQ出于服务器复用的想法,可以在一个RabbitMQ集群中划分出多个虚拟主机,每一个虚拟主机都有全套的基础服务组件,可以针对每个虚拟主机进行权限以及数据分配。不同虚拟主机之间是完全隔离的,如果不考虑资源分配的情况,一个虚拟主机就可以当成一个独立的RabbitMQ服务使用。

2、连接 Connection

​       客户端与RabbitMQ进行交互,首先就需要建立一个TPC连接,这个连接就是Connection。既然是通道,那就需要尽量注意在停止使用时要关闭,释放资源。

3、信道 Channel

​       一旦客户端与RabbitMQ建立了连接,就会分配一个AMQP信道 Channel。每个信道都会被分配一个唯一的ID。也可以理解为是客户端与RabbitMQ实际进行数据交互的通道,我们后续的大多数的数据操作都是在信道 Channel 这个层面展开的。

​       RabbitMQ为了减少性能开销,也会在一个Connection中建立多个Channel,这样便于客户端进行多线程连接,这些连接会复用同一个Connection的TCP通道,所以在实际业务中,对于Connection和Channel的分配也需要根据实际情况进行考量。

4、交换机 Exchange

​       这是RabbitMQ中进行数据路由的重要组件。消息发送到RabbitMQ中后,会首先进入一个交换机,然后由交换机负责将数据转发到不同的队列中。RabbitMQ中有多种不同类型的交换机来支持不同的路由策略。从Web管理界面就能看到,在每个虚拟主机中,RabbitMQ都会默认创建几个不同类型的交换机来。

       交换机多用来与生产者打交道。生产者发送的消息通过Exchange交换机分配到各个不同的Queue队列上,而对于消息消费者来说,通常只需要关注自己感兴趣的队列就可以了。

5、队列 Queue

​       Queue是实际保存数据的最小单位。Queue不需要Exchange也可以独立工作,只不过通常在业务场景中,会增加Exchange实现更复杂的消息分配策略。Queue结构天生就具有FIFO的顺序,消息最终都会被分发到不同的Queue当中,然后才被消费者进行消费处理。这也是最近RabbitMQ功能变动最大的地方。最为常用的是经典队列Classic。RabbitMQ 3.8.X版本添加了Quorum队列,3.9.X又添加了Stream队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/661229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring5深入浅出篇:Spring中ioc(控制反转)与DI(依赖注入)

Spring5深入浅出篇:Spring中ioc(控制反转)与DI(依赖注入) 反转(转移)控制(IOC Inverse of Control) 控制&#xff1a;对于成员变量赋值的控制权 反转控制&#xff1a;把对于成员变量赋值的控制权&#xff0c;从代码中反转(转移)到Spring⼯⼚和配置⽂件中完成好处&#xff1a;…

七、并发工具(上)

一、自定义线程池 1&#xff09;背景&#xff1a; 在 QPS 量比较高的情况下&#xff0c;我们不可能说所有的访问都创建一个线程执行&#xff0c;这会导致内存占用过高&#xff0c;甚至有可能出现 out of memory另外也要考虑 cpu 核数&#xff0c;如果请求超过了cpu核数&#…

【bitonicSort学习】

bitonicSort学习 什么是Bitonic Sort核心 什么是Bitonic Sort https://zhuanlan.zhihu.com/p/53963918 这个是用来并行排序的一个操作 之前学过一些CPU排序&#xff0c;快排 冒泡 归并啥的&#xff0c;有一些能转成并行&#xff0c;有一些不适合 像快排这种二分策略就可以考虑…

Vue3的自定义指令怎么迁移到nuxt3

一、找到Vue3中指令的源码 const DISTANCE 100; // 距离 const ANIMATIONTIME 500; // 500毫秒 let distance: number | null null,animationtime: number | null null; const map new WeakMap(); const ob new IntersectionObserver((entries) > {for (const entrie…

草图导入3d后模型贴材质的步骤?---模大狮模型网

3D模型在导入草图大师后出现混乱可能有多种原因&#xff0c;以下是一些可能的原因和解决方法&#xff1a; 模型尺寸问题&#xff1a;如果3D模型的尺寸在导入草图大师时与画布尺寸不匹配&#xff0c;可能导致模型混乱。解决方法是在3D建模软件中调整模型的尺寸&#xff0c;使其适…

FreeRTOS使用计数信号量进行任务同步与资源管理

FreeRTOS使用计数信号量进行任务同步与资源管理 介绍 在多任务系统中&#xff0c;任务之间的同步和对共享资源的管理是非常重要的。FreeRTOS 提供了丰富的同步机制&#xff0c;其中计数信号量是一种强大的工具&#xff0c;用于实现任务之间的同步和对资源的访问控制。 什么是…

figure方法详解之清除图形内容

figure方法详解之清除图形内容 一 clf():二 clear():三 clear()方法和clf()方法的区别&#xff1a; 前言 Hello 大家好&#xff01;我是甜美的江。 在数据可视化中&#xff0c;Matplotlib 是一个功能强大且广泛使用的库&#xff0c;它提供了各种方法来创建高质量的图形。在 Mat…

unity 拖入文件 窗口大小

目录 unity 拖入文件插件 设置窗口大小 unity 拖入文件插件 GitHub - Bunny83/UnityWindowsFileDrag-Drop: Adds file drag and drop support for Unity standalong builds on windows. 设置窗口大小 file build

Iceberg从入门到精通系列之二十一:Spark集成Iceberg

Iceberg从入门到精通系列之二十一&#xff1a;Spark集成Iceberg 一、在 Spark 3 中使用 Iceberg二、添加目录三、创建表四、写五、读六、Catalogs七、目录配置八、使用目录九、替换会话目录十、使用目录特定的 Hadoop 配置值十一、加载自定义目录十二、SQL 扩展十三、运行时配置…

电子电器架构——车载网关转发buffer心得汇总

电子电器架构——车载网关转发buffer心得汇总 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不对。非必要不费力…

vue2父组件向子组件传值时,子组件同时接收多个数据类型,控制台报警的问题

最近项目遇到一个问题,就是我的父组件向子组件(公共组件)传值时,子组件同时接收多个数据类型,控制台报警的问题,如下图,子组件明明写了可同时接收字符串,整型和布尔值,但控制台依旧报警: 仔细检查父组件,发现父组件是这样写的: <common-tabletooltip :content=…

2024 springboot Mybatis-flex 打包出错

Mybatis-flex官网&#xff1a;快速开始 - MyBatis-Flex 官方网站 从 Mybatis-flex官网获取模板后&#xff0c;加入自己的项目内容想打包确保错&#xff0c;先试试一下方法 这里改成skip的默认是true改成false&#xff0c;再次打包就可以了

Git系列---标签管理

&#x1f4d9; 作者简介 &#xff1a;RO-BERRY &#x1f4d7; 学习方向&#xff1a;致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f4d2; 日后方向 : 偏向于CPP开发以及大数据方向&#xff0c;欢迎各位关注&#xff0c;谢谢各位的支持 目录 1.理解标签2.创建标签…

MySQL原理(一)架构组成之物理文件组成

目录 一、日志文件 1、错误日志 Error Log 1.1、作用&#xff1a; 1.2、开启关闭&#xff1a; 1.3、使用 2、二进制日志 Binary Log & Binary Log Index 2.1、作用&#xff1a; 2.2、开启关闭&#xff1a; 2.3、Binlog还有一些附加选项参数 &#xff08;1&#x…

verilog编程之乘法器的实现

知识储备 首先来回顾一下乘法是如何在计算机中实现的。 假设现在有两个32位带符号定点整数x和y&#xff0c;我们现在要让x和y相乘&#xff0c;然后把乘积存放在z中&#xff0c;大家知道&#xff0c;两个32位数相乘&#xff0c;结果不会超过64位&#xff0c;因此z的长度应该为64…

大数据开发之离线数仓项目(3数仓数据同步策略)(可面试使用)

第 1 章&#xff1a;实时数仓同步数据 实时数仓由flink源源不断从kafka当中读数据计算&#xff0c;所以不需要手动同步数据到实时数仓。 第 2 章&#xff1a;离线数仓同步数据 2.1 用户行为数据同步 2.1.1 数据通道 用户行为数据由flume从kafka直接同步到hdfs&#xff0c;…

通俗易懂理解通道注意力机制(CAM)与空间注意力机制(SAM)

重要说明&#xff1a;本文从网上资料整理而来&#xff0c;仅记录博主学习相关知识点的过程&#xff0c;侵删。 一、参考资料 通道注意力&#xff0c;空间注意力&#xff0c;像素注意力 通道注意力机制和空间注意力机制 视觉 注意力机制——通道注意力、空间注意力、自注意力…

Linux:进程信号的概念与产生原理

文章目录 信号的概念实践信号关于前台和后台进程的操作 操作系统与外设信号的产生signal系统调用 前面的篇章结束了信号量的话题&#xff0c;那么接下来引入的是信号的话题&#xff0c;信号和信号量之间没有任何关系&#xff0c;只是名字比较像 信号的概念 在生活中存在各种各…

Java学习day24:线程的同步和锁(例题+知识点详解)

声明&#xff1a;该专栏本人重新过一遍java知识点时候的笔记汇总&#xff0c;主要是每天的知识点题解&#xff0c;算是让自己巩固复习&#xff0c;也希望能给初学的朋友们一点帮助&#xff0c;大佬们不喜勿喷(抱拳了老铁&#xff01;) 往期回顾 Java学习day23&#xff1a;线程构…

Matlab图像模拟加噪——高斯噪声、椒盐噪声、泊松噪声、乘性噪声、均匀噪声、指数噪声

1.高斯噪声 (1)通过均值和方差来产生 Jimnoise(I, gaussian, 0, 0.01);%高斯噪声&#xff0c;均值为0&#xff0c;方差为0.01(2)通过位置信息来产生 Iim2double(I); Vzeros(size(I)); %建立矩阵V for i1:size(V, 1)V(i,:)0.02*i/size(V,1); end Jimnoise(I, localvar, V); …