前言
原理:使用普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
解决问题:事务消息,主要解决生产方和消费方的数据最终一致性问题。
实现方式:二阶段消息 + 反查机制
源码版本:4.9.3
源码架构图
源码解析
接下来,将会按照上面提到的事务消息原理和源码架构图中的顺序,进行源码实现分析。
1. 发送半事务消息
源码入口 org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction(org.apache.rocketmq.common.message.Message, java.lang.Object)
/**
 * Public entry point for sending a transactional message.
 *
 * <p>Rejects the call when no {@code TransactionListener} has been configured, rewrites the
 * topic with the producer's namespace, and delegates to the producer implementation with a
 * {@code null} legacy executer.
 *
 * @param msg message to send; its topic is replaced by the namespace-wrapped topic
 * @param arg opaque argument forwarded to the implementation
 * @return the result returned by the producer implementation
 * @throws MQClientException if no transaction listener is set
 */
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg)
    throws MQClientException {
    if (this.transactionListener == null) {
        throw new MQClientException("TransactionListener is null", null);
    }

    final String namespacedTopic = NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic());
    msg.setTopic(namespacedTopic);

    // No legacy LocalTransactionExecuter on this path, hence the null second argument.
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
接下来对消息添加PROPERTY_TRANSACTION_PREPARED半事务标记,发送事务消息,发送成功后执行本地事务。
// 发送事务消息public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {// 检查,事务监听器不能为空TransactionListener transactionListener = getCheckListener();if (null == localTransactionExecuter && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);}// ignore DelayTimeLevel parameterif (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult = null;// 添加事务消息标记,预处理事务消息,半half消息MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {// 发送消息sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;switch (sendResult.getSendStatus()) {case SEND_OK: {try {if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);}if (null != localTransactionExecuter) {localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) {log.debug("Used new transaction API");// 重要,发送消息到broker后,mq客户端内部执行本地事务localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", 
localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}try {this.endTransaction(msg, sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;}
... 接下来的发送流程和发送普通消息几乎一致,broker 接收到事务消息后的处理流程也和普通消息几乎一致。不一致的地方在于:broker 的 ReputMessageService(消息重分发服务线程)在后台读取 CommitLog 数据并分发到 ConsumeQueue 时,不会分发半(Half)事务消息和回滚事务消息对应的 CommitLog 记录。因为半事务消息需要等到本地事务执行完并提交之后,才能投递到消费队列。
消费分发代码:org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {@Overridepublic void dispatch(DispatchRequest request) {// 事务类型final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {// 非事务和提交事务,写入消息到消费队列case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:DefaultMessageStore.this.putMessagePositionInfo(request);break;// 预处理事务和回滚事务,不做处理case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;}}}
2. 执行本地事务 与 提交本地事务状态
接着上面的发送消息往后看,发送事务消息成功后,就会执行本地事务。
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
// Excerpt from sendMessageInTransaction: decide the local transaction state from the send
// status of the half message. sendResult, msg, arg, localTransactionExecuter,
// transactionListener, localTransactionState and localException are declared by the
// enclosing method.
switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            if (sendResult.getTransactionId() != null) {
                msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
            }
            String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                msg.setTransactionId(transactionId);
            }
            if (null != localTransactionExecuter) {
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                // Important: after the half message has been sent to the broker, the client
                // runs the local transaction itself.
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            // A callback that returns null is treated as "state unknown".
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }
            if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                log.info("executeLocalTransactionBranch return {}", localTransactionState);
                log.info(msg.toString());
            }
        } catch (Throwable e) {
            // The failure is remembered, not rethrown.
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    // For these statuses the local transaction is not run and the state is set to rollback.
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}
这里使用的 transactionListener 事务监听器,是我们在发起本地事务的代码里写的。如 example 工程中给的:
public class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}
}
本地事务执行完成后,客户端会根据本地事务的执行状态(提交 / 回滚 / 未知),通过 endTransaction 向 broker master 节点发送结束事务请求。
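broker 收到请求后,由 EndTransactionProcessor(结束事务消息处理器)进行处理。针对 CommitType 类型的请求:去除半事务消息标记,发送最终形态的消息,发送成功后再调用 deletePrepareMessage 将原半事务消息标记为已处理。针对 RollBackType 类型的请求:校验通过后直接调用 deletePrepareMessage 将原半事务消息标记为已处理。注意:CommitLog 是只追加写的,这里的“删除”并不是物理删除 CommitLog 中的数据,而是写入一条 Op 消息,标记对应的半事务消息已经处理完成,后续不再回查。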
org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
/**
 * Handles an end-transaction request (commit or rollback of a previously sent half message).
 *
 * <p>Steps visible below: refuse the request on a slave broker; log according to the requested
 * outcome and return {@code null} for a pending or unrecognised outcome; for a commit, look up
 * the prepared message, validate it, re-send it as a final message and then delete the
 * prepared message; for a rollback, look up the prepared message, validate it and delete it.
 *
 * @param ctx channel context, used here only to log the remote address
 * @param request the end-transaction request
 * @return the response command, or {@code null} when the request carries no final outcome
 * @throws RemotingCommandException if the request header cannot be decoded
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    // Prepare the response.
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Extract the request header.
    final EndTransactionRequestHeader requestHeader =
        (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.debug("Transaction request:{}", requestHeader);
    // Only a master broker may end a transaction; a slave answers immediately.
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }

    // NOTE(review): the original author's note says this flag defaults to false; the
    // producer-side check path sets it to true. This branch only differs in what gets logged.
    if (requestHeader.getFromTransactionCheck()) {
        switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }

            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());

                break;
            }

            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    } else {
        // Request sent directly after the producer finished its local transaction.
        switch (requestHeader.getCommitOrRollback()) {
            // No final outcome yet: log and return null.
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }

            // Commit: fall out of the switch and continue below.
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                break;
            }

            // Rollback: log, then fall out of the switch and continue below.
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    }
    OperationResult result = new OperationResult();
    // Commit the transaction.
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        // Ask the transactional message service for the prepared (half) message.
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            // Validate the prepared message against the request header.
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // Turn the prepared message into a broker-internal message.
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                // Remove the "prepared" marker so the message is no longer a half message.
                MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                // Store the final message.
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                // The prepared message is deleted only after the final message was stored.
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    }
    // Roll back the transaction.
    else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        // Ask the transactional message service for the prepared (half) message.
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            // Validate the prepared message against the request header.
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // Delete the prepared message; no final message is produced for a rollback.
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    // Reached when the commit/rollback lookup itself did not succeed.
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}
3. 未收到事务提交状态时,回查本地事务状态
可以看到源码架构图中,broker 内部有一个后台线程 TransactionalMessageCheckService(事务消息检查服务),它会定时扫描尚未收到事务提交状态的半事务消息,向 mqClient 发起检查本地事务状态的请求。
org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd
// 事务消息检查服务
public class TransactionalMessageCheckService extends ServiceThread {@Overrideprotected void onWaitEnd() {long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); // 6sint checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); // 15long begin = System.currentTimeMillis();log.info("Begin to check prepare message, begin time:{}", begin);this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);}}
... 通过事务消息服务 TransactionalMessageService扫描到需要检查的事务后会调用 Broker2Client组件发送请求到mq客户端。
org.apache.rocketmq.broker.client.net.Broker2Client#checkProducerTransactionState
// 检查生产者事务状态public void checkProducerTransactionState(final String group, // 生产者组final Channel channel, // 生产者长连接final CheckTransactionStateRequestHeader requestHeader, // 检查事务状态请求头final MessageExt messageExt // 消息扩展) throws Exception {// 构造检查事务状态请求RemotingCommand request =RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);request.setBody(MessageDecoder.encode(messageExt, false));try {// 通过网络通信服务器向生产者发送检查事务状态请求,单向发送this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);} catch (Exception e) {log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",group, messageExt.getMsgId(), e.toString());}}
mqClient 接收请求(org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest)
// Client-side dispatch of requests by request code.
// Excerpt: only the transaction-check case is shown. The original snippet ends inside the
// method, so the remaining cases and the method's closing brace are not part of it.
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        // Check the state of a local transaction.
        case RequestCode.CHECK_TRANSACTION_STATE:
            return this.checkTransactionState(ctx, request);
    }
接着调用生产者实例,执行检查 producer.checkTransactionState(addr, messageExt, requestHeader);
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {// 解码检查事务状态请求头final CheckTransactionStateRequestHeader requestHeader =(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());// 解码消息体final MessageExt messageExt = MessageDecoder.decode(byteBuffer);if (messageExt != null) {if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));}String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {messageExt.setTransactionId(transactionId);}// 事务消息必须有生产分组名称final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);if (group != null) {// 获取生产者实例MQProducerInner producer = this.mqClientFactory.selectProducer(group);if (producer != null) {// 从netty网络通道中获取客户端地址final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());// 检查事务状态producer.checkTransactionState(addr, messageExt, requestHeader);} else {log.debug("checkTransactionState, pick producer by group[{}] failed", group);}} else {log.warn("checkTransactionState, pick producer group failed");}} else {log.warn("checkTransactionState, decode message failed");}return null;}
生产者实例会在本地执行我们发送事务消息时设置的事务检查监听器(旧 API 的 transactionCheckListener,或新 API 的 transactionListener#checkLocalTransaction),然后将本地事务的执行状态重新提交到 broker(接下来的流程就和发送事务消息后,执行本地事务、提交本地事务状态的流程一样了)。
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState
// 检查事务状态,未收到half消息时,进行事务回查@Overridepublic void checkTransactionState(final String addr, final MessageExt msg,final CheckTransactionStateRequestHeader header) {// 封装一步任务,放进线程池异步执行Runnable request = new Runnable() {private final String brokerAddr = addr;private final MessageExt message = msg;private final CheckTransactionStateRequestHeader checkRequestHeader = header;private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();@Overridepublic void run() {TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();TransactionListener transactionListener = getCheckListener();if (transactionCheckListener != null || transactionListener != null) {LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable exception = null;try {// 调用本地事务监听器,检查本地事务状态if (transactionCheckListener != null) {localTransactionState = transactionCheckListener.checkLocalTransactionState(message);} else if (transactionListener != null) {log.debug("Used new check API in transaction message");// 检查本地事务状态localTransactionState = transactionListener.checkLocalTransaction(message);} else {log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);}} catch (Throwable e) {log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);exception = e;}this.processTransactionState(localTransactionState,group,exception);} else {log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);}}// 处理事务状态private void processTransactionState(// 调用本地事务监听器,查询到的本地事务状态final LocalTransactionState localTransactionState,// 生产者分组final String producerGroup,// 异常final Throwable exception) {// 封装事务状态请求final EndTransactionRequestHeader thisHeader = new 
EndTransactionRequestHeader();thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());thisHeader.setProducerGroup(producerGroup);thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());thisHeader.setFromTransactionCheck(true);String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (uniqueKey == null) {uniqueKey = message.getMsgId();}thisHeader.setMsgId(uniqueKey);thisHeader.setTransactionId(checkRequestHeader.getTransactionId());switch (localTransactionState) {// 提交消息状态case COMMIT_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;// 回滚消息状态case ROLLBACK_MESSAGE:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);log.warn("when broker check, client rollback this transaction, {}", thisHeader);break;// 未知消息状态case UNKNOW:thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);log.warn("when broker check, client does not know this transaction state, {}", thisHeader);break;default:break;}String remark = null;if (exception != null) {remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);}// 执行钩子函数doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);try {// 通过netty网络通信客户端组件,向broker发送事务状态请求,oneway模式,不需要感知发送结果DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);} catch (Exception e) {log.error("endTransactionOneway exception", e);}}};this.checkExecutor.submit(request);}