RocketMQ 源码分析——Producer

文章目录

  • 消息发送代码实现
  • 消息发送者启动流程
    • 检查配置
    • 获得MQ客户端实例
    • 启动实例
    • 定时任务
  • Producer 消息发送流程
    • 选择队列
      • 默认选择队列策略
      • 故障延迟机制策略*
      • 两种策略的选择
  • 技术亮点:ThreadLocal

消息发送代码实现

下面是一个生产者发送消息的demo(同步发送)

image.png

主要做了几件事:

  • 初始化一个生产者(DefaultMQProducer)对象
  • 设置 NameServer 的地址
  • 启动生产者
  • 发送消息

消息发送者启动流程

image.png

DefaultMQProducerImpl类start()

image.png

检查配置

DefaultMQProducerImpl

image.png

获得MQ客户端实例

整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表。DefaultMQProducerImpl类start()

image.png

一个clientId只会创建一个MQClientInstance

image.png

clientId生成规则:IP@instanceName@unitName

image.png

RocketMQ中消息发送者、消息消费者都属于”客户端“。每一个客户端就是一个MQClientInstance,每一个ClientConfig对应一个实例。

不同的生产者、消费端如果引用同一个客户端配置(ClientConfig),则它们共享一个MQClientInstance实例。所以我们在定义的的时候要注意这种问题(生产者和消费者如果分组名相同容易导致这个问题)

image.png

启动实例

MQClientInstance类start()

image.png

定时任务

MQClientInstance类startScheduledTask()

image.png

Producer 消息发送流程

我们从一个生产者案例的代码进入代码可知:DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法

image.png

image.png

从核心方法可知消息发送就是4个步骤:验证消息、查找路由、选择队列、消息发送。

image.png

image.png

选择队列

默认选择队列策略

采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀。这种算法只要消息投递过程中没有发生重试的话,基本上可以保证每一个Queue队列的消息投递数量尽可能均匀。当然如果投递中发生问题,比如第一次投递就失败,那么很大的可能性是集群状态下的一台Broker挂了,所以在重试发送中进行规避。这样设置也是比较合理的。

故障延迟机制策略*

采用此策略后,每次向Broker成功或者异常的发送,RocketMQ都会计算出该Borker的可用时间(发送结束时间-发送开始时间,失败的按照30S计算),并且保存,方便下次发送时做筛选。

image.png

除了记录Broker的发送消息时长之外,还要计算一个Broker的不可用时长。这里采用一个经验值:

如果发送时长在550ms之内,不可用时长为0。

达到550ms,不可用时长为30S

达到1000ms,不可用时长为60S

达到2000ms,不可用时长为120S

达到3000ms,不可用时长为180S

达到15000ms,不可用时长为600S

image.png

image.png

有了以上的Broker规避信息后发送消息就非常简单了。

在开启故障延迟机制策略步骤如下:

  1. 根据消息队列表时做轮训
  2. 选好一个队列
  3. 判断该队列所在Broker是否可用
  4. 如果是可用则返回该队列,队列选择逻辑结束
  5. 如果不可用,则接着步骤2继续
  6. 如果都不可用,则随机选一个

代码如下:

image.png

两种策略的选择

从这种策略上可以很明显看到,默认队列选择是轮训策略,而故障延迟选择队列则是优先考虑消息的发送时长短的队列。那么如何选择呢?

首先RocketMQ默认的发送失败有重试策略,默认是2,也就是如果向不同的Broker发送三次都失败了那么这条消息的发送就失败了,作为RocketMQ肯定是尽力要确保消息发送成功。所以给出以下建议。

如果是网络比较好的环境,推荐默认策略,毕竟网络问题导致的发送失败几率比较小。

如果是网络不太好的环境,推荐故障延迟机制,消息队列选择时,会在一段时间内过滤掉RocketMQ认为不可用的broker,以此来避免不断向宕机的broker发送消息,从而实现消息发送高可用。

当然以上成立的条件是一个Topic创建在2个Broker以上的的基础上。

技术亮点:ThreadLocal

image.png

image.png

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/85591.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Prometheus+Consul 自助服务发现

Prometheus 官网 https://prometheus.io/download/ Consul 介绍 Consul 是基于 GO 语言开发的开源工具,主要面向分布式,服务化的系统提供服务注册、服务发现和配置管理的功能。Consul 提供服务注册/发现、健康检查、Key/Value存储、多数据中心和分布式一致性保证等功能。通过…

国际版阿里云/腾讯云免开户:云存储服务:云存储服务能够让你随时随地拜访和同享文件

云存储服务:云存储服务能够让你随时随地拜访和同享文件 云存储服务是一种基于云技术的存储渠道,能够让用户存储、管理和同享各种类型的数据文件,如文档、图片、视频、音频等。这种服务具有许多长处,以下是对其进行的详细分析&…

Mallox勒索病毒:最新变种.mallox_lab袭击了您的计算机?

引言 在数字化时代,数据是我们生活和工作的重要组成部分,但同时也引发了各种网络威胁,.mallox_lab勒索病毒便是其中之一。这种恶意软件以其加密文件并勒索赎金的方式而闻名,给个人和组织带来了巨大的风险和损失。本文将深入探讨.…

400电话申请流程详解,助您快速办理联通、移动、电信400电话

导语:随着企业业务的发展,越来越多的企业开始关注400电话的申请与办理。本文将为您详细介绍联通、移动、电信400电话的申请流程,帮助您快速办理400电话,提升企业形象和客户服务质量。 一、联通400电话申请流程 咨询与选择&#x…

BUUCTF:[GYCTF2020]FlaskApp

Flask的网站,这里的功能是Base64编码解码,并输出 并且是存在SSTI的 /hint 提示PIN码 既然提示PIN,那应该是开启了Debug模式的,解密栏那里随便输入点什么报错看看,直接报错了,并且该Flask开启了Debug模式&am…

图论16(Leetcode863.二叉树中所有距离为K的结点)

答案&#xff1a; /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode(int x) { val x; }* }*/ class Solution {public List<Integer> distanceK(TreeNode root, TreeNode tar…

广州汽车神秘顾客调查:洞悉消费者需求,提升品牌竞争力

在广州这座繁华都市中&#xff0c;汽车市场百花齐放&#xff0c;各大品牌激烈竞争。要在这个市场中脱颖而出&#xff0c;了解消费者的真实需求和购车体验至关重要&#xff0c;许多汽车品牌选择引入神秘顾客调查来评估自身的服务质量和客户满意度。群狼调研(长沙神秘顾客检查)专…

多分类中混淆矩阵的TP,TN,FN,FP计算

关于混淆矩阵&#xff0c;各位可以在这里了解&#xff1a;混淆矩阵细致理解_夏天是冰红茶的博客-CSDN博客 上一篇中我们了解了混淆矩阵&#xff0c;并且进行了类定义&#xff0c;那么在这一节中我们将要对其进行扩展&#xff0c;在多分类中&#xff0c;如何去计算TP&#xff0…

AB包的依赖关系

1、什么是依赖关系 有时候一个模型所需要的东西可能在不同的包里面&#xff0c;例如蓝色立方体的模型和材质在不同的包&#xff08;mode和head&#xff09;里&#xff0c;这时需要加载两个包才能让这个球正常显示 2、如何获取依赖关系并加载 //加载AB包 AssetBundle ab Asse…

Manifest merger failed

编译报错&#xff1a;Manifest merger failed with multiple errors 定位编译错误&#xff1a;java.lang.RuntimeException: Manifest merger failed with multiple errors 近日&#xff0c;项目中需要引入一个module。在成功导入后&#xff0c;添加依赖到主模块上&#xff0c…

图像识别技术如何改变智能家居的体验?

图像识别技术在智能家居中的应用正在改变我们的生活体验。通过图像识别技术&#xff0c;智能家居可以更准确地识别用户&#xff0c;并自动调整环境以适应用户的需求。以下是图像识别技术在智能家居中的一些应用&#xff1a; 人脸识别&#xff1a;通过人脸识别技术&#xff0c;智…

《动手学深度学习 Pytorch版》 7.3 网络中的网络(NiN)

LeNet、AlexNet和VGG的设计模式都是先用卷积层与汇聚层提取特征&#xff0c;然后用全连接层对特征进行处理。 AlexNet和VGG对LeNet的改进主要在于扩大和加深这两个模块。网络中的网络&#xff08;NiN&#xff09;则是在每个像素的通道上分别使用多层感知机。 import torch fr…

科技云报道:云安全的新战场上,如何打破“云威胁”的阴霾?

科技云报道原创。 近年来&#xff0c;在云计算和网络安全产业的蓬勃发展下&#xff0c;我国云安全行业市场规模呈现高速增长态势&#xff0c;在网络安全市场总体规模中占比不断上升。 据统计&#xff0c;近5年我国云安全市场保持高速增长&#xff0c;2021年我国云安全市场规模…

代码随想录算法训练营第58天|739. 每日温度,496.下一个更大元素 I (单调栈开始)

链接: 739. 每日温度 链接: 496.下一个更大元素 I 739. 每日温度 单调栈入门题 这题的关键时保证了栈内所有的元素都是单调递增的&#xff08;单调栈&#xff09; class Solution {public int[] dailyTemperatures(int[] temperatures) {Deque<Integer> stack new L…

(25)(25.1) 光学流量传感器的测试和设置

文章目录 25.1.1 测试传感器 25.1.2 校准传感器 25.1.3 测距传感器检查 25.1.4 预解锁检查 25.1.5 首次飞行 25.1.6 第二次飞行 25.1.7 正常操作设置 25.1.8 视频示例&#xff08;Copter-3.4&#xff09; 25.1.9 空中校准 25.1.1 测试传感器 将传感器连接至自动驾驶仪…

uniapp-引入模块失效的问题

在前段时间的开发中&#xff0c;我遇到了一个bug 就是引入了一个模块但是却没有生效&#xff0c;由于静态页面不是我完成的&#xff0c;所以我花了一些时间才发现问题所在 1. 查看是否忘记添加components components: {addCart,myCart, }, 2. 查看是否引入路径有误 Compone…

【C语言】指针的进阶(四)—— 企业笔试题解析

笔试题1&#xff1a; int main() {int a[5] { 1, 2, 3, 4, 5 };int* ptr (int*)(&a 1);printf("%d,%d", *(a 1), *(ptr - 1));return 0; } 【答案】在x86环境下运行 【解析】 &a是取出整个数组的地址&#xff0c;&a就表示整个数组&#xff0c;因此…

Biome-BGC生态系统模型与Python融合技术

Biome-BGC是利用站点描述数据、气象数据和植被生理生态参数&#xff0c;模拟日尺度碳、水和氮通量的有效模型&#xff0c;其研究的空间尺度可以从点尺度扩展到陆地生态系统。 在Biome-BGC模型中&#xff0c;对于碳的生物量积累&#xff0c;采用光合酶促反应机理模型计算出每天…

使用Chatgpt编写的PHP数据库pdo操作类(增删改查)

摘要 将PDO封装成PHP类进行调用有很多好处&#xff0c;包括&#xff1a; 1、封装性和抽象性&#xff1a; 通过将PDO封装到一个类中&#xff0c;您可以将数据库操作逻辑与应用程序的其他部分分离开来&#xff0c;提高了代码的组织性和可维护性。这样&#xff0c;您只需在一个地…

soildwork2022怎么恢复软件界面的默认设置?

1.点击菜单中的” 视图” 2.在弹出的子菜单中选择”工作区” 3.选择工作区中的”默认” 4.点击默认后软件界面就恢复了默认设置。