Documentation Index
Fetch the complete documentation index at: https://mintlify.com/galloclaudio/mega-search-links/llms.txt
Use this file to discover all available pages before exploring further.
Batch processing multiple queries
Process multiple search queries efficiently:
from main import URLFetcher
def batch_fetch_urls(queries, base_url, user_agent):
    """
    Run each search query, one after another, through a single URLFetcher.

    Returns a dict mapping every query to the value ``fetch_urls`` returned
    for it. Progress is printed as each query is processed.
    """
    client = URLFetcher(base_url, user_agent)
    collected = {}
    for term in queries:
        print(f"Fetching URLs for: {term}")
        found = collected[term] = client.fetch_urls(term)
        print(f" Found {len(found)} URLs")
    return collected
# Usage: fetch four queries sequentially with one shared fetcher
queries = ["python tutorials", "machine learning", "data science", "web development"]
all_results = batch_fetch_urls(
    queries,
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="BatchProcessor/1.0",
)
# Report a per-query count followed by a short preview
for query in all_results:
    urls = all_results[query]
    print(f"\n{query}: {len(urls)} URLs")
    for url in urls[:3]:  # preview only the first 3
        print(f" - {url}")
Concurrent requests with threading
Speed up batch processing with concurrent requests:
from main import URLFetcher
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def fetch_single_query(query, base_url, user_agent):
    """
    Fetch the URLs for one query and return them as a ``(query, urls)`` pair.

    A fresh URLFetcher is built on every call, so no fetcher instance is
    shared between concurrent callers.
    """
    return query, URLFetcher(base_url, user_agent).fetch_urls(query)
def concurrent_batch_fetch(queries, base_url, user_agent, max_workers=5):
    """
    Fan the queries out over a thread pool and gather the results.

    Each query is handled by ``fetch_single_query`` on a pool of at most
    ``max_workers`` threads. Returns a dict mapping query -> urls; entries
    are inserted in completion order, not submission order.
    """
    collected = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Schedule one task per query, remembering which future is which.
        pending = {}
        for term in queries:
            task = pool.submit(fetch_single_query, term, base_url, user_agent)
            pending[task] = term
        # Consume results as soon as each worker finishes.
        for done in as_completed(pending):
            finished_query, found = done.result()
            collected[finished_query] = found
            print(f"Completed: {finished_query} ({len(found)} URLs)")
    return collected
# Usage: time a concurrent run over five queries with three workers
queries = ["movies", "games", "courses", "ebooks", "music"]
start_time = time.time()
results = concurrent_batch_fetch(
    queries,
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="ConcurrentFetcher/1.0",
    max_workers=3,
)
end_time = time.time()
print(f"\nProcessed {len(queries)} queries in {end_time - start_time:.2f}s")
print(f"Total URLs found: {sum(map(len, results.values()))}")
Be respectful when using concurrent requests. Limit the number of workers and consider implementing rate limiting to avoid overwhelming the API server.
Async requests with asyncio
For high-performance async I/O operations:
import asyncio
import aiohttp
class AsyncURLFetcher:
    """Asynchronous counterpart of URLFetcher built on an aiohttp session."""

    def __init__(self, base_url, user_agent):
        """
        Args:
            base_url: Endpoint that answers ``?q=<search query>`` requests.
            user_agent: Value sent in the ``User-Agent`` request header.
        """
        self.base_url = base_url
        self.headers = {'User-Agent': user_agent}

    async def fetch_urls(self, session, search_query):
        """
        Asynchronously fetch URLs for a search query.

        Args:
            session: An open client session exposing ``get(url, params=...,
                headers=...)`` as an async context manager.
            search_query: Raw, unencoded search text.

        Returns:
            A ``(search_query, urls)`` tuple. On any failure the error is
            printed and ``urls`` is an empty list, so one bad query does not
            abort a whole batch.
        """
        try:
            # Hand the query over as a parameter instead of pasting it into
            # the URL: spaces, '&', '=' and '#' in the search text are then
            # percent-encoded rather than corrupting the query string.
            async with session.get(
                self.base_url,
                params={'q': search_query},
                headers=self.headers,
            ) as response:
                response.raise_for_status()
                data = await response.json()
                urls = data.get('urls', [])
                return search_query, urls
        except Exception as e:
            # Deliberately broad: this is a best-effort batch helper.
            print(f"Error fetching {search_query}: {e}")
            return search_query, []

    async def batch_fetch(self, queries):
        """
        Fetch URLs for multiple queries concurrently over one shared session.

        Returns:
            A dict mapping each query to its list of URLs (empty for queries
            that failed).
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_urls(session, query) for query in queries]
            results = await asyncio.gather(*tasks)
        return dict(results)
# Usage
async def main():
    """Fetch five queries concurrently and print how many URLs each returned."""
    queries = ["python", "javascript", "golang", "rust", "java"]
    fetcher = AsyncURLFetcher(
        base_url="https://meawfy.com/internal/api/results.json",
        user_agent="AsyncFetcher/1.0",
    )
    results = await fetcher.batch_fetch(queries)
    for query in results:
        print(f"{query}: {len(results[query])} URLs")


# Start an event loop and drive main() to completion
asyncio.run(main())
Extending URLFetcher for custom needs
- Add caching
- Add rate limiting
- Add result filtering
Implement caching to avoid redundant API calls:
from main import URLFetcher
import time
class CachedURLFetcher(URLFetcher):
    """URLFetcher that remembers results per query for a limited time."""

    def __init__(self, base_url, user_agent, cache_ttl=300):
        super().__init__(base_url, user_agent)
        self.cache_ttl = cache_ttl  # seconds an entry stays valid
        self.cache = {}  # search_query -> (stored_at, urls)

    def fetch_urls(self, search_query):
        """Return cached URLs while they are fresh; otherwise fetch and store."""
        entry = self.cache.get(search_query)
        if entry is not None:
            stored_at, stored_urls = entry
            if time.time() - stored_at < self.cache_ttl:
                print(f"Cache hit for: {search_query}")
                return stored_urls
        # Nothing cached, or the entry has expired: go to the API.
        print(f"Cache miss for: {search_query}")
        fresh_urls = super().fetch_urls(search_query)
        self.cache[search_query] = (time.time(), fresh_urls)
        return fresh_urls

    def clear_cache(self):
        """Drop every cached entry."""
        self.cache.clear()
# Usage: keep results for 10 minutes
fetcher = CachedURLFetcher(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="CachedFetcher/1.0",
    cache_ttl=600,
)
# The first lookup goes to the API ...
urls1 = fetcher.fetch_urls("python")
# ... the repeat is answered from the cache
urls2 = fetcher.fetch_urls("python")
Implement rate limiting to control request frequency:
from main import URLFetcher
import time
class RateLimitedURLFetcher(URLFetcher):
    """URLFetcher that allows at most ``max_requests`` per ``time_window``."""

    def __init__(self, base_url, user_agent, max_requests=10, time_window=60):
        """
        Args:
            base_url: Passed through to URLFetcher.
            user_agent: Passed through to URLFetcher.
            max_requests: Requests allowed inside one window; must be >= 1.
            time_window: Length of the sliding window, in seconds.

        Raises:
            ValueError: If ``max_requests`` is less than 1.
        """
        # Fail fast: with a non-positive limit the first fetch would index
        # an empty history list and die with an unrelated IndexError.
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        super().__init__(base_url, user_agent)
        self.max_requests = max_requests
        self.time_window = time_window  # in seconds
        self.request_times = []  # timestamps of requests, oldest first

    def fetch_urls(self, search_query):
        """Fetch URLs, sleeping first if the request budget is exhausted."""
        # Forget requests that have already left the sliding window.
        current_time = time.time()
        self.request_times = [
            t for t in self.request_times
            if current_time - t < self.time_window
        ]
        # Budget used up: wait until the oldest request falls out of the window.
        if len(self.request_times) >= self.max_requests:
            wait_time = self.time_window - (current_time - self.request_times[0])
            print(f"Rate limit reached. Waiting {wait_time:.1f}s...")
            time.sleep(wait_time)
            self.request_times.pop(0)
        # Record this request at the moment it is actually sent.
        self.request_times.append(time.time())
        return super().fetch_urls(search_query)
# Usage: allow at most 5 requests in any 30-second window
fetcher = RateLimitedURLFetcher(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="RateLimitedFetcher/1.0",
    max_requests=5,
    time_window=30,
)
Filter and process results automatically:
from main import URLFetcher
import re
class FilteredURLFetcher(URLFetcher):
    """URLFetcher that keeps only URLs matching a regex, with a minimum count."""

    def __init__(self, base_url, user_agent, url_pattern=None, min_urls=0):
        super().__init__(base_url, user_agent)
        self.min_urls = min_urls
        # Compile once up front; None means "no pattern filtering".
        if url_pattern:
            self.url_pattern = re.compile(url_pattern)
        else:
            self.url_pattern = None

    def fetch_urls(self, search_query):
        """Fetch, filter by pattern, and return [] when too few URLs remain."""
        matches = super().fetch_urls(search_query)
        if self.url_pattern:
            matches = list(filter(self.url_pattern.search, matches))
        # An under-threshold result set is reported as no results at all.
        return matches if len(matches) >= self.min_urls else []
# Usage: keep only "mega.nz" URLs, and only if at least 3 of them match
fetcher = FilteredURLFetcher(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="FilteredFetcher/1.0",
    url_pattern=r"mega\.nz",
    min_urls=3,
)
urls = fetcher.fetch_urls("movies")
print(f"Found {len(urls)} matching URLs")
Real-world use cases
Content aggregation pipeline
Build a content aggregation system:
from main import URLFetcher
import json
from datetime import datetime
class ContentAggregator:
    """Collects URLs for a list of categories into one JSON-serialisable dict."""

    def __init__(self, base_url, user_agent):
        self.fetcher = URLFetcher(base_url, user_agent)

    def aggregate_content(self, categories):
        """
        Fetch every category and bundle the results with a timestamp.

        Returns a dict with an ISO-format 'timestamp' and a 'categories'
        mapping of category -> {'count': ..., 'urls': ...}.
        """
        per_category = {}
        snapshot = {
            'timestamp': datetime.now().isoformat(),
            'categories': per_category,
        }
        for name in categories:
            print(f"Aggregating: {name}")
            found = self.fetcher.fetch_urls(name)
            per_category[name] = {'count': len(found), 'urls': found}
        return snapshot

    def save_to_file(self, data, filename):
        """Write ``data`` to ``filename`` as indented JSON and report the path."""
        with open(filename, 'w') as out:
            json.dump(data, out, indent=2)
        print(f"Saved to {filename}")
# Usage: aggregate four categories and persist the snapshot
aggregator = ContentAggregator(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="ContentAggregator/1.0",
)
categories = ["movies", "documentaries", "courses", "ebooks"]
data = aggregator.aggregate_content(categories)
aggregator.save_to_file(data, 'content_aggregate.json')
# One summary line per category
for category in data['categories']:
    info = data['categories'][category]
    print(f"{category}: {info['count']} URLs")
URL monitoring system
Monitor search results for changes:
from main import URLFetcher
import time
import json
from datetime import datetime
class URLMonitor:
    """Polls search queries and reports which URLs appeared or disappeared."""

    def __init__(self, base_url, user_agent):
        self.fetcher = URLFetcher(base_url, user_agent)
        self.previous_results = {}  # query -> set of URLs seen on the last check

    def check_for_changes(self, query):
        """
        Check if results for a query have changed since the previous check.

        Returns:
            A dict with 'new', 'removed' and 'unchanged' lists. On the first
            check of a query every URL is reported as 'new'. List order is
            unspecified because the comparison is done on sets.
        """
        current_urls = set(self.fetcher.fetch_urls(query))
        if query not in self.previous_results:
            self.previous_results[query] = current_urls
            return {
                'new': list(current_urls),
                'removed': [],
                'unchanged': []
            }
        previous_urls = self.previous_results[query]
        changes = {
            'new': list(current_urls - previous_urls),
            'removed': list(previous_urls - current_urls),
            'unchanged': list(current_urls & previous_urls)
        }
        self.previous_results[query] = current_urls
        return changes

    def monitor(self, queries, interval=300, duration=3600):
        """
        Monitor queries for changes over time.

        Args:
            queries: List of search queries to monitor
            interval: Check interval in seconds (default: 5 minutes)
            duration: Total monitoring duration in seconds (default: 1 hour)
        """
        # A monotonic clock keeps the deadline immune to wall-clock jumps.
        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Checking for changes...")
            for query in queries:
                changes = self.check_for_changes(query)
                if changes['new']:
                    print(f" {query}: {len(changes['new'])} new URLs")
                    for url in changes['new'][:3]:  # Show first 3
                        print(f" + {url}")
                if changes['removed']:
                    print(f" {query}: {len(changes['removed'])} removed URLs")
            # Never sleep past the deadline: a full `interval` after the last
            # pass could otherwise overrun `duration` by almost one interval.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(interval, remaining))
# Usage
monitor = URLMonitor(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="URLMonitor/1.0",
)
# Watch two queries for one hour, polling every ten minutes
monitor.monitor(["new movies", "recent courses"], interval=600, duration=3600)
Data export to CSV
Export results to CSV format for analysis:
from main import URLFetcher
import csv
from datetime import datetime
from urllib.parse import urlparse
class URLExporter:
    """Writes the URLs found for a set of queries to a CSV file."""

    def __init__(self, base_url, user_agent):
        self.fetcher = URLFetcher(base_url, user_agent)

    def export_to_csv(self, queries, filename):
        """
        Write one CSV row per URL: query, URL, its network location, and the
        time the query was fetched. A header row is always written first.
        """
        header = ['Query', 'URL', 'Domain', 'Timestamp']
        with open(filename, 'w', newline='', encoding='utf-8') as sink:
            out = csv.writer(sink)
            out.writerow(header)
            for term in queries:
                found = self.fetcher.fetch_urls(term)
                # One timestamp per query, taken right after its fetch.
                fetched_at = datetime.now().isoformat()
                out.writerows(
                    [term, link, urlparse(link).netloc, fetched_at]
                    for link in found
                )
        print(f"Exported to {filename}")
# Usage: dump the results of two queries into search_results.csv
exporter = URLExporter(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="URLExporter/1.0",
)
queries = ["python tutorials", "machine learning courses"]
exporter.export_to_csv(queries, filename='search_results.csv')
Integration with pandas for data analysis
Analyze results using pandas:
from main import URLFetcher
import pandas as pd
from urllib.parse import urlparse
from collections import Counter
class URLAnalyzer:
    """Loads fetched URLs into a pandas DataFrame and summarises them."""

    def __init__(self, base_url, user_agent):
        self.fetcher = URLFetcher(base_url, user_agent)

    def analyze_queries(self, queries):
        """
        Fetch and analyze URLs from multiple queries.

        Returns:
            A DataFrame with one row per URL and the columns 'query', 'url',
            'domain' and 'scheme'. The columns are present even when no URL
            was found, so downstream column access never fails.
        """
        data = []
        for query in queries:
            urls = self.fetcher.fetch_urls(query)
            for url in urls:
                parsed = urlparse(url)
                data.append({
                    'query': query,
                    'url': url,
                    'domain': parsed.netloc,
                    'scheme': parsed.scheme
                })
        # Explicit columns: DataFrame([]) alone would have no columns at all.
        df = pd.DataFrame(data, columns=['query', 'url', 'domain', 'scheme'])
        return df

    def get_statistics(self, df):
        """
        Generate statistics from the DataFrame.

        Returns:
            A dict with 'total_urls', 'unique_urls', 'queries', 'top_domains'
            (up to 10 most frequent domains -> count) and 'urls_per_query'.
            An empty frame yields zero counts and empty mappings.
        """
        # An empty frame may lack the expected columns entirely (for example
        # one built from an empty list), so answer without touching them.
        if df.empty:
            return {
                'total_urls': 0,
                'unique_urls': 0,
                'queries': 0,
                'top_domains': {},
                'urls_per_query': {}
            }
        stats = {
            'total_urls': len(df),
            'unique_urls': df['url'].nunique(),
            'queries': df['query'].nunique(),
            'top_domains': df['domain'].value_counts().head(10).to_dict(),
            'urls_per_query': df.groupby('query').size().to_dict()
        }
        return stats
# Usage: analyse three queries and print the headline numbers
analyzer = URLAnalyzer(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="URLAnalyzer/1.0",
)
queries = ["movies", "games", "courses"]
df = analyzer.analyze_queries(queries)
stats = analyzer.get_statistics(df)
print(f"Total URLs: {stats['total_urls']}")
print(f"Unique URLs: {stats['unique_urls']}")
print("\nTop domains:")
for domain in stats['top_domains']:
    count = stats['top_domains'][domain]
    print(f" {domain}: {count}")
Performance optimization tips
Best practices for optimal performance:
- Use connection pooling: Reuse the same `URLFetcher` instance for multiple requests
- Implement caching: Cache results for frequently queried terms
- Rate limiting: Respect API limits with proper rate limiting
- Concurrent requests: Use threading or async for batch operations
- Timeout configuration: Set appropriate timeouts to prevent hanging
- Retry logic: Implement exponential backoff for transient failures
- Compress responses: Request gzip compression if API supports it
- Monitor performance: Track response times and success rates
Session persistence with pickle
Save and restore fetcher state:
from main import URLFetcher
import pickle
# Build the fetcher whose configuration should be persisted
fetcher = URLFetcher(
    "https://meawfy.com/internal/api/results.json",
    "MyApp/1.0",
)
# Serialise it to disk
with open('fetcher_config.pkl', 'wb') as f:
    f.write(pickle.dumps(fetcher))
# Later, load it back.
# NOTE: unpickling can execute arbitrary code - only load pickle files that
# this application wrote itself, never ones from an untrusted source.
with open('fetcher_config.pkl', 'rb') as f:
    restored_fetcher = pickle.loads(f.read())
urls = restored_fetcher.fetch_urls("test query")
These advanced patterns demonstrate the flexibility of the `URLFetcher` class. You can extend it to fit your specific needs while maintaining the simple, clean interface of the base implementation.