Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动

为什么要使用MQ?

在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?

首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。

image-20231227120405997

源码地址:Gitee

整合RocketMQ

依赖版本

  • JDK 17
  • Spring Boot 3.2.0
  • RocketMQ-Client 5.0.4
  • RocketMQ-Starter 2.2.0

可以参考这篇进行RocketMQ安装

Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。

引入RocketMQ依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>

解决Spring Boot3+不兼容 spring.factories

rocketmq-spring-boot-starter:2.2.2版本中:
image-20231227062105302

参考配置文件

# RocketMQ 配置
rocketmq:name-server: 127.0.0.1:9876consumer:group: event-mq-group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 1producer:# 发送同一类消息的设置为同一个group,保证唯一group: event-mq-group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false

参考Issue

  • 方法一 :通过@Import(RocketMQAutoConfiguration.class)在配置类中引入

  • 方法二:在resources资源目录下创建文件夹及文件META-INF/springorg.springframework.boot.autoconfigure.AutoConfiguration.imports
    文件内容为RocketMQ自动配置类路径:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

RocketMQ 使用

解决Spring Boot3+不支持spring.factories的问题

import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;/*** 启动类*/
@Import(RocketMQAutoConfiguration.class)
@SpringBootApplication
public class MQEventApplication {public static void main(String[] args) {SpringApplication.run(MQEventApplication.class, args);}
}

RocketMQ操作工具

RocketMQ Message实体

import cn.hutool.core.util.IdUtil;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.io.Serializable;
import java.util.List;/*** RocketMQ 消息*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RocketMQMessage<T> implements Serializable {/*** 消息队列主题*/@NotBlank(message = "MQ Topic 不能为空")private String topic;/*** 延迟级别*/@Builder.Defaultprivate DelayLevel delayLevel = DelayLevel.OFF;/*** 消息体*/private T message;/*** 消息体*/private List<T> messages;/*** 使用有序消息发送时,指定发送到队列*/private String hashKey;/*** 任务Id,用于日志打印相关信息*/@Builder.Defaultprivate String taskId = IdUtil.fastSimpleUUID();
}

RocketMQTemplate 二次封装

import com.yiyan.study.domain.RocketMQMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** RocketMQ 消息工具类*/
@Slf4j
@Component
public class RocketMQService {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.sendMessageTimeout}")private int sendMessageTimeout;/*** 异步发送消息回调** @param taskId 任务Id* @param topic  消息主题* @return the send callback*/private static SendCallback asyncSendCallback(String taskId, String topic) {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());}};}/*** 发送同步消息,使用有序发送请设置HashKey** @param message 消息参数*/public <T> void syncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());}log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 批量发送同步消息** @param message 消息参数*/public <T> void syncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());}log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 异步发送消息,异步返回消息结果** @param message 消息参数*/public <T> void asyncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());}}/*** 批量异步发送消息** @param message 消息参数*/public <T> void asyncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),asyncSendCallback(message.getTaskId(), message.getTopic()));}}/*** 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;** @param message 消息参数*/public <T> void sendOneWay(RocketMQMessage<T> message) {sendOneWay(message, false);}/*** 单向消息 - 批量发送** @param message 消息体* @param batch   是否为批量操作*/public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]": "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));} else {rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());}} else {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));} else {rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());}}}
}

定义RocketMQ消费者

import com.yiyan.study.constants.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** MQ消息监听*/
@Component
@Slf4j
@RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
public class MQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("MQListener 接收消息 : {}", message);}
}

定义测试类发送消息

import cn.hutool.core.thread.ThreadUtil;
import com.yiyan.study.constants.MQConfig;
import com.yiyan.study.domain.RocketMQMessage;
import com.yiyan.study.utils.RocketMQService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;/*** MQ测试*/
@SpringBootTest
public class MQTest {@Resourceprivate RocketMQService rocketMQService;@Testpublic void sendMessage() {int count = 1;while (count <= 50) {rocketMQService.syncSend(RocketMQMessage.builder().topic(MQConfig.EVENT_TOPIC).message(count++).build());}// 休眠等待消费消息ThreadUtil.sleep(2000L);}
}

测试

springboot3-RocketMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/581426.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flink on K8S生产集群使用StreamPark管理

&#xff08;一&#xff09;直接部署&#xff08;手动测试用&#xff0c;不推荐&#xff09; Flink on Native Kubernetes 目前支持 Application 模式和 Session 模式&#xff0c;两者对比 Application 模式部署规避了 Session 模式的资源隔离问题、以及客户端资源消耗问题&am…

使用 async-profiler 分析 CPU 和 内存使用情况

async-profiler 是非常主流的 Java Profiling 工具之一&#xff0c;且对 Linux 支持良好&#xff0c;适合分析运行在服务器上的 Java 应用程序在 CPU 和内存上的占用情况。本文介绍一下 async-profiler 的安装和使用方法。 1. 安装 wget https://github.com/jvm-profiling-too…

人工智能 机器学习 深度学习:概念,关系,及区别说明

如果过去几年&#xff0c;您读过科技主题的文章&#xff0c;您可能会遇到一些新词汇&#xff0c;如人工智能&#xff08;Artificial Intelligence&#xff09;、机器学习&#xff08;Machine Learning&#xff09;和深度学习&#xff08;Deep Learning&#xff09;等。这三个词…

Tuxera NTFS for Mac2024免费Mac读写软件下载教程

在日常生活中&#xff0c;我们使用Mac时经常会遇到外部设备不能正常使用的情况&#xff0c;如&#xff1a;U盘、硬盘、软盘等等一系列存储设备&#xff0c;而这些设备的格式大多为NTFS&#xff0c;Mac系统对NTFS格式分区存在一定的兼容性问题&#xff0c;不能正常读写。 那么什…

WPF+Halcon 培训项目实战(1-5):Halcon安装,图像处理,Halcon简单模板匹配

文章目录 前言相关链接项目专栏我个人对就业市场的评价Halcon安装实战1-4&#xff1a;Halcon基础实战5&#xff1a;模板匹配[形状匹配]实战代码 结尾 前言 为了更好地去学习WPFHalcon&#xff0c;我决定去报个班学一下。原因无非是想换个工作。相关的教学视频来源于下方的Up主…

案例189:基于微信小程序的高校教务管理系统设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder …

Eclipse安装Jrebel eclipse免重启加载项目

每次修改JAVA文件都需要重新启动项目&#xff0c;加载时间太长&#xff0c;eclipse安装jrebel控件,避免重启项目节省时间。 1、Help->Eclipse Marketplace 2、搜索jrebel 3、Help->jrebel->Configuration 配置jrebel 4、激活jrebel 5、在红色框中填入 http://jrebel…

ajax请求——XMLHttpRequest请求

个人练习笔记-----Ajax01 一、GET <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</t…

嵌入式高薪岗位解析——单机片开发

很多人都问&#xff0c;什么专业适合做单机片开发呢 那么以下是一些可能适合从事单片机开发岗位的专业&#xff1a; 电子信息工程专业物联网工程专业嵌入式技术与应用专业应用电子技术专业软件技术专业自动化控制专业信息工程专业电气自动化等相关专业 此外&#xff0c;从事…

Windows/Linux环境登入mysql、mysqldump命令等多方式解决方案之简易记录

Windows/Linux环境登入mysql、mysqldump命令等多方式解决方案之简易记录 之前发布过Window方式,这次结合以上主题,完善下Linux相关登入方式过程,纯属做个记录,有需要的朋友可以做个学习参考。 一、Windows环境提示“‘mysql’ 不是内部或外部命令,也不是可运行的程序或批…

赏金猎人必学站点(梯外)

What to learn? Technical- Computer Fundamentals https://www.comptia.org/training/by-certification/a https://www.youtube.com/watch?vtIfRDPekybU https://www.tutorialspoint.com/computer_fundamentals/index.htm https://onlinecourses.swayam2.ac.in/cec19_cs…

Spring Cloud Gateway 常见过滤器的基本使用

目录 1. 过滤器的作用 2. Spring Cloud Gateway 过滤器的类型 2.1 内置过滤器 2.1.1 AddResponseHeader 2.1.2 AddRequestHeader 2.1.3 PrefixPath 2.1.4 RequestRateLimiter 2.1.5 Retry 2.2 自定义过滤器 1. 过滤器的作用 过滤器通常用于拦截、处理或修改数据流和事…

Oracle database 静默安装 oracle12c 一键安装 12.1.0.2

基于oracle安装包中应答文件实现一键安装 注意此安装脚本基于12.1.0.2 安装包 原始安装包结构为两个压缩包 此脚本使用安装包为原始压缩包解压后、 重新封装为一个.zip压缩包 建议在linux 环境下解压重新压缩后 使用该脚本 支持环境: Linux :centerOS 7 oracle :12.1.0.…

Docker 概念介绍

1、Docker 简介 Docker一个快速交付应用、运行应用的技术: 可以将程序及其依赖、运行环境一起打包为一个镜像&#xff0c;可以迁移到任意Linux操作系统运行时利用沙箱机制形成隔离容器&#xff0c;各个应用互不干扰启动、移除都可以通过一行命令完成&#xff0c;方便快捷 Doc…

three.js实现3D汽车展厅效果展示

项目搭建 本案例还是借助框架书写three项目&#xff0c;借用vite构建工具搭建vue项目&#xff0c;搭建完成之后&#xff0c;用编辑器打开该项目&#xff0c;在终端执行 npm i 安装一下依赖&#xff0c;安装完成之后终端在安装 npm i three 即可。 因为我搭建的是vue3项目&…

【Python学习笔记(十)】串口被占用导致无法访问的解决办法

串口被占用导致无法访问的解决办法 前言正文1、封装串口打开函数2、解决过程3、实现效果 前言 在项目开发中需要用到串口进行通讯&#xff0c;但当有其他串口调试工具、串口助手等打开占用了某一端口&#xff0c;打开串口时会导致程序卡死&#xff0c;针对这一问题的出现及解决…

苹果手机打开Microsoft Outlook日历ics文件方法

作为一名经常需要处理各种日程安排的苹果用户&#xff0c;我深知ics文件的重要性。ics文件通常来自于我们日常使用的日历应用&#xff0c;比如Microsoft Outlook&#xff0c;是日程信息的标准格式。但很多时候&#xff0c;当我们尝试打开这些ics文件时&#xff0c;却会遇到种种…

gulimall-002 分布式基础概念

1、微服务概念 微服务是一种非常流行的架构风格。 拒绝大型单体应用&#xff0c;基于业务边界进行服务微化拆分&#xff0c;各个服务独立部署运行。 每个服务运行在自己的单个进程使用轻量级机制通信可以使用不同的编程语言编写以及不同的数据存储技术 2、集群&分布式&…

pyCharm 打印控制台中文乱码解决办法

解决方法 在 "File" -> "Settings" 中的控制台设置&#xff1a; 在 "File" -> "Settings" 中&#xff0c;你可以找到 "Editor" -> "General" -> "Console"。在这里&#xff0c;你可能会找到…

docker学习(十八、network介绍)

[TOC]添加链接描述 首先&#xff0c;我们要知道什么是 Docker 网络。简单来说&#xff0c;它就是 Docker 中用于实现容器间通信的一个东西。 network相关内容&#xff1a; docker学习&#xff08;十八、network介绍&#xff09; docker学习&#xff08;十九、network使用示例br…