python实现MQTT协议(发布者,订阅者,topic)

python实现MQTT协议

一、简介

1.1 概述

本文章针对物联网MQTT协议完成python实现

1.2 环境

  • Apache-apollo创建broker
  • Python实现发布者和订阅者

1.3 内容

  • MQTT协议架构说明 :

  • 利用仿真服务体会 MQTT协议

  • 针对MQTT协议进行测试

任务1:MQTT协议应用场景

说明: MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、 易于实现。这些特点使它适用于受限环境。该协议的特点有:

1 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2 对负载内容屏蔽的消息传输。
3 使用 TCP/IP 提供网络连接。

物联网应用场景:

image-20230901160722558

协议角色分工:

客户端分为2种角色:发布者(Publisher)和订阅者(Subscriber)。
每一个发布者(Publisher)可以发送不同类型的消息,我们把消息的类型叫做主题(topic),
MQTT通信中的消息都属于某一个主题 ,而只有订阅了这个主题的订阅者(Subscriber)才能收到属于这个主题的消息。
发布者和订阅者不需要在意和知道对方的存在(不需要知道对方的IP和端口),也不需要直接与对方建立连接。因为通信中存在着一个叫代理 (MQTT broker)的第三种角色,也可以叫MQTT服务器(MQTT server)。 
发布者、订阅者只需要知道MQTT服务器的IP和端口即可,并和它直接建立连接通信。MQTT代理作为消息的 中转,它过滤所有接受到的消息,并按照一定的机制(MQTT标准规定是基于主题的消息过滤派发方式,而具 体的MQTT服务器软件也提供了其他的派发方式)分发它们,使得所有注册到MQTT代理的订阅者只接收到他 们订阅了的消息,而不会收到他不关心的消息。
当发布者发布一条消息的时候,他必须同时指定消息的主题和消息的负载。MQTT代理在收到发布者发过来的 消息时,无需访问消息负载,他只是访问消息的主题信息,然后根据这主题派发给订阅者。需要注意的是,一个客户端可以同时既当发布者又当订阅者。比如一个开发板连接了一盏LED灯,它可以发布灯的暗/亮状态 信息,也可以从其他节点订阅对灯的控制消息。

3.生产者(发布消息)和消费者(消耗消息-订阅者)模式理解

生产者:wifi设备采集各种物联网传感器比如温度重力传感器
消费者:客户端比如手机
原理如下:

image-20230901095257410

任务2:搭建Mqtt协议服务 (broker)

前提:安装JDK和JAVA_HOME环境变量

1 下载Apollo服务器 地址 http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/

2 进入bin目录命令行 输入:

D:\softwares\apache-apollo-1.7.1\bin\apollo.cmd create jwbroker

3:broker\etc\apollo.xml文件下是配置服务器信息的文件
初始默认帐号是admin,密码password;

4:启动命令行: (以一个实例为单位进行创建的)

进入... jwbroker创建实例的\bin\ 目录,
在CMD输入命令apollo-broker.cmd run,可以使用TAB键自动补全,运行后输出信息如下:验证:
MQTT服务器TCP连接端口: tcp://0.0.0.0:61613
后台web管理页面:https://127.0.0.1:61681/或http://127.0.0.1:61680/

出现如下图表示启动成功

image-20230901154500857

安装mqtt需要的包:

pip install paho-mqtt

发布者publish创建:

import time
from paho.mqtt import publish
#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))client.subscribe("jw-temperature") # 发布主题def on_message(client,userdata,msg):print(msg.topc +  "消息发送!" + msg.payload.decode("utf-8"))if __name__ == '__main__':print("消息发布!----我是一个发布者:正在给设备和传感器发布主题-----")client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))for i in range(20):time.sleep(2)publish.single("lightChange","现在天黑了", qos = 1, hostname = HOST, port = PORT, client_id = client_id,auth = {'username': "admin", 'password': "password"})print("已发送"+str(i+1)+"条消息")

订阅者light_subcribe创建:

import paho.mqtt.client as mqtt
import time#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].
'''
def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))'''Subscribing in on_connect() means that if we lose the connection and  reconnect then subscriptions wil be renewed(恢复、续订).'''client.subscribe("lightChange") # 订阅主题'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):print(msg.topic+ msg.payload.decode("utf-8") +  ",回调消息:收到收到!我已经接收到发布者的消息,并且打开了光传感器" )def client_loop():'''注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误[WinError 10054] 远程主机强迫关闭了一个现有的连接'''client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/client.on_connect = on_connectclient.on_message = on_message'''拥塞回调:处理网络流量,调度回调和重连接。Blocking call that processes network traffic, dispatches callbacks and handles reconnecting.Other loop*() functions are available that give a threaded interface and amanual- interface...I'''try:client.connect(HOST,PORT,60)client.loop_forever()except KeyboardInterrupt:client.disconnect()if __name__ == '__main__':print("手电筒打开----我是一个订阅者:需要消费主题-----")client_loop()

订阅者phone_subcribe创建:

import paho.mqtt.client as mqtt
import time#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613
'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].'''
def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))'''Subscribing in on_connect() means that if we lose the connection and  reconnect then subscriptions wil be renewed(恢复、续订).'''client.subscribe("lightChange") # 订阅主题'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):print(msg.topic  + msg.payload.decode("utf-8")+  ",回调消息:收到收到!我已经接收到发布者的消息并给用户反馈手电筒已经打开")def client_loop():'''注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误[WinError 10054] 远程主机强迫关闭了一个现有的连接'''client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/client.on_connect = on_connectclient.on_message = on_message'''拥塞回调:处理网络流量,调度回调和重连接。Blocking call that processes network traffic, dispatches callbacks and handles reconnecting.Other loop*() functions are available that give a threaded interface and amanual- interface...I'''try:client.connect(HOST,PORT,60)client.loop_forever()except KeyboardInterrupt:client.disconnect()if __name__ == '__main__':print("手机启动----我是一个订阅者:需要消费主题-----")client_loop()

演示:分别运行publish和light_subcribe和phone_subcribe

publish:

image-20230901160129432

light_subcribe:

image-20230901160148825

phone_subcribe:

image-20230901160158514

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/66691.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

maven搭建spring项目

前提 安装jdk 安装maven 安装eclipse 创建maven项目 搭建spring项目 pom.xml <dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.0.4.RELEASE</version> </dependency&…

【java 入侵 C# 之路】1-入门

感谢 https://www.cnblogs.com/mww-NOTCOPY/p/12213373.html 百度百科 jvm对应clr java se runtime对应 .net framework&#xff0c; jdk对应 .net framework sdk&#xff0c; java对应C# .NET 是开发者平台&#xff0c;它包含开发环境、技术框架、社区论坛、服务支持等&…

学习pytorch8 土堆说卷积操作

土堆说卷积操作 官网debug torch版本只有nn 没有nn.functional代码执行结果 B站小土堆视频学习笔记 官网 https://pytorch.org/docs/stable/nn.html#convolution-layers 常用torch.nn, nn是对nn.functional的封装&#xff0c;使函数更易用。 卷积核从输入图像左上角&#xf…

Bito使用手册

第一步&#xff1a;输入网站 https://alpha.bito.co/bitoai/ 第二步&#xff1a;填写邮箱 第三步&#xff1a;登录邮箱&#xff0c;获取验证码 第四步&#xff1a;填写验证码 第五步&#xff1a;完成

【LeetCode-中等题】994. 腐烂的橘子

文章目录 题目方法一&#xff1a;bfs层序遍历 题目 该题值推荐用bfs&#xff0c;因为是一层一层的感染&#xff0c;而不是一条线走到底的那种&#xff0c;所以深度优先搜索不适合 方法一&#xff1a;bfs层序遍历 广度优先搜索&#xff0c;就是从起点出发&#xff0c;每次都尝…

UG\NX CAM二次开发 查询工序所在的几何组TAG UF_OPER_ask_geom_group

文章作者:代工 来源网站:NX CAM二次开发专栏 简介: UG\NX CAM二次开发 查询工序所在的几何组TAG UF_OPER_ask_geom_group 效果: 代码: void MyClass::do_it() { int count=0;tag_t * objects;UF_UI_ONT_ask_selected_nodes(&count, &objects);for (in…

linux 下安装配置nexus

一、安装包获取方式 方式一 1、直接把下载好的安装包上传到服务器中 方式二 2、通过wget安装Nexus压缩包 ①、可以使用以下命令进行安装Nexus的最新版本 wget https://download.sonatype.com/nexus/3/latest-unix.tar.gz②、也可以点击官网复制想要下载的Nexus压缩包进行安装…

【Linux系列】离线安装openjdk17的rpm包

首发博客地址 首发博客地址[1] 系列文章地址[2] 视频地址[3] 准备 RPM 包 请从官网下载&#xff1a;https://www.oracle.com/java/technologies/downloads/#java17[4] 如需不限速下载&#xff0c;请关注【程序员朱永胜】并回复 1020 获取。 安装 yum localinstall jdk-17_linux…

C++智能指针之weak_ptr(保姆级教学)

目录 C智能指针之weak_ptr 概述 作用 本文涉及的所有程序 使用说明 weak_ptr的常规操作 lock(); use_count(); expired(); reset(); shared_ptr & weak_ptr 尺寸 智能指针结构框架 常见使用问题 shared_ptr多次引用同一数据&#xff0c;会导致两次释放同一内…

没有使用sniffer dongle在windows抓包蓝牙方法分享

网上很多文章都是介绍买一个sniffer dongle来抓蓝牙数据,嫌麻烦又费钱,目前找到一个好方法,不需要sniffer就可以抓蓝牙数据过程,现分享如下: (1)在我资源附件找到相关安装包或者查看如下链接 https://learn.microsoft.com/zh-cn/windows-hardware/drivers/bluetooth/testing-bt…

【Python】批量下载页面资源

【背景】 有一些非常不错的资源网站,比如一些MP3资源网站。资源很丰富,但是每一个资源都不大,一个一个下载费时费力,想用Python快速实现可复用的批量下载程序。 【思路】 获得包含资源链接的静态页面,用beautifulsoup分析页面,获得所有MP3资源的实际地址,然后下载。…

安卓逆向 - Frida反调试绕过

本文仅供学习交流&#xff0c;只提供关键思路不会给出完整代码&#xff0c;严禁用于非法用途&#xff0c;谢绝转载&#xff0c;若有侵权请联系我删除&#xff01; 本文案例 app&#xff1a;5Lqs5LicYXBwMTEuMy4y 一、引言&#xff1a; Frida是非常优秀的一款 Hook框架&#…

uni-app:允许字符间能自动换行(英文字符、数字等)

<template><view class"container"><!-- 这里是你的文本内容 -->{{ multilineText }}</view> </template><style> .container {word-break: break-all; } </style>例如&#xff1a; <template><view class"…

jQuery成功之路——jQuery的DOM操作简单易懂

jQuery的DOM操作 1.jQuery操作内容 jQuery操作内容 1. text() 获取或修改文本内容 类似于 dom.innerText 2. html() 获取或修改html内容 类似 dom.innerHTML 注意: 1. text() 是获取设置所有 2. html() 是获取第一个,设置所有 <!DOCTYPE html> <html lang"zh…

Android学习之路(13) Handler详解

1. 简介 Handler是一套 Android 消息传递机制,主要用于线程间通信。 用最简单的话描述&#xff1a; handler其实就是主线程在起了一个子线程&#xff0c;子线程运行并生成Message&#xff0c;Looper获取message并传递给Handler&#xff0c;Handler逐个获取子线程中的Message.…

Rstudio开不开了怎么办?R is taking longer to start than usual

Rstudio Server 启动时卡死 在使用 linux 服务器版 RstudioServer 的过程中&#xff0c;发现出现了一个问题&#xff0c;导致没有办法正常载入工作页面&#xff0c;网页提示信息是“R is taking longer to start than usual”&#xff0c;直接翻译过来就是“这次启动 R 会比平…

淘宝/天猫获得淘宝商品详情 API 接口文档

item_get-获得淘宝商品详情 API测试工具 注册开通 taobao.item_get 公共参数 名称类型必须描述keyString是调用key&#xff08;必须以GET方式拼接在URL中&#xff09;secretString是调用密钥api_nameString是API接口名称&#xff08;包括在请求地址中&#xff09;[item_sear…

cms系统稳定性压力测试出现TPS抖动和毛刺的性能bug【杭州多测师_王sir】

一、并发线程数100&#xff0c;分10个阶梯&#xff0c;60秒加载时间&#xff0c;运行1小时进行压测&#xff0c;到10分钟就出现如下 二、通过jstat -gcutil 16689 1000进行监控

机器学习——决策树与随机森林

机器学习——决策树与随机森林 文章目录 前言一、决策树1.1. 原理1.2. 代码实现1.3. 网格搜索1.4. 可视化决策树 二、随机森林算法2.1. 原理2.2. 代码实现 三、补充&#xff08;过拟合与欠拟合&#xff09;总结 前言 决策树和随机森林都是常见的机器学习算法&#xff0c;用于分…

牛客网刷题

牛客网刷题-C&C 2023年9月3日15:58:392023年9月3日16:37:01 2023年9月3日15:58:39 2023年9月3日16:37:01 整型常量和实型常量的区别