第十四章 RabbitMQ应用

文章目录

  • 前言
  • 1、RabbitMQ概念
    • 1.1、生产者和消费者
    • 1.2、队列
    • 1.3、交换机、路由键、绑定
      • 1.3.1、交换机类型
  • 2、RabbitMQ运转流程
    • 2.1、生产者发送消息流程
    • 2.2、消费者接收消息的过程
    • 2.3、AMQP协议
  • 3、RabbitMQ windows安装
    • 3.1、下载
    • 3.2、安装
  • 4、Spring Boot 整合RabbitMQ
    • 4.1、在user-service添加依赖
    • 4.2、配置文件添加
    • 4.3、增加RabbitMQ配置类
    • 4.4、新增消费监听类
    • 4.5、消息生产端


在这里插入图片描述

前言

一般MQ用于系统解耦、削峰使用,常见于微服务、业务活动等场景。

1、RabbitMQ概念

RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。

1.1、生产者和消费者

  • Producer:生产者,就是投递消息的一方。消息一般可以包含2个部分:消息体和标签(Label)。消息的标签用来描述这条消息,比如一个交换器的名称和一个路由键。
  • Consumer:消费者,就是接受消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)
  • Broker:消息中间件的服务节点。一个RabbitMQ Broker看做一台RabbitMQ服务器

1.2、队列

Queue:队列,是RabbitMQ的内部对象,用于存储消息


1.3、交换机、路由键、绑定

Exchange:交换器。生产者将消息发送到Exchange(交换器,通常也可以用大写的"X"来表示),有交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。

RoutingKey:路由键。生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
Binding:绑定。RabbitMQ中通过绑定将交换器与队列联合起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。

1.3.1、交换机类型

  • Direct Exchange:直连交换机,根据Routing Key(路由键)进行投递到不同队列。

  • Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。
  • Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,*表示一个词。
  • Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。

    自学参考:https://blog.csdn.net/qq_38550836/article/details/95358353

2、RabbitMQ运转流程

2.1、生产者发送消息流程

  • 生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
  • 生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等
  • 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
  • 生产者通过路由键将交换器和队列绑定起来
  • 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
  • 相应的交换器根据接收到的路由键查找相匹配的队列。
  • 如果找到,则将从生产者发送过来的消息存入相应的队列。
  • 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  • 关闭信道
  • 关闭连接

2.2、消费者接收消息的过程

  • 消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
  • 消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
  • 等待RabbitMQ Broker回应并投递相应队列中队列的消息,消费者接收消息。
  • 消费者确认(ack)接收到的消息。
  • RabbitMQ从队列中删除相应已经被确认的消息。
  • 关闭信道
  • 关闭连接
    无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。

2.3、AMQP协议

Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等

  • Broker:接收和分发消息的应用,RabbitMQ 就是 Message Broker
  • Virtual Host:虚拟 Broker,将多个单元隔离开
  • Connection:publisher / consumer 和 broker 之间的 tcp 连接
  • Channel:connection 内部建立的逻辑连接,通常每个线程创建单独的 channel
  • Routing key:路由键,用来指示消息的路由转发,相当于快递的地址
  • Exchange:交换机,相当于快递的分拨中心
  • Queue:消息队列,消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,用于 message 的分发依据

3、RabbitMQ windows安装

3.1、下载

https://github.com/erlang/otp/releases/download/OTP-25.2/otp_win64_25.2.exe
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.5/rabbitmq-server-3.11.5.exe

3.2、安装

配置环境变量

cd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.5\sbin

开启rabbitmq-plugins插件

rabbitmq-plugins enable rabbitmq_management

打开地址
http://127.0.0.1:15672/

输入用户名/密码:guest/guest

4、Spring Boot 整合RabbitMQ

4.1、在user-service添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.2、配置文件添加

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest

4.3、增加RabbitMQ配置类

package com.xxxx.user.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {/******************direct**********************//*** 创建direct队列* @return*/@Beanpublic Queue directQueue(){return new Queue("directQueue");}/*** 创建direct交换机* @return*/@Beanpublic DirectExchange directExchange(){return new DirectExchange("directExchange");}/*** 把队列和交换机绑定在一起* @param queue* @param directExchange* @return*/@Beanpublic Binding bindingDirect(@Qualifier("directQueue") Queue queue, DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("routingKey");}/******************topic**********************/@Beanpublic Queue topicQuerue1(){return new Queue("topicQuerue1");}@Beanpublic Queue topicQuerue2(){return new Queue("topicQuerue2");}@Beanpublic TopicExchange topicExchange(){return new TopicExchange("topicExchange");}@Beanpublic Binding bindingTopic1(@Qualifier("topicQuerue1") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("topic.key1");}/*** 通配符:* 表示一个词,# 表示零个或多个词* @param queue* @param topicExchange* @return*/@Beanpublic Binding bindingTopic2(@Qualifier("topicQuerue2") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");}/******************fanout**********************/@Beanpublic Queue fanoutQueue1(){return new Queue("fanoutQueue1");}@Beanpublic Queue fanoutQueue2(){return new Queue("fanoutQueue2");}@Beanpublic Queue fanoutQueue3(){return new Queue("fanoutQueue3");}@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}@Beanpublic Binding bindingFanout1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}@Beanpublic Binding bindingFanout2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}@Beanpublic Binding bindingFanout3(@Qualifier("fanoutQueue3") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}
}

4.4、新增消费监听类

package com.xxxx.user.consumer;import com.xxxx.springCloud.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
@RabbitListener(queues = "directQueue")
public class DataDirectReceiver {@RabbitHandlerpublic void process(String data){log.info("收到directQueue队列信息:" + data);}@RabbitHandlerpublic void process(UserInfo data){log.info("收到directQueue队列信息:" + data);}
}
package com.xxxx.user.consumer;import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
@RabbitListener(queues = {"topicQuerue1","topicQuerue2"})
public class DataFanoutReceiver {@RabbitHandlerpublic void process(String data){log.info("收到topicQuerue队列信息:" + data);}@RabbitHandlerpublic void process(UserInfo data){log.info("收到topicQuerue队列信息:" + data);}
}
package com.xxxx.user.consumer;import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
@RabbitListener(queues = {"fanoutQueue1","fanoutQueue2","fanoutQueue3"})
public class DataTopicReceiver {@RabbitHandlerpublic void process(String data){log.info("收到topicQuerue队列信息:" + data);}@RabbitHandlerpublic void process(UserInfo data){log.info("收到topicQuerue队列信息:" + data);}
}

4.5、消息生产端

package com.xxxx.user;import com.xxxx.common.entity.UserInfo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class DataSender {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendDirect(){UserInfo userInfo = new UserInfo();userInfo.setUserAccount("tiger");userInfo.setPassword("12345");this.rabbitTemplate.convertAndSend("directExchange","routingKey",userInfo);}@Testpublic void sendTopic(){this.rabbitTemplate.convertAndSend("topicExchange","topic.key2","Hello world topic");}@Testpublic void sendFanout(){this.rabbitTemplate.convertAndSend("fanoutExchange","","Hello world topic");}
}


在这里插入图片描述



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/653157.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[USACO22JAN] Tests for Haybales G

Farmer John 的奶牛们决定为 Farmer Nhoj 农场的奶牛们举办一场编程竞赛。为了使问题尽可能有趣&#xff0c;他们花费了大量时间来构造具有挑战性的测试用例。特别是对于一个问题&#xff0c;「Haybales」&#xff0c;奶牛们需要你的帮助来设计具有挑战性的测试用例。这有关解决…

【语录】岁月

中年 写中年&#xff0c;应该是年少励志三千里 踌躇百步无寸功&#xff0c;转眼高堂已白发 儿女蹒跚学堂中&#xff0c;不如意事常八九&#xff0c;可与人言无二三 可是诸位&#xff0c;不用悲伤&#xff0c;稻盛和夫说&#xff0c; 人生并不是一场物质的盛宴&#xff0c;而是…

单片机学习笔记---LCD1602调试工具

LCD1602调试工具 这一节开始之前先说明一下&#xff0c;模块化编程相关的知识&#xff08;就是将代码分成多个文件来写&#xff0c;比如函数的定义放在.c文件中&#xff0c;函数的声明写在.h文件中&#xff09;属于是C语言的内容&#xff0c;学过C语言的伙伴应该都知道。由于这…

C语言中static的用法说明

1.修饰变量 变量分为局部变量和全局变量&#xff0c;但他们都是存在内存的静态区。 静态全局变量&#xff1a;作用域仅限于变量被定义的文件中&#xff0c;其他文件即使用extern声明也没法使用它。准确地说作用域是从定义之处开始&#xff0c;到文件结尾处结束&#xff0c;在…

Vue2.0+Element实现日历组件

(壹)博主介绍 &#x1f320;个人博客&#xff1a; 尔滨三皮⌛程序寄语&#xff1a;木秀于林&#xff0c;风必摧之&#xff1b;行高于人&#xff0c;众必非之。 (贰)文章内容 1、安装依赖 npm install moment2.29.4 --savenpm install lunar0.0.3 --savenpm install lunar-java…

配置华为交换机生成树VBST案例

知识改变命运&#xff0c;技术就是要分享&#xff0c;有问题随时联系&#xff0c;免费答疑&#xff0c;欢迎联系 厦门微思网络​​​​​​https://www.xmws.cn 华为认证\华为HCIA-Datacom\华为HCIP-Datacom\华为HCIE-Datacom 思科认证CCNA\CCNP\CCIE 红帽认证Linux\RHCE\RHC…

代理IP使用指南:风险与注意事项

在当今的数字化时代&#xff0c;使用在线代理IP已经成为一种常见的网络行为。然而&#xff0c;在使用这些代理IP时&#xff0c;我们需要注意一些风险和问题&#xff0c;以确保我们的网络安全和隐本私文。将探讨使用代理IP时需要注意的几个关键问题。 1、代理IP的安全性 使用代理…

设计模式:工厂方法模式

工厂模式属于创建型模式&#xff0c;也被称为多态工厂模式&#xff0c;它在创建对象时提供了一种封装机制&#xff0c;将实际创建对象的代码与使用代码分离&#xff0c;有子类决定要实例化的产品是哪一个&#xff0c;把产品的实例化推迟到子类。 使用场景 重复代码 : 创建对象…

OpenAI ChatGPT-4开发笔记2024-07:Embedding之Text Similarity文本相似度

语义相似性semantic similarity 背景结果 背景 OpenAI has made waves online with its innovative embedding and transcription models, leading to breakthroughs in NLP and speech recognition. These models enhance accuracy, efficiency, and flexibility while speed…

算法每日一题: 边权重均等查询 | 公共子祖先

大家好&#xff0c;我是星恒&#xff0c;今天给大家带来的是一道图里面有关公共子祖先的题目&#xff0c;理解起来简单&#xff0c;大家 题目&#xff1a;leetcode 2846 现有一棵由 n 个节点组成的无向树&#xff0c;节点按从 0 到 n - 1 编号。给你一个整数 n 和一个长度为 n …

聊聊大模型 RAG 探索之路的血泪史,一周出Demo,半年用不好

大家好&#xff0c;今天我们来继续看看 RAG 落地的一些有趣的事儿&#xff0c;从技术社群早上的讨论开始&#xff0c;喜欢技术交流的可以文末加入我们 一、从一周出Demo、半年用不好说起 最近读了读2024-傅盛开年AI大课&#xff0c;其中有讲到RAG环节&#xff0c;三张片子比较…

多路IO复用服务器——select模型和poll模型

文章目录 一、多路IO复用服务器是什么&#xff1f;二、使用原理三、种类四、select模型五、select模型优缺点六、poll模型总结 一、多路IO复用服务器是什么&#xff1f; 服务器要与客户端完成tcp连接&#xff0c;并保持连接维护可用sock。 每个都需要准备一个进程管一个sock&a…

解决Xshell连接远程linux服务器,关闭Xshell程序对应的运行程序也相应关闭的问题

1.先安装screen yum install screen 2.具体步骤 screen -S test #创建一个新的窗口&#xff0c;这个窗口关闭时程序不会关闭 java -jar test.jar #运行jar包 3.杀掉java进程后&#xff0c;重新启动java项目前要执行一下screen -S test&#xff0c;即以后启动一个项目前&#…

计算机网络(第六版)复习提纲15

第四章 网络层 处于五层体系结构的第三层 SS4.2 网际协议IP 与网际协议IPv4配套的三个协议&#xff1a; 地址解析协议ARP 网际控制报文协议ICMP 网际组管理协议IGMP 一、虚拟互联网络 1.四层次的互联设备 2.虚拟互连网络 网际协议IP使得性能各异的网络在网络层上看来好像是一…

2023.1.23 关于 Redis 哨兵模式详解

目录 引言 人工恢复主节点故障 ​编辑 主从 哨兵模式 Docker 模拟部署哨兵模式 关于端口映射 展现哨兵机制 哨兵重新选取主节点的流程 哨兵模式注意事项 引言 人工恢复主节点故障 1、正常情况 2、主节点宕机 3、程序员主动恢复 先看看该主节点还能不能抢救如果不好定…

统一异常处理

统一异常处理 统一异常处理创建一个类定义方法ControllerAdvice和ExceptionHandler注意事项 统一异常处理 创建一个类 首先,我们来创建一个类,名字随意,这里我们取名ERHandler 定义方法 在ERHandler中,我们可以定义几个类,参数用来接收各种异常,这里的异常可以是任意的,返回…

【每日一题】YACS P817:两数归零

题目描述 这是上海计算机学会竞赛P817&#xff1a;两数归零标签&#xff1a; m a p map map、二分查找给定 n n n 个整数 a 1 , a 2 , a 3 , . . . , a n a_1,a_2,a_3,...,a_n a1​,a2​,a3​,...,an​&#xff0c;请统计有多少二元组 i , j i,j i,j 满足 i < j i<j…

面试官:你可以说一说你对Jmeter元素的理解吗?下

面试官&#xff1a;你可以说一说你对Jmeter元素的理解吗&#xff1f;下 监听器配置元素CSV数据集配置HTTPCookie管理器HTTP请求默认值登录配置元素 监听器 Listeners&#xff1a;显示测试执行的结果。它们可以以不同的格式显示结果&#xff0c;例如树、表、图形或日志文件 图…

嵌入式学习-驱动

嵌入式的一些基本概念 CPU与MCU的区别 CPU&#xff08;中央处理器&#xff0c;central processing unit) 指集成了运算器、控制器、寄存器、高速缓存等功能模块的芯片&#xff0c;负责执行计算机程序指令的处理器。MCU&#xff08;单片微型计算机或单片机&#xff0c;microco…

LLM大语言模型(五):用streamlit开发LLM应用

目录 背景准备工作切记streamlit开发LLM demo开一个新页面初始化session先渲染历史消息接收用户输入模拟调用LLM 参考 背景 Streamlit是一个开源Python库&#xff0c;可以轻松创建和共享用于机器学习和数据科学的漂亮的自定义web应用程序&#xff0c;用户可以在几分钟内构建一…