2.2. Crawl Entry Point

2.2.1. Creating a Sample Project

You can refer to the official getting-started tutorial.

Below is a demo I created.

2.2.2. Where Does the scrapy Command Come From?

Normally we run a crawl with the scrapy crawl quotes command.

scrapy crawl quotes

Where does this scrapy command come from?

which scrapy
/Users/bytedance/codes/github/zhaojiedi1992/tutorial/venv/bin/scrapy
# Check the file type; if it were binary we couldn't read it. The output confirms it is a text file.
file /Users/bytedance/codes/github/zhaojiedi1992/tutorial/venv/bin/scrapy
/Users/bytedance/codes/github/zhaojiedi1992/tutorial/venv/bin/scrapy: a /Users/bytedance/codes/github/zhaojiedi1992/tutorial/venv/bin/python script text executable, ASCII text

Open this file with cat or an IDE; its contents are as follows.

#!/Users/bytedance/codes/github/zhaojiedi1992/tutorial/venv/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from scrapy.cmdline import execute
if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    sys.exit(execute())

As you can see, the core of this script is simply calling the execute function.

For easier debugging, we create a debug.py file inside the project directory of our Scrapy demo project, with the following contents.

from scrapy.cmdline import execute

if __name__ == '__main__':
    execute(['scrapy', 'crawl', 'quotes'])
    # execute()

With the IDE's debugging features, we can set breakpoints and efficiently analyze how the code executes.

../_images/scrapy_debug.png

2.2.3. Execution Entry Point

From the above we know that the scrapy command calls the execute function, which comes from scrapy.cmdline.

# scrapy/scrapy/cmdline.py
def execute(argv=None, settings=None):
    # If argv is None, fall back to the command-line arguments.
    if argv is None:
        argv = sys.argv

    # Core step 1: populate the project settings.
    if settings is None:
        settings = get_project_settings()
        # set EDITOR from environment if available
        try:
            editor = os.environ["EDITOR"]
        except KeyError:
            pass
        else:
            settings["EDITOR"] = editor

    # Core step 2: determine whether we are inside a project directory.
    inproject = inside_project()
    # Not core: the block below can be skipped for now. It mainly resolves which
    # command to run, fills in the command-line options, and applies them at 'command' priority.
    cmds = _get_commands_dict(settings, inproject)
    cmdname = _pop_command_name(argv)
    if not cmdname:
        _print_commands(settings, inproject)
        sys.exit(0)
    elif cmdname not in cmds:
        _print_unknown_command(settings, cmdname, inproject)
        sys.exit(2)

    cmd = cmds[cmdname]
    parser = ScrapyArgumentParser(
        formatter_class=ScrapyHelpFormatter,
        usage=f"scrapy {cmdname} {cmd.syntax()}",
        conflict_handler="resolve",
        description=cmd.long_desc(),
    )
    settings.setdict(cmd.default_settings, priority="command")
    cmd.settings = settings
    cmd.add_options(parser)
    opts, args = parser.parse_known_args(args=argv[1:])
    _run_print_help(parser, cmd.process_options, args, opts)

    # Core step 3: create the crawler process.
    cmd.crawler_process = CrawlerProcess(settings)
    # Core step 4: call cmd.run().
    _run_print_help(parser, _run_command, cmd, args, opts)
    sys.exit(cmd.exitcode)

This function does the following things:

  1. Fetches the project configuration via get_project_settings.

  2. Determines via inside_project whether we are inside a project directory.

  3. Creates a CrawlerProcess object.

  4. Calls the cmd.run method.

Below we analyze the main core methods one by one.

2.2.4. get_project_settings

# scrapy/scrapy/utils/project.py
def get_project_settings() -> Settings:
    # If SCRAPY_SETTINGS_MODULE is not set in the environment, fall back to the
    # project named in SCRAPY_PROJECT (or 'default').
    if ENVVAR not in os.environ:
        project = os.environ.get("SCRAPY_PROJECT", "default")
        # Core: initialization work: locate scrapy.cfg and, from it,
        # determine which project settings module should be read.
        init_env(project)

    # Construct a Settings instance; its __init__ loads the configuration from
    # scrapy/settings/default_settings.py at 'default' priority.
    settings = Settings()
    settings_module_path = os.environ.get(ENVVAR)
    # If a user settings module is configured, override the defaults at 'project' priority.
    if settings_module_path:
        settings.setmodule(settings_module_path, priority="project")

    # If there are SCRAPY_-prefixed environment variables, also apply them at 'project' priority.
    valid_envvars = {
        "CHECK",
        "PROJECT",
        "PYTHON_SHELL",
        "SETTINGS_MODULE",
    }

    scrapy_envvars = {
        k[7:]: v
        for k, v in os.environ.items()
        if k.startswith("SCRAPY_") and k.replace("SCRAPY_", "") in valid_envvars
    }

    settings.setdict(scrapy_envvars, priority="project")

    return settings

2.2.4.1. What Does init_env Do?

# scrapy/scrapy/utils/conf.py

# Starting from the current directory, look for scrapy.cfg and recurse upward through
# the parent directories; return an empty string if it cannot be found.
def closest_scrapy_cfg(
    path: Union[str, os.PathLike] = ".",
    prevpath: Optional[Union[str, os.PathLike]] = None,
) -> str:
    """Return the path to the closest scrapy.cfg file by traversing the current
    directory and its parents
    """
    if prevpath is not None and str(path) == str(prevpath):
        return ""
    path = Path(path).resolve()
    cfgfile = path / "scrapy.cfg"
    if cfgfile.exists():
        return str(cfgfile)
    return closest_scrapy_cfg(path.parent, path)


def init_env(project: str = "default", set_syspath: bool = True) -> None:
    """Initialize environment to use command-line tool from inside a project
    dir. This sets the Scrapy settings module and modifies the Python path to
    be able to locate the project module.
    """
    # get_config reads the candidate config files. If a scrapy.cfg can be found by walking
    # up from the current directory, its values take precedence; otherwise values from
    # ~/.scrapy.cfg, the XDG config dir, /etc/scrapy.cfg or c:\scrapy\scrapy.cfg apply.

    cfg = get_config()
    if cfg.has_option("settings", project):
        # Get the settings module location from the project's scrapy.cfg, e.g. default = tutorial.settings,
        # and store it in the SCRAPY_SETTINGS_MODULE environment variable so later steps can read it.
        os.environ["SCRAPY_SETTINGS_MODULE"] = cfg.get("settings", project)

    # Starting from the current directory, look for scrapy.cfg, recursing upward;
    # an empty string is returned if it cannot be found.
    closest = closest_scrapy_cfg()
    if closest:
        # Get the project directory (the one that contains scrapy.cfg) and add it to sys.path.
        projdir = str(Path(closest).parent)
        if set_syspath and projdir not in sys.path:
            sys.path.append(projdir)


def get_config(use_closest: bool = True) -> ConfigParser:
    """Get Scrapy config file as a ConfigParser"""
    sources = get_sources(use_closest)
    cfg = ConfigParser()
    # ConfigParser reads all the files and merges duplicate keys automatically; values from
    # files read later override those read earlier. This explains why the project's scrapy.cfg
    # overrides /etc/scrapy.cfg or ~/.scrapy.cfg.
    cfg.read(sources)
    return cfg


def get_sources(use_closest: bool = True) -> List[str]:
    # There are several possible config files; collect all candidate paths and return them as a list.
    xdg_config_home = (
        os.environ.get("XDG_CONFIG_HOME") or Path("~/.config").expanduser()
    )
    sources = [
        "/etc/scrapy.cfg",
        r"c:\scrapy\scrapy.cfg",
        str(Path(xdg_config_home) / "scrapy.cfg"),
        str(Path("~/.scrapy.cfg").expanduser()),
    ]
    if use_closest:
        sources.append(closest_scrapy_cfg())
    return sources

../_images/cfg.png

As we can see, the init_env function does the following:

  1. Locates the scrapy.cfg file(s).

  2. Reads the location of the spider settings module from scrapy.cfg.

  3. Stores that settings module location in the SCRAPY_SETTINGS_MODULE environment variable.

  4. Adds the project directory (the one containing scrapy.cfg) to sys.path.
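
These lookups can be reproduced by hand. A minimal sketch, assuming it is run from inside the demo project so a scrapy.cfg is found (in the generated project its [settings] section contains default = tutorial.settings):

# Reproduce what init_env reads (run from inside the demo project).
from scrapy.utils.conf import closest_scrapy_cfg, get_config

print(closest_scrapy_cfg())                      # path to the project's scrapy.cfg
print(get_config().get("settings", "default"))   # e.g. 'tutorial.settings'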

2.2.4.2. Settings Initialization

At first glance it might seem that a completely empty Settings object is created here, but a look at its __init__ shows there is more to it: it builds a default-priority configuration from default_settings.py, and the project's settings.py is then applied on top of those defaults.

# scrapy/scrapy/settings/__init__.py
class Settings(BaseSettings):
    """
    This object stores Scrapy settings for the configuration of internal
    components, and can be used for any further customization.

    It is a direct subclass and supports all methods of
    :class:`~scrapy.settings.BaseSettings`. Additionally, after instantiation
    of this class, the new object will have the global default settings
    described on :ref:`topics-settings-ref` already populated.
    """

    def __init__(
        self, values: _SettingsInputT = None, priority: Union[int, str] = "project"
    ):
        # Do not pass kwarg values here. We don't want to promote user-defined
        # dicts, and we want to update, not replace, default dicts with the
        # values given by the user
        
        super().__init__()
        self.setmodule(default_settings, "default")
        # Promote default dictionaries to BaseSettings instances for per-key
        # priorities
        for name, val in self.items():
            if isinstance(val, dict):
                self.set(name, BaseSettings(val, "default"), "default")
        self.update(values, priority)

The settings/default_settings.py file defines a large number of settings, many of which are class paths. This is what gives Scrapy so much of its extensibility. For example, SCHEDULER = "scrapy.core.scheduler.Scheduler": you can write your own scheduler and point this setting at it in your own project's configuration, and Scrapy will use your scheduler instead. When even the scheduler can be swapped out, that is real flexibility; a sketch of such an override follows.
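
A minimal sketch of such an override (the module path myproject.scheduler.MyScheduler is a hypothetical example, not part of the demo project):

# settings.py of your own project
# Hypothetical custom scheduler class; it must implement the scheduler interface
# Scrapy expects (enqueue_request, next_request, has_pending_requests, ...).
SCHEDULER = "myproject.scheduler.MyScheduler"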

2.2.4.3. Settings Priorities

# scrapy/scrapy/settings/__init__.py:SETTINGS_PRIORITIES
SETTINGS_PRIORITIES: Dict[str, int] = {
    "default": 0,
    "command": 10,
    "addon": 15,
    "project": 20,
    "spider": 30,
    "cmdline": 40,
}
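
A short sketch of how these priorities behave in practice, using the public Settings.set API (DOWNLOAD_DELAY is just an example setting):

from scrapy.settings import Settings

s = Settings()                                   # defaults loaded at priority 0
s.set("DOWNLOAD_DELAY", 1, priority="project")   # priority 20: overrides the default
s.set("DOWNLOAD_DELAY", 5, priority="default")   # priority 0: ignored, too low
print(s.getint("DOWNLOAD_DELAY"))                # 1 -> the higher-priority value wins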

2.2.5. inside_project

# scrapy/scrapy/utils/project.py
def inside_project() -> bool:

    # Check whether the settings-module environment variable is already set.
    # We know it was set earlier in init_env, which also added the project path to sys.path.
    
    scrapy_module = os.environ.get(ENVVAR)
    if scrapy_module:
        try:
            import_module(scrapy_module)
        except ImportError as exc:
            warnings.warn(
                f"Cannot import scrapy settings module {scrapy_module}: {exc}"
            )
        else:
            return True
    # If the environment variable is not set, check whether a scrapy.cfg can be found;
    # if so we are inside a project, otherwise we are not.
    return bool(closest_scrapy_cfg())

In other words:

  1. First decide via import: init_env already set SCRAPY_SETTINGS_MODULE, so try to import that module directly; if the import succeeds, we must be inside the project directory.

  2. Failing that, if a scrapy.cfg file can be found by walking up the directory tree, we are also inside a project directory.

  3. Otherwise, we are not inside a project directory.
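
A quick way to check this yourself (a minimal sketch; run it once from inside the demo project directory and once from somewhere else):

from scrapy.utils.project import inside_project

# True inside the project (settings module importable or scrapy.cfg found), False elsewhere.
print(inside_project())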

2.2.6. Creating the CrawlerProcess Object

# scrapy/scrapy/crawler.py CrawlerProcess
    def __init__(
        self,
        settings: Union[Dict[str, Any], Settings, None] = None,
        install_root_handler: bool = True,
    ):
        # Core: call the parent class's __init__.
        super().__init__(settings)
        configure_logging(self.settings, install_root_handler)
        log_scrapy_info(self.settings)
        self._initialized_reactor = False

This class does not do much itself; the core work happens in the parent class's initializer, so let's look at that.

# scrapy/scrapy/crawler.py CrawlerRunner
    def __init__(self, settings: Union[Dict[str, Any], Settings, None] = None):
        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)
        self.settings = settings
        self.spider_loader = self._get_spider_loader(settings)
        self._crawlers: Set[Crawler] = set()
        self._active: Set[Deferred] = set()
        self.bootstrap_failed = False

In this initializer we can see that the core job of loading the spider loader is done. Let's look at the loading process in detail.

# scrapy/scrapy/crawler.py
    @staticmethod
    def _get_spider_loader(settings: BaseSettings):
        """Get SpiderLoader instance from settings"""
        # Get the spider loader class path (a string) from the settings;
        # the default is scrapy.spiderloader.SpiderLoader.
        cls_path = settings.get("SPIDER_LOADER_CLASS")
        # String -> class.
        loader_cls = load_object(cls_path)
        # Interface verification via zope.interface.
        verifyClass(ISpiderLoader, loader_cls)
        # Instantiate it.
        return loader_cls.from_settings(settings.frozencopy())
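
The same resolution can be reproduced by hand. A minimal sketch, assuming it is run from inside the demo project so the project package is on sys.path:

from scrapy.utils.misc import load_object
from scrapy.utils.project import get_project_settings

settings = get_project_settings()
cls_path = settings.get("SPIDER_LOADER_CLASS")   # 'scrapy.spiderloader.SpiderLoader'
loader_cls = load_object(cls_path)               # string -> class
loader = loader_cls.from_settings(settings.frozencopy())
print(loader.list())                             # spider names found in the project, e.g. ['quotes']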

2.2.7. cmd.run

# scrapy/scrapy/commands/crawl.py
    def run(self, args: List[str], opts: argparse.Namespace) -> None:
        if len(args) < 1:
            raise UsageError()
        elif len(args) > 1:
            raise UsageError(
                "running 'scrapy crawl' with more than one spider is not supported"
            )
        # Get the spider name: scrapy crawl <spider_name> -> spider_name.
        spname = args[0]

        assert self.crawler_process
        # Core step 1: call the crawl method.
        crawl_defer = self.crawler_process.crawl(spname, **opts.spargs)

        if getattr(crawl_defer, "result", None) is not None and issubclass(
            cast(Failure, crawl_defer.result).type, Exception
        ):
            self.exitcode = 1
        else:
            # Core step 2: start.
            self.crawler_process.start()

            if (
                self.crawler_process.bootstrap_failed
                or hasattr(self.crawler_process, "has_exception")
                and self.crawler_process.has_exception
            ):
                self.exitcode = 1

As you can see, cmd.run really just calls two core methods:

  1. crawler_process.crawl

  2. crawler_process.start

Let's look at what these two parts do.

2.2.8. crawler_process.crawl

# scrapy/scrapy/crawler.py (Crawler.crawl, which crawler_process.crawl delegates to)
    @inlineCallbacks
    def crawl(self, *args: Any, **kwargs: Any) -> Generator[Deferred, Any, None]:
        if self.crawling:
            raise RuntimeError("Crawling already taking place")
        if self._started:
            warnings.warn(
                "Running Crawler.crawl() more than once is deprecated.",
                ScrapyDeprecationWarning,
                stacklevel=2,
            )
        # Mark the crawl as started.
        self.crawling = self._started = True

        try:
            # Create an instance of our spider class.
            self.spider = self._create_spider(*args, **kwargs)
            # Apply the settings.
            self._apply_settings()
            self._update_root_log_handler()
            # Create the engine.
            self.engine = self._create_engine()
            # Get the spider's start_requests (the user may not have overridden it,
            # but the parent Spider class provides a default implementation).
            start_requests = iter(self.spider.start_requests())
            # The engine starts from this spider entry point.
            yield self.engine.open_spider(self.spider, start_requests)
            yield maybeDeferred(self.engine.start)
        except Exception:
            # On exception, stop the engine.
            self.crawling = False
            if self.engine is not None:
                yield self.engine.close()
            raise

This part creates the spider instance (the class in the file generated by scrapy genspider quotes), creates the engine, hands the spider's entry requests to the engine, and then starts the engine.

2.2.8.1. _create_spider

    def _create_spider(self, *args: Any, **kwargs: Any) -> Spider:
        return self.spidercls.from_crawler(self, *args, **kwargs)

As you can see, this calls the from_crawler method of our own spider class and returns a spider instance. If we don't define from_crawler ourselves, the parent class's implementation is used, shown below.

    @classmethod
    def from_crawler(cls, crawler: Crawler, *args: Any, **kwargs: Any) -> Self:
        # Call the class's own __init__.
        spider = cls(*args, **kwargs)
        # Attach the crawler object to the spider.
        spider._set_crawler(crawler)
        return spider
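
If you do want custom construction logic in your own spider, the usual pattern is to override from_crawler while still delegating to the parent. A minimal sketch modeled on the demo's quotes spider (the MY_CUSTOM_SETTING key is a hypothetical example):

import scrapy

class QuotesSpider(scrapy.Spider):
    name = "quotes"

    def __init__(self, my_value=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_value = my_value

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        # Keep the parent behaviour (which also calls _set_crawler), just inject a setting value.
        kwargs.setdefault("my_value", crawler.settings.get("MY_CUSTOM_SETTING"))
        return super().from_crawler(crawler, *args, **kwargs)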

2.2.8.2. _apply_settings

    def _apply_settings(self) -> None:
        # If the settings are already frozen they have been applied before;
        # return early to avoid applying them twice.
        if self.settings.frozen:
            return
        # Load the add-on settings.
        self.addons.load_settings(self.settings)
        # Instantiate the stats collector so runtime data can be gathered.
        self.stats = load_object(self.settings["STATS_CLASS"])(self)

        handler = LogCounterHandler(self, level=self.settings.get("LOG_LEVEL"))
        logging.root.addHandler(handler)
        # lambda is assigned to Crawler attribute because this way it is not
        # garbage collected after leaving the scope
        self.__remove_handler = lambda: logging.root.removeHandler(handler)
        self.signals.connect(self.__remove_handler, signals.engine_stopped)

        lf_cls: Type[LogFormatter] = load_object(self.settings["LOG_FORMATTER"])
        self.logformatter = lf_cls.from_crawler(self)

        # Request fingerprinter, used later for request deduplication.
        self.request_fingerprinter = build_from_crawler(
            load_object(self.settings["REQUEST_FINGERPRINTER_CLASS"]),
            self,
        )

        # Load and install the reactor / asyncio event loop.
        reactor_class: str = self.settings["TWISTED_REACTOR"]
        event_loop: str = self.settings["ASYNCIO_EVENT_LOOP"]
        if self._init_reactor:
            # this needs to be done after the spider settings are merged,
            # but before something imports twisted.internet.reactor
            if reactor_class:
                install_reactor(reactor_class, event_loop)
            else:
                from twisted.internet import reactor  # noqa: F401
            log_reactor_info()
        if reactor_class:
            verify_installed_reactor(reactor_class)
            if is_asyncio_reactor_installed() and event_loop:
                verify_installed_asyncio_event_loop(event_loop)

        # Instantiate the extensions.
        self.extensions = ExtensionManager.from_crawler(self)
        # Freeze the settings.
        self.settings.freeze()

        d = dict(overridden_settings(self.settings))
        logger.info(
            "Overridden settings:\n%(settings)s", {"settings": pprint.pformat(d)}
        )

Based on the settings, this completes a number of instantiations and preparations:

  1. Load the add-ons.

  2. Instantiate the stats collector.

  3. Set up logging.

  4. Install the reactor / event loop.

  5. Instantiate the extensions.

  6. Freeze the settings.
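
Most of these objects are created with the same load_object pattern: read a class path string from the settings, turn it into a class, and instantiate it. A small sketch of that pattern (MemoryStatsCollector is the default STATS_CLASS):

from scrapy.utils.misc import load_object
from scrapy.utils.project import get_project_settings

settings = get_project_settings()
stats_cls = load_object(settings["STATS_CLASS"])  # 'scrapy.statscollectors.MemoryStatsCollector'
print(stats_cls.__name__)                         # MemoryStatsCollector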

2.2.8.3. _create_engine

    def _create_engine(self) -> ExecutionEngine:
        return ExecutionEngine(self, lambda _: self.stop())

This only calls the constructor, so let's look at the engine's __init__.

class ExecutionEngine:
    def __init__(self, crawler: "Crawler", spider_closed_callback: Callable) -> None:
        self.crawler: "Crawler" = crawler
        self.settings: Settings = crawler.settings
        self.signals: SignalManager = crawler.signals
        assert crawler.logformatter
        self.logformatter: LogFormatter = crawler.logformatter
        self.slot: Optional[Slot] = None
        self.spider: Optional[Spider] = None
        self.running: bool = False
        self.paused: bool = False
        # Core 1: the scheduler class.
        self.scheduler_cls: Type["BaseScheduler"] = self._get_scheduler_class(
            crawler.settings
        )
        # Core 2: the downloader.
        downloader_cls: Type[Downloader] = load_object(self.settings["DOWNLOADER"])
        self.downloader: Downloader = downloader_cls(crawler)

        # Core 3: the scraper.
        self.scraper = Scraper(crawler)
        self._spider_closed_callback: Callable = spider_closed_callback
        self.start_time: Optional[float] = None

In the engine's initializer, the core components are stored as attributes so that the engine can later coordinate them to work together. More on that later.
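
For orientation, the scheduler and downloader classes come from settings entries (the Scraper is constructed directly); a small sketch of reading their default values:

from scrapy.utils.project import get_project_settings

settings = get_project_settings()
print(settings.get("SCHEDULER"))   # scrapy.core.scheduler.Scheduler
print(settings.get("DOWNLOADER"))  # scrapy.core.downloader.Downloader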

2.2.8.4. engine.open_spider

    # scrapy/scrapy/core/engine.py
    @inlineCallbacks
    def open_spider(
        self, spider: Spider, start_requests: Iterable = (), close_if_idle: bool = True
    ) -> Generator[Deferred, Any, None]:
        if self.slot is not None:
            raise RuntimeError(f"No free spider slot when opening {spider.name!r}")
        logger.info("Spider opened", extra={"spider": spider})
        # Wrap _next_request so it can be scheduled to run later.
        nextcall = CallLaterOnce(self._next_request)
        # Build the scheduler.
        scheduler = build_from_crawler(self.scheduler_cls, self.crawler)

        # Run the spider middlewares (via the scraper) over the spider's start_requests.
        start_requests = yield self.scraper.spidermw.process_start_requests(
            start_requests, spider
        )
        # Build the slot.
        self.slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.spider = spider

        if hasattr(scheduler, "open"):
            # Open the scheduler.
            yield scheduler.open(spider)
        # Open the scraper.
        yield self.scraper.open_spider(spider)
        assert self.crawler.stats
        # Open the stats collector.
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        # Schedule the first call to _next_request and start the heartbeat.
        self.slot.nextcall.schedule()
        self.slot.heartbeat.start(5)

2.2.9. crawler_process.start

    def start(
        self, stop_after_crawl: bool = True, install_signal_handlers: bool = True
    ) -> None:
        """
        This method starts a :mod:`~twisted.internet.reactor`, adjusts its pool
        size to :setting:`REACTOR_THREADPOOL_MAXSIZE`, and installs a DNS cache
        based on :setting:`DNSCACHE_ENABLED` and :setting:`DNSCACHE_SIZE`.

        If ``stop_after_crawl`` is True, the reactor will be stopped after all
        crawlers have finished, using :meth:`join`.

        :param bool stop_after_crawl: stop or not the reactor when all
            crawlers have finished

        :param bool install_signal_handlers: whether to install the OS signal
            handlers from Twisted and Scrapy (default: True)
        """
        from twisted.internet import reactor

        if stop_after_crawl:
            d = self.join()
            # Don't start the reactor if the deferreds are already fired
            if d.called:
                return
            d.addBoth(self._stop_reactor)

        # Core: install the DNS resolver / cache.
        resolver_class = load_object(self.settings["DNS_RESOLVER"])
        resolver = build_from_crawler(resolver_class, self, reactor=reactor)
        resolver.install_on_reactor()

        # Core: adjust the reactor thread pool size.
        tp = reactor.getThreadPool()
        tp.adjustPoolsize(maxthreads=self.settings.getint("REACTOR_THREADPOOL_MAXSIZE"))

        # Core: register reactor system event triggers: call stop before shutdown,
        # and install the shutdown handlers after startup.
        reactor.addSystemEventTrigger("before", "shutdown", self.stop)
        if install_signal_handlers:
            reactor.addSystemEventTrigger(
                "after", "startup", install_shutdown_handlers, self._signal_shutdown
            )
        reactor.run(installSignalHandlers=install_signal_handlers)  # blocking call

Here the reactor from Twisted is used; it is Twisted's event manager. We only need to register events with it and then call run to start it: the reactor executes the registered events for us, and when one of them is waiting on network I/O it automatically switches to other runnable events. A minimal script-level sketch of the whole flow is shown below.
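
Putting the pieces together, the same crawl-and-start flow can also be driven from a plain script, which is essentially what debug.py does through execute. A minimal sketch ('quotes' is the spider name from the demo project):

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())  # core step 3: create the crawler process
process.crawl("quotes")                           # schedules Crawler.crawl for the 'quotes' spider
process.start()                                   # starts the Twisted reactor (blocking call)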