RocketMQ的简单使用

这里需要创建2.x版本的springboot项目 

导入依赖

    <dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.6</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies>

定义配置文件

server:port: 3000rocketmq:name-server: xxx.xxx.xxx.xxx:9876  # NameServer 地址producer:group: rocketmq-4x-service_common-message-execute_pg # 全局发送者组定义

生产者定义

这里的生产者有两个,一个是普通的,一个是延时。

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.messaging.support.MessageBuilder;@Component
@Slf4j
public class GeneralMessageDemoProduce {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent) {SendResult sendResult;try{StringBuilder destinationBuilder = StrUtil.builder().append(topic);if(StrUtil.isNotBlank(tag)){destinationBuilder.append(":").append(tag);}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();// 设置消息的延时级别sendResult=rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L);log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);}catch(Throwable ex){log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}

延时的

@Component
@Slf4j
public class ScheduleProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent ) {SendResult sendResult;try {StringBuilder destinationBuilder = StrUtil.builder().append(topic);if(StrUtil.isNotBlank(tag)){destinationBuilder.append(":").append(tag);}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();// 设置消息的延时级别sendResult=rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L,6);log.info("[延时消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);}catch(Throwable ex){log.error("[延时消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}

消费者定义

这里也是两个消费者,普通的和延时的不在同一个主题的内

@Slf4j
@Component
@RocketMQMessageListener(topic = "rocketmq-yhy_topic",selectorExpression = "general",consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume implements RocketMQListener<MessageEvent> {@Overridepublic void onMessage(MessageEvent message) {log.info("接到RocketMQ消息,消息体:{}", JSON.toJSONString(message));}
}
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "Delay",selectorExpression = "general",consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume_Delay implements RocketMQListener<MessageEvent> {@Overridepublic void onMessage(MessageEvent message) {log.info("接到RocketMQ的延时消息,消息体:{}", JSON.toJSONString(message));}
}

发送消息

这里直接在启动类发送。

@SpringBootApplication
@RestController
public class RocketMQDemoApplication {@Autowiredprivate GeneralMessageDemoProduce generalMessageDemoProduce;@Autowiredprivate ScheduleProducer scheduleProducer;@PostMapping("/test/send/general-message")public String sendGeneralMessage() {String keys= UUID.randomUUID().toString();MessageEvent messageEvent=new MessageEvent("消息具体内容——yhy",keys);SendResult sendResult=generalMessageDemoProduce.sendMessage("rocketmq-yhy_topic","general",keys,messageEvent);SendResult sendResult2=scheduleProducer.sendMessage("Delay","general",keys,messageEvent);System.out.println(sendResult.getSendStatus().name() );System.out.println(sendResult2.getSendStatus().name());return sendResult.getSendStatus().name();}public static void main(String[] args) {SpringApplication.run(RocketMQDemoApplication.class, args);}
}

postman触发

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/797231.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SSM+Jsp+Mysql的人事管理系统

开发语言&#xff1a;Java框架&#xff1a;ssm技术&#xff1a;JSPJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包…

深入理解JVM的内存结构及GC机制(2)

虚拟机栈占用的是操作系统内存&#xff0c;每个线程对应一个虚拟机栈&#xff0c;它是线程私有的&#xff0c;生命周期和线程一样&#xff0c;每个方法被执行时产生一个栈帧&#xff08;Statck Frame&#xff09;&#xff0c;栈帧用于存储局部变量表、动态链接、操作数和方法出…

大语言模型落地的关键技术:RAG

1、什么是RAG&#xff1f; RAG 是检索增强生成&#xff08;Retrieval-Augmented Generation&#xff09;的简称&#xff0c;是当前最火热的大语言模型应用落地的关键技术&#xff0c;主要用于提高语言模型的效果和准确性。它结合了两种主要的NLP方法&#xff1a;检索&#xff…

post请求搜索功能爬虫

<!--爬虫仅支持1.8版本的jdk--> <!-- 爬虫需要的依赖--> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency>…

2023年下半年网络工程师上午真题及答案解析

1.当计算机突然断电时&#xff0c;( )中存储的信息会丢失。 A.光盘 B.ROM C.RAM D.硬盘 2.进程的状态有就绪态、运行态、阻塞态&#xff0c;其中( )的变化是不可能直接发生的。 A.就绪态到运行态 B.阻塞态到就绪态 C.运行态到阻塞态 D.阻塞态到运行态 3.分…

老板们注意了,AI可能在悄悄威胁你的工作

前天,科技新闻大佬The Register发了一篇文章,说的是AI在科研领域的管理角色越来越大,可能会让管理岗位变得过时,听起来是不是有点儿疯狂? ESMT Berlin的研究小伙伴们发现,AI能够以更大的规模和效率来管理研究项目,比如审查科学文献和预测创新化合物等等,而不是取代人类…

漂亮国的无人餐厅的机器人骚操作

导语 大家好&#xff0c;我是智能仓储物流技术研习社的社长&#xff0c;你的老朋友&#xff0c;老K。行业群 新书《智能物流系统构成与技术实践》 知名企业 读者福利&#xff1a; &#x1f449;抄底-仓储机器人-即买即用-免调试 智能制造-话题精读 1、西门子、ABB、汇川&#x…

学习vue3第十四节 Teleport 内置组件介绍

<Teleport></Teleport> 作用目的&#xff1a; 用于将指定的组件或者元素传送到指定的位置&#xff1b; 通常是自定义的全局通用弹窗&#xff0c;绑定到 body 上&#xff0c;而不是在当前元素上面&#xff1b; 使用方法&#xff1a; 接收两个参数 to: 要将目标传…

Day105:代码审计-PHP原生开发篇SQL注入数据库监控正则搜索文件定位静态分析

目录 代码审计-学前须知 Bluecms-CNVD-1Day-常规注入审计分析 emlog-CNVD-1Day-常规注入审计分析 emlog-CNVD-1Day-2次注入审计分析 知识点&#xff1a; 1、PHP审计-原生态开发-SQL注入&语句监控 2、PHP审计-原生态开发-SQL注入&正则搜索 3、PHP审计-原生态开发-SQ…

Java文件流操作

一、文件创建和删除 public static void main(String[] args) throws IOException {File file new File("..\\hello-world.txt");//..表示在上机目录下创建hello-world.txtSystem.out.println(file.getPath());//返回当前相对路径System.out.println(file.getCanoni…

vue项目 设置浏览器地址栏图标及名称

在vue项目中&#xff0c;怎样设置浏览器tab图标及名称呢&#xff1f; 方案一 1.静态配置vue项目ico 1.1将需要展示的ico放到项目文件中 1.2在项目根目录public文件中的index.html添加如下代码 <link rel"icon" href"<% BASE_URL %>favicon.ico"…

Java 组合模式

Java设计模式 - 组合模式 组合模式是结构型模式&#xff0c;因为它创建了一组对象的树结构。 组合模式将一组对象视为单个对象。 组合模式使用一个类来表示树结构。 在组合模式中&#xff0c;我们创建一个包含自己对象的类的组。 例子 以下代码使用Employee类来演示组合模…

(学习日记)2024.04.03:UCOSIII第三十一节:信号量函数接口讲解

写在前面&#xff1a; 由于时间的不足与学习的碎片化&#xff0c;写博客变得有些奢侈。 但是对于记录学习&#xff08;忘了以后能快速复习&#xff09;的渴望一天天变得强烈。 既然如此 不如以天为单位&#xff0c;以时间为顺序&#xff0c;仅仅将博客当做一个知识学习的目录&a…

c++的学习之路:13、vector(2)

本章主要是模拟实现vector&#xff0c;文章末附上代码&#xff0c;和源码。 目录 一、STL源码 二、构造与析构 三、迭代器与【】、size、capacity、empty 四、reserve与resize 五、push_back与pop_back 六、insert与erase 七、测试 1 八、代码 九、思维导图 一、STL源…

FJSP:蜣螂优化算法( Dung beetle optimizer, DBO)求解柔性作业车间调度问题(FJSP),提供MATLAB代码

一、柔性作业车间调度问题 柔性作业车间调度问题&#xff08;Flexible Job Shop Scheduling Problem&#xff0c;FJSP&#xff09;&#xff0c;是一种经典的组合优化问题。在FJSP问题中&#xff0c;有多个作业需要在多个机器上进行加工&#xff0c;每个作业由一系列工序组成&a…

2024.4.3-day08-CSS 盒子模型(溢出显示、伪元素)

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;HTML5和CSS3悦读 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; 文章目录 作业 2024.4.3-学习笔记css溢出显示单行文本溢出显示省略号多行文本溢出显示省…

多态.Java

&#xff08;1&#xff09;什么是多态&#xff1f; 同类型的对象&#xff0c;表现出不同的形态。前者指父类&#xff0c;后者指不同的子类 说简单点&#xff0c;就是父类的同一种方法&#xff0c;可以在不同子类中表现出不同的状态&#xff0c;或者说在不同子类中可以实现不同…

10.java openCV4.x 入门-特殊的Mat类汇总(二)

专栏简介 &#x1f492;个人主页 &#x1f4f0;专栏目录 点击上方查看更多内容 &#x1f4d6;心灵鸡汤&#x1f4d6;我们唯一拥有的就是今天&#xff0c;唯一能把握的也是今天建议把本文当作笔记来看&#xff0c;据说专栏目录里面有相应视频&#x1f92b; &#x1f9ed;文…

Linux 常用指令及其理论知识

个人主页&#xff1a;仍有未知等待探索-CSDN博客 专题分栏&#xff1a;http://t.csdnimg.cn/Tvyou 欢迎各位指教&#xff01;&#xff01;&#xff01; 目录 一、理论知识 二、基础指令 1、ls指令&#xff08;列出该目录下的所有子目录和文件&#xff09; 语法&#xff1a; …

论文阅读——Sat2Vid

Sat2Vid: Street-view Panoramic Video Synthesis from a Single Satellite Image 提出了一种新颖的方法&#xff0c;用于从单个卫星图像和摄像机轨迹合成时间和几何一致的街景全景视频。 即根据单个卫星图像和给定的观看位置尽可能真实地、尽可能一致地合成街景全景视频序列。…