Example 1: E-commerce Product Scraper
Scrape product listings with pagination, handle missing data, and export to a structured format.
ecommerce_spider.py
from scrapling.spiders import Spider, Response
from scrapling.fetchers import FetcherSession
import logging
import re
from typing import Dict, Any
class EcommerceSpider(Spider):
"""Scrape products from an e-commerce site."""
name = "ecommerce"
start_urls = ["https://example-shop.com/products"]
allowed_domains = {"example-shop.com"}
# Configuration
concurrent_requests = 5
download_delay = 1.0 # Be respectful
logging_level = logging.INFO
def configure_sessions(self, manager):
# Use browser impersonation for better success rate
manager.add("default", FetcherSession(
impersonate="chrome",
stealthy_headers=True
))
async def parse(self, response: Response):
"""Parse product listing pages."""
# Extract products
for product in response.css('.product-card'):
# Get product URL
product_url = product.css('a.product-link::attr(href)').get()
if product_url:
yield response.follow(
product_url,
callback=self.parse_product
)
# Handle pagination
next_page = response.css('a.next-page::attr(href)').get()
if next_page:
yield response.follow(next_page, callback=self.parse)
async def parse_product(self, response: Response):
"""Extract detailed product information."""
# Extract price (handle different formats)
price_text = response.css('.price::text').get() or ""
price = self.extract_price(price_text)
# Extract availability
in_stock = bool(response.css('.in-stock').get())
# Extract images
images = response.css('.product-images img::attr(src)').getall()
# Extract specs from table
specs = {}
for row in response.css('.specs-table tr'):
key = row.css('th::text').get()
value = row.css('td::text').get()
if key and value:
specs[key.strip()] = value.strip()
yield {
'url': response.url,
'title': response.css('h1.product-title::text').get(),
'price': price,
'currency': 'USD',
'in_stock': in_stock,
'description': response.css('.description::text').get(),
'images': images,
'specifications': specs,
'brand': response.css('.brand::text').get(),
'category': response.css('.breadcrumb a::text').getall(),
'rating': response.css('.rating::attr(data-rating)').get(),
'reviews_count': self.extract_number(
response.css('.reviews-count::text').get()
),
}
@staticmethod
def extract_price(price_text: str) -> float | None:
"""Extract numeric price from text."""
if not price_text:
return None
        # Remove currency symbols and commas
        numbers = re.findall(r'[\d.]+', price_text.replace(',', ''))
return float(numbers[0]) if numbers else None
@staticmethod
def extract_number(text: str) -> int | None:
"""Extract integer from text."""
if not text:
return None
        numbers = re.findall(r'\d+', text.replace(',', ''))
return int(numbers[0]) if numbers else None
async def on_scraped_item(self, item: Dict[str, Any]):
"""Validate and clean items."""
# Drop items without required fields
if not item.get('title') or not item.get('url'):
self.logger.warning(f"Dropping item without title/url: {item.get('url')}")
return None
# Clean title
if item.get('title'):
item['title'] = item['title'].strip()
return item
def main():
spider = EcommerceSpider(crawldir="./crawl_data")
result = spider.start()
if result.items:
# Export to JSON
result.items.to_json("products.json", indent=True)
print(f"Exported {len(result.items)} products")
        # Calculate statistics (average only over items that have a price)
        in_stock = sum(1 for item in result.items if item.get('in_stock'))
        prices = [item['price'] for item in result.items if item.get('price')]
        print(f"In stock: {in_stock}/{len(result.items)}")
        if prices:
            print(f"Average price: ${sum(prices) / len(prices):.2f}")
if __name__ == "__main__":
main()
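If a flat file is easier for downstream analysis, the same items can be written to CSV with the standard library. A minimal sketch, assuming result.items is an iterable of dicts as the loops above imply; nested fields such as images and specifications are skipped:

import csv

def export_csv(items, path="products.csv"):
    # Scalar fields only; list/dict fields (images, specifications) are omitted.
    fields = ['url', 'title', 'price', 'currency', 'in_stock',
              'brand', 'rating', 'reviews_count']
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fields, extrasaction='ignore')
        writer.writeheader()
        for item in items:
            writer.writerow({k: item.get(k) for k in fields})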
Example 2: News Article Scraper with Category Navigation
Scrape articles from multiple categories, extract metadata, and handle dynamic content.
news_spider.py
from scrapling.spiders import Spider, Response
from scrapling.fetchers import FetcherSession
import logging
from datetime import datetime, timezone
class NewsSpider(Spider):
"""Scrape news articles from multiple categories."""
name = "news"
start_urls = [
"https://example-news.com/technology",
"https://example-news.com/business",
"https://example-news.com/science",
]
allowed_domains = {"example-news.com"}
concurrent_requests = 10
download_delay = 0.5
logging_level = logging.INFO
log_file = "news_spider.log"
def configure_sessions(self, manager):
manager.add("default", FetcherSession(
impersonate="chrome",
stealthy_headers=True,
http3=True # Use HTTP/3 for better performance
))
async def parse(self, response: Response):
"""Parse category pages."""
# Extract category from URL
category = response.url.rstrip('/').split('/')[-1]
# Extract article links
for article in response.css('.article-preview'):
article_url = article.css('a.article-link::attr(href)').get()
if article_url:
yield response.follow(
article_url,
callback=self.parse_article,
meta={'category': category}
)
# Pagination
next_page = response.css('a.pagination-next::attr(href)').get()
if next_page:
yield response.follow(next_page, callback=self.parse)
async def parse_article(self, response: Response):
"""Extract article content and metadata."""
# Get category from meta
category = response.request.meta.get('category', 'unknown')
# Extract publication date
date_text = response.css('time::attr(datetime)').get()
published_date = self.parse_date(date_text)
# Extract article content (combine all paragraphs)
paragraphs = response.css('.article-body p::text').getall()
content = '\n\n'.join(p.strip() for p in paragraphs if p.strip())
# Extract author info
author_name = response.css('.author-name::text').get()
author_url = response.css('.author-link::attr(href)').get()
# Extract tags/topics
tags = response.css('.article-tags a::text').getall()
yield {
'url': response.url,
'title': response.css('h1.article-title::text').get(),
'subtitle': response.css('.article-subtitle::text').get(),
'author': {
'name': author_name,
'url': response.urljoin(author_url) if author_url else None,
},
'published_date': published_date,
'category': category,
'tags': tags,
'content': content,
'word_count': len(content.split()) if content else 0,
'image': response.css('.article-hero-image::attr(src)').get(),
            'scraped_at': datetime.now(timezone.utc).isoformat(),
}
@staticmethod
def parse_date(date_text: str) -> str | None:
"""Parse date from various formats."""
if not date_text:
return None
try:
# Try ISO format first
dt = datetime.fromisoformat(date_text.replace('Z', '+00:00'))
return dt.isoformat()
        except (ValueError, TypeError):
            return date_text  # Return as-is if parsing fails
async def on_scraped_item(self, item):
"""Filter and clean articles."""
# Drop items without content
if not item.get('content') or len(item['content']) < 100:
self.logger.warning(f"Dropping article with short content: {item.get('url')}")
return None
return item
def main():
spider = NewsSpider(crawldir="./news_crawl")
result = spider.start()
if result.items:
# Export all articles
result.items.to_jsonl("articles.jsonl")
# Group by category
by_category = {}
for item in result.items:
cat = item.get('category', 'unknown')
by_category.setdefault(cat, []).append(item)
print("\nArticles by category:")
for cat, articles in by_category.items():
print(f" {cat}: {len(articles)} articles")
# Export category-specific file
from scrapling.spiders.result import ItemList
ItemList(articles).to_json(f"articles_{cat}.json", indent=True)
if __name__ == "__main__":
main()
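parse_date returns non-ISO dates unchanged, which preserves the raw value but leaves the field unnormalized. If you need consistent dates, one option is a fallback chain of strptime formats; the formats below are illustrative assumptions, not a complete list:

from datetime import datetime

FALLBACK_FORMATS = ["%B %d, %Y", "%d %b %Y", "%m/%d/%Y"]  # assumed formats

def parse_date_fallback(date_text: str) -> str | None:
    """Try ISO 8601 first, then a few common human-readable formats."""
    if not date_text:
        return None
    try:
        return datetime.fromisoformat(date_text.replace('Z', '+00:00')).isoformat()
    except ValueError:
        pass
    for fmt in FALLBACK_FORMATS:
        try:
            return datetime.strptime(date_text.strip(), fmt).isoformat()
        except ValueError:
            continue
    return None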
Example 3: Job Listings Scraper with Stealth Mode
Scrape job listings from a protected site using stealth browser sessions.
jobs_spider.py
from scrapling.spiders import Spider, Request, Response
from scrapling.fetchers import FetcherSession, AsyncStealthySession
import logging
import re
class JobsSpider(Spider):
"""Scrape job listings with stealth browser for protected pages."""
name = "jobs"
start_urls = ["https://example-jobs.com/search?q=python"]
allowed_domains = {"example-jobs.com"}
concurrent_requests = 3 # Lower for browser sessions
download_delay = 2.0 # Be extra respectful with browsers
logging_level = logging.INFO
def configure_sessions(self, manager):
# Fast HTTP for listing pages
manager.add("fast", FetcherSession(impersonate="chrome"))
# Stealth browser for detail pages (often protected)
manager.add("stealth", AsyncStealthySession(
headless=True,
disable_resources=True, # Block images/fonts for speed
network_idle=True, # Wait for network to be idle
), lazy=True) # Only start when needed
async def parse(self, response: Response):
"""Parse job listing pages."""
# Extract job links
for job_card in response.css('.job-card'):
job_url = job_card.css('a.job-title::attr(href)').get()
if job_url:
# Use stealth session for detail pages
yield Request(
response.urljoin(job_url),
sid="stealth", # Route to stealth session
callback=self.parse_job
)
# Pagination (use fast session)
next_page = response.css('a[rel="next"]::attr(href)').get()
if next_page:
yield Request(
response.urljoin(next_page),
sid="fast",
callback=self.parse
)
async def parse_job(self, response: Response):
"""Extract detailed job information."""
# Extract salary range
salary_text = response.css('.salary::text').get() or ""
salary_range = self.parse_salary(salary_text)
# Extract requirements (bullet points)
requirements = response.css('.requirements li::text').getall()
# Extract benefits
benefits = response.css('.benefits li::text').getall()
# Extract location
location = response.css('.location::text').get()
remote = 'remote' in location.lower() if location else False
yield {
'url': response.url,
'title': response.css('h1.job-title::text').get(),
'company': response.css('.company-name::text').get(),
'location': location,
'remote': remote,
'salary_range': salary_range,
'job_type': response.css('.job-type::text').get(),
'experience_level': response.css('.experience-level::text').get(),
'description': response.css('.job-description').get_all_text(strip=True),
'requirements': [r.strip() for r in requirements],
'benefits': [b.strip() for b in benefits],
'posted_date': response.css('.posted-date::text').get(),
'apply_url': response.css('.apply-button::attr(href)').get(),
}
@staticmethod
def parse_salary(salary_text: str) -> dict | None:
"""Parse salary range from text."""
if not salary_text:
return None
# Extract numbers (handles $50,000 - $70,000 or $50k-$70k)
numbers = re.findall(r'\$?([\d,]+)k?', salary_text.lower())
        if len(numbers) >= 2:
            multiplier = 1000 if 'k' in salary_text.lower() else 1
            min_sal = int(numbers[0].replace(',', '')) * multiplier
            max_sal = int(numbers[1].replace(',', '')) * multiplier
return {'min': min_sal, 'max': max_sal, 'currency': 'USD'}
return None
async def on_scraped_item(self, item):
"""Filter jobs based on criteria."""
# Example: Only keep remote or Python-related jobs
title = (item.get('title') or '').lower()
description = (item.get('description') or '').lower()
if not (item.get('remote') or 'python' in title or 'python' in description):
return None # Drop non-relevant jobs
return item
def main():
spider = JobsSpider(crawldir="./jobs_crawl")
result = spider.start()
if result.items:
result.items.to_json("jobs.json", indent=True)
# Statistics
remote_jobs = sum(1 for job in result.items if job.get('remote'))
with_salary = sum(1 for job in result.items if job.get('salary_range'))
print(f"\nTotal jobs: {len(result.items)}")
print(f"Remote jobs: {remote_jobs}")
print(f"Jobs with salary: {with_salary}")
if __name__ == "__main__":
main()
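Because parse_salary leans on regex heuristics, it is worth spot-checking it against the formats it claims to handle. These expected outputs follow directly from the implementation above:

# Quick sanity checks for parse_salary:
print(JobsSpider.parse_salary("$50,000 - $70,000"))
# -> {'min': 50000, 'max': 70000, 'currency': 'USD'}
print(JobsSpider.parse_salary("$50k-$70k"))
# -> {'min': 50000, 'max': 70000, 'currency': 'USD'}
print(JobsSpider.parse_salary("Competitive"))
# -> None (fewer than two numbers found)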
Example 4: API + Web Scraping Hybrid
Combine API calls with web scraping for comprehensive data collection.
hybrid_spider.py
from scrapling.spiders import Spider, Request, Response
from scrapling.fetchers import FetcherSession
import logging
class HybridSpider(Spider):
"""Combine API calls with web scraping."""
name = "hybrid"
start_urls = ["https://api.example.com/products?page=1"]
allowed_domains = {"api.example.com", "example.com"}
concurrent_requests = 8
download_delay = 0.3
logging_level = logging.INFO
def configure_sessions(self, manager):
manager.add("default", FetcherSession(impersonate="chrome"))
async def parse(self, response: Response):
"""Parse API response."""
try:
data = response.json()
        except ValueError:
            self.logger.error(f"Failed to parse JSON from {response.url}")
            return
# Process products from API
for product in data.get('products', []):
product_id = product.get('id')
# Scrape additional details from product page
product_url = f"https://example.com/products/{product_id}"
yield Request(
product_url,
callback=self.parse_product_page,
meta={'api_data': product}
)
# Handle API pagination
next_page = data.get('next_page')
if next_page:
yield Request(next_page, callback=self.parse)
async def parse_product_page(self, response: Response):
"""Scrape additional details from product page."""
# Get API data
api_data = response.request.meta.get('api_data', {})
# Extract additional info not in API
reviews = []
for review in response.css('.review'):
reviews.append({
'rating': review.css('.stars::attr(data-rating)').get(),
'text': review.css('.review-text::text').get(),
'author': review.css('.reviewer-name::text').get(),
})
# Combine API data with scraped data
yield {
# From API
'id': api_data.get('id'),
'name': api_data.get('name'),
'price': api_data.get('price'),
'category': api_data.get('category'),
# From web scraping
'reviews': reviews,
'average_rating': self.calculate_average_rating(reviews),
'detailed_description': response.css('.full-description').get_all_text(strip=True),
'image_gallery': response.css('.gallery img::attr(src)').getall(),
'related_products': response.css('.related a::attr(href)').getall(),
}
@staticmethod
def calculate_average_rating(reviews):
"""Calculate average rating from reviews."""
if not reviews:
return None
ratings = [float(r['rating']) for r in reviews if r.get('rating')]
return sum(ratings) / len(ratings) if ratings else None
def main():
result = HybridSpider().start()
if result.items:
result.items.to_json("products_detailed.json", indent=True)
print(f"Scraped {len(result.items)} products with reviews")
if __name__ == "__main__":
main()
Example 5: Streaming Spider with Real-Time Processing
Process items in real-time as they’re scraped.
streaming_spider.py
import asyncio
from scrapling.spiders import Spider, Response
import logging
class StreamingSpider(Spider):
"""Spider that streams items for real-time processing."""
name = "streaming"
start_urls = ["https://example.com/feed"]
concurrent_requests = 10
logging_level = logging.INFO
async def parse(self, response: Response):
for item in response.css('.item'):
yield {
'title': item.css('h2::text').get(),
'url': item.css('a::attr(href)').get(),
'timestamp': item.css('.timestamp::text').get(),
}
async def process_in_realtime():
"""Process items as they arrive."""
spider = StreamingSpider()
# Initialize storage/database connection
items_processed = 0
async for item in spider.stream():
# Process each item immediately
print(f"Processing: {item.get('title')}")
# Example: Save to database
# await save_to_db(item)
# Example: Send to message queue
# await send_to_queue(item)
items_processed += 1
# Access real-time stats
if items_processed % 10 == 0:
stats = spider.stats
print(f"Progress: {stats.items_scraped} items, {stats.requests_count} requests")
print(f"\nFinished! Processed {items_processed} items")
if __name__ == "__main__":
asyncio.run(process_in_realtime())
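If the per-item work (database writes, queue publishes) is slow, it can lag behind the crawl. One way to decouple the two is a consumer task reading from an asyncio.Queue; this sketch uses a hypothetical save_to_db stub in place of a real storage backend:

import asyncio

async def save_to_db(item):
    # Hypothetical stub; replace with your actual storage backend.
    await asyncio.sleep(0)

async def worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:  # Sentinel: crawl finished.
            break
        await save_to_db(item)
        queue.task_done()

async def stream_with_worker():
    spider = StreamingSpider()
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    consumer = asyncio.create_task(worker(queue))
    async for item in spider.stream():
        await queue.put(item)  # Applies backpressure if the worker lags
    await queue.put(None)
    await consumer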
Example 6: Proxy Rotation with Blocking Detection
Handle websites with anti-scraping measures.
proxy_spider.py
from scrapling.spiders import Spider, Request, Response
from scrapling.fetchers import FetcherSession
from scrapling.engines.toolbelt import ProxyRotator
import asyncio
import logging
class ProxySpider(Spider):
"""Spider with proxy rotation and blocking detection."""
name = "proxy_spider"
start_urls = ["https://protected-site.com/data"]
# Retry blocked requests
max_blocked_retries = 5
concurrent_requests = 5
download_delay = 1.0
logging_level = logging.INFO
def configure_sessions(self, manager):
# List of proxies
proxies = [
'http://user:[email protected]:8000',
'http://user:[email protected]:8000',
'http://user:[email protected]:8000',
]
# Create rotating proxy
rotator = ProxyRotator(proxies, mode='cycle')
manager.add("default", FetcherSession(
impersonate="chrome",
proxy=rotator,
stealthy_headers=True,
))
async def is_blocked(self, response: Response) -> bool:
"""Detect if we're blocked."""
# Check status code
if response.status in {403, 429, 503}:
return True
# Check for CAPTCHA
if response.css('.captcha, #captcha').get():
self.logger.warning(f"CAPTCHA detected on {response.url}")
return True
# Check for common blocking messages
text = response.get_all_text(strip=True).lower()
blocking_phrases = ['access denied', 'blocked', 'rate limit']
if any(phrase in text for phrase in blocking_phrases):
return True
return False
async def retry_blocked_request(self, request: Request, response: Response) -> Request:
"""Prepare blocked request for retry."""
self.logger.warning(f"Request blocked: {request.url}, retrying...")
        # Back off before retrying
        await asyncio.sleep(5)
return request
async def parse(self, response: Response):
# Extract data
for item in response.css('.data-item'):
yield {
'title': item.css('.title::text').get(),
'value': item.css('.value::text').get(),
}
# Pagination
next_page = response.css('a.next::attr(href)').get()
if next_page:
yield response.follow(next_page, callback=self.parse)
def main():
result = ProxySpider(crawldir="./proxy_crawl").start()
print(f"\nResults:")
print(f" Items scraped: {result.stats.items_scraped}")
print(f" Blocked requests: {result.stats.blocked_requests}")
print(f" Blocked retries: {result.stats.blocked_retries}")
if result.items:
result.items.to_json("data.json", indent=True)
if __name__ == "__main__":
main()
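The flat five-second pause in retry_blocked_request treats every retry the same. A common refinement is exponential backoff keyed on the retry count. A sketch, assuming request.meta persists across retries the way it persists between requests in the earlier examples:

import asyncio
import random
from scrapling.spiders import Request, Response

class BackoffProxySpider(ProxySpider):
    """ProxySpider variant with exponential backoff on blocked retries."""
    async def retry_blocked_request(self, request: Request, response: Response) -> Request:
        retries = request.meta.get('blocked_retries', 0)
        request.meta['blocked_retries'] = retries + 1
        # 5s, 10s, 20s, ... capped at 60s, plus jitter to avoid a fixed pattern
        delay = min(5 * (2 ** retries), 60) + random.uniform(0, 1)
        self.logger.warning(f"Blocked: {request.url}, retrying in {delay:.1f}s")
        await asyncio.sleep(delay)
        return request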
Best Practices Summary
1. Respectful Scraping
class RespectfulSpider(Spider):
# Be gentle with servers
concurrent_requests = 4
download_delay = 1.0
# Filter to allowed domains
allowed_domains = {"example.com"}
    # Handle robots.txt (respect website rules);
    # a manual check with the stdlib is sketched below.
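For the manual check, the standard library's urllib.robotparser is usually enough as a gate before crawling; a minimal sketch (the robots.txt URL is a placeholder):

from urllib.robotparser import RobotFileParser

def allowed_by_robots(url: str, user_agent: str = "*") -> bool:
    """Check a URL against the site's robots.txt before crawling it."""
    parser = RobotFileParser()
    parser.set_url("https://example.com/robots.txt")
    parser.read()  # Fetches and parses robots.txt
    return parser.can_fetch(user_agent, url)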
2. Error Handling
class RobustSpider(Spider):
async def on_error(self, request, error):
"""Log errors for debugging."""
self.logger.error(f"Error on {request.url}: {error}")
async def on_scraped_item(self, item):
"""Validate data."""
required_fields = ['title', 'url']
if not all(item.get(field) for field in required_fields):
return None # Drop invalid items
return item
3. Data Quality
class QualitySpider(Spider):
async def on_scraped_item(self, item):
# Clean text
for key, value in item.items():
if isinstance(value, str):
item[key] = value.strip()
# Validate types
if 'price' in item and item['price']:
try:
item['price'] = float(item['price'])
except ValueError:
item['price'] = None
return item
4. Efficient Crawling
class EfficientSpider(Spider):
# Use appropriate concurrency
concurrent_requests = 10
def configure_sessions(self, manager):
# HTTP for simple pages
manager.add("fast", FetcherSession(http3=True))
# Browser only when needed
manager.add("browser", AsyncStealthySession(
disable_resources=True # Block images/fonts
), lazy=True)