【进阶】 --- 多线程、多进程、异步IO实用例子:https://blog.csdn.net/lu8000/article/details/82315576
python之爬虫_并发(串行、多线程、多进程、异步IO):https://www.cnblogs.com/fat39/archive/2004/01/13/9044474.html
Python 并发总结,多线程,多进程,异步IO:https://www.cnblogs.com/junmoxiao/p/11948993.html
asyncio --- 异步 I/O 官方文档:https://docs.python.org/zh-cn/3.10/library/asyncio.html
关于asyncio异步io并发编程:https://zhuanlan.zhihu.com/p/158641367
支持 asyncio 的异步Python库:https://github.com/aio-libs
知乎专栏: https://zhuanlan.zhihu.com/zarten 之 Python中协程异步IO(asyncio)详解( https://zhuanlan.zhihu.com/p/59621713 )
asyncio:异步I/O、事件循环和并发工具:https://www.cnblogs.com/sidianok/p/12210857.html
在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢。以下代码默认运行环境为 python3。
- httpie:HTTPie 使用详解:https://zhuanlan.zhihu.com/p/45093545
- grequests,Requests + Gevent,访问:https://github.com/kennethreitz/grequests
- gevent,一个高并发的网络性能库,访问:http://www.gevent.org/
- twisted,基于事件驱动的网络引擎框架。访问:https://twistedmatrix.com/trac/
一、多线程、多进程
1.同步执行
2.多线程执行
3.多线程 + 回调函数执行
4.多进程执行
5.多进程 + 回调函数执行
二、异步
1.asyncio 示例 1
asyncio 示例 2
python 异步编程之 asyncio(百万并发)
学习 python 高并发模块 asynio
2.asyncio + aiohttp
3.asyncio + requests
4.gevent + requests
5.grequests
6.Twisted示例
7.Tornado
8.Twisted更多
9.史上最牛逼的异步 IO 模块
一、多线程、多进程
1. 同步执行
示例 1( 同步执行 ):
import requests
import time
from lxml import etreeurls = ['https://blog.csdn.net/Jmilk/article/details/103218919','https://blog.csdn.net/stven_king/article/details/103256724','https://blog.csdn.net/csdnnews/article/details/103154693','https://blog.csdn.net/dg_lee/article/details/103951021','https://blog.csdn.net/m0_37907797/article/details/103272967','https://blog.csdn.net/zzq900503/article/details/49618605','https://blog.csdn.net/weixin_44339238/article/details/103977138','https://blog.csdn.net/dengjin20104042056/article/details/103930275','https://blog.csdn.net/Mind_programmonkey/article/details/103940511','https://blog.csdn.net/xufive/article/details/102993570','https://blog.csdn.net/weixin_41010294/article/details/104009722','https://blog.csdn.net/yunqiinsight/article/details/103137022','https://blog.csdn.net/qq_44210563/article/details/102826406',
]def get_title(url: str):headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ''(KHTML, like Gecko) Chrome/86.0.4240.183 Safari/537.36'}r = requests.get(url, headers=headers)if 200 == r.status_code:title = etree.HTML(r.content).xpath('//h1[@class="title-article"]/text()')[0]print(title)else:print(f'[status_code:{r.status_code}]:{r.url}')def main():for url in urls:get_title(url)if __name__ == '__main__':start = time.time()main()print(f'cost time: {time.time() - start}s')
使用 httpx 模块的同步调用( httpx 即可同步,也可异步 )
import time
import httpxdef make_request(client):resp = client.get('https://httpbin.org/get')result = resp.json()print(f'status_code : {resp.status_code}')assert 200 == resp.status_codedef main():session = httpx.Client()# 100 次调用for _ in range(10):make_request(session)if __name__ == '__main__':# 开始start = time.time()main()# 结束end = time.time()print(f'同步:发送100次请求,耗时:{end - start}')
2. 多线程执行(线程池)
from concurrent.futures import ThreadPoolExecutor
import requestsdef fetch_sync(r_url):response = requests.get(r_url)print(f"{r_url} ---> {response.status_code}")url_list = ['https://www.baidu.com','https://www.bing.com'
]
pool = ThreadPoolExecutor(5)
for url in url_list:pool.submit(fetch_sync, url)
pool.shutdown(wait=True)
3. 多线程 + 回调函数执行
from concurrent.futures import ThreadPoolExecutor
import requestsdef fetch_sync(r_url):response = requests.get(r_url, verify=False)return responsedef callback(future):resp = future.result()print(f"{resp.url} ---> {resp.status_code}")url_list = ['https://www.baidu.com','https://www.bing.com'
]
pool = ThreadPoolExecutor(5)
for url in url_list:v = pool.submit(fetch_sync, url)v.add_done_callback(callback)
pool.shutdown(wait=True)
4. 多进程执行
import requests
from concurrent import futuresdef fetch_sync(r_url):response = requests.get(r_url)return responseif __name__ == '__main__':url_list = ['https://www.baidu.com','https://www.bing.com']with futures.ProcessPoolExecutor(5) as executor:res = [executor.submit(fetch_sync, url) for url in url_list]print(res)
示例 :
import requests
from concurrent import futures
import time
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)def fetch_sync(args):i, r_url = argsprint(f'index : {i}')response = requests.get(r_url, verify=False)time.sleep(2)return response.status_codedef callback(future):print(future.result())if __name__ == '__main__':# url_list = ['https://www.github.com', 'https://www.bing.com']url = 'https://www.github.com'with futures.ProcessPoolExecutor(5) as executor:for index in range(1000):v = executor.submit(fetch_sync, (index, url))v.add_done_callback(callback)pass
5. 多进程 + 回调函数执行
import requests
from concurrent import futuresdef fetch_sync(r_url):response = requests.get(r_url, verify=False)return responsedef callback(future):print(future.result())if __name__ == '__main__':url_list = ['https://www.github.com', 'https://www.bing.com']with futures.ProcessPoolExecutor(5) as executor:for url in url_list:v = executor.submit(fetch_sync, url)v.add_done_callback(callback)pass
二、异步
对于 事件循环,可以动态的增加协程 到 事件循环 中, 而不是在一开始就确定所有需要协程。
协程 只运行在 事件循环 中。
默认情况下 asyncio.get_event_loop() 是一个 select模型 的 事件循环。
默认的 asyncio.get_event_loop() 事件循环属于主线程。
参考 python asyncio 协程:https://blog.csdn.net/dashoumeixi/article/details/81001681
提示:一般是一个线程一个事件循环,为什么要一个线程一个事件循环
如果你要使用多个事件循环 ,创建线程后调用
lp = asyncio.new_event_loop() # 创建一个新的事件循环
asyncio.set_event_loop(lp) # 设置当前线程的事件循环
核心思想: yield from / await 就这2个关键字,运行(驱动)一个协程, 同时交出当前函数的控制权,让事件循环执行下个任务。
yield from 的实现原理 :https://blog.csdn.net/dashoumeixi/article/details/84076812
要搞懂 asyncio 协程,还是先把生成器弄懂,如果对生成器很模糊,比如 yield from 生成器对象,这个看不懂的话,建议先看 :python生成器 yield from:https://blog.csdn.net/dashoumeixi/article/details/80936798
有 2 种方式让协程运行起来,协程(生成器)本身是不运行的
- 1. await / yield from 协程,这一组能等待协程完成。
- 2. asyncio.ensure_future / async(协程) ,这一组不需要等待协程完成。
注意:
- 1. 协程就是生成器的增强版 ( 多了send 与 yield 的接收 ),在 asyncio 中的协程 与 生成器对象不同点:
asyncio协程: 函数内部不能使用 yield [如果使用会抛RuntimeError],只能使用 yield from / await,
一般的生成器: yield 或 yield from 2个都能用,至少使用一个。这 2个本来就是一回事, 协程只是不能使用 yield- 2. 在 asycio 中所有的协程都被自动包装成一个个 Task / Future 对象,但其本质还是一个生成器,因此可以 yield from / await Task/Futrue
基本流程:
- 1. 定义一个协程 (async def 或者 @asyncio.coroutine 装饰的函数)
- 2. 调用上述函数,获取一个协程对象 【不能使用yield,除非你自己写异步模块,毕竟最终所调用的还是基于yield的生成器函数】。通过 asyncio.ensure_future 或 asyncio.async 函数调度协程(这部意味着要开始执行了) ,返回了一个 Task 对象,Task对象 是 Future对象 的 子类, ( 这步可作可不作,只要是一个协程对象,一旦扔进事件队列中,将自动给你封装成Task对象 )
- 3. 获取一个事件循环 asyncio.get_event_loop() ,默认此事件循环属于主线程
- 4. 等待事件循环调度协程
后面的例子着重说明了一下 as_completed,附加了源码。 先说明一下:
- 1. as_completed 每次迭代返回一个协程,
- 2. 这个协程内部从 Queue 中取出先完成的 Future 对象
- 3. 然后我们再 await coroutine
示例 1:
import asyncio"""第一个例子没什么用.注意: 协程 与 生成器 的用法是一样的. 需要调用之后才产生对象.
"""async def func():print('hi')lp = asyncio.get_event_loop() # 获取事件循环# 放到进事件循环里.注意,func() 而不是func. 需要调用之后才是协程对象.
lp.run_until_complete(func())
示例 2:
import asyncio"""用async def (新语法) 定义一个函数,同时返回值asyncio.sleep 模拟IO阻塞情况 ; await 相当于 yield from.await 或者 yield from 交出函数控制权(中断),让事件循环执行下个任务 ,一边等待后面的协程完成
"""async def func(i):print('start')await asyncio.sleep(i) # 交出控制权print('done')return ico = func(2) # 产生协程对象
print(co)
lp = asyncio.get_event_loop() # 获取事件循环
task = asyncio.ensure_future(co) # 开始调度
lp.run_until_complete(task) # 等待完成
print(task.result()) # 获取结果
添加回调
示例 1:
import asyncio"""添加一个回调:add_done_callback
"""async def func(i):print('start')await asyncio.sleep(i)return idef call_back(v):print('callback , arg:', v, 'result:', v.result())if __name__ == '__main__':co = func(2) # 产生协程对象lp = asyncio.get_event_loop() # 获取事件循环# task = asyncio.run_coroutine_threadsafe(co) # 开始调度task = asyncio.ensure_future(co) # 开始调度task.add_done_callback(call_back) # 增加回调lp.run_until_complete(task) # 等待print(task.result()) # 获取结果
子协程调用原理图
官方的一个实例如下
从下面的原理图我们可以看到
- 1 当 事件循环 处于运行状态的时候,任务Task 处于pending(等待),会把控制权交给委托生成器 print_sum
- 2 委托生成器 print_sum 会建立一个双向通道为Task和子生成器,调用子生成器compute并把值传递过去
- 3 子生成器compute会通过委托生成器建立的双向通道把自己当前的状态suspending(暂停)传给Task,Task 告诉 loop 它数据还没处理完成
- 4 loop 会循环检测 Task ,Task 通过双向通道去看自生成器是否处理完成
- 5 子生成器处理完成后会向委托生成器抛出一个异常和计算的值,并关闭生成器
- 6 委托生成器再把异常抛给任务(Task),把任务关闭
- 7 loop 停止循环
call_soon、call_at、call_later、call_soon_threadsafe
- call_soon 循环开始检测时,立即执行一个回调函数
- call_at 循环开始的第几秒s执行
- call_later 循环开始后10s后执行
- call_soom_threadsafe 立即执行一个安全的线程
import asyncioimport timedef call_back(str_var, loop):print("success time {}".format(str_var))def stop_loop(str_var, loop):time.sleep(str_var)loop.stop()# call_later, call_at
if __name__ == "__main__":event_loop = asyncio.get_event_loop()event_loop.call_soon(call_back, 'loop 循环开始检测立即执行', event_loop)now = event_loop.time() # loop 循环时间event_loop.call_at(now + 2, call_back, 2, event_loop)event_loop.call_at(now + 1, call_back, 1, event_loop)event_loop.call_at(now + 3, call_back, 3, event_loop)event_loop.call_later(6, call_back, "6s后执行", event_loop)# event_loop.call_soon_threadsafe(stop_loop, event_loop)event_loop.run_forever()
不同线程中的事件循环
事件循环中维护了一个队列(FIFO, Queue) ,通过另一种方式来调用:
import time
import datetime
import asyncio"""事件循环中维护了一个FIFO队列通过call_soon 通知事件循环来调度一个函数.
"""def func(x):print(f'x:{x}, start time:{datetime.datetime.now().replace(microsecond=0)}')time.sleep(x)print(f'func invoked:{x}')loop = asyncio.get_event_loop()
loop.call_soon(func, 1) # 调度一个函数
loop.call_soon(func, 2)
loop.call_soon(func, 3)
loop.run_forever() # 阻塞'''
x:1, start time:2020-10-01 15:45:46
func invoked:1
x:2, start time:2020-10-01 15:45:47
func invoked:2
x:3, start time:2020-10-01 15:45:49
func invoked:3
'''
可以看到以上操作是同步的。下面通过 asyncio.run_coroutine_threadsafe
函数可以把上述函数调度变成异步执行:
import time
import datetime
import asyncio"""1.首先会调用asyncio.run_coroutine_threadsafe 这个函数.2.之前的普通函数修改成协程对象
"""async def func(x):print(f'x:{x}, start time:{datetime.datetime.now().replace(microsecond=0)}')await asyncio.sleep(x)print(f'func invoked:{x}, now:{datetime.datetime.now().replace(microsecond=0)}')loop = asyncio.get_event_loop()
co1 = func(1)
co2 = func(2)
co3 = func(3)
asyncio.run_coroutine_threadsafe(co1, loop) # 调度
asyncio.run_coroutine_threadsafe(co2, loop)
asyncio.run_coroutine_threadsafe(co3, loop)
loop.run_forever() # 阻塞'''
x:1, start time:2020-10-01 15:49:32
x:2, start time:2020-10-01 15:49:32
x:3, start time:2020-10-01 15:49:32
func invoked:1, now:2020-10-01 15:49:33
func invoked:2, now:2020-10-01 15:49:34
func invoked:3, now:2020-10-01 15:49:35
'''
上面 2 个例子只是告诉你 2 件事情。
- 1. run_coroutine_threadsafe是异步线程安全 ,call_soon是同步。
- 2. run_coroutine_threadsafe 这个函数 对应 ensure_future (只能作用于同一线程中)。
可以在一个子线程中运行一个事件循环,然后在主线程中动态的添加协程,这样既不阻塞主线程执行其他任务,子线程也可以异步的执行协程。
注意:默认情况下获取的 event_loop 是主线程的,所以要在子线程中使用 event_loop 需要 new_event_loop 。如果在子线程中直接获取 event_loop 会抛异常 。
源代码中的判断:isinstance(threading.current_thread(), threading._MainThread)
示例:
import os
import sys
import queue
import threading
import time
import datetime
import asyncio"""1. call_soon , call_soon_threadsafe 是同步的2. asyncio.run_coroutine_threadsafe(coro, loop) -> 对应 asyncio.ensure_future是在 事件循环中 异步执行。
"""# 在子线程中执行一个事件循环 , 注意需要一个新的事件循环
def thread_loop(loop: asyncio.AbstractEventLoop):print('线程开启 tid:', threading.currentThread().ident)asyncio.set_event_loop(loop) # 设置一个新的事件循环loop.run_forever() # run_forever 是阻塞函数,所以,子线程不会退出。async def func(x, q):current_time = datetime.datetime.now().replace(microsecond=0)msg = f'func: {x}, time:{current_time}, tid:{threading.currentThread().ident}'print(msg)await asyncio.sleep(x)q.put(x)if __name__ == '__main__':temp_queue = queue.Queue()lp = asyncio.new_event_loop() # 新建一个事件循环, 如果使用默认的, 则不能放入子线程thread_1 = threading.Thread(target=thread_loop, args=(lp,))thread_1.start()co1 = func(2, temp_queue) # 2个协程co2 = func(3, temp_queue)asyncio.run_coroutine_threadsafe(co1, lp) # 开始调度在子线程中的事件循环asyncio.run_coroutine_threadsafe(co2, lp)print(f'开始事件:{datetime.datetime.now().replace(microsecond=0)}')while 1:if temp_queue.empty():print('队列为空,睡1秒继续...')time.sleep(1)continuex = temp_queue.get() # 如果为空,get函数会直接阻塞,不往下执行current_time = datetime.datetime.now().replace(microsecond=0)msg = f'main :{x}, time:{current_time}'print(msg)time.sleep(1)
下面例子中 asyncio.ensure_future/async 都可以换成 asyncio.run_coroutine_threadsafe 【 在不同线程中的事件循环 】:
ThreadPollExecutor 和 asyncio 完成阻塞 IO 请求
在 asyncio 中集成线程池处理耗时IO
在协程中同步阻塞的写法,但有些时候不得已就是一些同步耗时的接口
可以把 线程池 集成到 asynico 模块中
import asyncio
from concurrent import futurestask_list = []
loop = asyncio.get_event_loop()
executor = futures.ThreadPoolExecutor(3)def get_url(t_url=None):print(t_url)for url in range(20):url = "http://shop.projectsedu.com/goods/{}/".format(url)task = loop.run_in_executor(executor, get_url, url)task_list.append(task)loop.run_until_complete(asyncio.wait(task_list))
示例代码:
# 使用多线程:在 协程 中集成阻塞io
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparsedef get_url(url):# 通过socket请求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = "/"# 建立socket连接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# client.setblocking(False)client.connect((host, 80)) # 阻塞不会消耗cpu# 不停的询问连接是否建立好, 需要while循环不停的去检查状态# 做计算任务或者再次发起其他的连接请求client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)client.close()if __name__ == "__main__":import timestart_time = time.time()loop = asyncio.get_event_loop()executor = ThreadPoolExecutor(3)tasks = []for url in range(20):url = "http://shop.projectsedu.com/goods/{}/".format(url)task = loop.run_in_executor(executor, get_url, url)tasks.append(task)loop.run_until_complete(asyncio.wait(tasks))print("last time:{}".format(time.time() - start_time))
不用集成也是可以的,但是要在函数的前面加上 async 使同步变成异步写法
#使用多线程:在携程中集成阻塞io
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse
import timeasync def get_html(url):#通过socket请求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = "/"#建立socket连接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# client.setblocking(False)client.connect((host, 80)) #阻塞不会消耗cpu#不停的询问连接是否建立好, 需要while循环不停的去检查状态#做计算任务或者再次发起其他的连接请求client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)client.close()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()tasks = [get_html("http://shop.projectsedu.com/goods/2/") for i in range(10)]loop.run_until_complete(asyncio.wait(tasks))print(time.time() - start_time)
asyncio 的 同步 和 通信
在多少线程中考虑安全性,需要加锁,在协程中是不需要的
import asynciototal = 0
lock = Noneasync def add():global totalfor _ in range(1000):total += 1async def desc():global total, lockfor _ in range(1000):total -= 1if __name__ == '__main__':tasks = [add(), desc()]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))print(total)
在有些情况中,对协程还是需要类似锁的机制
示例:parse_response 和 use_response 有共同调用的代码,get_response、parse_response 去请求的时候 如果 get_response 也去请求,会触发网站的反爬虫机制.
这就需要我们像上诉代码那样加 lock,同时 get_response 和 use_response 中都调用了parse_response,我们想在 get_response 中只请求一次,下次用缓存,所以要用到锁
import asyncio
import aiohttp
from asyncio import Lockcache = {}
lock = Lock()async def get_response(url):async with lock: # 等价于 with await lock: 还有async for 。。。类似的用法# 这里使用async with 是因为 Lock中有__await__ 和 __aenter__两个魔法方法# 和线程一样, 这里也可以用 await lock.acquire() 并在结束时 lock.releaseif url in cache:return cache[url]print("第一次请求")response = aiohttp.request('GET', url)cache[url] = responsereturn responseasync def parse_response(url):response = await get_response(url)print('parse_response', response)# do some parseasync def use_response(url):response = await get_response(url)print('use_response', response)# use response to do something interestingif __name__ == '__main__':tasks = [parse_response('baidu'), use_response('baidu')]loop = asyncio.get_event_loop()# loop.run_until_complete将task放到loop中,进行事件循环, 这里必须传入的是一个listloop.run_until_complete(asyncio.wait(tasks))
输出结果如下
asyncio 通信 queue
协程是单线程的,所以协程中完全可以使用全局变量实现 queue 来相互通信,但是如果想要在 queue 中定义存放有限的最大数目,需要在 put 和 get 的前面都要加 await
from asyncio import Queuequeue = Queue(maxsize=3)
await queue.get()
await queue.put()
一个事件循环中执行多个 task,实现并发执行
future 和 task:
- future 是一个结果的容器,结果执行完后在内部会回调 call_back 函数
- task 是 future 的子类,可以用来激活协程。( task 是 协程 和 Future 的 桥梁 )
wait、gather、await
1. wait、gather 这2个函数都是用于获取结果的,且都不阻塞,直接返回一个生成器对象可用于 yield from / await
2. 两种用法可以获取执行完成后的结果:
第一种: result = asyncio.run_until_completed(asyncio.wait/gather) 执行完成所有之后获取结果
第二种: result = await asyncio.wait/gather 在一个协程内获取结果3. as_completed 与并发包 concurrent 中的行为类似,哪个任务先完成哪个先返回,内部实现是 yield from Queue.get()
4. 嵌套:await / yield from 后跟协程,直到后面的协程运行完毕,才执行 await / yield from 下面的代码,整个过程是不阻塞的
wait 和 gather 区别
这两个都可以添加多个任务到事件循环中
一般使用 asyncio.wait(tasks) 的地方也可以使用 asyncio.gather(tasks) ,但是 wait 接收一堆 task,gather接收一个 task 列表。
asyncio.wait(tasks)方法返回值是两组 task/future的 set。dones, pendings = await asyncio.wait(tasks)
其中
- dones 是 task的 set,
- pendings 是 future 的 set。
asyncio.gather(tasks) 返回一个结果的 list。
gather 比 wait 更加的高级
- 可以对任务进行分组
- 可以取消任务
import asyncio
import timeasync def get_html(url):global indexprint(f"{index} start get url")await asyncio.sleep(2)index += 1print(f"{index} end get url")if __name__ == "__main__":start_time = time.time()index = 1loop = asyncio.get_event_loop()tasks = [get_html("http://www.imooc.com") for i in range(10)]# gather和wait的区别# tasks = [get_html("http://www.imooc.com") for i in range(10)]# loop.run_until_complete(asyncio.wait(tasks))group1 = [get_html("http://projectsedu.com") for i in range(2)]group2 = [get_html("http://www.imooc.com") for i in range(2)]group1 = asyncio.gather(*group1)group2 = asyncio.gather(*group2)loop.run_until_complete(asyncio.gather(group1, group2))print(time.time() - start_time)
示例 1:
import asyncio"""并发 执行多个任务。调度一个Task对象列表调用 asyncio.wait 或者 asyncio.gather 获取结果
"""async def func(i):print('start')# 交出控制权,事件循环执行下个任务,同时等待完成await asyncio.sleep(i)return iasync def func_sleep():await asyncio.sleep(2)def test_1():# asyncio create_task永远运行# https://www.pythonheidong.com/blog/article/160584/ca5dc07f62899cedad64/lp = asyncio.get_event_loop()tasks = [lp.create_task(func(i)) for i in range(3)]lp.run_until_complete(func_sleep())# 或者# lp.run_until_complete(asyncio.wait([func_sleep(), ]))def test_2(): lp = asyncio.get_event_loop()# tasks = [func(i) for i in range(3)]# tasks = [asyncio.ensure_future(func(i)) for i in range(3)] # asyncio.ensure_future# 或者tasks = [lp.create_task(func(i)) for i in range(3)] # lp.create_tasklp.run_until_complete(asyncio.wait(tasks))for task in tasks:print(task.result())if __name__ == '__main__':# test_1()test_2()pass
示例 2:
import asyncio"""通过 await 或者 yield from 形成1个链, 后面跟其他协程. 形成一个链的目的很简单,当前协程需要这个结果才能继续执行下去.就跟普通函数调用其他函数获取结果一样
"""async def func(i):print('start')await asyncio.sleep(i)return iasync def to_do():print('to_do start')tasks = []# 开始调度3个协程对象for i in range(3):tasks.append(asyncio.ensure_future(func(i)))# 在协程内等待结果. 通过 await 来交出控制权, 同时等待tasks完成task_done, task_pending = await asyncio.wait(tasks)print('to_do get result')# 获取已经完成的任务for task in task_done:print('task_done:', task.result())# 未完成的for task in task_pending:print('pending:', task)if __name__ == '__main__':lp = asyncio.get_event_loop() # 获取事件循环lp.run_until_complete(to_do()) # 把协程对象放进去# lp.close() # 关闭事件循环
as_completed 函数返回一个迭代器,每次迭代一个协程。
事件循环内部有一个 Queue(queue.Queue 线程安全) , 先完成的先入队。
as_completed 迭代的协程源码是 : 注意 yield from 后面可以跟 iterable
#简化版代码
f = yield from done.get() # done 是 Queue
return f.result()
例子:
asyncio.as_completed 返回一个生成器对象 , 因此可用于迭代
每次从此生成器中返回的对象是一个个协程(生成器),哪个最先完成哪个就返回, 而要从 生成器/协程 中获取返回值,就必须使用 yield from / await , 简单来说就是:生成器的返回值在异常中, 详情参考最上面的基础链接
import asyncioasync def func(x):# print('\t\tstart ',x)await asyncio.sleep(5)# print('\t\tdone ', x)return xasync def to_do():# 在协程内调度2个协程tasks = [asyncio.ensure_future(func(i)) for i in range(2)]# 使用as_completed:先完成,先返回.# 每次迭代返回一个协程.# 这个协程:_wait_for_one,内部从队列中产出一个最先完成的Future对象for coroutine in asyncio.as_completed(tasks):result = await coroutine # 等待协程,并返回先完成的协程print('result :', result)print('all done')lp = asyncio.get_event_loop()
lp.set_debug(True)
lp.run_until_complete(to_do()) # 调度协程
获取多个并发的 task 的结果。
( task 是 协程 和 Future 的 桥梁。 )
-
如果我们要获取 task 的结果,一定要创建一个task,就是把我们的协程绑定要 task 上,这里直接用事件循环 loop 里面的 create_task 就可以搞定。
-
我们假设有3个并发的add任务需要处理,然后调用 run_until_complete 来等待3个并发任务完成。
-
调用 task.result 查看结果,这里的 task 其实是 _asyncio.Task,是封装的一个类。大家可以在 Pycharm 中找 asyncio 里面的源码,里面有一个 tasks 文件。
爬取有道词典
玩并发比较多的是爬虫,爬虫可以用多线程,线程池去爬。但是我们用 requests 的时候是阻塞的,无法并发。所以我们要用一个更牛逼的库 aiohttp,这个库可以当成是异步的 requests。
1). 爬取有道词典
有道翻译的API已经做好了,我们可以直接调用爬取。然后解析网页,获取单词的翻译。然后解析网页,网页比较简单,可以有很多方法解析。因为爬虫文章已经泛滥了,我这里就不展开了,很容易就可以获取单词的解释。
2). 代码的核心框架
-
设计一个异步的框架,生成一个事件循环
-
创建一个专门去爬取网页的协程,利用aiohttp去爬取网站内容
-
生成多个要翻译的单词的url地址,组建一个异步的tasks, 扔到事件循环里面
-
等待所有的页面爬取完毕,然后用pyquery去一一解析网页,获取单词的解释,部分代码如下:
import time
import asyncio
import aiohttp
from pyquery import PyQuery as pqdef decode_html(html_content=None):url, resp_text = html_contentdoc = pq(resp_text)des = ''for li in doc.items('#phrsListTab .trans-container ul li'):des += li.text()return url, desasync def fetch(session: aiohttp.ClientSession = None, url=None):async with session.get(url=url) as resp:resp_text = await resp.text()return url, resp_textasync def main(word_list=None):url_list = ['http://dict.youdao.com/w/{}'.format(word) for word in word_list]temp_task_list = []async with aiohttp.ClientSession() as session:for url in url_list:temp_task_list.append(fetch(session, url))html_list = await asyncio.gather(*temp_task_list)for html_content in html_list:print(decode_html(html_content))if __name__ == '__main__':start_time = time.time()text = 'apple'word_list_1 = [ch for ch in text]word_list_2 = [text for _ in range(100)]loop = asyncio.get_event_loop()task_list = [main(word_list_1),main(word_list_2),]loop.run_until_complete(asyncio.wait(task_list))print(time.time() - start_time)
谈到 http 接口调用,Requests 大家并不陌生,例如,robotframework-requests、HttpRunner 等 HTTP 接口测试库/框架都是基于它开发。
这里将介绍另一款http接口测试框架 httpx,snaic 同样也集成了 httpx 库。httpx 的 API 和 Requests 高度一致。github: https://github.com/encode/httpx
安装:pip install httpx
httpx 简单使用
import json
import httpxr = httpx.get("http://httpbin.org/get")
print(r.status_code)
print(json.dumps(r.json(), ensure_ascii=False, indent=4))
带参数的 post 调用
import json
import httpxpayload = {'key1': 'value1', 'key2': 'value2'}
r = httpx.post("http://httpbin.org/post", data=payload)
print(r.status_code)
print(json.dumps(r.json(), ensure_ascii=False, indent=4))
httpx 异步调用。接下来认识 httpx 的异步调用:
import json
import httpx
import asyncioasync def main():async with httpx.AsyncClient() as client:resp = await client.get('http://httpbin.org/get')result = resp.json()print(json.dumps(result, ensure_ascii=False, indent=4))asyncio.run(main())
httpx 异步调用
import httpx
import asyncio
import timeasync def request(client):global indexresp = await client.get('http://httpbin.org/get')index += 1result = resp.json()print(f'{index} status_code : {resp.status_code}')assert 200 == resp.status_codeasync def main():async with httpx.AsyncClient() as client:# 50 次调用task_list = []for _ in range(50):req = request(client)task = asyncio.create_task(req)task_list.append(task)await asyncio.gather(*task_list)if __name__ == "__main__":index = 0# 开始start = time.time()asyncio.run(main())# 结束end = time.time()print(f'异步:发送 50 次请求,耗时:{end - start}')
想搞懂 异步框架 和异步接口的调用,可以按这个路线学习:1.python异步编程、2.python Web异步框架(tornado/sanic)、3.异步接口调用(aiohttp/httpx)
1. asyncio
示例 1:( Python 3.5+ 之前的写法 )
import asyncio@asyncio.coroutine
def func1():print('before...func1......')yield from asyncio.sleep(5)print('end...func1......')tasks = [func1(), func1()]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
改进,使用 async / await 关键字 ( Python 3.5+ 开始引入了新的语法 async 和 await )
import asyncioasync def func1():print('before...func1......')await asyncio.sleep(5)print('end...func1......')tasks = [func1(), func1()]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
示例 2 :
import asyncioasync def fetch_async(host, url='/'):print(host, url)reader, writer = await asyncio.open_connection(host, 80)request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)request_header_content = bytes(request_header_content, encoding='utf-8')writer.write(request_header_content)await writer.drain()text = await reader.read()print(host, url, text)writer.close()tasks = [fetch_async('www.cnblogs.com', '/wupeiqi/'),fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
]loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
示例 3:
#!/usr/bin/env python
# encoding:utf-8
import asyncio
import aiohttp
import timeasync def download(url): # 通过async def定义的函数是原生的协程对象print("get: %s" % url)async with aiohttp.ClientSession() as session:async with session.get(url) as resp:print(resp.status)# response = await resp.read()async def main():start = time.time()await asyncio.wait([download("http://www.163.com"),download("http://www.mi.com"),download("http://www.baidu.com")])end = time.time()print("Complete in {} seconds".format(end - start))loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Python 异步编程之 asyncio(百万并发)
前言:python 由于 GIL(全局锁)的存在,不能发挥多核的优势,其性能一直饱受诟病。然而在 IO 密集型的网络编程里,异步处理比同步处理能提升成百上千倍的效率,弥补了 python 性能方面的短板,如最新的微服务框架 japronto,resquests per second 可达百万级。
python 还有一个优势是库(第三方库)极为丰富,运用十分方便。asyncio 是 python3.4 版本引入到标准库,python2x 没有加这个库,毕竟 python3x 才是未来啊,哈哈!python3.5 又加入了 async/await 特性。在学习 asyncio 之前,我们先来理清楚 同步/异步的概念:
- 同步 是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行。。。
- 异步 是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。
aiohttp 使用
如果需要并发 http 请求怎么办呢,通常是用 requests,但 requests 是同步的库,如果想异步的话需要引入 aiohttp。这里引入一个类,from aiohttp import ClientSession,首先要建立一个 session 对象,然后用 session 对象去打开网页。session 可以进行多项操作,比如 post、get、put、head 等。
基本用法:
async with ClientSession() as session:async with session.get(url) as response:
aiohttp 异步实现的例子:
import asyncio
from aiohttp import ClientSessiontasks = []
url = "http://httpbin.org/get?args=hello_word"async def hello(t_url):async with ClientSession() as session:async with session.get(t_url) as req:response = await req.read()# response = await req.text()# print(response)print(f'{req.url} : {req.status}')if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(hello(url))
首先 async def 关键字定义了这是个异步函数,await 关键字加在需要等待的操作前面,req.read() 等待 request 响应,是个耗 IO 操作。然后使用 ClientSession 类发起 http 请求。
异步请求多个URL
如果我们需要请求多个 URL 该怎么办呢?
- 同步的做法:访问多个 URL时,只需要加个 for 循环就可以了。
- 但异步的实现方式并没那么容易:在之前的基础上需要将 hello() 包装在 asyncio 的 Future 对象中,然后将 Future对象列表 作为 任务 传递给 事件循环。
import datetime
import asyncio
from aiohttp import ClientSessiontask_list = []
url = "https://www.baidu.com/{}"async def hello(t_url):ret_val = Noneasync with ClientSession() as session:async with session.get(t_url) as req:response = await req.read()print(f'Hello World:{datetime.datetime.now().replace(microsecond=0)}')# print(response)ret_val = req.statusreturn ret_valdef run():for i in range(5):one_task = asyncio.ensure_future(hello(url.format(i)))task_list.append(one_task)if __name__ == '__main__':loop = asyncio.get_event_loop()run()result = loop.run_until_complete(asyncio.wait(task_list))# 方法 1 : 获取结果for task in task_list:print(task.result())# 方法 2 : 获取结果finish_task, pending_task = resultprint(f'finish_task count:{len(pending_task)}')for task in finish_task:print(task.result())print(f'pending_task count:{len(pending_task)}')for task in pending_task:print(task.result())'''
Hello World:2020-12-06 16:29:02
Hello World:2020-12-06 16:29:02
Hello World:2020-12-06 16:29:02
Hello World:2020-12-06 16:29:02
Hello World:2020-12-06 16:29:02
404
404
404
404
404
finish_task count:0
404
404
404
404
404
pending_task count:0
'''
收集 http 响应
上面介绍了访问不同 URL 的异步实现方式,但是我们只是发出了请求,如果要把响应一一收集到一个列表中,最后保存到本地或者打印出来要怎么实现呢?
可通过 asyncio.gather(*tasks) 将响应全部收集起来,具体通过下面实例来演示。
import time
import asyncio
from aiohttp import ClientSessiontask_list = []
temp_url = "https://www.baidu.com/{}"async def hello(url=None):async with ClientSession() as session:async with session.get(url) as request:# print(request)print('Hello World:%s' % time.time())return await request.read()def run():for i in range(5):task = asyncio.ensure_future(hello(temp_url.format(i)))task_list.append(task)result = loop.run_until_complete(asyncio.gather(*task_list))print(f'len(result) : {len(result)}')for item in result:print(item)if __name__ == '__main__':loop = asyncio.get_event_loop()run()
限制并发数(最大文件描述符的限制)
提示:此方法也可用来作为异步爬虫的限速方法(反反爬)
假如你的并发达到 2000 个,程序会报错:ValueError: too many file descriptors in select()。报错的原因字面上看是 Python 调取的 select 对打开的文件有最大数量的限制,这个其实是操作系统的限制,linux 打开文件的最大数默认是 1024,windows 默认是 509,超过了这个值,程序就开始报错。这里我们有三种方法解决这个问题:
- 1. 限制并发数量。(一次不要塞那么多任务,或者限制最大并发数量)
- 2. 使用回调的方式。
- 3. 修改操作系统打开文件数的最大限制,在系统里有个配置文件可以修改默认值,具体步骤不再说明了。
不修改系统默认配置的话,个人推荐限制并发数的方法,设置并发数为 500,处理速度更快。
使用 semaphore = asyncio.Semaphore(500) 以及在协程中使用 async with semaphore: 操作具体代码如下:
# coding:utf-8
import time, asyncio, aiohttpurl = 'https://www.baidu.com/'index = 0async def hello(url, semaphore):global indexasync with semaphore:async with aiohttp.ClientSession() as session:async with session.get(url) as response:print(f'{index} : ', end='')await asyncio.sleep(2)print(response.status)return await response.read()async def run():# 为了看效果,这是设置 100 个任务,并发限制为 5semaphore = asyncio.Semaphore(5) # 限制并发量为500to_get = [hello(url.format(), semaphore) for _ in range(100)] # 总共1000任务await asyncio.wait(to_get)if __name__ == '__main__':# now=lambda :time.time()loop = asyncio.get_event_loop()loop.run_until_complete(run())# loop.close()
示例代码:
import asyncio
import aiohttpasync def get_http(url):async with semaphore:async with aiohttp.ClientSession() as session:async with session.get(url) as res:global countcount += 1print(count, res.status)if __name__ == '__main__':count = 0semaphore = asyncio.Semaphore(500)loop = asyncio.get_event_loop()temp_url = 'https://www.baidu.com/s?wd={0}'tasks = [get_http(temp_url.format(i)) for i in range(600)]loop.run_until_complete(asyncio.wait(tasks))loop.close()
示例代码:
from aiohttp import ClientSession
import asyncio# 限制协程并发量
async def hello(sem, num):async with sem:async with ClientSession() as session:async with session.get(f'http://httpbin.org/get?a={num}') as response:r = await response.read()print(f'[{num}]:{r}')await asyncio.sleep(1)def main():loop = asyncio.get_event_loop()tasks = []sem = asyncio.Semaphore(5) # thisfor index in range(100000):task = asyncio.ensure_future(hello(sem, index))tasks.append(task)feature = asyncio.ensure_future(asyncio.gather(*tasks))try:loop.run_until_complete(feature)finally:loop.close()if __name__ == "__main__":main()
aiohttp 实现高并发爬虫 ( 异步 mysql )
python asyncio并发编程:https://www.cnblogs.com/crazymagic/articles/10066619.html
# asyncio爬虫, 去重, 入库import asyncio
import re
import aiohttp
import aiomysql
from pyquery import PyQuerystopping = Falsestart_url = 'http://www.jobbole.com'
waiting_urls = []
seen_urls = set() # 实际使用爬虫去重时,数量过多,需要使用布隆过滤器async def fetch(url, session):async with aiohttp.ClientSession() as session:try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:data = await resp.text()return dataexcept Exception as e:print(e)def extract_urls(html): # html中提取所有urlurls = []pq = PyQuery(html)for link in pq.items('a'):url = link.attr('href')if url and url.startwith('http') and url not in seen_urls:urls.append(url)waiting_urls.append(urls)return urlsasync def init_urls(url, session):html = await fetch(url, session)seen_urls.add(url)extract_urls(html)async def article_handler(url, session, pool): # 获取文章详情并解析入库html = await fetch(url, session)extract_urls(html)pq = PyQuery(html)title = pq('title').text() # 为了简单, 只获取title的内容async with pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute('SELECT 42;')insert_sql = "insert into article_test(title) values('{}')".format(title)await cur.execute(insert_sql) # 插入数据库# print(cur.description)# (r,) = await cur.fetchone()# assert r == 42async def consumer(pool):async with aiohttp.ClientSession() as session:while not stopping:if len(waiting_urls) == 0: # 如果使用asyncio.Queue的话, 不需要我们来处理这些逻辑。await asyncio.sleep(0.5)continueurl = waiting_urls.pop()print('start get url:{}'.format(url))if re.match(r'http://.*?jobbole.com/\d+/', url):if url not in seen_urls: # 是没有处理过的url,则处理asyncio.ensure_future(article_handler(url, sssion, pool))else:if url not in seen_urls:asyncio.ensure_future(init_urls(url))async def main(loop):# 等待mysql连接建立好pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='',db='aiomysql_test', loop=loop, charset='utf8', autocommit=True)# charset autocommit必须设置, 这是坑, 不写数据库写入不了中文数据async with aiohttp.ClientSession() as session:html = await fetch(start_url, session)seen_urls.add(start_url)extract_urls(html)asyncio.ensure_future(consumer(pool))if __name__ == '__main__':event_loop = asyncio.get_event_loop()asyncio.ensure_future(main(event_loop))event_loop.run_forever()
学习 python 高并发模块 asynio
参考:Python黑魔法 --- 异步IO( asyncio) 协程:https://www.jianshu.com/p/b5e347b3a17c
Python 中重要的模块 --- asyncio:https://www.cnblogs.com/zhaof/p/8490045.html
Python 协程深入理解:https://www.cnblogs.com/zhaof/p/7631851.html
asyncio 是 python 用于解决异步io编程的一整套解决方案
创建一个 asyncio 的步骤如下
- 创建一个 event_loop 事件循环,当启动时,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数。
- 创建协程: 使用 async 关键字定义的函数就是一个协程对象。在协程函数内部可以使用 await 关键字用于阻塞操作的挂起。
- 将协程注册到事件循环中。协程的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
一、定义一个协程
import time
import asyncioasync def do_some_work(x):print("waiting:", x)start = time.time()
# 这里是一个协程对象,这个时候do_some_work函数并没有执行
coroutine = do_some_work(2)
print(coroutine)
# 创建一个事件loop
loop = asyncio.get_event_loop()
# 将协程注册到事件循环,并启动事件循环
loop.run_until_complete(coroutine)print("Time:", time.time() - start)
二、创建一个 task
一个协程对象 就是 一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态。
在上面的代码中,在注册事件循环的时候,其实是 run_until_complete 方法将协程包装成为了一个任务(task)对象。 task 对象是 Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果。
import asyncio
import timestart = lambda: time.time()async def do_some_work(x):print("waiting:", x)start = start()coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print("Time:", time.time() - start)
loop.create_task,asyncio.async / asyncio.ensure_future 和 Task 有什么区别?
BaseEventLoop.create_task(coro) 、asyncio.async(coro)、Task(coro)安排协同程序执行,这似乎也可以正常工作。那么,所有这些之间有什么区别?
- 在 Python> = 3.5中,已将 async 设为关键字,所以 asyncio.async 必须替换为 asyncio.ensure_future
- create_task 的存在理由:
第三方事件循环可以使用其自己的Task子类来实现互操作性。在这种情况下,结果类型是Task的子类。
这也意味着您不应直接创建 Task ,因为不同的事件循环可能具有不同的创建"任务"的方式。 - 另一个重要区别是,除了接受协程外, ensure_future 也接受任何等待的对象;而 create_task 只接受协程。
那么用 ensure_future 还是 create_task 函数声明对比:
- asyncio.ensure_future(coro_or_future, *, loop=None)
- BaseEventLoop.create_task(coro)
显然,ensure_future 除了接受 coroutine 作为参数,还接受 future 作为参数。
看 ensure_future 的代码,会发现 ensure_future 内部在某些条件下会调用 create_task,综上所述:
- encure_future: 最高层的函数,推荐使用!
- create_task: 在确定参数是 coroutine 的情况下可以使用。
- Task: 可能很多时候也可以工作,但真的没有使用的理由!
为了 interoperability,第三方的事件循环可以使用自己的 Task 子类。这种情况下,返回结果的类型是 Task 的子类。
所以,不要直接创建 Task 实例,应该使用 ensure_future() 函数或 BaseEventLoop.create_task() 方法。
asyncio.ensure_future 与 BaseEventLoop.create_task 对比简单的协同程序
From:asyncio.ensure_future与BaseEventLoop.create_task对比简单的协同程序?-python黑洞网
看过几个关于asyncio的基本Python 3.5教程,它们以各种方式执行相同的操作。在这段代码中:
import asyncioasync def doit(i):print("Start %d" % i)await asyncio.sleep(3)print("End %d" % i)return iif __name__ == '__main__':loop = asyncio.get_event_loop()# futures = [asyncio.ensure_future(doit(i), loop=loop) for i in range(10)]# futures = [loop.create_task(doit(i)) for i in range(10)]futures = [doit(i) for i in range(10)]result = loop.run_until_complete(asyncio.gather(*futures))print(result)
上面定义
futures
变量的所有三个变体都实现了相同的结果,那么他们有什么区别?有些情况下我不能只使用最简单的变体(协程的简单列表)吗?asyncio.create_task(coro) 和 asyncio.ensure_future(obj)
从 Python 3.7 开始,为此目的添加了
asyncio.create_task(coro)
高级功能,可以使用它来代替从 coroutimes 创建任务的其他方法。但是,如果需要从任意等待创建任务,应该使用
asyncio.ensure_future(obj)
。推荐:使用 asyncio.ensure_future(obj) 来代替 asyncio.create_task(coro)
ensure_future VS create_task
ensure_future
是创建一个方法Task从coroutine。它基于参数以不同的方式创建任务(包括使用create_task
协同程序和类似未来的对象)。
create_task
是一种抽象的方法AbstractEventLoop
。不同的事件循环可以不同的方式实现此功能。
您应该使用ensure_future
创建任务。create_task
只有在你要实现自己的事件循环类型时才需要。
“当从协程创建任务时,你应该使用适当命名的loop.create_task()
”
在任务中包装协程 - 是一种在后台启动此协程的方法。这是一个例子:
import asyncioasync def msg(text):await asyncio.sleep(0.1)print(text)async def long_operation():print('long_operation started')await asyncio.sleep(3)print('long_operation finished')async def main():await msg('first')# Now you want to start long_operation, but you don't want to wait it finised:# long_operation should be started, but second msg should be printed immediately.# Create task to do so:task = asyncio.ensure_future(long_operation())await msg('second')# Now, when you want, you can await task finised:await taskif __name__ == "__main__":loop = asyncio.get_event_loop()loop.run_until_complete(main())'''
输出:
first
long_operation started
second
long_operation finished
'''
创建任务:
- 可以通过 loop.create_task(coroutine) 创建 task,
- 也可以通过 asyncio.ensure_future(coroutine) 创建 task。
使用这两种方式的区别在 官网( 协程与任务 — Python 3.10.4 文档 )上有提及。
task / future 以及使用 async 创建的都是 awaitable 对象,都可以在 await 关键字之后使用。
future 对象意味着在未来返回结果,可以搭配回调函数使用。
要真正运行一个协程,asyncio 提供了三种主要机制
( https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.ensure_future )
- 1.
asyncio.run()
函数用来运行最高层级的入口点 "main()" 函数。
import asyncioasync def main():print('hello')await asyncio.sleep(1)print('world')asyncio.run(main())
- 2. 使用 await ( 即 等待一个协程 )
import asyncio
import timeasync def say_after(delay, what):await asyncio.sleep(delay)print(what)async def main():print(f"started at {time.strftime('%X')}")await say_after(3, 'hello')await say_after(1, 'world')print(f"finished at {time.strftime('%X')}")asyncio.run(main())
预期的输出:
started at 17:13:52
world
hello
finished at 17:13:55
- 3. 使用 asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程。
import asyncio
import timeasync def say_after(delay, what):await asyncio.sleep(delay)print(what)async def main():task1 = asyncio.create_task(say_after(3, 'hello'))task2 = asyncio.create_task(say_after(1, 'world'))print(f"started at {time.strftime('%X')}")# Wait until both tasks are completed (should take# around 2 seconds.)await task1await task2asyncio.run(main())
预期的输出:
started at 17:14:32
world
hello
finished at 17:14:34
获取协程的返回值
- 1 创建一个任务 task
- 2 通过调用 task.result 获取协程的返回值
import asyncio
import timeasync def get_html(url):print("start get url")await asyncio.sleep(2)return "this is test"if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()task = loop.create_task(get_html("http://httpbin.org"))loop.run_until_complete(task)print(task.result())
三、绑定回调
执行成功进行回调处理
可以通过 add_done_callback( 任务) 添加回调,因为这个函数只接受一个回调的函数名,不能传参,我们想要传参可以使用偏函数
# 获取协程的返回值
import asyncio
import time
from functools import partialasync def get_html(url):print("start get url")await asyncio.sleep(2)return "this is test"def callback(url, future):print(url)print("send email to bobby")if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()task = loop.create_task(get_html("http://www.imooc.com"))task.add_done_callback(partial(callback, "http://www.imooc.com"))loop.run_until_complete(task)print(task.result())
asnycio 异步请求+异步回调:asnycio 异步请求+异步回调_xiaobai8823的博客-CSDN博客
当使用 ensure_feature 创建任务的时候,可以使用任务的 task.add_done_callback(callback)方法,获得对象的协程返回值。
import asyncio
import timeasync def do_some_work(x):print("waiting:", x)return "Done after {}s".format(x)def callback(future):print("callback:", future.result())start = time.time()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)
四、阻塞 ( 使用 await 让出控制权,挂起当前操作 )
前面提到 asynic 函数内部可以使用 await 来针对耗时的操作进行挂起。
import asyncio
import timeasync def do_some_work(x):print("waiting:", x)# await 后面就是调用耗时的操作await asyncio.sleep(x)return "Done after {}s".format(x)coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)
五、并发 和 并行
- 并发:同一时刻 同时 发生
- 并行:同一时间 间隔 发生
并发通常是指有多个任务需要同时进行,并行则是同一个时刻有多个任务执行.
当有多个任务需要并行时,可以将任务先放置在任务队列中,然后将任务队列传给 asynicio.wait 方法,这个方法会同时并行运行队列中的任务。将其注册到事件循环中。
import asyncioasync def do_some_work(x):print("Waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)
]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
六、嵌套协程
使用 async 可以定义协程,协程用于耗时的 io 操作,我们也可以封装更多的 io 操作过程,这样就实现了嵌套的协程,即一个协程中 await 了另外一个协程,如此连接起来。
import asyncioasync def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]dones, pendings = await asyncio.wait(tasks)for task in dones:print("Task ret:", task.result())# results = await asyncio.gather(*tasks)# for result in results:# print("Task ret:",result)loop = asyncio.get_event_loop()
loop.run_until_complete(main())
使用 asyncio.wait 的结果如下,可见返回的结果 dones 并不一定按照顺序输出
waiting: 1
waiting: 2
waiting: 4
Task ret: Done after 2s
Task ret: Done after 4s
Task ret: Done after 1s
Time: 4.006587505340576
使用 await asyncio.gather(*tasks) 得到的结果如下,是按照列表顺序进行返回的
waiting: 1
waiting: 2
waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004234313964844
上面的程序将 main 也定义为协程。我们也可以不在 main 协程函数里处理结果,直接返回 await 的内容,那么最外层的 run_until_complete 将会返回main协程的结果。
import asyncio
import timenow = lambda: time.time()async def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]return await asyncio.gather(*tasks)# return await asyncio.wait(tasks)也可以使用。注意gather方法需要*这个标记start = now()loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:print("Task ret:", result)print("Time:", now() - start)
也可以使用 as_complete 方法实现嵌套协程
import asyncio
import timenow = lambda: time.time()async def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]for task in asyncio.as_completed(tasks):result = await taskprint("Task ret: {}".format(result))start = now()loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now() - start)
七、协程停止
创建 future 的时候,task 为 pending,事件循环调用执行的时候当然就是 running,调用完毕自然就是 done,如果需要停止事件循环,就需要先把 task 取消。可以使用 asyncio.Task 获取事件循环的 task。
future 对象有如下几个状态:Pending、Running、Done、Cacelled
import asyncio
import timenow = lambda: time.time()async def do_some_work(x):print("Waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3),
]start = now()loop = asyncio.get_event_loop()
try:loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:print(asyncio.Task.all_tasks())for task in asyncio.Task.all_tasks():print(task.cancel())loop.stop()loop.run_forever()
finally:loop.close()print("Time:", now() - start)
启动事件循环之后,马上 ctrl+c,会触发 run_until_complete 的执行异常 KeyBorardInterrupt。然后通过循环 asyncio.Task 取消 future。可以看到输出如下:
Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>}
False
True
True
True
Time: 1.0707225799560547
True 表示 cannel 成功,loop stop 之后还需要再次开启事件循环,最后在 close,不然还会抛出异常.
循环 task,逐个 cancel 是一种方案,可是正如上面我们把 task 的列表封装在 main 函数中,main 函数外进行事件循环的调用。这个时候,main 相当于最外出的一个task,那么处理包装的main 函数即可。
task取消和子协程调用原理
程序运行时 通过 ctl +c 取消任务 调用task.cancel()取消任务
import asyncio
import timeasync def get_html(sleep_times):print("waiting")await asyncio.sleep(sleep_times)print("done after {}s".format(sleep_times))if __name__ == "__main__":task1 = get_html(2)task2 = get_html(3)task3 = get_html(3)tasks = [task1, task2, task3]loop = asyncio.get_event_loop()try:loop.run_until_complete(asyncio.wait(tasks))except KeyboardInterrupt as e:all_tasks = asyncio.Task.all_tasks()for task in all_tasks:print("cancel task")print(task.cancel())loop.stop()loop.run_forever()finally:loop.close()
在终端执行:python ceshi.py ,运行成功后 按 ctl +c 取消任务
不同线程的事件循环( 线程、线程池 )
很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被 block。
import asyncio
from threading import Thread
import timenow = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
启动上述代码之后,当前线程不会被 block,新线程中会按照顺序执行 call_soon_threadsafe 方法注册的 more_work 方法, 后者因为 time.sleep 操作是同步阻塞的,因此运行完毕more_work 需要大致 6 + 3
使用 线程池
# -*- coding:utf-8 -*-
import asyncio
import time
from concurrent.futures import ThreadPoolExecutorthread_pool = ThreadPoolExecutor(5)
tasks = []func_now = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = func_now()
new_loop = asyncio.new_event_loop()thread_pool.submit(start_loop, new_loop)print('TIME: {}'.format(time.time() - start))new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
主线程创建事件循环,子线程开启无限事件循环
import asyncio
import time
from threading import Threadnow = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def do_some_work(x):print('Waiting {}'.format(x))await asyncio.sleep(x)print('Done after {}s'.format(x))def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
上述的例子,主线程中创建一个 new_loop,然后在另外的子线程中开启一个无限事件循环。 主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被 block。一共执行的时间大概在 6s 左右。
master - worker 主从模式
对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似 master-worker 的方式,master 主要用户获取队列的 msg,worker 用户处理消息。
为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用 redis 的队列。主线程中有一个是无限循环,用户消费队列。
import time
import asyncio
from threading import Thread
import redisdef get_redis(): # 返回一个 redis 连接对象connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)def start_loop(loop): # 开启事件循环asyncio.set_event_loop(loop)loop.run_forever()async def worker(task):print('Start worker')while True:# start = now()# task = rcon.rpop("queue") # 从 redis 中 取出的数据# if not task:# await asyncio.sleep(1)# continueprint('Wait ', int(task)) # 取出了相应的任务await asyncio.sleep(int(task))print('Done ', task, now() - start)now = lambda: time.time()
rcon = get_redis()
start = now()
# 创建一个事件循环
new_loop = asyncio.new_event_loop()
# 创建一个线程 在新的线程中开启事件循环
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True) # 设置线程为守护模式
t.start() # 开启线程try:while True:task = rcon.rpop("queue") # 不断从队列中获取任务if not task:time.sleep(1)continue# 包装为 task ins, 传入子线程中的事件循环asyncio.run_coroutine_threadsafe(worker(task), new_loop)
except Exception as e:print('error', e)new_loop.stop() # 出现异常 关闭时间循环
finally:pass
给队列添加一些数据:
127.0.0.1:6379[3]> lpush queue 2
(integer) 1
127.0.0.1:6379[3]> lpush queue 5
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
可以看见输出:
Waiting 2
Done 2
Waiting 5
Waiting 1
Done 1
Waiting 1
Done 1
Done 5
我们发起了一个耗时5s的操作,然后又发起了连个1s的操作,可以看见子线程并发的执行了这几个任务,其中5s awati的时候,相继执行了1s的两个任务。
改进:
import time
import redis
import asyncio
from threading import Threadredis_queue_name = 'redis_list:test'def get_redis(): # 返回一个 redis 连接对象connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.StrictRedis(connection_pool=connection_pool)def add_data_to_redis_list(*args):redis_conn = get_redis()redis_conn.lpush(redis_queue_name, *args)def start_loop(loop=None): # 开启事件循环asyncio.set_event_loop(loop)loop.run_forever()async def worker(task=None):print('Start worker')while True:# task = redis_conn.rpop("queue") # 从 redis 中 取出的数据# if not task:# await asyncio.sleep(1)# continueprint('Wait ', int(task)) # 取出了相应的任务# 这里只是简单的睡眠传入的秒数await asyncio.sleep(int(task))def main():redis_conn = get_redis()# 创建一个事件循环new_loop = asyncio.new_event_loop()# 创建一个线程 在新的线程中开启事件循环t = Thread(target=start_loop, args=(new_loop,))t.setDaemon(True) # 设置线程为守护模式t.start() # 开启线程try:while True:task = redis_conn.rpop(name=redis_queue_name) # 不断从队列中获取任务if not task:time.sleep(1)continue# 包装为 task , 传入子线程中的事件循环asyncio.run_coroutine_threadsafe(worker(task), new_loop)except Exception as e:print('error', e)new_loop.stop() # 出现异常 关闭时间循环finally:new_loop.close()if __name__ == '__main__':# data_list = [1, 2, 3, 4, 5, 6, 7, 8, 9]# add_data_to_redis_list(*data_list)main()pass
redis队列模型( 生产者 --- 消费者 )
参考:Python中协程异步IO(asyncio)详解 - 知乎
下面代码的主线程和双向队列的主线程有些不同,只是换了一种写法而已,代码如下
生产者代码:
import redisconn_pool = redis.ConnectionPool(host='127.0.0.1')
redis_conn = redis.Redis(connection_pool=conn_pool)redis_conn.lpush('coro_test', '1')
redis_conn.lpush('coro_test', '2')
redis_conn.lpush('coro_test', '3')
redis_conn.lpush('coro_test', '4')
消费者代码:
import asyncio
from threading import Thread
import redisdef get_redis():conn_pool = redis.ConnectionPool(host='127.0.0.1')return redis.Redis(connection_pool=conn_pool)def start_thread_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def thread_example(name):print('正在执行name:', name)await asyncio.sleep(2)return '返回结果:' + nameredis_conn = get_redis()new_loop = asyncio.new_event_loop()
loop_thread = Thread(target=start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()# 循环接收redis消息并动态加入协程
while True:msg = redis_conn.rpop('coro_test')if msg:asyncio.run_coroutine_threadsafe(thread_example('Zarten' + bytes.decode(msg, 'utf-8')), new_loop)
改进:
import asyncio
from threading import Thread
import redisdef get_redis():conn_pool = redis.ConnectionPool(host='127.0.0.1')return redis.Redis(connection_pool=conn_pool)def start_thread_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def thread_example(name):print('正在执行name:', name)await asyncio.sleep(2)return '返回结果:' + nameif __name__ == '__main__':redis_queue_name = 'redis_list:test'redis_conn = get_redis()for num in range(1, 10):redis_conn.lpush(redis_queue_name, num)new_loop = asyncio.new_event_loop()loop_thread = Thread(target=start_thread_loop, args=(new_loop,))loop_thread.setDaemon(True)loop_thread.start()# 循环接收redis消息并动态加入协程while True:msg = redis_conn.rpop(name=redis_queue_name)if msg:asyncio.run_coroutine_threadsafe(thread_example('King_' + msg.decode('utf-8')), new_loop)pass
停止子线程
如果一切正常,那么上面的例子很完美。可是,需要停止程序,直接ctrl+c,会抛出KeyboardInterrupt错误,我们修改一下主循环:
try:while True:task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:print(e)new_loop.stop()
可是实际上并不好使,虽然主线程 try 了 KeyboardInterrupt异常,但是子线程并没有退出,为了解决这个问题,可以设置子线程为守护线程,这样当主线程结束的时候,子线程也随机退出。
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True) # 设置子线程为守护线程
t.start()try:while True:# print('start rpop')task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:print(e)new_loop.stop()
线程停止程序的时候,主线程退出后,子线程也随机退出才了,并且停止了子线程的协程任务。
aiohttp
在消费队列的时候,我们使用 asyncio 的 sleep 用于模拟耗时的 io 操作。以前有一个短信服务,需要在协程中请求远程的短信 api,此时需要是需要使用 aiohttp 进行异步的 http 请求。大致代码如下:
server.py
import time
from flask import Flask, requestapp = Flask(__name__)@app.route('/<int:x>')
def index(x):time.sleep(x)return "{} It works".format(x)@app.route('/error')
def error():time.sleep(3)return "error!"if __name__ == '__main__':app.run(debug=True)
/
接口表示短信接口,/error
表示请求/
失败之后的报警。
async-custoimer.py
import time
import asyncio
from threading import Thread
import redis
import aiohttpdef get_redis():connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)rcon = get_redis()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def fetch(url):async with aiohttp.ClientSession() as session:async with session.get(url) as resp:print(resp.status)return await resp.text()async def do_some_work(x):print('Waiting ', x)try:ret = await fetch(url='http://127.0.0.1:5000/{}'.format(x))print(ret)except Exception as e:try:print(await fetch(url='http://127.0.0.1:5000/error'))except Exception as e:print(e)else:print('Done {}'.format(x))new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True)
t.start()try:while True:task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except Exception as e:print('error')new_loop.stop()
finally:pass
有一个问题需要注意,我们在fetch的时候try了异常,如果没有try这个异常,即使发生了异常,子线程的事件循环也不会退出。主线程也不会退出,暂时没找到办法可以把子线程的异常raise传播到主线程。(如果谁找到了比较好的方式,希望可以带带我)。
对于 redis 的消费,还有一个 block 的方法:
try:while True:_, task = rcon.brpop("queue")asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except Exception as e:print('error', e)new_loop.stop()
finally:pass
使用 brpop方法,会 block 住 task,如果主线程有消息,才会消费。测试了一下,似乎 brpop 的方式更适合这种队列消费的模型。
127.0.0.1:6379[3]> lpush queue 5
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
可以看到结果
Waiting 5
Waiting 1
Waiting 1
200
1 It works
Done 1
200
1 It works
Done 1
200
5 It works
Done 5
协程消费
主线程用于监听队列,然后子线程的做事件循环的worker是一种方式。还有一种方式实现这种类似master-worker的方案。即把监听队列的无限循环逻辑一道协程中。程序初始化就创建若干个协程,实现类似并行的效果。
import time
import asyncio
import redisnow = lambda : time.time()def get_redis():connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)rcon = get_redis()async def worker():print('Start worker')while True:start = now()task = rcon.rpop("queue")if not task:await asyncio.sleep(1)continueprint('Wait ', int(task))await asyncio.sleep(int(task))print('Done ', task, now() - start)def main():asyncio.ensure_future(worker())asyncio.ensure_future(worker())loop = asyncio.get_event_loop()try:loop.run_forever()except KeyboardInterrupt as e:print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())loop.stop()loop.run_forever()finally:loop.close()if __name__ == '__main__':main()
这样做就可以多多启动几个worker来监听队列。一样可以到达效果。
总结
上述简单的介绍了asyncio的用法,主要是理解事件循环,协程和任务,future的关系。异步编程不同于常见的同步编程,设计程序的执行流的时候,需要特别的注意。毕竟这和以往的编码经验有点不一样。可是仔细想想,我们平时处事的时候,大脑会自然而然的实现异步协程。比如等待煮茶的时候,可以多写几行代码。
相关代码文件的 Gist
参考:Threaded Asynchronous Magic and How to Wield It
示例:
参考:Python 中的并发处理之 asyncio 包使用的详解:https://www.jb51.net/article/137681.htm
import asyncio
import itertools
import sysasync def spin(msg):for char in itertools.cycle('|/-\\'):status = char + ' ' + msgprint(status)try:# 使用 await asyncio.sleep(.1) 代替 time.sleep(.1),这样的休眠不会阻塞事件循环。await asyncio.sleep(.1)except asyncio.CancelledError:# 如果 spin 函数苏醒后抛出 asyncio.CancelledError 异常,其原因是发出了取消请求,因此退出循环。breakasync def slow_function():# 假装等待I/O一段时间# await asyncio.sleep(3) 表达式把控制权交给主循环,在休眠结束后恢复这个协程。await asyncio.sleep(3)return 42async def supervisor():# asyncio.ensure_future(...) 函数排定 spin 协程的运行时间,使用一个 Task 对象包装 spin 协程,并立即返回。spinner = asyncio.ensure_future(spin('thinking!'))print('spinner object:', spinner)t_result = await slow_function() # 驱动 slow_function() 函数。结束后,获取返回值。# 同时,事件循环继续运行,因为slow_function 函数最后使用 await asyncio.sleep(3) 表达式把控制权交回给了主循环。# Task 对象可以取消;取消后会在协程当前暂停的 yield 处抛出 asyncio.CancelledError 异常。# 协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消。spinner.cancel()return t_resultif __name__ == '__main__':loop = asyncio.get_event_loop() # 获取事件循环的引用# 驱动 supervisor 协程,让它运行完毕;这个协程的返回值是这次调用的返回值。result = loop.run_until_complete(supervisor())loop.close()print('Answer:', result)
二、避免阻塞型调用
1、有两种方法能避免阻塞型调用中止整个应用程序的进程:
- 在单独的线程中运行各个阻塞型操作。
- 把每个阻塞型操作转换成非阻塞的异步调用。
使用多线程处理大量连接时将耗费过多的内存,故此通常使用回调来实现异步调用。
2、使用Executor对象防止阻塞事件循环:
使用 loop.run_in_executor 把阻塞的作业(例如保存文件)委托给线程池做。
示例:
import asyncio
import timeasync def shop(delay, what):print(what)await asyncio.sleep(delay)print(what,"...出来了")async def main():task1 = asyncio.create_task(shop(8, '女朋友看衣服..'))task2 = asyncio.create_task(shop(5, '体验手机..'))print(time.ctime(), "开始逛街")await task1await task2print(time.ctime(), "结束.")asyncio.run(main())
Python --- aiohttp 的使用
参考: https://www.cnblogs.com/ssyfj/p/9222342.html
1. aiohttp 的简单使用(配合asyncio模块)
import asyncio
import aiohttpasync def fetch_async(url):print(url)async with aiohttp.request("GET", url) as r:# 或者直接 await r.read()不编码,直接读取,适合于图像等无法编码文件resp = await r.text(encoding="utf-8")print(resp)print('*' * 50)tasks = [fetch_async('https://www.baidu.com/'),fetch_async('https://www.chouti.com/')
]if __name__ == '__main__':event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
2. 发起一个 session 请求
import asyncio
import aiohttpasync def fetch_async(url):print(url)async with aiohttp.ClientSession() as session: # 协程嵌套,只需要处理最外层协程即可fetch_asyncasync with session.get(url) as resp:print(resp.status)# 因为这里使用到了 await 关键字,实现异步,所有他上面的函数体需要声明为异步asyncprint(await resp.text())print('*' * 50)if __name__ == '__main__':tasks = [fetch_async('https://www.baidu.com/'),fetch_async('https://www.cnblogs.com/ssyfj/')]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
除了上面的 get 方法外,会话还支持 post,put,delete .... 等
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')
不要为每次的连接都创建一次session,一般情况下只需要创建一个session,然后使用这个session执行所有的请求。
每个session对象,内部包含了一个连接池,并且将会保持连接和连接复用(默认开启)可以加快整体的性能。
3. 在 url 中传递参数(其实与 requests 模块使用大致相同)
只需要将参数字典,传入 params 参数中即可
import asyncio
import aiohttp
from lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)# print(await r.read())ss = etree.HTML(text=await r.text())movie_name = ss.xpath('//ol[@class="grid_view"]//div[@class="hd"]//a//span[1]//text()')print(f'len: {len(list(map(lambda i=None: print(i), movie_name)))}')if __name__ == '__main__':# https://movie.douban.com/top250?start=50tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
4. 获取响应内容(由于获取响应内容是一个阻塞耗时过程,所以我们使用await实现协程切换)
(1)使用 text() 方法
import asyncio
import aiohttp
from lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(r.charset) # 查看默认编码为 utf-8print(await r.text()) # 不编码使用默认编码,也可以使用 encoding 指定编码if __name__ == '__main__':# https://movie.douban.com/top250?start=50tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
(2)使用 read() 方法,不进行编码,为字节形式
import asyncio
import aiohttp
from lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(await r.read())if __name__ == '__main__':# https://movie.douban.com/top250?start=50tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
(3)注意:text()、read() 方法是把整个响应体读入内存,如果你是获取大量的数据,请考虑使用 "字节流"(StreamResponse)
5. 特殊响应内容 json(和上面一样)
import asyncio
import aiohttpasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(r.charset)# print(await r.json()) # 可以设置编码,设置处理函数print(await r.read())if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
6. 字节流形式获取数据(不像 text、read 一次获取所有数据)
注意:我们获取的 session.get() 是 Response 对象,他继承于 StreamResponse
import asyncio
import aiohttpasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(await r.content.read(10)) # 读取前10字节if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
下面字节流形式读取数据,保存文件
import asyncio
import aiohttpasync def func1(url, params, filename):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:with open(filename, "wb") as fp:while True:chunk = await r.content.read(10)if not chunk:breakfp.write(chunk)if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}, "1.html"), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
注意:
async with session.get(url,params=params) as r: # 异步上下文管理器with open(filename,"wb") as fp: # 普通上下文管理器
两者的区别:在于异步上下文管理器中定义了 __aenter__和__aexit__方法
异步上下文管理器指的是在enter
和exit
方法处能够暂停执行的上下文管理器
为了实现这样的功能,需要加入两个新的方法:__aenter__
和__aexit__
。这两个方法都要返回一个 awaitable类型的值。
推文:异步上下文管理器:https://blog.csdn.net/tinyzhao/article/details/52684473
7. 自定义请求头(和 requests 一样)
import asyncio
import aiohttpasync def func1(url, params, filename):async with aiohttp.ClientSession() as session:headers = {'Content-Type': 'text/html; charset=utf-8'}async with session.get(url, params=params, headers=headers) as r:with open(filename, "wb") as fp:while True:chunk = await r.content.read(10)if not chunk:breakfp.write(chunk)if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}, "1.html"), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
8. 自定义 cookie
注意:对于自定义cookie,我们需要设置在 ClientSession(cookies=自定义cookie字典),而不是session.get()中
class ClientSession:def __init__(self, *, connector=None, loop=None, cookies=None,headers=None, skip_auto_headers=None,auth=None, json_serialize=json.dumps,request_class=ClientRequest, response_class=ClientResponse,ws_response_class=ClientWebSocketResponse,version=http.HttpVersion11,cookie_jar=None, connector_owner=True, raise_for_status=False,read_timeout=sentinel, conn_timeout=None,timeout=sentinel,auto_decompress=True, trust_env=False,trace_configs=None):
使用:
cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:
9. 获取当前访问网站的 cookie
async with session.get(url) as resp:print(resp.cookies)
10. 获取网站的响应状态码
async with session.get(url) as resp:print(resp.status)
11. 查看响应头
resp.headers # 来查看响应头,得到的值类型是一个dict:
resp.raw_headers # 查看原生的响应头,字节类型
12. 查看重定向的响应头(我们此时已经到了新的网址,向之前的网址查看)
resp.history # 查看被重定向之前的响应头
13. 超时处理
默认的IO操作都有5分钟的响应时间 我们可以通过 timeout 进行重写:
async with session.get('https://github.com', timeout=60) as r:...
如果 timeout=None 或者 timeout=0 将不进行超时检查,也就是不限时长。
14. ClientSession 用于在多个连接之间(同一网站)共享cookie,请求头等
import asyncio
import aiohttpasync def func1():cookies = {'my_cookie': "my_value"}async with aiohttp.ClientSession(cookies=cookies) as session:async with session.get("https://segmentfault.com/q/1010000007987098") as r:print(session.cookie_jar.filter_cookies("https://segmentfault.com"))async with session.get("https://segmentfault.com/hottest") as rp:print(session.cookie_jar.filter_cookies("https://segmentfault.com"))if __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
Set-Cookie: PHPSESSID=web2~d8grl63pegika2202s8184ct2q
Set-Cookie: my_cookie=my_value
Set-Cookie: PHPSESSID=web2~d8grl63pegika2202s8184ct2q
Set-Cookie: my_cookie=my_value
我们最好使用session.cookie_jar.filter_cookies()获取网站cookie,不同于requests模块,虽然我们可以使用rp.cookies有可能获取到cookie,但似乎并未获取到所有的cookies。
import asyncio
import aiohttpasync def func1():cookies = {'my_cookie': "my_value"}async with aiohttp.ClientSession(cookies=cookies) as session:async with session.get("https://segmentfault.com/q/1010000007987098") as rp_1:print('1' * 50)print(session.cookie_jar.filter_cookies("https://segmentfault.com"))# 首次访问会获取网站设置的 cookie# Set-Cookie: PHPSESSID=web2~jh3ouqoabvr4e72f87vtherkp6; Domain=segmentfault.com; Path=/ print('2' * 50)print(f'rp_1.cookies:{rp_1.cookies}')async with session.get("https://segmentfault.com/hottest") as rp_2:print('3' * 50)print(session.cookie_jar.filter_cookies("https://segmentfault.com"))print('4' * 50)print(f'rp_2.cookies:{rp_2.cookies}') # 为空,服务端未设置 cookieasync with session.get("https://segmentfault.com/newest") as rp_3:print('5' * 50)print(session.cookie_jar.filter_cookies("https://segmentfault.com"))print('6' * 50)print(f'rp_3.cookies:{rp_3.cookies}') # 为空,服务端未设置cookieif __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
运行结果:
11111111111111111111111111111111111111111111111111
Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9
Set-Cookie: my_cookie=my_value
22222222222222222222222222222222222222222222222222
rp_1.cookies:Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9; Domain=segmentfault.com; Path=/
33333333333333333333333333333333333333333333333333
Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9
Set-Cookie: my_cookie=my_value
44444444444444444444444444444444444444444444444444
rp_2.cookies:
55555555555555555555555555555555555555555555555555
Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9
Set-Cookie: my_cookie=my_value
66666666666666666666666666666666666666666666666666
rp_3.cookies:
总结:
当我们使用 rp.cookie 时,只会获取到当前 url 下设置的 cookie,不会维护整站的cookie而session.cookie_jar.filter_cookies("https://segmentfault.com")会一直保留这个网站的所有设置cookies,含有我们在会话时设置的cookie,并且会根据响应修改更新cookie。这个才是我们需要的而我们设置cookie,也是需要在aiohttp.ClientSession(cookies=cookies)中设置
ClientSession 还支持 请求头,keep-alive连接和连接池(connection pooling)
15. cookie的安全性
默认 ClientSession 使用的是严格模式的 aiohttp.CookieJar. RFC 2109,明确的禁止接受 url 和 ip 地址产生的 cookie,只能接受 DNS 解析 IP 产生的 cookie。可以通过设置 aiohttp.CookieJar 的 unsafe=True 来配置:
jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)
16. 控制同时连接的数量(连接池)
TCPConnector 维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求
import asyncio
import aiohttpasync def func1():cookies = {'my_cookie': "my_value"}conn = aiohttp.TCPConnector(limit=2) # 默认100,0表示无限async with aiohttp.ClientSession(cookies=cookies, connector=conn) as session:for i in range(7, 35):url = "https://www.ckook.com/list-%s-1.html" % iasync with session.get(url) as rp:print('---------------------------------')print(rp.status)if __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
限制同时打开连接到同一端点的数量( (host, port, is_ssl) 三的倍数),可以通过设置 limit_per_host 参数:
limit_per_host: 同一端点的最大连接数量。同一端点即(host, port, is_ssl)完全相同
conn = aiohttp.TCPConnector(limit_per_host=30)#默认是0
在协程下测试效果不明显
17. 自定义域名解析地址
我们可以指定域名服务器的 IP 对我们提供的get或post的url进行解析:
from aiohttp.resolver import AsyncResolverresolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
conn = aiohttp.TCPConnector(resolver=resolver)
18. 设置代理
aiohttp支持使用代理来访问网页:
async with aiohttp.ClientSession() as session:async with session.get("http://python.org", proxy="http://some.proxy.com") as resp:print(resp.status)
当然也支持需要授权的页面:
async with aiohttp.ClientSession() as session:proxy_auth = aiohttp.BasicAuth('user', 'pass') # 用户,密码async with session.get("http://python.org", proxy="http://some.proxy.com", proxy_auth=proxy_auth) as resp:print(resp.status)
或者通过这种方式来验证授权:
session.get("http://python.org", proxy="http://user:pass@some.proxy.com")
19. post传递数据的方法
(1)模拟表单
payload = {'key1': 'value1', 'key2': 'value2'}
async with session.post('http://httpbin.org/post', data=payload) as resp:print(await resp.text())
注意:data=dict 的方式 post 的数据将被转码,和 form 提交数据是一样的作用,如果你不想被转码,可以直接以字符串的形式 data=str 提交,这样就不会被转码。
(2)post json
payload = {'some': 'data'}async with session.post(url, data=json.dumps(payload)) as resp:
其实json.dumps(payload)返回的也是一个字符串,只不过这个字符串可以被识别为json格式
(3)post 小文件
url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}await session.post(url, data=files)
url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file', open('report.xls', 'rb'), filename='report.xls', content_type='application/vnd.ms-excel')await session.post(url, data=data)
如果将文件对象设置为数据参数,aiohttp将自动以字节流的形式发送给服务器。
(4)post 大文件
aiohttp支持多种类型的文件以流媒体的形式上传,所以我们可以在文件未读入内存的情况下发送大文件。
@aiohttp.streamer
def file_sender(writer, file_name=None):with open(file_name, 'rb') as f:chunk = f.read(2 ** 16)while chunk:yield from writer.write(chunk)chunk = f.read(2 ** 16)# Then you can use `file_sender` as a data provider:async with session.post('http://httpbin.org/post', data=file_sender(file_name='huge_file')) as resp:print(await resp.text())
(5)从一个url获取文件后,直接post给另一个url
r = await session.get('http://python.org')
await session.post('http://httpbin.org/post',data=r.content)
(6)post 预压缩数据
在通过aiohttp发送前就已经压缩的数据, 调用压缩函数的函数名(通常是deflate 或 zlib)作为content-encoding的值:
import zlibasync def my_coroutine(session, headers, my_data):data = zlib.compress(my_data)headers = {'Content-Encoding': 'deflate'}async with session.post('http://httpbin.org/post', data=data, headers=headers):pass
Python 协程爬虫 --- aiohttp + aiomultiprocess 使用
aiohttp 是基于 asyncio 的一个异步http客户端和服务器
官方文档:https://aiohttp.readthedocs.io/en/stable/client_quickstart.html
aiomultiprocess :https://github.com/omnilib/aiomultiprocess
简单实用例子
async def funct(index): print("start ", index) async with aiohttp.ClientSession() as session: async with session.get("https://movie.douban.com/top250?start=0", timeout=5) as resp: print(resp.status)print(await resp.text()) print("end ", index)
aiohttp.ClientSession()
创建会话,session 提供了各种请求方法,如 get、post、delete、put 等。认识新的关键字 async with,因为是协程的上下文管理,所以多了async关键字。这个不是强制使用的,你也可以自己手动关闭会话,但是一定要记得关闭
注意:
- 1、不要为每个请求创建会话。每个应用程序很可能需要一个会话来执行所有请求。
- 2、aiohttp 在发送请求之前在内部执行 URL 规范化。要禁用规范化,请使用
encoded=True
参数进行URL构建
获取响应信息
resp.status # 状态码
await resp.text() # 获取响应正文,可以指定编码
await resp.read() # 读取二进制响应内容
await resp.json() # 获取json响应内容
await resp.content.read(size) # 读取流
注意事项:aiohttp 是在 await resp.text() 之后才发起请求的,所以必须调用之后才能获取响应的内容,不然会出现异常 aiohttp.client_exceptions.ClientConnectionError: Connection closed
aiomultiprocess
asyncio 和多处理本身是有用的,但有局限性:asyncio 仍然不能超过 GIL 的速度,并且多处理一次只能处理一个任务。但是,他们在一起可以充分实现自己的真正潜力。
aiomultiprocess 提供了一个简单的界面,同时在每个子进程上运行完整的 asyncio 事件循环,从而实现了 Python 应用程序从未有过的并发级别。每个子进程可以一次执行多个协程,仅受工作量和可用内核数限制。
注:aiomultiprocess 需要 Python 3.6 或更高版本
import asyncio
from aiohttp import request
from aiomultiprocess import Poolasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():urls = ["https://jreese.sh", ...]async with Pool() as pool:async for result in pool.map(get, urls):... # process resultif __name__ == '__main__':# Python 3.7asyncio.run(main())# Python 3.6# loop = asyncio.get_event_loop()# loop.run_until_complete(main())
用法:在子进程中执行协程
import asyncio
from aiohttp import request
from aiomultiprocess import Processasync def put(url, params):async with request("PUT", url, params=params) as response:passasync def main():p = Process(target=put, args=("https://jreese.sh", {}))await pif __name__ == "__main__":asyncio.run(main())
如果您想从协程中获取结果 Worker
,请使用以下方法:
import asyncio
from aiohttp import request
from aiomultiprocess import Workerasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():p = Worker(target=get, args=("https://jreese.sh",))response = await pif __name__ == "__main__":asyncio.run(main())
如果您需要一个托管的工作进程池,请使用 Pool
:
import asyncio
from aiohttp import request
from aiomultiprocess import Poolasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():urls = ["https://jreese.sh", ...]async with Pool() as pool:result = await pool.map(get, urls)if __name__ == "__main__":asyncio.run(main())
示例:
import time
import json
import datetime
import asyncio
import hashlib
from aiomultiprocess import Pool
from redis import *
from pybloom_live import BloomFilter
import aiohttp#
# Public variable
#Bloom_data = BloomFilter(1000000000, 0.01)
DB_get_question = StrictRedis(host='62.234.9.254', port=6480, password='lingmiao2015', db=4
)
pipeline_redis = DB_get_question.pipeline()#
# Public functions
#def md5(data):"""对数据进行MD5加密:param data::return:"""md5_qa = hashlib.md5(data.encode('utf8')).hexdigest()md5_qa = bytes(md5_qa, encoding='utf8')return md5_qaasync def get(data):"""协程函数:param url::return:"""# while True:# print('data:',data)# try:url = ''async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:get_proxy = DB_get_question.spop('IP_PROXY')response = await session.post(url, json=data, timeout=7, proxy={"http": "http://{}".format(get_proxy)})result = await response.text()hjson = json.loads(result)content = hjson['results'][0]['values']['text']# print('data:',data)print('\033[32;1mget_question\033[0m:', content)await asyncio.sleep(0.1)return content# except:# open('error_url.txt','a').write(url + '\n')# await get(data)async def request():"""使用进程加异步协程发送请求:return:"""key_number = 0datas = ['']split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '').replace('b', '')while len(datas) != 0:key_number += 1if len(datas) > 1:async with Pool() as pool:get_proxy = DB_get_question.spop('IP_PROXY')result_list = await pool.map(get, datas)# print(result_list)for result in result_list:if result:# print('key',key)# print('phone', phone)if '请求次数' in result or 'key不对' in result or '请求内容为空' in result:split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '')breakmd5_qa = md5(result)if md5_qa not in Bloom_data:Bloom_data.add(md5_qa)# pipeline_redis.lpush('total_question_list', result)pipeline_redis.sadd('get_question', result)pipeline_redis.execute()datas.clear()question_number = 0while True:question_number += 1pipeline_redis.spop('original_question_set')if question_number == 100:question_list = pipeline_redis.execute()breakdatas = {}print('datas', datas)print(datas)if key_number == 500:split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '')key2_set_number = DB_get_question.scard('key2_set')if key2_set_number < 5:with open('key2_total.txt', 'r') as f_key:for key in f_key:key = key.strip()pipeline_redis.sadd('key2_set', key)pipeline_redis.execute()key_number = 0coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
基于 asyncio、aiohttp、xpath 的异步爬虫
参看:https://blog.csdn.net/weixin_34290390/article/details/88772610
# asyncio爬虫, 去重, 入库import asyncio
import re
import aiohttp
import aiomysql
from pyquery import PyQuerystopping = Falsestart_url = 'http://www.jobbole.com'
waiting_urls = []
seen_urls = set() # 实际使用爬虫去重时,数量过多,需要使用布隆过滤器async def fetch(url, session):async with aiohttp.ClientSession() as session:try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:data = await resp.text()return dataexcept Exception as e:print(e)def extract_urls(html): # html中提取所有urlurls = []pq = PyQuery(html)for link in pq.items('a'):url = link.attr('href')if url and url.startwith('http') and url not in seen_urls:urls.append(url)waiting_urls.append(urls)return urlsasync def init_urls(url, session):html = await fetch(url, session)seen_urls.add(url)extract_urls(html)async def article_handler(url, session, pool): # 获取文章详情并解析入库html = await fetch(url, session)extract_urls(html)pq = PyQuery(html)title = pq('title').text() # 为了简单, 只获取title的内容async with pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute('SELECT 42;')insert_sql = "insert into article_test(title) values('{}')".format(title)await cur.execute(insert_sql) # 插入数据库# print(cur.description)# (r,) = await cur.fetchone()# assert r == 42async def consumer(pool):async with aiohttp.ClientSession() as session:while not stopping:if len(waiting_urls) == 0: # 如果使用asyncio.Queue的话, 不需要我们来处理这些逻辑。await asyncio.sleep(0.5)continueurl = waiting_urls.pop()print('start get url:{}'.format(url))if re.match(r'http://.*?jobbole.com/\d+/', url):if url not in seen_urls: # 是没有处理过的url,则处理asyncio.ensure_future(article_handler(url, sssion, pool))else:if url not in seen_urls:asyncio.ensure_future(init_urls(url))async def main(loop):# 等待mysql连接建立好pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='',db='aiomysql_test', loop=loop, charset='utf8', autocommit=True)# charset autocommit必须设置, 这是坑, 不写数据库写入不了中文数据async with aiohttp.ClientSession() as session:html = await fetch(start_url, session)seen_urls.add(start_url)extract_urls(html)asyncio.ensure_future(consumer(pool))if __name__ == '__main__':event_loop = asyncio.get_event_loop()asyncio.ensure_future(main(event_loop))event_loop.run_forever()
改 进( 生产者 --- 消费者 从 redis 取数据 )
config.py
import os
import syssys.path.append(os.getcwd())
sys.path.append("..")
sys.path.append(os.path.abspath("../../"))DEV_OR_PRD = 'dev'
REDIS_CONFIG = Noneif 'dev' == DEV_OR_PRD.lower():REDIS_CONFIG = {'host': '127.0.0.1','port': 6379,'db': 0,'password': None}pass
elif 'prd' == DEV_OR_PRD.lower():REDIS_CONFIG = {'host': '127.0.0.1','port': 6379,}pass
produce.py
import redis
import json
import requests
from config import REDIS_CONFIGdef add_task():payload = {}headers = {'Host': 'www.kuwo.cn','csrf': 'E7D1IGX45D','Cookie': 'kw_token=E7D1IGX45D; kw_token=AKH1VOZ2767'}queue_name = 'redis_list:test'redis_conn = redis.StrictRedis(**REDIS_CONFIG)for _ in range(100):for page_num in range(1, 11):url = f"http://www.kuwo.cn/api/www/bang/bang/musicList?bangId=16&pn={page_num}&rn=30"response = requests.request("GET", url, headers=headers, data=payload)try:music_list = response.json()['data']['musicList']except BaseException as e:music_list = []for item in music_list:task = {'crawl_url': item['pic'],'song_name': item['name']}redis_conn.lpush(queue_name, json.dumps(task, ensure_ascii=False))print(f'task:{task}')print(f'page {page_num} end')if __name__ == '__main__':add_task()pass
consumer.py
import re
import asyncio
import aiohttp
import redis
import json
import datetime
from config import REDIS_CONFIGmax_worker = 50
current_worker = 0def write_file(future=None):resp_data = future.result()if not resp_data:returntime_int = int(datetime.datetime.now().timestamp() * 1000)with open(f'./img/{time_int}.jpg', 'wb') as f:f.write(resp_data)async def fetch(url, session):global current_workercurrent_worker += 1try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:# data = await resp.text()data = await resp.read()current_worker -= 1return dataexcept Exception as e:print(e)return Noneasync def consumer(redis_conn=None, queue_name=None):async with aiohttp.ClientSession() as session:while True:task_string = redis_conn.rpop(queue_name)cha = current_worker - max_workerif cha >= 0 or not task_string:print('超过最大worker, 或者任务为空, 睡1秒继续。。。')await asyncio.sleep(1)continuetask_string = task_string.decode('utf-8')task_dict = json.loads(task_string)crawl_url = task_dict['crawl_url']asyncio_task = asyncio.ensure_future(fetch(crawl_url, session))asyncio_task.add_done_callback(write_file)# await asyncio.sleep(0.001)def main():queue_name = 'redis_list:test'redis_conn_pool = redis.ConnectionPool(**REDIS_CONFIG)redis_conn = redis.StrictRedis(connection_pool=redis_conn_pool)event_loop = asyncio.get_event_loop()try:event_loop.run_until_complete(consumer(redis_conn=redis_conn, queue_name=queue_name))except KeyboardInterrupt as e:event_loop.stop()event_loop.run_forever()finally:event_loop.close()redis_conn.close()if __name__ == '__main__':main()
2. asyncio + aiohttp
import aiohttp
import asyncioasync def fetch_async(url):print(url)async with aiohttp.request("GET", url) as r:response = await r.text(encoding="utf-8")# 或者直接await r.read()不编码,直接读取,适合于图像等无法编码文件# data = await r.read()# print(url, data)print(url, r.status)print(url, response)if __name__ == '__main__':tasks = [fetch_async('https://www.baidu.com/'),fetch_async('https://www.chouti.com/')]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
示例:
参考:深入理解协程(四):async/await异步爬虫实战:https://www.cnblogs.com/ghostlee/p/12208564.html
import asyncio
import time
import aiohttp
from lxml import etreeurls = ['https://blog.csdn.net/Jmilk/article/details/103218919','https://blog.csdn.net/stven_king/article/details/103256724','https://blog.csdn.net/csdnnews/article/details/103154693','https://blog.csdn.net/dg_lee/article/details/103951021','https://blog.csdn.net/m0_37907797/article/details/103272967','https://blog.csdn.net/zzq900503/article/details/49618605','https://blog.csdn.net/weixin_44339238/article/details/103977138','https://blog.csdn.net/dengjin20104042056/article/details/103930275','https://blog.csdn.net/Mind_programmonkey/article/details/103940511','https://blog.csdn.net/xufive/article/details/102993570','https://blog.csdn.net/weixin_41010294/article/details/104009722','https://blog.csdn.net/yunqiinsight/article/details/103137022','https://blog.csdn.net/qq_44210563/article/details/102826406',
]async def async_get_url(url):headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ''(KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36'}async with aiohttp.ClientSession() as session: # 解释1async with session.get(url, headers=headers) as r:html = await r.read()try:title = etree.HTML(html).xpath('//h1[@class="title-article"]/text()')[0]print(title)except IndexError:print(f'Fail URL: {r.url}')def async_main():loop = asyncio.get_event_loop()tasks = [async_get_url(url) for url in urls]loop.run_until_complete(asyncio.wait(tasks))# loop.close()if __name__ == '__main__':start = time.time()async_main()print(f'cost time: {time.time() - start}s')
运行结果:
Fail URL: https://blog.csdn.net/weixin_44339238/article/details/103977138
网页实现一个简单的音乐播放器(大佬别看。(⊙﹏⊙))
AES中ECB模式的加密与解密(Python3.7)
【程序人生】程序员接私活常用平台汇总
OOM别慌,手把手教你定位
致 Python 初学者
【图解算法面试】记一次面试:说说游戏中的敏感词过滤是如何实现的?
8年经验面试官详解 Java 面试秘诀
4G EPS 的网络协议栈
【历史总结】Android-Universal-Image-Loader源码分析
你不得不了解的卷积神经网络发展史
java进阶(四)------java编程规范---代码质量检测工具FindBugs、PMD和CheckStyle的安装
中国数据库OceanBase登顶之路
cost time: 0.5409884452819824s
解释1:此处为异步的上下文管理器,是aiohttp
官方文档提供的写法。如果对上下文管理器不是很了解的话,可以参看【吃透Python上下文管理器】。
用时:0.5409884452819824s。从两种爬虫的输出结果中可以看到:
- 文章标题的顺序不同。同步爬虫会按照
urls
内部的url顺序依次爬取文章标题。而异步爬虫爬取的顺序并不完全和urls
中的url顺序相同。 - 爬取速度差异很大。异步爬虫速度大概是普通同步爬虫的8~10倍。异步爬虫充分利用了网络请求这段时间。从而提高了爬取效率。
3. asyncio + requests
import asyncio
import requestsasync def fetch_async(func, *args, **kwargs):inner_loop = asyncio.get_event_loop()future = inner_loop.run_in_executor(None, func, *args)response = await futureprint(response.url, response.content)if __name__ == '__main__':tasks = [fetch_async(requests.get, 'https://www.cnblogs.com/wupeiqi/'),fetch_async(requests.get, 'https://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')]loop = asyncio.get_event_loop()results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()
示例:
import functools # at the top with the other imports
import asyncio
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)async def fetch_async(lp, func, *args, **kwargs):# inner_loop = asyncio.get_event_loop()# future = inner_loop.run_in_executor(None, func, *args)print(f'*args: {args}')future = lp.run_in_executor(None, functools.partial(func, *args, **kwargs))response = await futureprint(response.url, response.content)if __name__ == '__main__':loop = asyncio.get_event_loop()tasks = [fetch_async(loop, requests.get, 'https://www.cnblogs.com/wupeiqi/', verify=False),fetch_async(loop, requests.get, 'https://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')]results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()
示例:
task 是 协程 和 Future 的 桥梁。
import requests
import asyncio
from concurrent import futures'''
使用多线程:在协程中集成阻塞io原型 :awaitable loop.run_in_executor(executor, func, *args)
参数 : executor 可以是 ThreadPoolExecutor / ProcessPool , 如果是None 则使用默认线程池
可使用 yield from 或 await 挂起函数
作用 : 例如在异步事件循环中的写文件操作(或者其他IO操作), 这种慢的操作交给线程去做
'''def get_url(r_url=None):r = requests.get(url=r_url, verify=False)print(r.json())if __name__ == "__main__":loop = asyncio.get_event_loop()executor = futures.ThreadPoolExecutor(3)tasks = []for index in range(20):url = f"http://shop.projectsedu.com/goods/{index}/"task = loop.run_in_executor(executor, get_url, url)tasks.append(task)loop.run_until_complete(asyncio.wait(tasks))
示例:
import asyncio
from concurrent import futuresdef block_func():with open("c:/test.txt", 'rb') as fd:return fd.read(500)async def todo(lp: asyncio.AbstractEventLoop):reader = await lp.run_in_executor(None, block_func) # 默认线程池print("reader:", reader)with futures.ThreadPoolExecutor() as ex:reader = await lp.run_in_executor(ex, block_func) # 自己创建一个线程池让事件循环调用print("reader :", reader)loop = asyncio.get_event_loop()
loop.run_until_complete(todo(loop))
4. gevent + requests
from gevent import monkey
monkey.patch_all()import gevent
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)def fetch_async(method, url, **req_kwargs):print(method, url, req_kwargs)response = requests.request(method=method, url=url, **req_kwargs)# print(f'{response.url}, {response.content}')print(f'{response.url}, {response.status_code}')def test_1():# ##### 发送请求 #####gevent.joinall([gevent.spawn(fetch_async, method='get', url='https://www.python.org/', verify=False),gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', verify=False),gevent.spawn(fetch_async, method='get', url='https://github.com/', verify=False),])def test_2():# #### 发送请求(协程池控制最大协程数量) #####from gevent.pool import Poolpool = Pool(None)gevent.joinall([pool.spawn(fetch_async, method='get', url='https://www.python.org/', verify=False),pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', verify=False),pool.spawn(fetch_async, method='get', url='https://www.github.com/', verify=False),])if __name__ == '__main__':# test_1()test_2()
5. grequests
import grequestsrequest_list = [grequests.get('http://httpbin.org/delay/1', timeout=0.001),grequests.get('http://fakedomain/'),grequests.get('http://httpbin.org/status/500')
]# ##### 执行并获取响应列表 #####
# response_list = grequests.map(request_list)
# print(response_list)# ##### 执行并获取响应列表(处理异常) #####
def exception_handler(request, exception):print(request, exception)print("Request failed")response_list = grequests.map(request_list, exception_handler=exception_handler)
print(response_list)
6. Twisted 示例
twisted学习笔记No.3 Web Clients:https://www.cnblogs.com/tracylining/p/3353808.html
from twisted.web.client import Agent
from twisted.web.client import defer
from twisted.internet import reactordef all_done(arg):reactor.stop()def callback(contents):print(contents)deferred_list = []url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:agent = Agent(reactor)d = agent.request(b'GET', url.encode('utf-8'))# d.addCallbacks(printResource, printError)d.addCallback(callback)deferred_list.append(d)d_list = defer.DeferredList(deferred_list)
d_list.addBoth(all_done)
reactor.run()
7. Tornado
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop
from tornado.httpclient import HTTPResponsedef handle_response(response: HTTPResponse):"""处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop():param response::return:"""if response.error:print("Error:", response.error)else:# print(response.body)print(f'{response.effective_url} : status_code : {response.code}')def func():url_list = ['https://www.baidu.com','https://www.bing.com',]for url in url_list:print(url)http_client = AsyncHTTPClient()http_client.fetch(HTTPRequest(url), handle_response)ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()
8. Twisted 更多
from twisted.internet import reactor
from twisted.web.client import getPage
from urllib import parsedef one_done(arg: bytes):print(arg.decode('utf-8'))reactor.stop()post_data = parse.urlencode({'check_data': 'adf'})
post_data = bytes(post_data, encoding='utf8')
headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),method=bytes('POST', encoding='utf8'), postdata=post_data, cookies={}, headers=headers
)
response.addBoth(one_done)
reactor.run()
以上均是 Python 内置以及第三方模块提供异步IO请求模块,使用简便大大提高效率,而对于异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】.
9.史上最牛逼的异步IO模块( select、poll、epoll )
select
import select
import socket
import timeclass AsyncTimeoutException(TimeoutError):"""请求超时异常类"""def __init__(self, msg):self.msg = msgsuper(AsyncTimeoutException, self).__init__(msg)class HttpContext(object):"""封装请求和相应的基本数据"""def __init__(self, sock, host, port, method, url, data, callback, timeout=5):"""sock: 请求的客户端socket对象host: 请求的主机名port: 请求的端口port: 请求的端口method: 请求方式url: 请求的URLdata: 请求时请求体中的数据callback: 请求完成后的回调函数timeout: 请求的超时时间"""self.sock = sockself.callback = callbackself.host = hostself.port = portself.method = methodself.url = urlself.data = dataself.timeout = timeoutself.__start_time = time.time()self.__buffer = []def is_timeout(self):"""当前请求是否已经超时"""current_time = time.time()if (self.__start_time + self.timeout) < current_time:return Truedef fileno(self):"""请求sockect对象的文件描述符,用于select监听"""return self.sock.fileno()def write(self, data):"""在buffer中写入响应内容"""self.__buffer.append(data)def finish(self, exc=None):"""在buffer中写入响应内容完成,执行请求的回调函数"""if not exc:response = b''.join(self.__buffer)self.callback(self, response, exc)else:self.callback(self, None, exc)def send_request_data(self):content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s""" % (self.method.upper(), self.url, self.host, self.data,)return content.encode(encoding='utf8')class AsyncRequest(object):def __init__(self):self.fds = []self.connections = []def add_request(self, host, port, method, url, data, callback, timeout):"""创建一个要请求"""client = socket.socket()client.setblocking(False)try:client.connect((host, port))except BlockingIOError as e:pass# print('已经向远程发送连接的请求')req = HttpContext(client, host, port, method, url, data, callback, timeout)self.connections.append(req)self.fds.append(req)def check_conn_timeout(self):"""检查所有的请求,是否有已经连接超时,如果有则终止"""timeout_list = []for context in self.connections:if context.is_timeout():timeout_list.append(context)for context in timeout_list:context.finish(AsyncTimeoutException('请求超时'))self.fds.remove(context)self.connections.remove(context)def running(self):"""事件循环,用于检测请求的socket是否已经就绪,从而执行相关操作"""while True:r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)if not self.fds:returnfor context in r:sock = context.sockwhile True:try:data = sock.recv(8096)if not data:self.fds.remove(context)context.finish()breakelse:context.write(data)except BlockingIOError as e:breakexcept TimeoutError as e:self.fds.remove(context)self.connections.remove(context)context.finish(e)breakfor context in w:# 已经连接成功远程服务器,开始向远程发送请求数据if context in self.fds:data = context.send_request_data()context.sock.sendall(data)self.connections.remove(context)self.check_conn_timeout()if __name__ == '__main__':def callback_func(context, response, ex):""":param context: HttpContext对象,内部封装了请求相关信息:param response: 请求响应内容:param ex: 是否出现异常(如果有异常则值为异常对象;否则值为None):return:"""print(context, response, ex)obj = AsyncRequest()url_list = [{'host': 'www.google.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},{'host': 'www.baidu.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},{'host': 'www.bing.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},]for item in url_list:print(item)obj.add_request(**item)obj.running()
示例 2:
import socket
import selectclass HttpRequest:def __init__(self, sk, host, callback):self.socket = skself.host = hostself.callback = callbackdef fileno(self):return self.socket.fileno()class AsyncRequest:def __init__(self):self.conn = []self.connection = [] # 用于检测是否已经连接成功def add_request(self, host, callback):try:sk = socket.socket()sk.setblocking(0)sk.connect((host, 80,))except BlockingIOError as e:passrequest = HttpRequest(sk, host, callback)self.conn.append(request)self.connection.append(request)def run(self):while True:rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)for w in wlist:print(w.host, '连接成功...')# 只要能循环到,表示socket和服务器端已经连接成功tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)w.socket.send(bytes(tpl, encoding='utf-8'))self.connection.remove(w)for r in rlist:# r,是HttpRequestrecv_data = bytes()while True:try:chunck = r.socket.recv(8096)recv_data += chunckexcept Exception as e:breakr.callback(recv_data)r.socket.close()self.conn.remove(r)if len(self.conn) == 0:breakdef f1(data):print('保存到文件', data)def f2(data):print('保存到数据库', data)url_list = [{'host': 'www.baidu.com', 'callback': f1},{'host': 'cn.bing.com', 'callback': f2},{'host': 'www.cnblogs.com', 'callback': f2},
]req = AsyncRequest()
for item in url_list:req.add_request(item['host'], item['callback'])req.run()