rocketmq 初探(四)

大家好,我是烤鸭:

    上一篇简单介绍broker的初始化,这一篇介绍 NettyRequestProcessor 的实现(主要是broker里用到的)。

AdminBrokerProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor

NettyRequestProcessor

/*** Common remoting command processor*/
public interface NettyRequestProcessor {RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws Exception;boolean rejectRequest();}

先看下哪些是broker里用到的,从包名就能看出来(remoting包下的后边看),接下来一个一个分析。

在这里插入图片描述

AdminBrokerProcessor (后台发起的 CURD)

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {switch (request.getCode()) {case RequestCode.UPDATE_AND_CREATE_TOPIC:return this.updateAndCreateTopic(ctx, request);//...default:break;}return null;
}

ClientManageProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {case RequestCode.HEART_BEAT:// 心跳,针对consumer,如果心跳的信息有更新(group和subscribe),会updatereturn this.heartBeat(ctx, request);case RequestCode.UNREGISTER_CLIENT:// 下线,针对 producer和consumer, producer和consumer 下线return this.unregisterClient(ctx, request);case RequestCode.CHECK_CLIENT_CONFIG:// 通过注册的filter校验client的configreturn this.checkClientConfig(ctx, request);default:break;}return null;
}

ConsumerManageProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {case RequestCode.GET_CONSUMER_LIST_BY_GROUP:// 根据group获取customer的listreturn this.getConsumerListByGroup(ctx, request);case RequestCode.UPDATE_CONSUMER_OFFSET:// 更新topic@group对应的queueId和offset(没有就直接put,已存在的话如果要更新的offset<本地offset,记录日志)return this.updateConsumerOffset(ctx, request);case RequestCode.QUERY_CONSUMER_OFFSET:// 根据topic@group、queueId获取offset(如果offset<0,并且在磁盘没有刷到内存,返回0,否则返回 QUERY_NOT_FOUND)return this.queryConsumerOffset(ctx, request);default:break;}return null;
}

EndTransactionProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final EndTransactionRequestHeader requestHeader =(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);LOGGER.debug("Transaction request:{}", requestHeader);// salve 不支持事务if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");return response;}// 来源于事务检查(ClientRemotingProcessor.processRequest)if (requestHeader.getFromTransactionCheck()) {switch (requestHeader.getCommitOrRollback()) {// 非事务类型,直接返回case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, but it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}// commit, break 执行执行case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}} else {switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}}OperationResult result = new OperationResult();// 提交if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {// 根据offset从commitlog中获取当前的 halfmessage(半消息)result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {// 校验半消息的合法性,group、事务队列的offset、commitlog的offset是否一致RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// 根据半消息构建一条事务消息MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);// 发送最终消息,消息刷到commitlogRemotingCommand sendResult = sendFinalMessage(msgInner);if (sendResult.getCode() == ResponseCode.SUCCESS) {// 最终消息发送成功,在commitlog里原msg打一个'd'的标记this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}// 回滚} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {// 流程同提交差不多,半小时获取成功之后,在commitlog里原msg打一个'd'的标记result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}response.setCode(result.getResponseCode());response.setRemark(result.getResponseRemark());return response;
}

ForwardRequestProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {return null;
}

小结

介绍了4个 processor:

AdminBrokerProcessor:给console台提供的 curd接口

ClientManageProcessor:针对client端的请求,包含 心跳(上线)、下线、配置检查

ConsumerManageProcessor:针对consumer的,包含 获取consumer列表、更新queueId和offset、获取offset

EndTransactionProcessor:提交模式,从commitlog根据offset获取半消息,写入commitlog,原半消息打一个’d’的标记。

回滚模式,从commitlog根据offset获取半消息,原半消息打一个’d’的标记。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/412512.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《自律100天,穿越人生盲点》读书笔记

大家好&#xff0c;我是烤鸭&#xff1a; 《自律100天&#xff0c;穿越人生盲点》&#xff0c;读书笔记。 第一章 “自律100天”的华丽开启 第一节 “自律100天”的底层逻辑 习惯没办法用金钱换&#xff0c;只能用时间。 训练延迟满足(增强自控、培养耐心、减少短期诱惑…

2021 年终总结

2021 年终总结 大家好&#xff0c;我是烤鸭&#xff0c;这是一篇无关技术的记录&#xff0c;总结一下这一年干了什么。 年初的目标 减肥 炒股回本 PMP拿证 多赚钱 多学习新技术 坚持写博客(一周一篇) 多看书 没达成的目标 减肥这个事&#xff0c;拖了好久&#xff0c…

记一次线上cpu飙升100%的排查过程

大家好&#xff0c;我是烤鸭&#xff1a; 最近没怎么写技术文章&#xff0c;还是得回归下初心&#xff0c;正好前几天出现个线上问题&#xff0c;记录下排查过程。 问题描述 某个时间点&#xff0c;接收到接口响应慢报警。 过一会收到服务器cpu可用率低(<10%)报警。 去c…

Node.js(爱前端) 一

一 Node.js 简介 1.1 官网 https://nodejs.org/en/ 官网介绍&#xff1a; Node.js是一个构建在 Chrome 浏览器V8引擎上的 JavaScript 运行环境。 Node.js 使用了事件驱动、非阻塞I/O模型&#xff0c;这些都使它轻量、好用。 Node.js 的包生态&#xff08;npm&#xff09;&#…

记一次线上服务假死排查过程

大家好&#xff0c;我是烤鸭&#xff1a; 最近线上问题有点多啊&#xff0c;分享一个服务假死的排查过程。 问题描述 9点10分&#xff0c;收到进程无响应报警(一共6台机器&#xff0c;有1台出现)&#xff0c;后来又有1台出现。 排查思路 首先确认是否误报或者网络抖动&…

vue小记

1.vue绑定属性&#xff0c;点击事件 1.<!-- 完整语法 --> <a v-bind:href"url">...</a><!-- 缩写 --> <a :href"url">...</a>2.<!-- 完整语法 --> <a v-on:click"doSomething">...</a>&l…

nacos注册中心自动上下负载

大家好&#xff0c;我是烤鸭&#xff1a; 还有2天就过年了&#xff0c;祝大家新年快乐。最近好久没写技术文章了&#xff0c;还是得回归下主业&#xff0c;今天分享下nacos注册中心自动上下负载的方式和组件。 组件版本 <properties><java.version>1.8</java.v…

windows10 C盘清理

大家好&#xff0c;我是烤鸭&#xff1a; 身为一个号称修电脑的&#xff0c;磁盘清理是必备技能了。前几天刚出的新闻 男子帮女友清理电脑C盘&#xff0c;扫出17万个文件。 想必大家都经历过清理C盘的痛苦&#xff0c;这两天正好又清了&#xff0c;分享下。 先给个结论&#…

《实现领域驱动设计》读书笔记

大家好&#xff0c;我是烤鸭&#xff1a; 《实现领域驱动设计》&#xff0c;读书笔记&#xff0c;贴个封面&#xff0c;要不不知道是哪本。 了解概念 刚开始接触DDD&#xff0c;肯定懵逼&#xff0c;很多名词&#xff0c;一点点看下。 领域&#xff1a;带有业务属性的范…

spring junit单元测试

项目是有很多个功能块组成的&#xff0c;我们开发的时候&#xff0c;当我们开发出来一个功能&#xff0c;想要测试这个功能是否正确&#xff0c;不可能等到前端和后端全部写好了再进行测试&#xff0c;这样太浪费时间&#xff0c;有没有什么方法能直接测试后台的功能写的是否正…

windows docker redis

大家好&#xff0c;我是烤鸭&#xff1a; docker真的太方便了&#xff0c;尤其是对windows系统&#xff0c;友好的不得了。以前还只能是正版的专业版才能用&#xff0c;现在已经没有限制了&#xff0c;虽然加了收费&#xff0c;个人用免费就够了。redis 新版也不支持windows系统…

[css] CSS3新增伪类有哪些并简要描述

[css] CSS3新增伪类有哪些并简要描述 个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题

模拟微信自动化发送(微信公众号文章自动点击)

大家好&#xff0c;我是烤鸭&#xff1a; 分享个微信自动化发送的新方式&#xff0c;仅技术分享。 本来是公众号文章抓取相关的&#xff0c;审核一直不过&#xff0c;将就看吧。 需要的工具 Java&#xff08;jdk1.8&#xff09; Fiddler Python&#xff08;3.8&#xff09;…

Entity FrameWork 操作使用详情

Entity FrameWork 是以ADO.net为基础发展的ORM解决方案。 一、安装Entity FrameWork框架 二、添加ADO.Net实体数据模型 三、EF插入数据 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace EFDem…

网络通信中TCP出现的黏包以及解决方法 socket 模拟黏包

粘包问题概述 1.1 描述背景 采用TCP协议进行网络数据传送的软件设计中&#xff0c;普遍存在粘包问题。这主要是由于现代操作系统的网络传输机制所产生的。我们知道&#xff0c;网络通信采用的套接字(socket)技术&#xff0c;其实现实际是由系统内核提供一片连续缓存(流缓冲)来…

windows docker redis 集群部署

大家好&#xff0c;我是烤鸭&#xff1a; 上次分享了windows docker redis&#xff0c;这么快就不够用了&#xff0c;单机的不行&#xff0c;整个集群的&#xff0c;看了网上的教程都好麻烦&#xff0c;简单点。 单机的&#xff1a;https://blog.csdn.net/Angry_Mills/article…

Codeforces Round #530 Div. 1 自闭记

A&#xff1a;显然应该让未确定的大小尽量大。不知道写了啥就wa了一发。 #include<iostream> #include<cstdio> #include<cmath> #include<cstdlib> #include<cstring> #include<algorithm> using namespace std; #define ll long long #…

自研redis sdk支持自动dns切换(附源码)

大家好&#xff0c;我是烤鸭&#xff1a; 标题起的有点大了&#xff0c;说是自研&#xff0c;其实就是个封装&#xff0c;不过倒是解决了dns切换的问题&#xff08;虽然不太优雅&#xff09;。 背景 之前做活动的时候&#xff0c;用域名链接的redis&#xff0c;当时做了主备集…