消息中间件之RocketMQ源码分析(六)

Consumer消费方式

RocketMQ的消费方式包含Pull和Push两种

  • Pull方式。
    用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。
    缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求,再RocketMQ中
    org.apache.rocketmq.client.consumer.DefaultMQPullConsume是默认的Pull消费者实现类
    在这里插入图片描述
    1.fetchSubscribeMessageQueues(String topic).拉取全部可以消费的Queue.如果某一个Broker下线,这里也可以实时感知到
    2.遍历全部Queue,拉取每个Queue可以消费的消息
    3.如果拉取到消息,则执行用户编写的消费代码
    4.保存消费进度。消费进度可以执行updateConsumeOffset()方法,将消息位点上报给Broker,也可以自行保存消费位点。比如流计算平台Flink使用Pull方式拉取消息消费,通过Checkpoint管理消费进度
  • Push方式。
    代码介入非常简单,适合大部分业务场景。缺点灵活度差,在了解消费原理后,排查消费问题可简单快捷.在RocketMQ中org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是默认的Push消费者实现类
    在这里插入图片描述
    1.初始化Push消费者实例。业务代码初始化DefaultMQPushConsumer实例,启动Push服务PullMessageService.该服务是一个线程服务,不断执行run()方法拉取已经订阅Topic的全部队列的消息,将消息保存在本地的缓存队列中
    启动PullMessageService的拉取服务
    在这里插入图片描述

在这里插入图片描述
PullMessageService不断拉取消息。pullRequestQueue中保存着待拉取地Topic和Queue消息,程序不断从pullRequestQueue中获取PullRequest并执行拉取消息方法
在这里插入图片描述
消费者拉取消息并消费,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

基本校验。校验ProcessQueue是否dropped;校验消费者服务状态是否正常;校验消费者是否被挂起。在Rebalance时,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance方法在运行时设置ProcessQueue.setDropped(true)的逻辑,,设置成功后,在执行拉取消息时,将不再拉取dropped为true的ProcessQueue
在这里插入图片描述
在这里插入图片描述

拉取条数、字节数限制检查。如果本地缓存消息数量大于配置的最大拉取条数(默认1000,可以调整),则延迟一段时间再拉取;
如果本地缓存消息字节数大于配置的最大缓存字节数,则延迟i短时间再拉取,这两种校验方式都相当于本地流控
在这里插入图片描述
并发消费和顺序消费校验。
在并发消费时,processQueue.getMaxSpan()方法是用于计算本地缓存队列中的哥消息和最后一个消息的offset差值。
顺序消费时,如果当前拉取的队列在Broker没有被锁定,说明已经由拉取正在执行,当前拉取请求晚点执行,如果不是第一次拉取,
需要先计算最新的拉取位点并修正最新的待拉取位点信息,再执行拉取
在这里插入图片描述
本地缓存队列的Span如果大于配置的最大差值(默认2000,可以调整),
则认为本地消费过慢,需要执行本地流控
在这里插入图片描述
队列锁定
在这里插入图片描述
订阅关系校验。如果待拉取的Topic在本地缓存中订阅关系为空,则本地拉取不执行,待下一个正常心跳或者Rebalance后订阅关系恢复正常,方可正常拉取
在这里插入图片描述
封装拉取请求和拉取后的回调对象PullCallback。这里主要将消息拉取请求和拉取结果处理封装成PullCallback,
并通过调用PullAPIWrapper.pullKernelImpl()方法将拉取请求发出去。
如果拉取到消息,那么将消息保存到对应的本地缓存队列ProcessQueue中,然后将这些消息交给ConsumeService服务;
在这里插入图片描述

在这里插入图片描述
2.核心-消费消息。由消费服务ConsumeMessageConcurrentlyService或者ConsumeMessagOrderlyService
将本地缓存队列中的消息不断放入到消费线程池,异步回调业务消费代码,此时业务代码可以消费消息
在这里插入图片描述
3.核心-保存消费进度。业务代码消费后,将消费结果返回给消费服务,再由消费服务将消费进度保存在本地,
由消费进度管理服务定时和不定时地持久化到本地(LcoalFileOffsetStore支持)或者远程Broker(RemoteBrokerOffsetStore支持)中,
对于消费失败地消息,RocketMQ客户端处理后发回给Broker,并告知消费失败

  • Pull和Push方式的比较
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/670827.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker部署自己的网站wordpress

目录 安装 1.创建目录 2.创建并启动mysql 3.创建并启动wordpress 使用 1.设置语言 2.设置基础信息 3.首页 安装 1.创建目录 mkdir -p /opt/wordpress/{db,data} 2.创建并启动mysql docker run -d --name my_mysql --restart always -e MYSQL_ROOT_PASSWORD123456 -e …

flutter使用qr_code_scanner扫描二维码

qr_code_scanner仓库地址:qr_code_scanner | Flutter Package 需要添加android和ios的相机权限和本地相册权限: android中添加权限: 在android\app\build.gradle中修改:minSdkVersion 20 并且在android/app/src/main/AndroidManifest.xml中…

【力扣】Z字形变换,模拟+直接构造

Z字形变换原题地址 方法一:利用二维矩阵模拟 对于特殊情况,z字形变换后只有一行或只有一列,则变换后的字符串和原字符串相同。 对于一般情况,我们可以考虑按照题目要求,把字符串按照Z字形存储到二维数组中&#xff…

【linux】git和gdb调试工具

在linux下提交代码同步到gitee 1.创建一个新的仓库(演示步骤) 2.init 这两个步骤用于识别提交代码的身份,一个你的名字,一个你的邮箱 开启本地仓库 克隆本地仓库成功 我们将这个仓库拷到了111目录底下. 我们发现少了一个.gitig…

最小覆盖子串[困难]

优质博文:IT-BLOG-CN 一、题目 给你一个字符串s、一个字符串t。返回s中涵盖t所有字符的最小子串。如果s中不存在涵盖t所有字符的子串,则返回空字符串"" 。 对于t中重复字符,我们寻找的子字符串中该字符数量必须不少于t中该字符数量…

使用Nginx搭建旁路服务器获取客户端真实IP

一、前言 在实际业务开发过程中,很多时候有记录客户端真实IP的需求,但是从客户端发送的请求往往会经过很多代理服务器,导致后端服务获取的IP为代理以后的IP,不具有业务含义。为了解决这个问题,可以搭建一个旁路服务器…

谷歌seo搜索引擎优化方法有什么?

想知道谷歌优化方法有什么,首先要了解谷歌搜索引擎的工作原理,谷歌的工作原理主要是通过“爬虫”来实现的,所谓“爬虫”就是一只能够读取并分析网页内容的程序,或者也能理解成机器人,当你在谷歌上输入关键词进行搜索时…

uniapp小程序实现直播组件live-player全屏问题

实现效果&#xff1a; 代码&#xff1a; <template><view class"player-content"><!-- #ifdef APP-PLUS --><video id"myVideo" :src"srcLink" autoplay controls><!-- 打开全屏 --><image class"img…

速度规划:s形曲线------pencv c++绘图(1)

理论篇 代码篇&#xff1a; opencv环境配置 注意&#xff01;注意&#xff01;注意&#xff01; 配置结束后运行环境切换为如下再运行&#xff1a; #include <iostream> #include <cmath>#include <opencv2/opencv.hpp>using namespace std;double a_max…

github和gitee

github GitHub是一个面向开源及私有软件项目的托管平台&#xff0c;因为只支持Git作为唯一的版本库格式进行托管&#xff0c;故名GitHub。 github可以给提交的代码打上标签&#xff0c;方便版本的迭代和回退&#xff0c;也是一个存储代码的仓库 github工作区 gitee是gitHub的…

蓝桥杯---分小组

9名运动员参加比赛,需要分3组进行预赛. 有哪些分组的方案呢? 我们标记运动员为 A,B,C .... I 下面的程序列出了所有的分组方法。 该程序的正常输出为:

【CSS】margin塌陷和margin合并及其解决方案

【CSS】margin塌陷和margin合并及其解决方案 一、解决margin塌陷的问题二、避免外边距margin重叠&#xff08;margin合并&#xff09; 一、解决margin塌陷的问题 问题&#xff1a;当父元素包裹着一个子元素且父元素没有边框的时候&#xff0c;当给子元素设置margin-top:100px&…

【精选】java继承进阶——继承的特点 this、super

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【python】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏…

5 款提升 UI 设计效率的软件工具

你知道如何选择正确的UI设计软件吗&#xff1f;你知道设计漂亮的用户界面和带来良好用户体验的应用程序需要什么界面设计软件吗&#xff1f;基于APP界面的不同功能&#xff0c;所选择的APP界面设计软件也会有所不同。然而&#xff0c;并不是说所有的APP界面设计软件都非常精通&…

Leetcode02.05:链表求和

一、题目描述 给定两个用链表表示的整数&#xff0c;每个节点包含一个数位。 这些数位是反向存放的&#xff0c;也就是个位排在链表首部。 编写函数对这两个整数求和&#xff0c;并用链表形式返回结果。 示例&#xff1a; 输入&#xff1a;(7 -> 1 -> 6) (5 -> 9 -…

【C语言】贪吃蛇 详解

该项目需要的技术要点 C语言函数、枚举、结构体、动态内存管理、预处理指令、链表、Win32API等。 由于篇幅限制 和 使知识模块化&#xff0c; 若想了解 使用到的 Win32API 的知识&#xff1a;请点击跳转&#xff1a;【Win32API】贪吃蛇会使用到的 Win32API 目录 1. 贪吃蛇游…

HTTP相关问题

目录 1.从输入URL到页面展示到底发生了什么&#xff1f; 2.HTTP状态码有哪些&#xff1f; 2.1 2XX(成功状态码) 2.2 3XX(重定向状态码) 2.3 4XX(客户端错误状态码) 2.4 5XX(服务端错误状态码) 3.HTTP 请求头中常见的字段有哪些&#xff1f; 4.HTTP和HTTPS有什么区别&…

vue3-内置组件-Transition

基于状态变化的过渡和动画&#xff08;常用&#xff09; 建议多看几遍~~。然后动手去写写&#xff0c;学编程只有多动手才能有感觉。 内置组件: 它在任意别的组件中都可以被使用&#xff0c;无需注册。 Vue 提供了两个内置组件&#xff0c;可以帮助你制作基于状态变化的过渡和动…

EMC测试介绍

EMC测试介绍 EMC包括电磁干扰(EMI) 和抗电磁干扰(EMS)两个部分。发射干扰传导发射测试极限线以峰值检坡器测量时使用的决策树应用EN55022标准的波形示例测试仪器![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/4580f693ae9a4f84891ece29681c7bf2.png) 辐射发射测试…

react 之 react.memo

React.memo 作用&#xff1a;允许组件在props没有改变的情况下跳过重新渲染 组件默认的渲染机制 默认机制&#xff1a;顶层组件发生重新渲染&#xff0c;这个组件树的子级组件都会被重新渲染 // memo // 作用&#xff1a;允许组件在props没有改变的情况下跳过重新渲染import…