MQ四兄弟:如何保证消息顺序性

在当今的分布式系统架构中,消息队列(MQ)是不可或缺的组成部分。它们在确保系统组件之间高效通信方面发挥着关键作用。特别是在金融交易、物流跟踪等对消息处理顺序有严格要求的场景中,消息队列的顺序性保证显得更为重要。接下来,我们将深入探讨RabbitMQ、RocketMQ、Kafka和Pulsar这四个广泛使用的消息队列系统,分析它们是如何确保消息的顺序性,并附上相应的代码示例。

RabbitMQ

RabbitMQ作为一款成熟的开源消息队列,,基于AMQP(Advanced Message Queuing Protocol)协议构建,广泛应用于企业级应用中。虽然RabbitMQ本身并不保证严格的全局顺序性,但可以通过特定的设计模式来实现消息顺序性。

  1. 单一队列和单一消费者模式:确保一个队列只被一个消费者消费,这样可以保证消息按照发送的顺序被处理。因为队列本身就是一个先进先出的结构。
  2. 消息排序:在消息生产者端,为消息添加序列号或时间戳,消费者端根据这些信息对消息进行排序。

以下是一个简单的Java代码片段,展示了如何在RabbitMQ中发送消息。请注意,这个例子没有包含消息排序的逻辑,因为它依赖于具体的业务场景和消息结构。


public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}

创建了一个连接和一个通道,然后声明了一个队列。之后,我们发布了一个简单的消息到队列中。为了保证消息的顺序性,我们需要确保所有消息都是通过同一个通道发送,并且在消费端也是由同一个消费者按顺序接收处理。

RocketMQ

RocketMQ作为阿里巴巴开源的分布式消息队列,在保证消息顺序性方面提供了一种基于MessageQueueSelector的解决方案。其核心思路是将有序的消息写入特定的队列,从而使消费端固定消费某个队列时,就能够按顺序消费消息。

具体来说,RocketMQ中有两个重要概念:

  • Topic: 逻辑上的消息主题
  • MessageQueue: 物理上存储消息的队列

一个Topic包含多个MessageQueue,消息会根据其内容进行哈希计算,分配到不同的MessageQueue中。用户可以通过提供MessageQueueSelector,对特定类型的消息强制分配到同一个MessageQueue,从而保证顺序性。

示例代码:

生产者

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("nameserver:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID" + orderId, ("Hello RocketMQ " + i).getBytes());
// 发送有序消息
producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer orderId = (Integer) arg; // 订单ID作为选择器的参数int index = orderId % mqs.size(); // 根据订单ID计算MessageQueue索引return mqs.get(index); // 返回该索引对应的MessageQueue}
}, orderId);

通过上述代码,发送端可以将具有相同订单号的消息发送到同一个MessageQueue。

消费端

 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name");consumer.setNamesrvAddr("nameserver:9876");consumer.subscribe("TopicTest", "TagA");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for (MessageExt msg : msgs) {System.out.printf("Consumer: %s %n", new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});

消费端只需固定消费指定的MessageQueue,即可以保证消息按顺序被消费。

Kafka

Kafka通过Partition(分区)的概念来保证消息的顺序性。同一个Partition中的消息是有序的,但不同Partition之间是无序的。Producer在发送消息时可以指定消息要发送到的分区。Kafka默认提供了基于key的分区策略,确保具有相同key的消息会被发送到同一个分区,从而保证这些消息在这个分区内的顺序性。

以下是一个简单的 Java 代码示例,展示了如何在 Kafka 中发送和消费有序消息:

生产者代码

public class OrderProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 假设我们有10个订单,每个订单的消息需要顺序处理for (int orderId = 0; orderId < 10; orderId++) {for (int i = 0; i < 5; i++) { // 每个订单发送5条消息String message = String.format("Order %d, Message %d", orderId, i);producer.send(new ProducerRecord<>("OrderTopic", Integer.toString(orderId), message));}}producer.close();}
}

producer.send(new ProducerRecord<>("OrderTopic", Integer.toString(orderId), message));

第二个参数是消息的键(key),这里使用订单ID作为键,确保相同订单ID的消息发送到同一个分区

消费者代码


Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order_consumer_group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("OrderTopic"));

在生产者代码中,我们使用了相同的 key(即订单ID)来确保消息被发送到同一个 Partition。在消费者代码中,我们订阅了整个 Topic,但由于我们使用了相同的 key 来发送消息,Kafka 会自动将具有相同 key 的消息路由到同一个 Partition,从而保证顺序性。

Pulsar

Apache Pulsar 通过分区主题(Partitioned Topics)来保证消息的顺序性。在Pulsar中,每个分区可以看作是一个独立的消息队列,分区内的消息保持发送顺序。为了确保消息的顺序性,生产者在发送消息时需要指定一个键(Key),Pulsar会根据这个键将消息路由到特定的分区。这样,具有相同键的消息就会被发送到同一个分区,并且按照发送的顺序进行消费。

生产者代码示例:


public class PulsarOrderProducer {public static void main(String[] args) throws Exception {PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();Producer<String> producer = client.newProducer(Schema.STRING).topic("persistent://public/default/my-topic").create();for (int i = 0; i < 100; i++) {String key = "OrderID" + (i % 10); // 假设OrderID是业务键String value = "Message" + i;producer.newMessage().key(key).value(value).send();}producer.close();client.close();}
}

消费者代码示例:

public class PulsarOrderConsumer {public static void main(String[] args) throws Exception {PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();Consumer<String> consumer = client.newConsumer(Schema.STRING).topic("persistent://public/default/my-topic").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Exclusive).subscribe();while (true) {Message<String> msg = consumer.receive();try {// 处理消息System.out.printf("Message with key %s: %s", msg.getKey(), msg.getValue());consumer.acknowledge(msg);} catch (Exception e) {consumer.negativeAcknowledge(msg);}}}
}

在消费者代码中,我们使用了SubscriptionType.Exclusive,使订阅被独占,确保只有一个消费者能够消费分区内的消息,从而保证了消息的顺序性。

总结

尽管RabbitMQ、RocketMQ、Kafka和Pulsar这些消息队列系统虽然在实现细节上有所不同,但它们保证消息顺序性的核心思想都是相似的,即确保具有相同特征的消息被发送到同一队列或分区中,由于队列数据结构本身就是先进先出的结构,因此只需要消费者从该队列按顺序消费,就能够保证消息的有序性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/44159.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用libguestfs挂载qcow2磁盘镜像

挂载qcow2磁盘镜像的第一种方法是使用 libguestfs&#xff0c;它提供了一系列工具来访问和编辑 VM 磁盘镜像。libguestfs 支持几乎所有类型的磁盘镜像&#xff0c;包括 qcow2。你可以像下面这样&#xff0c;在Linux上安装libguestfs工具集。 1、安装guestmount工具 在基于 De…

主干网络篇 | YOLOv5/v7 更换骨干网络之 MobileNetV3 | 基于神经网络搜索的轻量级网络(2)

主干网络篇 | YOLOv5/v7 更换骨干网络之 MobileNetV3 | 基于神经网络搜索的轻量级网络 概述 YOLOv5和YOLOv7是目前主流的轻量级目标检测模型&#xff0c;在速度和精度方面取得了良好的平衡。然而&#xff0c;传统的YOLOv5/v7模型使用FPN和CSPNet等结构作为主干网络&#xff0…

SMU Summer 2024 Contest Round 2

[ABC357C] Sierpinski carpet - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 思路:通过因为图形的生成过程是完全一样的。可以通过递归&#xff0c;不断分形。函数process(x,y,k)定义为以坐标(x,y)为左上角,填充sqrt3(k)级的地毯。 int n; int c[800][800]; 默认全为…

【杂说咋说】近年来国土空间规划行业人员转行分析

这几年&#xff0c;国土空间规划行业的人员流动引起了不少关注。我们可以从几个方面来看这些变化&#xff1a; 考公务员 许多从事国土空间规划的专业人员选择了考公务员。这种选择相对稳定&#xff0c;不需要熬夜加班&#xff0c;工作环境也更为舒适。尤其是进入国家机关或住…

POSIX互斥锁和条件变量

一.概述 1.POXIS介绍 POXIS是一种操作系统接口标准&#xff0c;全称为“可移植操作系统接口”。 它最初由IEEE组织制定&#xff0c;目的是为了使不同的操作系统之间可以互相兼容。POSIX标准定义了一系列API&#xff08;应用程序接口&#xff09;和命令行工具&#xff0c;这些…

Mybatis核心问题总结

对MyBatis源码的理解 ORM框架&#xff1a;CRUD操作 1。SQL解析&#xff1a; 映射文件、注解--》映射器解析 XMLMapperBuilder MapperAnnotationBuilder 2。SQL执行: SqlSession 接口--》Executor --》 SimpleExecutor ReuseExecutor 【Statement--JDBC】 3。结果映射&…

Go语言---Json

JSON (JavaScript Object Notation)是一种比XML 更轻量级的数据交换格式&#xff0c;在易于人们阅读和编写的同时&#xff0c;也易于程序解析和生成。尽管JSON是 JavaScript的一个子集&#xff0c;但 JSON采用完全独立于编程语言的文本格式&#xff0c;且表现为键/值对集合的文…

【大模型LLM面试合集】大语言模型架构_layer_normalization

2.layer_normalization 1.Normalization 1.1 Batch Norm 为什么要进行BN呢&#xff1f; 在深度神经网络训练的过程中&#xff0c;通常以输入网络的每一个mini-batch进行训练&#xff0c;这样每个batch具有不同的分布&#xff0c;使模型训练起来特别困难。Internal Covariat…

【C++高阶】高效数据存储:理解并模拟实现红黑树Map与Set

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;C “ 登神长阶 ” &#x1f921;往期回顾&#x1f921;&#xff1a;了解 红黑树 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀模拟实现Map与Set &#x1f4d2;1.…

js ES6 part1

听了介绍感觉就是把js在oop的使用 作用域 作用域&#xff08;scope&#xff09;规定了变量能够被访问的“范围”&#xff0c;离开了这个“范围”变量便不能被访问&#xff0c; 作用域分为&#xff1a; 局部作用域、 全局作用域 1. 函数作用域&#xff1a; 在函数内部声明的…

爬取天气数据,利用Pyecharts作轮播图

爬取网站链接&#xff1a;https://lishi.tianqi.com/xiamen/202312.html 爬取了厦门市2023年一整年的天气数据&#xff0c;包括最高温&#xff0c;最低温&#xff0c;天气&#xff0c;风力风向等 爬虫代码&#xff1a; import requests import pandas as pd import csv from…

UML建模案例分析-时序图和类图的对应关系

概念 简单地说&#xff0c;类图定义了系统中的对象&#xff0c;时序图定义了对象之间的交互。 例子 一个电子商务系统&#xff0c;会员可通过电子商务系统购买零件。具体功能需求如下&#xff1a; 会员请求结账时&#xff0c;系统验证会员的账户是否处于登录状态&#xff1…

极狐GitLab 17.0 重磅发布,100+ DevSecOps功能更新来啦~【三】

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab &#xff1a;https://gitlab.cn/install?channelcontent&utm_sourcecsdn 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署…

【基础篇】1.8 C语言基础(二)

2.9 预处理指令和宏定义 在STM32开发中,预处理和宏定义常用于配置硬件参数、启用或禁用特定功能、以及优化代码以适应不同的硬件配置或应用场景。通过合理地使用预处理和宏定义,我们可以编写更加灵活、可配置和高效的代码。 预处理指令如#include、#define等在C语言编程中起…

防火墙图形化界面策略和用户认证(华为)

目录 策略概要认证概要实验拓扑图题目要求一要求二要求三要求四要求五要求六 策略概要 安全策略概要&#xff1a; 安全策略&#xff08;Security Policy&#xff09;在安全领域具有双重含义。宏观上&#xff0c;安全策略指的是一个组织为保证其信息安全而建立的一套安全需求、…

uniapp 微信小程序接入MQTT

MQTT安装 前期准备 由于微信小程序需要wss&#xff0c;所以要有域名SSL证书 新建目录/srv/mosquitto/config&#xff0c;/srv/mosquitto/config/cert 目录/srv/mosquitto/config中新建配置文件mosquitto.conf&#xff0c;文件内容 persistence true persistence_location /m…

深入探索Apache Flink:流处理的艺术与实践

在当今的大数据时代&#xff0c;流处理已成为处理实时数据的关键技术。Apache Flink&#xff0c;作为一个开源的流处理框架&#xff0c;以其高吞吐量、低延迟和精确一次&#xff08;exactly-once&#xff09;的语义处理能力&#xff0c;在众多流处理框架中脱颖而出。本文将深入…

在树莓派设备上导出系统镜像

镜像导出 前提条件&#xff1a; 已获取可以正常使用的设备。已获取鼠标、键盘和电源适配器。已将设备接入可正常使用的网络。 操作步骤&#xff1a; 连接适配器给设备上电&#xff0c;正常启动设备&#xff0c;连接鼠标和键盘。在终端命令窗格执行如下命令&#xff0c;安装…

数据模型-ER图在数据模型设计中的应用

ER图在数据模型设计中的应用 1. ER图概述&#xff1a;起源与发展​ 实体-关系图&#xff08;Entity Relationship Diagram&#xff0c;简称ER图&#xff09;起源于1970年代&#xff0c;由Peter Chen首次提出&#xff0c;作为描述数据和信息间关系的图形化语言。随着数据库技术…

[PM]流程与结构设计

流程图 流程就是为了达到特定目标, 进行的一系列有逻辑性的操作步骤, 由两个及已上的步骤, 完成一个完整的行为过程, 即可称为流程, 流程图就是对这个过程的图形化展示 分类 业务流程图 概念: 描述业务流程的一种图, 通过特定符号和连线表示具体某个业务的处理步骤和过程作…