How scrapy-redis can store more than just a URL in redis: https://my.oschina.net/u/4382439/blog/3712637
Scrapy-redis and Scrapyd usage explained: https://zhuanlan.zhihu.com/p/44564597
Scrapy-redis on GitHub: https://github.com/rmax/scrapy-redis
Controlling the type of the scrapy-redis URL queue (zset, list): https://blog.csdn.net/ryuhfxz/article/details/85782467
First, look at the scrapy-redis source code (spiders.py):
from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider

from . import connection, defaults
from .utils import bytes_to_str


class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    redis_key = None
    redis_batch_size = None
    redis_encoding = None

    # Redis client placeholder.
    server = None

    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler', None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE',
                settings.getint('CONCURRENT_REQUESTS'),
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        if self.redis_encoding is None:
            self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
                         self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                print('task is none')
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        """Returns a Request instance from data coming from Redis.

        By default, ``data`` is an encoded URL. You can override this method to
        provide your own message decoding.

        Parameters
        ----------
        data : bytes
            Message from redis.

        """
        url = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(url)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        # TODO: While there is capacity, schedule a batch of redis requests.
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from..
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from..
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: False)
        Use SET operations to retrieve messages from the redis queue. If False,
        the messages are retrieve using the LPOP command.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from..
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from..
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: True)
        Use SET operations to retrieve messages from the redis queue.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj
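As the source shows, by default the spider waits on the key <spider.name>:start_urls and treats every popped message as a plain URL: LPOP when the key is a list, SPOP when REDIS_START_URLS_AS_SET is enabled. A minimal feeding sketch with redis-py, assuming a spider named my_spider and a local redis instance (both placeholders):

# Minimal sketch: feed plain start URLs to the default scrapy-redis key.
# Assumes a spider named "my_spider" and a redis server on localhost; adjust as needed.
import redis

r = redis.Redis(host='127.0.0.1', port=6379)

# Default (REDIS_START_URLS_AS_SET = False): the spider pops with LPOP, so push with LPUSH/RPUSH.
r.lpush('my_spider:start_urls', 'http://www.example.com/page/1')

# If REDIS_START_URLS_AS_SET = True, the spider pops with SPOP, so push with SADD instead.
# r.sadd('my_spider:start_urls', 'http://www.example.com/page/1')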
Method 1: override make_request_from_data
Reading the source carefully, you will notice that make_request_from_data(self, data) is the method that turns a message popped from redis into a request instance, and by default that message is just a URL. So we can override this method and push a JSON string to redis instead: inside the override we pass the string on to self.make_requests_from_url, where it is parsed to obtain the request URL (or to generate one). Code as follows:
# -*- coding: utf-8 -*-
import json

from scrapy.http import Request
from scrapy_redis.utils import bytes_to_str
from scrapy_redis.spiders import RedisSpider


class MySpider(RedisSpider):
    name = 'my_spider'
    redis_key = f'start_urls:{name}'

    def make_request_from_data(self, data):
        """
        :param data: bytes, message from redis
        :return:
        """
        company = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(company)

    def make_requests_from_url(self, company):
        # The message pushed to redis is a JSON string, so decode it with json.loads
        # (safer than eval for JSON data).
        data = json.loads(company)
        url = data["url"]
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 "
                          "(KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36",
            "Accept": "*/*"
        }
        return Request(url, self.parse, meta={"data": data}, dont_filter=True, headers=headers)

    def parse(self, response):
        pass


if __name__ == '__main__':
    from scrapy import cmdline
    cmdline.execute('scrapy crawl my_spider'.split())
So all that is needed is to override make_request_from_data and parse data inside it.
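For reference, here is a rough sketch of how such a JSON task could be pushed into the list key start_urls:my_spider that MySpider reads from; the url field is what make_requests_from_url expects, while the extra field is just an arbitrary example:

# Rough sketch: push a JSON task for the MySpider example above.
# Key name matches redis_key = 'start_urls:my_spider'; the extra field is an arbitrary example.
import json
import redis

r = redis.Redis(host='127.0.0.1', port=6379)

task = {
    "url": "http://www.example.com/company/123",
    "company_name": "example-company",   # carried along in request.meta["data"]
}
# RedisSpider pops with LPOP by default, so push the JSON string with LPUSH/RPUSH.
r.lpush('start_urls:my_spider', json.dumps(task, ensure_ascii=False))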
Method 2: override the next_requests method
# Goes inside a RedisSpider subclass; relies on `import json`, `import redis`, `import scrapy`
# at module level.
def next_requests(self):
    # Build a standalone redis connection from the spider settings.
    redis_ip = self.settings.get('REDIS_HOST')
    redis_port = self.settings.get('REDIS_PORT')
    redis_pw = self.settings.get('REDIS_PARAMS').get('password')
    redis_pool = redis.ConnectionPool(host=redis_ip, port=redis_port, password=redis_pw)
    self.redis_conn = redis.Redis(connection_pool=redis_pool)

    found = 0
    while found < self.redis_batch_size:
        data_raw = self.redis_conn.spop(self.redis_key)  # pop a message from redis
        if not data_raw:
            break
        data = json.loads(data_raw)  # the content stored in redis is JSON and needs decoding
        if "source_url" not in data:
            break
        req = scrapy.Request(url=data['source_url'], meta=data['meta'])  # build the request
        if req:
            yield req
            found += 1
        else:
            self.logger.debug("Request not made from data: %s", data)

    if found:
        self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
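Because this version of next_requests builds its own connection from REDIS_HOST / REDIS_PORT / REDIS_PARAMS and pops with spop, the tasks have to be stored in a Redis set. A hedged sketch of the matching settings and of pushing one task; the source_url and meta fields follow the method above, while host, port, password and key name are placeholders:

# settings.py (sketch; values are placeholders)
# REDIS_HOST = '127.0.0.1'
# REDIS_PORT = 6379
# REDIS_PARAMS = {'password': 'your-password'}

# Push one JSON task into the set that the overridden next_requests pops with spop:
import json
import redis

r = redis.Redis(host='127.0.0.1', port=6379, password='your-password')
task = {
    "source_url": "http://www.example.com/detail/1",   # required by the override above
    "meta": {"task_id": 1},                            # copied into request.meta
}
r.sadd('my_spider:start_urls', json.dumps(task, ensure_ascii=False))  # key = the spider's redis_key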
Example code (for demonstration purposes it is somewhat redundant; rewrite or delete parts to fit your own situation):
# -*- coding: utf-8 -*-
# @Author :
# @File : simple_spider_base_class.py
# @Software: PyCharm
# @description : XXX
import json
import datetime
from abc import ABC

from bson import ObjectId              # needs pymongo/bson installed
from redis.exceptions import WatchError
from scrapy_redis import defaults
from scrapy_redis.spiders import RedisSpider
from scrapy.utils.project import get_project_settings

# NOTE: db_config and self.wb (a database helper) referenced in next_requests_old()
# are project-specific objects and are not defined in this snippet.


class SpiderBaseClass(RedisSpider, ABC):
    def __init__(self):
        super(SpiderBaseClass, self).__init__()
        self.temp = None
        self.__server_pipeline = None
        self.task_string = None

    def __del__(self):
        pass

    def __get_server_pipeline(self):
        if not self.__server_pipeline:
            self.__server_pipeline = self.server.pipeline()
        return self.__server_pipeline

    def __get_custom_redis_key(self):
        custom_redis_key = self.redis_key.split(':')[1]
        return custom_redis_key

    def return_url(self, task_string=None):
        """Meant to be overridden; builds and returns a URL from the task string.
        :param task_string:
        :return: url
        """
        self.temp = None
        return 'http://www.examplewebsite.com'

    def redis_sort_set_pop(self, r_k=None):
        """Pop a request.
        timeout is not supported in this queue class.
        """
        t_k = r_k if r_k else self.redis_key
        # atomic range/remove using multi/exec
        pipe = self.__get_server_pipeline()
        pipe.multi()
        pipe.zrange(t_k, 0, 0).zremrangebyrank(t_k, 0, 0)
        results, count = pipe.execute()
        return results, count

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = None  # reset each iteration; stays None if nothing can be fetched
            try:
                data = fetch_one(self.redis_key)
            except BaseException as e:
                # spop/lpop fails when the key holds a sorted set, so fall back to the zset pop.
                results, count = self.redis_sort_set_pop()
                if len(results):
                    task_string = results[0].decode('utf-8')  # bytes from redis
                    self.task_string = task_string
                    msg = 'get task : {0}'.format(task_string)
                    print(msg)
                    self.logger.info(msg)
                    task_dict = json.loads(task_string)
                    if task_dict.get('spider_url', None):
                        data = task_dict['spider_url']
                        print('get url : {0}'.format(data))
                        self.logger.info('get url : {0}'.format(data))
                    else:
                        data = self.return_url(task_string=task_string)
            if not data:
                # Queue empty.
                print('task is none')
                break
            # add a timestamp marker
            # dict_data = collections.OrderedDict(dict_data)
            # dict_data['add_time'] = str(datetime.datetime.now())
            # task_string = json.dumps(dict_data, ensure_ascii=False)
            # try:
            #     self.server.zadd('mirror:{0}'.format(self.name), {task_string: row_score})
            # except BaseException as e:
            #     pass
            req = self.make_request_from_data(data)
            req.meta['task_string'] = self.task_string
            req.meta['start_time'] = str(datetime.datetime.now().replace(microsecond=0))
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def __custom_get_task(self):
        row_str = None
        row_score = None
        pipe = self.__get_server_pipeline()
        loop = True
        while loop:
            try:
                # ascending order
                temp = pipe.zrange(self.redis_key, 0, 0, withscores=True)[0]
                # temp = pipe.zrange(self.custom_redis_key, 0, 0, withscores=True)[0]
                # descending order
                # temp = pipe.zrevrange(self.redis_key, 0, 0, withscores=True)[0]
                row_value_score = pipe.execute()[0][0]  # <class 'tuple'>: (b'99', 99.0)
                row_str, row_score = row_value_score
                pipe.watch(self.redis_key)
                pipe.multi()
                results = pipe.zrem(self.redis_key, row_str.decode('utf-8')).execute()
                # results = pipe.zrem(self.custom_redis_key, row_str.decode('utf-8')).execute()
                pipe.unwatch()
                if results[0]:  # results is a list; the first element marks success (1) or failure (0)
                    # Deleted successfully: no other thread/process grabbed the task, so stop looping.
                    loop = False
                    break
                else:
                    # Deletion failed: another process already took and removed this task,
                    # so go back and fetch the next one.
                    continue
            except WatchError as watch_error:
                # The watched key changed; release the watch and retry.
                pipe.unwatch()
                continue
            except IndexError as i_e:
                # IndexError is raised when there is no task left in redis.
                # End the loop with row_str = None.
                loop = False
                row_str = None
            except BaseException as e:
                row_str = None
                break
        return row_str, row_score

    def next_requests_old(self):
        """Returns a request to be scheduled or none.
        :return:
        """
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        # fetch_one = self.server.spop if use_set else self.server.lpop
        all_settings = get_project_settings()
        flag = all_settings.getbool('REDIS_START_URLS_AS_SET_TAG')
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            if flag:
                row_str, row_score = self.__custom_get_task()
            else:
                fetch_one = self.server.spop if use_set else self.server.lpop
            json_data = fetch_one(self.redis_key) if not flag else row_str
            source = None
            if json_data:
                task_string = json_data.decode('utf-8')  # bytes from redis
                self.task_string = task_string
                self.logger.info('get task : {0}'.format(task_string))
                dict_data = json.loads(task_string)
                source = dict_data.get('source', None)
                task_id = dict_data.get('id', None)
                # LOG.INFO('get task is {0}'.format(self.task_string))
                if dict_data.get('spider_url', None):
                    data = dict_data['spider_url']
                    print('get url : {0}'.format(data))
                    self.logger.info('get url : {0}'.format(data))
                else:
                    data = self.return_url(task_string=task_string)
            else:
                data = None
                # LOG.INFO('get task is None')
                print('get task is None')
                self.logger.info('get task is None')
            if not data:
                # Queue empty.
                break
            if source:
                try:
                    filter_conditions = {'_id': ObjectId(task_id)}
                except BaseException as e:
                    filter_conditions = {'_id': task_id}
                item = {"$set": {"status": 'running'}}
                db_template_name = all_settings.get('DB_TEMPLATE_NAME', None)
                if db_template_name:
                    db_name = db_config.get(db_template_name).get('database', None)
                    tb_name = db_config.get(db_template_name).get('echo_tb', None)
                    is_success = self.wb.update_one(db_name=db_name, tb_name=tb_name,
                                                    filter_conditions=filter_conditions, item=item)
                    if 0 == is_success:
                        self.logger.info('update status : running')
                    else:
                        self.logger.info('update status fail')
            # add a timestamp marker
            # dict_data = collections.OrderedDict(dict_data)
            # dict_data['add_time'] = str(datetime.datetime.now())
            task_string = json.dumps(dict_data, ensure_ascii=False)
            # try:
            #     self.server.zadd('mirror:{0}'.format(self.name), {task_string: row_score})
            # except BaseException as e:
            #     pass
            req = self.make_request_from_data(data)
            req.meta['task_string'] = task_string
            req.meta['start_time'] = str(datetime.datetime.now().replace(microsecond=0))
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)


if __name__ == '__main__':
    pass
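To make the intent of the base class clearer, here is a hypothetical usage sketch: a subclass that overrides return_url and parse, plus the ZADD call that feeds the sorted-set queue consumed by redis_sort_set_pop() (spop/lpop raise an error on a zset key, so the next_requests above falls back to the zset pop). The spider name, key and field names are assumptions for illustration only:

# Hypothetical usage sketch for SpiderBaseClass above.
import json

import redis

# Assumes the base class from simple_spider_base_class.py is importable.
from simple_spider_base_class import SpiderBaseClass


class DemoSpider(SpiderBaseClass):
    name = 'demo_spider'
    redis_key = 'start_urls:demo_spider'

    def return_url(self, task_string=None):
        # Called when the task JSON has no 'spider_url'; build a URL from the task instead.
        task = json.loads(task_string) if task_string else {}
        return 'http://www.example.com/search?q={0}'.format(task.get('keyword', ''))

    def parse(self, response):
        # response.meta['task_string'] carries the original task JSON.
        pass


# Feed the sorted-set queue that redis_sort_set_pop() reads.
# The score acts as a priority here (arbitrary value); mapping form requires redis-py 3.x.
r = redis.Redis(host='127.0.0.1', port=6379)
task = {"spider_url": "http://www.example.com/detail/1", "keyword": "demo"}
r.zadd('start_urls:demo_spider', {json.dumps(task, ensure_ascii=False): 100})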