MQTT服务搭建及python使用示例

1、MQTT协议

1.1、MQTT介绍

        MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的通信协议,通常用于物联网设备之间的通讯。它具有低带宽、低功耗和开放性等特点,适合在网络带宽有限或者网络连接不稳定的环境下使用。MQTT协议使用TCP/IP协议栈进行通讯,支持多种编程语言和平台,并且能够提供可靠的消息传递机制。

        在MQTT中,设备可以发布消息到特定的主题(topic),同时其他设备可以订阅这些主题以接收相应的消息。这种发布/订阅模式使得设备之间的通讯更加灵活和高效。MQTT还支持三种服务质量等级:至多一次(at most once)、至少一次(at least once)和恰好一次(exactly once),以满足不同场景下对消息传递可靠性的需求。

        优点:代码量少,开销低,带宽占用小,即时通讯协议。

1.2、MQTT概念

        订阅(Subscribtion):订阅包含主题筛选器( Topic Filter )和最大服务质量( QoS )。订阅会与一个会话( Session )关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

        会话(Session):每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

        主题名(Topic Name):连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。需要注意的是, MQTT 中消息主题按照层级命名,使用 ‘/’ 进行分割。此外,主题中可以使用通配符进行多个主题或多层级的订阅,有两种常见的通配符:

        单层通配符 + :单层通配符只能匹配一层的主题,例如: China/Beijing/+ ,可以匹配的只有 Beijing 这个主 题下面一层的主题,例如 Xicheng, DongCheng, Xuanwu 等等。

        多层通配符 # :顾名思义,多层通配符就是可以匹配多个层级的主题,例如: China/# ,可以匹配到的主题可能有:China/Beijing/Dongcheng, China/Shanghai/PuDong ,等等。

        主题筛选器(Topic Filter):一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

        消息订阅:消息订阅者所具体接收的内容。

1.3、MQTT中的角色

        Publisher 和 Subscriber 为客户端,Broker 为服务器端,消息主题为消息类型,Broker 根据 Topic 过滤消息,并将消息向客户端推送。

MQTT 中用 QoS 表示服务质量, MQTT 协议中有三种服务质量 (QoS) :

        1 ) QoS =0 ,至多一次,可能会出现丢包的情况,使用在对实时性要求不高的情况,例如,将此服务质量与通信环境传感器数据一起使用。 对于是否丢失个别读取或是否稍后立即发布新的读取并不重要。

        2 ) QoS =1, 至少一次,保证包会到达目的地,但是可能出现重包。

        3 ) QoS =2, 刚好一次,保证包会到达目的地,且不会出现重包的现象。

客户端:

        Publisher 和 Subscriber 都属于客户端。

        发布应用消息给其它相关的客户端。

        订阅以请求接受相关的应用消息。

        取消订阅以移除接受应用消息的请求。

        从服务端断开连接

服务器:

        服务器端即所谓的 MQTT Broker 服务器。

        接受来自客户端的网络连接。

        接受客户端发布的应用消息。

        处理客户端的订阅和取消订阅请求。

        转发应用消息给符合条件的已订阅客户端。

MQTT 提供的公共服务器端( Broker )有:

test.mosquitto.orgbroker.hivemq.comiot.eclipse.org

2、基于公共服务示例

        在次,选择使用EMQX提供的免费MQTT公共服务器,但同样可以选择其他任何MQTT broker。The Free Global Public MQTT Broker | Try Now | EMQ (emqx.com)

Broker: broker.emqx.io 
TCP Port: 1883 
Websocket Port: 8083

python库:pip install paho-mqtt=="1.6.1"

消息发布代码,pub.py

import time
import random
from paho.mqtt import client as mqtt_clientbroker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'def connect_mqtt():def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)client = mqtt_client.Client(client_id)client.on_connect = on_connectclient.connect(broker, port)return clientdef publish(client):msg_count = 0while True:time.sleep(1)msg = f"messages: {msg_count}"result = client.publish(topic, msg)# result: [0, 1]status = result[0]if status == 0:print(f"Send `{msg}` to topic `{topic}`")else:print(f"Failed to send message to topic {topic}")msg_count += 1def run():client = connect_mqtt()client.loop_start()publish(client)if __name__ == '__main__':run()

运行打印结果:

消息订阅代码,sub.py:

import random
from paho.mqtt import client as mqtt_clientbroker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'def connect_mqtt() -> mqtt_client:def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)client = mqtt_client.Client(client_id)client.on_connect = on_connectclient.connect(broker, port)return clientdef subscribe(client: mqtt_client):def on_message(client, userdata, msg):print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")client.subscribe(topic)client.on_message = on_messagedef run():client = connect_mqtt()subscribe(client)client.loop_forever()if __name__ == '__main__':run()

运行打印结果:

        可以看到,发布和订阅成功。注意:每个客户端的ID唯一,不能重复!

3、基于Apollo服务示例

3.1、安装Apollo服务

        服务器端搭建:

Index of /dist/activemq/activemq-apollo/1.7.1

解压并打开目标如下:

        Apollo中间件其实是免安装的,我们只需要下载apache-apollo-1.7.1-windows-distro.zip,然后解压到某个文件夹就可以了。在这里我解压到D:\dist\apache-apollo-1.7.1。在apache-apollo-1.7.1/bin目录下打开终端执行命令:apollo create myapollo

        若出现如下错误 Loading configuration file ‘D:\phpStudy\apache-apollo-1.7.1\bin\mybroker\etc\apollo.xml’.Startup failed: java.lang.NoClassDefFoundError: javax/xml/bind/ValidationEventHandler,处理方式:换上jdk1.8版本即可。

        然后在生成的myapollo目录的bin目录下执行:pollo-broker.cmd run,结果如下:

        服务搭建成功,进入后台管理,打开网页,输入ip + : 61680 进入后台管理 ,默认用户名admin 密码 password,例如:http://127.0.0.1:61680

3.2、示例代码

本地服务基础信息:

host="127.0.0.1" 
port = 61613 
用户名:admin 
密码:password

python库:pip install paho-mqtt=="1.6.1"

发布主题,publish.py

import sys
import time
import paho.mqtt.client as mqttdef on_connect(client, userdata, flags, rc):print("Connected with result code " + str(rc))def on_subscribe(client, userdata, mid, granted_qos):print("消息发送成功")client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.connect(host="127.0.0.1", port=61613, keepalive=60)  # 订阅频道
time.sleep(1)i = 0
while True:try:# 发布MQTT信息sensor_data = "test" + str(i)client.publish(topic="public", payload=sensor_data, qos=0)time.sleep(5)i += 1except KeyboardInterrupt:print("EXIT")client.disconnect()sys.exit(0)

订阅主题,subscribe.py

import time
import paho.mqtt.client as mqtt# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):if rc == 0:print("连接成功")print("Connected with result code " + str(rc))def on_message(client, userdata, msg):print(msg.topic + " " + str(msg.payload))client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect(host="127.0.0.1", port=61613, keepalive=60)  # 订阅频道
time.sleep(1)# client.subscribe("public")
client.subscribe([("public", 0), ("test", 2)])  # 订阅
client.loop_forever()

执行脚本后,正确大小消息。浏览器可见连接记录。

4、EMQX客户端工具

下载地址:MQTTX 下载选择适合您的平台,立即开始使用 MQTTX。icon-default.png?t=N7T8https://mqttx.app/zh/downloads

4.1、公共服务测试消息接收

创建连接:

Host:为代码中定义好的 broker.emqx.io 
Port:为代码中定义好的 1883 
用户名、密码根据需要添加

添加订阅:

主题为:/flask/mqtt

在MQTTX中发布消息:

测试成功。

4.2、自建服务测试消息接收

MQTT版本选择3.1.1,以下参数,与服务保持一致:

host="127.0.0.1" 
port = 61613 
用户名:admin 
密码:password

其它测试步骤,一样。(创建主题,测试)

5、EMQX代理服务器

        windows下搭建MQTT代理服务,与Apollo服务功能一样,更方便好用,下载地址:https://www.emqx.io/zh/downloads

        下载后解压,进入目录bin文件下,执行:emqx start 

        打开浏览器输入 http://localhost:18083 进入EMQ的web控制台,输入用户名:admin 密码:public 进行登录。登录进入后,界面如下:

至此,代理服务器已经创建完成!客户端就可以连接代理服务器进行消息的分发和订阅!

附录:

java1.8.1下载,地址:Java Downloads | Oracleicon-default.png?t=N7T8https://www.oracle.com/java/technologies/downloads/#java8-windows

参考:

1、嵌入式QT- QT使用MQTT

嵌入式QT- QT使用MQTT_qt mqtt-CSDN博客

2、Python实现通信协议(mqtt)5星,mqtt flask

【小沐学Python】Python实现通信协议(mqtt)_python mqtt-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/8592.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

联发科天玑AI开发套件亮相:开发智能终端生成式AI应用的全面工具

在今日召开的天玑开发者大会2024(MDDC 2024)上,联发科向外界展示了其最新力作——天玑AI开发套件。该套件是为合作伙伴提供终端生成式AI应用开发的一站式工具包,意在简化并加速开发过程。 联发科推出的天玑AI开发套件包括四大核心…

去除图片水印软件-inpaint

一、普通使用教程 亲眼看看使用 Inpaint 从照片中删除不需要的元素是多么容易: 1.1加载图片 1.2 选择要纠正的问题区域 1.3 告别不需要的对象并保存 二、功能 1 修复旧照片 老并不总是意味着坏。我们拥有的一些旧照片对我们来说仍然很重要,因为它们仍…

minio上传文件失败如何解决

1. 做了什么操作 通过接口上传excel文件,返回响应值 2. 错误如图 2. 如何解决 根据错误描述定位到了部署minio的地方minio通过docker部署,找到docker - compose发现配置文件中minio有两个端口,一个是用于api的,一个是用于管理界面…

Dell EMC Storage Unity: Remove/Install Memory Module

SP A 一个内存故障 点击system view -> Enclosures->Top查看 再次查看Alert, 确认内存出现问题 进入Service , 将SP A置为service状态 移出SP A ,进行内存更换 更换完内存后,将SP A插入设备,并进行线缆连接 进入…

6层板学习笔记2

说明:笔记基于6层全志H3消费电子0.65MM间距BGA 67、多层板的电源建议直接大面积铺铜,不建议走线,铺铜充分满足其载流能力 68、凡亿推荐表层1OZ的铜厚线宽20MIL能承载1A的电流,内层0.5OZ的铜厚线宽为40MIL能承载1A的电流,过孔直径20MIL(0.5MM)能承载1A左右的电流,实际设…

Django项目中的Nginx+uWSGI

Django项目中的NginxuWSGI部署 配合另一篇博客共同饮用Django项目服务器部署(2024最新) 一:Nginx uWSGI部署框架 用户浏览器向nginx发送请求,nginx判断请求是动态海事静态,如果是静态请求,则直接返回静态…

(五)JSP教程——response对象

response对象主要用于动态响应客户端请求(request),然后将JSP处理后的结果返回给客户端浏览器。JSP容器根据客户端的请求建立一个默认的response对象,然后使用response对象动态地创建Web页面、改变HTTP标头、返回服务器端地状态码…

c++ socket基于TCP

linux网络编程基础api socket 地址api:ip地址和端口对,成为 soccket 地址。 socket 基础api: sys/socket.h 中,包括创建、命名、监听 socket ;接受连接、发起连接、读写数据、获取地址信息、检测带外标记、读取设置 s…

unity ui 同屏

一共有三个摄像机,上屏,下屏 和 类似照相机的ccamera 类似照相机的ccamera的设置: 下屏摄像机设置: 下屏交互的Canvas设置: 新建一个canvas,下面放上rawimage: 如果下屏不想显示的内容&#xf…

创建和管理数据库

1. 一条数据的存储过程 存储数据是处理数据的第一步.只有正确的把数据存储起来,我们才能进行有效的处理和分析.否则,只能是一团乱麻.在MySQL中,一个完整的数据存储过程一共有四步 : 创建数据库,确认字段,创建数据表&a…

【练习3】

1.将二叉搜索树转为排序的双向链表 (好久没看数据结构,忘完了,学习大佬的代码) class Solution { public:Node* prenullptr,*headnullptr; //pre为每次遍历时的前一个节点,head记录头节点Node* treeToDoublyList(Node* root) {if…

漏扫神器Invicti V2024.4.0专业版

前言 Invicti Professional是Invicti Security公司推出的一个产品,它是一种高级的网络安全扫描工具。Invicti Professional旨在帮助组织发现和修复其网络系统中的潜在安全漏洞和弱点。它提供了全面的漏洞扫描功能,包括Web应用程序和网络基础设施的漏洞扫…

xhs 旋转滑块流程分析

声明 本文章中所有内容仅供学习交流,抓包内容、敏感网址、数据接口均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关,若有侵权,请联系我立即删除! 前言 本文首发于公众号…

近似消息传递算法(AMP)单测量模型(SMV)

1、算法解决问题 很多人致力于解决SLM模型的求逆问题,即知道观测值和测量矩阵(字典之类的),要求未知变量的值。SLM又叫做标准线性模型,后续又在此基础上进行升级变为广义线性模型。即SLM是yAxe,这里是线性…

STM32单片机实战开发笔记-独立看门狗IWDG

嵌入式单片机开发实战例程合集: 链接:https://pan.baidu.com/s/11av8rV45dtHO0EHf8e_Q0Q?pwd28ab 提取码:28ab IWDG模块测试 1、功能描述 STM32F10X内置两个看门狗,提供了更高的安全性,时间的精确下性和使用的灵活性…

【JAVA基础之时间API】自定义时间格式

🔥作者主页:小林同学的学习笔录 🔥mysql专栏:小林同学的专栏 目录 1.Date类 1.1 概述 1.2 构造方法 1.3 常用方法 2.SimpleDateFormat类 2.1 概述 2.2 构造方法 2.3 格式规则 2.4 常用方法 3.Calendar类 3.1 概述…

SparkSQL优化

SparkSQL优化 优化说明 缓存数据到内存 Spark SQL可以通过调用spark.sqlContext.cacheTable("tableName") 或者dataFrame.cache(),将表用一种柱状格式( an inmemory columnar format)缓存至内存中。然后Spark SQL在执行查询任务…

PotPlayer v1.7.22218 全格式影音播放器,无广绿色版!

软件介绍 PotPlayer是一款多功能且免费的媒体播放软件,兼容多种音频和视频格式。提供了丰富的功能性以及个性化设置,以迎合不同用户的需求。友好的用户界面,允许用户自定义皮肤和快捷键,提升了操作的便利性。 此外,Po…

DenseCLIP环境配置

直接看raoyongming/DenseCLIP: [CVPR 2022] DenseCLIP: Language-Guided Dense Prediction with Context-Aware Prompting (github.com) 但这里的环境配置可能和现在不太适配,自己配了好久没弄好 后面尝试了另外的版本的(但这个版本少了一些内容&#…

力扣打卡第二天

206. 反转链表 class Solution { public:ListNode* reverseList(ListNode* head) {// //迭代法// ListNode *pre nullptr;// ListNode *curr head;// while(curr){// ListNode *next curr -> next;// curr -> next pre;// pre curr;// curr next;/…