贴上一段实现代码, 业务中可以参考使用
@Component
public class TestIdpSender {public void sendInTransaction(String topic, String tag, String message){Message<String> msg = MessageBuilder.withPayload(message).build();String dt = topic + ":" + tag;//发送事务消息(这里是half消息)TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(dt, msg, dt);}}@RocketMQTransactionListener
public class TestIdpTransactionCallBack implements RocketMQLocalTransactionListener{@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {try{//正常的业务逻辑//业务日志return RocketMQLocalTransactionState.COMMIT;}catch(Exection e){//异常日志return RocketMQLocalTransactionState.UNKNOWN;}return RocketMQLocalTransactionState.UNKNOWN; }@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {//查询业务状态//根据业务状态返回return RocketMQLocalTransactionState.COMMIT;;}
}
大致描述一下这个流程:
1、发送事务消息时时候首先向RocketMQ发送一条半消息,Broker会将该消息保存到事务消息日志中,并标记为prepared状态, 然后通知producer执行本地事务。
2、producer执行本地事务。执行成功后需通知Broker提交事务消息。
3、Broker收到提交的消息后将该半消息状态改为committed,并将消息从事务消息日志中取出投放到对应topic的messageQueue中, 此时consumer就可以正常消费该消息了。
4、如果本地事务执行失败,需要通知Brocker回滚事务的话,Brocker会将该消息状态改为rollback,并将消息从事务消息日志中删除,从而保证消息不被消费者消费
流程图如下:
整个流程中
1-2失败, 根本不用执行本地事务, broker也不会接受到消息
3本地事务执行失败, 可以考虑在第4步发送rollback给broker, 将消息删除
如果4失败, broker会通过5反差业务, 获取消息事务状态
如果5也失败, 那这条half消息会被保存72小时, 这期间producer还可以向broker发送ack推进消息