RocketMQ(三):集成SpringBoot

RocketMQ系列文章

RocketMQ(一):基本概念和环境搭建

RocketMQ(二):原生API快速入门

RocketMQ(三):集成SpringBoot


目录

  • 一、搭建环境
  • 二、不同类型消息
    • 1、同步消息
    • 2、异步消息
    • 3、单向消息
    • 4、延迟消息
    • 5、顺序消息
    • 6、带tag消息
    • 7、带key消息

一、搭建环境

  • 需要创建两个服务,消息生产服务和消息消费者服务
  • 生产消息存在多个服务,消费则统一由一个服务处理
  • 这样可以做到解耦

pom.xml

  • 生产者和消费者都需要
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>

生产者配置文件

  • 设置统一的生产者组,这样发送消息时就不用指定了
rocketmq:name-server: 127.0.0.1:9876     # rocketMq的nameServer地址producer:group: boot-producer-group        # 生产者组别send-message-timeout: 3000  # 消息发送的超时时间retry-times-when-send-async-failed: 2  # 异步消息发送失败重试次数max-message-size: 4194304       # 消息的最大长度

生产者配置文件

  • 不能设置统一的消费者组,因为不同的消费者订阅关系不一致,需要设置不同的消费者组
rocketmq:name-server: localhost:9876

二、不同类型消息

直接引入即可

@Autowired
private RocketMQTemplate rocketMQTemplate;

1、同步消息

生产消息

  • 消息由消费者发送到broker后,会得到一个确认,是具有可靠性的
  • 比如:重要的消息通知,短信通知等
rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");

消费消息

  • RocketMQListener的泛型类型即消息类型
    • MessageExt类型是消息的所有内容
    • 其他类型则就只是消息体内容,没有消息头内容(keys、msgId、延迟时间、重试次数、主题名称...)
  • onMessage方法内没有报错就是签收了,报错就是拒收会重试
@Component
@RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group")
public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}

2、异步消息

  • 发送异步消息,发送完以后会有一个异步通知
  • 不影响程序往下执行
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("失败" + throwable.getMessage());}
});

3、单向消息

  • 不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险
  • 例如日志信息的发送
rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");

4、延迟消息

  • RocketMQ不支持任意时间的延时
  • 只支持以下18个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟
  • private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
  • 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 4);

5、顺序消息

生产消息

  • 根据syncSendOrderly方法的第三个参数计算hash值决定消息放入哪个队列
// 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1, "下单"),new MsgModel("qwer", 1, "短信"),new MsgModel("qwer", 1, "物流"),new MsgModel("zxcv", 2, "下单"),new MsgModel("zxcv", 2, "短信"),new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {// 发送  一般都是以json的方式进行处理// 根据第三个参数计算hash值决定消息放入哪个队列rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
});

消费消息

  • 默认是并发消费模式,可以设置为单线程顺序模式
  • 设置消费重试次数
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",consumerGroup = "boot-orderly-consumer-group",consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);System.out.println(msgModel);}
}

6、带tag消息

  • tag带在主题后面用:来携带
rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");

7、带key消息

Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, "10086").build();
rocketMQTemplate.syncSend("bootKeyTopic", message);

获取带key和tag的消费者

  • 过滤模式有两种:正则表达式和sql92方式
  • keys从MessageExt对象中获取
@Component
@RocketMQMessageListener(topic = "bootTagTopic",consumerGroup = "boot-tag-consumer-group",selectorType = SelectorType.TAG,// tag过滤模式selectorExpression = "tagA || tagB"
//        selectorType = SelectorType.SQL92,// sql92过滤模式
//        selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println("获取keys: " + message.getKeys());System.out.println("消息内容: " + new String(message.getBody()));}
}

查看源码

  • destination目标 = 主题 : 标签
  • keys从消息头里面获取

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/148111.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Accelerate 0.24.0文档 三:超大模型推理(内存估算、Sharded checkpoints、bitsandbytes量化、分布式推理)

文章目录 一、内存估算1.1 Gradio Demos1.2 The Command 二、使用Accelerate加载超大模型2.1 模型加载的常规流程2.2 加载空模型2.3 分片检查点&#xff08;Sharded checkpoints&#xff09;2.4 示例&#xff1a;使用Accelerate推理GPT2-1.5B2.5 device_map 三、bitsandbytes量…

3297:【例50.3】 平衡数《信息学奥赛一本通编程启蒙(C++版)》

3297&#xff1a;【例50.3】 平衡数《信息学奥赛一本通编程启蒙&#xff08;C版&#xff09;》 【题目描述】 平衡数&#xff1a;如果正整数 x 的每一位数字 d 恰好在 x 中出现了 d 次&#xff0c;则认为 x 是平衡数。例如 x122&#xff0c;其中 对于百位数 d1&#xff0c;其…

LeetCode【13】罗马数字转整数

题目&#xff1a; 思路&#xff1a; 第十二题的逆运算&#xff0c;方法同理。需要注意的是IV、IX、XL、XC、CD、CM这六种特殊的情况。正常情况下每个字符找到对应的数值累加&#xff0c;这六种特殊字符都是左边的数值比右边的数值小。 这里以IV举例&#xff0c;IV对应数字是1和…

详解如何使用Jenkins一键打包部署SpringBoot项目

目录 1、Jenkins简介 2、Jenkins的安装及配置 2.1、Docker环境下的安装​编辑 2.2、Jenkins的配置 3、打包部署SpringBoot应用 3.1、在Jenkins中创建执行任务 3.2、测试结果 1、Jenkins简介 任何简单操作的背后&#xff0c;都有一套相当复杂的机制。本文将以SpringBoot应…

C++ 好玩的约瑟夫环(单链表版本)

【题目描述】 有M个人&#xff0c;编号分别为1到M&#xff0c;玩约瑟夫环游戏&#xff0c;最初时按编号顺序排成队列&#xff1b;每遍游戏开始时&#xff0c;有一个正整数报数密码N&#xff0c;队列中人依次围坐成一圈&#xff0c;从队首的人开始报数&#xff0c;报到N的人出列…

文本向量化

文本向量化表示的输出比较 import timeimport torch from transformers import AutoTokenizer, AutoModelForMaskedLM, AutoModel# simcse相似度分数 def get_model_output(model, tokenizer, text_str):"""验证文本向量化表示的输出:param model: 模型的…

linux进程间通信之信号量

注意请不要把它与之前所说的信号混淆起来&#xff0c;信号与信号量是不同的两种事物。 摘要 本文旨在深入探讨Linux进程间通信中的信号量机制&#xff0c;包括其工作原理、系统调用接口以及实际应用场景。通过理论分析和示例代码的解读&#xff0c;本文将帮助读者更好地理解信号…

开发者分享 | Ascend C算子开发及单算子调用

本文分享自《AscendC算子开发及单算子调用》&#xff0c;作者&#xff1a;goldpancake。 笔者在阅读Ascend C官方文档的过程中发现&#xff0c;对于初学者来说&#xff0c;尤其是第一次接触异构编程思想的初学者&#xff0c;有部分内容是无需特别关注的&#xff0c;例如算子工…

linux systemd start stop enable disable命令区别

一、systemd 的服务在三个文件件下 /lib/systemd/system /etc/systemd/system /usr/lib/systemd/system 终于明白这几个命令的区别 systemd star systemd stop systemd enable systemd disable 二、 1、用ssh服务为例&#xff0c;&#xff0c;ssh是客户端&#xff0c;远程ss…

线性表--顺序表-1

文章目录 主要内容一.基础练习题1.从顺序表中删除具有最小值的元素&#xff08;假设唯一&#xff09;并由函数返回被删元素的值。空出位置由最后元素填补&#xff0c;若顺序表为空&#xff0c;则显示出错信息并退出运行。代码如下&#xff08;示例&#xff09;: 2.设计一个高效…

持续集成交付CICD:Jenkins通过API触发流水线

目录 一、理论 1.HTTP请求 2.调用接口的方法 3.HTTP常见错误码 二、实验 1.Jenkins通过API触发流水线 三、问题 1.如何拿到上一次jenkinsfile文件进行自动触发流水线 一、理论 1.HTTP请求 &#xff08;1&#xff09;概念 HTTP超文本传输协议&#xff0c;是确保服务器…

JS特效:跟随鼠标移动的小飞机

前端网页中&#xff0c;用JS实现鼠标移动时&#xff0c;页面中的小飞机向着鼠标移动。 效果 源码 <!DOCTYPE html> <html><head><style>*{margin: 0;padding: 0;}body{height: 100vh;background: linear-gradient(200deg,#005bea,#00c6fb);}#plane{…

[C/C++]数据结构 链表(单向链表,双向链表)

前言: 上一文中我们介绍了顺序表的特点及实现,但是顺序表由于每次扩容都是呈二倍增长(扩容大小是自己定义的),可能会造成空间的大量浪费,但是链表却可以解决这个问题. 概念及结构: 链表是一种物理存储结构上非连续、非顺序的存储结构,数据元素的逻辑顺序是通过链表中的指针链接…

HC-SR501传感器制作一个报警系统

接线图&#xff1a; 引脚连接&#xff1a; 1. 将 PIR 信号引脚连接到 arduino 数字 引脚 13。 2. 将 PIR V 引脚连接 到 arduino 5v 引脚。 3. 将 PIR GND 引脚连接到 arduino GND 引脚。 4. 将arduino数字 引脚12连接 到220欧姆电阻&#xff0c;并将该电阻连接到 LED V …

Java Swing猜单词游戏

内容要求 1&#xff09; 本次程序设计是专门针对 Java 课程的,要求使用 Java 语言进行具有一定代码量的程序开发。程序的设计要结合一定的算法&#xff0c;在进行代码编写前要能够设计好自己的算法。 2&#xff09;本次程序设计涉及到 Java 的基本语法&#xff0c;即课堂上所…

提升工作效率,打造精细思维——OmniOutliner 5 Pro for Mac

在当今快节奏的工作环境中&#xff0c;如何高效地组织和管理我们的思维和任务成为了关键。而OmniOutliner 5 Pro for Mac正是为此而生的一款强大工具。无论你是专业写作者、项目经理还是学生&#xff0c;OmniOutliner 5 Pro for Mac都能帮助你提升工作效率&#xff0c;打造精细…

Fibonacci 数列与黄金分割

mapp[1 for item in range(30)] for item in range(3,30):mapp[item]mapp[item-1]mapp[item-2]pass numint(input()) if num>19:print("0.61803399")pass else:anss float((mapp[num]*1.0) / (mapp[num 1]*1.0))print(format(anss,.8f))进行短程的打表就可以看出…

实用篇-ES-DSL查询文档

数据的存储不是目的&#xff0c;我们希望从海量的酒店数据中检索出需要的信息&#xff0c;这就是ES的搜索功能 官方文档: https://elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html#query-dsl。DSL是用来查询文档的 Elasticsearch提供了基于JSON的DSL来定…

将给定的表达式树(二叉树)转换为等价的中缀表达式(通过括号反映操作符的计算次序)并输出

要将给定的表达式树转换为等价的中缀表达式&#xff0c;可以通过遍历表达式树的方式来实现。以下是一个以递归方式实现的示例代码&#xff1a; class Node:def __init__(self, value):self.value valueself.left Noneself.right Nonedef convert_to_infix_expression(root)…

阿里云ESSD云盘、高效云盘和SSD云盘介绍和IOPS性能参数表

阿里云服务器系统盘或数据盘支持多种云盘类型&#xff0c;如高效云盘、ESSD Entry云盘、SSD云盘、ESSD云盘、ESSD PL-X云盘及ESSD AutoPL云盘等&#xff0c;阿里云服务器网aliyunfuwuqi.com详细介绍不同云盘说明及单盘容量、最大/最小IOPS、最大/最小吞吐量、单路随机写平均时延…