消息中间件之RocketMQ源码分析(六)

Consumer消费方式

RocketMQ的消费方式包含Pull和Push两种

  • Pull方式。
    用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。
    缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求,再RocketMQ中
    org.apache.rocketmq.client.consumer.DefaultMQPullConsume是默认的Pull消费者实现类
    在这里插入图片描述
    1.fetchSubscribeMessageQueues(String topic).拉取全部可以消费的Queue.如果某一个Broker下线,这里也可以实时感知到
    2.遍历全部Queue,拉取每个Queue可以消费的消息
    3.如果拉取到消息,则执行用户编写的消费代码
    4.保存消费进度。消费进度可以执行updateConsumeOffset()方法,将消息位点上报给Broker,也可以自行保存消费位点。比如流计算平台Flink使用Pull方式拉取消息消费,通过Checkpoint管理消费进度
  • Push方式。
    代码介入非常简单,适合大部分业务场景。缺点灵活度差,在了解消费原理后,排查消费问题可简单快捷.在RocketMQ中org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是默认的Push消费者实现类
    在这里插入图片描述
    1.初始化Push消费者实例。业务代码初始化DefaultMQPushConsumer实例,启动Push服务PullMessageService.该服务是一个线程服务,不断执行run()方法拉取已经订阅Topic的全部队列的消息,将消息保存在本地的缓存队列中
    启动PullMessageService的拉取服务
    在这里插入图片描述

在这里插入图片描述
PullMessageService不断拉取消息。pullRequestQueue中保存着待拉取地Topic和Queue消息,程序不断从pullRequestQueue中获取PullRequest并执行拉取消息方法
在这里插入图片描述
消费者拉取消息并消费,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

基本校验。校验ProcessQueue是否dropped;校验消费者服务状态是否正常;校验消费者是否被挂起。在Rebalance时,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance方法在运行时设置ProcessQueue.setDropped(true)的逻辑,,设置成功后,在执行拉取消息时,将不再拉取dropped为true的ProcessQueue
在这里插入图片描述
在这里插入图片描述

拉取条数、字节数限制检查。如果本地缓存消息数量大于配置的最大拉取条数(默认1000,可以调整),则延迟一段时间再拉取;
如果本地缓存消息字节数大于配置的最大缓存字节数,则延迟i短时间再拉取,这两种校验方式都相当于本地流控
在这里插入图片描述
并发消费和顺序消费校验。
在并发消费时,processQueue.getMaxSpan()方法是用于计算本地缓存队列中的哥消息和最后一个消息的offset差值。
顺序消费时,如果当前拉取的队列在Broker没有被锁定,说明已经由拉取正在执行,当前拉取请求晚点执行,如果不是第一次拉取,
需要先计算最新的拉取位点并修正最新的待拉取位点信息,再执行拉取
在这里插入图片描述
本地缓存队列的Span如果大于配置的最大差值(默认2000,可以调整),
则认为本地消费过慢,需要执行本地流控
在这里插入图片描述
队列锁定
在这里插入图片描述
订阅关系校验。如果待拉取的Topic在本地缓存中订阅关系为空,则本地拉取不执行,待下一个正常心跳或者Rebalance后订阅关系恢复正常,方可正常拉取
在这里插入图片描述
封装拉取请求和拉取后的回调对象PullCallback。这里主要将消息拉取请求和拉取结果处理封装成PullCallback,
并通过调用PullAPIWrapper.pullKernelImpl()方法将拉取请求发出去。
如果拉取到消息,那么将消息保存到对应的本地缓存队列ProcessQueue中,然后将这些消息交给ConsumeService服务;
在这里插入图片描述

在这里插入图片描述
2.核心-消费消息。由消费服务ConsumeMessageConcurrentlyService或者ConsumeMessagOrderlyService
将本地缓存队列中的消息不断放入到消费线程池,异步回调业务消费代码,此时业务代码可以消费消息
在这里插入图片描述
3.核心-保存消费进度。业务代码消费后,将消费结果返回给消费服务,再由消费服务将消费进度保存在本地,
由消费进度管理服务定时和不定时地持久化到本地(LcoalFileOffsetStore支持)或者远程Broker(RemoteBrokerOffsetStore支持)中,
对于消费失败地消息,RocketMQ客户端处理后发回给Broker,并告知消费失败

  • Pull和Push方式的比较
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/670827.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3+ts+vite搭建步骤

vue3 ts vite搭建步骤如下: 1,首先确保你的机器上已经安装了 Node.js 和 npm。你可以通过在终端运行 node -v 和 npm -v 来检查是否已经安装。 2,安装 Vite。在你的终端运行以下命令 npm install -g create-vite3,使用 Vite 创…

docker部署自己的网站wordpress

目录 安装 1.创建目录 2.创建并启动mysql 3.创建并启动wordpress 使用 1.设置语言 2.设置基础信息 3.首页 安装 1.创建目录 mkdir -p /opt/wordpress/{db,data} 2.创建并启动mysql docker run -d --name my_mysql --restart always -e MYSQL_ROOT_PASSWORD123456 -e …

深度学习本科课程 实验1 Pytorch基本操作

一、Pytorch基本操作考察 1.1 任务内容 使用 𝐓𝐞𝐧𝐬𝐨𝐫 初始化一个 𝟏𝟑 的矩阵 𝑴 和一个 𝟐𝟏 的矩阵 𝑵,对两矩阵…

home work day5

第四章 堆与拷贝构造函数 一 、程序阅读题 1、给出下面程序输出结果。 #include <iostream.h> class example {int a; public: example(int b5){ab;} void print(){aa1;cout <<a<<"";} void print()const {cout<<a<<endl;} …

flutter使用qr_code_scanner扫描二维码

qr_code_scanner仓库地址&#xff1a;qr_code_scanner | Flutter Package 需要添加android和ios的相机权限和本地相册权限&#xff1a; android中添加权限: 在android\app\build.gradle中修改&#xff1a;minSdkVersion 20 并且在android/app/src/main/AndroidManifest.xml中…

【力扣】Z字形变换,模拟+直接构造

Z字形变换原题地址 方法一&#xff1a;利用二维矩阵模拟 对于特殊情况&#xff0c;z字形变换后只有一行或只有一列&#xff0c;则变换后的字符串和原字符串相同。 对于一般情况&#xff0c;我们可以考虑按照题目要求&#xff0c;把字符串按照Z字形存储到二维数组中&#xff…

【linux】git和gdb调试工具

在linux下提交代码同步到gitee 1.创建一个新的仓库&#xff08;演示步骤&#xff09; 2.init 这两个步骤用于识别提交代码的身份&#xff0c;一个你的名字&#xff0c;一个你的邮箱 开启本地仓库 克隆本地仓库成功 我们将这个仓库拷到了111目录底下. 我们发现少了一个.gitig…

测试python模块每个模块耗时

1.安装包 pip install snakeviz2. 运行测试 # 模块测试运行命令 PYTHONPATH$PWD python -m cProfile -o profile.stats_v2 new_core/schemes/cost_bom/bom_server.py #可视化运行的结果 python -m snakeviz profile.stats_v2 --server -H 0.0.0.0 -p 80813. 参数解释 cProfil…

最小覆盖子串[困难]

优质博文&#xff1a;IT-BLOG-CN 一、题目 给你一个字符串s、一个字符串t。返回s中涵盖t所有字符的最小子串。如果s中不存在涵盖t所有字符的子串&#xff0c;则返回空字符串"" 。 对于t中重复字符&#xff0c;我们寻找的子字符串中该字符数量必须不少于t中该字符数量…

使用Nginx搭建旁路服务器获取客户端真实IP

一、前言 在实际业务开发过程中&#xff0c;很多时候有记录客户端真实IP的需求&#xff0c;但是从客户端发送的请求往往会经过很多代理服务器&#xff0c;导致后端服务获取的IP为代理以后的IP&#xff0c;不具有业务含义。为了解决这个问题&#xff0c;可以搭建一个旁路服务器…

谷歌seo搜索引擎优化方法有什么?

想知道谷歌优化方法有什么&#xff0c;首先要了解谷歌搜索引擎的工作原理&#xff0c;谷歌的工作原理主要是通过“爬虫”来实现的&#xff0c;所谓“爬虫”就是一只能够读取并分析网页内容的程序&#xff0c;或者也能理解成机器人&#xff0c;当你在谷歌上输入关键词进行搜索时…

彻底学会系列:一、机器学习之线性回归(二)

0. 概念和公式 请参考&#xff1a;一、机器学习之线性回归&#xff08;一&#xff09; 1. 涉及公式 1.1 简单线性回归 y w x b y wx b ywxb 1.2 多元线性回归 y ^ w 1 X 1 w 2 X 2 . . . w n X n w 0 \hat y w_1X_1 w_2X_2 ... w_nX_n w_0 y^​w1​X1​w2​X2​.…

uniapp小程序实现直播组件live-player全屏问题

实现效果&#xff1a; 代码&#xff1a; <template><view class"player-content"><!-- #ifdef APP-PLUS --><video id"myVideo" :src"srcLink" autoplay controls><!-- 打开全屏 --><image class"img…

K8S-PV 与PVC

持久存储卷&#xff08;Persistent Volume&#xff0c;PV&#xff09; PV 是k8s管理员定义的好的物理存储或者说实际存储&#xff0c;对应用来说是透明的&#xff0c;应用只需要向着PVC申请即可&#xff0c;具体使用的创建好的那个PV是由PVC去匹配和绑定的。 PV是集群中的定义…

速度规划:s形曲线------pencv c++绘图(1)

理论篇 代码篇&#xff1a; opencv环境配置 注意&#xff01;注意&#xff01;注意&#xff01; 配置结束后运行环境切换为如下再运行&#xff1a; #include <iostream> #include <cmath>#include <opencv2/opencv.hpp>using namespace std;double a_max…

github和gitee

github GitHub是一个面向开源及私有软件项目的托管平台&#xff0c;因为只支持Git作为唯一的版本库格式进行托管&#xff0c;故名GitHub。 github可以给提交的代码打上标签&#xff0c;方便版本的迭代和回退&#xff0c;也是一个存储代码的仓库 github工作区 gitee是gitHub的…

蓝桥杯---分小组

9名运动员参加比赛,需要分3组进行预赛. 有哪些分组的方案呢? 我们标记运动员为 A,B,C .... I 下面的程序列出了所有的分组方法。 该程序的正常输出为:

【CSS】margin塌陷和margin合并及其解决方案

【CSS】margin塌陷和margin合并及其解决方案 一、解决margin塌陷的问题二、避免外边距margin重叠&#xff08;margin合并&#xff09; 一、解决margin塌陷的问题 问题&#xff1a;当父元素包裹着一个子元素且父元素没有边框的时候&#xff0c;当给子元素设置margin-top:100px&…

JS实现一键复制、选中复制、选中多行复制

JS实现一键复制 首先我们准备一份通用的一键复制代码&#xff1a; export function copyTextFun(text) {if (!text) return falsevar textarea document.createElement(textarea) textarea.value text document.body.appendChild(textarea) textarea.select() message.dest…

【精选】java继承进阶——继承的特点 this、super

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【python】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏…