The CrawlerEngine class is the core component that orchestrates the entire crawling process. It manages request scheduling, concurrency, rate limiting, checkpoint/resume functionality, and item collection.
This class is typically used internally by the Spider framework. You usually don’t instantiate it directly.

Class Definition

from scrapling.spiders.engine import CrawlerEngine

class CrawlerEngine:
    """Orchestrates the crawling process."""

Constructor

def __init__(
    self,
    spider: Spider,
    session_manager: SessionManager,
    crawldir: Optional[Union[str, Path, AsyncPath]] = None,
    interval: float = 300.0,
)
  • spider (Spider, required): The spider instance to run.
  • session_manager (SessionManager, required): Session manager containing the configured sessions.
  • crawldir (str | Path | AsyncPath | None, default None): Directory for checkpoint files. If None, checkpointing is disabled.
  • interval (float, default 300.0): Seconds between periodic checkpoint saves (default 5 minutes). Set to 0 to disable periodic checkpoints.

Attributes

  • spider (Spider): Reference to the spider being executed.
  • session_manager (SessionManager): Session manager for handling requests.
  • scheduler (Scheduler): Request scheduler with duplicate filtering.
  • stats (CrawlStats): Current crawl statistics.
  • paused (bool): Whether the crawl was paused (vs. completed normally).

Methods

crawl

async def crawl(self) -> CrawlStats
Run the spider and return crawl statistics. This is the main entry point for the engine.
Returns: a CrawlStats object with detailed crawl metrics.
Process flow:
  1. Check for existing checkpoint and restore if found
  2. Call spider.on_start(resuming=bool)
  3. Generate initial requests from spider.start_requests() (if not resuming)
  4. Process requests concurrently with rate limiting
  5. Handle responses through callbacks
  6. Save periodic checkpoints (if enabled)
  7. Call spider.on_close() on completion
  8. Clean up checkpoint files on successful completion
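The flow above can be sketched with a toy spider. TinySpider and crawl_flow below are illustrative placeholders (not part of the Scrapling API), and the fetch/checkpoint steps are faked:

```python
import asyncio

class TinySpider:
    """Minimal stand-in spider used only to illustrate the lifecycle hooks."""
    def __init__(self):
        self.events = []

    async def on_start(self, resuming):
        self.events.append(("on_start", resuming))

    async def start_requests(self):
        yield "https://example.com"

    async def parse(self, response):
        self.events.append(("parsed", response))

    async def on_close(self):
        self.events.append(("on_close", None))

async def crawl_flow(spider, have_checkpoint=False):
    # Steps mirror the documented flow; downloading and checkpointing are elided.
    resuming = have_checkpoint                     # 1. restore checkpoint if found
    await spider.on_start(resuming=resuming)       # 2. notify the spider
    if not resuming:
        async for url in spider.start_requests():  # 3. seed the queue
            await spider.parse(url)                # 4-5. fetch + callback (faked)
    await spider.on_close()                        # 7. completion hook

spider = TinySpider()
asyncio.run(crawl_flow(spider))
```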

request_pause

def request_pause(self) -> None
Request a graceful pause of the crawl.
  • First call: Requests graceful pause (waits for active tasks to complete)
  • Second call: Forces immediate stop (cancels active tasks)
This method is called automatically when the user presses Ctrl+C.
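The two-stage semantics can be modeled with a few lines of state. PauseSketch below is a toy model, not the real CrawlerEngine:

```python
class PauseSketch:
    """Toy model of the two-stage pause behavior described above."""
    def __init__(self):
        self.pause_requested = False
        self.force_stop = False

    def request_pause(self):
        if not self.pause_requested:
            self.pause_requested = True   # first call: let active tasks finish
        else:
            self.force_stop = True        # second call: cancel active tasks now

engine = PauseSketch()
engine.request_pause()
first_call_forced = engine.force_stop     # still False: graceful pause only
engine.request_pause()                    # second call escalates to a hard stop
```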

items

@property
def items(self) -> ItemList
Access the items scraped during the crawl.
Returns: an ItemList containing all scraped items.

Internal Methods

_process_request

async def _process_request(self, request: Request) -> None
Download and process a single request. Handles:
  • Rate limiting (global and per-domain)
  • Download delay
  • Session fetching
  • Blocked request detection and retry
  • Callback execution
  • Item processing
  • Error handling

_save_checkpoint

async def _save_checkpoint(self) -> None
Save current crawl state to checkpoint files. Includes:
  • Pending requests in the scheduler
  • Seen request fingerprints

_restore_from_checkpoint

async def _restore_from_checkpoint(self) -> bool
Attempt to restore state from a checkpoint.
Returns: True if successfully restored, False if no checkpoint was found.

_is_domain_allowed

def _is_domain_allowed(self, request: Request) -> bool
Check if the request’s domain is in spider.allowed_domains.
Returns: True if the domain is allowed (or if allowed_domains is empty).
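A standalone sketch of such a check, assuming exact-match and subdomain matching (the real engine's matching rules may differ):

```python
from urllib.parse import urlparse

def is_domain_allowed(url: str, allowed_domains: list[str]) -> bool:
    # An empty list means no restriction, mirroring the documented behavior.
    if not allowed_domains:
        return True
    host = urlparse(url).hostname or ""
    # Accept exact matches and subdomains of any allowed domain.
    return any(host == d or host.endswith("." + d) for d in allowed_domains)
```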

_normalize_request

def _normalize_request(self, request: Request) -> None
Normalize request fields before enqueueing. Resolves empty sid to the default session ID.

Async Iteration

The engine supports async iteration for streaming items:
async for item in engine:
    print(item)  # Process items as they're scraped
This is used internally by Spider.stream().

Usage Examples

Direct Engine Usage (Advanced)

import anyio
from scrapling.spiders import Spider
from scrapling.spiders.engine import CrawlerEngine
from scrapling.spiders.session import SessionManager
from scrapling.fetchers import FetcherSession

class MySpider(Spider):
    name = "example"
    start_urls = ["https://example.com"]
    
    async def parse(self, response):
        yield {"url": response.url}

async def main():
    spider = MySpider()
    
    # Manually create session manager
    manager = SessionManager()
    manager.add("default", FetcherSession())
    
    # Create and run engine
    async with manager:
        engine = CrawlerEngine(spider, manager, crawldir="./checkpoints")
        stats = await engine.crawl()
        
        print(f"Items: {len(engine.items)}")
        print(f"Stats: {stats.to_dict()}")

anyio.run(main)

Streaming Items

async def main():
    spider = MySpider()
    manager = SessionManager()
    manager.add("default", FetcherSession())
    
    async with manager:
        engine = CrawlerEngine(spider, manager)
        
        # Stream items as they arrive
        async for item in engine:
            print(f"Got item: {item}")
            print(f"Progress: {engine.stats.items_scraped} items")

anyio.run(main)

Monitoring Progress

import anyio

async def monitor_stats(engine: CrawlerEngine):
    """Periodically print crawl statistics."""
    while engine._running:
        stats = engine.stats
        print(f"Requests: {stats.requests_count} | Items: {stats.items_scraped} | Active: {engine._active_tasks}")
        await anyio.sleep(5)

async def main():
    spider = MySpider()
    manager = SessionManager()
    manager.add("default", FetcherSession())
    
    async with manager:
        engine = CrawlerEngine(spider, manager)
        
        async with anyio.create_task_group() as tg:
            tg.start_soon(monitor_stats, engine)
            stats = await engine.crawl()

anyio.run(main)

Concurrency Control

The engine manages concurrency at two levels:

Global Concurrency

class MySpider(Spider):
    concurrent_requests = 10  # Max 10 concurrent requests globally
Implemented via a CapacityLimiter that caps the total number of active requests.

Per-Domain Concurrency

class MySpider(Spider):
    concurrent_requests_per_domain = 2  # Max 2 concurrent per domain
Implemented via a per-domain CapacityLimiter, which prevents overwhelming individual servers.
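The two-level limiting can be sketched with the standard library; here asyncio.Semaphore stands in for anyio's CapacityLimiter, and the download step is faked:

```python
import asyncio
from collections import defaultdict
from urllib.parse import urlparse

async def fetch_with_limits(urls, global_limit=10, per_domain_limit=2):
    # One global limiter plus one limiter per domain, as the engine does.
    global_sem = asyncio.Semaphore(global_limit)
    domain_sems = defaultdict(lambda: asyncio.Semaphore(per_domain_limit))
    results = []

    async def fetch(url):
        domain = urlparse(url).hostname
        async with global_sem, domain_sems[domain]:
            await asyncio.sleep(0)  # the real engine would download here
            results.append(url)

    await asyncio.gather(*(fetch(u) for u in urls))
    return results

done = asyncio.run(fetch_with_limits(
    ["https://a.com/1", "https://a.com/2", "https://b.com/1"]
))
```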

Download Delay

class MySpider(Spider):
    download_delay = 1.0  # Wait 1 second between requests
Applied before each request is fetched.
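A minimal sketch of that behavior, with the actual fetch replaced by a timestamp:

```python
import asyncio
import time

async def throttled_fetch(urls, download_delay=0.05):
    # Sleep before each request, as the engine does with download_delay.
    timestamps = []
    for url in urls:
        await asyncio.sleep(download_delay)
        timestamps.append(time.monotonic())  # the real engine would fetch here
    return timestamps

stamps = asyncio.run(throttled_fetch(["https://a.com/1", "https://a.com/2"]))
```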

Checkpoint System

When crawldir is provided, the engine automatically saves checkpoints:

Checkpoint Timing

  1. Periodic saves: Every interval seconds (default 300)
  2. Graceful pause: When request_pause() is called
  3. SIGINT handler: Automatic on Ctrl+C

Checkpoint Contents

Checkpoints store:
  • Pending requests: All requests still in the scheduler queue
  • Seen fingerprints: Set of request fingerprints to avoid re-fetching
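A save/restore round trip for those two pieces of state might look like the following; the JSON file layout here is purely illustrative, not the engine's actual on-disk format:

```python
import json
import tempfile
from pathlib import Path

def save_checkpoint(crawldir, pending, seen):
    # Persist pending requests and seen fingerprints (illustrative layout).
    crawldir = Path(crawldir)
    crawldir.mkdir(parents=True, exist_ok=True)
    (crawldir / "pending.json").write_text(json.dumps(pending))
    (crawldir / "seen.json").write_text(json.dumps(sorted(seen)))

def restore_checkpoint(crawldir):
    crawldir = Path(crawldir)
    pending_file = crawldir / "pending.json"
    if not pending_file.exists():
        return None  # no checkpoint: the crawl starts fresh
    pending = json.loads(pending_file.read_text())
    seen = set(json.loads((crawldir / "seen.json").read_text()))
    return pending, seen

with tempfile.TemporaryDirectory() as d:
    save_checkpoint(d, [{"url": "https://example.com/2"}], {"fp1", "fp2"})
    restored = restore_checkpoint(d)
```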

Resume Behavior

On resume:
  • Skips spider.start_requests()
  • Restores pending requests to scheduler
  • Continues from where it left off
  • Calls spider.on_start(resuming=True)

Error Handling

The engine handles errors at multiple levels:

Request Errors

try:
    response = await session_manager.fetch(request)
except Exception as e:
    stats.failed_requests_count += 1
    await spider.on_error(request, e)

Callback Errors

try:
    async for result in callback(response):
        ...  # process yielded items and follow-up requests
except Exception as e:
    log.error(f"Spider error processing {request}: {e}", exc_info=e)
    await spider.on_error(request, e)

Blocked Request Handling

if await spider.is_blocked(response):
    stats.blocked_requests_count += 1
    if request._retry_count < spider.max_blocked_retries:
        retry_request = await spider.retry_blocked_request(request.copy(), response)
        await scheduler.enqueue(retry_request)

Performance Metrics

The engine tracks comprehensive statistics in CrawlStats:
  • requests_count: Total requests made
  • failed_requests_count: Failed requests
  • blocked_requests_count: Detected blocked requests
  • offsite_requests_count: Filtered offsite requests
  • items_scraped: Items yielded and accepted
  • items_dropped: Items dropped by on_scraped_item
  • response_bytes: Total bytes downloaded
  • domains_response_bytes: Per-domain bandwidth
  • sessions_requests_count: Requests per session
  • response_status_count: Status code distribution
  • elapsed_seconds: Total crawl duration
  • requests_per_second: Throughput rate
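As a rough model of how derived metrics relate to the counters, here is a dataclass covering an illustrative subset of the fields above (not the real CrawlStats definition):

```python
from dataclasses import dataclass

@dataclass
class StatsSketch:
    """Illustrative subset of the documented CrawlStats fields."""
    requests_count: int = 0
    items_scraped: int = 0
    elapsed_seconds: float = 0.0

    @property
    def requests_per_second(self) -> float:
        # Throughput is derived from the raw counters.
        if self.elapsed_seconds == 0:
            return 0.0
        return self.requests_count / self.elapsed_seconds

stats = StatsSketch(requests_count=120, items_scraped=90, elapsed_seconds=60.0)
```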
