使用 RocketMQ 实现消息的顺序消费

在分布式系统中,保持消息的顺序性是一个常见且重要的问题。RocketMQ 提供了一种有效的方式来确保消息的顺序消费。本文将通过代码示例,介绍如何使用 RocketMQ 实现消息的顺序生产和消费。

环境准备

在开始之前,请确保您已经配置好 RocketMQ 环境,并且在 MqConstant 类中定义了 RocketMQ 的 NameServer 地址。

顺序消息的生产

首先,我们需要编写生产者代码来发送顺序消息。我们会创建两个示例,一个简单的顺序生产示例,另一个则是基于业务逻辑(如订单流程)的顺序生产示例。

简单的顺序生产者

package com.takumilove.demo;import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.junit.Test;public class FOrderlyTest {@Testpublic void orderlyProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();for (int i = 0; i < 10; i++) {Message message = new Message("orderlyTopic", ("我是第" + i + "个消息").getBytes());producer.send(message);}producer.shutdown();System.out.println("发送完毕:");}
}

基于业务逻辑的顺序生产者

在这个示例中,我们假设有一个 Order 类表示订单,订单包含了 idorderNumberpricedatestatus 等信息。

package com.takumilove.demo;import com.takumilove.constant.MqConstant;
import com.takumilove.domain.Order;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Test;import java.util.Arrays;
import java.util.Date;
import java.util.List;public class FOrderlyTest {@Testpublic void orderlyProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();List<Order> orderList = Arrays.asList(new Order(1, 111, 59D, new Date(), "下订单"),new Order(2, 111, 59D, new Date(), "物流"),new Order(3, 111, 59D, new Date(), "签收"),new Order(4, 112, 89D, new Date(), "下订单"),new Order(5, 112, 89D, new Date(), "物流"),new Order(6, 112, 89D, new Date(), "拒收"));orderList.forEach(order -> {Message message = new Message("orderlyTopic", order.toString().getBytes());try {producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {int queueNumber = list.size();Integer i = (Integer) o;return list.get(i % queueNumber);}}, order.getOrderNumber());} catch (Exception e) {System.out.println("发送失败" + e.getMessage());}});producer.shutdown();System.out.println("发送完毕:");}
}

顺序消息的消费

接下来,我们编写消费者代码来消费这些顺序消息。我们将分别展示简单顺序消费者和基于业务逻辑的顺序消费者。

简单的顺序消费者

package com.takumilove.demo;import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Test;import java.util.List;public class FOrderlyTest {@Testpublic void orderlyConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("orderlyTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(new String(list.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();}
}

基于业务逻辑的顺序消费者

package com.takumilove.demo;import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Test;import java.util.List;public class FOrderlyTest {@Testpublic void orderlyConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);consumer.subscribe("orderlyTopic", "*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext) {MessageExt messageExt = list.get(0);System.out.println(new String(messageExt.getBody()));return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.in.read();}
}

总结

通过以上示例,我们展示了如何使用 RocketMQ 实现消息的顺序生产和消费。无论是简单的消息还是基于业务逻辑的消息,都可以通过 RocketMQ 提供的顺序消费机制来保证消息的有序性。这对于订单系统等需要严格顺序的场景尤为重要。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/47746.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT纯代码实现滑动开关控件

开关按钮大家应该很熟悉&#xff0c;在设置里面经常遇到&#xff0c;切换时候的滑动效果比较帅气。通常说的开关按钮&#xff0c;有两个状态&#xff1a;on、off。大部分的开关按钮控件&#xff0c;基本上有两大类&#xff0c;第一类是纯代码绘制&#xff0c;这种对代码的掌控度…

dhtmlx-gantt甘特图数据展示

官网文档&#xff1a;甘特图文档 实现效果&#xff1a; 首先需要下载 dhtmlx-gantt组件 npm i dhtmlx-gantt //我项目中使用的是"dhtmlx-gantt": "^8.0.6" 这个版本&#xff0c;不同的版本api或是文档中存在的方法稍有差异 界面引用 <template>&l…

目标检测算法与应用算法 DS集成 接口相关_v0.1

目录 文章目录 目录0. 目标GPS信息、速度、加速度、航向角信息的输出1. 目标检测算法接口1.1 模型相关1.2 检测结果相关 2. 应用算法接口2.1 bool cross_line; //跨线&#xff08;变道压线检测&#xff09;2.2 bool break_in; //闯入&#xff08;目标闯入&#xff09;2.3 bool …

Linux HOOK机制与Netfilter HOOK

一. 什么是HOOK&#xff08;钩子&#xff09; 在计算机中&#xff0c;基本所有的软件程序都可以通过hook方式进行行为拦截&#xff0c;hook方式就是改变原始的执行流。 二. Linux常见的HOOK方式 1、修改函数指针。 2、用户态动态库拦截。 ①利用环境变量LD_PRELOAD和预装载机…

【Python】python中list的迭代

什么是迭代&#xff1a; 迭代其实就是遍历整个数据结构 nums [3,4,5] for n in nums:print(n)上述代码中&#xff0c;我们定义了一个nums列表&#xff0c;并且使用for循环对其进行遍历。其实整个过程就是迭代&#xff0c;所谓迭代&#xff0c;就是对数据集中每一个元素对其进…

STM32自己从零开始实操:PCB全过程

一、PCB总体分布 以下只能让大家看到各个模块大致分布在板子的哪一块&#xff0c;只能说每个人画都有自己的理由&#xff1a; 电源&#xff1a;从外部接入电源&#xff0c;5V接到中间&#xff0c;向上变成4V供给无线&#xff0c;向下变成3V供给下面的接口&#xff08;也刻意放…

无极与有极电容的区别

无极性电容与有极性电容&#xff1a;差异与应用探索 在电子元件的广阔世界里&#xff0c;电容器无疑是不可或缺的一部分。它们以储存电荷和调节电路中的电压与电流而闻名。然而&#xff0c;电容器并非一概而论&#xff0c;其中最为显著的区别之一就是无极性电容与有极性电容。…

Springboot中常见的注解及其底层实现?

Spring Boot 是一个用于简化 Spring 应用初始搭建以及开发过程的框架&#xff0c;它大量使用了注解来简化配置和提高开发效率。以下是一些常见的 Spring Boot 注解及其底层实现&#xff1a; ### 1. SpringBootApplication 这是一个复合注解&#xff0c;包含了 Configuration、…

DP讨论——访问者模式

学而时习之&#xff0c;温故而知新。 访问者模式 角色 3个角色&#xff0c;访问者类&#xff0c;被访问者类&#xff0c;管理被访问者类的类。 特色 所谓访问者模式&#xff0c;我感觉就是被访问的类的方法形参是别的对象引用&#xff0c;然后临时过来进入一下&#xff0c…

每日一练 - BGP 路由表中的团体属性

01 真题题目 下面一台路由器的输出信息&#xff0c;关于这段信息描述正确的是? A.目的网段 1.1.1.0/24 所携带的团体属性值是 NO-EXPORT&#xff0c; 表明该路由条目不能通告给任何 BGP 邻居 B.目的网段 1.1.1.0/24 所携带的图体属性值是 NO-EXPORT&#xff0c; 表明试路由…

Python面试整理-Python中的变量和赋值:理解变量的命名规则、赋值方式以及变量类型

在Python中,变量用于存储数据。以下是关于变量的命名规则、赋值方式和变量类型的详细说明: 变量的命名规则 1. 字母、数字和下划线: ● 变量名必须以字母(a-z,A-Z)或下划线(_)开头,后续字符可以是字母、数字(0-9)或下划线。 ● 例如:my_var, _var2, var3 2.

Three.JS 使用RGBELoader和CubeTextureLoader 添加环境贴图

导入RGBELoader模块&#xff1a; import { RGBELoader } from "three/examples/jsm/loaders/RGBELoader.js"; 使用 addRGBEMappingk(environment, background,url) {rgbeLoader new RGBELoader();rgbeLoader.loadAsync(url).then((texture) > {//贴图模式 经纬…

三个国产数据库调研(达梦,PolarDB,TDSQL

三个国产数据库调研&#xff1a;达梦&#xff0c;PolarDB&#xff0c;TDSQL 1. 整体描述2. 达梦数据库2.1 相关网站2.2 接入工作2.3 工具使用2.4 总结 3. PolarDB数据库3.1 相关网站3.2 产品对比3.3 接入工作 4. TDSQL数据库4.1 相关网站4.2 产品对比4.3 接入工作 5. 对比总结5…

git使用-命令行+VS Code结合使用

一、Git常用命令 // 显示当分支的状态。它会列出已修改、已暂存和未跟踪的文件 git status// 列出本地仓库中所有的分支&#xff0c;其中会特殊显示当前所在分支 git branch// 在当前分支的基础上创建一个新的分支&#xff0c;并切换到这个新的分支上 git checkout -b 新分支…

问题:向上对齐对象的快捷键是: #学习方法#笔记

问题&#xff1a;向上对齐对象的快捷键是: A、T B、L C、R D、W 参考答案如图所示

做一只勤劳的小蜜蜂

机缘 成为创作者的初心&#xff0c;对我而言&#xff0c;是一个融合了个人兴趣、职业成长以及对知识传播热爱的复杂而纯粹的情感交织。回顾这段旅程的起点&#xff0c;几个核心驱动力始终引领着我前行&#xff1a; 1、记录与反思&#xff1a;在职业生涯的早期&#xff0c;我遇…

WordPress与 wp-cron.php

WordPress 傲居全球最流行的内容管理系统&#xff08;CMS&#xff09;之位&#xff0c;占据了互联网约43%的网站后台&#xff0c;这主要得益于其直观易用的用户界面以及丰富的扩展功能&#xff0c;特别是为新手用户提供了极大的便利。 然而&#xff0c;在畅享WordPress带来的便…

Leetcode 1302.层数最深子叶结点的和

大家好&#xff0c;今天我给大家分享一下我关于这个题的想法&#xff0c;我这个题过程比较复杂&#xff0c;但大家如果觉得好的话&#xff0c;就请给个免费的赞吧&#xff0c;谢谢了^ _ ^ 1.题目要求: 给你一棵二叉树的根节点 root &#xff0c;请你返回 层数最深的叶子节点的…

Go语言并发编程-Context上下文

Context上下文 Context概述 Go 1.7 标准库引入 context&#xff0c;译作“上下文”&#xff0c;准确说它是 goroutine 的上下文&#xff0c;包含 goroutine 的运行状态、环境、现场等信息。 context 主要用来在 goroutine 之间传递上下文信息&#xff0c;包括&#xff1a;取…

准备跳槽了(仍然底层为主,ue独立游戏为辅)

思考再三&#xff0c;准备跳槽了。 一、跳槽原因&#xff1a; 今年经济形势非常不好。那我为什么还要跳槽呢&#xff1f;因为干不下去了。公司是末位淘汰制&#xff0c;而我绩效垫底了。给我的整改措施中&#xff0c;部门经理让我三个月搞定60个bug&#xff0c;我觉得简直是送…