rocketmq 初探(三)

大家好,我是烤鸭:

    上一篇介绍了注册中心,这一篇看下broker。基于 rocketmq 4.9 版本。

BrokerStartup#BrokerController

按照代码的先后顺序撸源码:

BrokerController.createBrokerController

public static BrokerController createBrokerController(String[] args) {// ...try {// ...final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();// salve节点的messageMmeory 比例由40% 降低至 30%if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}// ...switch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:// 0 是masterbrokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE:if (brokerConfig.getBrokerId() <= 0) {System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}// DLeger模式,brokerId 为 -1if (messageStoreConfig.isEnableDLegerCommitLog()) {brokerConfig.setBrokerId(-1);}// ...final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);// 初始化boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 注册 shutdown 事件Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown = false;private AtomicInteger shutdownTimes = new AtomicInteger(0);@Overridepublic void run() {synchronized (this) {log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown = true;long beginTime = System.currentTimeMillis();controller.shutdown();long consumingTimeTotal = System.currentTimeMillis() - beginTime;log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}}, "ShutdownHook"));return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}

BrokerController.initialize()

public boolean initialize() throws CloneNotSupportedException {// 加载本地配置文件(topic,consumerOffset等信息),加载不到加载 .bak文件boolean result = this.topicConfigManager.load();result = result && this.consumerOffsetManager.load();result = result && this.subscriptionGroupManager.load();result = result && this.consumerFilterManager.load();// 初始化 messageStore(持久化)if (result) {try {this.messageStore =new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}// commitlog、consumer和topic关系、索引恢复(临时文件存在的话)result = result && this.messageStore.load();if (result) {this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);// netty 配置NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));// 不同的线程池,注册到对应的processorthis.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));// ...// 线程池注册到processor,后续仔细说下this.registerProcessor();final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();final long period = 1000 * 60 * 60 * 24;// 延迟1天,每天记录昨天存取的消息数量this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);// 每隔5s检查是否更新consumer和offset数据this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);// 每隔10s检查是否更新consumer过滤规则数据this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);// 每隔3分钟检查,开启consumer消费过慢后移除该consumer(默认关闭)this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);// 每秒打印send\pull\query\transaction队列大小和的最慢的消费耗时this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);// 主broker同步从broker的时候重试this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);// 没配置注册中心的话,会每隔2分钟去拉取(url为默认读取系统变量 rocketmq.namesrv.domain:8080/rocketmq)if (this.brokerConfig.getNamesrvAddr() != null) {this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}// DLeger模式,从节点不需要定期更新 HA主节点地址if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}} else {// 每分钟打印主节点和从节点的offset的不同this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}// tls模式需要证书建立sslif (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();((NettyRemotingServer) fastRemotingServer).loadSslContext();}});} catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");}}// 事务初始化initialTransaction();// acl鉴权初始化initialAcl();// rpc钩子初始化(acl和匿名的)initialRpcHooks();}return result;
}

BrokerStartup.start()

public void start() throws Exception {// 消息存储,包含 commitLog 和 集群模式下消息同步if (this.messageStore != null) {this.messageStore.start();}// nettyServer 的初始化,初探(二)有详细的if (this.remotingServer != null) {this.remotingServer.start();}// 同上if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}// 每500ms监测 tls的3个证书,如果变了就重新加载(tls.server.certPath...)if (this.fileWatchService != null) {this.fileWatchService.start();}// nettyRemotingClient.satrt,一会单独看下if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}// 需要hold的拉取请求统一处理,每隔5s或1s检测消息是否到达if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}// 每10s扫描 producer、consumer、broker 的在线情况	if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}// 过滤服务器,在broker机器启动多个filter进程用来进程consumer消息过滤if (this.filterServerManager != null) {this.filterServerManager.start();}// 启用DLedger,slave 每隔10s更新配置,broker注册到nameserverif (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}// 随机 30-60s,定时 broker注册到nameserverthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}// 快速失败,如果是刷盘过慢,返回 system busy,清理队列(发送、拉取、心跳、请求)if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}

小结

这篇主要是介绍了broker的init和start。

DLegerCommit开启模式(替代原有的commitLog,可以直接读取CommitLog的API)

初始化:

  • 消费持久化 messageStore

  • commitlog、consumer和topic关系、索引恢复(临时文件存在的话)

  • netty配置(remotingServer 和 fastRemotingServer)

  • processor(PullMessage、SendMessage 等)

  • 定时器(记录消费总量、更新consumer和offset数据 等)

  • acl鉴权、事务、rpchook

启动:

  • 消息持久化 messageStore
  • netty server启动(remotingServer 和 fastRemotingServer)
  • tls模式检测证书变化
  • pullRequestHold模式下,启动监听消息
  • 每10s扫描 producer、consumer、broker 的在线情况
  • 启用DLedger,slave 每隔10s更新配置,broker注册到nameserver
  • 随机 30-60s,定时 broker注册到nameserver
  • 快速失败,如果是刷盘过慢,返回 system busy,清理队列(发送、拉取、心跳、请求)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/412515.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WIN10远程连接时提示内部错误

微软官方的解决方案是重置远程连接设置&#xff0c;步骤如下&#xff1a; 1、以管理员身份运行命令提示符 2、输入以下命令&#xff1a; netsh winsoc reset 随后会提示重启电脑&#xff0c;遂解决。 3、重启后还不行的话&#xff0c;再试试删除掉远程连接保存的凭据&#xff0…

[css] css的属性content有什么作用呢?有哪些场景可以用到?

[css] css的属性content有什么作用呢&#xff1f;有哪些场景可以用到&#xff1f; content属性与 ::before 及 ::after 伪元素配合使用生成文本内容通过attr()将选择器对象的属性作为字符串进行显示&#xff0c;如&#xff1a;a::after{content: attr(href)} <a href"h…

rocketmq 初探(四)

大家好&#xff0c;我是烤鸭&#xff1a; 上一篇简单介绍broker的初始化&#xff0c;这一篇介绍 NettyRequestProcessor 的实现(主要是broker里用到的)。 AdminBrokerProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor NettyRequestProce…

iOS 去除警告 看我就够了

你是不是看着开发过程中出现的一堆的警告会心情一阵烦躁&#xff0c;别烦躁了&#xff0c;看完此文章&#xff0c;消除警告的小尾巴。 一、SVN 操作导致的警告 1.svn删除文件后报错 ”xx“is missing from working copy 使用命令sudo find 工程项目路径 -name ".svn"…

[css] 什么是FOUC?你是如何避免FOUC的?

[css] 什么是FOUC&#xff1f;你是如何避免FOUC的&#xff1f; FOUC 即 Flash of Unstyled Content&#xff0c;是指页面一开始以样式 A&#xff08;或无样式&#xff09;的渲染&#xff0c;突然变成样式B。 原因是样式表的晚于 HTML 加载导致页面重新进行绘制。通过 import 方…

rocketmq 初探(五)

大家好&#xff0c;我是烤鸭&#xff1a; 上一篇简单介绍部分 NettyRequestProcessor (AdminBrokerProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor)&#xff0c;这一篇介绍其他的。 PullMessageProcessor、QueryMessageProcessor、Repl…

Python 装饰器初探

Python 装饰器初探 在谈及Python的时候&#xff0c;装饰器一直就是道绕不过去的坎。面试的时候&#xff0c;也经常会被问及装饰器的相关知识。总感觉自己的理解很浅显&#xff0c;不够深刻。是时候做出改变&#xff0c;对Python的装饰器做个全面的了解了。 1. 函数装饰器 直接上…

[css] 解释下 CSS sprites的原理和优缺点分别是什么

[css] 解释下 CSS sprites的原理和优缺点分别是什么 我来说下我的观点 原理&#xff1a; 多张图合并成一张图优点&解决的问题hover效果&#xff0c;如果是多个图片&#xff0c;网络正常的情况下首次会闪烁一下。如果是断网情况下&#xff0c;就没图片了。sprites 就很好的…

《自律100天,穿越人生盲点》读书笔记

大家好&#xff0c;我是烤鸭&#xff1a; 《自律100天&#xff0c;穿越人生盲点》&#xff0c;读书笔记。 第一章 “自律100天”的华丽开启 第一节 “自律100天”的底层逻辑 习惯没办法用金钱换&#xff0c;只能用时间。 训练延迟满足(增强自控、培养耐心、减少短期诱惑…

递推数列

题目描述 给定a0,a1,以及anpa(n-1) qa(n-2)中的p,q。这里n > 2。 求第k个数对10000的模。 输入描述: 输入包括5个整数&#xff1a;a0、a1、p、q、k。 输出描述: 第k个数a(k)对10000的模。 分析 循环求出ak即可 #include <iostream>using namespace std;int main(){in…

[css] 请描述margin边界叠加是什么及解决方案

[css] 请描述margin边界叠加是什么及解决方案 1&#xff0c;使用padding代替&#xff0c;但是父盒子要减去相应的高度 2&#xff0c;使用boder&#xff08;透明&#xff09;代替&#xff08;不推荐&#xff0c;不符合书写规范&#xff0c;如果父盒子子盒子时有颜色的不好处理&…

从线上慢sql看explain关键字

大家好&#xff0c;我是烤鸭&#xff1a; 最近有点忙的头晕&#xff0c;又懒又累&#xff0c;正好线上遇到慢sql的问题&#xff0c;就说下 MySQL Explain 关键字的解析和使用示例。 explain 关键字说明 使用explain关键字可以模拟优化器执行sql查询语句&#xff0c;从而得…

[css] style标签写在body前和body后的区别是什么?

[css] style标签写在body前和body后的区别是什么&#xff1f; 渲染机制的区别&#xff0c;在body前是已经把样式浏览一遍&#xff0c;到了对应标签直接&#xff0c;渲染样式。显示块。 在body后&#xff0c;是浏览器已经把标签浏览了&#xff0c;但基于没有样式&#xff0c;显…

自然语言处理的一些链接

Word2Vec Tutorial - The Skip-Gram ModelVisualizing A Neural Machine Translation Model (Mechanics of Seq2seq Models With Attention) 转载于:https://www.cnblogs.com/linyihai/p/10200351.html

《Java并发编程实践-第一部分》-读书笔记

大家好&#xff0c;我是烤鸭&#xff1a; 《Java并发编程实战-第一部分》-读书笔记。 第一章&#xff1a;介绍 1.1 并发历史&#xff1a; 多个程序在各自的进程中执行&#xff0c;由系统分配资源&#xff0c;如&#xff1a;内存、文件句柄、安全证书。进程间通信方式&#x…

[css] 说说你对css盒子模型的理解

[css] 说说你对css盒子模型的理解 css盒模型由两个盒子组成&#xff0c;外在的控制是否换行的盒子&#xff0c;以及内在的控制元素内容的盒子。比如&#xff1a;display: inline-block, 则它的外在的盒子就是inline也就是不占据一行&#xff0c;而block则表示内部的元素具有块状…

go语言基础之格式化输出

1、fmt包的格式化输出输入 格式说明 格式 含义 %% 一个%字面量 %b 一个二进制整数值(基数为2)&#xff0c;或者是一个(高级的)用科学计数法表示的指数为2的浮点数 %c 字符型。可以把输入的数字按照ASCII码相应转换为对应的字符 %d 一个十进制数值(基数为10) %e 以科…

2021 年终总结

2021 年终总结 大家好&#xff0c;我是烤鸭&#xff0c;这是一篇无关技术的记录&#xff0c;总结一下这一年干了什么。 年初的目标 减肥 炒股回本 PMP拿证 多赚钱 多学习新技术 坚持写博客(一周一篇) 多看书 没达成的目标 减肥这个事&#xff0c;拖了好久&#xff0c…

[css] ::before和:after中单冒号和双冒号的区别是什么,这两个伪元素有什么作用?

[css] ::before和:after中单冒号和双冒号的区别是什么&#xff0c;这两个伪元素有什么作用&#xff1f; 区别&#xff1a;伪元素在css1中已经存在当时用单冒号&#xff0c;css3时做了修订用双冒号 ::before ::after表示伪元素用来区别伪类。作用&#xff1a;在元素前面&#x…

[css] css常用的布局方式有哪些?

[css] css常用的布局方式有哪些&#xff1f; 1&#xff1a;圣杯布局 2&#xff1a;双飞翼 3&#xff1a;flex个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题