面试题---深入源码理解MQ长轮询优化机制

引言

在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。

一、MQ基础概念与业务场景

1.1 MQ基础概念

MQ(Message Queue)即消息队列,是一种应用程序对应用程序的通信方法。它通过在发送方和接收方之间引入一个中间层,实现异步、解耦的消息传递。常见的MQ产品有ActiveMQ、RabbitMQ、Kafka、RocketMQ等。

1.2 业务场景

延时消息

延时消息指的是消息在发送到MQ后,并不会立即被消费者消费,而是等待一段指定的时间后才被投递给消费者。这种机制广泛应用于以下场景:

  • 订单超时处理:用户下单后,如果长时间未支付,系统自动取消订单。
  • 短信验证码:用户注册或登录时,发送验证码短信,验证码在一定时间内有效。
  • 任务调度:在指定时间后执行某项任务,如定时清理日志、备份数据等。
定时消息

定时消息与延时消息类似,但更加灵活。它允许用户指定消息在将来的某个具体时间点被投递给消费者。定时消息适用于以下场景:

  • 定时通知:在指定时间点发送通知消息,如每日工作报告、定时提醒等。
  • 周期性任务:按照固定的时间间隔执行任务,如每小时数据汇总、每日系统维护等。

二、MQ长轮询机制原理

2.1 轮询与长轮询

轮询

轮询是一种客户端与服务器之间实时通信的技术手段。客户端定期发送请求来查询服务器是否有新数据或事件,并将响应返回给客户端。轮询的优点是简单易实现,适用于各种浏览器和服务器。然而,轮询也存在明显的缺点:会产生大量的无效请求,浪费带宽和服务器资源,产生不必要的网络流量和延迟。

长轮询

长轮询是对轮询的一种改进。在长轮询中,客户端发送一个HTTP请求给服务器,并保持连接打开。如果服务器没有新数据,则不会立即返回响应,而是将请求挂起,直到有新数据到达或超时。这种方式显著减少了无效的网络请求,提高了数据更新的实时性。

2.2 长轮询机制在MQ中的应用

在MQ系统中,长轮询机制主要用于优化消费者拉取消息的过程。传统的轮询方式下,消费者需要定期向Broker发送拉取请求,即使Broker没有新消息也会返回空响应。这种方式会导致大量的无效请求和资源浪费。而长轮询机制则允许消费者在没有新消息时保持连接挂起状态,直到有新消息到达或超时后再返回响应。这样,消费者可以实时地获取新消息,同时减少了无效请求和资源浪费。

三、RocketMQ长轮询机制源码分析

3.1 RocketMQ概述

RocketMQ是一款分布式消息中间件,由阿里巴巴开源。它支持高吞吐、低延迟的消息传递,并提供了丰富的消息过滤、顺序消息、事务消息等高级功能。RocketMQ中的消费者拉取消息时,就采用了长轮询机制来优化性能。

3.2 PullMessageService组件

在RocketMQ中,PullMessageService组件负责处理消费者的拉取请求。它是一个后台线程服务,会不断地从pullRequestQueue中取出PullRequest对象,并向Broker发送拉取请求。

java复制代码
public void run() {
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}
}

3.3 PullRequest对象

PullRequest对象表示一个拉取请求,它包含了消费者的消息队列、拉取偏移量、挂起时间等信息。当PullMessageServicepullRequestQueue中取出PullRequest对象后,会调用pullMessage方法向Broker发送拉取请求。

java复制代码
public void pullMessage(final PullRequest pullRequest) {
// ... 省略部分代码 ...
try {
this.executePullRequestImmediately(pullRequest);} catch (Exception e) {
// ... 省略异常处理代码 ...}
}

3.4 长轮询实现细节

executePullRequestImmediately方法中,RocketMQ会根据是否启用长轮询机制来决定拉取策略。如果启用了长轮询(longPollingEnable=true),则会根据消费者设置的挂起超时时间(brokerSuspendMaxTimeMillis)来决定重试时间。

java复制代码
private void executePullRequestImmediately(final PullRequest pullRequest) {
// ... 省略部分代码 ...
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
// 长轮询逻辑
final long beginLockTimestamp = System.currentTimeMillis();
// ... 省略加锁和超时处理代码 ...
this.pullMessage(pullRequest);
// ... 省略部分代码 ...} else {
// 短轮询逻辑
// ... 省略短轮询处理代码 ...}
}

在长轮询逻辑中,RocketMQ会调用pullMessage方法向Broker发送拉取请求。如果Broker没有新消息,则会将请求挂起一段时间(默认为5秒),直到有新消息到达或超时后再返回响应。

3.5 PullRequestHoldService与ReputMessageService

RocketMQ中的长轮询机制由PullRequestHoldServiceReputMessageService两个线程共同实现。

  • PullRequestHoldService:每隔一定时间(默认为5秒)检查pullRequestTable中的挂起请求,如果有新消息到达则触发拉取操作,否则继续挂起。
  • ReputMessageService:负责处理消息存储中的新消息到达事件。每当有新消息到达时,它会调用PullRequestHoldService中的相关方法尝试拉取消息。

这两个线程的协作确保了消费者在没有新消息时不会频繁发送拉取请求,从而减少了无效请求和资源浪费。

四、Java模拟实现长轮询功能

4.1 模拟场景

为了演示长轮询机制的实现原理,我们可以模拟一个简单的场景:客户端向服务器订阅某个频道的消息,服务器在有新消息到达时推送给客户端。客户端使用长轮询机制来保持与服务器的连接并实时获取新消息。

4.2 服务器端实现

服务器端使用Spring Boot框架来创建一个简单的Web服务,并使用DeferredResult来实现长轮询功能。

java复制代码
@RestController
@RequestMapping("/im")
public class IMController {
private final ConcurrentHashMap<String, DeferredResult<String>> clientMap = new ConcurrentHashMap<>();
private final List<String> messageQueue = new CopyOnWriteArrayList<>();
@GetMapping("/subscribe")
public DeferredResult<String> subscribe(@RequestParam String channel) {DeferredResult<String> deferredResult = new DeferredResult<>(10000L); // 设置超时时间为10秒clientMap.put(channel, deferredResult);
return deferredResult;}
@PostMapping("/send")
public String send(@RequestParam String channel, @RequestParam String message) {messageQueue.add(channel + ":" + message);notifyClients(channel);
return "Message sent";}
private void notifyClients(String channel) {DeferredResult<String> deferredResult = clientMap.get(channel);
if (deferredResult != null) {
String message = messageQueue.poll();
if (message != null) {deferredResult.setResult(message);clientMap.remove(channel);} else {
// 如果没有新消息,则重新放入队列等待下一次检查clientMap.put(channel, deferredResult);}}}
}

在上面的代码中,subscribe方法用于处理客户端的订阅请求,并返回一个DeferredResult对象。该对象会在有新消息到达时被设置结果并返回给客户端。send方法用于处理消息发送请求,并将消息添加到消息队列中。notifyClients方法负责检查消息队列并通知等待中的客户端。

4.3 客户端实现

客户端使用JavaScript的fetch API来发送长轮询请求。

javascript复制代码
function subscribe(channel) {
fetch(`/im/subscribe?channel=${channel}`).then(response => {
if (!response.ok) {
throw new Error('Network response was not ok');}
return response.text();}).then(message => {
console.log(`Received message: ${message}`);
// 收到消息后再次发起订阅请求以保持长轮询
setTimeout(() => subscribe(channel), 1000);}).catch(error => {
console.error('There was a problem with the fetch operation:', error);
// 请求失败或超时后重新发起订阅请求
setTimeout(() => subscribe(channel), 5000);});
}
// 示例:订阅"testChannel"频道
subscribe('testChannel');

在上面的代码中,subscribe函数用于发送订阅请求并保持长轮询连接。当收到服务器返回的消息时,会打印消息内容并再次发起订阅请求以保持连接。如果请求失败或超时,则会在一段时间后重新发起订阅请求。

五、总结与展望

本文深入探讨了MQ系统中长轮询机制的原理及其在RocketMQ中的实现细节。通过源码分析和Java模拟实现,我们了解了长轮询机制如何优化消费者拉取消息的过程,减少无效请求和资源浪费。未来,随着分布式系统的不断发展和消息中间件的不断演进,长轮询机制将继续发挥其重要作用,为消息传递提供更加高效、可靠的解决方案。

同时,我们也应该看到长轮询机制并不是万能的。在实际应用中,我们需要根据具体的业务场景和需求来选择合适的消息传递模式和优化策略。例如,在对于实时性要求极高的场景下,我们可以考虑使用WebSocket等更高级的技术来实现全双工通信。而在对于消息顺序和一致性要求较高的场景下,则需要结合其他机制(如分布式事务、消息重试等)来确保消息的可靠传递。

总之,MQ系统中的长轮询机制是一种重要的优化手段,它能够帮助我们更好地实现消息的异步传递和实时更新。在未来的发展中,我们将继续探索和优化这一机制,为分布式系统的消息传递提供更加高效、可靠的解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/60451.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GPT中转站技术架构

本文介绍阿波罗AI中转站&#xff08;https://api.ablai.top/&#xff09;的技术架构&#xff0c;该中转API的技术架构采用了分布式架构、智能调度和API中转等技术&#xff0c;确保了全球范围内的高效访问和稳定运行。以下是对该技术架构的详细分析&#xff1a; 分布式架构 分…

【强化学习的数学原理】第02课-贝尔曼公式-笔记

学习资料&#xff1a;bilibili 西湖大学赵世钰老师的【强化学习的数学原理】课程。链接&#xff1a;强化学习的数学原理 西湖大学 赵世钰 文章目录 一、为什么return重要&#xff1f;如何计算return&#xff1f;二、state value的定义三、Bellman公式的详细推导四、公式向量形式…

[less] Operation on an invalid type

我这个是升级项目的时候遇到的&#xff0c;要从 scss 升级到 less&#xff0c;然后代码中就报了这个错误 我说一下代码的错误过程&#xff0c;但是这里没有复现&#xff0c;因为我原本报错的代码要复杂很多&#xff0c;而且是公司代码&#xff0c;不方便透露&#xff0c;这是我…

ssm面向品牌会员的在线商城小程序

摘要 随着Internet的发展&#xff0c;人们的日常生活已经离不开网络。未来人们的生活与工作将变得越来越数字化&#xff0c;网络化和电子化。它将是直接管理面向品牌会员的在线商城小程序的最新形式。本小程序是以面向品牌会员的在线商城管理为目标&#xff0c;使用 java技术制…

spring-logback引用外部文件

背景 在spring微服务开发和云部署中&#xff0c;都涉及到日志的收集&#xff0c;很多时候为例方便管理和开发&#xff0c;很多公司都会开发一些基础配置代码。其中日志就是很重要的部分&#xff0c; 为了方便部署、收集、查看&#xff0c;所以日志文件需要存储在同一个…

.NET周刊【11月第3期 2024-11-17】

国内文章 .NET 9使用Scalar替代Swagger https://www.cnblogs.com/netry/p/18543378/scalar-an-alternative-to-swagger-in-dotnet-9 .NET 9 移除了 Swashbuckle.AspNetCore&#xff0c;因为其维护不力&#xff0c;并转向 Microsoft.AspNetCore.OpenApi。除了 Swashbuckle&am…

国土变更调查拓扑错误自动化修复工具的研究

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 一、拓扑错误的形成原因 1.边界不一致 2.不规则图形 3.尖锐角 4.局部狭长 5.细小碎面 6.更新层相互重叠 二、修复成果展示 1.边界不一致 2.不规则图形 3.尖锐角 4.局部狭…

react 使用中注意事项提要

1、尽可能的减少数组index作为key&#xff0c;数组中插入元素的等操作时&#xff0c;会使得效率底下&#xff1b; 2、如果在 JSX 中给元素添加类, 需要使用 className 代替 class&#xff1b;类似&#xff1a;label 的 for属性&#xff0c;使用htmlFor代替&#xff1b; 3、在…

异常和中断

在计算机系统中&#xff0c;异常和中断是两种常见的用于处理异步事件的机制。以下是常见的异常和中断及其特点的详细解释&#xff1a; 异常&#xff08;内中断&#xff09; 异常&#xff0c;也称为内中断&#xff0c;是由CPU内部事件引起的中断。异常通常与程序执行的当前指令…

WPS 加载项开发说明wpsjs

wpsjs几个常用的CMD命令&#xff1a; 1.打开cmd输入命令测试版本号 npm -v 2.首次安装nodejs&#xff0c;npm默认国外镜像&#xff0c;包下载较慢时&#xff0c;可切换到国内镜像 //下载速度较慢时可切换国内镜像 npm config set registry https://registry.npmmirror.com …

Javaweb梳理18——JavaScript

今日目标 掌握 JavaScript 的基础语法掌握 JavaScript 的常用对象&#xff08;Array、String&#xff09;能根据需求灵活运用定时器及通过 js 代码进行页面跳转能通过DOM 对象对标签进行常规操作掌握常用的事件能独立完成表单校验案例 18.1 JavaScript简介 JavaScript 是一门跨…

android 使用MediaPlayer实现音乐播放--权限请求

在Android应用中&#xff0c;获取本地音乐文件的权限是实现音乐扫描功能的关键步骤之一。随着Android版本的不断更新&#xff0c;从Android 6.0&#xff08;API级别23&#xff09;开始&#xff0c;应用需要动态请求权限&#xff0c;而到了android 13以上需要的权限又做了进一步…

GPT系列文章

GPT系列文章 GPT1 GPT1是由OpenAI公司发表在2018年要早于我们之前介绍的所熟知的BERT系列文章。总结&#xff1a;GPT 是一种半监督学习&#xff0c;采用两阶段任务模型&#xff0c;通过使用无监督的 Pre-training 和有监督的 Fine-tuning 来实现强大的自然语言理解。在 Pre-t…

NUXT3学习日记四(路由中间件、导航守卫)

前言 在 Nuxt 3 中&#xff0c;中间件&#xff08;Middleware&#xff09;是用于在页面渲染之前或导航发生之前执行的函数。它们允许你在路由切换时执行逻辑&#xff0c;像是身份验证、重定向、权限控制、数据预加载等任务。中间件可以被全局使用&#xff0c;也可以只在特定页…

汽车免拆诊断案例 | 2012款路虎揽胜运动版柴油车加速无力

故障现象  一辆2012款路虎揽胜运动版车&#xff0c;搭载3.0T柴油发动机&#xff08;型号为306DT&#xff09;&#xff0c;累计行驶里程约为10.2万km。车主进厂反映&#xff0c;车辆行驶中加速无力&#xff0c;且发动机故障灯异常点亮。 故障诊断 接车后试车&#xff0c;发动…

网络安全与加密

1.Base64简单说明描述&#xff1a;Base64可以成为密码学的基石&#xff0c;非常重要。特点&#xff1a;可以将任意的二进制数据进行Base64编码结果&#xff1a;所有的数据都能被编码为并只用65个字符就能表示的文本文件。65字符&#xff1a;A~Z a~z 0~9 / 对文件进行base64编码…

DrissionPage爬虫工具教程

当然可以&#xff01;下面是一些更高级和复杂的 DrissionPage 使用示例&#xff0c;包括处理动态加载的内容、处理登录和会话、处理多页面操作等。 处理动态加载的内容 许多现代网站使用 JavaScript 动态加载内容。在这种情况下&#xff0c;我们需要等待特定的元素出现&#…

C语言:数组转换指针的时机

1、指针数组 如果一个数组中的所有元素保存的都是指针&#xff0c;那么我们就称它为指针数组&#xff0c;指针数组的定义形式一般为&#xff1a; dataType *arrayName[length];[ ]的优先级高于*&#xff0c;该定义形式应该理解为&#xff1a; dataType *(arrayName[length])…

华为机试HJ60 查找组成一个偶数最接近的两个素数

首先看一下题 描述 任意一个偶数&#xff08;大于2&#xff09;都可以由2个素数组成&#xff0c;组成偶数的2个素数有很多种情况&#xff0c;本题目要求输出组成指定偶数的两个素数差值最小的素数对。 数据范围&#xff1a;输入的数据满足 4≤n≤1000 输入描述&#xff1a; 输…

UE5 DownloadImage加载jpg失败的解决方法

DownloadImage加载jpg失败的解决方法 现象解决方案具体方法 现象 用UE自带的 DownloadImage 无法下载成功&#xff0c;从 failure 引脚出来。 接入一个由监控器自动保存起的图像&#xff0c;有些可以正常加载成功&#xff0c;有些无法加载成功。 经调查问题出现在&#xff0c;…