python实现ModBusTCP协议的client是一件简单的事情,只要通过pymodbus、pyModbusTCP等模块都可以实现,本文采用pymodbus。但要基于ModBusTCP服务端的业务实现特定的client,那得看看服务端是否复杂。前面系列文章,我们学习了对服务端的简单交互,便是得力于服务端简单的业务流程,本文将实现有点复杂的业务流程。
一、业务描述
我们将使用python脚本来实现一个ModBusTCP client,他能够触发设备进行拍照,然后读取拍照情况。
1、ModBusTCP服务端交互流程
大概类似如下交互时序,其中PLC将用我们的脚本代替。
2、控制
根据上述业务,服务器需要有寄存器存储客户端写入的控制位,存储顺序如下。
3、状态
根据上述业务,服务器需要有寄存器存储状态位,让客户端来读取,存储顺序如下。
4、结果
根据上述业务,服务器需要有寄存器存储结果,让客户端来读取,存储顺序如下。
5、 地址空间
(1)控制
类型:HoldingRegisters、Coils
起始地址与寄存器数量:自定义
(2)状态
类型:HoldingRegisters、DiscreteInputs、InputRegisters
起始地址与寄存器数量:自定义
(3)结果
类型:HoldingRegisters、InputRegisters
起始地址与寄存器数量:自定义
二、程序整体设计
class myModBusTCPclient(object):def __init__(self, obj):self.obj = obj# 状态读取后转成二进制,分别对应TriggerReady等状态def custom_binary(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_str# 翻转二进制,如01转为10,方便后续取值result_str = result_str[::-1]if ByteSwap==0:return result_strelif ByteSwap==1: # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")# 控制写之前先将TriggerEnable等二进制控制位转成数值def custom_num(self, binary, length=16, ByteSwap=0):assert len(binary) == length, "输入的二进制长度不正确!"binary = binary[::-1] # 翻转二进制,如01转为10,方便后续取值if ByteSwap==0:return int(binary, 2)elif ByteSwap==1: # 需要字节交换时return int(binary[8:] + binary[:8], 2)else:raise ValueError("ByteSwap 的值错误!")def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):if addrtype=="HoldingRegisters":value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)self.obj.write_registers(address, value, slave=slave)elif addrtype=="Coils":...else:raise ValueError("ctrl_addrtype的值错误!")def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)] # 整型列表print("status HoldingRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="InputRegisters":...elif addrtype=="DiscreteInputs":...else:raise ValueError("status_addrtype的值错误!")def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0):if addrtype=="HoldingRegisters":...elif addrtype=="InputRegisters":...else:raise ValueError("plc_out_addrtype的值错误!")if __name__ == "__main__":# Modbus TCP服务器的IP地址和端口号server_ip = "192.168.1.196"port = 502station = 1# 创建Modbus TCP客户端MDclient = ModbusTcpClient(server_ip, port)if MDclient.connect():myclient = myModBusTCPclient(MDclient)myclient.ctrl(ByteSwap=1, binary="1100000000000000")time.sleep(1)myclient.status(ByteSwap=1)
1、程序结构
上述代码定义了一个名为 myModBusTCPclient
的类,用于与 Modbus TCP 服务器进行通信。下面是对程序结构的分析:
构造函数 __init__
:
接收一个参数 obj
,表示 Modbus TCP 客户端对象。将这个对象存储在实例变量 self.obj
中。
custom_binary
方法:
将给定的整数 num
转换为指定长度 length
的二进制字符串。可选参数 ByteSwap
用于指定是否进行字节交换。如果 ByteSwap
为 1,则进行字节交换,否则不进行。返回构造好的二进制字符串。
custom_num
方法:
接收一个二进制字符串 binary
,根据给定的长度 length
和是否进行字节交换 ByteSwap
将其转换为整数。返回转换得到的整数。
ctrl
方法:
根据给定的地址类型 addrtype
(默认是 "HoldingRegisters")、是否进行字节交换 ByteSwap
、二进制字符串 binary
、Modbus 地址 address
和从站号 slave
,向 Modbus 服务器写入数据。如果地址类型是 "HoldingRegisters",则使用 write_registers
方法写入寄存器。
status
方法:
根据给定的地址类型 addrtype
、是否进行字节交换 ByteSwap
、寄存器地址 reg_addr
、寄存器数量 reg_nb
和从站号 slave
,从 Modbus 服务器读取数据。如果地址类型是 "HoldingRegisters",则使用 read_holding_registers
方法读取寄存器。
plc_out
方法:
根据给定的地址类型 addrtype
和是否进行字节交换 ByteSwap
,执行一些 Modbus 操作。具体操作需要根据地址类型的不同进行扩展。
if __name__ == "__main__":
部分:
在脚本独立运行时进行的操作。创建了一个 Modbus TCP 客户端对象 MDclient
。通过 myModBusTCPclient
类创建了一个自定义的客户端对象 myclient
。调用了 ctrl
方法,向 Modbus 服务器写入数据。等待了一秒钟。调用了 status
方法,从 Modbus 服务器读取数据。
2、不同地址空间的请求
在ModbusTCP中,对于不同的寄存器,请求方式是不一样的,因此需要根据服务端的设置相应更改。
为了满足业务,需要对原有的pymodbus进行封装改造。
为了满足大端序和小端序,需要加入字节交换的操作。