Python 中整洁的并行输出

原文:https://bernsteinbear.com/blog/python-parallel-output/
代码:https://gist.github.com/tekknolagi/4bee494a6e4483e4d849559ba53d067b

Python 并行输出

使用进程和锁并行输出多个任务的状态。
在这里插入图片描述

注:以下代码在linux下可用,windows下可能要进行修改。

假设你有一个程序,它对列表进行一些处理:

def log(repo_name, *args):print(f"{repo_name}:", *args)def randsleep():import randomimport timetime.sleep(random.randint(1, 5))def func(repo_name):log(repo_name, "Starting")randsleep()  # Can be substituted for actual worklog(repo_name, "Installing")randsleep()log(repo_name, "Building")randsleep()log(repo_name, "Instrumenting")randsleep()log(repo_name, "Running tests")randsleep()log(repo_name, f"Result in {repo_name}.json")repos = ["repoA", "repoB", "repoC", "repoD"]
for repo in repos:func(repo)

这很好。它有效。有点吵,但有效。但随后你发现了一件好事:你的程序是数据并行。也就是说,您可以并行处理:
在这里插入图片描述

import multiprocessing# ...with multiprocessing.Pool() as pool:pool.map(func, repos, chunksize=1)

不幸的是,输出有点笨拙。虽然每行仍然很好输出一个 repo,但它正在左右喷出行,并且这些行是混合的。
在这里插入图片描述

幸运的是,StackOverflow 用户 Leedehai是终端专业用户,知道如何在控制台中一次重写多行。我们可以根据自己的需要调整这个答案:

def fill_output():to_fill = num_lines - len(last_output_per_process)for _ in range(to_fill):print()def clean_up():for _ in range(num_lines):print("\x1b[1A\x1b[2K", end="")  # move up cursor and delete whole linedef log(repo_name, *args):with terminal_lock:last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)clean_up()sorted_lines = last_output_per_process.items()for repo_name, last_line in sorted_lines:print(f"{repo_name}: {last_line}")fill_output()def func(repo_name):# ...with terminal_lock:del last_output_per_process[repo_name]# ...repos = ["repoA", "repoB", "repoC", "repoD"]
num_procs = multiprocessing.cpu_count()
num_lines = min(len(repos), num_procs)
with multiprocessing.Manager() as manager:last_output_per_process = manager.dict()terminal_lock = manager.Lock()fill_output()with multiprocessing.Pool() as pool:pool.map(func, repos, chunksize=1)clean_up()

在这里插入图片描述

这会将每个项目的状态(一次一行)打印到终端。它将按项目添加到的 last_output_per_process 顺序打印,但您可以通过(例如)按字母数字排序来更改它: sorted(last_output_per_process.items())

请注意,我们必须锁定数据结构和终端输出,以避免事情被破坏;它们在过程之间共享(pickled,via Manager )。

如果日志输出有多行长,或者其他人正在用 stdout / stderr (也许是流浪的 print )搞砸,我不确定这会做什么。如果您发现或有整洁的解决方案,请写信。

这种技术对于任何具有线程和锁的编程语言来说可能是相当可移植的。关键的区别在于这些实现应该使用线程而不是进程;我做进程是因为它是 Python。

最终版

import multiprocessing
import random
import timeclass Logger:def __init__(self, num_lines, last_output_per_process, terminal_lock):self.num_lines = num_linesself.last_output_per_process = last_output_per_processself.terminal_lock = terminal_lockdef fill_output(self):to_fill = self.num_lines - len(self.last_output_per_process)for _ in range(to_fill):print()def clean_up(self):for _ in range(self.num_lines):print("\x1b[1A\x1b[2K", end="")  # move up cursor and delete whole linedef log(self, repo_name, *args):with self.terminal_lock:self.last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)self.clean_up()sorted_lines = self.last_output_per_process.items()for repo_name, last_line in sorted_lines:print(f"{repo_name}: {last_line}")self.fill_output()def done(self, repo_name):with self.terminal_lock:del self.last_output_per_process[repo_name]class MultiprocessingLogger(Logger):def __init__(self, num_lines, manager):super().__init__(num_lines, manager.dict(), manager.Lock())class FakeLock:def __enter__(self):passdef __exit__(self, exc_type, exc_value, traceback):passclass SingleProcessLogger(Logger):def __init__(self, num_lines):super().__init__(num_lines, {}, FakeLock())def randsleep():time.sleep(random.randint(1, 2) / random.randint(1, 5))def func(repo_name):logger.log(repo_name, "Starting")randsleep()logger.log(repo_name, "Installing")randsleep()logger.log(repo_name, "Building")randsleep()logger.log(repo_name, "Instrumenting")randsleep()logger.log(repo_name, "Running tests")randsleep()logger.log(repo_name, f"Result in {repo_name}.json")randsleep()logger.done(repo_name)def multi_process_demo():ascii_uppercase = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"repos = [f"repo{letter}" for letter in ascii_uppercase]num_procs = multiprocessing.cpu_count()num_lines = min(len(repos), num_procs)with multiprocessing.Manager() as manager:global loggerlogger = MultiprocessingLogger(num_lines, manager)# Make space for our outputlogger.fill_output()with multiprocessing.Pool(num_procs) as pool:pool.map(func, repos, chunksize=1)logger.clean_up()def single_process_demo():repo = "repoA"num_lines = 1global loggerlogger = SingleProcessLogger(num_lines)logger.fill_output()func(repo)logger.clean_up()if __name__ == "__main__":multi_process_demo()# single_process_demo()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/1033.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

win10 系统怎么开启 guest 账户?

win10 系统怎么开启 guest 账户? 段子手168 前言: guest 账户即所谓的来宾账户,我们可以通过该账户访问计算机,如打印机共享等,但会在一定程度上受到限制。下面分享 WIN10 系统开启 guest 来宾账户的几种方法。 方法…

设备连接IoT云平台指南

一、简介 设备与IoT云间的通讯协议包含了MQTT,LwM2M/CoAP,HTTP/HTTP2,Modbus,OPC-UA,OPC-DA。而我们设备端与云端通讯主要用的协议是MQTT。那么设备端与IoT云间是如何创建通信的呢?以连接华为云IoT平台为例…

SpringBoot集成EasyExcel 3.x:高效实现Excel数据的优雅导入与导出

目录 介绍 快速开始 引入依赖 简单导出 定义实体类 自定义转换器 定义接口 测试接口 复杂导出 自定义注解 定义实体类 数据映射与平铺 自定义单元格合并策略 定义接口 测试接口 一对多导出 自定义单元格合并策略 测试数据 简单导入 定义接口 测试接口 参…

供应链系统搭建|主流电商平台商品采集|一键搬家|订单物流回传API接口

搭建供应链系统时,您可能需要与电商平台进行集成,以实现订单管理、库存同步、物流跟踪等功能。以下是一些常见的电商接口,可以帮助您构建供应链系统: 1. **淘宝开放平台接口**:淘宝开放平台提供了丰富的接口&#xff…

ssh-key关于authorized_keys电脑与linux互相认证

思路: 在A上生成公钥私钥。将公钥拷贝给server B,要重命名成authorized_keys(从英文名就知道含义了)Server A向Server B发送一个连接请求。Server B得到Server A的信息后,在authorized_key中查找,如果有相应的用户名和IP&#xf…

ubuntu 查询mysql的用户名和密码 ubuntu查看username

ubuntu 查询mysql的用户名和密码 ubuntu查看username 文章标签mysqlUbuntu用户名文章分类MySQL数据库 一.基本命令 1.查看Ubuntu版本 $ lsb_release -a No LSB modules are available. Distributor ID: Ubuntu Description: Ubuntu 16.04.5 LTS Release: 16.04 Coden…

Java数据结构-堆和优先级队列

目录 1. 相关概念2. PriorityQueue的实现2.0 搭建整体框架2.1 堆的创建和调整2.2 插入元素2.3 出堆顶元素 3. 全部代码(包含大根堆和小根堆)4. PriorityQueue的使用5. Top-K问题 之前我们学习的二叉树的存储方式是链式存储,(不清楚…

java的各种锁

我们先来看看有什么锁 一、java锁 1、乐观锁 乐观锁 是一种乐观思想 ,假定当前环境是读多写少,遇到并发写的概率比较低,读数 据时认为别的线程不会正在进行修改(所以没有上锁)。写数据时,判断当前 与期望…

算法打卡day48|动态规划篇16| Leetcode 583. 两个字符串的删除操作、72. 编辑距离

算法题 Leetcode 583. 两个字符串的删除操作 题目链接:583. 两个字符串的删除操作 大佬视频讲解:583. 两个字符串的删除操作视频讲解 个人思路 本题和115.不同的子序列相比,变为了两个字符串都可以删除,整体思路是不变的,依旧…

vue3中web前端JS动画案例(二)多物体运动-多值运动

<script setup> import { ref, onMounted, watch } from vue // ----------------------- 01 js 动画介绍--------------------- // 1、匀速运动 // 2、缓动运动&#xff08;常见&#xff09; // 3、透明度运动 // 4、多物体运动 // 5、多值动画// 6、自己的动画框架 // …

在PostgreSQL中如何实现递归查询,例如使用WITH RECURSIVE构建层次结构数据?

文章目录 解决方案使用WITH RECURSIVE进行递归查询示例代码 总结 在PostgreSQL中&#xff0c;递归查询是一种非常强大的工具&#xff0c;它可以用来查询具有层次结构或树形结构的数据。例如&#xff0c;你可能会在员工-经理关系、目录结构或组织结构图中遇到这样的数据。为了处…

MybatisPlus 逻辑删除

目录 一、配置MybatisPlus 二、添加注解 三、调用MybatisPlus的删除方法 四、测试结果 一、配置MybatisPlus # mybatis-plus mybatis-plus:# 全局配置global-config:db-config:# 全局逻辑删除的字段名logic-delete-field: deleted# 逻辑已删除值(默认为 d)logic-delete-va…

为什么科拓停车选择OceanBase来构建智慧停车SaaS应用

本文来自OceanBase的客户——拓客停车的实践分享 科拓停车简介与业务背景 作为智慧停车行业的佼佼者&#xff0c;科拓停车致力于提供全方位的智慧停车解决方案。服务涵盖车场运营管理、互联网智慧停车平台以及停车场增值服务等。通过不断研发创新&#xff0c;打造出了多样化的…

Linux的主机状态

查看系统资源占用 可以通过top命令查看CPU、内存使用情况&#xff0c;类似Windows的任务管理器 默认每5秒刷新一次&#xff0c;语法&#xff1a;直接输入top即可&#xff0c;按q或ctrl c退出 第一行&#xff1a; top&#xff1a;命令名称&#xff0c;14:39:58&#xf…

Sentinel 流控注解使用

大概原理&#xff1a;通过反射解析注解 SentinelResource信息完成调用&#xff0c;处理方法&#xff0c;类似AOP编程 处理方法的返回类型要保持一致&#xff0c;参数和顺序保持一致&#xff0c; 可以在参数列表最后加 com.alibaba.csp.sentinel.slots.block.BlockException; …

什么是时间序列分析

时间序列分析是现代计量经济学的重要内容&#xff0c;广泛应用于经济、商业、社会问题研究中&#xff0c;在指标预测中具有重要地位&#xff0c;是研究统计指标动态特征和周期特征及相关关系的重要方法。 一、基本概念 经济社会现象随着时间的推移留下运行轨迹&#xff0c;按…

现代农业AI智能化升级之路:机器学习在现代农业领域的现状与未来发展

&#x1f9d1; 作者简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟&#xff0c;欢迎关注。提供嵌入式方向…

学习笔记-数据结构-线性表(2024-04-18)- 单向链表选择排序

试以单向链表为存储结构实现简单选择排序的算法。 实现递增排序&#xff0c;首先选择一个元素作为第一个比较值&#xff0c;遍历其他所有的元素&#xff0c;如果发现其他元素中有比它小的元素&#xff0c;则交换两个元素&#xff0c;这样每一趟都能找到符合要求的最小值 正经…

展开说说:Android Fragment完全解析-卷一

1、是什么 Fragment 中文意思是碎片&#xff0c;Android 3.0推出的一个系统组件&#xff0c;主打一个在应用界面中可模块化又可重复使用。 Fragment 它很独立&#xff0c;它可以定义和管理自己的布局&#xff0c;具有自己的生命周期&#xff0c;并且可以处理自己的输入事件。…

4.18学习总结

多线程补充 等待唤醒机制 现在有两条线程在运行&#xff0c;其中一条线程可以创造一个特殊的数据供另一条线程使用&#xff0c;但这个数据的创建也有要求&#xff1a;在同一时间只允许有一个这样的特殊数据&#xff0c;那么我们要怎样去完成呢&#xff1f;如果用普通的多线程…