阿里云RocketMQ消费MQTT消息

业务背景:

        项目中涉及的消息队列既有RocketMQ,又有MQTT,均为阿里云提供(阿里云有专门的“微消息队列 MQTT 版”模块,但博主公司消息队列的实例都在“消息队列 RocketMQ 版”模块下,只是实例不同,猜测是做了适配,有清楚的大佬欢迎指点)。其中MQTT的消息是由硬件设备上报而来,由java服务进行消费,使用一套内部框架连接。现因框架存在适配问题,经讨论决定放弃使用,需改造原消费代码。

技术方案:

       查询资料与询问同事得知两种消息队列在底层逻辑上高度相似,可以用RocketMQ方式连接MQTT的topic并消费。在实际改造过程中发现二者均可通过com.aliyun.openservices.ons.api.ONSFactory#createConsumer这一阿里云官方接口进行连接。而项目中已有一套配置用于连接RocketMQ,两类消息队列的地址、密钥等实例信息并不相同。因此改造的重点其实就是实现RocketMQ多实例配置。

代码实现:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** 自定义注解*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface OnsMqttMessageListener {String topic();String consumerGroup();}
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;/*** 参数类*/
@Data
@Component
@EnableConfigurationProperties
@ConfigurationProperties(prefix = "aliyun.rocketmq")
public class OnsMqttMessageProperties {private String onsAddr;private String accessKey;private String secretKey;private String groupId;}
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.geelytech.bms.annotation.OnsMqttMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.Properties;/*** 配置类*/
@Slf4j
@Component
public class OnsMqttMessageConsumerConfig implements ApplicationListener<ApplicationReadyEvent> {@Autowiredprivate OnsMqttMessageProperties onsMqttMessageProperties;/*** 注入所有MessageListener实例*/@Autowiredprivate Map<String, MessageListener> messageListeners;@Overridepublic void onApplicationEvent(ApplicationReadyEvent event) {String onsAddr = onsMqttMessageProperties.getOnsAddr();String accessKey = onsMqttMessageProperties.getAccessKey();String secretKey = onsMqttMessageProperties.getSecretKey();log.info("ConsumerSubscriptionConfig onApplicationEvent messageListeners={}", messageListeners);// 订阅每个MessageListenerfor (Map.Entry<String, MessageListener> entry : messageListeners.entrySet()) {MessageListener listener = entry.getValue();OnsMqttMessageListener annotation = AnnotationUtils.findAnnotation(listener.getClass(), OnsMqttMessageListener.class);if (annotation != null) {String topic = annotation.topic();String consumerGroup = annotation.consumerGroup();Properties properties = new Properties();properties.setProperty(PropertyKeyConst.GROUP_ID, consumerGroup);properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, onsAddr);properties.setProperty(PropertyKeyConst.AccessKey, accessKey);properties.setProperty(PropertyKeyConst.SecretKey, secretKey);Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe(topic, "*", listener);consumer.start();}}}}
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.geelytech.bms.annotation.OnsMqttMessageListener;
import com.geelytech.satellite.constant.TopicConstant;
import com.geelytech.satellite.dto.eventOriginalData.BizSwapDone;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import static com.aliyun.openservices.ons.api.Action.CommitMessage;@Slf4j
@Component
@OnsMqttMessageListener(topic = "", consumerGroup = "")
public class BmsBizSwapDoneConsumer implements MessageListener {@Overridepublic Action consume(Message message, ConsumeContext consumeContext) {// 业务逻辑return CommitMessage;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/4700.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Oracle】python调取oracle数据教程

目录 &#xff08;1&#xff09;安装python和相关库 1.python的下载和安装 2.python安装cx_Oracle库和pandas库 3.本机安装instantclient 数据库客户端 先安装instantclient 然后设置环境变量 &#xff08;2&#xff09;准备好连接Oracle数据库地址等五项信息 &#xf…

Java基础(3)String、StringBuffer、StringBuilder

在Java中&#xff0c;字符串处理是日常开发的重要组成部分。主要有三种类型的类用于创建和操作字符串&#xff1a;String、StringBuffer和StringBuilder。虽然这三个类都能够处理字符串&#xff0c;但它们在功能和性能方面存在显著差异。 String String是不可变的&#xff08…

VUE3与Uniapp 五 (v-if、v-show和template的使用)

<template><!-- v-if如果是false&#xff0c;则不会出现在DOM中&#xff0c;不会被渲染&#xff1b;v-show如果为false&#xff0c;则会出现在DOM中&#xff0c;并加载资源&#xff08;如图片&#xff09;&#xff0c;只是CSS隐藏了。 --><view v-if"day1&…

秋招后端开发面试题 - Java多线程(上)

目录 Java多线程前言面试题线程和进程&#xff1f;说说线程有几种创建方式&#xff1f;为什么调用 start() 方法时会执行 run() 方法&#xff0c;那怎么不直接调用 run() 方法&#xff1f;线程有哪些常用的调度方法&#xff1f;线程有几种状态&#xff1f;守护线程了解吗&#…

深入理解汇编中的ZF、OF、SF标志位和条件跳转

本节课在线学习视频&#xff1a;https://pan.quark.cn/s/bbc4781e5336 汇编语言中的程序控制流常依赖于处理器的状态标志来进行决策。在x86架构中&#xff0c;ZF&#xff08;Zero Flag&#xff09;、OF&#xff08;Overflow Flag&#xff09;和SF&#xff08;Sign Flag&#x…

Linux(Centos 7)环境下安装wget,并且更换阿里云镜像

Linux(Centos 7) Minimal 安装后&#xff0c;由于没有预装wget&#xff0c;在使用wget命令去下载安装相关应用时&#xff0c;提示&#xff1a;“wget: command not found” 先在Linux服务器窗口中&#xff0c;输入如下命令&#xff0c;检查Linux服务器有没有安装过wget。 rpm -…

Django信号(Signals)使用案例:自动化工作流程

Django信号&#xff08;Signals&#xff09;是一种可以让应用程序组件之间进行解耦的机制。它允许在特定事件发生时发送信号&#xff0c;其他组件可以监听这些信号并做出相应的处理。 在自动化工作流程中&#xff0c;Django信号可以用来触发自动化任务或流程。以下是一个使用D…

deepflow grafana plugin 编译问题解决

修改tsconfig.js 增加"noImplicitAny": false&#xff0c;解决代码类型没有指定&#xff0c;显示Any 错误 To solve the error, explicitly set the parameters type to any, use a more specific type or set noImplicitAny to false in tsconfig.json. https://b…

【大学生电子竞赛题目分析】——2023年H题《信号分离装置》

今年的大赛已临近落幕&#xff0c;笔者打算陆续对几个熟悉领域的题目作一番分析与讨论&#xff0c;今天首先分析H题。 网上有一些关于H题的分析&#xff0c;许多都是针对盲信号分析的。然而本题具有明确的信号频率范围&#xff0c;明确的信号可能频率&#xff0c;明确的信号波…

Jmeter Beanshell 设置全局变量

//获取token import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONArray; import java.util.*; import org.apache.jmeter.util.JMeterUtils; //获取可上机机器 String response prev.getResponseDataAsString(); JSONObject responseObect JSONObjec…

什么是跨域? 出现原因及解决方法

什么是跨域? 出现原因及解决方法 什么是跨域 跨域&#xff1a;浏览器对于javascript的同源策略的限制 。 同源政策的目的&#xff0c;是为了保证用户信息的安全&#xff0c;防止恶意的网站窃取数据。 设想这样一种情况&#xff1a;A 网站是一家银行&#xff0c;用户登录以后…

K8S哲学 - statefulSet 灰度发布

kubectl get - 获取资源及配置文件 kubectl get resource 【resourceName -oyaml】 kubectl create - 指定镜像创建或者 指定文件创建 kubectl create resource 【resourceName】 --imagemyImage 【-f my.yaml】 kubectl delete kubectl describe resource resourc…

OceanBase 分布式数据库【信创/国产化】- 登录 OceanBase 租户

本心、输入输出、结果 文章目录 OceanBase 分布式数据库【信创/国产化】- 登录 OceanBase 租户前言OceanBase 数据更新架构OceanBase 租户架构登录系统租户通过 MySQL 客户端登录通过 OBClient 登录登录最佳实践登录用户租户登录 Meta 租户OceanBase 分布式数据库【信创/国产化…

UCOSIII章节介绍

UCOSIII章节介绍 一、第一部分&#xff08;第一章 至 第三十二章&#xff09;1、整体介绍2、单章介绍第一章 至 第三章&#xff1a;总体概览第四章 至 第五章&#xff1a;准备工作&#xff0c;配置KEIL工程环境第六章&#xff1a;准备工作&#xff0c;KEIL仿真介绍第七章&#…

怎么把试卷答案去掉再打印出来?

在学习中&#xff0c;试卷无疑是检验学习成果的重要工具。然而&#xff0c;当我们想重新练习这些试卷&#xff0c;加深对知识点的理解和记忆时&#xff0c;答案的存在往往会成为他们复习路上的“绊脚石”。那么&#xff0c;有没有一种方法可以轻松去除试卷上的答案&#xff0c;…

亚马逊云科技AWS将推出数据工程师全新认证(有资料)

AWS认证体系最近更新&#xff0c;在原有12张的基础上&#xff0c;将在2023年11月27日添加第13张&#xff0c;数据工程师助理级认证(Data Engineer Associate)&#xff0c;并且在2024/1/12前半价(省75刀&#xff1d;544人民币。 原有的数据分析专家级认证(Data Analytics Specia…

Unity功能——开发中逻辑坐标和世界坐标是什么?

声明&#xff1a;本文为个人笔记&#xff0c;用于学习研究使用非商用&#xff0c;内容为个人研究及综合整理所得&#xff0c;若有违规&#xff0c;请联系&#xff0c;违规必改。 Unity功能——逻辑坐标和世界坐标 文章目录 Unity功能——逻辑坐标和世界坐标一.开发环境二.问题描…

qt-C++笔记之滑动条QSlider和QProgressBar进度条

qt-C笔记之滑动条QSlider和QProgressBar进度条 —— 2024-04-28 杭州 本例来自《Qt6 C开发指南》 文章目录 qt-C笔记之滑动条QSlider和QProgressBar进度条1.运行2.阅读笔记3.文件结构4.samp4_06.pro5.main.cpp6.widget.h7.widget.cpp8.widget.ui 1.运行 2.阅读笔记 3.文件结构…

RuoYi-Vue-Plus (SPEL 表达式)

RuoYi-Vue-Plus 中SPEL使用 DataScopeType 枚举类中&#xff1a; /*** 部门数据权限*/DEPT("3", " #{#deptName} #{#user.deptId} ", " 1 0 "), PlusDataPermissionHandler 拦截器中定义了解析器&#xff1a; buildDataFilter 方法中根据注解的…

[LitCTF 2023]Ping、[SWPUCTF 2021 新生赛]error、[NSSCTF 2022 Spring Recruit]babyphp

[LitCTF 2023]Ping 尝试ping一下127.0.0.1成功了&#xff0c;但要查看根目录时提示只能输入IP 查看源代码&#xff0c;这段JavaScript代码定义了一个名为check_ip的函数&#xff0c;用于验证输入是否为有效的IPv4地址。并且使用正则表达式re来匹配IPv4地址的格式。 对于这种写…