【进阶】 --- 多线程、多进程、异步IO实用例子

【进阶】 --- 多线程、多进程、异步IO实用例子:https://blog.csdn.net/lu8000/article/details/82315576

python之爬虫_并发(串行、多线程、多进程、异步IO):https://www.cnblogs.com/fat39/archive/2004/01/13/9044474.html

Python 并发总结,多线程,多进程,异步IO:https://www.cnblogs.com/junmoxiao/p/11948993.html

asyncio --- 异步 I/O 官方文档:https://docs.python.org/zh-cn/3.10/library/asyncio.html
关于asyncio异步io并发编程:https://zhuanlan.zhihu.com/p/158641367

支持 asyncio 的异步Python库:https://github.com/aio-libs

知乎专栏: https://zhuanlan.zhihu.com/zarten 之  Python中协程异步IO(asyncio)详解( https://zhuanlan.zhihu.com/p/59621713

asyncio:异步I/O、事件循环和并发工具:https://www.cnblogs.com/sidianok/p/12210857.html

在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢。以下代码默认运行环境为 python3。

  • httpie:HTTPie 使用详解:https://zhuanlan.zhihu.com/p/45093545
  • grequests,Requests + Gevent,访问:https://github.com/kennethreitz/grequests
  • gevent,一个高并发的网络性能库,访问:http://www.gevent.org/
  • twisted,基于事件驱动的网络引擎框架。访问:https://twistedmatrix.com/trac/

一、多线程、多进程
        1.同步执行
        2.多线程执行
        3.多线程 + 回调函数执行
        4.多进程执行
        5.多进程 + 回调函数执行

二、异步 
        1.asyncio 示例 1
                asyncio 示例 2 
                python 异步编程之 asyncio(百万并发)
                学习 python 高并发模块 asynio
        2.asyncio + aiohttp
        3.asyncio + requests 
        4.gevent + requests
        5.grequests
        6.Twisted示例
        7.Tornado
        8.Twisted更多
        9.史上最牛逼的异步 IO 模块

一、多线程、多进程

1. 同步执行

示例 1( 同步执行

import requests
import time
from lxml import etreeurls = ['https://blog.csdn.net/Jmilk/article/details/103218919','https://blog.csdn.net/stven_king/article/details/103256724','https://blog.csdn.net/csdnnews/article/details/103154693','https://blog.csdn.net/dg_lee/article/details/103951021','https://blog.csdn.net/m0_37907797/article/details/103272967','https://blog.csdn.net/zzq900503/article/details/49618605','https://blog.csdn.net/weixin_44339238/article/details/103977138','https://blog.csdn.net/dengjin20104042056/article/details/103930275','https://blog.csdn.net/Mind_programmonkey/article/details/103940511','https://blog.csdn.net/xufive/article/details/102993570','https://blog.csdn.net/weixin_41010294/article/details/104009722','https://blog.csdn.net/yunqiinsight/article/details/103137022','https://blog.csdn.net/qq_44210563/article/details/102826406',
]def get_title(url: str):headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ''(KHTML, like Gecko) Chrome/86.0.4240.183 Safari/537.36'}r = requests.get(url, headers=headers)if 200 == r.status_code:title = etree.HTML(r.content).xpath('//h1[@class="title-article"]/text()')[0]print(title)else:print(f'[status_code:{r.status_code}]:{r.url}')def main():for url in urls:get_title(url)if __name__ == '__main__':start = time.time()main()print(f'cost time: {time.time() - start}s')

使用 httpx 模块的同步调用( httpx 即可同步,也可异步

import time
import httpxdef make_request(client):resp = client.get('https://httpbin.org/get')result = resp.json()print(f'status_code : {resp.status_code}')assert 200 == resp.status_codedef main():session = httpx.Client()# 100 次调用for _ in range(10):make_request(session)if __name__ == '__main__':# 开始start = time.time()main()# 结束end = time.time()print(f'同步:发送100次请求,耗时:{end - start}')

2. 多线程执行(线程池)

from concurrent.futures import ThreadPoolExecutor
import requestsdef fetch_sync(r_url):response = requests.get(r_url)print(f"{r_url} ---> {response.status_code}")url_list = ['https://www.baidu.com','https://www.bing.com'
]
pool = ThreadPoolExecutor(5)
for url in url_list:pool.submit(fetch_sync, url)
pool.shutdown(wait=True)

3. 多线程 + 回调函数执行

from concurrent.futures import ThreadPoolExecutor
import requestsdef fetch_sync(r_url):response = requests.get(r_url, verify=False)return responsedef callback(future):resp = future.result()print(f"{resp.url} ---> {resp.status_code}")url_list = ['https://www.baidu.com','https://www.bing.com'
]
pool = ThreadPoolExecutor(5)
for url in url_list:v = pool.submit(fetch_sync, url)v.add_done_callback(callback)
pool.shutdown(wait=True)

4. 多进程执行

import requests
from concurrent import futuresdef fetch_sync(r_url):response = requests.get(r_url)return responseif __name__ == '__main__':url_list = ['https://www.baidu.com','https://www.bing.com']with futures.ProcessPoolExecutor(5) as executor:res = [executor.submit(fetch_sync, url) for url in url_list]print(res)

示例 :

import requests
from concurrent import futures
import time
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)def fetch_sync(args):i, r_url = argsprint(f'index : {i}')response = requests.get(r_url, verify=False)time.sleep(2)return response.status_codedef callback(future):print(future.result())if __name__ == '__main__':# url_list = ['https://www.github.com', 'https://www.bing.com']url = 'https://www.github.com'with futures.ProcessPoolExecutor(5) as executor:for index in range(1000):v = executor.submit(fetch_sync, (index, url))v.add_done_callback(callback)pass

5. 多进程 + 回调函数执行

import requests
from concurrent import futuresdef fetch_sync(r_url):response = requests.get(r_url, verify=False)return responsedef callback(future):print(future.result())if __name__ == '__main__':url_list = ['https://www.github.com', 'https://www.bing.com']with futures.ProcessPoolExecutor(5) as executor:for url in url_list:v = executor.submit(fetch_sync, url)v.add_done_callback(callback)pass

二、异步 

对于 事件循环可以动态的增加协程 到 事件循环 中, 而不是在一开始就确定所有需要协程
协程 只运行在 事件循环 中。
默认情况下 asyncio.get_event_loop() 是一个 select模型事件循环
默认的 asyncio.get_event_loop() 事件循环属于主线程。

参考  python asyncio 协程:https://blog.csdn.net/dashoumeixi/article/details/81001681

提示一般是一个线程一个事件循环,为什么要一个线程一个事件循环

如果你要使用多个事件循环 ,创建线程后调用

lp = asyncio.new_event_loop() # 创建一个新的事件循环
asyncio.set_event_loop(lp)    # 设置当前线程的事件循环

核心思想: yield from / await 就这2个关键字,运行(驱动)一个协程, 同时交出当前函数的控制权,让事件循环执行下个任务。

yield from 的实现原理 :https://blog.csdn.net/dashoumeixi/article/details/84076812

要搞懂 asyncio 协程,还是先把生成器弄懂,如果对生成器很模糊,比如 yield from 生成器对象,这个看不懂的话,建议先看 :python生成器 yield from:https://blog.csdn.net/dashoumeixi/article/details/80936798

有 2 种方式让协程运行起来,协程(生成器)本身是不运行的

  • 1. await / yield from  协程,这一组能等待协程完成。
  • 2. asyncio.ensure_future / async(协程)这一组不需要等待协程完成。

注意:

  • 1. 协程就是生成器的增强版 ( 多了send 与 yield 的接收 ),在 asyncio 中的协程 与 生成器对象不同点: 
             asyncio协程: 函数内部不能使用 yield [如果使用会抛RuntimeError],只能使用 yield from / await, 
            一般的生成器: yield 或 yield from 2个都能用,至少使用一个。这 2个本来就是一回事, 协程只是不能使用 yield   
  • 2. 在 asycio 中所有的协程都被自动包装成一个个 Task / Future 对象,但其本质还是一个生成器,因此可以 yield from / await   Task/Futrue

基本流程:

  • 1. 定义一个协程 (async def 或者 @asyncio.coroutine 装饰的函数)
  • 2. 调用上述函数,获取一个协程对象 【不能使用yield,除非你自己写异步模块,毕竟最终所调用的还是基于yield的生成器函数】。通过 asyncio.ensure_futureasyncio.async 函数调度协程(这部意味着要开始执行了) ,返回了一个 Task 对象,Task对象 是 Future对象 的 子类, ( 这步可作可不作,只要是一个协程对象,一旦扔进事件队列中,将自动给你封装成Task对象 )
  • 3. 获取一个事件循环   asyncio.get_event_loop() ,默认此事件循环属于主线程
  • 4. 等待事件循环调度协程

后面的例子着重说明了一下 as_completed,附加了源码。  先说明一下:

  •     1. as_completed 每次迭代返回一个协程,
  •     2. 这个协程内部从 Queue 中取出先完成的 Future 对象
  •     3. 然后我们再 await coroutine

示例 1:

import asyncio"""第一个例子没什么用.注意: 协程 与 生成器 的用法是一样的. 需要调用之后才产生对象. 
"""async def func():print('hi')lp = asyncio.get_event_loop()  # 获取事件循环# 放到进事件循环里.注意,func() 而不是func. 需要调用之后才是协程对象.
lp.run_until_complete(func())

示例 2:

import asyncio"""用async def (新语法) 定义一个函数,同时返回值asyncio.sleep 模拟IO阻塞情况 ; await 相当于 yield from.await 或者 yield from 交出函数控制权(中断),让事件循环执行下个任务 ,一边等待后面的协程完成
"""async def func(i):print('start')await asyncio.sleep(i)  # 交出控制权print('done')return ico = func(2)  # 产生协程对象
print(co)
lp = asyncio.get_event_loop()     # 获取事件循环
task = asyncio.ensure_future(co)  # 开始调度
lp.run_until_complete(task)       # 等待完成
print(task.result())              # 获取结果

添加回调

示例 1:

import asyncio"""添加一个回调:add_done_callback
"""async def func(i):print('start')await asyncio.sleep(i)return idef call_back(v):print('callback , arg:', v, 'result:', v.result())if __name__ == '__main__':co = func(2)  # 产生协程对象lp = asyncio.get_event_loop()  # 获取事件循环# task = asyncio.run_coroutine_threadsafe(co)  # 开始调度task = asyncio.ensure_future(co)  # 开始调度task.add_done_callback(call_back)  # 增加回调lp.run_until_complete(task)  # 等待print(task.result())  # 获取结果

子协程调用原理图

官方的一个实例如下

从下面的原理图我们可以看到

  • 1 当 事件循环 处于运行状态的时候,任务Task 处于pending(等待),会把控制权交给委托生成器 print_sum
  • 2  委托生成器 print_sum 会建立一个双向通道为Task和子生成器,调用子生成器compute并把值传递过去
  • 3  子生成器compute会通过委托生成器建立的双向通道把自己当前的状态suspending(暂停)传给Task,Task 告诉 loop 它数据还没处理完成
  • 4  loop 会循环检测 Task ,Task 通过双向通道去看自生成器是否处理完成
  • 5 子生成器处理完成后会向委托生成器抛出一个异常和计算的值,并关闭生成器
  • 6 委托生成器再把异常抛给任务(Task),把任务关闭
  • 7  loop 停止循环

call_soon、call_at、call_later、call_soon_threadsafe

  • call_soon    循环开始检测时,立即执行一个回调函数
  • call_at    循环开始的第几秒s执行
  • call_later    循环开始后10s后执行
  • call_soom_threadsafe    立即执行一个安全的线程
import asyncioimport timedef call_back(str_var, loop):print("success time {}".format(str_var))def stop_loop(str_var, loop):time.sleep(str_var)loop.stop()# call_later, call_at
if __name__ == "__main__":event_loop = asyncio.get_event_loop()event_loop.call_soon(call_back, 'loop 循环开始检测立即执行', event_loop)now = event_loop.time()  # loop 循环时间event_loop.call_at(now + 2, call_back, 2, event_loop)event_loop.call_at(now + 1, call_back, 1, event_loop)event_loop.call_at(now + 3, call_back, 3, event_loop)event_loop.call_later(6, call_back, "6s后执行", event_loop)# event_loop.call_soon_threadsafe(stop_loop, event_loop)event_loop.run_forever()

不同线程中的事件循环

事件循环中维护了一个队列(FIFO, Queue) ,通过另一种方式来调用:

import time
import datetime
import asyncio"""事件循环中维护了一个FIFO队列通过call_soon 通知事件循环来调度一个函数.
"""def func(x):print(f'x:{x}, start time:{datetime.datetime.now().replace(microsecond=0)}')time.sleep(x)print(f'func invoked:{x}')loop = asyncio.get_event_loop()
loop.call_soon(func, 1)  # 调度一个函数
loop.call_soon(func, 2)
loop.call_soon(func, 3)
loop.run_forever()  # 阻塞'''
x:1, start time:2020-10-01 15:45:46
func invoked:1
x:2, start time:2020-10-01 15:45:47
func invoked:2
x:3, start time:2020-10-01 15:45:49
func invoked:3
'''

可以看到以上操作是同步的。下面通过 asyncio.run_coroutine_threadsafe 函数可以把上述函数调度变成异步执行:

import time
import datetime
import asyncio"""1.首先会调用asyncio.run_coroutine_threadsafe 这个函数.2.之前的普通函数修改成协程对象
"""async def func(x):print(f'x:{x}, start time:{datetime.datetime.now().replace(microsecond=0)}')await asyncio.sleep(x)print(f'func invoked:{x}, now:{datetime.datetime.now().replace(microsecond=0)}')loop = asyncio.get_event_loop()
co1 = func(1)
co2 = func(2)
co3 = func(3)
asyncio.run_coroutine_threadsafe(co1, loop)  # 调度
asyncio.run_coroutine_threadsafe(co2, loop)
asyncio.run_coroutine_threadsafe(co3, loop)
loop.run_forever()  # 阻塞'''
x:1, start time:2020-10-01 15:49:32
x:2, start time:2020-10-01 15:49:32
x:3, start time:2020-10-01 15:49:32
func invoked:1, now:2020-10-01 15:49:33
func invoked:2, now:2020-10-01 15:49:34
func invoked:3, now:2020-10-01 15:49:35
'''

上面 2 个例子只是告诉你 2 件事情。

  • 1. run_coroutine_threadsafe异步线程安全 ,call_soon同步
  • 2. run_coroutine_threadsafe 这个函数 对应 ensure_future (只能作用于同一线程中)

可以在一个子线程中运行一个事件循环,然后在主线程中动态的添加协程,这样既不阻塞主线程执行其他任务,子线程也可以异步的执行协程。

注意:默认情况下获取的 event_loop 是主线程的,所以要在子线程中使用 event_loop 需要 new_event_loop 。如果在子线程中直接获取 event_loop 会抛异常 。

源代码中的判断:isinstance(threading.current_thread(), threading._MainThread)

示例:

import os
import sys
import queue
import threading
import time
import datetime
import asyncio"""1. call_soon , call_soon_threadsafe 是同步的2. asyncio.run_coroutine_threadsafe(coro, loop) -> 对应 asyncio.ensure_future是在 事件循环中 异步执行。
"""# 在子线程中执行一个事件循环 , 注意需要一个新的事件循环
def thread_loop(loop: asyncio.AbstractEventLoop):print('线程开启 tid:', threading.currentThread().ident)asyncio.set_event_loop(loop)  # 设置一个新的事件循环loop.run_forever()            # run_forever 是阻塞函数,所以,子线程不会退出。async def func(x, q):current_time = datetime.datetime.now().replace(microsecond=0)msg = f'func: {x}, time:{current_time}, tid:{threading.currentThread().ident}'print(msg)await asyncio.sleep(x)q.put(x)if __name__ == '__main__':temp_queue = queue.Queue()lp = asyncio.new_event_loop()  # 新建一个事件循环, 如果使用默认的, 则不能放入子线程thread_1 = threading.Thread(target=thread_loop, args=(lp,))thread_1.start()co1 = func(2, temp_queue)  # 2个协程co2 = func(3, temp_queue)asyncio.run_coroutine_threadsafe(co1, lp)  # 开始调度在子线程中的事件循环asyncio.run_coroutine_threadsafe(co2, lp)print(f'开始事件:{datetime.datetime.now().replace(microsecond=0)}')while 1:if temp_queue.empty():print('队列为空,睡1秒继续...')time.sleep(1)continuex = temp_queue.get()  # 如果为空,get函数会直接阻塞,不往下执行current_time = datetime.datetime.now().replace(microsecond=0)msg = f'main :{x}, time:{current_time}'print(msg)time.sleep(1)

下面例子中 asyncio.ensure_future/async 都可以换成 asyncio.run_coroutine_threadsafe 【 在不同线程中的事件循环 】:

ThreadPollExecutor 和 asyncio 完成阻塞 IO 请求

在 asyncio 中集成线程池处理耗时IO

在协程中同步阻塞的写法,但有些时候不得已就是一些同步耗时的接口

可以把 线程池 集成到 asynico 模块中

import asyncio
from concurrent import futurestask_list = []
loop = asyncio.get_event_loop()
executor = futures.ThreadPoolExecutor(3)def get_url(t_url=None):print(t_url)for url in range(20):url = "http://shop.projectsedu.com/goods/{}/".format(url)task = loop.run_in_executor(executor, get_url, url)task_list.append(task)loop.run_until_complete(asyncio.wait(task_list))

示例代码:

# 使用多线程:在 协程 中集成阻塞io
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparsedef get_url(url):# 通过socket请求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = "/"# 建立socket连接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# client.setblocking(False)client.connect((host, 80))  # 阻塞不会消耗cpu# 不停的询问连接是否建立好, 需要while循环不停的去检查状态# 做计算任务或者再次发起其他的连接请求client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)client.close()if __name__ == "__main__":import timestart_time = time.time()loop = asyncio.get_event_loop()executor = ThreadPoolExecutor(3)tasks = []for url in range(20):url = "http://shop.projectsedu.com/goods/{}/".format(url)task = loop.run_in_executor(executor, get_url, url)tasks.append(task)loop.run_until_complete(asyncio.wait(tasks))print("last time:{}".format(time.time() - start_time))

不用集成也是可以的,但是要在函数的前面加上 async 使同步变成异步写法

#使用多线程:在携程中集成阻塞io
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse
import timeasync def get_html(url):#通过socket请求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = "/"#建立socket连接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# client.setblocking(False)client.connect((host, 80)) #阻塞不会消耗cpu#不停的询问连接是否建立好, 需要while循环不停的去检查状态#做计算任务或者再次发起其他的连接请求client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)client.close()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()tasks = [get_html("http://shop.projectsedu.com/goods/2/") for i in range(10)]loop.run_until_complete(asyncio.wait(tasks))print(time.time() - start_time)

asyncio 的 同步 和 通信

在多少线程中考虑安全性,需要加锁,在协程中是不需要的

import asynciototal = 0
lock = Noneasync def add():global totalfor _ in range(1000):total += 1async def desc():global total, lockfor _ in range(1000):total -= 1if __name__ == '__main__':tasks = [add(), desc()]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))print(total)

在有些情况中,对协程还是需要类似锁的机制

示例:parse_response 和 use_response 有共同调用的代码,get_response、parse_response 去请求的时候 如果 get_response 也去请求,会触发网站的反爬虫机制.
这就需要我们像上诉代码那样加 lock,同时 get_response 和 use_response 中都调用了parse_response,我们想在 get_response 中只请求一次,下次用缓存,所以要用到锁

import asyncio
import aiohttp
from asyncio import Lockcache = {}
lock = Lock()async def get_response(url):async with lock:  # 等价于 with await lock:   还有async for 。。。类似的用法# 这里使用async with 是因为 Lock中有__await__ 和 __aenter__两个魔法方法# 和线程一样, 这里也可以用 await lock.acquire() 并在结束时 lock.releaseif url in cache:return cache[url]print("第一次请求")response = aiohttp.request('GET', url)cache[url] = responsereturn responseasync def parse_response(url):response = await get_response(url)print('parse_response', response)# do some parseasync def use_response(url):response = await get_response(url)print('use_response', response)# use response to do something interestingif __name__ == '__main__':tasks = [parse_response('baidu'), use_response('baidu')]loop = asyncio.get_event_loop()# loop.run_until_complete将task放到loop中,进行事件循环, 这里必须传入的是一个listloop.run_until_complete(asyncio.wait(tasks))

输出结果如下 

asyncio 通信 queue

协程是单线程的,所以协程中完全可以使用全局变量实现 queue 来相互通信,但是如果想要在 queue 中定义存放有限的最大数目,需要在 put 和 get 的前面都要加 await 

from asyncio import Queuequeue = Queue(maxsize=3)
await queue.get()
await queue.put()

一个事件循环中执行多个 task,实现并发执行

future task: 

  • future 是一个结果的容器,结果执行完后在内部会回调 call_back 函数
  • task future 的子类,可以用来激活协程。( task 协程 Future 桥梁

waitgather、await

1. waitgather 这2个函数都是用于获取结果的,且都不阻塞,直接返回一个生成器对象可用于 yield from / await

2. 两种用法可以获取执行完成后的结果:
        第一种: result = asyncio.run_until_completed(asyncio.wait/gather)    执行完成所有之后获取结果
        第二种: result = await asyncio.wait/gather     在一个协程内获取结果

3. as_completed 与并发包 concurrent 中的行为类似,哪个任务先完成哪个先返回,内部实现是 yield from Queue.get()

4. 嵌套:await / yield from 后跟协程,直到后面的协程运行完毕,才执行 await / yield from 下面的代码整个过程是不阻塞的 

wait 和 gather 区别 

这两个都可以添加多个任务到事件循环中

一般使用 asyncio.wait(tasks) 的地方也可以使用 asyncio.gather(tasks) ,但是 wait 接收一堆 task,gather接收一个 task 列表。

asyncio.wait(tasks)方法返回值是两组 task/future的 set。dones, pendings = await asyncio.wait(tasks) 其中

  • dones 是 task的 set,
  • pendings 是 future 的 set。

asyncio.gather(tasks) 返回一个结果的 list。

gather 比 wait  更加的高级

  • 可以对任务进行分组
  • 可以取消任务
import asyncio
import timeasync def get_html(url):global indexprint(f"{index}  start get url")await asyncio.sleep(2)index += 1print(f"{index}  end get url")if __name__ == "__main__":start_time = time.time()index = 1loop = asyncio.get_event_loop()tasks = [get_html("http://www.imooc.com") for i in range(10)]# gather和wait的区别# tasks = [get_html("http://www.imooc.com") for i in range(10)]# loop.run_until_complete(asyncio.wait(tasks))group1 = [get_html("http://projectsedu.com") for i in range(2)]group2 = [get_html("http://www.imooc.com") for i in range(2)]group1 = asyncio.gather(*group1)group2 = asyncio.gather(*group2)loop.run_until_complete(asyncio.gather(group1, group2))print(time.time() - start_time)

示例 1:

import asyncio"""并发 执行多个任务。调度一个Task对象列表调用 asyncio.wait 或者 asyncio.gather 获取结果
"""async def func(i):print('start')# 交出控制权,事件循环执行下个任务,同时等待完成await asyncio.sleep(i)return iasync def func_sleep():await asyncio.sleep(2)def test_1():# asyncio create_task永远运行# https://www.pythonheidong.com/blog/article/160584/ca5dc07f62899cedad64/lp = asyncio.get_event_loop()tasks = [lp.create_task(func(i)) for i in range(3)]lp.run_until_complete(func_sleep())# 或者# lp.run_until_complete(asyncio.wait([func_sleep(), ]))def test_2():        lp = asyncio.get_event_loop()# tasks = [func(i) for i in range(3)]# tasks = [asyncio.ensure_future(func(i)) for i in range(3)]  # asyncio.ensure_future# 或者tasks = [lp.create_task(func(i)) for i in range(3)]  # lp.create_tasklp.run_until_complete(asyncio.wait(tasks))for task in tasks:print(task.result())if __name__ == '__main__':# test_1()test_2()pass

示例 2:

import asyncio"""通过 await 或者 yield from 形成1个链, 后面跟其他协程. 形成一个链的目的很简单,当前协程需要这个结果才能继续执行下去.就跟普通函数调用其他函数获取结果一样
"""async def func(i):print('start')await asyncio.sleep(i)return iasync def to_do():print('to_do start')tasks = []# 开始调度3个协程对象for i in range(3):tasks.append(asyncio.ensure_future(func(i)))# 在协程内等待结果. 通过 await 来交出控制权, 同时等待tasks完成task_done, task_pending = await asyncio.wait(tasks)print('to_do get result')# 获取已经完成的任务for task in task_done:print('task_done:', task.result())# 未完成的for task in task_pending:print('pending:', task)if __name__ == '__main__':lp = asyncio.get_event_loop()  # 获取事件循环lp.run_until_complete(to_do())  # 把协程对象放进去# lp.close()  # 关闭事件循环

as_completed 函数返回一个迭代器,每次迭代一个协程。

事件循环内部有一个 Queue(queue.Queue 线程安全) , 先完成的先入队。

as_completed 迭代的协程源码是 :  注意 yield from 后面可以跟 iterable

#简化版代码
f = yield from done.get() # done 是 Queue
return f.result()

例子:

asyncio.as_completed 返回一个生成器对象 , 因此可用于迭代

每次从此生成器中返回的对象是一个个协程(生成器),哪个最先完成哪个就返回, 而要从 生成器/协程 中获取返回值,就必须使用 yield from / await , 简单来说就是:生成器的返回值在异常中, 详情参考最上面的基础链接

import asyncioasync def func(x):# print('\t\tstart ',x)await asyncio.sleep(5)# print('\t\tdone ', x)return xasync def to_do():# 在协程内调度2个协程tasks = [asyncio.ensure_future(func(i)) for i in range(2)]# 使用as_completed:先完成,先返回.# 每次迭代返回一个协程.# 这个协程:_wait_for_one,内部从队列中产出一个最先完成的Future对象for coroutine in asyncio.as_completed(tasks):result = await coroutine  # 等待协程,并返回先完成的协程print('result :', result)print('all done')lp = asyncio.get_event_loop()
lp.set_debug(True)
lp.run_until_complete(to_do())  # 调度协程

获取多个并发的 task 的结果。

( task 协程Future 桥梁

  • 如果我们要获取 task 的结果,一定要创建一个task,就是把我们的协程绑定要 task 上,这里直接用事件循环 loop 里面的 create_task 就可以搞定。

  • 我们假设有3个并发的add任务需要处理,然后调用 run_until_complete 来等待3个并发任务完成。

  • 调用 task.result 查看结果,这里的 task 其实是 _asyncio.Task,是封装的一个类。大家可以在 Pycharm 中找 asyncio 里面的源码,里面有一个 tasks 文件。

爬取有道词典

玩并发比较多的是爬虫,爬虫可以用多线程,线程池去爬。但是我们用 requests 的时候是阻塞的,无法并发。所以我们要用一个更牛逼的库 aiohttp,这个库可以当成是异步的 requests。

1). 爬取有道词典

有道翻译的API已经做好了,我们可以直接调用爬取。然后解析网页,获取单词的翻译。然后解析网页,网页比较简单,可以有很多方法解析。因为爬虫文章已经泛滥了,我这里就不展开了,很容易就可以获取单词的解释。

2). 代码的核心框架

  • 设计一个异步的框架,生成一个事件循环

  • 创建一个专门去爬取网页的协程,利用aiohttp去爬取网站内容

  • 生成多个要翻译的单词的url地址,组建一个异步的tasks, 扔到事件循环里面

  • 等待所有的页面爬取完毕,然后用pyquery去一一解析网页,获取单词的解释,部分代码如下:

import time
import asyncio
import aiohttp
from pyquery import PyQuery as pqdef decode_html(html_content=None):url, resp_text = html_contentdoc = pq(resp_text)des = ''for li in doc.items('#phrsListTab .trans-container ul li'):des += li.text()return url, desasync def fetch(session: aiohttp.ClientSession = None, url=None):async with session.get(url=url) as resp:resp_text = await resp.text()return url, resp_textasync def main(word_list=None):url_list = ['http://dict.youdao.com/w/{}'.format(word) for word in word_list]temp_task_list = []async with aiohttp.ClientSession() as session:for url in url_list:temp_task_list.append(fetch(session, url))html_list = await asyncio.gather(*temp_task_list)for html_content in html_list:print(decode_html(html_content))if __name__ == '__main__':start_time = time.time()text = 'apple'word_list_1 = [ch for ch in text]word_list_2 = [text for _ in range(100)]loop = asyncio.get_event_loop()task_list = [main(word_list_1),main(word_list_2),]loop.run_until_complete(asyncio.wait(task_list))print(time.time() - start_time)

谈到 http 接口调用,Requests 大家并不陌生,例如,robotframework-requests、HttpRunner 等 HTTP 接口测试库/框架都是基于它开发。

这里将介绍另一款http接口测试框架 httpx,snaic 同样也集成了 httpx 库。httpx 的 API 和 Requests 高度一致。github: https://github.com/encode/httpx

安装:pip install httpx

httpx 简单使用

import json
import httpxr = httpx.get("http://httpbin.org/get")
print(r.status_code)
print(json.dumps(r.json(), ensure_ascii=False, indent=4))

带参数的 post 调用

import json
import httpxpayload = {'key1': 'value1', 'key2': 'value2'}
r = httpx.post("http://httpbin.org/post", data=payload)
print(r.status_code)
print(json.dumps(r.json(), ensure_ascii=False, indent=4))

httpx 异步调用。接下来认识 httpx 的异步调用:

import json
import httpx
import asyncioasync def main():async with httpx.AsyncClient() as client:resp = await client.get('http://httpbin.org/get')result = resp.json()print(json.dumps(result, ensure_ascii=False, indent=4))asyncio.run(main())

httpx 异步调用

import httpx
import asyncio
import timeasync def request(client):global indexresp = await client.get('http://httpbin.org/get')index += 1result = resp.json()print(f'{index}  status_code : {resp.status_code}')assert 200 == resp.status_codeasync def main():async with httpx.AsyncClient() as client:# 50 次调用task_list = []for _ in range(50):req = request(client)task = asyncio.create_task(req)task_list.append(task)await asyncio.gather(*task_list)if __name__ == "__main__":index = 0# 开始start = time.time()asyncio.run(main())# 结束end = time.time()print(f'异步:发送 50 次请求,耗时:{end - start}')

想搞懂 异步框架 和异步接口的调用,可以按这个路线学习:1.python异步编程、2.python Web异步框架(tornado/sanic)、3.异步接口调用(aiohttp/httpx)

1. asyncio

示例 1:( Python 3.5+ 之前的写法  )

import asyncio@asyncio.coroutine
def func1():print('before...func1......')yield from asyncio.sleep(5)print('end...func1......')tasks = [func1(), func1()]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

改进,使用 async / await 关键字 ( Python 3.5+ 开始引入了新的语法 async 和 await )

import asyncioasync def func1():print('before...func1......')await asyncio.sleep(5)print('end...func1......')tasks = [func1(), func1()]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

示例 2 :

import asyncioasync def fetch_async(host, url='/'):print(host, url)reader, writer = await asyncio.open_connection(host, 80)request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)request_header_content = bytes(request_header_content, encoding='utf-8')writer.write(request_header_content)await writer.drain()text = await reader.read()print(host, url, text)writer.close()tasks = [fetch_async('www.cnblogs.com', '/wupeiqi/'),fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
]loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

示例 3:

#!/usr/bin/env python
# encoding:utf-8
import asyncio
import aiohttp
import timeasync def download(url):  # 通过async def定义的函数是原生的协程对象print("get: %s" % url)async with aiohttp.ClientSession() as session:async with session.get(url) as resp:print(resp.status)# response = await resp.read()async def main():start = time.time()await asyncio.wait([download("http://www.163.com"),download("http://www.mi.com"),download("http://www.baidu.com")])end = time.time()print("Complete in {} seconds".format(end - start))loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Python 异步编程之 asyncio(百万并发)

前言:python 由于 GIL(全局锁)的存在,不能发挥多核的优势,其性能一直饱受诟病。然而在 IO 密集型的网络编程里,异步处理比同步处理能提升成百上千倍的效率,弥补了 python 性能方面的短板,如最新的微服务框架 japronto,resquests per second 可达百万级。

python 还有一个优势是库(第三方库)极为丰富,运用十分方便。asyncio 是 python3.4 版本引入到标准库,python2x 没有加这个库,毕竟 python3x 才是未来啊,哈哈!python3.5 又加入了 async/await 特性。在学习 asyncio 之前,我们先来理清楚 同步/异步的概念

  • 同步 是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行。。。
  • 异步 是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。

aiohttp 使用

  如果需要并发 http 请求怎么办呢,通常是用 requests,但 requests 是同步的库,如果想异步的话需要引入 aiohttp。这里引入一个类,from aiohttp import ClientSession,首先要建立一个 session 对象,然后用 session 对象去打开网页。session 可以进行多项操作,比如 post、get、put、head 等。

基本用法:

async with ClientSession() as session:async with session.get(url) as response:

aiohttp 异步实现的例子:

import asyncio
from aiohttp import ClientSessiontasks = []
url = "http://httpbin.org/get?args=hello_word"async def hello(t_url):async with ClientSession() as session:async with session.get(t_url) as req:response = await req.read()# response = await req.text()# print(response)print(f'{req.url} : {req.status}')if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(hello(url))

首先 async def 关键字定义了这是个异步函数,await 关键字加在需要等待的操作前面,req.read() 等待 request 响应,是个耗 IO 操作。然后使用 ClientSession 类发起 http 请求。

异步请求多个URL

如果我们需要请求多个 URL 该怎么办呢?

  • 同步的做法:访问多个 URL时,只需要加个 for 循环就可以了。
  • 但异步的实现方式并没那么容易:在之前的基础上需要将 hello() 包装在 asyncio 的 Future 对象中,然后将 Future对象列表 作为 任务 传递给 事件循环
import datetime
import asyncio
from aiohttp import ClientSessiontask_list = []
url = "https://www.baidu.com/{}"async def hello(t_url):ret_val = Noneasync with ClientSession() as session:async with session.get(t_url) as req:response = await req.read()print(f'Hello World:{datetime.datetime.now().replace(microsecond=0)}')# print(response)ret_val = req.statusreturn ret_valdef run():for i in range(5):one_task = asyncio.ensure_future(hello(url.format(i)))task_list.append(one_task)if __name__ == '__main__':loop = asyncio.get_event_loop()run()result = loop.run_until_complete(asyncio.wait(task_list))# 方法 1 : 获取结果for task in task_list:print(task.result())# 方法 2 : 获取结果finish_task, pending_task = resultprint(f'finish_task count:{len(pending_task)}')for task in finish_task:print(task.result())print(f'pending_task count:{len(pending_task)}')for task in pending_task:print(task.result())'''
Hello World:2020-12-06 16:29:02
Hello World:2020-12-06 16:29:02
Hello World:2020-12-06 16:29:02
Hello World:2020-12-06 16:29:02
Hello World:2020-12-06 16:29:02
404
404
404
404
404
finish_task count:0
404
404
404
404
404
pending_task count:0
'''

收集 http 响应

上面介绍了访问不同 URL 的异步实现方式,但是我们只是发出了请求,如果要把响应一一收集到一个列表中,最后保存到本地或者打印出来要怎么实现呢? 

可通过 asyncio.gather(*tasks) 将响应全部收集起来,具体通过下面实例来演示。

import time
import asyncio
from aiohttp import ClientSessiontask_list = []
temp_url = "https://www.baidu.com/{}"async def hello(url=None):async with ClientSession() as session:async with session.get(url) as request:# print(request)print('Hello World:%s' % time.time())return await request.read()def run():for i in range(5):task = asyncio.ensure_future(hello(temp_url.format(i)))task_list.append(task)result = loop.run_until_complete(asyncio.gather(*task_list))print(f'len(result) : {len(result)}')for item in result:print(item)if __name__ == '__main__':loop = asyncio.get_event_loop()run()

限制并发数(最大文件描述符的限制)

提示:此方法也可用来作为异步爬虫的限速方法(反反爬)

假如你的并发达到 2000 个,程序会报错:ValueError: too many file descriptors in select()。报错的原因字面上看是 Python 调取的 select 对打开的文件有最大数量的限制,这个其实是操作系统的限制,linux 打开文件的最大数默认是 1024,windows 默认是 509,超过了这个值,程序就开始报错。这里我们有三种方法解决这个问题:

  • 1. 限制并发数量。(一次不要塞那么多任务,或者限制最大并发数量)
  • 2. 使用回调的方式
  • 3. 修改操作系统打开文件数的最大限制,在系统里有个配置文件可以修改默认值,具体步骤不再说明了。

不修改系统默认配置的话,个人推荐限制并发数的方法,设置并发数为 500,处理速度更快。

使用 semaphore = asyncio.Semaphore(500) 以及在协程中使用 async with semaphore:  操作具体代码如下:

# coding:utf-8
import time, asyncio, aiohttpurl = 'https://www.baidu.com/'index = 0async def hello(url, semaphore):global indexasync with semaphore:async with aiohttp.ClientSession() as session:async with session.get(url) as response:print(f'{index} : ', end='')await asyncio.sleep(2)print(response.status)return await response.read()async def run():# 为了看效果,这是设置 100 个任务,并发限制为 5semaphore = asyncio.Semaphore(5)  # 限制并发量为500to_get = [hello(url.format(), semaphore) for _ in range(100)]  # 总共1000任务await asyncio.wait(to_get)if __name__ == '__main__':#    now=lambda :time.time()loop = asyncio.get_event_loop()loop.run_until_complete(run())# loop.close()

示例代码:

import asyncio
import aiohttpasync def get_http(url):async with semaphore:async with aiohttp.ClientSession() as session:async with session.get(url) as res:global countcount += 1print(count, res.status)if __name__ == '__main__':count = 0semaphore = asyncio.Semaphore(500)loop = asyncio.get_event_loop()temp_url = 'https://www.baidu.com/s?wd={0}'tasks = [get_http(temp_url.format(i)) for i in range(600)]loop.run_until_complete(asyncio.wait(tasks))loop.close()

示例代码:

from aiohttp import ClientSession
import asyncio#  限制协程并发量
async def hello(sem, num):async with sem:async with ClientSession() as session:async with session.get(f'http://httpbin.org/get?a={num}') as response:r = await response.read()print(f'[{num}]:{r}')await asyncio.sleep(1)def main():loop = asyncio.get_event_loop()tasks = []sem = asyncio.Semaphore(5)  # thisfor index in range(100000):task = asyncio.ensure_future(hello(sem, index))tasks.append(task)feature = asyncio.ensure_future(asyncio.gather(*tasks))try:loop.run_until_complete(feature)finally:loop.close()if __name__ == "__main__":main()

aiohttp 实现高并发爬虫 ( 异步 mysql )

python asyncio并发编程:https://www.cnblogs.com/crazymagic/articles/10066619.html

# asyncio爬虫, 去重, 入库import asyncio
import re
import aiohttp
import aiomysql
from pyquery import PyQuerystopping = Falsestart_url = 'http://www.jobbole.com'
waiting_urls = []
seen_urls = set()  # 实际使用爬虫去重时,数量过多,需要使用布隆过滤器async def fetch(url, session):async with aiohttp.ClientSession() as session:try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:data = await resp.text()return dataexcept Exception as e:print(e)def extract_urls(html):  # html中提取所有urlurls = []pq = PyQuery(html)for link in pq.items('a'):url = link.attr('href')if url and url.startwith('http') and url not in seen_urls:urls.append(url)waiting_urls.append(urls)return urlsasync def init_urls(url, session):html = await fetch(url, session)seen_urls.add(url)extract_urls(html)async def article_handler(url, session, pool):  # 获取文章详情并解析入库html = await fetch(url, session)extract_urls(html)pq = PyQuery(html)title = pq('title').text()  # 为了简单, 只获取title的内容async with pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute('SELECT 42;')insert_sql = "insert into article_test(title) values('{}')".format(title)await cur.execute(insert_sql)  # 插入数据库# print(cur.description)# (r,) = await cur.fetchone()# assert r == 42async def consumer(pool):async with aiohttp.ClientSession() as session:while not stopping:if len(waiting_urls) == 0:  # 如果使用asyncio.Queue的话, 不需要我们来处理这些逻辑。await asyncio.sleep(0.5)continueurl = waiting_urls.pop()print('start get url:{}'.format(url))if re.match(r'http://.*?jobbole.com/\d+/', url):if url not in seen_urls:  # 是没有处理过的url,则处理asyncio.ensure_future(article_handler(url, sssion, pool))else:if url not in seen_urls:asyncio.ensure_future(init_urls(url))async def main(loop):# 等待mysql连接建立好pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='',db='aiomysql_test', loop=loop, charset='utf8', autocommit=True)# charset  autocommit必须设置, 这是坑, 不写数据库写入不了中文数据async with aiohttp.ClientSession() as session:html = await fetch(start_url, session)seen_urls.add(start_url)extract_urls(html)asyncio.ensure_future(consumer(pool))if __name__ == '__main__':event_loop = asyncio.get_event_loop()asyncio.ensure_future(main(event_loop))event_loop.run_forever()

学习 python 高并发模块 asynio

参考:Python黑魔法 --- 异步IO( asyncio) 协程:https://www.jianshu.com/p/b5e347b3a17c

Python 中重要的模块 --- asyncio:https://www.cnblogs.com/zhaof/p/8490045.html
Python 协程深入理解:https://www.cnblogs.com/zhaof/p/7631851.html

asyncio 是 python 用于解决异步io编程的一整套解决方案

创建一个 asyncio 的步骤如下

  1. 创建一个 event_loop 事件循环,当启动时,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数。
  2. 创建协程: 使用 async 关键字定义的函数就是一个协程对象。在协程函数内部可以使用 await 关键字用于阻塞操作的挂起。
  3. 将协程注册到事件循环中。协程的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。

一、定义一个协程

import time
import asyncioasync def do_some_work(x):print("waiting:", x)start = time.time()
# 这里是一个协程对象,这个时候do_some_work函数并没有执行
coroutine = do_some_work(2)
print(coroutine)
#  创建一个事件loop
loop = asyncio.get_event_loop()
# 将协程注册到事件循环,并启动事件循环
loop.run_until_complete(coroutine)print("Time:", time.time() - start)

二、创建一个 task

一个协程对象 就是 一个原生可以挂起的函数任务则是对协程进一步封装,其中包含了任务的各种状态。

在上面的代码中,在注册事件循环的时候,其实是 run_until_complete 方法将协程包装成为了一个任务(task)对象。 task 对象是 Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果

import asyncio
import timestart = lambda: time.time()async def do_some_work(x):print("waiting:", x)start = start()coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print("Time:", time.time() - start)

loop.create_taskasyncio.async / asyncio.ensure_futureTask 有什么区别?

BaseEventLoop.create_task(coro) 、asyncio.async(coro)、Task(coro)安排协同程序执行,这似乎也可以正常工作。那么,所有这些之间有什么区别?

  • 在 Python> = 3.5中,已将 async 设为关键字,所以 asyncio.async 必须替换为 asyncio.ensure_future
  • create_task 的存在理由:
            第三方事件循环可以使用其自己的Task子类来实现互操作性。在这种情况下,结果类型是Task的子类。
            这也意味着您不应直接创建 Task ,因为不同的事件循环可能具有不同的创建"任务"的方式。
  • 另一个重要区别是,除了接受协程外, ensure_future 也接受任何等待的对象;而 create_task 只接受协程。

那么用 ensure_future 还是 create_task 函数声明对比:

  • asyncio.ensure_future(coro_or_future, *, loop=None)
  • BaseEventLoop.create_task(coro)

显然,ensure_future 除了接受 coroutine 作为参数,还接受 future 作为参数。

看 ensure_future 的代码,会发现 ensure_future 内部在某些条件下会调用 create_task,综上所述:

  • encure_future: 最高层的函数,推荐使用!
  • create_task: 在确定参数是 coroutine 的情况下可以使用。
  • Task: 可能很多时候也可以工作,但真的没有使用的理由!

为了 interoperability,第三方的事件循环可以使用自己的 Task 子类。这种情况下,返回结果的类型是 Task 的子类。

所以,不要直接创建 Task 实例,应该使用 ensure_future() 函数或 BaseEventLoop.create_task() 方法。

asyncio.ensure_futureBaseEventLoop.create_task 对比简单的协同程序

From:asyncio.ensure_future与BaseEventLoop.create_task对比简单的协同程序?-python黑洞网

看过几个关于asyncio的基本Python 3.5教程,它们以各种方式执行相同的操作。在这段代码中:

import asyncioasync def doit(i):print("Start %d" % i)await asyncio.sleep(3)print("End %d" % i)return iif __name__ == '__main__':loop = asyncio.get_event_loop()# futures = [asyncio.ensure_future(doit(i), loop=loop) for i in range(10)]# futures = [loop.create_task(doit(i)) for i in range(10)]futures = [doit(i) for i in range(10)]result = loop.run_until_complete(asyncio.gather(*futures))print(result)

上面定义 futures 变量的所有三个变体都实现了相同的结果,那么他们有什么区别?有些情况下我不能只使用最简单的变体(协程的简单列表)吗?

asyncio.create_task(coro) asyncio.ensure_future(obj)

从 Python 3.7 开始,为此目的添加了 asyncio.create_task(coro) 高级功能,可以使用它来代替从 coroutimes 创建任务的其他方法。

但是,如果需要从任意等待创建任务,应该使用 asyncio.ensure_future(obj)

推荐:使用 asyncio.ensure_future(obj) 来代替 asyncio.create_task(coro)

ensure_future VS create_task

ensure_future是创建一个方法Task从coroutine。它基于参数以不同的方式创建任务(包括使用create_task协同程序和类似未来的对象)。

create_task是一种抽象的方法AbstractEventLoop。不同的事件循环可以不同的方式实现此功能。

您应该使用ensure_future创建任务。create_task只有在你要实现自己的事件循环类型时才需要。

“当从协程创建任务时,你应该使用适当命名的loop.create_task()

在任务中包装协程 - 是一种在后台启动此协程的方法。这是一个例子:

import asyncioasync def msg(text):await asyncio.sleep(0.1)print(text)async def long_operation():print('long_operation started')await asyncio.sleep(3)print('long_operation finished')async def main():await msg('first')# Now you want to start long_operation, but you don't want to wait it finised:# long_operation should be started, but second msg should be printed immediately.# Create task to do so:task = asyncio.ensure_future(long_operation())await msg('second')# Now, when you want, you can await task finised:await taskif __name__ == "__main__":loop = asyncio.get_event_loop()loop.run_until_complete(main())'''
输出:
first
long_operation started
second
long_operation finished
'''

创建任务:

  • 可以通过 loop.create_task(coroutine) 创建 task,
  • 也可以通过 asyncio.ensure_future(coroutine) 创建 task。

使用这两种方式的区别在 官网( 协程与任务 — Python 3.10.4 文档 )上有提及。

task / future 以及使用 async 创建的都是 awaitable 对象,都可以在 await 关键字之后使用。

future 对象意味着在未来返回结果,可以搭配回调函数使用

要真正运行一个协程,asyncio 提供了三种主要机制

( https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.ensure_future )

  • 1.  asyncio.run() 函数用来运行最高层级的入口点 "main()" 函数。
import asyncioasync def main():print('hello')await asyncio.sleep(1)print('world')asyncio.run(main())
  • 2. 使用 await ( 即 等待一个协程 )
import asyncio
import timeasync def say_after(delay, what):await asyncio.sleep(delay)print(what)async def main():print(f"started at {time.strftime('%X')}")await say_after(3, 'hello')await say_after(1, 'world')print(f"finished at {time.strftime('%X')}")asyncio.run(main())

预期的输出:

started at 17:13:52
world
hello
finished at 17:13:55
  • 3. 使用 asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程。
import asyncio
import timeasync def say_after(delay, what):await asyncio.sleep(delay)print(what)async def main():task1 = asyncio.create_task(say_after(3, 'hello'))task2 = asyncio.create_task(say_after(1, 'world'))print(f"started at {time.strftime('%X')}")# Wait until both tasks are completed (should take# around 2 seconds.)await task1await task2asyncio.run(main())

预期的输出:

started at 17:14:32
world
hello
finished at 17:14:34

获取协程的返回值

  • 1 创建一个任务 task
  • 2 通过调用 task.result 获取协程的返回值
import asyncio
import timeasync def get_html(url):print("start get url")await asyncio.sleep(2)return "this is test"if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()task = loop.create_task(get_html("http://httpbin.org"))loop.run_until_complete(task)print(task.result())

三、绑定回调

执行成功进行回调处理

可以通过  add_done_callback( 任务) 添加回调,因为这个函数只接受一个回调的函数名,不能传参,我们想要传参可以使用偏函数

# 获取协程的返回值
import asyncio
import time
from functools import partialasync def get_html(url):print("start get url")await asyncio.sleep(2)return "this is test"def callback(url, future):print(url)print("send email to bobby")if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()task = loop.create_task(get_html("http://www.imooc.com"))task.add_done_callback(partial(callback, "http://www.imooc.com"))loop.run_until_complete(task)print(task.result())

asnycio 异步请求+异步回调asnycio 异步请求+异步回调_xiaobai8823的博客-CSDN博客

当使用 ensure_feature 创建任务的时候,可以使用任务的 task.add_done_callback(callback)方法,获得对象的协程返回值。

import asyncio
import timeasync def do_some_work(x):print("waiting:", x)return "Done after {}s".format(x)def callback(future):print("callback:", future.result())start = time.time()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)

四、阻塞 使用 await 让出控制权,挂起当前操作 )

前面提到 asynic 函数内部可以使用 await 来针对耗时的操作进行挂起。

import asyncio
import timeasync def do_some_work(x):print("waiting:", x)# await 后面就是调用耗时的操作await asyncio.sleep(x)return "Done after {}s".format(x)coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

五、并发并行

  • 并发:同一时刻 同时 发生
  • 并行:同一时间 间隔 发生

并发通常是指有多个任务需要同时进行,并行则是同一个时刻有多个任务执行.
当有多个任务需要并行时,可以将任务先放置在任务队列中,然后将任务队列传给 asynicio.wait 方法,这个方法会同时并行运行队列中的任务。将其注册到事件循环中。

import asyncioasync def do_some_work(x):print("Waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)
]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

六、嵌套协程

使用 async 可以定义协程,协程用于耗时的 io 操作,我们也可以封装更多的 io 操作过程,这样就实现了嵌套的协程,即一个协程中 await 了另外一个协程,如此连接起来。

import asyncioasync def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]dones, pendings = await asyncio.wait(tasks)for task in dones:print("Task ret:", task.result())# results = await asyncio.gather(*tasks)# for result in results:#     print("Task ret:",result)loop = asyncio.get_event_loop()
loop.run_until_complete(main())

使用 asyncio.wait 的结果如下,可见返回的结果 dones 并不一定按照顺序输出

waiting: 1
waiting: 2
waiting: 4
Task ret: Done after 2s
Task ret: Done after 4s
Task ret: Done after 1s
Time: 4.006587505340576

使用 await asyncio.gather(*tasks) 得到的结果如下,是按照列表顺序进行返回的

waiting: 1
waiting: 2
waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004234313964844

上面的程序将 main 也定义为协程。我们也可以不在 main 协程函数里处理结果,直接返回 await 的内容,那么最外层的 run_until_complete 将会返回main协程的结果。

import asyncio
import timenow = lambda: time.time()async def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]return await asyncio.gather(*tasks)# return await asyncio.wait(tasks)也可以使用。注意gather方法需要*这个标记start = now()loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:print("Task ret:", result)print("Time:", now() - start)

也可以使用 as_complete 方法实现嵌套协程

import asyncio
import timenow = lambda: time.time()async def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]for task in asyncio.as_completed(tasks):result = await taskprint("Task ret: {}".format(result))start = now()loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now() - start)

七、协程停止

创建 future 的时候,task 为 pending,事件循环调用执行的时候当然就是 running,调用完毕自然就是 done,如果需要停止事件循环,就需要先把 task 取消。可以使用 asyncio.Task 获取事件循环的 task。
future 对象有如下几个状态:Pending、Running、Done、Cacelled

import asyncio
import timenow = lambda: time.time()async def do_some_work(x):print("Waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3),
]start = now()loop = asyncio.get_event_loop()
try:loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:print(asyncio.Task.all_tasks())for task in asyncio.Task.all_tasks():print(task.cancel())loop.stop()loop.run_forever()
finally:loop.close()print("Time:", now() - start)

启动事件循环之后,马上 ctrl+c,会触发 run_until_complete 的执行异常 KeyBorardInterrupt。然后通过循环 asyncio.Task 取消 future。可以看到输出如下:

Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>}
False
True
True
True
Time: 1.0707225799560547

True 表示 cannel 成功,loop stop 之后还需要再次开启事件循环,最后在 close,不然还会抛出异常.

循环 task,逐个 cancel 是一种方案,可是正如上面我们把 task 的列表封装在 main 函数中,main 函数外进行事件循环的调用。这个时候,main 相当于最外出的一个task,那么处理包装的main 函数即可。

task取消和子协程调用原理

程序运行时 通过 ctl +c 取消任务 调用task.cancel()取消任务

import asyncio
import timeasync def get_html(sleep_times):print("waiting")await asyncio.sleep(sleep_times)print("done after {}s".format(sleep_times))if __name__ == "__main__":task1 = get_html(2)task2 = get_html(3)task3 = get_html(3)tasks = [task1, task2, task3]loop = asyncio.get_event_loop()try:loop.run_until_complete(asyncio.wait(tasks))except KeyboardInterrupt as e:all_tasks = asyncio.Task.all_tasks()for task in all_tasks:print("cancel task")print(task.cancel())loop.stop()loop.run_forever()finally:loop.close()

在终端执行:python ceshi.py  ,运行成功后 按 ctl +c 取消任务  


 

不同线程的事件循环( 线程、线程池 )

很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被 block。

import asyncio
from threading import Thread
import timenow = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)

启动上述代码之后,当前线程不会被 block,新线程中会按照顺序执行 call_soon_threadsafe 方法注册的 more_work 方法, 后者因为 time.sleep 操作是同步阻塞的,因此运行完毕more_work 需要大致 6 + 3

使用 线程池

# -*- coding:utf-8 -*-
import asyncio
import time
from concurrent.futures import ThreadPoolExecutorthread_pool = ThreadPoolExecutor(5)
tasks = []func_now = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = func_now()
new_loop = asyncio.new_event_loop()thread_pool.submit(start_loop, new_loop)print('TIME: {}'.format(time.time() - start))new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)

主线程创建事件循环,子线程开启无限事件循环

import asyncio
import time
from threading import Threadnow = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def do_some_work(x):print('Waiting {}'.format(x))await asyncio.sleep(x)print('Done after {}s'.format(x))def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主线程中创建一个 new_loop,然后在另外的子线程中开启一个无限事件循环。 主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被 block。一共执行的时间大概在 6s 左右。

master - worker 主从模式

对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似 master-worker 的方式,master 主要用户获取队列的 msg,worker 用户处理消息。

为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用 redis 的队列。主线程中有一个是无限循环,用户消费队列。

import time
import asyncio
from threading import Thread
import redisdef get_redis():  # 返回一个 redis 连接对象connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)def start_loop(loop):  # 开启事件循环asyncio.set_event_loop(loop)loop.run_forever()async def worker(task):print('Start worker')while True:# start = now()# task = rcon.rpop("queue")   # 从 redis 中 取出的数据# if not task:#     await asyncio.sleep(1)#     continueprint('Wait ', int(task))  # 取出了相应的任务await asyncio.sleep(int(task))print('Done ', task, now() - start)now = lambda: time.time()
rcon = get_redis()
start = now()
# 创建一个事件循环
new_loop = asyncio.new_event_loop()
# 创建一个线程 在新的线程中开启事件循环
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True)  # 设置线程为守护模式
t.start()  # 开启线程try:while True:task = rcon.rpop("queue")  # 不断从队列中获取任务if not task:time.sleep(1)continue# 包装为 task ins, 传入子线程中的事件循环asyncio.run_coroutine_threadsafe(worker(task), new_loop)
except Exception as e:print('error', e)new_loop.stop()  # 出现异常 关闭时间循环
finally:pass

给队列添加一些数据:

127.0.0.1:6379[3]> lpush queue 2
(integer) 1
127.0.0.1:6379[3]> lpush queue 5
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
(integer) 1
127.0.0.1:6379[3]> lpush queue 1

可以看见输出:

Waiting  2
Done 2
Waiting  5
Waiting  1
Done 1
Waiting  1
Done 1
Done 5

我们发起了一个耗时5s的操作,然后又发起了连个1s的操作,可以看见子线程并发的执行了这几个任务,其中5s awati的时候,相继执行了1s的两个任务。

改进:

import time
import redis
import asyncio
from threading import Threadredis_queue_name = 'redis_list:test'def get_redis():  # 返回一个 redis 连接对象connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.StrictRedis(connection_pool=connection_pool)def add_data_to_redis_list(*args):redis_conn = get_redis()redis_conn.lpush(redis_queue_name, *args)def start_loop(loop=None):  # 开启事件循环asyncio.set_event_loop(loop)loop.run_forever()async def worker(task=None):print('Start worker')while True:# task = redis_conn.rpop("queue")   # 从 redis 中 取出的数据# if not task:#     await asyncio.sleep(1)#     continueprint('Wait ', int(task))  # 取出了相应的任务# 这里只是简单的睡眠传入的秒数await asyncio.sleep(int(task))def main():redis_conn = get_redis()# 创建一个事件循环new_loop = asyncio.new_event_loop()# 创建一个线程 在新的线程中开启事件循环t = Thread(target=start_loop, args=(new_loop,))t.setDaemon(True)  # 设置线程为守护模式t.start()  # 开启线程try:while True:task = redis_conn.rpop(name=redis_queue_name)  # 不断从队列中获取任务if not task:time.sleep(1)continue# 包装为 task , 传入子线程中的事件循环asyncio.run_coroutine_threadsafe(worker(task), new_loop)except Exception as e:print('error', e)new_loop.stop()  # 出现异常 关闭时间循环finally:new_loop.close()if __name__ == '__main__':# data_list = [1, 2, 3, 4, 5, 6, 7, 8, 9]# add_data_to_redis_list(*data_list)main()pass

redis队列模型( 生产者 --- 消费者 )

参考:Python中协程异步IO(asyncio)详解 - 知乎

下面代码的主线程和双向队列的主线程有些不同,只是换了一种写法而已,代码如下

生产者代码:

import redisconn_pool = redis.ConnectionPool(host='127.0.0.1')
redis_conn = redis.Redis(connection_pool=conn_pool)redis_conn.lpush('coro_test', '1')
redis_conn.lpush('coro_test', '2')
redis_conn.lpush('coro_test', '3')
redis_conn.lpush('coro_test', '4')

消费者代码:

import asyncio
from threading import Thread
import redisdef get_redis():conn_pool = redis.ConnectionPool(host='127.0.0.1')return redis.Redis(connection_pool=conn_pool)def start_thread_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def thread_example(name):print('正在执行name:', name)await asyncio.sleep(2)return '返回结果:' + nameredis_conn = get_redis()new_loop = asyncio.new_event_loop()
loop_thread = Thread(target=start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()# 循环接收redis消息并动态加入协程
while True:msg = redis_conn.rpop('coro_test')if msg:asyncio.run_coroutine_threadsafe(thread_example('Zarten' + bytes.decode(msg, 'utf-8')), new_loop)

改进:

import asyncio
from threading import Thread
import redisdef get_redis():conn_pool = redis.ConnectionPool(host='127.0.0.1')return redis.Redis(connection_pool=conn_pool)def start_thread_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def thread_example(name):print('正在执行name:', name)await asyncio.sleep(2)return '返回结果:' + nameif __name__ == '__main__':redis_queue_name = 'redis_list:test'redis_conn = get_redis()for num in range(1, 10):redis_conn.lpush(redis_queue_name, num)new_loop = asyncio.new_event_loop()loop_thread = Thread(target=start_thread_loop, args=(new_loop,))loop_thread.setDaemon(True)loop_thread.start()# 循环接收redis消息并动态加入协程while True:msg = redis_conn.rpop(name=redis_queue_name)if msg:asyncio.run_coroutine_threadsafe(thread_example('King_' + msg.decode('utf-8')), new_loop)pass

停止子线程

如果一切正常,那么上面的例子很完美。可是,需要停止程序,直接ctrl+c,会抛出KeyboardInterrupt错误,我们修改一下主循环:

try:while True:task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:print(e)new_loop.stop()

可是实际上并不好使,虽然主线程 try 了 KeyboardInterrupt异常,但是子线程并没有退出,为了解决这个问题,可以设置子线程为守护线程,这样当主线程结束的时候,子线程也随机退出。

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True)    # 设置子线程为守护线程
t.start()try:while True:# print('start rpop')task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:print(e)new_loop.stop()

线程停止程序的时候,主线程退出后,子线程也随机退出才了,并且停止了子线程的协程任务。

aiohttp

在消费队列的时候,我们使用 asyncio 的 sleep 用于模拟耗时的 io 操作。以前有一个短信服务,需要在协程中请求远程的短信 api,此时需要是需要使用 aiohttp 进行异步的 http 请求。大致代码如下:

server.py

import time
from flask import Flask, requestapp = Flask(__name__)@app.route('/<int:x>')
def index(x):time.sleep(x)return "{} It works".format(x)@app.route('/error')
def error():time.sleep(3)return "error!"if __name__ == '__main__':app.run(debug=True)

接口表示短信接口,/error 表示请求/失败之后的报警。

async-custoimer.py

import time
import asyncio
from threading import Thread
import redis
import aiohttpdef get_redis():connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)rcon = get_redis()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def fetch(url):async with aiohttp.ClientSession() as session:async with session.get(url) as resp:print(resp.status)return await resp.text()async def do_some_work(x):print('Waiting ', x)try:ret = await fetch(url='http://127.0.0.1:5000/{}'.format(x))print(ret)except Exception as e:try:print(await fetch(url='http://127.0.0.1:5000/error'))except Exception as e:print(e)else:print('Done {}'.format(x))new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True)
t.start()try:while True:task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except Exception as e:print('error')new_loop.stop()
finally:pass

有一个问题需要注意,我们在fetch的时候try了异常,如果没有try这个异常,即使发生了异常,子线程的事件循环也不会退出。主线程也不会退出,暂时没找到办法可以把子线程的异常raise传播到主线程。(如果谁找到了比较好的方式,希望可以带带我)。

对于 redis 的消费,还有一个 block 的方法:

try:while True:_, task = rcon.brpop("queue")asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except Exception as e:print('error', e)new_loop.stop()
finally:pass

使用 brpop方法,会 block 住 task,如果主线程有消息,才会消费。测试了一下,似乎 brpop 的方式更适合这种队列消费的模型。

127.0.0.1:6379[3]> lpush queue 5
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
(integer) 1
127.0.0.1:6379[3]> lpush queue 1

可以看到结果

Waiting  5
Waiting  1
Waiting  1
200
1 It works
Done 1
200
1 It works
Done 1
200
5 It works
Done 5

协程消费

主线程用于监听队列,然后子线程的做事件循环的worker是一种方式。还有一种方式实现这种类似master-worker的方案。即把监听队列的无限循环逻辑一道协程中。程序初始化就创建若干个协程,实现类似并行的效果。

import time
import asyncio
import redisnow = lambda : time.time()def get_redis():connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)rcon = get_redis()async def worker():print('Start worker')while True:start = now()task = rcon.rpop("queue")if not task:await asyncio.sleep(1)continueprint('Wait ', int(task))await asyncio.sleep(int(task))print('Done ', task, now() - start)def main():asyncio.ensure_future(worker())asyncio.ensure_future(worker())loop = asyncio.get_event_loop()try:loop.run_forever()except KeyboardInterrupt as e:print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())loop.stop()loop.run_forever()finally:loop.close()if __name__ == '__main__':main()

这样做就可以多多启动几个worker来监听队列。一样可以到达效果。

总结

上述简单的介绍了asyncio的用法,主要是理解事件循环,协程和任务,future的关系。异步编程不同于常见的同步编程,设计程序的执行流的时候,需要特别的注意。毕竟这和以往的编码经验有点不一样。可是仔细想想,我们平时处事的时候,大脑会自然而然的实现异步协程。比如等待煮茶的时候,可以多写几行代码。

相关代码文件的 Gist

参考:Threaded Asynchronous Magic and How to Wield It

示例:

参考:Python 中的并发处理之 asyncio 包使用的详解:https://www.jb51.net/article/137681.htm

import asyncio
import itertools
import sysasync def spin(msg):for char in itertools.cycle('|/-\\'):status = char + ' ' + msgprint(status)try:# 使用 await asyncio.sleep(.1) 代替 time.sleep(.1),这样的休眠不会阻塞事件循环。await asyncio.sleep(.1)except asyncio.CancelledError:# 如果 spin 函数苏醒后抛出 asyncio.CancelledError 异常,其原因是发出了取消请求,因此退出循环。breakasync def slow_function():# 假装等待I/O一段时间# await asyncio.sleep(3) 表达式把控制权交给主循环,在休眠结束后恢复这个协程。await asyncio.sleep(3)return 42async def supervisor():# asyncio.ensure_future(...) 函数排定 spin 协程的运行时间,使用一个 Task 对象包装 spin 协程,并立即返回。spinner = asyncio.ensure_future(spin('thinking!'))print('spinner object:', spinner)t_result = await slow_function()  # 驱动 slow_function() 函数。结束后,获取返回值。# 同时,事件循环继续运行,因为slow_function 函数最后使用 await asyncio.sleep(3) 表达式把控制权交回给了主循环。# Task 对象可以取消;取消后会在协程当前暂停的 yield 处抛出 asyncio.CancelledError 异常。# 协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消。spinner.cancel()return t_resultif __name__ == '__main__':loop = asyncio.get_event_loop()  # 获取事件循环的引用# 驱动 supervisor 协程,让它运行完毕;这个协程的返回值是这次调用的返回值。result = loop.run_until_complete(supervisor())loop.close()print('Answer:', result)

二、避免阻塞型调用

1、有两种方法能避免阻塞型调用中止整个应用程序的进程:

  1. 在单独的线程中运行各个阻塞型操作。
  2. 把每个阻塞型操作转换成非阻塞的异步调用。

使用多线程处理大量连接时将耗费过多的内存,故此通常使用回调来实现异步调用。

2、使用Executor对象防止阻塞事件循环:

使用 loop.run_in_executor 把阻塞的作业(例如保存文件)委托给线程池做。

示例:

import asyncio
import timeasync def shop(delay, what):print(what)await asyncio.sleep(delay)print(what,"...出来了")async def main():task1 = asyncio.create_task(shop(8, '女朋友看衣服..'))task2 = asyncio.create_task(shop(5, '体验手机..'))print(time.ctime(), "开始逛街")await task1await task2print(time.ctime(), "结束.")asyncio.run(main())

Python --- aiohttp 的使用

参考: https://www.cnblogs.com/ssyfj/p/9222342.html

1. aiohttp 的简单使用(配合asyncio模块)

import asyncio
import aiohttpasync def fetch_async(url):print(url)async with aiohttp.request("GET", url) as r:# 或者直接 await r.read()不编码,直接读取,适合于图像等无法编码文件resp = await r.text(encoding="utf-8")print(resp)print('*' * 50)tasks = [fetch_async('https://www.baidu.com/'),fetch_async('https://www.chouti.com/')
]if __name__ == '__main__':event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

2. 发起一个 session 请求

import asyncio
import aiohttpasync def fetch_async(url):print(url)async with aiohttp.ClientSession() as session:  # 协程嵌套,只需要处理最外层协程即可fetch_asyncasync with session.get(url) as resp:print(resp.status)# 因为这里使用到了 await 关键字,实现异步,所有他上面的函数体需要声明为异步asyncprint(await resp.text())print('*' * 50)if __name__ == '__main__':tasks = [fetch_async('https://www.baidu.com/'),fetch_async('https://www.cnblogs.com/ssyfj/')]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

除了上面的 get 方法外,会话还支持 post,put,delete .... 等

session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

不要为每次的连接都创建一次session,一般情况下只需要创建一个session,然后使用这个session执行所有的请求。

每个session对象,内部包含了一个连接池,并且将会保持连接和连接复用(默认开启)可以加快整体的性能。

3. 在 url 中传递参数(其实与 requests 模块使用大致相同)

只需要将参数字典,传入 params 参数中即可

import asyncio
import aiohttp
from lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)# print(await r.read())ss = etree.HTML(text=await r.text())movie_name = ss.xpath('//ol[@class="grid_view"]//div[@class="hd"]//a//span[1]//text()')print(f'len: {len(list(map(lambda i=None: print(i), movie_name)))}')if __name__ == '__main__':# https://movie.douban.com/top250?start=50tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

4. 获取响应内容(由于获取响应内容是一个阻塞耗时过程,所以我们使用await实现协程切换)

(1)使用 text() 方法

import asyncio
import aiohttp
from lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(r.charset)       # 查看默认编码为 utf-8print(await r.text())  # 不编码使用默认编码,也可以使用 encoding 指定编码if __name__ == '__main__':# https://movie.douban.com/top250?start=50tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

(2)使用 read() 方法,不进行编码,为字节形式

import asyncio
import aiohttp
from lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(await r.read())if __name__ == '__main__':# https://movie.douban.com/top250?start=50tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

(3)注意:text()、read() 方法是把整个响应体读入内存,如果你是获取大量的数据,请考虑使用 "字节流"(StreamResponse)

5. 特殊响应内容 json(和上面一样)

import asyncio
import aiohttpasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(r.charset)# print(await r.json())  # 可以设置编码,设置处理函数print(await r.read())if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

6. 字节流形式获取数据(不像 text、read 一次获取所有数据)

注意:我们获取的 session.get() 是 Response 对象,他继承于 StreamResponse

import asyncio
import aiohttpasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(await r.content.read(10))  # 读取前10字节if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

下面字节流形式读取数据,保存文件

import asyncio
import aiohttpasync def func1(url, params, filename):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:with open(filename, "wb") as fp:while True:chunk = await r.content.read(10)if not chunk:breakfp.write(chunk)if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}, "1.html"), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

注意:

async with session.get(url,params=params) as r:  # 异步上下文管理器with open(filename,"wb") as fp:                  # 普通上下文管理器

两者的区别:在于异步上下文管理器中定义了 __aenter__和__aexit__方法

异步上下文管理器指的是在enterexit方法处能够暂停执行的上下文管理器

 为了实现这样的功能,需要加入两个新的方法:__aenter__ 和__aexit__。这两个方法都要返回一个 awaitable类型的值。

推文:异步上下文管理器:https://blog.csdn.net/tinyzhao/article/details/52684473

7. 自定义请求头(和 requests 一样)

import asyncio
import aiohttpasync def func1(url, params, filename):async with aiohttp.ClientSession() as session:headers = {'Content-Type': 'text/html; charset=utf-8'}async with session.get(url, params=params, headers=headers) as r:with open(filename, "wb") as fp:while True:chunk = await r.content.read(10)if not chunk:breakfp.write(chunk)if __name__ == '__main__':# https://movie.douban.com/top250?start=100tasks = [func1('https://movie.douban.com/top250', {"start": 100}, "1.html"), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

8. 自定义 cookie

注意:对于自定义cookie,我们需要设置在 ClientSession(cookies=自定义cookie字典),而不是session.get()中

class ClientSession:def __init__(self, *, connector=None, loop=None, cookies=None,headers=None, skip_auto_headers=None,auth=None, json_serialize=json.dumps,request_class=ClientRequest, response_class=ClientResponse,ws_response_class=ClientWebSocketResponse,version=http.HttpVersion11,cookie_jar=None, connector_owner=True, raise_for_status=False,read_timeout=sentinel, conn_timeout=None,timeout=sentinel,auto_decompress=True, trust_env=False,trace_configs=None):

使用:

cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:

9. 获取当前访问网站的 cookie

async with session.get(url) as resp:print(resp.cookies)

10. 获取网站的响应状态码

async with session.get(url) as resp:print(resp.status)

11. 查看响应头

resp.headers         # 来查看响应头,得到的值类型是一个dict:
resp.raw_headers   # 查看原生的响应头,字节类型

12. 查看重定向的响应头(我们此时已经到了新的网址,向之前的网址查看)

resp.history  # 查看被重定向之前的响应头

13. 超时处理

默认的IO操作都有5分钟的响应时间 我们可以通过 timeout 进行重写:

async with session.get('https://github.com', timeout=60) as r:...

如果 timeout=None 或者 timeout=0 将不进行超时检查,也就是不限时长。

14. ClientSession 用于在多个连接之间(同一网站)共享cookie,请求头等

import asyncio
import aiohttpasync def func1():cookies = {'my_cookie': "my_value"}async with aiohttp.ClientSession(cookies=cookies) as session:async with session.get("https://segmentfault.com/q/1010000007987098") as r:print(session.cookie_jar.filter_cookies("https://segmentfault.com"))async with session.get("https://segmentfault.com/hottest") as rp:print(session.cookie_jar.filter_cookies("https://segmentfault.com"))if __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()
Set-Cookie: PHPSESSID=web2~d8grl63pegika2202s8184ct2q
Set-Cookie: my_cookie=my_value
Set-Cookie: PHPSESSID=web2~d8grl63pegika2202s8184ct2q
Set-Cookie: my_cookie=my_value

我们最好使用session.cookie_jar.filter_cookies()获取网站cookie,不同于requests模块,虽然我们可以使用rp.cookies有可能获取到cookie,但似乎并未获取到所有的cookies。

import asyncio
import aiohttpasync def func1():cookies = {'my_cookie': "my_value"}async with aiohttp.ClientSession(cookies=cookies) as session:async with session.get("https://segmentfault.com/q/1010000007987098") as rp_1:print('1' * 50)print(session.cookie_jar.filter_cookies("https://segmentfault.com"))# 首次访问会获取网站设置的 cookie# Set-Cookie: PHPSESSID=web2~jh3ouqoabvr4e72f87vtherkp6; Domain=segmentfault.com; Path=/ print('2' * 50)print(f'rp_1.cookies:{rp_1.cookies}')async with session.get("https://segmentfault.com/hottest") as rp_2:print('3' * 50)print(session.cookie_jar.filter_cookies("https://segmentfault.com"))print('4' * 50)print(f'rp_2.cookies:{rp_2.cookies}')  # 为空,服务端未设置 cookieasync with session.get("https://segmentfault.com/newest") as rp_3:print('5' * 50)print(session.cookie_jar.filter_cookies("https://segmentfault.com"))print('6' * 50)print(f'rp_3.cookies:{rp_3.cookies}')  # 为空,服务端未设置cookieif __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

运行结果:

11111111111111111111111111111111111111111111111111
Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9
Set-Cookie: my_cookie=my_value
22222222222222222222222222222222222222222222222222
rp_1.cookies:Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9; Domain=segmentfault.com; Path=/
33333333333333333333333333333333333333333333333333
Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9
Set-Cookie: my_cookie=my_value
44444444444444444444444444444444444444444444444444
rp_2.cookies:
55555555555555555555555555555555555555555555555555
Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9
Set-Cookie: my_cookie=my_value
66666666666666666666666666666666666666666666666666
rp_3.cookies:

总结:

当我们使用 rp.cookie 时,只会获取到当前 url 下设置的 cookie,不会维护整站的cookie而session.cookie_jar.filter_cookies("https://segmentfault.com")会一直保留这个网站的所有设置cookies,含有我们在会话时设置的cookie,并且会根据响应修改更新cookie。这个才是我们需要的而我们设置cookie,也是需要在aiohttp.ClientSession(cookies=cookies)中设置

ClientSession 还支持 请求头,keep-alive连接和连接池(connection pooling)

15. cookie的安全性

默认 ClientSession 使用的是严格模式的 aiohttp.CookieJar. RFC 2109,明确的禁止接受 url 和 ip 地址产生的 cookie,只能接受 DNS 解析 IP 产生的 cookie。可以通过设置 aiohttp.CookieJar 的 unsafe=True 来配置:

jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)

16. 控制同时连接的数量(连接池)

TCPConnector 维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求

import asyncio
import aiohttpasync def func1():cookies = {'my_cookie': "my_value"}conn = aiohttp.TCPConnector(limit=2)  # 默认100,0表示无限async with aiohttp.ClientSession(cookies=cookies, connector=conn) as session:for i in range(7, 35):url = "https://www.ckook.com/list-%s-1.html" % iasync with session.get(url) as rp:print('---------------------------------')print(rp.status)if __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

限制同时打开连接到同一端点的数量( (host, port, is_ssl) 三的倍数),可以通过设置 limit_per_host 参数:

limit_per_host: 同一端点的最大连接数量。同一端点即(host, port, is_ssl)完全相同

conn = aiohttp.TCPConnector(limit_per_host=30)#默认是0

在协程下测试效果不明显

17. 自定义域名解析地址

我们可以指定域名服务器的 IP 对我们提供的get或post的url进行解析:

from aiohttp.resolver import AsyncResolverresolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
conn = aiohttp.TCPConnector(resolver=resolver)

18. 设置代理

aiohttp支持使用代理来访问网页:

async with aiohttp.ClientSession() as session:async with session.get("http://python.org", proxy="http://some.proxy.com") as resp:print(resp.status)

当然也支持需要授权的页面:

async with aiohttp.ClientSession() as session:proxy_auth = aiohttp.BasicAuth('user', 'pass')  # 用户,密码async with session.get("http://python.org", proxy="http://some.proxy.com", proxy_auth=proxy_auth) as resp:print(resp.status)

或者通过这种方式来验证授权:

session.get("http://python.org", proxy="http://user:pass@some.proxy.com")

19. post传递数据的方法

(1)模拟表单

payload = {'key1': 'value1', 'key2': 'value2'}
async with session.post('http://httpbin.org/post', data=payload) as resp:print(await resp.text())

注意:data=dict 的方式 post 的数据将被转码,和 form 提交数据是一样的作用,如果你不想被转码,可以直接以字符串的形式 data=str 提交,这样就不会被转码。

(2)post json

payload = {'some': 'data'}async with session.post(url, data=json.dumps(payload)) as resp:

其实json.dumps(payload)返回的也是一个字符串,只不过这个字符串可以被识别为json格式

(3)post 小文件

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}await session.post(url, data=files)
url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file', open('report.xls', 'rb'), filename='report.xls', content_type='application/vnd.ms-excel')await session.post(url, data=data)

如果将文件对象设置为数据参数,aiohttp将自动以字节流的形式发送给服务器。

(4)post 大文件

aiohttp支持多种类型的文件以流媒体的形式上传,所以我们可以在文件未读入内存的情况下发送大文件。

@aiohttp.streamer
def file_sender(writer, file_name=None):with open(file_name, 'rb') as f:chunk = f.read(2 ** 16)while chunk:yield from writer.write(chunk)chunk = f.read(2 ** 16)# Then you can use `file_sender` as a data provider:async with session.post('http://httpbin.org/post', data=file_sender(file_name='huge_file')) as resp:print(await resp.text())

(5)从一个url获取文件后,直接post给另一个url

r = await session.get('http://python.org')
await session.post('http://httpbin.org/post',data=r.content)

(6)post 预压缩数据

在通过aiohttp发送前就已经压缩的数据, 调用压缩函数的函数名(通常是deflate 或 zlib)作为content-encoding的值:

import zlibasync def my_coroutine(session, headers, my_data):data = zlib.compress(my_data)headers = {'Content-Encoding': 'deflate'}async with session.post('http://httpbin.org/post', data=data, headers=headers):pass

Python 协程爬虫  ---  aiohttp + aiomultiprocess 使用

aiohttp 是基于 asyncio 的一个异步http客户端和服务器

官方文档:https://aiohttp.readthedocs.io/en/stable/client_quickstart.html

aiomultiprocess :https://github.com/omnilib/aiomultiprocess

简单实用例子

async def funct(index):   print("start ", index)   async with aiohttp.ClientSession() as session:       async with session.get("https://movie.douban.com/top250?start=0", timeout=5) as resp:           print(resp.status)print(await resp.text())   print("end ", index)

aiohttp.ClientSession()

创建会话,session 提供了各种请求方法,如 get、post、delete、put 等。认识新的关键字 async with,因为是协程的上下文管理,所以多了async关键字。这个不是强制使用的,你也可以自己手动关闭会话,但是一定要记得关闭

注意:

  • 1、不要为每个请求创建会话。每个应用程序很可能需要一个会话来执行所有请求。
  • 2、aiohttp 在发送请求之前在内部执行 URL 规范化。要禁用规范化,请使用 encoded=True 参数进行URL构建

获取响应信息

resp.status                   # 状态码
await resp.text()             # 获取响应正文,可以指定编码
await resp.read()             # 读取二进制响应内容
await resp.json()             # 获取json响应内容
await resp.content.read(size) # 读取流

注意事项:aiohttp 是在 await resp.text() 之后才发起请求的,所以必须调用之后才能获取响应的内容,不然会出现异常 aiohttp.client_exceptions.ClientConnectionError: Connection closed

aiomultiprocess

asyncio 和多处理本身是有用的,但有局限性:asyncio 仍然不能超过 GIL 的速度,并且多处理一次只能处理一个任务。但是,他们在一起可以充分实现自己的真正潜力。

aiomultiprocess 提供了一个简单的界面,同时在每个子进程上运行完整的 asyncio 事件循环,从而实现了 Python 应用程序从未有过的并发级别。每个子进程可以一次执行多个协程,仅受工作量和可用内核数限制。

注:aiomultiprocess 需要 Python 3.6 或更高版本

import asyncio
from aiohttp import request
from aiomultiprocess import Poolasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():urls = ["https://jreese.sh", ...]async with Pool() as pool:async for result in pool.map(get, urls):...  # process resultif __name__ == '__main__':# Python 3.7asyncio.run(main())# Python 3.6# loop = asyncio.get_event_loop()# loop.run_until_complete(main())

用法:在子进程中执行协程

import asyncio
from aiohttp import request
from aiomultiprocess import Processasync def put(url, params):async with request("PUT", url, params=params) as response:passasync def main():p = Process(target=put, args=("https://jreese.sh", {}))await pif __name__ == "__main__":asyncio.run(main())

如果您想从协程中获取结果 Worker,请使用以下方法:

import asyncio
from aiohttp import request
from aiomultiprocess import Workerasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():p = Worker(target=get, args=("https://jreese.sh",))response = await pif __name__ == "__main__":asyncio.run(main())

如果您需要一个托管的工作进程池,请使用 Pool

import asyncio
from aiohttp import request
from aiomultiprocess import Poolasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():urls = ["https://jreese.sh", ...]async with Pool() as pool:result = await pool.map(get, urls)if __name__ == "__main__":asyncio.run(main())

示例:

import time
import json
import datetime
import asyncio
import hashlib
from aiomultiprocess import Pool
from redis import *
from pybloom_live import BloomFilter
import aiohttp#
# Public variable
#Bloom_data = BloomFilter(1000000000, 0.01)
DB_get_question = StrictRedis(host='62.234.9.254', port=6480, password='lingmiao2015', db=4
)
pipeline_redis = DB_get_question.pipeline()#
# Public functions
#def md5(data):"""对数据进行MD5加密:param data::return:"""md5_qa = hashlib.md5(data.encode('utf8')).hexdigest()md5_qa = bytes(md5_qa, encoding='utf8')return md5_qaasync def get(data):"""协程函数:param url::return:"""# while True:# print('data:',data)# try:url = ''async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:get_proxy = DB_get_question.spop('IP_PROXY')response = await session.post(url, json=data, timeout=7, proxy={"http": "http://{}".format(get_proxy)})result = await response.text()hjson = json.loads(result)content = hjson['results'][0]['values']['text']# print('data:',data)print('\033[32;1mget_question\033[0m:', content)await asyncio.sleep(0.1)return content# except:#     open('error_url.txt','a').write(url + '\n')#     await get(data)async def request():"""使用进程加异步协程发送请求:return:"""key_number = 0datas = ['']split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '').replace('b', '')while len(datas) != 0:key_number += 1if len(datas) > 1:async with Pool() as pool:get_proxy = DB_get_question.spop('IP_PROXY')result_list = await pool.map(get, datas)# print(result_list)for result in result_list:if result:# print('key',key)# print('phone', phone)if '请求次数' in result or 'key不对' in result or '请求内容为空' in result:split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '')breakmd5_qa = md5(result)if md5_qa not in Bloom_data:Bloom_data.add(md5_qa)#     pipeline_redis.lpush('total_question_list', result)pipeline_redis.sadd('get_question', result)pipeline_redis.execute()datas.clear()question_number = 0while True:question_number += 1pipeline_redis.spop('original_question_set')if question_number == 100:question_list = pipeline_redis.execute()breakdatas = {}print('datas', datas)print(datas)if key_number == 500:split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '')key2_set_number = DB_get_question.scard('key2_set')if key2_set_number < 5:with open('key2_total.txt', 'r') as f_key:for key in f_key:key = key.strip()pipeline_redis.sadd('key2_set', key)pipeline_redis.execute()key_number = 0coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)

基于 asyncio、aiohttp、xpath 的异步爬虫

参看:https://blog.csdn.net/weixin_34290390/article/details/88772610

# asyncio爬虫, 去重, 入库import asyncio
import re
import aiohttp
import aiomysql
from pyquery import PyQuerystopping = Falsestart_url = 'http://www.jobbole.com'
waiting_urls = []
seen_urls = set()  # 实际使用爬虫去重时,数量过多,需要使用布隆过滤器async def fetch(url, session):async with aiohttp.ClientSession() as session:try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:data = await resp.text()return dataexcept Exception as e:print(e)def extract_urls(html):  # html中提取所有urlurls = []pq = PyQuery(html)for link in pq.items('a'):url = link.attr('href')if url and url.startwith('http') and url not in seen_urls:urls.append(url)waiting_urls.append(urls)return urlsasync def init_urls(url, session):html = await fetch(url, session)seen_urls.add(url)extract_urls(html)async def article_handler(url, session, pool):  # 获取文章详情并解析入库html = await fetch(url, session)extract_urls(html)pq = PyQuery(html)title = pq('title').text()  # 为了简单, 只获取title的内容async with pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute('SELECT 42;')insert_sql = "insert into article_test(title) values('{}')".format(title)await cur.execute(insert_sql)  # 插入数据库# print(cur.description)# (r,) = await cur.fetchone()# assert r == 42async def consumer(pool):async with aiohttp.ClientSession() as session:while not stopping:if len(waiting_urls) == 0:  # 如果使用asyncio.Queue的话, 不需要我们来处理这些逻辑。await asyncio.sleep(0.5)continueurl = waiting_urls.pop()print('start get url:{}'.format(url))if re.match(r'http://.*?jobbole.com/\d+/', url):if url not in seen_urls:  # 是没有处理过的url,则处理asyncio.ensure_future(article_handler(url, sssion, pool))else:if url not in seen_urls:asyncio.ensure_future(init_urls(url))async def main(loop):# 等待mysql连接建立好pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='',db='aiomysql_test', loop=loop, charset='utf8', autocommit=True)# charset  autocommit必须设置, 这是坑, 不写数据库写入不了中文数据async with aiohttp.ClientSession() as session:html = await fetch(start_url, session)seen_urls.add(start_url)extract_urls(html)asyncio.ensure_future(consumer(pool))if __name__ == '__main__':event_loop = asyncio.get_event_loop()asyncio.ensure_future(main(event_loop))event_loop.run_forever()

改    进( 生产者 --- 消费者 从 redis 取数据 )

config.py

import os
import syssys.path.append(os.getcwd())
sys.path.append("..")
sys.path.append(os.path.abspath("../../"))DEV_OR_PRD = 'dev'
REDIS_CONFIG = Noneif 'dev' == DEV_OR_PRD.lower():REDIS_CONFIG = {'host': '127.0.0.1','port': 6379,'db': 0,'password': None}pass
elif 'prd' == DEV_OR_PRD.lower():REDIS_CONFIG = {'host': '127.0.0.1','port': 6379,}pass

produce.py

import redis
import json
import requests
from config import REDIS_CONFIGdef add_task():payload = {}headers = {'Host': 'www.kuwo.cn','csrf': 'E7D1IGX45D','Cookie': 'kw_token=E7D1IGX45D; kw_token=AKH1VOZ2767'}queue_name = 'redis_list:test'redis_conn = redis.StrictRedis(**REDIS_CONFIG)for _ in range(100):for page_num in range(1, 11):url = f"http://www.kuwo.cn/api/www/bang/bang/musicList?bangId=16&pn={page_num}&rn=30"response = requests.request("GET", url, headers=headers, data=payload)try:music_list = response.json()['data']['musicList']except BaseException as e:music_list = []for item in music_list:task = {'crawl_url': item['pic'],'song_name': item['name']}redis_conn.lpush(queue_name, json.dumps(task, ensure_ascii=False))print(f'task:{task}')print(f'page {page_num} end')if __name__ == '__main__':add_task()pass

consumer.py

import re
import asyncio
import aiohttp
import redis
import json
import datetime
from config import REDIS_CONFIGmax_worker = 50
current_worker = 0def write_file(future=None):resp_data = future.result()if not resp_data:returntime_int = int(datetime.datetime.now().timestamp() * 1000)with open(f'./img/{time_int}.jpg', 'wb') as f:f.write(resp_data)async def fetch(url, session):global current_workercurrent_worker += 1try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:# data = await resp.text()data = await resp.read()current_worker -= 1return dataexcept Exception as e:print(e)return Noneasync def consumer(redis_conn=None, queue_name=None):async with aiohttp.ClientSession() as session:while True:task_string = redis_conn.rpop(queue_name)cha = current_worker - max_workerif cha >= 0 or not task_string:print('超过最大worker, 或者任务为空, 睡1秒继续。。。')await asyncio.sleep(1)continuetask_string = task_string.decode('utf-8')task_dict = json.loads(task_string)crawl_url = task_dict['crawl_url']asyncio_task = asyncio.ensure_future(fetch(crawl_url, session))asyncio_task.add_done_callback(write_file)# await asyncio.sleep(0.001)def main():queue_name = 'redis_list:test'redis_conn_pool = redis.ConnectionPool(**REDIS_CONFIG)redis_conn = redis.StrictRedis(connection_pool=redis_conn_pool)event_loop = asyncio.get_event_loop()try:event_loop.run_until_complete(consumer(redis_conn=redis_conn, queue_name=queue_name))except KeyboardInterrupt as e:event_loop.stop()event_loop.run_forever()finally:event_loop.close()redis_conn.close()if __name__ == '__main__':main()

2. asyncio + aiohttp

import aiohttp
import asyncioasync def fetch_async(url):print(url)async with aiohttp.request("GET", url) as r:response = await r.text(encoding="utf-8")# 或者直接await r.read()不编码,直接读取,适合于图像等无法编码文件# data = await r.read()# print(url, data)print(url, r.status)print(url, response)if __name__ == '__main__':tasks = [fetch_async('https://www.baidu.com/'),fetch_async('https://www.chouti.com/')]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

示例:

参考:深入理解协程(四):async/await异步爬虫实战:https://www.cnblogs.com/ghostlee/p/12208564.html

import asyncio
import time
import aiohttp
from lxml import etreeurls = ['https://blog.csdn.net/Jmilk/article/details/103218919','https://blog.csdn.net/stven_king/article/details/103256724','https://blog.csdn.net/csdnnews/article/details/103154693','https://blog.csdn.net/dg_lee/article/details/103951021','https://blog.csdn.net/m0_37907797/article/details/103272967','https://blog.csdn.net/zzq900503/article/details/49618605','https://blog.csdn.net/weixin_44339238/article/details/103977138','https://blog.csdn.net/dengjin20104042056/article/details/103930275','https://blog.csdn.net/Mind_programmonkey/article/details/103940511','https://blog.csdn.net/xufive/article/details/102993570','https://blog.csdn.net/weixin_41010294/article/details/104009722','https://blog.csdn.net/yunqiinsight/article/details/103137022','https://blog.csdn.net/qq_44210563/article/details/102826406',
]async def async_get_url(url):headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ''(KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36'}async with aiohttp.ClientSession() as session:  # 解释1async with session.get(url, headers=headers) as r:html = await r.read()try:title = etree.HTML(html).xpath('//h1[@class="title-article"]/text()')[0]print(title)except IndexError:print(f'Fail URL: {r.url}')def async_main():loop = asyncio.get_event_loop()tasks = [async_get_url(url) for url in urls]loop.run_until_complete(asyncio.wait(tasks))# loop.close()if __name__ == '__main__':start = time.time()async_main()print(f'cost time: {time.time() - start}s')

运行结果:

Fail URL: https://blog.csdn.net/weixin_44339238/article/details/103977138
网页实现一个简单的音乐播放器(大佬别看。(⊙﹏⊙))
AES中ECB模式的加密与解密(Python3.7)
【程序人生】程序员接私活常用平台汇总
OOM别慌,手把手教你定位
致 Python 初学者
【图解算法面试】记一次面试:说说游戏中的敏感词过滤是如何实现的?
8年经验面试官详解 Java 面试秘诀
4G EPS 的网络协议栈
【历史总结】Android-Universal-Image-Loader源码分析
你不得不了解的卷积神经网络发展史
java进阶(四)------java编程规范---代码质量检测工具FindBugs、PMD和CheckStyle的安装
中国数据库OceanBase登顶之路
cost time: 0.5409884452819824s

解释1:此处为异步的上下文管理器,是aiohttp官方文档提供的写法。如果对上下文管理器不是很了解的话,可以参看【吃透Python上下文管理器】。

用时:0.5409884452819824s。从两种爬虫的输出结果中可以看到:

  • 文章标题的顺序不同。同步爬虫会按照urls内部的url顺序依次爬取文章标题。而异步爬虫爬取的顺序并不完全和urls中的url顺序相同。
  • 爬取速度差异很大。异步爬虫速度大概是普通同步爬虫的8~10倍。异步爬虫充分利用了网络请求这段时间。从而提高了爬取效率。

3. asyncio + requests 

import asyncio
import requestsasync def fetch_async(func, *args, **kwargs):inner_loop = asyncio.get_event_loop()future = inner_loop.run_in_executor(None, func, *args)response = await futureprint(response.url, response.content)if __name__ == '__main__':tasks = [fetch_async(requests.get, 'https://www.cnblogs.com/wupeiqi/'),fetch_async(requests.get, 'https://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')]loop = asyncio.get_event_loop()results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()

示例:

import functools  # at the top with the other imports
import asyncio
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)async def fetch_async(lp, func, *args, **kwargs):# inner_loop = asyncio.get_event_loop()# future = inner_loop.run_in_executor(None, func, *args)print(f'*args: {args}')future = lp.run_in_executor(None, functools.partial(func, *args, **kwargs))response = await futureprint(response.url, response.content)if __name__ == '__main__':loop = asyncio.get_event_loop()tasks = [fetch_async(loop, requests.get, 'https://www.cnblogs.com/wupeiqi/', verify=False),fetch_async(loop, requests.get, 'https://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')]results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()

示例:

task 协程Future 桥梁

import requests
import asyncio
from concurrent import futures'''
使用多线程:在协程中集成阻塞io原型 :awaitable loop.run_in_executor(executor, func, *args) 
参数 : executor 可以是  ThreadPoolExecutor / ProcessPool  , 如果是None 则使用默认线程池
可使用 yield from 或 await 挂起函数
作用 : 例如在异步事件循环中的写文件操作(或者其他IO操作), 这种慢的操作交给线程去做
'''def get_url(r_url=None):r = requests.get(url=r_url, verify=False)print(r.json())if __name__ == "__main__":loop = asyncio.get_event_loop()executor = futures.ThreadPoolExecutor(3)tasks = []for index in range(20):url = f"http://shop.projectsedu.com/goods/{index}/"task = loop.run_in_executor(executor, get_url, url)tasks.append(task)loop.run_until_complete(asyncio.wait(tasks))

示例:

import asyncio
from concurrent import futuresdef block_func():with open("c:/test.txt", 'rb') as fd:return fd.read(500)async def todo(lp: asyncio.AbstractEventLoop):reader = await lp.run_in_executor(None, block_func)  # 默认线程池print("reader:", reader)with futures.ThreadPoolExecutor() as ex:reader = await lp.run_in_executor(ex, block_func)  # 自己创建一个线程池让事件循环调用print("reader :", reader)loop = asyncio.get_event_loop()
loop.run_until_complete(todo(loop))

4. gevent + requests

from gevent import monkey
monkey.patch_all()import gevent
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)def fetch_async(method, url, **req_kwargs):print(method, url, req_kwargs)response = requests.request(method=method, url=url, **req_kwargs)# print(f'{response.url}, {response.content}')print(f'{response.url}, {response.status_code}')def test_1():# ##### 发送请求 #####gevent.joinall([gevent.spawn(fetch_async, method='get', url='https://www.python.org/', verify=False),gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', verify=False),gevent.spawn(fetch_async, method='get', url='https://github.com/', verify=False),])def test_2():# #### 发送请求(协程池控制最大协程数量) #####from gevent.pool import Poolpool = Pool(None)gevent.joinall([pool.spawn(fetch_async, method='get', url='https://www.python.org/', verify=False),pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', verify=False),pool.spawn(fetch_async, method='get', url='https://www.github.com/', verify=False),])if __name__ == '__main__':# test_1()test_2()

5. grequests

import grequestsrequest_list = [grequests.get('http://httpbin.org/delay/1', timeout=0.001),grequests.get('http://fakedomain/'),grequests.get('http://httpbin.org/status/500')
]# ##### 执行并获取响应列表 #####
# response_list = grequests.map(request_list)
# print(response_list)# ##### 执行并获取响应列表(处理异常) #####
def exception_handler(request, exception):print(request, exception)print("Request failed")response_list = grequests.map(request_list, exception_handler=exception_handler)
print(response_list)

6. Twisted 示例

twisted学习笔记No.3 Web Clients:https://www.cnblogs.com/tracylining/p/3353808.html

from twisted.web.client import Agent
from twisted.web.client import defer
from twisted.internet import reactordef all_done(arg):reactor.stop()def callback(contents):print(contents)deferred_list = []url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:agent = Agent(reactor)d = agent.request(b'GET', url.encode('utf-8'))# d.addCallbacks(printResource, printError)d.addCallback(callback)deferred_list.append(d)d_list = defer.DeferredList(deferred_list)
d_list.addBoth(all_done)
reactor.run()

7. Tornado

from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop
from tornado.httpclient import HTTPResponsedef handle_response(response: HTTPResponse):"""处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop():param response::return:"""if response.error:print("Error:", response.error)else:# print(response.body)print(f'{response.effective_url} : status_code : {response.code}')def func():url_list = ['https://www.baidu.com','https://www.bing.com',]for url in url_list:print(url)http_client = AsyncHTTPClient()http_client.fetch(HTTPRequest(url), handle_response)ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()

8. Twisted 更多

from twisted.internet import reactor
from twisted.web.client import getPage
from urllib import parsedef one_done(arg: bytes):print(arg.decode('utf-8'))reactor.stop()post_data = parse.urlencode({'check_data': 'adf'})
post_data = bytes(post_data, encoding='utf8')
headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),method=bytes('POST', encoding='utf8'), postdata=post_data, cookies={}, headers=headers
)
response.addBoth(one_done)
reactor.run()

以上均是 Python 内置以及第三方模块提供异步IO请求模块,使用简便大大提高效率,而对于异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】.

9.史上最牛逼的异步IO模块( select、poll、epoll )

select

import select
import socket
import timeclass AsyncTimeoutException(TimeoutError):"""请求超时异常类"""def __init__(self, msg):self.msg = msgsuper(AsyncTimeoutException, self).__init__(msg)class HttpContext(object):"""封装请求和相应的基本数据"""def __init__(self, sock, host, port, method, url, data, callback, timeout=5):"""sock: 请求的客户端socket对象host: 请求的主机名port: 请求的端口port: 请求的端口method: 请求方式url: 请求的URLdata: 请求时请求体中的数据callback: 请求完成后的回调函数timeout: 请求的超时时间"""self.sock = sockself.callback = callbackself.host = hostself.port = portself.method = methodself.url = urlself.data = dataself.timeout = timeoutself.__start_time = time.time()self.__buffer = []def is_timeout(self):"""当前请求是否已经超时"""current_time = time.time()if (self.__start_time + self.timeout) < current_time:return Truedef fileno(self):"""请求sockect对象的文件描述符,用于select监听"""return self.sock.fileno()def write(self, data):"""在buffer中写入响应内容"""self.__buffer.append(data)def finish(self, exc=None):"""在buffer中写入响应内容完成,执行请求的回调函数"""if not exc:response = b''.join(self.__buffer)self.callback(self, response, exc)else:self.callback(self, None, exc)def send_request_data(self):content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s""" % (self.method.upper(), self.url, self.host, self.data,)return content.encode(encoding='utf8')class AsyncRequest(object):def __init__(self):self.fds = []self.connections = []def add_request(self, host, port, method, url, data, callback, timeout):"""创建一个要请求"""client = socket.socket()client.setblocking(False)try:client.connect((host, port))except BlockingIOError as e:pass# print('已经向远程发送连接的请求')req = HttpContext(client, host, port, method, url, data, callback, timeout)self.connections.append(req)self.fds.append(req)def check_conn_timeout(self):"""检查所有的请求,是否有已经连接超时,如果有则终止"""timeout_list = []for context in self.connections:if context.is_timeout():timeout_list.append(context)for context in timeout_list:context.finish(AsyncTimeoutException('请求超时'))self.fds.remove(context)self.connections.remove(context)def running(self):"""事件循环,用于检测请求的socket是否已经就绪,从而执行相关操作"""while True:r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)if not self.fds:returnfor context in r:sock = context.sockwhile True:try:data = sock.recv(8096)if not data:self.fds.remove(context)context.finish()breakelse:context.write(data)except BlockingIOError as e:breakexcept TimeoutError as e:self.fds.remove(context)self.connections.remove(context)context.finish(e)breakfor context in w:# 已经连接成功远程服务器,开始向远程发送请求数据if context in self.fds:data = context.send_request_data()context.sock.sendall(data)self.connections.remove(context)self.check_conn_timeout()if __name__ == '__main__':def callback_func(context, response, ex):""":param context: HttpContext对象,内部封装了请求相关信息:param response: 请求响应内容:param ex: 是否出现异常(如果有异常则值为异常对象;否则值为None):return:"""print(context, response, ex)obj = AsyncRequest()url_list = [{'host': 'www.google.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},{'host': 'www.baidu.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},{'host': 'www.bing.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},]for item in url_list:print(item)obj.add_request(**item)obj.running()

示例 2:

import socket
import selectclass HttpRequest:def __init__(self, sk, host, callback):self.socket = skself.host = hostself.callback = callbackdef fileno(self):return self.socket.fileno()class AsyncRequest:def __init__(self):self.conn = []self.connection = []  # 用于检测是否已经连接成功def add_request(self, host, callback):try:sk = socket.socket()sk.setblocking(0)sk.connect((host, 80,))except BlockingIOError as e:passrequest = HttpRequest(sk, host, callback)self.conn.append(request)self.connection.append(request)def run(self):while True:rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)for w in wlist:print(w.host, '连接成功...')# 只要能循环到,表示socket和服务器端已经连接成功tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)w.socket.send(bytes(tpl, encoding='utf-8'))self.connection.remove(w)for r in rlist:# r,是HttpRequestrecv_data = bytes()while True:try:chunck = r.socket.recv(8096)recv_data += chunckexcept Exception as e:breakr.callback(recv_data)r.socket.close()self.conn.remove(r)if len(self.conn) == 0:breakdef f1(data):print('保存到文件', data)def f2(data):print('保存到数据库', data)url_list = [{'host': 'www.baidu.com', 'callback': f1},{'host': 'cn.bing.com', 'callback': f2},{'host': 'www.cnblogs.com', 'callback': f2},
]req = AsyncRequest()
for item in url_list:req.add_request(item['host'], item['callback'])req.run()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/495231.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

何恺明CVPR演讲:深入理解ResNet和视觉识别的表示学习(41 PPT)

来源&#xff1a;专知摘要&#xff1a;今年CVPR 2018上&#xff0c;刚获得“TPAMI”年轻研究员奖的Facebook的Kaiming He做了一个叫“Learning Deep Representations for Visual Recognition”的讲座。在今年CVPR 2018上&#xff0c;刚获得“TPAMI”年轻研究员奖的Facebook的Ka…

Python 标准库 functools 模块详解

functools 官方文档&#xff1a;https://docs.python.org/zh-cn/3/library/functools.html Python 标准模块 --- functools&#xff1a;https://www.cnblogs.com/zhbzz2007/p/6001827.html python常用模块 - functools 模块&#xff1a;https://www.cnblogs.com/su-sir/p/125…

长寿即服务:创业公司如何用AI技术颠覆传统药物研发

来源&#xff1a;资本实验室摘要&#xff1a;未来学家、奇点大学创始彼得戴曼迪斯&#xff08;Peter Diamandis&#xff09;最近就人工智能对新药研发的推动作用进行了讨论&#xff0c;并结合其新近投资的药物研发公司案例&#xff0c;进一步为我们打开了一扇通往未来新药研发的…

npm 详解

npm 官方文档&#xff1a;https://docs.npmjs.com/cli/v6/commands/npm-install/ npm 模块管理器&#xff1a;http://javascript.ruanyifeng.com/nodejs/npm.html npm 常用命令详解&#xff1a;https://blog.csdn.net/sxs1995/article/details/80729069 1. 什么是 NPM NPM 的…

视觉系统的演化之旅——视觉器官、光感受器及视觉分子

来源&#xff1a;科学网摘要&#xff1a;在所有的感觉信息中&#xff0c;视觉机制可能是最复杂的了。我们每个人都能轻而易举地欣赏大自然的美景——青翠的草木、飞舞的蝴蝶、苍茫的白雪……我们很轻松地欣赏着大自然的色彩斑斓与瞬息万状&#xff0c;但这种神奇的视觉过程到底…

MAC OS 命令行使用详解

MAC OS 命令行使用详解&#xff1a;https://blog.csdn.net/sun375867463/article/details/9812317 1 为什么要使用命令行/如何开启命令行&#xff1f; 许多功能在图形界面不提供&#xff0c;只有通过命令行来实现。Finder 会隐藏许多你不太会需要的文件&#xff0c;然而 comman…

张钹院士:走向真正的人工智能 | CCF-GAIR 2018

来源&#xff1a;AI科技评论摘要&#xff1a;6 月 29 日上午&#xff0c;清华大学研究院院长张钹院士为 CCF-GAIR 2018 主会场「AI 前沿技术」做了题为「走向真正的人工智能」&#xff08;Towards A Real Artifitial Intelligence&#xff09;的大会报告。AI科技评论按&#xf…

FoolWeb 各层代码实例

FoolWeb.DateEngine 数据库操作类库 这里现在只写了一个类用于操作mssql.将来支持别的数据库试直接扩展就行了. 下来带个代码段 1: /// <summary>通用数据库接口 2: /// </summary> 3: using System; 4: using System.Collections; 5: using System.Collections.G…

google、bing、baidu、shodan、censys、ZoomEye 搜索引擎 高级用法

Google guide &#xff1a;http://www.googleguide.com/advanced_operators_reference.html 相关资料&#xff1a; &#xff1a;http://www.bubuko.com/infodetail-2292041.html&#xff1a;http://www.pinginglab.net/open/course/9&#xff1a;https://download.csdn.net/d…

南云等PNAS研究论文:揭示儿童音乐学习向语言领域迁移的脑机制

来源&#xff1a;神经科技摘要&#xff1a;南云等的研究结果表明&#xff0c;半年的钢琴训练提升了4到5岁儿童大脑对跨音乐与语言领域的音高变化的敏感性2018年6月25日&#xff0c;美国科学院院刊PNAS在线发表了北京师范大学认知神经科学与学习国家重点实验室、IDG/McGovern脑科…

HDU1028——I gnatius and the Princess III

母函数&#xff0c;不解释…… View Code #include <stdio.h>#define N 10001int c1[N], c2[N];int main(){int n,i,j,k;while (scanf("%d", &n) ! EOF ) {for (i0;i<n;i) { c1[i]0; c2[i]0; }for (i0;i<n;…

菜鸟教程 之 JavaScript 函数(function)

From&#xff1a;https://www.runoob.com/js/js-function-definition.html JavaScript 函数定义 JavaScript 使用关键字 function 定义函数。 函数 可以 通过声明定义&#xff0c;函数 也可以 是一个表达式。函数声明 分号 是用来分隔可执行 JavaScript 语句。 由于函数声明不…

深度研报:六大视角解读人工智能,AI岂止于技术

来源&#xff1a; 亿欧摘要&#xff1a; 人工智能只是一项新技术吗&#xff1f;从任一角度诠释AI&#xff0c;都是狭隘并且不完整的。亿欧智库在最新发布的研究报告中&#xff0c;总结两年来的研究成果&#xff0c;从学界、企业、投资、国家等六大视角对人工智能进行了全面解读…

为什么有的机器学习应用公司必将失败?

作者 | Cassie Kozyrkov编译 | Leo出品 | 人工智能头条摘要&#xff1a;告诉大家一个秘密&#xff1a;当人们说起“ 机器学习 ”时&#xff0c;听起来好像只是在谈论一门学科&#xff0c;但其实是两门。如果企业不了解其中的差异&#xff0c;那么就可能招惹来满世界的麻烦。两个…

[转]迭代、集合、字典表和列表

集合在编程的过程中用的是非常的多&#xff0c;如GridViewRowCollection、ConnectionStringSettingsCollection、NameValueCollection等等。一般来说&#xff0c;集合的类都包含在System.Collections命名空间中。那众多集合之间又有什么样的联系呢&#xff1f;这需要我们从集合…

菜鸟教程 之 HTML DOM 和 浏览器BOM

HTML DOM&#xff1a;https://www.runoob.com/js/js-htmldom.html 浏览器对象模型 (BOM)&#xff1a;https://www.runoob.com/js/js-window.html DOM、DOCUMENT、BOM、WINDOW 有什么区别? &#xff1a;https://www.zhihu.com/question/33453164 通过 HTML DOM&#xff0c;Ja…

IBM:物联网应重启,否则注定会失望

来源&#xff1a;IBM商业价值研究院摘要&#xff1a;尽管技术推动着物联网向前发展&#xff0c;但与此同时&#xff0c;缺少有吸引力且可持续盈利的商业模式将阻碍物联网的发展。如果未来的商业模式不同于目前硬件和软件平台的商业模式&#xff0c;那么&#xff0c;它是什么样的…

联通、华为《5G室内覆盖》白皮书!

来源&#xff1a;5G摘要&#xff1a;近日&#xff0c;中国联通、华为联合发布了《面向5G的室内覆盖数字化演进白皮书》。干货报告未来智能实验室是人工智能学家与科学院相关机构联合成立的人工智能&#xff0c;互联网和脑科学交叉研究机构。未来智能实验室的主要工作包括&#…

JavaScript 执行机制

前端开发&#xff0c;一篇文章让你彻底搞懂&#xff0c;什么是JavaScript执行机制&#xff01;&#xff1a;https://zhuanlan.zhihu.com/p/139261821 大白话讲解 JavaScript 执行机制&#xff0c;一看就懂&#xff1a;https://www.jianshu.com/p/22641c97e351 JavaScript 运行…

互联网大脑进化简史,华为云EI智能体加入-2018年7月新版

要&#xff1a;华为云EI智能体是2018年以来产生的第八个类脑智能巨系统&#xff0c;在中国&#xff0c;目前除了小米、联想、今日头条&#xff0c;几乎所有的互联网巨头都提出了自己的”大脑“系统建设计划。1969年互联网诞生以来&#xff0c;网状模型一直是互联网最基础和重要…