From:https://blog.csdn.net/weixin_37947156/article/details/74533108
运行入口
还是回到最初的入口,在Scrapy源码分析(二)运行入口这篇文章中已经讲解到,在执行scrapy命令时,调用流程如下:
- 调用cmdline.py的execute方法
- 调用命令实例解析命令行
- 构建CrawlerProcess实例,调用crawl和start方法
而 crawl 方法最终是调用了 Cralwer 实例的 crawl,这个方法最终把控制权交由 Engine,而 start 方法 注册好协程池,开始异步调度。
我们来看 Cralwer 的 crawl 方法:(scrapy/crawler.py)
在把控制权交给引擎调度之前,先创建出爬虫实例,然后创建引擎实例(此过程见Scrapy源码分析(三)核心组件初始化),然后调用了spider
的start_requests
方法,这个方法就是我们平时写的最多爬虫类的父类,它在spiders/__init__.py
中:
构建请求
在这里我们能看到,平时我们必须要定义的start_urls
,原来是在这里拿来构建Request
的,来看Request
的是如何构建的:
(scrapy/http/request/__init__.py)
"""
This module implements the Request class which is used to represent HTTP
requests in Scrapy.See documentation in docs/topics/request-response.rst
"""
import six
from w3lib.url import safe_url_stringfrom scrapy.http.headers import Headers
from scrapy.utils.python import to_bytes
from scrapy.utils.trackref import object_ref
from scrapy.utils.url import escape_ajax
from scrapy.http.common import obsolete_setterclass Request(object_ref):def __init__(self, url, callback=None, method='GET', headers=None, body=None,cookies=None, meta=None, encoding='utf-8', priority=0,dont_filter=False, errback=None, flags=None):# 编码self._encoding = encoding # this one has to be set first# 请求方法self.method = str(method).upper()# 设置 URLself._set_url(url)# 设置 bodyself._set_body(body)assert isinstance(priority, int), "Request priority not an integer: %r" % priority# 优先级self.priority = priorityif callback is not None and not callable(callback):raise TypeError('callback must be a callable, got %s' % type(callback).__name__)if errback is not None and not callable(errback):raise TypeError('errback must be a callable, got %s' % type(errback).__name__)assert callback or not errback, "Cannot use errback without a callback"# 回调函数self.callback = callback# 异常回调函数self.errback = errback# cookiesself.cookies = cookies or {}# 构建Headerself.headers = Headers(headers or {}, encoding=encoding)# 是否需要过滤self.dont_filter = dont_filter# 附加信息self._meta = dict(meta) if meta else Noneself.flags = [] if flags is None else list(flags)@propertydef meta(self):if self._meta is None:self._meta = {}return self._metadef _get_url(self):return self._urldef _set_url(self, url):if not isinstance(url, six.string_types):raise TypeError('Request url must be str or unicode, got %s:' % type(url).__name__)s = safe_url_string(url, self.encoding)self._url = escape_ajax(s)if ':' not in self._url:raise ValueError('Missing scheme in request url: %s' % self._url)url = property(_get_url, obsolete_setter(_set_url, 'url'))def _get_body(self):return self._bodydef _set_body(self, body):if body is None:self._body = b''else:self._body = to_bytes(body, self.encoding)body = property(_get_body, obsolete_setter(_set_body, 'body'))@propertydef encoding(self):return self._encodingdef __str__(self):return "<%s %s>" % (self.method, self.url)__repr__ = __str__def copy(self):"""Return a copy of this Request"""return self.replace()def replace(self, *args, **kwargs):"""Create a new Request with the same attributes except for thosegiven new values."""for x in ['url', 'method', 'headers', 'body', 'cookies', 'meta', 'flags','encoding', 'priority', 'dont_filter', 'callback', 'errback']:kwargs.setdefault(x, getattr(self, x))cls = kwargs.pop('cls', self.__class__)return cls(*args, **kwargs)
Request
对象比较简单,就是简单封装了请求参数、方式、回调以及可附加的属性信息。
当然,你也可以在子类重写start_requests
以及make_requests_from_url
这2个方法,来构建种子请求。
引擎调度
回到crawl
方法,构建好种子请求对象后,调用了engine
的open_spider
方法:
初始化的过程之前的文章已讲到,这里不再多说。主要说一下处理流程,这里第一步是构建了CallLaterOnce
,把_next_request
注册进去,看此类的实现:
class CallLaterOnce(object):"""Schedule a function to be called in the next reactor loop, but only ifit hasn't been already scheduled since the last time it ran."""# 在twisted的reactor中循环调度一个方法def __init__(self, func, *a, **kw):self._func = funcself._a = aself._kw = kwself._call = Nonedef schedule(self, delay=0):# 上次发起调度,才可再次继续调度if self._call is None:self._call = reactor.callLater(delay, self)def cancel(self):if self._call:self._call.cancel()def __call__(self):# 上面注册的是self,所以会执行__call__self._call = Nonereturn self._func(*self._a, **self._kw)
这里封装了循环执行的方法类,并且注册的方法会在 twisted 的 reactor 中异步执行,以后执行只需调用 schedule 方法,就会注册 self 到 reactor 的 callLater 中,然后它会执行 __call__ 方法,进而执行我们注册的方法。而这里我们注册的方法是引擎的_next_request,也就是说,此方法会循环调度,直到程序退出。
然后调用了爬虫中间件的 process_start_requests 方法,也就是说,你可以定义多个自己的爬虫中间件,每个类都重写此方法,爬虫在调度之前会分别调用你定义好的爬虫中间件,来分别处理初始化请求,你可以进行过滤、加工、筛选以及你想做的任何逻辑。这样做的好处就是,把想做的逻辑拆分成做个中间件,功能独立而且维护起来更加清晰。
调度器
接着调用了Scheduler
的open
:(scrapy/core/scheduler.py)
在open
方法中,实例化出优先级队列以及根据dqdir
决定是否使用磁盘队列,然后调用了请求指纹过滤器的open
,在父类BaseDupeFilter
中定义:
请求过滤器提供了请求过滤的具体实现方式,Scrapy默认提供了RFPDupeFilter
过滤器实现过滤重复请求的逻辑,后面讲具体是如何过滤重复请求的。
Scraper
再来看Scraper
的open_spider
:
这里的工作主要是Scraper
调用所有Pipeline
的open_spider
方法,也就是说,如果我们定义了多个Pipeline
输出类,可重写open_spider
完成每个Pipeline
处理输出开始的初始化工作。
循环调度
调用了一些列的组件的open
方法后,最后调用了nextcall.schedule()
开始调度,
也就是循环执行在上面注册的 _next_request
方法:
_next_request 方法首先调用 _needs_backout 方法检查是否需要等待,等待的条件有:
- 引擎是否主动关闭
- Slot是否关闭
- 下载器网络下载超过预设参数
- Scraper处理输出超过预设参数
如果不需要等待,则调用 _next_request_from_scheduler,此方法从名字上就能看出,主要是从 Schduler中 获取 Request。
这里要注意,在第一次调用此方法时,Scheduler 中是没有放入任何 Request 的,这里会直接 break 出来,执行下面的逻辑,而下面就会调用 crawl 方法,实际是把请求放到 Scheduler 的请求队列,放入队列的过程会经过 请求过滤器 校验是否重复。
下次再调用 _next_request_from_scheduler 时,就能从 Scheduler 中获取到下载请求,然后执行下载动作。
先来看第一次调度,执行 crawl:
调用引擎的crawl
实际就是把请求放入Scheduler
的队列中,下面看请求是如何入队列的。
请求入队
Scheduler
请求入队方法:(scrapy/core/schedulter.py)
在之前将核心组件实例化时有说到,调度器主要定义了2种队列:基于磁盘队列、基于内存队列。
如果在实例化Scheduler
时候传入jobdir
,则使用磁盘队列,否则使用内存队列,默认使用内存队列。
指纹过滤
在入队之前,首先通过请求指纹过滤器检查请求是否重复,也就是调用了过滤器的request_seen
:
(scrapy/duperfilters.py)
utils.request
的request_fingerprint
:
这个过滤器先是通过 Request 对象 生成一个请求指纹,在这里使用 sha1 算法,并记录到指纹集合,每次请求入队前先到这里验证一下指纹集合,如果已存在,则认为请求重复,则不会重复入队列。
不过如果我想不校验重复,也想重复爬取怎么办?看 enqueue_request 的第一行判断,仅需将 Request 实例的 dont_filter 定义为 True 就可以重复爬取此请求,非常灵活。
Scrapy 就是通过此逻辑实现重复请求的过滤逻辑,默认重复请求是不会进行重复抓取的。
下载请求
第一次请求进来后,肯定是不重复的,那么则会正常进入调度器队列。然后再进行下一次调度,再次调用_next_request_from_scheduler 方法,此时调用调度器的 next_request 方法,就是从调度器队列中取出一个请求,这次就要开始进行网络下载了,也就是调用 _download:(scrapy/core/engine.py)
在进行网络下载时,调用了Downloader
的fetch
:(scrapy/core/downloader/__init__.py)
这里调用下载器中间件的download
方法 (scrapy/core/downloader/middleware.py),并注册下载成功的回调方法是_enqueue_request
,来看下载方法:
在下载过程中,首先先找到所有定义好的下载器中间件,包括内置定义好的,也可以自己扩展下载器中间件,下载前先依次执行process_request 方法,可对 request 进行加工、处理、校验等操作,然后发起真正的网络下载,也就是第一个参数download_func,在这里是 Downloader 的 _enqueue_request 方法:
下载成功后回调 Downloader 的 _enqueue_request:(scrapy/core/downloader/__init__.py)
在这里,也维护了一个下载队列,可根据配置达到延迟下载的要求。真正发起下载请求的是调用了self.handlers.download_request
:(scrapy/core/downloader/handlers/__init__.py)
下载前,先通过解析request
的scheme
来获取对应的下载处理器,默认配置文件中定义的下载处理器:
然后调用 download_request 方法,完成网络下载,这里不再详细讲解每个处理器的实现,简单来说你就把它想象成封装好的网络下载库,输入URL,输出下载结果就好了,这样方便理解。
在下载过程中,如果发生异常情况,则会依次调用下载器中间件的process_exception方法,每个中间件只需定义自己的异常处理逻辑即可。
如果下载成功,则会依次执行下载器中间件的 process_response 方法,每个中间件可以进一步处理下载后的结果,最终返回。
这里值得提一下,除了process_request 方法是每个中间件顺序执行的,而 process_response 和 process_exception 方法是每个中间件倒序执行的,具体可看一下 DownaloderMiddlewareManager 的 _add_middleware 方法,可明白是如何注册这个方法链的。
拿到最终的下载结果后,再回到 ExecuteEngine 的 _next_request_from_scheduler 方法,会看到调用了_handle_downloader_output 方法,也就是处理下载结果的逻辑:
拿到下载结果后,主要分2个逻辑,如果是 Request 实例,则直接再次放入 Scheduler 请求队列。如果是 Response 或 Failure 实例,则调用 Scraper 的 enqueue_scrape 方法,进行进一步处理。
Scrapyer 主要是与 Spider 模块和 Pipeline 模块进行交互。
处理下载结果
请求入队逻辑不用再说,前面已经讲过。现在主要看Scraper
的enqueue_scrape
,看Scraper
组件是如何处理后续逻辑的:
(scrapy/core/scraper.py)
首先加入到Scraper
的处理队列中,然后从队列中获取到任务,如果不是异常结果,则调用 爬虫中间件管理器 的 scrape_response
方法:
有没有感觉套路很熟悉?与上面下载器中间件调用方式非常相似,也调用一系列的前置方法,再执行真正的处理逻辑,然后执行一些列的后置方法。
回调爬虫
在这里真正的处理逻辑是call_spider
,也就是回调我们写的爬虫类:(scrapy/core/scraper.py)
看到这里,你应该更熟悉,平时我们写的最多的爬虫模块的parse
则是第一个回调方法,后续爬虫模块拿到下载结果,可定义下载后的callback
就是在这里进行回调执行的。
处理输出
在与爬虫模块交互完成之后,Scraper
调用了handle_spider_output
方法处理输出结果:
我们编写爬虫类时,写的那些回调方法处理逻辑,也就是在这里被回调执行,执行完自定义的解析逻辑后,解析方法可返回新的 Request 或 BaseItem 实例,如果是新的请求,则再次通过 Scheduler 进入请求队列,如果是 BaseItem 实例,则调用 Pipeline 管理器,依次执行 process_item,也就是我们想输出结果时,只定义 Pepeline 类,然后重写这个方法就可以了。
ItemPipeManager 处理逻辑:
可以看到ItemPipeManager
也是一个中间件,和之前下载器中间件管理器和爬虫中间件管理器类似,如果子类有定义process_item
,则依次执行它。
执行完后,调用_itemproc_finished
:
这里可以看到,如果想在 Pipeline中 丢弃某个结果,直接抛出 DropItem 异常即可,Scrapy 会进行对应的处理。
到这里,抓取结果根据自定义的输出类输出到指定位置,而新的 Request 则会再次进入请求队列,等待引擎下一次调度,也就是再次调用 ExecutionEngine 的 _next_request 方法,直至请求队列没有新的任务,整个程序退出。
CrawlerSpider
这里也简单说一下 CrawlerSpider 类,它其实就继承了 Spider 类,然后重写了 parse 方法(这也是集成此类不要重写此方法的原因),并结合Rule等规则类,来完成Request的自动提取逻辑。
由此也可看出,Scrapy 的每个模块的实现都非常纯粹,每个组件都通过配置文件定义连接起来,如果想要扩展或替换,只需定义并实现自己的处理逻辑即可,其他模块均不受任何影响,这也导致编写一个插件是变得多么容易!
总结
总结一下整个运行流程,还是用这两张图表示再清楚不过:
Scrapy整体给我的感觉是,虽然它提供的只是单机版的爬虫框架,但我们可以通过编写更多的插件和替换某些组件,来定制化自己的爬虫,从而来实现更强大的功能,例如分布式、代理调度、并发控制、可视化、监控等等功能,都是非常方便的!