协程asyncio_Asyncio深入浅出

Asyncio是一个异步编程的框架,可以解决异步编程,协程调度问题,线程问题,是整个异步IO的解决方案。

在学习asyncio之前,我们先来理清楚同步/异步的概念:

同步是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行 一直等待

异步是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。

异步IO采用消息循环的模式,重复“读取消息—处理消息”的过程,也就是说异步IO模型”需要一个消息循环,在消息循环中,主线程不断地重复“读取消息-处理消息”这一过程。

  • event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  • async/await 关键字: 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。(async和await这两个关键词是在python3.5开始正式提出定义,asyncio是python解决异步io编程的一个完整框架。关于定义和原理请参考官方文档: 协程与任务, 如何理解await)

其中协程编程离不开的三大要点:

  • 事件循环
  • 回调
  • epoll/select(IO多路复用)

以下内容有删减的摘自 :

Asyncio并发编程​www.langzi.fun
52eb1d4c002a9ad682c10eb857457323.png

事件循环

简单案例(访问一个网站)

async def get_url_title(url):
# 使用关键词async定义一个协程print('开始访问网站:{}'.format(url))await asyncio.sleep(2)# 这一步至关重要# asyncio.sleep(2) 功能:异步非阻塞等待2s,作用是模拟访问网站消耗的时间# await 的作用类似 yield,即这个时候把线程资源控制权交出去,监听这个描述符直到这个任务完成# await 后面只能接三种类型'''1. 协程:Python 协程属于 可等待 对象,因此可以在其他协程中被等待:2. 任务:任务 被用来设置日程以便 并发 执行协程。(当一个协程通过 asyncio.create_task() 等函数被打包为一个 任务,该协程将自动排入日程准备立即运行)3. Future 对象:Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。(当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。)如果await time.sleep(2) 是会报错的'''print('网站访问成功')if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 一行代码创造事件循环loop.run_until_complete(get_url_title('http://www.langzi.fun'))# 这是一个阻塞的方法,可以理解成多线程中的join方法# 直到get_url_title('http://www.langzi.fun')完成后,才会继续执行下面的代码end_time = time.time()print('消耗时间:{}'.format(end_time-start_time))

返回结果:

开始访问网站:http://www.langzi.fun
网站访问成功
消耗时间:2.0018768310546875

简单案例(访问多个网站)

协程的优势是多任务协作,单任务访问网站没法发挥出他的功能,一次性访问多个网站或者一次性等待多个IO响应时间才能发挥它的优势。

# -*- coding:utf-8 -*-
import asyncio
import timeasync def get_url_title(url):print('开始访问网站:{}'.format(url))await asyncio.sleep(2)print('网站访问成功')if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 创造一个事件循环tasks = [get_url_title('http://www.langzi.fun')for i in range(10)]# 这个列表代表总任务量,即执行10次get_url_title()函数loop.run_until_complete(asyncio.wait(tasks))# asyncio.wait后面接上非空可迭代对象,一般来说是功能函数列表# 功能是一次性提交多个任务,等待完成# loop.run_until_complete(asyncio.gather(*tasks))# 和上面代码功能一致,但是gather更加高级,如果是列表就需要加上*# 这里会等到全部的任务执行完后才会执行后面的代码end_time = time.time()print('消耗时间:{}'.format(end_time-start_time))

对一个网站发起10次请求,返回结果:

开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
消耗时间:2.0015649795532227

gather与wait的区别:

  • gather更擅长于将函数聚合在一起
  • wait更擅长筛选运行状况

即gather更加高级,他可以将任务分组,也可以取消任务

import asyncioasync def get_url_title(url):print('开始访问网站:{}'.format(url))await asyncio.sleep(2)print('网站访问成功')return 'success'if __name__ == '__main__':loop = asyncio.get_event_loop()# 使用wait方法# tasks = [get_url_title('http://www.langzi.fun')for i in range(10)]# loop.run_until_complete(asyncio.wait(tasks))# 使用gather方法实现分组导入(方法1)group1 = [get_url_title('http://www.langzi.fun')for i in range(3)]group2 = [get_url_title('http://www.baidu.com')for i in range(5)]loop.run_until_complete(asyncio.gather(*group1,*group2))# 这种方法会把两个全部一次性导入# 使用gather方法实现分组导入(方法2)group1 = [get_url_title('http://www.langzi.fun')for i in range(3)]group2 = [get_url_title('http://www.baidu.com')for i in range(5)]group1 = asyncio.gather(*group1)group2 = asyncio.gather(*group2)#group2.cancel() 取消group2任务loop.run_until_complete(asyncio.gather(group1,group2))# 这种方法会先把group1导入,然后导入group2

返回结果:

开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.baidu.com
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
开始访问网站:http://www.baidu.com
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功

另外一种使用gather获取返回结果:

import asyncioasync def get_url_title(url):print('开始访问网站:{}'.format(url))await asyncio.sleep(2)print('网站访问成功')return 'success'if __name__ == '__main__':loop = asyncio.get_event_loop()# 使用gather方法传递任务获取结果group1 = asyncio.ensure_future(get_url_title('http://www.langzi.fun'))loop.run_until_complete(asyncio.gather(group1))# 如果不是列表就不需要加*print(group1.result())

返回结果:

开始访问网站:http://www.langzi.fun
网站访问成功
success

还有一些复杂的区别转移到python 异步协程中查看

协程的调用和组合十分灵活,尤其是对于结果的处理,如何返回,如何挂起,需要逐渐积累经验和前瞻的设计。

简单案例(获取返回值)

# -*- coding:utf-8 -*-
import asyncio
import timeasync def get_url_title(url):print('开始访问网站:{}'.format(url))await asyncio.sleep(2)print('网站访问成功')return 'success'if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 创建一个事件循环get_future = loop.create_task(get_url_title('http://www.langzi.fun'))#get_future = asyncio.ensure_future(get_url_title('http://www.langzi.fun'))# 这两行代码功能用法一模一样loop.run_until_complete(get_future)print('获取结果:{}'.format(get_future.result()))# 获取结果end_time = time.time()print('消耗时间:{}'.format(end_time-start_time))

返回结果:

开始访问网站:http://www.langzi.fun
网站访问成功
获取结果:success
消耗时间:2.0019724369049072

如果是多个网址传入,访问多个网址的返回值呢?只需要把前面的知识点汇总一起即可使用:

if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 创建一个事件循环tasks = [loop.create_task(get_url_title('http://www.langzi.fun')) for i in range(10)]# 把所有要返回的函数加载到一个列表loop.run_until_complete(asyncio.wait(tasks))# 这里和上面用法一样print('获取结果:{}'.format([x.result() for x in tasks]))# 因为结果都在一个列表,在列表中取值即可end_time = time.time()print('消耗时间:{}'.format(end_time-start_time))

返回结果:

开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
开始访问网站:http://www.langzi.fun
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
网站访问成功
获取结果:['success', 'success', 'success', 'success', 'success', 'success', 'success', 'success', 'success', 'success']
消耗时间:2.0016491413116455

简单案例(回调函数)

上面的例子是一个协程函数,当这个协程函数的await xxx执行完毕后,想要执行另一个函数后,然后再返回这个协程函数的返回结果该这么做:

# -*- coding:utf-8 -*-
import asyncio
from functools import partial
# partial的功能是 固定函数参数,返回一个新的函数。你可以这么理解:
'''
from functools import partialdef go(x,y):return x+yg = partial(go,y=2)print(g(1))
返回结果:3g = partial(go,x=5,y=2)print(g())
返回结果:7'''
async def get_url_title(url):print('开始访问网站:{}'.format(url))await asyncio.sleep(2)print('网站访问成功')# 当这个协程函数快要结束返回值的时候,会调用下面的call_back函数# 等待call_back函数执行完毕后,才返回这个协程函数的值return 'success'def call_back(future,url):# 注意这里必须要传递future参数,因为这里的future即代表下面的get_future对象print('检测网址:{}状态正常'.format(url))if __name__ == '__main__':loop = asyncio.get_event_loop()# 创建一个事件循环get_future = loop.create_task(get_url_title('http://www.langzi.fun'))# 将一个任务注册到loop事件循环中get_future.add_done_callback(partial(call_back,url = 'http://www.langzi.fun'))# 这里是设置,当上面的任务完成要返回结果的时候,执行call_back函数# 注意call_back函数不能加上(),也就意味着你只能依靠partial方法进行传递参数loop.run_until_complete(get_future)# 等待任务完成print('获取结果:{}'.format(get_future.result()))# 获取结果

返回结果:

开始访问网站:http://www.langzi.fun
网站访问成功
检测网址:http://www.langzi.fun状态正常
获取结果:success

梳理

  1. 协程函数必须要使用关键词async定义
  2. 如果遇到了要等待的对象,必须要使用await
  3. 使用await后面的任务,必须是可等待对象(三种主要类型: 协程, 任务 和 Future.)
  4. 运行前,必须要创建一个事件循环(loop = asyncio.get_event_loop(),一行代码即可)
  5. 然后把任务加载到该事件循环中即可
  6. 如果需要获取协程函数的返回值,需要使用loop.create_task()或asyncio.ensure_future()函数,在最后使用.result()获取返回结果。
  7. 如果想要把多个任务注册到loop中,需要使用一个列表包含他们,调用的时候使用asyncio.wait(list)

取消协程任务

存在多个任务协程,想使用ctrl c退出协程,使用例子讲解:

import asyncio
async def get_time_sleep(t):print('开始运行,等待:{}s'.format(t))await asyncio.sleep(t)print('运行结束')if __name__ == '__main__':loop = asyncio.get_event_loop()# 创建一个事件循环task_1 = get_time_sleep(1)task_2 = get_time_sleep(2)task_3 = get_time_sleep(3)tasks = [task_1,task_2,task_3]# 三个协程任务加载到一个列表try:loop.run_until_complete(asyncio.wait(tasks))except KeyboardInterrupt:# 当检测到键盘输入 ctrl c的时候all_tasks = asyncio.Task.all_tasks()# 获取注册到loop下的所有taskfor task in all_tasks:print('开始取消协程')task.cancel()# 取消该协程,如果取消成功则返回Trueloop.stop()# 停止循环loop.run_forever()# loop事件循环一直运行# 这两步必须要做finally:loop.close()# 关闭事件循环

run_forever 会一直运行,直到 stop 被调用,但是你不能像下面这样调 stop

loop.run_forever()
loop.stop()

run_forever 不返回,stop 永远也不会被调用。所以,只能在协程中调 stop:

async def do_some_work(loop, x):print('Waiting ' + str(x))await asyncio.sleep(x)print('Done')loop.stop()

这样并非没有问题,假如有多个协程在 loop 里运行:

asyncio.ensure_future(do_some_work(loop, 1))
asyncio.ensure_future(do_some_work(loop, 3))loop.run_forever()

第二个协程没结束,loop 就停止了——被先结束的那个协程给停掉的。
要解决这个问题,可以用 gather 把多个协程合并成一个 future,并添加回调,然后在回调里再去停止 loop。

async def do_some_work(loop, x):print('Waiting ' + str(x))await asyncio.sleep(x)print('Done')def done_callback(loop, futu):loop.stop()loop = asyncio.get_event_loop()futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))
futus.add_done_callback(functools.partial(done_callback, loop))loop.run_forever()

其实这基本上就是 run_until_complete 的实现了,run_until_complete 在内部也是调用 run_forever。

关于loop.close(),简单来说,loop 只要不关闭,就还可以再运行。

loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()

但是如果关闭了,就不能再运行了:

loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3))  # 此处异常

梳理

  1. 通过gather()启动的协程任务,是可以直接取消的,并且还能获取取消是否成功
  2. 可以通过 asyncio.Task.all_tasks()获取所有的协程任务
  3. 如果使用run_forever()的话会一直运行,只能通过loop.stop()停止

协程相互嵌套

import asyncio
async def sum_tion(x,y):print('开始执行传入参数相加:{} + {}'.format(x,y))await asyncio.sleep(1)# 模拟等待1Sreturn (x+y)async def print_sum(x,y):result = await sum_tion(x,y)print(result)if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(print_sum(1000,2000))loop.close()

返回结果:

开始执行传入参数相加:1000 + 2000
3000

执行流程:

  1. run_until_complete运行,会注册task(协程:print_sum)并开启事件循环
  2. print_sum协程中嵌套了子协程,此时print_sum协程暂停(类似委托生成器),转到子协程(协程:sum_tion)中运行代码,期间子协程需sleep1秒钟,直接将结果反馈到event loop中,即将控制权转回调用方,而中间的print_sum暂停不操作
  3. 1秒后,调用方将控制权给到子协程(调用方与子协程直接通信),子协程执行接下来的代码,直到再遇到wait(此实例没有)
  4. 最后执行到return语句,子协程向上级协程(print_sum抛出异常:StopIteration),同时将return返回的值返回给上级协程(print_sum中的result接收值),print_sum继续执行暂时时后续的代码,直到遇到return语句
  5. 向 event loop 抛出StopIteration异常,此时协程任务都已经执行完毕,事件循环执行完成(event loop :the loop is stopped),close事件循环。

如果想要获取协程嵌套函数返回的值,就必须使用回调:

import asyncio
async def sum_tion(x,y)->int:print('开始执行传入参数相加:{} + {}'.format(x,y))await asyncio.sleep(1)# 模拟等待1Sreturn (x+y)async def print_sum(x,y):result = await sum_tion(x,y)return resultdef callback(future):return future.result()if __name__ == '__main__':loop = asyncio.get_event_loop()future = loop.create_task(print_sum(100,200))# 如果想要获取嵌套协程返回的值,就必须使用回调future.add_done_callback(callback)loop.run_until_complete(future)print(future.result())loop.close()

返回结果:

开始执行传入参数相加:100 + 200
300

定时启动任务

asyncio提供定时启动协程任务,通过call_soon,call_later,call_at实现,他们的区别如下:

call_soon

call_soon是立即执行

def callback(sleep_times):print("预计消耗时间 {} s".format(sleep_times))
def stoploop(loop):print('时间消耗完毕')loop.stop()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()# 创建一个事件循环loop.call_soon(callback,5)# 立即启动callback函数loop.call_soon(stoploop,loop)# 上面执行完毕后,立即启动执行stoploop函数loop.run_forever()#要用这个run_forever运行,因为没有传入协程print('总共耗时:{}'.format(time.time()-start_time))

返回结果:

预计消耗时间 5 s
时间消耗完毕
总共耗时:0.0010013580322265625

call_later

call_later是设置一定时间启动执行

def callback(sleep_times):print("预计消耗时间 {} s".format(sleep_times))
def stoploop(loop):print('时间消耗完毕')loop.stop()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()loop.call_later(1,callback,1.0)# 等待1秒后执行callback函数,传入参数是1.0loop.call_later(5,stoploop,loop)# 等待5秒后执行stoploop函数,传入参数是looploop.run_forever()print('总共耗时:{}'.format(time.time()-start_time))

返回结果:

预计消耗时间 1.0 s
时间消耗完毕
总共耗时:5.002613544464111

call_at

call_at类似与call_later,但是他指定的时间不再是传统意义上的时间,而是loop的内部时钟时间,效果和call_later一样, call_later内部其实调用了call_later

import time
import asynciodef callback(loop):print("传入loop.time()时间为: {} s".format(loop.time()))
def stoploop(loop):print('时间消耗完毕')loop.stop()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()now = loop.time()# loop内部的时钟时间loop.call_at(now+1,callback,loop)# 等待loop内部时钟时间加上1s后,执行callba函数,传入参数为looploop.call_at(now+3,callback,loop)# 等待loop内部时钟时间加上3s后,执行callba函数,传入参数为looploop.call_at(now+5,stoploop,loop)# 等待loop内部时钟时间加上1s后,执行stoploop函数,传入参数为loop

返回结果:

传入loop.time()时间为: 3989.39 s
传入loop.time()时间为: 3991.39 s
时间消耗完毕
总共耗时:5.002060174942017

call_soon_threadsafe 线程安全的call_soon

call_soon_threadsafe用法和call_soon一致。但在涉及多线程时, 会使用它.

梳理

  1. call_soon直接启动
  2. call_later自己定时启动
  3. call_at根据loop.time()内部的时间,设置等待时间启动
  4. call_soon_threadsafe和call_soon方法一致,是保证线程安全的
  5. 他们都是比较底层的,在正常使用时很少用到。

结合线程池

Asyncio是异步IO编程的解决方案,异步IO是包括多线程,多进程,和协程的。所以asyncio是可以完成多线程多进程和协程的,在开头说到,协程是单线程的,如果遇到阻塞的话,会阻塞所有的代码任务,所以是不能加入阻塞IO的,但是比如requests库是阻塞的,socket如果不设置setblocking(false)的话,也是阻塞的,这个时候可以放到一个线程中去做也是可以解决的,即在协程中集成阻塞IO,就加入多线程一起解决问题。

用requests完成异步编程(使用线程池)

from concurrent.futures import ThreadPoolExecutor
import requests
import asyncio
import time
import redef get_url_title(url):# 功能是获取网址的标题r = requests.get(url)try:title = re.search('<title>(.*?)</title>',r.content.decode(),re.S|re.I).group(1)except Exception as e:title = eprint(title)if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 创建一个事件循环p = ThreadPoolExecutor(5)# 创建一个线程池,开启5个线程tasks = [loop.run_in_executor(p,get_url_title,'http://www.langzi.fun')for i in range(10)]# 这一步很重要,使用loop.run_in_executor()函数:内部接受的是阻塞的线程池,执行的函数,传入的参数# 即对网站访问10次,使用线程池访问loop.run_until_complete(asyncio.wait(tasks))# 等待所有的任务完成print(time.time()-start_time)

返回结果:

Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
5.589502334594727

访问10次消耗时间为5.5s,尝试将 p = ThreadPoolExecutor(10),线程数量设置成10个线程,消耗时间为4.6s,改用从进程池p = ProcessPoolExecutor(10),也是一样可以运行的,不过10个进程消耗时间也是5.5s,并且消耗更多的CPU资源。

### 用socket完成异步编程(使用线程池)

import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse
import time
import redef get_url(url):# 通过socket请求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = '/'# 建立socket连接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)client.connect((host, 80))client.send("GET {} HTTP/1.1rnHost:{}rnConnection:closernrn".format(path, host).encode('utf8'))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode('utf8')html_data = data.split('rnrn')[1]# 把请求头信息去掉, 只要网页内容title = re.search('<title>(.*?)</title>',html_data,re.S|re.I).group(1)print(title)client.close()if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()p = ThreadPoolExecutor(3)  # 线程池 放3个线程tasks = [loop.run_in_executor(p,get_url,'http://www.langzi.fun') for i in range(10)]loop.run_until_complete(asyncio.wait(tasks))print('last time:{}'.format(time.time() - start_time))

返回结果:

Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 
last time:5.132313966751099

使用socket完成http请求(未使用线程池)

import asyncio
from urllib.parse import urlparse
import timeasync def get_url(url):# 通过socket请求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = '/'# 建立socket连接reader, writer = await asyncio.open_connection(host, 80)  # 协程 与服务端建立连接writer.write("GET {} HTTP/1.1rnHost:{}rnConnection:closernrn".format(path, host).encode('utf8'))all_lines = []async for raw_line in reader:  # __aiter__ __anext__魔法方法line = raw_line.decode('utf8')all_lines.append(line)html = 'n'.join(all_lines)return htmlasync def main():tasks = []tasks = [asyncio.ensure_future(get_url('http://www.langzi.fun')) for i in range(10)]for task in asyncio.as_completed(tasks):  # 完成一个 print一个result = await taskprint(result)if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()loop.run_until_complete(main())print('last time:{}'.format(time.time() - start_time))

asyncio协程和之前讲解的select事件循环原理是一样的

梳理

  1. 协程中遇到必须要使用阻塞任务的时候,可以把阻塞代码放到线程池中运行
  2. 线程池中的代码放到loop.run_in_executor()里面,并且所有任务保存到列表
  3. 最后通过loop.run_until_complate(asyncio.wait(任务列表))中运行
  4. asyncio能通过socket实现与服务端建立连接

与多进程的结合

既然异步协程和多进程对网络请求都有提升,那么为什么不把二者结合起来呢?在最新的 PyCon 2018 上,来自 Facebook 的 John Reese 介绍了 asyncio 和 multiprocessing 各自的特点,并开发了一个新的库,叫做 aiomultiprocess

这个库的安装方式是:

pip3 install aiomultiprocess

需要 Python 3.6 及更高版本才可使用。

使用这个库,我们可以将上面的例子改写如下:

import asyncio
import aiohttp
import time
from aiomultiprocess import Poolstart = time.time()async def get(url):session = aiohttp.ClientSession()response = await session.get(url)result = await response.text()session.close()return resultasync def request():url = 'http://127.0.0.1:5000'urls = [url for _ in range(100)]async with Pool() as pool:result = await pool.map(get, urls)return resultcoroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)end = time.time()
print('Cost time:', end - start)

这样就会同时使用多进程和异步协程进行请求,但在真实情况下,我们在做爬取的时候遇到的情况千变万化,一方面我们使用异步协程来防止阻塞,另一方面我们使用 multiprocessing 来利用多核成倍加速,节省时间其实还是非常可观的。

同步与通信

和多线程多进程任务一样,协程也可以实现和需要进行同步与通信。

简单例子(顺序启动多任务)

协程是单线程的,他的执行依赖于事件循环中最后的loop.run_until_complate()

import asyncionum = 0async def add():global numfor i in range(10):await asyncio.sleep(0.1)num += i
async def desc():global numfor i in range(10):await asyncio.sleep(0.2)num -= iif __name__ == '__main__':loop = asyncio.get_event_loop()tasks = [add(),desc()]loop.run_until_complete(asyncio.wait(tasks))# 这里执行顺序是先执行add函数,然后执行desc函数# 所以最后的结果是0loop.close()print(num)

返回结果:

0

这里使用一个共有变量,协程下不需要加锁。

简单例子(Lock(锁))

# -*- coding:utf-8 -*-
import asyncio
import functoolsdef unlock(lock):print('线程锁释放成功')lock.release()async def test(locker, lock):print(f'{locker} 等待线程锁释放')# ---------------------------------# with await lock:#     print(f'{locker} 线程锁上锁')# ---------------------------------# 上面这两行代码等同于:# ---------------------------------# await lock.acquire()# print(f'{locker} 线程锁上锁')# lock.release()# ---------------------------------await lock.acquire()print(f'{locker} 线程锁上锁')lock.release()print(f'{locker} 线程锁释放')async def main(loop):lock = asyncio.Lock()await lock.acquire()loop.call_later(0.5, functools.partial(unlock, lock))# call_later() 表达推迟一段时间的回调, 第一个参数是以秒为单位的延迟, 第二个参数是回调函数await asyncio.wait([test('任务 1 ', lock), test('任务 2', lock)])if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main(loop))loop.close()

返回结果:

任务 1  等待线程锁释放
任务 2 等待线程锁释放
线程锁释放成功
任务 1  线程锁上锁
任务 1  线程锁释放
任务 2 线程锁上锁
任务 2 线程锁释放

简单例子(Semaphore(信号量))

可以使用 Semaphore(信号量) 来控制并发访问的数量:

import asyncio
from aiohttp import ClientSessionasync def fetch(sem,url):async with sem:# 最大访问数async with ClientSession() as session:async with session.get(url) as response:status = response.statusres = await response.text()print("{}:{} ".format(response.url, status))return resif __name__ == '__main__':loop = asyncio.get_event_loop()url = "http://www.langzi.fun"sem = asyncio.Semaphore(1000)# 设置最大并发数为1000tasks = [loop.create_task(fetch(sem,url))for i in range(100)]# 对网站访问100次loop.run_until_complete(asyncio.wait(tasks))

简单例子(Condition(条件))

import asyncioasync def consumer(cond, name, second):# 消费者函数await asyncio.sleep(second)# 等待延迟with await cond:await cond.wait()print('{}: 得到响应'.format(name))async def producer(cond):await asyncio.sleep(2)for n in range(1, 3):with await cond:print('生产者 {} 号'.format(n))cond.notify(n=n) # 挨个通知单个消费者await asyncio.sleep(0.1)async def producer2(cond):await asyncio.sleep(2)with await cond:print('释放信号量,通知所有消费者')cond.notify_all()# 一次性通知全部的消费者async def main(loop):condition = asyncio.Condition()# 设置信号量task = loop.create_task(producer(condition))# producer 和 producer2 是两个协程, 不能使用 call_later(), 需要用到 create_task() 把它们创建成一个 taskconsumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))]await asyncio.wait(consumers)task.cancel()print('---分割线---')task = loop.create_task(producer2(condition))consumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))]await asyncio.wait(consumers)task.cancel()# 取消任务if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main(loop))loop.close()

返回结果:

生产者 1 号
c1: 得到响应
生产者 2 号
c2: 得到响应
---分割线---
释放信号量,通知所有消费者
c1: 得到响应
c2: 得到响应

简单例子(Event(事件))

与 Lock(锁) 不同的是, 事件被触发的时候, 两个消费者不用获取锁, 就要尽快地执行下去了

import asyncio
import functoolsdef set_event(event):print('开始设置事件')event.set()async def test(name, event):print('{} 的事件未设置'.format(name))await event.wait()print('{} 的事件已设置'.format(name))async def main(loop):event = asyncio.Event()# 声明事件print('事件是否设置: {}'.format(event.is_set()))loop.call_later(0.1, functools.partial(set_event, event))# 在0.1s后执行set_event()函数,对事件进行设置await asyncio.wait([test('e1', event), test('e2', event)])print('最终事件状态: {}'.format(event.is_set()))if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main(loop))loop.close()

返回结果:

事件是否设置: False
e1 的事件未设置
e2 的事件未设置
开始设置事件
e1 的事件已设置
e2 的事件已设置
最终事件状态: True

简单例子(协程间通信)

协程是单线程,因此使用list、dict就可以实现通信,而不会有线程安全问题,当然可以使用asyncio.Queue

from asyncio import Queue
queue = Queue(maxsize=3)   
# queue的put和get需要用await

举个例子:

import asyncio
from asyncio import Queue
import random
import string
q = Queue(maxsize=100)async def add():while 1:await q.put(random.choice(string.ascii_letters))async def desc():while 1:res = await q.get()print(res)await asyncio.sleep(1)if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait([add(),desc()]))loop.run_forever()

返回结果:

D
b
S
x
...

加速asyncio

uvloop,这个使用库可以有效的加速asyncio,本库基于libuv,也就是nodejs用的那个库。使用它也非常方便,不过目前不支持windows

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

没错就是2行代码,就可以提速asyncio。

tokio同样可以做异步资源循环

import tokio
asyncio.set_event_loop_policy(tokio.EventLoopPolicy())

参考:

python异步编程之asyncio(百万并发) - 三只松鼠 - 博客园​www.cnblogs.comAsyncio并发编程​www.langzi.fun
52eb1d4c002a9ad682c10eb857457323.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/471746.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python 迭代器、生成器、yield、iter

文章目录1. 迭代器2. 生成器3. 标准库3.1 过滤3.2 映射3.3 合并3.4 排列组合3.5 重新排列4. yield from5. 可迭代的归约函数6. iter 还可以传入2个参数7. 生成器当成协程learn from 《流畅的python》 1. 迭代器 所有生成器都是迭代器&#xff0c;因为生成器完全实现了迭代器接…

java微信学习 接入

现在实习的公司要做微信开发&#xff0c;然而一直没安排任务&#xff0c;所以一直在看微信接口&#xff0c;记录下学习的内容 微信开发肯定要看的就是微信公众平台开发者文档&#xff0c;上面有每种接口的调用格式&#xff0c;刚开始学习的时候自己申请了一个订阅号&#xff0c…

LeetCode 1976. 到达目的地的方案数(迪杰斯特拉 Python 优先队列)

文章目录1. 题目2. 解题1. 题目 你在一个城市里&#xff0c;城市由 n 个路口组成&#xff0c;路口编号为 0 到 n - 1 &#xff0c;某些路口之间有 双向 道路。 输入保证你可以从任意路口出发到达其他任意路口&#xff0c;且任意两个路口之间最多有一条路。 给你一个整数 n 和…

c++ sendmessage 鼠标 坐标是相对自身吗_【科普】你真的足够了解五轴加工吗?看完豁然开朗!...

近年来五轴联动数控加工中心在各领域得到了越来越广泛的应用。在实际应用中&#xff0c;每当人们碰见异形复杂零件高效、高质量加工难题时&#xff0c;五轴联动技术无疑是解决这类问题的重要手段。越来越多的厂家倾向于寻找五轴设备来满足高效率、高质量的加工。但是&#xff0…

32g内存 android开发,16G走开 我要32G内存的安卓手机

对于很多用户来说&#xff0c;在准备换手机的时候最纠结的莫过于是选择16G的还是32G的&#xff0c;毕竟价格相差好几百&#xff0c;但这里给你的建议是买32G的&#xff0c;即使贵点&#xff0c;但长远来说&#xff0c;是利大于弊的&#xff0c;为什么这么说呢&#xff0c;且听小…

andriod studio 运行 无结果_无负压静音供水设备下篇一

1、无负压静音供水设备的安全接地保护a )控制柜的金属柜体上应有可靠的接地保护&#xff0c;与接地点相连的保护导线的截面应符合GB/T3797-2005中4.10.6的规定。与接地点连接的导线必须是黄、绿双色或铜编织线&#xff0c;并有明显的接地标识。a) 主接地点与设备任何有关的、因…

html设置div页面最底,使用css让大图片不超过网页宽度

让大图片不超过网页宽度&#xff0c;让图片不撑破通过CSS样式设置的DIV宽度&#xff01;接下来&#xff0c;我们来介绍下网站在开发DIVCSS的时候会遇到一个问题&#xff0c;在发布一个大图片的时候因为图片过宽会撑破自己设置的div宽度的问题。图片撑破布局原因1、由于浏览器版…

java上机题四取三排列_java语言特性概述

一.前言 我们都知道java是面向对象的编程&#xff0c;其中四个基本特性&#xff1a;抽象、封装、继承、多态。这四个特性&#xff0c;概括起来可以这么理解&#xff0c;抽象、封装、继承是多态的基础&#xff0c;多态是抽象、封装、继承的表现。二. JAVA 语言特点 a) 跨平台&am…

LeetCode 1984. 学生分数的最小差值

文章目录1. 题目2. 解题1. 题目 给你一个 下标从 0 开始 的整数数组 nums &#xff0c;其中 nums[i] 表示第 i 名学生的分数。另给你一个整数 k 。 从数组中选出任意 k 名学生的分数&#xff0c;使这 k 个分数间 最高分 和 最低分 的 差值 达到 最小化 。 返回可能的 最小差…

C++中vector使用详细说明 (转)

转自&#xff1a;http://blog.chinaunix.net/uid-26000296-id-3785610.html http://www.cnblogs.com/mr-wid/archive/2013/01/22/2871105.html 一、向量的介绍 向量 vector 是一种对象实体, 能够容纳许多其他类型相同的元素, 因此又被称为容器。 与string相同, vector 同属于…

c++ 一行输出八个数字_R语言笔记(三):数据输入与输出

本文主要介绍数据基本的输入与输出方法&#xff0c;内容包括&#xff1a;1. 数据的输入1.1 scan(), edit(), fix()1.2 调用 R 包自带数据1.3 调用本地数据2. 数据的输出1. 数据的输入1.1 scan(), edit(), fix()手动输入数据主要有以下几种方式&#xff1a;x <- c() # c() 进…

LeetCode 1985. 找出数组中的第 K 大整数(排序)

文章目录1. 题目2. 解题1. 题目 给你一个字符串数组 nums 和一个整数 k 。 nums 中的每个字符串都表示一个不含前导零的整数。 返回 nums 中表示第 k 大整数的字符串。 注意&#xff1a;重复的数字在统计时会视为不同元素考虑。 例如&#xff0c;如果 nums 是 [“1”,“2”,…

firefox应用自动全屏显示_【b】—自动化测试:基础selenium—API

一、浏览器对象# 导入webdriverfrom selenium import webdriver# 创建一个浏览器对象driver webdriver.Firefox()# 设置全屏# driver.maximize_window()# 获取当前浏览器尺寸# size driver.get_window_size()# print(size)# 设置浏览器尺寸driver.set_window_size(400, 400)s…

【linux高级程序设计】(第十三章)Linux Socket网络编程基础 2

BSD Socket网络编程API 创建socket对象 int socket (int __domain, int __type, int __protocol) &#xff1a;成功返回socket文件描述符&#xff0c;失败返回-1. 参数1&#xff1a;socket对象使用的地址簇或协议簇 常用的有PF_LOCAL(本机通信)、PF_INET(IPv4协议簇)、PF_INET6…

数据库中有痣但是有时取不到_农村这种长得像“泥鳅”的鱼,以前没人吃,现在可能有钱都吃不到...

只说真话的农民公众号原创文章&#xff0c;严禁转载在农村中有很多不能叫出名字的花草和野味&#xff0c;它们当中虽然有些长得比较奇怪&#xff0c;名字也比较奇怪&#xff0c;但是却是非常好的疗补食物。有些花草是治疗疾病的良药&#xff0c;有些野味现在也被搬上了酒桌。但…

python 使用 asyncio 包处理并发

文章目录1. 线程与协程对比2. 使用 asyncio 和 aiohttp 下载3. 避免阻塞型调用4. 使用 asyncio.as_completed5. 使用Executor对象&#xff0c;防止阻塞事件循环6. 从回调到期物和协程learn from 《流畅的python》 1. 线程与协程对比 threading import threading import iter…

LeetCode 1991. 找到数组的中间位置(前缀和)

文章目录1. 题目2. 解题1. 题目 给你一个下标从 0 开始的整数数组 nums &#xff0c;请你找到 最左边 的中间位置 middleIndex &#xff08;也就是所有可能中间位置下标最小的一个&#xff09;。 中间位置 middleIndex 是满足 nums[0] nums[1] ... nums[middleIndex-1] n…

LeetCode 1992. 找到所有的农场组(BFS)

文章目录1. 题目2. 解题1. 题目 给你一个下标从 0 开始&#xff0c;大小为 m x n 的二进制矩阵 land &#xff0c;其中 0 表示一单位的森林土地&#xff0c;1 表示一单位的农场土地。 为了让农场保持有序&#xff0c;农场土地之间以矩形的 农场组 的形式存在。 每一个农场组都…

VS2015开发Android,自带模拟器无法调试、加载程序,算是坑吗

VS2015出来后&#xff0c;确定变化很大&#xff0c;什么android、ios的&#xff0c;不在话下。对于我这样传统型的人&#xff0c;也第一时间试用了一下&#xff08;vs2003->vs2008->vs2012->vs2015&#xff09;。以前用eclipse开发过android小程序&#xff0c;现在想试…

amd cpu 安卓模拟器_夜神模拟器常见问题解答_v20201025

MAC版本常见问题Mac版本模拟器常见问题MAC模拟器出现“您应该将它移到废纸篓”解决办法MAC版本模拟器功能介绍MAC版本模拟器界面Mac版模拟器可能无法启动(卡99%)的原因及解决方式其他问题如何在夜神模拟器中安装自己想要的游戏/应用Sample CA 2证书没网解决办法GlobalSignature…