Springboot整合阿里云ONS RocketMq(4.0 http)

1. 引入依赖

<!--阿里云ons,方便的接入到云服务-->
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.4.Final</version>
</dependency>

2. 配置

配置注意事项:

  1. nameSrvAddr我这里是用的4.0版本的支持http,5.0不支持http
    image.png
  2. 一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失。
  3. 订阅关系参考官方文档: 订阅关系一致
  4. 此处我配置了多个GroupId,Tag,Topic(order,market,vehicle)如果不需要配置一个即可,对应基本配置类需要增减对应属性
aliyun:rocketmq:accessKey: LTAI5txxxxxxxsecretKey: Afq06tBxrdBxxxxxxxxnameSrvAddr: http://MQ_INST_xxxxxxxxxx_BYkZuJCq.cn-beijing.mq.aliyuncs.com:80orderGroupId: GID_xxxxxx_testorderTag: 'order'orderTopic: vehicle-order-testmarketGroupId: GID_xxxxxx2_testmarketTag: 'market'marketTopic: vehicle-market-testvehicleGroupId: GID_xxxxxx3_testvehicleTag: 'vehicle'vehicleTopic: vehicle-order-test

3. 配置类

3.1 基本配置类

package com.vehicle.manager.core.config;import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.Properties;/*** Rocket MQ 配置类* @author zr 2024/3/1*/
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq")
@Data
public class RocketMqConfig {private String accessKey;private String secretKey;private String nameSrvAddr;private String marketGroupId;private String marketTopic;private String marketTag;private String orderTopic;private String orderGroupId;private String orderTag;private String vehicleTopic;private String vehicleGroupId;private String vehicleTag;public Properties getMqPropertie() {Properties properties = new Properties();properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);return properties;}
}

3.2 生产者配置

package com.vehicle.manager.core.config;import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author zr 2024/3/1*/
@Configuration
public class ProducerConfig {@Autowiredprivate RocketMqConfig mqConfig;@Bean(initMethod = "start", destroyMethod = "shutdown")public ProducerBean buildProducer() {ProducerBean producer = new ProducerBean();producer.setProperties(mqConfig.getMqPropertie());return producer;}
}

3.3 消费者配置

package com.vehicle.manager.core.config;import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.vehicle.manager.core.listener.VehicleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** RocketMq消费者* @author zr 2024/3/1*/
@Configuration
public class VehicleConsumerConfig {@Autowiredprivate RocketMqConfig mqConfig;@Autowiredprivate VehicleListener vehicleListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean buildVehicleBuyerConsumer() {ConsumerBean consumerBean = new ConsumerBean();//配置文件Properties properties = mqConfig.getMqPropertie();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getVehicleGroupId());//将消费者线程数固定为20个 20为默认值properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");consumerBean.setProperties(properties);//订阅关系Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();Subscription subscription = new Subscription();subscription.setTopic(mqConfig.getVehicleTopic());subscription.setExpression(mqConfig.getVehicleTag());subscriptionTable.put(subscription, vehicleListener);//订阅多个topic如上面设置consumerBean.setSubscriptionTable(subscriptionTable);return consumerBean;}
}

4. 生产者工具类

  • MessageRecord为记录消息发送的对象,可以自行根据字段进行设计调整
  • 参数说明:
    • topic – 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-“和下划线”_"构成.
    • tag – 消息标签, 请使用合法标识符, 尽量简短且见名知意
    • key – 业务主键
    • body – 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
package com.vehicle.manager.core.util;import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.vehicle.manager.core.config.RocketMqConfig;
import com.vehicle.manager.core.mapper.MessageRecordMapper;
import com.vehicle.manager.core.model.entity.MessageRecord;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.time.LocalDateTime;/*** RocketMessageProducer rocketMQ消息生产者* @author zr 2024/3/1*/
@Component
@Slf4j
public class RocketMessageProducer {private static ProducerBean producer;private static RocketMqConfig mqConfig;private  static MessageRecordMapper messageRecordMapper;@Autowiredprivate  MessageRecordMapper messageRecordMapperInstance;@PostConstructpublic void init() {RocketMessageProducer.messageRecordMapper = messageRecordMapperInstance;}public RocketMessageProducer(ProducerBean producer, RocketMqConfig mqConfig) {this.producer = producer;this.mqConfig = mqConfig;}/*** 生产车辆服务普通消息* @param tag* @param key* @param body*/public  static void producerVehicleMsg(String tag, String key, String body) {Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());long time = System.currentTimeMillis();try {SendResult sendResult = producer.send(msg);assert sendResult != null;log.info(time+ " Send mq message success.Topic is:" + msg.getTopic()+ " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()+" body is:"+new String(msg.getBody())+ " msgId is:" + sendResult.getMessageId());MessageRecord messageRecord = new MessageRecord();messageRecord.setPlatformType("mq");messageRecord.setMessageType("order");messageRecord.setMqMessageTopic(msg.getTopic());messageRecord.setMqMessageTag(msg.getTag());messageRecord.setMqMessageKey(msg.getKey());messageRecord.setMqMessageId(sendResult.getMessageId());messageRecord.setCreatedTime(LocalDateTime.now());messageRecord.setMessageContent(new String(msg.getBody()));messageRecordMapper.insert(messageRecord);} catch (ONSClientException e) {e.printStackTrace();log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());}}/*** 生产车辆服务延时普通消息* @param tag  order:订单服务   vehicle:主要用于本服务的超时回应* @param key* @param body* @param delay 延迟秒*/public  static void producerVehicleDelayMsg(String tag, String key, String body,Integer delay) {Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());long time = System.currentTimeMillis();msg.setStartDeliverTime(time+ delay*1000);try {SendResult sendResult = producer.send(msg);assert sendResult != null;log.info(time+ " 发送消息成功.Topic is:" + msg.getTopic()+ " Tag 为:" + msg.getTag() + " Key 为:" + msg.getKey()+" body 为:"+new String(msg.getBody())+ " msgId 为:" + sendResult.getMessageId());} catch (ONSClientException e) {e.printStackTrace();log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());}}
}

5. 消费者监听

package com.vehicle.manager.core.listener;import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.vehicle.manager.core.model.dto.req.VehicleMQMessageDTO;
import com.vehicle.manager.core.service.HlCarService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author zr 2024/3/1*/
@Component
@Slf4j
public class VehicleListener implements MessageListener {@Autowiredprivate HlCarService hlCarService;@Overridepublic Action consume(Message message, ConsumeContext context) {log.info("VehicleReceive 消息: " + message);try {byte[] body = message.getBody();String s = new String(body);log.info(s);// VehicleMQMessageDTO需要自行根据业务封装VehicleMQMessageDTO vehicleMQMessageDTO = JSON.parseObject(s, VehicleMQMessageDTO.class);log.info(vehicleMQMessageDTO.toString());// 以下做你的业务处理// .........return Action.CommitMessage;//进行消息的确认} catch (Exception e) {log.info(e.getMessage());//消费失败return Action.ReconsumeLater;}}
}

6. 测试

6.1 发送消息

package com.vehicle.manager.core;import com.alibaba.fastjson.JSON;
import com.vehicle.manager.api.StartApplication;
import com.vehicle.manager.core.util.RocketMessageProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** @author zr 2024/3/1*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class MqTest {@Testpublic void producerMsg() {RocketMessageProducer.producerVehicleMsg("vehicle","test", JSON.toJSONString(new String("testBody")));}
}

6.2 接收消息

image.png

7. 延时消息

如果需要使用延时消息可以参考RocketMessageProducer中有一个延时消息的方法producerVehicleDelayMsg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/28720.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年下一个风口是什么?萤领优选 轻资产创业项目全国诚招合伙人

2024年&#xff0c;全球经济与科技发展的步伐不断加快&#xff0c;各行各业都在探寻新的增长点与风口。在这样的时代背景下&#xff0c;萤领优选作为一个轻资产创业项目&#xff0c;正以其独特的商业模式和前瞻的市场洞察力&#xff0c;吸引着众多创业者的目光。(领取&#xff…

SqlSugar有实体CURD应用-C#

本文所述开发环境&#xff1a;.C#、NET8、Visual Studio2022 SqlSugar有实体查询数据表 首先根据《SqlSugar使用DbFirst对象根据数据库表结构创建实体类-C#》中的描述的表结构创建所有表的实体类如下&#xff1a; 表名创建的实体类名tb_studentStudenttb_teacherTeachertb_c…

++++++局部变量、全局变量及变量的存储类别++++====+++指针+++

局部变量、全局变量及变量的存储类别 局部变量与全局变量的基本概念 局部变量&#xff1a;在函数内部定义的变量称为局部变量&#xff0c;也称为内部变量。它们只在定义它们的函数内部有效&#xff0c;即只有在这个函数被调用时&#xff0c;局部变量才会被分配内存空间&#x…

【已解决】better-scroll在PC端如何开启鼠标滚动以及如何始终显示滚动条

总结 需要安装插件 mouse-wheel 和 scrollbar 在PC端如何开启鼠标滚动? 需要安装官方提供的滚动插件&#xff1a;mouse-wheel https://better-scroll.github.io/docs/zh-CN/plugins/mouse-wheel.html 为了开启鼠标滚动功能&#xff0c;你需要首先引入 mouseWheel 插件&…

光伏工程开发的详细步骤

光伏工程作为可再生能源领域的重要组成部分&#xff0c;其开发过程涉及多个环节&#xff0c;包括开发、测绘、设计、施工和运维等。下面将详细介绍这些步骤。 一、开发阶段 1、前期调研&#xff1a;对目标地区进行能源政策、市场需求、资源条件等方面的调研&#xff0c;评估项…

SuperMap GIS基础产品FAQ集锦(20240617)

一、SuperMap iDesktopX 问题1&#xff1a;请问udbx数据源的数据集最多支持多少个属性字段&#xff1f; 现在客户合并数据集后属性字段有1119个&#xff0c;导致无法复制数据集 11.1.1 【问题原因】理论上是没有上限&#xff0c;我们底层没有针对这点进行限制&#xff0c;通常…

.gitignore文件忽略的内容不生效问题解决

文章目录 ①&#xff1a;现象②&#xff1a;原因③&#xff1a;解决 ①&#xff1a;现象 在已经提交过的git管理的项目中&#xff0c; 新增加一个.gitignore文件&#xff0c;文件内忽略内容不生效或者修改.gitignore文件之后&#xff0c;文件内新增的忽略内容不生效 ②&#…

了解 Blazor Server App 项目结构

在本文中&#xff0c;你将获得以下问题的答案 先决条件如何创建 Blazor Server App 项目&#xff1f;Blazor Server 应用程序的项目结构是什么样的&#xff1f;每个默认文件夹有什么用如何设置启动razor组件或页面运行&#xff1f; 先决条件 HTML、CSS 和 Javascript 的基本…

如何使用任意浏览器远程访问本地搭建的Jellyfin影音平台

文章目录 前言1. Jellyfin服务网站搭建1.1 Jellyfin下载和安装1.2 Jellyfin网页测试 2.本地网页发布2.1 cpolar的安装和注册2.2 Cpolar云端设置2.3 Cpolar本地设置 3.公网访问测试4. 结语 前言 本文主要分享如何使用Windows电脑本地部署Jellyfin影音服务并结合cpolar内网穿透工…

如何在招聘中开始使用AI?

在人工智能时代&#xff0c;招聘团队面临着“用更少的钱做更多的事情”的压力&#xff1a;用更少的钱和更少的团队。根据一项调查&#xff0c;58%的受访者认为&#xff0c;“提高我们助教团队的效率&#xff0c;降低成本”是明年招聘职位的首要任务。在招聘中使用人工智能是提高…

【机器学习300问】119、什么是语言模型?

语言模型&#xff08;Language Models&#xff09;是自然语言处理&#xff08;NLP&#xff09;的重要组成部分&#xff0c;它的目的是量化一段文本或一个序列的概率。简单讲就是你给语言模型一个句子&#xff0c;它给你计算出特定语言中这个句子出现的概率。这样的概率度量可以…

【Netty】nio阻塞非阻塞Selector

阻塞VS非阻塞 阻塞 阻塞模式下&#xff0c;相关方法都会导致线程暂停。 ServerSocketChannel.accept() 会在没有建立连接的时候让线程暂停 SocketChannel.read()会在没有数据的时候让线程暂停。 阻塞的表现就是线程暂停了&#xff0c;暂停期间不会占用CPU&#xff0c;但线程…

Excel 常用技巧(四)

Microsoft Excel 是微软为 Windows、macOS、Android 和 iOS 开发的电子表格软件&#xff0c;可以用来制作电子表格、完成许多复杂的数据运算&#xff0c;进行数据的分析和预测&#xff0c;并且具有强大的制作图表的功能。由于 Excel 具有十分友好的人机界面和强大的计算功能&am…

直播美颜SDK技术指南:实现实时美颜效果的算法方案

本篇文章&#xff0c;小编将探讨直播美颜SDK的技术实现和算法方案。 一、美颜技术概述 美颜技术通过一系列图像处理算法&#xff0c;实时美颜效果可以在视频直播过程中实时呈现&#xff0c;提升用户的直播体验。为了实现这些效果&#xff0c;需要结合图像处理和计算机视觉技术…

生产 的mybatisplus 日志输入到日志文件

默认是输出到控制台.不输出到日志文件 输出到日志文件.需要修改配置 第一步. logging:config: classpath:logback-wshoto.xml第二步 mybatis-plus:configuration:cache-enabled: truedefault-executor-type: reuselog-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl第三步…

别太小看“静态免杀“

0x01 简述 免杀总体来说可分为两种&#xff0c;静态免杀/动态免杀。往往来说&#xff0c;我们更注重于在内部代码层面实现一些免杀技巧&#xff0c;但在有些时候&#xff0c;动态免杀静态免杀以"打组合拳"的方式效果往往会更出人所料。 当我们的程序生成后&#xf…

Spring IOC 控制反转(注解版)

Spring IOC 控制反转 文章目录 Spring IOC 控制反转一、前言什么是控制反转&#xff08;IOC&#xff09;什么是依赖注入&#xff08;DI&#xff09; 二、介绍 IOC2.1 传统思想代码2.2 解决方案2.3 IOC思想代码2.4 IOC 使用&#xff08;Autowired依赖注入&#xff09;2.5 IOC 优…

从零开始学GeoServer源码(一)(搭建开发环境Win10+IDEA23.3.5+jdk11+geoserver2.24.x)

搭建开发环境 参考资料 0、基础环境准备0.1、idea0.2、jdk0.3、源码 1、导入工程2、配置启动环境2.1、打开新增配置面板2.2、配置工作目录2.2.1、从常用配置中选择2.2.2、直接粘贴 2.3最终效果 3、调整源码3.1、添加maven引用3.2、注释无效代码3.3、删除测试代码 4、修改运行端…

LeetCode esay mid 记录

1486. 数组异或操作 感觉一般也用不到 emmm 灵茶山艾府传送门 推导过程可以结合官网部分观看 重点由两部分的结合 将特定部分转换为常见部分 0到n的异或和表示 2595. 奇偶位数 0x555是十六进制数&#xff0c;转换为二进制为 0101 0101 0101 class Solution {public int[…

【面试 - 页面优化举例】页面跳转卡顿问题解决 - 页面跳转速度优化

目录 为何要优化如何优化优化1 - 懒加载优化2 - el-tree 子节点默认不展开 为何要优化 页面A跳转到也页面B时&#xff0c;页面出现卡顿情况&#xff1a; 【问题】页面A → 页面B时&#xff0c;页面B进入到了 created 钩子后过了六七秒才进入到 mounted 钩子&#xff1b;【分析经…