springboot整合mqtt实现消息订阅和推送

前言

mica-mqtt-client-spring-boot-starter是一个基于Spring Boot的MQTT客户端启动器,它集成了mica-mqtt客户端,提供了在Spring Boot应用程序中使用MQTT协议进行消息通信的能力。以下是关于mica-mqtt-client-spring-boot-starter的简介:

特点:

  • 简单易用:通过Spring Boot的自动配置,可以轻松地集成到Spring应用程序中,并使用Spring的注解或Java配置进行MQTT客户端的配置。

  • 低延迟:支持MQTT协议,能够实现实时消息通信,具有较低的延迟。

  • 高性能:基于mica-mqtt客户端,具有高效的消息处理和网络通信能力,能够处理大量的并发连接和消息。

  • 集群支持:支持基于Redis的发布/订阅模式的集群,可以实现多个节点之间的消息同步和负载均衡。

  • 使用场景:适用于需要使用MQTT协议进行消息通信的物联网、实时应用、移动应用等领域。可以在云端或边缘端使用,实现设备与设备之间、设备与服务器之间的消息通信。

  • 集成方式:通过在Spring Boot项目中添加相关依赖,并配置MQTT客户端的相关参数,即可快速集成mica-mqtt-client-spring-boot-starter。具体的使用方法可以参考官方文档和示例代码。

  • 注意事项:在使用过程中需要注意确保网络连接的稳定性和安全性,并根据实际需求进行适当的配置和优化。同时,也需要关注数据安全和隐私保护等方面的问题。

总之,mica-mqtt-client-spring-boot-starter是一个方便、高效、可靠的MQTT客户端启动器,适用于需要使用MQTT协议进行消息通信的Spring Boot应用程序。

功能

  • 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。

  • 支持 websocket mqtt 子协议(支持 mqtt.js)。

  • 支持 http rest api,http api 文档详见[1]。

  • 支持 MQTT client 客户端。

  • 支持 MQTT server 服务端。

  • 支持 MQTT client、server 共享订阅支持(捐助VIP版采用 topic 树存储,跟 topic 数无关,百万 topic 性能依旧)。

  • 支持 MQTT 遗嘱消息。

  • 支持 MQTT 保留消息。

  • 支持自定义消息(mq)处理转发实现集群。

  • MQTT 客户端 阿里云 mqtt 连接 demo。

  • 支持 GraalVM 编译成本机可执行程序。

  • 支持 Spring boot 项目快速接入(mica-mqtt-spring-boot-starter)。

  • mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。

  • 基于 redis pub/sub 实现集群,详见 mica-mqtt-broker 模块[2]

教程

添加依赖

在springboot项目中添加maven依赖:

        <!-- https://mvnrepository.com/artifact/net.dreamlu/mica-mqtt-client-spring-boot-starter --><dependency><groupId>net.dreamlu</groupId><artifactId>mica-mqtt-client-spring-boot-starter</artifactId><version>2.2.8</version></dependency>

配置参数

在spring配置文件中配置mqtt相关参数,配置如下:

mqtt:server:    enabled: false              # 是否开启服务端,默认:falseclient:enabled: true               # 是否开启客户端,默认:falseip: 172.16.10.203   # 连接的服务端 ip ,默认:127.0.0.1port: 1883                  # 端口:默认:1883name: Mica2-Mqtt2-Client      # 名称,默认:Mica-Mqtt-ClientclientId: coalface_safety_3d            # 客户端Id(非常重要,一般为设备 sn,不可重复)user-name: admin           # 认证的用户名 你的用户名password: 3@!cHy@j       # 认证的密码timeout: 5                  # 连接超时时间,单位:秒,默认:5秒reconnect: true             # 是否重连,默认:truere-interval: 5000           # 重连时间,默认 5000 毫秒version: MQTT_3_1           # mqtt 协议版本,默认:3.1.1read-buffer-size: 8092      # 接收数据的 buffer size,默认:8092max-bytes-in-message: 8092  # 消息解析最大 bytes 长度,默认:8092buffer-allocator: heap      # 堆内存和堆外内存,默认:堆内存keep-alive-secs: 60         # keep-alive 心跳维持时间,单位:秒clean-session: false         # mqtt clean session,默认:truewill-message:                # 消息遗嘱qos: at_least_oncessl:enabled: false            # 是否开启 ssl 认证,2.1.0 开始支持双向认证keystore-path:            # 可选参数:ssl 双向认证 keystore 目录,支持 classpath:/ 路径。keystore-pass:            # 可选参数:ssl 双向认证 keystore 密码truststore-path:          # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。truststore-pass:          # 可选参数:ssl 双向认证 truststore 密码
  • 注意:ssl 存在三种情况
服务端开启ssl客户端
ClientAuth 为 NONE(不需要客户端验证)仅仅需要开启 ssl 即可不用配置证书
ClientAuth 为 OPTIONAL(与客户端协商)需开启 ssl 并且配置 truststore 证书
ClientAuth 为 REQUIRE (必须的客户端验证)需开启 ssl 并且配置 truststore、 keystore证书

创建订阅

创建一个mqtt订阅消息监听类,例如SimulationSubscriber,代码如下:


import com.alibaba.fastjson.JSONObject;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.tio.utils.buffer.ByteBufferUtil;/*** @author tarzan*/
@Component
@Slf4j
public class SimulationSubscriber {@MqttClientSubscribe("tuoyuan/publish/zj/#")public void zjOne(String topic, byte[] payload){String[] strs=topic.split("/");String ID=strs[strs.length-1];log.info("topic:{} payload:{} ID:{}", topic, new String(payload, StandardCharsets.UTF_8),ID);}@MqttClientSubscribe("/sys/${deviceName}/thing/sub/register")public void thingSubRegister(String topic, byte[] payload) {// 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +// 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));}@MqttClientSubscribe("/tianma/publish/cmj")public void cmj(@Header("topic") String topic,@Payload byte[] payload) {System.out.println("*****************gc**************************************"+topic);JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));//业务的处理System.out.println("*****************test**************************************"+jsonObject);}@MqttClientSubscribe("/tianma/publish/zj")public void zj(@Header("topic") String topic,@Payload byte[] payload) {System.out.println("*****************gc**************************************"+topic);JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));//业务的处理System.out.println("*****************test**************************************"+jsonObject);}@MqttClientSubscribe("/tianma/publish/gbj")public void gbj(@Header("topic") String topic,@Payload byte[] payload) {System.out.println("*****************gc**************************************"+topic);JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));//业务的处理System.out.println("*****************test**************************************"+jsonObject);}@MqttClientSubscribe("/tianma/publish/ltl")public void ltl(@Header("topic") String topic,@Payload byte[] payload) {System.out.println("*****************gc**************************************"+topic);JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));//业务的处理System.out.println("*****************test**************************************"+jsonObject);}@MqttClientSubscribe("/tianma/publish/ntl")public void ntl(@Header("topic") String topic,@Payload byte[] payload) {System.out.println("*****************gc**************************************"+topic);JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));//业务的处理System.out.println("*****************test**************************************"+jsonObject);}@MqttClientSubscribe("/tianma/publish/ccl")public void ccl(@Header("topic") String topic,@Payload byte[] payload) {System.out.println("*****************gc**************************************"+topic);JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));//业务的处理System.out.println("*****************test**************************************"+jsonObject);}}
  • @Header(“topic”) 和@Payload 注解可以省略

  • tuoyuan/publish/zj/# 中的# 是通配符

    • 在MQTT协议中,#是一个通配符,代表匹配该主题的所有子主题。例如,如果你订阅了主题sports/baseball/#,那么你将接收到所有以sports/baseball/开头的主题的消息。

    • 请注意,通配符#只能用于多层的主题名称中,并且只能用于最后一个级别。例如,sports/baseball/#是有效的,但#sports/baseball或sports/#/baseball都是无效的。

    • 除了#之外,MQTT协议还支持一个单层通配符+,它代表只匹配该级别的主题。例如,如果你订阅了主题sports/baseball/+,那么你将只接收到以sports/baseball/开头,且后面跟着至少一个字符的主题的消息。

    • 请注意,使用通配符时需要谨慎,因为它们可能会匹配到意外的主题。确保你的订阅主题明确,并且只匹配你感兴趣的主题。

  • /sys/${deviceName}/thing/sub/register

    • 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
    • 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。

创建发布

创建一个mqtt消息发布接口类,例如 MqttTestController,代码如下:


import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.springblade.core.secure.annotation.NoToken;
import org.springblade.core.tool.api.R;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;/*** @author tarzan*/
@RestController
@Api(tags = "mqtt测试")
@NoToken
@RequestMapping("/mqtt")
@AllArgsConstructor
@Slf4j
public class MqttTestController {private final MqttClientTemplate mqttClientTemplate;@ApiOperation(value = "消息发送")@PostMapping("/publish")private R<Boolean> publish(String topic, String msg) {return R.status(mqttClientTemplate.publish(topic, msg.getBytes(StandardCharsets.UTF_8)));}}

接口测试

接口调用
在这里插入图片描述
控制台输出
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/658848.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ fill()函数最详细介绍

文章目录 函数参数介绍函数功能函数使用注意点使用例子1.将数组arr[5]所有元素初始化为02.字符数组初始化3.vector对象 fill函数是C标准库中的一个算法函数&#xff0c;用于将指定范围内的元素赋值为给定的值。 函数参数介绍 fill( first, last, value );它接受三个参数&#…

idea激活教程(2020.1.4及以上版本)

首先点击试用版本&#xff0c;进入软件&#xff0c;再依次进行一下操作 一、在idea的Plugins配置中添加Z大的插件市场 上图中加载出来的插件是默认的&#xff0c;大家不用在意&#xff0c;直接点击“Manage Plugin Repositoryies…”打开配置弹窗 点击号&#xff0c;添加一行…

Qt 基础之QDataTime

Qt 基础之QDataTime 引言一、获取(设定)日期和时间二、时间戳三、时间计算 (重载运算符) 引言 QDataTime是Qt框架中用于处理日期和时间的类。它提供了操作和格式化日期、时间和日期时间组合的功能。QDataTime可以用于存储和检索日期和时间、比较日期和时间、对日期和时间执行算…

linux kernel 内存踩踏之KASAN(一)

一、背景 linux 内核出现内存类问题时&#xff0c;我们常用的调试工具就是kasan&#xff0c;kasan有三种模式&#xff1a; 1. Generic KASAN &#xff08;这个就是我们最常用的&#xff0c;1 debug byte indicate 8 bytes use state, 对标用户层 asan&#xff09; 2. Softwa…

Java强训day13(选择题编程题)

选择题 编程题 题目1 import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner sc new Scanner(System.in);String s sc.nextLine();char[] c s.toCharArray();int i 0;int t 0;while (i < c.length) {if (c[i] ! \") {…

音视频数字化(数字与模拟-音频广播)

在互联网飞速发展的今天,每晚能坐在电视机前面的人越来越少,但是每天收听广播仍旧是很多人的习惯。 从1906年美国费森登在实验室首次进行无线电广播算起,“广播”系统已经陪伴人们115年了。1916年,收音机开始上市,收音机核心是“矿石”。1920年开始“调幅”广播,1941年开…

1.理解AOP,使用AOP

目录 1AOP基础 1.1 AOP概述 1.2AOP快速使用 2.3 AOP核心概念 1AOP基础 首先介绍一下什么是AOP&#xff0c;再通过一个快速入门程序&#xff0c;让大家快速体验AOP程序的开发。最后再介绍AOP当中所涉及到的一些核心的概念。 1.1 AOP概述 什么是AOP&#xff1f; 说白了&am…

晶体塑性有限元 Abaqus 三维泰森多边形(voronoi模型)插件 V8.0

更多内容见公众号“320科技工作室”&#xff0c;有需要欢迎通过公众号联系我们。

除毛可以用宠物空气净化器吗?猫用空气净化器哪些品牌吸毛好?

作为一位长期养猫的铲屎官&#xff0c;我深刻理解只有养猫人才懂的困扰&#xff0c;那就是家里到处都是猫毛和异味。我发现自从开始养猫之后&#xff0c;家里的空气质量变得不佳。猫毛和皮屑飞扬&#xff0c;而且室内空气中的污染物也越来越多。这种低质量的空气对我们的健康有…

刚刚,ChatGPT再爆安全漏洞!大量私密对话被泄露

ChatGPT 又一次陷入了安全漏洞风波。 国外一位用户在使用ChatGPT时表示&#xff0c;他原本只是进行一个无关的查询&#xff0c;却意外发现在和ChatGPT的聊天记录中出现了不属于自己的对话内容。 对话泄露了很多非用户本人的信息&#xff0c;其中包含了大量敏感内容。 如下图…

GPT教我如何成为rapper

提示词基本三要素:任务、角色、细节。其中任务是核心,角色和细节是可选项。 角色和细节可以引导ChatGPT更加准确地输出预期结果。 只有这三者结合才能让ChatGPT发挥出强大的语言模型能力。 任务 这个比较好理解,比如热爱唱跳rap的我,想给ChatGPT下达一个任务:如何才能…

C++ 中关键字delete用法

文章目录 代码如下&#xff1a; #include <stdio.h> #include <iostream> using namespace std;class Student { public:Student(){};Student(const Student&) delete; //禁用拷贝构造函数 也就是在拷贝构造函数声明的右侧加上delete }; int main(int argc, c…

语言革命:NLP与GPT-3.5如何改变我们的世界

文章目录 &#x1f4d1;前言一、技术进步与应用场景1.1 技术进步1.2 应用场景 二、挑战与前景三、伦理和社会影响四、实践经验五、总结与展望 &#x1f4d1;前言 自然语言处理&#xff08;Natural Language Processing&#xff0c;NLP&#xff09;是人工智能领域的一个重要分支…

基于Atmel Studio环境下开发AVR点灯示例

基于Atmel Studio环境下开发AVR点灯示例 &#x1f4cd;相关篇《ATMega328PB-AU烧录bootloader探索研究》 &#x1f531;《Atmel Studio开发环境下配合AVRDUDESS配置烧录快捷方式》 ✨谈起Atmel Studio开发环境&#xff0c;如果不是使用AVR单片机的人来说&#xff0c;可能比较…

SDN 拓扑感知技术带你逃离灾难,轻松实现云灾备

在这个数字化时代&#xff0c;云计算已经成为企业和组织提供各种软件应用服务和海量数据处理的普遍选择。但是&#xff0c;云计算环境下的数据量庞大且分布广泛&#xff0c;系统故障、网络攻击等风险可能会对业务应用和数据造成影响&#xff0c;导致服务中断、数据丢失等问题&a…

【学网攻】 第(17)节 -- 命名ACL访问控制列表

系列文章目录 目录 前言 一、ACL(访问控制列表)是什么&#xff1f; 二、实验 1.引入 总结 文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用【学网攻】 第(3)节 -- 交换机配置聚合端口【学网攻】 第(4)节 -- 交换机划分Vlan【学网攻】 第…

Linux--redhat9创建软件仓库

1.插入光盘&#xff0c;挂载镜像 模拟插入光盘: 点击:虚拟机-可移动设备-CD/DVD 设备状态全选&#xff0c;使用ISO影响文件选择当前版本镜像&#xff0c;点击确认。 2.输入: df -h 可以显示&#xff0c;默认/dev/sr0文件为光盘文件&#xff0c;挂载点为/run/media/root/镜像…

GoLang和GoLand的安装和配置

1. GoLang 1.1 特点介绍 Go 语言保证了既能达到静态编译语言的安全和性能&#xff0c;又达到了动态语言开发维护的高效率&#xff0c;使用一个表达式来形容 Go 语言&#xff1a;Go C Python , 说明 Go 语言既有 C 静态语言程序的运行速度&#xff0c;又能达到 Python 动态语…

Flutter canvas 画一条波浪线 进度条

之前用 Flutter Canvas 画过一个三角三角形&#xff0c;html 的 Canvas 也画过一次类似的&#xff0c; 今天用 Flutter Canvas 试了下 感觉差不多&#xff1a; html 版本 大致效果如下&#xff1a; 思路和 html 实现的类似&#xff1a; 也就是找出点的位置&#xff0c;使用二阶…

OCP NVME SSD规范解读-8.SMART日志要求-4

SMART-21&#xff1a;这段描述解释了一个与设备内部I/O操作非对齐相关的计数器功能。该计数器记录的是由NVMe SSD执行的、起始地址未按照设备内部间接寻址单元&#xff08;IU&#xff0c;Indirection Unit&#xff09;大小进行对齐的写入I/O操作数量。 “Alignment”指的是每次…