文章目录
- @[toc]
- 程序与进程的区别与联系
- 同步任务
- 示例
- 并行任务
- 示例
- 进程调度的“随机性”
- 进程属性与方法
- process_object.start()方法
- process_object.join()方法
- process_object.daemon属性
- 没有设置守护进程的情况
- 设置守护进程的情况
- process_object.current_process()方法
- 进程通信
- 队列的使用
- put()方法与full()方法
- get()方法与empty()方法
- put_nowait()方法
- get_nowait()方法
- task_done()方法与join()方法
- 队列内置计数器的值不为0
- 队列内置计数器的值为0
- 使用队列完成进程通信
- 进程池
- multiprocessing.Pool
- ProcessPoolExecutor API
- 进程池的创建
- done()方法
- cancel()方法
- result()方法
- as_completed()方法
- map()方法
- wait()方法
- 自定义进程类
- 示例
文章目录
- @[toc]
- 程序与进程的区别与联系
- 同步任务
- 示例
- 并行任务
- 示例
- 进程调度的“随机性”
- 进程属性与方法
- process_object.start()方法
- process_object.join()方法
- process_object.daemon属性
- 没有设置守护进程的情况
- 设置守护进程的情况
- process_object.current_process()方法
- 进程通信
- 队列的使用
- put()方法与full()方法
- get()方法与empty()方法
- put_nowait()方法
- get_nowait()方法
- task_done()方法与join()方法
- 队列内置计数器的值不为0
- 队列内置计数器的值为0
- 使用队列完成进程通信
- 进程池
- multiprocessing.Pool
- ProcessPoolExecutor API
- 进程池的创建
- done()方法
- cancel()方法
- result()方法
- as_completed()方法
- map()方法
- wait()方法
- 自定义进程类
- 示例
个人主页:丷从心·
系列专栏:Python基础
学习指南:Python学习指南
程序与进程的区别与联系
- 程序是指存储在磁盘或其他存储介质上的一组指令或代码,是静态文件
- 进程是程序的执行实例,是一个动态的实体,具有独立的内存空间和系统资源
同步任务
- 我们在此之前编写的代码都是同步代码,代码从上到下按顺序执行,如果前一个任务没有完成,那么不能运行之后的任务
示例
import timedef work_1():print('任务1...')time.sleep(2)def work_2():print('任务2...')time.sleep(2)start = time.time()work_1()
work_2()end = time.time()print(f'总共用时: {end - start} s')
任务1...
任务2...
总共用时: 4.020323038101196 s
- 可以看到整个程序用时 4 4 4秒,
work_2()
需要等待work_1()
运行结束后才能运行
并行任务
- 使用进程来运行上面的代码,能够优化运行时间
示例
import time
import multiprocessingdef work_1():print('任务1...')time.sleep(2)def work_2():print('任务2...')time.sleep(2)# Windows 操作系统下进程任务必须拥有 main 入口
if __name__ == '__main__':# 通过 Process 类创建进程对象, 并使用 target 绑定进程对象要运行的任务p1 = multiprocessing.Process(target=work_1)p2 = multiprocessing.Process(target=work_2)start = time.time()# 运行进程p1.start()p2.start()p1.join()p2.join()end = time.time()print(f'总共用时: {end - start} s')
任务1...
任务2...
总共用时: 2.6914994716644287 s
- 可以看到整个程序用时约 3 3 3秒,
work_1()
和work_2
并行运行
进程调度的“随机性”
- 下面的示例可以看到操作系统调度进程时的“随机性”
import time
import threadingdef work_1():for i in range(5):print('任务1...')time.sleep(2)def work_2():for i in range(5):print('任务2...')time.sleep(2)t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)t1.start()
t2.start()
任务2...
任务1...
任务1...
任务2...
任务2...
任务1...
任务1...
任务2...
任务2...
任务1...
- 可以看到任务 1 1 1和任务 2 2 2的调度顺序是我们无法确定的,是由操作系统的调度算法决定的
进程属性与方法
- 在学习进程方法之前,我们需要知道
Python
程序是如何被运行的- 一个
Python
文件被解释器运行时会在操作系统中创建一个进程 - 然后该进程会创建一个线程来运行文件中的代码,这个程序最初创建的线程称为主线程
- 当主线程运行到
p = multiprocessing.Process()
时会创建一个新的进程,称为子进程 - 主进程与子进程由操作系统进行调度,并发地运行,具体如何调度进程由操作系统的调度算法决定
- 子进程在运行时,主进程不会等待子进程,而是继续向下执行,直到执行到文件末尾没有代码时,主进程会等待子进程运行结束后再退出
- 一个
process_object.start()方法
-
p = multiprocessing.Process()
只是创建了一个进程,并不会运行进程代码 -
p.start()
使进程p
达到就绪状态,等待操作系统进行调度,具体何时调度由操作系统决定 -
以上面的并行任务的代码为例,先注释掉
p1.join()
和p2.join()
import time
import multiprocessingdef work_1():print('任务1...')time.sleep(2)def work_2():print('任务2...')time.sleep(2)# Windows 操作系统下进程任务必须拥有 main 入口
if __name__ == '__main__':# 通过 Process 类创建进程对象, 并使用 target 绑定进程对象要运行的任务p1 = multiprocessing.Process(target=work_1)p2 = multiprocessing.Process(target=work_2)start = time.time()# 运行进程p1.start()p2.start()# p1.join()# p2.join()end = time.time()print(f'总共用时: {end - start} s')
总共用时: 0.012919902801513672 s
任务1...
任务2...
- 可以看到主进程没有等待子进程,而是继续向下执行
- 当执行到
end = time.time()
时,此时end
记录的时间是主进程运行到这行代码的时间 - 之后运行
print(f'总共用时: {end - start} s')
,输出时间0.012919902801513672 s
,此时执行到了文件末尾没有其他代码,主进程会等待子进程运行结束后再退出 - 为了能正确记录进程运行的时间,我们需要让主进程等待子进程
process_object.join()方法
p.join()
使主进程等待子进程,子进程任务执行结束后主进程再继续向下执行- 仍然以上面的并行任务的代码为例,取消注释
p1.join()
和p2.join()
import time
import multiprocessingdef work_1():print('任务1...')time.sleep(2)def work_2():print('任务2...')time.sleep(2)# Windows 操作系统下进程任务必须拥有 main 入口
if __name__ == '__main__':# 通过 Process 类创建进程对象, 并使用 target 绑定进程对象要运行的任务p1 = multiprocessing.Process(target=work_1)p2 = multiprocessing.Process(target=work_2)start = time.time()# 运行进程p1.start()p2.start()p1.join()p2.join()end = time.time()print(f'总共用时: {end - start} s')
任务1...
任务2...
总共用时: 2.6913790702819824 s
- 可以看到主进程等待子进程运行结束后才继续向下执行,正确记录了子进程运行的时间
process_object.daemon属性
- 设置守护进程,需要在进程启动之前进行设置
- 如果一个进程是守护进程,那么主进程运行到文件末尾后不论子进程任务是否结束都会自动退出
没有设置守护进程的情况
import time
import multiprocessingdef work():for i in range(5):print(i)time.sleep(1)if __name__ == '__main__':p = multiprocessing.Process(target=work)# p.daemon = Truep.start()print('主进程即将退出...')
主进程即将退出...
0
1
2
3
4
设置守护进程的情况
import time
import multiprocessingdef work():for i in range(5):print(i)time.sleep(1)if __name__ == '__main__':p = multiprocessing.Process(target=work)p.daemon = Truep.start()print('主进程即将退出...')
主进程即将退出...
- 可以看到并没有输出 0 0 0、 1 1 1、 2 2 2、 3 3 3、 4 4 4,主进程就退出了
process_object.current_process()方法
process_object.current_process()
方法用于获取当前进程对象的引用- 可以用来获取进程名称和进程号
import multiprocessing
import osdef work():pid = multiprocessing.current_process().pidppid = os.getppid()name = multiprocessing.current_process().nameprint(f'pid: {pid},\t ppid: {ppid}, name: {name}')if __name__ == '__main__':for i in range(5):t = multiprocessing.Process(target=work)t.name = f'进程-{i}'t.start()t.join()
pid: 16464, ppid: 23240, name: 进程-0
pid: 21140, ppid: 23240, name: 进程-1
pid: 7884, ppid: 23240, name: 进程-2
pid: 19412, ppid: 23240, name: 进程-3
pid: 6196, ppid: 23240, name: 进程-4
os.getppid()
方法用于获取进程的父进程号
进程通信
队列的使用
put()方法与full()方法
put()
方法用于将数据上传到队列full()
方法用于判断队列是否已满
from queue import Queuequeue = Queue(3) # 设置队列最大容量为 3queue.put(1)
queue.put(2)
queue.put(3)print(queue.full())queue.put(4) # 因为队列已满, 所以会导致阻塞, 直到队列重新有空闲空间时解阻塞
True
- 当队列中已满时,使用
put()
方法会阻塞,直到队列重新重新有空闲空间时解阻塞
get()方法与empty()方法
get()
方法用于从队列中读取数据empty()
方法用于判断队列是否为空
from queue import Queuequeue = Queue()queue.put(1)
queue.put(2)
queue.put(3)print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get()) # 因为队列为空, 所以会导致阻塞, 直到队列重新有数据时解阻塞
1
2
3
- 当队列中为空时,使用
get()
方法会阻塞,直到队列重新有数据时解阻塞
put_nowait()方法
put_nowait()
方法用于向队列中上传数据,如果队列已满,则抛出异常
from queue import Queuequeue = Queue(3) # 设置队列最大容量为 3queue.put(1)
queue.put(2)
queue.put(3)queue.put_nowait(4) # 如果队列已满, 则抛出异常
Traceback (most recent call last):File "C:/Users/FOLLOW_MY_HEART/Desktop/Python Basics/【Python基础】进程/test.py", line 9, in <module>queue.put_nowait(4) # 如果队列已满, 则抛出异常File "D:\Environment\anaconda\envs\py310\lib\queue.py", line 191, in put_nowaitreturn self.put(item, block=False)File "D:\Environment\anaconda\envs\py310\lib\queue.py", line 137, in putraise Full
queue.Full
get_nowait()方法
get_nowait()
方法用于从队列中读取数据,如果队列为空,则抛出异常
from queue import Queuequeue = Queue()queue.put(1)
queue.put(2)
queue.put(3)print(queue.get())
print(queue.get())
print(queue.get())
print(queue.get_nowait()) # 如果队列为空, 则抛出异常
1
2
3
Traceback (most recent call last):File "C:/Users/FOLLOW_MY_HEART/Desktop/Python Basics/【Python基础】进程/test.py", line 12, in <module>print(queue.get_nowait()) # 如果队列为空, 则抛出异常File "D:\Environment\anaconda\envs\py310\lib\queue.py", line 199, in get_nowaitreturn self.get(block=False)File "D:\Environment\anaconda\envs\py310\lib\queue.py", line 168, in getraise Empty
_queue.Empty
task_done()方法与join()方法
-
queue.get()
会使队列内置计数器的值加 1 1 1,而queue.get()
不会使队列内置计数器的值减 1 1 1 -
task_done()
方法用于将队列内置计数器的值减 1 1 1 -
join()
方法用于判断队列内置计数器的值是否为 0 0 0,如果不为 0 0 0则会阻塞,直到队列内置计数器的值为 0 0 0时解阻塞
队列内置计数器的值不为0
from queue import Queuequeue = Queue()
queue.put(1)queue.get()queue.join()print('不能运行到这里...')
队列内置计数器的值为0
from queue import Queuequeue = Queue()
queue.put(1)queue.get()
queue.task_done()queue.join()print('能够运行到这里...')
能够运行到这里...
使用队列完成进程通信
from multiprocessing import Process, Queue, Event# 向队列中写入数据
def write(queue, event):for num in range(10):queue.put(num)event.set()print('正在上传...')queue.put('END')event.set()# 从队列中读取数据
def read(queue, event):while True:event.wait()num = queue.get()if num == 'END':breakprint(f'从队列中读取到的数据为: {num}')if __name__ == '__main__':# 创建队列对象queue = Queue()event = Event()p_write = Process(target=write, args=(queue, event))p_read = Process(target=read, args=(queue, event))p_write.start()p_read.start()p_write.join()p_read.join()print('任务完成...')
正在上传...
从队列中读取到的数据为: 0
正在上传...
从队列中读取到的数据为: 1
正在上传...
正在上传...
正在上传...
从队列中读取到的数据为: 2
正在上传...
从队列中读取到的数据为: 3
从队列中读取到的数据为: 4
从队列中读取到的数据为: 5
正在上传...
正在上传...
正在上传...
从队列中读取到的数据为: 6
从队列中读取到的数据为: 7
从队列中读取到的数据为: 8
正在上传...
从队列中读取到的数据为: 9
任务完成...
进程池
- 进程对象的创建需要时间,在需要创建大量进程对象的时候会发生性能下降的情况
- 使用进程池会创建出一定数量的进程对象,并且进程在执行完任务后不会被操作系统销毁,这样下一个任务可以重复使用之前创建的这些进程对象
multiprocessing.Pool
import time
import os
from multiprocessing import Pooldef get_html(page):time.sleep(1)pid = os.getpid()print(f'pid: {pid},\t Successfully obtained page {page}...')return pageif __name__ == '__main__':# 创建进程池对象pool = Pool(3)# 通过 apply_async 提交需要异步执行的函数到进程池中for i in range(10):pool.apply_async(get_html, args=(i,))pool.close() # 关闭进程池提交pool.join()
pid: 20516, Successfully obtained page 1...
pid: 23224, Successfully obtained page 0...
pid: 15036, Successfully obtained page 2...
pid: 23224, Successfully obtained page 4...
pid: 20516, Successfully obtained page 3...
pid: 15036, Successfully obtained page 5...
pid: 20516, Successfully obtained page 7...
pid: 23224, Successfully obtained page 6...
pid: 15036, Successfully obtained page 8...
pid: 20516, Successfully obtained page 9...
ProcessPoolExecutor API
进程池的创建
import time
from concurrent.futures import ProcessPoolExecutordef get_html(page):print(f'Successfully obtained page {page}...')time.sleep(1)return pageif __name__ == '__main__':# 创建进程池对象executor = ProcessPoolExecutor(max_workers=2)# 通过 submit 提交需要执行的函数到进程池中task_1 = executor.submit(get_html, 1)task_2 = executor.submit(get_html, 2)
Successfully obtained page 1...
Successfully obtained page 2...
done()方法
done()
方法用于判断某个任务是否完成
import time
from concurrent.futures import ProcessPoolExecutordef get_html(page):print(f'Successfully obtained page {page}...')time.sleep(1)return pageif __name__ == '__main__':# 创建进程池对象executor = ProcessPoolExecutor(max_workers=2)# 通过 submit 提交需要执行的函数到进程池中task_1 = executor.submit(get_html, 1)task_2 = executor.submit(get_html, 2)# done() 方法用于判断某个任务是否完成print(f'task_1 完成情况: {task_1.done()}')
task_1 完成情况: False
Successfully obtained page 1...
Successfully obtained page 2...
cancel()方法
cancel()
方法用于取消未运行的任务,已经运行的任务无法被取消
import time
from concurrent.futures import ProcessPoolExecutordef get_html(page):print(f'Successfully obtained page {page}...')time.sleep(1)return pageif __name__ == '__main__':# 创建进程池对象executor = ProcessPoolExecutor(max_workers=1)# 通过 submit 提交需要执行的函数到进程池中task_1 = executor.submit(get_html, 1)task_2 = executor.submit(get_html, 2)# cancel() 方法用于取消未运行的任务, 已经运行的任务无法被取消print(f'task_2 任务取消: {task_2.cancel()}')
task_2 任务取消: True
Successfully obtained page 1...
- 通过将
max_workers
的值修改为 1 1 1,使得task_2
未能运行时就被取消
result()方法
submit()
方法的返回值是一个future
对象- 通过对
future
对象调用result()
方法可以获取任务的返回值
import time
from concurrent.futures import ProcessPoolExecutordef get_html(page):print(f'Successfully obtained page {page}...')time.sleep(1)return pageif __name__ == '__main__':# 创建进程池对象executor = ProcessPoolExecutor(max_workers=2)# 通过 submit 提交需要执行的函数到进程池中task_1 = executor.submit(get_html, 1)task_2 = executor.submit(get_html, 2)# 通过对 future 对象调用 result() 方法获取任务的返回值print(f'task_1 返回结果: {task_1.result()}')print(f'task_2 返回结果: {task_2.result()}')
Successfully obtained page 1...
Successfully obtained page 2...
task_1 返回结果: 1
task_2 返回结果: 2
as_completed()方法
as_completed()
方法用于获取已经执行成功的任务的返回值
import time
from concurrent.futures import ProcessPoolExecutor, as_completeddef get_html(page):print(f'Successfully obtained page {page}...')time.sleep(1)return pageif __name__ == '__main__':# 创建进程池对象executor = ProcessPoolExecutor(max_workers=2)page_list = [1, 2, 3, 4]# 批量提交任务并获取已经执行成功的任务的返回值all_tasks = [executor.submit(get_html, page) for page in page_list]# 只要进程任务执行完就能获取到返回值, 完成一个任务获取一个任务的返回值for future in as_completed(all_tasks):data = future.result()print(f'Get data {data}')
Successfully obtained page 1...
Successfully obtained page 2...
Successfully obtained page 3...
Get data 1
Successfully obtained page 4...
Get data 2
Get data 3
Get data 4
map()方法
map()
方法用于提交任务并获取已经执行成功的任务的返回值
import time
from concurrent.futures import ProcessPoolExecutordef get_html(page):print(f'Successfully obtained page {page}...')time.sleep(1)return pageif __name__ == '__main__':# 创建进程池对象executor = ProcessPoolExecutor(max_workers=2)page_list = [1, 2, 3, 4]# map() 方法用于提交任务并获取已经执行成功的任务的返回值for data in executor.map(get_html, page_list):print(f'Get data {data}') # 打印的返回值顺序与列表顺序一致
Successfully obtained page 1...
Successfully obtained page 2...
Successfully obtained page 3...
Successfully obtained page 4...
Get data 1
Get data 2
Get data 3
Get data 4
wait()方法
wait()
方法用于使主进程堵塞,直到指定任务完成后,主进程才解堵塞
import time
from concurrent.futures import ProcessPoolExecutor, waitdef get_html(page):print(f'Successfully obtained page {page}...')time.sleep(1)return pageif __name__ == '__main__':# 创建进程池对象executor = ProcessPoolExecutor(max_workers=2)page_list = [1, 2, 3, 4]# 批量提交任务并获取已经执行成功的任务的返回值all_tasks = [executor.submit(get_html, page) for page in page_list]# wait() 方法用于使主进程堵塞, 直到指定任务完成后, 主进程才解堵塞wait(all_tasks)print('主进程解堵塞, 执行剩余代码...')
Successfully obtained page 1...
Successfully obtained page 2...
Successfully obtained page 3...
Successfully obtained page 4...
主进程解堵塞, 执行剩余代码...
自定义进程类
- 自定义进程类需要继承
Process
类 - 需要重写
Process
类中的run()
方法,用于运行进程任务
示例
import requests
import multiprocessingclass ProcessSpider(multiprocessing.Process):def __init__(self, url):super().__init__()self.url = urldef run(self):response = requests.get(self.url).contentfile_name = self.url.split('/')[-1]with open(file_name, 'wb') as f:f.write(response)print('下载完成...')if __name__ == '__main__':url_list = ['http://pic.bizhi360.com/bbpic/98/10798.jpg','http://pic.bizhi360.com/bbpic/92/10792.jpg','http://pic.bizhi360.com/bbpic/86/10386.jpg']for url in url_list:process_spider = ProcessSpider(url)process_spider.start()
下载完成...
下载完成...
下载完成...