目录
多线程&多任务介绍
多线程管理
1. 拷贝依赖
2. 使用示例
多任务管理
1. 拷贝依赖
2. 使用示例
多线程&多任务介绍
多线程&多任务通常是指将一个任务或多个任务运行在子线程,并且子线程可以独立启动,或通过线程池启动。子线程通常用于执行以下类型的任务:
- 长时间运行的任务:子线程适合处理那些耗时较长的任务,以避免阻塞主线程的执行。例如,进行复杂的计算、图像处理、视频编解码等任务可以放在子线程中执行,以保持应用程序的响应性。
- 阻塞型任务:如果有一些阻塞型的操作,可能会导致主线程被阻塞,例如进行网络请求、文件读写、数据库查询等。将这些任务放在子线程中执行可以确保主线程的流畅运行,同时避免应用程序的无响应状态。
- 并行处理任务:如果有多个独立的任务需要同时执行,可以将它们分配给多个子线程来实现并行处理。例如,批量下载多个文件、同时进行多个数据处理任务等。
- 异步操作:子线程可以用于执行异步操作,例如在后台处理数据、定时任务、监听外部设备的输入等。这样可以保持应用程序的其他部分正常运行,同时处理异步任务。
需要注意的是,使用子线程需要谨慎处理线程间的数据共享和同步问题,以避免出现竞态条件或其他并发问题。在 PyQt 中,可以使用线程间通信机制(如信号槽机制)来安全地传递数据和操作UI元素。
多线程管理
通常线程有两种启动方式,分别对应不同的使用场景:
- 将任务直接运行在一个独立的线程里
用于执行一次性长期任务(例如:并行任务,多线程下载),长期循环阻塞式接收任务(例如:接收蓝牙、网络、串口消息)
- 将任务运行在一个线程池的线程里
用于执行频繁触发型短期任务,避免线程资源浪费(例如:数据库操作、文件写入、网络数据下载),因为线程池可以重复利用其内部维护的N个线程,按需创建,减少资源的申请与释放操作。
1. 拷贝依赖
"""
线程管理器,用于管理线程的创建、启动、停止等。
包含两种线程启动方式:
1. start 运行一个独立线程,用于执行一次性短期任务(例如:并行任务,多线程下载)、长期循环接收任务(例如:阻塞式接收蓝牙、串口消息)
2. start_in_thread_pool 在一个线程池里运行任务,用于执行非定期触发型短期任务,避免线程资源浪费(例如:数据库操作、文件写入、网络数据下载)其中包含了一个 Worker 类● Worker 类用于创建线程任务
Worker 类继承自 QRunnable 类,用于创建线程任务。● 其中包含了一个 run 方法,用于执行线程任务● 一个 signal_connect 方法,用于连接信号槽● 一个 stop 方法,用于停止线程任务● 一个 emit_msg 方法,用于发送消息。
"""
import inspect
from typing import Callablefrom PyQt5.QtCore import pyqtSignal, QObject, QRunnable, QThread, QThreadPoolclass WorkerSignals(QObject):signal_finished = pyqtSignal()signal_error = pyqtSignal(Exception)signal_result = pyqtSignal(object)signal_msg = pyqtSignal(object)class Worker(QRunnable):def __init__(self, target: Callable, args=None, kwargs=None):super().__init__()self.setAutoDelete(True) # 自动删除,避免内存泄漏self.__func = targetself.__args = args if args else ()self.__kwargs = kwargs if kwargs else {}self.__signals = WorkerSignals()self.is_running = Falseself.worker_thread: WorkerThread = Nonedef run(self):self.is_running = Truetry:# 如果func的第一个参数是Worker类型,则将self作为第一个参数传入if self.__is_worker_func(self.__func):result = self.__func(self, *self.__args, **self.__kwargs)else:# 否则,直接传入参数result = self.__func(*self.__args, **self.__kwargs)self.__signals.signal_result.emit(result)except Exception as e:self.__signals.signal_error.emit(e)finally:self.is_running = Falseself.__signals.signal_finished.emit()def signal_connect(self, msg_handler=None, result_handler=None, finished_handler=None, error_handler=None):if msg_handler:self.__signals.signal_msg.connect(msg_handler)if result_handler:self.__signals.signal_result.connect(result_handler)if finished_handler:self.__signals.signal_finished.connect(finished_handler)if error_handler:self.__signals.signal_error.connect(error_handler)return selfdef stop(self):self.is_running = Falsedef emit_msg(self, msg):self.__signals.signal_msg.emit(msg)def start(self, daemon=True):"""1. 运行一个独立线程,用于执行一次性短期任务(例如:并行任务,多线程下载)、长期循环接收任务(例如:阻塞式接收蓝牙、串口消息):return:"""self.worker_thread = WorkerThread(self)self.worker_thread.daemon = daemonself.worker_thread.start()return self.worker_threaddef start_in_thread_pool(self):"""2. 在一个线程池里运行任务,用于执行非定期的短期任务,避免线程资源浪费(例如:文件写出、网络数据下载):param refresh_worker: 任务"""QThreadPool.globalInstance().start(self)@classmethoddef __is_worker_func(cls, func: Callable):"""判断一个函数是否是worker函数,worker函数的第一个参数必须是Worker类型:param func::return:"""sig = inspect.signature(func)# 判断第一个参数是否是Worker类型,或者参数名是否是workerparam_keys = list(sig.parameters.keys())if len(param_keys) > 0:first_param = sig.parameters[param_keys[0]]if first_param.annotation == Worker:return Trueif first_param.name == "worker":return Truereturn Falseclass WorkerThread(QThread):def __init__(self, worker: Worker):super().__init__()self.__worker = workerdef run(self):self.__worker.run()
2. 使用示例
以下代码分别显示了如下两个应用场景:
- 使用方式1:单线程循环接收消息示例
- 使用方式2:利用线程池异步执行多个独立任务示例
代码如下:
# 使用示例
import sys
import os
import time
import threadingfrom PyQt5.QtCore import pyqtSlot
from PyQt5.QtWidgets import QApplication
import requestsfrom qt_worker import Workerdef long_time_recv_task(worker: Worker, title, start):counter = startthread_name = threading.currentThread().namewhile worker.is_running:# 模拟阻塞(等待网络、串口、蓝牙等)time.sleep(1)# 模拟收到消息worker.emit_msg(f"{title} long time task {counter} : {thread_name}")counter += 1if counter >= 110:breakreturn "refresh_worker done: {}".format(counter)@pyqtSlot(object)
def on_result_received(msg):thread_name = threading.currentThread().nameprint(f"result: < {msg} > {thread_name}")def single_recv_thread_test():"""单线程循环接收消息:return:"""worker = Worker(long_time_recv_task, args=("消息接收",), kwargs={"start": 100})worker.signal_connect(msg_handler=lambda msg: print(msg),result_handler=on_result_received,)worker.start()def pic_download_task(url):"""下载图片, 保存到pic目录:param url: 图片地址:return:"""response = requests.get(url)if response.status_code != 200:print("连接图片服务器失败:", response.status_code)returnfile_name = url.split('/')[-1]# 如果pic目录不存在,则创建if not os.path.exists('pic'):os.mkdir('pic')with open(f"pic/{file_name}", 'wb') as f:f.write(response.content)# 返回f的绝对路径return "{} -> {}".format(url, os.path.abspath(f.name))def thread_pool_test():"""利用线程池连续下载多个图片文件:return:"""pics = ["https://www.baidu.com/img/bd_logo1.png","https://c-ssl.duitang.com/uploads/blog/202305/26/EWSwLxqBhV5zZJa.jpg","https://c-ssl.duitang.com/uploads/blog/202305/26/lGSxjBMefx04z33.jpg","https://c-ssl.duitang.com/uploads/blog/202305/26/XxSLogyQCQd9emB.jpg","https://c-ssl.duitang.com/uploads/item/202002/26/20200226215648_yynrr.jpg",]for pic in pics:worker = Worker(pic_download_task, args=(pic,))worker.signal_connect(result_handler=lambda msg: print("保存成功:", msg))worker.start_in_thread_pool()if __name__ == '__main__':app = QApplication(sys.argv)# 使用方式1:单线程循环接收消息示例single_recv_thread_test()# 使用方式2:利用线程池异步执行多个任务示例(下载多个图片文件)thread_pool_test()sys.exit(app.exec_())
多任务管理
指开启一个长期运行的线程,在线程内部,运行一个循环,循环中阻塞式地接收用户发来的任务(定期或非定期),并及时按照用户预定的函数进行执行(通常要消耗一小段时间)。这个多任务管理器应有如下特点:
- 不会阻塞主线程(保障主界面操作顺滑)
- 一旦任务完成,将执行结果返回给任务的发布者。这可以通过回调函数、事件或其他适当的机制来实现。
- 如果任务执行过程中发生异常,需要将异常信息返回给任务的发布者,以便了解并采取适当的处理措施。
- 可以保障任务的执行顺序和发布任务的顺序一致(通过队列保证要求)
- 这个长期任务管理器可以通过调用方法进行停止
1. 拷贝依赖
这段代码定义了一个 TaskWorker
类,该类继承自 QThread
类,并使用队列实现了多个任务的异步执行。
"""
运行一个独立线程,用于执行长期循环发送任务,可以随时执行异步任务,内部维护一个消息队列(例如:发送蓝牙、串口消息)这段代码定义了一个 TaskWorker 类,该类继承自 QThread 类,并使用队列实现了任务的异步执行。"""from queue import Queuefrom PyQt5.QtCore import QThread, pyqtSignalclass TaskWorker(QThread):taskResult = pyqtSignal(object)taskError = pyqtSignal(Exception)taskFinished = pyqtSignal()def __init__(self, do_task, parent=None):super(TaskWorker, self).__init__(parent)self.do_task = do_taskself.task_queue = Queue()self.is_running = Truedef run(self):while self.is_running:# 不断从队列中取出任务并执行,如果没有任务则阻塞task_arg = self.task_queue.get()# 如果取出的任务为 None,且线程已设置为关闭,则退出线程if task_arg is None and not self.is_running:breaktry:result = self.do_task(task_arg)self.taskResult.emit(result)except Exception as e:self.taskError.emit(e)self.taskFinished.emit()def signal_connect(self, result_handler=None, finished_handler=None, error_handler=None):# Connect the worker's signal to the handler slotif result_handler is not None:self.taskResult.connect(result_handler)if finished_handler is not None:self.taskFinished.connect(finished_handler)if error_handler is not None:self.taskError.connect(error_handler)returndef join_queue(self, task):if not self.is_running:returnself.task_queue.put(task)def stop(self):self.is_running = False# Put a None task to the queue to stop the threadself.task_queue.put(None)
2. 使用示例
主程序中创建了一个 TaskWorker
实例,将任务添加到任务队列中,并使用手动/定时器定期添加任务。
当任务完成时,TaskWorker
实例会发出信号,主程序中的槽函数会接收到这些信号并进行处理。
代码具体步骤如下:
- 在主程序中定义一个
do_task
函数,该函数用于执行任务。- 在主程序中定义两个槽函数
on_result
和on_error
,分别用于处理任务完成和任务出错的信号。- 创建一个
TaskWorker
实例,并传入任务函数,函数里是任务要执行的内容- 将
TaskWorker
实例的信号连接到槽函数。- 运行
TaskWorker
实例的start
方法启动线程- 运行主程序,通过各种方式把任务及任务参数通过
join_queue
加入队列- 等待多个任务执行完成。
"""
主程序中创建了一个 TaskWorker 实例,将任务添加到任务队列中,并使用定时器定期添加任务。
当任务完成时,TaskWorker 实例会发出信号,主程序中的槽函数会接收到这些信号并进行处理。代码具体步骤如下:1. 在主程序中定义一个 do_task 函数,该函数用于执行任务。2. 在主程序中定义两个槽函数 on_result 和 on_error,分别用于处理任务完成和任务出错的信号。3. 创建一个 TaskWorker 实例。4. 将 TaskWorker 实例的信号连接到槽函数。5. 将任务添加到任务队列中,并使用定时器定期添加任务。6. 运行主程序,等待任务执行完成。
"""
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import pyqtSlot, QTimer
import sys
import threading
import timefrom qt_task import TaskWorkerdef do_task(task_arg):a, b = task_argrst = a / b# Simulate a time-consuming tasktime.sleep(1) # Pause for 1 secondsreturn f"Task signal_result {a} / {b} = {rst}"@pyqtSlot(object)
def on_result(result):# print(threading.currentThread())print("on result: ", result)@pyqtSlot(Exception)
def on_error(e):# print(threading.currentThread())print("on error: ", e)if __name__ == '__main__':app = QApplication(sys.argv)print("main: ", threading.currentThread())task_worker = TaskWorker(do_task)task_worker.signal_connect(result_handler=on_result,finished_handler=lambda: print("on finished"),error_handler=on_error,)task_worker.start()# Add tasks to the task managertask_worker.join_queue((3, 2))task_worker.join_queue((4, 2))task_worker.join_queue((5, 2))task_worker.join_queue((5, 0))# Use a timer to add tasks periodicallytimer = QTimer()timer.timeout.connect(lambda: task_worker.join_queue((5, 2)))timer.start(3000) # Add a task every 5 seconds# 执行一个10秒后的延时任务QTimer.singleShot(10000, lambda: task_worker.stop())sys.exit(app.exec_())