RabbitMQ-消息中间件学习记录(what-how-why)

  1. 什么是消息中间件
    简单的来说就是消息队列中间件,生产者发送消息到中间件,消息中间件用于
    保存消息并发送消息到消费者。

  2. 消息中间件RabbitMQ的基本组件
    1)producer -生产者
    2)customer -消费者
    3)broker (经纪人)- MQ服务器,管理消息对列、消息及相关消息。(接收并存储生产者发送的消息,发送消息到消费者)
    4)exchange-交换机,将生产者的消息按照一定规则发送给对应的消息对列queue
    5)queue-消息对列,队列,消息存放的容器,消息先进先出
    6)Message-消息,程序间的通信的数据

  3. 什么是消息队列queue(生产者生产msg-queue,消费者监听queue-消费)
    消息对列是一种分布式中的通信方式,它通过异步传输消息的方式,来解耦消 息的 生产者和消费者。在消息中间件中,生产者将消息发送到消息对列中,以为先进先出的方式,消费者从对列中取出消息(可以监听对列是否有消息-@RabblitListener和@RabbitHandler)

  4. 消息中间件的作用
    主要有三个作用:分别是服务解耦、实现异步通信、流量削峰
    1). 服务解耦:(场景-用户下订单、库存服务工作)
    例如订单服务-用户下订单,库存服务处理对应减库存,才返回给用户下单成功的消息。如果说库存服务出现了问题,就会造成订单丢失等问题。如果使用消息中间件(消息对列),可以把下的订单信息—> mq就返回用户下单这个,mq再发送给库存服务,这样生产者发送消息和消费者接收处理消息相互不影响,即使宕机了,消息还在中间件中。

2). 异步通信/异步调用:(用户注册新用户,服务发送短信和邮件)
传统的模式,用户注册系统新用户,服务给用户发送短信和邮件,三个操作都完成之后才返回用户下单注册的消息。因为短信和邮箱和注册信息是没有关系的服务,用户注册后消息发送给mq,用户不需要等邮件和短信发送成功,mq直接返回用户注册成功,至此用户注册业务完成。至于短信和邮件交给mq发送给短信业务-去发送。

注意:
异步就是某线程发出请求,不需要等其他线程完成就接着完成操作。用户注册,消息发送给mq,不需要等短信服务完成,短信发布发送都与注册无关,两者是异步关系。异步不是并发,所有操作同时进行,异步是各过各的。

3). 流量削峰:(商品秒杀)
例如商品秒杀的时候,这时候数据库并不能承受这么大的请求。可以把请求下订单的信息暂存在mq中,返回给用户下单成功,之后的操作由mq发送给对应的服务处理。缓存数据减少数据库的压力。

  1. 为什么需要使用消息中间件
    服务解耦、异步通信、流量削峰

  2. 消息中间件在分布式系统中使用场景(异步)
    6.1 服务解耦-订单和库存服务。用户下订单,消息发给mq,mq返回用户下订单成功,消费者-库存服务接收mq消息再去调用减少库存的消息。
    6.2 异步通信-用户注册新账户 用户注册和admin发送短信和邮件异步
    6.3 流量削峰-商品秒杀,先mq先存储订单信息,返回订单服务下单成功,后慢慢处理。减少大并发对数据库的影响/。

  3. RabbitMQ的五种消息模型/工作模式、
    1) simple 简单的一对一模式,producce-queue-customer
    2) word模式,一个消息对列queue—> 多个消费者,消费者争抢消息队列里面消息,注意一个消息只能被一个消费者消费。
    3) fanout-广播、订阅者模式。交换机将消息发送给所有binding的对列,消费端可以有多个customer使用word模式消费对列的消息。
    4) topic-主体模式,生产者的消息按照不同的路由规则,模糊匹配给不同满足条件的消息对列,消费者再去消费对列中消息
    5)routeKey,路由键(exchange-type-direct),按照不同的路由键发送到对应的queue中。

  4. 消息中间件是异步还是同步
    异步,各干各的,互不影响。(异步并不是并发-同时请求一个请求,而是互不影响个干各的,没有约束和先后顺序)。received生产者的message,send消息到消费者。二者是异步,解耦合互不影响。

  5. mq的消息确认机制confirm(MQ如何避免消息丢失?)

    1. . 对于生产者端来说,主要有两种确认机制
      a. message到broker后,mq立马确认confirm并返回消息告知生产者消息发送成功,如果失败也告知生产者,并重新发送。
      b. message到MQ之后,如果消息对列没有received成功(queue存储msg成功),会确认并返回消息接收失败到生产者
      a b 保证了生产者端不会丢失消息。

    2). 对于消费者来说。
    a. 消费者接收到queue的消息后,默认自动确认,queue删除该message。
    b. 消费者接收到msg后,对数据进行逻辑处理,如果直接confirm-queue直接删除msg,处理数据过程中可能会宕机消息丢失。
    ----设置为手动confirm确认收货,数据处理完再收货成功,queue再去删除msg。也可以对数据不满,退回到queue重新入队,也可以直接删除数据。
    c. 接收失败告知queue,不会删除数据,MQ重新发送消息-这种操作很常见
    这样避免数据在消费者端丢失

1、2两种方式避免了mq的消息丢失。

  1. MQ重复消费
    1)如何造成重复消费
    (1) 生产者端,传输到MQ-queue消息对列接收成功,MQ因为网络问题没有ack->producer,导致生产者又发送了一次消息到MQ。queue-customer-这样msg就被消费了两次。
    (2)消费者端,MQ-queue消息对列消息传到customer。一种是消费者没有接收成功,因为网络问题没有ack queue,queue重复发送,这种不会造成msg重复消费。另一种是消费者消费成功,但是因为不可控因素没有ack queue,消息对列重复发送mgs-to-customer-重复消费。

    2)解决方法
    对于幂等性消息(查询),消费者重复消费也没有关系。
    对于非幂等性消息,消费者重复消费就会有影响了。
    方法:消费者在消费消息之前,获取msg唯一id,到redis进行存储判断setnx(判断是已经存在并存储key-value)。

Boolen flag=stringRadisTemplate.opsForValue.setAbsent(id,value);
1-flag=true,key不存在,未被消费,c正常消费msg
2-false,key存在,已经被消费(两种可能-正在消费或者完成消费-忘记告知ack-queue了),无论哪种情况都直接丢弃。

注意一个问题:如果redis显示有消费记录-且消费者正在消费,此时消费者执行业务宕机了,redis分布式锁会成死锁-解决方法在IfAbsent方法加上过期时间和单位。

一句话就是:消费之前,缓存中有消费记录则丢弃消息,不二次消费。
redis缓存中没有消费记录则,重复存入缓存并消费(设计锁过期时间)。

以下是消息中间件MQ的相关代码和配置信息

  1. 使用MQ的步骤
    1)在pom文件中加上依赖amqp
 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency
2) 配置文件配置rabbit服务器的对应信息(spring.rabbitmq host、port,username,ps等)
spring.rabbitmq.host=rabbitmq服务器地址信息
spring.rabbitmq.port=端口号
spring.rabbitmq.username=账户name
spring.rabbitmq.password=密码
spring.rabbitmq.virtual-host=/
#1. 生产者发送message, mq收到消息就确认回复到生产者
spring.rabbitmq.publisher-confirms=tr
#2. queue消息对列接收生产者的消息失败,就确认返回消息到生产操者
spring.rabbitmq.publisher-returns=true
#3. 消费者接收queue消息对列的消息之后,手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3) 服务启动类上面加上注解@EnableRabbit-开启MQ
在springboot启动类加上 @EnableRabbit-开启MQ

4) 业务使用消息中间件存储消息的时候
(1) 创建交换机(注意有不同类型的交换机 direct-fanout-topic)

        public void createExchange() {
//        1. 创建direct类型的exchange     交换机的名字-hello.java.exchangeDirectExchange directExchange = new DirectExchange("hello.java.exchange", true, false);
//       2. 声明交换机amqpAdmin.declareExchange(directExchange);log.info("exchange创建成功1111", "hello.java.exchange");}

(2)创建消息队列queue

public void createQueue() {
//       1. 创建队列-queue  队列名称-hello-java-queueQueue queue = new Queue("hello.java.queue", true, false, false);
//       2. 声明mq队列amqpAdmin.declareQueue(queue);log.info("queue创建成功1111", "hello.java.queue");}

(3)交换机和消息队列直接关系绑定

    public void bindEQ() {
//        1. 创建绑定对象( "hello.java.queue"--消息对列, "hello.java.exchange"--交换机,"hello.java"-绑定关系的route-key)Binding binding = new Binding("hello.java.queue",Binding.DestinationType.QUEUE,"hello.java.exchange","hello.java",null);//       2. 声明绑定关系(这个关系实际也是一个对象)amqpAdmin.declareBinding(binding);log.info("Binding创建成功1111", "hello.java.binding");}

(4)使用MQ的操作工具类 RabbitTemplate-操作发送消息
对象注入

  @AutowiredRabbitTemplate rabbitTemplate;

生产者发送消息,需要携带消息-mgs和发送给哪个queue的route-key。注意发送消息需要一个唯一id,后面防止重复发送需要此id判断

    public void sendMessageStr() throws InterruptedException {String msg = "测试数据测试数";
//        发送10条message到exchange中
//        new CorrelationData(UUID.randomUUID().toString()  发送的消息的唯一id mq可以接收并处理rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java" , msg+ "11111111111111", new CorrelationData(UUID.randomUUID().toString()));rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java", msg + "222222222222", new CorrelationData(UUID.randomUUID().toString()));log.info("交换机消息发送成功----------->");}
   (4)消费者监听消息对列消息,消费消息使用@RabbitListener监听消息对列,使用RabbitHandler接收对应类型的消息。前者放在类上面,后者放到监听方法上面。
queues是消息对列名称的集合
@RabbitListener(queues = {"hello.java.queue"})

使用@RabbitHandler监听不同类型的消息

// 消息是TestEntity2 类型,会自动匹配到对应方法接收
@RabbitHandler
public void receiveOfSecond(TestEntity2 testEntity2) throws InterruptedException {System.out.println("receiveOfSecond-监听接受queue的数据是----->" + testEntity2);
}@RabbitHandler
public void receiveOfFirst(TestEntity testEntity) throws InterruptedException {System.out.println("receiveOfFirst-监听接受queue的数据是----->" + testEntity);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/43228.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java 动态数据统计图】动态数据统计思路案例(动态,排序,数组)四(116)

需求&#xff1a;&#xff1a;前端根据后端的返回数据&#xff1a;画统计图&#xff1b; 1.动态获取地域数据以及数据中的平均值&#xff0c;按照平均值降序排序&#xff1b; 说明&#xff1a; X轴是动态的&#xff0c;有对应区域数据则展示&#xff1b; X轴 区域数据降序排序…

LabVIEW调用DLL传递结构体参数

LabVIEW 中调用动态库接口时&#xff0c;如果是值传递的结构体&#xff0c;可以根据字段拆解为多个参数&#xff1b;如果参数为结构体指针&#xff0c;可用簇&#xff08;Cluster&#xff09;来匹配&#xff0c;其内存连续相当于单字节对齐。 1.值传递 接口定义&#xff1a; …

【FAQ】调用视频汇聚平台EasyCVR的iframe地址,视频无法播放的原因排查

有用户反馈&#xff0c;在调用iframe地址后嵌入用户自己的前端页面&#xff0c;视频无法播放并且要求登录。 安防监控视频汇聚平台EasyCVR基于云边端一体化架构&#xff0c;具有强大的数据接入、处理及分发能力&#xff0c;可提供视频监控直播、云端录像、视频云存储、视频集中…

视频集中存储EasyCVR视频汇聚平台定制项目增加AI智能算法

安防视频集中存储EasyCVR视频汇聚平台&#xff0c;可支持海量视频的轻量化接入与汇聚管理。平台能提供视频存储磁盘阵列、视频监控直播、视频轮播、视频录像、云存储、回放与检索、智能告警、服务器集群、语音对讲、云台控制、电子地图、平台级联、H.265自动转码等功能。为了便…

【Unity每日一记】Physics.Raycast 相关_Unity中的“X光射线”

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

05_bitmaphyperloglogGEO

Bitmap&hyperloglog&GEO 面试问 记录对集合中的数据进行统计在移动应用中&#xff0c;需要统计每天的新增用户数和第2天的留存用户数&#xff1b;在电商网站的商品评论中&#xff0c;需要统计评论列表中的最新评论&#xff1a;在签到打卡中&#xff0c;需要统计一个月内…

Python “贪吃蛇”游戏,在不断改进中学习pygame编程

目录 前言 改进过程一 增加提示信息 原版帮助摘要 pygame.draw pygame.font class Rect class Surface 改进过程二 增加显示得分 改进过程三 增加背景景乐 增加提示音效 音乐切换 静音切换 mixer.music.play 注意事项 原版帮助摘要 pygame.mixer pygame.mix…

kvm和vmware有什么区别?如何选择?

一、kvm和vmware的区别 VMware vSphere 平台 VMware 可以提供 ESXi 虚拟机监控程序和 vSphere 虚拟化平台。VMware ESXi 是一个能够直接安装到物理服务器上的裸机虚拟机监控程序&#xff0c;可以帮你整合硬件。你可以用 VMware 的虚拟化技术来创建和部署虚拟机&#xff08;VM…

HTML详解连载(7)

HTML详解连载&#xff08;7&#xff09; 专栏链接 [link](http://t.csdn.cn/xF0H3)下面进行专栏介绍 开始喽结构伪类选择器作用 :nth-child&#xff08;公式&#xff09;作用举例 伪元素选择器作用注意&#xff1a; PxCoook作用盒子模型-重要组成部分 盒子模型-边框线属性名属性…

excel中定位条件,excel中有哪些数据类型、excel常见错误值、查找与替换

一、如何定位条件 操作步骤&#xff1a;开始 - 查找和选择 - 定位条件&#xff08;ctrl G 或 F5&#xff09; 注&#xff1a;如果F5不可用&#xff0c;可能是这个快捷键被占用了 案例&#xff1a;使用定位条件选择取余中空单元格&#xff0c;填入100&#xff0c;按组合键ct…

【LeetCode75】第三十三题 二叉树的最大深度

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 从这一题开始&#xff0c;LeetCode75进入到了二叉树章节。 这边建议不熟悉二叉树的小伙伴可以先去做做力扣的前序遍历&#xff0c;中序遍…

使用git rebase 之后的如何恢复到原始状态

我们常常喜欢使用git rebase去切换分支提交代码,操作流程就是: 先切换分支:比如当前是master 我们修改了一堆代码产生一个commit id :5555555567777 那么我们常常比较懒就直接切换了:git checkout dev 然后呢?使用命令git rebase 5555555567777,想把这笔修改提交到d…

iPhone上的个人热点丢失了怎么办?如何修复iPhone上不见的个人热点?

个人热点功能可将我们的iPhone手机转变为 Wi-Fi 热点&#xff0c;有了Wi-Fi 热点后就可以与附近的其他设备共享其互联网连接。 一般情况下&#xff0c;个人热点打开就可以使用&#xff0c;但也有部分用户在升级系统或越狱后发现 iPhone 的个人热点消失了。 iPhone上的个人热点…

antd5源码调试环境搭建(window系统)

将antd源码克隆至本地 $ git clone gitgithub.com:ant-design/ant-design.git $ cd ant-design $ npm install $ npm start前提安装python3、安装node版本18版本 不然后续安装依赖会报python3相关的错误。 项目需要使用git 初始化 不然会报husky相关的错误 git init重新安…

【论文解读】Hybrid-SORT: Weak Cues Matter for Online Multi-Object Tracking

因为Hybrid-SORT的baseline是基于OCSORT进行改进的&#xff0c;在这之前建议先了解byteTrack和【】的相关知识 1.介绍 1.1 基本框架 多目标跟踪(MOT)将问题分为两个子任务。第一个任务是检测每个帧中的对象。第二个任务是将它们在不同的框架中联系起来。关联任务主要通过显式…

RabbitMq-发布确认高级(避坑指南版)

在初学rabbitMq的时候&#xff0c;伙伴们肯定已经接触到了“发布确认”的概念&#xff0c;但是到了后期学习中&#xff0c;会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢&#xff1f;或者是什么样的场景引出这样的概念呢&#xff1f; 在生产环…

day45 ● 70. 爬楼梯 (进阶)● 322. 零钱兑换 ● 279.完全平方数

70. 爬楼梯 class Solution {public int climbStairs(int n) {if(n <2) return n;int[] dp new int [n];dp[0] 1;dp[1] 2;for(int i 2; i< n;i){dp[i] dp[i-1] dp[i-2];}return dp[n-1];} } 322. 零钱兑换 class Solution {public int coinChange(int[] coins, in…

为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?

目录 一、为什么需要带有 subscribe 的 group.id二、我们需要使用commitSync手动提交偏移量吗&#xff1f;三、如果我想手动提交偏移量&#xff0c;该怎么做&#xff1f; 一、为什么需要带有 subscribe 的 group.id 消费概念&#xff1a; Kafka 使用消费者组的概念来实现主题的…

vscode | linux | c++ intelliense 被弃用解决方案

每日一句&#xff0c;vscode用的爽是爽&#xff0c;主要是可配置太强了。如果也很会研究&#xff0c;可以直接去咸鱼接单了 废话少说&#xff0c;直接整。 用着用着说是c intelliense被弃用&#xff0c;很多辅助功能无法使用&#xff0c;像查看定义、查看引用、函数跳转、智能提…

基于Rust的QuickLZ压缩算法的详细实现与分析

1. 引言 QuickLZ是一种被广泛应用的高效压缩算法。在许多应用中&#xff0c;快速的数据压缩和解压缩是非常关键的&#xff0c;特别是在网络传输和存储空间有限的场景中。为了满足现代软件开发的需求&#xff0c;我们将使用Rust语言来实现这一算法。Rust是一种专为系统级编程而…