导航
- 引言
- 一、普通消息
- 1.1 可靠同步发送
- 1.2 可靠异步发送
- 1.3 单向发送
- 二、顺序消息
- 三、事务消息
- 3.1 什么是事务消息
- 3.2 事务消息示例
- 1、编写本地事务逻辑
- 2、发送半事务消息
- 3、注册本地事务监听器
- 4、测试
引言
本文承接《Spring Cloud —— 消息队列与 RocketMQ》
RocketMQ 提供了多种场景所需的消息类型,包括普通消息、顺序消息、事务消息,本文分别针对这些消息类型予以展开介绍。
一、普通消息
普通消息分为三种发送方式:可靠同步发送、可靠异步发送、单向发送。
简言之,可靠同步发送就是消息发送方直到收到MQ的发送结果才发送下一条消息;可靠异步发送就是消息接收方暂时不关心发送结果,连续发送消息,采用消息发送回调的方式接收MQ的发送结果响应;单向发送就是不同步等待发送结果也不设置任何回调函数。
1.1 可靠同步发送
可靠同步发送,表示发送方会同步等待 MQ 的发送结果,可以使用 rocketMQTemplate.syncSend(…) 来实现。
syncSend 有很多重载方法,包括可以在参数列表中指定一个毫秒级的超时时间。
syncSend 如何设置标签?
syncSend(“topic:tag”, 其他参数);
@Slf4j
@SpringBootTest(classes = OrderApplication.class)
@RunWith(SpringRunner.class)
public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testSyncSend() {SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1:testTag", "这是一条同步消息");log.info("同步消息发送结果:{}", sendResult);}
}
1.2 可靠异步发送
可靠异步发送,表示不等待MQ返回响应,而通过回调接口接收服务器响应,并对发送结果进行处理。异步发送一般用于链路耗时较长,对RT 响应时间较为敏感的业务场景。
由于junit运行完会立即退出,因此需要 Thread.sleep 避免 JVM shutdown,实际开发不需要。
@Slf4j
@SpringBootTest(classes = OrderApplication.class)
@RunWith(SpringRunner.class)
public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testAsyncSend() throws InterruptedException {rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("发送结果:{}", sendResult);}@Overridepublic void onException(Throwable throwable) {log.error("消息发送异常,{}", throwable);}});System.out.println("================");// 实际开发不需要Thread.sleep(10000);}
}
执行结果:
================
2021-10-05 09:04:16.284 INFO [service-order,,,] 7608 --- [ublicExecutor_1] com.morty.rocketmq.MessageTypeTest : 发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A803781DB858644D46168BB8FC0000, offsetMsgId=C0A8018C00002A9F000000000002FF47, messageQueue=MessageQueue [topic=test-topic-1, brokerName=DEFAULT_BROKER, queueId=3], queueOffset=1]
1.3 单向发送
单向发送,表示发送方只负责发送消息,不等待服务器回应,且没有回调函数触发,即只发送请求不等待应答。
适用于某些耗时非常短,但对可靠性要求不高的场景,例如日志收集。
@Slf4j
@SpringBootTest(classes = OrderApplication.class)
@RunWith(SpringRunner.class)
public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testOneWay() {rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息。");}
}
二、顺序消息
顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。
Broker 中默认有4个 ConsumeQueue 用来作为消息的传输通道,如果不做特殊要求,消息会分散到不同的 Queue 中,导致消息的乱序。因此,如果希望消息严格保证顺序发送和接收,就必须可以保证顺序的消息发送 API ,使得这些 Message 可以发送到同一个 Queue 中。
对于可靠同步、可靠异步,以及单向发送的场景,都提供了 xxxSendOrderly(…) 方法,除了保证消息可以分配到同一个 queue 中,以保证消息的有序性之外,没有任何其他区别。
sendOrderly(…) 方法除了需要基本的信息之外,还需要传入一个唯一的 HashKey,只要能够保证唯一即可。
@Test
public void testOneWayOrderly() {rocketMQTemplate.sendOneWayOrderly("test-topic-1", "这是一条单向消息。",String.valueOf(System.currentTimeMillis()));
}
如何验证消息是否被分配到了同一个 queue ?在RocketMQ 控制台的主题中找到如下按钮:
如果消息能够发送到同一个 queue,那么这几个 queue 中只会有一个 queue 的最大位点发生变化,由此就可以推断消息是否被分配到了同一个 queue 中:
三、事务消息
本节内容参考:消息类型-事务消息
3.1 什么是事务消息
RocketMQ 提供了事务消息,通过事务消息就能达到分布式事务的最终一致性。
上图是 RocketMQ 提供的事务消息工作流程图,这是一种非常典型的分布式事务的解决方案。
半事务消息(half message)
指暂不能投递的消息,发送方已经成功地将消息发送到 RocketMQ 服务端,但是MQ未收到生产者对该消息的二次确认,此时该消息被标记为“暂不能投递”状态,处于该种状态下的消息即半事务消息。
消息回查(check back)
MQ服务端针对半事务消息主动向生产者查询其事务状态。由于网络闪断、生产者重启等原因,导致某些事务消息的二次确认丢失,MQ服务端通过扫描发现某些消息长期处于“半事务消息”状态,需要主动向消息生产者询问该消息的最终状态(commit 或 rollback),该询问过程即为消息回查。
3.2 事务消息示例
完成一个订单创建的事务消息案例。本地事务采用本地事务表的方式记录事务的状态。
本地事务表
或本地消息表,是一种记录本地事务状态的独立表结构,专门用于存储事务信息,简化并统一本地事务的回查逻辑。表中的每条记录都代表一个已经成功执行的事务。一般会将本地事务表的入库操作和某个业务放在同一个事务中,这样就可以保证事务信息存在,那么事务一定成功。
事务消息的编码步骤要紧扣 RocketMQ 事务消息的流程。
1、编写本地事务逻辑
为下单逻辑增加事务属性,并在其中加入事务消息记录的逻辑。使用 shop_tx_log 来完成本地事务记录的工作,在执行下单后,同一事务中,完成事务入库的操作。
@Data
@Entity(name = "shop_tx_log")
public class TxLog {@Idprivate String txId;private Date date;
}
public interface TxLogDao extends JpaRepository<TxLog, String> {
}
@Transactional
public void createOrder(String txId, Order order) {// 保存订单orderDao.save(order);TxLog txLog = new TxLog();txLog.setTxId(txId);txLog.setDate(new Date());// 记录事务日志txLogDao.save(txLog);
}
2、发送半事务消息
在 OrderService 下新增半事务消息发送接口:
/*** 下单半事务消息*/
public void createOrderHalfMsg(Order order) {String txId = UUID.randomUUID().toString();rocketMQTemplate.sendMessageInTransaction("tx_producer_group","tx_topic",MessageBuilder.withPayload(order).setHeader("txId", txId).build(),order);
}
sendMessageInTransaction(…) 方法传入四个参数
第3个参数:org.springframework.messaging.support.MessageBuilder 用于构建 Message 对象,withPayload() 传入一个核心的消息实体对象,setHeader() 可以为 Message 对象设置消息头,这里把 txId 放入消息头中以备后面的消息回查。
第4个参数:Object 对象,用于后续执行本地事务时需要使用的数据
这一步骤是 RocketMQ 事务消息的第一步——发送半事务消息,也是代表开启一个以RocketMQ 为基础的分布式事务,除了设置一些基本的消息内容之外(分组、主题等),还需要通过构建MessageBuilder来构建Message,并绑定一个该分布式事务的 transaction Id,和执行后面执行本地事务的必要参数。
3、注册本地事务监听器
RocketMQLocalTransactionListener 提供了事务消息流程中“执行本地事务”和“消息回查”两个步骤的监听入口。
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;/*** 本地事务监听器** @data 2021/10/5 15:03*/
@Service
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderMQListener implements RocketMQLocalTransactionListener {@Autowiredprivate OrderService orderService;@Autowiredprivate TxLogDao txLogDao;/*** 执行本地事务*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {String txId = (String) message.getHeaders().get("txId");// 执行本地事务orderService.createOrder(txId, (Order) o);return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}/*** 事务回查*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String txId = (String) message.getHeaders().get("txId");TxLog txLog = txLogDao.findById(txId).get();if (txLog != null) {return RocketMQLocalTransactionState.COMMIT;} else {return RocketMQLocalTransactionState.ROLLBACK;}}
}
这里有个小问题,在发送半事务消息的时候,已经有发送结果,那为什么不直接在收到发送成功的响应后直接执行本地事务呢?还要再创建 executeLocalTransaction 这样的回调方法才去执行本地事务?
我认为是因为由MQ主动调用回调函数来执行本地事务具有更强的可靠性。如果直接以发送半消息的结果作为依据来执行本地事务,一旦由于网络或发送端重启等原因未收到半消息的发送结果,就会导致本地事务无法触发,系统的容错性偏低。而提供了回调接口,就可以由MQ来触发本地事务的执行,MQ也可以更好的将本地事务的执行和MQ半事务消息的提交绑定到同一个事务中,更利于事务的管控。
4、测试
上面三步已经基本把事务消息的代码编写完毕,只要在 Controller 层调用 半消息发送方法就可以完成整个事务消息功能。
这里需要对 OrderMQListener 的两个回调函数 executeLocalTransaction() 和 checkLocalTransaction() 打上断点,并检查执行 executeLocalTransaction 时是否完成 txLog 对象的入库。
从测试结果来看,并没有什么问题。
如何测试消息回查呢?我们可以直接在 executeLocalTransaction() 返回前杀死 order-service ,这样MQ Server 就收不到二次确认的信息,从而会触发消息回查方法。
可以使用 kill 命令,这里简单介绍下 Windows 下是如何操作的。
D:\idea-workspace\shop>jps
11792 Jps
18372 RemoteMavenServer
23284 OrderApplication
9780 rocketmq-console-ng-1.0.0.jar
13080 Launcher
15672 nacos-server.jar
20840 ProductApplication
8200
D:\idea-workspace\shop>taskkill -F /pid 23284
成功: 已终止 PID 为 23284 的进程。
OrderApplication 已经停止,再次启动后,不多一会就可以收到 MQ 的消息回查请求触发 checkLocalTransaction() 方法。测试成功!