Prerequisites
  1. You’ve read the Getting started page and know how to create and run a basic spider.
This page covers the spider system’s advanced features: concurrency control, pause/resume, streaming, lifecycle hooks, statistics, and logging.

Concurrency Control

The spider system uses three class attributes to control how aggressively it crawls:
| Attribute | Default | Description |
| --- | --- | --- |
| `concurrent_requests` | `4` | Maximum number of requests being processed at the same time |
| `concurrent_requests_per_domain` | `0` | Maximum concurrent requests per domain (`0` = no per-domain limit) |
| `download_delay` | `0.0` | Seconds to wait before each request |
class PoliteSpider(Spider):
    name = "polite"
    start_urls = ["https://example.com"]

    # Be gentle with the server
    concurrent_requests = 4
    concurrent_requests_per_domain = 2
    download_delay = 1.0  # Wait 1 second between requests

    async def parse(self, response: Response):
        yield {"title": response.css("title::text").get("")}
When concurrent_requests_per_domain is set, each domain gets its own concurrency limiter in addition to the global limit. This is useful when crawling multiple domains simultaneously — you can allow high global concurrency while being polite to each individual domain.

Rate Limiting Implementation

The rate limiting logic is implemented in the CrawlerEngine:
engine.py:71-77
def _rate_limiter(self, domain: str) -> CapacityLimiter:
    """Get or create a per-domain concurrency limiter if enabled, otherwise use the global limiter."""
    if self.spider.concurrent_requests_per_domain:
        if domain not in self._domain_limiters:
            self._domain_limiters[domain] = CapacityLimiter(self.spider.concurrent_requests_per_domain)
        return self._domain_limiters[domain]
    return self._global_limiter
And used during request processing:
engine.py:88-92
async def _process_request(self, request: Request) -> None:
    """Download and process a single request."""
    async with self._rate_limiter(request.domain):
        if self.spider.download_delay:
            await anyio.sleep(self.spider.download_delay)
The download_delay parameter adds a fixed wait before every request, regardless of the domain. Use it for simple rate limiting.
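The same pattern can be sketched standalone with `asyncio` semaphores. Everything below (`RateLimiter`, `fetch`, the method names) is illustrative, not the engine's actual API:

```python
import asyncio


class RateLimiter:
    """Illustrative sketch of the engine's limiter pattern (not the real API)."""

    def __init__(self, global_limit: int, per_domain_limit: int = 0):
        self._global = asyncio.Semaphore(global_limit)
        self._per_domain_limit = per_domain_limit
        self._domains: dict[str, asyncio.Semaphore] = {}

    def for_domain(self, domain: str) -> asyncio.Semaphore:
        # Per-domain limiter when enabled, otherwise fall back to the global one
        if self._per_domain_limit:
            if domain not in self._domains:
                self._domains[domain] = asyncio.Semaphore(self._per_domain_limit)
            return self._domains[domain]
        return self._global


async def fetch(limiter: RateLimiter, domain: str, delay: float = 0.0) -> str:
    async with limiter.for_domain(domain):
        if delay:
            await asyncio.sleep(delay)  # plays the role of download_delay
        return f"fetched {domain}"


async def main() -> list[str]:
    limiter = RateLimiter(global_limit=4, per_domain_limit=2)
    # Five requests to one domain: at most 2 run concurrently
    return await asyncio.gather(*(fetch(limiter, "example.com") for _ in range(5)))


results = asyncio.run(main())
```

Creating limiters lazily per domain keeps memory proportional to the number of domains actually visited.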

Using uvloop

The start() method accepts a use_uvloop parameter to use the faster uvloop/winloop event loop implementation, if available:
result = MySpider().start(use_uvloop=True)
This can improve throughput for I/O-heavy crawls. You’ll need to install uvloop (Linux/macOS) or winloop (Windows) separately.
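Since the dependency is optional, application code can detect availability and fall back. A hedged sketch (the `uvloop_available` flag is illustrative):

```python
# Optional fast event loop: detect availability and fall back gracefully.
try:
    import uvloop  # winloop is the Windows equivalent
    uvloop_available = True
except ImportError:
    uvloop_available = False

# Hypothetical usage, mirroring the call above:
# result = MySpider().start(use_uvloop=uvloop_available)
```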

Pause & Resume

The spider supports graceful pause-and-resume via checkpointing. To enable it, pass a crawldir directory to the spider constructor:
spider = MySpider(crawldir="crawl_data/my_spider")
result = spider.start()

if result.paused:
    print("Crawl was paused. Run again to resume.")
else:
    print("Crawl completed!")

How It Works

  1. Pausing: Press Ctrl+C during a crawl. The spider waits for all in-flight requests to finish, saves a checkpoint (pending requests + a set of seen request fingerprints), and then exits.
  2. Force stopping: Press Ctrl+C a second time to stop immediately without waiting for active tasks.
  3. Resuming: Run the spider again with the same crawldir. It detects the checkpoint, restores the queue and seen set, and continues from where it left off — skipping start_requests().
  4. Cleanup: When a crawl completes normally (not paused), the checkpoint files are deleted automatically.
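The "seen request fingerprints" in step 1 are typically a stable hash of a request's identifying fields, so a resumed crawl can skip URLs it already fetched. A hypothetical sketch (the source doesn't specify the actual fingerprint fields or hash):

```python
import hashlib


def request_fingerprint(method: str, url: str, body: bytes = b"") -> str:
    """Stable hash of the fields that identify a request (illustrative)."""
    h = hashlib.sha1()
    for part in (method.upper().encode(), url.encode(), body):
        h.update(part)
        h.update(b"\x00")  # separator so fields can't bleed together
    return h.hexdigest()


fp1 = request_fingerprint("GET", "https://example.com/page")
fp2 = request_fingerprint("GET", "https://example.com/page")
fp3 = request_fingerprint("POST", "https://example.com/page")
```

Identical requests hash identically, so membership in the seen set is a cheap O(1) duplicate check.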
Checkpoints are also saved periodically during the crawl (every 5 minutes by default). You can change the interval as follows:
# Save checkpoint every 2 minutes
spider = MySpider(crawldir="crawl_data/my_spider", interval=120.0)
Checkpoint writes are atomic, so an interrupted save cannot leave a corrupted checkpoint on disk.
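A common way to make such saves atomic is the write-then-rename pattern. A minimal standalone sketch, not the engine's actual code (the function and file names here are illustrative):

```python
import json
import os
import tempfile
from pathlib import Path


def save_checkpoint_atomic(path: Path, data: dict) -> None:
    """Write JSON to a temp file in the same directory, then rename into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes hit disk before the rename
        os.replace(tmp, path)  # atomic rename on POSIX and Windows
    except BaseException:
        os.unlink(tmp)  # clean up the temp file on any failure
        raise


crawldir = Path(tempfile.mkdtemp())
checkpoint = crawldir / "checkpoint.json"
save_checkpoint_atomic(checkpoint, {"requests": [], "seen": ["a1b2c3"]})
restored = json.loads(checkpoint.read_text())
```

Because the rename either fully succeeds or doesn't happen, a crash mid-save leaves the previous checkpoint untouched.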

Checkpoint Implementation

The pause handling logic is implemented in the engine:
engine.py:165-182
def request_pause(self) -> None:
    """Request a graceful pause of the crawl.

    First call: requests graceful pause (waits for active tasks).
    Second call: forces immediate stop.
    """
    if self._force_stop:
        return  # Already forcing stop

    if self._pause_requested:
        # Second Ctrl+C - force stop
        self._force_stop = True
        log.warning("Force stop requested, cancelling immediately...")
    else:
        self._pause_requested = True
        log.info(
            "Pause requested, waiting for in-flight requests to complete (press Ctrl+C again to force stop)..."
        )
Checkpoint saving:
engine.py:184-189
async def _save_checkpoint(self) -> None:
    """Save current state to checkpoint files."""
    requests, seen = self.scheduler.snapshot()
    data = CheckpointData(requests=requests, seen=seen)
    await self._checkpoint_manager.save(data)
    self._last_checkpoint_time = anyio.current_time()
Pressing Ctrl+C during a crawl always closes the spider gracefully, even when the checkpoint system is not enabled. Pressing it a second time forces the spider to stop immediately.

Knowing If You’re Resuming

The on_start() hook receives a resuming flag:
async def on_start(self, resuming: bool = False):
    if resuming:
        self.logger.info("Resuming from checkpoint!")
    else:
        self.logger.info("Starting fresh crawl")

Streaming

For long-running spiders or applications that need real-time access to scraped items, use the stream() method instead of start():
import anyio

async def main():
    spider = MySpider()
    async for item in spider.stream():
        print(f"Got item: {item}")
        # Access real-time stats
        print(f"Items so far: {spider.stats.items_scraped}")
        print(f"Requests made: {spider.stats.requests_count}")

anyio.run(main)
Key differences from start():
  • stream() must be called from an async context
  • Items are yielded one by one as they’re scraped, not collected into a list
  • You can access spider.stats during iteration for real-time statistics

Streaming Implementation

The streaming logic uses memory channels:
engine.py:313-334
def __aiter__(self) -> AsyncGenerator[dict, None]:
    return self._stream()

async def _stream(self) -> AsyncGenerator[dict, None]:
    """Async generator that runs crawl and yields items."""
    send, recv = create_memory_object_stream[dict](100)
    self._item_stream = send

    async def run():
        try:
            await self.crawl()
        finally:
            await send.aclose()

    async with create_task_group() as tg:
        tg.start_soon(run)
        try:
            async for item in recv:
                yield item
        except EndOfStream:
            pass
Streaming also works with the checkpoint system, making it easy to build UIs on top of spiders that deliver items in real time and can be paused and resumed:
import anyio

async def main():
    spider = MySpider(crawldir="crawl_data/my_spider")
    async for item in spider.stream():
        print(f"Got item: {item}")
        # Access real-time stats
        print(f"Items so far: {spider.stats.items_scraped}")
        print(f"Requests made: {spider.stats.requests_count}")

anyio.run(main)
You can also call spider.pause() to shut the spider down programmatically. Without the checkpoint system enabled, it simply closes the crawl without saving state.
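The channel-based pattern above can be sketched standalone with an `asyncio.Queue` standing in for the memory channel (the names `crawl`, `stream`, and the sentinel convention are illustrative, not the engine's API):

```python
import asyncio
from typing import AsyncGenerator


async def crawl(queue: asyncio.Queue) -> None:
    # Hypothetical crawl that pushes scraped items, then signals completion
    for i in range(3):
        await queue.put({"item": i})
    await queue.put(None)  # sentinel: stream closed


async def stream() -> AsyncGenerator[dict, None]:
    # A bounded queue plays the role of the memory channel (capacity 100)
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    crawl_task = asyncio.create_task(crawl(queue))
    try:
        while (item := await queue.get()) is not None:
            yield item
    finally:
        await crawl_task  # surface any crawl exception to the consumer


async def main() -> list[dict]:
    return [item async for item in stream()]


items = asyncio.run(main())
```

The bounded capacity applies backpressure: if the consumer falls behind, the producer blocks on `put()` instead of buffering items without limit.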

Lifecycle Hooks

The spider provides several hooks you can override to add custom behavior at different stages of the crawl:

on_start

Called before crawling begins. Use it for setup tasks like loading data or initializing resources:
spider.py:164-172
async def on_start(self, resuming: bool = False) -> None:
    """Called before crawling starts. Override for setup logic.

    :param resuming: True when the spider is resuming from a checkpoint; exposed for user code to act on.
    """
    if resuming:
        self.logger.debug("Resuming spider from checkpoint")
    else:
        self.logger.debug("Starting spider")

on_close

Called after crawling finishes (whether completed or paused). Use it for cleanup:
spider.py:174-176
async def on_close(self) -> None:
    """Called after crawling finishes. Override for cleanup logic."""
    self.logger.debug("Spider closed")

on_error

Called when a request fails with an exception. Use it for error tracking or custom recovery logic:
spider.py:178-184
async def on_error(self, request: Request, error: Exception) -> None:
    """
    Handle request errors for all spider requests.

    Override for custom error handling.
    """
    pass

on_scraped_item

Called for every scraped item before it’s added to the results. Return the item (modified or not) to keep it, or return None to drop it:
spider.py:186-188
async def on_scraped_item(self, item: Dict[str, Any]) -> Dict[str, Any] | None:
    """A hook to be overridden by users to do some processing on scraped items, return `None` to drop the item silently."""
    return item
Example usage:
async def on_scraped_item(self, item: dict) -> dict | None:
    # Drop items without a title
    if not item.get("title"):
        return None

    # Modify items (e.g., add timestamps)
    item["scraped_at"] = "2026-01-01"
    return item
This hook can also be used to route items through your own pipelines and drop them from the spider's results.

start_requests

Override start_requests() for custom initial request generation instead of using start_urls:
spider.py:141-156
async def start_requests(self) -> AsyncGenerator[Request, None]:
    """Generate initial requests to start the crawl.

    By default, this generates Request objects for each URL in `start_urls`
    using the session manager's default session and `parse()` as callback.

    Override this method for more control over initial requests
    (e.g., to add custom headers, use different callbacks, etc.)
    """
    if not self.start_urls:
        raise RuntimeError(
            "Spider has no starting point, either set `start_urls` or override `start_requests` function."
        )

    for url in self.start_urls:
        yield Request(url, sid=self._session_manager.default_session_id)
Example with custom login:
async def start_requests(self):
    # POST request to log in first
    yield Request(
        "https://example.com/login",
        method="POST",
        data={"user": "admin", "pass": "secret"},
        callback=self.after_login,
    )

async def after_login(self, response: Response):
    # Now crawl the authenticated pages
    yield response.follow("/dashboard", callback=self.parse)

Results & Statistics

The CrawlResult returned by start() contains both the scraped items and detailed statistics:
result = MySpider().start()

# Items
print(f"Total items: {len(result.items)}")
result.items.to_json("output.json", indent=True)

# Did the crawl complete?
print(f"Completed: {result.completed}")
print(f"Paused: {result.paused}")

# Statistics
stats = result.stats
print(f"Requests: {stats.requests_count}")
print(f"Failed: {stats.failed_requests_count}")
print(f"Blocked: {stats.blocked_requests_count}")
print(f"Offsite filtered: {stats.offsite_requests_count}")
print(f"Items scraped: {stats.items_scraped}")
print(f"Items dropped: {stats.items_dropped}")
print(f"Response bytes: {stats.response_bytes}")
print(f"Duration: {stats.elapsed_seconds:.1f}s")
print(f"Speed: {stats.requests_per_second:.1f} req/s")

CrawlStats Details

The CrawlStats dataclass tracks comprehensive information:
result.py:41-62
@dataclass
class CrawlStats:
    """Statistics for a crawl run."""

    requests_count: int = 0
    concurrent_requests: int = 0
    concurrent_requests_per_domain: int = 0
    failed_requests_count: int = 0
    offsite_requests_count: int = 0
    response_bytes: int = 0
    items_scraped: int = 0
    items_dropped: int = 0
    start_time: float = 0.0
    end_time: float = 0.0
    download_delay: float = 0.0
    blocked_requests_count: int = 0
    custom_stats: Dict = field(default_factory=dict)
    response_status_count: Dict = field(default_factory=dict)
    domains_response_bytes: Dict = field(default_factory=dict)
    sessions_requests_count: Dict = field(default_factory=dict)
    proxies: List[str | Dict | Tuple] = field(default_factory=list)
    log_levels_counter: Dict = field(default_factory=dict)
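Derived values like `elapsed_seconds` and `requests_per_second` used earlier can be computed from these raw fields. A standalone sketch with a reduced field set (the property bodies are assumptions, not the library's exact code):

```python
from dataclasses import dataclass


@dataclass
class MiniStats:
    """Illustrative subset of CrawlStats with derived properties."""

    requests_count: int = 0
    start_time: float = 0.0
    end_time: float = 0.0

    @property
    def elapsed_seconds(self) -> float:
        # Clamp to zero so a crawl that never finished doesn't go negative
        return max(self.end_time - self.start_time, 0.0)

    @property
    def requests_per_second(self) -> float:
        elapsed = self.elapsed_seconds
        return self.requests_count / elapsed if elapsed else 0.0


stats = MiniStats(requests_count=150, start_time=100.0, end_time=130.0)
```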

Detailed Stats

stats = result.stats

# Status code distribution
print(stats.response_status_count)
# {'status_200': 150, 'status_404': 3, 'status_403': 1}

# Bytes downloaded per domain
print(stats.domains_response_bytes)
# {'example.com': 1234567, 'api.example.com': 45678}

# Requests per session
print(stats.sessions_requests_count)
# {'http': 120, 'stealth': 34}

# Proxies used during the crawl
print(stats.proxies)
# ['http://proxy1:8080', 'http://proxy2:8080']

# Log level counts
print(stats.log_levels_counter)
# {'debug': 200, 'info': 50, 'warning': 3, 'error': 1, 'critical': 0}

# Timing information
print(stats.start_time)       # Unix timestamp when crawl started
print(stats.end_time)         # Unix timestamp when crawl finished
print(stats.download_delay)   # The download delay used (seconds)

# Concurrency settings used
print(stats.concurrent_requests)             # Global concurrency limit
print(stats.concurrent_requests_per_domain)  # Per-domain concurrency limit

# Custom stats (set by your spider code)
print(stats.custom_stats)
# {'login_attempts': 3, 'pages_with_errors': 5}

# Export everything as a dict
print(stats.to_dict())

Logging

The spider has a built-in logger accessible via self.logger. It’s pre-configured with the spider’s name and supports several customization options:
| Attribute | Default | Description |
| --- | --- | --- |
| `logging_level` | `logging.DEBUG` | Minimum log level |
| `logging_format` | `"[%(asctime)s]:({spider_name}) %(levelname)s: %(message)s"` | Log message format |
| `logging_date_format` | `"%Y-%m-%d %H:%M:%S"` | Date format in log messages |
| `log_file` | `None` | Path to a log file (in addition to console output) |
import logging

class MySpider(Spider):
    name = "my_spider"
    start_urls = ["https://example.com"]
    logging_level = logging.INFO
    log_file = "logs/my_spider.log"

    async def parse(self, response: Response):
        self.logger.info(f"Processing {response.url}")
        yield {"title": response.css("title::text").get("")}

Logger Initialization

The logger is initialized in the Spider’s __init__ method:
spider.py:101-122
self.logger = logging.getLogger(f"scrapling.spiders.{self.name}")
self.logger.setLevel(self.logging_level)
self.logger.handlers.clear()
self.logger.propagate = False  # Don't propagate to parent 'scrapling' logger

formatter = logging.Formatter(
    fmt=self.logging_format.format(spider_name=self.name), datefmt=self.logging_date_format
)

# Add a log counter handler to track log counts by level
self._log_counter = LogCounterHandler()
self.logger.addHandler(self._log_counter)

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)

if self.log_file:
    Path(self.log_file).parent.mkdir(parents=True, exist_ok=True)
    file_handler = logging.FileHandler(self.log_file)
    file_handler.setFormatter(formatter)
    self.logger.addHandler(file_handler)
The log file directory is created automatically if it doesn’t exist. Both console and file output use the same format.
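A `LogCounterHandler` along these lines feeds the `log_levels_counter` statistic by incrementing a per-level counter on every record. A sketch under that assumption, not the library's exact implementation:

```python
import logging
from collections import Counter


class LogCounterHandler(logging.Handler):
    """Count emitted records by level name (illustrative, not the library's code)."""

    def __init__(self) -> None:
        super().__init__()
        self.counts: Counter = Counter()

    def emit(self, record: logging.LogRecord) -> None:
        self.counts[record.levelname.lower()] += 1


logger = logging.getLogger("demo.spider")
logger.setLevel(logging.DEBUG)
logger.handlers.clear()
logger.propagate = False

counter = LogCounterHandler()
logger.addHandler(counter)

logger.debug("fetching page")
logger.info("scraped item")
logger.warning("retrying request")
counts = dict(counter.counts)
```

Because a handler sees every record the logger emits, this counts levels without interfering with the console or file handlers.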
