## Prerequisites

- You’ve read the Getting started page and know how to create and run a basic spider.

This page covers the spider system’s advanced features: concurrency control, pause/resume, streaming, lifecycle hooks, statistics, and logging.
## Concurrency Control

The spider system uses three class attributes to control how aggressively it crawls:

| Attribute | Default | Description |
|---|---|---|
| `concurrent_requests` | `4` | Maximum number of requests being processed at the same time |
| `concurrent_requests_per_domain` | `0` | Maximum concurrent requests per domain (`0` = no per-domain limit) |
| `download_delay` | `0.0` | Seconds to wait before each request |
```python
class PoliteSpider(Spider):
    name = "polite"
    start_urls = ["https://example.com"]

    # Be gentle with the server
    concurrent_requests = 4
    concurrent_requests_per_domain = 2
    download_delay = 1.0  # Wait 1 second between requests

    async def parse(self, response: Response):
        yield {"title": response.css("title::text").get("")}
```
When `concurrent_requests_per_domain` is set, each domain gets its own concurrency limiter in addition to the global limit. This is useful when crawling multiple domains simultaneously — you can allow high global concurrency while being polite to each individual domain.
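The lazy per-domain dispatch is easy to see in isolation. Here is a minimal, self-contained sketch of the same pattern using asyncio’s `Semaphore` (the real engine uses anyio’s `CapacityLimiter`; the `DomainLimiters` class and its names are invented for illustration):

```python
import asyncio
from urllib.parse import urlparse

class DomainLimiters:
    """Illustrative sketch: one semaphore per domain, created lazily,
    falling back to a single global semaphore when the per-domain
    limit is disabled (0)."""

    def __init__(self, global_limit: int, per_domain_limit: int):
        self._global = asyncio.Semaphore(global_limit)
        self._per_domain_limit = per_domain_limit
        self._domains: dict[str, asyncio.Semaphore] = {}

    def for_url(self, url: str) -> asyncio.Semaphore:
        domain = urlparse(url).netloc
        if self._per_domain_limit:
            if domain not in self._domains:
                self._domains[domain] = asyncio.Semaphore(self._per_domain_limit)
            return self._domains[domain]
        return self._global

limiters = DomainLimiters(global_limit=16, per_domain_limit=2)
# Same domain -> same limiter object; different domain -> its own limiter
a = limiters.for_url("https://example.com/page1")
b = limiters.for_url("https://example.com/page2")
c = limiters.for_url("https://other.org/")
print(a is b, a is c)  # True False
```

Each request would then `async with` the semaphore returned for its URL, which is exactly the role `CapacityLimiter` plays in the engine code shown in the next section.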
### Rate Limiting Implementation

The rate-limiting logic is implemented in the `CrawlerEngine`:

```python
def _rate_limiter(self, domain: str) -> CapacityLimiter:
    """Get or create a per-domain concurrency limiter if enabled, otherwise use the global limiter."""
    if self.spider.concurrent_requests_per_domain:
        if domain not in self._domain_limiters:
            self._domain_limiters[domain] = CapacityLimiter(self.spider.concurrent_requests_per_domain)
        return self._domain_limiters[domain]
    return self._global_limiter
```
And used during request processing:

```python
async def _process_request(self, request: Request) -> None:
    """Download and process a single request."""
    async with self._rate_limiter(request.domain):
        if self.spider.download_delay:
            await anyio.sleep(self.spider.download_delay)
```
The `download_delay` attribute adds a fixed wait before every request, regardless of the domain. Use it for simple rate limiting.
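Note how the two settings interact: the delay is slept while a limiter slot is held (see `_process_request` above), so each slot completes at most one request per `download_delay` seconds. A back-of-envelope bound (the best case, assuming download time is small next to the delay):

```python
# Upper bound on request rate implied by the settings:
# each of the `concurrent_requests` slots spends at least
# `download_delay` seconds per request while holding its slot.
concurrent_requests = 4
download_delay = 1.0

max_requests_per_second = concurrent_requests / download_delay
print(max_requests_per_second)  # 4.0
```

So raising the delay without raising concurrency lowers the ceiling on throughput proportionally.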
### Using uvloop

The `start()` method accepts a `use_uvloop` parameter to use the faster uvloop/winloop event loop implementation, if available:

```python
result = MySpider().start(use_uvloop=True)
```

This can improve throughput for I/O-heavy crawls. You’ll need to install `uvloop` (Linux/macOS) or `winloop` (Windows) separately.
## Pause & Resume

The spider supports graceful pause-and-resume via checkpointing. To enable it, pass a `crawldir` directory to the spider constructor:

```python
spider = MySpider(crawldir="crawl_data/my_spider")
result = spider.start()

if result.paused:
    print("Crawl was paused. Run again to resume.")
else:
    print("Crawl completed!")
```
### How It Works

- **Pausing:** Press `Ctrl+C` during a crawl. The spider waits for all in-flight requests to finish, saves a checkpoint (pending requests + a set of seen request fingerprints), and then exits.
- **Force stopping:** Press `Ctrl+C` a second time to stop immediately without waiting for active tasks.
- **Resuming:** Run the spider again with the same `crawldir`. It detects the checkpoint, restores the queue and seen set, and continues from where it left off — skipping `start_requests()`.
- **Cleanup:** When a crawl completes normally (not paused), the checkpoint files are deleted automatically.
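The seen request fingerprints mentioned above are what let a resumed crawl skip work it already did. The library’s actual fingerprint function isn’t shown here; conceptually, a fingerprint is just a stable hash of a request’s identifying fields, so it survives being written to disk and reloaded. A hypothetical sketch:

```python
import hashlib

def fingerprint(method: str, url: str) -> str:
    # Hypothetical fingerprint: a stable digest of the request's
    # identifying fields. The real implementation may include more
    # (headers, body) and use a different digest.
    return hashlib.sha1(f"{method} {url}".encode()).hexdigest()

seen = {fingerprint("GET", "https://example.com/a")}
print(fingerprint("GET", "https://example.com/a") in seen)  # True  -> skipped on resume
print(fingerprint("GET", "https://example.com/b") in seen)  # False -> still crawled
```

Because the digest depends only on the request’s content, the same request produces the same fingerprint across process restarts, unlike `id()` or hash randomization.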
Checkpoints are also saved periodically during the crawl (every 5 minutes by default). You can change the interval as follows:

```python
# Save a checkpoint every 2 minutes
spider = MySpider(crawldir="crawl_data/my_spider", interval=120.0)
```

Checkpoint writes to disk are atomic, so an interrupted save can’t corrupt an existing checkpoint.
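“Atomic” here is the standard write-to-temp-then-rename technique: the data is written to a temporary file and then moved over the old one in a single filesystem operation, so a crash mid-write leaves the previous checkpoint intact. A sketch of the pattern (not the library’s exact code; `atomic_write_json` is an illustrative helper):

```python
import json
import os
import tempfile

def atomic_write_json(path: str, data: dict) -> None:
    # Write to a temporary file in the same directory, then rename it over
    # the target. os.replace is atomic on both POSIX and Windows, so readers
    # see either the old checkpoint or the new one, never a half-written file.
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes hit the disk before the rename
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise

target = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
atomic_write_json(target, {"pending": [], "seen": []})
print(open(target).read())  # {"pending": [], "seen": []}
```

The temp file must live in the same directory as the target, because `os.replace` is only atomic within a single filesystem.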
### Checkpoint Implementation

The pause handling logic is implemented in the engine:

```python
def request_pause(self) -> None:
    """Request a graceful pause of the crawl.

    First call: requests graceful pause (waits for active tasks).
    Second call: forces immediate stop.
    """
    if self._force_stop:
        return  # Already forcing stop

    if self._pause_requested:
        # Second Ctrl+C - force stop
        self._force_stop = True
        log.warning("Force stop requested, cancelling immediately...")
    else:
        self._pause_requested = True
        log.info(
            "Pause requested, waiting for in-flight requests to complete (press Ctrl+C again to force stop)..."
        )
```
Checkpoint saving:

```python
async def _save_checkpoint(self) -> None:
    """Save current state to checkpoint files."""
    requests, seen = self.scheduler.snapshot()
    data = CheckpointData(requests=requests, seen=seen)
    await self._checkpoint_manager.save(data)
    self._last_checkpoint_time = anyio.current_time()
```
Pressing `Ctrl+C` during a crawl always causes the spider to close gracefully, even if the checkpoint system is not enabled. Pressing it a second time without waiting forces the spider to close immediately.
### Knowing If You’re Resuming

The `on_start()` hook receives a `resuming` flag:

```python
async def on_start(self, resuming: bool = False):
    if resuming:
        self.logger.info("Resuming from checkpoint!")
    else:
        self.logger.info("Starting fresh crawl")
```
## Streaming

For long-running spiders or applications that need real-time access to scraped items, use the `stream()` method instead of `start()`:

```python
import anyio

async def main():
    spider = MySpider()
    async for item in spider.stream():
        print(f"Got item: {item}")

        # Access real-time stats
        print(f"Items so far: {spider.stats.items_scraped}")
        print(f"Requests made: {spider.stats.requests_count}")

anyio.run(main)
```
Key differences from `start()`:

- `stream()` must be called from an async context
- Items are yielded one by one as they’re scraped, not collected into a list
- You can access `spider.stats` during iteration for real-time statistics
### Streaming Implementation

The streaming logic uses memory channels:

```python
def __aiter__(self) -> AsyncGenerator[dict, None]:
    return self._stream()

async def _stream(self) -> AsyncGenerator[dict, None]:
    """Async generator that runs crawl and yields items."""
    send, recv = create_memory_object_stream[dict](100)
    self._item_stream = send

    async def run():
        try:
            await self.crawl()
        finally:
            await send.aclose()

    async with create_task_group() as tg:
        tg.start_soon(run)
        try:
            async for item in recv:
                yield item
        except EndOfStream:
            pass
```
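The shape here (a background task producing into a bounded channel while a generator consumes from it) is a general pattern, not specific to anyio. A self-contained analogue using asyncio’s `Queue` in place of the memory object stream, with a stand-in `crawl` producer and a `None` sentinel playing the role of closing the send channel:

```python
import asyncio
from typing import AsyncGenerator

async def crawl(queue: asyncio.Queue) -> None:
    # Stand-in for the real crawl: produce a few items, then signal the end.
    for i in range(3):
        await queue.put({"item": i})
    await queue.put(None)  # sentinel: "channel closed"

async def stream() -> AsyncGenerator[dict, None]:
    # Bounded queue, like the 100-slot memory stream: a slow consumer
    # applies backpressure to the producer instead of buffering forever.
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    producer = asyncio.create_task(crawl(queue))
    try:
        while (item := await queue.get()) is not None:
            yield item
    finally:
        await producer

async def main() -> list:
    return [item async for item in stream()]

print(asyncio.run(main()))  # [{'item': 0}, {'item': 1}, {'item': 2}]
```

The bounded capacity is the important design choice: if the consumer of `stream()` falls behind, the producer blocks on `put()` rather than letting the item buffer grow without limit.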
You can use it with the checkpoint system too, making it easy to build UIs on top of spiders with real-time data that can be paused/resumed:

```python
import anyio

async def main():
    spider = MySpider(crawldir="crawl_data/my_spider")
    async for item in spider.stream():
        print(f"Got item: {item}")

        # Access real-time stats
        print(f"Items so far: {spider.stats.items_scraped}")
        print(f"Requests made: {spider.stats.requests_count}")

anyio.run(main)
```
You can also call `spider.pause()` to shut the spider down programmatically. Without the checkpoint system enabled, it simply closes the crawl without saving a checkpoint.
## Lifecycle Hooks

The spider provides several hooks you can override to add custom behavior at different stages of the crawl:
### on_start

Called before crawling begins. Use it for setup tasks like loading data or initializing resources:

```python
async def on_start(self, resuming: bool = False) -> None:
    """Called before crawling starts. Override for setup logic.

    :param resuming: True if the spider is resuming from a checkpoint.
    """
    if resuming:
        self.logger.debug("Resuming spider from checkpoint")
    else:
        self.logger.debug("Starting spider")
```
### on_close

Called after crawling finishes (whether completed or paused). Use it for cleanup:

```python
async def on_close(self) -> None:
    """Called after crawling finishes. Override for cleanup logic."""
    self.logger.debug("Spider closed")
```
### on_error

Called when a request fails with an exception. Use it for error tracking or custom recovery logic:

```python
async def on_error(self, request: Request, error: Exception) -> None:
    """Handle request errors for all spider requests.

    Override for custom error handling.
    """
    pass
```
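For example, an override might count failures per domain. This sketch stubs a `FakeRequest` with just a `url` attribute so the pattern runs standalone; in a real spider, the method lives on your `Spider` subclass and `Request` comes from the framework:

```python
import asyncio
from collections import Counter
from urllib.parse import urlparse

class FailureTracker:
    """Illustrative on_error override: tally failures by domain."""

    def __init__(self):
        self.failures_per_domain = Counter()

    async def on_error(self, request, error: Exception) -> None:
        self.failures_per_domain[urlparse(request.url).netloc] += 1

class FakeRequest:
    def __init__(self, url: str):
        self.url = url

tracker = FailureTracker()
asyncio.run(tracker.on_error(FakeRequest("https://example.com/a"), TimeoutError("timed out")))
asyncio.run(tracker.on_error(FakeRequest("https://example.com/b"), TimeoutError("timed out")))
print(tracker.failures_per_domain)  # Counter({'example.com': 2})
```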
### on_scraped_item

Called for every scraped item before it’s added to the results. Return the item (modified or not) to keep it, or return `None` to drop it:

```python
async def on_scraped_item(self, item: Dict[str, Any]) -> Dict[str, Any] | None:
    """A hook to be overridden by users to do some processing on scraped items, return `None` to drop the item silently."""
    return item
```
Example usage:

```python
async def on_scraped_item(self, item: dict) -> dict | None:
    # Drop items without a title
    if not item.get("title"):
        return None

    # Modify items (e.g., add timestamps)
    item["scraped_at"] = "2026-01-01"
    return item
```
This hook can also be used to route items through your own pipelines and then drop them from the spider’s results.
### start_requests

Override `start_requests()` for custom initial request generation instead of using `start_urls`:

```python
async def start_requests(self) -> AsyncGenerator[Request, None]:
    """Generate initial requests to start the crawl.

    By default, this generates Request objects for each URL in `start_urls`
    using the session manager's default session and `parse()` as callback.

    Override this method for more control over initial requests
    (e.g., to add custom headers, use different callbacks, etc.)
    """
    if not self.start_urls:
        raise RuntimeError(
            "Spider has no starting point, either set `start_urls` or override `start_requests` function."
        )

    for url in self.start_urls:
        yield Request(url, sid=self._session_manager.default_session_id)
```
Example with custom login:

```python
async def start_requests(self):
    # POST request to log in first
    yield Request(
        "https://example.com/login",
        method="POST",
        data={"user": "admin", "pass": "secret"},
        callback=self.after_login,
    )

async def after_login(self, response: Response):
    # Now crawl the authenticated pages
    yield response.follow("/dashboard", callback=self.parse)
```
## Results & Statistics

The `CrawlResult` returned by `start()` contains both the scraped items and detailed statistics:

```python
result = MySpider().start()

# Items
print(f"Total items: {len(result.items)}")
result.items.to_json("output.json", indent=True)

# Did the crawl complete?
print(f"Completed: {result.completed}")
print(f"Paused: {result.paused}")

# Statistics
stats = result.stats
print(f"Requests: {stats.requests_count}")
print(f"Failed: {stats.failed_requests_count}")
print(f"Blocked: {stats.blocked_requests_count}")
print(f"Offsite filtered: {stats.offsite_requests_count}")
print(f"Items scraped: {stats.items_scraped}")
print(f"Items dropped: {stats.items_dropped}")
print(f"Response bytes: {stats.response_bytes}")
print(f"Duration: {stats.elapsed_seconds:.1f}s")
print(f"Speed: {stats.requests_per_second:.1f} req/s")
```
### CrawlStats Details

The `CrawlStats` dataclass tracks comprehensive information:

```python
@dataclass
class CrawlStats:
    """Statistics for a crawl run."""

    requests_count: int = 0
    concurrent_requests: int = 0
    concurrent_requests_per_domain: int = 0
    failed_requests_count: int = 0
    offsite_requests_count: int = 0
    response_bytes: int = 0
    items_scraped: int = 0
    items_dropped: int = 0
    start_time: float = 0.0
    end_time: float = 0.0
    download_delay: float = 0.0
    blocked_requests_count: int = 0
    custom_stats: Dict = field(default_factory=dict)
    response_status_count: Dict = field(default_factory=dict)
    domains_response_bytes: Dict = field(default_factory=dict)
    sessions_requests_count: Dict = field(default_factory=dict)
    proxies: List[str | Dict | Tuple] = field(default_factory=list)
    log_levels_counter: Dict = field(default_factory=dict)
```
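The `elapsed_seconds` and `requests_per_second` values used earlier aren’t stored fields, so they are presumably derived from `start_time`, `end_time`, and `requests_count`. The arithmetic, as a sketch (`MiniStats` is a stand-in; the property names are assumed from the usage above):

```python
from dataclasses import dataclass

@dataclass
class MiniStats:
    # Minimal stand-in for CrawlStats, just to show the derivation.
    requests_count: int = 0
    start_time: float = 0.0
    end_time: float = 0.0

    @property
    def elapsed_seconds(self) -> float:
        return self.end_time - self.start_time

    @property
    def requests_per_second(self) -> float:
        # Guard against division by zero for a crawl that never ran.
        return self.requests_count / self.elapsed_seconds if self.elapsed_seconds else 0.0

stats = MiniStats(requests_count=150, start_time=100.0, end_time=130.0)
print(f"{stats.elapsed_seconds:.1f}s, {stats.requests_per_second:.1f} req/s")  # 30.0s, 5.0 req/s
```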
### Detailed Stats

```python
stats = result.stats

# Status code distribution
print(stats.response_status_count)
# {'status_200': 150, 'status_404': 3, 'status_403': 1}

# Bytes downloaded per domain
print(stats.domains_response_bytes)
# {'example.com': 1234567, 'api.example.com': 45678}

# Requests per session
print(stats.sessions_requests_count)
# {'http': 120, 'stealth': 34}

# Proxies used during the crawl
print(stats.proxies)
# ['http://proxy1:8080', 'http://proxy2:8080']

# Log level counts
print(stats.log_levels_counter)
# {'debug': 200, 'info': 50, 'warning': 3, 'error': 1, 'critical': 0}

# Timing information
print(stats.start_time)      # Unix timestamp when crawl started
print(stats.end_time)        # Unix timestamp when crawl finished
print(stats.download_delay)  # The download delay used (seconds)

# Concurrency settings used
print(stats.concurrent_requests)             # Global concurrency limit
print(stats.concurrent_requests_per_domain)  # Per-domain concurrency limit

# Custom stats (set by your spider code)
print(stats.custom_stats)
# {'login_attempts': 3, 'pages_with_errors': 5}

# Export everything as a dict
print(stats.to_dict())
```
## Logging

The spider has a built-in logger accessible via `self.logger`. It’s pre-configured with the spider’s name and supports several customization options:

| Attribute | Default | Description |
|---|---|---|
| `logging_level` | `logging.DEBUG` | Minimum log level |
| `logging_format` | `"[%(asctime)s]:({spider_name}) %(levelname)s: %(message)s"` | Log message format |
| `logging_date_format` | `"%Y-%m-%d %H:%M:%S"` | Date format in log messages |
| `log_file` | `None` | Path to a log file (in addition to console output) |
```python
import logging

class MySpider(Spider):
    name = "my_spider"
    start_urls = ["https://example.com"]
    logging_level = logging.INFO
    log_file = "logs/my_spider.log"

    async def parse(self, response: Response):
        self.logger.info(f"Processing {response.url}")
        yield {"title": response.css("title::text").get("")}
```
### Logger Initialization

The logger is initialized in the Spider’s `__init__` method:

```python
self.logger = logging.getLogger(f"scrapling.spiders.{self.name}")
self.logger.setLevel(self.logging_level)
self.logger.handlers.clear()
self.logger.propagate = False  # Don't propagate to parent 'scrapling' logger

formatter = logging.Formatter(
    fmt=self.logging_format.format(spider_name=self.name), datefmt=self.logging_date_format
)

# Add a log counter handler to track log counts by level
self._log_counter = LogCounterHandler()
self.logger.addHandler(self._log_counter)

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)

if self.log_file:
    Path(self.log_file).parent.mkdir(parents=True, exist_ok=True)
    file_handler = logging.FileHandler(self.log_file)
    file_handler.setFormatter(formatter)
    self.logger.addHandler(file_handler)
```
The log file directory is created automatically if it doesn’t exist. Both console and file output use the same format.
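Note the two-stage formatting in the initialization above: the `{spider_name}` placeholder is filled via `str.format()` first, and the `%(...)s` fields are left for `logging.Formatter` to fill per record. A standalone check of how the two stages combine (the record values here are invented for the demo):

```python
import logging

logging_format = "[%(asctime)s]:({spider_name}) %(levelname)s: %(message)s"
fmt = logging_format.format(spider_name="my_spider")  # stage 1: str.format

formatter = logging.Formatter(fmt=fmt, datefmt="%Y-%m-%d %H:%M:%S")  # stage 2: %-style
record = logging.LogRecord(
    name="scrapling.spiders.my_spider", level=logging.INFO,
    pathname="example.py", lineno=0, msg="Processing page", args=None, exc_info=None,
)
print(formatter.format(record))
# e.g. [2026-01-01 12:00:00]:(my_spider) INFO: Processing page
```

This is why a custom `logging_format` must keep `{spider_name}` as a brace placeholder while everything else uses `%`-style fields.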