RocketMQ—RocketMQ集成SpringBoot

RocketMQ—RocketMQ集成SpringBoot

新建生产者的boot项目和消费者的boot项目,pom文件重点如下:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins>
</build>

02-boot-producer和03-boot-consumer分别对应生产者和消费者。

项目结构

生产者

生产者yml文件如下:

rocketmq:name-server: 地址:端口producer:group: boot-producer-group

同步发送消息

生产者同步发送消息的代码如下:

@SpringBootTest
class Rocketmq02BootProducerApplicationTests {//注入rocketMQTemplate@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void producer(){rocketMQTemplate.syncSend("bootTestTopic","这是boot的一个消息");}}

运行完毕看面板如下:

面板

发送异步消息

// 异步
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("失败" + throwable.getMessage());}
});

发送单向消息

rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");

延迟消息

// 延迟消息
Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 3); //第三个参数表示连接消息队列的超时时间,第四个参数表示延时等级

顺序消息

MSGModel类如下

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {private String orderSn;private Integer userId;private String desc; // 下单 短信 物流}

发送顺序消息的生产者如下:

//发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1, "下单"),new MsgModel("qwer", 1, "短信"),new MsgModel("qwer", 1, "物流"),new MsgModel("zxcv", 2, "下单"),new MsgModel("zxcv", 2, "短信"),new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {// 发送  一般都是以json的方式进行处理rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());//第二个参数表示消息内容	第三个参数表示hashKey
});

带标签的消息

@Test
void tagKeyTest() throws Exception {rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
}

带key的消息

@Test
void tagKeyTest() throws Exception {// key是写带在消息头的Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, "key-id-1").build();rocketMQTemplate.syncSend("bootKeyTopic", message);}

消费者

yml配置文件如下:

server:port: 8890
rocketmq:name-server: 地址:端口

简单消费者

消费者代码如下

@Component
@RocketMQMessageListener(topic = "bootTestTopic",consumerGroup = "boot-test-consumer-group")
public class ASimpleMsgListener implements RocketMQListener<MessageExt> {//如果泛型指定固定类型,消息体就是我们的参数//MessageExt 是消息所有内容,可以拿到所有内容/*** 这个方法就是消费消息的方法* 只要没有报错,就签收了* 如果报错了,就是拒收,就会重试* @param message 是消息内容*/@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}

运行结果

顺序消息的消费者

@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",consumerGroup = "boot-orderly-consumer-group",consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);System.out.println(msgModel);}
}

带tag的消费者

@Component
@RocketMQMessageListener(topic = "bootTagTopic",consumerGroup = "boot-tag-consumer-group",selectorType = SelectorType.TAG,// tag过滤模式selectorExpression = "tagA || tagB"
//        selectorType = SelectorType.SQL92,// sql92过滤模式  这种一般不用,这种默认没有开启,需要在sql92 //需要在broker.conf配置文件中开启enbalePropertyFilter=true
//        selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/722361.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker中使用nginx-rtmp推拉网络摄像头视频流

前言&#xff1a; 该部分比较麻烦&#xff0c;闹腾了好久&#xff08;ffmpeg推拉流没学过&#xff0c;事实证明依葫芦画瓢是不行滴&#xff0c;后面有时间再学吧&#xff09;&#xff0c;后来借助chatGPT勉强解决&#xff0c;但不是很懂。因个人能力有限&#xff0c;只复述操作…

Starknet(strk) 跨链桥教程:手把手教你用bitget钱包跨链

摘要&#xff1a;通过Rhino.fi &#xff0c;将资产无缝桥接至Starknet&#xff08;web3.bitget.com/zh/assets/starknet-wallet&#xff09;变得高效且具有成本效益&#xff0c;Rhino.fi 是一个以其快速处理时间和低交易费用而闻名的平台。它专为与 MetaMask 等流行的 Web 3 钱…

长整数拼接后求余数

原题&#xff1a;2024牛客寒假算法基础集训营3 M题 方法一&#xff1a; 用递归进行长整数除法&#xff0c;暴力寻找&#xff0c;但是会超时 #include <bits/stdc.h> using namespace std; bool f(string &s, int ans,int count) {if (count > s.size()) {if (a…

Newsmy储能电源与您相约九州汽车生态博览

2024年3月7日—10日&#xff0c;第24届 深圳国际智慧出行、汽车改装及汽车服务业态博览会&#xff08;以下简称“九州汽车生态博览会”&#xff09;将在深圳国际会展中心&#xff08;宝安&#xff09;举办&#xff0c;Newsmy纽曼集团将在3号馆32523展位&#xff0c;携全系产品与…

2024年3月6日 十二生肖 今日运势

小运播报&#xff1a;2024年3月6日&#xff0c;星期三&#xff0c;农历正月廿六 &#xff08;甲辰年丁卯月己巳日&#xff09;&#xff0c;法定工作日。 红榜生肖&#xff1a;牛、猴、鸡 需要注意&#xff1a;鼠、虎、猪 喜神方位&#xff1a;东北方 财神方位&#xff1a;正…

nginx 常见面面问题

1.什么是 Nginx&#xff1f; Nginx 是一个 轻量级 / 高性能的反向代理 Web 服务器&#xff0c;用于 HTTP、HTTPS、SMTP、POP3 和 IMAP 协议。他实现非常高效的反向代理、负载平衡&#xff0c;他可以处理 2-3 万并发连接数&#xff0c;官方监测能支持 5 万并发&#xff0c;现在中…

Java毕业设计 基于SpringBoot 众筹网

Java毕业设计 基于SpringBoot 众筹网 SpringBoot 众筹网 功能介绍 注册 邮箱验证码 登录 忘记密码 首页 图片轮播 关于我们 项目列表 发布项目 我的添加项目 提交审核 已在募捐 项目详情 项目介绍 项目进展 捐赠列表 评论 新闻列表 发布新闻 新闻详情 评论新闻 联系我们 提交…

Linux笔记--静态库和动态库

库是指在我们的应用中&#xff0c;有一些公共代码是需要反复使用&#xff0c;就把这些代码编译为"库"文件;在链接步骤中&#xff0c;链接器将从库文件取得所需的代码&#xff0c;复制到生成的可执行文件中。 Linux中常见的库文件有两种&#xff0c;一种.a为后缀&…

GIS之深度学习10:运行Faster RCNN算法

&#xff08;未完成&#xff0c;待补充&#xff09; 获取Faster RCNN源码&#xff08;开源的很多&#xff09; 替换自己的数据集&#xff08;图片标签文件&#xff09; 打开终端&#xff0c;进入gpupytorch环境 运行voc_annotation.py文件生成与训练文件 E:\DeepLearningMode…

人事档案转出需要注意哪些方面

人事档案转出是指将员工的人事档案从一个部门、公司或组织转移到另一个部门、公司或组织的过程。这个过程需要注意以下几个方面&#xff1a; 1.法律合规&#xff1a;在进行人事档案转出前&#xff0c;要确保遵守相关的法律法规和公司内部规定。例如&#xff0c;要确保有合法的授…

Redis 7.0版本主从复制机制

1、引言 Redis是一个开源、高性能、内存键值存储系统&#xff0c;同时也提供了数据结构服务器的功能。它支持五种主要的数据类型&#xff1a;字符串&#xff08;String&#xff09;、哈希表&#xff08;Hashes&#xff09;、列表&#xff08;Lists&#xff09;、集合&#xff…

【TEE】内存完整性保护

Hash Functions&Merkle Tree 对读操作进行完整性检查&#xff0c;通过在加载的块上重新计算一个哈希&#xff0c;然后根据片外地址将得到的哈希与片上哈希比较。 缺点&#xff1a;不可承受的片上存储开销&#xff0c;并假设128位哈希和512位cache line&#xff0c;其开销为…

LABEL-EFFICIENT SEMANTIC SEGMENTATION WITHDIFFUSION MODELS

基于扩散模型的标签高效语义分割 摘要&#xff1a; 去噪扩散概率模型最近受到了很多研究的关注&#xff0c;因为它们优于gan等替代方法&#xff0c;并且目前提供了最先进的生成性能。扩散模型的优越性能使其成为一些应用程序的吸引人的工具&#xff0c;包括绘图&#xff0c;超…

STM32CubeIDE基础学习-新建STM32CubeIDE基础工程

STM32CubeIDE基础学习-新建STM32CubeIDE基础工程 前言 有开发过程序的朋友都清楚&#xff0c;后面开发是不需要再新建工程的&#xff0c;一般都是在初学时或者有特殊需要的时候才需要新建项目工程的。 后面开发都是可以在这种已有的工程上添加相关功能就行&#xff0c;只要前…

基于openKylin与RISC-V的MindSpore AI项目实践

项目目标&#xff1a; 在openKylin系统上安装和配置MindSpore框架。开发一个简单的图像分类模型&#xff0c;并在RISC-V平台上进行训练和推理。根据RISC-V的特性&#xff0c;对MindSpore框架进行必要的优化。 目录 项目目标&#xff1a; 训练模型 编写训练代码&#xff0c;设…

外包干了3个月,技术倒退明显

先说情况&#xff0c;大专毕业&#xff0c;18年通过校招进入湖南某软件公司&#xff0c;干了接近6年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试&#xf…

挑战给女神节送礼物,怎么寄快递才能快速的送到他手中呢?

马上就是三八女神节了&#xff0c;怎么样&#xff1f;你给心爱的她或者敬爱的她准备礼物了吗&#xff0c;如果已经准备好&#xff0c;你该怎么送给她呢&#xff1f;是当面送给她&#xff1f;还是通过快递打包送给她呢&#xff1f;这里推荐使用闪侠惠递寄快递发货给他吧&#xf…

SQL Server 阻止了对组件 ‘Ole Automation Procedures‘ 的 过程‘sys.sp_OACreate‘ 的访问

SQL Server 阻止了对组件 Ole Automation Procedures 的 过程sys.sp_OACreate 的访问&#xff0c;因为此组件已作为此服务器安全配置的一部分而被关闭。系统管理员可以通过使用 sp_configure 启用 Ole Automation Procedures。有关启用 Ole Automation Procedures 的详细信息&a…

【Python】使用numpy进行神经网络激活函数算法描述

【Python】使用numpy进行神经网络激活函数算法描述 系统&#xff1a;macOS 10.14.5 IDE&#xff1a;PyCharm 2018.2.4 一、What 1.1 NumPy NumPy(Numerical Python) 是 Python 语言的一个扩展程序库&#xff0c;支持大量的维度数组与矩阵运算&#xff0c;此外也针对数组运算提供…

基于灰狼算法GWO的城市三维无人机路径规划(复杂地形三维航迹路径规划)

摘要 本文提出了一种利用灰狼算法GWO来解决城市环境下无人机三维路径规划问题的方法。这种方法将复杂的无人机航迹规划任务转化为一个优化问题&#xff0c;然后运用灰狼算法GWO来解决这个优化问题。灰狼算法GWO是一种模拟灰狼种群捕猎行为的优化算法&#xff0c;它具备强大的全…