scrapy-redis: making Redis store more than just URLs (e.g. JSON)

scrapy-redis: making Redis store more than just URLs: https://my.oschina.net/u/4382439/blog/3712637

Scrapy-redis and Scrapyd usage explained in detail: https://zhuanlan.zhihu.com/p/44564597

Scrapy-redis on GitHub: https://github.com/rmax/scrapy-redis

Controlling the URL queue type (zset, list) in scrapy-redis: https://blog.csdn.net/ryuhfxz/article/details/85782467

First, take a look at the scrapy-redis source code (spiders.py):

from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider

from . import connection, defaults
from .utils import bytes_to_str


class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    redis_key = None
    redis_batch_size = None
    redis_encoding = None

    # Redis client placeholder.
    server = None

    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler', None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE',
                settings.getint('CONCURRENT_REQUESTS'),
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        if self.redis_encoding is None:
            self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
                         self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                print('task is none')
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        """Returns a Request instance from data coming from Redis.

        By default, ``data`` is an encoded URL. You can override this method to
        provide your own message decoding.

        Parameters
        ----------
        data : bytes
            Message from redis.

        """
        url = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(url)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        # TODO: While there is capacity, schedule a batch of redis requests.
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from.
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from.
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: False)
        Use SET operations to retrieve messages from the redis queue. If False,
        the messages are retrieved using the LPOP command.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from.
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from.
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: True)
        Use SET operations to retrieve messages from the redis queue.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj
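As the listing shows, make_request_from_data() simply decodes the popped bytes into a URL and hands it to make_requests_from_url(), so out of the box every message pushed to the start-URLs key has to be a bare URL. Here is a minimal sketch of seeding that default queue with redis-py (the spider name my_spider, the connection parameters, and the example URL are assumptions for illustration):

import redis

# assumed local Redis instance; adjust host/port/db/password to your setup
r = redis.Redis(host='127.0.0.1', port=6379, db=0)

# Default scrapy-redis behaviour: the message IS the URL.
# With REDIS_START_URLS_AS_SET = False (the default) the key is a list and
# the spider pops it with LPOP, so we push with LPUSH.
r.lpush('my_spider:start_urls', 'https://example.com/page/1')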

Method 1: override make_request_from_data

If you read the source carefully, you will notice that make_request_from_data(self, data) is the method that builds a request instance from the message popped from Redis, and by default that message is a URL. So we can simply override it and pass a JSON string straight into self.make_requests_from_url, where the string is parsed to get the request URL (or to generate URLs). The code is as follows:

# -*- coding: utf-8 -*-
from scrapy.http import Request
from scrapy_redis.utils import bytes_to_str
from scrapy_redis.spiders import RedisSpider


class MySpider(RedisSpider):
    name = 'my_spider'
    redis_key = f'start_urls:{name}'

    def make_request_from_data(self, data):
        """
        :param data: bytes, message from redis
        :return: Request
        """
        company = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(company)

    def make_requests_from_url(self, company):
        # The popped message is a serialized dict, not a bare URL.
        # eval() works for dict-literal strings; json.loads() is safer for real JSON.
        data = eval(company)
        url = data["url"]
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 "
                          "(KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36",
            "Accept": "*/*"
        }
        return Request(url, self.parse, meta={"data": data}, dont_filter=True, headers=headers)

    def parse(self, response):
        pass


if __name__ == '__main__':
    from scrapy import cmdline
    cmdline.execute('scrapy crawl my_spider'.split())
    pass

So all you need to do is override make_request_from_data and parse data inside it.
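For completeness, here is a hedged sketch of how a task for the MySpider above might be pushed into Redis (the key start_urls:my_spider and the url field come from the example; the connection parameters and the extra field are assumptions):

import json
import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=0)  # assumed local instance

# make_requests_from_url() above eval()s the popped string and reads data["url"],
# so each message should be a dict-like / JSON string carrying the URL plus any
# extra fields you want to pass along in meta.
task = {"url": "https://example.com/detail/42", "company_name": "demo company"}
r.lpush('start_urls:my_spider', json.dumps(task, ensure_ascii=False))

Using json.dumps() keeps the message valid JSON; since the example spider parses it with eval(), avoid JSON-only literals such as true/false/null, or switch the spider to json.loads(), which is the safer choice.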

Method 2: override the next_requests method

# requires "import json", "import redis" and "import scrapy" at module level
def next_requests(self):
    # NOTE: building a ConnectionPool on every call is wasteful; consider creating it once in __init__
    redis_ip = self.settings.get('REDIS_HOST')
    redis_port = self.settings.get('REDIS_PORT')
    redis_pw = self.settings.get('REDIS_PARAMS').get('password')
    redis_pool = redis.ConnectionPool(host=redis_ip, port=redis_port, password=redis_pw)
    self.redis_conn = redis.Redis(connection_pool=redis_pool)
    found = 0
    while found < self.redis_batch_size:
        data_raw = self.redis_conn.spop(self.redis_key)  # pop one message from redis
        if not data_raw:
            break
        data = json.loads(data_raw)  # the message stored in redis is JSON, so decode it
        if "source_url" not in data:
            break
        req = scrapy.Request(url=data['source_url'], meta=data['meta'])  # build the request
        if req:
            yield req
            found += 1
        else:
            self.logger.debug("Request not made from data: %s", data)
    if found:
        self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
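To match this override, the task has to live in a Redis SET (it is popped with spop()) and each member has to be JSON containing at least source_url and meta. A rough sketch of seeding such a set (REDIS_HOST, REDIS_PORT and REDIS_PARAMS['password'] mirror the settings read above; the key name, connection values and task fields are assumptions):

import json
import redis

# connection values are assumptions; they should match REDIS_HOST / REDIS_PORT /
# REDIS_PARAMS['password'] in settings.py
r = redis.Redis(host='127.0.0.1', port=6379, password='your-password')

# the override pops with spop(), so the key must be a set; each member is JSON
# carrying 'source_url' and 'meta'
task = {
    "source_url": "https://example.com/item/1",
    "meta": {"category": "demo", "page": 1},
}
r.sadd('my_spider:start_urls', json.dumps(task, ensure_ascii=False))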

Example code (for demonstration purposes this code is somewhat redundant; rewrite or trim it to fit your own situation):

# -*- coding: utf-8 -*-
# @Author  :
# @File    : simple_spider_base_class.py
# @Software: PyCharm
# @description : XXX
import json
import datetime
from abc import ABC
from scrapy_redis import defaults
from scrapy_redis.spiders import RedisSpider
from scrapy.utils.project import get_project_settings
from redis.exceptions import WatchError  # needed by __custom_get_task()
from bson import ObjectId                # needed by next_requests_old(); ships with pymongo

# NOTE: db_config and self.wb (a MongoDB helper) referenced in next_requests_old()
# are project-specific objects that are not defined in this snippet.


class SpiderBaseClass(RedisSpider, ABC):
    def __init__(self):
        super(SpiderBaseClass, self).__init__()
        self.temp = None
        self.__server_pipeline = None
        self.task_string = None

    def __del__(self):
        pass

    def __get_server_pipeline(self):
        if not self.__server_pipeline:
            self.__server_pipeline = self.server.pipeline()
        return self.__server_pipeline

    def __get_custom_redis_key(self):
        custom_redis_key = self.redis_key.split(':')[1]
        return custom_redis_key

    def return_url(self, task_string=None):
        """Meant to be overridden; the return value is a URL.

        :param task_string:
        :return: url
        """
        self.temp = None
        return 'http://www.examplewebsite.com'

    def redis_sort_set_pop(self, r_k=None):
        """Pop a request.

        timeout is not supported by this queue class.
        """
        t_k = r_k if r_k else self.redis_key
        # use atomic range/remove using multi/exec
        pipe = self.__get_server_pipeline()
        pipe.multi()
        pipe.zrange(t_k, 0, 0).zremrangebyrank(t_k, 0, 0)
        results, count = pipe.execute()
        return results, count

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = None  # initialised so the fallback path below cannot leave it undefined
            try:
                data = fetch_one(self.redis_key)
            except BaseException as e:
                results, count = self.redis_sort_set_pop()
                if len(results):
                    task_string = results[0].decode('utf-8')  # bytes -> str
                    self.task_string = task_string
                    msg = 'get task : {0}'.format(task_string)
                    print(msg)
                    self.logger.info(msg)
                    task_dict = json.loads(task_string)
                    if task_dict.get('spider_url', None):
                        data = task_dict['spider_url']
                        print('get url : {0}'.format(data))
                        self.logger.info('get url : {0}'.format(data))
                    else:
                        data = self.return_url(task_string=task_string)
            if not data:
                # Queue empty.
                print('task is none')
                break

            # add a time marker
            # dict_data = collections.OrderedDict(dict_data)
            # dict_data['add_time'] = str(datetime.datetime.now())
            # task_string = json.dumps(dict_data, ensure_ascii=False)
            # try:
            #     self.server.zadd('mirror:{0}'.format(self.name), {task_string: row_score})
            # except BaseException as e:
            #     pass

            req = self.make_request_from_data(data)
            req.meta['task_string'] = self.task_string
            req.meta['start_time'] = str(datetime.datetime.now().replace(microsecond=0))
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def __custom_get_task(self):
        row_str = None
        row_score = None
        pipe = self.__get_server_pipeline()
        loop = True
        while loop:
            try:
                # ascending order
                temp = pipe.zrange(self.redis_key, 0, 0, withscores=True)[0]
                # temp = pipe.zrange(self.custom_redis_key, 0, 0, withscores=True)[0]
                # descending order
                # temp = pipe.zrevrange(self.redis_key, 0, 0, withscores=True)[0]
                row_value_score = pipe.execute()[0][0]  # <class 'tuple'>: (b'99', 99.0)
                row_str, row_score = row_value_score
                pipe.watch(self.redis_key)
                pipe.multi()
                results = pipe.zrem(self.redis_key, row_str.decode('utf-8')).execute()
                # results = pipe.zrem(self.custom_redis_key, row_str.decode('utf-8')).execute()
                pipe.unwatch()
                if results[0]:  # results is a list; the first element is 1 on success, 0 on failure
                    # deletion succeeded: no other thread/process grabbed the task first, exit the loop
                    loop = False
                    break
                else:
                    # deletion failed: another process already took and removed this task,
                    # so go back and fetch the next one
                    continue
            except WatchError as watch_error:
                # release the watch and retry on the next iteration
                pipe.unwatch()
                continue
            except IndexError as i_e:
                # when redis has no task left, indexing the empty result raises IndexError;
                # end the loop with row_str = None
                loop = False
                row_str = None
            except BaseException as e:
                row_str = None
                break
        return row_str, row_score

    def next_requests_old(self):
        """Returns a request to be scheduled or none.

        :return:
        """
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        # fetch_one = self.server.spop if use_set else self.server.lpop
        all_settings = get_project_settings()
        flag = all_settings.getbool('REDIS_START_URLS_AS_SET_TAG')
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            if flag:
                row_str, row_score = self.__custom_get_task()
            else:
                fetch_one = self.server.spop if use_set else self.server.lpop
            json_data = fetch_one(self.redis_key) if not flag else row_str
            source = None
            if json_data:
                task_string = json_data.decode('utf-8')  # bytes -> str
                self.task_string = task_string
                self.logger.info('get task : {0}'.format(task_string))
                dict_data = json.loads(task_string)
                source = dict_data.get('source', None)
                task_id = dict_data.get('id', None)
                # LOG.INFO('get task is {0}'.format(self.task_string))
                if dict_data.get('spider_url', None):
                    data = dict_data['spider_url']
                    print('get url : {0}'.format(data))
                    self.logger.info('get url : {0}'.format(data))
                else:
                    data = self.return_url(task_string=task_string)
            else:
                data = None
                # LOG.INFO('get task is None')
                print('get task is None')
                self.logger.info('get task is None')
            if not data:
                # Queue empty.
                break
            if source:
                try:
                    filter_conditions = {'_id': ObjectId(task_id)}
                except BaseException as e:
                    filter_conditions = {'_id': task_id}
                item = {"$set": {"status": 'running'}}
                db_template_name = all_settings.get('DB_TEMPLATE_NAME', None)
                if db_template_name:
                    # db_config / self.wb are project-specific MongoDB helpers, not defined here
                    db_name = db_config.get(db_template_name).get('database', None)
                    tb_name = db_config.get(db_template_name).get('echo_tb', None)
                    is_success = self.wb.update_one(db_name=db_name, tb_name=tb_name,
                                                    filter_conditions=filter_conditions, item=item)
                    if 0 == is_success:
                        self.logger.info('update status : running')
                    else:
                        self.logger.info('update status fail')

            # add a time marker
            # dict_data = collections.OrderedDict(dict_data)
            # dict_data['add_time'] = str(datetime.datetime.now())
            task_string = json.dumps(dict_data, ensure_ascii=False)
            # try:
            #     self.server.zadd('mirror:{0}'.format(self.name), {task_string: row_score})
            # except BaseException as e:
            #     pass

            req = self.make_request_from_data(data)
            req.meta['task_string'] = task_string
            req.meta['start_time'] = str(datetime.datetime.now().replace(microsecond=0))
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)


if __name__ == '__main__':
    pass
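The sorted-set path (redis_sort_set_pop() / __custom_get_task()) always takes the member with the lowest score first, so the score can act as a priority or a timestamp. A hedged sketch of enqueueing a task for that path (the key name, score, connection values and extra fields are assumptions; spider_url is the field the base class looks for):

import json
import redis

r = redis.Redis(host='127.0.0.1', port=6379)  # assumed local instance

# zrange(key, 0, 0) returns the lowest-scored member, so a smaller score means
# higher priority here
task = {"spider_url": "https://example.com/list?page=1", "id": "demo-task-1"}
r.zadd('spider_base_class:start_urls', {json.dumps(task, ensure_ascii=False): 100})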
