RocketMQ—RocketMQ集成SpringBoot

RocketMQ—RocketMQ集成SpringBoot

新建生产者的boot项目和消费者的boot项目,pom文件重点如下:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins>
</build>

02-boot-producer和03-boot-consumer分别对应生产者和消费者。

项目结构

生产者

生产者yml文件如下:

rocketmq:name-server: 地址:端口producer:group: boot-producer-group

同步发送消息

生产者同步发送消息的代码如下:

@SpringBootTest
class Rocketmq02BootProducerApplicationTests {//注入rocketMQTemplate@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void producer(){rocketMQTemplate.syncSend("bootTestTopic","这是boot的一个消息");}}

运行完毕看面板如下:

面板

发送异步消息

// 异步
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("失败" + throwable.getMessage());}
});

发送单向消息

rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");

延迟消息

// 延迟消息
Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 3); //第三个参数表示连接消息队列的超时时间,第四个参数表示延时等级

顺序消息

MSGModel类如下

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {private String orderSn;private Integer userId;private String desc; // 下单 短信 物流}

发送顺序消息的生产者如下:

//发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1, "下单"),new MsgModel("qwer", 1, "短信"),new MsgModel("qwer", 1, "物流"),new MsgModel("zxcv", 2, "下单"),new MsgModel("zxcv", 2, "短信"),new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {// 发送  一般都是以json的方式进行处理rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());//第二个参数表示消息内容	第三个参数表示hashKey
});

带标签的消息

@Test
void tagKeyTest() throws Exception {rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
}

带key的消息

@Test
void tagKeyTest() throws Exception {// key是写带在消息头的Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, "key-id-1").build();rocketMQTemplate.syncSend("bootKeyTopic", message);}

消费者

yml配置文件如下:

server:port: 8890
rocketmq:name-server: 地址:端口

简单消费者

消费者代码如下

@Component
@RocketMQMessageListener(topic = "bootTestTopic",consumerGroup = "boot-test-consumer-group")
public class ASimpleMsgListener implements RocketMQListener<MessageExt> {//如果泛型指定固定类型,消息体就是我们的参数//MessageExt 是消息所有内容,可以拿到所有内容/*** 这个方法就是消费消息的方法* 只要没有报错,就签收了* 如果报错了,就是拒收,就会重试* @param message 是消息内容*/@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}

运行结果

顺序消息的消费者

@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",consumerGroup = "boot-orderly-consumer-group",consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);System.out.println(msgModel);}
}

带tag的消费者

@Component
@RocketMQMessageListener(topic = "bootTagTopic",consumerGroup = "boot-tag-consumer-group",selectorType = SelectorType.TAG,// tag过滤模式selectorExpression = "tagA || tagB"
//        selectorType = SelectorType.SQL92,// sql92过滤模式  这种一般不用,这种默认没有开启,需要在sql92 //需要在broker.conf配置文件中开启enbalePropertyFilter=true
//        selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/722361.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker 中的挂载机制

Docker 挂载机制 Docker 的挂载机制允许将宿主机的文件或目录挂载到 Docker 容器内部&#xff0c;这样容器就可以访问宿主机上的文件系统。Docker 提供了多种挂载方式&#xff0c;包括以下几种&#xff1a; 绑定挂载&#xff08;Bind Mounts&#xff09;&#xff1a;通过绑定挂…

Docker中使用nginx-rtmp推拉网络摄像头视频流

前言&#xff1a; 该部分比较麻烦&#xff0c;闹腾了好久&#xff08;ffmpeg推拉流没学过&#xff0c;事实证明依葫芦画瓢是不行滴&#xff0c;后面有时间再学吧&#xff09;&#xff0c;后来借助chatGPT勉强解决&#xff0c;但不是很懂。因个人能力有限&#xff0c;只复述操作…

Starknet(strk) 跨链桥教程:手把手教你用bitget钱包跨链

摘要&#xff1a;通过Rhino.fi &#xff0c;将资产无缝桥接至Starknet&#xff08;web3.bitget.com/zh/assets/starknet-wallet&#xff09;变得高效且具有成本效益&#xff0c;Rhino.fi 是一个以其快速处理时间和低交易费用而闻名的平台。它专为与 MetaMask 等流行的 Web 3 钱…

长整数拼接后求余数

原题&#xff1a;2024牛客寒假算法基础集训营3 M题 方法一&#xff1a; 用递归进行长整数除法&#xff0c;暴力寻找&#xff0c;但是会超时 #include <bits/stdc.h> using namespace std; bool f(string &s, int ans,int count) {if (count > s.size()) {if (a…

Newsmy储能电源与您相约九州汽车生态博览

2024年3月7日—10日&#xff0c;第24届 深圳国际智慧出行、汽车改装及汽车服务业态博览会&#xff08;以下简称“九州汽车生态博览会”&#xff09;将在深圳国际会展中心&#xff08;宝安&#xff09;举办&#xff0c;Newsmy纽曼集团将在3号馆32523展位&#xff0c;携全系产品与…

2024年3月6日 十二生肖 今日运势

小运播报&#xff1a;2024年3月6日&#xff0c;星期三&#xff0c;农历正月廿六 &#xff08;甲辰年丁卯月己巳日&#xff09;&#xff0c;法定工作日。 红榜生肖&#xff1a;牛、猴、鸡 需要注意&#xff1a;鼠、虎、猪 喜神方位&#xff1a;东北方 财神方位&#xff1a;正…

nginx 常见面面问题

1.什么是 Nginx&#xff1f; Nginx 是一个 轻量级 / 高性能的反向代理 Web 服务器&#xff0c;用于 HTTP、HTTPS、SMTP、POP3 和 IMAP 协议。他实现非常高效的反向代理、负载平衡&#xff0c;他可以处理 2-3 万并发连接数&#xff0c;官方监测能支持 5 万并发&#xff0c;现在中…

Java毕业设计 基于SpringBoot 众筹网

Java毕业设计 基于SpringBoot 众筹网 SpringBoot 众筹网 功能介绍 注册 邮箱验证码 登录 忘记密码 首页 图片轮播 关于我们 项目列表 发布项目 我的添加项目 提交审核 已在募捐 项目详情 项目介绍 项目进展 捐赠列表 评论 新闻列表 发布新闻 新闻详情 评论新闻 联系我们 提交…

Linux笔记--静态库和动态库

库是指在我们的应用中&#xff0c;有一些公共代码是需要反复使用&#xff0c;就把这些代码编译为"库"文件;在链接步骤中&#xff0c;链接器将从库文件取得所需的代码&#xff0c;复制到生成的可执行文件中。 Linux中常见的库文件有两种&#xff0c;一种.a为后缀&…

GIS之深度学习10:运行Faster RCNN算法

&#xff08;未完成&#xff0c;待补充&#xff09; 获取Faster RCNN源码&#xff08;开源的很多&#xff09; 替换自己的数据集&#xff08;图片标签文件&#xff09; 打开终端&#xff0c;进入gpupytorch环境 运行voc_annotation.py文件生成与训练文件 E:\DeepLearningMode…

Linux配置VNC实现远程控制,提高运维效率

VNC介绍 1.1 VNC简介 VNC (Virtual Network Console)是虚拟网络控制台的缩写。它 是一款优秀的远程控制工具软件&#xff0c;可以实现远程控制计算机的功能。 1.2 VNC组成 VNC基本上是由两部分组成&#xff0c;在任何安装了客户端的应用程序(vncviewer)的计算机都能十分方便…

蓝桥杯第1390题——A Careful Approach

题目描述 如果你认为参加一个编程比赛让你感到有压力&#xff0c;那么请你想象你是一个空中交通管制员。因为人命关天&#xff0c;所以一个空中交通管制员必须在时刻变化的环境中专注于任务&#xff0c;解决不可预知的事件。 让我们将目光转向飞机的着陆流程。飞机进入目的地飞…

人事档案转出需要注意哪些方面

人事档案转出是指将员工的人事档案从一个部门、公司或组织转移到另一个部门、公司或组织的过程。这个过程需要注意以下几个方面&#xff1a; 1.法律合规&#xff1a;在进行人事档案转出前&#xff0c;要确保遵守相关的法律法规和公司内部规定。例如&#xff0c;要确保有合法的授…

vue-lazyload 图片懒加载的原理与使用

一、图片懒加载vue-lazyload是什么&#xff1f; 背景&#xff1a; 图片是非常占用页面渲染时间的&#xff0c;尤其是一些图片比较多的页面&#xff0c;过多的图片可能会造成页面的卡顿&#xff0c;降低流畅度影响用户体验&#xff0c;我们在实际开发中&#xff0c;对于处于视口…

Redis 7.0版本主从复制机制

1、引言 Redis是一个开源、高性能、内存键值存储系统&#xff0c;同时也提供了数据结构服务器的功能。它支持五种主要的数据类型&#xff1a;字符串&#xff08;String&#xff09;、哈希表&#xff08;Hashes&#xff09;、列表&#xff08;Lists&#xff09;、集合&#xff…

【TEE】内存完整性保护

Hash Functions&Merkle Tree 对读操作进行完整性检查&#xff0c;通过在加载的块上重新计算一个哈希&#xff0c;然后根据片外地址将得到的哈希与片上哈希比较。 缺点&#xff1a;不可承受的片上存储开销&#xff0c;并假设128位哈希和512位cache line&#xff0c;其开销为…

LABEL-EFFICIENT SEMANTIC SEGMENTATION WITHDIFFUSION MODELS

基于扩散模型的标签高效语义分割 摘要&#xff1a; 去噪扩散概率模型最近受到了很多研究的关注&#xff0c;因为它们优于gan等替代方法&#xff0c;并且目前提供了最先进的生成性能。扩散模型的优越性能使其成为一些应用程序的吸引人的工具&#xff0c;包括绘图&#xff0c;超…

STM32CubeIDE基础学习-新建STM32CubeIDE基础工程

STM32CubeIDE基础学习-新建STM32CubeIDE基础工程 前言 有开发过程序的朋友都清楚&#xff0c;后面开发是不需要再新建工程的&#xff0c;一般都是在初学时或者有特殊需要的时候才需要新建项目工程的。 后面开发都是可以在这种已有的工程上添加相关功能就行&#xff0c;只要前…

智能合约的编程语言

智能合约的编程语言 Solidity: https://learnblockchain.cn/docs/solidity/ 相关资料&#xff1a; https://guide.pseudoyu.com/docs/study_path/ 智能合约的技术栈 Hardhat https://hardhat.org/ Truffle https://trufflesuite.com/docs/truffle/ Remix https://hard…

基于openKylin与RISC-V的MindSpore AI项目实践

项目目标&#xff1a; 在openKylin系统上安装和配置MindSpore框架。开发一个简单的图像分类模型&#xff0c;并在RISC-V平台上进行训练和推理。根据RISC-V的特性&#xff0c;对MindSpore框架进行必要的优化。 目录 项目目标&#xff1a; 训练模型 编写训练代码&#xff0c;设…