Prerequisites
- You’ve read the Getting started page and know how to create and run a basic spider.
Scrapling’s checkpoint system allows you to pause long-running crawls and resume them later from exactly where they left off. This is invaluable for large-scale crawls, debugging, or when resources need to be freed up temporarily.
Quick Start
Enable checkpointing by passing a crawldir to your spider:
from scrapling.spiders import Spider, Response


class QuotesSpider(Spider):
    name = "quotes"
    start_urls = ["https://quotes.toscrape.com"]

    async def parse(self, response: Response):
        for quote in response.css("div.quote"):
            yield {
                "text": quote.css("span.text::text").get(""),
                "author": quote.css("small.author::text").get(""),
            }

        next_page = response.css("li.next a::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)


# Enable checkpointing
spider = QuotesSpider(crawldir="./crawl_data")
result = spider.start()

if result.paused:
    print("Crawl was paused. Run again to resume.")
else:
    print(f"Crawl completed! Scraped {len(result.items)} items.")
Press Ctrl+C during the crawl to pause. Run the same code again to resume from where it stopped.
How It Works
The checkpoint system saves the spider’s state to disk at regular intervals and when you pause the crawl. The state includes:
- Pending requests — All requests that haven’t been processed yet (in the scheduler’s priority queue)
- Seen URLs — A set of request fingerprints to prevent duplicate crawling
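To make the second item concrete, here is a minimal sketch of fingerprint-based deduplication. The `fingerprint` helper and its choice of hashing the method and URL with SHA-1 are assumptions for illustration only; Scrapling's real fingerprinting may cover different request fields.

```python
import hashlib
from typing import Set


def fingerprint(method: str, url: str) -> bytes:
    """Hypothetical fingerprint: SHA-1 digest of the HTTP method and URL."""
    return hashlib.sha1(f"{method.upper()} {url}".encode("utf-8")).digest()


# The "seen" set holds raw digest bytes, one per request already scheduled.
seen: Set[bytes] = set()


def should_crawl(method: str, url: str) -> bool:
    """Return True the first time a request is seen, False afterwards."""
    fp = fingerprint(method, url)
    if fp in seen:
        return False
    seen.add(fp)
    return True


first = should_crawl("GET", "https://quotes.toscrape.com/page/2/")
second = should_crawl("get", "https://quotes.toscrape.com/page/2/")  # same request
```

Because the set is saved with the checkpoint, a resumed crawl can keep rejecting URLs it visited before the pause.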
Checkpoint Lifecycle
- Initial crawl: Spider starts normally, generating requests from start_urls
- Periodic saves: Every 5 minutes (configurable), the checkpoint is saved to crawldir/checkpoint.pkl
- Graceful pause: Press Ctrl+C once. The spider:
  - Stops accepting new requests from the scheduler
  - Waits for all in-flight requests to complete
  - Saves a final checkpoint
  - Exits cleanly
- Force stop: Press Ctrl+C again to stop immediately without waiting
- Resume: Run the spider again with the same crawldir. It:
  - Detects the existing checkpoint file
  - Restores the pending requests and seen URLs
  - Skips start_requests() (since requests are already queued)
  - Continues crawling from where it left off
- Completion: When the crawl finishes normally (scheduler empty, no active tasks), checkpoint files are deleted automatically
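The two-stage Ctrl+C behaviour in the lifecycle above can be sketched as a small state machine. `PauseState` and its method names are hypothetical; only the two flags mirror the `_pause_requested` and `_force_stop` attributes that appear in the engine excerpts later on this page.

```python
class PauseState:
    """Tracks interrupts: the first asks for a graceful pause, the second forces a stop."""

    def __init__(self) -> None:
        self.pause_requested = False
        self.force_stop = False

    def on_interrupt(self) -> str:
        """Call once per Ctrl+C; returns the kind of stop that was requested."""
        if not self.pause_requested:
            self.pause_requested = True
            return "graceful"
        self.force_stop = True
        return "force"

    def should_exit(self, active_tasks: int) -> bool:
        """Exit once in-flight work has drained, or immediately after a force stop."""
        return self.pause_requested and (active_tasks == 0 or self.force_stop)


state = PauseState()
actions = [state.on_interrupt()]             # first Ctrl+C
waiting = state.should_exit(active_tasks=3)  # requests still in flight
actions.append(state.on_interrupt())         # second Ctrl+C
forced = state.should_exit(active_tasks=3)   # exit without waiting
```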
Checkpoint Configuration
Setting the Save Interval
By default, checkpoints are saved every 5 minutes (300 seconds). You can customize this:
# Save checkpoint every 2 minutes
spider = QuotesSpider(crawldir="./crawl_data", interval=120.0)
# Save checkpoint every 30 seconds (for testing)
spider = QuotesSpider(crawldir="./crawl_data", interval=30.0)
# Disable periodic saves (only save on pause)
spider = QuotesSpider(crawldir="./crawl_data", interval=0)
The interval parameter is in seconds.
Checkpoint Storage
Checkpoints are stored in the directory specified by crawldir:
crawl_data/
└── checkpoint.pkl # Serialized checkpoint data
The checkpoint file uses Python’s pickle format with the highest protocol for efficiency.
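As a rough illustration of that format, the sketch below round-trips a stand-in state through pickle with the highest protocol. The dictionary layout is an assumption made for the example; the real checkpoint holds a CheckpointData object containing Request instances.

```python
import pickle

# Stand-in for checkpoint state: built-in containers instead of Request objects.
state = {
    "requests": ["https://quotes.toscrape.com/page/2/"],
    "seen": {b"\x01\x02", b"\x03\x04"},
}

# Serialize with the newest protocol supported by the running interpreter.
blob = pickle.dumps(state, protocol=pickle.HIGHEST_PROTOCOL)

# Deserialize; sets and bytes survive the round trip unchanged.
restored = pickle.loads(blob)
```

Unpickling can execute arbitrary code, so only load checkpoint files that your own crawls produced.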
Checkpoint Implementation
The checkpoint system is implemented in checkpoint.py:
@dataclass
class CheckpointData:
    """Container for checkpoint state."""

    requests: List["Request"] = field(default_factory=list)
    seen: Set[bytes] = field(default_factory=set)
Atomic Saves
Checkpoint writes are atomic to prevent corruption if the process is killed during a save:
async def save(self, data: CheckpointData) -> None:
    """Save checkpoint data to disk atomically."""
    await self.crawldir.mkdir(parents=True, exist_ok=True)
    temp_path = self._checkpoint_path.with_suffix(".tmp")

    try:
        serialized = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
        async with await anyio.open_file(temp_path, "wb") as f:
            await f.write(serialized)

        await temp_path.rename(self._checkpoint_path)
        log.info(f"Checkpoint saved: {len(data.requests)} requests, {len(data.seen)} seen URLs")
    except Exception as e:
        # Clean up temp file if it exists
        if await temp_path.exists():
            await temp_path.unlink()
        log.error(f"Failed to save checkpoint: {e}")
        raise
The system writes to a temporary file (.tmp) first, then atomically renames it to the final checkpoint file. This ensures the checkpoint is always in a valid state.
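If you want to try the write-then-rename pattern outside the engine, a synchronous sketch using only the standard library might look like this. `save_atomically` is a hypothetical helper, not part of Scrapling, and it swaps in os.replace for the rename call because os.replace overwrites an existing destination on Windows as well as POSIX.

```python
import os
import pickle
import tempfile
from pathlib import Path


def save_atomically(path: Path, payload: object) -> None:
    """Write payload to a sibling .tmp file, then swap it into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    temp_path = path.with_suffix(".tmp")
    try:
        with open(temp_path, "wb") as f:
            f.write(pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL))
        # Readers see either the old file or the new one, never a partial write.
        os.replace(temp_path, path)
    except Exception:
        temp_path.unlink(missing_ok=True)  # do not leave a partial temp file behind
        raise


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "crawl_data" / "checkpoint.pkl"
    save_atomically(target, {"seen": {b"a"}})
    save_atomically(target, {"seen": {b"a", b"b"}})  # overwrites the first save
    saved = pickle.loads(target.read_bytes())
    leftover_tmp = target.with_suffix(".tmp").exists()
```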
Loading Checkpoints
async def load(self) -> Optional[CheckpointData]:
    """Load checkpoint data from disk.

    Returns None if no checkpoint exists or if loading fails.
    """
    if not await self.has_checkpoint():
        return None

    try:
        async with await anyio.open_file(self._checkpoint_path, "rb") as f:
            content = await f.read()
            data: CheckpointData = pickle.loads(content)

        log.info(f"Checkpoint loaded: {len(data.requests)} requests, {len(data.seen)} seen URLs")
        return data
    except Exception as e:
        log.error(f"Failed to load checkpoint (starting fresh): {e}")
        return None
If loading fails for any reason (corrupted file, version mismatch, etc.), the spider starts fresh rather than crashing.
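The fallback behaviour can be reproduced in isolation with a short synchronous sketch. `load_or_none` is a hypothetical stand-in for the async load() method above and uses a plain dictionary instead of CheckpointData.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Optional


def load_or_none(path: Path) -> Optional[Any]:
    """Return the unpickled checkpoint, or None if it is missing or unreadable."""
    if not path.exists():
        return None
    try:
        return pickle.loads(path.read_bytes())
    except Exception:
        return None  # treat a corrupted checkpoint as "no checkpoint"


with tempfile.TemporaryDirectory() as tmp:
    checkpoint = Path(tmp) / "checkpoint.pkl"
    missing = load_or_none(checkpoint)        # file does not exist yet

    checkpoint.write_bytes(b"\x00corrupted")  # not valid pickle data
    corrupted = load_or_none(checkpoint)

    checkpoint.write_bytes(pickle.dumps({"requests": [], "seen": set()}))
    valid = load_or_none(checkpoint)
```

In all three cases the caller only has to check for None to decide between resuming and starting fresh.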
Engine Integration
The crawler engine manages the checkpoint lifecycle:
Checking for Checkpoints
async def crawl(self) -> CrawlStats:
    """Run the spider and return CrawlStats."""
    self._running = True
    self._items.clear()
    self.paused = False
    self._pause_requested = False
    self._force_stop = False
    self.stats = CrawlStats(start_time=anyio.current_time())

    # Check for existing checkpoint
    resuming = (await self._restore_from_checkpoint()) if self._checkpoint_system_enabled else False
    self._last_checkpoint_time = anyio.current_time()
Restoring from Checkpoint
async def _restore_from_checkpoint(self) -> bool:
    """Attempt to restore state from checkpoint.

    Returns True if successfully restored, False otherwise.
    """
    if not self._checkpoint_system_enabled:
        raise

    data = await self._checkpoint_manager.load()
    if data is None:
        return False

    self.scheduler.restore(data)

    # Restore callbacks from spider after scheduler restore
    for request in data.requests:
        request._restore_callback(self.spider)

    return True
Callbacks are restored after unpickling because they can’t be pickled directly:
def _restore_callback(self, spider: "Spider") -> None:
    """Restore callback from spider after unpickling.

    :param spider: Spider instance to look up callback method on
    """
    if hasattr(self, "_callback_name") and self._callback_name:
        self.callback = getattr(spider, self._callback_name, None) or spider.parse
        del self._callback_name
    elif hasattr(self, "_callback_name"):
        del self._callback_name
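The name-based lookup used by _restore_callback() can be shown with a small standalone sketch. `DemoSpider`, `callback_to_name`, and `name_to_callback` are hypothetical names for illustration; how Scrapling records the callback name when saving is not shown on this page, so that half is an assumption.

```python
class DemoSpider:
    """Stand-in for a Spider subclass with two callbacks."""

    def parse(self, response):
        return "parse"

    def parse_detail(self, response):
        return "detail"


def callback_to_name(callback) -> str:
    """Reduce a bound method to its name so it can be stored in a checkpoint."""
    return callback.__name__


def name_to_callback(spider, name: str):
    """Look the name up on the spider, falling back to parse() if it is gone."""
    return getattr(spider, name, None) or spider.parse


spider = DemoSpider()
stored = callback_to_name(spider.parse_detail)
restored = name_to_callback(spider, stored)
fallback = name_to_callback(spider, "renamed_callback")  # method no longer exists
```

The fallback branch is why renaming a callback between a pause and a resume sends the restored requests to parse() instead of failing.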
Periodic Checkpoint Saves
def _is_checkpoint_time(self) -> bool:
    """Check if it's time for the periodic checkpoint."""
    if not self._checkpoint_system_enabled:
        return False

    if self._checkpoint_manager.interval == 0:
        return False

    current_time = anyio.current_time()
    return (current_time - self._last_checkpoint_time) >= self._checkpoint_manager.interval
During the crawl loop:
if self._checkpoint_system_enabled and self._is_checkpoint_time():
    await self._save_checkpoint()
Graceful Pause Handling
if self._pause_requested:
    if self._active_tasks == 0 or self._force_stop:
        if self._force_stop:
            log.warning(f"Force stopping with {self._active_tasks} active tasks")
            tg.cancel_scope.cancel()

        # Only save checkpoint if checkpoint system is enabled
        if self._checkpoint_system_enabled:
            await self._save_checkpoint()
            self.paused = True
            log.info("Spider paused, checkpoint saved")
        else:
            log.info("Spider stopped gracefully")

        self._running = False
        break

    # Wait briefly and check again
    await anyio.sleep(0.05)
    continue
Scheduler State Management
The scheduler implements snapshot() and restore() for checkpointing:
def snapshot(self) -> Tuple[List[Request], Set[bytes]]:
    """Create a snapshot of the current state for checkpoints."""
    sorted_items = sorted(self._pending.values(), key=lambda x: (x[0], x[1]))  # Maintain queue order
    requests = [item[2] for item in sorted_items]
    return requests, self._seen.copy()

def restore(self, data: "CheckpointData") -> None:
    """Restore scheduler state from checkpoint data.

    :param data: CheckpointData containing requests and seen set
    """
    self._seen = data.seen.copy()

    # Restore pending requests in order (they're already sorted by priority)
    for request in data.requests:
        counter = next(self._counter)
        item = (-request.priority, counter, request)
        self._pending[counter] = item
        self._queue.put_nowait(item)

    log.info(f"Scheduler restored: {len(data.requests)} requests, {len(data.seen)} seen")
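To see why the (-priority, counter) ordering survives a snapshot and restore, consider this standalone sketch. It uses heapq and plain (priority, url) tuples as stand-ins for the scheduler's queue and Request objects, so the names here are illustrative rather than Scrapling's.

```python
import heapq
from itertools import count


def push(heap, counter, priority: int, url: str) -> None:
    """Queue a URL; higher priority pops first, ties pop in insertion order."""
    heapq.heappush(heap, (-priority, next(counter), (priority, url)))


heap, counter = [], count()
push(heap, counter, 0, "https://example.com/a")
push(heap, counter, 10, "https://example.com/urgent")
push(heap, counter, 0, "https://example.com/b")

# Snapshot: sort by (negated priority, counter), as snapshot() does above.
snapshot = [request for _, _, request in sorted(heap)]

# Restore into a fresh queue with new counters; relative order is preserved
# because the snapshot is replayed in its sorted order.
restored, new_counter = [], count()
for priority, url in snapshot:
    push(restored, new_counter, priority, url)

popped = [heapq.heappop(restored)[2][1] for _ in range(len(snapshot))]
```

Negating the priority lets a min-ordered queue return the highest priority first, and the counter breaks ties so that equal-priority requests keep their original order.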
Detecting Resume in Your Spider
The on_start() hook receives a resuming flag so you can perform different initialization logic:
class MySpider(Spider):
    name = "my_spider"
    start_urls = ["https://example.com"]

    async def on_start(self, resuming: bool = False):
        if resuming:
            self.logger.info("Resuming from checkpoint!")
            # Maybe skip some initialization that was already done
        else:
            self.logger.info("Starting fresh crawl")
            # Full initialization

    async def parse(self, response: Response):
        yield {"title": response.css("title::text").get("")}
Best Practices
Use Unique crawldir per Spider
# Good - each spider has its own checkpoint directory
QuotesSpider(crawldir="./crawls/quotes").start()
ProductsSpider(crawldir="./crawls/products").start()
# Bad - different spiders share the same directory
QuotesSpider(crawldir="./crawls").start()
ProductsSpider(crawldir="./crawls").start() # Will load quotes checkpoint!
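One way to avoid sharing a directory by accident is to derive the path from the spider's name. The `crawldir_for` helper below is hypothetical and not part of Scrapling; it is only a sketch of the convention.

```python
from pathlib import Path


def crawldir_for(spider_name: str, base: str = "./crawls") -> str:
    """Hypothetical helper: give each spider its own checkpoint directory."""
    return str(Path(base) / spider_name)


quotes_dir = crawldir_for("quotes")
products_dir = crawldir_for("products")
```

You could then write QuotesSpider(crawldir=crawldir_for(QuotesSpider.name)) so the directory always follows the spider's name attribute.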
Adjust Interval Based on Crawl Size
# Short crawl - save every minute
SmallSpider(crawldir="./data", interval=60.0).start()
# Large crawl - save every 10 minutes to reduce I/O
LargeSpider(crawldir="./data", interval=600.0).start()
# Very long crawl - save every hour
MassiveSpider(crawldir="./data", interval=3600.0).start()
Handle Checkpoint Failures Gracefully
If a checkpoint load fails (corrupted file, version incompatibility), the spider starts fresh. To handle this explicitly:
from pathlib import Path

from scrapling.spiders import Spider, Response


class MySpider(Spider):
    name = "my_spider"
    start_urls = ["https://example.com"]

    async def on_start(self, resuming: bool = False):
        # A checkpoint file that exists but did not lead to a resume failed to load
        if not resuming and (Path(self.crawldir) / "checkpoint.pkl").exists():
            self.logger.warning("Checkpoint exists but failed to load - starting fresh")

    async def parse(self, response: Response):
        yield {"title": response.css("title::text").get("")}


spider = MySpider(crawldir="./data")
result = spider.start()
Clean Up After Completion
Checkpoints are automatically deleted when a crawl completes successfully:
finally:
    await self.spider.on_close()
    # Clean up checkpoint files on successful completion (not paused)
    if not self.paused and self._checkpoint_system_enabled:
        await self._checkpoint_manager.cleanup()
To clear a checkpoint manually and force a fresh start:
from pathlib import Path

# Delete checkpoint to force fresh start
checkpoint_file = Path("./crawl_data/checkpoint.pkl")
if checkpoint_file.exists():
    checkpoint_file.unlink()

spider = MySpider(crawldir="./crawl_data")
result = spider.start()
Troubleshooting
Checkpoint Not Loading
Symptoms: Spider starts from scratch even though checkpoint exists
Possible causes:
- Wrong crawldir path
- Corrupted checkpoint file
- Pickle version mismatch (Python version changed)
- Spider code changed significantly (callback names changed)
Solutions:
import logging


class MySpider(Spider):
    name = "my_spider"
    logging_level = logging.DEBUG  # Enable debug logs to see checkpoint loading
    start_urls = ["https://example.com"]

    async def parse(self, response: Response):
        yield {"title": response.css("title::text").get("")}
Checkpoint Too Large
Symptoms: Slow checkpoint saves, large disk usage
Causes: Very large crawl with millions of URLs
Solutions:
- Increase save interval to reduce I/O:
  spider = MySpider(crawldir="./data", interval=1800.0)  # 30 minutes
- Use allowed_domains to limit scope:
  class MySpider(Spider):
      allowed_domains = {"example.com"}
- Increase download_delay to crawl slower and accumulate fewer pending requests:
  class MySpider(Spider):
      download_delay = 1.0
Memory Issues After Resume
Symptoms: High memory usage after resuming
Cause: Large number of pending requests loaded into memory
Solution: The scheduler uses an asyncio.PriorityQueue which is memory-efficient, but if you have millions of pending requests, consider splitting your crawl into smaller jobs with different start_urls.
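Splitting a crawl into smaller jobs could be sketched as follows. `split_jobs` is a hypothetical helper, and the directory naming scheme is an assumption; how you hand each batch of start URLs to a spider depends on your own setup.

```python
from typing import Iterator, List, Tuple


def split_jobs(
    urls: List[str], batch_size: int, base_dir: str = "./crawls/job"
) -> Iterator[Tuple[str, List[str]]]:
    """Yield (crawldir, start_urls) pairs, one per batch of URLs."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for index, start in enumerate(range(0, len(urls), batch_size)):
        # Each batch gets its own numbered checkpoint directory.
        yield f"{base_dir}_{index:03d}", urls[start:start + batch_size]


urls = [f"https://example.com/section/{n}" for n in range(5)]
jobs = list(split_jobs(urls, batch_size=2))
```

Each job keeps its own checkpoint directory, so pausing or resuming one batch does not touch the pending queue of another.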
Example: Long-Running Crawler with Checkpointing
import logging

from scrapling.spiders import Spider, Response


class NewsSpider(Spider):
    name = "news"
    start_urls = ["https://news.ycombinator.com/"]

    # Crawl settings
    concurrent_requests = 10
    download_delay = 0.5
    logging_level = logging.INFO
    log_file = "logs/news_crawler.log"

    async def on_start(self, resuming: bool = False):
        if resuming:
            self.logger.info("Resuming news crawl from checkpoint")
        else:
            self.logger.info("Starting fresh news crawl")

    async def parse(self, response: Response):
        # Extract articles
        for item in response.css(".athing"):
            title = item.css(".titleline a::text").get("")
            link = item.css(".titleline a::attr(href)").get("")
            if title and link:
                yield {"title": title, "url": link}

        # Follow pagination
        next_page = response.css(".morelink::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

    async def on_close(self):
        self.logger.info(f"Crawl finished. Scraped {self.stats.items_scraped} articles")


# Run with checkpointing - save every 2 minutes
spider = NewsSpider(crawldir="./crawls/news", interval=120.0)
result = spider.start()

if result.paused:
    print("Crawl paused. Run again to resume.")
else:
    print(f"Completed! Scraped {len(result.items)} articles")
    result.items.to_json("news_articles.json", indent=True)
Press Ctrl+C during the crawl to pause. Run the same script again to resume from where it stopped.