SpringBoot整合【RocketMQ】

目录

1.POM文件添加依赖及yml配置

2.RocketmqUtil

3.生产者(异步发送示例)

4.消费者

5.测试


1.POM文件添加依赖及yml配置

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>
rocketmq:name-server: 127.0.0.1:9876producer:group: My_Groupsend-message-timeout: 3000retry-times-when-send-failed: 3retry-times-when-send-async-failed: 3

2.RocketmqUtil

package com.kaying.marketing.platform.common.util.rocketMq;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** @Description: RocketMQ消息的生产者* @Author: hwk*/@Component
@Slf4j
public class RocketMqUtil {@Autowiredprivate RocketMQTemplate rocketMqTemplate;public void sendMsg(String topic,String data) {rocketMqTemplate.convertAndSend(topic,data);log.info("【RocketMQ】发送同步消息:{}", data);}public void asyncSend(String topic, String tag, String data,Integer messageDelayLevel) {rocketMqTemplate.asyncSend(topic + ":" + tag, MessageBuilder.withPayload(data).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 消息发送成功log.error("消息发送成功"+sendResult);}@Overridepublic void onException(Throwable throwable) {// 消息发送异常log.error("异步发送消息异常。topic:" + topic + ";tag:" + tag + ";mqMsg" + data, throwable);}},3000L,// messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d// messageDelayLevel = 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18 19messageDelayLevel);}/*** 发送同步消息:消息响应后发送下一条消息** @param topic 消息主题* @param tag   消息tag* @param key   业务号* @param data  消息内容*/public void sendSyncMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;SendResult sendResult = rocketMqTemplate.syncSend(destination, message);log.info("【RocketMQ】发送同步消息:{}", sendResult);}/*** 发送异步消息:异步回调通知消息发送的状况** @param topic 消息主题* @param tag   消息tag* @param key   业务号* @param data  消息内容*/public void sendAsyncMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.asyncSend(destination, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("【RocketMQ】发送异步消息:{}", sendResult);}@Overridepublic void onException(Throwable e) {log.info("【RocketMQ】发送异步消息异常:{}", e.getMessage());}});}/*** 发送单向消息:消息发送后无响应,可靠性差,效率高** @param topic 消息主题* @param tag   消息tag* @param key   业务号* @param data  消息内容*/public void sendOneWayMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.sendOneWay(destination, message);}/*** 同步延迟消息** @param topic      主题* @param tag        标签* @param key        业务号* @param data       消息体* @param timeout    发送消息的过期时间* @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h**/public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {// messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d// messageDelayLevel = 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18 19//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;SendResult sendResult = rocketMqTemplate.syncSend(destination, message, timeout, delayLevel);log.info("【RocketMQ】发送同步延迟消息:{}", sendResult);}/*** 异步延迟消息** @param topic      主题* @param tag        标签* @param key        业务号* @param data       消息体* @param timeout    发送消息的过期时间* @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h*/public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.asyncSend(destination, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("【RocketMQ】发送异步延迟消息:{}", sendResult);}@Overridepublic void onException(Throwable e) {log.info("【RocketMQ】发送异步延迟消息异常:{}", e.getMessage());}}, timeout, delayLevel);}/*** 同步顺序消息** @param topic 主题* @param tag   标签* @param key   业务号* @param data  消息体*/public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;SendResult sendResult = rocketMqTemplate.syncSendOrderly(destination, message, key);log.info("【RocketMQ】发送同步顺序消息:{}", sendResult);}/*** 异步顺序消息** @param topic 主题* @param tag   标签* @param key   业务号* @param data  消息体*/public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) {//消息Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();//主题String destination = topic + ":" + tag;rocketMqTemplate.asyncSendOrderly(destination, message, key, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("【RocketMQ】发送异步顺序消息:{}", sendResult);}@Overridepublic void onException(Throwable e) {log.info("【RocketMQ】发送异步顺序消息异常:{}", e.getMessage());}});}
}

3.生产者(异步发送示例)

//异步发送消息代码示例
rocketMqUtil.sendAsyncMsg(RocketConstant.TEST_TOPIC1, RocketConstant.TEST_TAG1, UUID.randomUUID().toString(), "测试消息一");

4.消费者

简单的负载均衡消费的示例(指定topic和tag,相同的组即为负载均衡消费)

也可以指定不同的topic和不同的tag进行消息区分

注意线上和本地连接同一个MQ也会导致负载均衡,导致线上消息丢失

    @RocketMQMessageListener(consumerGroup = "1",topic = RocketConstant.TEST_TOPIC1,selectorExpression = RocketConstant.TEST_TAG1)@Servicepublic class RocketConsumerTag1 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {String orderNo = message;log.info("tag1,接收:{}", orderNo);}}@RocketMQMessageListener(consumerGroup ="1",topic = RocketConstant.TEST_TOPIC1,selectorExpression = RocketConstant.TEST_TAG1)@Servicepublic class RocketConsumerTag2 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {String orderNo = message;log.info("tag2,接收:{}", orderNo);}}

5.测试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/726735.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【安装教程】安装tensorflow-gpu版本

【安装教程】安装tensorflow-gpu版本 NOTE:准备工作TensorFlow安装1、 确定TensorFlow版本2、使用pip直接安装3、输入安装指令 TensorFlow安装后测试 NOTE: 如果不是首次安装tensorflow&#xff0c;卸载TensorFlow相关的内容&#xff0c;包括依赖的包&#xff08;tensorflow-e…

光线追踪10 - Dielectrics( 电介质 )

水、玻璃和钻石等透明物质都属于电介质。当光线射入这些物质时&#xff0c;会分为反射光线和折射&#xff08;透射&#xff09;光线。我们将通过随机选择反射或折射来处理这一现象&#xff0c;每次相互作用只生成一条散射光线。11.1 Refraction 最难调试的部分是折射光线。通常…

铅酸废电池回收螯合树脂CH-90除镉系统

项目名称 某再生资源公司铅酸废电池回收除镉项目 工艺选择 化学沉淀系统过滤系统螯合树脂深度除镉系统 工艺原理 镉离子沉淀后进入螯合树脂除镉树脂 项目背景 铅酸蓄电池作为广泛应用的化学电源&#xff0c;凭借其电压稳定性、优异的功率性能&#xff0c;以及高性价比等…

LVS集群(Linux Virtual server)

集群概念lvs模型lvs调度算法lvs实现lvs高可用性&#xff0c;负载均衡 1 集群和分布式 系统性能扩展方式&#xff1a; Scale UP&#xff1a;垂直扩展&#xff0c;向上扩展,增强&#xff0c;性能更强的计算机运行同样的服务 升级单机的硬件设备Scale Out&#xff1a;水平扩展…

Linux Ubuntu系统安装MySQL并实现公网连接本地数据库【内网穿透】

文章目录 前言1 .安装Docker2. 使用Docker拉取MySQL镜像3. 创建并启动MySQL容器4. 本地连接测试4.1 安装MySQL图形化界面工具4.2 使用MySQL Workbench连接测试 5. 公网远程访问本地MySQL5.1 内网穿透工具安装5.2 创建远程连接公网地址5.3 使用固定TCP地址远程访问 前言 本文主…

el-table 插入单选并进行校验

<template><div><el-form :model"list" ref"ruleForm"><el-table :data"list.tableData" style"width: 100%"><el-table-column prop"time" label"日期" width"180"><…

STM32 学习9 中断、外部中断及定时器中断

STM32 学习9 中断、外部中断及定时器中断 一、STM32中断介绍一、STM32中断介绍1. 什么是中断&#xff1f;2. 中断在嵌入式系统中的作用和重要性3. STM32中断的概述 4. 中断的优先级4.1 中断优先级级别4.2 中断优先级分类&#xff08;1&#xff09;硬件优先级&#xff08;2&…

挑战杯 基于深度学习的目标检测算法

文章目录 1 简介2 目标检测概念3 目标分类、定位、检测示例4 传统目标检测5 两类目标检测算法5.1 相关研究5.1.1 选择性搜索5.1.2 OverFeat 5.2 基于区域提名的方法5.2.1 R-CNN5.2.2 SPP-net5.2.3 Fast R-CNN 5.3 端到端的方法YOLOSSD 6 人体检测结果7 最后 1 简介 &#x1f5…

Nuxt2升级Nuxt3指南(二):nuxt.config.js配置文件

一、代码移植原则 前置说明&#xff1a;根据项目开发的实际情况&#xff0c;本次升级不采用Typescript。 升级的原则是开始尽量的简单配置&#xff0c;将代码分阶段逐步移植到新版本框架上&#xff0c;遇到问题逐一排查解决。 大致阶段&#xff0c;可以分为&#xff1a; 第一…

在idea中如何开启项目的热部署

热部署&#xff1a;就是当我们IDEA的项目在运行期间&#xff0c;我们修改代码以后&#xff0c;不需要我们自己重启项目&#xff0c;IDEA就会自动的重启项目 在idea中开启项目热部署的步骤 第一步&#xff1a;引入热部署的依赖 <dependency><groupId>org.springfr…

STP---生成树协议

STP的作用 a)Stp通过阻塞端口来消除环路&#xff0c;并能够实现链路备份目的 b)消除了广播风暴 c)物理链路冗余&#xff0c;网络变成了层次化结构的网络 STP操作 选举一个根桥每个非根交换机选举一个根端口每个网段选举一个指定端口阻塞非根&#xff0c;非指定端口 STP--生成树…

基于单片机的智能空调设计

目 录 摘 要 I Abstract II 引 言 1 1 系统整体设计 3 1.1 系统方案设计 3 1.2 系统工作原理 3 2 硬件设计 5 2.1 电源模块设计 5 2.1.1 电源模块选择 5 2.1.2 电源模块电路设计 5 2.2 单片机模块设计 5 2.2.1 单片机型号选择 5 2.2.2 单片机模块电路设计 6 2.3 按键模块设计 …

vue3中el-input输入无效的原因之一

表单的model用的是&#xff1a;reactive let updateForm reactive({ id: 0, className: "" }); reactive的数据不能这么赋值&#xff0c;会破坏响应性 错误方法&#xff08;&#xff09;{ updateForm { id: 0, className: "asdasdas" }; } 解决方法&…

1.5如何缓解图像分类任务中训练数据不足带来的问题?

1.5 图像数据不足时的处理方法 场景描述 在机器学习中&#xff0c;绝大部分模型都需要大量的数据进行训练和学习(包括有监督学习和无监督学习)&#xff0c;然而在实际应用中经常会遇到训练数据不足的问题。 比如图像分类&#xff0c;作为计算机视觉最基本的任务之一&#xff0…

高效提升控制效率 | 基于ACM32 MCU的LED灯箱控制器方案

LED灯箱上各种文字、图案有序跳跃、交替辉映&#xff0c;产生强烈的视觉冲击力&#xff0c;被广泛应用于商场、美容美发、宾馆、娱乐场所等地方。 锁存器的工作原理 在LED和数码管显示方面&#xff0c;要维持一个数据的显示&#xff0c;往往要持续的快速的刷新。尤其是在四段八…

Python算法100例-3.6 自守数

1.问题描述2.问题分析3.算法设计4.求给定数的位数5.分离给定数中的最后几位6.确定程序框架7.完整的程序 1&#xff0e;问题描述 自守数是指一个数的平方的尾数等于该数自身的自然数。例如&#xff0c; 5 2 25 &#xff0c; 2 5 2 625 &#xff0c; 7 6 2 5776 &#xff0c…

java基础-锁之volatilesynchronized

文章目录 volatilevolatile内存语义volatile的可见性volatile无法保证原子性volatile禁止重排优化硬件层的内存屏障volatile内存语义的实现下面是基于保守策略的JMM内存屏障插入策略。下面是保守策略下&#xff0c;volatile写插入内存屏障后生成的指令序列示意图下图是在保守策…

Android APP性能指标(二)

文章目录 一、响应时间1.1 数据获取1.2 响应时间指标测试点1.3 启动速度测试点1.4 响应时间测试解决方法 二、流量2.1 数据获取2.2 流量测试关注点2.3 测试标准 三、电量3.1 连接手机3.2 数据获取3.3 获取APP的UID3.3 重置电池数据收集数据3.4 电量指标测试 四、温度五、性能测…

打包系统待优化点

Base.Widget.AppCompat.ActivityChooserView中相关资源重复 D:\channelPackage\ToolConfigPath\games\dcpPro\100081\mumu\tempRes\values\attrs.xml:1171: error: duplicate value for resource attr/displayOptions with config . D:\channelPackage\ToolConfigPath\games\d…

【C++精简版回顾】19.异常处理

1.throw抛出问题 int print(int a,int b) {if (b 0)throw b;return a / b; } 2.try与catch解决问题 try {print(2, 0); } catch (int b) {cout << "竟然是&#xff1a;"<<b<<endl; } 结果&#xff1a; 补充1&#xff1a;可以抛出字符串等 1.throw…