《500 Lines or Less》(5)异步爬虫

https://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html
——A. Jesse Jiryu Davis and Guido van Rossum

介绍

网络程序消耗的不是计算资源,而是打开许多缓慢的连接,解决此问题的现代方法是异步IO。
本章介绍一个简单的网络爬虫,使用异步I/O实现。本章分三个部分:首先异步事件循环,以及一个带有回调的爬虫。其次,我们说明了Python协程高效且可扩展。我们使用生成器实现简单的协程。最后,我们使用Python标准库asyncio的协程,并使用异步队列来协调。

任务

网络爬虫的功能是下载网站上的所有页面,从根URL,获取页面,并解析页面中的链接,并对链接中的页面进行相同的操作,直到世界的尽头。我们可以通过并发进行加速,同时下载许多页面。

传统方法

如何实现并发的爬虫?传统上,我们使用线程池

def fetch(url):sock = socket.socket()sock.connect(('xkcd.com', 80))request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)sock.send(request.encode('ascii'))response = b''chunk = sock.recv(4096)while chunk:response += chunkchunk = sock.recv(4096)# Page is now downloaded.links = parse_links(response)q.add(links)

默认情况下,socket操作处于阻塞状态,线程调用connectrecv方法时,它会暂停。因此,要一次下载多个页面,我们需要许多线程。为了方便线程复用(减少创建和销毁线程的开销),我们使用线程池
但线程成本高,操作系统可能对线程数量设置了上限。如果我们在并发socket上同时扩展到数以计的操作,我们会在用完socket之前用完线程。每个线程的开销或系统限制是瓶颈。

Async(异步)

异步 I/O 框架使用非阻塞socket在单个线程上执行并发操作。在我们的异步爬虫程序中,我们在开始连接到服务器之前将套接字设置为非阻塞:

sock = socket.socket()
sock.setblocking(False) # 非阻塞 
try:sock.connect(('xkcd.com', 80))
except BlockingIOError:pass

恼火的是,非阻塞socket会从connect中抛出异常,即使它正常工作。此异常复制了底层的C函数的行为,该函数将EINPROGRESS设置为errno通知开始。
现在,我们的爬虫需要一种方法来知道连接何时建立,以便它可以发送 HTTP 请求。我们可以在一个循环中反复尝试:

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
encoded = request.encode('ascii')
while True:try:sock.send(encoded)break  # Done.except OSError as e:pass
print('sent')

这种方法不仅浪费电力,而且无法有效地等待多个sockets上的事件。在古代,BSD Unix 的解决方案是 select ,一个 C 函数,它等待事件发生在一个非阻塞套接字或它们的一个小数组上。如今,对具有大量连接的互联网应用程序的需求导致了像poll ,然后是BSD的 kqueue 和 Linux 上的epoll 等替代品的出现。这些 API 与 select 类似,但在连接数量非常多的情况下表现良好。

Python 3.4的 DefaultSelector 使用系统上可用的最好的 类select函数。要注册有关网络 I/O 的通知,我们创建一个非阻塞套接字,并将其注册到默认选择器

from selectors import DefaultSelector, EVENT_WRITE
selector = DefaultSelector() # 选择器
sock = socket.socket()     
sock.setblocking(False) # 非阻塞套接字
try:sock.connect(('xkcd.com', 80))
except BlockingIOError:pass
def connected():selector.unregister(sock.fileno())print('connected!')
selector.register(sock.fileno(), EVENT_WRITE, connected)

我们忽略虚假错误并调用 selector.register() ,传入套接字的文件描述符和一个表示我们正在等待的事件的常量。为了在建立连接时收到通知,我们传递 EVENT_WRITE :也就是说,我们想知道套接字何时是“可写”的。我们还传递了函数 connected ,以便在该事件发生时运行。这样的函数称为回调(callback)

当选择器接收 I/O 通知时,以遍历方式处理:

def loop():while True:events = selector.select()for event_key, event_mask in events:callback = event_key.datacallback()

event_key.data作为connected的回调函数 ,一旦连接了非阻塞套接字,我们就会检索并执行。
与上面的快速旋转循环不同,对 select()的调用会暂停,等待下一个 I/O 事件。然后,循环运行正在等待这些事件的回调。尚未完成的操作将保持挂起状态,直到事件循环的未来某个滴答声。

我们展示了如何在操作准备就绪时开始操作并执行回调。异步框架基于我们展示的两个功能(非阻塞套接字事件循环)构建,用于在单个线程上运行并发操作。

我们在这里实现了“并发性”,但不是传统上所说的“并行性”。也就是说,我们构建了一个执行重叠 I/O 的微型系统。它能够在其他人在飞行过程中开始新的操作。它实际上并不利用多个内核来并行执行计算。但是,这个系统是为 I/O 密集型问题而设计的,而不是 CPU 密集型问题。

因此,我们的事件循环在并发 I/O 时是有效的,因为它不会将线程资源专用于每个连接。但在我们继续之前,重要的是要纠正一个常见的误解,即异步比多线程更快。通常情况并非如此,事实上,在 Python 中,像我们这样的事件循环在处理少量非常活跃的连接时比多线程慢一些。在没有全局解释器锁的运行时中,线程在这样的工作负载上会表现得更好。异步 I/O 适用于具有许多缓慢或昏昏欲睡的连接且事件不频繁的应用程序。

使用回调进行编程

使用我们目前构建的简陋的异步框架,我们如何构建网络爬虫?即使是一个简单的 URL 获取器也很难编写。
我们从尚未获取的 URL 以及看过的 URL 开始:

urls_todo = set(['/'])
seen_urls = set(['/'])

获取页面将需要一系列回调。连接套接字时触发 connected 回调,并向服务器发送 GET 请求。但随后它必须等待响应,因此它会注册另一个回调。如果在触发该回调时,它还无法读取完整的响应,则会再次注册,依此类推。

让我们将这些回调收集到一个Fetcher对象中,它需要一个 URL、一个套接字对象和一个累积响应字节的位置

class Fetcher:def __init__(self, url):self.response = b''  # Empty array of bytes.self.url = urlself.sock = None

首先调用Fetcher.fetch()

    # Method on Fetcher class.def fetch(self):self.sock = socket.socket()self.sock.setblocking(False)try:self.sock.connect(('xkcd.com', 80))except BlockingIOError:pass# Register next callback.selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)

fetch() 方法开始连接套接字。但请注意,该方法在(完全)建立连接之前返回。它必须将控制权返回到事件循环以等待连接。为了理解原因,想象一下我们的整个应用程序是这样的结构:

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
fetcher.fetch()
while True:events = selector.select()for event_key, event_mask in events:callback = event_key.datacallback(event_key, event_mask)

当事件循环调用 select 时,所有事件通知都会在事件循环中处理。因此 fetch() 必须将控制权交给事件循环,以便程序知道套接字何时连接。只有这样,循环才会运行connected 回调,该回调已在上面的 fetch 末尾注册。

以下是 connected 的实现:

    # Method on Fetcher class.def connected(self, key, mask):print('connected!')selector.unregister(key.fd)request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)self.sock.send(request.encode('ascii'))# Register the next callback.selector.register(key.fd,EVENT_READ,self.read_response)

该方法发送 GET 请求。一个真正的应用程序会检查 send 的返回值, 以防无法一次发送整个消息。但是我们的请求很小,应用程序也不复杂。它轻快地调用 send ,然后等待响应。当然,它必须注册另一个回调,并放弃对事件循环的控制权。下一个也是最后一个回调 read_response ,处理服务器的回复:

    # Method on Fetcher class.def read_response(self, key, mask):global stoppedchunk = self.sock.recv(4096)  # 4k chunk size.if chunk:self.response += chunkelse:selector.unregister(key.fd)  # Done reading.links = self.parse_links()# Python set-logic:for link in links.difference(seen_urls):urls_todo.add(link)Fetcher(link).fetch()  # <- New Fetcher.seen_urls.update(links)urls_todo.remove(self.url)if not urls_todo:stopped = True

每次选择器看到套接字是“可读的”时,都会执行回调,这可能意味着两件事:套接字有数据或已关闭。回调要求从套接字提供最多 4 KB 的数据。如果准备就绪, chunk 包含任何可用的数据。如果有更多 chunk ,则长度为 4 KB,并且套接字保持可读性,因此事件循环会在下一个时钟周期再次运行此回调。响应完成后,服务器已关闭套接字, chunk为空。

parse_links 方法(未显示)返回一组 URL。我们为每个新 URL 启动一个新的提取器,没有并发上限。请注意带有回调的异步编程的一个很好的功能:我们不需要对共享数据的更改进行互斥锁,例如当我们添加指向 seen_urls 的链接时。没有抢占式的多任务处理,因此我们不能在代码中的任意点被打断。

我们添加一个全局变量 stopped ,用来控制循环:

stopped = False
def loop():while not stopped:events = selector.select()for event_key, event_mask in events:callback = event_key.datacallback()

下载所有页面后,提取器将停止循环,程序将退出。

这个例子把异步的问题说得很清楚:意大利面条代码。我们需要某种方式来表达一系列计算和 I/O 操作,并安排多个这样的操作并发运行。但是如果没有线程,就无法将一系列操作收集到单个函数中:每当函数开始 I/O 操作时,它都会显式保存将来需要的任何状态,然后返回。您负责思考和编写此保存状态的代码。

让我们解释一下。考虑一下我们使用传统阻塞套接字在线程上获取 URL 是多么简单:

# Blocking version.
def fetch(url):sock = socket.socket()sock.connect(('xkcd.com', 80))request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)sock.send(request.encode('ascii'))response = b''chunk = sock.recv(4096)while chunk:response += chunkchunk = sock.recv(4096)# Page is now downloaded.links = parse_links(response)q.add(links)

此函数在一个套接字操作和下一个套接字操作之间记住什么状态?它有套接字、URL 和累积的 response 。在线程上运行的函数使用编程语言的基本功能将此临时状态存储在其堆栈上的局部变量中。该函数还具有“延续”,即它计划在 I/O 完成后执行的代码。运行时通过存储线程的指令指针来记住延续。您无需考虑恢复这些局部变量和 I/O 之后的延续

但是对于基于回调的异步框架,在等待 I/O 时,函数必须显式保存其状态,因为函数会在 I/O 完成之前返回并丢失其堆栈帧。代替局部变量,我们基于回调的示例将 sockresponse 存储 为 Fetcher 实例的 self 属性。代替指令指针,它通过注册回调 connectedread_response 来存储其延续。随着应用程序功能的增长,我们在回调中手动保存的状态的复杂性也在增加。如此繁重的会计工作使编码员容易患偏头痛。

更糟糕的是,如果回调在安排链中的下一个回调之前抛出异常,会发生什么?假设 parse_links() 做得很差,并且在解析一些 HTML 时会抛出异常:

Traceback (most recent call last):File "loop-with-callbacks.py", line 111, in <module>loop()File "loop-with-callbacks.py", line 106, in loopcallback(event_key, event_mask)File "loop-with-callbacks.py", line 51, in read_responselinks = self.parse_links()File "loop-with-callbacks.py", line 67, in parse_linksraise Exception('parse error')
Exception: parse error

堆栈跟踪仅显示事件循环正在运行回调。我们不记得是什么导致了错误。链条的两端都断了:我们忘记了我们要去哪里,我们从哪里来。这种上下文的丢失被称为“堆栈撕裂”,在许多情况下,它使调查人员感到困惑。堆栈撕裂还阻止我们为回调链安装异常处理程序,就像“try / except”块包装函数调用及其后代树的方式一样。

因此,即使除了关于多线程和异步的相对效率的长期争论之外,还有另一个关于哪个更容易出错的争论:如果线程在同步时出错,它们很容易受到数据争用的影响,但由于堆栈撕裂,回调很难调试

协程 Coroutines

我们用画饼来吸引你:编写异步代码,将回调的效率与多线程编程的经典美观相结合。这种组合是通过一种称为“协程”的模式实现的。使用 Python 3.4 的标准 asyncio 库和一个名为“aiohttp”的包,在协程中获取 URL 非常直接:

    @asyncio.coroutinedef fetch(self, url):response = yield from self.session.get(url)body = yield from response.read()

协程也是可扩展的。与每个线程 50k 的内存和操作系统对线程的硬性限制相比,Python 协程在 Jesse 的系统上仅占用 3k 的内存。Python 可以轻松启动数十万个协程。

协程的概念可以追溯到计算机科学的早期,很简单:它是一个可以暂停和恢复的子程序。虽然线程由操作系统抢先地进行多任务处理,但协程协同处理多任务:它们选择何时暂停,以及接下来要运行哪个协程。

协程有许多实现;即使在 Python 中也有几个。Python 3.4 中标准“asyncio”库中的协程基于生成器、Future 类和“yield from”语句构建。从 Python 3.5 开始,协程是语言本身 的原生特性;但是,理解协程最初是在 Python 3.4 中实现的,使用预先存在的语言工具,是解决 Python 3.5 原生协程的基础。

为了解释 Python 3.4 基于生成器的协程,我们将对生成器以及它们如何在 asyncio 中用作协程进行阐述,相信您会喜欢阅读它,就像我们喜欢编写它一样。一旦我们解释了基于生成器的协程,我们将在异步网络爬虫中使用它们。

Python 生成器(Generator) 的工作原理

在掌握 Python 生成器之前,您必须了解常规 Python 函数的工作原理。通常,当 Python 函数调用子例程时,子例程会保留控制权,直到它返回或引发异常。然后,控制权返回给调用方:

>>> def foo():
...     bar()
...
>>> def bar():
...     pass

标准的 Python 解释器是用 C 语言编写的。执行 Python 函数的 C 函数被巧妙地称为 PyEval_EvalFrameEx 。它采用一个 Python堆栈帧对象,并在帧的上下文中计算 Python 字节码。这是foo的字节码:

>>> import dis
>>> dis.dis(foo)2           0 LOAD_GLOBAL              0 (bar)3 CALL_FUNCTION            0 (0 positional, 0 keyword pair)6 POP_TOP7 LOAD_CONST               0 (None)10 RETURN_VALUE

foo 函数加载 bar 到其堆栈上并调用,然后从堆栈中弹出其返回值,加载 None 到堆栈上,然后返回 None

PyEval_EvalFrameEx 遇到字节码CALL_FUNCTION时 ,它会创建一个新的 Python 堆栈帧并递归:也就是说,它以递归方式调用 PyEval_EvalFrameEx 新帧,用于执行 bar

了解 Python 堆栈帧是在堆内存中分配的,这一点至关重要!Python 解释器是一个普通的 C 程序,所以它的堆栈帧是普通的堆栈帧。但是它操作的 Python 堆栈帧在上。令人惊讶的是,这意味着 Python 堆栈帧可以比其函数调用更长久。要以交互方式查看此内容,请从bar 中保存当前帧:

>>> import inspect
>>> frame = None
>>> def foo():
...     bar()
...
>>> def bar():
...     global frame
...     frame = inspect.currentframe()
...
>>> foo()
>>> # The frame was executing the code for 'bar'.
>>> frame.f_code.co_name
'bar'
>>> # Its back pointer refers to the frame for 'foo'.
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'foo'

函数调用舞台已为 Python 生成器搭建,它们利用相同的构建块——代码对象和堆栈框架——创造出令人惊叹的效果。

这是一个生成器函数:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
...     

当 Python 编译 gen_fn 为字节码时,它会看到该 yield 语句并知道这是一个 gen_fn 生成器函数,而不是常规函数。它设置了一个标志来记住这个事实:

>>> # The generator flag is bit position 5.
>>> generator_bit = 1 << 5
>>> bool(gen_fn.__code__.co_flags & generator_bit)
True

调用生成器函数时,Python 会看到生成器标志,并且它实际上不会运行该函数,而是创建一个生成器

>>> gen = gen_fn()
>>> type(gen)
<class 'generator'>

Python 生成器封装了一个堆栈帧以及对某些代码的引用,即 gen_fn

>>> gen.gi_code.co_name
'gen_fn'

从调用到 gen_fn 的所有生成器都指向相同的代码。但每个都有自己的堆栈框架。此堆栈帧不在任何实际堆栈上,它位于堆内存中等待使用:
生成器
该帧有一个“最后一条指令(last instruction)”指针,即它最近执行的指令。一开始,最后一个指令指针是 -1,表示生成器尚未开始:

>>> gen.gi_frame.f_lasti
-1

当我们调用send 时,生成器到达它的第一个 yield ,并暂停。 send 的返回值为 1,因为这是 gen 传递给 yield 表达式的内容:

>>> gen.send(None)
1

生成器的指令指针现在从头开始的 3 个字节,是编译后的 Python 的 56 字节的一部分:

>>> gen.gi_frame.f_lasti
3
>>> len(gen.gi_code.co_code)
56

生成器可以在任何时候从任何函数恢复,因为它的堆栈帧实际上不在栈(stack)上:它在堆(heap)上。它在调用层次结构中的位置不是固定的,它不需要像常规函数那样遵循先进后出的执行顺序。它被解放了,像云一样自由地漂浮。

我们可以将值“hello”发送到生成器中,它成为 yield 表达式的结果,生成器继续直到它产生 2:

>>> gen.send('hello')
result of yield: hello
2

它的堆栈帧现在包含局部变量 result :

>>> gen.gi_frame.f_locals
{'result': 'hello'}

从 gen_fn 中创建的其他生成器将具有自己的堆栈帧和局部变量。

当我们再次调用 send 时,生成器从其第二个 yield 继续,并通过引发特殊 StopIteration 异常来结束:

>>> gen.send('goodbye')
result of 2nd yield: goodbye
Traceback (most recent call last):File "<input>", line 1, in <module>
StopIteration: done

异常有一个值,该值是生成器的返回值:字符串 “done” 。

使用生成器构建协程

所以生成器可以暂停,它可以用一个值恢复,它有一个返回值。听起来像是一个很好的原语,可以在其上构建异步编程模型,而无需意大利面条回调!我们想要构建一个“协程”:一个与程序中的其他例程协同调度的例程。我们的协程将是 Python 标准“asyncio”库中的协程的简化版本。与asyncio 一样,我们将使用生成器futuresyield from语句。

首先,我们需要一种方法来表示协程正在等待的一些future结果。
精简版Future

class Future:def __init__(self):self.result = Noneself._callbacks = []def add_done_callback(self, fn):self._callbacks.append(fn)def set_result(self, result):self.result = resultfor fn in self._callbacks:fn(self)

Future最初是“悬而未决”的。它通过调用 set_result 来“解决”。
让我们调整我们的Fetcher以使用 futures协程。我们为 fetch 写了一个回调:

class Fetcher:def fetch(self):self.sock = socket.socket()self.sock.setblocking(False)try:self.sock.connect(('xkcd.com', 80))except BlockingIOError:passselector.register(self.sock.fileno(),EVENT_WRITE,self.connected)def connected(self, key, mask):print('connected!')# And so on....

fetch()方法首先连接套接字,然后注册回调connected,以便在套接字准备就绪时执行。现在我们可以将这两个步骤合并到一个协程中:

    def fetch(self):sock = socket.socket()sock.setblocking(False)try:sock.connect(('xkcd.com', 80))except BlockingIOError:passf = Future()def on_connected():f.set_result(None)selector.register(sock.fileno(),EVENT_WRITE,on_connected)yield fselector.unregister(sock.fileno())print('connected!')

现在fetch 是一个生成器函数,它包含一个 yield 语句。我们创建一个挂起的Future,然后用yield暂停,直到套接字准备就绪。内部函数 on_connected 决定future

但是,当future确定时,谁来恢复生成器?我们需要一个协程驱动程序。我们称之为“任务”(task):

class Task:def __init__(self, coro):self.coro = corof = Future()f.set_result(None)self.step(f)def step(self, future):try:next_future = self.coro.send(future.result)except StopIteration:returnnext_future.add_done_callback(self.step)
# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
Task(fetcher.fetch())
loop()

任务通过 发送None 来启动 fetch 生成器。然后 fetch 运行直到产生一个future,任务将其捕获为 next_future。当套接字连接时,事件循环运行回调 on_connected,它确定future,调用 step(),恢复 fetch

分解 yield from 协程

连接套接字后,我们发送 HTTP GET 请求并读取服务器响应。这些步骤不再需要分散在回调中;我们将它们收集到相同的生成器函数中:

    def fetch(self):# ... connection logic from above, then:sock.send(request.encode('ascii'))while True:f = Future()def on_readable():f.set_result(sock.recv(4096))selector.register(sock.fileno(),EVENT_READ,on_readable)chunk = yield fselector.unregister(sock.fileno())if chunk:self.response += chunkelse:# Done reading.break

这段代码从套接字读取整条消息,似乎是通用的。我们如何将其从 fetch 中分解为一个子程序?这时,Python 3 中著名的 yield from 登场了,它允许将生成器委托给另一个生成器

为了了解如何操作,让我们回到简单的生成器示例:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
...  

要从另一个生成器调用此生成器,通过yield from 委托:

>>> # Generator function:
>>> def caller_fn():
...     gen = gen_fn()
...     rv = yield from gen
...     print('return value of yield-from: {}'
...           .format(rv))
...
>>> # Make a generator from the
>>> # generator function.
>>> caller = caller_fn()

caller 生成器的行为就好像它是被委托的生成器gen

>>> caller.send(None)
1
>>> caller.gi_frame.f_lasti
15
>>> caller.send('hello')
result of yield: hello
2
>>> caller.gi_frame.f_lasti  # Hasn't advanced.
15
>>> caller.send('goodbye')
result of 2nd yield: goodbye
return value of yield-from: done
Traceback (most recent call last):File "<input>", line 1, in <module>
StopIteration

callergen生成值时 ,不会前进。请注意,即使内部生成器gen从一个 yield 语句 前进到下一个语句,它的指令指针仍保持在 15 处(即其 yield from 语句的位置)。 从外部 caller 的角度来看,我们无法判断它产生的值是来自 caller 还是来自它委托给的生成器。从内部 gen ,我们无法判断值是从内部发送的 caller 还是从外部发送的。yield from 语句是一个无摩擦的通道,值通过该通道流入和流出 gen ,直到 gen 完成。

协程可以通过yield from 将工作委派给 子协程,并接收工作结果。请注意,上面 caller 打印了“yield-from: done”的返回值。 gen完成后 ,其返回值成为calleryield from的值:

rv = yield from gen

早些时候,当我们批评基于回调的异步编程时,我们最尖锐的抱怨是关于“堆栈撕裂”:当回调抛出异常时,堆栈跟踪通常是无用的。它只显示事件循环正在运行回调,而不是为什么。协程的表现如何?

>>> def gen_fn():
...     raise Exception('my error')
>>> caller = caller_fn()
>>> caller.send(None)
Traceback (most recent call last):File "<input>", line 1, in <module>File "<input>", line 3, in caller_fnFile "<input>", line 2, in gen_fn
Exception: my error

这更有用!堆栈跟踪显示 caller_fn 在抛出错误时委派给 gen_fn 。更令人欣慰的是,我们可以将对子协程的调用包装在异常处理程序中,普通子例程也是如此:

>>> def gen_fn():
...     yield 1
...     raise Exception('uh oh')
...
>>> def caller_fn():
...     try:
...         yield from gen_fn()
...     except Exception as exc:
...         print('caught {}'.format(exc))
...
>>> caller = caller_fn()
>>> caller.send(None)
1
>>> caller.send('hello')
caught uh oh

因此,就像使用常规子程序一样,我们将逻辑分解成子协程。让我们从fetcher中分解出一些子协程。我们编写
read 协程来接收一个块:

def read(sock):f = Future()def on_readable():f.set_result(sock.recv(4096))selector.register(sock.fileno(), EVENT_READ, on_readable)chunk = yield f  # Read one chunk.selector.unregister(sock.fileno())return chunk

read_all协程接收完整消息:

def read_all(sock):response = []# Read whole response.chunk = yield from read(sock)while chunk:response.append(chunk)chunk = yield from read(sock)return b''.join(response)

如果你眯着眼睛看,这些 yield from 语句就会消失,这些语句看起来像是执行阻塞 I/O 的传统函数。但实际上, readread_all 是协程。yield from read 会暂停 read_all,直到 I/O 完成。当read_all暂停时 ,asyncio 的事件循环执行其他工作并等待其他 I/O 事件; 一旦read_all就绪,就会在下个循环中以read的结果恢复。

在堆栈的根目录下,fetch调用read_all

class Fetcher:def fetch(self):# ... connection logic from above, then:sock.send(request.encode('ascii'))self.response = yield from read_all(sock)

奇迹般地,Task 类不需要修改。它像以前一样驱动外部 fetch 协程:

Task(fetcher.fetch())
loop()

read产生一个future时 ,task通过 yield from的通道接收它,就好像future是直接从 fetch 中产生的一样。当循环解决完future时,任务将其结果发送到 fetch 中,并且该值由 read 接收,就像task直接驱动 read 一样:
yield from
了完善我们的协程实现,我们去除了一个瑕疵:我们的代码在等待future时使用 yield ,但在委托给子协程时使用 yield from 。如果我们在协程暂停时始终使用 yield from ,那将更加精致。这样一来,协程就不需要关心等待的是什么类型的东西。

我们利用了 Python 中生成器和迭代器之间的深度对应关系。对于调用者来说,使用生成器与使用迭代器相同。因此,我们通过实现特殊方法__iter__使 Future可迭代

    # Method on Future class.def __iter__(self):# Tell Task to resume me here.yield selfreturn self.result

Future__iter__ 方法是一个协程序,返回Future自身。
现在,当我们将代码:

# f is a Future.
yield f

替换为

# f is a Future.
yield from f

…结果是一样的!驱动Task从其对send 的调用 中接收future,当future被决定时,它会将新结果发送回协程。

统一使用 yield from 有什么好处?为什么这比等待具有 yieldfuture和委托给具有 yield from 的子协程更好?这更好,因为现在,一个方法可以自由地更改其实现而不影响调用者:它可能是一个返回future的普通方法,也可能是一个包含 yield from 语句(并返回值)的协程。对于这两种情况,调用者只需要 yield from 方法以等待结果。

耐心的读者,我们已经结束了对异步协程的愉快阐述。我们窥视了生成器的机制,并勾勒出futuretask的实现。我们概述了 asyncio 如何的两全其美:并发 I/O 比线程更高效,比回调更清晰。当然,真正的异步比我们的草图要复杂得多。真正的异步框架解决了零拷贝 I/O、公平调度、异常处理和大量其他功能。

对于 asyncio 用户来说,使用协程进行编码比您在这里看到的要简单得多。在上面的代码中,我们从第一性原理实现了协程,因此您看到了回调、任务和未来。您甚至看到了非阻塞套接字和对 select .但是,当需要使用 asyncio 构建应用程序时,这些都不会出现在您的代码中。正如我们承诺的那样,您现在可以时尚地获取 URL:

    @asyncio.coroutinedef fetch(self, url):response = yield from self.session.get(url)body = yield from response.read()

对这个阐述感到满意,我们回到我们最初的任务:使用 asyncio 编写一个异步网络爬虫。

协调协程

我们首先描述了我们希望我们的爬虫如何工作。现在是时候用 asyncio 协程来实现它了。

我们的爬虫将获取第一页,解析其链接,并将链接添加到队列中。在此之后,它会在整个网站上扇出,同时获取页面。但是为了限制客户端和服务器上的负载,我们希望运行一些最大数量的工作线程,而不是更多。每当worker完成获取页面时,它应该立即从队列中拉取下一个链接。我们将经历没有足够的工作可以进行的时期,因此一些worker必须暂停。但是,当一个worker点击一个包含新链接的页面时,队列会突然增加,任何暂停的worker都应该醒来并破解。最后,完成工作后,程序就退出。

想象一下,如果worker线程,该如何表达爬虫的算法?我们可以使用 Python 标准库中的同步队列。每次将项目放入队列时,队列都会递增其“task”计数。工作线程在完成项目工作后进行调用 task_done 。主线程会在Queue.join 阻塞,直到队列中的每个项目都与 task_done 调用匹配,然后退出。

协程使用与异步队列完全相同的模式!首先我们导入Queue

try:from asyncio import JoinableQueue as Queue
except ImportError:# Python 3.5: asyncio.JoinableQueue is merged into Queue.from asyncio import Queue

我们在crawler类中

  • 收集worker的共享状态,
  • crawl 方法中编写主逻辑。我们从协程启动 crawl ,运行 asyncio 的事件循环,直到 crawl 完成:
loop = asyncio.get_event_loop()
crawler = crawling.Crawler('http://xkcd.com',max_redirect=10)
loop.run_until_complete(crawler.crawl())

爬虫以根 URL 和 max_redirect(某个 URL 的最大重定向数量)开始。它将(URL, max_redirect) 放入队列中。(原因敬请期待)

class Crawler:def __init__(self, root_url, max_redirect):self.max_tasks = 10self.max_redirect = max_redirectself.q = Queue()self.seen_urls = set()# aiohttp's ClientSession does connection pooling and# HTTP keep-alives for us.self.session = aiohttp.ClientSession(loop=loop)# Put (URL, max_redirect) in the queue.self.q.put((root_url, self.max_redirect))

队列中未完成的任务数现在为 1。回到我们的主脚本中,我们启动事件循环crawl方法:

loop.run_until_complete(crawler.crawl())

crawl 协程启动了worker。它类似主线程:一直在 join 处阻塞,直到所有任务完成,而worker在后台运行。

    @asyncio.coroutinedef crawl(self):"""Run the crawler until all work is done."""workers = [asyncio.Task(self.work())for _ in range(self.max_tasks)]# When all work is done, exit.yield from self.q.join()for w in workers:w.cancel()

如果 worker 是线程,我们可能不希望一次启动它们。为了避免在确定需要之前创建昂贵的线程,线程池通常会按需增长。但是协程很便宜,所以我们从允许的最大数量开始。

有趣的是如何关闭爬虫。当 join future确定时,工作线程任务处于活动状态,但已暂停:它们等待更多 URL,但没有一个 URL 出现。因此,主协程在退出之前会取消它们。否则,当 Python 解释器关闭并调用所有对象的析构函数时,活动任务会大叫:

ERROR:asyncio:Task was destroyed but it is pending!

cancel 如何 工作?生成器具有我们尚未向您展示的功能。您可以从外部将异常抛入生成器:

>>> gen = gen_fn()
>>> gen.send(None)  # Start the generator as usual.
1
>>> gen.throw(Exception('error'))
Traceback (most recent call last):File "<input>", line 3, in <module>File "<input>", line 2, in gen_fn
Exception: error

生成器由 throw 恢复,但现在引发异常。如果生成器的调用堆栈中没有捕获它,则异常会冒泡回顶部。因此,要取消任务的协程:

    # Method of Task class.def cancel(self):self.coro.throw(CancelledError)

无论生成器在哪里暂停(在某个 yield from 语句中),它都会恢复并抛出异常。我们在任务的 step 的方法处理取消:

    # Method of Task class.def step(self, future):try:next_future = self.coro.send(future.result)except CancelledError:self.cancelled = Truereturnexcept StopIteration:returnnext_future.add_done_callback(self.step)

现在任务知道它被取消了,所以当它被摧毁时,它不会对光的消逝感到愤怒。

一旦 crawl 取消了工作线程,它就会退出。事件循环看到协程完成了(我们稍后会看到),它也会退出:

loop.run_until_complete(crawler.crawl())

crawl 方法包含了我们的主要协程必须执行的所有操作。工作器协程从队列中获取 URL,获取它们,并解析它们以获取新链接。每个 worker 独立运行 work 协程:

    @asyncio.coroutinedef work(self):while True:url, max_redirect = yield from self.q.get()# Download page and add new links to self.q.yield from self.fetch(url, max_redirect)self.q.task_done()

Python 看到此代码包含 yield from 语句,并将其编译为生成器函数。所以在 crawl中 ,当主协程调用self.work 十次时,它实际上并没有执行这个方法:它只创建十个引用此代码的生成器对象。它将每个包装在一个 Task 中。task接收 生成器产生的future,并通过在future确定时,以每个future的结果调用send 来驱动生成器。由于生成器有自己的堆栈帧,因此它们独立运行,具有单独的局部变量和指令指针。

worker通过队列和同伴进行协调,通过下面代码等待新的url:

url, max_redirect = yield from self.q.get()

队列的 get 方法本身就是一个协程:它会暂停,直到有人将一个项目放入队列中,然后恢复并返回该项目。
顺便说一句,这是在爬虫结束时,当主协程取消它时,worker暂停的位置。从协程的角度来看,它在循环中的最后一次行程在 yield from 引发 CancelledError。

当worker获取页面时,它会解析链接并将新链接放入队列中,然后调用 task_done 以递减计数器。最终,worke获取一个页面,其 URL 已全部获取,并且队列中也没有剩余工作。因此,该worke的调用 task_done 将计数器递减为零。然后 crawl ,正在等待队列 join 方法的 取消暂停并完成。

我们承诺解释为什么队列中的项目是成对的,例如:

# URL to fetch, and the number of redirects left.
('http://xkcd.com/353', 10)

新 URL 还剩下 10 个重定向。获取此特定 URL 会导致重定向到带有尾部斜杠的新位置。我们减少剩余的重定向数量,并将下一个位置放入队列中:

# URL with a trailing slash. Nine redirects left.
('http://xkcd.com/353/', 9)

默认情况下, aiohttp 将遵循重定向并给我们最终响应。但是,我们告诉它不要这样做,并在爬虫中处理重定向,因此它可以合并指向同一目的地的重定向路径:如果我们已经看到此 URL,则它已在 self.seen_urls 并且我们已经从不同的入口点开始此路径:
重定向
爬虫获取“foo",并看到它重定向到"baz",因此将"baz"添加到队列和"seen_urls"。如果下一页是”bar",并且也重定向到"baz",则不会再次添加"baz"。如果响应是页面而不是重定向,fetch将解析链接并添加新链接到队列。

    @asyncio.coroutinedef fetch(self, url, max_redirect):# Handle redirects ourselves.response = yield from self.session.get(url, allow_redirects=False)try:if is_redirect(response):if max_redirect > 0:next_url = response.headers['location']if next_url in self.seen_urls:# We have been down this path before.return# Remember we have seen this URL.self.seen_urls.add(next_url)# Follow the redirect. One less redirect remains.self.q.put_nowait((next_url, max_redirect - 1))else:links = yield from self.parse_links(response)# Python set-logic:for link in links.difference(self.seen_urls):self.q.put_nowait((link, self.max_redirect))self.seen_urls.update(links)finally:# Return connection to pool.yield from response.release()

如果这是多线程代码,那么在竞争条件下会很糟糕。例如,工作线程检查链接是否在 seen_urls中 ,如果不是,则工作线程将其放入队列中并将其添加到 seen_urls 中。如果它在两个操作之间被中断,那么另一个工作线程可能会从不同的页面解析相同的链接,同时观察它不在seen_urls 中 ,并将其添加到队列中。现在,同一个链接在队列中两次,导致(充其量)重复工作和错误的统计数据。

但是,协程仅在语句yield from时会受到中断。这是一个关键的区别,使协程代码比多线程代码更不容易发生争用:多线程代码必须通过抓取锁来显式进入关键部分,否则它是可中断的。默认情况下,Python 协程是不可中断的,并且仅在显式产生控制权时才放弃控制权。
.
我们不再需要像在基于回调的程序中那样的 fetcher 类。该类是解决回调缺点的方法:它们在等待 I/O 时需要一些地方来存储状态,因为它们的局部变量不会在调用之间保留。但是 fetch 协程可以像常规函数一样将其状态存储在局部变量中,因此不再需要类。

fetch 处理完服务器响应后 ,它会返回给调用方 work 。work 方法调用队列的task_done ,然后从队列中获取下一个要提取的URL。

fetch将新链接放入队列中时 ,它会增加未完成任务的计数,并暂停正在等待 q.join 的主协程。但是,如果没有看不见的链接,并且这是队列中的最后一个 URL,则work调用 task_done 时 ,未完成任务的计数将降至零。该事件解除join的暂停,主协程完成。

协调 worker 和主协程的队列代码是这样的

class Queue:def __init__(self):self._join_future = Future()self._unfinished_tasks = 0# ... other initialization ...def put_nowait(self, item):self._unfinished_tasks += 1# ... store the item ...def task_done(self):self._unfinished_tasks -= 1if self._unfinished_tasks == 0:self._join_future.set_result(None)@asyncio.coroutinedef join(self):if self._unfinished_tasks > 0:yield from self._join_future

主协程 crawl 从 join中获取值。 因此,当最后一个worker将未完成任务的计数减少到零时,它会向crawl 发出恢复并完成的信号。

旅程快结束了。程序从调用crawl开始:

loop.run_until_complete(self.crawler.crawl())

该程序如何结束?由于 crawl 是一个生成器函数,调用它将返回一个生成器。为了驱动生成器,asyncio 将其包装在一个任务中:

class EventLoop:def run_until_complete(self, coro):"""Run until the coroutine is done."""task = Task(coro)task.add_done_callback(stop_callback)try:self.run_forever()except StopError:pass
class StopError(BaseException):"""Raised to stop the event loop."""
def stop_callback(future):raise StopError

当任务完成时,它会发出 StopError ,循环将其用作已正常完成的信号。
但这是什么?该任务具有名

add_done_callbackresult 的方法?你可能会认为一项任务类似于一个future。你的直觉是正确的。我们必须承认一个关于我们向你隐瞒的任务类的细节:task就是future

class Task(Future):"""A coroutine wrapped in a Future."""

通常future是由其他人调用 set_result 解决的。但是,当task的协程停止时,task会自行解决。请记住,当生成器返回时,它会抛出特殊 StopIteration 异常:

    # Method of class Task.def step(self, future):try:next_future = self.coro.send(future.result)except CancelledError:self.cancelled = Truereturnexcept StopIteration as exc:# Task resolves itself with coro's return# value.self.set_result(exc.value)returnnext_future.add_done_callback(self.step)

因此,当事件循环调用 task.add_done_callback(stop_callback) 时,它准备被任务停止。
这是 run_until_complete

    # Method of event loop.def run_until_complete(self, coro):task = Task(coro)task.add_done_callback(stop_callback)try:self.run_forever()except StopError:pass

当任务捕获 StopIteration 并解决自己时,回调将从循环中引发 StopError。循环停止,调用堆栈回到 run_until_complete 。程序完成。

结论

现代程序越来越频繁地受 I/O 限制,而不是受 CPU 限制。对于这样的程序来说,Python 线程是完美的错误选择:全局解释器锁阻止它们实际并行计算,而抢占式切换使它们容易出现争用。异步通常是正确的模式。但随着基于回调的异步代码的增长,它往往会变得一团糟。协程是一个简洁的替代方案。它们自然而然地融入到子例程中,具有合理的异常处理和堆栈跟踪。

如果我们眯着眼睛使yield from 语句变得模糊,协程看起来像一个执行传统阻塞 I/O 的线程。我们甚至可以将协程与多线程编程中的经典模式进行协调。没有必要重新发明。因此,与回调相比,协程对于具有多线程经验的编码人员来说是一个诱人的习惯。

但是,当我们睁开眼睛并专注于这些 yield from 语句时,我们会看到它们在协程放弃控制权并允许其他人运行时标记点。与线程不同,协程显示我们的代码可以中断和不能中断的位置。Glyph Lefkowitz 在他的启发性文章“不屈不挠” 中写道:“线程使局部推理变得困难,而局部推理也许是软件开发中最重要的事情。然而,显式让权使得“通过检查例程本身而不是检查整个系统来理解例程的行为(以及由此而来的正确性)”成为可能。

本章是在 Python 和异步历史上的复兴时期写成的。2014 年 3 月,基于生成器的协程在 Python 3.4 的“asyncio”模块中发布。2015 年 9 月,Python 3.5 发布,其中内置了协程。这些原生协程使用新语法async def声明,而不是“yield from”,并且代指协程或等待 Future 时,不再使用“yield from”,而是使用新的“await”关键字。

尽管取得了这些进展,核心思想依然保留。Python 的新原生协程在语法上与生成器将有所区分,但工作方式非常类似;事实上,它们将在 Python 解释器内共享实现。TaskFuture事件循环将继续在其 asyncio 中扮演它们的角色。

现在你知道了 asyncio 协程的工作原理,你可以大部分忘记细节。这个机制被隐藏在一个时髦的接口后面。但是你对基本原理的掌握使你能够在现代异步环境中正确高效地编写代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/874797.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32F0-标准库时钟配置指南

启动 从startup_stm32f0xx.s内的开头的Description可以看到 ;* Description : STM32F051 devices vector table for EWARM toolchain. ;* This module performs: ;* - Set the initial SP ;* - Set t…

【Leetcode】十八、动态规划:不同路径 + 最大正方形

文章目录 1、动态规划2、leetcode509&#xff1a;斐波那契数列3、leetcode62&#xff1a;不同路径4、leetcode121&#xff1a;买卖股票的最佳时机5、leetcode70&#xff1a;爬楼梯6、leetcode279&#xff1a;完全平方数7、leetcode221&#xff1a;最大正方形 1、动态规划 只能…

C#开源、简单易用的Dapper扩展类库 - Dommel

项目特性 Dommel 使用 IDbConnection 接口上的扩展方法为 CRUD 操作提供了便捷的 API。 Dommel 能够根据你的 POCO 实体自动生成相应的 SQL 查询语句。这大大减少了手动编写 SQL 代码的工作量&#xff0c;并提高了代码的可读性和可维护性。 Dommel 支持 LINQ 表达式&#xff…

记一次因敏感信息泄露而导致的越权+存储型XSS

1、寻找测试目标 可能各位师傅会有苦于不知道如何寻找测试目标的烦恼&#xff0c;这里我惯用的就是寻找可进站的思路。这个思路分为两种&#xff0c;一是弱口令进站测试&#xff0c;二是可注册进站测试。依照这个思路&#xff0c;我依旧是用鹰图进行了一波资产的搜集&#xff…

SSIS_SQLITE

1.安装 SQLite ODBC 驱动程序 2.添加SQLite数据源 在“用户DSN”或“系统DSN”选项卡中&#xff0c;点击“添加”。选择“SQLite3 ODBC Driver”&#xff0c;然后点击“完成”。在弹出的配置窗口中&#xff0c;设置数据源名称&#xff08;DSN&#xff09;&#xff0c;并指定S…

英迈中国与 Splashtop 正式达成战略合作协议

2024年7月23日&#xff0c;英迈中国与 Splashtop 正式达成战略合作协议&#xff0c;英迈中国正式成为其在中国区的战略合作伙伴。此次合作将结合 Splashtop 先进的远程桌面控制技术和英迈在技术服务与供应链管理领域的专业优势&#xff0c;为中国地区的用户带来更加安全的远程访…

联想教育电脑硬盘保护同传EDU系统使用简明教程

目录 一、原理概述 二、简明使用方法 1、软件下载 2、开机引导 3、开始安装 4、使用 &#xff08;1&#xff09;进入底层 &#xff08;2&#xff09;进行分区设置 &#xff08;3&#xff09;系统设置 &#xff08;4&#xff09;安装硬盘保护驱动 &#xff08;5&…

前端模块化CommonJS、AMD、CMD、ES6

在前端开发中&#xff0c;模块化是一种重要的代码组织方式&#xff0c;它有助于将复杂的代码拆分成可管理的小块&#xff0c;提高代码的可维护性和可重用性。CommonJS、AMD&#xff08;异步模块定义&#xff09;和CMD&#xff08;通用模块定义&#xff09;是三种不同的模块规范…

leetcode-101. 对称二叉树

题目描述 给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 示例 1&#xff1a; 输入&#xff1a;root [1,2,2,3,4,4,3] 输出&#xff1a;true示例 2&#xff1a; 输入&#xff1a;root [1,2,2,null,3,null,3] 输出&#xff1a;false 思路 1) 如果同时root1…

【调试笔记-20240723-Linux-gitee 仓库同步 github 仓库,并保持所有访问链接调整为指向 gitee 仓库的 URL】

调试笔记-系列文章目录 调试笔记-20240723-Linux-gitee 仓库同步 github 仓库&#xff0c;并保持所有访问链接调整为指向 gitee 仓库的 URL 文章目录 调试笔记-系列文章目录调试笔记-20240723-Linux-gitee 仓库同步 github 仓库&#xff0c;并保持所有访问链接调整为指向 gite…

Ubuntu20.04版本升级openssh9.8p1方法

一、问题描述&#xff1a; 8.5p1 和 9.7p1 之间的openssh版本漏洞可能会导致linux系统以root身份进行RCE&#xff0c;所以需安装最新版本 二、解决方法&#xff1a; 将当前openssh版本升级到最新的版本即openssh-9.8p1版本&#xff0c;OpenSSL大版本升级且OpenSSH有新稳定版本…

Zabbix监控应用

一.监控tomcat 1.在tomcat服务器上安装zabbix-agent服务 [rootnode2 etc]#vim zabbix_agentd.conf 94 Server192.168.240.13 #指向当前zabbix server ##### Passive checks related #被动检查相关配置### Option: ListenPort ListenPort10050 #监听端口 默认的无需修改11…

SPF配置教程:如何安全构建邮件发送策略?

SPF配置教程的步骤详解&#xff01;SPF记录配置方法策略有哪些&#xff1f; SPF通过允许域名所有者指定哪些主机可以代表该域发送邮件&#xff0c;从而减少电子邮件欺诈和垃圾邮件的风险。AokSend将详细介绍SPF配置教程&#xff0c;并指导您如何安全地构建邮件发送策略。 SPF…

《白话机器学习的数学》第4章——评估

4.1模型评估 1.由于像多重回归这样的问题会导致无法在图上展示&#xff0c;所以需要能够够定量地表示机器学习模型的精度。 4.2交叉验证 4.2.1回归问题的验证 1.把获取的全部训练数据分成两份&#xff1a;一份用于测试&#xff0c;一份用于训练。然后用前者来评估模型。 一般…

C# 数组常用遍历方式

// 假设数组Point[] points new Point[2];// 第一种遍历 forfor (int i 0; i < points.Length; i){Point p points[i];Console.WriteLine($"X{p.X},y{p.Y}");}// 第二种遍历 foreachforeach (Point p in points){Console.WriteLine($"X{p.X},y{p.Y}"…

TCP三次握手和四次挥手的理解

三次握手 第一次握手&#xff1a; 客户端发出 请求报文其中SYN应1&#xff0c;选择一个序列号x 第二次握手&#xff1a; 服务端接收到之后回复 确认报文&#xff0c;其中SYN应1&#xff0c;ACK1&#xff0c;确认号是x1&#xff0c;同时为自己初始化序列号y 第三次握手&…

Hadoop架构

一、案列分析 1.1案例概述 现在已经进入了大数据(Big Data)时代&#xff0c;数以万计用户的互联网服务时时刻刻都在产生大量的交互&#xff0c;要处理的数据量实在是太大了&#xff0c;以传统的数据库技术等其他手段根本无法应对数据处理的实时性、有效性的需求。HDFS顺应时代…

Linux(CentOS)的“应用商城” —— yum

Linux&#xff08;CentOS&#xff09;的“应用商城” —— yum 关于 yum 和软件包Linux 系统&#xff08;CentOS&#xff09;的生态yum 相关操作yum 本地配置yum 安装 lrzsz.x86_64 关于 yum 和软件包 首先 yum 是软件下载安装管理的客户端&#xff0c;类似各种手机里的“应用…

WEB前端10- Fetch API(同步/异步/跨域处理)

Fetch API Fetch API 可以用来获取远程数据&#xff0c;用于在 Web 应用程序中发起和处理 HTTP 请求。它基于 Promise&#xff0c;提供了一种简单而强大的方式来处理网络通信&#xff0c;替代了传统的 XMLHttpRequest。 Promise对象 Promise 对象是 JavaScript 中处理异步操…

0723,UDP通信(聪明小辉聪明小辉),HTTP协议

我就是一个爱屋及乌的人&#xff01;&#xff01;&#xff01;&#xff01; #include "network_disk_kai.h" 昨天的epoll&#xff1a; 可恶抄错代码了 epoll_s.csockect return listenfdsetsockoptsockaddr_in bind listenfd & serveraddr…