基本概念:
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息传输协议,专为低带宽、高延迟或不稳定的网络环境设计。以下是MQTT实现收发消息的基本原理:
- 客户端-服务器模型:
MQTT基于客户端-服务器模型工作。客户端(可以是发布者或订阅者)发起连接请求到服务器(也称为代理Broker),并可以订阅主题、发布消息到主题或从订阅的主题接收消息。 - 发布/订阅模式:
MQTT使用发布/订阅模式进行消息传递。发布者(Publisher)将消息发布到特定的主题(Topic),而订阅者(Subscriber)通过订阅这些主题来接收消息。这种解耦的通信方式使得数据分发变得非常灵活高效。 - 消息结构:
MQTT传输的消息分为两部分:主题(Topic)和负载(Payload)。主题可以理解为消息的类型,而负载是消息的具体内容。订阅者订阅某个主题后,就会收到该主题下的消息内容。 - 服务质量(QoS):
MQTT协议支持三种不同的QoS级别,分别是至多一次(QoS 0)、至少一次(QoS 1)和恰好一次(QoS 2)。这些级别的选择能够保证消息传递的可靠性和效率。例如,QoS 1确保消息至少被传送一次,但可能会导致消息重复;而QoS 2则确保消息只被传送一次,避免了消息的重复或丢失。 - 保留消息:
发布者发布的消息可以被设置为保留消息。当订阅者订阅该主题时,它会接收到最新的保留消息。这确保了订阅者不会错过任何重要的信息,即使在它离线期间也有消息发布。 - 遗嘱消息:
发布者可以设置一个遗嘱消息(Last Will and Testament,LWT)。当发布者意外断开连接时,MQTT代理会自动发布这个遗嘱消息到指定的主题上。这样,即使发布者离线,订阅者也能接收到相关通知。 - 会话保持:
MQTT客户端和代理之间的连接具有会话保持机制。这确保了在网络不稳定或客户端重启的情况下,连接状态和订阅信息能够得到保持,从而保证了消息传递的稳定性和连续性。
综上所述,MQTT通过客户端-服务器模型、发布/订阅模式、主题和负载结构、服务质量选择、保留消息、遗嘱消息以及会话保持等机制实现了高效可靠的消息收发功能。这些特性使得MQTT在物联网(IoT)领域得到了广泛应用,如智能家居、工业自动化等场景。
组件封装(mqttComp.vue)
@vue/cli-plugin-babel 版本<5 —> npm i mqtt@4.3.7 --save
@vue/cli-plugin-babel 版本>=5 —> npm i mqtt@5.6.2 --save
注意: 在mqtt4.x版本的时候,请求路径后面的"/mqtt"是直接在 brokerUrl中配置,即本文的(tailPath)
但是在mqtt5+版本的时候,请求路径后面的"/mqtt"是在opts中配置的,即本文的(mqttOpts.path)
请根据你的版本选择对应的拼接方式
<!--* @Description: mqtt连接和消息发送组件 页面* @Author: mhf* @Date: 2024-05-25 00:49:23* @params:
-->
<template><div></div>
</template><script>
import mqtt from "mqtt";export default {name: "mqttComp",props: {topic: String, // 订阅主题mqttUrl: {type: Object,default: () => {return {head: "ws", // 必须是 ws 或 wss (mqtt:// 或 mqtts:// 必须让后端开放websocket服务)host: "xx.xxx.xx.xxx", // ip地址port: 8083, // 服务端口tailPath: "mqtt", // 服务路径};},}, // 服务地址mqttOpts: {type: Object,default: () => {return {keepalive: 60,clientId: "clientId-" + Math.random().toString(16).substr(2, 8),username: "username",password: "password",connectTimeout: 10 * 1000,path: "/mqtt"};},}, // 连接配置},data() {return {client: "",clientCount: 0,receivedMessage: null, // 用于存储接收到的消息};},watch: {topic(newTopic) {if (newTopic && this.client) {this.client.unsubscribe(this.topic);this.client.subscribe(newTopic);}},},methods: {async initMqtt() {let opts = JSON.parse(JSON.stringify(this.mqttOpts));this.client = mqtt.connect(`${this.mqttUrl.head}://${this.mqttUrl.host}:${this.mqttUrl.port}/${this.mqttUrl.tailPath}`,opts);this.client.on("connect", this.handleConnect);this.client.on("message", this.handleMessage);this.client.on("reconnect", this.handleReconnect);this.client.on("error", this.handleError);},handleConnect() {console.log("mqtt_连接成功");this.client.subscribe(this.topic);},handleMessage(topic, message) {this.receivedMessage = JSON.parse(message?.toString() || {});this.$emit("message-received", this.receivedMessage);},handleReconnect(error) {console.log(`正在第${this.clientCount}重连`, error);this.clientCount++;if (this.clientCount >= 10) {this.client.end();}},handleError(error) {console.log("连接失败", error);},/*** MQTT实现发送消息* @param: topic: 主题* @param: message: 消息体* @author: mhf* @time: 2024-05-24 14:26:51**/mqttPublish(topic, message) {this.client.publish(topic, JSON.stringify(message));},},async mounted() {await this.initMqtt();},beforeDestroy() {this.client.end();},
};
</script><style lang="scss" scoped></style>
组件使用
<template><div><mqttComp ref="mqttComp" :mqttUrl="mqttUrl" :mqttOpts="mqttOpts" :topic="parentTopic" @message-received="getMqttMessage"/><el-button @click="sendMqttMessage">发送MQTT消息</el-button></div>
</template><script>
import mqttComp from "./mqttComp.vue";export default {components: {mqttComp,},data() {return {parentTopic: "aaa",mqttUrl: {}, // todo 填写你的配置信息mqttOpts: {}, // todo 填写你的配置信息};},methods: {getMqttMessage(receivedMessage) {console.log("mqtt接收到的消息是:", receivedMessage);},sendMqttMessage() {this.$refs.mqttComp.mqttPublish("parentTopic", "来自父组件的消息");},},
};
</script>