RocketMQ高级特性三-消费者分类

目录

前言

概述

区别

PullConsumer

定义与概述

原理机制

使用场景

优缺点

Java 代码示例

SimpleConsumer

定义与概述

原理机制

使用场景

优缺点

Java 代码示例

PushConsumer

定义与概述

原理机制

使用场景

优缺点

Java 代码示例

总结


前言

RocketMQ中的消费者分类主要包括三种类型:PullConsumerSimpleConsumer、和PushConsumer。每种消费者类型都有其特定的使用场景、原理机制以及优缺点。

注:生产环境中相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者,否则会导致消息消费异常。

概述

如上图所示, Apache RocketMQ 的消费者处理消息时主要经过以下阶段:消息获取--->消息处理--->消费状态提交。

针对以上几个阶段,Apache RocketMQ 提供了不同的消费者类型: PushConsumer 、SimpleConsumer 和 PullConsumer。这几种类型的消费者通过不同的实现方式和接口可满足您在不同业务场景下的消费需求。具体差异如下:

在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。

若您的业务场景发生变更,或您当前使用的消费者类型不适合当前业务,您可以选择在 PushConsumer 和SimpleConsumer 之间变更消费者类型。变更消费者类型不影响当前Apache RocketMQ 资源的使用和业务处理。

区别

对比项PushConsumerSimpleConsumerPullConsumer
接口方式使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。业务方自行实现消息处理,并主动调用接口返回消费结果。业务方自行按队列拉取消息,并可选择性地提交消费结果
消费并发度管理由SDK管理消费并发度。由业务方消费逻辑自行管理消费线程。由业务方消费逻辑自行管理消费线程。
负载均衡粒度5.0 SDK是消息粒度,更均衡,早期版本是队列维度消息粒度,更均衡队列粒度,吞吐攒批性能更好,但容易不均衡
接口灵活度高度封装,不够灵活。原子接口,可灵活自定义。原子接口,可灵活自定义。
适用场景适用于无自定义流程的业务消息开发场景。适用于需要高度自定义业务流程的业务开发场景。仅推荐在流处理框架场景下集成使用

PullConsumer

定义与概述

PullConsumer是一种传统的消息拉取模式,在这种模式下,消费者需要主动从Broker中拉取消息,而不是由Broker主动推送消息。这种模式提供了更大的灵活性,允许消费者根据自身的处理能力和业务逻辑,自主控制消息的拉取和消费节奏。

原理机制

在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。

  • 消息拉取:消费者调用pull方法,从Broker的消息队列中主动拉取消息。拉取的过程通常是循环进行的,消费者可以在每次拉取时指定要拉取的消息数量和位置。
  • 偏移量管理:消费者需要自行管理消息消费的偏移量(offset),确保每次拉取的消息都是未消费过的,防止消息重复消费。
  • 流控管理:消费者可以根据自身的处理能力和当前系统的负载情况,动态调整消息的拉取频率和数量,避免处理能力不足导致的消息积压。
使用场景
  • 批处理:适合需要批量处理消息的场景,例如数据分析、日志处理等。
  • 自定义处理逻辑:在需要对消息进行复杂的过滤、分组或排序处理时,PullConsumer提供了更大的灵活性。

PushConsumer的消费监听器执行结果分为以下三种情况:

  • 返回消费成功:以Java SDK为例,返回ConsumeResult.SUCCESS,表示该消息处理成功,服务端按照消费结果更新消费进度。

  • 返回消费失败:以Java SDK为例,返回ConsumeResult.FAILURE,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。

  • 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。

PushConsumer 消费消息时,若消息处理逻辑出现预期之外的阻塞导致消息处理一直无法执行成功,SDK会按照消费超时处理强制提交消费失败结果,并按照消费重试逻辑进行处理。消息超时,请参见PushConsumer消费重试策略。

注:出现消费超时情况时,SDK虽然提交消费失败结果,但是当前消费线程可能仍然无法响应中断,还会继续处理消息。

优缺点
  • 优点
    • 灵活性高:消费者可以完全控制消息拉取的时机和频率,适合对消费策略要求较高的场景。
    • 流控能力强:通过手动控制拉取频率,消费者能够避免消息积压和系统过载。
  • 缺点
    • 实现复杂:消费者需要手动管理偏移量、处理消息的幂等性问题,并实现流控逻辑。
    • 实时性较低:由于消息是被动拉取的,实时性相对较低。
Java 代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.Set;public class PullConsumerExample {public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroupName");consumer.setNamesrvAddr("localhost:9876");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");for (MessageQueue mq : mqs) {long offset = getMessageQueueOffset(mq);PullResult pullResult = consumer.pullBlockIfNotFound(mq, "*", offset, 32);processPullResult(pullResult);putMessageQueueOffset(mq, pullResult.getNextBeginOffset());}consumer.shutdown();}private static long getMessageQueueOffset(MessageQueue mq) {// 获取消费队列的偏移量return 0;}private static void putMessageQueueOffset(MessageQueue mq, long offset) {// 更新消费队列的偏移量}private static void processPullResult(PullResult pullResult) {// 处理拉取的消息switch (pullResult.getPullStatus()) {case FOUND:// 处理消息break;case NO_NEW_MSG:// 没有新消息break;default:break;}}
}

SimpleConsumer

定义与概述

SimpleConsumer是RocketMQ 4.8.0版本之后引入的一种消费模式,它是PullConsumer的简化版本,适用于需要拉取消息但不想管理复杂消费逻辑的场景。SimpleConsumer简化了消息拉取和流控的实现,提供了更为直观和易用的API。

原理机制
  • 简化的拉取模型:SimpleConsumer内置了部分PullConsumer的逻辑,如偏移量管理和拉取频率控制,使开发者可以更专注于业务逻辑的实现。
  • 异步拉取:SimpleConsumer支持异步拉取消息,进一步简化了使用流程,并减少了开发者在流控管理上的负担。
  • 偏移量自动管理:SimpleConsumer可以自动管理消息的偏移量,无需手动维护,降低了重复消费的风险。
使用场景
  • 轻量级消费:适合对消费量不大或对消费逻辑要求较简单的场景,例如小型任务处理和轻量级的数据收集。
  • 异步任务处理:适合需要异步处理消息的场景,通过异步拉取简化了业务逻辑的实现。
优缺点
  • 优点
    • 简单易用:相较于PullConsumer,SimpleConsumer大大简化了开发难度,降低了使用门槛。
    • 自动管理:自动偏移量管理和异步拉取降低了消息丢失和重复消费的风险。
  • 缺点
    • 灵活性不足:由于自动管理逻辑的加入,SimpleConsumer在一些复杂场景下可能不如PullConsumer灵活。
    • 适用场景有限:由于其简化的设计,SimpleConsumer更适合处理较为简单的消费逻辑。
Java 代码示例
import org.apache.rocketmq.client.consumer.SimpleConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.List;public class SimpleConsumerExample {public static void main(String[] args) throws MQClientException {SimpleConsumer consumer = new SimpleConsumer("ConsumerGroupName", "localhost:9876");MessageQueue mq = new MessageQueue("TopicTest", "BrokerName", 0);long offset = consumer.searchOffset(mq, System.currentTimeMillis());List<Message> messages = consumer.pull(mq, "*", offset, 10);for (Message message : messages) {System.out.printf("Received Message: %s%n", new String(message.getBody()));}consumer.shutdown();}
}

PushConsumer

定义与概述

PushConsumer是RocketMQ中最常用的消费模式,Broker主动将消息推送给消费者,消费者只需关注如何处理接收到的消息。PushConsumer是事件驱动的消息消费模式,适用于需要实时处理消息的场景。

原理机制
  • 消息推送:Broker会自动将新到达的消息推送到消费者,消费者只需实现相应的处理逻辑。
  • 并发消费:PushConsumer支持多线程并发消费,通过配置线程池可以提高消息处理的并发性。
  • 消费模式:支持两种消费模式:并发消费(ConsumeConcurrently)和顺序消费(ConsumeOrderly)。并发消费不保证消息的顺序性,而顺序消费保证同一个队列内的消息按顺序消费。
使用场景
  • 实时处理:适合需要实时处理消息的场景,如金融交易、监控告警、实时数据分析等。
  • 并发消费:适合需要高并发处理的场景,通过配置多线程提高消费吞吐量。
优缺点
  • 优点
    • 实时性高:消息被推送到消费者后可以立即处理,适用于需要实时响应的业务场景。
    • 易于实现:PushConsumer实现简单,开发者只需关注业务逻辑的实现,而无需关心消息拉取和偏移量管理。
  • 缺点
    • 流控难度大:推模式下流控难度较大,可能出现消息积压的情况,尤其是在消费处理速度跟不上消息推送速度时。
    • 并发限制:虽然支持多线程并发消费,但对于顺序消费模式,仍然可能存在并发处理的局限性。
Java 代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;public class PushConsumerExample {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("Received Message: %s%n", new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

总结

在RocketMQ的消费者分类中:

  • PullConsumer提供了灵活的消息拉取机制,适用于需要自主控制消费节奏的场景。
  • SimpleConsumer在PullConsumer的基础上进行了简化,适用于简单的消费场景。
  • PushConsumer则是最常用的模式,适用于需要实时处理消息的场景。

通过对不同消费者类型的理解和选择,可以更好地满足不同业务场景的需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/53314.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

常用排序算法(上)

目录 前言&#xff1a; 1.排序的概念及其运用 1.1排序的概念 1.2排序运用 1.3 常见的排序算法 2.常见排序算法的实现 2.1 堆排序 2.1 1 向下调整算法 2.1 2 建堆 2.1 3 排序 2.2 插入排序 2.1.1基本思想&#xff1a; 2.1.2直接插入排序&#xff1a; 2.1.3 插…

SQL进阶技巧:每年在校人数统计 | 区间重叠问题

目录 0 问题分析 1 数据准备 2 问题分析 3 小结 区间重叠问题 0 问题分析 有一个录取学生人数表 in_school_stu,记录的是每年录取学生的人数及录取学生的学制,计算每年在校学生人数。 1 数据准备 create table in_school_stu as ( select stack(5,1,2001,2,1200,2,2000…

Vue 中 watch 和 watchEffect 的区别

watch 和 watcheffect 都是 vue 中用于监视响应式数据的 api&#xff0c;它们的区别在于&#xff1a;watch 用于监视特定响应式属性并执行回调函数。watcheffect 用于更通用的响应式数据监视&#xff0c;但回调函数中不能更新响应式数据。Vue 中 watch 和 watchEffect 的区别 …

linux下的Socket网络编程教程

套接字概念 Socket本身有“插座”的意思&#xff0c;在Linux环境下&#xff0c;用于表示进程间网络通信的特殊文件类型。本质为内核借助缓冲区形成的伪文件。与管道类似的&#xff0c;Linux系统将其封装成文件的目的是为了统一接口&#xff0c;使得读写套接字和读写文件的操作…

从材料到应用:螺杆支撑座材质选择的多样性与精准性!

支撑座是连接丝杆和电机的轴承固定座&#xff0c;其材料的选择直接影响使用效果。那么&#xff0c;大家知道螺杆支撑座常用的材质有哪些吗&#xff1f; 1、高碳钢&#xff1a;高碳钢因其高强度和良好的耐磨性&#xff0c;是螺杆支撑座制作中常用的材料。它能够很好地配合滚珠螺…

ESD防静电监控系统助力电子制造行业转型升级

在电子制造行业中&#xff0c;静电危害不容小觑。ESD 防静电监控系统的出现&#xff0c;为行业转型升级带来强大助力。电子元件对静电极为敏感&#xff0c;微小的静电放电都可能损坏元件&#xff0c;影响产品质量。ESD 防静电监控系统能够实时监测生产环境中的静电状况&#xf…

C++——类和对象(2)

目录 一、类的默认成员函数 二、构造函数 &#xff08;1&#xff09;定义 &#xff08;2&#xff09;特点 三、析构函数 &#xff08;1&#xff09;定义 &#xff08;2&#xff09;特点 四、拷贝构造函数 &#xff08;1&#xff09;定义 &#xff08;2&#xff09;特…

【2024-2025源码+文档+调试讲解】微信小程序的城市公交查询系统

摘 要 当今社会已经步入了科学技术进步和经济社会快速发展的新时期&#xff0c;国际信息和学术交流也不断加强&#xff0c;计算机技术对经济社会发展和人民生活改善的影响也日益突出&#xff0c;人类的生存和思考方式也产生了变化。传统城市公交查询管理采取了人工的管理方法…

【论文阅读】DETRs Beat YOLOs on Real-time Object Detection

文章目录 摘要一、介绍二、相关工作2.1 实时目标检测器2.2 端到端目标检测器 三、检测器的端到端速度3.1 分析 NMS3.2 端到端速度基准 四、实时 DETR4.1 模型概述4.2 高效混合编码器4.3不确定性最小的查询选择4.4 缩放的RT - DETR 五、实验5.1 与SOTA对比5.2 混合编码器的消融研…

【重构获得模式 Refactoring to Patterns】

重构获得模式 Refactoring to Patterns 面向对象设计模式是“好的面向对象设计”&#xff0c;所谓“好的面向对象设计”指的是那些可以满足“应对变化&#xff0c;提高复用”的设计。 现代软件设计的特征是“需求的频繁变化”。设计模式的要点是“寻找变化点&#xff0c;然后…

大语言模型LLM权重4bit向量量化(Vector Quantization)/查找表量化基本原理

参考 https://apple.github.io/coremltools/docs-guides/source/opt-palettization-overview.html https://apple.github.io/coremltools/docs-guides/source/opt-palettization-algos.html Apple Intelligence Foundation Language Models 苹果向量量化&#xff1a; DKM:…

在VMware虚拟机中编译文件的时候报错:找不到头文件ft2build.h

以下是报错内容&#xff0c;提示说找不到头文件ft2build.h freetype_show_font.c:12:10: fatal error: ft2build.h: No such file or directory #include <ft2build.h> ^~~~~~~~~~~~ compilation terminated. 在编译之前已经交叉编译了freetype&#xff0c;…

MQ-2烟雾传感器详解(STM32)

目录 一、介绍 二、传感器原理 1.原理图 2.引脚描述 3.工作原理介绍 三、程序设计 main.c文件 mq2.h文件 mq2.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 MQ-2气体传感器是一种常用的气体传感器&#xff0c;用于检测空气中的烟雾浓度。工作原理是基于半导…

Java项目: 基于SpringBoot+mybatis+maven+mysql图书馆管理系统(含源码+数据库+任务书+答辩PPT+毕业论文)

一、项目简介 本项目是一套基于SpringBootmybatismavenmysql图书馆管理系统 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xff0c;eclipse或者idea 确保可以运行&#xff01; 该系统功能完善、界面美观、操…

网络安全应急响应技术原理与应用

网络安全应急响应概述 概念 为应对网络安全事件&#xff0c;相关人员或组织机构对网络安全事件进行监测、预警、分析、响应和恢复等工作 网络安全应急响应组织建立与工作机制 网络安全应急响应预案内容与类型 常见网络安全应急事件场景与处理流程 应急演练&#xff1a;对假定…

一台手机一个ip地址吗?手机ip地址泄露了怎么办

在数字化时代&#xff0c;‌手机作为我们日常生活中不可或缺的一部分&#xff0c;‌其网络安全性也日益受到关注。‌其中一个常见的疑问便是&#xff1a;‌“一台手机是否对应一个固定的IP地址&#xff1f;‌”实际上&#xff0c;‌情况并非如此简单。‌本文首先解答这一问题&a…

Web3社交新经济,与 SOEX 实现无缝交易的高级安全性

出于充分的理由&#xff0c;安全性是交易中至关重要的考虑因素。每个人都应该确保自己的资金在交易时是安全的。由于 &#xff33;&#xff2f;&#xff25;&#xff38; 充当您与交易所的最佳连接&#xff0c;因此必须强调的是&#xff0c;该系统不会引发任何安全问题。 &a…

模型 涌现思想

系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。整体产生新特性&#xff0c;超越部分之和。 1 涌现思想的应用 1.1 蚁群算法中的涌现思想 蚁群算法&#xff08;Ant Colony Optimization, ACO&#xff09;是一种模拟蚂蚁觅食行为的计算模型&#xf…

QT项目实战之音乐播放器2.0版本

该版本相较于1.0版本最主要的不同在于连接数据库实现类似于歌曲收藏和取消收藏的功能。 详细情况看我的这篇文章http://t.csdnimg.cn/WS5s8。 效果展示 VSMyMusicShow2.0 define.h UseMySQL.h musicInfo.h VSMyMusicPlayer.h

PMP–一、二、三模–分类–14.敏捷–技巧–帮助团队交付价值的执行实践迭代和增量如何帮助交付工作产品

文章目录 技巧一模14.敏捷--实践--帮助团队交付价值的执行实践--持续集成--在不同层面测试、验收测试驱动开发 (ATDD) 、测试驱动开发和行为驱动开发、刺探 。90、 [单选] 敏捷项目的第一次迭代即将开始。发起人召集团队、Scrum主管、产品负责人和其他项目干系人参加启动会议。…