Spring boot封装rocket mq 教程

1、rocket mq版本

      5.1.3

2、pom引入rocket mq依赖

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version></dependency>

3、发送MQ消息工具类

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;@Slf4j
public class MqSendUtil {@SneakyThrowspublic static MessageId sendMq(String topic, String tag, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag(tag)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}@SneakyThrowspublic static MessageId sendMqNoTag(String topic, String body, String... keys) {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoint = "127.0.0.1:9080";// 消息发送的目标Topic名称,需要提前创建。ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。try (Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build()) {// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys(keys)// 消息体。.setBody(body.getBytes()).build();// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);return sendReceipt.getMessageId();} catch (ClientException e) {log.error("Failed to send message", e);throw e;}}}

4、发送MQ消息测试代码

import cn.hutool.core.util.IdUtil;
import org.recipe.draw.common.util.MqSendUtil;public class MqSendTest {public static void test1() {MqSendUtil.sendMq("demo", "tag", "哈哈哈哈tag", IdUtil.getSnowflakeNextIdStr());}public static void main(String[] args) {test1();}
}

5、MessageContext 消息内容的封装

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Collection;
import java.util.Map;@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageContext {private String messageId;private String topic;private String body;private Map<String, String> properties;private Collection<String> keys;private Long deliveryTimestamp;private String bornHost;private Long bornTimestamp;private int deliveryAttempt;}

6、AbstractMqConsumer 发送mq消息的抽象类

import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.boot.CommandLineRunner;import java.nio.charset.StandardCharsets;
import java.util.Collections;@Slf4j
public abstract class AbstractMqConsumer implements CommandLineRunner {public abstract String topic();public abstract String consumerGroup();public abstract String tag();public abstract void process(MessageContext messageContext);@Overridepublic void run(String... args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoints = "127.0.0.1:9080";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息。String tag = StrUtil.isEmpty(tag()) ? "*" : tag();FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建。String consumerGroup = consumerGroup();// 指定需要订阅哪个目标Topic,Topic需要提前创建。String topic = topic();// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者分组。.setConsumerGroup(consumerGroup)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(messageView -> {// 处理消息并返回消费结果。MessageContext context = toMessageContext(messageView);
//                    log.info("收到mq消息主体内容:{}",context);try {process(context);} catch (Exception e) {log.error("处理mq消息出现异常,消息已自动丢弃,不会再投入队列:", e);}return ConsumeResult.SUCCESS;}).build();log.info("消费者初始化完成,topic:{},tag:{},consumerGroup:{}", topic, tag, consumerGroup);}private MessageContext toMessageContext(MessageView messageView) {Long deliveryTimestamp = messageView.getDeliveryTimestamp().isPresent() ? messageView.getDeliveryTimestamp().get() : null;return MessageContext.builder().messageId(messageView.getMessageId().toString()).topic(messageView.getTopic()).body(StandardCharsets.UTF_8.decode(messageView.getBody()).toString()).properties(messageView.getProperties()).keys(messageView.getKeys()).deliveryTimestamp(deliveryTimestamp).bornHost(messageView.getBornHost()).deliveryAttempt(messageView.getDeliveryAttempt()).build();}}

7、具体的消费类

topic指定消费者订阅的话题,comsumerGroup指明该消费者属于哪一个消费者分组,tag表明是否要获取指定标签的消息,process代表具体的业务处理逻辑,具体消息的内容可以MessageContext 类里面获取

import lombok.extern.slf4j.Slf4j;
import org.recipe.draw.common.mqcomsumer.abstracts.AbstractMqConsumer;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class DemoConsumer extends AbstractMqConsumer {@Overridepublic String topic() {return "demo";}@Overridepublic String consumerGroup() {return "demo";}@Overridepublic String tag() {return null;}@Overridepublic void process(MessageContext messageContext) {log.info("收到消息:{}", messageContext);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/602063.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1002 写出这个数

读入一个正整数 n&#xff0c;计算其各位数字之和&#xff0c;用汉语拼音写出和的每一位数字。 输入格式&#xff1a; 每个测试输入包含 1 个测试用例&#xff0c;即给出自然数 n 的值。这里保证 n 小于 10100。 输出格式&#xff1a; 在一行内输出 n 的各位数字之和的每一…

ATTCK视角下的信息收集:主机发现

目录 1、利用协议主动探测主机存活 利用ICMP发现主机 利用ARP发现主机 利用NetBIOS协议发现主机 利用TCP/UDP发现主机 利用DNS协议发现主机 利用PRC协议发现主机程序 2、被动主机存活检测 利用Browser主机探测存活主机 利用ip段探测主机存活 利用net命令探测主机存活…

【软件测试】学习笔记-测试覆盖率

测试覆盖率通常被用来衡量测试的充分性和完整性&#xff0c;从广义的角度来讲&#xff0c;测试覆盖率主要分为两大类&#xff0c;一类是面向项目的需求覆盖率&#xff0c;另一类是更偏向技术的代码覆盖率。 需求覆盖率 需求覆盖率是指测试对需求的覆盖程度&#xff0c;通常的做…

UAV | 多算法在多场景下的无人机路径规划(Matlab)

近年来&#xff0c;无人机(unmanned aerial vehicle&#xff0c;UAV)由于其灵活度高、机动性强、安全风险系数小、成本低等特点&#xff0c;被广泛应用于搜索巡逻、侦察监视、抢险救灾、物流配送、电力巡检、农业灌溉等军用或民用任务。路径规划是无人机执行任务的关键&#xf…

第34期 | GPTSecurity周报

GPTSecurity是一个涵盖了前沿学术研究和实践经验分享的社区&#xff0c;集成了生成预训练Transformer&#xff08;GPT&#xff09;、人工智能生成内容&#xff08;AIGC&#xff09;以及大型语言模型&#xff08;LLM&#xff09;等安全领域应用的知识。在这里&#xff0c;您可以…

数据结构之绪论

一个著名公式&#xff1a; 程序数据结构算法 非数值计算&#xff1a;无法用数学的公式或方程来描述 描述非数值计算问题的数据模型不是数学方程&#xff0c;而是诸如表&#xff0c;树和图之类的具有逻辑关系的数据 数据结构&#xff1a;是一门研究非数值计算的程序设计中计算机…

pyqt6 + pycharm 搭建+使用入门

首先安装PyQt6和PyQt6-tools。使用如下命令&#xff1a; pip install PyQt6 PyQt6-tools 但是运行后会报如下错误&#xff1a; 这个时候按照提示执行命令升级pip即可 python.exe -m pip install --upgrade pip 配置pycharm&#xff1a; 打开pycharm&#xff0c;进入setting&am…

基于SpringBoot的在线问卷调查系统

文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于SpringBoot的在线问卷调查系统,java…

大事务提交优化

经常性的报死锁异常&#xff0c;经常性的主从延迟......通过报错信息按图索骥&#xff0c;发现代码是这样的。 这是一段商品发布的逻辑&#xff0c;我们可以看到参数校验、查询、最终的insert以及update全部揉在一个事务中。遇到批量发布商品的时候就经常出现问题了&#xff0c…

css实现一个斑马条纹动画,实现一个理发店门口的小转转,进度条动画同理!

css实现一个斑马条纹动画&#xff0c;实现一个理发店门口的小转转 前置基础知识 css背景background的重复渐变属性repeating-linear-gradient() 该属性类似于linear-gradient(),但他会在整个方向上重复渐变以覆盖整个容器 一、先写一个普通渐变例子linear-gradient() &…

【JAVA】volatile 关键字的作用

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a; JAVA ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言 正文 volatile 的作用&#xff1a; 结语 我的其他博客 前言 在多线程编程中&#xff0c;保障数据的一致性和线程之间的可见性是…

Kafka(三)概述

目录 1 Kafka的应用场景1.1 活动跟踪1.2 消息传递1.3 指标和日志记录1.4 提交日志1.5 流式处理 2 Kafka的核心概念消息&#xff08;message&#xff09;键&#xff08;Key&#xff09;批次&#xff08;Batch&#xff09;模式&#xff08;Schema&#xff09;主题&#xff08;Top…

复旦MBA科创青干营(二期):探索合肥科创企业的创新之路

11月18日-19日&#xff0c;复旦MBA科创青干营二期学生开启了整合实践活动的第三次企业参访&#xff0c;前往位于合肥的蔚来第二先进制造基地、安徽万邦医药科技股份有限公司和合肥国轩高科动力能源有限公司&#xff0c;在学术导师和科创企业家“双导师”的指导下&#xff0c;深…

【C++】STL 算法 ④ ( 函数对象与谓词 | 一元函数对象 | “ 谓词 “ 概念 | 一元谓词 | find_if 查找算法 | 一元谓词示例 )

文章目录 一、函数对象与谓词1、一元函数对象2、" 谓词 " 概念3、find_if 查找算法 二、一元谓词示例1、代码示例 - 一元谓词示例2、执行结果 一、函数对象与谓词 1、一元函数对象 " 函数对象 " 是通过 重载 函数调用操作符 () 实现的 operator() , 函数对…

pythonMatplotlib六:Matplotlib的图例和注释功能

1.添加图例&#xff1a; import matplotlib.pyplot as plt# 准备数据 x [1, 2, 3, 4, 5] y1 [2, 4, 6, 8, 10] y2 [1, 3, 5, 7, 9]# 创建折线图并设置标签 plt.plot(x, y1, labelLine 1) plt.plot(x, y2, labelLine 2)# 添加图例 plt.legend()# 添加标题和标签 plt.title(&…

【数值分析】非线性方程求根,牛顿法,牛顿下山法,matlab实现

4. 牛顿法 收敛时牛顿法的收敛速度是二阶的&#xff0c;不低于二阶。如果函数有重根&#xff0c;牛顿法一般不是二阶收敛的。 x k 1 x k − f ( x k ) f ′ ( x k ) x_{k1}x_k- \frac{f(x_k)}{f(x_k)} xk1​xk​−f′(xk​)f(xk​)​ matlab实现 %% 牛顿迭代例子 f (x) x…

华为交换机常用命令

华为交换机常用命令 查看系统状态和基本信息&#xff1a; display version&#xff1a;显示设备版本信息 display device&#xff1a;显示设备硬件信息 display interface brief&#xff1a;显示接口的基本状态 display ip interface brief&#xff1a;显示IP接口的基本状态 …

创建Qt项目

项目工程名称一般不要有特殊符号&#xff0c;不要有中文 项目工程保存路径可修改的&#xff0c;但路径不要带中文 构建系统&#xff0c;有3种&#xff0c;这里使用qmake qmake和cmake区别 构建过程不同&#xff0c;项目管理不同。 1、构建过程&#xff0c;qmake是Qt框架自带的…

MySQL数据库:索引

目录 一. 索引的价值 二. 数据库与磁盘的IO 2.1 磁盘的结构 2.2 磁盘访问 2.3 MySQL与磁盘的交互 三. 对索引的理解 3.1 Page的结构 3.2 B树和B树索引结构 3.2.1 B树的结构 3.2.2 B树 3.3 聚簇索引和非聚簇索引 四. 索引的操作 4.1 索引的创建 4.2 索引的查看 4.…

new FormData 同时发送表单 json 以及文件二进制流

需要新增时同时发送表单 json 以及对应的文件即可使用以下方法传参 let formDataParams new FormData(); 首先通过 new FormData&#xff08;&#xff09; 创建你需要最后发送的表单 接着将你的对象 json 存储&#xff0c;注意使用 new Blob 创建大表单转换成 json 格式。以…