rocketmq原理_彻底看懂RocketMQ事务实现原理

70a091ba2870cc60cedb459769dba29a.png

面试中经常会问到比如RocketMQ的事务是如何实现的呢?学习框架,我们不仅要熟练使用,更要掌握设计及原理,才算熟悉一个框架。

1 RocketMQ 事务使用案例

public class CreateOrderService {  @Autowired  private OrderDao orderDao;  @Autowired  private ExecutorService executorService;  private TransactionMQProducer producer;  // 初始化transactionListener 和 producer  @Init  public void init() throws MQClientException {    TransactionListener transactionListener = createTransactionListener();    producer = new TransactionMQProducer("myGroup");    producer.setExecutorService(executorService);    producer.setTransactionListener(transactionListener);    producer.start();  }  // 创建订单服务的请求入口  @PUT  @RequestMapping(...)  public boolean createOrder(@RequestBody CreateOrderRequest request) {    // 根据创建订单请求创建一条消息    Message msg = createMessage(request);    // 发送事务消息    SendResult sendResult = producer.sendMessageInTransaction(msg, request);    // 返回:事务是否成功    return sendResult.getSendStatus() == SendStatus.SEND_OK;  }  private TransactionListener createTransactionListener() {    return new TransactionListener() {      @Override      public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {        CreateOrderRequest request = (CreateOrderRequest ) arg;        try {          // 执行本地事务创建订单          orderDao.createOrderInDB(request);          // 如果没抛异常说明执行成功,提交事务消息          return LocalTransactionState.COMMIT_MESSAGE;        } catch (Throwable t) {          // 失败则直接回滚事务消息          return LocalTransactionState.ROLLBACK_MESSAGE;        }      }            // 反查本地事务      @Override      public LocalTransactionState checkLocalTransaction(MessageExt msg) {        // 从消息中获得订单ID        String orderId = msg.getUserProperty("orderId");        // 去db查询订单号是否存在,若存在则提交事务        // 若不存在,可能是本地事务失败了,也可能是本地事务还在执行,所以返回UNKNOW        return orderDao.isOrderIdExistsInDB(orderId)?                LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;      }    };  }}

如上案例展示了一个订单创建服务,即往db插一条订单记录,并发一条创建订单的消息,要求写db和发消息俩个操作在一个事务内执行。
首先在init()方法中初始化了transactionListener和发生RocketMQ事务消息的变量producer。

  • createOrder()
    真正提供创建订单服务的方法,根据请求的参数创建一条消息,然后调用 producer发事务消息,并返回事务执行结果。

  • createTransactionListener()
    在init()方法中调用,构造实现RocketMQ的TransactionListener接口的匿名类,该接口需要实现如下两个方法:

    • executeLocalTransaction:执行本地事务,在这里我们直接把订单数据插入到数据库中,并返回本地事务的执行结果。7ca732c3b2bfceb7a512a785d29c9ea0.png

    • checkLocalTransaction:反查本地事务,上述流程中是在db中查询订单号是否存在,若存在则提交事务,若不存在,可能本地事务失败了,也可能本地事务还在执行,所以返回UNKNOW30ced6f7a4cd198cb82225ca2b4e3d7b.png

这样便使用RocketMQ的事务简单实现了一个创建订单的分布式事务。

2 RocketMQ事务消息实现原理

2.1 Pro端如何发事务消息?

DefaultMQProducerImpl#sendMessageInTransaction

public TransactionSendResult sendMessageInTransaction(final Message msg,    final LocalTransactionExecuter localTransactionExecuter, final Object arg)    throws MQClientException {    TransactionListener transactionListener = getCheckListener();    if (null == localTransactionExecuter && null == transactionListener) {        throw new MQClientException("tranExecutor is null", null);    }    // ignore DelayTimeLevel parameter    if (msg.getDelayTimeLevel() != 0) {        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);    }    Validators.checkMessage(msg, this.defaultMQProducer);    SendResult sendResult = null;    // 给待发送消息添加属性,表明是一个事务消息(即半消息)    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());    // 像发送普通消息一样,把这条事务消息发往Broker    try {        sendResult = this.send(msg);    } catch (Exception e) {        throw new MQClientException("send message Exception", e);    }    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;    Throwable localException = null;    switch (sendResult.getSendStatus()) {        // 事务消息发送成功        case SEND_OK: {            try {                if (sendResult.getTransactionId() != null) {                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());                }                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);                if (null != transactionId && !"".equals(transactionId)) {                    msg.setTransactionId(transactionId);                }                if (null != localTransactionExecuter) {                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);                } else if (transactionListener != null) {                    log.debug("Used new transaction API");                    // 开始执行本地事务                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);                }                if (null == localTransactionState) {                    localTransactionState = LocalTransactionState.UNKNOW;                }                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {                    log.info("executeLocalTransactionBranch return {}", localTransactionState);                    log.info(msg.toString());                }            } catch (Throwable e) {                log.info("executeLocalTransactionBranch exception", e);                log.info(msg.toString());                localException = e;            }        }        break;        case FLUSH_DISK_TIMEOUT:        case FLUSH_SLAVE_TIMEOUT:        case SLAVE_NOT_AVAILABLE:            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;            break;        default:            break;    }    // 事务过程的最后,给Broker发送提交或回滚事务的RPC请求。    try {        this.endTransaction(sendResult, localTransactionState, localException);    } catch (Exception e) {        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);    }    TransactionSendResult transactionSendResult = new TransactionSendResult();    transactionSendResult.setSendStatus(sendResult.getSendStatus());    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());    transactionSendResult.setMsgId(sendResult.getMsgId());    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());    transactionSendResult.setTransactionId(sendResult.getTransactionId());    transactionSendResult.setLocalTransactionState(localTransactionState);    return transactionSendResult;}

有事务反查机制作兜底,该RPC请求即使失败或丢失,也不会影响事务最终的结果。最后构建事务消息的发送结果,并返回。

2.2 Broker端如何处理事务消息?

SendMessageProcessor#asyncSendMessage

55cf747e8f8b6e05c7d2807dfcba46ee.png

跟进去看看真正处理半消息的业务逻辑,这段处理逻辑在类

TransactionalMessageBridge

  • putHalfMessage7eee033b71c585793609d7f891a518fc.png

  • parseHalfMessageInnereddd49d37d0c7834aaee0aaed1db8671.png

    RocketMQ并非将事务消息保存至消息中 client 指定的 queue,而是记录了原始的 topic 和 queue 后,把这个事务消息保存在

15c0a278e7eb6cb20e83dc6aea4fa7e0.png设计思想ad3c9915eb7ab773fd7b5afcb8c272d3.png
  • 特殊的内部 topic:RMQ_SYS_TRANS_HALF_TOPIC

  • 序号为 0 的 queue

这套 topic 和 queue 对消费者不可见,因此里面的消息也永远不会被消费。这就保证在事务提交成功之前,这个事务消息对 Consumer 是消费不到的。

2.3 Broker端如何事务反查?

在Broker的TransactionalMessageCheckService服务中启动了一个定时器,定时从事务消息queue中读出所有待反查的事务消息。

AbstractTransactionalMessageCheckListener#resolveHalfMsg

  • 针对每个需要反查的半消息,Broker会给对应的Producer发一个要求执行事务状态反查的RPC请求71528fef9cf5e6a4a280286868d88344.png

  • AbstractTransactionalMessageCheckListener#sendCheckMessage3d7ae52543b9f0fe0ef8aea4aef65a0e.png

  • Broker2Client#checkProducerTransactionStateb90f9f0b8b4333d184a41fc8194f7c9e.png根据RPC返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。

最后,提交或者回滚事务。首先把半消息标记为已处理

  1. 如果是提交事务,就把半消息从半消息队列中复制到该消息真正的topic和queue中

  2. 如果是回滚事务,什么都不做

EndTransactionProcessor#processRequestf6d1f78b4c12d8a2e7bc7ef434bf6766.png

最后结束该事务。

3 总结

  • 整体实现流程cb01c466e8b063dd9959cc4c297f110f.png

RocketMQ是基于两阶段提交来实现的事务,把这些事务消息暂存在一个特殊的queue中,待事务提交后再移动到业务队列中。最后,RocketMQ的事务适用于解决本地事务和发消息的数据一致性问题。

参考

  • https://juejin.im/post/6844904193526857742

e91cd6bfd444ef28bd22abcbaa9ea44f.png

42fb790eec56ff362a95348bdd49cd08.png点个在看支持我吧,转发就更好了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/520993.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

要活 102 年,阿里凭借的是什么?

戳蓝字“CSDN云计算”关注我们哦!作者 | 屠敏责编 | 阿秃由浅到深、由轻到重、由拥抱到创新,看似“风平浪静”的历史背后,中国互联网江湖的发展实则“波涛汹涌”。对于身处核心位置的科技巨头阿里巴巴而言,将如何更好地去实现“希…

阿里云 x 蒙牛 | 打通数据孤岛,基于MaxCompute实现产销协同的智慧运营

每一个公司转型的背后 都有着不为人知的秘密 今天,让我们一起探秘 内蒙古蒙牛乳业(集团)股份有限公司是中国发展速度最快的乳品企业之一,2017年实现收入601.56亿元,位列全球乳业第10位,与2016年同比收入增…

(Docker实战) 第3篇:Centos7 拉取和部署Mysql

文章目录搭建mysql1. 创建mysql的配置文件2. 创建mysql配置/srv/mysql/conf/custom.cnf3. 下载并安装mysql 5.7(注意修改密码)搭建mysql 1. 创建mysql的配置文件 mkdir -p /srv/mysql/conf /srv/mysql/logs /srv/mysql/data2. 创建mysql配置/srv/mysql…

Hadoop迁移MaxCompute神器之DataX-On-Hadoop使用指南

DataX-On-Hadoop即使用hadoop的任务调度器,将DataX task(Reader->Channel->Writer)调度到hadoop执行集群上执行。这样用户的hadoop数据可以通过MR任务批量上传到MaxCompute、RDS等,不需要用户提前安装和部署DataX软件包,也不需要另外为…

(Docker实战) 第三篇:配置_开发环境

文章目录一、开发环境-建立开发环境1.1. 常用环境下载地址1.2. IDEA的maven配置1.3. 导入项目示例1.4. IDEA教程,如何从eclipse过渡1.5. gblfy的快捷键风格一、开发环境-建立开发环境 1.1. 常用环境下载地址 jdk8下载地址: https://www.oracle.com/te…

阿里云史上最大技术升级:面向万物智能的飞天2.0

1991年,《科学美国人》杂志描绘了一种“无处不在的计算设备,没人会感觉到它的存在”,拉开了万物智能的序幕。 27年后的2018杭州•云栖大会上,阿里云公布了面向万物智能的新一代云计算操作系统——飞天2.0,可满足百亿级…

杭州·云栖大会宣布多款核心云产品降价,最高降幅达90%

2018杭州云栖大会,阿里云宣布开启新一轮核心产品降价,再次用科技普惠广大开发者和用户,加速产业升级。本次降价涉及近20款产品,产品包括智能语音交互、图像识别、性能测试PTS、云数据库RDS等,其中事务消息降价90%&…

(Docker实战) 第四篇:建立持续集成环境01

标签: gblfy技术文档 文章目录一、 jenkins环境建立1.1. jenkins安装1.2. 安装jenkins前提:下载jdk1.3. 配置jenkins1.3.1. 配置远程发布插件Publish over SSH 选择系统设置1.3.2. 配置jdk,git,maven 选择系统管理->全局工具配…

系统无法分配所需内存_Innodb内存管理解析

本文主要介绍innodb的内存管理,涉及基础的内存分配结构、算法以及buffer pool的实现细节,提及change buffer、自适应hash index和log buffer的基本概念和内存基本配比,侧重点在内存的分配和管理方式。本文所述内容基于mysql8.0版本。基础内存…

飞天2.0面向万物智能的操作系统正式启幕

原文链接 本文为云栖社区原创内容,未经允许不得转载。

阿里数据总监分享《阿里数据中台建设实践案例》,PPT+语音讲解!

戳蓝字“CSDN云计算”关注我们哦!作者 | 技术领导力责编 | 阿秃本文整理自,阿里巴巴集团数据部商业应用总监列文,在“2019年阿里云(上海)峰会”上的分享,以PPT图片文字语音的方式呈现给各位社区读者。如果…

(Docker实战) 第五篇:建立持续集成环境02

标签: gblfy技术文档 文章目录一、 jenkins实战,持续集成实际项目1.1. 安装maven插件1.1.1. 找到Maven Integration这个插件1.1.2. 确保jenkins的maven本地仓库有一定的权限1.1.3. 创建job1.1.4. 配置参数1.1.5. 配置git仓库信息1.1.7. 配置构建信息1.1.…

阿里云OCR证件识别商业化发布,减少人工审核误差节省成本

随着互联网的发展,利用现代信息技术开展高效快捷便民服务工作,已经势在必行。证件识别算法,极大地提高了办公效率,现已在互联网金融、银行保险、电信通讯运营商、智能交通、政府、航空、社保局等行业领域被广泛运用。 近日&#…

(Docker实战) 第六篇:建立持续集成环境03

标签: gblfy技术文档 文章目录六、 git核心概念6.1. git学习地址6.2. github和码云的介绍6.3. git常用命令和操作6.3.1. 命令行常用命令6.3. IDEA操作方法(具体看视频)6.3.1. 合并分支6.3.2. compare with6.3.3. rename6.3.4. 看历史记录(所有的和单个文…

程序员:站在“自学”鄙视链顶端的王者

我在大学的时候,真的遇到一个神人,叫他小马吧。几乎没见过小马上课,第一节实验课就完成全学期所有实验,大一就自学大二课程,大四还没毕业就拿到了阿里offer,然后在我们苦兮兮找工作的时候,人家已…

用户需求源源不断,阿里云网络创新不止

2018杭州云栖大会,阿里云网络产品重点介绍了两个创新产品,智能接入网关和云企业网,以及全球领先的云网络系统-飞天洛神。智能接入网关是业内主要云服务商中第一家提供这样产品的,云企业网更是业内首创的多地域互联产品…

漫画:要跳槽?这道缓存设计题你有必要看看!

戳蓝字“CSDN云计算”关注我们哦!作者 | 程序员吴小胖责编 | 阿秃金九银十招聘季,社畜跳槽,学生出笼,也是非常热闹。不过今年继续互联网寒冬,能苟着还是苟着吧,猥琐发育别浪。苟着除了写Bug,还…

阿里云高级技术专家赵伟:安全加速 SCDN 设计与案例

此前,阿里云发布了SCDN安全加速解决方案,在CDN加速的基础上,将专业的安全能力赋能 CDN,实现既有加速又有安全的服务。在本次杭州云栖-飞天技术汇CDN与边缘计算专场中,阿里云高级技术专家赵伟从业务背景、架构设计和客户…

新品发布、降价普惠、拥抱开源、出海全球化 | 杭州云栖企业数字化转型峰会上的那些关键词

9月19日,在杭州云栖大会 - 企业数字化转型峰会现场,阿里巴巴中间件产品总监赵林分享了2018 Aliware的最新产品动态。本文将为您梳理Aliware在出海全球化、开源支持、消息队列高级特性降价、链路追踪新品发布、应用高可用新品发布、CloudToolkit 新品发布…

华为获颁中国首个5G基站设备进网许可证:可支持中国规模部署;IBM推出新一代企业平台Z15;Testin最新AI产品发布……...

关注并标星星CSDN云计算极客头条:速递、最新、绝对有料。这里有企业新动、这里有业界要闻,打起十二分精神,紧跟fashion你可以的!每周两次,打卡即read更快、更全了解泛云圈精彩newsgo go go 荣耀V30 5G手机概念图&#…