2.2. The Crawl Entry Point
2.2.1. Creating a Sample Project
You can follow the official getting-started tutorial.
The walkthrough below is based on a demo project I created (the tutorial project you will see in the paths that follow).
2.2.2. Where Does the scrapy Command Come From?
Normally we start a crawl with the scrapy crawl quotes command:
scrapy crawl quotes
Where does this scrapy command itself come from? Let's check with which:
which scrapy
/Users/bytedance/codes/github/zhaojiedi1992/tutorial/venv/bin/scrapy
# Check the file type: if it were binary we could not read it as text.
# This confirms it is a text file.
file /Users/bytedance/codes/github/zhaojiedi1992/tutorial/venv/bin/scrapy
/Users/bytedance/codes/github/zhaojiedi1992/tutorial/venv/bin/scrapy: a /Users/bytedance/codes/github/zhaojiedi1992/tutorial/venv/bin/python script text executable, ASCII text
Opening the file with cat or in an IDE shows the following content:
#!/Users/bytedance/codes/github/zhaojiedi1992/tutorial/venv/bin/python
# -*- coding: utf-8 -*-
import re
import sys

from scrapy.cmdline import execute

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    sys.exit(execute())
The core of this script is the call to the execute function.
To make debugging convenient, create a debug.py file inside the project directory of our Scrapy demo with the following content:
from scrapy.cmdline import execute

if __name__ == '__main__':
    execute(['scrapy', 'crawl', 'quotes'])
    # execute()
With this file, we can use the IDE's debugger to set breakpoints and analyze the execution flow efficiently.
2.2.3. The Execution Entry Point
From the above we know that the scrapy command ends up calling execute, which comes from scrapy.cmdline:
# scrapy/scrapy/cmdline.py
def execute(argv=None, settings=None):
    # If argv is not provided, fall back to the command-line arguments.
    if argv is None:
        argv = sys.argv

    # Core step 1: populate the project settings.
    if settings is None:
        settings = get_project_settings()
        # set EDITOR from environment if available
        try:
            editor = os.environ["EDITOR"]
        except KeyError:
            pass
        else:
            settings["EDITOR"] = editor

    # Core step 2: check whether we are inside a project directory.
    inproject = inside_project()

    # Non-core: the big block below can be skipped on a first read. It
    # resolves which command to run from the arguments, parses the
    # command-line options, and applies the command's default settings
    # at "command" priority.
    cmds = _get_commands_dict(settings, inproject)
    cmdname = _pop_command_name(argv)
    if not cmdname:
        _print_commands(settings, inproject)
        sys.exit(0)
    elif cmdname not in cmds:
        _print_unknown_command(settings, cmdname, inproject)
        sys.exit(2)

    cmd = cmds[cmdname]
    parser = ScrapyArgumentParser(
        formatter_class=ScrapyHelpFormatter,
        usage=f"scrapy {cmdname} {cmd.syntax()}",
        conflict_handler="resolve",
        description=cmd.long_desc(),
    )
    settings.setdict(cmd.default_settings, priority="command")
    cmd.settings = settings
    cmd.add_options(parser)
    opts, args = parser.parse_known_args(args=argv[1:])
    _run_print_help(parser, cmd.process_options, args, opts)

    # Core step 3: create the crawler process.
    cmd.crawler_process = CrawlerProcess(settings)
    # Core step 4: invoke cmd.run().
    _run_print_help(parser, _run_command, cmd, args, opts)
    sys.exit(cmd.exitcode)
This function does four main things:
1. Fetches the project settings via get_project_settings.
2. Determines whether we are inside a project directory via inside_project.
3. Creates a CrawlerProcess object.
4. Runs the command's run method.
Below we analyze each of these core pieces.
2.2.4. get_project_settings
# scrapy/scrapy/utils/project.py
def get_project_settings() -> Settings:
    # If SCRAPY_SETTINGS_MODULE is not in the environment yet, fall back to
    # the project named by SCRAPY_PROJECT (or "default").
    if ENVVAR not in os.environ:
        project = os.environ.get("SCRAPY_PROJECT", "default")
        # Core: locate the scrapy.cfg file and read from it which settings
        # module this project should use.
        init_env(project)

    # Construct a Settings instance; its __init__ loads
    # scrapy/settings/default_settings.py at "default" priority.
    settings = Settings()
    settings_module_path = os.environ.get(ENVVAR)
    # If the project has its own settings module, overlay it at "project"
    # priority.
    if settings_module_path:
        settings.setmodule(settings_module_path, priority="project")

    # Scrapy-related environment variables (prefixed with SCRAPY_) also
    # override settings at "project" priority.
    valid_envvars = {
        "CHECK",
        "PROJECT",
        "PYTHON_SHELL",
        "SETTINGS_MODULE",
    }
    scrapy_envvars = {
        k[7:]: v
        for k, v in os.environ.items()
        if k.startswith("SCRAPY_") and k.replace("SCRAPY_", "") in valid_envvars
    }
    settings.setdict(scrapy_envvars, priority="project")

    return settings
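As a small, hypothetical demonstration of the SCRAPY_-prefixed variables: setting one from the whitelist above makes it appear, prefix stripped, as a project-priority setting (assuming Scrapy is installed):

import os
os.environ["SCRAPY_PYTHON_SHELL"] = "ipython"

from scrapy.utils.project import get_project_settings

settings = get_project_settings()
print(settings["PYTHON_SHELL"])  # -> ipython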
2.2.4.1. What Does init_env Do?
# scrapy/scrapy/utils/conf.py
# Starting from the current directory, look for a scrapy.cfg file, walking up
# the parent directories recursively; return "" if none is found.
def closest_scrapy_cfg(
    path: Union[str, os.PathLike] = ".",
    prevpath: Optional[Union[str, os.PathLike]] = None,
) -> str:
    """Return the path to the closest scrapy.cfg file by traversing the current
    directory and its parents
    """
    if prevpath is not None and str(path) == str(prevpath):
        return ""
    path = Path(path).resolve()
    cfgfile = path / "scrapy.cfg"
    if cfgfile.exists():
        return str(cfgfile)
    return closest_scrapy_cfg(path.parent, path)

def init_env(project: str = "default", set_syspath: bool = True) -> None:
    """Initialize environment to use command-line tool from inside a project
    dir. This sets the Scrapy settings module and modifies the Python path to
    be able to locate the project module.
    """
    # Read the merged configuration: the scrapy.cfg found by walking up from
    # the current directory, plus the global candidates such as
    # ~/.config/scrapy.cfg, ~/.scrapy.cfg, /etc/scrapy.cfg and
    # c:\scrapy\scrapy.cfg.
    cfg = get_config()
    if cfg.has_option("settings", project):
        # Read the settings module location for this project from scrapy.cfg,
        # e.g. default = tutorial.settings, and export it as an environment
        # variable so later steps can pick it up.
        os.environ["SCRAPY_SETTINGS_MODULE"] = cfg.get("settings", project)
    # Find the closest scrapy.cfg by walking up from the current directory;
    # returns "" if none is found.
    closest = closest_scrapy_cfg()
    if closest:
        # The directory containing scrapy.cfg is the project directory;
        # add it to sys.path.
        projdir = str(Path(closest).parent)
        if set_syspath and projdir not in sys.path:
            sys.path.append(projdir)

def get_config(use_closest: bool = True) -> ConfigParser:
    """Get Scrapy config file as a ConfigParser"""
    sources = get_sources(use_closest)
    cfg = ConfigParser()
    # ConfigParser reads all the files and merges sections; for duplicate
    # keys, files read earlier are overridden by files read later. This is
    # why the project's own scrapy.cfg (appended last) overrides
    # /etc/scrapy.cfg or ~/.scrapy.cfg.
    cfg.read(sources)
    return cfg

def get_sources(use_closest: bool = True) -> List[str]:
    # There may be several config files; collect all candidate paths into a
    # list.
    xdg_config_home = (
        os.environ.get("XDG_CONFIG_HOME") or Path("~/.config").expanduser()
    )
    sources = [
        "/etc/scrapy.cfg",
        r"c:\scrapy\scrapy.cfg",
        str(Path(xdg_config_home) / "scrapy.cfg"),
        str(Path("~/.scrapy.cfg").expanduser()),
    ]
    if use_closest:
        sources.append(closest_scrapy_cfg())
    return sources
To sum up, init_env does the following:
1. Locates the scrapy.cfg configuration files.
2. Reads the location of the project's settings module from scrapy.cfg.
3. Exports that location in the SCRAPY_SETTINGS_MODULE environment variable (and adds the project directory to sys.path).
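For reference, the scrapy.cfg that scrapy startproject generates for a project like our tutorial demo looks roughly like this (trimmed; exact contents may differ by Scrapy version):

# scrapy.cfg, at the project root
[settings]
default = tutorial.settings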
2.2.4.2. Settings Initialization
At first glance, Settings() seems to create a completely empty settings object, but a look at its __init__ shows there is more to it: it first builds a default settings instance from default_settings.py, and the caller then overlays the project's settings.py on top.
# scrapy/scrapy/settings/__init__.py
class Settings(BaseSettings):
    """
    This object stores Scrapy settings for the configuration of internal
    components, and can be used for any further customization.

    It is a direct subclass and supports all methods of
    :class:`~scrapy.settings.BaseSettings`. Additionally, after instantiation
    of this class, the new object will have the global default settings
    described on :ref:`topics-settings-ref` already populated.
    """

    def __init__(
        self, values: _SettingsInputT = None, priority: Union[int, str] = "project"
    ):
        # Do not pass kwarg values here. We don't want to promote user-defined
        # dicts, and we want to update, not replace, default dicts with the
        # values given by the user
        super().__init__()
        self.setmodule(default_settings, "default")
        # Promote default dictionaries to BaseSettings instances for per-key
        # priorities
        for name, val in self.items():
            if isinstance(val, dict):
                self.set(name, BaseSettings(val, "default"), "default")
        self.update(values, priority)
The settings/default_settings.py file defines a large number of settings, many of which name classes by their import path. This gives Scrapy's extension points a great deal of flexibility. Take SCHEDULER = "scrapy.core.scheduler.Scheduler" as an example: you can write your own scheduler, point this setting at it in your own project, and Scrapy will use your scheduler instead. If even the scheduler can be swapped out, that says a lot about how flexible the design is; see the sketch below.
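A minimal sketch of such a custom scheduler. The module path tutorial/custom_scheduler.py and the class name LoggingScheduler are made up for illustration; enqueue_request is the real scheduler interface being overridden:

# tutorial/custom_scheduler.py -- hypothetical module for illustration
from scrapy.core.scheduler import Scheduler

class LoggingScheduler(Scheduler):
    # Log every request entering the scheduler, then defer to the default
    # implementation.
    def enqueue_request(self, request):
        print(f"enqueue: {request.url}")
        return super().enqueue_request(request)

Then point the setting at it in the project's settings.py:

SCHEDULER = "tutorial.custom_scheduler.LoggingScheduler"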
2.2.4.3. Settings Priorities
# scrapy/scrapy/settings/__init__.py:SETTINGS_PRIORITIES
SETTINGS_PRIORITIES: Dict[str, int] = {
    "default": 0,
    "command": 10,
    "addon": 15,
    "project": 20,
    "spider": 30,
    "cmdline": 40,
}
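Lower numbers mean lower priority, and a set() at a lower priority never overrides a value already set at a higher priority. A quick sketch of that behavior (assuming Scrapy is installed):

from scrapy.settings import Settings

s = Settings()                                   # defaults loaded at "default" (0)
s.set("DOWNLOAD_DELAY", 2, priority="project")   # priority 20
s.set("DOWNLOAD_DELAY", 5, priority="command")   # priority 10 < 20, so ignored
print(s.getfloat("DOWNLOAD_DELAY"))              # -> 2.0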
2.2.5. inside_project
# scrapy/scrapy/utils/project.py
def inside_project() -> bool:
    # Check whether the settings-module environment variable is set.
    # We know init_env already set it earlier (and also added the project
    # directory to sys.path).
    scrapy_module = os.environ.get(ENVVAR)
    if scrapy_module:
        try:
            import_module(scrapy_module)
        except ImportError as exc:
            warnings.warn(
                f"Cannot import scrapy settings module {scrapy_module}: {exc}"
            )
        else:
            return True
    # If the environment variable is not set, fall back to checking whether a
    # scrapy.cfg exists; if it does, we are inside a project.
    return bool(closest_scrapy_cfg())
The check goes through the import route first: init_env already exported SCRAPY_SETTINGS_MODULE, so if importing that module succeeds, we must be inside the project directory.
Failing that, if a scrapy.cfg file can still be found by walking up the directory tree, we are also considered inside a project.
Otherwise, we are not inside a project directory.
2.2.6. Creating the CrawlerProcess Object
# scrapy/scrapy/crawler.py CrawlerProcess
def __init__(
    self,
    settings: Union[Dict[str, Any], Settings, None] = None,
    install_root_handler: bool = True,
):
    # Core: delegate to the parent class initializer.
    super().__init__(settings)
    configure_logging(self.settings, install_root_handler)
    log_scrapy_info(self.settings)
    self._initialized_reactor = False
This class itself does not do much; the core work happens in the parent class (CrawlerRunner) initializer:
# scrapy/scrapy/crawler.py
def __init__(self, settings: Union[Dict[str, Any], Settings, None] = None):
    if isinstance(settings, dict) or settings is None:
        settings = Settings(settings)
    self.settings = settings
    self.spider_loader = self._get_spider_loader(settings)
    self._crawlers: Set[Crawler] = set()
    self._active: Set[Deferred] = set()
    self.bootstrap_failed = False
This initializer performs the key step of loading the spider loader. Let's look at that in detail.
# scrapy/scrapy/crawler.py
@staticmethod
def _get_spider_loader(settings: BaseSettings):
    """Get SpiderLoader instance from settings"""
    # Read the spider loader class path (a string) from the settings;
    # the default is scrapy.spiderloader.SpiderLoader.
    cls_path = settings.get("SPIDER_LOADER_CLASS")
    # Resolve the string into the actual class object.
    loader_cls = load_object(cls_path)
    # Verify the interface with zope.interface.
    verifyClass(ISpiderLoader, loader_cls)
    # Instantiate it.
    return loader_cls.from_settings(settings.frozencopy())
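load_object is a small helper used all over Scrapy: it resolves a dotted path string to the Python object it names. A quick demonstration:

from scrapy.utils.misc import load_object

loader_cls = load_object("scrapy.spiderloader.SpiderLoader")
print(loader_cls)  # <class 'scrapy.spiderloader.SpiderLoader'>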
2.2.7. cmd.run
# scrapy/scrapy/commands/crawl.py
def run(self, args: List[str], opts: argparse.Namespace) -> None:
    if len(args) < 1:
        raise UsageError()
    elif len(args) > 1:
        raise UsageError(
            "running 'scrapy crawl' with more than one spider is not supported"
        )
    # Extract the spider name: scrapy crawl <spider_name>.
    spname = args[0]

    assert self.crawler_process
    # Core step 1: call the crawl method.
    crawl_defer = self.crawler_process.crawl(spname, **opts.spargs)
    if getattr(crawl_defer, "result", None) is not None and issubclass(
        cast(Failure, crawl_defer.result).type, Exception
    ):
        self.exitcode = 1
    else:
        # Core step 2: start the process.
        self.crawler_process.start()

        if (
            self.crawler_process.bootstrap_failed
            or hasattr(self.crawler_process, "has_exception")
            and self.crawler_process.has_exception
        ):
            self.exitcode = 1
So cmd.run essentially comes down to two core calls:
crawler_process.crawl
crawler_process.start
The sketch below shows the programmatic equivalent; the following sections examine what each call does.
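These two calls are exactly what the documented "run Scrapy from a script" pattern makes by hand; a minimal sketch:

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
process.crawl("quotes")   # core call 1: schedule the spider by name
process.start()           # core call 2: start the reactor (blocks until done)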
2.2.8. crawler_process.crawl
CrawlerProcess.crawl(spname) resolves the spider class by name and wraps it in a Crawler object; the real work then happens in Crawler.crawl, shown below.
# scrapy/scrapy/crawler.py
@inlineCallbacks
def crawl(self, *args: Any, **kwargs: Any) -> Generator[Deferred, Any, None]:
    if self.crawling:
        raise RuntimeError("Crawling already taking place")
    if self._started:
        warnings.warn(
            "Running Crawler.crawl() more than once is deprecated.",
            ScrapyDeprecationWarning,
            stacklevel=2,
        )
    # Mark the crawler as started.
    self.crawling = self._started = True

    try:
        # Create an instance of our own spider class.
        self.spider = self._create_spider(*args, **kwargs)
        # Apply the settings.
        self._apply_settings()
        self._update_root_log_handler()
        # Create the engine.
        self.engine = self._create_engine()
        # Get the spider's start_requests generator (if the spider does not
        # override it, the base Spider class we inherit from provides one).
        start_requests = iter(self.spider.start_requests())
        # Hand the spider's entry point to the engine, then start the engine.
        yield self.engine.open_spider(self.spider, start_requests)
        yield maybeDeferred(self.engine.start)
    except Exception:
        # On any exception, stop the engine.
        self.crawling = False
        if self.engine is not None:
            yield self.engine.close()
        raise
This method creates the spider instance (an instance of the class that scrapy genspider quotes generated for us), creates the engine, hands the spider's start requests to the engine as its entry point, and then starts the engine.
2.2.8.1. _create_spider
def _create_spider(self, *args: Any, **kwargs: Any) -> Spider:
    return self.spidercls.from_crawler(self, *args, **kwargs)
So _create_spider simply calls the from_crawler class method of our own spider class and returns a spider instance. If we don't override from_crawler ourselves, the base class implementation is used:
@classmethod
def from_crawler(cls, crawler: Crawler, *args: Any, **kwargs: Any) -> Self:
    # Call the spider's own __init__.
    spider = cls(*args, **kwargs)
    # Attach the crawler object to the spider.
    spider._set_crawler(crawler)
    return spider
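When a spider does need the crawler at construction time (for example, to read settings), the usual pattern is to extend from_crawler. A minimal sketch (the delay attribute is made up for illustration):

import scrapy

class QuotesSpider(scrapy.Spider):
    name = "quotes"

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        # The crawler is attached by now, so the settings are reachable.
        spider.delay = crawler.settings.getfloat("DOWNLOAD_DELAY")
        return spider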
2.2.8.2. _apply_settings
def _apply_settings(self) -> None:
    # If the settings are already frozen they have been applied once;
    # return early to avoid doing this twice.
    if self.settings.frozen:
        return

    # Let add-ons adjust the settings.
    self.addons.load_settings(self.settings)
    # Instantiate the stats collector for runtime metrics.
    self.stats = load_object(self.settings["STATS_CLASS"])(self)

    handler = LogCounterHandler(self, level=self.settings.get("LOG_LEVEL"))
    logging.root.addHandler(handler)
    # lambda is assigned to Crawler attribute because this way it is not
    # garbage collected after leaving the scope
    self.__remove_handler = lambda: logging.root.removeHandler(handler)
    self.signals.connect(self.__remove_handler, signals.engine_stopped)

    lf_cls: Type[LogFormatter] = load_object(self.settings["LOG_FORMATTER"])
    self.logformatter = lf_cls.from_crawler(self)

    # Request fingerprinter, used later for request deduplication.
    self.request_fingerprinter = build_from_crawler(
        load_object(self.settings["REQUEST_FINGERPRINTER_CLASS"]),
        self,
    )

    # Load and install the event loop (Twisted reactor).
    reactor_class: str = self.settings["TWISTED_REACTOR"]
    event_loop: str = self.settings["ASYNCIO_EVENT_LOOP"]
    if self._init_reactor:
        # this needs to be done after the spider settings are merged,
        # but before something imports twisted.internet.reactor
        if reactor_class:
            install_reactor(reactor_class, event_loop)
        else:
            from twisted.internet import reactor  # noqa: F401
        log_reactor_info()
    if reactor_class:
        verify_installed_reactor(reactor_class)
        if is_asyncio_reactor_installed() and event_loop:
            verify_installed_asyncio_event_loop(event_loop)

    # Instantiate the extensions.
    self.extensions = ExtensionManager.from_crawler(self)
    # Freeze the settings.
    self.settings.freeze()

    d = dict(overridden_settings(self.settings))
    logger.info(
        "Overridden settings:\n%(settings)s", {"settings": pprint.pformat(d)}
    )
Driven by the settings, this method completes a series of instantiation and preparation steps:
1. Load the add-ons.
2. Instantiate the stats collector.
3. Set up logging.
4. Load and install the event loop (reactor); see the example after this list.
5. Instantiate the extensions.
6. Freeze the settings.
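Step 4 is driven by the TWISTED_REACTOR and ASYNCIO_EVENT_LOOP settings. For example, to opt in to the asyncio-based reactor, a project's settings.py can contain (both are documented Scrapy settings):

# settings.py
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
# Optionally pin the asyncio event loop implementation as well:
# ASYNCIO_EVENT_LOOP = "uvloop.Loop"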
2.2.8.3. _create_engine
def _create_engine(self) -> ExecutionEngine:
    return ExecutionEngine(self, lambda _: self.stop())
This just calls the engine's constructor; let's look at the engine's initializer.
class ExecutionEngine:
    def __init__(self, crawler: "Crawler", spider_closed_callback: Callable) -> None:
        self.crawler: "Crawler" = crawler
        self.settings: Settings = crawler.settings
        self.signals: SignalManager = crawler.signals
        assert crawler.logformatter
        self.logformatter: LogFormatter = crawler.logformatter
        self.slot: Optional[Slot] = None
        self.spider: Optional[Spider] = None
        self.running: bool = False
        self.paused: bool = False
        # Core component 1: the scheduler class.
        self.scheduler_cls: Type["BaseScheduler"] = self._get_scheduler_class(
            crawler.settings
        )
        # Core component 2: the downloader.
        downloader_cls: Type[Downloader] = load_object(self.settings["DOWNLOADER"])
        self.downloader: Downloader = downloader_cls(crawler)
        # Core component 3: the scraper.
        self.scraper = Scraper(crawler)
        self._spider_closed_callback: Callable = spider_closed_callback
        self.start_time: Optional[float] = None
The engine initializer stores these core components as attributes, so that the engine can later coordinate them to work together. More on that later.
2.2.8.4. engine.open_spider
# scrapy/scrapy/core/engine.py
@inlineCallbacks
def open_spider(
    self, spider: Spider, start_requests: Iterable = (), close_if_idle: bool = True
) -> Generator[Deferred, Any, None]:
    if self.slot is not None:
        raise RuntimeError(f"No free spider slot when opening {spider.name!r}")
    logger.info("Spider opened", extra={"spider": spider})
    # Wrap _next_request so it can be scheduled for a future reactor tick.
    nextcall = CallLaterOnce(self._next_request)
    # Build the scheduler.
    scheduler = build_from_crawler(self.scheduler_cls, self.crawler)
    # Run the spider middlewares' process_start_requests over our spider's
    # start_requests.
    start_requests = yield self.scraper.spidermw.process_start_requests(
        start_requests, spider
    )
    # Build the slot.
    self.slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
    self.spider = spider
    if hasattr(scheduler, "open"):
        # Open the scheduler.
        yield scheduler.open(spider)
    # Open the scraper.
    yield self.scraper.open_spider(spider)
    assert self.crawler.stats
    # Open the stats collector.
    self.crawler.stats.open_spider(spider)
    yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
    # Kick off the first _next_request call and start the 5-second heartbeat.
    self.slot.nextcall.schedule()
    self.slot.heartbeat.start(5)
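One detail worth pausing on is nextcall: CallLaterOnce wraps a function so that any number of schedule() calls within one reactor tick collapse into a single invocation on the next tick. A simplified sketch of the idea (the real implementation lives in scrapy/utils/reactor.py and also supports cancellation):

from twisted.internet import reactor

class CallLaterOnce:
    # Schedule a function to run on a later reactor iteration, at most once
    # per pending schedule(), no matter how many times it is requested.
    def __init__(self, func, *a, **kw):
        self._func, self._a, self._kw = func, a, kw
        self._call = None

    def schedule(self, delay=0):
        if self._call is None:  # ignore repeated schedule() calls
            self._call = reactor.callLater(delay, self)

    def __call__(self):
        self._call = None
        return self._func(*self._a, **self._kw)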
2.2.9. crawler_process.start
def start(
    self, stop_after_crawl: bool = True, install_signal_handlers: bool = True
) -> None:
    """
    This method starts a :mod:`~twisted.internet.reactor`, adjusts its pool
    size to :setting:`REACTOR_THREADPOOL_MAXSIZE`, and installs a DNS cache
    based on :setting:`DNSCACHE_ENABLED` and :setting:`DNSCACHE_SIZE`.

    If ``stop_after_crawl`` is True, the reactor will be stopped after all
    crawlers have finished, using :meth:`join`.

    :param bool stop_after_crawl: stop or not the reactor when all
        crawlers have finished

    :param bool install_signal_handlers: whether to install the OS signal
        handlers from Twisted and Scrapy (default: True)
    """
    from twisted.internet import reactor

    if stop_after_crawl:
        d = self.join()
        # Don't start the reactor if the deferreds are already fired
        if d.called:
            return
        d.addBoth(self._stop_reactor)

    # Core: install the DNS resolver/cache.
    resolver_class = load_object(self.settings["DNS_RESOLVER"])
    resolver = build_from_crawler(resolver_class, self, reactor=reactor)
    resolver.install_on_reactor()

    # Core: size the reactor thread pool.
    tp = reactor.getThreadPool()
    tp.adjustPoolsize(maxthreads=self.settings.getint("REACTOR_THREADPOOL_MAXSIZE"))

    # Core: register reactor event triggers: run stop() before shutdown,
    # and install the shutdown signal handlers after startup.
    reactor.addSystemEventTrigger("before", "shutdown", self.stop)
    if install_signal_handlers:
        reactor.addSystemEventTrigger(
            "after", "startup", install_shutdown_handlers, self._signal_shutdown
        )
    reactor.run(installSignalHandlers=install_signal_handlers)  # blocking call
This relies on Twisted's reactor, the event manager at the heart of the twisted package: we register events with it and then call run(); the reactor executes the registered events for us, and whenever one of them waits on network I/O it switches to another runnable event.
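A toy example of this register-then-run model, independent of Scrapy:

from twisted.internet import reactor

def task(name):
    print(f"{name} fired")

# Register events first; nothing runs until reactor.run() is called.
reactor.callLater(1, task, "a")
reactor.callLater(2, task, "b")
reactor.callLater(3, reactor.stop)  # stop the loop after the last event
reactor.run()                       # blocking, just like CrawlerProcess.start()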