Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/galloclaudio/mega-search-links/llms.txt

Use this file to discover all available pages before exploring further.

Batch processing multiple queries

Process multiple search queries efficiently:
from main import URLFetcher

def batch_fetch_urls(queries, base_url, user_agent):
    """
    Run every search query through one shared URLFetcher, sequentially.

    Args:
        queries: Iterable of search strings to look up.
        base_url: API endpoint handed to URLFetcher.
        user_agent: User-Agent string handed to URLFetcher.

    Returns:
        Dict mapping each query to whatever fetch_urls() returned for it.
    """
    client = URLFetcher(base_url, user_agent)
    collected = {}

    for term in queries:
        print(f"Fetching URLs for: {term}")
        # Record the result first, then report how many URLs came back.
        found = collected[term] = client.fetch_urls(term)
        print(f"  Found {len(found)} URLs")

    return collected

# Usage
queries = [
    "python tutorials",
    "machine learning",
    "data science",
    "web development",
]

all_results = batch_fetch_urls(
    queries,
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="BatchProcessor/1.0",
)

# Summarize each query and preview at most three of its URLs
for query, urls in all_results.items():
    print(f"\n{query}: {len(urls)} URLs")
    for url in urls[:3]:
        print(f"  - {url}")

Concurrent requests with threading

Speed up batch processing with concurrent requests:
from main import URLFetcher
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_single_query(query, base_url, user_agent):
    """
    Look up one query with a fetcher created just for this call.

    Each call builds its own URLFetcher, so no fetcher object is shared
    between worker threads.

    Returns:
        Tuple of (query, urls) so the caller can match results to queries.
    """
    return query, URLFetcher(base_url, user_agent).fetch_urls(query)

def concurrent_batch_fetch(queries, base_url, user_agent, max_workers=5):
    """
    Look up many queries in parallel on a thread pool.

    Args:
        queries: Iterable of search strings.
        base_url: API endpoint passed through to each worker.
        user_agent: User-Agent string passed through to each worker.
        max_workers: Upper bound on simultaneously running threads.

    Returns:
        Dict mapping each query to its URLs, filled in completion order.

    Raises:
        Whatever a worker raised; the first failing future re-raises here.
    """
    collected = {}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Queue one job per query up front
        pending = [
            pool.submit(fetch_single_query, term, base_url, user_agent)
            for term in queries
        ]

        # Harvest results as soon as each job finishes
        for done in as_completed(pending):
            term, found = done.result()
            collected[term] = found
            print(f"Completed: {term} ({len(found)} URLs)")

    return collected

# Usage
queries = ["movies", "games", "courses", "ebooks", "music"]

# Time the whole batch so the speed-up from threading is visible
start_time = time.time()
results = concurrent_batch_fetch(
    queries,
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="ConcurrentFetcher/1.0",
    max_workers=3,
)
end_time = time.time()

print(f"\nProcessed {len(queries)} queries in {end_time - start_time:.2f}s")
print(f"Total URLs found: {sum(map(len, results.values()))}")
Be respectful when using concurrent requests. Limit the number of workers and consider implementing rate limiting to avoid overwhelming the API server.

Async requests with asyncio

For high-throughput, I/O-bound workloads, use asyncio together with aiohttp (a third-party package that must be installed separately):
import asyncio
import aiohttp

class AsyncURLFetcher:
    """Asynchronous counterpart of URLFetcher built on an aiohttp session."""

    def __init__(self, base_url, user_agent):
        """
        Args:
            base_url: API endpoint that receives the ``q`` query parameter.
            user_agent: Value sent in the User-Agent request header.
        """
        self.base_url = base_url
        self.headers = {'User-Agent': user_agent}

    async def fetch_urls(self, session, search_query):
        """
        Asynchronously fetch URLs for a search query

        Args:
            session: An open aiohttp.ClientSession (or compatible object).
            search_query: Raw, unencoded search text.

        Returns:
            Tuple of (search_query, urls). On any failure the error is
            printed and urls is an empty list, so one bad query does not
            abort a whole batch.
        """
        try:
            # Pass the query through `params` so the HTTP client
            # percent-encodes it. Interpolating it into the URL by hand
            # breaks on characters such as '&', '#', '+' or '=' and on a
            # base_url that already carries a query string.
            async with session.get(
                self.base_url,
                params={'q': search_query},
                headers=self.headers,
            ) as response:
                response.raise_for_status()
                data = await response.json()
                urls = data.get('urls', [])
                return search_query, urls
        except Exception as e:
            # Deliberate best effort: report the error and return no URLs.
            print(f"Error fetching {search_query}: {e}")
            return search_query, []

    async def batch_fetch(self, queries):
        """
        Fetch URLs for multiple queries asynchronously

        All requests share one ClientSession and are awaited together.

        Returns:
            Dict mapping each query to its list of URLs.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_urls(session, query) for query in queries]
            results = await asyncio.gather(*tasks)
            return dict(results)

# Usage
async def main():
    """Fetch a handful of queries concurrently and print a count for each."""
    fetcher = AsyncURLFetcher(
        base_url="https://meawfy.com/internal/api/results.json",
        user_agent="AsyncFetcher/1.0",
    )

    languages = ["python", "javascript", "golang", "rust", "java"]
    per_query = await fetcher.batch_fetch(languages)

    for name, found in per_query.items():
        print(f"{name}: {len(found)} URLs")

# Start an event loop and drive main() to completion
asyncio.run(main())

Extending URLFetcher for custom needs

Implement caching to avoid redundant API calls:
from main import URLFetcher
import time

class CachedURLFetcher(URLFetcher):
    """URLFetcher that remembers results per query for a limited time."""

    def __init__(self, base_url, user_agent, cache_ttl=300):
        """
        Args:
            base_url: Forwarded to URLFetcher.
            user_agent: Forwarded to URLFetcher.
            cache_ttl: Seconds a stored result stays valid.
        """
        super().__init__(base_url, user_agent)
        # query -> (time.time() when stored, urls)
        self.cache = {}
        self.cache_ttl = cache_ttl

    def fetch_urls(self, search_query):
        """Return cached URLs while they are fresh, otherwise fetch and store them."""
        try:
            stored_at, stored_urls = self.cache[search_query]
        except KeyError:
            pass  # never fetched before; fall through to the API call
        else:
            age = time.time() - stored_at
            if age < self.cache_ttl:
                print(f"Cache hit for: {search_query}")
                return stored_urls

        # Nothing usable in the cache: go to the API
        print(f"Cache miss for: {search_query}")
        fresh = super().fetch_urls(search_query)

        # Stamp the result with the time it was stored
        self.cache[search_query] = (time.time(), fresh)
        return fresh

    def clear_cache(self):
        """Forget every stored result."""
        self.cache.clear()

# Usage
fetcher = CachedURLFetcher(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="CachedFetcher/1.0",
    cache_ttl=600,  # ten minutes, expressed in seconds
)

# Initial lookup: nothing is cached yet, so the API is queried
urls1 = fetcher.fetch_urls("python")

# Repeat lookup inside the TTL window: answered from the cache
urls2 = fetcher.fetch_urls("python")

Real-world use cases

Content aggregation pipeline

Build a content aggregation system:
from main import URLFetcher
import json
from datetime import datetime

class ContentAggregator:
    """Collect URLs for several categories into one JSON-serializable report."""

    def __init__(self, base_url, user_agent):
        self.fetcher = URLFetcher(base_url, user_agent)

    def aggregate_content(self, categories):
        """
        Fetch URLs for each category and bundle them with a timestamp.

        Returns:
            Dict with an ISO-format 'timestamp' and a 'categories' mapping of
            category name to {'count': ..., 'urls': ...}.
        """
        report = {
            'timestamp': datetime.now().isoformat(),
            'categories': {}
        }
        per_category = report['categories']

        for name in categories:
            print(f"Aggregating: {name}")
            found = self.fetcher.fetch_urls(name)
            per_category[name] = {'count': len(found), 'urls': found}

        return report

    def save_to_file(self, data, filename):
        """Write the report to *filename* as indented JSON."""
        with open(filename, 'w') as out:
            json.dump(data, out, indent=2)
        print(f"Saved to {filename}")

# Usage
aggregator = ContentAggregator(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="ContentAggregator/1.0",
)

categories = ["movies", "documentaries", "courses", "ebooks"]
data = aggregator.aggregate_content(categories)
aggregator.save_to_file(data, filename='content_aggregate.json')

# One summary line per category
for category, info in data['categories'].items():
    print(f"{category}: {info['count']} URLs")

URL monitoring system

Monitor search results for changes:
from main import URLFetcher
import time
import json
from datetime import datetime

class URLMonitor:
    """Poll search queries and report which result URLs appeared or vanished."""

    def __init__(self, base_url, user_agent):
        self.fetcher = URLFetcher(base_url, user_agent)
        # query -> set of URLs seen on the most recent check
        self.previous_results = {}

    def check_for_changes(self, query):
        """
        Check if results for a query have changed

        The first check for a query reports every URL as 'new'.

        Returns:
            Dict with 'new', 'removed' and 'unchanged' lists. Each list is
            sorted so output (and the "first 3" preview in monitor()) is
            deterministic rather than dependent on set iteration order.
        """
        current_urls = set(self.fetcher.fetch_urls(query))
        # An unseen query behaves like one whose previous result was empty.
        previous_urls = self.previous_results.get(query, set())
        self.previous_results[query] = current_urls

        return {
            'new': sorted(current_urls - previous_urls),
            'removed': sorted(previous_urls - current_urls),
            'unchanged': sorted(current_urls & previous_urls),
        }

    def monitor(self, queries, interval=300, duration=3600):
        """
        Monitor queries for changes over time

        Args:
            queries: List of search queries to monitor
            interval: Check interval in seconds (default: 5 minutes)
            duration: Total monitoring duration in seconds (default: 1 hour)
        """
        # Use the monotonic clock so wall-clock adjustments (NTP, DST)
        # cannot shorten or extend the monitoring window.
        deadline = time.monotonic() + duration

        while time.monotonic() < deadline:
            print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Checking for changes...")

            for query in queries:
                changes = self.check_for_changes(query)

                if changes['new']:
                    print(f"  {query}: {len(changes['new'])} new URLs")
                    for url in changes['new'][:3]:  # Show first 3
                        print(f"    + {url}")

                if changes['removed']:
                    print(f"  {query}: {len(changes['removed'])} removed URLs")

            # Never sleep past the deadline: sleeping a full interval after
            # the final check would overrun `duration` by up to `interval`.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(interval, remaining))

# Usage
monitor = URLMonitor(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="URLMonitor/1.0",
)

# Poll every 10 minutes (600 s) for a total of one hour (3600 s)
monitor.monitor(["new movies", "recent courses"], interval=600, duration=3600)

Data export to CSV

Export results to CSV format for analysis:
from main import URLFetcher
import csv
from datetime import datetime
from urllib.parse import urlparse

class URLExporter:
    """Write search results to a CSV file, one row per URL."""

    def __init__(self, base_url, user_agent):
        self.fetcher = URLFetcher(base_url, user_agent)

    def export_to_csv(self, queries, filename):
        """
        Fetch each query and dump its URLs to *filename*.

        Columns are Query, URL, Domain (the URL's netloc) and Timestamp
        (ISO-format time the query's results were fetched).
        """
        with open(filename, 'w', newline='', encoding='utf-8') as out:
            sheet = csv.writer(out)
            sheet.writerow(['Query', 'URL', 'Domain', 'Timestamp'])

            for term in queries:
                found = self.fetcher.fetch_urls(term)
                # All rows of one query share a single fetch timestamp
                fetched_at = datetime.now().isoformat()
                sheet.writerows(
                    [term, link, urlparse(link).netloc, fetched_at]
                    for link in found
                )

        print(f"Exported to {filename}")

# Usage
exporter = URLExporter(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="URLExporter/1.0",
)

queries = ["python tutorials", "machine learning courses"]
exporter.export_to_csv(queries, filename='search_results.csv')

Integration with pandas for data analysis

Analyze results using pandas:
from main import URLFetcher
import pandas as pd
from urllib.parse import urlparse
from collections import Counter

class URLAnalyzer:
    """Load search results into a pandas DataFrame and summarize them."""

    # Column layout of the frames produced by analyze_queries(). Declaring
    # it explicitly keeps the columns present even when no URL was found.
    _COLUMNS = ['query', 'url', 'domain', 'scheme']

    def __init__(self, base_url, user_agent):
        self.fetcher = URLFetcher(base_url, user_agent)

    def analyze_queries(self, queries):
        """
        Fetch and analyze URLs from multiple queries

        Returns:
            DataFrame with one row per (query, url) and the columns
            'query', 'url', 'domain' and 'scheme'. When nothing is found
            the frame is empty but still has those columns, so
            get_statistics() works on it instead of raising KeyError.
        """
        data = []

        for query in queries:
            for url in self.fetcher.fetch_urls(query):
                parsed = urlparse(url)
                data.append({
                    'query': query,
                    'url': url,
                    'domain': parsed.netloc,
                    'scheme': parsed.scheme
                })

        return pd.DataFrame(data, columns=self._COLUMNS)

    def get_statistics(self, df):
        """
        Generate statistics from the DataFrame

        Args:
            df: Frame shaped like the output of analyze_queries().

        Returns:
            Dict with the total and unique URL counts, the number of
            distinct queries, the ten most frequent domains, and the
            number of URLs per query.
        """
        stats = {
            'total_urls': len(df),
            'unique_urls': df['url'].nunique(),
            'queries': df['query'].nunique(),
            'top_domains': df['domain'].value_counts().head(10).to_dict(),
            'urls_per_query': df.groupby('query').size().to_dict()
        }
        return stats

# Usage
analyzer = URLAnalyzer(
    base_url="https://meawfy.com/internal/api/results.json",
    user_agent="URLAnalyzer/1.0",
)

queries = ["movies", "games", "courses"]
df = analyzer.analyze_queries(queries)
stats = analyzer.get_statistics(df)

# Headline numbers first, then the per-domain breakdown
print(f"Total URLs: {stats['total_urls']}")
print(f"Unique URLs: {stats['unique_urls']}")
print("\nTop domains:")
for domain, count in stats['top_domains'].items():
    print(f"  {domain}: {count}")

Performance optimization tips

Best practices for optimal performance:
  1. Use connection pooling: Reuse the same URLFetcher instance for multiple requests
  2. Implement caching: Cache results for frequently queried terms
  3. Rate limiting: Respect API limits with proper rate limiting
  4. Concurrent requests: Use threading or async for batch operations
  5. Timeout configuration: Set appropriate timeouts to prevent hanging
  6. Retry logic: Implement exponential backoff for transient failures
  7. Compress responses: Request gzip compression if API supports it
  8. Monitor performance: Track response times and success rates

Session persistence with pickle

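Save and restore fetcher state. Only unpickle files that you created yourself: loading a pickle from an untrusted source can execute arbitrary code.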
from main import URLFetcher
import pickle

# Build the fetcher whose configuration should survive a restart
fetcher = URLFetcher(
    "https://meawfy.com/internal/api/results.json",
    "MyApp/1.0"
)

# Serialize it and persist the bytes
with open('fetcher_config.pkl', 'wb') as f:
    f.write(pickle.dumps(fetcher))

# In a later run, read the bytes back and rebuild the object.
# NOTE: unpickling can execute arbitrary code; only load files you wrote yourself.
with open('fetcher_config.pkl', 'rb') as f:
    restored_fetcher = pickle.loads(f.read())

urls = restored_fetcher.fetch_urls("test query")
These advanced patterns demonstrate the flexibility of the URLFetcher class. You can extend it to fit your specific needs while maintaining the simple, clean interface of the base implementation.

Build docs developers (and LLMs) love