Python MQTT 连接新版ONENet
简介
前几个教程我们使用mqtt.fx连接了新版的ONENet, 只是跑通了MQTT协议,但是在实际操作下还需要实现具体环境、具体设备的MQTT连接,本章教程将以Python MQTT的方式连接 ONENet
参考文档:
paho-mqtt · PyPI
OneNET - 中国移动物联网开放平台 (10086.cn)
准备环境
pip安装 paho-mqtt
pip install paho-mqtt
获取ONENet 三元组
准备好Onenet的三元组
三元组分别为
DeviceName=“wenshidap” #设备ID
Productid = “kuerSLKlo8” #产品ID
accesskey=“QUR4cEhqcVRCSHowQ01QdXE3QTVIUmRDblpoOHU1aFk=”#秘钥
根据ONENet手册的文档说明,mqtt连接onenet需要进行鉴权,可以访问链接查看鉴权算法,但是我们可以根据鉴权算法说明生成鉴权的秘钥
使用Pyhon生成的鉴权秘钥的函数为
#认证token生成函数
def get_token(id,access_key):version = '2018-10-31'# res = 'products/%s' % id # 通过产品ID访问产品API# res = 'userid/%s' % id # 通过产品ID访问产品APIres="products/"+ Productid + "/devices/" + DeviceName# 用户自定义token过期时间et = str(int(time.time()) + 36000000)# et = str(int(1722499200))# 签名方法,支持md5、sha1、sha256method = 'sha1'method1 = 'sha256'# 对access_key进行decodekey = base64.b64decode(access_key)# 计算signorg = et + '\n' + method+ '\n' + res + '\n' + versionsign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)sign = base64.b64encode(sign_b.digest()).decode()# value 部分进行url编码,method/res/version值较为简单无需编码sign = quote(sign, safe='')res = quote(res, safe='')# token参数拼接token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)return token
MQTT订阅和发布Topic说明
我们可以点击产品开发->设备开发->topic管理->数据流topic来查看当前设备可以订阅哪些topic,在这里我们是上传数据流,所以我们只关心发布的topic 和订阅上传成功和上传失败的topic
发布的topic :$sys/kuerSLKlo8/{device-name}/dp/post/json
只需要将我们的device-name换成当前的devicename即可 在本项目就是wenshidap
即:$sys/kuerSLKlo8/wenshidap/dp/post/json
订阅的topic:
上传成功:$sys/kuerSLKlo8/{device-name}/dp/post/json/accepted
上传失败:$sys/kuerSLKlo8/{device-name}/dp/post/json/rejected
当我们订阅上传成功topic时,数据流上传成功后就会返回msg的id 失败时 reject topic就会返回失败的原因
MQTT连接ONENet主程序
import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTv311
import struct
import json
import base64
import hmac
import time
from urllib.parse import quoteServerUrl = "mqtts.heclouds.com" #服务器url
ServerPort = 1883#服务器端口
DeviceName="wenshidap" #设备ID
Productid = "kuerSLKlo8" #产品ID
accesskey="QUR4cEhqcVRCSHowQ01QdXE3QTVIUmRDblpoOHU1aFk="# 发布的topic
Pub_topic1 = "$sys/"+Productid+"/"+ DeviceName+"/dp/post/json"#需要订阅的topic
#数据上传成功的消息
Sub_topic1 = "$sys/"+Productid+"/"+DeviceName+"/dp/post/json/accepted"
#接收数据上传失败的消息
Sub_topic2 = "$sys/"+Productid+"/"+DeviceName+"/dp/post/json/rejected"#测试用json数据格式
jsonstr = "{\"id\": 123,\"dp\": {\"ConEnv_Temp\": [{\"v\": 22.1}],\"ConEnv_Humi\": [{\"v\": 61.2}]}}"#认证token生成函数
def get_token(id,access_key):version = '2018-10-31'# res = 'products/%s' % id # 通过产品ID访问产品API# res = 'userid/%s' % id # 通过产品ID访问产品APIres="products/"+ Productid + "/devices/" + DeviceName# 用户自定义token过期时间et = str(int(time.time()) + 36000000)# et = str(int(1722499200))# 签名方法,支持md5、sha1、sha256method = 'sha1'method1 = 'sha256'# 对access_key进行decodekey = base64.b64decode(access_key)# 计算signorg = et + '\n' + method+ '\n' + res + '\n' + versionsign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)sign = base64.b64encode(sign_b.digest()).decode()# value 部分进行url编码,method/res/version值较为简单无需编码sign = quote(sign, safe='')res = quote(res, safe='')# token参数拼接token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)return tokendef on_subscribe(client, userdata, mid, reason_code_list, properties):# Since we subscribed only for a single channel, reason_code_list contains# a single entryif reason_code_list[0].is_failure:print(f"Broker rejected you subscription: {reason_code_list[0]}")else:print(f"Broker granted the following QoS: {reason_code_list[0].value}")def on_unsubscribe(client, userdata, mid, reason_code_list, properties):# Be careful, the reason_code_list is only present in MQTTv5.# In MQTTv3 it will always be emptyif len(reason_code_list) == 0 or not reason_code_list[0].is_failure:print("unsubscribe succeeded (if SUBACK is received in MQTTv3 it success)")else:print(f"Broker replied with failure: {reason_code_list[0]}")client.disconnect()# 当客户端收到来自服务器的CONNACK响应时的回调。也就是申请连接,服务器返回结果是否成功等
def on_connect(client, userdata, flags, reason_code, properties):if reason_code.is_failure:print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")else:# we should always subscribe from on_connect callback to be sure# our subscribed is persisted across reconnections.# client.subscribe("$SYS/#")print("连接结果:" + mqtt.connack_string(reason_code))#连接成功后就订阅topicclient.subscribe(Sub_topic1)client.subscribe(Sub_topic2)# 从服务器接收发布消息时的回调。
def on_message(client, userdata, message):print(str(message.payload,'utf-8'))#当消息已经被发送给中间人,on_publish()回调将会被触发
def on_publish(client, userdata, mid):print(str(mid))def main():passw=get_token(DeviceName,accesskey)print(passw)mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,DeviceName)mqttc.on_connect = on_connectmqttc.on_message = on_messagemqttc.on_subscribe = on_subscribemqttc.on_unsubscribe = on_unsubscribe# client = mqtt.Client(DeviceName,protocol=MQTTv311)#client.tls_set(certfile='/Users/mryu/PycharmProjects/MyProject/onenet/MQTTS-certificate.pem') #鉴权证书mqttc.connect(ServerUrl, port=ServerPort, keepalive=120)mqttc.username_pw_set(Productid,passw)mqttc.loop_start()while(1):mqttc.publish(Pub_topic1,jsonstr,qos=0)print("okk")time.sleep(2)if __name__ == '__main__':main()
运行测试一下
可以看到我们订阅的topic 返回了我们消息的ID 123 说明我们的数据上传成功 ,平台上也可以看到我们的数据流