rocketmq 初探(二)

大家好,我是烤鸭:

    上一篇简单介绍和rocketmq,这一篇看下源码之注册中心。

namesrv

先看两个初始化方法
NamesrvController.initialize() 和 NettyRemotingServer.start();

public boolean initialize() {// 加载配置文件this.kvConfigManager.load();// 创建 NettyRemotingServer 并初始化参数this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));// 将刚才的线程池和netty server 绑定this.registerProcessor();// 每隔10s检测最近120s不活跃的broker并移除this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);// 每隔10分钟输出一下配置this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);// 如果想用tls,ssl协议的话,需要证书构造 sslContextif (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();}});} catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");}}return true;
}

NettyRemotingServer.start()

public void start() {// 用刚才初始化的线程池创建线程this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 构建netty 相关的handler,包含连接、读数据、解码、请求和响应处理prepareSharableHandlers();// 创建netty server,使用初始化的参数和刚才的handler初始化channelServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// 启动channel的监听器,针对channel的连接、关闭、异常、空闲(后面其他的实现都是关闭逻辑)if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 每秒处理过时的响应,如果超时时间+1秒没响应,就移除该请求并手动回调(由于注册中心没有对外发请求,所以没用到,client和server用到了)this.timer.scheduleAtFixedRate(new TimerTask() {从@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}

再看下 NettyClientHandler,对请求和响应指令进行处理

/*** Entry of incoming command processing.** <p>* <strong>Note:</strong>* The incoming remoting command may be* <ul>* <li>An inquiry request from a remote peer component;</li>* <li>A response to a previous request issued by this very participant.</li>* </ul>* </p>** @param ctx Channel handler context.* @param msg incoming remoting command.* @throws Exception if there were any error while processing the incoming command.*/
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd = msg;if (cmd != null) {switch (cmd.getType()) {case REQUEST_COMMAND:// 接收请求并处理processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:// 接收响应,维护responseTable(注册中心用不到)processResponseCommand(ctx, cmd);break;default:break;}}
}

由于注册中心没有发起 request,看下 processRequestCommand(接收request)

/*** Process incoming request command issued by remote peer.** @param ctx channel handler context.* @param cmd request command.*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {// request的code在 RequestCode 类维护,包括 发送、拉取等等final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;// 自增计数final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {// ACL鉴权 (client端和broker使用)doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);final RemotingResponseCallback callback = new RemotingResponseCallback() {@Overridepublic void callback(RemotingCommand response) {doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();try {ctx.writeAndFlush(response);} catch (Throwable e) {log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {}}}};// 异步 or 同步if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {NettyRequestProcessor processor = pair.getObject1();// 比较重要的地方,单独分析RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {log.error("process request exception", e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};// 系统繁忙,注册中心不会提示这个(broker 刷盘不及时会报这个)if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {// 避免日志打印的太多if ((System.currentTimeMillis() % 10000) == 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())+ ", too many requests and system thread pool busy, RejectedExecutionException "+ pair.getObject2().toString()+ " request code: " + cmd.getCode());}// 不是单向请求(onewayRPC,线程池满的话,直接返回系统繁忙)if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";final RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}
}

我们先看一下 NettyRequestProcessor.processRequest 实现

在这里插入图片描述

DefaultRequestProcessor.processRequest

其实看名字就能看出来 注册中心的操作了

public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {if (ctx != null) {log.debug("receive request, {} {} {}",request.getCode(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),request);}switch (request.getCode()) {case RequestCode.PUT_KV_CONFIG:// admin调用,配置添加到 configTable,定期打印return this.putKVConfig(ctx, request);case RequestCode.GET_KV_CONFIG:// admin调用,获取配置return this.getKVConfig(ctx, request);case RequestCode.DELETE_KV_CONFIG:// admin调用,删除配置return this.deleteKVConfig(ctx, request);case RequestCode.QUERY_DATA_VERSION:// broker 获取topic配置return queryBrokerTopicConfig(ctx, request);case RequestCode.REGISTER_BROKER:// 注册broker,版本不同处理逻辑有些不一样(topic配置信息封装不同)Version brokerVersion = MQVersion.value2Version(request.getVersion());if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {return this.registerBrokerWithFilterServer(ctx, request);} else {return this.registerBroker(ctx, request);}case RequestCode.UNREGISTER_BROKER:// 下线 brokerreturn this.unregisterBroker(ctx, request);case RequestCode.GET_ROUTEINFO_BY_TOPIC:// 根据topic获取路由信息,获取的key是 ORDER_TOPIC_CONFIG+topicidreturn this.getRouteInfoByTopic(ctx, request);case RequestCode.GET_BROKER_CLUSTER_INFO:// 获取broker 集群信息return this.getBrokerClusterInfo(ctx, request);case RequestCode.WIPE_WRITE_PERM_OF_BROKER:// 废除broker的写入权限return this.wipeWritePermOfBroker(ctx, request);case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:// 获取所有的topicreturn getAllTopicListFromNameserver(ctx, request);case RequestCode.DELETE_TOPIC_IN_NAMESRV:// 删除topicreturn deleteTopicInNamesrv(ctx, request);case RequestCode.GET_KVLIST_BY_NAMESPACE:// 根据namespace获取配置return this.getKVListByNamespace(ctx, request);case RequestCode.GET_TOPICS_BY_CLUSTER:// 根据cluster下的broker获取topicreturn this.getTopicsByCluster(ctx, request);case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:// 获取cluster、broker和关联信息return this.getSystemTopicListFromNs(ctx, request);case RequestCode.GET_UNIT_TOPIC_LIST:// 设置unit_mode true && 非重试的时候,这个配置好像没用啊(https://github.com/apache/rocketmq/issues/639)return this.getUnitTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:// 设置unit_mode true(校验消息和心跳的时候),获取topicreturn this.getHasUnitSubTopicList(ctx, request);case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:// !getUnitTopicList && getHasUnitSubTopicListreturn this.getHasUnitSubUnUnitTopicList(ctx, request);case RequestCode.UPDATE_NAMESRV_CONFIG:// 更新注册中心配置return this.updateConfig(ctx, request);case RequestCode.GET_NAMESRV_CONFIG:// 获取注册中心配置return this.getConfig(ctx, request);default:break;}return null;
}

小结

注册中心的作用:

存了 cluster、broker、topic的信息。

提供了一些接口,可以broker注册和下线,修改配置等。

检测和维护broker是否活跃。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/412519.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2 JVM 运行机制

转载于:https://www.cnblogs.com/likevin/p/10186591.html

WIN10远程连接时提示内部错误

微软官方的解决方案是重置远程连接设置&#xff0c;步骤如下&#xff1a; 1、以管理员身份运行命令提示符 2、输入以下命令&#xff1a; netsh winsoc reset 随后会提示重启电脑&#xff0c;遂解决。 3、重启后还不行的话&#xff0c;再试试删除掉远程连接保存的凭据&#xff0…

rocketmq 初探(四)

大家好&#xff0c;我是烤鸭&#xff1a; 上一篇简单介绍broker的初始化&#xff0c;这一篇介绍 NettyRequestProcessor 的实现(主要是broker里用到的)。 AdminBrokerProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor NettyRequestProce…

《自律100天,穿越人生盲点》读书笔记

大家好&#xff0c;我是烤鸭&#xff1a; 《自律100天&#xff0c;穿越人生盲点》&#xff0c;读书笔记。 第一章 “自律100天”的华丽开启 第一节 “自律100天”的底层逻辑 习惯没办法用金钱换&#xff0c;只能用时间。 训练延迟满足(增强自控、培养耐心、减少短期诱惑…

2021 年终总结

2021 年终总结 大家好&#xff0c;我是烤鸭&#xff0c;这是一篇无关技术的记录&#xff0c;总结一下这一年干了什么。 年初的目标 减肥 炒股回本 PMP拿证 多赚钱 多学习新技术 坚持写博客(一周一篇) 多看书 没达成的目标 减肥这个事&#xff0c;拖了好久&#xff0c…

记一次线上cpu飙升100%的排查过程

大家好&#xff0c;我是烤鸭&#xff1a; 最近没怎么写技术文章&#xff0c;还是得回归下初心&#xff0c;正好前几天出现个线上问题&#xff0c;记录下排查过程。 问题描述 某个时间点&#xff0c;接收到接口响应慢报警。 过一会收到服务器cpu可用率低(<10%)报警。 去c…

Node.js(爱前端) 一

一 Node.js 简介 1.1 官网 https://nodejs.org/en/ 官网介绍&#xff1a; Node.js是一个构建在 Chrome 浏览器V8引擎上的 JavaScript 运行环境。 Node.js 使用了事件驱动、非阻塞I/O模型&#xff0c;这些都使它轻量、好用。 Node.js 的包生态&#xff08;npm&#xff09;&#…

记一次线上服务假死排查过程

大家好&#xff0c;我是烤鸭&#xff1a; 最近线上问题有点多啊&#xff0c;分享一个服务假死的排查过程。 问题描述 9点10分&#xff0c;收到进程无响应报警(一共6台机器&#xff0c;有1台出现)&#xff0c;后来又有1台出现。 排查思路 首先确认是否误报或者网络抖动&…

vue小记

1.vue绑定属性&#xff0c;点击事件 1.<!-- 完整语法 --> <a v-bind:href"url">...</a><!-- 缩写 --> <a :href"url">...</a>2.<!-- 完整语法 --> <a v-on:click"doSomething">...</a>&l…

nacos注册中心自动上下负载

大家好&#xff0c;我是烤鸭&#xff1a; 还有2天就过年了&#xff0c;祝大家新年快乐。最近好久没写技术文章了&#xff0c;还是得回归下主业&#xff0c;今天分享下nacos注册中心自动上下负载的方式和组件。 组件版本 <properties><java.version>1.8</java.v…

windows10 C盘清理

大家好&#xff0c;我是烤鸭&#xff1a; 身为一个号称修电脑的&#xff0c;磁盘清理是必备技能了。前几天刚出的新闻 男子帮女友清理电脑C盘&#xff0c;扫出17万个文件。 想必大家都经历过清理C盘的痛苦&#xff0c;这两天正好又清了&#xff0c;分享下。 先给个结论&#…

《实现领域驱动设计》读书笔记

大家好&#xff0c;我是烤鸭&#xff1a; 《实现领域驱动设计》&#xff0c;读书笔记&#xff0c;贴个封面&#xff0c;要不不知道是哪本。 了解概念 刚开始接触DDD&#xff0c;肯定懵逼&#xff0c;很多名词&#xff0c;一点点看下。 领域&#xff1a;带有业务属性的范…

spring junit单元测试

项目是有很多个功能块组成的&#xff0c;我们开发的时候&#xff0c;当我们开发出来一个功能&#xff0c;想要测试这个功能是否正确&#xff0c;不可能等到前端和后端全部写好了再进行测试&#xff0c;这样太浪费时间&#xff0c;有没有什么方法能直接测试后台的功能写的是否正…

windows docker redis

大家好&#xff0c;我是烤鸭&#xff1a; docker真的太方便了&#xff0c;尤其是对windows系统&#xff0c;友好的不得了。以前还只能是正版的专业版才能用&#xff0c;现在已经没有限制了&#xff0c;虽然加了收费&#xff0c;个人用免费就够了。redis 新版也不支持windows系统…

[css] CSS3新增伪类有哪些并简要描述

[css] CSS3新增伪类有哪些并简要描述 个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题

模拟微信自动化发送(微信公众号文章自动点击)

大家好&#xff0c;我是烤鸭&#xff1a; 分享个微信自动化发送的新方式&#xff0c;仅技术分享。 本来是公众号文章抓取相关的&#xff0c;审核一直不过&#xff0c;将就看吧。 需要的工具 Java&#xff08;jdk1.8&#xff09; Fiddler Python&#xff08;3.8&#xff09;…

Entity FrameWork 操作使用详情

Entity FrameWork 是以ADO.net为基础发展的ORM解决方案。 一、安装Entity FrameWork框架 二、添加ADO.Net实体数据模型 三、EF插入数据 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace EFDem…

网络通信中TCP出现的黏包以及解决方法 socket 模拟黏包

粘包问题概述 1.1 描述背景 采用TCP协议进行网络数据传送的软件设计中&#xff0c;普遍存在粘包问题。这主要是由于现代操作系统的网络传输机制所产生的。我们知道&#xff0c;网络通信采用的套接字(socket)技术&#xff0c;其实现实际是由系统内核提供一片连续缓存(流缓冲)来…

windows docker redis 集群部署

大家好&#xff0c;我是烤鸭&#xff1a; 上次分享了windows docker redis&#xff0c;这么快就不够用了&#xff0c;单机的不行&#xff0c;整个集群的&#xff0c;看了网上的教程都好麻烦&#xff0c;简单点。 单机的&#xff1a;https://blog.csdn.net/Angry_Mills/article…

Codeforces Round #530 Div. 1 自闭记

A&#xff1a;显然应该让未确定的大小尽量大。不知道写了啥就wa了一发。 #include<iostream> #include<cstdio> #include<cmath> #include<cstdlib> #include<cstring> #include<algorithm> using namespace std; #define ll long long #…