Broker提交或回滚事务消息
当生产者本地事务处理完成并且Broker回查事务消息后,不管执行Commit还是Rollback,都会根据用户本地事务的执行结果发送一个End_transaction的RPC请求给Broker,Broker端处理该请求的类是EndTransactionProcessor
第一步,End_Transaction请求校验,主要检查项如下
1.Broker角色检查。Slave Broker不处理事务消息
2.事务消息类型检查。EndTransactionProcessor只处理
Commit或Rollback类型的事务消息,其余消息不处理,这里区分了事务回查
第二步,进行Commit或Rollback。
根据生产者请求头中的参数判断,是Commit请求还是Rollback请求,然后分别进行处理
commitMessage()
提交Half消息/这是事务消息服务接口中的一个方法,根据消息位点查询了Half消息,并将Half消息返回
checkPrepareMessage()
Half消息数据校验。校验内容包括发送消息的生产者组与当前执行Commit/Rollback的生产者是否一致,当前Half消息是否与请求Commit/Rollback的消息是否是同一条消息
endMessageTransaction()
消息对象类型转化,将MessageExt对象转化为MessageExtBrokerInner对象,并且还原消息之前的Topic和ConsumeQueue等信息
sendFinalMessage()
将还原后的事务消息最终发送到CommitLog中,一旦发送成功,消费者就可以正常拉取消息并消费
deletePrepareMessage()
在sendFinalMessage()执行成功后,删除Half消息。其实RocketMQ是不能真正删除消息的,其实质是顺序写磁盘,相当于做了一个"假删除"。"假删除"通过putOpMessage()方法将消息保存到TransactionMessageUtil.
buildOpTopic()的Topic中,并且做上标记TransactionalMessageUtil.REMOVETAG,表示消息已删除
- 如果消息被标记为已删除,则调用addRemoveTagInTransactionOp()方法,利用标记为已删除的OP消息构造Message消息对象,并且调用存储方法保存消息
- TransactionalMessageUtil.buildOpTopic()方法跟保存Half消息时的逻辑类似
- Half消息保存在名为MixAll.RMQ_SYS_TRANS_HALF_TOPIC的Topic中,执行Commit和Rollback后的消息都保存在MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC
对象中,以便Broker判断是否需要回查生产者事务的执行状态。
- 调用存储层方法,真正地将OP消息保存到了CommitLog中
Rollback实现逻辑
Rollback并没有真正删除消息,而是标记Half消息为删除,在Broker回查时机会跳过不检查
rollbackMessage()
该方法与CommitMessage()方法一样,都是查询Half消息并返回消息对象。
checkPrepareMessage()
消息校验,与Commit调用的是同一个方法
deletePrepareMessage()
删除Half消息,与Commit调用的是同一个方法