这是本人学习的总结,主要学习资料如下
- 马士兵教育
- rocketMq官方文档
目录
- 1、分布式事务的难题
- 2、解决方式
- 2.1、半事务消息和事务回查
- 2.2、代码样例
- 2.2.1、TransactionListener
- 2.2.2、TransactionMQProducer
- 2.2.3、MessageListenerConcurrently
- 2.2.4、流程图
1、分布式事务的难题
现有两个系统,A向B转钱。A系统扣钱和B系统加钱就应该属于同一个事务,任何一个失败都要回滚。两个系统之间唯一的通信方式就是RocketMQ
。
以最朴素的想法,现在就有两个实现分布式事务的方案。但这两个都有比较大的不可靠性。
- A系统先扣钱再发送MQ:这样的弊端是无法确定消息有没有发送到MQ,或者消息有没有被MQ保存。总之这做法缺少一些回查的机制。
- A系统先发送MQ再扣钱:这样的弊端是发送消息后,A系统可能出现错误回滚。而B收到了消息就正常消费,完全不知道A那边出了问题。
2、解决方式
2.1、半事务消息和事务回查
半事务消息
:半事务消息是指向RocketMQ
发送一条消息,但这个消息只存放在CommitLog
中,并不在ConsumeQueue
展示。也就是说该消息被RocketMQ
接收了,但是消费者却无法消费到这条消息。事务回查
:在半事务消息发送成功后。A系统执行事务,如果成功则MQ将消息变成正常消息,失败则不发送消息。这里如果业务太复杂还不能确定事务是否完成的话,还可以发送UNKNOWN给MQ,这样MQ就会有定时器去检查事务是否完成。
RocketMQ
会向生产者询问是否可以把半事务变成正常的消息让消费者可以消费到。在这篇文章的例子就是询问A系统扣款有没有扣成功。如果成功了那就让B系统消费消息。
所以呢,通过半事务消息
和事务回查
就能保证A系统和发送消息具有事务,即扣款失败则不发送消息,扣款成功则发送消息。所以半事务消息至少保证了生产者和MQ之间的原子性。MQ和消费者之间的原子性需要另外处理。
消费者需要保证幂等性,失败后重试,即使称为死信后也特殊处理等操作来保证事务。这个例子中B系统成功加钱的话那交易结束,如果尝试多次后还是失败,那就需要一个机制来通知A系统,让他把扣掉的钱加回去。
2.2、代码样例
2.2.1、TransactionListener
一个接口规范,我们需要实现这个接口来定义本地事务和事务回查。
就是本地事务具体执行,成功后怎么办,失败了怎么办。定时的事务回查如何检查事务有没有完成。这些东西都要定义在TransactionListener
的实现中。
TransactionListener transactionListener = new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行本地事务,A扣100块// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;//或者业务比较复杂,不想在这个阶段就关闭事务,可以返回Unknown,之后就需要MQ定时事务回查return LocalTransactionState.UNKNOW;}@Override// 事务回查,默认一分钟一次public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("事务回查, " + new SimpleDateFormat("yyyyMMdd, HH:mm:ss").format(new Date()));// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;// 业务比较长,还不确定成功或失败,返回unknown,下次再查return LocalTransactionState.UNKNOW;}};
2.2.2、TransactionMQProducer
半事务消息的生产者,在DefaultMQProducer
的基础上新增了一个重要的参数,类型是ExecutorService
。这个线程池是用来生产线程去完成事务回查。
但是事务回查的逻辑不需要定义在线程的run()
方法中,这一部分放在TransactionListener
中。
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");producer.setNamesrvAddr("localhost:9876");// build a thread pool used to for MQ to call back to check transactionExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10), (r) -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();try{Message msg = new Message("transaction_producer", null, "A give B 100 dollar".getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);}catch(Exception e) {// rollbackSystem.out.println("rollback");}producer.shutdown();
2.2.3、MessageListenerConcurrently
消费者部分就比较简单,只要listener是MessageListenerConcurrently
就好。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TransactionalTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {try{for(MessageExt msg: list) {// simulate DB actionSystem.out.println("update B where transactionId" + msg.getTransactionId());System.out.println("Success consume msg: " + msg.getMsgId());}} catch (Exception e) {e.printStackTrace();System.out.println("Failed to consume meg, try more times");// means that failed to consume this msg. In next time will still consume this msg.return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// means that success to consume this msg. In the next time will consume next msg.return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});consumer.start();
while(true){
}