【Python】FANUC机器人OPC UA通信并记录数据

目录

    • 引言
    • 机器人仿真
    • 环境准备
    • 代码实现
      • 1. 导入库
      • 2. 设置参数
      • 3. 日志配置
      • 4. OPC UA通信
      • 5. 备份旧CSV文件
      • 6. 主函数
    • 总结

引言

 OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化领域。Python因其易用性和丰富的库支持,成为实现OPC UA通信的不错选择。本文将介绍如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。

机器人仿真

 FANUC机器人可以使用官方软件RoboGuide进行机器人仿真,启动后默认OPC UA地址为127.0.0.1:4880/FANUC/NanoUaServer
在这里插入图片描述

环境准备

  • Python 3.5+
  • opcua库:用于实现OPC UA通信
  • logging库:用于记录日志

安装opcua库:

pip install opcua

代码实现

1. 导入库

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

2. 设置参数

SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
CSV_FILENAME = 'fanuc_opcua_data.csv'
FAUNC_LOG = 'fanuc.log'
LOG_DIR = 'log'
BACKUP_DIR = 'backup'

3. 日志配置

def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)

4. OPC UA通信

  • 连接到服务器
def connect_to_server(url):client = Client(url)client.connect()return client
  • 获取根节点和对象节点
def get_root_node(client: Client):return client.get_root_node()
def get_objects_node(client: Client):return client.get_objects_node()
  • 遍历所有子节点并返回变量节点的路径和数值
def get_variables(node: Node, path=""):variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variables

5. 备份旧CSV文件

def backup_csv_file(filename):if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e})

6. 主函数

if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()

记录数据预览:
在这里插入图片描述

总结

 本文介绍了如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。通过使用opcua库,我们可以轻松地连接到OPC UA


完整代码:

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua# OPC UA服务器的URL
SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
# CSV文件名
CSV_FILENAME = 'fanuc_opcua_data.csv'
# 日志文件
FAUNC_LOG = 'fanuc.log'
# 文件夹 
LOG_DIR = 'log'
BACKUP_DIR = 'backup'def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)
def connect_to_server(url):"""创建客户端实例并连接到服务端"""client = Client(url)client.connect()return clientdef get_root_node(client: Client):"""获取服务器命名空间中的根节点"""return client.get_root_node()def get_objects_node(client: Client):"""获取服务器的对象节点"""return client.get_objects_node()def get_variables(node: Node, path=""):"""遍历所有子节点并返回变量节点的路径和数值"""variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variablesdef backup_csv_file(filename):"""如果CSV文件已存在则备份"""if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e}")if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/807646.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从挑战到机遇:HubSpot如何帮助企业化解出海过程中的难题

企业出海挑战与对策 随着全球化的加速推进,越来越多的企业开始将目光投向海外市场,以寻求更广阔的发展空间。然而,在出海的过程中,企业往往面临着诸多挑战,其中文化差异、法律限制等问题尤为突出。今天运营坛将对这些…

创意解决方案:如何将作品集视频集中于一个二维码或链接中?

引言:随着面试环节的进一步数字化,展示自己的作品集成为了求职过程中的重要一环。但除了使用传统的方式,如百度网盘或直接发送多个视频链接,有没有更便捷的方法将作品集的多个视频放在一个链接中呢? 本文将介绍一种创意解决方案…

RocketMQ 之 IoT 消息解析:物联网需要什么样的消息技术?

作者:林清山(隆基) 前言: 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前&a…

数据库被rmallox勒索病毒加密,如何还原?

近年来,网络安全问题日益严峻,勒索病毒作为其中的一种恶意软件,已成为网络安全领域的一大难题。其中,rmallox勒索病毒以其高度的隐蔽性和破坏性,给不少企业和个人带来了严重损失。本文将从rmallox勒索病毒的特点、传播…

一个简单的UI自动化框架应用介绍

项目框架介绍 该数据自动校验小程序采用POM模型,基于Javaseleniumtestngextentsreportexcel POI开发。 框架核心功能 基于PMO模型将页面封装成java对象,并通过selenuim驱动浏览器进行操作。通过excel POI对excel文件进行操作,通过对比导出…

通过网络api获取日期对应的节假日信息

网络接口获取链接&#xff1a;免费节假日API_原百度节假日API HolidayJudge.h #pragma once#include <QtWidgets/QWidget> #include "ui_HolidayJudge.h"enum DATESTATE {WORK0,//工作日DAYOFF,//休息日HOLIDAY//节假日 };class HolidayJudge : public QWidg…

Linux之线程互斥与同步

1.线程互斥相关概念 临界资源&#xff1a;多线程执行流共享的资源就叫做临界资源 。 临界区&#xff1a;每个线程内部&#xff0c;访问临界自娱的代码&#xff0c;就叫做临界区。 互斥&#xff1a;任何时刻&#xff0c;互斥保证有且只有一个执行流进入临界区&#xff0c;访问临…

谷歌查问题

1&#xff0c;打开 it工具箱-里面啥都有 2&#xff0c;找到谷歌 3&#xff0c;访问gpt

E. Yet Another Walking Robot 又一个行走的机器人(map详解代码)

坐标平面上有一个机器人。最初&#xff0c;机器人位于该点&#xff08;0,0&#xff09; .它的路径被描述为字符串s长度n由字符“L”、“R”、“U”、“D”组成。这些字符中的每一个都对应着一些动作&#xff1a; ‘L’&#xff08;左&#xff09;&#xff1a;表示机器人从该点移…

VSCODE自动更新无法连接远程服务器报错“waiting for server log...“的解决方法

问题描述 一觉醒来打开vscode发现连接远程服务器显示无法连接&#xff0c;终端一直报错“waiting for server log…"&#xff0c;经查是因为vscode自动更新到了1.86&#xff0c;对于远程服务器的linux版本要求较高。这里记录下解决方法。 解决方法 1. 下载vscode便携版…

Sketch3D:用于草图到3D生成的样式一致性指南

Sketch3D: Style-Consistent Guidance for Sketch-to-3D Generation Sketch3D&#xff1a;用于草图到3D生成的样式一致性指南 Wangguandong Zheng 重试 错误原因 Southeast UniversityChina 重试 错误原因 wgdzhengseu.edu.cnHaifeng Xia 重试 错误原因 Southeast Universit…

设计模式总结-简单工厂模式

简单工厂模式 创建型模式创建型模式概述创建型模式种类 简单工厂模式模式定义模式动机模式结构模式分析模式实例与解析实例一&#xff1a;简单电视机工厂实例二&#xff1a;权限管理 模式优缺点简单工厂模式的优点简单工厂模式的缺点 模式适用环境模式扩展 小结 创建型模式 创…

如何在Ubuntu系统使用docker部署DbGate容器并发布至公网可访问

文章目录 1. 安装Docker2. 使用Docker拉取DbGate镜像3. 创建并启动DbGate容器4. 本地连接测试5. 公网远程访问本地DbGate容器5.1 内网穿透工具安装5.2 创建远程连接公网地址5.3 使用固定公网地址远程访问 本文主要介绍如何在Linux Ubuntu系统中使用Docker部署DbGate数据库管理工…

Unstructured - 提取非结构化数据

文章目录 一、关于 Unstructured核心概念&#x1f680; Beta 功能&#xff1a;Chipper 模型 二、安装方式一&#xff1a;使用 PYPI方式二&#xff1a;使用源码本地安装安装依赖库测试 三、在Docker运行库添加shell构建自己的 Docker image交互运行 四、PDF文档解析示例 一、关于…

【随笔】Git 高级篇 -- 快速定位分支 ^|~(二十三)

&#x1f48c; 所属专栏&#xff1a;【Git】 &#x1f600; 作  者&#xff1a;我是夜阑的狗&#x1f436; &#x1f680; 个人简介&#xff1a;一个正在努力学技术的CV工程师&#xff0c;专注基础和实战分享 &#xff0c;欢迎咨询&#xff01; &#x1f496; 欢迎大…

关于鸿蒙HarmonyOS,现在关注什么可以更高效

对于移动端来讲&#xff0c;今年最火的关键词除了裁员&#xff0c;我想就是鸿蒙HarmonyOS了。其实鸿蒙的推出也给安卓端的同学提供了职业发展的新路径或方向。 鸿蒙&#xff0c;原本源自中国神话传说的名字&#xff0c;如今已成为了科技领域的焦点&#xff0c;招聘网站上也出现…

【C语言】C语言题库【附源码+持续更新】

欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 目录 1、练习2-1 Programming in C is fun! 2、练习2-3 输出倒三角图案 3、练习2-4 温度转换 4、练习2-6 计算物体自由下落的距离 5、练习2-8 计算摄氏温度 6、练习2-9 整数四则运算 7、练习2-10 计算分段函数[1…

ELFK (Filebeat+ELK)日志分析系统

一. 相关介绍 Filebeat&#xff1a;轻量级的开源日志文件数据搜集器。通常在需要采集数据的客户端安装 Filebeat&#xff0c;并指定目录与日志格式&#xff0c;Filebeat 就能快速收集数据&#xff0c;并发送给 logstash 进或是直接发给 Elasticsearch 存储&#xff0c;性能上相…

【计算机毕业设计】网上宠物商店管理系统——后附源码

&#x1f389;**欢迎来到我的技术世界&#xff01;**&#x1f389; &#x1f4d8; 博主小档案&#xff1a; 一名来自世界500强的资深程序媛&#xff0c;毕业于国内知名985高校。 &#x1f527; 技术专长&#xff1a; 在深度学习任务中展现出卓越的能力&#xff0c;包括但不限于…

DSP笔记8-通用GPIO

电源类 AD引脚类 系统相关JTAG 时钟 GPIO (general purpose input output)复用&#xff0c; 复用&#xff0c;I/O引脚&#xff0c;外设的功能引脚&#xff0c; 88个GPIO引脚&#xff0c;通用的输入输出口&#xff0c;功能复用的。 GPIO特点 输入电平与TTL电平兼容。>2.0V…