rocketmq 初探(四)

大家好,我是烤鸭:

    上一篇简单介绍broker的初始化,这一篇介绍 NettyRequestProcessor 的实现(主要是broker里用到的)。

AdminBrokerProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor

NettyRequestProcessor

/*** Common remoting command processor*/
public interface NettyRequestProcessor {RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws Exception;boolean rejectRequest();}

先看下哪些是broker里用到的,从包名就能看出来(remoting包下的后边看),接下来一个一个分析。

在这里插入图片描述

AdminBrokerProcessor (后台发起的 CURD)

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {switch (request.getCode()) {case RequestCode.UPDATE_AND_CREATE_TOPIC:return this.updateAndCreateTopic(ctx, request);//...default:break;}return null;
}

ClientManageProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {case RequestCode.HEART_BEAT:// 心跳,针对consumer,如果心跳的信息有更新(group和subscribe),会updatereturn this.heartBeat(ctx, request);case RequestCode.UNREGISTER_CLIENT:// 下线,针对 producer和consumer, producer和consumer 下线return this.unregisterClient(ctx, request);case RequestCode.CHECK_CLIENT_CONFIG:// 通过注册的filter校验client的configreturn this.checkClientConfig(ctx, request);default:break;}return null;
}

ConsumerManageProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {case RequestCode.GET_CONSUMER_LIST_BY_GROUP:// 根据group获取customer的listreturn this.getConsumerListByGroup(ctx, request);case RequestCode.UPDATE_CONSUMER_OFFSET:// 更新topic@group对应的queueId和offset(没有就直接put,已存在的话如果要更新的offset<本地offset,记录日志)return this.updateConsumerOffset(ctx, request);case RequestCode.QUERY_CONSUMER_OFFSET:// 根据topic@group、queueId获取offset(如果offset<0,并且在磁盘没有刷到内存,返回0,否则返回 QUERY_NOT_FOUND)return this.queryConsumerOffset(ctx, request);default:break;}return null;
}

EndTransactionProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final EndTransactionRequestHeader requestHeader =(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);LOGGER.debug("Transaction request:{}", requestHeader);// salve 不支持事务if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");return response;}// 来源于事务检查(ClientRemotingProcessor.processRequest)if (requestHeader.getFromTransactionCheck()) {switch (requestHeader.getCommitOrRollback()) {// 非事务类型,直接返回case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, but it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}// commit, break 执行执行case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}} else {switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}}OperationResult result = new OperationResult();// 提交if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {// 根据offset从commitlog中获取当前的 halfmessage(半消息)result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {// 校验半消息的合法性,group、事务队列的offset、commitlog的offset是否一致RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// 根据半消息构建一条事务消息MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);// 发送最终消息,消息刷到commitlogRemotingCommand sendResult = sendFinalMessage(msgInner);if (sendResult.getCode() == ResponseCode.SUCCESS) {// 最终消息发送成功,在commitlog里原msg打一个'd'的标记this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}// 回滚} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {// 流程同提交差不多,半小时获取成功之后,在commitlog里原msg打一个'd'的标记result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}response.setCode(result.getResponseCode());response.setRemark(result.getResponseRemark());return response;
}

ForwardRequestProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {return null;
}

小结

介绍了4个 processor:

AdminBrokerProcessor:给console台提供的 curd接口

ClientManageProcessor:针对client端的请求,包含 心跳(上线)、下线、配置检查

ConsumerManageProcessor:针对consumer的,包含 获取consumer列表、更新queueId和offset、获取offset

EndTransactionProcessor:提交模式,从commitlog根据offset获取半消息,写入commitlog,原半消息打一个’d’的标记。

回滚模式,从commitlog根据offset获取半消息,原半消息打一个’d’的标记。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/412512.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

iOS 去除警告 看我就够了

你是不是看着开发过程中出现的一堆的警告会心情一阵烦躁&#xff0c;别烦躁了&#xff0c;看完此文章&#xff0c;消除警告的小尾巴。 一、SVN 操作导致的警告 1.svn删除文件后报错 ”xx“is missing from working copy 使用命令sudo find 工程项目路径 -name ".svn"…

[css] 什么是FOUC?你是如何避免FOUC的?

[css] 什么是FOUC&#xff1f;你是如何避免FOUC的&#xff1f; FOUC 即 Flash of Unstyled Content&#xff0c;是指页面一开始以样式 A&#xff08;或无样式&#xff09;的渲染&#xff0c;突然变成样式B。 原因是样式表的晚于 HTML 加载导致页面重新进行绘制。通过 import 方…

rocketmq 初探(五)

大家好&#xff0c;我是烤鸭&#xff1a; 上一篇简单介绍部分 NettyRequestProcessor (AdminBrokerProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor)&#xff0c;这一篇介绍其他的。 PullMessageProcessor、QueryMessageProcessor、Repl…

Python 装饰器初探

Python 装饰器初探 在谈及Python的时候&#xff0c;装饰器一直就是道绕不过去的坎。面试的时候&#xff0c;也经常会被问及装饰器的相关知识。总感觉自己的理解很浅显&#xff0c;不够深刻。是时候做出改变&#xff0c;对Python的装饰器做个全面的了解了。 1. 函数装饰器 直接上…

[css] 解释下 CSS sprites的原理和优缺点分别是什么

[css] 解释下 CSS sprites的原理和优缺点分别是什么 我来说下我的观点 原理&#xff1a; 多张图合并成一张图优点&解决的问题hover效果&#xff0c;如果是多个图片&#xff0c;网络正常的情况下首次会闪烁一下。如果是断网情况下&#xff0c;就没图片了。sprites 就很好的…

《自律100天,穿越人生盲点》读书笔记

大家好&#xff0c;我是烤鸭&#xff1a; 《自律100天&#xff0c;穿越人生盲点》&#xff0c;读书笔记。 第一章 “自律100天”的华丽开启 第一节 “自律100天”的底层逻辑 习惯没办法用金钱换&#xff0c;只能用时间。 训练延迟满足(增强自控、培养耐心、减少短期诱惑…

递推数列

题目描述 给定a0,a1,以及anpa(n-1) qa(n-2)中的p,q。这里n > 2。 求第k个数对10000的模。 输入描述: 输入包括5个整数&#xff1a;a0、a1、p、q、k。 输出描述: 第k个数a(k)对10000的模。 分析 循环求出ak即可 #include <iostream>using namespace std;int main(){in…

[css] 请描述margin边界叠加是什么及解决方案

[css] 请描述margin边界叠加是什么及解决方案 1&#xff0c;使用padding代替&#xff0c;但是父盒子要减去相应的高度 2&#xff0c;使用boder&#xff08;透明&#xff09;代替&#xff08;不推荐&#xff0c;不符合书写规范&#xff0c;如果父盒子子盒子时有颜色的不好处理&…

从线上慢sql看explain关键字

大家好&#xff0c;我是烤鸭&#xff1a; 最近有点忙的头晕&#xff0c;又懒又累&#xff0c;正好线上遇到慢sql的问题&#xff0c;就说下 MySQL Explain 关键字的解析和使用示例。 explain 关键字说明 使用explain关键字可以模拟优化器执行sql查询语句&#xff0c;从而得…

[css] style标签写在body前和body后的区别是什么?

[css] style标签写在body前和body后的区别是什么&#xff1f; 渲染机制的区别&#xff0c;在body前是已经把样式浏览一遍&#xff0c;到了对应标签直接&#xff0c;渲染样式。显示块。 在body后&#xff0c;是浏览器已经把标签浏览了&#xff0c;但基于没有样式&#xff0c;显…

自然语言处理的一些链接

Word2Vec Tutorial - The Skip-Gram ModelVisualizing A Neural Machine Translation Model (Mechanics of Seq2seq Models With Attention) 转载于:https://www.cnblogs.com/linyihai/p/10200351.html

《Java并发编程实践-第一部分》-读书笔记

大家好&#xff0c;我是烤鸭&#xff1a; 《Java并发编程实战-第一部分》-读书笔记。 第一章&#xff1a;介绍 1.1 并发历史&#xff1a; 多个程序在各自的进程中执行&#xff0c;由系统分配资源&#xff0c;如&#xff1a;内存、文件句柄、安全证书。进程间通信方式&#x…

[css] 说说你对css盒子模型的理解

[css] 说说你对css盒子模型的理解 css盒模型由两个盒子组成&#xff0c;外在的控制是否换行的盒子&#xff0c;以及内在的控制元素内容的盒子。比如&#xff1a;display: inline-block, 则它的外在的盒子就是inline也就是不占据一行&#xff0c;而block则表示内部的元素具有块状…

go语言基础之格式化输出

1、fmt包的格式化输出输入 格式说明 格式 含义 %% 一个%字面量 %b 一个二进制整数值(基数为2)&#xff0c;或者是一个(高级的)用科学计数法表示的指数为2的浮点数 %c 字符型。可以把输入的数字按照ASCII码相应转换为对应的字符 %d 一个十进制数值(基数为10) %e 以科…

2021 年终总结

2021 年终总结 大家好&#xff0c;我是烤鸭&#xff0c;这是一篇无关技术的记录&#xff0c;总结一下这一年干了什么。 年初的目标 减肥 炒股回本 PMP拿证 多赚钱 多学习新技术 坚持写博客(一周一篇) 多看书 没达成的目标 减肥这个事&#xff0c;拖了好久&#xff0c…

[css] ::before和:after中单冒号和双冒号的区别是什么,这两个伪元素有什么作用?

[css] ::before和:after中单冒号和双冒号的区别是什么&#xff0c;这两个伪元素有什么作用&#xff1f; 区别&#xff1a;伪元素在css1中已经存在当时用单冒号&#xff0c;css3时做了修订用双冒号 ::before ::after表示伪元素用来区别伪类。作用&#xff1a;在元素前面&#x…

[css] css常用的布局方式有哪些?

[css] css常用的布局方式有哪些&#xff1f; 1&#xff1a;圣杯布局 2&#xff1a;双飞翼 3&#xff1a;flex个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题

记一次线上cpu飙升100%的排查过程

大家好&#xff0c;我是烤鸭&#xff1a; 最近没怎么写技术文章&#xff0c;还是得回归下初心&#xff0c;正好前几天出现个线上问题&#xff0c;记录下排查过程。 问题描述 某个时间点&#xff0c;接收到接口响应慢报警。 过一会收到服务器cpu可用率低(<10%)报警。 去c…

[css] 对比下px、em、rem有什么不同?

[css] 对比下px、em、rem有什么不同&#xff1f; px是css中的逻辑像素&#xff0c;和移动端的物理像素之间会有一个比值dpr em是指相对于父元素的大小 rem中的r就是root&#xff0c;也就是相对于root元素的大小&#xff08;html标签&#xff09;个人简介 我是歌谣&#xff0c…

Node.js(爱前端) 一

一 Node.js 简介 1.1 官网 https://nodejs.org/en/ 官网介绍&#xff1a; Node.js是一个构建在 Chrome 浏览器V8引擎上的 JavaScript 运行环境。 Node.js 使用了事件驱动、非阻塞I/O模型&#xff0c;这些都使它轻量、好用。 Node.js 的包生态&#xff08;npm&#xff09;&#…