python3实现类似expect shell的交互式与SFTP的脚本

前面写过一篇关于python实现类似expect shell的交互式能力的文章,现在补全一下加上sftp的能力脚本。
例子在代码中__example()方法。

依赖paramiko库,所以需要执行pip install paramiko来安装。

import os
import queue
import re
import threading
import time
import traceback
import stat
import datetimeimport paramiko
from paramiko import SSHClient, SSHException, SFTPClient, Channelclass SFTPMultipleClient(object):"""支持多sftp连接拉取文件支持快捷执行远程命令"""def __init__(self, host, port, username, pwd, work_count=1) -> None:super().__init__()# client = SSHClient()# client.set_missing_host_key_policy(paramiko.AutoAddPolicy())# client.connect(hostname=host, timeout=60, port=port, username=username, password=pwd)# self.ssh_client = client# self.sftp_client = client.open_sftp()self.host = hostself.port = portself.username = usernameself.pwd = pwdself.work_count = work_countself.thread_local_data = threading.local()self.pull_queue = queue.Queue()self.ssh_clientlist = []self.sftp_clientlist = []self.thread_list = {}self.email_send_files = []self.ptySessions = [](self.ssh_client, self.sftp_client) = self._gen_sftp_client(host, port, username,pwd)  # type:(SSHClient, SFTPClient)for i in range(work_count):self.thread_list[self._start_back_task(self._pull_event_handler)] = Falsedef _start_back_task(self, fun, args=()):t = threading.Thread(target=fun, daemon=True, args=args)t.start()return tdef remote_scp_progress(self, a, b):"""远程scp进度打印:param a: 当前已传输大小(单位字节):param b: 文件总大小(单位字节)"""if a > b:a = bsecond = self.thread_local_data.timer.last_press_time_deltaif second > 1 or a == b:self.thread_local_data.timer.press()speed = a - self.thread_local_data.last_progressself.thread_local_data.last_progress = aremaining_time_second = (b - a) / speeds = self.thread_local_data.get_file_desc + "%s  %s %02d%s %dKB/s %02d:%02d " % \(a, b, int(a / b * 100), "%", speed / 1024, remaining_time_second / 60, remaining_time_second % 60)print(s)# print(s, end="", flush=True)# _back = ""# for i in range(len(s)):#     _back += '\b'# print(_back, end="")def _gen_sftp_client(self, host, port, username, pwd) -> (SSHClient, SFTPClient):"""生成sftp客户端当连接断开会进行重试,最大重试连接5次"""_try_count = 0_ssh_client = SSHClient()_ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())_sftp_client = Nonetname = threading.current_thread().namestime = 1while _try_count < 5:connect_failed = Falsetry:_ssh_client.connect(hostname=host, timeout=60, port=port, username=username,password=pwd)_sftp_client = _ssh_client.open_sftp()print(tname + " ssh连接成功.")breakexcept Exception as e:connect_failed = Truetraceback.print_exc()if connect_failed:try:_ssh_client.close()if _sftp_client:_sftp_client.close()except Exception as e:traceback.print_exc()_try_count += 1if _try_count == 5:raise Exception("尝试重新连接达到最大次数.断开连接.")traceback.print_exc()print(tname + " 第%d次重新连接ssh..." % _try_count)stime *= 2time.sleep(stime)self.ssh_clientlist.append(_ssh_client)self.sftp_clientlist.append(_sftp_client)self.thread_local_data.ssh_client = _ssh_clientself.thread_local_data.sftp_client = _sftp_clientreturn _ssh_client, _sftp_clientdef _pull_event_handler(self):"""拉取任务处理"""self._gen_sftp_client(self.host, self.port, self.username, self.pwd)# self.thread_local_data.ssh_client = _ssh_client# self.thread_local_data.sftp_client = _sftp_clientwhile True:self.thread_list[threading.current_thread()] = Falsedata = self.pull_queue.get(block=True, timeout=None)self.thread_list[threading.current_thread()] = True# 通过队列插入None来结束线程,如果有n个线程在监听队列,那么需要n个None来结束if data is None:self.thread_list[threading.current_thread()] = Falsebreaktry:self._remote_scp(self.thread_local_data.ssh_client, self.thread_local_data.sftp_client,data.remote_path,data.local_path, data.max_size, callback=self.remote_scp_progress)fun = data.call_bak_funif fun:call_bak_fun_args = ()call_bak_fun_kwargs = {}if type(fun) in [list, tuple]:if not callable(fun[0]):raise Exception("call_bak_fun 不是可调用对象")if len(fun) > 1:call_bak_fun_args = fun[1]if len(fun) > 2:call_bak_fun_kwargs = fun[2]else:if not callable(fun):raise Exception("call_bak_fun 不是可调用对象")fun[0](*call_bak_fun_args, **call_bak_fun_kwargs)except Exception as e:traceback.print_exc()# pull_queue.put(data)# print("线程%s 结束." % threading.current_thread().name)def _remote_scp(self, _ssh_client, _sftp_client, _remote_path, _local_path, max_size=None, callback=None):reconnect = Falsetry:if os.path.exists(_local_path):rf_stat = _sftp_client.lstat(_remote_path)lf_stat = os.stat(_local_path)if rf_stat.st_size == lf_stat.st_size:print(_local_path + " already exists.")returnif max_size and rf_stat.st_size > max_size:  # 如果大于1m则认为是空板图片print(_local_path + " > " + max_size + " skipped.")returnprint(threading.current_thread().name + "copy file:%s << %s\t" % (_local_path, _remote_path))self.thread_local_data.get_file_desc = "copy file:%s << %s\t" % (_local_path, _remote_path)self.thread_local_data.timer = Timer()self.thread_local_data.last_progress = 0_sftp_client.get(_remote_path, _local_path, callback=callback)except FileNotFoundError as e:traceback.print_exc()print("continue...")except SSHException as e:traceback.print_exc()reconnect = Trueexcept OSError as e:print("os error====")traceback.print_exc()reconnect = Trueif reconnect:print("重新连接ssh...")_sftp_client.close()_ssh_client.close()self.sftp_clientlist.remove(_sftp_client)self.ssh_clientlist.remove(_ssh_client)_ssh_client, _sftp_client = self._gen_sftp_client(self.host, self.port, self.username, self.pwd)self._remote_scp(_ssh_client, _sftp_client, _remote_path, _local_path, max_size, callback)if callback:print()def pull_file(self, remote_path, local_path, max_size=None, print_progress=False):"""拉取文件:param remote_path: 远程文件路径:param local_path: 本地文件路径:param max_size: 远程文件如果超过了这个大小,则不做拉取:param print_progress: 是否打印文件拉取进度"""if print_progress:self._remote_scp(self.ssh_client, self.sftp_client, remote_path, local_path, max_size,self.remote_scp_progress)else:self._remote_scp(self.ssh_client, self.sftp_client, remote_path, local_path, max_size)def submit_pull_work(self, remote_path, local_path, max_size=None, call_bak_fun=None):"""提交拉取文件任务:param remote_path: 远程文件路径:param local_path: 本地文件路径:param max_size: 远程文件如果超过了这个大小,则不做拉取:param call_bak_fun: 拉取完成回调方法"""e_data = Task(remote_path, local_path, max_size, call_bak_fun)self.pull_queue.put(e_data)def exec_command(self, command, *args, raise_e=True, **kwargs):"""执行远程命令"""stdin, stdout, stderr = self.ssh_client.exec_command(command, *args, **kwargs)if raise_e and stdout.channel.recv_exit_status() != 0:raise Exception(stderr.readline())return stdin, stdout, stderrdef mkdir(self, path, recursive=True, mode=0o777):"""创建文件夹:param path: 远程文件夹:param recursive: 是否递归创建 默认true:param mode: 权限,默认0o777-全部权限"""if not recursive:self.sftp_client.mkdir(path, mode)returndirs = path.split(os.path.sep)current_dir = "/"for i, d in enumerate(dirs):d = os.path.join(current_dir, d)create = Truetry:if stat.S_ISDIR(self.sftp_client.stat(d).st_mode):create = Falseexcept FileNotFoundError:create = Trueif create:self.sftp_client.mkdir(d, mode)current_dir = ddef destroy(self):"""销毁当前实例"""while not self.pull_queue.empty():  # 等待队列任务处理完毕time.sleep(0.2)# 结束线程for thread in self.thread_list:self.pull_queue.put(None)  # 通过None来停止线程while True:if thread.is_alive() and self.thread_list[thread]:time.sleep(1)else:break# self.thread_list.pop(thread)# for i in range(len(self.thread_list)):#     self.pull_queue.put(None)  # 通过None来停止线程#     self.thread_list.pop()  # 移除成员for ptySession in self.ptySessions:ptySession.destroy()for i in range(len(self.sftp_clientlist)):sftp_client = self.sftp_clientlist.pop()sftp_client.close()for i in range(len(self.ssh_clientlist)):ssh_client = self.ssh_clientlist.pop()ssh_client.close()def __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.destroy()def open_pty_session(self, ending_char, timeout=30):"""从当前channel开启一个session并激活@return:"""_session = self.ssh_client.get_transport().open_session(timeout=1 * 3600)  # type:Channel_session.get_pty()_session.invoke_shell()_ptysession = PtySession(_session, ending_char=ending_char, timeout=timeout)self.ptySessions.append(_ptysession)return _ptysessiondef listdir_attr(self, path):"""列出远程目录:param path: 远程目录:return: list of `.SFTPAttributes` objects"""_reconnect = Falsetry:return self.sftp_client.listdir_attr(path)except SSHException as e:traceback.print_exc()_reconnect = Trueexcept OSError as e:traceback.print_exc()_reconnect = Trueif _reconnect:print("重新连接ssh...")self.sftp_client.close()self.ssh_client.close()self.sftp_clientlist.remove(self.sftp_client)self.ssh_clientlist.remove(self.ssh_client)self.ssh_client, self.sftp_client = self._gen_sftp_client(self.host, self.port, self.username, self.pwd)return self.listdir_attr(path)# 一次读取字节长度
recv_len = 1024 * 5class Task(object):def __init__(self, remote_path, local_path, max_size=None, call_bak_fun=None) -> None:super().__init__()self.remote_path = remote_path  # 远程路径self.local_path = local_path  # 本地路径self.max_size = max_size  # 最大大小,当大于这个大小将跳过不做处理self.call_bak_fun = call_bak_fun  # 回调方法 格式:(fn,arg,kw)class PtySession(object):def __init__(self, session, ending_char, timeout=30) -> None:super().__init__()self.session = session  # type:Channelself.last_line = ""self.ending_char = ending_char  # 每执行一个命令之后,标记输出的结束字符self.clear_tail()self.timeout = timeout  # 超时时间,秒def clear_tail(self, _ending_char=None):"""清理输出还处于缓冲区中未读取的流"""if not _ending_char:_ending_char = self.ending_charwhile True:time.sleep(0.2)# self.session.recv_ready()在读取过程中不一定总是True,只有当读取缓冲流中有字节读取时,才会为True。所以在读取头一次后获取下次流到缓冲区中前为Falseif self.session.recv_ready():self.last_line = self.session.recv(recv_len)self.last_line = self.last_line.decode('utf-8')print(self.last_line)if re.search(_ending_char, self.last_line):breakdef destroy(self):"""销毁并关闭session:return::rtype:"""self.clear_tail()self.session.close()def exp(self, *exp_cmds):"""期望并执行,与expect的用法类似。session.exp(("\$", "scp test.txt luckydog@127.0.0.1:~/\r",(("yes/no", "yes\r",("Password:", "luckydog\r")),("Password:", "luckydog\r")))):param exp_cmds: 第一个元素为获取的期望结束字符,第二个元素为需要执行的命令,如果传入的第三个元素,则第三个元素必须为元祖,并且也同父级一样,属递归结构。类似GNU的递归缩写。:type exp_cmds: tuple"""interval = 0.2cur_time = 0.0try:while True:if self.session.recv_ready():self.last_line = self.session.recv(recv_len).decode('utf-8')print(self.last_line)elif self.session.send_ready():for exp_cmd in exp_cmds:_cmd = exp_cmd[1]if not _cmd.endswith("\r"):_cmd += "\r"match = re.search(exp_cmd[0], self.last_line)if match and match.group():self.session.send(_cmd)# 清空最后一行数据缓存,便于下个命令的读取流输出。此行代码去除,会导致无法等待命令执行完毕提前执行后续代码的问题。self.last_line = ""if len(exp_cmd) == 3 and exp_cmd[2]:self.exp(*exp_cmd[2])returncur_time += intervalif cur_time >= self.timeout:raise Exception("timeout...")time.sleep(interval)except Exception as e:traceback.print_exc()# finally:#     self.clear_tail()def send(self, cmd, _ending_char=None):"""单纯的发送命令到目标服务器执行。:param cmd: 命令:type cmd: str"""self.last_line = ""if not cmd.endswith("\r"):cmd += "\r"self.session.send(cmd)self.clear_tail(_ending_char)# -----------------------
class Timer(object):def __init__(self) -> None:super().__init__()self._start_time = Noneself._end_time = Noneself._seconds_delta = Noneself._last_press_time = None  # 最后一个计次时间def start(self):if not self._start_time:self._start_time = datetime.datetime.now()self._last_press_time = self._start_timeelse:raise Exception("timer is already started.")return self._start_timedef end(self):if not self._end_time:self._end_time = datetime.datetime.now()self._seconds_delta = (self._end_time - self._start_time).total_seconds()else:raise Exception("timer is already stopped.")def press(self):if not self._start_time:self._last_press_time = self.start()return 0now = datetime.datetime.now()last_time = self._last_press_timeself._last_press_time = nowreturn (now - last_time).total_seconds()@propertydef last_press_time_delta(self):if not self._last_press_time:self.press()return 0return (datetime.datetime.now() - self._last_press_time).total_seconds()@propertydef start_time(self):return self._start_time@propertydef end_time(self):return self._end_timedef timedelta_seconds(self):if self._seconds_delta:return self._seconds_deltaelse:raise Exception("timer not stopped.")# ===================test=========================
def _file_pull_complete(filename, **kw):print(filename + "文件上传完成")def __example():sftp = SFTPMultipleClient(host="130.16.16.18", port=22, username="baiyang", pwd="xxx")# 创建目录sftp.mkdir("/home/username/123/")# 下载文件到本地(同步执行)sftp.pull_file("/home/username/temp/rumenz.img",  # 远程文件"/Users/username/rumenz.img",  # 本地路径文件,需要带上文件名print_progress=True  # 打印文件拉取进度)# 提交下载远程文件异步任务(异步线程执行不阻塞)sftp.submit_pull_work("/home/username/temp/rumenz.img",  # 远程文件"/Users/username/rumenz.img",  # 本地路径文件,需要带上文件名call_bak_fun=(_file_pull_complete, ("rumenz.img",))  # 文件上传完成回调,第一个元素为方法,第二个是入参)# 列出文件列表files = sftp.listdir_attr("/home/baiyang")for f in files:print(f)# 开启交互回话session = sftp.open_pty_session("\$")# 发送单条命令session.send("cd ~/123")# # 发送单条命令并等待获取_ending_char符号(结束符支持el)session.send("scp test.txt username@130.16.16.133:~/\r", "Password:|yes/no")# # 当最后一行为yes/no 则执行yes,如果是Password则输入密码session.exp(("yes/no", "yes\r",("Password:", "xxx\r")),("Password:", "xxx\r"))# scp方式2'''伪代码if (yes/no):send yesif (Password:)send 密码elif (Password:)send 密码'''session.exp(("\$", "scp test.txt username@130.16.16.133:~/\r",  # 期望$,并发送scp命令(("yes/no", "yes\r", (  # 如果返回yes/no,则发送yes"Password:", "xxx\r")),  # 发送yes后,如果返回Password:结尾,则发送密码("Password:", "xxx\r")  # 如果返回的是Password:结尾,则发送密码)))# 销毁session.destroy()sftp.destroy()# 支持通过with的方式使用SFTPMultipleClient,这样不需要显示的调用 sftp.destroy()with SFTPMultipleClient(host="130.16.16.18", port=22, username="baiyang", pwd="xxx") as sftp:# 列出文件列表files = sftp.listdir_attr("/home/baiyang")for f in files:print(f)if __name__ == '__main__':__example()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/165887.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

综合实力盘点高性价比还优质的云服务器:亚马逊云科技仍然领跑市场

如果说云计算是一条流向数字化未来的河流&#xff0c;那亚马逊云科技毫无疑问是航行在最前面的帆船&#xff1b;如果说云计算是一条通往数字化未来的铁轨&#xff0c;那亚马逊云科技就是行驶在最前面的高铁。接下来回首往昔&#xff0c;以史为镜&#xff0c;得出云服务器哪家便…

毛里塔尼亚市场开发攻略,收藏一篇就够了

毛里塔尼亚是非洲西北部的一个国家&#xff0c;也是中国长期援建的一个国家&#xff0c;也是一带一路上的国家。毛里塔尼亚生产生活资料依赖进口&#xff0c;长期依赖跟我们国家的贸易关系也是比较紧密的&#xff0c;今天就来给大家介绍一下毛里塔尼亚的市场开发公路。文章略长…

Python监控服务进程及自启动服务方法与实践

1. 需求概述 当我们在Windows Server环境中部署XX系统的实际应用中&#xff0c;往往会遇到一些运维管理的挑战。为了确保系统的持续稳定运行&#xff0c;特别是在服务程序因各种原因突然关闭的情况下&#xff0c;我们可以借助Python的强大生态系统来构建一个监控与自动重启的管…

分布式链路追踪入门篇-基础原理与快速应用

为什么需要链路追踪&#xff1f; 我们程序员在日常工作中&#xff0c;最常做事情之一就是修bug了。如果程序只是运行在单机上&#xff0c;我们最常用的方式就是在程序上打日志&#xff0c;然后程序运行的过程中将日志输出到文件上&#xff0c;然后我们根据日志去推断程序是哪一…

Comsol Multiphysics 6.2 for Mac建模仿真软件

COMSOL Multiphysics是一款多物理场仿真软件&#xff0c;旨在帮助工程师、科学家和研究人员解决各种复杂的工程和科学问题。该软件使用有限元分析方法&#xff0c;可以模拟和分析多个物理场的相互作用&#xff0c;包括结构力学、热传导、电磁场、流体力学和化学反应等。 COMSOL…

一些好用的前端小插件(转自知乎)

一些好用的前端小插件&#xff08;2&#xff09; 1. cropper.js Cropper.js 2.0 是一系列用于图像裁剪的 Web 组件。 官网地址&#xff1a;https://fengyuanchen.github.io/cropperjs/v2/zh/ 2. Vditor Vditor是一款浏览器端的 Markdown 编辑器&#xff0c;支持所见即所得、…

2024年度投资策略:AI大模型和半导体国产化加速

今天分享的是AI系列深度研究报告&#xff1a;《2024年度投资策略&#xff1a;AI大模型和半导体国产化加速》。 &#xff08;报告出品方&#xff1a;东方证券&#xff09; 报告共计&#xff1a;48页 前言: 行情回顾与未来展望 电子板块涨幅转正&#xff0c;信心逐渐回归。截至…

人人都会Blazor —— 3.3 参数

参数最常见的使用,目的是使组件可以接收动态数据。 声明参数 参数使用 [Parameter] 特性的公共 C# 属性进行定义。 在下面的示例中,内置引用类型 (System.String) 和用户定义的引用类型 (PanelBody) 作为组件参数进行传递。 PanelBody.cs: public class PanelBody {publ…

SQL注入漏洞发现和利用,以及SQL注入的防护

一、背景 SQL注入漏洞是一种常见的软件安全问题&#xff0c;它发生在应用程序的数据库层中。其核心原理是将用户输入的数据当做代码来执行&#xff0c;违反了“数据与代码分离”的原则。具体来说&#xff0c;攻击者通过构造恶意的SQL查询语句&#xff0c;使得应用程序在执行SQ…

Android NFC手机上实现卡模拟

1&#xff0c; 问&#xff1a;能否在AndroidNFC手机上实现卡模拟&#xff1f; 答&#xff1a;在技术上可行&#xff0c;但是&#xff0c;对一般开发人员来讲&#xff0c;目前看来仅仅是技术上可行。 2&#xff0c; 问&#xff1a;具体如何实现呢&#xff1f; 答&#xff1…

git的使用记录

GitHub是公有的远程仓库&#xff0c;Gitlab是私有的远程仓库。 git add file git commit -m "add file" git mv filea fileb git log 显示提交记录 git log --oneline 一行的简略信息显示 git log --oneline --decorate 显示当前指针 git reset --ha…

矩阵知识补充

正交矩阵 定义&#xff1a; 正交矩阵是一种满足 A T A E A^{T}AE ATAE的方阵 正交矩阵具有以下几个重要性质&#xff1a; A的逆等于A的转置&#xff0c;即 A − 1 A T A^{-1}A^{T} A−1AT**A的行列式的绝对值等于1&#xff0c;即 ∣ d e t ( A ) ∣ 1 |det(A)|1 ∣det(A)∣…

通用功能——git 攻略

摘要 本文主要介绍git常用命令的使用方法&#xff0c;同时介绍一些常见问题的处理方法&#xff0c;持续更新中… git命令通用选项 大多数git命令都适用的选项列表如下&#xff1a; -v, --verbose show hash and subject, give twice for upstream branch -q, --quie…

Vim 一下日志文件,Java 进程没了?

一次端口告警&#xff0c;发现 java 进程被异常杀掉&#xff0c;而根因竟然是因为在问题机器上 vim 查看了 nginx 日志。下面我将从时间维度详细回顾这次排查&#xff0c;希望读者在遇到相似问题时有些许启发。 时间线 15:19 收到端口异常 odin 告警。 状态:P1故障 名称:应用端…

黑马点评笔记 redis实现优惠卷秒杀

文章目录 难题全局唯一IDRedis实现全局唯一Id 超卖问题问题解决方案乐观锁问题 一人一单 难题 要解决优惠卷秒杀的问题我们要考虑到三个个问题&#xff0c;全局唯一ID&#xff0c;超卖问题&#xff0c;一人一单。 全局唯一ID 用户抢购时&#xff0c;就会生成订单并保存到同一…

【git】pip install git+https://github.com/xxx/xxx替换成本地下载编译安装解决网络超时问题

目录 &#x1f311;&#x1f311; 背景 &#x1f312; &#x1f312;作用 &#x1f314;&#x1f314; 问题 &#x1f314;&#x1f314;解决方案 &#x1f319;方法一 &#x1f319;方法二 &#x1f31d;&#x1f31d;我的解决方案 整理不易&#xff0c;欢迎一键三连…

7-12 统计投票情况(集合)

7-12 统计投票情况&#xff08;集合&#xff09; 分数 10 作者 python课程组 单位 福州大学至诚学院 利用集合分析活动投票情况。 第一小队有五名队员&#xff0c;序号是1,2,3,4,5&#xff1b;第二小队也有五名队员&#xff0c;序号6,7,8,9,10。 输入一个由得票队员编号组成的…

分布式篇---第三篇

系列文章目录 文章目录 系列文章目录前言一、什么是补偿事务?二、消息队列是怎么实现的?三、那你说说Sagas事务模型前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。…

qgis添加postgis数据

左侧浏览器-PostGIS-右键-新建连接 展开-双击即可呈现 可以点击编辑按钮对矢量数据编辑后是直接入库的&#xff0c;因此谨慎使用。

【DQN】基于pytorch的强化学习算法Demo

目录 简介代码 简介 DQN&#xff08;Deep Q-Network&#xff09;是一种基于深度神经网络的强化学习算法&#xff0c;于2013年由DeepMind提出。它的目标是解决具有离散动作空间的强化学习问题&#xff0c;并在多个任务中取得了令人瞩目的表现。 DQN的核心思想是使用深度神经网…