目录
1. 事务消息
1.1 RocketMQ事务消息的原理
1.2 RocketMQ订单支付功能设计
1. 事务消息
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。
1.1 RocketMQ事务消息的原理
-
半事务消息发送:生产者将半事务消息发送至RocketMQ服务端。
-
消息持久化及返回Ack确认:RocketMQ服务端接收到半事务消息并持久化成功后,向生产者返回Ack确认消息已经发送成功。此时消息状态为半事务消息。
-
执行本地事务逻辑:根据发送结果执行本地事务,如果写入失败,此时half消息对业务不可见,本地事务逻辑不执行。
-
提交二次确认结果:根据本地事务状态执行Commit或者Rollback。RocketMQ 的事务消息分为3种状态,分别是提交状态、回滚状态、未知状态。
TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown: 未知状态,它代表需要检查消息队列来确定状态。 -
消息回查:(1) 对没有Commit/Rollback的事务消息,从服务端发起一次回查 (2) Producer收到回查消息,检查回查消息对应的本地事务的状态 (3) 根据本地事务状态,重新Commit或者Rollback。第一次回查后仍未获取到事务状态,则之后每隔30s会再次回查,最多重试15次,超过了就会默认丢弃此消息。
1.2 RocketMQ订单支付功能设计
数据库设计
/*
SQLyog Community v13.2.0 (64 bit)
MySQL - 8.0.33 : Database - shop
*********************************************************************
*//*!40101 SET NAMES utf8 */;/*!40101 SET SQL_MODE=''*/;/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;USE `shop`;/*Table structure for table `shop_order` */DROP TABLE IF EXISTS `shop_order`;CREATE TABLE `shop_order` (`id` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL COMMENT '订单id',`total_num` INT DEFAULT NULL COMMENT '数量合计',`moneys` INT DEFAULT NULL COMMENT '金额合计',`pay_type` VARCHAR(1) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '支付类型,1、在线支付、0 货到付款',`create_time` DATETIME DEFAULT NULL COMMENT '订单创建时间',`update_time` DATETIME DEFAULT NULL COMMENT '订单更新时间',`pay_time` DATETIME DEFAULT NULL COMMENT '付款时间',`consign_time` DATETIME DEFAULT NULL COMMENT '发货时间',`end_time` DATETIME DEFAULT NULL COMMENT '交易完成时间',`username` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '用户名称',`recipients` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人',`recipients_mobile` VARCHAR(12) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人手机',`recipients_address` VARCHAR(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人地址',`weixin_transaction_id` VARCHAR(30) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '交易流水号',`order_status` INT DEFAULT NULL COMMENT '订单状态,0:未完成,1:已完成,2:已退货',`pay_status` INT DEFAULT NULL COMMENT '支付状态,0:未支付,1:已支付,2:支付失败',`is_delete` INT DEFAULT NULL COMMENT '是否删除',PRIMARY KEY (`id`),KEY `create_time` (`create_time`),KEY `status` (`order_status`),KEY `payment_type` (`pay_type`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb3 COLLATE=utf8mb3_bin;/*Data for the table `shop_order` *//*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
添加RocketMQ依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
bootstrap.yaml配置
server:port: 8085
spring:application:name: mall-orderdatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTCusername: rootpassword: 123456cloud:nacos:config:file-extension: yamlserver-addr: localhost:8848discovery:#Nacos的注册地址server-addr: localhost:8848rocketmq:name-server: localhost:9876producer:group: test-group-producer
Service层
public interface OrderService extends IService<Order> {//添加订单void add(Order order);//修改订单支付状态void pay(String id);
}@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order>implements OrderService{@AutowiredOrderMapper orderMapper;@Transactional(rollbackFor = Exception.class)@Overridepublic void add(Order order) {order.setCreateTime(new Date());orderMapper.insert(order); //这里仅仅生成订单,还有扣减库存等等一系列操作省略}@Transactional(rollbackFor = Exception.class)@Overridepublic void pay(String id) {//模拟支付完成,修改订单的支付状态Order order = orderMapper.selectById(id);order.setPayStatus(1);order.setPayTime(new Date());orderMapper.updateById(order);}
}
创建生产者
@RestController
@Slf4j
public class TestController {@AutowiredOrderMapper orderMapper;@AutowiredRocketMQTemplate rocketMQTemplate;@RequestMapping("/send")public String send(){String id = UUID.randomUUID().toString();String msg = "订单"+id+"支付成功";Order order=new Order();order.setId(id);order.setCreateTime(new Date());order.setMoneys(100);order.setUsername("张三");Message<String> message = MessageBuilder.withPayload(msg).setHeader("key",id).build();TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("order", message, order);String transactionId = result.getTransactionId();String status = result.getSendStatus().name();log.info("发送消息成功 transactionId={} status={} ",transactionId,status);return "success";}
}
创建消费者
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "test-consumer",topic = "order",messageModel = MessageModel.CLUSTERING)
public class RocketMQListen implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);System.out.println(body);}
}
生产者消息监听器
@Component
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener {@AutowiredOrderService orderService;@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {Order order = (Order) o;try {//生成订单orderService.add(order);return RocketMQLocalTransactionState.UNKNOWN;}catch (Throwable throwable){throwable.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String key = message.getHeaders().get("key").toString();System.out.println("回查订单id "+key+" 回查时间"+new Date());Order order = orderService.getById(key);if(order!=null) {long l = new Date().getTime() - order.getCreateTime().getTime();long time = l / (1000 * 60);//超时1分钟后,就会把未支付的订单进行删除if (time > 1) {orderService.removeById(key);System.out.println("订单" + key + "删除");//订单,库存等一系列操作return RocketMQLocalTransactionState.ROLLBACK;}Integer payStatus = order.getPayStatus();if (payStatus == 1) {return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.UNKNOWN;}elsereturn RocketMQLocalTransactionState.ROLLBACK;}
}
测试
这里通过生产者发送五个事务消息,生成五个订单,然后两个订单在一分钟内修改支付状态为已支付,超时一分钟未支付就会删除订单回退。运行截图如下: