Java的RocketMQ使用

在 Spring Boot 中,RocketMQ 和 Kafka 都是常用的消息中间件,它们的使用方法有一些相似之处,也有各自的特点。

一、RocketMQ 在 Spring Boot 中的使用

  1. 引入依赖

    • 在项目的pom.xml文件中添加 RocketMQ 的依赖。
    <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
    </dependency>
    
  2. 配置 RocketMQ

    • application.propertiesapplication.yml文件中配置 RocketMQ 的相关参数,如 namesrvAddr(NameServer 地址)等。
    rocketmq.name-server=127.0.0.1:9876
    
  3. 生产者

    • 创建一个生产者类,使用@Resource注入RocketMQTemplate
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;@Component
    public class RocketMQProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic, String message) {rocketMQTemplate.convertAndSend(topic, message);}
    }
    
  4. 消费者

    • 创建一个消费者类,使用@RocketMQMessageListener注解指定监听的主题和消费组。
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;@Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理接收到的消息System.out.println("Received message: " + message);}
    }
    

二、Kafka 在 Spring Boot 中的使用

  1. 引入依赖

    • pom.xml文件中添加 Kafka 的依赖。
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.12</version>
    </dependency>
    
  2. 配置 Kafka

    • application.propertiesapplication.yml文件中配置 Kafka 的相关参数,如 bootstrapServers(Kafka 服务器地址)等。
    spring.kafka.bootstrap-servers=127.0.0.1:9092
    
  3. 生产者

    • 创建一个生产者类,使用@Resource注入KafkaTemplate
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;@Component
    public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}
    }
    
  4. 消费者

    • 创建一个消费者类,使用@KafkaListener注解指定监听的主题和消费组。
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;@Component
    public class KafkaConsumer {@KafkaListener(topics = "your_topic", groupId = "your_consumer_group")public void onMessage(String message) {// 处理接收到的消息System.out.println("Received message: " + message);}
    }
    

总的来说,RocketMQ 和 Kafka 在 Spring Boot 中的使用都比较方便,具体选择哪种消息中间件可以根据项目的实际需求来决定。RocketMQ 在一些场景下可能具有高吞吐量、低延迟等优势,而 Kafka 则在大规模分布式系统中被广泛应用,具有高可靠性和可扩展性。

二、如何保证消息队列顺序性

1、发送端保证顺序性

  1. 合理设计业务

    • 确保具有顺序性要求的消息被发送到同一个主题(Topic)的同一个队列(Queue)中。比如,将同一类业务的消息按照特定规则进行分类,使得它们都进入相同的队列。
    • 一个业务场景的消息尽量由一个发送端来发送消息,避免多个发送端发送可能导致的乱序。
  2. 使用同步发送

    • 在发送消息时,使用同步发送方式send(Message msg, long timeout),确保消息成功发送后再进行下一个消息的发送。这样可以避免异步发送可能导致的消息乱序情况。

2、消费端保证顺序性

  1. 单线程消费

    • 消费者在消费消息时,采用单线程的方式进行消费。这样可以确保同一队列中的消息按照发送的顺序被依次处理。
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理接收到的消息System.out.println("Received message: " + message);}
    }
    

    在实际应用中,可以将消费逻辑放在一个单独的方法中,然后在这个方法中进行顺序处理,确保消息的顺序性。

  2. 避免并发处理

    • 确保在消费消息的过程中,不会出现并发处理的情况。比如,不要在消费消息的同时启动其他异步任务或者多线程处理,以免破坏消息的顺序性。

3、设置队列数量

  1. 控制队列数量
    • 如果业务对消息顺序性要求非常严格,可以考虑减少主题下的队列数量。通常情况下,一个主题可以包含多个队列,消息会被随机分发到不同的队列中。如果队列数量较少,那么消息更有可能被发送到同一个队列中,从而更容易保证顺序性。

通过以上方法,可以在一定程度上保证 RocketMQ 消息的顺序性。但需要注意的是,保证消息顺序性可能会牺牲一定的性能和吞吐量,因此需要根据实际业务需求进行权衡和选择。

一、如何确保消息队列的可靠性

1、发送端

  1. 同步发送与确认

    • 使用同步发送方式send(Message msg, long timeout),该方法会等待消息发送成功的确认,确保消息被正确地发送到 Broker。如果发送失败或超时,可以进行重试或其他错误处理操作。
    try {SendResult sendResult = rocketMQTemplate.syncSend(topic, message);System.out.println("Message sent successfully: " + sendResult);
    } catch (Exception e) {System.out.println("Failed to send message: " + e.getMessage());// 进行重试或其他错误处理
    }
    
  2. 事务消息

    • 对于一些需要保证事务一致性的场景,可以使用 RocketMQ 的事务消息机制。发送事务消息分为两个阶段,首先发送半事务消息,然后执行本地事务,根据本地事务的结果决定提交或回滚事务消息。
    @Service
    public class TransactionProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendTransactionMessage() {TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transactionTopic", new Message<>("transactionMessage"), null);System.out.println("Transaction message sent: " + result);}
    }
    

2、Broker 端

  1. 持久化存储

    • RocketMQ 支持消息的持久化存储,可以将消息存储在磁盘上,以防止消息丢失。通过配置broker.conf文件中的flushDiskType参数,可以选择同步刷盘或异步刷盘方式。同步刷盘可以保证消息在写入磁盘后才返回成功响应,但会影响性能;异步刷盘可以提高性能,但在系统故障时可能会丢失部分未刷盘的消息。
  2. 高可用部署

    • 部署多主多从的 RocketMQ 集群,当主节点出现故障时,从节点可以自动切换为主节点,保证消息服务的可用性。同时,可以配置主从同步方式,确保消息在主从节点之间的可靠同步。

3、消费端

  1. 消费确认

    • 消费者在成功处理消息后,需要向 Broker 发送消费确认。可以通过设置consumeModeCONSUME_PASSIVELY(被动消费模式),并在处理完消息后手动调用acknowledge()方法进行确认。如果消费失败,可以选择重试或者将消息发送到死信队列进行后续处理。
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {try {// 处理消息System.out.println("Received message: " + message);// 确认消费成功getRocketMQListenerContainer().acknowledge();} catch (Exception e) {System.out.println("Failed to process message: " + e.getMessage());// 可以选择重试或者发送到死信队列}}
    }
    
  2. 重试机制

    • 配置消费者的重试次数和重试时间间隔,当消费失败时,RocketMQ 会自动进行重试。可以在application.propertiesapplication.yml中配置rocketmq.retry.timesrocketmq.retry.interval参数来控制重试策略。

通过以上措施,可以在不同阶段保证 RocketMQ 消息的可靠性,确保消息在生产、存储和消费过程中不会丢失或出现错误。

三、保证消息处理的幂等性
在 RocketMQ 中,可以通过以下几种方式来保证消息处理的幂等性:

1、业务层面设计

  1. 使用唯一标识

    • 在业务中为每条消息生成一个唯一的标识,比如使用业务流水号、订单号等作为消息的唯一标识。在消费消息时,先根据这个唯一标识判断该消息是否已经被处理过。如果已经处理过,则直接忽略该消息。
    • 例如在电商系统中,订单创建的消息可以使用订单号作为唯一标识。消费者在处理消息时,先查询数据库中是否存在该订单号对应的处理记录,如果存在则说明该消息已经被处理过,不再重复处理。
    @Service
    public class OrderProcessingService {@Autowiredprivate JdbcTemplate jdbcTemplate;public void processOrderMessage(String orderId) {boolean isProcessed = isOrderProcessed(orderId);if (isProcessed) {return;}// 处理订单逻辑System.out.println("Processing order: " + orderId);markOrderAsProcessed(orderId);}private boolean isOrderProcessed(String orderId) {int count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM processed_orders WHERE order_id =?",Integer.class, orderId);return count > 0;}private void markOrderAsProcessed(String orderId) {jdbcTemplate.update("INSERT INTO processed_orders (order_id) VALUES (?)",orderId);}
    }
    
  2. 利用数据库约束

    • 可以在数据库中使用唯一索引、主键约束等方式来保证业务数据的唯一性。在处理消息时,如果违反了这些约束,则说明该消息已经被处理过,不再重复处理。
    • 比如在用户注册的场景中,可以在数据库的用户表中使用用户名或邮箱作为唯一索引。当消费用户注册的消息时,尝试插入用户数据,如果插入失败(因为违反唯一索引约束),则说明该用户已经注册过,不再重复处理。
    @Service
    public class UserRegistrationService {@Autowiredprivate JdbcTemplate jdbcTemplate;public void registerUser(String username, String password) {try {jdbcTemplate.update("INSERT INTO users (username, password) VALUES (?,?)",username, password);} catch (DataIntegrityViolationException e) {// 处理插入失败的情况,可能是用户已存在System.out.println("User already exists: " + username);}}
    }
    

2、技术层面实现

  1. 分布式锁
    • 可以使用分布式锁来保证同一时间只有一个消费者实例在处理特定的消息。在处理消息之前,先获取分布式锁,如果获取成功则处理消息,处理完成后释放锁。如果获取锁失败,则说明该消息正在被其他实例处理,当前实例可以选择等待或者直接忽略该消息。
    • 可以使用 Redis 或 Zookeeper 等实现分布式锁。以 Redis 为例,可以使用 SETNX 命令来实现分布式锁。
    @Service
    public class MessageProcessingService {@Autowiredprivate StringRedisTemplate redisTemplate;public void processMessage(String messageId) {String lockKey = "message_lock_" + messageId;boolean locked = tryLock(lockKey);if (!locked) {return;}try {boolean isProcessed = isMessageProcessed(messageId);if (isProcessed) {return;}// 处理消息逻辑System.out.println("Processing message: " + messageId);markMessageAsProcessed(messageId);} finally {releaseLock(lockKey);}}private boolean tryLock(String key) {return redisTemplate.opsForValue().setIfAbsent(key, "locked", Duration.ofSeconds(30));}private void releaseLock(String key) {redisTemplate.delete(key);}private boolean isMessageProcessed(String messageId) {// 判断消息是否已处理的逻辑return false;}private void markMessageAsProcessed(String messageId) {// 标记消息已处理的逻辑}
    }
    

通过以上方法,可以有效地保证 RocketMQ 消息处理的幂等性,避免因重复消费消息而导致的业务数据不一致问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【优选算法篇】踏入算法的深邃乐章:滑动窗口的极致探秘

文章目录 C 滑动窗口详解&#xff1a;进阶题解与思维分析前言第二章&#xff1a;进阶挑战2.1 水果成篮解法一&#xff1a;滑动窗口解法二&#xff1a;滑动窗口 数组模拟哈希表复杂度分析&#xff1a;图解分析&#xff1a;示例&#xff1a;滑动窗口执行过程图解&#xff1a; 详…

C for Graphic:径向模糊

最近要做一系列特效需求&#xff0c;顺便记录一下。 径向模糊&#xff08;也叫辐射模糊&#xff09;&#xff1a;一种由内向外发散的模糊的效果 原理&#xff1a;获取中心点&#xff08;centeruv&#xff09;到当前像素&#xff08;pixeluv&#xff09;的朝向法向…

RFC2616 超文本传输协议 HTTP/1.1

一、URL-俗称“网址” HTTP 使用 URL(Uniform Resource Locator&#xff0c;统一资源定位符)来定位资源&#xff0c;它是 URI(Uniform Resource Identifier&#xff0c;统一资源标识符)的子集&#xff0c;URL 在 URI 的基础上增加了定位能力 URI 除了包含 URL&#xff0c;还包…

车载实操:一对一实操学习、CANoe实操学习、推荐就业机会、就业技术支持、协助面试辅导

FOTA模块中OTA的知识点&#xff1a;1.测试过程中发现哪几类问题&#xff1f; 可能就是一个单键的ecu&#xff0c;比如升了一个门的ecu&#xff0c;他的升了之后就关不上&#xff0c;还有就是升级组合ecu的时候&#xff0c;c屏上不显示进度条。 2.在做ota测试的过程中&#xff…

5G NR:UE初始接入信令流程浅介

UE初始接入信令流程 流程说明 用户设备&#xff08;UE&#xff09;向gNB-DU发送RRCSetupRequest消息。gNB-DU 包含 RRC 消息&#xff0c;如果 UE 被接纳&#xff0c;则在 INITIAL UL RRC MESSAGE TRANSFER 消息中包括为 UE 分配的低层配置&#xff0c;并将其传输到 gNB-CU。IN…

PFC和LLC的本质和为什么要用PFC和LLC电路原因

我们可以用电感和电容的特性,以及电压和电流之间的不同步原理来解释PFC(功率因数校正)和LLC(谐振变换器)。 电感和电容的基本概念 电感(Inductor): 电感是一种储存电能的组件。它的电流变化比较慢,电流在电感中延迟,而电压变化得比较快。可以把电感想象成一个“滞后…

『Mysql集群』Mysql高可用集群之主从复制 (一)

Mysql主从复制模式 主从复制有一主一从、主主复制、一主多从、多主一从等多种模式. 我们可以根据它们的优缺点选择适合自身企业情况的主从复制模式进行搭建 . 一主一从 主主复制 (互为主从模式): 实现Mysql多活部署 一主多从: 提高整个集群的读能力 多主一从: 提高整个集群的…

transformers 推理 Qwen2.5 等大模型技术细节详解(一)transformers 初始化和对象加载(文末免费送书)

上周收到一位网友的私信&#xff0c;希望老牛同学写一篇有关使用 transformers 框架推理大模型的技术细节的文章。 老牛同学刚开始以为这类的文章网上应该会有很多&#xff0c;于是想着百度几篇质量稍高一点的回复这位网友。结果&#xff0c;老牛同学搜索后发现&#xff0c;类…

信息与计算科学:“数学 + 计算机”,奏响未来科技新乐章

在当今科技飞速发展的时代&#xff0c;有一个专业如同一颗闪耀的新星&#xff0c;散发着独特的魅力&#xff0c;那就是信息与计算科学专业。 一、专业全貌&#xff1a;追根溯源&#xff0c;领略交叉之美 &#xff08;一&#xff09;专业的诞生与发展 1998 年&#xff0c;教育…

一图解千言,了解常见的流程图类型及其作用

在企业管理、软件研发过程中&#xff0c;经常会需要进行各种业务流程梳理&#xff0c;而流程图就是梳理业务时必要的手段&#xff0c;同时也是梳理的产出。但在不同的情况下适用的流程图又不尽相同。 本文我们就一起来总结一下8 种最常见的流程图类型 数据流程图 数据流程图&…

RHCE——例行性工作

准备工作 [rootlocalhost ~]# cat /etc/yum.repos.d/aliyun.repo [ali-app] nameali-app baseurlhttps://mirrors.aliyun.com/centos-stream/9-stream/AppStream/x86_64/os/ gpgcheck0[ali-base] nameali-base baseurlhttps://mirrors.aliyun.com/centos-stream/9-stream/Base…

JS | JS中类的 prototype 属性和__proto__属性

大多数浏览器的 ES5 实现之中&#xff0c;每一个对象都有__proto__属性&#xff0c;指向对应的构造函数的prototype属性。Class 作为构造函数的语法糖&#xff0c;同时有prototype属性和__proto__属性&#xff0c;因此同时存在两条继承链。 构造函数的子类有prototype属性。‌ …

倍福中控显示屏维修控制面板CP7732-1207-0030

使用的环境条件不当可能会损坏设备。 保护设备&#xff0c;防止灰尘、湿气和热量进入。 使用注意事项&#xff1a; 空气流通不畅 设备安装不正确会阻碍设备内的空气流通&#xff0c;从而导致过热和功能受损。 只能按所示方向将设备安装在相应的壁上。 该设备设计用于安装在…

05 P1157 组合的输出

题目&#xff1a; 代码&#xff1a; #include<iostream> using namespace std; # define M 500 #include<algorithm>int sa[100005],k,n,count1;bool func(int n) {int mark0;if(n1){return 1;}else{for(int i2;i<n-1;i){if(n%i0){mark1;return 0;}}if(mark0)r…

强化学习案例:美团是如何在推荐系统中落地强化学习

目录 美团的强化学习应用场景和分析 场景举例 使用原因 强化学习的六大要素 智能体 环境 行动 奖励 目标 状态 美团强化学习模型设计 美团强化学习工程落地 总体的数据结构关系图 实现步骤 1. 日志收集与实时处理&#xff08;Log Collector, Online Joiner&…

PyTorch 2.5 发布带来一些新特性和改进

官网&#xff1a;https://github.com/pytorch/pytorchGitHub&#xff1a;https://github.com/pytorch/pytorch原文&#xff1a;https://github.com/pytorch/pytorch/releases/tag/v2.5.0 主要亮点 (Highlights)] SDPA CuDNN 后端&#xff1a;为 torch.nn.functional.scaled_d…

C++标准模板库--vector

vector 介绍 vector&#xff08;向量&#xff09;是一种序列容器&#xff0c;表示为可以改变大小的数组。vector中的元素使用连续的存储位置&#xff0c;这意味着也可以使用指向其元素的常规指针偏移量来访问任意元素&#xff0c;且与数组一样高效。但与数组不同的是&#xff…

React Componet类组件详解(老项目)

React类组件是通过创建class继承React.Component来创建的&#xff0c;是React中用于构建用户界面的重要部分。以下是对React类组件的详细解释&#xff1a; 一、定义与基本结构 类组件使用ES6的class语法定义&#xff0c;并继承自React.Component。它们具有更复杂的功能&#…

流量PID控制(开度前馈量计算+辅助PID)

和流体流速(瞬时流量)相关的计算请参考下面文章链接: 1、PLC通过伯努利方程近似计算水箱流量 PLC通过伯努利方程近似计算水箱流量(FC)-CSDN博客文章浏览阅读1.6k次。本文介绍了如何使用PLC通过伯努利方程近似计算水箱中的液体流量,主要涉及流量计算、模型验证、梯形图编程及…

C++学习路线(二十)

项目 模块划分 推箱子游戏 地图初始化 热键控制 推箱子控制 游戏结束 地图初始化 坐标系&#xff08;650&#xff0c;650&#xff09; 地图表示&#xff1a; 使用二维数组 游戏道具展示&#xff08;墙 箱子 箱子目的地 小人 地板&#xff09; 判断游戏…