RocketMQ的长轮询(Long Polling)实现分析

目录

前言

长轮询

1.实现步骤

1.1客户端轮询发送请求

1.2服务端处理数据

1.3客户端接收数据

2.实现实例

RocketMQ长轮询

1.PullMessage服务

2.PullMessageProcessor服务

3.PullCallback回调

总结


前言

消息队列一般在消费端都会提供push和pull两种模式,RocketMQ同样实现了这两种模式,分别提供了两个实现类:DefaultMQPushConsumer和DefaultMQPullConsumer;两种方式各有优势:

push模式:推送模式,即服务端有数据之后立马推送消息给客户端,需要客户端和服务器建立长连接,实时性很高,对客户端来说也简单,接收处理消息即可;缺点就是服务端不知道客户端处理消息的能力,可能会导致数据积压,同时也增加了服务端的工作量,影响服务端的性能;

pull模式:拉取模式,即客户端主动去服务端拉取数据,主动权在客户端,拉取数据,然后处理数据,再拉取数据,一直循环下去,具体拉取数据的时间间隔不好设定,太短可能会导致大量的连接拉取不到数据,太长导致数据接收不及时; RocketMQ使用了长轮询的方式,兼顾了push和pull两种模式的优点,下面首先对长轮询做简单介绍,进而分析RocketMQ内置的长轮询模式。

长轮询

长轮询通过客户端和服务端的配合,达到主动权在客户端,同时也能保证数据的实时性;长轮询本质上也是轮询,只不过对普通的轮询做了优化处理,服务端在没有数据的时候并不是马上返回数据,会hold住请求,等待服务端有数据,或者一直没有数据超时处理,然后一直循环下去;下面看一下如何简单实现一个长轮询;

1.实现步骤

1.1客户端轮询发送请求

客户端应该存在一个一直循环的程序,不停的向服务端发送获取消息请求;

1.2服务端处理数据

服务器接收到客户端请求之后,首先查看是否有数据,如果有数据则直接返回,如果没有则保持连接,等待获取数据,服务端获取数据之后,会通知之前的请求连接来获取数据,然后返回给客户端;

1.3客户端接收数据

正常情况下,客户端会马上接收到服务端的数据,或者等待一段时间获取到数据;如果一直获取不到数据,会有超时处理;在获取数据或者超时处理之后会关闭连接,然后再次发起长轮询请求;

2.实现实例

以下使用netty模拟一个http服务器,使用HttpURLConnection模拟客户端发送请求,使用BlockingQueue存放数据;

服务端代码

public class Server {public static void start(final int port) throws Exception {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup woker = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();try {serverBootstrap.channel(NioServerSocketChannel.class).group(boss, woker).childOption(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("http-decoder", new HttpServerCodec());ch.pipeline().addLast(new HttpServerHandler());}});ChannelFuture future = serverBootstrap.bind(port).sync();System.out.println("server start ok port is " + port);DataCenter.start();future.channel().closeFuture().sync();} finally {boss.shutdownGracefully();woker.shutdownGracefully();}}public static void main(String[] args) throws Exception {start(8080);}
}

netty默认支持http协议,直接使用即可,启动端口为8080;同时启动数据中心服务,相关代码如下:

public class DataCenter {private static Random random = new Random();private static BlockingQueue<String> queue = new LinkedBlockingQueue<>();private static AtomicInteger num = new AtomicInteger();public static void start() {while (true) {try {Thread.sleep(random.nextInt(5) * 1000);String data = "hello world" + num.incrementAndGet();queue.put(data);System.out.println("store data:" + data);} catch (InterruptedException e) {e.printStackTrace();}}}public static String getData() throws InterruptedException {return queue.take();}}

为了模拟服务端没有数据,需要等待的情况,这里使用BlockingQueue来模拟,不定期的往队列里面插入数据,同时对外提供获取数据的方法,使用的是take方法,没有数据会阻塞知道有数据为止;getData在类HttpServerHandler中使用,此类也很简单,如下:

public class HttpServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof HttpRequest) {FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);httpResponse.content().writeBytes(DataCenter.getData().getBytes());httpResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");httpResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, httpResponse.content().readableBytes());ctx.writeAndFlush(httpResponse);}}
}

获取到客户端的请求之后,从数据中心获取一条消息,如果没有数据,会进行等待,直到有数据为止;然后使用FullHttpResponse返回给客户端;客户端使用HttpURLConnection来和服务端建立连接,不停的拉取数据,代码如下:

public class Client {public static void main(String[] args) {while (true) {HttpURLConnection connection = null;try {URL url = new URL("http://localhost:8080");connection = (HttpURLConnection) url.openConnection();connection.setReadTimeout(10000);connection.setConnectTimeout(3000);connection.setRequestMethod("GET");connection.connect();if (200 == connection.getResponseCode()) {BufferedReader reader = null;try {reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));StringBuffer result = new StringBuffer();String line = null;while ((line = reader.readLine()) != null) {result.append(line);}System.out.println("时间:" + new Date().toString() + "result =  " + result);} finally {if (reader != null) {reader.close();}}}} catch (IOException e) {e.printStackTrace();} finally {if (connection != null) {connection.disconnect();}}}}
}

以上只是简单的模拟了长轮询的方式,下面重点来看看RocketMQ是如何实现长轮询的;

RocketMQ长轮询

RocketMQ的消费端提供了两种消费模式分别是:DefaultMQPushConsumer和DefaultMQPullConsumer,其中DefaultMQPushConsumer就是使用的长轮询,所以下面重点分析此类;

1.PullMessage服务

从名字可以看出来就是客户端从服务端拉取数据的服务,看里面的一个核心方法:

@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}

服务启动之后,会一直不停的循环调用拉取数据,PullRequest可以看作是拉取数据需要的参数,部分代码如下:

public class PullRequest {private String consumerGroup;private MessageQueue messageQueue;private ProcessQueue processQueue;private long nextOffset;private boolean lockedFirst = false;...省略...
}

每个MessageQueue 对应了封装成了一个PullRequest,因为拉取数据是以每个Broker下面的Queue为单位,同时里面还一个ProcessQueue,每个MessageQueue也同样对应一个ProcessQueue,保存了这个MessageQueue消息处理状态的快照;还有nextOffset用来标识读取的位置;继续看一段pullMessage中的内容,给服务端发送请求的头内容:

PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;

其中有一个参数是SuspendTimeoutMillis,作用是设置Broker的最长阻塞时间,默认为15秒,前提是没有消息的情况下,有消息会立刻返回;

2.PullMessageProcessor服务

从名字可以看出,服务端用来处理pullMessage的服务,下面重点看一下processRequest方法,其中包括对获取不同结果做的处理:

 switch (response.getCode()) {case ResponseCode.SUCCESS:...省略...break;case ResponseCode.PULL_NOT_FOUND:if (brokerAllowSuspend && hasSuspendFlag) {long pollingTimeMills = suspendTimeoutMillisLong;if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();}String topic = requestHeader.getTopic();long offset = requestHeader.getQueueOffset();int queueId = requestHeader.getQueueId();PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData);this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}case ResponseCode.PULL_RETRY_IMMEDIATELY:break;case ResponseCode.PULL_OFFSET_MOVED:...省略...break;default:assert false;

一共处理了四个类型,我们关心的是在没有获取到数据的情况下是如何处理的,可以重点看一下ResponseCode.PULL_NOT_FOUND,表示没有拉取到数据,此时会调用PullRequestHoldService服务,从名字可以看出此服务用来hold住请求,不会立马返回,response被至为了null,不给客户端响应;下面重点看一下PullRequestHoldService:

@Overridepublic void run() {log.info("{} service started", this.getServiceName());while (!this.isStopped()) {try {if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {this.waitForRunning(5 * 1000);} else {this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp = this.systemClock.now();this.checkHoldRequest();long costTime = this.systemClock.now() - beginLockTimestamp;if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);}} catch (Throwable e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info("{} service end", this.getServiceName());}

此方法主要就是通过不停的检查被hold住的请求,检查是否已经有数据了,具体检查哪些就是在ResponseCode.PULL_NOT_FOUND中调用的suspendPullRequest方法:

private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =new ConcurrentHashMap<String, ManyPullRequest>(1024);public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (null == mpr) {mpr = new ManyPullRequest();ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);if (prev != null) {mpr = prev;}}mpr.addPullRequest(pullRequest);}

将需要hold处理的PullRequest放入到一个ConcurrentHashMap中,等待被检查;具体的检查代码在checkHoldRequest中:

private void checkHoldRequest() {for (String key : this.pullRequestTable.keySet()) {String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);if (2 == kArray.length) {String topic = kArray[0];int queueId = Integer.parseInt(kArray[1]);final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);try {this.notifyMessageArriving(topic, queueId, offset);} catch (Throwable e) {log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);}}}}

此方法用来获取指定messageQueue下最大的offset,然后用来和当前的offset来比较,来确定是否有新的消息到来;往下看notifyMessageArriving方法:

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (mpr != null) {List<PullRequest> requestList = mpr.cloneListAndClear();if (requestList != null) {List<PullRequest> replayList = new ArrayList<PullRequest>();for (PullRequest request : requestList) {long newestOffset = maxOffset;if (newestOffset <= request.getPullFromThisOffset()) {newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);}if (newestOffset > request.getPullFromThisOffset()) {if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}}if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}replayList.add(request);}if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}}}

方法中两个重要的判定就是:比较当前的offset和maxoffset,看是否有新的消息到来,有新的消息返回客户端;另外一个就是比较当前的时间和阻塞的时间,看是否超过了最大的阻塞时间,超过也同样返回; 此方法不光在PullRequestHoldService服务类中循环调用检查,同时在DefaultMessageStore中消息被存储的时候调用;其实就是主动检查和被动通知两种方式。

3.PullCallback回调

服务端处理完之后,给客户端响应,回调其中的PullCallback,其中在处理完消息之后,重要的一步就是再次把pullRequest放到PullMessageService服务中,等待下一次的轮询;

总结

本文首先介绍了两种消费消息的模式,介绍了其中的优缺点,然后引出了长轮询,并且在本地简单模拟了长轮询,最后重点介绍了RocketMQ中是如何实现的长轮询

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/107260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云原生Kubernetes:K8S集群版本升级(v1.20.15 - v1.22.14)

目录 一、理论 1.K8S集群升级 2.集群概况 3.升级集群&#xff08;v1.21.14&#xff09; 4.验证集群&#xff08;v1.21.14&#xff09; 5.升级集群&#xff08;v1.22.14&#xff09; 6.验证集群 (v1.22.14) 二、实验 1.升级集群&#xff08;v1.21.14&#xff09; 2.验…

apifox的使用以及和idea集成

apifox 简介 Apifox 是 API 文档、API 调试、API Mock、API 自动化测试一体化协作平台&#xff0c;定位 Postman Swagger Mock JMeter&#xff0c;由此可见apifox集功能于一身&#xff0c;极大的提升了我们开发的效率&#xff0c;不用再为postman网络连接失败而发愁&…

零售数据分析师熬夜整理:人、货、场、供、财这样做

在零售数据分析中&#xff0c;人、货、场、供、财数据分析非常重要&#xff0c;它们分别是指人员、商品、场所、供应和财务&#xff0c;对这些要素进行数据分析&#xff0c;可以更好地了解市场需求、优化商品供应链、调整销售策略和提高盈利能力。零售数据量大、分析指标多且复…

uni-app 瀑布流布局的实现

方式一&#xff1a;使用纯 CSS 实现 使用 flex 布局方式 <!-- 瀑布流布局 --> <template><view class"container"><viewclass"cont-box":style"{ --layout-width: 100 / flowData.column - flowData.columnSpace % }"v-f…

Apache Ant的安装

介绍 Apache Ant是一个Java库和一个 命令行工具&#xff0c;可以用来构建Java应用。Ant提供了许多内置的任务&#xff08;tasks&#xff09;&#xff0c;可以编译、组装、测试、运行Java应用。Ant也可以构建非Java应用&#xff0c;例如C、C应用。 Ant非常灵活&#xff0c;没有…

GitHub Action 通过SSH 自动部署到云服务器上

准备 正式开始之前&#xff0c;你需要掌握 GitHub Action 的基础语法&#xff1a; workflow &#xff08;工作流程&#xff09;&#xff1a;持续集成一次运行的过程&#xff0c;就是一个 workflow。name: 工作流的名称。on: 指定次工作流的触发器。push 表示只要有人将更改推…

Vue鼠标右键画矩形和Ctrl按键多选组件

效果图 说明 下面会贴出组件代码以及一个Demo&#xff0c;上面的效果图即为Demo的效果&#xff0c;建议直接将两份代码拷贝到自己的开发环境直接运行调试。 组件代码 <template><!-- 鼠标画矩形选择对象 --><div class"objects" ref"objectsR…

42.会话划分问题求解(打标)

思路分析&#xff1a; &#xff08;1&#xff09;为每一次浏览找到他的上一次浏览时间 lag(view_timestamp, 1, 0) over(partition by user_id order by view_timestamp) as last_view_timestamp &#xff08;2&#xff09;为&#xff1e;60s的设置一个初始会话的标签flagif(vi…

ArcGIS笔记6_绘制中间镂空的面要素、面要素抠洞

本文目录 前言Step 1 对海湾大整面和零散的岛屿分别绘制面要素Step 2 利用[擦除]工具从海湾大整面中抠掉零散的岛屿 前言 使用ArcGIS做项目时&#xff0c;很多场景下都需要绘制中间镂空的面要素&#xff0c;比如一个海湾中间有许多零散的岛屿&#xff0c;计算水域面积时就要扣…

ant design pro v6如何引入第三方js?如腾讯地图等!

由于ant pro隐藏.html&#xff0c;需要通过他们约定的方式引入即可。 1.配置config文件 /config/config.tsheadScripts: [// 解决首次加载时白屏的问题{ src: /scripts/loading.js, async: true },{ src: "https://map.qq.com/api/gljs?v1.exp&keyOB4BZ-D4W3U-B7VV…

企业如何凭借软文投放实现营销目标?

数字时代下&#xff0c;软文投放成为许多企业营销的主要方式&#xff0c;因为软文投放成本低且效果持续性强&#xff0c;最近也有不少企业来找媒介盒子进行软文投放&#xff0c;接下来媒介盒子就来给大家分享下&#xff0c;企业在软文投放中需要掌握哪些技巧&#xff0c;才能实…

加速企业云计算部署:应对新时代的挑战

随着科技的飞速发展&#xff0c;企业面临着诸多挑战。在这个高度互联的世界中&#xff0c;企业的成功与否常常取决于其能否快速、有效地响应市场的变化。云计算作为一种新兴的技术趋势&#xff0c;为企业提供了实现这一目标的可能。通过加速企业云计算部署&#xff0c;企业可以…

【C/C++】宏定义中的#和##

C和C中的宏&#xff08;Macro&#xff09;属于编译器预处理的范畴。 单井号# 运算符 单井号#&#xff08;字符串化运算符&#xff09;用于将 宏参数变量名 转换为 字符串&#xff08;Stringfication&#xff09;。 下面是一个示例&#xff1a; #include <stdio.h>#de…

MTK6877/MT6877天玑900安卓5G核心板_安卓开发板主板定制开发

2021年5月13日&#xff0c;MediaTek 宣布发布旗下的天玑900系列芯片&#xff0c;又名MT6877。天玑900基于6nm先进工艺制造&#xff0c;搭载硬件级4K HDR视频录制引擎&#xff0c;支持1.08亿像素摄像头、5G双全网通和Wi-Fi 6连接、旗舰级存储规格和120Hz的FHD超高清分辨率显示&a…

第五十六章 学习常用技能 - 执行 SQL 查询

文章目录 第五十六章 学习常用技能 - 执行 SQL 查询执行 SQL 查询检查对象属性 第五十六章 学习常用技能 - 执行 SQL 查询 执行 SQL 查询 要运行 SQL 查询&#xff0c;请在管理门户中执行以下操作&#xff1a; 选择系统资源管理器 > SQL。如果需要&#xff0c;请选择标题…

数据在内存中的存储(2)

文章目录 3. 浮点型在内存中的存储3.1 一个例子3.2 浮点数存储规则 3. 浮点型在内存中的存储 常见的浮点数&#xff1a; 3.14159 1E10 ------ 1.0 * 10^10 浮点数家族包括&#xff1a; float、double、long double 类型 浮点数表示的范围&#xff1a;float.h中定义 3.1 一个例…

mask-R-CNN

前言 代码 论文 # Mask-rcnn 算法在 torch vision 中有直接实现&#xff0c;可以直接引用使用在自己的工作中。 import torchvision model torchvision.models.detection.maskrcnn_resnet50_fpn(weightsMaskRCNN_ResNet50_FPN_Weights.DEFAULT)Mask R-CNN&#xff08;Mask R…

Linux开发-Ubuntu软件源工具

开发&验证环境&#xff1a; 操作系统&#xff1a;ubuntu 20.04 软件源&#xff1a;http://archive.ubuntu.com/ubuntu 开发工具 sudo apt install vim sudo apt install git sudo apt install git-lfs# gnu工具链 sudo apt install gcc sudo apt install g sudo apt inst…

yolov8如何进行训练验证推理

1、新建脚本main.py&#xff0c;也可以建一个yaml文件&#xff08;避免改到default.yaml&#xff09;&#xff0c;这个yaml文件是在训练时用到 batchsize什么的都可以在yaml文件改&#xff0c;这俩东西不用填 2、两种训练的方法&#xff0c;用的时候可以注释掉其他 from u…

【Python学习笔记】函数

1. 函数组成 Python中&#xff0c;我们是这样定义函数的&#xff1a; def function(para1, para2):print("func start")print(para1)print(para2)print("func end")print("让技术总监面试 求职者")return "func return"def 是关键字…