RocketMQ发送和接收方式详解

RocketMQ有几种发送方式

RocketMQ 提供了几种不同的消息发送方式,以满足不同场景下的需求。这些发送方式主要包括:同步发送(Synchronous):这是最常见的一种发送方式,客户端发送消息后,会等待服务器端的响应。只有当消息完全被服务器接收,客户端才会继续执行。这种方式适用于对可靠性要求较高的场景。异步发送(Asynchronous):客户端在发送消息后不会立即等待服务器的响应,而是通过一个回调接口来处理服务器的响应,这样可以提高消息发送的吞吐量。这种方式适用于对响应时间要求较敏感的场景。单向发送(One-way):这种方式是最轻量级的一种发送方式,客户端只负责发送消息,不等待服务器的响应,也不关心消息是否被服务器接收。这种方式的可靠性最低,但吞吐量最高,适用于对可靠性要求不高的日志收集等场景。顺序发送(Ordered):RocketMQ 还支持顺序发送消息,确保特定主题的消息能按照发送的顺序被消费。这对于需要保证消息顺序的业务场景(如订单处理)非常重要。**顺序必须是同步的**这些发送方式可以根据具体的业务需求和场景来选择使用,以达到最佳的性能和可靠性平衡。

不同发送方式的调用方式或者不同配置

在使用 RocketMQ 进行消息发送时,可以根据不同的发送需求选择合适的发送方式。以下是不同发送方式的基本调用方式或配置示例:

1. 同步发送(Synchronous)
同步发送是指消息发送方将消息发送给服务器后,会等待服务器的响应,确认消息已经被接收后才继续执行后续操作。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));// 调用send方法发送消息,并获取发送结果
SendResult sendResult = producer.send(msg);// 打印发送结果
System.out.printf("%s%n", sendResult);// 当不再发送消息时,关闭Producer实例
producer.shutdown();

2. 异步发送(Asynchronous)
异步发送是指消息发送方发送消息后,不会立即等待服务器的响应,而是提供一个回调接口,服务器响应时通过回调接口来通知发送方。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));// 异步发送消息
producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 消息发送成功的回调System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {// 消息发送失败的回调System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}
});// 关闭Producer实例
producer.shutdown();

3. 单向发送(One-way)
单向发送是指消息发送方只负责发送消息,不等待服务器的响应也不关心消息是否被服务器接收,适用于对可靠性要求不高的场景。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));// 单向发送消息,没有返回结果
producer.sendOneway(msg);// 关闭Producer实例
producer.shutdown();

4. 顺序发送(Ordered)
顺序发送保证了特定主题的消息能按照发送的顺序来消费。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();String[] tags = new String[]{"TagA", "TagB", "TagC"};
// 订单列表
List<OrderStep> orderList = new ArrayList<OrderStep>();
orderList.add(new OrderStep(15103111039L, "创建"));
orderList.add(new OrderStep(15103111065L, "支付"));
orderList.add(new OrderStep(15103111039L, "完成"));for (int i = 0; i < 10; i++) {// 加入订单的时间戳来模拟简单的订单IDString body = orderList.get(i % orderList.size()) + " 时间:" + System.currentTimeMillis();Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg;  // 根据订单id选择发送queuelong index = id % mqs.size();return mqs.get((int) index);}}, orderList.get(i % orderList.size()).getOrderId());  // 订单idSystem.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),body));
}// 关闭Producer实例
producer.shutdown();

每种发送方式都有其适用的场景,可以根据实际的业务需求选择使用。在实际开发中,需要根据具体的业务逻辑对示例代码进行适当的调整和优化。

不同接收方式的调用方式或者不同配置

1. 集群消费(Clustering)
集群消费模式下,同一个消费者组中的多个消费者实例共同消费主题下的消息,每条消息只会被消费一次。

// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
// 设置NameServer地址
consumer.setNamesrvAddr("nameserver1:9876");
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {// 消费消息System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));}// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
// 启动消费者实例
consumer.start();

2. 广播消费(Broadcasting)
广播消费模式下,消息会被消费者组中的每个消费者都消费一次。

// 实例化消费者,并设置消费模式为广播模式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setNamesrvAddr("nameserver1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
consumer.start();

3. 顺序消费(Orderly)
顺序消费保证了特定主题的消息能按照发送的顺序来消费。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("nameserver1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.printf("%s Receive New Messages: %s, QueueId: %d %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getQueueId());}return ConsumeOrderlyStatus.SUCCESS;}
});
consumer.start();

4. 延迟消费
延迟消费不是通过消费者的特定设置来实现的,而是在发送消息时设置消息的延迟等级。

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("nameserver1:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes());
// 设置延迟等级3,这会使消息延迟10s再被消费
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
producer.shutdown();

接收延迟消息的代码与普通消息的接收方式相同,不需要特殊配置。

5. 重试和死信队列
对于处理失败的消息,RocketMQ 会自动重试,无需特别配置。如果重试次数达到上限仍然失败,消息会被转移到死信队列。消费死信队列中的消息需要订阅特定的Topic(%DLQ%+消费者组名)。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("nameserver1:9876");
// 订阅死信队列
consumer.subscribe("%DLQ%ConsumerGroupName", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.printf("DLQ Message: %s %n", new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
consumer.start();

不同的接收方式适用于不同的业务场景,可以根据实际需求选择最合适的方式。在实际应用中,可能需要结合业务逻辑对示例代码进行适当的调整和优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/755852.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python-GEE绘制DEM精美图片

目录 上传矢量和DEM获取添加颜色条参考文章 先连接上GEE的自己的项目 import ee import geemap geemap.set_proxy(port33210) ee.Authenticate() ee.Initialize(projecta-flyllf0313)上传矢量和DEM获取 使用Google Earth Engine&#xff08;GEE&#xff09;和Google Earth Eng…

基于单片机的模糊PID炉温控制系统设计

摘 要 电热炉是在工业热处理的生产中广泛使用的一种设备&#xff0c;电热炉的温度控制系统存在时变性&#xff0c;非线性&#xff0c;滞后性等特征&#xff0c;难以用常规PID的控制器对系统达到很好的控制效果。当控温精度的要求高时&#xff0c;使用传统的控制理论方法难以达…

亮相AWE 2024,日立中央空调打造定制空气新体验

日立中央空调于3月14日携旗下空气定制全新成果&#xff0c;亮相2024中国家电及消费电子博览会&#xff08;简称AWE 2024&#xff09;现场&#xff0c;围绕“科创先行 智引未来”这一主题&#xff0c;通过技术与产品向行业与消费者&#xff0c;展现自身对于家居空气的理解。 展会…

kanzi颜色工作流程

线性和非线性伽玛色彩空间 RGB 颜色空间的目的是表示在计算机显示器上显示的颜色。目前&#xff0c;sRGB是非线性伽玛色彩空间的标准。之所以需要它&#xff0c;是因为人类对光的感知是非线性的&#xff0c;而且计算机显示器对光强度具有非线性响应。 人眼比浅色更能区分深色…

Android 13 源码编译及报错修复

下载AOSP指定分支 repo init -u git://aosp../platform/manifest -b android-13.0.0_r83 同步代码到本地 repo sync -c 初始化编译环境, 选择构建目标 source build/envsetup.sh lunch 选择需要构建的目标&#xff0c;此处以aosp_arm64-eng为例 进行固件编译 make -j12 期间编译…

力扣热门算法题 49. 字母异位词分组,50. Pow(x, n),51. N 皇后

49. 字母异位词分组&#xff0c;50. Pow(x, n)&#xff0c;51. N 皇后&#xff0c;每题做详细思路梳理&#xff0c;配套Python&Java双语代码&#xff0c; 2024.03.19 可通过leetcode所有测试用例。 目录 49. 字母异位词分组 解题思路 完整代码 python Java 50. Pow(x…

接口测试工具:Postman详解

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 关注公众号【互联网杂货铺】&#xff0c;回复 1 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 Postman 是一款功能强大的 API 开发和测试工具&#xff0c;以下…

【JDK原理】类加载约束条件

JVM简介 JVM&#xff08;Java虚拟机&#xff09;是Java程序的运行环境&#xff0c;它负责将Java字节码加载到内存中并执行。在JVM中&#xff0c;类加载是一个重要的过程&#xff0c;它负责将类的字节码加载到内存中&#xff0c;并对类进行验证、准备和解析&#xff0c;最终生成…

STM32F411 Micropython使用日记

1、开发板购买&#xff1a;推荐淘宝“无名科技Nologo” 19.8包邮到手&#xff1b;买开发板还需要买SPI NorFlash&#xff0c;推荐8MB的flash&#xff0c;不懂的可以问卖家&#xff0c;买回来需要焊接好&#xff0c;也可以找店家试试看能不能帮忙焊接&#xff0c;不然micropytho…

Java中加减乘除运算工具类

Java 中的 double 类型不能用于精确的加减乘除运算&#xff0c;这是因为计算机使用二进制来表示浮点数&#xff0c;而二进制无法精确表示所有十进制数。 原因&#xff1a; 浮点数由两部分组成&#xff1a;指数和尾数。指数表示浮点数的大小&#xff0c;尾数表示浮点数的精度。…

八节【DBA从入门到实践】课程,带你快速掌握OceanBase运维管理核心技能

为帮助用户及开发者更好、更快地掌握OceanBase DBA核心技能&#xff0c;OceanBase社区设计了配套教程——“DBA从入门到实践”。8期教程带大家循序渐进掌握OceanBase运维管理核心技能。搭配随堂习题和OceanBase技术专家在线答疑&#xff0c;快速掌握重要知识点&#xff0c;并轻…

【DL经典回顾】激活函数大汇总(二十七)(Bent Identity附代码和详细公式)

激活函数大汇总&#xff08;二十七&#xff09;&#xff08;Bent Identity附代码和详细公式&#xff09; 更多激活函数见激活函数大汇总列表 一、引言 欢迎来到我们深入探索神经网络核心组成部分——激活函数的系列博客。在人工智能的世界里&#xff0c;激活函数扮演着不可或…

直观与交互:山海鲸可视化软件与Excel传统表格的对比

作为一名长期使用Excel进行数据处理和分析的用户&#xff0c;最近我尝试了一款名为山海鲸的可视化软件&#xff0c;发现它与Excel传统表格之间存在诸多明显的差异。接下来&#xff0c;我将从个人体验视角出发&#xff0c;谈谈这两种工具的不同之处。 首先&#xff0c;从数据呈…

汇编语言和IBM的关系

一 缺乏汇编的硬件没有灵魂 1964年&#xff0c;在IBM没有发明System 360大型计算机之前&#xff0c;IBM已经发明了很多计算机。如IBM 1952年发布的第一台商用计算机&#xff1a;701计算机。1959年&#xff0c;IBM首次利用晶体管、磁芯存储器、印刷电路技术&#xff0c;发明了小…

家谱系统的app的主要功能介绍

家谱系统的app通常具备一系列功能&#xff0c;这些功能旨在帮助用户更好地记录、管理和分享家族的历史和文化。以下是一些常见的家谱系统app的功能介绍&#xff1a; 家谱查看&#xff1a;用户可以通过app登录自己的账号&#xff0c;查看对应的家谱信息。这包括家族成员的基本信…

组件化开发

一、引言 Vue.js 的组件化开发是其核心特性之一&#xff0c;它允许开发者将复杂的界面拆分为可复用的、独立的、小的组件&#xff0c;从而提高开发效率和代码的可维护性。 二、关键点 1.组件的定义 在components下创建.vue文件timecard.vue用来编辑内容。 文件创建完毕后&am…

Linux Shell中的echo命令详解

Linux Shell中的echo命令详解 在Linux Shell中&#xff0c;echo命令是一个常用的内置命令&#xff0c;用于在终端上显示文本或字符串。它主要用于显示变量的值&#xff0c;创建文件的内容&#xff0c;或者简单地输出一些信息。在本文中&#xff0c;我们将详细探讨echo命令的用…

迷宫问题(c++题解)

题目描述 设有一个 N*N(2<N<10)方格的迷宫&#xff0c;入口和出口分别在左上角和右上角。迷宫格子中 分别放 0 和 1&#xff0c;0 表示可通&#xff0c;1 表示不能&#xff0c;入口和出口处肯定是 0。迷宫走的规则如下所示&#xff1a; 即从某点开始&#xff0c;有八个方…

000_【基础篇】SpringBoot概述

介绍 springboot 是 spring 提供的一个子项目&#xff0c;用于快速构建 spring 应用程序 传统的 SSM 框架要导入很多依赖的 jar 包以及配置很多的配置文件&#xff0c;麻烦、繁琐 springboot 特性 springboot 主要&#xff08;还有其他的一些特性&#xff09;有起步依赖和…

精酿啤酒:开启时尚派对的钥匙

Fendi club啤酒&#xff0c;一个代表着时尚与品味的品牌&#xff0c;如今进入了啤酒市场&#xff0c;推出了名为“Fendi club”的啤酒。这一创新的举措不仅展现了品牌的多元化发展&#xff0c;更为消费者提供了一种全新的时尚生活方式。 Fendi club啤酒不仅仅是一种产品&#x…