用Python采用Modbus-Tcp的方式读取PLC模块数据

使用计算器得到需要的寄存器地址

这里PLC地址是83,对应的程序16进制读取地址是53

实际上由于PLC地址从1开始,所以这里实际地址应该是52,因为计算机从0开始

在这里插入图片描述

使用网络调试助手生成报文

在这里插入图片描述

使用Python中的内置函数int()。以下是将人员卡号’b’3b44’'转换为十进制的示例代码:

card_number = '3b44'
decimal_number = int(card_number, 16)
print(decimal_number)

使用response[-4:]获取了响应数据的后4个字节作为value96。然后,通过struct.unpack(‘>f’, value96)[0]将4字节的二进制字符串解包为单精度浮点数,并将其打印出来。

#实时电量
request = bytes.fromhex("00 20 00 00 00 06 01 03 00 5F 00 02 ")
client_socket.send(request)
response = client_socket.recv(1024)value96 = response[-4:]
value96 = struct.unpack('>f', value96)[0]
value96=value96*10.00
value96=round(value96,2)
print("实时电量 单精度浮点数: {:.2f}".format(value96))

value40 是一个包含两个字节的字节串,即 b’\x00\x00’,将其转换为二进制,并保留8位。

以下是将字节串转换为二进制并保留8位的示例代码:

value40 = b'\x00\x00'
binary_value = bin(int.from_bytes(value40, byteorder='big'))[2:].zfill(8)
print("状态:", binary_value)

DEMO代码

#!D:/workplace/python
# -*- coding: utf-8 -*-
# @File  : main0523_04.py
# @Author:Romulushe
# @Time    : 2023/5/23 10:38
# @Software: PyCharm
# @Use: PyCharm
import socket
import struct
import time
import binasciiinterval = 5
ip_address = ''#根据实际情况自定义
port_number = 502
polling_interval = float(interval)def print_binary_value(value, name):binary_value = bin(int.from_bytes(value, byteorder='big'))[2:].zfill(8)[::-1]print(f"{name}: {binary_value}")with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:try:client_socket.settimeout(3)client_socket.connect((ip_address, port_number))while True:try:##电压request = bytes.fromhex("00 22 00 00 00 06 01 03 00 53 00 02 ")client_socket.send(request)response = client_socket.recv(1024)value84 = response[-4:]value84= struct.unpack('>f', value84)[0]# value84 = value84 * 10.00value84 = round(value84, 2)print("实时电压 单精度浮点数: {:.2f}".format(value84))##电流request = bytes.fromhex("00 24 00 00 00 06 01 03 00 4F 00 02  ")client_socket.send(request)response = client_socket.recv(1024)value80 = response[-4:]value80 = struct.unpack('>f', value80)[0]# value84 = value80 * 10.00value80 = round(value80, 2)print("实时电流 单精度浮点数: {:.2f}".format(value80))##实时温度1request = bytes.fromhex("00 2E 00 00 00 06 01 03 00 57 00 02 ")client_socket.send(request)response = client_socket.recv(1024)value88 = response[-4:]value88 = struct.unpack('>f', value88)[0]# value88 = value88 * 10.00value88 = round(value88, 2)print("实时温度1 单精度浮点数: {:.2f}".format(value88))#实时温度2request = bytes.fromhex("00 2C 00 00 00 06 01 03 00 5B 00 02 ")client_socket.send(request)response = client_socket.recv(1024)value92 = response[-4:]value92 = struct.unpack('>f', value92)[0]# value92 = value92 * 10.00value92 = round(value92, 2)print("实时温度2 单精度浮点数: {:.2f}".format(value92))##车速request = bytes.fromhex("00 31 00 00 00 06 01 03 00 3B 00 01 ")client_socket.send(request)response = client_socket.recv(1024)speed= binascii.hexlify(response)[-4:]speed = int(speed, 16)print("车速:", speed)# #实时电量request = bytes.fromhex("00 20 00 00 00 06 01 03 00 5F 00 02 ")client_socket.send(request)response = client_socket.recv(1024)value96 = response[-4:]value96 = struct.unpack('>f', value96)[0]value96=value96*10.00value96=round(value96,2)print("实时电量 单精度浮点数: {:.2f}".format(value96))#充放电状态#超速报警#低电量报警#温度过高1#温度过高2#00 18 00 00 00 06 01 03 00 28 00 01request = bytes.fromhex("00 33 00 00 00 06 01 03 00 28 00 01 ")client_socket.send(request)response = client_socket.recv(1024)value40 = response[-2:]value40= bin(int.from_bytes(value40, byteorder='big'))[2:].zfill(8)[::-1]print("充放电状态:",value40[4])print("超速报警:", value40[0])print("低电量报警:", value40[2])print("温度1过高:", value40[3])print("温度2过高:", value40[7])#车牌号#request = bytes.fromhex("00 00 00 00 00 06 01 03 00 1E 00 02 ")  # 车牌号client_socket.send(request)response = client_socket.recv(1024)car_num= binascii.hexlify(response)[-8:-4]car_num = int(car_num, 16)print("车牌号:", car_num)#人员卡号request = bytes.fromhex("00 00 00 00 00 06 01 03 00 42 00 02 ")  # 人员卡号client_socket.send(request)response = client_socket.recv(1024)card = binascii.hexlify(response)[-8:-4]card = int(card, 16)print("人员卡号:", card)except socket.timeout:print('TIMEOUT ERROR: 服务器未及时响应')except Exception as e:print('CONNECT ERROR:', e)

DEMO结果

E:\software\python\python.exe E:/projects/Forklift/t2.py实时电压 单精度浮点数: 48.85实时电流 单精度浮点数: 0.26实时温度1 单精度浮点数: 31.10实时温度2 单精度浮点数: 30.85车速: 0实时电量 单精度浮点数: 68.52充放电状态: 0超速报警: 0低电量报警: 0温度1过高: 0温度2过高: 0车牌号: 15172人员卡号: 10763

其他:地址表

在这里插入图片描述

附带数据库&log存储的代码:

#!D:/workplace/python
# -*- coding: utf-8 -*-
# @File  : main0523_04.py
# @Author:Romulushe
# @Time    : 2023/5/23 10:38
# @Software: PyCharm
# @Use: PyCharm
import socket
import struct
import time
import binascii
import pymysql
import os,sys,datetime,logginginterval = 5
ip_address = ''#根据实际情况自定义
port_number = ''#根据实际情况自定义
polling_interval = float(interval)base_path = os.path.dirname(os.path.realpath(sys.argv[0]))def get_log_path():return os.path.join(base_path, 'logs')def cleanup_logs():log_path = get_log_path()current_time = time.time()for file_name in os.listdir(log_path):file_path = os.path.join(log_path, file_name)if os.path.isfile(file_path):creation_time = os.path.getctime(file_path)if current_time - creation_time > (3 * 24 * 60 * 60):os.remove(file_path)def configure_logging():log_path = get_log_path()os.makedirs(log_path, exist_ok=True)log_filename = get_log_filename()log_file = os.path.join(log_path, log_filename)logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', filename=log_file)def get_log_filename():now = datetime.datetime.now()return now.strftime("%Y-%m-%d_%H-%M.log")def create_new_log():log_path = get_log_path()log_files = os.listdir(log_path)if len(log_files) >= 20:oldest_file = min(log_files)os.remove(os.path.join(log_path, oldest_file))log_filename = get_log_filename()log_filepath = os.path.join(log_path, log_filename)return log_filepathdef check_log_size(log_filepath):log_size = os.path.getsize(log_filepath)if log_size > 2 * 1024 * 1024:# 创建新的日志文件new_log_filepath = create_new_log()try:shutil.move(log_filepath, new_log_filepath)return new_log_filepathexcept PermissionError:insert_log(logger, f'{log_filepath} {PermissionError}', log_filepath)time.sleep(0.1)return log_filepathreturn log_filepathdef insert_log(logger, log_message, log_filepath):log_filepath = check_log_size(log_filepath)# 创建文件处理器file_handler = RotatingFileHandler(log_filepath, maxBytes=2 * 1024 * 1024, backupCount=1)file_handler.setLevel(logging.DEBUG)formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')file_handler.setFormatter(formatter)# 添加文件处理器到日志记录器logger.addHandler(file_handler)try:logger.debug(log_message)except PermissionError:insert_log(logger, f'{log_message} {PermissionError}', log_filepath)time.sleep(0.1)  # 延迟0.1秒# 移除文件处理器logger.removeHandler(file_handler)
# 连接数据库
def connect_database(host, port, user, password, db_name):try:conn = pymysql.connect(host=host, port=port, user=user, password=password, db=db_name)# print("成功连接到数据库")return connexcept pymysql.Error as e:insert_log(logger, f'DATABASE CONNECT FAILED', log_filepath)# print(f"数据库连接失败: {e}")# 插入数据
def insert_data(conn, types,table_name, create_time, location_no, parameter_desc, location, param_value):try:cursor = conn.cursor()sql = f"INSERT INTO {table_name} (TYPE, CREATE_TIME, LOCATION_NO, PARAMETER_DESC, LOCATION, PARAMVALUE) " \f"VALUES (%s, %s, %s, %s, %s, %s)"cursor.execute(sql, (types,create_time, location_no, parameter_desc, location, param_value))conn.commit()# print(f"数据插入成功: {create_time}, {parameter_desc}: {param_value}")cursor.close()except pymysql.Error as e:insert_log(logger, f'DATABASE INSERT DATA FAILED', log_filepath)# print(f"数据插入失败: {e}")with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:configure_logging()cleanup_logs()log_filepath = create_new_log()logger = logging.getLogger()logger.setLevel(logging.DEBUG)try:client_socket.settimeout(3)client_socket.connect((ip_address, port_number))# 连接数据库host = ''#根据实际情况自定义port = 3306user = ''#根据实际情况自定义password = ''#根据实际情况自定义db_name = ''#根据实际情况自定义table_name =''#根据实际情况自定义try:# 连接数据库conn = connect_database(host, port, user, password, db_name)except Exception as e:print("COON ERROR:",e)while True:try:types ='叉车'location ='F2堆02'##电压request = bytes.fromhex("00 22 00 00 00 06 01 03 00 53 00 02 ")client_socket.send(request)response = client_socket.recv(1024)value84 = response[-4:]value84= struct.unpack('>f', value84)[0]# value84 = value84 * 10.00value84 = round(value84, 2)print("实时电压 单精度浮点数: {:.2f}".format(value84))location_no ='F01-1'parameter_desc ='实时电压'param_value=value84insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)##电流request = bytes.fromhex("00 24 00 00 00 06 01 03 00 4F 00 02  ")client_socket.send(request)response = client_socket.recv(1024)value80 = response[-4:]value80 = struct.unpack('>f', value80)[0]# value84 = value80 * 10.00value80 = round(value80, 2)print("实时电流 单精度浮点数: {:.2f}".format(value80))location_no = 'F01-2'parameter_desc = '实时电流'param_value=value80insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)##实时温度1request = bytes.fromhex("00 2E 00 00 00 06 01 03 00 57 00 02 ")client_socket.send(request)response = client_socket.recv(1024)value88 = response[-4:]value88 = struct.unpack('>f', value88)[0]# value88 = value88 * 10.00value88 = round(value88, 2)print("实时温度1 单精度浮点数: {:.2f}".format(value88))location_no = 'F01-3'parameter_desc = '实时温度1'param_value = value88insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)#实时温度2request = bytes.fromhex("00 2C 00 00 00 06 01 03 00 5B 00 02 ")client_socket.send(request)response = client_socket.recv(1024)value92 = response[-4:]value92 = struct.unpack('>f', value92)[0]# value92 = value92 * 10.00value92 = round(value92, 2)print("实时温度2 单精度浮点数: {:.2f}".format(value92))location_no = 'F01-4'parameter_desc = '实时温度2'param_value = value92insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)##车速request = bytes.fromhex("00 31 00 00 00 06 01 03 00 3B 00 01 ")client_socket.send(request)response = client_socket.recv(1024)speed= binascii.hexlify(response)[-4:]speed = int(speed, 16)print("车速:", speed)location_no = 'F01-5'parameter_desc = '车速'param_value = speedinsert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)# #实时电量request = bytes.fromhex("00 20 00 00 00 06 01 03 00 5F 00 02 ")client_socket.send(request)response = client_socket.recv(1024)value96 = response[-4:]value96 = struct.unpack('>f', value96)[0]value96=value96*10.00value96=round(value96,2)print("实时电量 单精度浮点数: {:.2f}".format(value96))location_no = 'F01-6'parameter_desc = '实时电量'param_value = value96insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)#充放电状态#超速报警#低电量报警#温度过高1#温度过高2#00 18 00 00 00 06 01 03 00 28 00 01request = bytes.fromhex("00 33 00 00 00 06 01 03 00 28 00 01 ")client_socket.send(request)response = client_socket.recv(1024)value40 = response[-2:]value40= bin(int.from_bytes(value40, byteorder='big'))[2:].zfill(8)[::-1]print("充放电状态:",value40[4])print("超速报警:", value40[0])print("低电量报警:", value40[2])print("温度1过高:", value40[3])print("温度2过高:", value40[7])location_no = 'F01-7'parameter_desc = '充放电状态'param_value = value40[4]insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)location_no = 'F01-8'parameter_desc = '超速报警'param_value = value40[0]insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)location_no = 'F01-9'parameter_desc = '低电量报警'param_value = value40[2]insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)location_no = 'F01-10'parameter_desc = '温度1过高'param_value = value40[3]insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)location_no = 'F01-11'parameter_desc = '温度2过高'param_value = value40[7]insert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)# 车牌号#request = bytes.fromhex("00 00 00 00 00 06 01 03 00 1E 00 02 ")  # 车牌号client_socket.send(request)response = client_socket.recv(1024)car_num = binascii.hexlify(response)[-8:-4]car_num = int(car_num, 16)print("车牌号:", car_num)location_no = 'F01-12'parameter_desc = '车牌号'param_value = car_numinsert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)# 人员卡号request = bytes.fromhex("00 00 00 00 00 06 01 03 00 42 00 02 ")  # 人员卡号client_socket.send(request)response = client_socket.recv(1024)card = binascii.hexlify(response)[-8:-4]card = int(card, 16)print("人员卡号:", card)location_no = 'F01-13'parameter_desc = '人员卡号'param_value = cardinsert_data(conn, types, table_name, create_time, location_no, parameter_desc, location, param_value)except socket.timeout:print('TIMEOUT ERROR: 服务器未及时响应')except Exception as e:print('CONNECT ERROR:', e)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/4522.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决appium-doctor报gst-launch-1.0.exe and/or gst-inspect-1.0.exe cannot be found

一、下载gst-launch-1.0.exe and gst-inspect-1.0.exe 下载地址:Download GStreamer runtime installer 和 development installer 两个应用程序都要下载并安装 二、运行安装 下载好后点击安装会弹出如下界面,点击“更多信息”展开,点击“仍然…

拓宽“主航道”的Serverless与EDA领域,亚马逊云科技不断创新开拓

在新潮如走马灯般变换的时尚界,每隔几年就会刮起一阵复古风。被誉为“时尚教父”的著名设计师安德烈莱昂塔利曾说:“时尚总是在寻找新的灵感和方向,而复古是其中一个重要的来源。” 无独有偶。日新月异的高科技领域也会出现公认的“过时”…

msys2安装与配置: 在windows上使用linux工具链g++和包管理工具pacman C++开发

文章目录 为什么用这个msys2下载、doc安装,很简单初次运行,做些配置更新软件安装与卸载方法安装必要的软件包设置win环境变量在windows terminal中使用在vscode中使用 为什么用这个msys2 方便windows上的C开发demo,不需要VS了方便C开发安装o…

跨越山海,爱在滇西|拓数派为滇西孩子点亮科学梦想

近日,拓数派在共青团浙江大学委员会、景东县教育体育局和景东团县委等单位指导下开展“爱在滇西”2023年公益助学活动,并携手浙大国际科创中心、浙大微纳电子学院、启真科技控股公司和北京德恒律所共同向景东浙大求是中学捐赠爱心助学金,用于…

【RISC-V】昉·星光 2单板计算机初始调试记录

博主未授权任何人或组织机构转载博主任何原创文章,感谢各位对原创的支持! 博主链接 本人就职于国际知名终端厂商,负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作,目前牵头6G算力网络技术标准研究。 博客…

JavaSwing+MySQL的飞机订票系统(内含oracle版本)

点击以下链接获取源码: https://download.csdn.net/download/qq_64505944/88055544 JDK1.8 MySQL5.7 功能:接收客户端发来的数据、处理客户端发来的数据、发送数据包到客户端;客户端:查询所有航班的信息、查看自己所定的票、订票…

mac批量提取文件夹的名称,怎么操作?

mac批量提取文件夹的名称,怎么操作?很多小伙伴想知道在mac电脑上可以一键快速批量的将大量文件夹的名提取出来,而不是采用一个一个名称提取的方法,这是一个有利于提高工作效率的办法,这一项技能在网上几乎找不到解决办…

cloudwatch agent通过squid代理上传指标到cloudwatch

1.安装cloudwatch agent代理 1.1、安装cloudwatch代理包 2.2、更改程序包的目录 3.3.创建cloudwatch代理配置文件 运行以下命令配置向导 sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-config-wizard 按照提示选择个性化服务 2.安装squid服务器 简…

Git基本操作命令

** 创建仓库 **,用于被git管理 第一步: $ mkdir learngit $ cd learngit $ pwd /Users/michael/learngit第二步: 通过git init命令把这个目录变成Git可以管理的仓库: $ git init** 提交代码 **: 第一步&#xff…

JVM概述

1.什么是虚拟机? 虚拟机就是一台虚拟的计算机。它是一款软件,它分为系统虚拟机(比如VMware)和程序虚拟机(比如Java虚拟机)。 2.JVM的作用 Java虚拟机负责装载字节码文件到内部,编译为对应平台上的机器码指令来执行,还有自动的垃…

系统学习Linux-Rsync远程数据同步服务(三)

一、概述 rsync是linux 下一个远程数据同步工具 他可通过LAN/WAN快速同步多台主机间的文件和目录,并适当利用rsync 算法减少数据的传输 会对比两个文件的不同部分,传输差异部分,因此传输速度相当快 rsync可拷贝、显示目录属性&#xff0c…

透视表可视化简单案例

import pandas as pd import numpy as np import os basepath/Users/kangyongqing/Documents/kangyq/202307/标准版学期制C2/pathos.path.join(basepath,02freetime.csv) dtpd.read_csv(path,dtype{shifen:object}) print(dt.head()) import matplotlib.pyplot as pltfor i in …

自动化测试-selenium环境搭建

文章目录 1. 什么是自动化2. 自动化测试分类3. selenium的环境搭建4. 测试selenium 1. 什么是自动化 自动化是指使用软件工具、脚本或程序来执行一系列的任务、操作或流程,而无需人工干预或指导。 自动化测试:使用自动化工具和脚本来执行测试用例&#x…

奖牌数领跑是为何?解码长沙华中医卫科技中等职业学校的国防教育成功之道

近些年,越来越多的学生、家长、企业开始重新审视职业教育的价值。在产教融合的大背景下,职业教育已经成为了高新产业发展的人才引擎,职业教育的教学模式、软硬件配置乃至未来毕业生的就业情况,已完全变了模样。 实际上&#xff0…

Windows 组策略 部署打印机

一、服务端 1、打印机管理:添加打印机 2、选择打印机 3、第一次安装,选择这个 4、下载驱动,从磁盘安装 5、已成功安装 6、选中打印机右击属性:列出目录 7、创建一个组策略 8、组策略设置 用户设置 → 首选项 → 控制面板 → 打印…

shell脚本备份数据库

首先是在本地windows环境下尝试备份数据库 打开mysql的bin目录,然后在地址栏cmd,进入cmd界面,输入mysqldump命令,-u输入用户名,-p输入密码 还有数据库名称,以及后面要保存到的位置 mysqldump -uroot -p tes…

webpack插件compression-webpack-plugin

Vue配置compression-webpack-plugin实现Gzip压缩 1、为什么要压缩? 打包的时候开启gzip可以很大程度减少包的大小,页面大小可以变为原来的30%甚至更小,非常适合于上线部署。更小的体积对于用户体验来说就意味着更快的加载速度以及更好的用户…

Langchain-ChatGLM配置文件参数测试

1 已知可能影响对话效果的参数(位于configs/model_config.py文件): # 文本分句长度 SENTENCE_SIZE 100# 匹配后单段上下文长度 CHUNK_SIZE 250 # 传入LLM的历史记录长度 LLM_HISTORY_LEN 3 # 知识库检索时返回的匹配内容条数 VECTO…

python web开发之WSGI/uwsgi/uWSGI详解

1. 三者的定义 WSGI是一种通信协议。uwsgi是一种传输协议。uWSGI是实现了uwsgi和WSGI两种协议的Web服务器。 2.三者的使用场景 WSGI,全称 Web Server Gateway Interface,是为 Python 语言定义的 Web 服务器和 Web 应用程序或框架之间的一种简单而通用的接…

基于微信小程序的求职招聘系统设计与实现(Java+spring boot+MySQL+微信小程序)

获取源码或者论文请私信博主 演示视频: 基于微信小程序的求职招聘系统设计与实现(Javaspring bootMySQL微信小程序) 使用技术: 前端:html css javascript jQuery ajax thymeleaf 微信小程序 后端:Java s…