RocketMQ事务消息 超时重发还是原来的消息吗?

以下面的一个demo例子来分析一下,探索RocketMQ事务消息原理。

    public static final String PRODUCER_GROUP = "tran-test";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "Test";public static void main(String[] args) throws Exception {TransactionListener transactionListener = new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {System.out.println(String.format("executeLocalTransaction: %s", msg.getTransactionId()));return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println(String.format("checkLocalTransaction: tranId=%s, commitLogOffset=%s, queueOffset=%s, msgId=%s",msg.getTransactionId(), msg.getCommitLogOffset(),msg.getQueueOffset(), msg.getMsgId()));return LocalTransactionState.UNKNOW;}};TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);producer.setTransactionListener(transactionListener);producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();Message msg = new Message(TOPIC, "test".getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.println(String.format("sendResult: tranId=%s, offsetMsgId=%s, queueOffset=%s msgId=%s",sendResult.getTransactionId(), sendResult.getOffsetMsgId(),sendResult.getQueueOffset(), sendResult.getMsgId()));CountDownLatch countDownLatch = new CountDownLatch(1);countDownLatch.await();}
executeLocalTransaction: C0DE00428BEC18B4AAC27F377B6E0000
sendResult: tranId=C0DE00428BEC18B4AAC27F377B6E0000, offsetMsgId=null, queueOffset=82 msgId=C0DE00428BEC18B4AAC27F377B6E0000
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315411, queueOffset=83, msgId=C0DE004200002A9F0000000000141253
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315805, queueOffset=84, msgId=C0DE004200002A9F00000000001413DD
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316199, queueOffset=85, msgId=C0DE004200002A9F0000000000141567
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316593, queueOffset=86, msgId=C0DE004200002A9F00000000001416F1
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316987, queueOffset=87, msgId=C0DE004200002A9F000000000014187B
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317381, queueOffset=88, msgId=C0DE004200002A9F0000000000141A05
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317775, queueOffset=89, msgId=C0DE004200002A9F0000000000141B8F
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318169, queueOffset=90, msgId=C0DE004200002A9F0000000000141D19
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318563, queueOffset=91, msgId=C0DE004200002A9F0000000000141EA3
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318957, queueOffset=92, msgId=C0DE004200002A9F000000000014202D
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319352, queueOffset=93, msgId=C0DE004200002A9F00000000001421B8
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319747, queueOffset=94, msgId=C0DE004200002A9F0000000000142343
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320142, queueOffset=95, msgId=C0DE004200002A9F00000000001424CE
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320537, queueOffset=96, msgId=C0DE004200002A9F0000000000142659
checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320932, queueOffset=97, msgId=C0DE004200002A9F00000000001427E4

通过上述例子的输出结果可以发现,checkLocalTransaction中queueOffset、msgId都发生的变化。那么在broker中到底发生了什么呢。

事务消息原理

当客户端发送一个事务消息时,MessageConst.PROPERTY_TRANSACTION_PREPARED=“true” 标记这个消息是一个事务消息。

        SendResult sendResult = null;MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}

broker在收到消息时会取出traFlag,如果traFlag=true消息将交给TransactionalMessageService处理

        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);boolean sendTransactionPrepareMessage = false;if (Boolean.parseBoolean(traFlag)&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return response;}sendTransactionPrepareMessage = true;}long beginTimeMillis = this.brokerController.getMessageStore().now();if (brokerController.getBrokerConfig().isAsyncSendEnable()) {CompletableFuture<PutMessageResult> asyncPutMessageFuture;if (sendTransactionPrepareMessage) {//处理事务消息asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}

TransactionalMessageService在保存消息时会将原来的topic使用RMQ_SYS_TRANS_HALF_TOPIC来替换,原topic信息存放在properties中。这样在是先把消息保存下来,而不让Consumer立刻就能收到。
当收到TransactionMQProducer发来的COMMIT_MESSAGE时,再将消息从RMQ_SYS_TRANS_HALF_TOPIC取出替换成原来的topic写入。同时再向RMQ_SYS_TRANS_OP_HALF_TOPIC的topic中也写一份。
broker通过对比RMQ_SYS_TRANS_OP_HALF_TOPIC和RMQ_SYS_TRANS_HALF_TOPIC中是否同时存在来判断事务消息是否结束了。
当收到的不是COMMIT_MESSAGE而是UNKNOW时,TransactionalMessageCheckService会定时回调TransactionMQProducer#checkLocalTransaction查询本地事务状态,默认最多检查15次。

在这里插入图片描述

TransactionalMessageCheckService

TransactionalMessageCheckService是一个运行在broker中的一个线程,线程默认每1分钟执行一次来检测系统中超时的half事务消息并发起重试。

    @Overridepublic void check(long transactionTimeout, int transactionCheckMax,AbstractTransactionalMessageCheckListener listener) {try {String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);if (msgQueues == null || msgQueues.size() == 0) {log.warn("The queue of topic is empty :" + topic);return;}log.debug("Check topic={}, queues={}", topic, msgQueues);for (MessageQueue messageQueue : msgQueues) {long startTime = System.currentTimeMillis();//每个half queue都有一个对应的op queueMessageQueue opQueue = getOpQueue(messageQueue);//获取当前未完成的half queue的offsetlong halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);//获取当前已完成的op queue的offsetlong opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);if (halfOffset < 0 || opOffset < 0) {log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,halfOffset, opOffset);continue;}......// single threadint getMessageNullCount = 1;long newOffset = halfOffset;long i = halfOffset;long nextOpOffset = pullResult.getNextBeginOffset();int putInQueueCount = 0;int escapeFailCnt = 0;while (true) {if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);break;}if (removeMap.containsKey(i)) {......} else {//从RMQ_SYS_TRANS_HALF_TOPIC取出half消息GetResult getResult = getHalfMsg(messageQueue, i);MessageExt msgExt = getResult.getMsg();if (msgExt == null) {if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {break;}............//是否需要丢弃消息if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {listener.resolveDiscardMsg(msgExt);newOffset = i + 1;i++;continue;}......//判断上次check是否超时boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime|| opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout|| valueOfCurrentMinusBorn <= -1;if (isNeedCheck) {//超时if (!putBackHalfMsgQueue(msgExt, i)) {continue;}putInQueueCount++;log.info("Check transaction. real_topic={},uniqKey={},offset={},commitLogOffset={}",msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),msgExt.getQueueOffset(), msgExt.getCommitLogOffset());//重新给TransactionListener发起check请求listener.resolveHalfMsg(msgExt);..................if (newOffset != halfOffset) {transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);}long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);if (newOpOffset != opOffset) {transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);}                            

上述代码中有三个比较重要的细节,needDiscard、putBackHalfMsgQueue和listener.resolveHalfMsg。
needDiscard:从half queue取出来后判断消息的TRANSACTION_CHECK_TIMES属性是否大于15次。
小于15次,则TRANSACTION_CHECK_TIMES属性值+1。
大于15次,则从RMQ_SYS_TRANS_HALF_TOPIC中丢弃,通过listener.resolveDiscardMsg保存在TRANS_CHECK_MAX_TIME_TOPIC中交由人工处理。
putBackHalfMsgQueue:将消息重新插入一份到RMQ_SYS_TRANS_HALF_TOPIC,因为CommitLog的applyOnly特性不能修改原消息。所以需要重新apply消息导致queueOffset、commitLogOffset、msgId都发生了变化。

    private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {msgExt.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());msgExt.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset());msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());log.debug("Send check message, the offset={} restored in queueOffset={} "+ "commitLogOffset={} "+ "newMsgId={} realMsgId={} topic={}",offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(),msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),msgExt.getTopic());return true;

listener.resolveHalfMsg:通过回调resolveHalfMsg方法向TransactionMQProducer重发check。

    public void resolveHalfMsg(final MessageExt msgExt) {if (executorService != null) {executorService.execute(new Runnable() {@Overridepublic void run() {try {sendCheckMessage(msgExt);} catch (Exception e) {LOGGER.error("Send check message error!", e);}}});} else {LOGGER.error("TransactionalMessageCheckListener not init");}}public void sendCheckMessage(MessageExt msgExt) throws Exception {CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName());msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));msgExt.setStoreSize(0);String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);if (channel != null) {//取出与broker相连的netty channel发送check消息brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);} else {LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);}}

half消息示意图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/121125.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【CSS】CSS 属性计算过程

1. 概述 我们所书写的任何一个 HTML 元素&#xff0c;实际上都有完整的一整套 CSS 样式。如果没有修改某样式&#xff0c;大概率可能使用默认值。 例如&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"&…

3台Centos7快速部署Kafka集群

首先&#xff0c;我要说&#xff0c;Kafka 是强依赖于 ZooKeeper 的&#xff0c;所以在设置 Kafka 集群之前&#xff0c;我们首先需要设置一个 ZooKeeper 集群。 部署ZooKeeper需要安装jdk yum install java-1.8.0-openjdk 安装完以后 下面是详细的步骤&#xff1a; 1. 安装和…

iOS_Crash 四:的捕获和防护

文章目录 1.Crash 捕获1.2.NSException1.2.C异常1.3.Mach异常1.4.Unix 信号 2.Crash 防护2.1.方法未实现2.2.KVC 导致 crash2.3.KVO 导致 crash2.4.集合类导致 crash2.5.其他需要注意场景&#xff1a; 1.Crash 捕获 根据 Crash 的不同来源&#xff0c;分为以下三类&#xff1a…

NReco.LambdaParser使用案例

使用案例集合&#xff1a; private async void RuleEngine_Click(object sender, EventArgs e){#region 获取变量string expression this.Rule.Text.Trim();string pattern "\$(.*?)\$";MatchCollection matches Regex.Matches(expression, pattern);foreach (Ma…

day38(VueJS)概念 开发模式 框架的特点 启动步骤 指令 以及 小案例

概念 概念&#xff1a;Vue 是一套使用 Javascript 构建用户界面的渐进式框架。 Vue 框架涉及的内容有&#xff1a;Vue.js 开发概述、环境搭建、 Vue 指令、组件化应用构建、组件通信、组件嵌套、自定义指令、 自定义过滤器、组件属性、组件的路由、路由跳转。 注意&#xff1a…

unity 基于UGUI的无限动态滚动列表

基于UGUI的动态滚动列表&#xff0c;主要支持以下功能&#xff1a; 继承自UGUI的SrollRect&#xff0c;支持ScrollRect的所有功能&#xff1b; 使用对象池来管理列表元素&#xff0c;以实现列表元素的复用&#xff1b; 支持一行多个元素或一列多个元素&#xff1b; 可使用不…

ubuntu 22.04 jammy 手动安装 python 3.6

1 准备编译器 sudo apt-get install -y make build-essential libssl-dev zlib1g-dev \ libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev \ libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev \ libgdbm-dev libnss3-dev libedit-dev libc…

vue3 项目搭建教程整理

这里写目录标题 一、node二、npm三、搭建项目四、安装插件 一、node 查node版本&#xff1a;node –v 管理nodejs的版本 二、npm 查npm版本&#xff1a; npm –v1、安装指定版本&#xff08;6.14是具体的版本号&#xff09; npm install npm6.14 -g2、安装最新版本 npm ins…

【MyBatis Plus】深入探索 MyBatis Plus 的条件构造器,自定义 SQL语句,Service 接口的实现

文章目录 前言一、条件构造器1.1 什么是条件构造器1.2 QueryWrapper1.3 UpdateWrapper1.4 LambdaWrapper 二、自定义 SQL 语句2.1 自定义 SQL 的基本用法2.2 自定义 SQL 实现多表查询 三、Service 接口3.1 对 Service 接口的认识3.2 实现 Service 接口3.3 实现增删改查功能3.4 …

el-form那些事

vue3element-plus el-form那些事 输入框后拼接文字 输入框后拼接文字 <el-form-item :label"t(location.locationLength)" prop"locationLength"><el-input v-model"form.locationLength" :placeholder"t(location.inputLocation…

Windoes定时任务、设置定时重启系统

步骤一&#xff1a; 打开计算机管理 通过&#xff1a;control(控制面板&#xff09;或者compmgmt.msc(计算机管理&#xff09;打开程序 步骤二&#xff1a;打开——>系统工具 步骤三&#xff1a; 选择——>任务计划程序 步骤四&#xff1a; 可选择创建新文件命名&…

正点原子嵌入式linux驱动开发——Linux PWM驱动

PWM是很常用到功能&#xff0c;可以通过PWM来控制电机速度&#xff0c;也可以使用PWM来控制LCD的背光亮度。本章就来学习一下如何在Linux下进行PWM驱动开发。 PWM驱动解析 不在介绍PWM是什么了&#xff0c;直接进入使用。 给LCD的背光引脚输入一个PWM信号&#xff0c;这样就…

记一次企业微信的(CorpID)和密钥(Secret)泄漏的利用案例

文章目录 一、介绍二、利用过程1、获取AccessToken2、获取企业微信接口IP段3、获取企业微信回调IP段4、通过部门ID,查看返回的ID5、通过部门ID,查看用户列表6、通过便利ID,发现用户信息泄露,可以进行提交报告7、通过添加接口,添加企业账号8、登陆企业账号进行测试三、参考…

【随机过程】布朗运动

这里写目录标题 Brownian motion Brownian motion The brownian motion 1D and brownian motion 2D functions, written with the cumsum command and without for loops, are used to generate a one-dimensional and two-dimensional Brownian motion, respectively. 使用cu…

主动调度是如何发生的

计算机主要处理计算、网络、存储三个方面。计算主要是 CPU 和内存的合作&#xff1b;网络和存储则多是和外部设备的合作&#xff1b;在操作外部设备的时候&#xff0c;往往需要让出 CPU&#xff0c;就像上面两段代码一样&#xff0c;选择调用 schedule() 函数。 上下文切换主要…

【杂记】java 大集合进行拆分

日常中需要对一个大的集合进行拆分成多个小集合&#xff0c;其主要思路为&#xff1a; 设置需要拆分多少个小集合 A大集合里面有多少条数据 B计算出每个集合里面有多个条数据 CB/A计算出看是否存在余数 DB%A采用集合(List.subList())的方法对大集合进行拆分,循环A变进行集合拆…

Kafka - 3.x 图解Broker总体工作流程

文章目录 Zk中存储的kafka的信息Kafka Broker总体工作流程1. broker启动后向zk中注册2. Controller谁先启动注册&#xff0c;谁说了算3. 由选举出来的Controller监听brokers节点的变化4. Controller决定leader选举5. Controller将节点信息上传到Zk中6. 其他Controller从zk中同步…

Fourier分析导论——第1章——Fourier分析的起源(E.M. Stein R. Shakarchi)

第 1 章 Fourier分析的起源 (The Genesis of Fourier Analysis) Regarding the researches of dAlembert and Euler could one not add that if they knew this expansion, they made but a very imperfect use of it. They were both persuaded that an arbitrary and d…

jenkins配置gitlab凭据

下载Credentials Binding插件&#xff08;默认是已经安装了&#xff09; 在凭据配置里添加凭据类型 点击保存 Username with password&#xff1a; 用户名和密码 SSH Username with private 在凭据管理里面添加gitlab账号和密码 点击全局 点击添加凭据&#xff08;版本不同…

Go RESTful API 接口开发

文章目录 什么是 RESTful APIGo 流行 Web 框架-GinGo HelloWorldGin 路由和控制器Gin 处理请求参数生成 HTTP 请求响应Gin 的学习内容实战用 Gin 框架开发 RESTful APIOAuth 2.0接口了解用 Go 开发 OAuth2.0 接口示例 编程有一个准则——Don‘t Repeat Yourself&#xff08;不要…