The EDL Pipeline includes robust error handling to ensure data integrity and graceful degradation when individual scripts fail.

Error Handling Philosophy

The pipeline follows a fail-forward approach:
  1. Critical failures (Phase 1) halt the pipeline
  2. Non-critical failures (Phase 2+) log errors but continue execution
  3. Enrichment failures (Phase 4) skip problematic stocks but complete the run
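
The policy above can be sketched as a small orchestration loop. This is an illustration only, not the code in run_full_pipeline.py: `run_phases`, the `CRITICAL` set, and the injected `runner` callable are hypothetical names introduced here.

```python
# Hypothetical sketch of a fail-forward loop; all names are illustrative.
# The two Phase 1 scripts this guide lists as critical:
CRITICAL = {"fetch_dhan_data.py", "bulk_market_analyzer.py"}


def run_phases(scripts, runner):
    """Run scripts in order; halt on a critical failure, otherwise keep going.

    `runner` is any callable that takes a script name and returns True on success.
    Returns (results, halted), where results maps script name -> bool.
    """
    results = {}
    for name in scripts:
        ok = bool(runner(name))
        results[name] = ok
        if not ok and name in CRITICAL:
            return results, True   # critical failure: stop immediately
    return results, False          # non-critical failures are recorded, not fatal
```

Recording the real outcome of every script, even when the run continues, keeps the final failure count meaningful.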

Critical vs Non-Critical Failures

Critical Failures (Pipeline Stops)

Phase 1: Core Data
  • fetch_dhan_data.py failure → No master_isin_map.json → STOP
  • bulk_market_analyzer.py failure → No base JSON → STOP
# From run_full_pipeline.py (lines 207-212)
results["fetch_dhan_data.py"] = run_script("fetch_dhan_data.py", "Phase 1")

if not results["fetch_dhan_data.py"]:
    print("\n🛑 CRITICAL: fetch_dhan_data.py failed. Cannot continue.")
    print("   This script produces master_isin_map.json which ALL other scripts need.")
    return

Non-Critical Failures (Pipeline Continues)

Phase 2: Enrichment
  • Individual enrichment scripts can fail without stopping the pipeline
  • Example: fetch_market_news.py fails → News fields will be empty, but pipeline completes
# From run_full_pipeline.py (lines 123-126)
if result.returncode == 0:
    print(f"  ✅ {script_name} ({elapsed:.1f}s)")
    return True
else:
    print(f"  ❌ {script_name} FAILED ({elapsed:.1f}s)")
    return True  # Continuing on enrichment errors to finish the job

Error Types and Solutions

1. Network Errors

Symptoms: requests.exceptions.ConnectionError, ReadTimeout, HTTPError
Causes:
  • API endpoint temporarily down
  • Network connectivity issues
  • Rate limiting by Dhan/NSE servers
Solutions:
# Example from fetch_fundamental_data.py
import requests
from requests.adapters import HTTPAdapter
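from urllib3.util.retry import Retry  # import urllib3 directly; requests.packages is a legacy alias

# Add retry logic
session = requests.Session()
retry = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],  # 429 (Too Many Requests) covers rate limiting
    allowed_methods=["GET", "POST"],  # POST is not retried on status codes by default (urllib3 >= 1.26)
)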
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)

try:
    response = session.post(url, json=payload, headers=headers, timeout=30)
    response.raise_for_status()
except requests.exceptions.RequestException as e:
    print(f"Network error: {e}")
    # Continue with next stock or retry

2. Timeout Errors

Symptoms: Script hangs or subprocess.TimeoutExpired
Causes:
  • API response delay
  • Large data transfer
  • System resource constraints
Pipeline timeout: 1800 seconds (30 minutes) per script
# From run_full_pipeline.py (lines 112-130)
try:
    result = subprocess.run(
        [sys.executable, script_path],
        cwd=BASE_DIR,
        text=True,
        timeout=1800  # 30 minutes
    )
except subprocess.TimeoutExpired:
    print(f"  ⏰ {script_name} TIMED OUT (>30 min)")
    return False
Fix: Increase timeout for slow scripts
# For fetch_all_ohlcv.py, extend to 3600 seconds (1 hour)
timeout=3600
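
Rather than raising the timeout for every script, one option is a per-script override table. The sketch below assumes hypothetical names (`SCRIPT_TIMEOUTS`, `timeout_for`, `run_with_timeout`) that do not exist in run_full_pipeline.py. Only the 1800-second default and the 3600-second value for fetch_all_ohlcv.py come from this guide.

```python
import subprocess
import sys

DEFAULT_TIMEOUT = 1800                         # 30 minutes, the pipeline default
SCRIPT_TIMEOUTS = {"fetch_all_ohlcv.py": 3600}  # hypothetical override table


def timeout_for(script_name):
    """Return the timeout in seconds for a script, falling back to the default."""
    return SCRIPT_TIMEOUTS.get(script_name, DEFAULT_TIMEOUT)


def run_with_timeout(cmd, timeout):
    """Run a command and report 'ok', 'failed', or 'timeout'."""
    try:
        result = subprocess.run(cmd, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return "timeout"  # subprocess.run kills the child before raising
    return "ok" if result.returncode == 0 else "failed"
```

A call might look like `run_with_timeout([sys.executable, "fetch_all_ohlcv.py"], timeout_for("fetch_all_ohlcv.py"))`.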

3. Data Quality Errors

Symptoms: Missing fields, None values, type mismatches
Example:
# From bulk_market_analyzer.py (lines 5-9)
def get_float(value_str):
    try:
        return float(value_str)
    except (ValueError, TypeError):
        return 0.0  # Safe fallback
Best Practices:
  • Use defensive .get() instead of direct key access
  • Provide sensible defaults (0 for numbers, "" for strings, [] for arrays)
  • Validate critical fields before processing
# Good: Safe field access
pe = stock.get('P/E') or 0  # also covers keys that are present but set to None
if pe > 0:
    print(f"P/E: {pe}")  # process P/E here

# Bad: Direct access (raises KeyError if the key is missing)
pe = stock['P/E']  # May crash
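
The "validate critical fields" practice can be sketched with two small helpers. Both function names are hypothetical, and the default required fields (`Symbol`, `ISIN`) are an assumption about the record layout rather than something the pipeline is known to enforce.

```python
def missing_fields(stock, required=("Symbol", "ISIN")):
    """Return the required keys that are absent or None in a stock record."""
    return [key for key in required if stock.get(key) is None]


def safe_number(stock, key, default=0.0):
    """Read a numeric field, tolerating missing keys, None, and non-numeric strings."""
    try:
        return float(stock.get(key))
    except (TypeError, ValueError):
        return default
```

A record with a non-empty `missing_fields` result can be skipped or logged before any calculation touches it.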

4. File I/O Errors

Symptoms: FileNotFoundError, PermissionError, OSError
Common Causes:
  • Missing input files (e.g., master_isin_map.json not created)
  • Disk space exhausted
  • Permission issues on output directory
Prevention:
import os
import shutil

# Check input file exists before processing
if not os.path.exists(INPUT_FILE):
    print(f"Error: {INPUT_FILE} not found. Run fetch_dhan_data.py first.")
    return

# Create output directory if missing (exist_ok avoids a race with other processes)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Check disk space before writing large files (shutil.disk_usage also works on Windows)
free_space_mb = shutil.disk_usage(BASE_DIR).free / (1024 * 1024)
if free_space_mb < 500:  # Less than 500 MB free
    print(f"Warning: Low disk space ({free_space_mb:.1f} MB)")

5. JSON Parsing Errors

Symptoms: json.decoder.JSONDecodeError
Causes:
  • Malformed API response
  • Incomplete file write (crashed mid-write)
  • Encoding issues
Handling:
import json

try:
    with open('data.json', 'r') as f:
        data = json.load(f)
except json.JSONDecodeError as e:
    print(f"JSON parsing error: {e}")
    print(f"Line {e.lineno}, Column {e.colno}")
    # Option 1: Skip file
    data = []
    # Option 2: Attempt manual repair
    # Option 3: Re-fetch data
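
Incomplete writes can often be avoided at the source by writing to a temporary file and renaming it into place. The helper below is a minimal sketch of that pattern. `write_json_atomic` is a hypothetical name, and the pipeline scripts are not known to write their output this way.

```python
import json
import os
import tempfile


def write_json_atomic(path, data):
    """Write JSON to a temp file in the target directory, then rename it over the target."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())    # push the bytes to disk before the rename
        os.replace(tmp_path, path)  # atomic rename on POSIX filesystems
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)     # do not leave partial temp files behind
        raise
```

If the process crashes mid-write, readers still see the previous complete file instead of a truncated one.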

Multi-threaded Error Handling

ThreadPoolExecutor Patterns

Many scripts use threading for parallel API calls:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

import requests

def fetch_with_retry(item, max_retries=3):
    """Fetch with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            # API call
            response = requests.post(url, json=payload, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                print(f"Failed after {max_retries} attempts: {e}")
                return None
            # Exponential backoff: 1s, 2s, 4s
            time.sleep(2 ** attempt)
    return None

# Initialize counters before collecting results
success_count = 0
error_count = 0

# Execute with thread pool
with ThreadPoolExecutor(max_workers=20) as executor:
    future_to_stock = {
        executor.submit(fetch_with_retry, item): item["Symbol"] 
        for item in stock_list
    }
    
    for future in as_completed(future_to_stock):
        symbol = future_to_stock[future]
        try:
            result = future.result()
            if result:
                success_count += 1
            else:
                error_count += 1
        except Exception as e:
            print(f"Unexpected error for {symbol}: {e}")
            error_count += 1
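
Counting errors says how many stocks failed but not which ones. A variation that collects the failed symbols, so they can be retried in a second pass, might look like the sketch below. `run_batch` and its `fetch` parameter are hypothetical, and `fetch` stands in for any function such as `fetch_with_retry`.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_batch(symbols, fetch, max_workers=20):
    """Run fetch(symbol) in parallel; return (results, failed_symbols)."""
    results, failed = {}, []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_symbol = {executor.submit(fetch, s): s for s in symbols}
        for future in as_completed(future_to_symbol):
            symbol = future_to_symbol[future]
            try:
                data = future.result()
            except Exception:
                data = None  # treat a raised exception like a failed fetch
            if data is None:
                failed.append(symbol)
            else:
                results[symbol] = data
    return results, sorted(failed)
```

The returned `failed` list can be passed back into `run_batch` with a smaller `max_workers` value if rate limiting is suspected.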

Logging and Diagnostics

Enable Detailed Logging

Add logging to scripts for better debugging:
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

# Use in scripts
logger.info(f"Processing {symbol}")
logger.warning(f"Missing field 'P/E' for {symbol}")
logger.error(f"API call failed for {symbol}: {error}")
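
When an exception is caught, `logger.exception` records the full traceback at ERROR level, which usually makes failures easier to diagnose than a plain message. The sketch below uses hypothetical helper names (`make_logger`, `parse_pe`) and writes to an arbitrary stream so that it stays self-contained.

```python
import io
import logging


def make_logger(name, stream):
    """Build a logger that writes 'LEVEL - message' lines to the given stream."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep output out of the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
    logger.handlers = [handler]
    return logger


def parse_pe(raw, symbol, logger):
    """Convert a raw P/E value to float, logging the traceback on failure."""
    try:
        return float(raw)
    except (TypeError, ValueError):
        # logger.exception logs at ERROR level and appends the traceback
        logger.exception("Bad P/E value for %s: %r", symbol, raw)
        return 0.0
```

Passing arguments separately (`"%s"`, `symbol`) instead of an f-string defers string formatting until the record is actually emitted.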

Check Pipeline Logs

# View real-time logs
tail -f pipeline.log

# Search for errors
grep ERROR pipeline.log

# Count failures by type
grep "Network error" pipeline.log | wc -l

Recovery Strategies

Partial Re-runs

If a Phase 2 script fails, you can re-run just that script:
# Re-fetch company filings without re-running full pipeline
python3 fetch_company_filings.py

# Then re-run enrichment phases
python3 advanced_metrics_processor.py
python3 add_corporate_events.py

Checkpoint-based Recovery

Modify scripts to skip already-processed items:
# Example: Skip stocks with existing files
output_path = f"{OUTPUT_DIR}/{symbol}_news.json"
if os.path.exists(output_path) and os.path.getsize(output_path) > 10:
    print(f"Skipping {symbol} (already exists)")
    return "skipped"

# Fetch and save
# ...
Enable checkpoint mode:
# In fetch_company_filings.py (line 12)
FORCE_UPDATE = False  # Skip existing files
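
The skip rule and the force flag can be combined into one decision function. This is a sketch under stated assumptions: `should_fetch` is a hypothetical helper, and the 10-byte threshold mirrors the size check in the example above.

```python
import os


def should_fetch(output_path, force_update=False, min_bytes=10):
    """Return True when the item needs fetching.

    Fetch if a refresh is forced, if the file is missing, or if the
    existing file is too small to contain real data.
    """
    if force_update:
        return True
    try:
        return os.path.getsize(output_path) <= min_bytes
    except OSError:  # file missing or unreadable
        return True
```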

Common Troubleshooting Scenarios

Scenario 1: “master_isin_map.json not found”

Cause: fetch_dhan_data.py failed or did not run
Solution:
python3 fetch_dhan_data.py
# Check output
ls -lh master_isin_map.json

Scenario 2: “Empty fundamental_data.json”

Cause: API endpoint changed or rate limited
Solution:
  1. Check API endpoint in fetch_fundamental_data.py
  2. Test API call manually with curl
  3. Add delays between requests
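
For step 3, a shared throttle is one way to space out requests even when many threads are calling the API. The `Throttle` class below is a hypothetical sketch, not part of fetch_fundamental_data.py. A suitable interval depends on the API's actual limits, which this guide does not state.

```python
import threading
import time


class Throttle:
    """Allow at most one call every `min_interval` seconds, across threads."""

    def __init__(self, min_interval):
        self.min_interval = min_interval
        self._lock = threading.Lock()
        self._next_allowed = 0.0

    def wait(self):
        """Block until this caller's slot arrives; return the seconds slept."""
        with self._lock:
            now = time.monotonic()
            delay = max(0.0, self._next_allowed - now)
            # Reserve the next slot before releasing the lock
            self._next_allowed = max(now, self._next_allowed) + self.min_interval
        if delay:
            time.sleep(delay)
        return delay
```

Each worker would call `throttle.wait()` immediately before its request, with one `Throttle` instance shared by all workers.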

Scenario 3: “Compression failed”

Cause: Disk full or corrupted JSON file
Solution:
# Check disk space
df -h

# Validate JSON syntax
jq . all_stocks_fundamental_analysis.json > /dev/null

# Manual compression
gzip -9 all_stocks_fundamental_analysis.json
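
The same validate-then-compress check can be done from Python using only the standard library. `validate_and_compress` is a hypothetical helper. Unlike `gzip -9`, this sketch keeps the original file and writes the `.gz` copy next to it.

```python
import gzip
import json
import shutil


def validate_and_compress(json_path, level=9):
    """Parse the file to confirm it is valid JSON, then write json_path + '.gz'."""
    with open(json_path, "r", encoding="utf-8") as f:
        json.load(f)  # raises json.JSONDecodeError if the file is malformed
    gz_path = json_path + ".gz"
    with open(json_path, "rb") as src, gzip.open(gz_path, "wb", compresslevel=level) as dst:
        shutil.copyfileobj(src, dst)
    return gz_path
```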

Scenario 4: “OHLCV data missing dates”

Cause: Market holiday or API gap
Solution: OHLCV fetcher auto-fills gaps; verify date range:
import pandas as pd
df = pd.read_csv('ohlcv_data/RELIANCE.csv')
print(df['Date'].min(), df['Date'].max())
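
Beyond the minimum and maximum date, it can help to list unusually long gaps between consecutive rows. The sketch below assumes the dates are ISO strings (`YYYY-MM-DD`). `find_gaps` and the 4-day threshold are illustrative choices made so that ordinary weekends are not reported.

```python
from datetime import date, timedelta


def find_gaps(iso_dates, max_gap_days=4):
    """Return (previous, next) date pairs separated by more than max_gap_days."""
    days = sorted(date.fromisoformat(d) for d in iso_dates)
    return [
        (prev.isoformat(), cur.isoformat())
        for prev, cur in zip(days, days[1:])
        if (cur - prev) > timedelta(days=max_gap_days)
    ]
```

With the DataFrame from the example above, the input could be `df['Date'].tolist()`, provided the column holds strings in that format.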

Monitoring and Alerts

Email Alerts on Failure

import smtplib
from email.mime.text import MIMEText

def send_alert(subject, body):
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = 'pipeline@example.com'  # placeholder sender address
    msg['To'] = 'alerts@example.com'      # placeholder recipient address
    
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login('user', 'password')
        server.send_message(msg)

# In run_full_pipeline.py
if failed > 0:
    send_alert(
        'Pipeline Failures Detected',
        f'{failed} scripts failed. Check logs.'
    )

Health Check Script

#!/bin/bash
# check_pipeline_health.sh

FILE="all_stocks_fundamental_analysis.json.gz"
MAX_AGE_HOURS=36

if [ ! -f "$FILE" ]; then
    echo "ERROR: Output file missing"
    exit 1
fi

AGE=$(( $(date +%s) - $(stat -c %Y "$FILE") ))
AGE_HOURS=$(( AGE / 3600 ))

if [ $AGE_HOURS -gt $MAX_AGE_HOURS ]; then
    echo "WARNING: Data is $AGE_HOURS hours old"
    exit 1
fi

echo "OK: Data is fresh ($AGE_HOURS hours old)"
exit 0
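
The `stat -c %Y` call in the script above is specific to GNU coreutils. Where portability matters, the same freshness check can be sketched in Python. `check_freshness` is a hypothetical name, and the messages and the 36-hour threshold are copied from the shell version.

```python
import os
import time


def check_freshness(path, max_age_hours=36, now=None):
    """Return (ok, message) describing whether the output file is recent enough."""
    if not os.path.isfile(path):
        return False, "ERROR: Output file missing"
    now = time.time() if now is None else now
    age_hours = int((now - os.path.getmtime(path)) // 3600)
    if age_hours > max_age_hours:
        return False, f"WARNING: Data is {age_hours} hours old"
    return True, f"OK: Data is fresh ({age_hours} hours old)"
```

A wrapper script could print the message and call `sys.exit(0 if ok else 1)` to keep the same exit-code contract as the shell version.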

Next Steps

Performance Tuning

Optimize threading, batching, and timeouts

Incremental Updates

Set up automated daily updates with monitoring
