MQ四兄弟:如何保证消息顺序性

在当今的分布式系统架构中,消息队列(MQ)是不可或缺的组成部分。它们在确保系统组件之间高效通信方面发挥着关键作用。特别是在金融交易、物流跟踪等对消息处理顺序有严格要求的场景中,消息队列的顺序性保证显得更为重要。接下来,我们将深入探讨RabbitMQ、RocketMQ、Kafka和Pulsar这四个广泛使用的消息队列系统,分析它们是如何确保消息的顺序性,并附上相应的代码示例。

RabbitMQ

RabbitMQ作为一款成熟的开源消息队列,,基于AMQP(Advanced Message Queuing Protocol)协议构建,广泛应用于企业级应用中。虽然RabbitMQ本身并不保证严格的全局顺序性,但可以通过特定的设计模式来实现消息顺序性。

  1. 单一队列和单一消费者模式:确保一个队列只被一个消费者消费,这样可以保证消息按照发送的顺序被处理。因为队列本身就是一个先进先出的结构。
  2. 消息排序:在消息生产者端,为消息添加序列号或时间戳,消费者端根据这些信息对消息进行排序。

以下是一个简单的Java代码片段,展示了如何在RabbitMQ中发送消息。请注意,这个例子没有包含消息排序的逻辑,因为它依赖于具体的业务场景和消息结构。


public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}

创建了一个连接和一个通道,然后声明了一个队列。之后,我们发布了一个简单的消息到队列中。为了保证消息的顺序性,我们需要确保所有消息都是通过同一个通道发送,并且在消费端也是由同一个消费者按顺序接收处理。

RocketMQ

RocketMQ作为阿里巴巴开源的分布式消息队列,在保证消息顺序性方面提供了一种基于MessageQueueSelector的解决方案。其核心思路是将有序的消息写入特定的队列,从而使消费端固定消费某个队列时,就能够按顺序消费消息。

具体来说,RocketMQ中有两个重要概念:

  • Topic: 逻辑上的消息主题
  • MessageQueue: 物理上存储消息的队列

一个Topic包含多个MessageQueue,消息会根据其内容进行哈希计算,分配到不同的MessageQueue中。用户可以通过提供MessageQueueSelector,对特定类型的消息强制分配到同一个MessageQueue,从而保证顺序性。

示例代码:

生产者

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("nameserver:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID" + orderId, ("Hello RocketMQ " + i).getBytes());
// 发送有序消息
producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer orderId = (Integer) arg; // 订单ID作为选择器的参数int index = orderId % mqs.size(); // 根据订单ID计算MessageQueue索引return mqs.get(index); // 返回该索引对应的MessageQueue}
}, orderId);

通过上述代码,发送端可以将具有相同订单号的消息发送到同一个MessageQueue。

消费端

 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name");consumer.setNamesrvAddr("nameserver:9876");consumer.subscribe("TopicTest", "TagA");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for (MessageExt msg : msgs) {System.out.printf("Consumer: %s %n", new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});

消费端只需固定消费指定的MessageQueue,即可以保证消息按顺序被消费。

Kafka

Kafka通过Partition(分区)的概念来保证消息的顺序性。同一个Partition中的消息是有序的,但不同Partition之间是无序的。Producer在发送消息时可以指定消息要发送到的分区。Kafka默认提供了基于key的分区策略,确保具有相同key的消息会被发送到同一个分区,从而保证这些消息在这个分区内的顺序性。

以下是一个简单的 Java 代码示例,展示了如何在 Kafka 中发送和消费有序消息:

生产者代码

public class OrderProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 假设我们有10个订单,每个订单的消息需要顺序处理for (int orderId = 0; orderId < 10; orderId++) {for (int i = 0; i < 5; i++) { // 每个订单发送5条消息String message = String.format("Order %d, Message %d", orderId, i);producer.send(new ProducerRecord<>("OrderTopic", Integer.toString(orderId), message));}}producer.close();}
}

producer.send(new ProducerRecord<>("OrderTopic", Integer.toString(orderId), message));

第二个参数是消息的键(key),这里使用订单ID作为键,确保相同订单ID的消息发送到同一个分区

消费者代码


Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order_consumer_group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("OrderTopic"));

在生产者代码中,我们使用了相同的 key(即订单ID)来确保消息被发送到同一个 Partition。在消费者代码中,我们订阅了整个 Topic,但由于我们使用了相同的 key 来发送消息,Kafka 会自动将具有相同 key 的消息路由到同一个 Partition,从而保证顺序性。

Pulsar

Apache Pulsar 通过分区主题(Partitioned Topics)来保证消息的顺序性。在Pulsar中,每个分区可以看作是一个独立的消息队列,分区内的消息保持发送顺序。为了确保消息的顺序性,生产者在发送消息时需要指定一个键(Key),Pulsar会根据这个键将消息路由到特定的分区。这样,具有相同键的消息就会被发送到同一个分区,并且按照发送的顺序进行消费。

生产者代码示例:


public class PulsarOrderProducer {public static void main(String[] args) throws Exception {PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();Producer<String> producer = client.newProducer(Schema.STRING).topic("persistent://public/default/my-topic").create();for (int i = 0; i < 100; i++) {String key = "OrderID" + (i % 10); // 假设OrderID是业务键String value = "Message" + i;producer.newMessage().key(key).value(value).send();}producer.close();client.close();}
}

消费者代码示例:

public class PulsarOrderConsumer {public static void main(String[] args) throws Exception {PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();Consumer<String> consumer = client.newConsumer(Schema.STRING).topic("persistent://public/default/my-topic").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Exclusive).subscribe();while (true) {Message<String> msg = consumer.receive();try {// 处理消息System.out.printf("Message with key %s: %s", msg.getKey(), msg.getValue());consumer.acknowledge(msg);} catch (Exception e) {consumer.negativeAcknowledge(msg);}}}
}

在消费者代码中,我们使用了SubscriptionType.Exclusive,使订阅被独占,确保只有一个消费者能够消费分区内的消息,从而保证了消息的顺序性。

总结

尽管RabbitMQ、RocketMQ、Kafka和Pulsar这些消息队列系统虽然在实现细节上有所不同,但它们保证消息顺序性的核心思想都是相似的,即确保具有相同特征的消息被发送到同一队列或分区中,由于队列数据结构本身就是先进先出的结构,因此只需要消费者从该队列按顺序消费,就能够保证消息的有序性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/44159.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用libguestfs挂载qcow2磁盘镜像

挂载qcow2磁盘镜像的第一种方法是使用 libguestfs&#xff0c;它提供了一系列工具来访问和编辑 VM 磁盘镜像。libguestfs 支持几乎所有类型的磁盘镜像&#xff0c;包括 qcow2。你可以像下面这样&#xff0c;在Linux上安装libguestfs工具集。 1、安装guestmount工具 在基于 De…

Go语言---Json

JSON (JavaScript Object Notation)是一种比XML 更轻量级的数据交换格式&#xff0c;在易于人们阅读和编写的同时&#xff0c;也易于程序解析和生成。尽管JSON是 JavaScript的一个子集&#xff0c;但 JSON采用完全独立于编程语言的文本格式&#xff0c;且表现为键/值对集合的文…

【大模型LLM面试合集】大语言模型架构_layer_normalization

2.layer_normalization 1.Normalization 1.1 Batch Norm 为什么要进行BN呢&#xff1f; 在深度神经网络训练的过程中&#xff0c;通常以输入网络的每一个mini-batch进行训练&#xff0c;这样每个batch具有不同的分布&#xff0c;使模型训练起来特别困难。Internal Covariat…

【C++高阶】高效数据存储:理解并模拟实现红黑树Map与Set

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;C “ 登神长阶 ” &#x1f921;往期回顾&#x1f921;&#xff1a;了解 红黑树 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀模拟实现Map与Set &#x1f4d2;1.…

js ES6 part1

听了介绍感觉就是把js在oop的使用 作用域 作用域&#xff08;scope&#xff09;规定了变量能够被访问的“范围”&#xff0c;离开了这个“范围”变量便不能被访问&#xff0c; 作用域分为&#xff1a; 局部作用域、 全局作用域 1. 函数作用域&#xff1a; 在函数内部声明的…

爬取天气数据,利用Pyecharts作轮播图

爬取网站链接&#xff1a;https://lishi.tianqi.com/xiamen/202312.html 爬取了厦门市2023年一整年的天气数据&#xff0c;包括最高温&#xff0c;最低温&#xff0c;天气&#xff0c;风力风向等 爬虫代码&#xff1a; import requests import pandas as pd import csv from…

UML建模案例分析-时序图和类图的对应关系

概念 简单地说&#xff0c;类图定义了系统中的对象&#xff0c;时序图定义了对象之间的交互。 例子 一个电子商务系统&#xff0c;会员可通过电子商务系统购买零件。具体功能需求如下&#xff1a; 会员请求结账时&#xff0c;系统验证会员的账户是否处于登录状态&#xff1…

防火墙图形化界面策略和用户认证(华为)

目录 策略概要认证概要实验拓扑图题目要求一要求二要求三要求四要求五要求六 策略概要 安全策略概要&#xff1a; 安全策略&#xff08;Security Policy&#xff09;在安全领域具有双重含义。宏观上&#xff0c;安全策略指的是一个组织为保证其信息安全而建立的一套安全需求、…

uniapp 微信小程序接入MQTT

MQTT安装 前期准备 由于微信小程序需要wss&#xff0c;所以要有域名SSL证书 新建目录/srv/mosquitto/config&#xff0c;/srv/mosquitto/config/cert 目录/srv/mosquitto/config中新建配置文件mosquitto.conf&#xff0c;文件内容 persistence true persistence_location /m…

在树莓派设备上导出系统镜像

镜像导出 前提条件&#xff1a; 已获取可以正常使用的设备。已获取鼠标、键盘和电源适配器。已将设备接入可正常使用的网络。 操作步骤&#xff1a; 连接适配器给设备上电&#xff0c;正常启动设备&#xff0c;连接鼠标和键盘。在终端命令窗格执行如下命令&#xff0c;安装…

[PM]流程与结构设计

流程图 流程就是为了达到特定目标, 进行的一系列有逻辑性的操作步骤, 由两个及已上的步骤, 完成一个完整的行为过程, 即可称为流程, 流程图就是对这个过程的图形化展示 分类 业务流程图 概念: 描述业务流程的一种图, 通过特定符号和连线表示具体某个业务的处理步骤和过程作…

极狐GitLab亮相世界人工智能大会,开启开源大模型赋能软件研发新时代

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab &#xff1a;https://gitlab.cn/install?channelcontent&utm_sourcecsdn 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署…

285个地级市-胡焕庸线数据

全国285个地级市-胡焕庸线数据.zip资源-CSDN文库 胡焕庸线&#xff1a;中国人口与生态的分界线 胡焕庸线&#xff0c;一条在中国地理学界具有划时代意义的分界线&#xff0c;由著名地理学家胡焕庸于1935年提出。这条线从黑龙江省的瑷珲&#xff08;现黑河市&#xff09;延伸至…

HippoRAG如何从大脑获取线索以改进LLM检索

知识存储和检索正在成为大型语言模型(LLM)应用的重要组成部分。虽然检索增强生成(RAG)在该领域取得了巨大进步&#xff0c;但一些局限性仍然没有克服。 俄亥俄州立大学和斯坦福大学的研究团队推出了HippoRAG&#xff0c;这是一种创新性的检索框架&#xff0c;其设计理念源于人类…

数学建模美赛论文文档

目录 1. 摘要&#xff1a;1.1 阅读并理解题目1.2 背景介绍1.3 问题提出 2. 目录&#xff1a;2.1 引言&#xff08;Introduction&#xff09;2.2 假设与合理性说明&#xff08;Assumptions and Justifications&#xff09;2.3 符号说明&#xff08;Notations&#xff09;2.4 模型…

线下线上游戏电竞陪伴APP小程序H5同城线下约玩APP开发,语聊约玩平台搭建游戏陪玩APP源码

开发一款线下陪玩约玩APP的实际意义和在生活中的应用场景 1、满足社交需求:现代社会人们的社交圈往往受到时间、地点和其他限制的影响。线下陪玩约玩APP可以提供一个平台&#xff0c;让用户通过约玩的方式结识新朋友、扩大社交圈 2、解决孤独感:有些人由于工作忙碌、居住环境单…

论文阅读2-《Dynamic Multimodal Fusion》

摘要 &#xff08;DynMM&#xff09;&#xff0c;一种新的方法&#xff0c;自适应融合多模态数据和 d在推理过程中生成依赖于数据的前向路径。为此&#xff0c;我们提出了一种门控功能来提供基于多模态特征和一个的模态级或融合级决策提高计算效率的源感知损失函数。 细节 模…

185240-00G 同轴连接器

型号简介 185240-00G是Southwest Microwave的2.92 mm连接器。该连接器采用铍铜合金、工具钢和不锈钢等优质材料&#xff0c;并经过金镀层和钝化处理&#xff0c;确保其稳定可靠&#xff0c;经久耐用。它还兼容欧盟 RoHS 和 WEEE 指令&#xff0c;是一位环保使者&#xff0c;致力…

AI绘画Midjourney从入门到实战应用

大家好&#xff0c;我是爱编程的喵喵。双985硕士毕业&#xff0c;现担任全栈工程师一职&#xff0c;热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。…

概率论习题

泊松分布习题 假设你在医院值班&#xff0c;每天需要安保人员出动的次数N~P(1),则关于任一天安保人员出动次数&#xff1a; A&#xff1a;出动一次的概率是多少 B&#xff1a;出动次数小于等于一次的概率为 C&#xff1a;出动次数小于一次的概率为 D&#xff1a;若随机事件发生…