mqtt详细介绍及集成到springboot

mqtt详细介绍及集成到springboot

  • 1.mqtt发布/订阅消息参数详细介绍
  • 2. mqtt客户端连接参数介绍
  • 3. docker-compose搭建mqtt服务端
  • 4. springboot集成mqtt实现发布订阅
  • 5. 测试
  • 注意事项

1.mqtt发布/订阅消息参数详细介绍

  • 1.1. qos
  • QoS=0 ,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver
    发送消息,如果发送失败,也就算了;
  • QoS=1 ,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender
    向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息; .
  • QoS=2 ,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender
    尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保
    证 Receiver 不会因为消息重传而收到重复的消息。
  • QoS 在发布与订阅中的区别
    当客户端 A 的发布 QoS 大于 客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 B 的订阅 QoS。
    当客户端 A 的发布 QoS 小于客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 A 的发布 QoS。
    总结:服务端给订阅端发送消息时,发布端的QoS 和 订阅端的QoS 谁小,就用谁的QoS级别。

2. mqtt客户端连接参数介绍

  • cleanSession
    为 true 时表示创建一个新的会话,每次连接时不会持久化订阅信息或消息队列。如果连接断开,服务器会丢失客户端的会话信息(例如订阅的主题)。
    为 false 时表示创建一个持久会话,在客户端断开连接后会话仍然保持,服务器不会丢失客户端的会话信息(例如订阅的主题),直到会话超时注销。

  • keepAliveInterval
    心跳时间间隔,默认60s
    MQTT 协议中约定:在 1.5*Keep Alive 的时间间隔内,如果 Broker 没有收到来自 Client 的任何数据包,那么 Broker 认为它和 Client 之间的连接已经断开;同样地, 如果 Client 没有收到来自 Broker 的任何数据包,那么 Client 认为它和 Broker 之间的连接已经断开。
    emqx中可以通过日志追踪查看心跳日志。在这里插入图片描述在这里插入图片描述

  • clientId

    • 唯一标识客户端:MQTT 服务器根据 clientId 来识别和管理不同的客户端连接。如果两个客户端连接时使用相同的 clientId,服务器可能会断开第一个客户端的连接并接受第二个客户端的连接,导致第一个客户端的连接被踢出。
    • 消息持久化和会话管理:客户端连接时可以选择是否持久化会话。如果会话持久化,服务器会将客户端的订阅信息、消息等数据与 clientId 绑定,并在客户端断开后保留这些信息,直到客户端重新连接时恢复会话。
    • 维护订阅状态:使用 clientId,服务器能够记住客户端订阅的主题和 QoS 等信息,即使客户端断开了连接,只要在重新连接时使用相同的 clientId,服务器就会恢复这些订阅状态。
    • 如果需要会话持久化、订阅信息持久化等功能时,最好使用固定的 clientId,这样可以确保重新连接时,服务器能够恢复会话信息

3. docker-compose搭建mqtt服务端

不会安装docker的可以参考文章:linux安装docker和docker-compose详细教程
docker-compose.yml配置

version: '3.3'
services:mqtt:image: emqx/emqx:latestcontainer_name: mqtt_server  ports:- "1883:1883"- "8083:8083"- "18083:18083"networks:- mqttvolumes:
#      - ./conf/emqx.conf:/opt/emqx/etc/emqx.conf 
#      - ./data:/opt/emqx/data	- ./log:/opt/emqx/lognetworks:mqtt:

保存后直接docker-compose up -d启动,访问页面:http://192.168.80.251:18083/
默认账号:admin 密码:public

4. springboot集成mqtt实现发布订阅

gitee项目地址:https://gitee.com/wangyunchao6/springboot-mqtt.git

pom依赖

        <!--        mqtt依赖开始--><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!--        mqtt依赖结束-->

application.yml配置

mqtt:broker-url: tcp://192.168.80.251:1883 # 替换为你的 MQTT 服务器ip地址client-id: mqtt-server #可以随便写username: admin # 如果需要认证password: public # 如果需要认证default-topic: test/topic

mqtt连接配置
注意:订阅主题的时候也可以不调用callback 方法,直接在subscribe中处理业务逻辑。

            mqttClient.subscribe(topic,2, (t, msg) -> {System.out.println("Received message from topic: " + t + ", Message: " + new String(msg.getPayload()));});

MqttServerConfig,用这个作为mqtt的服务端订阅test/topic主题

package com.example.springbootmqtt.config;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 接收消息的mqtt服务端配置*/
@Configuration
@Slf4j
public class MqttServerConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttClient mqttClient() throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);//心跳时间间隔,默认60S
//        options.setKeepAliveInterval(30);//连接超时时间
//        options.setConnectionTimeout(10);//设置自动连接options.setAutomaticReconnect(true);client.connect(options);client.setCallback(new MqttCallback() {/*** 当客户端与 MQTT Broker 的连接意外断开时触发此方法。* 断开的原因会通过参数 cause 传递过来*/@Overridepublic void connectionLost(Throwable cause) {if (!client.isConnected()) {try {client.reconnect();} catch (MqttException e) {log.error("connectio lost,Throwable={}",cause.getMessage());log.error("connectio lost,MqttException={}",e.getMessage());}}}/*** 当客户端收到一条消息时触发此方法。*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("Received message from topic: " + topic);String payload = new String(message.getPayload());// Handle message based on topicswitch (topic) {case "topic1":break;case "topic2":break;case "topic3":break;default:System.out.println("Unknown topic: " + topic);break;}}/*** 当客户端发送的消息成功到达 Broker(仅对 QoS 1 和 QoS 2 消息有效)时触发此方法。* 用于确认消息已经完成传输。*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// Not used for subscriberstry {System.out.println("Delivery complete. Message: " + token.getMessage());} catch (Exception e) {e.printStackTrace();}            }});//服务启动时订阅主题client.subscribe("test/topic", 2);return client;}
}

MockMqttClientOneConfig,用这个模拟mqtt客户端向test/topic主题发送数据

package com.example.springbootmqtt.config;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 模拟发送消息的mqtt客户端1配置*/
@Configuration
@Slf4j
public class MockMqttClientOneConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttClient mockMqttClientOne() throws MqttException {String clientId = "mock-mqtt-client-one";MqttClient client = new MqttClient(brokerUrl, clientId);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);//心跳时间间隔,默认60S
//        options.setKeepAliveInterval(30);//连接超时时间
//        options.setConnectionTimeout(10);//设置自动连接options.setAutomaticReconnect(true);client.connect(options);client.setCallback(new MqttCallback() {/*** 当客户端与 MQTT Broker 的连接意外断开时触发此方法。* 断开的原因会通过参数 cause 传递过来*/@Overridepublic void connectionLost(Throwable cause) {if (!client.isConnected()) {try {client.reconnect();} catch (MqttException e) {log.error("connectio lost,Throwable={}",cause.getMessage());log.error("connectio lost,MqttException={}",e.getMessage());}}}/*** 当客户端收到一条消息时触发此方法。*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("Received message from topic: " + topic);String payload = new String(message.getPayload());// Handle message based on topicswitch (topic) {case "topic1":break;case "topic2":break;case "topic3":break;default:System.out.println("Unknown topic: " + topic);break;}}/*** 当客户端发送的消息成功到达 Broker(仅对 QoS 1 和 QoS 2 消息有效)时触发此方法。* 用于确认消息已经完成传输。*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// Not used for subscriberstry {System.out.println("Delivery complete. Message: " + token.getMessage());} catch (Exception e) {e.printStackTrace();}            }});return client;}
}

MqttController

package com.example.springbootmqtt.controller;import com.example.springbootmqtt.service.MqttService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttService mqttService;// 发布消息@GetMapping("/publish")public String publish(String topic, String message) {mqttService.sendMessage(topic, message);return "Message published to topic: " + topic;}// 订阅主题@GetMapping("/subscribe")public String subscribe( String topic) {mqttService.subscribe(topic);return "Subscribed to topic: " + topic;}// 模拟MockMqttClientOne客户端发布消息@GetMapping("/mockClientOnepublish")public String mockClientOnepublish( String topic, String message) {mqttService.mockClientOnepublish(topic, message);return "mockClientOnepublish Message published to topic: " + topic;}// 模拟MockMqttClientTwo客户端发布消息@GetMapping("/mockClientTwoPublish")public String mockClientTwoPublish( String topic, String message) {mqttService.mockClientTwoPublish(topic, message);return "mockClientTwoPublish Message published to topic: " + topic;}}

MqttService

package com.example.springbootmqtt.service;public interface MqttService {void sendMessage(String topic, String message);void mockClientOnepublish(String topic, String message);void mockClientTwoPublish(String topic, String message);void subscribe(String topic);void sendDefaultMessage(String message);void subscribeDefaultTopic();}

MqttServiceImpl

package com.example.springbootmqtt.service.impl;import com.example.springbootmqtt.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import java.util.UUID;@Slf4j
@Service
public class MqttServiceImpl implements MqttService {@Autowired@Qualifier("mqttClient")private MqttClient mqttClient;@Autowired@Qualifier("mockMqttClientOne")private MqttClient mockMqttClientOne;@Autowired@Qualifier("mockMqttClientTwo")private MqttClient mockMqttClientTwo;@Value("${mqtt.default-topic}")private String defaultTopic;// 发送消息@Overridepublic void sendMessage(String topic, String message) {try {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(1);mqttClient.publish(topic, mqttMessage);System.out.println("Message sent to topic: " + topic + ", Message: " + message);} catch (MqttException e) {e.printStackTrace();}}@Overridepublic void mockClientOnepublish(String topic, String message) {try {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(1);mockMqttClientOne.publish(topic, mqttMessage);System.out.println("Message sent to topic: " + topic + ", Message: " + message);} catch (MqttException e) {e.printStackTrace();}}@Overridepublic void mockClientTwoPublish(String topic, String message) {try {String clientId = UUID.randomUUID().toString();MqttClient client = new MqttClient("tcp://192.168.80.251:1883", clientId);MqttConnectOptions options = new MqttConnectOptions();options.setUserName("admin");options.setPassword("public".toCharArray());options.setCleanSession(true);//心跳时间间隔,默认60S
//            options.setKeepAliveInterval(30);//连接超时时间
//        options.setConnectionTimeout(10);//设置自动连接options.setAutomaticReconnect(true);client.connect(options);MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(1);client.publish(topic, mqttMessage);System.out.println("Message sent to topic: " + topic + ", Message: " + message);} catch (MqttException e) {e.printStackTrace();}}// 订阅消息@Overridepublic void subscribe(String topic) {try {mqttClient.subscribe(topic, (t, msg) -> {System.out.println("Received message from topic: " + t + ", Message: " + new String(msg.getPayload()));});System.out.println("Subscribed to topic: " + topic);} catch (MqttException e) {e.printStackTrace();}}// 默认主题发送消息@Overridepublic void sendDefaultMessage(String message) {sendMessage(defaultTopic, message);}// 默认主题订阅@Overridepublic void subscribeDefaultTopic() {subscribe(defaultTopic);}
}

5. 测试

启动程序,调用接口:http://127.0.0.1:8080/mqtt/mockClientTwoPublish?topic=test/topic&message=aaaaaa
查看打印结果
在这里插入图片描述

注意事项

先说结论:

  • messageArrived:由 消息订阅者 使用,在消息被成功接收到时触发。
  • deliveryComplete:由 消息发布者 使用,在消息被成功传输(且确认完成)时触发。

向主题test/topic发送一条消息,看上图的控制台输出结果,消息发布者并没有进入messageArrived方法,消息订阅者并没有进入deliveryComplete方法,所以在编写代码时只需要根据自己的角色,在对应方法写业务逻辑即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/67231.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于springboot的租房网站系统

作者&#xff1a;学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等 文末获取“源码数据库万字文档PPT”&#xff0c;支持远程部署调试、运行安装。 项目包含&#xff1a; 完整源码数据库功能演示视频万字文档PPT 项目编码&#xff1…

自动化办公|xlwings简介

xlwings 是一个开源的 Python 库&#xff0c;旨在实现 Python 与 Microsoft Excel 的无缝集成。它允许用户使用 Python 脚本自动化 Excel 操作&#xff0c;读取和写入数据&#xff0c;执行宏&#xff0c;甚至调用 VBA 脚本。这使得数据分析、报告生成和其他与 Excel 相关的任务…

概率函数,累计分布函数

四. 累计分布函数 1. 累计分布函数&#xff08;CDF, Cumulative Distribution Function&#xff09; 累计分布函数是用来描述随机变量取值小于或等于某个给定值的概率。它适用于离散型和连续型随机变量&#xff0c;并且能够通过概率质量函数&#xff08;PMF&#xff09;或概率…

Flutter项目适配鸿蒙

Flutter项目适配鸿蒙 前言Flutter项目适配鸿蒙新工程直接支持ohos构建新项目编译运行 适配已有的Flutter项目 前言 目前市面上使用Flutter技术站的app不在少数&#xff0c;对于Flutter的项目&#xff0c;可能更多的是想直接兼容Harmonyos&#xff0c;而不是直接在重新开发一个…

链家房价数据爬虫和机器学习数据可视化预测

完整源码项目包获取→点击文章末尾名片&#xff01;

【20250113】基于肌肉形变测量的连续步态相位估计算法,可自适应步行速度和地形坡度...

【基本信息】 论文标题&#xff1a;Continuous Gait Phase Estimation by Muscle Deformations with Speed and Ramp Adaptability 发表期刊&#xff1a;IEEE Sensors Journal 发表时间&#xff1a;2024年5月30日 【访问链接】 论文链接&#xff1a;https://ieeexplore.ieee.or…

【全套】基于分类算法的学业警示预测信息管理系统

【全套】基于分类算法的学业警示预测信息管理系统 【摘 要】 随着网络技术的发展基于分类算法的学业警示预测信息管理系统是一种新的管理方式&#xff0c;同时也是现代学业预测信息管理的基础&#xff0c;利用互联网的时代与实际情况相结合来改变过去传统的学业预测信息管理中…

小程序组件 —— 31 事件系统 - 事件绑定和事件对象

小程序中绑定事件和网页开发中绑定事件几乎一致&#xff0c;只不过在小程序不能通过 on 的方式绑定事件&#xff0c;也没有 click 等事件&#xff0c;小程序中绑定事件使用 bind 方法&#xff0c;click 事件也需要使用 tap 事件来进行代替&#xff0c;绑定事件的方式有两种&…

邮箱发送验证码(nodemailer)

邮箱发送验证码 打开SMTP 服务使用 Node.js 邮件发送模块&#xff08;nodemailer&#xff09;封装验证码组件 开发中经常会遇到需要验证码&#xff0c;不过手机验证码需要money&#xff0c;不到必要就不必花费&#xff0c;所以可以使用邮箱发送验证码 打开SMTP 服务 根据自己想…

AV1视频编解码简介、码流结构(OBU)

我的音视频/流媒体开源项目(github) 目录 一、AV1编码技术 二、AV1码流结构(OBU) 三、IVF文件格式 四、ffmpeg支持AV1 五、关于常见格式对AV1的封装 一、AV1编码技术 AV1是由开放媒体联盟(AOM&#xff0c;Alliance for Open Media)在2018年发布的&#xff0c;AV1的前身…

Sentaurus TCAD学习笔记:transform指令

目录 一、transform指令简介二、transform指令的实现1.cut指令2.flip指令3.rotate指令4.stretch指令5.translate指令6.reflect指令 三、transform指令示例 一、transform指令简介 在Sentaurus中&#xff0c;如果需要对器件进行翻转、平移等操作&#xff0c;可以通过transform指…

kafka消费堆积问题探索

背景 我们的商城项目用PHP写的&#xff0c;原本写日志方案用的是PHP的方案&#xff0c;但是&#xff0c;这个方案导致资源消耗一直降不下来&#xff0c;使用了20个CPU。后面考虑使用通过kafka的方案写日志&#xff0c;商城中把产生的日志丢到kafka中&#xff0c;在以go写的项目…

【opencv】第7章 图像变换

7.1 基 于OpenCV 的 边 缘 检 测 本节中&#xff0c;我们将一起学习OpenCV 中边缘检测的各种算子和滤波器——Canny 算子、Sobel 算 子 、Laplacian 算子以及Scharr 滤波器。 7.1.1 边缘检测的一般步骤 在具体介绍之前&#xff0c;先来一起看看边缘检测的一般步骤。 1.【第…

[Qt]常用控件介绍-多元素控件-QListWidget、QTableWidget、QQTreeWidget

目录 1.多元素控件介绍 2.ListWidget控件 属性 核心方法 核心信号 细节 Demo&#xff1a;编辑日程 3.TableWidget控件 核心方法 QTableWidgetItem核心信号 QTableWidgetItem核心方法 细节 Demo&#xff1a;编辑学生信息 4.TreeWidget控件 核心方法 核心信号…

[Linux]从零开始的STM32MP157交叉编译环境配置

一、前言 最近该忙的事情也是都忙完了&#xff0c;也是可以开始好好的学习一下Linux了。之前九月份的时候就想入手一块Linux的开发板用来学习Linux底层开发。之前在NXP和STM32MP系列之间犹豫&#xff0c;思来想去还是入手了一块STM32MP157。当然不是单纯因为MP157的性能在NXP之…

小程序如何引入腾讯位置服务

小程序如何引入腾讯位置服务 1.添加服务 登录 微信公众平台 注意&#xff1a;小程序要企业版的 第三方服务 -> 服务 -> 开发者资源 -> 开通腾讯位置服务 在设置 -> 第三方设置 中可以看到开通的服务&#xff0c;如果没有就在插件管理中添加插件 2.腾讯位置服务…

添加计算机到AD域中

添加计算机到AD域中 一、确定计算机的DNS指向域中的DNS二、打开系统设置三、加域成功后 一、确定计算机的DNS指向域中的DNS 二、打开系统设置 输入域管理员的账密 三、加域成功后 这里有显示&#xff0c;就成功了。

从epoll事件的视角探讨TCP:三次握手、四次挥手、应用层与传输层之间的联系

目录 一、应用层与TCP之间的联系 二、 当通信双方中的一方如客户端主动断开连接时&#xff0c;仅是在客户端的视角下连接已经断开&#xff0c;在服务端的眼中&#xff0c;连接依然存在&#xff0c;为什么&#xff1f;——触发EPOLLRDHUP事件&#xff1a;对端关闭连接或停止写…

使用RSyslog将Nginx Access Log写入Kafka

个人博客地址&#xff1a;使用RSyslog将Nginx Access Log写入Kafka | 一张假钞的真实世界 环境说明 CentOS Linux release 7.3.1611kafka_2.12-0.10.2.2nginx/1.12.2rsyslog-8.24.0-34.el7.x86_64.rpm 创建测试Topic $ ./kafka-topics.sh --zookeeper 192.168.72.25:2181/k…

使用 Docker 部署 Java 项目(通俗易懂)

目录 1、下载与配置 Docker 1.1 docker下载&#xff08;这里使用的是Ubuntu&#xff0c;Centos命令可能有不同&#xff09; 1.2 配置 Docker 代理对象 2、打包当前 Java 项目 3、进行编写 DockerFile&#xff0c;并将对应文件传输到 Linux 中 3.1 编写 dockerfile 文件 …