RocketMQ消息处理详解!

文章目录

  • 引言
  • 同步发送
    • 原理分析
    • 优缺点
      • 优点
      • 缺点
    • 使用场景
  • 异步发送
    • 原理分析
    • 优缺点
      • 优点
      • 缺点
      • 使用场景
  • 单向发送
    • 原理分析
    • 优缺点
      • 优点
      • 缺点
    • 使用场景
  • 三种方式对比
  • 如何选择
    • 同步发送
    • 异步发送
    • 单向发送
  • 总结

引言

在 RocketMQ 中,有 3种简单的消息发送方式:同步发送、异步发送和单向发送。这篇文章,我们将详细分析这三种发送方式的原理、优缺点、使用场景以及使用该方式是否会丢失数据。

本文源码基于: Apache RocketMQ release-5.2.0

同步发送

原理分析

在同步发送模式下,RocketMQ 默认采用同步刷盘方式,当生产者将消息发送到 Broker 后,会等待 Broker 的响应(默认超时 5分钟),Broker 接收消息后,会将其写入内存缓存,并进行刷盘操作。因此,如果 Broker 响应成功,代表消息一定成功写入磁盘。

在这里插入图片描述

同步发送主要涉及以下几个步骤:

1、创建Producer:创建一个Producer对象;
2、创建消息:创建一个Message对象,设置Topic、Tag标签和消息体;
3、发送消息:调用DefaultMQProducer的send方法;
4、等待响应:发送方会阻塞等待服务器的响应,直到收到确认消息;

在这里插入图片描述

如下示例代码为一个完整的同步发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;public class SyncProducerTest {public static void main(String[] args) throws Exception {// 1、创建 producer,设置组名为 SyncGroupTestDefaultMQProducer producer = new DefaultMQProducer("SyncGroup");// 2、指定 NameServer的地址,以获取 Broker路由地址producer.setNamesrvAddr("x.x.x.x:9876");// 3、启动 producerproducer.start();// 4、创建消息,并指定 Topic,Tag和消息体Message msg = new Message("SyncTopic", "sync", "SyncMessage".getBytes("UTF-8"));// 5、发送同步消息SendResult sendResult = producer.send(msg);// 6、通过 sendResult 判断消息是否成功送达System.out.printf("message send result:" + sendResult);// 7、关闭 Producerproducer.shutdown();}
}

RocketMQ 的同步发送主要涉及以下几个关键源码类和方法:

1、DefaultMQProducer:生产者类,负责发送消息。
2、MQClientAPIImpl#sendMessage:底层消息发送实现。
3、NettyRemotingClient#invokeSync:通过 Netty 实现网络通信。
4、Broker 端的 SendMessageProcessor:处理发送请求。

优缺点

优点

1、简单易用。
2、可靠性高,发送方可以确认消息是否成功发送,一旦发送成功,消息就已经写入磁盘,消息不会丢失。

缺点

1、延迟较高,需要等待服务器的响应。
2、吞吐量可能受限于网络延迟和服务器性能。

使用场景

适用于对消息可靠性要求较高的场景,如订单系统、金融交易、重要的消息通知等。


异步发送

原理分析

在异步发送模式下,RocketMQ 默认采用异步刷盘方式,当生产者发送消息到 Broker 后,消息写入内存缓存成功后,Broker 立即返回响应(默认超时 5分钟),后台线程再异步将消息批量写入磁盘。因此,这种方式提高了系统的吞吐量和性能,但在系统崩溃时可能会丢失部分未刷盘的消息。

在这里插入图片描述

异步发送主要涉及以下几个步骤:

1、创建Producer:创建一个Producer对象;
2、创建消息:同样创建一个Message对象。
3、发送消息:调用DefaultMQProducer的send方法,传递一个SendCallback回调对象。
4、处理响应:回调函数会在消息发送成功或失败时被调用。

在这里插入图片描述

如下示例代码为一个完整的异步发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;public class AsyncProducerTest {public static void main(String[] args) throws Exception {// 1、创建 producer,设置组名为 AsyncGroupTestDefaultMQProducer producer = new DefaultMQProducer("AsyncGroup");// 2、指定 NameServer的地址,以获取 Broker路由地址producer.setNamesrvAddr("x.x.x.x:9876");// 3、启动 producerproducer.start();// 4、创建消息,并指定Topic,Tag和消息体Message msg = new Message("AsyncTopic","async", "AsyncMessage".getBytes("UTF-8"));// 5、发送异步消息,SendCallback是处理异步回调的方法producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {  // 成功回调System.out.println("message send success: " + sendResult);}@Overridepublic void onException(Throwable throwable) {  // 失败回调System.out.println("message send fail: " + throwable);}});// 6、关闭 Producerproducer.shutdown();}
}

RocketMQ 的异步发送主要涉及以下几个关键源码类和方法:

1、DefaultMQProducer:生产者类,负责发送消息。
2、MQClientAPIImpl#sendMessage:底层消息发送实现。
3、NettyRemotingClient#invokeAsync:通过 Netty 实现网络通信。
4、Broker 端的 SendMessageProcessor:处理发送请求。

优缺点

优点

1、非阻塞,发送方可以继续执行其他任务,提高吞吐量。
2、延迟较低,适用于对响应时间敏感的场景。

缺点

1、实现复杂度较高,需要处理异步回调。
2、可靠性相对降低,需要处理失败重试等问题。
3、无法保证发送出去的数据不丢失。

使用场景

适用于对响应时间要求较高的场景,如实时数据处理、日志采集、消费信息的推送等。


单向发送

原理分析

单向(OneWay)发送是一种只负责发送消息而不等待任何响应的方式。生产者将消息发送到 Broker 后(默认超时 5分钟),不关心消息是否成功到达或被持久化,主要依赖 Broker 进行刷盘操作,单向发送通常与异步刷盘结合使用,以提高发送效率。

在这里插入图片描述

单向发送主要涉及以下几个步骤:

1、创建Producer:创建一个Producer对象;
2、创建消息:创建一个Message对象。
3、发送消息:调用DefaultMQProducer的sendOneway方法。

在这里插入图片描述

如下示例代码为一个完整的单向发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;public class OneWayProducerTest {public static void main(String[] args) throws Exception {// 1、创建 producer,设置组名为 OneWayGroupTestDefaultMQProducer producer = new DefaultMQProducer("OneWayGroup");// 2、指定 NameServer的地址,以获取 Broker路由地址producer.setNamesrvAddr("x.x.x.x:9876");// 3、启动 producerproducer.start();// 4、创建消息,并指定Topic,Tag和消息体Message msg = new Message("OneWayTopic","oneway", "OneWayMessage".getBytes("UTF-8"));// 5、发送单向消息producer.sendOneway(msg);// 6、关闭 Producerproducer.shutdown();}
}

RocketMQ 的单向发送主要涉及以下几个关键类和方法:

1、DefaultMQProducer:生产者类,负责发送消息。
2、MQClientAPIImpl#sendMessage:底层消息发送实现。
3、NettyRemotingClient#invokeOneway:通过 Netty 实现网络通信。
4、Broker 端的 SendMessageProcessor:处理发送请求。

优缺点

优点

1、非常高效,延迟最低。
2、适用于对可靠性要求不高的场景。

缺点

1、无法确认消息是否成功发送。
2、可靠性最低,消息可能丢失。

使用场景

适用于对可靠性要求不高的场景,如日志收集、监控数据上报等。

三种方式对比

发送方式优点缺点使用场景
同步发送可靠性高,简单易用延迟较高,吞吐量受限订单系统、金融交易、重要的消息通知等
异步发送非阻塞,延迟较低实现复杂度高,可靠性相对降低实时数据处理、日志采集、消费信息的推送等
单向发送高效,延迟最低无法确认消息是否成功发送,可靠性最低日志收集、监控数据上报等

如何选择

同步发送

消息发送后会等待服务器的响应,整个过程业务是阻塞等待的,适用于对可靠性要求高的场景,比如 订单系统、金融交易等。

异步发送

消息发送后,不等待服务器响应,而是通过回调函数处理响应,适用于对响应时间要求高的场景,比如实时数据处理、日志采集、消费信息的推送等

单向发送

单向发送只负责发送消息而不等待任何响应的方式,也不需要对发送的状态、结果负责,适用于对可靠性要求不高的场景,比如日志收集、监控数据上报等。

每种发送方式都有其适用的场景和优缺点,具体如何选择,一定需要根据业务需求进行权衡。

总结

本文分析了 RocketMQ 同步发送、异步发送和单向发送三种方式的原理、优缺点以及使用场景,并且分析了每种方式涉及到的核心源码。

通过上文的介绍可以知道同步发送方式可以保证消息发送时不丢,但是性能相对其他两种方式差一些。

编辑:三两肉

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/57494.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算服务器:开启科学计算新变革的强大引擎

1983 年,著名数学家 Lax 为首的调研小组指出,大型科学计算对国家安全、科技进步与经济发展至关重要,从美国国家利益出发,大型计算的绝对优势不容动摇。 科学计算是什么?为何在 20 世纪 80 年代就被提升到美国国家利益层…

Pytest日志收集器配置

前言 在pytest框架中,日志记录(logging)是一个强大的功能,它允许我们在测试期间记录信息、警告、错误等,从而帮助调试和监控测试进度。 pytest与Python标准库中的logging模块完美集成,因此你可以很容易地在…

vmware虚拟机linux系统安装

一、下载linux镜像安装包 步骤1---网址地址下载镜像 地址:Index of /ubuntu-releases/22.04/ | 清华大学开源软件镜像站 | Tsinghua Open Source Mirror 步骤2---下载linux版本号 步骤3---查看下载的linuxiso linux镜像操作系统 二、vmware新建安装linux操作系统…

边缘计算技术的优势与挑战

如今,随着5G快速无线网络的到来,将计算存储和物联网(IoT)分析的部署放在靠近数据产生的地方,使得边缘计算成为可能。 物联网设备和新应用的扩展需要实时计算能力。5G无线正在考虑边缘系统,以快速跟踪支持实…

基于SpringBoot+Vue的厨艺交流系统的设计与实现(源码+定制开发)厨艺知识与美食交流系统开发、在线厨艺分享与交流平台开发、智能厨艺交流与分享系统开发

博主介绍: ✌我是阿龙,一名专注于Java技术领域的程序员,全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师,我在计算机毕业设计开发方面积累了丰富的经验。同时,我也是掘金、华为云、阿里云、InfoQ等平台…

爬虫中代理ip选择和使用实战

一、爬虫中的反爬问题 爬虫技术不仅是一种工具,更像是一门捕捉信息的艺术。通过它,我们能够从浩瀚的互联网中,精确获取到所需的有价值数据。对于那些需要进行数据分析或模型训练的人来说,爬虫技术几乎是必备的技能。虽然网上公开…

git提交到github个人记录

windows下git下载 1.进入git官网https://git-scm.com/downloads/win 一直默认选项即可 2.在settings中SSH and GPG keys中Add SSH key 3.选择git cmd git使用 1.配置用户名,和邮箱 git config --global user.email "youexample.com" git config --g…

Director3D: Real-world Camera Trajectory and 3DScene Generation from Text 论文解读

目录 一、概述 二、相关工作 1、文本到3D生成 2、3DGS 三、Director3D 1、Cinematographer 2、Decorator 3、Detailer 4、Loss 一、概述 该论文提出利用真实世界数据集,设计一个从文本生成真实世界3D场景和自适应相机轨迹的强大的开放世界文本到3D生成框架…

067_基于springboot的HSK学习平台

目录 系统展示 开发背景 代码实现 项目案例 获取源码 博主介绍:CodeMentor毕业设计领航者、全网关注者30W群落,InfoQ特邀专栏作家、技术博客领航者、InfoQ新星培育计划导师、Web开发领域杰出贡献者,博客领航之星、开发者头条/腾讯云/AW…

【进阶OpenCV】 (18)-- Dlib库 --人脸关键点定位

文章目录 人脸关键点定位一、作用二、原理三、代码实现1. 构造人脸检测器2. 载入模型(加载预测器)3. 获取关键点4. 显示图像5. 完整代码 总结 人脸关键点定位 在dlib库中,有shape_predictor_68_face_landmarks.dat预测器,这是一个…

安装vue发生异常: idealTree:nodejs: sill idealTree buildDeps

一、异常 C:\>npm install vue -g npm ERR! code CERT_HAS_EXPIRED npm ERR! errno CERT_HAS_EXPIREDnpm ERR! request to https://registry.npm.taobao.org/vue failed, reason: certificate has expired 二、原因 请求 https://registry.npm.taobao.org 失败,证…

Spring Boot与Flyway实现自动化数据库版本控制

一、为什么使用Flyway 最简单的一个项目是一个软件连接到一个数据库,但是大多数项目中我们不仅要处理我们开发环境的副本,还需要处理其他很多副本。例如:开发环境、测试环境、生产环境。想到数据库管理,我们立刻就能想到一系列问…

网站漏扫:守护网络安全的关键防线

网站漏洞扫描,简称漏扫,是一种针对网站进行漏洞检测的安全服务。网站漏洞扫描在网络安全中占据着至关重要的地位。 网站漏扫在及时发现和修复漏洞方面发挥着关键作用 通过对网站和系统的全面扫描,能够快速识别出各种潜在的漏洞,…

jmeter中发送post请求遇到的问题

用jmeter发送post请求,把请求参数放在Body Data处,参数都写得正确,但没想到结果每次都报错,直接响应结果乱七八糟,改成用Parameters,反而不乱报错了。 上图 请求里如下 另外一些请求也是这样 这个响应结果也是错误的…

【文献及模型、制图分享】长江中游经济区“水—能源—粮食”系统与城市绿色转型适配性研究

文献介绍 评价资源系统与城市绿色发展适配程度是判断城市健康程度的关键。通过构建“系统压力指数—优劣解距离法(TOPSIS)—脱钩弹性模型(Tapio)”评价体系,对2012—2021年长江中游经济区“水—能源—粮食”&#xff…

aws 把vpc残留删除干净

最近忘了把vpc 删干净导致又被收了冤大头钱 在删除vpc 的收发现又eni在使用,但是忘了是哪个资源在占用 先用命令行把占用的资源找出来停掉 使用 AWS 命令行界面(CLI)来查看 VPC 的使用情况 列出子网: aws ec2 describe-subnets …

项目管理必备!2024年Jira与禅道之间的秘密故事?

一、项目管理工具的重要性 在当今快节奏的工作环境中,项目管理软件的重要性愈发凸显。随着企业规模的不断扩大和业务的日益复杂,传统的项目管理方式已经难以满足需求。项目管理软件成为提升团队协作和工作效率的关键工具。 首先,项目管理软…

vue3项目页面实现echarts图表渐变色的动态配置

完整代码可点击vue3项目页面实现echarts图表渐变色的动态配置-星林社区 https://www.jl1mall.com/forum/PostDetail?postId202410151031000091552查看 一、背景 在开发可配置业务平台时,需要实现让用户对项目内echarts图表的动态配置,让用户脱离代码也…

计算机导论

概述 计算机简史 1935年代,英国数学家图灵(Alan Turing)提出“图灵机”,奠定了计算机的理论基础。 1952年,冯诺依曼确定了计算机由运算器、控制器、存储器、输入、输出等5部分组成(Von Neumann 体系结构)。 60年代…

k8s备份恢复(velero)

velero简介 velero官网: https://velero.io/ velero-github: https://github.com/vmware-tanzu/velero velero的特性 备份可以按集群资源的子集,按命名空间、资源类型标签选择器进行过滤,从而为备份和恢复的内容提供高度的灵活…