从零开始读RocketMq源码(二)Message的发送详解

目录

前言

准备

消息发送方式

深入源码

消息发送模式

选择发送方式

同步发送消息

校验消息体

获取Topic订阅信息

高级特性-消息重投

选择消息队列-负载均衡

装载消息体发送消息

压缩消息内容

构造发送message的请求的Header

更新broker故障信息

异步发送消息

总结


前言

上一篇我们已经对RocketMq生产者启动源码进行了学习《从零开始读RocketMq源码(一)生产者启动》那么本篇我们将对生产者发送消息的源码进行学习

准备

如果没看前一篇的,这里还是要强调本篇的rocketmq版本

首先我们从github上拉取rocketmqd的源码链接到本地,使用idea打开。

源码地址:https://github.com/apache/rocketmq

目前最新版本为:5.2.0

那么我们在idea上切换分支为 release-5.2.0

注:请保持和本篇的版本一直,方便后面文章中给出的代码块定位

消息发送方式

在读源码之前我们先了解下mq支持的发送消息的类型。

消息的发送方式有三种,但我们最常用的是同步的方式发送

  • sync 同步:消息发送后,必须等待消息的发送结果返回后,才能发送下一条消息
  • async 异步:消息发送后,不用等待返回结果,直接发送下一条数据,但会设置一个回调方法接收返回结果
  • oneway 单向:消息发送后,不会返回结果,也不会等待,也不会设置回调方法。适用场景日志收集、监控数据和快速通知等对可靠性要求不高但需要高性能的场景

深入源码

首先进入外层的producer.send()方法中

//源码位置:
//包名:org.apache.rocketmq.example.simple
//文件名:Producer
//行数:42
SendResult sendResult = producer.send(msg);

消息发送模式

//源码位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行数:431
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));//批量发送if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {return sendByAccumulator(msg, null, null);} else {//单条发送return sendDirect(msg, null, null);}
}
  1. 自动批处理发送 -sendByAccumulator()
  • 该方法用于将消息累积到一个批处理容器中,等待足够的消息数量或达到某个时间间隔后,再进行批量发送。
  • 可以显著减少发送次数,提高吞吐量。

     2. 直接发送 -sendDirect()

  • 适用于即时发送或消息已经是批处理消息的情况

本章的重点就是直接发送消息,这也是开发中使用最频发的方式

选择发送方式

//源码位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行数:720
public SendResult sendDirect(Message msg, MessageQueue mq,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// send in sync modeif (sendCallback == null) {if (mq == null) {//同步不指定队列return this.defaultMQProducerImpl.send(msg);} else {//同步指定队列return this.defaultMQProducerImpl.send(msg, mq);}} else {if (mq == null) {//异步不指定队列this.defaultMQProducerImpl.send(msg, sendCallback);} else {//异步指定队列this.defaultMQProducerImpl.send(msg, mq, sendCallback);}return null;}
}

有上面代码可以知道,方法中提供了三个参数设置:

  • msg :消息体,这个为必填项
  • sendCallback :消息回调对象,如果这个参数不为空,则为异步发送,为空则为同步发送
  • mq :指定的队列(指定与不指定的区别在于后续是否需要对队列负载均衡,下面源码中会讲到)

根据最开始生产者发送消息,我们只传入了msg,所以本次重点看同步不指定队列代码实现

同步发送消息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1525
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

跟踪代码我们可以看到,方法中我们默认设置了CommunicationMode.SYNC 同步发送模式,并且回调参数为空,以及设置了默认超时时间3s

校验消息体

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:704
Validators.checkMessage(msg, this.defaultMQProducer);

该方法就是校验消息内容是否合规

  • 校验消息内容是否不为空,消息大小是否超过最大值maxMessageSize = 1024 * 1024 * 4; // 4M
  • 校验消息发送的topic是否为不为空,以及topic的长度是否超过默认最长值127

获取Topic订阅信息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:709
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

该方法通过消息体中的topic名称获取topic的订阅信息,该方法在我们上一篇生产者启动中已经出现过了,深入方法内部其实就是先从本地topicPublishInfoTable map中获取数据,没有则从远程nameserver中拉取

高级特性-消息重投

这是rocketMq中一个重要的特性,消息如果投递失败了,会重新投递

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:715
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

这段代码就是获取总过重投的次数:

不难看出,只有发送方式为同步发送时才为1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() =3次,其余发送方式都只有一次机会。

只有同步发送消息才支持消息重投,如果第一次投递失败了,mq还回重试2次投递

找到上面源码位置往下看,其实可以看到下面代码就是使用了一个for循环来进行重投

选择消息队列-负载均衡

通过上面我们知道,最开始并没有指定队列,所以需要程序来获取一个队列。

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:724
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);

因为自动创建的topic,会被默认分配4个队列(生产环境为手动创建topic以及设置队列数量),所以我们必须使用负载均衡保证队列的合理分配到不同队列上,减轻单个队列的压力

  • topicPublishInfo:为消息发送到指定topic的订阅信息
  • lastBrokerName :为上一次选择的broker名称(如果在集群模式下,topic也会存在于多个broker上,因此记录上一次选择的broker名称可以避免连续选择同一个 Broker,从而实现更好的负载均衡和容错处理
  • resetIndex :重置队列索引位置(根据源码逻辑可知,当消息进行重新投递时会重置topic订阅消息中队列的索引位置)

深入上面源码会发现,队列负载均衡的算法获取索引策略默认就是轮询

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:TopicPublishInfo
//行数:101
int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());

负载均衡策略

  1. 轮询策略 (Round-Robin)
  2. 随机策略 (Random)
  3. 一致性哈希策略 (Consistent Hashing)
  4. 权重随机策略 (Weighted Random)
  5. 最少连接策略 (Least Connections)

装载消息体发送消息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:740
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

该方法就是发现消息的核心方法了,不管是同步发送还是异步发送都会执行该方法

做一些发送消息前的准备,接下深入该方法查看

压缩消息内容

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:898
if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;
}
  • 首先判断消息是否大于4k( compressMsgBodyOverHowmuch = 1024 * 4),大于则进行压缩,小于则不处理
//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1070
byte[] data = compressor.compress(body, compressLevel);
  • 传入消息体以及压缩的等级,这里大佬们提供了三种压缩实现,分别基于三种不同的压缩框架

在我们日常工作中,如果需要压缩内容,也可以参考大佬们的实现,学习源码不仅仅是了解框架的本身,也要吸取优秀的地方合理运用

构造发送message的请求的Header

message是Producer发送给Broker的一个请求,我们可以把内容抽象成两部分组成:请求头请求体

  • 请求体就是消息本身数据
  • 请求头 SendMessageRequestHeader 则包含了各种必要的数据,比如topicmessaeQueue等等,更多可直接查看请求头对象源码

最后就是使用基于netty实现的远程调用发送消息到broker中

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1016
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);

更新broker故障信息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:742
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);

程序执行到这个位置,说明前面消息发送的流程全部执行完成了,那么我们也知道了消息发送的结果,从而知道broker服务的状态情况,我们需要把当前的broker故障情况更新到 faultItemTable 本地map中,供后续对broker服务的故障规避faultItemTable 该map在前一篇生产者启动中也提到过。

异步发送消息

选择发送方式代码中当sendCallback!=null时则进入异步发送消息

跟踪源码我们可知,异步发送其实就是创建了一个单独的线程,使用Runnable对象实现,因为会返回一个执行结果

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:550
Runnable runnable = new Runnable() {@Overridepublic void run() {long costTime = System.currentTimeMillis() - beginStartTime;if (timeout > costTime) {try {sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime);} catch (Exception e) {newCallBack.onException(e);}} else {newCallBack.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));}}executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
};
  • sendDefaultImpl() 该方法就是和同步发送调用的同一个了,唯一区别就是类型 CommunicationMode.ASYNC 和存在回调方法newCallBack
  • executeAsyncMessageSend() 执行异步消息发送

总结

本篇对生产者发送消息源码进行了跟踪学习,你是否也有所收获呢。下一篇我们将对rocketMq的核心组件Broker进行源码解读,Broker负责接收和存储消息,管理消息队列,并将消息分发给消费者, 是担任连接生产者和消费者,确保消息的高效传输和存储,保证系统的可靠性和性能的重要角色。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/42188.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Open3D KDtree的建立与使用

目录 一、概述 1.1kd树原理 1.2kd树搜索原理 1.3kd树构建示例 二、常见的领域搜索方式 2.1K近邻搜索(K-Nearest Neighbors, KNN Search) 2.2半径搜索(Radius Search) 2.3混合搜索(Hybrid Search) …

后端之路——登录校验前言(Cookie\ Session\ JWT令牌)

前言:Servlet 【登录校验】这个功能技术的基础是【会话技术】,那么在讲【会话技术】的时候必然要谈到【Cookie】和【Session】这两个东西,那么在这之前必须要先讲一下一个很重要但是很多人都会忽略的一个知识点:【Servlet】 什么是…

Oracle 19c 统一审计表清理

zabbix 收到SYSAUX表空间告警超过90%告警,最后面给出的清理方法只适合ORACLE 统一审计表的清理,传统审计表的清理SYS.AUD$不适合,请注意。 SQL> Col tablespace_name for a30 Col used_pct for a10 Set line 120 pages 120 select total.…

STM32实战篇:闪灯 × 流水灯 × 蜂鸣器

IO引脚初始化 即开展某项活动之前所做的准备工作,对于一个IO引脚来说,在使用它之前必须要做一些参数配置(例如:选择工作模式、速率)的工作(即IO引脚的初始化)。 IO引脚初始化流程 1、使能IO引…

LED灯的呼吸功能

"呼吸功能"通常是指 LED 灯的一种工作模式,它模拟人类的呼吸节奏,即 LED 灯的亮度会周期性地逐渐增强然后逐渐减弱,给人一种 LED 在"呼吸"的感觉。这种效果通常用于指示设备的状态或者简单地作为装饰效果。(就…

Spring Boot Security自定义AuthenticationProvider

以下是一个简单的示例,展示如何使用AuthenticationProvider自定义身份验证。首先,创建一个继承自标准AuthenticationProvider的类,并实现authenticate方法。 import com.kamier.security.web.service.MyUser; import org.springframework.se…

YOLOv10改进 | EIoU、SIoU、WIoU、DIoU、FocusIoU等二十余种损失函数

一、本文介绍 这篇文章介绍了YOLOv10的重大改进,特别是在损失函数方面的创新。它不仅包括了多种IoU损失函数的改进和变体,如SIoU、WIoU、GIoU、DIoU、EIOU、CIoU,还融合了“Focus”思想,创造了一系列新的损失函数。这些组合形式的…

独立开发者系列(22)——API调试工具apifox的使用

接口的逻辑已经实现,需要对外发布接口,而发布接口的时候,我们需要能自己简单调试接口。当然,其实自己也可以写简单的代码调试自己的接口,因为其实就是简单的request请求或者curl库读取,调整请求方式get或者…

RxJava学习记录

文章目录 1. 总览1.1 基本原理1.2 导入包和依赖 2. 操作符2.1 创建操作符2.2 转换操作符2.3 组合操作符2.4 功能操作符 1. 总览 1.1 基本原理 参考文献 构建流:每一步操作都会生成一个新的Observable节点(没错,包括ObserveOn和SubscribeOn线程变换操作…

echarts实现3D饼图

先看下最终效果 实现思路 使用echarts-gl的曲面图&#xff08;surface&#xff09;类型 通过parametric绘制曲面参数实现3D效果 代码实现 <template><div id"surfacePie"></div> </template> <script setup>import {onMounted} fro…

简单的找到自己需要的flutter ui 模板

简单的找到自己需要的flutter ui 模板 网站 https://flutterawesome.com/ 简介 我原本以为会很难用 实际上不错 很简单 打开后界面类似于,右上角可以搜索 点击view github 相当简单 很oks

【见刊通知】MVIPIT 2023机器视觉、图像处理与影像技术国际会议

MVIPIT 2023&#xff1a;https://ieeexplore.ieee.org/xpl/conhome/10578343/proceeding 入库Ei数据库需等20-50天左右 第二届会议征稿启动&#xff08;MVIPIT 2024&#xff09; The 2nd International Conference on Machine Vision, Image Processing & Imaging Techn…

MacOS和Windows中怎么安装Redis

希望文章能给到你启发和灵感&#xff5e; 如果觉得文章对你有帮助的话&#xff0c;点赞 关注 收藏 支持一下博主吧&#xff5e; 阅读指南 开篇说明一、基础环境说明1.1 硬件环境1.2 软件环境 二、MacOS中Redis的安装2.1 HomeBrew 安装&#xff08;推荐&#xff09;2.2 通过官方…

70.WEB渗透测试-信息收集- WAF、框架组件识别(10)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a;69.WEB渗透测试-信息收集- WAF、框架组件识别&#xff08;9&#xff09; 关于waf相应的识…

江协科技51单片机学习- p25 无源蜂鸣器

&#x1f680;write in front&#x1f680; &#x1f50e;大家好&#xff0c;我是黄桃罐头&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流 &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd;​…

环信IM实现小米、oppo推送详细步骤

本文教大家集成环信IM后如何实现小米、oppo推送。 一、小米推送 步骤一、在小米开放平台创建应用。 在 小米开放平台 创建应用&#xff0c;开启推送服务。详见小米官方网站的 推送服务接入指南。 步骤二、上传推送证书。 注册完成后&#xff0c;需要在环信即时通讯云控制台…

LeetCode-刷题记录-前缀和合集(本篇blog会持续更新哦~)

一、前缀和&#xff08;Prefix Sum&#xff09;算法概述 前缀和算法通过预先计算数组的累加和&#xff0c;可以在常数时间内回答多个区间和相关的查询问题&#xff0c;是解决子数组和问题中的重要工具。 它的基本思想是通过预先计算和存储数组的前缀和&#xff0c;可以在 O(1)…

7.8作业

一、思维导图 二、 1】按值修改 2】按值查找&#xff0c;返回当前节点的地址 &#xff08;先不考虑重复&#xff0c;如果有重复&#xff0c;返回第一个&#xff09; 3】反转 4】销毁链表 //按值修改 int value_change(linklistptr H,datatype e,int value) {if(HNULL||empty(H…

Greenplum(二)【SQL】

前言 Greenplum 的剩余部分主要其实主要就是 DDL 和之前学的 MySQL 不大一样&#xff0c;毕竟 Greenplum 是基于 PostgreSQL 数据库的&#xff0c;不过那些 DML 和 MySQL、Hive 基本上大差不差&#xff0c;所以就没有必要浪费时间了。 1、DDL 1.1、库操作 1.1.1、创建数据库…

python爬虫加入进度条

安装tqdm和requests库 pip install tqdm -i https://pypi.tuna.tsinghua.edu.cn/simplepip install requests -i https://pypi.tuna.tsinghua.edu.cn/simple带进度条下载 import time # 引入time模块&#xff0c;用于处理时间相关的功能 from tqdm import * # 从tqdm包中…