MQTT服务搭建及python使用示例

1、MQTT协议

1.1、MQTT介绍

        MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的通信协议,通常用于物联网设备之间的通讯。它具有低带宽、低功耗和开放性等特点,适合在网络带宽有限或者网络连接不稳定的环境下使用。MQTT协议使用TCP/IP协议栈进行通讯,支持多种编程语言和平台,并且能够提供可靠的消息传递机制。

        在MQTT中,设备可以发布消息到特定的主题(topic),同时其他设备可以订阅这些主题以接收相应的消息。这种发布/订阅模式使得设备之间的通讯更加灵活和高效。MQTT还支持三种服务质量等级:至多一次(at most once)、至少一次(at least once)和恰好一次(exactly once),以满足不同场景下对消息传递可靠性的需求。

        优点:代码量少,开销低,带宽占用小,即时通讯协议。

1.2、MQTT概念

        订阅(Subscribtion):订阅包含主题筛选器( Topic Filter )和最大服务质量( QoS )。订阅会与一个会话( Session )关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

        会话(Session):每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

        主题名(Topic Name):连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。需要注意的是, MQTT 中消息主题按照层级命名,使用 ‘/’ 进行分割。此外,主题中可以使用通配符进行多个主题或多层级的订阅,有两种常见的通配符:

        单层通配符 + :单层通配符只能匹配一层的主题,例如: China/Beijing/+ ,可以匹配的只有 Beijing 这个主 题下面一层的主题,例如 Xicheng, DongCheng, Xuanwu 等等。

        多层通配符 # :顾名思义,多层通配符就是可以匹配多个层级的主题,例如: China/# ,可以匹配到的主题可能有:China/Beijing/Dongcheng, China/Shanghai/PuDong ,等等。

        主题筛选器(Topic Filter):一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

        消息订阅:消息订阅者所具体接收的内容。

1.3、MQTT中的角色

        Publisher 和 Subscriber 为客户端,Broker 为服务器端,消息主题为消息类型,Broker 根据 Topic 过滤消息,并将消息向客户端推送。

MQTT 中用 QoS 表示服务质量, MQTT 协议中有三种服务质量 (QoS) :

        1 ) QoS =0 ,至多一次,可能会出现丢包的情况,使用在对实时性要求不高的情况,例如,将此服务质量与通信环境传感器数据一起使用。 对于是否丢失个别读取或是否稍后立即发布新的读取并不重要。

        2 ) QoS =1, 至少一次,保证包会到达目的地,但是可能出现重包。

        3 ) QoS =2, 刚好一次,保证包会到达目的地,且不会出现重包的现象。

客户端:

        Publisher 和 Subscriber 都属于客户端。

        发布应用消息给其它相关的客户端。

        订阅以请求接受相关的应用消息。

        取消订阅以移除接受应用消息的请求。

        从服务端断开连接

服务器:

        服务器端即所谓的 MQTT Broker 服务器。

        接受来自客户端的网络连接。

        接受客户端发布的应用消息。

        处理客户端的订阅和取消订阅请求。

        转发应用消息给符合条件的已订阅客户端。

MQTT 提供的公共服务器端( Broker )有:

test.mosquitto.orgbroker.hivemq.comiot.eclipse.org

2、基于公共服务示例

        在次,选择使用EMQX提供的免费MQTT公共服务器,但同样可以选择其他任何MQTT broker。The Free Global Public MQTT Broker | Try Now | EMQ (emqx.com)

Broker: broker.emqx.io 
TCP Port: 1883 
Websocket Port: 8083

python库:pip install paho-mqtt=="1.6.1"

消息发布代码,pub.py

import time
import random
from paho.mqtt import client as mqtt_clientbroker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'def connect_mqtt():def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)client = mqtt_client.Client(client_id)client.on_connect = on_connectclient.connect(broker, port)return clientdef publish(client):msg_count = 0while True:time.sleep(1)msg = f"messages: {msg_count}"result = client.publish(topic, msg)# result: [0, 1]status = result[0]if status == 0:print(f"Send `{msg}` to topic `{topic}`")else:print(f"Failed to send message to topic {topic}")msg_count += 1def run():client = connect_mqtt()client.loop_start()publish(client)if __name__ == '__main__':run()

运行打印结果:

消息订阅代码,sub.py:

import random
from paho.mqtt import client as mqtt_clientbroker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'def connect_mqtt() -> mqtt_client:def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)client = mqtt_client.Client(client_id)client.on_connect = on_connectclient.connect(broker, port)return clientdef subscribe(client: mqtt_client):def on_message(client, userdata, msg):print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")client.subscribe(topic)client.on_message = on_messagedef run():client = connect_mqtt()subscribe(client)client.loop_forever()if __name__ == '__main__':run()

运行打印结果:

        可以看到,发布和订阅成功。注意:每个客户端的ID唯一,不能重复!

3、基于Apollo服务示例

3.1、安装Apollo服务

        服务器端搭建:

Index of /dist/activemq/activemq-apollo/1.7.1

解压并打开目标如下:

        Apollo中间件其实是免安装的,我们只需要下载apache-apollo-1.7.1-windows-distro.zip,然后解压到某个文件夹就可以了。在这里我解压到D:\dist\apache-apollo-1.7.1。在apache-apollo-1.7.1/bin目录下打开终端执行命令:apollo create myapollo

        若出现如下错误 Loading configuration file ‘D:\phpStudy\apache-apollo-1.7.1\bin\mybroker\etc\apollo.xml’.Startup failed: java.lang.NoClassDefFoundError: javax/xml/bind/ValidationEventHandler,处理方式:换上jdk1.8版本即可。

        然后在生成的myapollo目录的bin目录下执行:pollo-broker.cmd run,结果如下:

        服务搭建成功,进入后台管理,打开网页,输入ip + : 61680 进入后台管理 ,默认用户名admin 密码 password,例如:http://127.0.0.1:61680

3.2、示例代码

本地服务基础信息:

host="127.0.0.1" 
port = 61613 
用户名:admin 
密码:password

python库:pip install paho-mqtt=="1.6.1"

发布主题,publish.py

import sys
import time
import paho.mqtt.client as mqttdef on_connect(client, userdata, flags, rc):print("Connected with result code " + str(rc))def on_subscribe(client, userdata, mid, granted_qos):print("消息发送成功")client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.connect(host="127.0.0.1", port=61613, keepalive=60)  # 订阅频道
time.sleep(1)i = 0
while True:try:# 发布MQTT信息sensor_data = "test" + str(i)client.publish(topic="public", payload=sensor_data, qos=0)time.sleep(5)i += 1except KeyboardInterrupt:print("EXIT")client.disconnect()sys.exit(0)

订阅主题,subscribe.py

import time
import paho.mqtt.client as mqtt# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):if rc == 0:print("连接成功")print("Connected with result code " + str(rc))def on_message(client, userdata, msg):print(msg.topic + " " + str(msg.payload))client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect(host="127.0.0.1", port=61613, keepalive=60)  # 订阅频道
time.sleep(1)# client.subscribe("public")
client.subscribe([("public", 0), ("test", 2)])  # 订阅
client.loop_forever()

执行脚本后,正确大小消息。浏览器可见连接记录。

4、EMQX客户端工具

下载地址:MQTTX 下载选择适合您的平台,立即开始使用 MQTTX。icon-default.png?t=N7T8https://mqttx.app/zh/downloads

4.1、公共服务测试消息接收

创建连接:

Host:为代码中定义好的 broker.emqx.io 
Port:为代码中定义好的 1883 
用户名、密码根据需要添加

添加订阅:

主题为:/flask/mqtt

在MQTTX中发布消息:

测试成功。

4.2、自建服务测试消息接收

MQTT版本选择3.1.1,以下参数,与服务保持一致:

host="127.0.0.1" 
port = 61613 
用户名:admin 
密码:password

其它测试步骤,一样。(创建主题,测试)

5、EMQX代理服务器

        windows下搭建MQTT代理服务,与Apollo服务功能一样,更方便好用,下载地址:https://www.emqx.io/zh/downloads

        下载后解压,进入目录bin文件下,执行:emqx start 

        打开浏览器输入 http://localhost:18083 进入EMQ的web控制台,输入用户名:admin 密码:public 进行登录。登录进入后,界面如下:

至此,代理服务器已经创建完成!客户端就可以连接代理服务器进行消息的分发和订阅!

附录:

java1.8.1下载,地址:Java Downloads | Oracleicon-default.png?t=N7T8https://www.oracle.com/java/technologies/downloads/#java8-windows

参考:

1、嵌入式QT- QT使用MQTT

嵌入式QT- QT使用MQTT_qt mqtt-CSDN博客

2、Python实现通信协议(mqtt)5星,mqtt flask

【小沐学Python】Python实现通信协议(mqtt)_python mqtt-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/8592.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

联发科天玑AI开发套件亮相:开发智能终端生成式AI应用的全面工具

在今日召开的天玑开发者大会2024(MDDC 2024)上,联发科向外界展示了其最新力作——天玑AI开发套件。该套件是为合作伙伴提供终端生成式AI应用开发的一站式工具包,意在简化并加速开发过程。 联发科推出的天玑AI开发套件包括四大核心…

去除图片水印软件-inpaint

一、普通使用教程 亲眼看看使用 Inpaint 从照片中删除不需要的元素是多么容易: 1.1加载图片 1.2 选择要纠正的问题区域 1.3 告别不需要的对象并保存 二、功能 1 修复旧照片 老并不总是意味着坏。我们拥有的一些旧照片对我们来说仍然很重要,因为它们仍…

leetcode 2266.统计打字方案数

思路:dp。 这道题其实也是爬楼梯的变形。 不过,这里需要分类讨论一下:就是选择7或者9的时候是4种递推情况,其他的都是3种。 而且,我们可以利用dp数组过程记录的特点运用在本题当中。 这里需要解决几个问题&#xf…

minio上传文件失败如何解决

1. 做了什么操作 通过接口上传excel文件,返回响应值 2. 错误如图 2. 如何解决 根据错误描述定位到了部署minio的地方minio通过docker部署,找到docker - compose发现配置文件中minio有两个端口,一个是用于api的,一个是用于管理界面…

Dell EMC Storage Unity: Remove/Install Memory Module

SP A 一个内存故障 点击system view -> Enclosures->Top查看 再次查看Alert, 确认内存出现问题 进入Service , 将SP A置为service状态 移出SP A ,进行内存更换 更换完内存后,将SP A插入设备,并进行线缆连接 进入…

6层板学习笔记2

说明:笔记基于6层全志H3消费电子0.65MM间距BGA 67、多层板的电源建议直接大面积铺铜,不建议走线,铺铜充分满足其载流能力 68、凡亿推荐表层1OZ的铜厚线宽20MIL能承载1A的电流,内层0.5OZ的铜厚线宽为40MIL能承载1A的电流,过孔直径20MIL(0.5MM)能承载1A左右的电流,实际设…

Django项目中的Nginx+uWSGI

Django项目中的NginxuWSGI部署 配合另一篇博客共同饮用Django项目服务器部署(2024最新) 一:Nginx uWSGI部署框架 用户浏览器向nginx发送请求,nginx判断请求是动态海事静态,如果是静态请求,则直接返回静态…

(五)JSP教程——response对象

response对象主要用于动态响应客户端请求(request),然后将JSP处理后的结果返回给客户端浏览器。JSP容器根据客户端的请求建立一个默认的response对象,然后使用response对象动态地创建Web页面、改变HTTP标头、返回服务器端地状态码…

c++ socket基于TCP

linux网络编程基础api socket 地址api:ip地址和端口对,成为 soccket 地址。 socket 基础api: sys/socket.h 中,包括创建、命名、监听 socket ;接受连接、发起连接、读写数据、获取地址信息、检测带外标记、读取设置 s…

unity ui 同屏

一共有三个摄像机,上屏,下屏 和 类似照相机的ccamera 类似照相机的ccamera的设置: 下屏摄像机设置: 下屏交互的Canvas设置: 新建一个canvas,下面放上rawimage: 如果下屏不想显示的内容&#xf…

创建和管理数据库

1. 一条数据的存储过程 存储数据是处理数据的第一步.只有正确的把数据存储起来,我们才能进行有效的处理和分析.否则,只能是一团乱麻.在MySQL中,一个完整的数据存储过程一共有四步 : 创建数据库,确认字段,创建数据表&a…

LeetCode题练习与总结:格雷编码--89

一、题目描述 n 位格雷码序列 是一个由 2^n 个整数组成的序列,其中: 每个整数都在范围 [0, 2^n - 1] 内(含 0 和 2^n - 1)第一个整数是 0一个整数在序列中出现 不超过一次每对 相邻 整数的二进制表示 恰好一位不同 ,…

【练习3】

1.将二叉搜索树转为排序的双向链表 (好久没看数据结构,忘完了,学习大佬的代码) class Solution { public:Node* prenullptr,*headnullptr; //pre为每次遍历时的前一个节点,head记录头节点Node* treeToDoublyList(Node* root) {if…

漏扫神器Invicti V2024.4.0专业版

前言 Invicti Professional是Invicti Security公司推出的一个产品,它是一种高级的网络安全扫描工具。Invicti Professional旨在帮助组织发现和修复其网络系统中的潜在安全漏洞和弱点。它提供了全面的漏洞扫描功能,包括Web应用程序和网络基础设施的漏洞扫…

vue3点击复制粘贴

1、安装库并引入 npm i vue-clipboard3 --save2.封装为一个函数 import useClipboard from vue-clipboard3;const { toClipboard } useClipboard();// 点击复制文本const copyText (text: string) > {return new Promise((resolve, reject) > {try {//复制toClipboar…

持续总结中!2024年面试必问 100 道 Java基础面试题(三十五)

上一篇地址:持续总结中!2024年面试必问 100 道 Java基础面试题(三十四)-CSDN博客 六十九、Java中的UUID是什么? 在Java中,UUID(Universally Unique Identifier)是一种用于生成唯一…

记录: Python解析yml文件,顺序解析,带所有文件等号

记录: Python解析yml文件,顺序解析,带所有文件等号from yaml.composer import Composer from yaml.constructor import Constructor import yamlclass ParseYml:def __init__(self):passstaticmethoddef parse(yml_pathNone):try:loader yaml.Loader(op…

xhs 旋转滑块流程分析

声明 本文章中所有内容仅供学习交流,抓包内容、敏感网址、数据接口均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关,若有侵权,请联系我立即删除! 前言 本文首发于公众号…

近似消息传递算法(AMP)单测量模型(SMV)

1、算法解决问题 很多人致力于解决SLM模型的求逆问题,即知道观测值和测量矩阵(字典之类的),要求未知变量的值。SLM又叫做标准线性模型,后续又在此基础上进行升级变为广义线性模型。即SLM是yAxe,这里是线性…

STM32单片机实战开发笔记-独立看门狗IWDG

嵌入式单片机开发实战例程合集: 链接:https://pan.baidu.com/s/11av8rV45dtHO0EHf8e_Q0Q?pwd28ab 提取码:28ab IWDG模块测试 1、功能描述 STM32F10X内置两个看门狗,提供了更高的安全性,时间的精确下性和使用的灵活性…