RocketMQ(三):集成SpringBoot

RocketMQ系列文章

RocketMQ(一):基本概念和环境搭建

RocketMQ(二):原生API快速入门

RocketMQ(三):集成SpringBoot


目录

  • 一、搭建环境
  • 二、不同类型消息
    • 1、同步消息
    • 2、异步消息
    • 3、单向消息
    • 4、延迟消息
    • 5、顺序消息
    • 6、带tag消息
    • 7、带key消息

一、搭建环境

  • 需要创建两个服务,消息生产服务和消息消费者服务
  • 生产消息存在多个服务,消费则统一由一个服务处理
  • 这样可以做到解耦

pom.xml

  • 生产者和消费者都需要
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>

生产者配置文件

  • 设置统一的生产者组,这样发送消息时就不用指定了
rocketmq:name-server: 127.0.0.1:9876     # rocketMq的nameServer地址producer:group: boot-producer-group        # 生产者组别send-message-timeout: 3000  # 消息发送的超时时间retry-times-when-send-async-failed: 2  # 异步消息发送失败重试次数max-message-size: 4194304       # 消息的最大长度

生产者配置文件

  • 不能设置统一的消费者组,因为不同的消费者订阅关系不一致,需要设置不同的消费者组
rocketmq:name-server: localhost:9876

二、不同类型消息

直接引入即可

@Autowired
private RocketMQTemplate rocketMQTemplate;

1、同步消息

生产消息

  • 消息由消费者发送到broker后,会得到一个确认,是具有可靠性的
  • 比如:重要的消息通知,短信通知等
rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");

消费消息

  • RocketMQListener的泛型类型即消息类型
    • MessageExt类型是消息的所有内容
    • 其他类型则就只是消息体内容,没有消息头内容(keys、msgId、延迟时间、重试次数、主题名称...)
  • onMessage方法内没有报错就是签收了,报错就是拒收会重试
@Component
@RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group")
public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}

2、异步消息

  • 发送异步消息,发送完以后会有一个异步通知
  • 不影响程序往下执行
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("失败" + throwable.getMessage());}
});

3、单向消息

  • 不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险
  • 例如日志信息的发送
rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");

4、延迟消息

  • RocketMQ不支持任意时间的延时
  • 只支持以下18个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟
  • private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
  • 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 4);

5、顺序消息

生产消息

  • 根据syncSendOrderly方法的第三个参数计算hash值决定消息放入哪个队列
// 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1, "下单"),new MsgModel("qwer", 1, "短信"),new MsgModel("qwer", 1, "物流"),new MsgModel("zxcv", 2, "下单"),new MsgModel("zxcv", 2, "短信"),new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {// 发送  一般都是以json的方式进行处理// 根据第三个参数计算hash值决定消息放入哪个队列rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
});

消费消息

  • 默认是并发消费模式,可以设置为单线程顺序模式
  • 设置消费重试次数
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",consumerGroup = "boot-orderly-consumer-group",consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);System.out.println(msgModel);}
}

6、带tag消息

  • tag带在主题后面用:来携带
rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");

7、带key消息

Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, "10086").build();
rocketMQTemplate.syncSend("bootKeyTopic", message);

获取带key和tag的消费者

  • 过滤模式有两种:正则表达式和sql92方式
  • keys从MessageExt对象中获取
@Component
@RocketMQMessageListener(topic = "bootTagTopic",consumerGroup = "boot-tag-consumer-group",selectorType = SelectorType.TAG,// tag过滤模式selectorExpression = "tagA || tagB"
//        selectorType = SelectorType.SQL92,// sql92过滤模式
//        selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println("获取keys: " + message.getKeys());System.out.println("消息内容: " + new String(message.getBody()));}
}

查看源码

  • destination目标 = 主题 : 标签
  • keys从消息头里面获取

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/148111.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Accelerate 0.24.0文档 三:超大模型推理(内存估算、Sharded checkpoints、bitsandbytes量化、分布式推理)

文章目录 一、内存估算1.1 Gradio Demos1.2 The Command 二、使用Accelerate加载超大模型2.1 模型加载的常规流程2.2 加载空模型2.3 分片检查点&#xff08;Sharded checkpoints&#xff09;2.4 示例&#xff1a;使用Accelerate推理GPT2-1.5B2.5 device_map 三、bitsandbytes量…

LeetCode【13】罗马数字转整数

题目&#xff1a; 思路&#xff1a; 第十二题的逆运算&#xff0c;方法同理。需要注意的是IV、IX、XL、XC、CD、CM这六种特殊的情况。正常情况下每个字符找到对应的数值累加&#xff0c;这六种特殊字符都是左边的数值比右边的数值小。 这里以IV举例&#xff0c;IV对应数字是1和…

详解如何使用Jenkins一键打包部署SpringBoot项目

目录 1、Jenkins简介 2、Jenkins的安装及配置 2.1、Docker环境下的安装​编辑 2.2、Jenkins的配置 3、打包部署SpringBoot应用 3.1、在Jenkins中创建执行任务 3.2、测试结果 1、Jenkins简介 任何简单操作的背后&#xff0c;都有一套相当复杂的机制。本文将以SpringBoot应…

文本向量化

文本向量化表示的输出比较 import timeimport torch from transformers import AutoTokenizer, AutoModelForMaskedLM, AutoModel# simcse相似度分数 def get_model_output(model, tokenizer, text_str):"""验证文本向量化表示的输出:param model: 模型的…

linux systemd start stop enable disable命令区别

一、systemd 的服务在三个文件件下 /lib/systemd/system /etc/systemd/system /usr/lib/systemd/system 终于明白这几个命令的区别 systemd star systemd stop systemd enable systemd disable 二、 1、用ssh服务为例&#xff0c;&#xff0c;ssh是客户端&#xff0c;远程ss…

持续集成交付CICD:Jenkins通过API触发流水线

目录 一、理论 1.HTTP请求 2.调用接口的方法 3.HTTP常见错误码 二、实验 1.Jenkins通过API触发流水线 三、问题 1.如何拿到上一次jenkinsfile文件进行自动触发流水线 一、理论 1.HTTP请求 &#xff08;1&#xff09;概念 HTTP超文本传输协议&#xff0c;是确保服务器…

JS特效:跟随鼠标移动的小飞机

前端网页中&#xff0c;用JS实现鼠标移动时&#xff0c;页面中的小飞机向着鼠标移动。 效果 源码 <!DOCTYPE html> <html><head><style>*{margin: 0;padding: 0;}body{height: 100vh;background: linear-gradient(200deg,#005bea,#00c6fb);}#plane{…

[C/C++]数据结构 链表(单向链表,双向链表)

前言: 上一文中我们介绍了顺序表的特点及实现,但是顺序表由于每次扩容都是呈二倍增长(扩容大小是自己定义的),可能会造成空间的大量浪费,但是链表却可以解决这个问题. 概念及结构: 链表是一种物理存储结构上非连续、非顺序的存储结构,数据元素的逻辑顺序是通过链表中的指针链接…

HC-SR501传感器制作一个报警系统

接线图&#xff1a; 引脚连接&#xff1a; 1. 将 PIR 信号引脚连接到 arduino 数字 引脚 13。 2. 将 PIR V 引脚连接 到 arduino 5v 引脚。 3. 将 PIR GND 引脚连接到 arduino GND 引脚。 4. 将arduino数字 引脚12连接 到220欧姆电阻&#xff0c;并将该电阻连接到 LED V …

提升工作效率,打造精细思维——OmniOutliner 5 Pro for Mac

在当今快节奏的工作环境中&#xff0c;如何高效地组织和管理我们的思维和任务成为了关键。而OmniOutliner 5 Pro for Mac正是为此而生的一款强大工具。无论你是专业写作者、项目经理还是学生&#xff0c;OmniOutliner 5 Pro for Mac都能帮助你提升工作效率&#xff0c;打造精细…

Fibonacci 数列与黄金分割

mapp[1 for item in range(30)] for item in range(3,30):mapp[item]mapp[item-1]mapp[item-2]pass numint(input()) if num>19:print("0.61803399")pass else:anss float((mapp[num]*1.0) / (mapp[num 1]*1.0))print(format(anss,.8f))进行短程的打表就可以看出…

实用篇-ES-DSL查询文档

数据的存储不是目的&#xff0c;我们希望从海量的酒店数据中检索出需要的信息&#xff0c;这就是ES的搜索功能 官方文档: https://elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html#query-dsl。DSL是用来查询文档的 Elasticsearch提供了基于JSON的DSL来定…

阿里云ESSD云盘、高效云盘和SSD云盘介绍和IOPS性能参数表

阿里云服务器系统盘或数据盘支持多种云盘类型&#xff0c;如高效云盘、ESSD Entry云盘、SSD云盘、ESSD云盘、ESSD PL-X云盘及ESSD AutoPL云盘等&#xff0c;阿里云服务器网aliyunfuwuqi.com详细介绍不同云盘说明及单盘容量、最大/最小IOPS、最大/最小吞吐量、单路随机写平均时延…

SpringBoot-AOP-基础到进阶

SpringBoot-AOP AOP基础 学习完spring的事务管理之后&#xff0c;接下来我们进入到AOP的学习。 AOP也是spring框架的第二大核心&#xff0c;我们先来学习AOP的基础。 在AOP基础这个阶段&#xff0c;我们首先介绍一下什么是AOP&#xff0c;再通过一个快速入门程序&#xff0c…

【我和Python算法的初相遇】——体验递归的可视化篇

&#x1f308;个人主页: Aileen_0v0 &#x1f525;系列专栏:PYTHON数据结构与算法学习系列专栏&#x1f4ab;"没有罗马,那就自己创造罗马~" 目录 递归的起源 什么是递归? 利用递归解决列表求和问题 递归三定律 递归应用-整数转换为任意进制数 递归可视化 画…

Docker安装MinIO遇到的问题汇总——持续更新中

文章目录 Docker安装MinIO遇到的坑前言问题1&#xff1a;执行docker run报错Error response from daemon问题2&#xff1a;启动MinIO容器浏览器无法访问问题3&#xff1a;上传文件报错InvalidResponseException问题4&#xff1a;上传文件报错Connection refused最终的启动指令问…

Jmeter 吞吐量Per User作用

第一点&#xff1a;Per User仅在Total Execution时生效 第二点&#xff1a;Per User 选中后 聚合报告中将统计的的样本数将变成线程组配置的线程数*吞吐量控制器配置的执行样本数量&#xff08;前提是线程组配置执行接口的次数线程数*循环数 大于吞吐量控制器配置的执行样本数…

gittee启动器

前言 很多小伙伴反馈不是使用gitee&#xff0c;不会寻找好的项目&#xff0c;在拿到一个项目不知道从哪里入手。 鼠鼠我呀就是宠粉&#xff0c;中嘞&#xff0c;老乡。整&#xff01;&#xff01;&#xff01; git的基本指令 在使用gitee的时候呢&#xff0c;我们只需要记住…

Adversarially Robust Neural Architecture Search for Graph Neural Networks

Adversarially Robust Neural Architecture Search for Graph Neural Networks----《面向图神经网络的对抗鲁棒神经架构搜索》 摘要 图神经网络&#xff08;GNN&#xff09;在关系数据建模方面取得了巨大成功。尽管如此&#xff0c;它们仍然容易受到对抗性攻击&#xff0c;这对…

力扣周赛372 模拟 思维 位运算 java

100131. 使三个字符串相等 ⭐ AC code class Solution {public int findMinimumOperations(String s1, String s2, String s3) {int len1 s1.length();int len2 s2.length();int len3 s3.length();int n Math.min(len1,len2);n Math.min(n,len3);int i 0;while(i < n…