前言:
对于scrapy-redis有一个特殊的地方,就是队列的进出关系,因为我们的url请求会从各个任务统一归纳到redis里面,因此,如何解决下载请求这个问题,也是scrapy-redis的一个关键点!!!
正文:
- 先讲解代码,讲它自带的3个队列方式;
- 然后,再讲讲如何自定义队列...
原文翻译:
1.Base类
try:from scrapy.utils.request import request_from_dict
except ImportError:from scrapy.utils.reqser import request_to_dict, request_from_dictfrom . import picklecompatclass Base(object):"""Per-spider base queue class"""# 每个爬虫的基本队列类def __init__(self, server, spider, key, serializer=None):"""Initialize per-spider redis queue.初始化每个爬虫的Redis队列。Parameters----------server : StrictRedisRedis client instance. Redis客户端实例。spider : SpiderScrapy spider instance. Scrapy爬虫实例。key: strRedis key where to put and get messages. 在Redis中用于放置和获取消息的键。serializer : objectSerializer object with ``loads`` and ``dumps`` methods. 具有“loads”和“dumps”方法的序列化对象。"""if serializer is None:# Backward compatibility.# TODO: deprecate pickle.serializer = picklecompatif not hasattr(serializer, 'loads'):raise TypeError(f"serializer does not implement 'loads' function: {serializer}")if not hasattr(serializer, 'dumps'):raise TypeError(f"serializer does not implement 'dumps' function: {serializer}")self.server = serverself.spider = spiderself.key = key % {'spider': spider.name}self.serializer = serializerdef _encode_request(self, request):"""Encode a request object""""""将请求对象进行编码"""try:obj = request.to_dict(spider=self.spider)except AttributeError:obj = request_to_dict(request, self.spider)return self.serializer.dumps(obj)def _decode_request(self, encoded_request):"""Decode a request previously encoded""""""解码先前编码的请求"""obj = self.serializer.loads(encoded_request)return request_from_dict(obj, spider=self.spider)def __len__(self):"""Return the length of the queue""""""返回队列的长度"""raise NotImplementedErrordef push(self, request):"""Push a request""""""推送一个请求"""raise NotImplementedErrordef pop(self, timeout=0):"""Pop a request""""""弹出一个请求"""raise NotImplementedErrordef clear(self):"""Clear queue/stack""""""清除队列/堆栈"""self.server.delete(self.key)
2.三个自带的queue类
class FifoQueue(Base):"""Per-spider FIFO queue"""# 单个爬虫的先进先出队列def __len__(self):"""Return the length of the queue"""return self.server.llen(self.key) # 返回队列的长度def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request)) # 推送一个请求到队列头部def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.brpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.rpop(self.key)if data:return self._decode_request(data) # 从队列尾部弹出一个请求并返回class PriorityQueue(Base):"""Per-spider priority queue abstraction using redis' sorted set"""# 单个爬虫的优先级队列,使用Redis的有序集合实现def __len__(self):"""Return the length of the queue"""return self.server.zcard(self.key) # 返回队列的长度def push(self, request):"""Push a request"""data = self._encode_request(request)score = -request.priorityself.server.execute_command('ZADD', self.key, score, data) # 根据请求的优先级将请求推送到队列中def pop(self, timeout=0):"""Pop a request"""pipe = self.server.pipeline()pipe.multi()pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)results, count = pipe.execute() # 使用管道操作实现原子的范围取值和移除操作if results:return self._decode_request(results[0]) # 从队列中弹出优先级最高的请求并返回class LifoQueue(Base):"""Per-spider LIFO queue."""# 单个爬虫的后进先出队列def __len__(self):"""Return the length of the stack"""return self.server.llen(self.key) # 返回堆栈的长度def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request)) # 推送一个请求到堆栈顶部def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.blpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.lpop(self.key)if data:return self._decode_request(data) # 从堆栈顶部弹出一个请求并返回# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue
SpiderStack = LifoQueue
SpiderPriorityQueue = PriorityQueue
讲解:
在Scrapy Redis队列中,包含了三个自定义队列类,分别是FifoQueue、PriorityQueue和LifoQueue。
- FifoQueue:实现了单个爬虫的先进先出队列,使用Redis的列表(list)数据结构来存储请求。
- PriorityQueue:实现了单个爬虫的优先级队列,使用Redis的有序集合(sorted set)数据结构来存储请求,并按照请求的优先级进行排序。
- LifoQueue:实现了单个爬虫的后进先出队列,同样使用Redis的列表(list)数据结构来存储请求。
使用这些自定义队列类时,首先需要根据自己的需求选择合适的队列类型。然后,使用相应的类来实例化队列,并可以通过调用其方法来实现数据的推送和获取,如push方法用于推送请求,pop方法用于弹出请求。
例如,以下是使用FifoQueue的示例代码:
from scrapy_redis.queue import SpiderQueueclass MySpider(scrapy.Spider):# ...custom_queue_key = 'my_spider:fifo_queue'def __init__(self, name=None, **kwargs):super().__init__(name=name, **kwargs)self.queue = SpiderQueue(self.server, self, self.custom_queue_key)def start_requests(self):# 推送请求到FifoQueueself.queue.push(scrapy.Request(url='http://example.com/page1'))self.queue.push(scrapy.Request(url='http://example.com/page2'))self.queue.push(scrapy.Request(url='http://example.com/page3'))# 从FifoQueue中获取请求,并通过yield返回while True:request = self.queue.pop()if not request:breakyield request
通过实例化自定义队列类,例如SpiderQueue,将其与爬虫关联。然后,使用push
方法将请求推送到队列中,使用pop
方法从队列中获取请求。
请根据实际需求选择适合的自定义队列类,并实现相应的处理逻辑.
代码总结:
一个基类 (Base),定义了每个爬虫的 Redis 队列的基本行为。此基类用于派生子类,实现不同类型的队列,比如:
- 先进先出队列 (FifoQueue)、
- 优先级队列 (PriorityQueue)、
- 后进先出队列 (LifoQueue)。
每个子类继承 Base
类,并根据不同的类型实现相应的方法,包括 _encode_request
将请求对象编码成字符串,_decode_request
将编码的请求字符串解码成请求对象,__len__
返回队列的长度,push
将请求对象放入队列,pop
从队列中取出一个请求。此外,clear
方法用于清空队列。
在每个子类中,队列的实现会根据具体的数据结构进行。在 FifoQueue
中使用列表实现先进先出队列,PriorityQueue
使用 Redis 的有序集合实现优先级队列,LifoQueue
使用列表实现后进先出队列。
这个队列用于将爬虫的请求放入队列中,然后通过 Redis 进行存储和取出,确保请求的顺序和优先级。
如何用:
#setting里面设置这个,即可开启
SCHEDULER_QUEUE_CLASS ='scrapy_redis.queue.PriorityQueue'
自定义queue:
假设我们需要根据请求的优先级进行存储和处理。
from scrapy_redis.queue import Baseclass PriorityQueue(Base):"""自定义优先级队列"""def __len__(self):return self.server.zcard(self.key)def push(self, request, priority=0):"""按照优先级推送请求"""data = self._encode_request(request)score = -priority # 取负数以便按优先级降序排序self.server.zadd(self.key, {data: score})def pop(self):"""按照优先级弹出请求"""data = self.server.zrange(self.key, 0, 0)if data:self.server.zrem(self.key, data[0])return self._decode_request(data[0])# 在Spider中使用自定义队列
class MySpider(scrapy.Spider):# ...custom_queue_key = 'my_spider:priority_queue'def start_requests(self):# 创建自定义优先级队列queue = PriorityQueue(self.server, self, self.custom_queue_key)# 推送请求到自定义队列queue.push(scrapy.Request(url='http://example.com/page1'), priority=2)queue.push(scrapy.Request(url='http://example.com/page2'), priority=1)queue.push(scrapy.Request(url='http://example.com/page3'), priority=3)# 从自定义队列中获取请求while True:request = queue.pop()if not request:breakyield request
讲解:
- 创建了一个名为
PriorityQueue
的自定义优先级队列类。该类继承了Base
类,根据优先级对请求进行存储和处理。通过重写push
方法,将请求按照指定的优先级进行推送;通过重写pop
方法,按照优先级从队列中弹出请求。 - 在Spider中使用自定义队列时,先实例化
PriorityQueue
类,然后推送请求到队列中,并通过pop
方法从队列中获取请求。读者可以根据自己的需求,定义更多自定义队列类,并根据不同的存储和处理逻辑进行扩展。
ps:记得在setting里面,导入这个类:
要将自定义队列类用于Scrapy的配置(settings)中,需要在项目的settings.py文件中进行相应的设置。确保以下步骤:
1.在settings.py文件中导入自定义队列类:
from your_project.queue_file import PriorityQueue # 根据实际情况进行导入
2.设置SCHEDULER
属性为自定义的队列类:
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
3.设置SCHEDULER_QUEUE_CLASS
属性为自定义队列类的路径:
SCHEDULER_QUEUE_CLASS = 'your_project.queue_file.PriorityQueue' # 根据实际情况设置路径
这样,Scrapy将使用自定义队列类作为请求队列。确保将your_project
替换为项目名称,并根据项目结构和文件位置进行正确的设置。