SpringBoot集成RocketMQ

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

环境搭建:

采用docker-compose搭建,具体配置如下


version: '3'
services:# rocket mq name serverrmqnamesrv:image: apache/rocketmq:4.9.6restart: alwayscontainer_name: rocket-server# environment:#   JAVA_OPT_EXT: "-server -Xms64m -Xmx64m -Xmn64m"# volumes:# 映射本地目录权限一定要设置为 777 权限,否则启动不成功# - ../volumes/data/rocket/server/logs:/home/rocketmq/logsnetworks:- rocketmqports:- 9876:9876command: sh mqnamesrv# rocket mq brokerrmqbroker:image: apache/rocketmq:4.9.6restart: alwayscontainer_name: rocket-brokervolumes:# 映射本地目录权限一定要设置为 777 权限,否则启动不成功# - ../volumes/data/rocket/broker/logs:/home/rocketmq/logs# - ../volumes/data/rocket/broker/store:/home/rocketmq/store- ./config/broker.conf:/opt/rocketmq-4.9.6/conf/broker.confenvironment:- NAMESRV_ADDR=rmqnamesrv:9876# - JAVA_OPTS:=-Duser.home=/opt- JAVA_OPT_EXT=-server -Xms64m -Xmx64m -Xmn64mdepends_on:- rmqnamesrvnetworks:- rocketmqports:- 10909:10909- 10911:10911command: sh mqbroker -c /opt/rocketmq-4.9.6/conf/broker.conf# rocket console 这个可以不需要rmqdashboard:image: apacherocketmq/rocketmq-dashboard:1.0.0restart: alwayscontainer_name: rocket-dashboardenvironment:- JAVA_OPTS=-Drocketmq.config.namesrvAddr=rmqnamesrv:9876 -Dserver.port=8180 -Drocketmq.config.isVIPChannel=false# - JAVA_OPT_EXT=-Xms128m -Xmx128m -Xmn128mdepends_on:- rmqnamesrvnetworks:- rocketmqports:- 8180:8180networks:rocketmq:driver: bridgestack:driver: bridge

运行docker-compose:

docker-compose -f docker-compose-rocketmq.yml -p rocketmq up -d
注:修改 xx/rocketmq/rocketmq_broker/conf/broker.conf中配置brokerIP1为宿主机IP

访问地址:http://ip地址:8180:
在这里插入图片描述

一、依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rocketmq</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-common</artifactId><version>4.8.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><scope>test</scope></dependency></dependencies></project>

二、配置文件和启动类


server:port: 8088
#rocketmq配置
rocketmq:name-server: 10.11.68.77:9876# 生产者配置producer:isOnOff: on# 发送同一类消息的设置为同一个group,保证唯一group: hyh-rocketmq-groupgroupName: hyh-rocketmq-group# 服务地址namesrvAddr: 10.11.68.77:9876# 消息最大长度 默认1024*4(4M)maxMessageSize: 4096# 发送消息超时时间,默认3000sendMsgTimeout: 3000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2
@SpringBootApplication
public class DemoApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}
}

三、实体类

public class Person {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}
}

四、listener监听

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "PERSON_ADD") //配置文件中:hyh-rocketmq-group
public class PersonMqListener implements RocketMQListener<Person>{@Overridepublic void onMessage(Person person){System.out.println("接收到消息,开始消费..name:" + person.getName() + ",age:" + person.getAge());}
}

五、工具util

@Component
public class RocketMqHelper{private static final Logger LOG = LoggerFactory.getLogger(RocketMqHelper.class);@Autowiredprivate RocketMQTemplate rocketMQTemplate;@PostConstructpublic void init(){LOG.info("---RocketMq助手初始化---");}//发送异步消息public void asyncSend(Enum topic,Message<?> message){asyncSend(topic.name(), message, getDefaultSendCallBack());}public void asyncSend(Enum topic, Message<?> message, SendCallback sendCallback) {asyncSend(topic.name(), message, sendCallback);}public void asyncSend(String topic, Message<?> message) {rocketMQTemplate.asyncSend(topic, message, getDefaultSendCallBack());}public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {rocketMQTemplate.asyncSend(topic, message, sendCallback);}public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);}public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);}//发送给顺序消息public void syncSendOrderly(Enum topic, Message<?> message, String hashKey) {syncSendOrderly(topic.name(), message, hashKey);}public void syncSendOrderly(String topic, Message<?> message, String hashKey) {LOG.info("发送顺序消息,topic:" + topic + ",hashKey:" + hashKey);rocketMQTemplate.syncSendOrderly(topic, message, hashKey);}public void syncSendOrderly(String topic, Message<?> message, String hashKey, long timeout) {LOG.info("发送顺序消息,topic:" + topic + ",hashKey:" + hashKey + ",timeout:" + timeout);rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);}//默认Callback函数private SendCallback getDefaultSendCallBack() {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {LOG.info("---发送MQ成功---");}@Overridepublic void onException(Throwable throwable) {throwable.printStackTrace();LOG.error("---发送MQ失败---"+throwable.getMessage(), throwable.getMessage());}};}@PreDestroypublic void destroy() {LOG.info("---RocketMq助手注销---");}}

六、测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class MQTests {@Autowiredprivate RocketMqHelper rocketMqHelper;@Testpublic void testProducter() throws InterruptedException {for(int i=0;i<1000000;i++){Person person = new Person();person.setName("heyuhua");person.setAge(25);rocketMqHelper.asyncSend("PERSON_ADD", MessageBuilder.withPayload(person).build());Thread.sleep(1000);}}}

结果:


2024-02-02 15:20:27.101 INFO 11188 --- [ublicExecutor_3] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:28.116 INFO 11188 --- [ublicExecutor_4] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:29.130 INFO 11188 --- [ublicExecutor_5] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:30.131 INFO 11188 --- [ublicExecutor_7] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:31.142 INFO 11188 --- [ublicExecutor_8] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:32.156 INFO 11188 --- [ublicExecutor_9] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
2024-02-02 15:20:33.167 INFO 11188 --- [blicExecutor_10] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
2024-02-02 15:20:34.182 INFO 11188 --- [ublicExecutor_3] com.et59.rocketmq.util.RocketMqHelper : ---发送MQ成功---
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25
接收到消息,开始消费..name:heyuhua,age:25

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/724193.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

对VisionPro的认识,CogPMAlingTool模板匹配工具练习

什么是VisionPro&#xff1f; 在认识VisionPro之前我们需要先熟悉一下图片的各种格式 这里我们可以参考来自githubcurry博主的文章 图片各种格式的区别以及计算机如何存储图片 VisionPro 是由世界领先的机器视觉公司 Cognex 开发的一款专业机器视觉软件。它提供了强大的图像…

【一】【SQL Server】如何运用SQL Server中查询设计器通关数据库期末查询大题

职工考勤20170320 职工考勤20170320数据库展示 职工考勤表展示 职务代码表展示 一、基本操作 代码方式&#xff1a; --第一大题、基本操作 ALTER TABLE [dbo].[职工考勤表] DROP COLUMN [照片];EXEC sp_rename dbo.职工考勤表.职工编号, 工号, COLUMN;ALTER TABLE 职工考勤表 A…

华为HQoS配置案例

HQoS基于层次化调度&#xff0c;cpe上支持三级队列&#xff1a; level3流队列&#xff1a;每个用户的同类业务是一个业务流&#xff0c;针对每个用户不同的业务流进行队列调度&#xff0c;流队列一般与业务类型对应&#xff08;EF、AF、BE等&#xff09;。 level2用户队列&…

2024年安卓开发者跳槽指南,音视频时代你还不会NDK开发

正式加入字节跳动&#xff0c;分享一点面试小经验 今天正式入职了字节跳动。工号超吉利&#xff0c;尾数是3个6。然后办公环境也很好&#xff0c;这边一栋楼都是办公区域。公司内部配备各种小零食、饮料&#xff0c;还有免费的咖啡。15楼还有健身房。而且公司包三餐来着。下午…

【YOLO v5 v7 v8 v9小目标改进】新CNN架构 InceptionNeXt:怎么让大卷积核既好用又快

新CNN架构 InceptionNeXt&#xff1a;怎么让大卷积核既好用又快 提出背景问题: 如何提高大核心卷积的效率&#xff0c;同时保持或提升模型性能&#xff1f; 改进思路MetaNeXtInception深度卷积InceptionNeXt 小目标涨点YOLO v5 魔改YOLO v7 魔改YOLO v8 魔改YOLO v9 魔改 提出背…

RUUFFY、由利、希亦内衣洗衣机怎么样?测评对比谁更好?

我们传统的洗衣机其实并不可以用来清洗内衣裤&#xff0c;以及袜子这类小型的衣物&#xff0c;容易出现细菌的交叉感染&#xff0c;而且传统洗衣机单单清洗这些小件衣物并不划算&#xff0c;需要很多的水量&#xff0c;有些全自动的大型洗衣机需要更多的衣物同时清洗才能正常工…

110. 平衡二叉树【简单】

110. 平衡二叉树【简单】 题目描述&#xff1a; 给定一个二叉树&#xff0c;判断它是否是高度平衡的二叉树。 本题中&#xff0c;一棵高度平衡二叉树定义为&#xff1a; 一个二叉树每个节点的左右两个子树的高度差的绝对值不超过 1 。 示例 1&#xff1a; 输入&#xff1a;r…

IDEA修改git提交者的信息

git提交后&#xff0c;idea会记录下提交人的信息&#xff0c;如果不修改提交人信息的话&#xff0c;会有一个默认值。避免每次提交都要填提交人信息&#xff0c;直接设置成自己想要的默认值&#xff0c;该怎么操作&#xff1f; 提交的时候在这里修改提交人信息 避免每次都去设置…

和鲸科技受邀参与湖南省气象信息中心开展人工智能研究型业务支撑平台学术交流

为推进湖南省机器学习统一平台建设&#xff0c;2 月 29 日&#xff0c;湖南省气象信息中心开展学术讲座活动&#xff0c;活动由中心副主任冯冼主持&#xff0c;中心业务骨干、湖南省气象台、湖南分院等技术人员参加。 本次讲座邀请上海和今信息科技有限公司&#xff08;简称“…

【企业动态】欢迎法国客户来访东胜物联,深入探讨智能化合作

本周&#xff0c;来自法国的客户莅临我司工厂进行实地参观考察。客户是一家历史悠久的设备供应商&#xff0c;其产品涵盖冷链、餐饮、农业等多个行业应用领域&#xff0c;正致力于从传统设备向智能设备转型&#xff0c;希望将设备接入物联网。在此次访问中&#xff0c;他们参观…

fastapi_mail发送邮件,邮件附件文件重命名

当fastapi_mail发送邮件的时候&#xff0c;想对附件的名称进行重命名 以下是fastapi_mail下的schemas.py的源码 import os from enum import Enum from mimetypes import MimeTypes from typing import Dict, List, Optional, Unionfrom pydantic import BaseModel, EmailStr,…

python大数据分析游戏行业中的 Apache Kafka:用例 + 架构!

python大数据分析游戏行业中的 Apache Kafka&#xff1a;用例 架构&#xff01; 这篇博文探讨了使用 Apache Kafka 的事件流如何提供可扩展、可靠且高效的基础设施&#xff0c;让游戏玩家开心并让游戏公司取得成功。讨论了游戏行业中的各种用例和架构&#xff0c;包括在线和移…

C++初阶篇----类与对象中卷

目录 引言1. 构造函数1.1概念1.2 特性 2. 析构函数2.1 概念2.2 特性 3. 拷贝构造函数3.1 概念3.2特征 4. 赋值运算符重载4.1 运算符重载4.2 赋值运算符重载4.3 前置和后置重载 5.日期类的实现6.const成员7.取地址及const取地址操作符重载 引言 当一个类既没有成员变量又没有成…

汉诺塔问题(C语言)

一&#xff1a;问题 汉诺塔&#xff08;Tower of Hanoi&#xff09;&#xff0c;又称河内塔&#xff0c;是一个源于印度古老传说的益智玩具。大梵天创造世界的时候做了三根金刚石柱子&#xff0c;在一根柱子上从下往上按照大小顺序摞着64片黄金圆盘。大梵天命令婆罗门把圆盘从…

QVector和QString互相转换

我的画图项目需要读写自定义虚线样式 {...comboBox_penStyle new QComboBox;QStringList SL_penStyle;SL_penStyle << "______" << "----------" << ".........." << "-.-.-.-.-." << "-..-..-..…

【OJ】日期差值与日期累加

个人主页 &#xff1a; zxctscl 如有转载请先通知 文章目录 1. KY111 日期差值1.1 题目分析1.2 代码 2. KY258 日期累加2.1 题目分析2.2 代码 1. KY111 日期差值 1.1 题目分析 日期之间比较可能会出现给的两个年月日都不相同&#xff0c;这个就不好作差&#xff0c;每个月给的…

LeetCode每日一题 二叉树的最大深度(二叉树)

题目描述 给定一个二叉树 root &#xff0c;返回其最大深度。二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3 示例 2&#xff1a; 输入&#xff1a;root [1,nul…

前端知识点、技巧、webpack、性能优化(持续更新~)

1、 请求太多 页面加载慢 &#xff08;webpack性能优化&#xff09; 可以把 图片转换成 base64 放在src里面 减少服务器请求 但是图片会稍微大一点点 以上的方法不需要一个一个自己转化 可以在webpack 进行 性能优化 &#xff08;官网有详细描述&#xff09;

【详识C语言】自定义类型之一:结构体

本文重点 结构体 结构体类型的声明 结构的自引用 结构体变量的定义和初始化 结构体内存对齐 结构体传参 结构体实现位段&#xff08;位段的填充&可移植性&#xff09; 结构体 结构体的声明 结构的基础知识 结构是一些值的集合&#xff0c;这些值称为成员变量。结构的每个…

在 SpringBoot3 中使用 Mybatis-Plus 报错

在 SpringBoot3 中使用 Mybatis-Plus 报错 Property ‘sqlSessionFactory’ or ‘sqlSessionTemplate’ are required Caused by: java.lang.IllegalArgumentException: Property sqlSessionFactory or sqlSessionTemplate are requiredat org.springframework.util.Assert.no…