这是broker源码系列第一篇。还是和往常一样,建议copy到本地阅读
broker是基于netty的
rocketmq队列分物理队列和逻辑队列,物理队列只有一个而逻辑队列有很多个
rocketmq 物理队列,一个物理队列对应一个文件,一个物理队列可以对应多个逻辑队列
rocketmq 静态队列文档:https://www.jgaonet.com/mindoc/docs/rocketmq/statictopic-RocketMQ_Static_Topic_Logic_Queue_%E8%AE%BE%E8%AE%A1.md
客户端使用静态队列即用的是逻辑队列,然后broker根据mapping通过逻辑队列的id来获取实际物理队列的
rocketmq5所谓的一致性:
rocketmq5所谓的一致性指的是本地事务和发送消息这个操作的一致性,即本地事务与发送消息这个操作要么都成功要么都失败,不能本地事务成功提交,但是消息发送却失败了即导致当前流程多处理一个对象a,而后面的流程会漏掉这个消息即不会对a进行处理,也不能本地事务失败但消息却提交成功,这样会导致本地少处理一个对象a而后面的流程多处理了这个对象a,这一切都源于消息一旦发送就不能撤回,所以解决办法就是确定本地事务提交成功后,本流程发出的消息才对下游可见。!!!这个事务消息不是指生产者发送一条消息,然后消费者消费成功并ack后才算事务成功,而是生产者一旦成功把消息提交到broker,事务流程就算完了。如果流程1处理一下,然后发一条消息给rocketmq,然后流程2处理完该消息,这样才算一个完整流程的话,那么就是分布式事务的事情了,也就是说这里的本地事务和发送消息的一致性可以简单看做是整个分布式事务中的一环。rocketmq的解决办法就是2pc,只不过这个2pc是生产者和broker之间的即1:先发送prepare消息给broker,broker返回ack后再执行本地事务,本地事务执行成功后再发送commited/rollback消息给broker,如果是commited则broker也提交该消息,提交成功后返回ack给生产者,此时事务结束,就是说一旦提交,消息就变成普通消息,就对下游可见,如果是rollback,则直接删除,这样下游也肯定看不到。一旦提交,消息就变成普通消息,就对下游可见。存在一些场景:如果本地事务提交成功但是还没有发送消息,此时就断了,那么就会导致本地成功但broker不知道是否成功,所以broker在事务超时后就会回查即主动询问生产者,因为此时网络断了,所以肯定无法询问原生产者,然后官方文档说的是回查同组任一其他生产者,也就是说问你问不到,那broker就问组内其他生产者,因为生产者b要能查生产者a的事务执行结果,所以这里就要求同组内的所有的生产者都必须能访问全局数据即同组任一生产者都能访问同组任意其他生产者的事务执行结果,除非同组所有生产者都访问不到,不过如果所有生产者都访问不到那肯定是网络出了问题,那肯定是生产大事故。因为可能一直回查不到生产者导致超过最大重试次数,那么broker就会把该消息丢到指定的xxx队列,和死信队列一样,要开发者新建一个消费者去处理这个xxx队列下的所有消息
原理明白了,那么broker的处理流程不用看就大致知道是个怎么回事了,这个事务消息的实现肯定是要client sdk配合的
!!!rocketMQ读写队列
1:(本点摘抄于网络文章)读写队列,则是在做路由信息时使用。在消息发送时,使用写队列个数返回路由信息,而消息消费时按照读队列个数返回路由信息。在物理文件层面,只有写队列才会创建文件。举个例子:写队列个数是8,设置的读队列个数是4.这个时候,会创建8个文件夹,代表0 1 2 3 4 5 6 7,但在消息消费时,路由信息只返回4,在具体拉取消息时,就只会消费0 1 2 3这4个队列中的消息,4 5 6 7中的信息压根就不会被消费。反过来,如果写队列个数是4,读队列个数是8,在生产消息时只会往0 1 2 3中生产消息,消费消息时则会从0 1 2 3 4 5 6 7所有的队列中消费,当然 4 5 6 7中压根就没有消息 ,假设消费group有两个消费者,事实上只有第一个消费者在真正的消费消息(0 1 2 3),第二个消费者压根就消费不到消息。由此可见,只有readQueueNums>=writeQueueNums,程序才能正常进行。最佳实践是readQueueNums=writeQueueNums。那rocketmq为什么要区分读写队列呢?直接强制readQueueNums=writeQueueNums,不就没有问题了吗?rocketmq设置读写队列数的目的在于方便队列的缩容和扩容。思考一个问题,一个topic在每个broker上创建了128个队列,现在需要将队列缩容到64个,怎么做才能100%不会丢失消息,并且无需重启应用程序?最佳实践:先缩容写队列128->64,写队列由0 1 2 ......127缩至 0 1 2 ........63。等到64 65 66......127中的消息全部消费完后,再缩容读队列128->64.(同时缩容写队列和读队列可能会导致部分消息未被消费)2:(个人猜测,因为对rocketmq5还不太懂):如1中所述,队列实际只有一份,标号0~x,然后读队列k和写队列k,只要他们的k即queueid相同,那么就对应的是同一个底层的物理队列,说是物理队列,实际上也是一个逻辑队列,底层只有一个逻辑上的CommitLog大文件,不管是读还是写队列,都是用的这个逻辑文件,然后这个CommitLog逻辑文件实际上会被分成多个小的offset连续的MappedFile物理文件
存疑:topic.unitMode是什么
1:broker流程
1.1:broker流程骨架
NettyRemotingServer.NettyServerHandler.channelRead0 NettyRemotingAbstract.processMessageReceived switch (msg.getType()) { #首先判断收到的是请求还是响应case REQUEST_COMMAND:NettyRemotingAbstract.processRequestCommand(ctx, msg);Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque) #创建一个task,后续会丢到线程池异步执行return () -> { RemotingHelper.parseChannelRemoteAddr(ctx.channel()); #首先解析地址doBeforeRpcHooks(remoteAddr, cmd); #然后执行before rpchook,不过默认是0个this.requestPipeline.execute(ctx, cmd); #然后执行请求pipeline,这里是authenticationPipeline#注意,这里不是处理请求的地方AuthenticationPipeline.executeresponse = pair.getObject1().processRequest(ctx, cmd); #!!!这里负责处理请求,这是一个多态#不同的请求对应不同的类1:pull消费者对应的处理函数 #1:pull消费者对应的处理函数PullMessageProcessor.processRequest......2:生产者发来消息对应的处理函数 #2:生产者发来消息对应的处理函数SendMessageProcessor.processRequest......3:admin相关命令对应的处理函数 #3:admin相关命令对应的处理函数AdminBrokerProcessor.processRequest switch (request.getCode()) {case RequestCode.UPDATE_AND_CREATE_TOPIC:return this.updateAndCreateTopic(ctx, request);case RequestCode.UPDATE_AND_CREATE_TOPIC_LIST:....case ...doAfterRpcHooks(remoteAddr, cmd, response); #然后执行after rpchook,不过默认是0个writeResponse(ctx.channel(), cmd, response);break;case RESPONSE_COMMAND:NettyRemotingAbstract.processResponseCommand(ctx, msg);break;
1:生产者事务或非事务单条消息发送流程
SendMessageProcess.processRequestcase RequestCode.CONSUMER_SEND_MSG_BACK: #如果这个消息是消费者消费失败而发送回来的消息AbstractSendMessageProcessor.consumerSendMsgBack #!!!也就是说sendMessage不仅仅用于生产者...暂时略...default:TopicQueueMappingManager.buildTopicQueueMappingContext #构建mappingContext,#这个ctx包含了静态队列即新增的逻辑队列的信息if requestHeader.getLo() ==False: #如果该topic没有使用逻辑队列则置空该ctx的指定字段return new TopicQueueMappingContext(topic,null,null,null,null) #许多字段都设置为空表示没有使用逻辑队列#后面流程的代码会检测这个ctx的这些字段mappingDetail = TopicQueueMappingManager.getTopicQueueMapping(topic)#尝试获取该topic对应的逻辑队列和物理队列的映射信息#即这个ctx保存的是这个topic对应的逻辑队列的所有信息if mappingDetail == null: #如果没有找到则说明没有使用静态队列return new TopicQueueMappingContext(topic, null, null, null, null)#许多字段都设置为空表示没有使用逻辑队列TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId) #根据请求中的globalId即逻辑队列的id获取对应的信息return new TopicQueueMappingContext(topic, globalId, mappingDetail, mappingItemList, leaderItem);#构建并返回ctx#此条件下返回对的ctx包含了静态队列的相关信息#!!!1-静态队列:逻辑队列映射到物理队列#!!!即用户访问逻辑队列,然后broker上会把逻辑队列映射到实际的物理队列#!!!也就是说broker这里先判断这个请求是不是用了逻辑队列#!!!如果没用就返回ctx1,如果用了就查找逻辑队列和物理队列的映射信息#!!!然后返回ctx2。#!!!所以下一步就是检测返回的ctx的指定字段,如果用了逻辑队列#!!!则需要把请求中的队列id重写为实际的物理队列的id#!!!也就是说用户请求中的id用户以为是物理队列的id#!!!实际上这个id是逻辑队列的id,broker会自动转换成实际的物理队里的id#!!!这个静态逻辑队列是rocketmq5新增的流处理方面的,略#!!!2-笔记:不管用没用,反正后面的代码都是通过这个ctx来获取信息#!!!所以可以用一个ctx来实现多种场景,一套代码,即代码更通用TopicQueueMappingManager.rewriteRequestForStaticTopic #rewrite操作就是检测ctx判断是否用了逻辑队列#如果用了,就把请求的队列id重写为该逻辑队列对应的实际的物理队列的id if mappingContext.getMappingDetail() == null: #如果ctx.mappingDetail为空则表示没用逻辑队列return null #此时返回null表okrequestHeader.setQueueId(mappingItem.getQueueId()) #getQueueId表示实际的物理队列的id#这里就用实际的物理队列的id替换掉请求中的逻辑队列idreturn null #return null 表okSendMessageProcessor.buildMsgContext #这里根据前面的信息来构建sendmsg使用的msgCtxSendMessageProcessor.executeSendMessageHookBefore #执行send前的hook操作#开始sendif requestHeader.isBatch(): #如果是批量发送SendMessageProcess.sendBatchMessage #则调用sendBatchMessageAbstractSendMessageProcessor.executeSendMessageHookAfter #处理afterHook消息else : #反之则是发送单条消息SendMessageProcessor.sendMessage #调用sendMessageSendMessageProcessor.preSend #preSend:1:创建response对象;2:检查请求RemotingCommand.setXXX #设置response对象的各种字段......AbstractSendMessageProcessor.msgCheck #检查请求PermName.isWriteable #1:检查broker是否可写TopicValidator.validateTopic #2:校验topic是否有效#1:topic名字不能为空;2:topic名不能含有非法字符;3:topic名长度不能超过限制TopicValidator.isNotAllowedSendTopic #3:检查topic是否可以推送消息(通过检查该topic是否在黑名单中)TopicConfigManager.selectTopicConfig #通过topic检查了则获取topic的信息,如果不存在则创建if null == topicConfig: #如果不存在TopicConfigManager.createTopicInSendMessageMethod #则创建topic...topic创建过程待补充...queueIdInt = requestHeader.getQueueId(); #检查请求中的queueId即物理队列id是否有效idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());#检查最大且有效地队列idif queueIdInt >= idValid: #这个队列id不能超过读或者写队列的最大idreturn failed #后面读写队列时肯定会检测队列是否可读可写,所以不用担心#要开始处理请求了SendMessageRequestHeader.getQueueId #获取queueId,不管是否用的是逻辑队列#经过前面的处理,到此处queueid必定是物理队列idif queueIdInt < 0:queueIdInt = randomQueueId(topicConfig.getWriteQueueNums()) #如果queueId为-1表示随便放到一个队列就行#???我猜有些逻辑比如顺序消息/分组消息#应该就是通过控制queueId来实现的#这个逻辑应该是放在客户端sdk中,client先获取所有队列信息#然后再根据需要来确定把消息放到哪个队列#broker只需要根据消息中指定的queueid来执行就好了MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); #构建内部使用的消息,就是说我们存的时候使用的消息不是请求中的消息#请求中的消息只是broker内部使用的MessageExtBrokerInner消息的一个字段#就是后续流程中使用的就是inner消息了而不是最初的请求消息了msgInner.setTopic(requestHeader.getTopic()); #设置inner消息的topicmsgInner.setQueueId(queueIdInt); #设置inner消息的queueid字段...省去一系列msgInner.setXXX...SendMessageProcessorhandleRetryAndDLQ #处理重试和死信队列if null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX): #如果这是重试topicif reconsumeTimes > maxReconsumeTimes || #如果重试次数超过了阈值 sendRetryMessageToDeadLetterQueueDirectly #或者配置了直接发送到死信队列 {newTopic = MixAll.getDLQTopic(groupName); #先获取消费者组对应的死信队列topicqueueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP); #为该消息创建一个死信idmsg.setTopic(newTopic); #然后就把当前请求的目的topic重置为死信队列的topic#意思就是本次消息发送请求还是会继续写,但是会丢到死信队列#!!!sendMessage流程不仅仅用于生产者生产消息#!!!还用于消费者消费失败消息投递回broker#!!!即SendMessage就是SendMessage的功能#!!!不管调用者是谁msg.setQueueId(queueIdInt);}if Boolean.parseBoolean(traFlag) && #如果是事务消息!(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0) #并且xxx,这两个条件没看懂为啥{ if BrokerConfig.isRejectTransactionMessage: #如果我们配置broker拒绝事务消息return #那么直接返回表示拒绝本次消息sendTransactionPrepareMessage = true; #反之设置标记,标记本次是事务消息}else:sendTransactionPrepareMessage = false;if BrokerConfig.isAsyncSendEnable() #如果是配置文件中设置的是异步master{ #即本次消息无需等待slave同步if sendTransactionPrepareMessage: #并且如果是事务消息TransactionalMessageServiceImpl.asyncPrepareMessage #则异步执行事务消息第一阶段:prepare#!!!事务消息的流程只是简单修改一下topic#!!!然后当成普通消息来调用asyncPutMessage来处理#!!!即asyncPutMessage是sendMessage最核心的流程#!!!不管同步还是异步,事务还是非事务#!!!最终都会来到asyncPutMessage/asyncPutBatchMessageTransactionMessageBridge.asyncPutHalfMessage #修改消息然后putmessageTransactionMessageBridge.parseHalfMessageInner #修消息的各种属性topic=TransactionalMessageUtil.buildHalfTopic() #事务半消息topic名字是固定的,为RMQ_SYS_TRANS_HALF_TOPICMessageExtBrokerInner.setTopic(topic); #!!!最核心的就是修改消息的目的topic名字MessageExtBrokerInner.setQueueId(0); #修改queueidMessageExtBrokerInner.setXXX #还会新增一大堆属性,略DefaultMessageStore.asyncPutMessage #修改了消息的topic后就可以把该消息当做普通的消息来走asyncPutMessage流程......-- -- - > commit or rollback#当本地事务执行完毕后就会返回消息给broker,不过发的是EndTransaction消息而不是sendMessage消息EndTransactionProcessor.processRequest if MessageSysFlag.TRANSACTION_COMMIT_TYPE: #如果是结果是commit#就是把消息写入commitLogTransactionalMessageServiceImpl.commitMessage #从commitLog文件中先取出之前保存的prepare状态的halfMsg#!!!任何topic下的消息都是写入同一个commitLog文件中TransactionalMessageServiceImpl.getHalfMessageByOffset(commitLogOffset) #根据offset直接从commitLog文件获取halfMsgEndTransactionProcessor. rejectCommitOrRollback #检查消息是否超时#就是说不能发送halfMsg开始事务后隔很久才提交EndTransactionProcessor.checkPrepareMessage #检查prepare消息的状态就是有可能prepareMsg#和endTransactionMsg的各种信息匹配不上EndTransactionProcessor.endMessageTransaction #如果一切ok,那么就从从halfMsg中还原原消息MessageExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)) #很简单,就是把topic改回原来的真实的topicEndTransactionProcessor.sendFinalMessage #就是把还原的消息走一遍putMessage流程来保存到commitLog文件DefaultMessageStore.putMessage DefaultMessageStore.waitForPutResult(DefaultMessageStore.asyncPutMessage(msg)#最终还是来到asyncPutMessage#只不过这里会等待直到这个异步操作完成#!!!此时事务消息就像一个普通消息一样了#!!!一旦写入,就对消费者可见TransactionMessageServiceImpl.deletePrepareMessage #put完成后从halfMsg Topic中删除该消息#当然不是物理删除,而是写入一条commit消息#因为消息一旦写入commitLog就是不可变的#和commit相比没有put操作,因为rollback就代表取消TransactionMessageServiceImpl.getOpMessage #创建一条op消息topic=RMQ_SYS_TRANS_OP_HALF_TOPIC #这条消息会发往指定的OP_HALF_TOPICnew Message(topic,TransactionalMessageUtil.REMOVE_TAG) #这条op会打上REMOVE_TAG#因为代表对应的halfMsg可以删除了因为事务已完成TransactionalMessageBridge.writeOp #调用putMessage把刚创建的OP消息写入OP_HALF_TOPIC#任何消息的写入操作最终都会调用asyncPutMessage函数else if MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: #如果是rollback,则删除halfMsgTransactionMessageServiceImpl.rollbackMessage #就去根据offset直接从commitLog中取出该消息EndTransactionProcessor. rejectCommitOrRollback #检查消息是否超时#就是说不能发送halfMsg开始事务后隔很久才提交EndTransactionProcessor.checkPrepareMessage #检查prepare消息的状态就是有可能prepareMsg#和endTransactionMsg的各种信息匹配不上TransactionMessageServiceImpl.deletePrepareMessage #直接从halfMsg Topic中删除该消息......-- -- - > check#因为事务可能超时,所以会有一个定时线程不断扫描halfMsg Topic下的所有消息#如果超过指定时间,则回查生产者,如果达到最大回查次数,则丢到一个类似死信队列的队列里去TransactionalMessageServiceImpl.run #一个单独的check线程while !this.isStopped: #每隔一段时间就检查一下所有未完成的事务,默认30sBrokerConfig.getTransactionCheckInterval TransactionalMessageServiceImpl.waitForRunning #执行wait和checkTransactionalMessageServiceImpl.onWaitEnd #在wait结束的后执行check操作TransactionalMessageServiceImpl.check #执行check操作topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC #halfMsg对应的topic就是这个topicTransactionalMessageBridge.fetchMessageQueues #获取这个topic下的所有队列#!!!queueid相同的读写队列是同一个队列#因为所有topic的消息都存在同一个commitLog文件中#所以commitLog中同topic的消息不是连续存放的#所以肯定不能直接遍历commitLog文件#因为每个topic的消费队列中保存的肯定是这个topic的消息#所以直接遍历该topic的所有队列里的消息即可#!!!就是像消费者一样消费队列里的消息#!!!消费完就后就不应该再可见for MessageQueue messageQueue : msgQueues: #遍历halfmsg topic下的所有消费队列opQueue = getOpQueue(messageQueue) #获取该halfMsg Queue对应的opQueue#即一条halfMsg代表2pc中的一条prepare日志#opQueue中保存的消息代表的是某个事务已经完成#即一条opMsg代表2pc中的一条commited或rollback日志halfOffset=TransactionalMessageBridge.fetchConsumeOffset(messageQueue)#获取halfQueue的consumeOffsetopOffset=TransactionalMessageBridge.fetchConsumeOffset(opQueue); #获取opQueue的consumeOffsetTransactionalMessageServiceImpl.fillOpRemoveMap#读取opQueue,来标记哪些事务已经完成了TransactionalMessageServiceImpl.pullOpMsg #从op队列消费消息 #???暂不清楚这个操作是否会更新consumeOffset#???因为下面有一个手动更新offset的方法#会把已经完成的事务消息放到一个removeMap中#空消息则直接放到doneOpOffset#因为空消息直接算已经处理完毕while true: #遍历该队列的所有消息if System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT:break #限制单个队列的遍历时间为1分钟if removeMap.containsKey(i): #如果这个halfMsg对应的offset在removeMap中#removeMap:key=halfOffset,value=opOffset#那么就表明这个消息已经commit/rolllbackdoneOpOffset.add #把对应的opMsg放到doneOpOffset队列表示已完成无需进行checkelse:TransactionalMessageServiceImpl.getHalfMsg(messageQueue, i)#根据offset从halfMsgQueue中获取halfMsgTransactionalMessageServiceImpl.pullHalfMsg #pull方式从halfMsg Queue中取出消息#???暂不知道是否会更新offset#???看代码好像不会if TransactionalMessageServiceImpl.needDiscard || #如果消息重试的次数超过了阈值则返回true#如果没有,则msg的重试次数+1TransactionalMessageServiceImpl.needSkip #如果halfmsg所在的文件已经expire了#默认72h即消息过期了{DefaultTransactionalMessageCheckListener.resolveDiscardMsg #那么就把该消息丢到类死信队列DefaultTransactionalMessageCheckListener.toMessageExtBrokerInner #修改msg的目的topic TopicConfigManager.createTopicOfTranCheckMaxTime RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC #目的topic为RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC MessageExt.setTopic(topic) #设置消息的目的topicDefaultMessageStore.putMessage #此时把消息当做普通消息一样丢到类死信队列#后面就是asyncPutMessage流程了,略}if msgExt.getStoreTimestamp() >= startTime:#如果消息的存储时间晚于check操作的开始时间#表明这些消息是在check开启之后才存储的,很新#所以无需check,因为queue中消息是递增的#所以这个消息之后的消息都比当前消息新,#所以后面的消息更加不需要检查,所以直接breakbreakvalueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();#消息生存时间=now-消息产生时间checkImmunityTime = transactionTimeout #checkImmunityTime表示消息冷却时间#即生存时间<冷却时间的消息都可以跳过检查#因为该事务消息才开始,没必要检查checkImmunityTimeStr = msgExt.getUserProperty(CHECK_IMMUNITY_TIME_IN_SECONDS)if null != checkImmunityTimeStr: #timeStr不为空表示我们设置了该消息的冷却时间checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout)if -1 == tiemstr: #如果timestr!=-1则表明设置了有效冷却时间checkImmunityTime=transactionTimeout #则用我们配置的冷却时间else checkImmunityTime *= 1000 #否则直接用事务超时时间来表示冷却时间,默认是6s#即如果事务没有超时就不会checkif valueOfCurrentMinusBorn < checkImmunityTime: #如果消息还没达到检查时间ok=checkPrepareQueueOffset #检查这条免检消息是否在removemap中#在则说明该消息已经完成了,可以跳过,返回true#否则需要把该消息重新写到halfMsgQueue的末尾#如果消息追加到halfMsg Topic末尾失败#则返回false表示不可以跳过检查需要继续往下走#然后在check实施环节写入halfMsgQueue中property=msgExt.getUserProperty(PREPARED_QUEUE_OFFSET) #获取immu halfMsg的字段if property==null: #如果为空表明这是第一次遇到这个消息TransactionalMessageServiceImpl.putImmunityMsgBackToHalfQueue #那么直接put immu消息,put的时候会设置这个属性字段 #如果put成功就可以跳过本次check否则本次必须checkelse: #msg中取出的属性值不为null则表明不是第一次处理这个消息if -1 == prepareQueueOffset: #但是属性值为-1即无效值return false #则直接返回false表示不可以跳过else:if removeMap.containsKey(prepareQueueOffset): #如果在removeMap中则表示这个消息虽然处于免检状态#但是已经commited/rollback即事务消息已完成return true #那么就可以跳过即返回trueelse:TransactionalMessageServiceImpl.putImmunityMsgBackToHalfQueue #否则要put immu halfMsg TransactionalMessageBridge.renewImmunityHalfMessageInner #设置该属性字段TransactionalMessageBridge.renewHalfMessageInner #copy一份旧消息 msgInner.setWaitStoreMsgOK(false) #肯定是异步写入因为同步效率肯定很低msgExt.getUserProperty(PREPARED_QUEUE_OFFSET) #看消息是否已经设置了该属性值if null != queueOffsetFromPrepare: #如果有 MessageAccessor.putProperty(queueOffsetFromPrepare) #则还是设置原值else: #如果没有 MessageAccessor.putProperty(msgExt.getQueueOffset()) #则从原消息中获取#???没搞懂这个字段干嘛用的#???也没搞懂这个逻辑,他都有了还设置干嘛 DefaultMessageStore.putMessage #renew之后就是调用put把消息写回halfmsg topicif ok: #如果checkPrepareQueueOffset ok则跳过这条消息newOffset = i + 1 #更新索引为下一条消息 i++continue #直接返回else: #如果消息没有配置checkImmunityTimestr#那么就默认使用的是事务超时时间if 0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) #如果消息还没达到检查冷却时间,说明消息过新 {break; #那么这条消息以及这条消息之后的消息都直接跳过#即结束本队列的本次check之旅 }boolean isNeedCheck = (opMsg == null && #如果op列表为空 valueOfCurrentMinusBorn > checkImmunityTime) || #并且该halfMsg超过了检查冷却时间(opMsg != null && #或者对应的opmsg队列不为空opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout) ||#???并且最后一条op消息的时间超过了事务超时时间(valueOfCurrentMinusBorn <= -1) #或者消息的生存时间小于等于-1#那么该事务消息就需要回查生产者if isNeedCheck: #如果需要回查TransactionalMessageServiceImpl.putBackHalfMsgQueue #把消息重新写回halfMsg队列末尾TransactionalMessageBridge.renewHalfMessageInner TransactionalMessageBridge.putMessageReturnResult DefaultMessageStore.putMessage AbstractTransactionalMessageCheckListener.resolveHalfMsg #回查生产者事务状态thread.run{ #丢到异步线程去发送AbstractTransactionalMessageCheckListener.sendCheckMessageBroker2Client.checkProducerTransactionState #发送回查消息给client}else:nextOpOffset= pullResult.getNextBeginOffset() #更新变量来开始下一次循环 pullResult = fillOpRemoveMap #end while if newOffset != halfOffset: #如果halfMsg queue消费了消息TransactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset) #则更新halfMsgQueue的offsetConsumerOffsetManager.commitOffset #提交consumer的commitOffsetlong newOpOffset = calculateOpOffset(doneOpOffset, opOffset) #更新 if newOpOffset != opOffset:transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset) #end for即本队列已结束开始下一个队列#!!!因为消费流程还没看所以上面的回查逻辑可能还有点小错和迷糊,但八九不离十了#!!!但八九不离十了,这里总结一下大概检查逻辑:#!!!halfMsg queue和opQueue的offset都是递增的#!!!如果该事务消息已经完成则不需要检查,#!!!可以不作任何处理,即跳过check#!!!如果该事务消息还没有完成且还没达到检查时间#!!!因为queue中的消息时间是递增的,#!!!所以后面的消息都可以跳过检查#!!!也就是说该队列这个点之后的消息都留到下次循环再去处理#!!!所以这个队列以及队列之后的消息无需再次写到halfmsg queue#!!!直接从原位置开始再次检查就行#!!!如果某个消息还没有完成且本次不需要check#!!!因为halfMsg queue的offset是只增不减的#!!!所以再次把该消息追加到halfMsg queue的末尾#!!!然后继续检查该消息对应的queue的下一条消息#!!!这样当offset增加后还是可以处理该消息的#!!!总之核心就是offset只能增不能减#!!!核心就是不能回过头去读当前offset之前的数据#20241121 22:42 又是加班攒调休的一天,今天好冷,还差一点就能捋清了,#但是不想弄了,太伤神了,专注力已经无了else:DefaultMessageStore.asyncPutMessage #如果不是事务消息则异步写入存储,默认是DefaultMessageStore#!!!一个broker只有一个MessageStore对象for all hook in putMessageHookList : #执行before putMsg hookputMessageHook.executeBeforePutMessage(msg); CommitLog.asyncPutMessage #执行写入操作,即提交commitLog,commitLog包含完整消息#!!!一个broker只有一个MessageStore对象,#!!!一个MessageStore对象只有一个CommitLog对象#!!!也就是说不管有多少个队列,都是用的同一个CommitLog#!!!所以这里没有根据queueid来选择对应的commitLog#!!!因为一个broker只有一个commitlog#!!!一个CommitLog对象代表一个大的逻辑文件CommitLog#!!!在这个逻辑CommitLog文件中offset是连续的从0开始的#!!!这个逻辑CommitLog文件实际上是由多个小的mappedFile组成#!!!这些mappedFile有两个偏移,起始偏移和文件内偏移#!!!起始偏移表示mappedFile的第一个字节在整个逻辑CommitLog文件中的位移#!!!文件内偏移就是当前mappedFile文件的写入位置即文件内偏移#!!!这些MappedFile的[起始offset,endOffset]是连续的#!!!比如第一个是[0,x],那么第二个就是[x+1,y]...[n+1,z]#!!!也就是说物理读写队列实际上都是逻辑上的#!!!底层只有一个逻辑CommitLog文件#!!!还有,静态队列的逻辑队列则是构建在逻辑读写队列之上的逻辑读写队列if !MessageConfig.isDuplicationEnable(): #如果没有开启消息重复,则会给每条消息添加一个时间戳以区分唯一和有序#这个开关默认是关闭的,即默认是会加时间戳的MessageExt.setStoreTimestamp(System.currentTimeMillis())#设置消息的时间戳MessageExt.setBodyCRC #设置消息的crcMessageExt.setVersion #设置消息的版本MessageExt.setxxxx #还会设置消息的一系列字段,略ThreadLocal<PutMessageThreadLocal>.get #获取put操作相对应的配置,比如encoder、builderString topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg) #!!!topicQueueKey=TopicName-QueueIdkeyBuilder.append(messageExt.getTopic());keyBuilder.append('-');keyBuilder.append(messageExt.getQueueId());MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile() #获取最后一个内存映射文件(前面的都写满了)#一个brokerif mappedFile == null: #如果为空则表示集群还没有存储过消息currOffset = 0; #这里先设置offset=0即消息的存储索引是从0开始else:currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition() #否则offset=文件起始偏移+文件内偏移,#即内容会追加到该文件末尾#即offset表示消息的全局offset#这个offset会跟着消息一同存储到磁盘needAckNums = MessageStoreConfig.getInSyncReplicas() #needAckNums表示需要几个副本写入成功才算成功 #InSyncReplcias表示处于In-sync状态的副本数即健康的副本数#默认为1,因为master也算他本身的一个副本,#就是说至少要本身写成功才算成功,当然,这是废话#如果master是ASYNC_MASTER或者#enableControllerMode和ackAckInSyncStateSet这两个都开启的时候#In-sync replicas将被忽略,也就是说写完master就直接返回了#即使in-sync>1也不会去写副本#这里说的副本、slave是同一个意思ha = needHandleHA(msg) #是否需要处理HAif !messageExt.isWaitStoreMsgOK || #如果该消息配置为不需要等待storeok即写入完成MessageStoreConfig.isDuplicationEnable || #???(不太懂)或者开启了DuplicationEnable开关(不过默认是关的)BrokerRole.SYNC_MASTER != MessageStoreConfig.getBrokerRole #或者master类型不为SYNC即master为async即不需要同步{return false; #那么就不需要ha,反之则需要}return trueif needHandleHA && BrokerConfig.isEnableControllerMode: #如果需要ha并且开启了controller模式#controller模式下会自动设置节点的角色无需人工干预num=DefaultHAService.inSyncReplicasNums(currOffset) #获取健康的副本数即commitlogIndex等于master的commitlogIndex的副本数insyncNums=1 #从1开始,因为master本身也算一个健康的replicafor HAConnection conn : this.connectionList: #遍历集群所有slave连接isOk=this.isInSyncSlave(currOffset, conn) #判断该节点是否okif currOffset - conn.getSlaveAckOffset() < MessageStoreConfig.getHaMaxGapNotInSync():return true; #如果当前master节点的offset-slave的offset小于配置的阈值#那么就认为slave是in-syncreturn false; #否则就认为该slave是非sync的inSyncNums++;}if num < MessageStoreConfig.getMinInSyncReplicas: #如果haservice中显示的处于in-sync状态即健康的副本数return failed #小于要求的最小副本数,那么就返回失败if MessageStoreConfig.isAllAckInSyncStateSet: #如果配置了要求所有副本都ack才算成功(不过默认是关闭的)needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET #那么就更新needAckNums-1表示要求所有节点都ack#注意,这里是增大needAckNums,因为此时needAckNums只能增不能减#因为条件只能更严格,不能更松,如果更松就达不到ha要求了else if needHandleHA && BrokerConfig.isEnableSlaveActingMaster #如果开启了ha但是没有开启controller模式但是开启了slaveActMaster{ #不太懂,反正大意及随后master挂了但是slave可以临时充当master#完成只有master才能完成的部分功能 inSyncReplicas = Math.min(DefaultMessageStore.getAliveReplicaNumInGroup,DefaultHAService.inSyncReplicasNums)#min(存活的副本数,inSync的副本数)needAckNums = CommitLog.calcNeedAckNums needAckNums = MessageStoreConfig.getInSyncReplicas()#获取配置的要求多少个副本数才算成功#注意:MessageStoreConfig.getInsyncReplicas表示#要求这么多个副本同步成功才算成功#而DefaultHAService.inSyncReplicasNums表示处于in-sync的副本数if MessageStoreConfig.isEnableAutoInSyncReplicas: #如果配置了允许动态调整needAckNums即允许缩小needAckNums#即我配置了要求3个副本成功,但是我打开了这个autoInSync开关#那么我可以根据实际情况调小needAckNums,即我目前即使只有1个健康副本#我也能通过调小needAckNums=2来使得本次请求仍然满足一致性要求的副本数#从而本次请求可以继续往下执行#!!!但是needAckNums不能小于要求的最小副本数#!!!就是说needAckNums必须大于等于minInsync(这个参数可配置)needAckNums = Math.min(needAckNums, inSyncReplicas); #看是否需要调小needAckNums,因为我最多只有这么多个健康的副本needAckNums = Math.max(needAckNums,MessageStoreConfig.getMinInSyncReplicas()); #needAckNums必须大于配置的最少副本数}if needAckNums > inSyncReplicas: #经过调整好needAckNums代表最终要求的副本数#如果要求的副本数比可用的副本数还多,那么本次请求就无法满足副本数要求return failed #所以返回fail表示本次sendmsg操作失败TopicQueueLock.lock(topicQueueKey) #锁住指定的队列,队列key为 topicName-queueId#???是不是会同时锁住生产者消费者队列#???因为貌似sendMsg时会处理消费者队列的逻辑偏移needAssignOffset = true; #needAssignOffset表示是否要给消息赋一个逻辑位移#会同时存储消息的物理位移和逻辑位移#物理位移表示该消息在commitLog中的全局存储位移(实际也是一个逻辑位移) #消息存储后会被分配给某个消费队列,然后这个消息在这个消费队列中的位移#就叫做逻辑位移if MessageStoreConfig.isDuplicationEnable && #如果开启了Duplication(不过默认是关闭的)MessageStoreConfig.getBrokerRole != BrokerRole.SLAVE #并且broker是master{ needAssignOffset = false #那么此时就不需要给消息赋逻辑位移。#???不懂,为什么此时就不需要}if needAssignOffset: #如果要给消息赋值逻辑位移DefaultMessageStore.assignOffset #那么就设置消息在消费队列中的offset即逻辑位移tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()) #获取消息的标志if tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || #如果不是事务消息tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) #或者该事物消息已经提交即已经commit { #事务消息已提交就表示该消息可以被正常消费了AbstractConsumeQueueStore.assignQueueOffset(msg) #给消息赋值逻辑位移ConsumeQueueStore.findOrCreateConsumeQueue #先获取消息会被分配给哪个消费队列,#这是个多态方法,这里为ConsumerQueueStoreConcurrentMap<Integer, ConsumeQueueInterface> map = consumeQueueTable.get(topic); #根据topic获取所有的队列信息ConsumeQueueInterface logic = map.get(msg.getQueueId()); #!!!sendMsg请求中的queueId是读队列id也是写队列id #!!!所以这里直接根据请求消息中的queueid获取对应消费队列对象if logic !=null: #如果找到了,就直接返回return logic #logic这个变量名也可以表明消费队列实际也是一个逻辑队列else: #没有找到则表明这个队列一直没人消费过,所以创建新的#新的消费队列对象的offset会从0开始...创建ConsumrQueue对象,略... #!!!一个consuemrQueue对应一个文件#!!!这个文件用来存放分配给这个consuermQueue的msg的offset#!!!消费消息的时候先从consumerQueue对应的文件中读取offset#!!!然后再拿着这个offset去commitLog中读#!!!也就是说顺序写随机读return newLogic......msg.setQueueOffset(consumerQueue.getQueueOffset) #设置消息的queueOffset即消息在指定消费队列中的逻辑位移}PutMessageLock.lock() #PutMessage锁if !MessageStoreConfig.isDuplicationEnable: #如果关闭了消息重复,则给每个消息加上一个时间戳(默认是关闭的)msg.setStoreTimestamp(beginLockTimestamp) #pustMessage函数开头设置过一次,这里又设置一次,即更新为当前时间DefaultMappedFile.appendMessage #写入内存映射文件switch (result.getStatus()) {case PUT_OK: #如果提交到commitlog文件okCommitLog.onCommitLogAppend #就执行提交完毕后对应的钩子函数,默认是空操作break;case END_OF_FILE: #如果文件满了CommitLog.onCommitLogAppend #先执行提交完毕这个动作对应的钩子函数,默认是空操作CommitLog.mappedFileQueue.getLastMappedFile #然后创建一个新的mappedfileDefaultMappedFile.appendMessage #然后在此提交到新的commitlog文件#!!!此时还只是写到内存缓冲区CommitLog.onCommitLogAppend #执行提交完毕这个动作对应的钩子函数,默认是空操作PutMessageLock.unlock() #解锁PutMessageif OK:DefaultMessageStore.increaseOffset #更新指定消费队列的queueOffset,因为消息已经成功写入磁盘了#而该消息会被分配给某个指定的队列x,也就是说队列x新增了消息#所以更新消费队列x的queueOffsetif tranType == MessageSysFlag.TRANSACTION_NOT_TYPE ||tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {ConsumeQueueStore.increaseQueueOffset(msg, messageNum); #更新指定ConsumerQueue的queuOffsetfindOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId());#根据queueId获取对应的队列ConsumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg, messageNum); #更新该队列的queueOffset#不知道是不是会持久化到磁盘}TopicQueueLock.unlock(topicQueueKey) #解锁队列CommitLog.handleDiskFlushAndHA #刷盘和haCommitLog.handleDiskFlush #刷盘,数据已经写入,只是还没有强制刷盘而已CommitLog.DefaultFlushManager.handleDiskFlush #根据配置文件来决定是同步刷盘还是异步刷盘...刷盘流程留到下一个大篇幅去弄了,这里暂时跳过了...if !ha: #如果不需要处理ha即没有开启ha则直接返回okreturn ok CommitLog.handleHA #处理HAif needAckNums<=1: #如果只需要1个副本ackreturn OK #直接返回ok,因为master broker本身也算一个nextOffset = result.getWroteOffset() + result.getWroteBytes() #否则需要同步,这里设置偏移量,即下一个写入位置GroupCommitRequest request = new GroupCommitRequest #构造同步请求DefaultHAService.putRequest(request); #丢给haservice线程去异步处理haDefaultHAService.getWaitNotifyObject().wakeupAll() #唤醒沉睡的haservice线程#如果所有haservice都正在运行没有sleep #那么就什么也不做#haservice是一个类似事件循环的线程#如果没事做就wait了,直到被唤醒...ha相关流程留到下一个大篇幅去弄了,这里暂时跳过了... #rocketmq5中线程a把新消息写入commitLog后#会有主从同步线程b来异步把新增的数据同步给slave#所以ha线程c的逻辑就是不断询问slave,同步到哪了#一旦某个slave返回的ackOffset>nextOffset#就知道该slave ok了,所以ackNum++#直到ackNum>needAckNum即达到quorum个slave ok就可以了#就会完成futureSendMessageProcessor.handlePutMessageResult #处理写入结果switch putMessageResult.getPutMessageStatus() case XX:sendOk=true......case YY:sendOk=falseif sendOK: #如果写入成功BrokerStatsManager.incXXXX #那么就更新broker的status就是broker的状态 SendMessageStore.rewriteResponseForStaticTopic #处理静态topic,流场景用的,暂不了解,略SendMessageStore.doResponseAbstractSendMessageProcessor.executeSendMessageHookAfter #处理afterHook消息}else{ #如果是同步写入if sendTransactionPrepareMessage: #并且如果是事务消息TransactionalMessageServiceImpl.prepareMessage #同步执行事务消息第一阶段:prepareelse:DefaultMessageStore.putMessage #如果不是事务消息则同步写入存储#getMesageStore有多个实现类,默认是DefaultMessageStoreDefaultMessageStore.waitForPutResult(DefaultMessageStore.asyncPutMessage(msg)) #同步写入底层也是调用异步写入,#只不过同步写入的时候会等待异步写入返回的future完成...下面就是异步写入的逻辑了,上面已叙述,故此处略...SendMessageProcessor.handlePutMessageResult #处理写入结果AbstractSendMessageProcessor.executeSendMessageHookAfter #处理afterHook消息}
2:handleHA流程
2.1:提交任务到haservice(上面已叙述)
CommitLog.asyncPutMessageCommitLog.handleDiskFlushAndHACommitLog.handleHAHAService haService = this.defaultMessageStore.getHaService(); #获取haservicelong nextOffset = result.getWroteOffset() + result.getWroteBytes();#获取要同步的偏移量,即下一个写入位置#!!!大意就是另一个主从同步线程不断#!!!会把master的commitLog中新增加的内容同步给slave#!!!当slave同步到nextOffset或者超过nextOffset时#!!!就表示本消息已同步,检测到quorum个slaveok ha就返回成功GroupCommitRequest request = new GroupCommitRequest(nextOffset,DefaultMessageStoreConfig.getSlaveTimeout(), needAckNums);#构建同步请求DefaultHAService.putRequest #把同步请求丢到haservice去异步执行GroupTransferService.putRequest #丢到ha的工作队列lock.lock()this.requestsWrite.add(request) #丢到write队列,这个write队列专门用来存放新增请求#他有两个队列,write和read,工作线程会从read线程取数据#而新数据则是会追加到write队列#然后工作队列空了就会来一个swap即交换write队列和read队列变量的引用#这样两个队列一个优点就是可以避免读写操作互锁lock.unlockDefaultHAService.getWaitNotifyObject().wakeupAll(); #如果haservice工作线程在sleep那么就唤醒,如果是running则什么也不做2.2:haservice启动和运行
BrokerStartup.mainBrokerStartup.startBrokerContorller.startBrokerController.startBasicServiceDefaultMessageStore.startDefaultHAService.startGroupTransferService.start......GroupTransferService.runwhile (!this.isStopped()) {GroupTransferService.waitForRunning(10); #没事干就会等待10s,有事干就会被提前唤醒GroupTransferService.onWaitEnd #等待结束的时候GroupTransferService.swapRequests #交换read/write两个队列tmp = this.requestsWritethis.requestsWrite = this.requestsReadthis.requestsRead = tmpGroupTransferService.doWaitTransfer() #前面handlHA函数里把同步请求丢到了GroupTransferService的write队列#所以doWaitTransfer会交换read/write队列,然后处理read队列中的请求#对于每个请求,就是不断询问slave同步到哪里了,#一旦超过nextOffset,slave ok 就+1知道达到quorum个节点,就完成本次请求for req : this.requestsRead #遍历read队列的所有请求for i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++: #对于每个请求,只要没有超时或者ok,就一直询问if i > 0: #如果不是第一次 GroupTransferService..waitForRunning(1) #就等一秒再开始下次询问if !allAckInSyncStateSet && req.getAckNums() <= 1:transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset()#master也算一个continue;if allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService: #allAck=true并且haservice#是AutoSwitcHAserver的实例#暂不直到AutoSwitchHAService干嘛的,#不懂为什么allAck=true时还需要加上这个条件#此时必须等待所有副本okint ackNums = 1 #ackNums从1开始,因为master也算一个for conn : haService.getConnectionList: #遍历所有slaveif SyncStateSet.contains(slaveId) && #如果该slave位于该master的slave集合中AutoSwitchHAConnection.getSlaveAckOffset>= req.getNextOffset #并且该slave报告的ackOffset超过了nextOffset{ #就表明对于这个请求,这个slave已经同步了ackNums++; #那么就把针对这个请求的ackNums+1}if ackNums >= syncStateSet.size(): #此时是要求所有都ack,所以直到所有的都ok才返回ok transferOK = true;break;else: #同上面的流程,区别在于int ackNums = 1;for conn : haService.getConnectionList:if conn.getSlaveAckOffset() >= req.getNextOffset():ackNums++;if ackNums >= req.getAckNums() #达到quorum个slave返回ok就行了transferOK = true;break;req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT)#返回结果,要么ok,要么超时#broker很多地方都是future实现的