Spring Cloud —— RocketMQ 的消息类型

导航

  • 引言
  • 一、普通消息
    • 1.1 可靠同步发送
    • 1.2 可靠异步发送
    • 1.3 单向发送
  • 二、顺序消息
  • 三、事务消息
    • 3.1 什么是事务消息
    • 3.2 事务消息示例
      • 1、编写本地事务逻辑
      • 2、发送半事务消息
      • 3、注册本地事务监听器
      • 4、测试

引言

本文承接《Spring Cloud —— 消息队列与 RocketMQ》

RocketMQ 提供了多种场景所需的消息类型,包括普通消息、顺序消息、事务消息,本文分别针对这些消息类型予以展开介绍。

一、普通消息

普通消息分为三种发送方式可靠同步发送、可靠异步发送、单向发送

简言之,可靠同步发送就是消息发送方直到收到MQ的发送结果才发送下一条消息;可靠异步发送就是消息接收方暂时不关心发送结果,连续发送消息,采用消息发送回调的方式接收MQ的发送结果响应;单向发送就是不同步等待发送结果也不设置任何回调函数。

1.1 可靠同步发送

可靠同步发送,表示发送方会同步等待 MQ 的发送结果,可以使用 rocketMQTemplate.syncSend(…) 来实现。
syncSend 有很多重载方法,包括可以在参数列表中指定一个毫秒级的超时时间。
syncSend 如何设置标签?

syncSend(“topic:tag”, 其他参数);

@Slf4j
@SpringBootTest(classes = OrderApplication.class)
@RunWith(SpringRunner.class)
public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testSyncSend() {SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1:testTag", "这是一条同步消息");log.info("同步消息发送结果:{}", sendResult);}
}

1.2 可靠异步发送

可靠异步发送,表示不等待MQ返回响应,而通过回调接口接收服务器响应,并对发送结果进行处理。异步发送一般用于链路耗时较长,对RT 响应时间较为敏感的业务场景。

由于junit运行完会立即退出,因此需要 Thread.sleep 避免 JVM shutdown,实际开发不需要。

@Slf4j
@SpringBootTest(classes = OrderApplication.class)
@RunWith(SpringRunner.class)
public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testAsyncSend() throws InterruptedException {rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("发送结果:{}", sendResult);}@Overridepublic void onException(Throwable throwable) {log.error("消息发送异常,{}", throwable);}});System.out.println("================");// 实际开发不需要Thread.sleep(10000);}
}

执行结果:

================
2021-10-05 09:04:16.284  INFO [service-order,,,] 7608 --- [ublicExecutor_1] com.morty.rocketmq.MessageTypeTest       : 发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A803781DB858644D46168BB8FC0000, offsetMsgId=C0A8018C00002A9F000000000002FF47, messageQueue=MessageQueue [topic=test-topic-1, brokerName=DEFAULT_BROKER, queueId=3], queueOffset=1]

1.3 单向发送

单向发送,表示发送方只负责发送消息,不等待服务器回应,且没有回调函数触发,即只发送请求不等待应答
适用于某些耗时非常短,但对可靠性要求不高的场景,例如日志收集。

@Slf4j
@SpringBootTest(classes = OrderApplication.class)
@RunWith(SpringRunner.class)
public class MessageTypeTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testOneWay() {rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息。");}
}    

二、顺序消息

顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。
Broker 中默认有4个 ConsumeQueue 用来作为消息的传输通道,如果不做特殊要求,消息会分散到不同的 Queue 中,导致消息的乱序。因此,如果希望消息严格保证顺序发送和接收,就必须可以保证顺序的消息发送 API ,使得这些 Message 可以发送到同一个 Queue 中。
在这里插入图片描述
对于可靠同步、可靠异步,以及单向发送的场景,都提供了 xxxSendOrderly(…) 方法,除了保证消息可以分配到同一个 queue 中,以保证消息的有序性之外,没有任何其他区别。
sendOrderly(…) 方法除了需要基本的信息之外,还需要传入一个唯一的 HashKey,只要能够保证唯一即可。

@Test
public void testOneWayOrderly() {rocketMQTemplate.sendOneWayOrderly("test-topic-1", "这是一条单向消息。",String.valueOf(System.currentTimeMillis()));
}

如何验证消息是否被分配到了同一个 queue ?在RocketMQ 控制台的主题中找到如下按钮:
在这里插入图片描述
如果消息能够发送到同一个 queue,那么这几个 queue 中只会有一个 queue 的最大位点发生变化,由此就可以推断消息是否被分配到了同一个 queue 中:
在这里插入图片描述

三、事务消息

本节内容参考:消息类型-事务消息

3.1 什么是事务消息

RocketMQ 提供了事务消息,通过事务消息就能达到分布式事务的最终一致性
在这里插入图片描述
上图是 RocketMQ 提供的事务消息工作流程图,这是一种非常典型的分布式事务的解决方案。

半事务消息(half message)
指暂不能投递的消息,发送方已经成功地将消息发送到 RocketMQ 服务端,但是MQ未收到生产者对该消息的二次确认,此时该消息被标记为“暂不能投递”状态,处于该种状态下的消息即半事务消息。

消息回查(check back)
MQ服务端针对半事务消息主动向生产者查询其事务状态。由于网络闪断、生产者重启等原因,导致某些事务消息的二次确认丢失,MQ服务端通过扫描发现某些消息长期处于“半事务消息”状态,需要主动向消息生产者询问该消息的最终状态(commit 或 rollback),该询问过程即为消息回查

3.2 事务消息示例

完成一个订单创建的事务消息案例。本地事务采用本地事务表的方式记录事务的状态。

本地事务表
或本地消息表,是一种记录本地事务状态的独立表结构,专门用于存储事务信息,简化并统一本地事务的回查逻辑。表中的每条记录都代表一个已经成功执行的事务。一般会将本地事务表的入库操作和某个业务放在同一个事务中,这样就可以保证事务信息存在,那么事务一定成功。

事务消息的编码步骤要紧扣 RocketMQ 事务消息的流程。
在这里插入图片描述

1、编写本地事务逻辑

为下单逻辑增加事务属性,并在其中加入事务消息记录的逻辑。使用 shop_tx_log 来完成本地事务记录的工作,在执行下单后,同一事务中,完成事务入库的操作。

@Data
@Entity(name = "shop_tx_log")
public class TxLog {@Idprivate String txId;private Date date;
}
public interface TxLogDao extends JpaRepository<TxLog, String> {
}
@Transactional
public void createOrder(String txId, Order order) {// 保存订单orderDao.save(order);TxLog txLog = new TxLog();txLog.setTxId(txId);txLog.setDate(new Date());// 记录事务日志txLogDao.save(txLog);
}

2、发送半事务消息

在 OrderService 下新增半事务消息发送接口:

/*** 下单半事务消息*/
public void createOrderHalfMsg(Order order) {String txId = UUID.randomUUID().toString();rocketMQTemplate.sendMessageInTransaction("tx_producer_group","tx_topic",MessageBuilder.withPayload(order).setHeader("txId", txId).build(),order);
}

sendMessageInTransaction(…) 方法传入四个参数

第3个参数:org.springframework.messaging.support.MessageBuilder 用于构建 Message 对象,withPayload() 传入一个核心的消息实体对象,setHeader() 可以为 Message 对象设置消息头,这里把 txId 放入消息头中以备后面的消息回查。
第4个参数:Object 对象,用于后续执行本地事务时需要使用的数据

这一步骤是 RocketMQ 事务消息的第一步——发送半事务消息,也是代表开启一个以RocketMQ 为基础的分布式事务,除了设置一些基本的消息内容之外(分组、主题等),还需要通过构建MessageBuilder来构建Message,并绑定一个该分布式事务的 transaction Id,和执行后面执行本地事务的必要参数。

3、注册本地事务监听器

RocketMQLocalTransactionListener 提供了事务消息流程中“执行本地事务”和“消息回查”两个步骤的监听入口。

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;/*** 本地事务监听器** @data 2021/10/5 15:03*/
@Service
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderMQListener implements RocketMQLocalTransactionListener {@Autowiredprivate OrderService orderService;@Autowiredprivate TxLogDao txLogDao;/*** 执行本地事务*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {String txId = (String) message.getHeaders().get("txId");// 执行本地事务orderService.createOrder(txId, (Order) o);return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}/*** 事务回查*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String txId = (String) message.getHeaders().get("txId");TxLog txLog = txLogDao.findById(txId).get();if (txLog != null) {return RocketMQLocalTransactionState.COMMIT;} else {return RocketMQLocalTransactionState.ROLLBACK;}}
}

这里有个小问题,在发送半事务消息的时候,已经有发送结果,那为什么不直接在收到发送成功的响应后直接执行本地事务呢?还要再创建 executeLocalTransaction 这样的回调方法才去执行本地事务?

我认为是因为由MQ主动调用回调函数来执行本地事务具有更强的可靠性。如果直接以发送半消息的结果作为依据来执行本地事务,一旦由于网络或发送端重启等原因未收到半消息的发送结果,就会导致本地事务无法触发,系统的容错性偏低。而提供了回调接口,就可以由MQ来触发本地事务的执行,MQ也可以更好的将本地事务的执行和MQ半事务消息的提交绑定到同一个事务中,更利于事务的管控。

4、测试

上面三步已经基本把事务消息的代码编写完毕,只要在 Controller 层调用 半消息发送方法就可以完成整个事务消息功能。
这里需要对 OrderMQListener 的两个回调函数 executeLocalTransaction() 和 checkLocalTransaction() 打上断点,并检查执行 executeLocalTransaction 时是否完成 txLog 对象的入库。
在这里插入图片描述
从测试结果来看,并没有什么问题。

如何测试消息回查呢?我们可以直接在 executeLocalTransaction() 返回前杀死 order-service ,这样MQ Server 就收不到二次确认的信息,从而会触发消息回查方法。

可以使用 kill 命令,这里简单介绍下 Windows 下是如何操作的。

D:\idea-workspace\shop>jps
11792 Jps
18372 RemoteMavenServer
23284 OrderApplication
9780 rocketmq-console-ng-1.0.0.jar
13080 Launcher
15672 nacos-server.jar
20840 ProductApplication
8200
D:\idea-workspace\shop>taskkill -F /pid 23284
成功: 已终止 PID 为 23284 的进程。

OrderApplication 已经停止,再次启动后,不多一会就可以收到 MQ 的消息回查请求触发 checkLocalTransaction() 方法。测试成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/558614.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

城市运行一网统管_全国率先!“一屏观天下、一网管全城”,临港城市运行“一网统管”平台启动建设...

景区里是否出现了大客流&#xff1f;渣土车是否有违规&#xff1f;工地上有没有安全隐患&#xff1f;……8月12日&#xff0c;临港新片区城市运行“一网统管”平台正式启动建设&#xff0c;临港新片区城市运行“一网统管”中长期规划也正式发布&#xff0c;通过构建具有临港新片…

Spring Cloud Alibaba —— Nacos Config 配置中心

导航引言一、什么是配置中心二、常见的配置中心组件三、Nacos Config 入门四、Nacos Config 动态配置4.1 硬编码方式&#xff08;默认支持动态生效&#xff09;4.2 属性注入五、配置共享5.1 相同微服务不同环境间共享5.2 不同微服务配置共享六、Nacos Config 的几个概念总结引言…

codesys com库_CODESYS在线库,酷德网镜像站启用

近期由于国内网络问题&#xff0c;造成 stroe.codesys.com 网站无法访问。在线库无法下载。为了不影响广大CODESYS用户的正常使用&#xff0c;酷德网建立stroe.codesys.com的国内镜像站:主站&#xff1a; https://store.hicodesys.com:8421/CODESYSLibs/备用站&#xff1a; …

分布式事务的解决思路与方案

导航一、事务的种类与场景二、分布式事务解决方案2.1 全局事务2.2 可靠消息事务2.3 最大努力通知2.4 TCC 事务三、TCC 模式常见问题3.1 二阶段幂等3.2 空回滚3.3 资源悬挂一、事务的种类与场景 本地事务实际上就是指数据库的事务&#xff0c;参考《MySQL —— 事务与隔离级别总…

css3 下边框缓缓划过_干货来袭!web前端开发工程师必看之如何使用CSS3实现瀑布流效果?...

首先,我们来看一下什么是瀑布流布局效果,比如电商网站 蘑菇街原理图:在一个大盒子里&#xff0c;放置多个小盒子&#xff0c;小盒子的大小可以不一致&#xff0c;长短不一样&#xff0c;呈现一种瀑布流的效果。使用CSS3S实现只需要如下4步:1. 准备图片素材2. 书写相应HTML结构3…

Spring Cloud Alibaba —— Seata 分布式事务框架

导航一、Seata 介绍二、Seata 的工作原理2.1 三个角色2.2 工作流程三、Seata AT 工作机制3.1 一阶段3.2 二阶段四、案例演示&#xff08;待补充&#xff09;一、Seata 介绍 官网&#xff1a;Seata 官网 Seata 是2019 年阿里巴巴中间件团队发起的开源项目&#xff0c;其前身是…

云麦体脂秤华为体脂秤_华为、小米和有品体脂秤哪个品牌好?三款智能体脂秤横评结果排行...

如今生活水平的提高&#xff0c;也让更多人开始关注健康问题。由于大部分时间都忙于工作&#xff0c;本身就运动少、体重超标等等。如果长期得不到控制的话&#xff0c;会造成日后脂性肝炎、肝纤维化、肝癌&#xff0c;想想都可怕&#xff0c;在意识到这样的严重性&#xff0c;…

Guava常用工具类的使用

导航引言一、Lists.partition引言 本文用于记录工作中常用到的 Guava 工具类的使用。 依赖引入&#xff1a; <dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>20.0</version></depend…

idea 调用c#接口_Dubbo 接口测试方法

一.直接通telnet然后用dubbo协议调用方法&#xff08;1&#xff09;在项目的配置文件中可以看到dubbo.protocol.port10022说明dubbo对外暴漏的端口为10022&#xff0c;直接用telnet访问此端口。telnet lcoalhost 10022然后就能看到说明连接成功。用ls查看服务查看服务下有那些方…

MySQL 面试问答

导航一、什么是回表查询&#xff1f;如何避免回表查询&#xff1f;二、为什么MySQL建议使用自增主键&#xff1f;什么是代理主键、业务主键&#xff1f;三、为什么MySQL建议单表不超过2000W数据&#xff1f;四、MySQL自增id用完了怎么办&#xff1f;五、MySQL自增主键是连续的吗…

tcs标准编写软件_【公益培训】知你所需 | 标准编写格式及TCS模板应用线上公益培训...

企业标准编写的水平及TCS工具使用的能力是实施企业标准化工作的基础。TCS标准编写软件是辅助标准编写的工具性软件&#xff0c;方便标准编写人员快捷准确的编写标准草案&#xff0c;有效提升标准供给质量。为贯彻落实疫情防控和助力企业复工复产工作&#xff0c;山东标准化协会…

Linux进阶之路——常用命令总结

一、帮助命令 help man type区分内建、外建命令 【扩展】关于内建命令与外建命令。 内建命令属于shell程序的一部分&#xff0c;包含一些比较简单的Linux命令。这些命令被写在/bin/bash 文件的 builtins 里面&#xff0c;由shell程序识别并在shell程序内部完成运行。通常在Li…

bios设置 联想m8000t_怎么进bios设置硬盘启动顺序

操作说明&#xff1a;1、不同电脑进BIOS按键不一样&#xff0c;常见的有del、F1、F2、Esc、enter、F8、F9等2、在电脑启动时&#xff0c;不停按Del、F2等按键会进入BIOS设置界面&#xff0c;开机按哪个键进BIOS设置BIOS类型一&#xff1a;CMOS Setup Utility1、启动时按Del进入…

MySQL 基础 ————事务与隔离级别总结

引言 在处理并发读或写时&#xff0c;可以通过实现一个由两种类型的锁组成的锁系统来解决问题&#xff1a; 共享锁&#xff08;shared lock&#xff09;和排它锁&#xff08;exclusive lock&#xff09;&#xff0c;也叫读锁&#xff08;read lock&#xff09;和写锁&#xff0…

32f407tim4时钟源频率_慎重选择时钟发生器,别让这俩指标影响你的ADC 「图片」...

系统设计师通常侧重于为应用选择最合适的数据转换器&#xff0c;在向数据转换器提供输入的时钟发生器件的选择上往往少有考虑。然而&#xff0c;如果不慎重考虑时钟发生器的相位噪声和抖动性能&#xff0c;数据转换器动态范围和线性度性能可能受到严重的影响。系统考虑因素采用…

Spring —— IoC 容器详解

引言 本篇博客总结自官网的《The IoC Container》&#xff0c;其中会结合王富强老师的《Spring揭秘》融入自己的语言和理解&#xff0c;争取通过这一篇文章彻底扫除spring IOC的盲区。 本文介绍什么是 IoC 容器&#xff0c;什么是 Bean&#xff0c;依赖&#xff0c;Bean Defi…

nvidia控制面板点了没反应win7_win7系统Nvidia控制面板怎么设置?

许多用户不知道Nvidia控制面板怎么设置?那么Nvidia控制面板如何设置呢?其实设置的方法很简单。接下来&#xff0c;小编就把Nvidia控制面板设置的方法告诉大家。1、首先在桌面右键点击选择NVIDIA控制面板。2、显卡的设置性能肯定是要高好了&#xff0c;所以在性能设置方面&…

切割 字符串_web前端如何使用字符串

一、字符串概述定义&#xff1a;字符串就是用单引号或者双引号包裹起来的&#xff0c;零个或多个排列在一起的字符。例如&#xff1a;’javascript‘, “”, “345” , ’9-11a$‘, “xiao_yuanLian”嵌套&#xff1a;字符串可以嵌套。在单引号包裹的字符串内部&#xff0c;应该…

卡尔曼_卡尔曼滤波最完整公式推导

卡尔曼滤波是一种利用线性系统状态方程&#xff0c;通过系统输入输出观测数据&#xff0c;对系统状态进行最优估计的算法。由于观测数据中包括系统中的噪声和干扰的影响&#xff0c;所以最优估计也可看作是滤波过程。上面一段话来自百度百科&#xff0c;其实最核心的意思就是卡…

Redis 缓存实战——缓存、数据库一致性问题分析与解决方案

引言 缓存与数据库一致性的问题自从出现了缓存概念后就一直被提及&#xff0c;它是一个缓存方案的先天缺陷&#xff0c;只要存在缓存&#xff0c;就势必会讨论缓存与数据库一致性的问题。 一致性问题还广泛存在于各种分布式存储场景中&#xff0c;如主从一致性等等。 本篇博…