爬虫工作量由小到大的思维转变---<第四十章 Scrapy Redis 的Queue问题>

前言:

对于scrapy-redis有一个特殊的地方,就是队列的进出关系,因为我们的url请求会从各个任务统一归纳到redis里面,因此,如何解决下载请求这个问题,也是scrapy-redis的一个关键点!!!

正文:

  1. 先讲解代码,讲它自带的3个队列方式; 
  2. 然后,再讲讲如何自定义队列...

原文翻译:

1.Base类
try:from scrapy.utils.request import request_from_dict
except ImportError:from scrapy.utils.reqser import request_to_dict, request_from_dictfrom . import picklecompatclass Base(object):"""Per-spider base queue class"""# 每个爬虫的基本队列类def __init__(self, server, spider, key, serializer=None):"""Initialize per-spider redis queue.初始化每个爬虫的Redis队列。Parameters----------server : StrictRedisRedis client instance. Redis客户端实例。spider : SpiderScrapy spider instance. Scrapy爬虫实例。key: strRedis key where to put and get messages. 在Redis中用于放置和获取消息的键。serializer : objectSerializer object with ``loads`` and ``dumps`` methods. 具有“loads”和“dumps”方法的序列化对象。"""if serializer is None:# Backward compatibility.# TODO: deprecate pickle.serializer = picklecompatif not hasattr(serializer, 'loads'):raise TypeError(f"serializer does not implement 'loads' function: {serializer}")if not hasattr(serializer, 'dumps'):raise TypeError(f"serializer does not implement 'dumps' function: {serializer}")self.server = serverself.spider = spiderself.key = key % {'spider': spider.name}self.serializer = serializerdef _encode_request(self, request):"""Encode a request object""""""将请求对象进行编码"""try:obj = request.to_dict(spider=self.spider)except AttributeError:obj = request_to_dict(request, self.spider)return self.serializer.dumps(obj)def _decode_request(self, encoded_request):"""Decode a request previously encoded""""""解码先前编码的请求"""obj = self.serializer.loads(encoded_request)return request_from_dict(obj, spider=self.spider)def __len__(self):"""Return the length of the queue""""""返回队列的长度"""raise NotImplementedErrordef push(self, request):"""Push a request""""""推送一个请求"""raise NotImplementedErrordef pop(self, timeout=0):"""Pop a request""""""弹出一个请求"""raise NotImplementedErrordef clear(self):"""Clear queue/stack""""""清除队列/堆栈"""self.server.delete(self.key)
2.三个自带的queue类
class FifoQueue(Base):"""Per-spider FIFO queue"""# 单个爬虫的先进先出队列def __len__(self):"""Return the length of the queue"""return self.server.llen(self.key)  # 返回队列的长度def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request))  # 推送一个请求到队列头部def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.brpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.rpop(self.key)if data:return self._decode_request(data)  # 从队列尾部弹出一个请求并返回class PriorityQueue(Base):"""Per-spider priority queue abstraction using redis' sorted set"""# 单个爬虫的优先级队列,使用Redis的有序集合实现def __len__(self):"""Return the length of the queue"""return self.server.zcard(self.key)  # 返回队列的长度def push(self, request):"""Push a request"""data = self._encode_request(request)score = -request.priorityself.server.execute_command('ZADD', self.key, score, data)  # 根据请求的优先级将请求推送到队列中def pop(self, timeout=0):"""Pop a request"""pipe = self.server.pipeline()pipe.multi()pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)results, count = pipe.execute()  # 使用管道操作实现原子的范围取值和移除操作if results:return self._decode_request(results[0])  # 从队列中弹出优先级最高的请求并返回class LifoQueue(Base):"""Per-spider LIFO queue."""# 单个爬虫的后进先出队列def __len__(self):"""Return the length of the stack"""return self.server.llen(self.key)  # 返回堆栈的长度def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request))  # 推送一个请求到堆栈顶部def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.blpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.lpop(self.key)if data:return self._decode_request(data)  # 从堆栈顶部弹出一个请求并返回# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue
SpiderStack = LifoQueue
SpiderPriorityQueue = PriorityQueue
讲解:

在Scrapy Redis队列中,包含了三个自定义队列类,分别是FifoQueue、PriorityQueue和LifoQueue

  • FifoQueue:实现了单个爬虫的先进先出队列,使用Redis的列表(list)数据结构来存储请求。
  • PriorityQueue:实现了单个爬虫的优先级队列,使用Redis的有序集合(sorted set)数据结构来存储请求,并按照请求的优先级进行排序
  • LifoQueue:实现了单个爬虫的后进先出队列,同样使用Redis的列表(list)数据结构来存储请求。

使用这些自定义队列类时,首先需要根据自己的需求选择合适的队列类型。然后,使用相应的类来实例化队列,并可以通过调用其方法来实现数据的推送和获取,如push方法用于推送请求,pop方法用于弹出请求。

例如,以下是使用FifoQueue的示例代码:
from scrapy_redis.queue import SpiderQueueclass MySpider(scrapy.Spider):# ...custom_queue_key = 'my_spider:fifo_queue'def __init__(self, name=None, **kwargs):super().__init__(name=name, **kwargs)self.queue = SpiderQueue(self.server, self, self.custom_queue_key)def start_requests(self):# 推送请求到FifoQueueself.queue.push(scrapy.Request(url='http://example.com/page1'))self.queue.push(scrapy.Request(url='http://example.com/page2'))self.queue.push(scrapy.Request(url='http://example.com/page3'))# 从FifoQueue中获取请求,并通过yield返回while True:request = self.queue.pop()if not request:breakyield request

通过实例化自定义队列类,例如SpiderQueue,将其与爬虫关联。然后,使用push方法将请求推送到队列中,使用pop方法从队列中获取请求。

请根据实际需求选择适合的自定义队列类,并实现相应的处理逻辑.

代码总结:

一个基类 (Base),定义了每个爬虫的 Redis 队列的基本行为。此基类用于派生子类,实现不同类型的队列,比如:

  1. 先进先出队列 (FifoQueue)
  2. 优先级队列 (PriorityQueue)
  3. 后进先出队列 (LifoQueue)

每个子类继承 Base 类,并根据不同的类型实现相应的方法,包括 _encode_request 将请求对象编码成字符串,_decode_request 将编码的请求字符串解码成请求对象,__len__ 返回队列的长度,push 将请求对象放入队列,pop 从队列中取出一个请求。此外,clear 方法用于清空队列。

在每个子类中,队列的实现会根据具体的数据结构进行。在 FifoQueue 中使用列表实现先进先出队列,PriorityQueue 使用 Redis 的有序集合实现优先级队列,LifoQueue 使用列表实现后进先出队列。

这个队列用于将爬虫的请求放入队列中,然后通过 Redis 进行存储和取出,确保请求的顺序和优先级。

如何用:
#setting里面设置这个,即可开启
SCHEDULER_QUEUE_CLASS ='scrapy_redis.queue.PriorityQueue'

自定义queue:

假设我们需要根据请求的优先级进行存储和处理。

from scrapy_redis.queue import Baseclass PriorityQueue(Base):"""自定义优先级队列"""def __len__(self):return self.server.zcard(self.key)def push(self, request, priority=0):"""按照优先级推送请求"""data = self._encode_request(request)score = -priority  # 取负数以便按优先级降序排序self.server.zadd(self.key, {data: score})def pop(self):"""按照优先级弹出请求"""data = self.server.zrange(self.key, 0, 0)if data:self.server.zrem(self.key, data[0])return self._decode_request(data[0])# 在Spider中使用自定义队列
class MySpider(scrapy.Spider):# ...custom_queue_key = 'my_spider:priority_queue'def start_requests(self):# 创建自定义优先级队列queue = PriorityQueue(self.server, self, self.custom_queue_key)# 推送请求到自定义队列queue.push(scrapy.Request(url='http://example.com/page1'), priority=2)queue.push(scrapy.Request(url='http://example.com/page2'), priority=1)queue.push(scrapy.Request(url='http://example.com/page3'), priority=3)# 从自定义队列中获取请求while True:request = queue.pop()if not request:breakyield request
讲解:
  1. 创建了一个名为PriorityQueue的自定义优先级队列类。该类继承了Base类,根据优先级对请求进行存储和处理。通过重写push方法,将请求按照指定的优先级进行推送;通过重写pop方法,按照优先级从队列中弹出请求。
  2. 在Spider中使用自定义队列时,先实例化PriorityQueue类,然后推送请求到队列中,并通过pop方法从队列中获取请求。读者可以根据自己的需求,定义更多自定义队列类,并根据不同的存储和处理逻辑进行扩展。
ps:记得在setting里面,导入这个类:

要将自定义队列类用于Scrapy的配置(settings)中,需要在项目的settings.py文件中进行相应的设置。确保以下步骤:

1.在settings.py文件中导入自定义队列类:
from your_project.queue_file import PriorityQueue  # 根据实际情况进行导入
2.设置SCHEDULER属性为自定义的队列类:
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
3.设置SCHEDULER_QUEUE_CLASS属性为自定义队列类的路径:
SCHEDULER_QUEUE_CLASS = 'your_project.queue_file.PriorityQueue'  # 根据实际情况设置路径

这样,Scrapy将使用自定义队列类作为请求队列。确保将your_project替换为项目名称,并根据项目结构和文件位置进行正确的设置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/664351.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

<网络安全>《14 日志审计系统》

1 概念 日志审计系统是用于全面收集企业IT系统中常见的安全设备、网络设备、数据库、服务器、应用系统、主机等设备所产生的日志(包括运行、告警、操作、消息、状态等)并进行存储、监控、审计、分析、报警、响应和报告的系统。 日志审计系统是一种用于…

投资更好的管理会计系统,探索全面预算管理的奥秘

目前,我国财政体制正值如火如荼的调整阶段,各级政府和部门响应国家号召,旨在加强管理会计系统建设,制定具有先导性和科学性的现代化全面预算管理制度,从而将我国财力推向一个新高度。其中,基于服务或产品的…

工程师 - headless模式

headless 英文释义: 在没有用户界面的情况下运行;具体地说,在没有显示器、键盘和鼠标的情况下运行。 Running without a user interface; specifically, running without a monitor, keyboard, and mouse. 说明 所谓的“无头系统”&#x…

【PostgreSQL灵活使用psql执行SQL的一些方式】

一、psql执行SQL并使用选项灵活输出结果 可以不进入数据库,在命令行,使用psql 的-c选项跟上需要执行的SQL。来获取SQL的执行结果 postgresubuntu-linux-22-04-desktop:~$ psql -c "select 1,2" ?column? | ?column? -------------------…

Swift Vapor 教程(CURD 操作)

接上篇使用 Swift Vapor 对数据库进行简单的操作。 下面会使用一个稍微简单的方式进行 CURD 操作 import Fluent import Vaporstruct SongController: RouteCollection {func boot(routes: Vapor.RoutesBuilder) throws {let songs routes.grouped("songs")// GET…

tengine ngx_http_upstream_dynamic_module 动态域名解析功能的代码详细解析

tengine ngx_http_upstream_dynamic_module 动态域名解析功能的代码详细解析 1. 为什么需要域名动态解析2. 配置指令3. 加载模块3. 源码分析3.1 指令解析3.2 upstream负载均衡算法的初始化3.3 upstream负载均衡上下文的初始化3.4 获取upstream的服务器地址3.5 域名解析回调处理…

【Boost】:parser代码的基本结构(二)

parser代码的基本结构 一.总体概述二. EumeFile的实现三.ParserHtml的实现四.SaveHtml实现五.完整源代码 打开parser.cc,用vscode或者vim都行。 一.总体概述 首先递归式的把文件名和路径读入一个数组内,接着把数组内的每一个数据按照一定的格式进行划分,…

创建型模式-单例模式:定义、实现及应用

目录 一、模式定义二、针对问题1.解决的问题2.解决方案3.举个例子4.设计模式适合场景5.实现方式6.优缺点7.与其他模式的关系 三、代码实现 一、模式定义 单例模式(Singleton Pattern)是一种创建型模式,用于限制某个类只能创建一个对象。它提…

大数据信用报告查询费用一般要多少钱?

一些不少朋友在申贷的时候被拒贷之后,得到的原因就是因为大数据不良被拒,这时候很多人都反过来查询自己的大数据信用报告,而查询的价格也是不少朋友都比较关注的,那大数据信用报告查询费用一般要多少钱呢?下面本文就为你介绍一下…

码农也得“开口说话”

咱们程序员兄弟们有时候被大家贴上了“闷葫芦”的标签,好像我们只适合跟电脑谈恋爱,不爱搭理人似的。不过今儿咱要说的是,码农界的大神可不只是会敲代码那么简单,会聊天、懂合作那也是必不可少的生存法则! 一、内向也…

vue3中如何实现图片的压缩

首先,为什么需要进行图片压缩: 减少页面加载时间:因为图片是页面中常见的资源之一,较大的图片会增加页面的加载时间,影响用户体验,压缩图片可以减小图片的文件大小,提升页面加载速度。节省网络…

App ICP备案获取iOS和Android的公钥和证书指纹

依照《工业和信息化部关于开展移动互联网应用程序备案工作的通知》,向iOS和安卓平台提交App时需要先提交ICP备案信息。 iOS平台: 1、下载appuploader工具:Appuploader home -- A tool improve ios develop efficiency such as submit ipa to…

vue yarn certificate has expired

背景:我在用ant design pro框架进行初始化时,安装脚手架时,安装yarn时显示报错 原因分析:查了很久的资料,这种情况应该是开了服务器代理访问导致ssl安全证书失效了 解决办法: 在终端输入:yarn…

【MybatisPlus篇】查询条件设置(范围匹配 | 模糊匹配 | 空判定 | 包含性判定 | 分组 | 排序)

文章目录 🎄环境准备⭐导入依赖⭐写入User类⭐配置启动类⭐创建UserDao 的 MyBatis Mapper 接口,用于定义数据库访问操作⭐创建配置文件🛸创建测试类MpATest.java 🍔范围查询⭐eq⭐between⭐gt 🍔模糊匹配⭐like &…

使用ngrok内网穿透

没有服务器和公网IP,想要其他人访问自己做好的网站,使用这款简单免费的内网穿透小工具——ngrok,有了它轻松让别人访问你的项目~ 一、下载ngrok 官网地址:ngrok | Unified Application Delivery Platform for Developers&#x…

Redis(十一)单线程VS多线程

文章目录 概述为何选择单线程主要性能瓶颈多线程特性和IO多路复用概述Unix网络编程中的五种IO模型Blocking IO-阻塞IONoneBlocking IO-非阻塞IOIO multiplexing-IO多路复用signal driven IO-信号驱动IOasynchronous IO-异步IO 场景:引出epoll总结 开启Redis多线程其…

2022美国大学生数学建模(优秀获奖论文)-A题:Power Planning Model: Magic Weapon for Cyclists

目录 Summary 1 Introduction 1.1 Background 1.2 Restatement of the Problem 1.3 Our Work 2 Assumptions and Justifification 3 Notations

ADB的配置和使用及刷机root

ADB的配置和使用 ADB即Android Debug Bridge,安卓调试桥,是谷歌为安卓开发者提供的开发工具之一,可以让你的电脑以指令窗口的方式控制手机。可以在安卓开发者网页中的 SDK 平台工具页面下直接下载对应系统的 adb 配置文件,大小只…

115.工业相机海康SDK开发指南(阅读)

一、SDK初始化 包含初始化SDK和反初始化SDK接口。(由于看不到函数内部的具体实现,因此以下的解释仅代表个人的理解) 函数说明 MV_CAMCTRL_API int __stdcall MV_CC_Initialize()//初始化SDK 初始化SDK 成功,返回MV_OK&#xff1b…

Qt-互斥量-临界区-QMutex-QMutexLocker-QReadWriteLock

文章目录 1.QMutex2.QMutexLocker3.QReadWriteLock 在Qt中,互斥量(Mutex)是用于同步多线程访问共享资源的一种机制。临界区(Critical Section)是指一段必须由单个线程执行的代码区域,防止多个线程同时执行这…