将mqtt的消息存储至mysql数据库

Step1. mqtt消息注册及处理

使用python来做:

import paho.mqtt.client as mqtt
import mqtt_msghub as mqtt_msghub # mqtt payload is dealing here...# MQTT服务器信息broker = '192.168.0.16'
port = 1883
#topic = 'sensor/shake/measure/1'
username = "xxxxx"
password = "xxxxx"# 连接回调函数
def on_connect(client, userdata, flags, rc):if rc == 0:print("连接成功")else:print("连接失败,错误码:" + str(rc))# 接收消息回调函数
def on_message(client, userdata, msg):print("收到消息:")print("主题:" + msg.topic)#print("消息:" + str(msg.payload.decode())) //have bin format payload, can not transfter into str.mqtt_msghub.mqtt_dealmsg(client, msg.topic, msg.payload)# 创建MQTT客户端对象
client = mqtt.Client()# 设置连接回调函数
client.on_connect = on_connect# 设置接收消息回调函数
client.on_message = on_message# 连接MQTT服务器
client.username_pw_set(username, password)
client.connect(broker, port)# 订阅主题
mqtt_msghub.mqtt_topic_register(client) #at msgHub moule.# 开始循环监听消息,Ctrl+C中断退出
client.loop_forever()

Step1.1 看看topic注册及消息处理部分

from xml.dom import registerDOMImplementation
import payload_type_exchange as typeExchange
import paho.mqtt.client as mqtt
import table_common_table_crud as dbhelper
import gp_mysql_server as gpmysql #mysql.mqtt_topics = {"sensor/shake/xx/%d": "d_shake_xxx_ch%02d","sensor/shake/yyy/%d": "d_shake_yyy_ch%02d",
};def mqtt_topic_register(client):global mqtt_topics;for item in mqtt_topics:key = item;topic = key.replace('%d', '+');client.subscribe(topic);print("mqtt register:[%s]" % topic);def mqtt_get_topic_related_db_table(client, topic):dbtable = "";global mqtt_topics;if(topic in mqtt_topics): #for dict of python, iterator is just key itself.return (0, mqtt_topics[topic]);parts = topic.split("/");for item in mqtt_topics:key = itemkeyparts = key.split("/");parts_pre = parts[:-1];keyparts_pre = keyparts[:-1];if(parts_pre == keyparts_pre):ch = int(parts[-1])dbtable = mqtt_topics[item] % chreturn (0, dbtable);return (-1, dbtable);def getdbtable_macrotype(dbtablename):parts = dbtablename.split("_");return parts[:-1];def mqtt_dealmsg(client, topic, payload):(ret, dbtable) = mqtt_get_topic_related_db_table(client, topic); #check if this msg need to save to dbif(ret!=0): return;(ret, dbDict) = payload_to_dictOfDbField(payload, dbtable);  #get db insert cmd related dictionary object(dbDict)if(ret != 0): return;dbhelper.insert_data(gpmysql.gpDbConn(), dbtable, dbDict); //call SQL_insert helper cmd.def payload_to_dictOfDbField(payload, dbtable):dbtable_class = getdbtable_macrotype(dbtable);if(dbtable_class == "d_shake_envelope".split("_")):return (0, typeExchange.payload_to_d_shake_envelope(payload));  //the indepentant type-convertion functions.else:print("unknown payload:%s, ommited to save to DB!" % dbtable);return (-1, "unknow payload");

Step2 通用的写mySql的辅助函数

代码中包含有两类payload的数据库入库接口,对于json格式,比较容易处理。顶多做一个名称映射表。

对于二进制格式,需要先将二进制转换为一个结构化的数据,然后才能入库。因为dbtable是个二维对象,最佳的载体是python.dictionary.

这里只给出了CRUD中的C和R。UD的代码可以此类推。

def json_to_mysql_insert(json_obj, table_name):data = json.load(json.dumps(json_obj));columns = ','.join(data.keys())values = "','".join(data.values());insert_statement = "INSERT INTO {} ({}) VALUES ('{}')".format(table_name, columns, values)return insert_statementdef dictionary_to_mysql_insert(dictionary, table_name):columns = ','.join(dictionary.keys())values = ','.join(['%s'] * len(dictionary))insert_statement = "INSERT INTO {} ({}) VALUES ({})".format(table_name, columns, values)return insert_statement# 查询操作
def query_data(conn, tablename):with conn.cursor() as cursor:sql = "SELECT * FROM " + tablename;cursor.execute(sql)data = cursor.fetchall()cursor.close()# 处理数据并转换为 JSONrecords = [];results = data;for row in results:json_data = []for item_name in row:item = row[item_name];if isinstance(item, datetime.datetime):dumbTime = item.strftime('%Y-%m-%d %H:%M:%S')  # 将 datetime 对象按照指定格式转换为字符串json_data.append(dumbTime);else:if isinstance(item, bytes):dumbBytes = binascii.hexlify(item[:8]).decode()json_data.append(dumbBytes);else:if isinstance(item, decimal.Decimal):dumbDecimal = decimal.Decimal(item).to_eng_string();json_data.append(dumbDecimal);else:json_data.append(item);records.append(json_data);str =  json.dumps(records);return str;

Step2.1 原始二进制流到Dictionary的转换:

这里没有定义结构体

from asyncio.windows_events import NULL
from multiprocessing.sharedctypes import Value
from os import name
from pickle import BINBYTES
from sqlite3 import SQLITE_BUSY_SNAPSHOT
import pymysql
import json
import gp_mysql_server as gpmysql
import datetime;
import binascii;
import decimal;
import struct;
from typing import MutableSequencePT_OF_SENSOR_SHAKE_SAMPLE = 2048def payload_to_d_shake_envelope(payload):#payload is a binarray(20:08, Sep18,2023,break.)#c_struct = CStruct(payload)c_struct = payload;verHigh = c_struct[0]verLow = c_struct[1]sn = struct.unpack_from("<I", c_struct, offset=2)[0]type_str = c_struct[6:8].decode('utf-8')timeOfSample_str = c_struct[8:28].decode('utf-8').rstrip('\x00')scale = struct.unpack_from("<I", c_struct, offset=28)[0]freqCenter = struct.unpack_from("<f", c_struct, offset=32)[0]freqBand = struct.unpack_from("<f", c_struct, offset=36)[0]binData = payload[40:40+4096];measures = struct.unpack_from("<4f", c_struct, offset=40 + 2 * PT_OF_SENSOR_SHAKE_SAMPLE)#fmt to dictionary.fmtValue ={};#verHight=1, verLow=0, result="1.0"fmtValue["ver"] = "{}.{}".format(verHigh, verLow);fmtValue["type"] = type_str;fmtValue["time"] = timeOfSample_str;fmtValue["scale"] = scale;fmtValue["fs"] = freqCenter;fmtValue["band"] = freqBand;fmtValue["bin_data"] = binData;fmtValue["rms"] = measures[0];fmtValue["ppk"] = measures[1];fmtValue["kurtossis"] = measures[2];fmtValue["margin"] = measures[3];fmtValue["sn"] = sn;return fmtValue;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/82515.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【文末赠书】SRE求职必会 —— 可观测性平台可观测性工程(Observability Engineering)

文章目录 〇、导读一、实现可观测性平台的技术要点是什么&#xff1f;二、兼容全域信号量三、所谓全域信号量有哪些&#xff1f;四、统一采集和上传工具五、统一的存储后台六、自由探索和综合使用数据七、总结★推荐阅读《可观测性工程》直播预告直播主题直播时间预约直播 视频…

融云观察:AI Agent 是不是游戏赛道的下一个「赛点」?

本周四 融云直播间&#xff0c;点击报名~ ChatGPT 的出现&#xff0c;不仅让会话成为了未来商业的基本形态&#xff0c;也把大家谈论 AI 的语境从科技产业转向了 AI 与全产业的整合。 关注【融云全球互联网通信云】了解更多 而目前最热衷于拥抱生成式 AI 的行业中&#xff0c…

Ubuntu22.04 vnc远程黑屏

一、原因 原因是Ubuntu22.04使用的gnome启用了Wayland。vnc、teamviewer、向日葵、todesk等均无法使用或者远程黑屏等。 简单的说vnc、teamviewer、向日葵、todesk等均基于xorg实现&#xff08;xorg太流行&#xff09;&#xff0c;并不兼容Wayland&#xff0c;所以vnc无法正常…

SkyWalking内置MQE语法

此文档出自SkyWalking官方git https://github.com/apache/skywalking docs/en/api/metrics-query-expression.md Metrics Query Expression(MQE) Syntax MQE is a string that consists of one or more expressions. Each expression could be a combination of one or more …

stm32----ADC模数转换

一、ADC介绍 ADC&#xff0c;即模数转换器&#xff0c;它可以将模拟信号转化为数字信号。在stm32种一般有3个ADC&#xff0c;每个ADC有18个通道。 12位ADC是一种逐次逼近型模拟数字转换器&#xff0c;它有多达18个通道&#xff0c;可测量16个外部和两个内部信号源。各个通道的A…

Linux文件管理命令

Linux命令行 命令空格参数(可写可不写)空格文件(可写可不写)ls/opt 根目录下的opt文件夹ls-a 显示所有文件及隐藏文件/optls -l 详细输出文件夹内容 ls -h 输出文件大小(MB...)ls--full-time 完整时间格式输出ls-d 显示文件夹本身信息&#xff0c;不输出内容ls-t 根据最后修改…

js如何实现一个简单的节流函数?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 实现简单的节流函数⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入…

巨人互动|Facebook海外户Facebook有什么功能

Facebook是一款国际化的用于聊天的软件&#xff0c;Facebook一般情况下用户可以在其共享照片、发布评论以及在网络上发布新闻或者其他有趣内容的链接&#xff0c;观看短视频或者实时聊天等。那么Facebook也拥有广泛的功能和特点。 巨人互动|Google海外户&Google内容定位介绍…

修改root 用户的密码

修改root账号密码和有效期 一般linux系统默认root是没有密码的&#xff0c;需要启动的时候用户自己设置&#xff0c;这里介绍一种不用passwd指令添加或者修改root密码的方式 在root账户的记录中&#xff0c;用“:”符号分隔开的第二个字段通常是密码字段&#xff0c;将该字段的…

SQLite 学习笔记1 - 简介、下载、安装

SQLite 简介 SQLite是一款非常轻量级的关系数据库系统&#xff0c;支持多数SQL92标准。SQLite 是世界上使用最广泛的数据库引擎。SQLite 内置于所有手机和大多数计算机中&#xff0c;并捆绑在人们每天使用的无数其他应用程序中。 SQLite 是一个由C语音开发的嵌入式库&#xff…

lv4 嵌入式开发-9 静态库与动态库的使用

目录 1 库的概念 2 库的知识 3 静态库特点 4 静态库 4.1静态库创建 4.2 编译生成目标文件 4.3 创建静态库 hello 4.4 查看库中符号信息 4.5 链接静态库 5 共享库特点 6 共享库 6.1 共享库创建 6.2 编译生成目标文件 6.3 创建共享库 common 6.4为共享库文件创建…

分布式调度 Elastic-job

分布式调度 Elastic-job 1.概述 1.1什么是任务调度 我们可以思考一下下面业务场景的解决方案: 某电商平台需要每天上午10点&#xff0c;下午3点&#xff0c;晚上8点发放一批优惠券某银行系统需要在信用卡到期还款日的前三天进行短信提醒某财务系统需要在每天凌晨0:10分结算…

04-Flask-新版Flask运行方式

新版Flask运行方式 前言老版本运行方式新版本运行方式命令行方式运行pycharm运行 前言 本篇来学习下新版Flask运行方式 老版本运行方式 app.run()&#xff1a;1.0之前版本 # -*- coding: utf-8 -*- # Time : 2023/9/16 # Author : 大海# 导入flask from flask import F…

SWC 流程

一个arxml 存储SWC &#xff08;可以存多个&#xff0c;也可以一个arxml存一个SWC&#xff09;一个arxml 存储 composition &#xff08;只能存一个&#xff09;一个arxml 存储 system description (通过import dbc自动生成system) 存储SWC和composition的arxml文件分开&#…

树回归CART

之前线性回归创建的模型需要拟合所有的样本点&#xff0c;但数据特征众多&#xff0c;关系复杂时&#xff0c;构建全局模型就很困难。之前构建决策树使用的算法是ID3。 ID3 的做法是每次选取当前最佳的特征来分割数据&#xff0c;并按照该特征的所有可能取值来切分。也就是说&…

第2章_freeRTOS入门与工程实践之单片机程序设计模式

本教程基于韦东山百问网出的 DShanMCU-F103开发板 进行编写&#xff0c;需要的同学可以在这里获取&#xff1a; https://item.taobao.com/item.htm?id724601559592 配套资料获取&#xff1a;https://rtos.100ask.net/zh/freeRTOS/DShanMCU-F103 freeRTOS系列教程之freeRTOS入…

目标跟踪:Mobile Vision Transformer-based Visual Object Tracking

论文作者&#xff1a;Goutam Yelluru Gopal,Maria A. Amer 作者单位&#xff1a;Concordia University 论文链接&#xff1a;https://arxiv.org/pdf/2309.05829v1.pdf 项目链接&#xff1a;https://github.com/goutamyg/MVT 内容简介&#xff1a; 1&#xff09;方向&#…

【iOS】单例模式

文章目录 前言一、单例模式简介二、单例模式优缺点优点缺点 三、模式介绍1.懒汉模式2. 饿汉模式 总结 前言 在最初进行OC的学习时笔者了解过单例模式的基本使用&#xff0c;现撰写博客加深对单例模式的理解 一、单例模式简介 单例模式是一种常见的设计模式&#xff0c;其主要…

基于HOG特征提取和GRNN神经网络的人脸表情识别算法matlab仿真,测试使用JAFFE表情数据库

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 1.HOG特征提取 2.GRNN神经网络 3.JAFFE表情数据库 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 .....................................…

Vue3 菜鸟入门(二)超详细:基本框架 模板语法和指令

【学习笔记】Vue3 菜鸟入门&#xff08;二&#xff09;超详细&#xff1a;基本框架 模板语法和指令 关键词&#xff1a;Vue 、Vue 3、Java、Spring Boot、Idea、数据库、一对一、培训、教学本文主要内容含Vue 基本框架 模板语法、指令计划1小时完成&#xff0c;请同学尽量提前…