Step1. mqtt消息注册及处理
使用python来做:
import paho.mqtt.client as mqtt
import mqtt_msghub as mqtt_msghub  # MQTT payloads are handled in this module.

# MQTT broker connection settings.
# NOTE: these assignments must sit on their own lines; when fused into the
# comment above, `broker` is never defined and connect() fails with NameError.
broker = '192.168.0.16'
port = 1883
#topic = 'sensor/shake/measure/1'
username = "xxxxx"
password = "xxxxx"

# Connection callback.
def on_connect(client, userdata, flags, rc):
    """Report the result of a connection attempt on stdout.

    Args:
        client: The MQTT client instance that invoked the callback (unused).
        userdata: User data attached to the client (unused).
        flags: Connection flags passed by the caller (unused).
        rc: Connection result code; 0 is treated as success, any other
            value as a failure and is echoed in the printed message.
    """
    if rc == 0:
        print("连接成功")
    else:
        print("连接失败,错误码:" + str(rc))


# Message-received callback.
def on_message(client, userdata, msg):
    """Print the topic of a received message and forward it to the message hub.

    Args:
        client: The MQTT client instance that invoked the callback; passed
            through to ``mqtt_msghub.mqtt_dealmsg``.
        userdata: User data attached to the client (unused).
        msg: The received message; its ``topic`` and ``payload`` attributes
            are read.
    """
    print("收到消息:")
    print("主题:" + msg.topic)
    # The payload may be in a binary format, so it is neither decoded to str
    # nor printed here; it is handed to the message hub as raw bytes.
    mqtt_msghub.mqtt_dealmsg(client, msg.topic, msg.payload)


# Create the MQTT client object.
# Build the MQTT client and give it the broker credentials.
client = mqtt.Client()
client.username_pw_set(username, password)

# Install the message-received and connection callbacks.
client.on_message = on_message
client.on_connect = on_connect

# Connect to the MQTT broker.
client.connect(broker, port)

# Subscribe to the topics; the topic list lives in the message-hub module.
mqtt_msghub.mqtt_topic_register(client)

# Listen for messages in a blocking loop; press Ctrl+C to interrupt and exit.
client.loop_forever()
Step1.1 看看topic注册及消息处理部分
from xml.dom import registerDOMImplementation
import payload_type_exchange as typeExchange
import paho.mqtt.client as mqtt
import table_common_table_crud as dbhelper
import gp_mysql_server as gpmysql  # Provides the MySQL connection (gpDbConn).

# Maps an MQTT topic pattern to the pattern of the database table that stores
# its messages. The trailing "%d" of the topic is the channel number, which is
# substituted into the "%02d" of the table-name pattern.
mqtt_topics = {
    "sensor/shake/xx/%d": "d_shake_xxx_ch%02d",
    "sensor/shake/yyy/%d": "d_shake_yyy_ch%02d",
}


def mqtt_topic_register(client):
    """Subscribe ``client`` to every topic pattern in ``mqtt_topics``.

    Each ``%d`` placeholder in a pattern is replaced by the MQTT single-level
    wildcard ``+`` before subscribing, and the subscribed topic is printed.
    """
    for pattern in mqtt_topics:
        topic = pattern.replace('%d', '+')
        client.subscribe(topic)
        print("mqtt register:[%s]" % topic)


def mqtt_get_topic_related_db_table(client, topic):
    """Resolve the database table name associated with ``topic``.

    A topic that is literally a key of ``mqtt_topics`` maps to its value
    unchanged. Otherwise the topic matches a pattern when all "/"-separated
    segments except the last are equal; the last segment is then parsed as
    the integer channel number and substituted into the table pattern.

    Args:
        client: Unused; kept for interface compatibility.
        topic: The concrete topic string of a received message.

    Returns:
        ``(0, table_name)`` on success, or ``(-1, "")`` when no pattern
        matches or the channel segment is not an integer.
    """
    if topic in mqtt_topics:
        return (0, mqtt_topics[topic])

    parts = topic.split("/")
    try:
        channel = int(parts[-1])
    except ValueError:
        # A non-numeric channel cannot be formatted into any table pattern;
        # report "no table" instead of raising inside the MQTT callback.
        return (-1, "")

    topic_prefix = parts[:-1]
    for pattern, table_pattern in mqtt_topics.items():
        if pattern.split("/")[:-1] == topic_prefix:
            return (0, table_pattern % channel)
    return (-1, "")


def getdbtable_macrotype(dbtablename):
    """Return the "_"-separated parts of ``dbtablename`` without the last one.

    For example ``"d_shake_xxx_ch01"`` yields ``["d", "shake", "xxx"]``.
    The result is a list, which callers compare against a split type name.
    """
    return dbtablename.split("_")[:-1]


def mqtt_dealmsg(client, topic, payload):
    """Store a received message in its database table, if it has one.

    The message is silently dropped when the topic has no associated table
    or when the payload type of that table is not recognised.
    """
    # Check whether this message needs to be saved to the database at all.
    (ret, dbtable) = mqtt_get_topic_related_db_table(client, topic)
    if ret != 0:
        return

    # Convert the payload into the dictionary used to build the insert.
    (ret, db_dict) = payload_to_dictOfDbField(payload, dbtable)
    if ret != 0:
        return

    # Hand the dictionary to the insert helper together with a connection.
    dbhelper.insert_data(gpmysql.gpDbConn(), dbtable, db_dict)


def payload_to_dictOfDbField(payload, dbtable):
    """Convert ``payload`` into a field dictionary for table ``dbtable``.

    The converter is chosen from the table's type, i.e. the table name
    without its last "_"-separated part.

    Returns:
        ``(0, dict)`` when a converter exists for the table type, otherwise
        ``(-1, "unknow payload")`` after printing a notice.
    """
    dbtable_class = getdbtable_macrotype(dbtable)
    if dbtable_class == "d_shake_envelope".split("_"):
        # Each payload type has its own independent conversion function.
        return (0, typeExchange.payload_to_d_shake_envelope(payload))

    print("unknown payload:%s, ommited to save to DB!" % dbtable)
    return (-1, "unknow payload")
Step2 通用的 MySQL 写入辅助函数
代码中包含有两类payload的数据库入库接口,对于json格式,比较容易处理。顶多做一个名称映射表。
对于二进制格式,需要先将二进制数据转换为结构化的数据,然后才能入库。因为dbtable是个二维对象(行和列),一行数据的最佳载体是 Python 的字典(dict)。
这里只给出了CRUD中的C(Create)和R(Read);U(Update)和D(Delete)的代码可依此类推。
def _sql_literal(value):
    """Render ``value`` as a quoted SQL string literal (``NULL`` for None)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        # Store booleans as 1/0 rather than the text "True"/"False".
        value = int(value)
    elif isinstance(value, (dict, list)):
        # Nested JSON values are stored as their JSON text.
        value = json.dumps(value)
    # Escape backslashes and single quotes so the value cannot terminate the
    # literal early (assumes MySQL's default backslash-escape mode).
    text = str(value).replace("\\", "\\\\").replace("'", "''")
    return "'" + text + "'"


def json_to_mysql_insert(json_obj, table_name):
    """Build a complete ``INSERT`` statement with the values inlined.

    Args:
        json_obj: A JSON-serialisable mapping of column name to value.
        table_name: Name of the target table. It is interpolated verbatim,
            as are the column names, so both must come from trusted code.

    Returns:
        The SQL text, e.g. ``INSERT INTO t (a,b) VALUES ('1','2')``.
        Prefer ``dictionary_to_mysql_insert`` with driver-side parameters
        when the values come from untrusted input.
    """
    # Round-trip through JSON text to normalise the input to plain JSON
    # types; json.loads (not json.load) is the string-based decoder.
    data = json.loads(json.dumps(json_obj))
    columns = ','.join(data.keys())
    values = ','.join(_sql_literal(value) for value in data.values())
    return "INSERT INTO {} ({}) VALUES ({})".format(table_name, columns, values)


def dictionary_to_mysql_insert(dictionary, table_name):
    """Build a parameterised ``INSERT`` statement for ``dictionary``.

    Args:
        dictionary: Mapping of column name to value; only the keys and the
            number of entries are used here.
        table_name: Name of the target table (interpolated verbatim).

    Returns:
        The SQL text with one ``%s`` placeholder per column, e.g.
        ``INSERT INTO t (a,b) VALUES (%s,%s)``. The values themselves are
        expected to be supplied separately when the statement is executed.
    """
    columns = ','.join(dictionary.keys())
    placeholders = ','.join(['%s'] * len(dictionary))
    return "INSERT INTO {} ({}) VALUES ({})".format(table_name, columns, placeholders)


# Query operation.
def _to_json_value(item):
    """Convert one column value into something ``json.dumps`` accepts."""
    if isinstance(item, datetime.datetime):
        # Format datetime objects as a fixed-layout string.
        return item.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(item, bytes):
        # Binary columns are summarised by the hex of their first 8 bytes.
        return binascii.hexlify(item[:8]).decode()
    if isinstance(item, decimal.Decimal):
        return item.to_eng_string()
    return item


def query_data(conn, tablename):
    """Read every row of ``tablename`` and return the rows as JSON text.

    Args:
        conn: A database connection whose ``cursor()`` result can be used as
            a context manager and offers ``execute`` and ``fetchall``.
        tablename: Name of the table to read. It is concatenated into the
            SQL text verbatim, so it must come from trusted code.

    Returns:
        A JSON string holding a list of rows, each row being a list of
        column values. Rows may be dictionaries (column name to value) or
        plain sequences; datetime, bytes and Decimal values are converted
        to strings.
    """
    # The with-block closes the cursor, so no explicit close() is needed.
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM " + tablename)
        rows = cursor.fetchall()

    records = []
    for row in rows:
        values = row.values() if isinstance(row, dict) else row
        records.append([_to_json_value(value) for value in values])
    return json.dumps(records)
Step2.1 原始二进制流到Dictionary的转换:
这里没有定义结构体,而是用 struct 模块按固定偏移量直接从二进制流中解析各字段。
from asyncio.windows_events import NULL
from multiprocessing.sharedctypes import Value
from os import name
from pickle import BINBYTES
from sqlite3 import SQLITE_BUSY_SNAPSHOT
import pymysql
import json
import gp_mysql_server as gpmysql
import datetime;
import binascii;
import decimal;
import struct;
from typing import MutableSequence

# The sample block of a shake payload occupies 2 * PT_OF_SENSOR_SHAKE_SAMPLE
# bytes (presumably 2 bytes per sample -- confirm against the sensor spec).
PT_OF_SENSOR_SHAKE_SAMPLE = 2048

# Little-endian, unpadded 40-byte header: version high/low (2 x uint8),
# serial number (uint32), 2-byte type tag, 20-byte sample-time text,
# scale (uint32), centre frequency and band (2 x float32).
_SHAKE_HEADER = struct.Struct("<BBI2s20sIff")
# Four float32 measures that follow the sample block.
_SHAKE_MEASURES = struct.Struct("<4f")


def payload_to_d_shake_envelope(payload):
    """Convert a binary shake-envelope payload into a field dictionary.

    No structure type is defined for the payload; the fields are read
    directly at fixed offsets: a 40-byte header, then the raw sample block,
    then four float32 measures.

    Args:
        payload: The raw message payload as a bytes-like object.

    Returns:
        A dict with the keys ``ver``, ``type``, ``time``, ``scale``, ``fs``,
        ``band``, ``bin_data``, ``rms``, ``ppk``, ``kurtossis``, ``margin``
        and ``sn`` (in that insertion order). ``bin_data`` is the sample
        block left as raw bytes.

    Raises:
        struct.error: If ``payload`` is shorter than header + sample block +
            measures.
        UnicodeDecodeError: If the type or time field is not valid UTF-8.
    """
    samples_start = _SHAKE_HEADER.size
    samples_end = samples_start + 2 * PT_OF_SENSOR_SHAKE_SAMPLE
    required = samples_end + _SHAKE_MEASURES.size
    if len(payload) < required:
        # Fail with one clear message instead of whichever field happens to
        # run off the end of a truncated payload first.
        raise struct.error(
            "shake envelope payload needs at least %d bytes, got %d"
            % (required, len(payload))
        )

    (ver_high, ver_low, sn, type_raw, time_raw,
     scale, freq_center, freq_band) = _SHAKE_HEADER.unpack_from(payload, 0)
    bin_data = payload[samples_start:samples_end]
    rms, ppk, kurtosis, margin = _SHAKE_MEASURES.unpack_from(payload, samples_end)

    # Build the dictionary in the established key order.
    fmt_value = {}
    # e.g. ver_high=1, ver_low=0 gives "1.0".
    fmt_value["ver"] = "{}.{}".format(ver_high, ver_low)
    fmt_value["type"] = type_raw.decode('utf-8')
    # The time text is NUL-padded to its fixed 20-byte width.
    fmt_value["time"] = time_raw.decode('utf-8').rstrip('\x00')
    fmt_value["scale"] = scale
    fmt_value["fs"] = freq_center
    fmt_value["band"] = freq_band
    fmt_value["bin_data"] = bin_data
    fmt_value["rms"] = rms
    fmt_value["ppk"] = ppk
    fmt_value["kurtossis"] = kurtosis
    fmt_value["margin"] = margin
    fmt_value["sn"] = sn
    return fmt_value