当前位置: 代码迷 >> 综合 >> scrapy Scheduler
  详细解决方案

scrapy Scheduler

热度:3   发布时间:2023-12-25 09:34:44.0

源码

import os
import json
import logging
from os.path import join, exists

from scrapy.utils.reqser import request_to_dict, request_from_dict
from scrapy.utils.misc import load_object
from scrapy.utils.job import job_dir

logger = logging.getLogger(__name__)


class Scheduler(object):
    """Default Scrapy request scheduler.

    Holds pending requests in a priority queue backed by in-memory queues
    (always) and optional on-disk queues (only when a job directory is
    configured), and filters out duplicate requests via a dupefilter.
    Requests that can be serialized go to disk; unserializable ones fall
    back to memory.
    """

    def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None,
                 logunser=False, stats=None, pqclass=None):
        # dupefilter: object with open/close/request_seen/log (duplicate filter)
        self.df = dupefilter
        # Disk-queue directory under the job dir; None disables disk queues.
        self.dqdir = self._dqdir(jobdir)
        self.pqclass = pqclass  # priority-queue wrapper class
        self.dqclass = dqclass  # disk (serialized) queue class
        self.mqclass = mqclass  # in-memory queue class
        # When True, log the first unserializable request (then only count).
        self.logunser = logunser
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        """Build a Scheduler from the crawler's settings (alternate ctor)."""
        settings = crawler.settings
        dupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])
        dupefilter = dupefilter_cls.from_settings(settings)
        pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE'])
        dqclass = load_object(settings['SCHEDULER_DISK_QUEUE'])
        mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])
        # LOG_UNSERIALIZABLE_REQUESTS is the legacy name; SCHEDULER_DEBUG is
        # its fallback default.
        logunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS',
                                    settings.getbool('SCHEDULER_DEBUG'))
        return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
                   stats=crawler.stats, pqclass=pqclass, dqclass=dqclass,
                   mqclass=mqclass)

    def has_pending_requests(self):
        """Return True when any request is queued (memory or disk)."""
        return len(self) > 0

    def open(self, spider):
        """Open queues for *spider*; disk queue only if a dqdir is set."""
        self.spider = spider
        self.mqs = self.pqclass(self._newmq)
        self.dqs = self._dq() if self.dqdir else None
        return self.df.open()

    def close(self, reason):
        """Close queues, persisting active disk-queue priorities to JSON."""
        if self.dqs:
            prios = self.dqs.close()
            # Saved priorities let _dq() resume this crawl later.
            with open(join(self.dqdir, 'active.json'), 'w') as f:
                json.dump(prios, f)
        return self.df.close(reason)

    def enqueue_request(self, request):
        """Schedule *request* unless it is a duplicate.

        Returns False for filtered duplicates, True when enqueued.
        Disk is tried first; memory is the fallback.
        """
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        dqok = self._dqpush(request)
        if dqok:
            self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
        else:
            self._mqpush(request)
            self.stats.inc_value('scheduler/enqueued/memory',
                                 spider=self.spider)
        self.stats.inc_value('scheduler/enqueued', spider=self.spider)
        return True

    def next_request(self):
        """Pop the next request (memory first, then disk) or None."""
        request = self.mqs.pop()
        if request:
            self.stats.inc_value('scheduler/dequeued/memory',
                                 spider=self.spider)
        else:
            request = self._dqpop()
            if request:
                self.stats.inc_value('scheduler/dequeued/disk',
                                     spider=self.spider)
        if request:
            self.stats.inc_value('scheduler/dequeued', spider=self.spider)
        return request

    def __len__(self):
        """Total number of pending requests across both queue kinds."""
        return len(self.dqs) + len(self.mqs) if self.dqs else len(self.mqs)

    def _dqpush(self, request):
        """Try to push *request* to disk; True on success, None otherwise.

        A ValueError from serialization marks the request unserializable:
        it is counted in stats and (once) logged, then left for the memory
        queue fallback.
        """
        if self.dqs is None:
            return
        try:
            reqd = request_to_dict(request, self.spider)
            # Negate priority: the queue pops the lowest value first.
            self.dqs.push(reqd, -request.priority)
        except ValueError as e:  # non serializable request
            if self.logunser:
                msg = ("Unable to serialize request: %(request)s - reason:"
                       " %(reason)s - no more unserializable requests will be"
                       " logged (stats being collected)")
                logger.warning(msg, {'request': request, 'reason': e},
                               exc_info=True, extra={'spider': self.spider})
                self.logunser = False
            self.stats.inc_value('scheduler/unserializable',
                                 spider=self.spider)
            return
        else:
            return True

    def _mqpush(self, request):
        """Push *request* onto the in-memory queue (negated priority)."""
        self.mqs.push(request, -request.priority)

    def _dqpop(self):
        """Pop one request dict from disk and rebuild it, or return None."""
        if self.dqs:
            d = self.dqs.pop()
            if d:
                return request_from_dict(d, self.spider)

    def _newmq(self, priority):
        """Factory for a per-priority in-memory queue (priority unused)."""
        return self.mqclass()

    def _newdq(self, priority):
        """Factory for a per-priority disk queue file under dqdir."""
        return self.dqclass(join(self.dqdir, 'p%s' % priority))

    def _dq(self):
        """Create the disk priority queue, resuming a previous run if an
        active.json priority list was persisted by close()."""
        activef = join(self.dqdir, 'active.json')
        if exists(activef):
            with open(activef) as f:
                prios = json.load(f)
        else:
            prios = ()
        q = self.pqclass(self._newdq, startprios=prios)
        if q:
            logger.info("Resuming crawl (%(queuesize)d requests scheduled)",
                        {'queuesize': len(q)}, extra={'spider': self.spider})
        return q

    def _dqdir(self, jobdir):
        """Return (and create if missing) the requests.queue directory
        under *jobdir*, or None when no job directory is configured."""
        if jobdir:
            dqdir = join(jobdir, 'requests.queue')
            if not exists(dqdir):
                os.makedirs(dqdir)
            return dqdir
  相关解决方案