阻塞、非阻塞、同步与异步
阻塞与非阻塞
进程运行的三种状态:运行、就绪、阻塞
阻塞和非阻塞:
阻塞:程序运行时,遇到了IO,程序挂起,cpu被切走.
非阻塞: 程序没有遇到IO,程序遇到IO但是我通过某种手段,让cpu强行运行我的程序.
提交任务的角度:
同步: 提交一个任务,自任务开始运行直到此任务结束(可能有IO),返回一个返回值之后,我在提交下一个任务.
异步:一次提交多个任务,然后我就直接执行下一行代码.
同步调用与异步调用
同步调用:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import osdef task(i):print(f'{os.getpid()}开始任务')time.sleep(random.randint(1,3))print(f'{os.getpid()}任务结束')return i
if __name__ == '__main__':# 同步调用pool = ProcessPoolExecutor()for i in range(10):obj = pool.submit(task,i)# obj是一个动态对象,返回的当前的对象的状态,有可能运行中,可能(就绪阻塞),还可能是结束了.# obj.result() 必须等到这个任务完成后,返回了结果之后,在执行下一个任务.print(f'任务结果:{obj.result()}') # 进程执行完成后返回结果pool.shutdown(wait=True)# shutdown: 让我的主进程等待进程池中所有的子进程都结束任务之后,在执行. 有点类似与join.# shutdown: 在上一个进程池没有完成所有的任务之前,不允许添加新的任务.# 一个任务是通过一个函数实现的,任务完成了他的返回值就是函数的返回值.print('===主')
异步调用:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import osdef task(i):print(f'{os.getpid()}开始任务')time.sleep(random.randint(1,3))print(f'{os.getpid()}任务结束')return i
if __name__ == '__main__':# 异步调用pool = ProcessPoolExecutor()for i in range(10):pool.submit(task,i) # 未解决异步调用返回值问题pool.shutdown(wait=True)print('===主')
异步调用获取结果的两种方式:
方式1:统一回收结果: 任务结束后对所有动态对象取结果,获取函数返回值
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import osdef task(i):print(f'{os.getpid()}开始任务')time.sleep(random.randint(1,3))print(f'{os.getpid()}任务结束')return iif __name__ == '__main__':# 异步调用pool = ProcessPoolExecutor()l1 = []for i in range(10):obj = pool.submit(task,i)l1.append(obj)pool.shutdown(wait=True)print(l1)for i in l1:print(i.result())print('===主')
# 结果:
12708开始任务
8632开始任务
1848开始任务
14544开始任务
10704开始任务
18776开始任务
18480开始任务
18548开始任务
13916开始任务
17144开始任务
1848任务结束
14544任务结束
18548任务结束
8632任务结束
10704任务结束
18480任务结束
13916任务结束
17144任务结束
12708任务结束
18776任务结束
[<Future at 0x232b4a377b8 state=finished returned int>, <Future at 0x232b4a82c88 state=finished returned int>, <Future at 0x232b4a82d30 state=finished returned int>, <Future at 0x232b4a82dd8 state=finished returned int>, <Future at 0x232b4a82e80 state=finished returned int>, <Future at 0x232b4a82f28 state=finished returned int>, <Future at 0x232b4a8d048 state=finished returned int>, <Future at 0x232b4a8d128 state=finished returned int>, <Future at 0x232b4a8d208 state=finished returned int>, <Future at 0x232b4a8d2e8 state=finished returned int>]
0
1
2
3
4
5
6
7
8
9
===主
方式2:异步调用+回调函数
异步调用+回调函数
requests模块:
浏览器工作原理: 服务端发送一个请求,服务端验证你的请求,如果正确,给你的浏览器返回一个文件,浏览器接收到文件,将文件里面的代码渲染成你看到的漂亮美丽的模样.
爬虫原理:
1. 利用代码模拟一个浏览器,进行浏览器的工作流程得到一堆源代码.
2. 对源代码进行数据清洗得到我想要数据.
import requests
ret = requests.get('http://www.baidu.com')
if ret.status_code == 200:print(ret.text)
引入回调函数的三个过程:
版本1:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requestsdef task(url):content = requests.get(url)return content.textdef parse(obj):return len(obj.result())if __name__ == '__main__':pool = ThreadPoolExecutor(4)url_list = ['http://www.JD.com','http://www.JD.com', 'https://home.cnblogs.com/u/lifangzheng/','https://wizardforcel.gitbooks.io/gopl-zh/content/ch0/ch0-01.html', 'https://www.pypypy.cn/#/','https://www.liaoxuefeng.com/', 'https://home.cnblogs.com/u/lifangzheng/','https://home.cnblogs.com/u/lifangzheng/', 'https://gitee.com/clover16', 'https://gitee.com/clover16']obj_list = []for url in url_list:obj = pool.submit(task,url)obj_list.append(obj)pool.shutdown(wait=True)for res in obj_list:print(parse(res.result()))
# 版本一的两个缺陷:
# 1. 异步发出10个任务,并发的执行,但是统一的接收所有的任务的返回值.(效率低,不能实时的获取结果)
# 2. 分析结果流程是串行,影响效率.
版本2:(基于版本一的第二个缺点改进而来)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requestsdef task(url):content = requests.get(url)return parse(content.text)def parse(obj):return len(obj.result()) # 嵌套函数,并发执行中分析结果,增加了函数的耦合性
# 并发执行任务,每个任务是通过网页获取源码+数据分析,此任务最好是IO阻塞,才能发挥最大的效果
if __name__ == '__main__':pool = ThreadPoolExecutor(4)url_list = ['http://www.JD.com','http://www.JD.com', 'https://home.cnblogs.com/u/lifangzheng/','https://wizardforcel.gitbooks.io/gopl-zh/content/ch0/ch0-01.html', 'https://www.pypypy.cn/#/','https://www.liaoxuefeng.com/', 'https://home.cnblogs.com/u/lifangzheng/','https://home.cnblogs.com/u/lifangzheng/', 'https://gitee.com/clover16', 'https://gitee.com/clover16']obj_list = []for url in url_list:obj = pool.submit(task,url)obj_list.append(obj)pool.shutdown(wait=True)for res in obj_list:print(parse(res.result()))
版本3:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requestsdef task(url):content = requests.get(url)return content.textdef parse(obj):print(len(obj.result()))if __name__ == '__main__':pool = ThreadPoolExecutor(4)url_list = ['http://www.JD.com','http://www.JD.com', 'https://home.cnblogs.com/u/lifangzheng/','https://wizardforcel.gitbooks.io/gopl-zh/content/ch0/ch0-01.html', 'https://www.pypypy.cn/#/','https://www.liaoxuefeng.com/', 'https://home.cnblogs.com/u/lifangzheng/','https://home.cnblogs.com/u/lifangzheng/', 'https://gitee.com/clover16', 'https://gitee.com/clover16']for url in url_list:obj = pool.submit(task,url)obj.add_done_callback(parse) # add_done_callback函数无返回值# 线程发布后,由空闲线程执行回调函数
PS:异步处理IO类型,回调处理非IO类型
异步调用和回调函数的关系?
异步站在发布任务的角度,回调站在接收结果的角度: 回调函数 按顺序接收每个任务的结果,进行下一步处理.
线程池+回调函数和进程池+回调函数的小区别:
进程池+回调: 回调函数由主进程去执行,线程池+回调: 回到函数由空闲的线程去执行.