Python 的 Gevent --- 高性能的 Python 并发框架

From:http://www.xuebuyuan.com/1604603.html
Gevent 指南(英文):http://sdiehl.github.io/gevent-tutorial
Gevent 指南(中文):http://xlambda.com/gevent-tutorial
Gevent 指南(中文)下载地址:http://download.csdn.net/download/freeking101/9924351
初试Gevent – 高性能的 Python 并发框架:http://python.jobbole.com/87041

Python 开发使用 Gevent:http://sdiehl.github.io/gevent-tutorial/

greenlet、Eventlet,gevent 推荐 )、

  • greenlet :轻量级的并行编程,调度麻烦,用生成器实现的协程而且不是真正意义上的协程,只是实现代码执行过程中的挂起,唤醒操作。Greenlet 没有自己的调度过程,所以一般不会直接使用。greenlet:http://greenlet.readthedocs.org/en/latest/
  • eventlet:是在 greenlet 的基础上实现了自己的 GreenThread,实际上就是 greenlet 类的扩展封装,而与Greenlet的不同是,Eventlet实现了自己调度器称为Hub,Hub类似于Tornado的IOLoop,是单实例的。在Hub中有一个event loop,根据不同的事件来切换到对应的GreenThread。同时 Eventlet 还实现了一系列的补丁来使 Python 标准库中的 socket 等等module 来支持 GreenThread 的切换。Eventlet 的 Hub 可以被定制来实现自己调度过程。eventlet 目前支持 CPython 2.7 和 3.4+ 并将在未来删除,仅保留 CPython 3.5+ 支持。
  • Gevent:基于 libev 与 Greenlet 实现。不同于 Eventlet 的用 python 实现的 hub 调度,Gevent 通过 Cython 调用 libev 来实现一个高效的 event loop 调度循环。同时类似于 Event,Gevent 也有自己的 monkey_patch,在打了补丁后,完全可以使用 python 线程的方式来无感知的使用协程,减少了开发成本。 gevent 官网文档:http://www.gevent.org/contents.html

Gevent 主要特性有以下几点:

  1. 基于 libev 的快速事件循环,Linux上面的是 epoll 机制
  2. 基于 greenlet 的 轻量级执行单元
  3. API 复用了 Python 标准库里的内容。API 的概念和 Python 标准库一致(如事件,队列)。
  4. TCP / UDP / HTTP 服务器 
  5. 支持 SSL 的协作式 sockets
  6. 子进程支持( 通过 gevent.subprocess )
  7. 线程池
  8. greenlets是确定性的。给定相同的绿色配置和相同的输入集,它们总是产生相同的输出
  9. gevent 每次遇到 io 操作,需要耗时等待时,会自动跳到下一个协程继续执行。
  10. gevent 的代码风格和线程非常相似,运行出来后的效果也非常相似。
  11. 通过 monkey patching 功能来使得第三方模块变成协作式

libevent 是一个事件分发引擎,greenlet 提供了轻量级线程的支持,gevent 就是基于这两个的一个专门处理网络逻辑的并行库。

  1. gevent.spawn 启动的所有协程,都是运行在同一个线程之中,所以协程不能跨线程同步数据。
  2. gevent.queue.Queue 是协程安全的。
  3. gevent 启动的并发协程,具体到 task function,不能有长时间阻塞的IO操作。因为 gevent 的协程的特点是,当前协程阻塞了才会切换到别的协程。如果当前协程长时间阻塞,则不能显示( gevent.sleep(0),或隐式,由gevent来做)切换到别的协程。导致程序出问题。
  4. 如果有长时间阻塞的 IO 操作,还是用传统的线程模型比较好。
  5. 因为 gevent 的特点总结是:事件驱动 + 协程 + 非阻塞IO,事件驱动指的是 libvent 对 epool 的封装,是基于事件的方式处理 IO。协程指的是 greenlet,非阻塞 IO 指的是 gevent 已经 patch 过的各种库,例如 socket 和 select 等等。
  6. 使用 gevent 的协程,最好要用 gevent 自身的非阻塞的库。如 httplib, socket, select 等等。
  7. gevent 适合处理大量无阻塞的任务,如果有实在不能把阻塞的部分变为非阻塞再交给 gevent 处理,就把阻塞的部分改为异步吧。

原理:程序的重要部分是将任务函数封装到 gevent.spawn

  • 初始化的 greenlet 列表存放在数组 threads 中,此数组被传给 gevent.joinall 函数,
  • gevent.joinall 会阻塞当前流程,并执行所有给定的 greenlet,执行流程只会在所有 greenlet 执行完后才会继续向下走。 

gevent 实现了python 标准库里面大部分的阻塞式系统调用,包括 socket、ssl、threading 和 select 等模块,而将这些阻塞式调用变为协作式运行(参见猴子补丁)。

猴子补丁 Monkey Patch: 

  • (1)猴子补丁的由来 。猴子补丁的这个叫法起源于 Zope 框架,大家在修正 Zope 的 Bug 的时候经常在程序后面追加更新部分,这些被称作是 “杂牌军补丁(guerillapatch)”,后来 guerilla 就渐渐的写成了 gorllia(猩猩),再后来就写了 monkey(猴子),所以猴子补丁的叫法是这么莫名其妙的得来的。 后来在动态语言中,不改变源代码而对功能进行追加和变更,统称为 "猴子补丁"。所以猴子补丁并不是 Python 中专有的。猴子补丁这种东西充分利用了动态语言的灵活性,可以对现有的语言Api 进行追加,替换,修改 Bug,甚至性能优化等等。 使用猴子补丁的方式,gevent 能够修改标准库里面大部分的阻塞式系统调用,包括 socket、ssl、threading 和 select 等模块,而变为协作式运行。也就是通过猴子补丁的 monkey.patch_xxx() 来将 python 标准库中 模块 或 函数 改成 gevent 中的响应的具有协程的协作式对象。这样在不改变原有代码的情况下,将应用的阻塞式方法,变成协程式的。 
  • (2)猴子补丁使用时的注意事项 。猴子补丁的功能很强大,但是也带来了很多的风险,尤其是像 gevent 这种直接进行 API替换的补丁,整个 Python 进程所使用的模块都会被替换,可能自己的代码能 hold 住,但是其它第三方库,有时候问题并不好排查,即使排查出来也是很棘手,所以,就像松本建议的那样,如果要使用猴子补丁,那么只是做功能追加,尽量避免大规模的 API 覆盖。 虽然猴子补丁仍然是邪恶的(evil),但在这种情况下它是 "有用的邪恶(useful evil)"

Linux 的 epoll 和 libev

  • Linux 的 epoll 机制:epoll 是 Linux 内核为处理大批量文件描述符而作了改进的 poll,是 Linux 下 "多路复用IO" select/poll 的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。epoll 的优点:支持一个进程打开大数目的 socket 描述符。select 的一个进程所打开的 FD 由FD_SETSIZE 的设置来限定,而 epoll 没有这个限制,它所支持的 FD 上限是最大可打开文件的数目,远大于2048,而且IO 效率不随 FD 数目增加而线性下降:由于 epoll 只会对 "活跃" 的 socket 进行操作,于是,只有 "活跃" 的 socket 才会主动去调用 callback 函数,其他 idle 状态的 socket 则不会。epoll 使用 mmap 加速内核与用户空间的消息传递。epoll 是通过内核于用户空间 mmap 同一块内存实现的。
  • libev 机制:提供了指定文件描述符事件发生时调用回调函数的机制。libev是一个事件循环器:向libev注册感兴趣的事件,比如socket可读事件,libev会对所注册的事件的源进行管理,并在事件发生时触发相应的程序。

示例:

import gevent
from gevent import socketurls = ['www.baidu.com', 'www.example.com', 'www.python.org']jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]gevent.joinall(jobs, timeout=2)result = [job.value for job in jobs]
print(result)

结果:['61.135.169.125', '93.184.216.34', '151.101.228.223']

说明:gevent.spawn() 方法 spawn 一些 jobs,然后通过 gevent.joinall 将 jobs 加入到 微线程 执行队列中等待其完成,设置超时为 2 秒。执行后的结果通过检查 gevent.Greenlet.value 值来收集。gevent.socket.gethostbyname() 函数与标准的socket.gethotbyname() 有相同的接口,但它不会阻塞整个解释器,因此会使得其他的 greenlets 跟随着无阻的请求而执行。

##################################
看看 Gevent :What is gevent? — gevent 21.12.1.dev0 documentation 
您可以创建几个 Greenlet 对象为几个任务。
每个 greenlet 是 绿色的线程 :https://en.wikipedia.org/wiki/Green_threads。

import gevent
from gevent import monkey
from gevent import Greenletmonkey.patch_all()class Task(Greenlet):def __init__(self, name):Greenlet.__init__(self)self.name = namedef _run(self):print("Task %s: some task..." % self.name)t1 = Task("task1")
t2 = Task("task2")
t1.start()
t2.start()
# here we are waiting all tasks
gevent.joinall([t1, t2])

示例代码:

from gevent import monkey; monkey.patch_all()
import gevent
import requestsdef get_url(url):res = requests.get(url)print(url, res.status_code, len(res.text))url_l = ['http://www.baidu.com','http://www.python.org','http://www.cnblogs.com'
]
g_l = []
for i in url_l:g_l.append(gevent.spawn(get_url, i))
gevent.joinall(g_l)

示例代码( 利用 gevent 并发抓取 ):

from gevent import monkeymonkey.patch_all()import requests
import gevent
import io
import sys# 解决console显示乱码的编码问题
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')class Douban(object):"""A class containing interface test method of Douban object"""def __init__(self):self.host = 'movie.douban.com'self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:61.0) Gecko/20100101 Firefox/61.0','Referer': 'https://movie.douban.com/',}def get_response(self, url, data):resp = requests.post(url=url, data=data, headers=self.headers).content.decode('utf-8')return respdef test_search_tags_movie(self):method = 'search_tags'url = 'https://%s/j/%s' % (self.host, method)post_data = {'type': 'movie','source': 'index'}resp = self.get_response(url=url, data=post_data)print(resp)return respif __name__ == '__main__':douban = Douban()threads = []for i in range(6):thread = gevent.spawn(douban.test_search_tags_movie)threads.append(thread)gevent.joinall(threads)

并发爬图片:

from gevent import monkey
monkey.patch_all()
import requests
import gevent
from lxml import etreedef downloader(img_name, img_url):req = requests.get(img_url)img_content = req.contentwith open(img_name, "wb") as f:f.write(img_content)def main():r = requests.get('http://www.nsgirl.com/portal.php')if r.status_code == 200:img_src_xpath = '//div[@id="frameXWswSe"]//div[@class="portal_block_summary"]//li//img/@src's_html = etree.HTML(text=r.text)all_img_src = s_html.xpath(img_src_xpath)count = 0for img_src in all_img_src:count += 1# print(img_src)# http://www.nsgirl.com/forum.php?mod=image&aid=342&size=218x285&key=cd6828baf05c305curl = 'http://www.nsgirl.com/' + img_srcgevent.joinall([gevent.spawn(downloader, f"{count}.jpg", url), ])if __name__ == '__main__':main()

示例:

# -*- coding: utf-8 -*-from gevent import monkey;monkey.patch_all()
import gevent
import requests
from datetime import datetimedef f(url):print(f'time: {datetime.now()}, GET: {url}')resp = requests.get(url)print(f'time: {datetime.now()}, {len(resp.text)} bytes received from {url}.')gevent.joinall([gevent.spawn(f, 'https://www.python.org/'),gevent.spawn(f, 'https://www.yahoo.com/'),gevent.spawn(f, 'https://github.com/'),
])

gevent 实现 生产者 - 消费者

# -*- coding: utf-8 -*-from gevent import monkey# 猴子补丁,all是所有能切换协程的地方都切换,包含了socket,所以一般都用all
monkey.patch_all()
from gevent.queue import Queue  # 队列 gevent中的队列
import gevent
import randomtask_queue = Queue(3)def producer(index=1):while True:print(f'生产者 [{index}]', end='')item = random.randint(0, 99)task_queue.put(item)print(f"生产 ---> {item}")def consumer(index=1):while True:print(f'消费者 [{index}]', end='')item = task_queue.get()print(f"消费 ---> {item}")def main_1():thread_1 = gevent.spawn(producer)thread_2 = gevent.spawn(consumer)thread_3 = gevent.spawn(consumer, 2)thread_list = [thread_1, thread_2, thread_3]gevent.joinall(thread_list)if __name__ == '__main__':main_1()# main_2()pass

gevent 程序员指南

由 Gevent 社区编写

gevent是一个基于libev的并发库。它为各种并发和网络相关的任务提供了整洁的API。

介绍

本指南假定读者有中级Python水平,但不要求有其它更多的知识,不期待读者有 并发方面的知识。本指南的目标在于给予你需要的工具来开始使用gevent,帮助你 驯服现有的并发问题,并从今开始编写异步应用程序。

gevent 官方文档还是可以看看的,尤其是源码里的 examples 都相当不错,有助于理解 gevent 的使用。gevent 封装了很多很方便的接口,其中一个就是 monkey。

from gevent import monkey
monkey.patch_all()

这样两行,就可以使用 python 以前的 socket 之类的,因为 gevent 已经给你自动转化了。
而且安装 gevent 也是很方便,首先安装依赖 libevent 和 greenlet,再利用 pypi 安装即可

                安装 libevent:sudo apt-get install libevent-dev
                安装 python-dev:sudo apt-get install python-dev
                安装 gevent:sudo pip install gevent
                安装 greenlet:sudo pip install greenlet

示例代码:

示例:

from gevent import monkey; monkey.patch_socket()
import geventdef f(n):for i in range(n):print(gevent.getcurrent(), i)gevent.sleep(0)g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()

3个 greenlet 交替运行,把循环次数改为 500000,运行时间长一点,然后在操作系统的进程管理器中看,线程数只有1个。 gevent.sleep() 作用是交出控制权

示例:

import gevent
from gevent import monkey# 切换是在 IO 操作时自动完成,所以gevent需要修改Python自带的一些标准库
# 这一过程在启动时通过monkey patch完成
monkey.patch_all()def func_a():count = 10while count > 0:print(f"func_a ---> {count}")# 用来模拟一个耗时操作,注意不是time模块中的sleep# 每当碰到耗时操作,会自动跳转至其他协程count -= 1gevent.sleep(1)def func_b():count = 10while count > 0:print(f"func_b ---> {count}")count -= 1gevent.sleep(0.5)# gevent.joinall([gevent.spawn(fn)
g1 = gevent.spawn(func_a)  # 创建一个协程
g2 = gevent.spawn(func_b)
g1.join()  # 等待协程执行结束
g2.join()

select() 函数通常是对各种文件描述符进行轮询的阻塞调用。

from gevent import select
...
select.select([], [], [], 2)

gevent 池

示例代码,测试 gevent 的 任务池

from gevent import poolgevent_pool = pool.Pool()def func_1():for index in range(100):gevent_pool.spawn(func_2, index)def func_2(arg=None):print(f'func_2 ---> {arg}')gevent_pool.spawn(func_1)
gevent_pool.join()

示例代码。程序及注释如下:

# -*- coding: utf-8 -*-import time
import gevent
from gevent import event  # 调用 gevent 的 event 子模块# 三个进程需要定义三个事件 event1,event2,event3,来进行12,23,31循环机制,即进程一,进程二,进程三顺序执行def fun1(num, event1, event2):  # 固定格式i = 0while i < 10:  # 设置循环10次i += 1time.sleep(1)  # 睡眠1秒print('进程一:111111111')event2.set()    # 将event2值设为Trueevent1.clear()  # 将event1值设为Falseevent1.wait()   # event1等待,其值为True时才执行def fun2(num, event2, event3):i = 0while i < 10:i += 1time.sleep(1)print('进程二:222222222')event3.set()  # 将event3值设为Trueevent2.clear()  # 将event2值设为Falseevent2.wait()  # event2等待,其值为True时才执行def fun3(num, event3, event1):i = 0while i < 10:i += 1time.sleep(1)print('进程三:333333333')event1.set()event3.clear()event3.wait()if __name__ == "__main__":  # 执行调用格式act1 = gevent.event.Event()  # 调用event中的Event类,用act1表示act2 = gevent.event.Event()act3 = gevent.event.Event()# 三个进程,act1,act2,act3gevent_list = []  # 建立一个数列,用来存和管理进程# 调用gevent中的Greenlet子模块,用Greenlet创建进程一g = gevent.Greenlet(fun1, 1, act1, act2)g.start()gevent_list.append(g)  # 将进程一加入到Gevents数列print('进程一启动:')g = gevent.Greenlet(fun2, 2, act2, act3)g.start()gevent_list.append(g)print('进程二启动:')g = gevent.Greenlet(fun3, 3, act3, act1)g.start()gevent_list.append(g)print('进程三启动:')print('所有进程都已启动!')# 调用Greenlet中的joinall函数,将Gevents的进程收集排列gevent.joinall(gevent_list)

核心部分

Greenlets

在 gevent 中用到的主要模式是 Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

在任何时刻,只有一个协程在运行。

这与 multiprocessing 或 threading 等提供真正并行构造的库是不同的。 这些库轮转使用操作系统调度的进程和线程,是真正的并行。

同步和异步执行

并发的核心思想在于,大的任务可以分解成一系列的子任务,后者可以被调度成 同时执行或异步执行,而不是一次一个地或者同步地执行。两个子任务之间的 切换也就是上下文切换。

在 gevent 里面,上下文切换是通过 yielding 来完成的。在下面的例子里, 我们有两个上下文,通过调用 gevent.sleep(0),它们各自 yield 向对方。

import geventdef foo():print('Running in foo')gevent.sleep(0)print('Explicit context switch to foo again')def bar():print('Explicit context to bar')gevent.sleep(0)print('Implicit context switch back to bar')gevent.joinall([gevent.spawn(foo),gevent.spawn(bar),
])

结果

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

下图将控制流形象化,就像在调试器中单步执行整个程序,以说明上下文切换如何发生。

当在受限于网络或 IO 的函数中使用 gevent,这些函数会被协作式的调度, gevent的真正能力会得到发挥。Gevent 处理了所有的细节, 来保证你的网络库会在可能的时候,隐式交出 greenlet 上下文的执行权。 这样的一种用法是如何强大,怎么强调都不为过。或者我们举些例子来详述。

下面例子中的select()函数通常是一个在各种文件描述符上轮询的阻塞调用

import time
import gevent
from gevent import selectstart = time.time()def gr1():# Busy waits for a second, but we don't want to stick around...print(f'开始 Polling: {int(time.time() - start)}')select.select([], [], [], 2)print(f'结束 Polling: {int(time.time() - start)}')def gr2():# Busy waits for a second, but we don't want to stick around...print(f'开始 Polling: {int(time.time() - start)}')select.select([], [], [], 2)print(f'结束 Polling: {int(time.time() - start)}')def gr3():print(f"在 greenlet pool 运行的时候可以做其他事, {int(time.time() - start)}")gevent.sleep(1)gevent.joinall([gevent.spawn(gr1),gevent.spawn(gr2),gevent.spawn(gr3),
])

下面是另外一个多少有点人造色彩的例子,定义一个非确定性的 (non-deterministic) 的 task 函数(给定相同输入的情况下,它的输出不保证相同)。 此例中执行这个函数的副作用就是,每次 task 在它的执行过程中都会随机地停某些秒。

import gevent
import randomdef task(pid):"""Some non-deterministic task"""gevent.sleep(random.randint(0, 2) * 0.001)print('Task %s done' % pid)def synchronous():for i in range(1, 10):task(i)def asynchronous():threads = [gevent.spawn(task, i) for i in range(10)]gevent.joinall(threads)print('同步执行开始:')
synchronous()print('异步执行开始:')
asynchronous()

在同步的部分,所有的 task 都同步的执行, 结果当每个task在执行时主流程被阻塞(主流程的执行暂时停住)。

程序的重要部分是将 task 函数封装到 Greenlet 内部线程的 gevent.spawn。 初始化的 greenlet 列表存放在数组 threads 中,此数组被传给 gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的 greenlet。执行流程只会在 所有 greenlet 执行完后才会继续向下走。

要重点留意的是,异步的部分本质上是随机的,而且异步部分的整体运行时间比同步 要大大减少。事实上,同步部分的最大运行时间,即是每个 task 停0.002秒,结果整个 队列要停0.02秒。而异步部分的最大运行时间大致为0.002秒,因为没有任何一个task会 阻塞其它task的执行。

一个更常见的应用场景,如异步地向服务器取数据,取数据操作的执行时间 依赖于发起取数据请求时远端服务器的负载,各个请求的执行时间会有差别。

示例:

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import jsondef fetch(pid):response = requests.get('https://www.baidu.com')print(f'Process {pid} ---> {len(response.text)}')return len(response.text)def synchronous():for i in range(1, 10):fetch(i)def asynchronous():threads = []for i in range(1, 10):threads.append(gevent.spawn(fetch, i))gevent.joinall(threads)print('Synchronous:')
synchronous()print('Asynchronous:')
asynchronous()

示例:

from gevent import monkey;monkey.patch_all()
import gevent, requests
import datetimedef func_crawl(url):print(f'[{datetime.datetime.now().replace(microsecond=0)}] {url}')resp = requests.get(url)print(f'[{datetime.datetime.now().replace(microsecond=0)}] {url} {len(resp.text)}')current_time = datetime.datetime.now().replace(microsecond=0)
task_list = [gevent.spawn(func_crawl, 'https://www.baidu.com/') for _ in range(5)]
gevent.joinall(task_list)
print(datetime.datetime.now().replace(microsecond=0) - current_time)

确定性

就像之前所提到的,greenlet 具有确定性。在相同配置相同输入的情况下,它们总是 会产生相同的输出。

import time
from gevent.pool import Pooldef echo(i):time.sleep(0.001)return ip = Pool(10)
run1 = [a for a in p.imap_unordered(echo, range(10))]
run2 = [a for a in p.imap_unordered(echo, range(10))]
run3 = [a for a in p.imap_unordered(echo, range(10))]
run4 = [a for a in p.imap_unordered(echo, range(10))]print(run1 == run2 == run3 == run4)

即使gevent通常带有确定性,当开始与如socket或文件等外部服务交互时, 不确定性也可能溜进你的程序中。因此尽管gevent线程是一种“确定的并发”形式, 使用它仍然可能会遇到像使用POSIX线程或进程时遇到的那些问题。

涉及并发长期存在的问题就是竞争条件(race condition)。简单来说, 当两个并发线程/进程都依赖于某个共享资源同时都尝试去修改它的时候, 就会出现竞争条件。这会导致资源修改的结果状态依赖于时间和执行顺序。 这是个问题,我们一般会做很多努力尝试避免竞争条件, 因为它会导致整个程序行为变得不确定。

最好的办法是始终避免所有全局的状态。全局状态和导入时(import-time)副作用总是会 反咬你一口!

创建 Greenlets

gevent 对 Greenlet 初始化提供了一些封装,最常用的使用模板有:

import gevent
from gevent import Greenletdef foo(message, n):"""Each thread will be passed the message, and n argumentsin its initialization."""gevent.sleep(n)print(message)threads = [Greenlet.spawn(foo, "Hello", 1),gevent.spawn(foo, "I live!", 2),gevent.spawn(lambda x: (x + 1), 2)
]# Block until all threads complete.
gevent.joinall(threads)

结果:


Hello
I live!

除使用基本的Greenlet类之外,你也可以子类化Greenlet类,重载它的_run方法。

import gevent
from gevent import Greenletclass MyGreenlet(Greenlet):def __init__(self, message, n):Greenlet.__init__(self)self.message = messageself.n = ndef _run(self):print(self.message)gevent.sleep(self.n)g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

结果


Hi there!

Greenlet 状态

就像任何其他成段代码,Greenlet 也可能以不同的方式运行失败。 Greenlet 可能未能成功抛出异常,不能停止运行,或消耗了太多的系统资源。

一个 greenlet 的状态通常是一个依赖于时间的参数。在 greenlet 中有一些标志, 让你可以监视它的线程内部状态:

  • started -- Boolean, 指示此Greenlet是否已经启动
  • ready() -- Boolean, 指示此Greenlet是否已经停止
  • successful() -- Boolean, 指示此Greenlet是否已经停止而且没抛异常
  • value -- 任意值, 此Greenlet代码返回的值
  • exception -- 异常, 此Greenlet内抛出的未捕获异常
import geventdef win():return 'You win!'def fail():raise Exception('You fail at failing.')winner = gevent.spawn(win)
loser = gevent.spawn(fail)print(winner.started)  # True
print(loser.started)  # True# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:gevent.joinall([winner, loser])
except BaseException as be:print('This will never be reached')print(winner.value)  # 'You win!'
print(loser.value)  # Noneprint(winner.ready())  # True
print(loser.ready())  # Trueprint(winner.successful())  # True
print(loser.successful())  # False# The exception raised in fail, will not propogate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.print(loser.exception)# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()

结果


True
True
You win!
None
True
True
True
False
You fail at failing.

程序停止

当主程序 (main program) 收到一个 SIGQUIT 信号时,不能成功做 yield 操作的 Greenlet 可能会令意外地挂起程序的执行。这导致了所谓的僵尸进程, 它需要在 Python 解释器之外被 kill 掉。

对此,一个通用的处理模式就是在主程序中监听 SIGQUIT 信号,在程序退出 调 gevent.shutdown。

import gevent
import signaldef run_forever():gevent.sleep(1000)if __name__ == '__main__':gevent.signal_handler(signal.SIGQUIT, gevent.shutdown)thread = gevent.spawn(run_forever)thread.join()

超时

超时是一种对一块代码或一个Greenlet的运行时间的约束。

import gevent
from gevent import Timeoutseconds = 10timeout = Timeout(seconds)
timeout.start()def wait():gevent.sleep(10)try:gevent.spawn(wait).join()
except Timeout:print('Could not complete')

超时类也可以用在上下文管理器(context manager)中, 也就是with语句内。

import gevent
from gevent import Timeouttime_to_wait = 5  # secondsclass TooLong(Exception):passwith Timeout(time_to_wait, TooLong):gevent.sleep(10)

另外,对各种 Greenlet 和数据结构相关的调用,gevent 也提供了超时参数。 例如:

import gevent
from gevent import Timeoutdef wait():gevent.sleep(2)timer = Timeout(1).start()
thread1 = gevent.spawn(wait)try:thread1.join(timeout=timer)
except Timeout:print('Thread 1 timed out')# --timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)try:thread2.get(timeout=timer)
except Timeout:print('Thread 2 timed out')try:gevent.with_timeout(1, wait)
except Timeout:print('Thread 3 timed out')

猴子补丁(Monkey patching)

我们现在来到 gevent 的死角了. 在此之前,我已经避免提到猴子补丁(monkey patching) 以尝试使gevent 这个强大的协程模型变得生动有趣,但现在到了讨论猴子补丁的黑色艺术 的时候了。你之前可能注意到我们提到了monkey.patch_socket() 这个命令,这个 纯粹副作用命令是用来改变标准socket 库的

import socket
print(socket.socket)print("After monkey patch")
from gevent import monkey
monkey.patch_socket()
print(socket.socket)import select
print(select.select)
monkey.patch_select()
print("After monkey patch")
print(select.select)

结果:

class 'socket.socket'
After monkey patch
class 'gevent.socket.socket'built-in function select
After monkey patch
function select at 0x1924de8

Python 的运行环境允许我们在运行时修改大部分的对象,包括模块,类甚至函数。 这是个一般说来令人惊奇的坏主意,因为它创造了 “隐式的副作用”,如果出现问题 它很多时候是极难调试的。虽然如此,在极端情况下当一个库需要修改 Python 本身的基础行为的时候,猴子补丁就派上用场了。在这种情况下,gevent 能够修改标准库里面大部分的阻塞式系统调用,包括 socket、ssl、threading 和 select 等模块,而变为协作式运行。

例如,Redis 的 python 绑定一般使用常规的 tcp socket 来与 redis-server 实例通信。 通过简单地调用 gevent.monkey.patch_all(),可以使得 redis 的绑定协作式的调度 请求,与 gevent 栈的其它部分一起工作。

这让我们可以将一般不能与 gevent 共同工作的库结合起来,而不用写哪怕一行代码。 虽然猴子补丁仍然是邪恶的(evil),但在这种情况下它是“有用的邪恶(useful evil)”。

数据结构

事件

事件(event)是一个在 Greenlet 之间异步通信的形式。

import gevent
from gevent.event import Event'''
Illustrates the use of events
'''evt = Event()def setter():"""After 3 seconds, wake all threads waiting on the value of evt"""print('A: Hey wait for me, I have to do something')gevent.sleep(3)print("Ok, I'm done")evt.set()def waiter():"""After 3 seconds the get call will unblock"""print("I'll wait for you")evt.wait()  # blockingprint("It's about time")def main():gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter),gevent.spawn(waiter),gevent.spawn(waiter),gevent.spawn(waiter),gevent.spawn(waiter)])if __name__ == '__main__':main()

事件对象的一个扩展是 AsyncResult,它允许你在唤醒调用上附加一个值。 它有时也被称作是future 或 defered,因为它持有一个指向将来任意时间可设置 为任何值的引用。

import gevent
from gevent.event import AsyncResulta = AsyncResult()def setter():"""After 3 seconds set the result of a."""gevent.sleep(3)a.set('Hello!')def waiter():"""After 3 seconds the get call will unblock after the setterputs a value into the AsyncResult."""print(a.get())gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter),
])

队列

队列是一个排序的数据集合,它有常见的 put / get 操作, 但是它是以在Greenlet之间可以安全操作的方式来实现的。举例来说,如果一个 Greenlet 从队列中取出一项,此项就不会被 同时执行的其它Greenlet再取到了。

import gevent
from gevent.queue import Queuetasks = Queue()def worker(n):while not tasks.empty():task = tasks.get()print('Worker %s got task %s' % (n, task))gevent.sleep(0)print('Quitting time!')def boss():for i in xrange(1, 25):tasks.put_nowait(i)gevent.spawn(boss).join()gevent.joinall([gevent.spawn(worker, 'steve'),gevent.spawn(worker, 'john'),gevent.spawn(worker, 'nancy'),
])

如果需要,队列也可以阻塞在putget操作上。

putget操作都有非阻塞的版本,put_nowaitget_nowait不会阻塞, 然而在操作不能完成时抛出gevent.queue.Emptygevent.queue.Full异常。

在下面例子中,我们让boss与多个worker同时运行,并限制了queue不能放入多于3个元素。 这个限制意味着,直到queue有空余空间之间,put操作会被阻塞。相反地,如果队列中 没有元素,get操作会被阻塞。它同时带一个timeout参数,允许在超时时间内如果 队列没有元素无法完成操作就抛出gevent.queue.Empty异常。

import gevent
from gevent.queue import Queue, Emptytasks = Queue(maxsize=3)def worker(n):try:while True:task = tasks.get(timeout=1)  # decrements queue size by 1print('Worker %s got task %s' % (n, task))gevent.sleep(0)except Empty:print('Quitting time!')def boss():"""Boss will wait to hand out work until a individual worker isfree since the maxsize of the task queue is 3."""for i in range(1, 10):tasks.put(i)print('Assigned all work in iteration 1')for i in range(10, 20):tasks.put(i)print('Assigned all work in iteration 2')gevent.joinall([gevent.spawn(boss),gevent.spawn(worker, 'steve'),gevent.spawn(worker, 'john'),gevent.spawn(worker, 'bob'),
])

组和池

组(group) 是一个运行中 greenlet 的集合,集合中的 greenlet 像一个组一样 会被共同管理和调度。 它也兼饰了像 Python 的 multiprocessing 库那样的 平行调度器的角色。

import gevent
from gevent.pool import Groupdef talk(msg):for i in range(3):print(msg)g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')group = Group()
group.add(g1)
group.add(g2)
group.join()group.add(g3)
group.join()

在管理异步任务的分组上它是非常有用的。

就像上面所说,Group 也以不同的方式为分组 greenlet/分发工作和收集它们的结果也提供了API。

import gevent
from gevent import getcurrent
from gevent.pool import Groupgroup = Group()def hello_from(n):print('Size of group %s' % len(group))print('Hello from Greenlet %s' % id(getcurrent()))group.map(hello_from, range(3))def intensive(n):gevent.sleep(3 - n)return 'task', nprint('Ordered')ogroup = Group()
for i in ogroup.imap(intensive, range(3)):print(i)print('Unordered')igroup = Group()
for i in igroup.imap_unordered(intensive, range(3)):print(i)

池(pool)是一个为处理数量变化并且需要限制并发的greenlet而设计的结构。 在需要并行地做很多受限于网络和IO的任务时常常需要用到它。

import gevent
from gevent.pool import Poolpool = Pool(2)def hello_from(n):print('Size of pool %s' % len(pool))pool.map(hello_from, range(3))

当构造 gevent 驱动的服务时,经常会将围绕一个池结构的整个服务作为中心。 一个例子就是在各个 socket 上轮询的类。

from gevent.pool import Poolclass SocketPool(object):def __init__(self):self.pool = Pool(1000)self.pool.start()def listen(self, socket):while True:socket.recv()def add_handler(self, socket):if self.pool.full():raise Exception("At maximum pool size")else:self.pool.spawn(self.listen, socket)def shutdown(self):self.pool.kill()

锁和信号量(locks and semaphores)

信号量是一个允许 greenlet 相互合作,限制并发访问或运行的低层次的同步原语。 信号量有两个方法,acquire 和 release。在信号量是否已经被 acquire或release,和拥有资源的数量之间不同,被称为此信号量的范围 (the bound of the semaphore)。如果一个信号量的范围已经降低到0,它会 阻塞 acquire 操作直到另一个已经获得信号量的 greenlet 作出释放。

from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphoresem = BoundedSemaphore(2)def worker1(n):sem.acquire()print('Worker %i acquired semaphore' % n)sleep(0)sem.release()print('Worker %i released semaphore' % n)def worker2(n):with sem:print('Worker %i acquired semaphore' % n)sleep(0)print('Worker %i released semaphore' % n)pool = Pool()
pool.map(worker1, range(0, 2))
pool.map(worker2, range(3, 6))

Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore

范围为1的信号量也称为锁(lock)。它向单个greenlet提供了互斥访问。 信号量和锁常常用来保证资源只在程序上下文被单次使用。

加锁

如果需要在使用 gevent 的时候加锁,也是非常方便的:

# -*- coding: utf-8 -*-import gevent
from gevent.lock import Semaphoresem = Semaphore(1)def f1():for i in range(5):sem.acquire()print('run f1, this is ', i)sem.release()gevent.sleep(1)def f2():for i in range(5):sem.acquire()print('run f2, that is ', i)sem.release()gevent.sleep(0.3)t1 = gevent.spawn(f1)
t2 = gevent.spawn(f2)
gevent.joinall([t1, t2])

线程局部变量

Gevent也允许你指定局部于greenlet上下文的数据。 在内部,它被实现为以greenlet的getcurrent()为键, 在一个私有命名空间寻址的全局查找。

import gevent
from gevent.local import localstash = local()def f1():stash.x = 1print(stash.x)def f2():stash.y = 2print(stash.y)try:stash.xexcept AttributeError:print("x is not local to f2")g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)gevent.joinall([g1, g2])

很多集成了 gevent 的 web 框架将 HTTP 会话对象以线程局部变量的方式存储在 gevent 内。 例如使用 Werkzeug 实用库和它的 proxy 对象,我们可以创建 Flask 风格的请求对象。

from gevent.local import local
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanagerfrom gevent.wsgi import WSGIServer_requests = local()
request = LocalProxy(lambda: _requests.request)@contextmanager
def sessionmanager(environ):_requests.request = Request(environ)yield_requests.request = Nonedef logic():return "Hello " + request.remote_addrdef application(environ, start_response):status = '200 OK'with sessionmanager(environ):body = logic()headers = [('Content-Type', 'text/html')]start_response(status, headers)return [body]WSGIServer(('', 8000), application).serve_forever()

Flask 系统比这个例子复杂一点,然而使用线程局部变量作为局部的会话存储, 这个思想是相同的。

子进程

自 gevent 1.0 起,gevent.subprocess,一个 Python subprocess 模块 的修补版本已经添加。它支持协作式的等待子进程。

import gevent
from gevent.subprocess import Popen, PIPEdef cron():while True:print("cron")gevent.sleep(0.2)g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())
cron
cron
cron
cron
cron
Linux

很多人也想将 gevent 和 multiprocessing 一起使用。最明显的挑战之一 就是 multiprocessing 提供的进程间通信默认不是协作式的。由于基于 multiprocessing.Connection 的对象(例如Pipe)暴露了它们下面的 文件描述符(file descriptor),gevent.socket.wait_read 和 wait_write 可以用来在直接读写之前协作式的等待 ready-to-read/ready-to-write 事件。

import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write# To Process
a, b = Pipe()# From Process
c, d = Pipe()def relay():for i in range(10):msg = b.recv()c.send(msg + " in " + str(i))def put_msg():for i in range(10):wait_write(a.fileno())a.send('hi')def get_msg():for i in range(10):wait_read(d.fileno())print(d.recv())if __name__ == '__main__':proc = Process(target=relay)proc.start()g1 = gevent.spawn(get_msg)g2 = gevent.spawn(put_msg)gevent.joinall([g1, g2], timeout=1)

然而要注意,组合multiprocessing和gevent必定带来 依赖于操作系统(os-dependent)的缺陷,其中有:

  • 在兼容POSIX的系统创建子进程(forking)之后, 在子进程的gevent的状态是不适定的(ill-posed)。一个副作用就是, multiprocessing.Process创建之前的greenlet创建动作,会在父进程和子进程两 方都运行。

  • 上例的put_msg()中的a.send()可能依然非协作式地阻塞调用的线程:一个 ready-to-write事件只保证写了一个byte。在尝试写完成之前底下的buffer可能是满的。

  • 上面表示的基于wait_write()/wait_read()的方法在Windows上不工作 (IOError: 3 is not a socket (files are not supported)),因为Windows不能监视 pipe事件。

Python包gipc以大体上透明的方式在 兼容POSIX系统和Windows上克服了这些挑战。它提供了gevent感知的基于multiprocessing.Process的子进程和gevent基于pipe的协作式进程间通信。

Actors

actor 模型是一个由于Erlang变得普及的更高层的并发模型。 简单的说它的主要思想就是许多个独立的Actor,每个Actor有一个可以从 其它Actor接收消息的收件箱。Actor内部的主循环遍历它收到的消息,并 根据它期望的行为来采取行动。

Gevent 没有原生的 Actor 类型,但在一个子类化的 Greenlet 内使用队列, 我们可以定义一个非常简单的。

import gevent
from gevent.queue import Queueclass Actor(gevent.Greenlet):def __init__(self):super(Actor, self).__init__()self.inbox = Queue()       def receive(self, message):"""Define in your subclass."""raise NotImplemented()def _run(self):self.running = Truewhile self.running:message = self.inbox.get()self.receive(message)

下面是一个使用的例子:

import gevent
from gevent import Greenlet
from gevent.queue import Queueclass Actor(gevent.Greenlet):def __init__(self):super(Actor, self).__init__()self.inbox = Queue()def receive(self, message):"""Define in your subclass."""raise NotImplemented()def _run(self):self.running = Truewhile self.running:message = self.inbox.get()self.receive(message)class Pinger(Actor):def receive(self, message):print(message)pong.inbox.put('ping')gevent.sleep(0)class Ponger(Actor):def receive(self, message):print(message)ping.inbox.put('pong')gevent.sleep(0)ping = Pinger()
pong = Ponger()ping.start()
pong.start()ping.inbox.put('start')
gevent.joinall([ping, pong])

真实世界的应用

Gevent ZeroMQ

ZeroMQ 被它的作者描述为 “一个表现得像一个并发框架的socket库”。 它是一个非常强大的,为构建并发和分布式应用的消息传递层。

ZeroMQ提供了各种各样的socket原语。最简单的是请求-应答socket对 (Request-Response socket pair)。一个socket有两个方法sendrecv, 两者一般都是阻塞操作。但是Travis Cline 的一个杰出的库弥补了这一点,这个库使用gevent.socket来以非阻塞的方式 轮询ZereMQ socket。通过命令:

pip install gevent-zeromq

你可以从 PyPi 安装 gevent-zeremq。

# Note: Remember to ``pip install pyzmq gevent_zeromq``
import gevent
from gevent_zeromq import zmq# Global Context
context = zmq.Context()def server():server_socket = context.socket(zmq.REQ)server_socket.bind("tcp://127.0.0.1:5000")for request in range(1, 10):server_socket.send("Hello")print('Switched to Server for %s' % request)# Implicit context switch occurs hereserver_socket.recv()def client():client_socket = context.socket(zmq.REP)client_socket.connect("tcp://127.0.0.1:5000")for request in range(1, 10):client_socket.recv()print('Switched to Client for %s' % request)# Implicit context switch occurs hereclient_socket.send("World")publisher = gevent.spawn(server)
client = gevent.spawn(client)gevent.joinall([publisher, client])

结果


Switched to Server for 1
Switched to Client for 1
Switched to Server for 2
Switched to Client for 2
Switched to Server for 3
Switched to Client for 3
Switched to Server for 4
Switched to Client for 4
Switched to Server for 5
Switched to Client for 5
Switched to Server for 6
Switched to Client for 6
Switched to Server for 7
Switched to Client for 7
Switched to Server for 8
Switched to Client for 8
Switched to Server for 9
Switched to Client for 9

简单 server


# On Unix: Access with ``$ nc 127.0.0.1 5000``
# On Window: Access with ``$ telnet 127.0.0.1 5000``from gevent.server import StreamServerdef handle(socket, address):socket.send("Hello from a telnet!\n")for i in range(5):socket.send(str(i) + '\n')socket.close()server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()

WSGI Servers

Gevent为HTTP内容服务提供了两种WSGI server。从今以后就称为 wsgipywsgi

  • gevent.wsgi.WSGIServer
  • gevent.pywsgi.WSGIServer

在1.0.x之前更早期的版本里,gevent使用libevent而不是libev。 Libevent包含了一个快速HTTP server,它被用在gevent的wsgi server。

在gevent 1.0.x版本,没有包括http server了。作为替代,gevent.wsgi 现在是纯Python server gevent.pywsgi的一个别名。

流式 server

这个章节不适用于gevent 1.0.x版本

熟悉流式HTTP服务(streaming HTTP service)的人知道,它的核心思想 就是在头部(header)不指定内容的长度。反而,我们让连接保持打开, 在每块数据前加一个16进制字节来指示数据块的长度,并将数据刷入pipe中。 当发出一个0长度数据块时,流会被关闭。

HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked8
<p>Hello9
World</p>0

上述的HTTP连接不能在wsgi中创建,因为它不支持流式。 请求只有被缓冲(buffered)下来。

from gevent.wsgi import WSGIServerdef application(environ, start_response):status = '200 OK'body = '<p>Hello World</p>'headers = [('Content-Type', 'text/html')]start_response(status, headers)return [body]WSGIServer(('', 8000), application).serve_forever()

然而使用pywsgi我们可以将handler写成generator,并以块的形式yield出结果。

from gevent.pywsgi import WSGIServerdef application(environ, start_response):status = '200 OK'headers = [('Content-Type', 'text/html')]start_response(status, headers)yield "<p>Hello"yield "World</p>"WSGIServer(('', 8000), application).serve_forever()

但无论如何,与其它Python server相比gevent server性能是显胜的。 Libev是得到非常好审查的技术,由它写出的server在大规模上表现优异为人熟知。

为了测试基准,试用Apache Benchmark ab或浏览 Benchmark of Python WSGI Servers 来与其它server作对比。

$ ab -n 10000 -c 100 http://127.0.0.1:8000/

Long Polling

import gevent
from gevent.queue import Queue, Empty
from gevent.pywsgi import WSGIServer
import simplejson as jsondata_source = Queue()def producer():while True:data_source.put_nowait('Hello World')gevent.sleep(1)def ajax_endpoint(environ, start_response):status = '200 OK'headers = [('Content-Type', 'application/json')]start_response(status, headers)while True:try:datum = data_source.get(timeout=5)yield json.dumps(datum) + '\n'except Empty:passgevent.spawn(producer)WSGIServer(('', 8000), ajax_endpoint).serve_forever()

Websockets

运行Websocket的例子需要gevent-websocket包。

# Simple gevent-websocket server
import json
import randomfrom gevent import pywsgi, sleep
from geventwebsocket.handler import WebSocketHandlerclass WebSocketApp(object):'''Send random data to the websocket'''def __call__(self, environ, start_response):ws = environ['wsgi.websocket']x = 0while True:data = json.dumps({'x': x, 'y': random.randint(1, 5)})ws.send(data)x += 1sleep(0.5)server = pywsgi.WSGIServer(("", 10000), WebSocketApp(),handler_class=WebSocketHandler
)
server.serve_forever()

HTML Page:

<html><head><title>Minimal websocket application</title><script type="text/javascript" src="jquery.min.js"></script><script type="text/javascript">$(function() {// Open up a connection to our servervar ws = new WebSocket("ws://localhost:10000/");// What do we do when we get a message?ws.onmessage = function(evt) {$("#placeholder").append('<p>' + evt.data + '</p>')}// Just update our conn_status field with the connection statusws.onopen = function(evt) {$('#conn_status').html('<b>Connected</b>');}ws.onerror = function(evt) {$('#conn_status').html('<b>Error</b>');}ws.onclose = function(evt) {$('#conn_status').html('<b>Closed</b>');}});</script></head><body><h1>WebSocket Example</h1><div id="conn_status">Not Connected</div><div id="placeholder" style="width:600px;height:300px;"></div></body>
</html>

聊天 server

最后一个生动的例子,实现一个实时聊天室。运行这个例子需要 Flask (你可以使用Django, Pyramid等,但不是必须的)。 对应的Javascript和HTML文件可以在 这里找到。

# Micro gevent chatroom.
# ----------------------from flask import Flask, render_template, requestfrom gevent import queue
from gevent.pywsgi import WSGIServerimport simplejson as jsonapp = Flask(__name__)
app.debug = Truerooms = {'topic1': Room(),'topic2': Room(),
}users = {}class Room(object):def __init__(self):self.users = set()self.messages = []def backlog(self, size=25):return self.messages[-size:]def subscribe(self, user):self.users.add(user)def add(self, message):for user in self.users:print(user)user.queue.put_nowait(message)self.messages.append(message)class User(object):def __init__(self):self.queue = queue.Queue()@app.route('/')
def choose_name():return render_template('choose.html')@app.route('/<uid>')
def main(uid):return render_template('main.html',uid=uid,rooms=rooms.keys())@app.route('/<room>/<uid>')
def join(room, uid):user = users.get(uid, None)if not user:users[uid] = user = User()active_room = rooms[room]active_room.subscribe(user)print('subscribe %s %s' % (active_room, user))messages = active_room.backlog()return render_template('room.html',room=room, uid=uid, messages=messages)@app.route("/put/<room>/<uid>", methods=["POST"])
def put(room, uid):user = users[uid]room = rooms[room]message = request.form['message']room.add(':'.join([uid, message]))return ''@app.route("/poll/<uid>", methods=["POST"])
def poll(uid):try:msg = users[uid].queue.get(timeout=10)except queue.Empty:msg = []return json.dumps(msg)if __name__ == "__main__":http = WSGIServer(('', 5000), app)http.serve_forever()

greenlet

greenlet 官方文档:https://greenlet.readthedocs.io/en/latest/

​为了更好使用 协程 来完成多任务,python 中 greenlet 模块对其封装,从而使得切换任务变得更加简单。安装方式:pip3 install greenlet

官网示例:

from greenlet import greenletdef test1():print(12)gr2.switch()print(34)def test2():print(56)gr1.switch()print(78)gr1 = greenlet(test1)
gr2 = greenlet(test2)gr1.switch()  # 切换到 gr1 开始运行, 即 从 gr1 对应 的 test1 开始运行

运行代码,输出为:12 56 34

当创建一个 greenlet 时,首先初始化一个空的栈, switch 到这个栈的时候,会运行在 greenlet 构造时传入的函数(首先在test1中打印 12), 如果在这个函数(test1)中 switch 到其他协程(到了test2 打印 56),那么该协程会被挂起,等到切换回来(在test2 中切换到 test1 打印34)。当这个协程对应函数执行完毕,那么这个协程就变成dead状态。

看下面代码:

from greenlet import greenletdef test1():print(12)gr2.switch()print(34)def test2():print(56)gr1.switch()print(78)gr1 = greenlet(test1)
gr2 = greenlet(test2)gr2.switch()  # 切换到 gr2 开始运行, 即 从 gr2 对应 的 test2 开始运行

运行代码,输出为:56 12 78

greenlet 的 module 与 class

一起看一下greenlet中的属性:

其中,比较重要的是: getcurrent(), 类greenlet异常类GreenletExit 
getcurrent() :返回当前的greenlet实例;
GreenletExit:是一个特殊的异常,当触发了这个异常的时候,即使不处理,也不会抛到其parent(后面会提到协程中对返回值或者异常的处理)

然后我们再来看看 greenlet.greenlet 这个类:

比较重要的几个属性:

  1. run:当 greenlet 启动的时候会调用到这个callable,如果我们需要继承greenlet.greenlet时,需要重写该方法
  2. switch:前面已经介绍过了,在greenlet之间切换
  3. parent:可读写属性,后面介绍
  4. dead:如果greenlet执行结束,那么该属性为true
  5. throw:切换到指定greenlet后立即跑出异常

注意,本文后面提到的 greenlet 大多都是指 greenlet.greenlet 这个 class,注意区分

Switch not call

对于 greenlet,最常用的写法是 x = gr.switch(y)。 这句话的意思是切换到 gr,传入参数 y。当从其他协程(不一定是这个gr)切换回来的时候,将值付给 x 。

import greenletdef test1(x, y):z = gr2.switch(x+y)print('test1 ', z)def test2(u):print('test2 ', u)gr1.switch(10)gr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)
print(gr1.switch("hello", " world"))

输出:
    'test2 ' 'hello world'
    'test1 ' 10
    None
上面的例子,第12行从 main greenlet 切换到了gr1,test1 第3行切换到了gr2,然后 gr1 挂起,第8行从 gr2 切回 gr1 时,将值(10)返回值给了 z。

每一个 Greenlet 都有一个 parent,一个新的 greenlet 在哪里创生,当前环境的 greenlet 就是这个新 greenlet 的 parent。所有的greenlet 构成一棵树,其跟节点就是还没有手动创建 greenlet 时候的 ”main” greenlet(事实上,在首次 import greenlet 的时候实例化)。当一个协程 正常结束,执行流程回到其对应的parent;或者在一个协程中抛出未被捕获的异常,该异常也是传递到其parent。

Python 中 一切皆对象。 "everything is oblect"
在学习 greenlet 的调用中,同样有一句话应该深刻理解, "switch not call"。即 切换 不是 调用。

import greenletdef test1(x, y):print(id(greenlet.getcurrent()), id(greenlet.getcurrent().parent))  # 40240272 40239952z = gr2.switch(x+y)print('back z', z)def test2(u):print(id(greenlet.getcurrent()), id(greenlet.getcurrent().parent))  # 40240352 40239952return 'hehe'gr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)
print(id(greenlet.getcurrent()), id(gr1), id(gr2))     # 40239952, 40240272, 40240352
print(gr1.switch("hello", " world"), 'back to main')   # hehe back to main

由这个例子可以看出,尽管是从 test1 所在的协程 gr1 切换到了 gr2,但 gr2 的 parent 还是 ’main’ greenlet,因为默认的 parent取决于 greenlet 的创生环境。另外,在 test2 中 return 之后整个返回值返回到了其 parent,而不是 switch 到该协程的地方(即不是 test1 ),这个跟我们平时的函数调用不一样,记住 “switch not call” 。对于异常,也是展开至 parent:

import greenletdef test1(x, y):try:z = gr2.switch(x+y)except Exception:print('catch Exception in test1')def test2(u):assert Falsegr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)
try:gr1.switch("hello", " world")
except:print('catch Exception in main')

输出为:catch Exception in main

greenlet 生命周期

本文开始的地方提到第一个例子中的 gr2 其实并没有正常结束,我们可以用 greenlet.dead 这个属性来查看:

from greenlet import greenletdef test1():gr2.switch(1)print('test1 finished')def test2(x):print('test2 first', x)z = gr1.switch()print('test2 back', z)gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
print('gr1 is dead?: %s, gr2 is dead?: %s' % (gr1.dead, gr2.dead))
gr2.switch()
print('gr1 is dead?: %s, gr2 is dead?: %s' % (gr1.dead, gr2.dead))
print(gr2.switch(10))

输出如下:

test2 first 1
test1 finished
gr1 is dead?: True, gr2 is dead?: False
test2 back ()
gr1 is dead?: True, gr2 is dead?: True
10

从这个例子可以看出:

  • 1. 只有当协程对应的函数执行完毕,协程才会 die,所以第一次 Check 的时候 gr2 并没有 die,因为第 9 行切换出去了就没切回来。在 main 中再 switch 到 gr2 的时候, 执行后面的逻辑,gr2 die
  • 2. 如果试图再次 switch 到一个已经是 dead 状态的 greenlet 会怎么样呢,事实上会切换到其 parent greenlet。

Greenlet Traceing

Greenlet 也提供了接口使得程序员可以监控 greenlet 的整个调度流程。主要是 gettrace 和 settrace(callback) 函数。

import greenletdef test_greenlet_tracing():def callback(event, args):print(event, 'from', id(args[0]), 'to', id(args[1]))def dummy():g2.switch()def dummyexception():raise Exception('excep in coroutine')main = greenlet.getcurrent()g1 = greenlet.greenlet(dummy)g2 = greenlet.greenlet(dummyexception)print('main id %s, gr1 id %s, gr2 id %s' % (id(main), id(g1), id(g2)))oldtrace = greenlet.settrace(callback)try:g1.switch()except BaseException as e:print('Exception : ', e)finally:greenlet.settrace(oldtrace)test_greenlet_tracing()

结果:

main id 1397838280136, gr1 id 1397838280312, gr2 id 1397838280488
switch from 1397838280136 to 1397838280312
switch from 1397838280312 to 1397838280488
throw from 1397838280488 to 1397838280136
Exception :  excep in coroutine

其中 callback 函数 event 是 switch 或者 throw 之一,表明是正常调度还是异常跑出;args 是二元组,表示是从协程 args[0] 切换到了协程 args[1]。上面的输出展示了切换流程:从 main 到 gr1,然后到 gr2,最后回到 main。

greenlet使用建议

使用greenlet需要注意一下三点:

  • 1. greenlet 创建之后,一定要结束,不能 switch 出去就不回来了,否则容易造成内存泄露
  • 2. python 中每个线程都有自己的 main greenlet 及其对应的 sub-greenlet ,不同线程之间的 greenlet 是不能相互切换的
  • 3. 不能存在循环引用,这个是官方文档明确说明:”Greenlets do not participate in garbage collection; cycles involving data that is present in a greenlet’s frames will not be detected. “

来看一个例子:

from greenlet import greenlet, GreenletExithuge = []def show_leak():def test1():gr2.switch()def test2():huge.extend([x * x for x in range(100)])gr1.switch()print('finish switch del huge')del huge[:]gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch()gr1 = gr2 = Noneprint('length of huge is zero ? %s' % len(huge))if __name__ == '__main__':show_leak()# output: length of huge is zero ? 100

在test2函数中,第11行,我们将huge清空,然后再第16行将gr1、gr2的引用计数降到了0。但运行结果告诉我们,第11行并没有执行,所以如果一个协程没有正常结束是很危险的,往往不符合程序员的预期。greenlet提供了解决这个问题的办法,官网文档提到:如果一个greenlet实例的引用计数变成0,那么会在上次挂起的地方抛出GreenletExit异常,这就使得我们可以通过try ... finally 处理资源泄露的情况。如下面的代码:

from greenlet import greenlet, GreenletExithuge = []def show_leak():def test1():gr2.switch()def test2():huge.extend([x * x for x in range(100)])try:gr1.switch()finally:print('finish switch del huge')del huge[:]gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch()gr1 = gr2 = Noneprint('length of huge is zero ? %s' % len(huge))if __name__ == '__main__':show_leak()# output :
# finish switch del huge
# length of huge is zero ? 0

上述代码的switch流程:main greenlet --> gr1 --> gr2 --> gr1 --> main greenlet, 很明显gr2没有正常结束(在第10行挂起了)。第18行之后gr1,gr2的引用计数都变成0,那么会在第10行抛出GreenletExit异常,因此finally语句有机会执行。同时,在文章开始介绍Greenlet module的时候也提到了,GreenletExit这个异常并不会抛出到parent,所以main greenlet也不会出异常。

看上去貌似解决了问题,但这对程序员要求太高了,百密一疏。所以最好的办法还是保证协程的正常结束。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/497027.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高通首次推出AI引擎 打包所有软硬件算力

来源&#xff1a;智东西作者&#xff1a;明天2月22日消息&#xff0c;高通宣布推出人工智能引擎&#xff08;AI Engine&#xff09;&#xff0c;让人工智能在终端侧&#xff08;如智能手机&#xff09;上的应用更快速、高效。该AI Engine包括软硬件两部分&#xff0c;在高通骁龙…

一文详解「群体机器人」中的「实体进化」到底是什么?

原文来源&#xff1a;frontiers作者&#xff1a;Nicolas Bredeche、Evert Haasdijk、Abraham Prieto「雷克世界」编译&#xff1a;嗯~阿童木呀、KABUDA本文概述了适用于机器人群体&#xff08;robot collectives&#xff09;在线分布式进化的进化机器人技术&#xff0c;即实体进…

prototype.js1.5平面结果导读图

转载于:https://www.cnblogs.com/zjypp/archive/2007/10/16/2319458.html

NumPy的详细教程

来源&#xff1a;http://blog.csdn.net/lsjseu/article/details/20359201 用 Python 做科学计算(PDF源码)&#xff1a;https://download.csdn.net/download/freeking101/10959832用 Python 做科学计算&#xff1a;基础篇、手册篇、实战篇&#xff1a;http://old.sebug.net/pap…

美媒评2018年全球十大突破性技术:AI和人工胚胎上榜

来源&#xff1a;新浪科技作者&#xff1a;邱越 斯眉美国《麻省理工科技评论》本周刊文&#xff0c;列出了2018年的10大科技突破。今年入选的技术包括人工智能技术“生成对抗网络”&#xff08;GAN&#xff09;、人工胚胎&#xff0c;以及基于天然气的清洁能源技术等。以下为完…

Sublime Text 全程图文指引

From&#xff08;Sublime Text 全程指南&#xff09;&#xff1a;http://zh.lucida.me/blog/sublime-text-complete-guide From&#xff08;Sublime Text 3 全程详细图文原创教程&#xff09;&#xff1a;http://www.qdfuns.com/notes/15088/7f1b1a378c5b85c179571e0860f2baad.…

设计模式分析

聚合&#xff0c;层次设计模式&#xff1a;适用于层次关系例子&#xff1a;publicclassFee { privatefloatvaluee 0; publicstringGetName() { //返回费用的名称} publicboolHasChildren() { //该费用类型是否有子类型} …

dos命令、find、findstr、ping、nbtstat、netstat、net、at、ftp、telnet、tasklist、taskkill、netsh

DOS 在线手册&#xff1a;http://www.shouce.ren/api/dos/ DOS 命令学习手册 ( DOS W3School 教程 )&#xff1a;https://www.w3cschool.cn/dosmlxxsc1/ cmd命令&#xff1a; &#xff1a;http://wenku.baidu.com/view/5ecce91452d380eb62946da8.html&#xff1a;http://wenku…

重磅!中国科学家最新医学AI成果荣登《细胞》杂志

作者&#xff1a;李雨晨概要&#xff1a;在今天出版的最新一期《细胞》上&#xff0c;华人学者张康教授的研究荣登杂志封面。他们带来的&#xff0c;是一款能精确诊断多种疾病的人工智能工具。医学人工智能领域又有大新闻。在今天出版的最新一期《细胞》上&#xff0c;华人学者…

2017全球教育机器人行业研究报告(附PDF下载)

来源&#xff1a; 起点财经 概要&#xff1a;伴随着全球化市场激烈竞争及高科技日新月异的发展&#xff0c;美、日、韩、欧、中等国家逐渐将智能机器人作为战略新兴产业发展不可缺少的创新技术支撑。伴随着全球化市场激烈竞争及高科技日新月异的发展&#xff0c;美、日、韩、…

从零开始——PowerShell应用入门(全例子入门讲解)

From&#xff1a;https://www.cnblogs.com/lavender000/p/6935589.html PowerShell 在线教程&#xff1a;https://www.pstips.net/powershell-online-tutorials 微软 PowerShell 官方文档&#xff1a;https://docs.microsoft.com/zh-cn/powershell/ 学习一门技术&#xff0c;…

时空大数据赋能智慧城市的思考和实践

来源&#xff1a; 超图集团时空大数据和新型智慧城市是当下地信产业的两大热词&#xff0c;这两者的奇妙关联将擦出怎样精彩的火花&#xff1f;时空大数据如何赋能城市智能与城市智慧&#xff0c;让智慧城市建设迈上新的高度&#xff1f;2月6日至7日&#xff0c;由中国地理信息…

linux 命令:nc、netcat、ncat、socat

参考 &#xff1a;http://www.linuxso.com/command/nc.html NC工具的使用说明教程&#xff1a;https://blog.csdn.net/xysoul/article/details/52270149 window 版本 nc 下载&#xff1a;https://eternallybored.org/misc/netcat/ 1、nc、ncat 简介 NC 全名 Netcat (网络刀)&…

关于GPS 车辆定位导航中的投影变换

GPS 采用 WGS-84 椭球地理坐标, 用经、纬度和大地系来表示3 维空间信息。因此,GPS 车辆定位导航监控中心接收到的只是经、纬度信息, 必须通过高斯投影将其转换成高斯坐标。转换公式如下:转载于:https://www.cnblogs.com/kaixin110/archive/2007/12/11/990851.html

Linux下查看系统版本号信息的方法

From&#xff1a;https://linux.cn/article-9586-1.html 如果你加入了一家新公司&#xff0c;要为开发团队安装所需的软件并重启服务&#xff0c;这个时候首先要弄清楚它们运行在什么发行版以及哪个版本的系统上&#xff0c;你才能正确完成后续的工作。作为系统管理员&#xff…

人工智能的恶意用途:预测、预防和缓解

来源&#xff1a; 新浪科技编译一份由26名专家联合撰写的报告&#xff0c;对人工智能技术的潜在威胁发出警告。他们认为&#xff0c;这项技术可能在未来5到10年催生新型网络犯罪、实体攻击和政治颠覆。这份100页的报告标题为《人工智能的恶意用途&#xff1a;预测、预防和缓解》…

CSS使用总结

在分配ID和类名时&#xff0c;尽可能保持与表现形式无关&#xff0c;例如contleft有可能以后希望出现在右边。 尽量少使用类,因为可以层叠识别,如: .News h3而不必在h3上加类<div class”News”> <h3></h3> <h2></h2> <p></p> <…

linq 关联查询

可得会在以后的实体类中能用到usingSystem;usingSystem.Collections;usingSystem.Configuration;usingSystem.Data;usingSystem.Linq;usingSystem.Web;usingSystem.Web.Security;usingSystem.Web.UI;usingSystem.Web.UI.HtmlControls;usingSystem.Web.UI.WebControls;usingSyst…

DeepMind提出「心智神经网络ToMnet」,训练机器的「理解」能力

原文来源&#xff1a;arXiv作者&#xff1a;Neil C. Rabinowitz、Frank Perbet、H. Francis Song、Chiyuan Zhang、S. M. Ali Eslami、Matthew Botvinick「雷克世界」编译&#xff1a;嗯~阿童木呀、KABUDA一般来说&#xff0c;心智理论&#xff08;ToM&#xff0c;Premack和Woo…

华为正式发布5G商用芯片、5G终端!

来源:&#xff1a;5G概要&#xff1a;5G网络和5G终端是5G商用的两个基础条件。重大信息未来智能实验室是人工智能学家与科学院相关机构联合成立的人工智能&#xff0c;互联网和脑科学交叉研究机构。由互联网进化论作者&#xff0c;计算机博士刘锋与中国科学院虚拟经济与数据科学…