今天学学消息队列RocketMQ:消息类型

        RocketMQ支持的消息类型有三种:普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。

普通消息

        普通消息是一种无序消息,消息分布在各个MessageQueue当中,以保证效率为第一使命。这种消息适用于对顺序没有要求的基础消费需求。这里的Topic和MessageQueue是多对多关系。

// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
}

顺序消息

        当我开始对消息的时序性有要求的时候,普通消息就无法满足我们的需求了。当我们要求顺序消费的时候,我们的Topic就不能采用多对多的形式,存放在多个MessageQueue中,而是需要牺牲一部分性能,存放在一个MessageQueue中。

// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> mqs.get(0), null);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println("RocketMQ test msg " + i);System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 跟普通消费不同的地方,这里采用了顺序消费方法。consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();
}

 延时消息

        通过指定的通知时间间隔,让消息不会立刻被消费者收到。

// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());// 指定延时等级// DelayTimeLevel对应的延时时间在服务端定义:rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";msg.setDelayTimeLevel(4);// 按毫秒延时msg.setDelayTimeMs(1L);// 按秒延时msg.setDelayTimeSec(1);SendResult sendResult = producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
}

30秒的延迟

事务消息

        事务消息是RocketMQ提供的一种高级的消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息的生命周期

        (1)初始化:半事务消息被生产者构建并完成初始化,待发送到RocketMQ服务端的状态。

        (2)事务待提交:半事务消息被发送到服务端,和普通消息不同,半事务消息并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

        (3)消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

        (4)提交事务待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见。

        (5)消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一段时间后没有收到消费者的响应,服务端会对消息进行重试处理。

        (6)消费完成:消费者完成了消费动作,并向服务端提交了消费结果,服务端标记当前消息已经被处理。

        (7)消息删除:服务端按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

// 生产者
public static void main(String[] args) throws MQClientException {TransactionMQProducer producer = new TransactionMQProducer("rmq-group");TransactionListener listener = new TransactionListenerImpl();producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.setTransactionListener(listener);producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.println(new Date());System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
}

 控制台打印

// 生产者控制台打印
Wed Jul 26 17:55:45 CST 2023 RocketMQ test msg 0 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:45 CST 2023
7F00000154F073D16E938497DE420000
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
93============================
Wed Jul 26 17:55:46 CST 2023 RocketMQ test msg 1 : executeLocalTransaction:未知状态
Wed Jul 26 17:55:46 CST 2023
7F00000154F073D16E938497E2340001
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
94============================
Wed Jul 26 17:55:47 CST 2023 RocketMQ test msg 2 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:47 CST 2023
7F00000154F073D16E938497E6230002
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
95============================
Wed Jul 26 17:55:48 CST 2023 RocketMQ test msg 3 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:48 CST 2023
7F00000154F073D16E938497EA180003
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
96============================
Wed Jul 26 17:55:49 CST 2023 RocketMQ test msg 4 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:49 CST 2023
7F00000154F073D16E938497EE0B0004
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
97============================
Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:50 CST 2023
7F00000154F073D16E938497F1F70005
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
98============================
Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:51 CST 2023
7F00000154F073D16E938497F5EC0006
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
99============================
Wed Jul 26 17:55:52 CST 2023 RocketMQ test msg 7 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:52 CST 2023
7F00000154F073D16E938497F9E50007
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
100============================
Wed Jul 26 17:55:53 CST 2023 RocketMQ test msg 8 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:53 CST 2023
7F00000154F073D16E938497FDD30008
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
101============================
Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:54 CST 2023
7F00000154F073D16E93849801C90009
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
102============================// 消费者控制台打印
Receive New Messages: Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 At Wed Jul 26 17:55:50 CST 2023 
Receive New Messages: Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 At Wed Jul 26 17:55:51 CST 2023 
Receive New Messages: Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 At Wed Jul 26 17:55:54 CST 2023 

这里消费者只收到了执行事务成功的5,6,9。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/10665.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Staples Drop Ship EDI 需求分析

Staples 是一家美国零售公司&#xff0c;总部位于马萨诸塞州弗拉明汉&#xff0c;主要提供支持工作和学习的产品和服务。该公司于 1986 年在马萨诸塞州布莱顿开设了第一家门店。到 1996 年&#xff0c;该公司已跻身《财富》世界 500 强&#xff0c;后来又收购了办公用品公司 Qu…

MySQL优化(面试)

文章目录 通信优化查询缓存语法解析及查询优化器查询优化器的策略 性能优化建议数据类型优化索引优化 优化关联查询优化limit分页对于varchar end mysql查询过程: 客户端向MySQL服务器发送一条查询请求服务器首先检查查询缓存&#xff0c;如果命中缓存&#xff0c;则立刻返回存…

初识mysql数据库之事务的概念及操作

目录 一、数据库多客户端访问问题 1. 数据库的CURD无限制带来的问题 2. 如何解决CURD导致的问题 二、事务的概念 1. 什么是事务 2. 事务的四个属性 3. mysql对事务的管理 4. 为什么会有事务 5. 事务的版本支持 三、事务的操作 1. 事务提交方式 2. 事务操作的准备工…

哈工大计算机网络课程局域网详解之:无线局域网

哈工大计算机网络课程局域网详解之&#xff1a;无线局域网 文章目录 哈工大计算机网络课程局域网详解之&#xff1a;无线局域网IEEE 802.11无线局域网802.11体系结构802.11&#xff1a;信道与AP关联 本节介绍一下平时经常使用的一个无线局域网技术&#xff0c;也就是通常我们使…

3ds MAX绘制茶壶

综合一下之前的内容画个茶壶 长方形&#xff0c;然后转化为可编辑多边形&#xff0c;添加节点并设置圆角&#xff0c;如下图 车削生成一个圆环&#xff0c;其实这一步也可以用一个圆柱体和两个圆角圆柱体解决 效果如下&#xff1a; 茶壶的底座绘制好了 接下来是茶壶的上半边 …

TypeScript -- 函数

文章目录 TypeScript -- 函数JS -- 函数的两种表现形式函数声明函数的表达式es6 箭头函数 TS -- 定义一个函数TS -- 函数声明使用接口(定义)ts 定义参数可选参数写法 -- ?的使用TS函数 -- 设置剩余参数函数重载 TypeScript – 函数 JS – 函数的两种表现形式 我们熟知js有两…

history命令:显示命令执行时间

1.修改配置文件 vim /etc/profile 添加内容 export HISTTIMEFORMAT"%Y-%m-%d %H:%M:%S " ​ #注意&#xff1a;在末尾的“引号”与“S”之间&#xff0c;加入一位空格&#xff0c;将日期时间和历史命令用空格相隔开来。 你也可以换一种清晰的形式&#xff0c;效果…

实验三 贪心算法

实验三 贪心算法 迪杰斯特拉的贪心算法实现 优先队列等 1.实验目的 1、掌握贪心算法的基本要素 &#xff1a;最优子结构性质和贪心选择性质 2、应用优先队列求单源顶点的最短路径Dijkstra算法&#xff0c;掌握贪心算法。 2.实验环境 Java 3.问题描述 给定带权有向图G (V…

背包问题求具体方案数问题--板子题

12. 背包问题求具体方案 - AcWing题库 思路&#xff1a;先将v[i]和w[i]先输入进去&#xff0c;然后我们进行倒叙dp&#xff0c;这个做的目的就是为了后边我们为了匹配确定路径做好准备&#xff0c;如果我们倒叙输入进去&#xff0c;我们再正序的时候就可以用推导式来进行路径输…

DevOps(四)

CD(二) 1. CDStep 1 - 上传代码Step 2 - 下载代码Step 3 - 检查代码Step 4 - 编译代码Step 5 - 上传仓库Step 6 - 下载软件Step 7 - 制作镜像Step 8 - 上传镜像Step 9 - 部署服务2. 整体预览2.1 预览1. 修改代码2. 查看sonarqube检查结果3. 查看nexus仓库4. 查看harbor仓库5.…

Hadoop简介以及集群搭建详细过程

Hadoop简介以及集群搭建详细过程 hadoop集群简介hadoop部署模式Hadoop集群安装1.集群角色规划2.服务器基础环境准备3.上传安装包hadoop安装包目录结构5.编辑hadoop配置文件6.分发安装包7.配置hadoop环境变量8.NameNode format(格式化操作) hadoop集群启动关闭-手动逐个进程启停…

【低代码专题方案】使用iPaaS平台下发数据,快捷集成MDM类型系统

01 场景背景 伴随着企业信息化建设日趋完善化、体系化&#xff0c;使用的应用系统越来越多&#xff0c;业务发展中沉淀了大量数据。主数据作为数据治理中枢&#xff0c;保存大量标准数据库&#xff0c;如何把庞大的数据下发到各个业务系统成了很棘手的问题。 传统的数据下发方…

android app控制ros机器人一

android开发app&#xff0c;进而通过控制ros机器人&#xff0c;记录开发过程 查阅资料&#xff1a; rosjava使用较多&#xff0c;已经开发好的app也有开源的案例 rosjava GitHub https://github.com/ros-autom/RobotCA https://github.com/ROS-Mobile/ROS-Mobile-Android…

Pandas时序数据分析实践—概述

时序数据&#xff0c;作为一种时间上有序的数据形式&#xff0c;无疑是我们日常生活中最常见的数据类型之一。它记录了事件、现象或者过程随时间的变化&#xff0c;是对于许多实际场景的忠实反映。而在众多时序数据的应用领域中&#xff0c;跑步训练记录莫过于是一项令人着迷的…

亲测解决Git inflate: data stream error (incorrect data check)

Git inflate: data stream error (incorrect data check) error: unable to unpack… 前提是你的repository在github等服务器或者其他路径有过历史备份/副本&#xff0c;不要求是最新版本的&#xff0c;只要有就可能恢复你做的所有工作。 执行git fsck --full检查损坏的文件 在…

《TCP IP网络编程》第十一章

第 11 章 进程间通信 11.1 进程间通信的基本概念 通过管道实现进程间通信&#xff1a; 进程间通信&#xff0c;意味着两个不同的进程中可以交换数据。下图是基于管道&#xff08;PIPE&#xff09;的进程间通信的模型&#xff1a; 可以看出&#xff0c;为了完成进程间通信&…

数据决定AIGC的高度,什么又决定着数据的深度?

有人曾言&#xff0c;数据决定人工智能发展的天花板。深以为然。 随着ChatGPT等AIGC应用所展现出的强大能力&#xff0c;人们意识到通用人工智能的奇点正在来临&#xff0c;越来越多的企业开始涌入这条赛道。在AIGC浪潮席卷全球之际&#xff0c;数据的重要性也愈发被业界所认同…

MySQL基础(四)数据库备份

目录 前言 一、概述 1.数据备份的重要性 2.造成数据丢失的原因 二、备份类型 &#xff08;一&#xff09;、物理与逻辑角度 1.物理备份 2.逻辑备份 &#xff08;二&#xff09;、数据库备份策略角度 1.完整备份 2.增量备份 三、常见的备份方法 四、备份&#xff08…

vue实现flv格式视频播放

公司项目需要实现摄像头实时视频播放&#xff0c;flv格式的视频。先百度使用flv.js插件实现&#xff0c;但是两个摄像头一个能放一个不能放&#xff0c;没有找到原因。&#xff08;开始两个都能放&#xff0c;后端更改地址后不有一个不能放&#xff09;但是在另一个系统上是可以…

wangEditor初探

1、前言 现有的Quill比较简单&#xff0c;无法满足业务需求&#xff08;例如SEO的图片属性编辑需求&#xff09; Quill已经有比较长的时间没有更新了&#xff0c;虽然很灵活&#xff0c;但是官方demo都没有一个。 业务前期也没有这块的需求&#xff0c;也没有考虑到这块的扩展…