Springboot整合阿里云ONS RocketMq(4.0 http)

1. 引入依赖

<!--阿里云ons,方便的接入到云服务-->
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.4.Final</version>
</dependency>

2. 配置

配置注意事项:

  1. nameSrvAddr我这里是用的4.0版本的支持http,5.0不支持http
    image.png
  2. 一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失。
  3. 订阅关系参考官方文档: 订阅关系一致
  4. 此处我配置了多个GroupId,Tag,Topic(order,market,vehicle)如果不需要配置一个即可,对应基本配置类需要增减对应属性
aliyun:rocketmq:accessKey: LTAI5txxxxxxxsecretKey: Afq06tBxrdBxxxxxxxxnameSrvAddr: http://MQ_INST_xxxxxxxxxx_BYkZuJCq.cn-beijing.mq.aliyuncs.com:80orderGroupId: GID_xxxxxx_testorderTag: 'order'orderTopic: vehicle-order-testmarketGroupId: GID_xxxxxx2_testmarketTag: 'market'marketTopic: vehicle-market-testvehicleGroupId: GID_xxxxxx3_testvehicleTag: 'vehicle'vehicleTopic: vehicle-order-test

3. 配置类

3.1 基本配置类

package com.vehicle.manager.core.config;import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.Properties;/*** Rocket MQ 配置类* @author zr 2024/3/1*/
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq")
@Data
public class RocketMqConfig {private String accessKey;private String secretKey;private String nameSrvAddr;private String marketGroupId;private String marketTopic;private String marketTag;private String orderTopic;private String orderGroupId;private String orderTag;private String vehicleTopic;private String vehicleGroupId;private String vehicleTag;public Properties getMqPropertie() {Properties properties = new Properties();properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);return properties;}
}

3.2 生产者配置

package com.vehicle.manager.core.config;import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author zr 2024/3/1*/
@Configuration
public class ProducerConfig {@Autowiredprivate RocketMqConfig mqConfig;@Bean(initMethod = "start", destroyMethod = "shutdown")public ProducerBean buildProducer() {ProducerBean producer = new ProducerBean();producer.setProperties(mqConfig.getMqPropertie());return producer;}
}

3.3 消费者配置

package com.vehicle.manager.core.config;import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.vehicle.manager.core.listener.VehicleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** RocketMq消费者* @author zr 2024/3/1*/
@Configuration
public class VehicleConsumerConfig {@Autowiredprivate RocketMqConfig mqConfig;@Autowiredprivate VehicleListener vehicleListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean buildVehicleBuyerConsumer() {ConsumerBean consumerBean = new ConsumerBean();//配置文件Properties properties = mqConfig.getMqPropertie();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getVehicleGroupId());//将消费者线程数固定为20个 20为默认值properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");consumerBean.setProperties(properties);//订阅关系Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();Subscription subscription = new Subscription();subscription.setTopic(mqConfig.getVehicleTopic());subscription.setExpression(mqConfig.getVehicleTag());subscriptionTable.put(subscription, vehicleListener);//订阅多个topic如上面设置consumerBean.setSubscriptionTable(subscriptionTable);return consumerBean;}
}

4. 生产者工具类

  • MessageRecord为记录消息发送的对象,可以自行根据字段进行设计调整
  • 参数说明:
    • topic – 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-“和下划线”_"构成.
    • tag – 消息标签, 请使用合法标识符, 尽量简短且见名知意
    • key – 业务主键
    • body – 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
package com.vehicle.manager.core.util;import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.vehicle.manager.core.config.RocketMqConfig;
import com.vehicle.manager.core.mapper.MessageRecordMapper;
import com.vehicle.manager.core.model.entity.MessageRecord;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.time.LocalDateTime;/*** RocketMessageProducer rocketMQ消息生产者* @author zr 2024/3/1*/
@Component
@Slf4j
public class RocketMessageProducer {private static ProducerBean producer;private static RocketMqConfig mqConfig;private  static MessageRecordMapper messageRecordMapper;@Autowiredprivate  MessageRecordMapper messageRecordMapperInstance;@PostConstructpublic void init() {RocketMessageProducer.messageRecordMapper = messageRecordMapperInstance;}public RocketMessageProducer(ProducerBean producer, RocketMqConfig mqConfig) {this.producer = producer;this.mqConfig = mqConfig;}/*** 生产车辆服务普通消息* @param tag* @param key* @param body*/public  static void producerVehicleMsg(String tag, String key, String body) {Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());long time = System.currentTimeMillis();try {SendResult sendResult = producer.send(msg);assert sendResult != null;log.info(time+ " Send mq message success.Topic is:" + msg.getTopic()+ " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()+" body is:"+new String(msg.getBody())+ " msgId is:" + sendResult.getMessageId());MessageRecord messageRecord = new MessageRecord();messageRecord.setPlatformType("mq");messageRecord.setMessageType("order");messageRecord.setMqMessageTopic(msg.getTopic());messageRecord.setMqMessageTag(msg.getTag());messageRecord.setMqMessageKey(msg.getKey());messageRecord.setMqMessageId(sendResult.getMessageId());messageRecord.setCreatedTime(LocalDateTime.now());messageRecord.setMessageContent(new String(msg.getBody()));messageRecordMapper.insert(messageRecord);} catch (ONSClientException e) {e.printStackTrace();log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());}}/*** 生产车辆服务延时普通消息* @param tag  order:订单服务   vehicle:主要用于本服务的超时回应* @param key* @param body* @param delay 延迟秒*/public  static void producerVehicleDelayMsg(String tag, String key, String body,Integer delay) {Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());long time = System.currentTimeMillis();msg.setStartDeliverTime(time+ delay*1000);try {SendResult sendResult = producer.send(msg);assert sendResult != null;log.info(time+ " 发送消息成功.Topic is:" + msg.getTopic()+ " Tag 为:" + msg.getTag() + " Key 为:" + msg.getKey()+" body 为:"+new String(msg.getBody())+ " msgId 为:" + sendResult.getMessageId());} catch (ONSClientException e) {e.printStackTrace();log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());}}
}

5. 消费者监听

package com.vehicle.manager.core.listener;import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.vehicle.manager.core.model.dto.req.VehicleMQMessageDTO;
import com.vehicle.manager.core.service.HlCarService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author zr 2024/3/1*/
@Component
@Slf4j
public class VehicleListener implements MessageListener {@Autowiredprivate HlCarService hlCarService;@Overridepublic Action consume(Message message, ConsumeContext context) {log.info("VehicleReceive 消息: " + message);try {byte[] body = message.getBody();String s = new String(body);log.info(s);// VehicleMQMessageDTO需要自行根据业务封装VehicleMQMessageDTO vehicleMQMessageDTO = JSON.parseObject(s, VehicleMQMessageDTO.class);log.info(vehicleMQMessageDTO.toString());// 以下做你的业务处理// .........return Action.CommitMessage;//进行消息的确认} catch (Exception e) {log.info(e.getMessage());//消费失败return Action.ReconsumeLater;}}
}

6. 测试

6.1 发送消息

package com.vehicle.manager.core;import com.alibaba.fastjson.JSON;
import com.vehicle.manager.api.StartApplication;
import com.vehicle.manager.core.util.RocketMessageProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** @author zr 2024/3/1*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class MqTest {@Testpublic void producerMsg() {RocketMessageProducer.producerVehicleMsg("vehicle","test", JSON.toJSONString(new String("testBody")));}
}

6.2 接收消息

image.png

7. 延时消息

如果需要使用延时消息可以参考RocketMessageProducer中有一个延时消息的方法producerVehicleDelayMsg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/28720.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年下一个风口是什么?萤领优选 轻资产创业项目全国诚招合伙人

2024年&#xff0c;全球经济与科技发展的步伐不断加快&#xff0c;各行各业都在探寻新的增长点与风口。在这样的时代背景下&#xff0c;萤领优选作为一个轻资产创业项目&#xff0c;正以其独特的商业模式和前瞻的市场洞察力&#xff0c;吸引着众多创业者的目光。(领取&#xff…

SqlSugar有实体CURD应用-C#

本文所述开发环境&#xff1a;.C#、NET8、Visual Studio2022 SqlSugar有实体查询数据表 首先根据《SqlSugar使用DbFirst对象根据数据库表结构创建实体类-C#》中的描述的表结构创建所有表的实体类如下&#xff1a; 表名创建的实体类名tb_studentStudenttb_teacherTeachertb_c…

++++++局部变量、全局变量及变量的存储类别++++====+++指针+++

局部变量、全局变量及变量的存储类别 局部变量与全局变量的基本概念 局部变量&#xff1a;在函数内部定义的变量称为局部变量&#xff0c;也称为内部变量。它们只在定义它们的函数内部有效&#xff0c;即只有在这个函数被调用时&#xff0c;局部变量才会被分配内存空间&#x…

102. 二叉树的层序遍历

题目描述 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 自己想到的笨方法 /*** Definition for a binary tree node.* public class TreeNode {* public var val: Int* pu…

【已解决】better-scroll在PC端如何开启鼠标滚动以及如何始终显示滚动条

总结 需要安装插件 mouse-wheel 和 scrollbar 在PC端如何开启鼠标滚动? 需要安装官方提供的滚动插件&#xff1a;mouse-wheel https://better-scroll.github.io/docs/zh-CN/plugins/mouse-wheel.html 为了开启鼠标滚动功能&#xff0c;你需要首先引入 mouseWheel 插件&…

光伏工程开发的详细步骤

光伏工程作为可再生能源领域的重要组成部分&#xff0c;其开发过程涉及多个环节&#xff0c;包括开发、测绘、设计、施工和运维等。下面将详细介绍这些步骤。 一、开发阶段 1、前期调研&#xff1a;对目标地区进行能源政策、市场需求、资源条件等方面的调研&#xff0c;评估项…

python 逻辑控制语句、循环语句

文章目录 一、逻辑控制语句&#xff08;if、elif、else&#xff09; 一、逻辑控制语句&#xff08;if、elif、else&#xff09; Python 条件语句是通过一条或多条语句的执行结果&#xff08;True或者False&#xff09;来决定执行的代码块。 python 基本的逻辑判断语法&#xff…

SuperMap GIS基础产品FAQ集锦(20240617)

一、SuperMap iDesktopX 问题1&#xff1a;请问udbx数据源的数据集最多支持多少个属性字段&#xff1f; 现在客户合并数据集后属性字段有1119个&#xff0c;导致无法复制数据集 11.1.1 【问题原因】理论上是没有上限&#xff0c;我们底层没有针对这点进行限制&#xff0c;通常…

性能优化篇

1、使用 Class 代替 ProtoBuf 协议 因为 ProtoBuf 采用的是 Arena 内存分配器策略&#xff0c;有些场景会比 C的 Class 内存管理复杂&#xff0c;当有大量内存分配和释放的时候会比 Class 的性能差很多。而且 Protobuf 会不断分配和回收小内存对象&#xff0c;持续地分配和删除…

.gitignore文件忽略的内容不生效问题解决

文章目录 ①&#xff1a;现象②&#xff1a;原因③&#xff1a;解决 ①&#xff1a;现象 在已经提交过的git管理的项目中&#xff0c; 新增加一个.gitignore文件&#xff0c;文件内忽略内容不生效或者修改.gitignore文件之后&#xff0c;文件内新增的忽略内容不生效 ②&#…

了解 Blazor Server App 项目结构

在本文中&#xff0c;你将获得以下问题的答案 先决条件如何创建 Blazor Server App 项目&#xff1f;Blazor Server 应用程序的项目结构是什么样的&#xff1f;每个默认文件夹有什么用如何设置启动razor组件或页面运行&#xff1f; 先决条件 HTML、CSS 和 Javascript 的基本…

如何使用任意浏览器远程访问本地搭建的Jellyfin影音平台

文章目录 前言1. Jellyfin服务网站搭建1.1 Jellyfin下载和安装1.2 Jellyfin网页测试 2.本地网页发布2.1 cpolar的安装和注册2.2 Cpolar云端设置2.3 Cpolar本地设置 3.公网访问测试4. 结语 前言 本文主要分享如何使用Windows电脑本地部署Jellyfin影音服务并结合cpolar内网穿透工…

如何在招聘中开始使用AI?

在人工智能时代&#xff0c;招聘团队面临着“用更少的钱做更多的事情”的压力&#xff1a;用更少的钱和更少的团队。根据一项调查&#xff0c;58%的受访者认为&#xff0c;“提高我们助教团队的效率&#xff0c;降低成本”是明年招聘职位的首要任务。在招聘中使用人工智能是提高…

【机器学习300问】119、什么是语言模型?

语言模型&#xff08;Language Models&#xff09;是自然语言处理&#xff08;NLP&#xff09;的重要组成部分&#xff0c;它的目的是量化一段文本或一个序列的概率。简单讲就是你给语言模型一个句子&#xff0c;它给你计算出特定语言中这个句子出现的概率。这样的概率度量可以…

Linux的操作命令

Linux的操作命令 &#xff08;1&#xff09;使用命令切换到/etc目录&#xff0c;并显示当前工作目录路径。 切换到/etc目录&#xff1a; cd /etc显示当前工作目录路径&#xff1a; pwd当你运行pwd命令时&#xff0c;你应该看到输出为/etc。 &#xff08;2&#xff09;使用命…

【Netty】nio阻塞非阻塞Selector

阻塞VS非阻塞 阻塞 阻塞模式下&#xff0c;相关方法都会导致线程暂停。 ServerSocketChannel.accept() 会在没有建立连接的时候让线程暂停 SocketChannel.read()会在没有数据的时候让线程暂停。 阻塞的表现就是线程暂停了&#xff0c;暂停期间不会占用CPU&#xff0c;但线程…

Excel 常用技巧(四)

Microsoft Excel 是微软为 Windows、macOS、Android 和 iOS 开发的电子表格软件&#xff0c;可以用来制作电子表格、完成许多复杂的数据运算&#xff0c;进行数据的分析和预测&#xff0c;并且具有强大的制作图表的功能。由于 Excel 具有十分友好的人机界面和强大的计算功能&am…

Compose 可组合项 - 抽屉式导航栏 Drawer

官方介绍 参考文章 一、概念 Scafford 在 material 包下可以设置 drawerContent&#xff0c;在 material3 包下已经没有。 PermanentNavigationDrawer抽屉栏永久显示&#xff0c;占用屏幕。ModalNavigationDrawer抽屉栏打开时覆盖在屏幕上。DismissibleNavigationDrawer抽屉栏…

自定义异常与全局异常处理

1. 自定义异常 1.1 创建自定义异常类 当存在业务逻辑不通畅时,我们可以将其定义为异常,由于在代码中本质不是异常,所以需要我们自定义异常,这种异常也叫做业务异常 如下定义: package com.imooc.mall.exception;/*** 自定义业务异常类* 继承异常Exception 或者RuntimeExcep…

直播美颜SDK技术指南:实现实时美颜效果的算法方案

本篇文章&#xff0c;小编将探讨直播美颜SDK的技术实现和算法方案。 一、美颜技术概述 美颜技术通过一系列图像处理算法&#xff0c;实时美颜效果可以在视频直播过程中实时呈现&#xff0c;提升用户的直播体验。为了实现这些效果&#xff0c;需要结合图像处理和计算机视觉技术…