【13】PyQt多线程多任务管理

目录

多线程&多任务介绍

多线程管理

1. 拷贝依赖

2. 使用示例

多任务管理

1. 拷贝依赖

2. 使用示例


多线程&多任务介绍

多线程&多任务通常是指将一个任务或多个任务运行在子线程,并且子线程可以独立启动,或通过线程池启动。子线程通常用于执行以下类型的任务:

  • 长时间运行的任务:子线程适合处理那些耗时较长的任务,以避免阻塞主线程的执行。例如,进行复杂的计算、图像处理、视频编解码等任务可以放在子线程中执行,以保持应用程序的响应性。
  • 阻塞型任务:如果有一些阻塞型的操作,可能会导致主线程被阻塞,例如进行网络请求、文件读写、数据库查询等。将这些任务放在子线程中执行可以确保主线程的流畅运行,同时避免应用程序的无响应状态。
  • 并行处理任务:如果有多个独立的任务需要同时执行,可以将它们分配给多个子线程来实现并行处理。例如,批量下载多个文件、同时进行多个数据处理任务等。
  • 异步操作:子线程可以用于执行异步操作,例如在后台处理数据、定时任务、监听外部设备的输入等。这样可以保持应用程序的其他部分正常运行,同时处理异步任务。

需要注意的是,使用子线程需要谨慎处理线程间的数据共享和同步问题,以避免出现竞态条件或其他并发问题。在 PyQt 中,可以使用线程间通信机制(如信号槽机制)来安全地传递数据和操作UI元素。

多线程管理

通常线程有两种启动方式,分别对应不同的使用场景:

  • 将任务直接运行在一个独立的线程里

用于执行一次性长期任务(例如:并行任务,多线程下载),长期循环阻塞式接收任务(例如:接收蓝牙、网络、串口消息)

  • 将任务运行在一个线程池的线程里

用于执行频繁触发型短期任务,避免线程资源浪费(例如:数据库操作、文件写入、网络数据下载),因为线程池可以重复利用其内部维护的N个线程,按需创建,减少资源的申请与释放操作。

1. 拷贝依赖

"""
线程管理器,用于管理线程的创建、启动、停止等。
包含两种线程启动方式:
1. start 运行一个独立线程,用于执行一次性短期任务(例如:并行任务,多线程下载)、长期循环接收任务(例如:阻塞式接收蓝牙、串口消息)
2. start_in_thread_pool 在一个线程池里运行任务,用于执行非定期触发型短期任务,避免线程资源浪费(例如:数据库操作、文件写入、网络数据下载)其中包含了一个 Worker 类● Worker 类用于创建线程任务
Worker 类继承自 QRunnable 类,用于创建线程任务。● 其中包含了一个 run 方法,用于执行线程任务● 一个 signal_connect 方法,用于连接信号槽● 一个 stop 方法,用于停止线程任务● 一个 emit_msg 方法,用于发送消息。
"""
import inspect
from typing import Callablefrom PyQt5.QtCore import pyqtSignal, QObject, QRunnable, QThread, QThreadPoolclass WorkerSignals(QObject):signal_finished = pyqtSignal()signal_error = pyqtSignal(Exception)signal_result = pyqtSignal(object)signal_msg = pyqtSignal(object)class Worker(QRunnable):def __init__(self, target: Callable, args=None, kwargs=None):super().__init__()self.setAutoDelete(True)  # 自动删除,避免内存泄漏self.__func = targetself.__args = args if args else ()self.__kwargs = kwargs if kwargs else {}self.__signals = WorkerSignals()self.is_running = Falseself.worker_thread: WorkerThread = Nonedef run(self):self.is_running = Truetry:# 如果func的第一个参数是Worker类型,则将self作为第一个参数传入if self.__is_worker_func(self.__func):result = self.__func(self, *self.__args, **self.__kwargs)else:# 否则,直接传入参数result = self.__func(*self.__args, **self.__kwargs)self.__signals.signal_result.emit(result)except Exception as e:self.__signals.signal_error.emit(e)finally:self.is_running = Falseself.__signals.signal_finished.emit()def signal_connect(self, msg_handler=None, result_handler=None, finished_handler=None, error_handler=None):if msg_handler:self.__signals.signal_msg.connect(msg_handler)if result_handler:self.__signals.signal_result.connect(result_handler)if finished_handler:self.__signals.signal_finished.connect(finished_handler)if error_handler:self.__signals.signal_error.connect(error_handler)return selfdef stop(self):self.is_running = Falsedef emit_msg(self, msg):self.__signals.signal_msg.emit(msg)def start(self, daemon=True):"""1. 运行一个独立线程,用于执行一次性短期任务(例如:并行任务,多线程下载)、长期循环接收任务(例如:阻塞式接收蓝牙、串口消息):return:"""self.worker_thread = WorkerThread(self)self.worker_thread.daemon = daemonself.worker_thread.start()return self.worker_threaddef start_in_thread_pool(self):"""2. 在一个线程池里运行任务,用于执行非定期的短期任务,避免线程资源浪费(例如:文件写出、网络数据下载):param refresh_worker: 任务"""QThreadPool.globalInstance().start(self)@classmethoddef __is_worker_func(cls, func: Callable):"""判断一个函数是否是worker函数,worker函数的第一个参数必须是Worker类型:param func::return:"""sig = inspect.signature(func)# 判断第一个参数是否是Worker类型,或者参数名是否是workerparam_keys = list(sig.parameters.keys())if len(param_keys) > 0:first_param = sig.parameters[param_keys[0]]if first_param.annotation == Worker:return Trueif first_param.name == "worker":return Truereturn Falseclass WorkerThread(QThread):def __init__(self, worker: Worker):super().__init__()self.__worker = workerdef run(self):self.__worker.run()

2. 使用示例

以下代码分别显示了如下两个应用场景:

  • 使用方式1:单线程循环接收消息示例
  • 使用方式2:利用线程池异步执行多个独立任务示例

代码如下:

# 使用示例
import sys
import os
import time
import threadingfrom PyQt5.QtCore import pyqtSlot
from PyQt5.QtWidgets import QApplication
import requestsfrom qt_worker import Workerdef long_time_recv_task(worker: Worker, title, start):counter = startthread_name = threading.currentThread().namewhile worker.is_running:# 模拟阻塞(等待网络、串口、蓝牙等)time.sleep(1)# 模拟收到消息worker.emit_msg(f"{title} long time task {counter} : {thread_name}")counter += 1if counter >= 110:breakreturn "refresh_worker done: {}".format(counter)@pyqtSlot(object)
def on_result_received(msg):thread_name = threading.currentThread().nameprint(f"result: < {msg} > {thread_name}")def single_recv_thread_test():"""单线程循环接收消息:return:"""worker = Worker(long_time_recv_task, args=("消息接收",), kwargs={"start": 100})worker.signal_connect(msg_handler=lambda msg: print(msg),result_handler=on_result_received,)worker.start()def pic_download_task(url):"""下载图片, 保存到pic目录:param url: 图片地址:return:"""response = requests.get(url)if response.status_code != 200:print("连接图片服务器失败:", response.status_code)returnfile_name = url.split('/')[-1]# 如果pic目录不存在,则创建if not os.path.exists('pic'):os.mkdir('pic')with open(f"pic/{file_name}", 'wb') as f:f.write(response.content)# 返回f的绝对路径return "{} -> {}".format(url, os.path.abspath(f.name))def thread_pool_test():"""利用线程池连续下载多个图片文件:return:"""pics = ["https://www.baidu.com/img/bd_logo1.png","https://c-ssl.duitang.com/uploads/blog/202305/26/EWSwLxqBhV5zZJa.jpg","https://c-ssl.duitang.com/uploads/blog/202305/26/lGSxjBMefx04z33.jpg","https://c-ssl.duitang.com/uploads/blog/202305/26/XxSLogyQCQd9emB.jpg","https://c-ssl.duitang.com/uploads/item/202002/26/20200226215648_yynrr.jpg",]for pic in pics:worker = Worker(pic_download_task, args=(pic,))worker.signal_connect(result_handler=lambda msg: print("保存成功:", msg))worker.start_in_thread_pool()if __name__ == '__main__':app = QApplication(sys.argv)# 使用方式1:单线程循环接收消息示例single_recv_thread_test()# 使用方式2:利用线程池异步执行多个任务示例(下载多个图片文件)thread_pool_test()sys.exit(app.exec_())

多任务管理

指开启一个长期运行的线程,在线程内部,运行一个循环,循环中阻塞式地接收用户发来的任务(定期或非定期),并及时按照用户预定的函数进行执行(通常要消耗一小段时间)。这个多任务管理器应有如下特点:

  • 不会阻塞主线程(保障主界面操作顺滑)
  • 一旦任务完成,将执行结果返回给任务的发布者。这可以通过回调函数、事件或其他适当的机制来实现。
  • 如果任务执行过程中发生异常,需要将异常信息返回给任务的发布者,以便了解并采取适当的处理措施。
  • 可以保障任务的执行顺序和发布任务的顺序一致(通过队列保证要求)
  • 这个长期任务管理器可以通过调用方法进行停止

1. 拷贝依赖

这段代码定义了一个 TaskWorker 类,该类继承自 QThread 类,并使用队列实现了多个任务的异步执行。

"""
运行一个独立线程,用于执行长期循环发送任务,可以随时执行异步任务,内部维护一个消息队列(例如:发送蓝牙、串口消息)这段代码定义了一个 TaskWorker 类,该类继承自 QThread 类,并使用队列实现了任务的异步执行。"""from queue import Queuefrom PyQt5.QtCore import QThread, pyqtSignalclass TaskWorker(QThread):taskResult = pyqtSignal(object)taskError = pyqtSignal(Exception)taskFinished = pyqtSignal()def __init__(self, do_task, parent=None):super(TaskWorker, self).__init__(parent)self.do_task = do_taskself.task_queue = Queue()self.is_running = Truedef run(self):while self.is_running:# 不断从队列中取出任务并执行,如果没有任务则阻塞task_arg = self.task_queue.get()# 如果取出的任务为 None,且线程已设置为关闭,则退出线程if task_arg is None and not self.is_running:breaktry:result = self.do_task(task_arg)self.taskResult.emit(result)except Exception as e:self.taskError.emit(e)self.taskFinished.emit()def signal_connect(self, result_handler=None, finished_handler=None, error_handler=None):# Connect the worker's signal to the handler slotif result_handler is not None:self.taskResult.connect(result_handler)if finished_handler is not None:self.taskFinished.connect(finished_handler)if error_handler is not None:self.taskError.connect(error_handler)returndef join_queue(self, task):if not self.is_running:returnself.task_queue.put(task)def stop(self):self.is_running = False# Put a None task to the queue to stop the threadself.task_queue.put(None)

2. 使用示例

主程序中创建了一个 TaskWorker 实例,将任务添加到任务队列中,并使用手动/定时器定期添加任务。
当任务完成时,TaskWorker 实例会发出信号,主程序中的槽函数会接收到这些信号并进行处理。

代码具体步骤如下:

  1. 在主程序中定义一个 do_task 函数,该函数用于执行任务。
  2. 在主程序中定义两个槽函数 on_resulton_error,分别用于处理任务完成和任务出错的信号。
  3. 创建一个 TaskWorker 实例,并传入任务函数,函数里是任务要执行的内容
  4. TaskWorker 实例的信号连接到槽函数。
  5. 运行 TaskWorker 实例的start方法启动线程
  6. 运行主程序,通过各种方式把任务及任务参数通过join_queue加入队列
  7. 等待多个任务执行完成。
"""
主程序中创建了一个 TaskWorker 实例,将任务添加到任务队列中,并使用定时器定期添加任务。
当任务完成时,TaskWorker 实例会发出信号,主程序中的槽函数会接收到这些信号并进行处理。代码具体步骤如下:1. 在主程序中定义一个 do_task 函数,该函数用于执行任务。2. 在主程序中定义两个槽函数 on_result 和 on_error,分别用于处理任务完成和任务出错的信号。3. 创建一个 TaskWorker 实例。4. 将 TaskWorker 实例的信号连接到槽函数。5. 将任务添加到任务队列中,并使用定时器定期添加任务。6. 运行主程序,等待任务执行完成。
"""
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import pyqtSlot, QTimer
import sys
import threading
import timefrom qt_task import TaskWorkerdef do_task(task_arg):a, b = task_argrst = a / b# Simulate a time-consuming tasktime.sleep(1)  # Pause for 1 secondsreturn f"Task signal_result {a} / {b} = {rst}"@pyqtSlot(object)
def on_result(result):# print(threading.currentThread())print("on result: ", result)@pyqtSlot(Exception)
def on_error(e):# print(threading.currentThread())print("on error: ", e)if __name__ == '__main__':app = QApplication(sys.argv)print("main: ", threading.currentThread())task_worker = TaskWorker(do_task)task_worker.signal_connect(result_handler=on_result,finished_handler=lambda: print("on finished"),error_handler=on_error,)task_worker.start()# Add tasks to the task managertask_worker.join_queue((3, 2))task_worker.join_queue((4, 2))task_worker.join_queue((5, 2))task_worker.join_queue((5, 0))# Use a timer to add tasks periodicallytimer = QTimer()timer.timeout.connect(lambda: task_worker.join_queue((5, 2)))timer.start(3000)  # Add a task every 5 seconds# 执行一个10秒后的延时任务QTimer.singleShot(10000, lambda: task_worker.stop())sys.exit(app.exec_())

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/202560.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度探索 Python Pyramid 框架

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com Pyramid是一个灵活且强大的Python web框架&#xff0c;广泛用于构建各种规模的Web应用程序。本文将深度探索Pyramid框架&#xff0c;介绍其核心概念、应用场景以及一些高级特性。 安装与基础用法 首先&#xf…

JS学习--类型转换

函数转换 parseInt() 转换之前&#xff0c;首先会分析该字符串。判断位置为0处的字符串&#xff0c;判断是否为有效数字&#xff0c;若否&#xff0c;直接返回NaN&#xff0c;不再继续&#xff1b; 若是&#xff0c;继续打印直到不为数字的地方停止 parseFloat() 转换之前&…

linux日志优先级

7种日志级别代号0-7 0 debug #有调试信息的&#xff0c;日志信息最多 1 info #一般信息的日志&#xff0c;最常用 2 notice #最具有重要性的普通条件的信息 常见 3 warning #警告级别 常见 4 …

探索鸿蒙 DevEcoStudio汉化+运行报错

在下载好软件&#xff0c;摸索着成功创建了一个项目的时候&#xff0c;点击运行&#xff0c;竟然失败了。而且一大堆的英文也不知道从何入手&#xff0c;从网上搜了一下&#xff0c;找到了汉化的办法&#xff0c;并且解决了问题。我这里走的是Mac的步骤&#xff0c;微软的其实一…

ReadWriteLock 和 StampedLock 的比较与解析

在多线程编程中&#xff0c;我们经常需要使用锁来保证同一时刻只有一个线程能够访问共享资源。Java提供了多种锁的实现&#xff0c;如ReentrantLock、ReadWriteLock、StampedLock等。本文将对ReadWriteLock和StampedLock进行比较&#xff0c;分析它们的原理、优缺点&#xff0c…

SpringBoot3-实现和注册拦截器

1、pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.…

安卓https抓包(提供软件+视频)

遇到的问题 因为android7.0以上机制不在信任用户证书&#xff0c;导致https协议无法抓包&#xff0c;除非把证书装在系统信任的证书里&#xff0c;此时手机需要root权限&#xff0c;但是如何不需要手机root也能抓包成功呢&#xff1f;我们采用virtualxposedjusttrustme来解决&…

图灵测试:人工智能的终极挑战

图灵测试&#xff1a;人工智能的终极挑战 一、引言 在人工智能的发展历程中&#xff0c;图灵测试一直被视为一个重要的里程碑。这个由英国计算机科学家艾伦图灵提出的实验&#xff0c;旨在评估人工智能是否能够像人一样思考和表达&#xff0c;为人类与机器智能之间的界限设立了…

Chrome浏览器调整搜索标签页按钮位置

地址栏输入 chrome://flags 搜索 chrome-refresh-2023 第一项 修改为Enabled 标签搜索页按钮出现在chrome的左上角 修改为Default 标签搜索页按钮出现在chrome的右上角 修改完成后&#xff0c;点击Relaunch&#xff0c;重启浏览器&#xff0c;修改生效。

Python+Appium自动化测试大法,让你的测试效率飞升,绝不等待!封装元素定位方法超详解!

在appium自动化测试脚本运行的过程中&#xff0c;因为网络不稳定、测试机或模拟器卡顿等原因&#xff0c;有时候会出现页面元素加载超时元素定位失败的情况&#xff0c;但实际这又不是bug&#xff0c;只是元素加载较慢&#xff0c;这个时候我们就会使用元素等待的方法来避免这种…

包与字符串

包是分类管理的需要&#xff0c;建立包用:package&#xff0c;包中类的引用import 学习使用javaAPI中的字符串类String&#xff0c;学会其成员方法的使用 &#xff08;必看&#xff09;eclipse包的分层等级结构设置 因为eclipse的包的结构默认是平行等级的&#xff0c;所以要手…

python多进程和多线程

在知道和使用多进程和多线程之前 需要知道&#xff0c;进程是什么线程是什么&#xff1f; 进程&#xff1a;是计算机中正在运行的程序的实例。它是操作系统调度和管理的基本单位&#xff0c;包含程序代码、数据和执行状态等信息。*每个进程都有自己的内存空间和资源&#xff0…

观察者模式来啦

观察者模式本质上就两个关键的操作&#xff0c;观察者关心自己订阅的主题&#xff0c;主题数据有变化需要通知所有的观察者。 举个&#x1f330; 在武侠甚至玄幻小说中&#xff0c;天人合一一直都是一个重要的&#xff0c;无法缺失的概念。天人合一一直以来都是天骄的专属名词…

Java不可变集合

Java不可变集合 不可变集合&#xff1a;也就是不可以被修改的集合 创建不可变集合的应用场景 ●如果某个数据不能被修改&#xff0c;把它防御性地拷贝到不可变集合中是个很好的实践。 ●当集合对象被不可信的库调用时&#xff0c;不可变形式是安全的。 简单理解&#xff1…

ArcGIS Enterprise on Kubernetes 11.1安装示例

博客主页&#xff1a;https://tomcat.blog.csdn.net 博主昵称&#xff1a;农民工老王 主要领域&#xff1a;Java、Linux、K8S 期待大家的关注&#x1f496;点赞&#x1f44d;收藏⭐留言&#x1f4ac; 目录 安装前置条件基本安装解压文件生成秘钥执行安装脚本 配置DNS方法一方法…

【Linux 进度条小程序】缓冲区+回车换行

文章目录 回车与换行缓冲区举个栗子fflush函数倒计时小程序进度条小程序 回车与换行 回车和换行是不同的两个概念 回车&#xff1a;\r 使光标回到本行行首。 换行&#xff1a;\n使光标下移一格。 一般我们的键盘上的Enter键是回加换行键 在c语言中 \n 表示回车换行 效果和Ent…

代码随想Day24 | 回溯法模板、77. 组合

理论基础 回溯法和递归不可分割&#xff0c;回溯法是一种穷举的方法&#xff0c;通常需要剪枝来降低复杂度。回溯法有一个选择并退回的过程&#xff0c;可以抽象为树结构&#xff0c;回溯法的模板如下&#xff1a; void backtracking(参数) {if (终止条件) {存放结果;return;}…

Java动态代理实现与原理详细分析

Java动态代理实现与原理详细分析 关于Java中的动态代理&#xff0c;我们首先需要了解的是一种常用的设计模式–代理模式&#xff0c;而对于代理&#xff0c;根据创建代理类的 时间点&#xff0c;又可以分为静态代理和动态代理。 1、代理模式 代理模式是常用的java设计模式&…

graph neural network 和 geometric neural network

一、graph neural network 处理图结构数据。 学习图中节点和边的表示。 任务&#xff1a;节点分类、图分类、链接预测。 核心思想&#xff1a;通过迭代地聚合节点的邻居信息来更新每个节点的表示&#xff0c;从而捕捉图结构中的局部和全局信息。 二、geometric neural netw…

从cot到agent的survey视频笔记

参考视频&#xff1a; 从CoT到Agent的列车即将发车&#xff0c;请各位旅客尽快上车 姚杳 由于总结不易&#xff0c;所以暂时都是粉丝可见&#xff0c;如果总结的不好见谅。 核心理解点总结&#xff1a; paradigm shifts of cot when cot&#xff1f;推理多的任务时 how cot…