【Python】FANUC机器人OPC UA通信并记录数据

目录

    • 引言
    • 机器人仿真
    • 环境准备
    • 代码实现
      • 1. 导入库
      • 2. 设置参数
      • 3. 日志配置
      • 4. OPC UA通信
      • 5. 备份旧CSV文件
      • 6. 主函数
    • 总结

引言

 OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化领域。Python因其易用性和丰富的库支持,成为实现OPC UA通信的不错选择。本文将介绍如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。

机器人仿真

 FANUC机器人可以使用官方软件RoboGuide进行机器人仿真,启动后默认OPC UA地址为127.0.0.1:4880/FANUC/NanoUaServer
在这里插入图片描述

环境准备

  • Python 3.5+
  • opcua库:用于实现OPC UA通信
  • logging库:用于记录日志

安装opcua库:

pip install opcua

代码实现

1. 导入库

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

2. 设置参数

SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
CSV_FILENAME = 'fanuc_opcua_data.csv'
FAUNC_LOG = 'fanuc.log'
LOG_DIR = 'log'
BACKUP_DIR = 'backup'

3. 日志配置

def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)

4. OPC UA通信

  • 连接到服务器
def connect_to_server(url):client = Client(url)client.connect()return client
  • 获取根节点和对象节点
def get_root_node(client: Client):return client.get_root_node()
def get_objects_node(client: Client):return client.get_objects_node()
  • 遍历所有子节点并返回变量节点的路径和数值
def get_variables(node: Node, path=""):variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variables

5. 备份旧CSV文件

def backup_csv_file(filename):if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e})

6. 主函数

if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()

记录数据预览:
在这里插入图片描述

总结

 本文介绍了如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。通过使用opcua库,我们可以轻松地连接到OPC UA


完整代码:

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua# OPC UA服务器的URL
SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
# CSV文件名
CSV_FILENAME = 'fanuc_opcua_data.csv'
# 日志文件
FAUNC_LOG = 'fanuc.log'
# 文件夹 
LOG_DIR = 'log'
BACKUP_DIR = 'backup'def getLogger(filename: str):if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)logger = logging.Logger(filename[:-4].upper(), logging.INFO)formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")fh.setFormatter(formatter)ch = logging.StreamHandler()ch.setFormatter(formatter)logger.addHandler(fh)logger.addHandler(ch)return loggerLOGGER = getLogger(FAUNC_LOG)
def connect_to_server(url):"""创建客户端实例并连接到服务端"""client = Client(url)client.connect()return clientdef get_root_node(client: Client):"""获取服务器命名空间中的根节点"""return client.get_root_node()def get_objects_node(client: Client):"""获取服务器的对象节点"""return client.get_objects_node()def get_variables(node: Node, path=""):"""遍历所有子节点并返回变量节点的路径和数值"""variables = {}children: List[Node] = node.get_children()for child in children:try:name: ua.QualifiedName = child.get_browse_name()new_path = f"{path}/{name.Name}"if child.get_node_class() == ua.NodeClass.Variable:value = child.get_value()if isinstance(value, list):value = ','.join(str(x) for x in value)if isinstance(value, str):value = value.replace('\n', '\\n').replace(',', ' ')variables[new_path] = valueelse:variables.update(get_variables(child, new_path))except Exception as e:LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")return variablesdef backup_csv_file(filename):"""如果CSV文件已存在则备份"""if not os.path.exists(BACKUP_DIR):os.makedirs(BACKUP_DIR)if os.path.exists(filename):modification_time = os.path.getmtime(filename)modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"try:shutil.move(filename, new_filename)LOGGER.info(f"文件已移动到 {new_filename}")except Exception as e:LOGGER.error(f"移动文件出错: {new_filename}, Error: {e}")if __name__ == "__main__":try:client = connect_to_server(SERVER_URL)root_node = get_root_node(client)objects_node = get_objects_node(client)backup_csv_file(CSV_FILENAME)with open(CSV_FILENAME, mode='w', newline='') as csvfile:num = 0while True:variables = get_variables(objects_node)if num == 1:writer = csv.DictWriter(csvfile, fieldnames=variables.keys())writer.writeheader()writer.writerow(variables)csvfile.flush()num += 1LOGGER.info("数据记录:" + str(num))time.sleep(1)except KeyboardInterrupt:print("程序被用户中断")finally:client.disconnect()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/807646.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

聊聊Redis消息队列stream

前言 本期和大家一起探讨了如何基于 redis 实现消息队列,其中实现方案包括三类: redis list:最简单粗暴的实现,存在问题包括:不支持发布/订阅模式、消费端缺少 ack 机制redis pub/sub:支持发布/订阅模式&…

无“相关性”是指商品与分享内容无相关性,下列哪个行为不属于(无)相关性”违规?()

需要查看更多试题和答案,可以前往(题海舟试题答案)进行搜题查看。可以搜“题干关键词”。 无“相关性”是指商品与分享内容无相关性,下列哪个行为不属于(无)相关性”违规?() A.篮球比赛直播,售卖球衣、球鞋、球类与球…

从挑战到机遇:HubSpot如何帮助企业化解出海过程中的难题

企业出海挑战与对策 随着全球化的加速推进,越来越多的企业开始将目光投向海外市场,以寻求更广阔的发展空间。然而,在出海的过程中,企业往往面临着诸多挑战,其中文化差异、法律限制等问题尤为突出。今天运营坛将对这些…

快速开始vue3

版本 node (20.11.1)vue3 (3.4.21)脚手架创建项目并运行 安装脚手架并创建项目npm create vue@latest这一指令将会安装并执行 create-vue,它是 Vue 官方的项目脚手架工具 2) 安装以下进行选择 ## 配置项目名称 √ Project name: vue3_test ## 是否添加TypeScript支持 √ Add…

创意解决方案:如何将作品集视频集中于一个二维码或链接中?

引言:随着面试环节的进一步数字化,展示自己的作品集成为了求职过程中的重要一环。但除了使用传统的方式,如百度网盘或直接发送多个视频链接,有没有更便捷的方法将作品集的多个视频放在一个链接中呢? 本文将介绍一种创意解决方案…

如何使用try-with-resources关闭非自己创建的InputStream

如何使用try-with-resources关闭非自己创建的InputStream 在Java中,不论InputStream是自己创建的还是由外部提供的,只要它是AutoCloseable的实例,你都可以使用try-with-resources语句来确保它在不再需要时被自动关闭。 try-with-resources语…

RocketMQ 之 IoT 消息解析:物联网需要什么样的消息技术?

作者:林清山(隆基) 前言: 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前&a…

数据库被rmallox勒索病毒加密,如何还原?

近年来,网络安全问题日益严峻,勒索病毒作为其中的一种恶意软件,已成为网络安全领域的一大难题。其中,rmallox勒索病毒以其高度的隐蔽性和破坏性,给不少企业和个人带来了严重损失。本文将从rmallox勒索病毒的特点、传播…

一个简单的UI自动化框架应用介绍

项目框架介绍 该数据自动校验小程序采用POM模型,基于Javaseleniumtestngextentsreportexcel POI开发。 框架核心功能 基于PMO模型将页面封装成java对象,并通过selenuim驱动浏览器进行操作。通过excel POI对excel文件进行操作,通过对比导出…

Docker搭建CouchPotato

使用 CouchPotato Docker 镜像搭建电影下载管理器 CouchPotato 是一个电影下载管理器,它可以帮助用户自动搜索、下载和管理电影。通过 Docker 镜像,可以在服务器上轻松部署 CouchPotato,并让其运行在容器中,以便在任何设备上访问…

通过网络api获取日期对应的节假日信息

网络接口获取链接&#xff1a;免费节假日API_原百度节假日API HolidayJudge.h #pragma once#include <QtWidgets/QWidget> #include "ui_HolidayJudge.h"enum DATESTATE {WORK0,//工作日DAYOFF,//休息日HOLIDAY//节假日 };class HolidayJudge : public QWidg…

Linux之线程互斥与同步

1.线程互斥相关概念 临界资源&#xff1a;多线程执行流共享的资源就叫做临界资源 。 临界区&#xff1a;每个线程内部&#xff0c;访问临界自娱的代码&#xff0c;就叫做临界区。 互斥&#xff1a;任何时刻&#xff0c;互斥保证有且只有一个执行流进入临界区&#xff0c;访问临…

关于本博客作者的声明

鉴于鸭某兽公司的恶意推测、试图抹黑本人及本人所在公司&#xff0c;臆测本人及本人所在公司对本人博客名称进行模仿。为了对相关情况进行澄清&#xff0c;现本人声明如下&#xff1a; 本博客&#xff08;名称&#xff1a;小飞鱼通达二开&#xff09;&#xff0c; 网址为&…

【Unity】如何让GameObject的长宽自适应屏幕分辨率

【背景】 用一个长方形的GameObject代表电影屏幕,希望这个GameObject能够随着当前屏幕分辨率的大小适当变化,Texture会呈现当前屏幕的桌面画面,如果不一致会比例失调。 【分析】 Awake函数中就完成处理。获得当前屏幕分辨率,用适当倍数计算后付给GameObject的Transform下…

谷歌查问题

1&#xff0c;打开 it工具箱-里面啥都有 2&#xff0c;找到谷歌 3&#xff0c;访问gpt

E. Yet Another Walking Robot 又一个行走的机器人(map详解代码)

坐标平面上有一个机器人。最初&#xff0c;机器人位于该点&#xff08;0,0&#xff09; .它的路径被描述为字符串s长度n由字符“L”、“R”、“U”、“D”组成。这些字符中的每一个都对应着一些动作&#xff1a; ‘L’&#xff08;左&#xff09;&#xff1a;表示机器人从该点移…

VSCODE自动更新无法连接远程服务器报错“waiting for server log...“的解决方法

问题描述 一觉醒来打开vscode发现连接远程服务器显示无法连接&#xff0c;终端一直报错“waiting for server log…"&#xff0c;经查是因为vscode自动更新到了1.86&#xff0c;对于远程服务器的linux版本要求较高。这里记录下解决方法。 解决方法 1. 下载vscode便携版…

Vue3 v-bind绑定css中的var变量实现动态样式

在日常的开发中&#xff0c;我们常常遇到这样的需求&#xff1a;点击一个button改变页面中某个元素的样式&#xff0c;在这样的场景中&#xff0c;我们可以使用v-bind绑定css中的var变量&#xff0c;来动态的切换元素的样式 一个小栗子&#xff0c;在setup语法糖环境下&#xf…

敏捷开发是什么?敏捷开发的流程有什么?

敏捷开发是什么&#xff1f; 敏捷开发是一种灵活且迭代的软件开发方法论&#xff0c;它强调快速响应变化、高效协作、持续交付价值以及高度关注业务目标与客户满意度。敏捷开发采用短周期&#xff08;称为“迭代”或“冲刺”&#xff09;来开发、测试和交付可用的产品增量&…

日志埋点功能

前言 开发中经常会有日志埋点需求, 用于统计接口的请求量、处理速度等等,为此本篇幅从一下几个维度进行分析,从零到有搭建。 技术架构解析 实现日志埋点功能,从字面意思就可以想到功能大致分为两个方向: 1、 埋点功能(logback + 封装通用SDK方法 + 共享文件夹(如果是多…