The EDL Pipeline includes robust error handling to ensure data integrity and graceful degradation when individual scripts fail.
Error Handling Philosophy
The pipeline follows a fail-forward approach:
Critical failures (Phase 1) halt the pipeline
Non-critical failures (Phase 2+) log errors but continue execution
Enrichment failures (Phase 4) skip problematic stocks but complete the run
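The halt-versus-continue rule can be illustrated with a small runner. This is a minimal sketch under stated assumptions, not the code in run_full_pipeline.py: `run_steps` and the `(name, critical, func)` step tuples are hypothetical names introduced here for illustration.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical step description: (name, is_critical, callable returning True on success).
Step = Tuple[str, bool, Callable[[], bool]]


def run_steps(steps: List[Step]) -> Dict[str, bool]:
    """Run steps in order and stop at the first critical failure."""
    results: Dict[str, bool] = {}
    for name, critical, func in steps:
        try:
            ok = bool(func())
        except Exception as exc:  # a crashing step counts as a failure
            print(f"{name} raised {exc!r}")
            ok = False
        results[name] = ok
        if not ok and critical:
            print(f"CRITICAL: {name} failed, halting")
            break
        if not ok:
            print(f"{name} failed, continuing")
    return results
```

Steps that never ran are absent from the returned mapping, which makes it easy to tell "failed" apart from "skipped because an earlier critical step failed".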
Critical vs Non-Critical Failures
Critical Failures (Pipeline Stops)
Phase 1: Core Data
fetch_dhan_data.py failure → No master_isin_map.json → STOP
bulk_market_analyzer.py failure → No base JSON → STOP
# From run_full_pipeline.py (lines 207-212)
results["fetch_dhan_data.py"] = run_script("fetch_dhan_data.py", "Phase 1")
if not results["fetch_dhan_data.py"]:
    print("\n 🛑 CRITICAL: fetch_dhan_data.py failed. Cannot continue.")
    print(" This script produces master_isin_map.json which ALL other scripts need.")
    return
Non-Critical Failures (Pipeline Continues)
Phase 2: Enrichment
Individual enrichment scripts can fail without stopping the pipeline
Example: fetch_market_news.py fails → News fields will be empty, but pipeline completes
# From run_full_pipeline.py (lines 123-126)
if result.returncode == 0:
    print(f" ✅ {script_name} ({elapsed:.1f}s)")
    return True
else:
    print(f" ❌ {script_name} FAILED ({elapsed:.1f}s)")
    return True  # Continuing on enrichment errors to finish the job
Error Types and Solutions
1. Network Errors
Symptoms: requests.exceptions.ConnectionError, ReadTimeout, HTTPError
Causes:
API endpoint temporarily down
Network connectivity issues
Rate limiting by Dhan/NSE servers
Solutions:
# Example adapted from fetch_fundamental_data.py
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Add retry logic
session = requests.Session()
retry = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504],
    # Status-based retries skip POST by default; opt in explicitly (urllib3 >= 1.26)
    allowed_methods=["GET", "POST"],
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)

try:
    response = session.post(url, json=payload, headers=headers, timeout=30)
    response.raise_for_status()
except requests.exceptions.RequestException as e:
    print(f"Network error: {e}")
    # Continue with next stock or retry
2. Timeout Errors
Symptoms: Script hangs or subprocess.TimeoutExpired
Causes:
API response delay
Large data transfer
System resource constraints
Pipeline timeout: 1800 seconds (30 minutes) per script
# From run_full_pipeline.py (lines 112-130)
try:
    result = subprocess.run(
        [sys.executable, script_path],
        cwd=BASE_DIR,
        text=True,
        timeout=1800  # 30 minutes
    )
except subprocess.TimeoutExpired:
    print(f" ⏰ {script_name} TIMED OUT (>30 min)")
    return False
Fix: Increase timeout for slow scripts
# For fetch_all_ohlcv.py, extend to 3600 seconds (1 hour)
timeout = 3600
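Raising the timeout globally also lets a hung fast script block the run for longer. One option is a per-script lookup, sketched below. `DEFAULT_TIMEOUT`, `TIMEOUT_OVERRIDES`, and `timeout_for` are hypothetical names and are not part of run_full_pipeline.py; only the 1800 and 3600 second values come from the text above.

```python
# Hypothetical per-script timeout table (seconds).
DEFAULT_TIMEOUT = 1800  # 30 minutes, the pipeline default
TIMEOUT_OVERRIDES = {
    "fetch_all_ohlcv.py": 3600,  # 1 hour for the slow OHLCV fetch
}


def timeout_for(script_name: str) -> int:
    """Return the timeout for a script, falling back to the default."""
    return TIMEOUT_OVERRIDES.get(script_name, DEFAULT_TIMEOUT)
```

The result would then be passed as `timeout=timeout_for(script_name)` in the `subprocess.run` call.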
3. Data Quality Errors
Symptoms: Missing fields, None values, type mismatches
Example:
# From bulk_market_analyzer.py (lines 5-9)
def get_float(value_str):
    try:
        return float(value_str)
    except (ValueError, TypeError):
        return 0.0  # Safe fallback
Best Practices:
Use defensive .get() instead of direct key access
Provide sensible defaults (0 for numbers, "" for strings, [] for arrays)
Validate critical fields before processing
# Good: Safe field access with a default
pe = stock.get('P/E', 0)
if pe > 0:
    ...  # Process P/E

# Bad: Direct access (raises KeyError if the field is missing)
pe = stock['P/E']  # May crash
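The third practice, validating critical fields before processing, can be sketched as a small helper. This is an illustrative sketch: `missing_fields` and `REQUIRED_FIELDS` are hypothetical names, and treating "Symbol" and "ISIN" as the critical keys is an assumption to adjust to the real schema.

```python
from typing import Any, Dict, List, Sequence

# Assumed set of critical keys; adjust to the fields your scripts rely on.
REQUIRED_FIELDS = ("Symbol", "ISIN")


def missing_fields(stock: Dict[str, Any],
                   required: Sequence[str] = REQUIRED_FIELDS) -> List[str]:
    """Return the required keys that are absent, None, or empty strings."""
    return [key for key in required if stock.get(key) in (None, "")]
```

A caller can skip the record and log the returned list when it is non-empty, which keeps one malformed stock from aborting the run.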
4. File I/O Errors
Symptoms: FileNotFoundError, PermissionError, OSError
Common Causes:
Missing input files (e.g., master_isin_map.json not created)
Disk space exhausted
Permission issues on output directory
Prevention:
import os
import shutil

# Check input file exists before processing (inside the script's main function)
if not os.path.exists(INPUT_FILE):
    print(f"Error: {INPUT_FILE} not found. Run fetch_dhan_data.py first.")
    return

# Create output directory if missing
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Check disk space before writing large files (shutil.disk_usage is portable)
free_space_mb = shutil.disk_usage(BASE_DIR).free / (1024 * 1024)
if free_space_mb < 500:  # Less than 500 MB free
    print(f"Warning: Low disk space ({free_space_mb:.1f} MB)")
5. JSON Parsing Errors
Symptoms: json.decoder.JSONDecodeError
Causes:
Malformed API response
Incomplete file write (crashed mid-write)
Encoding issues
Handling:
import json

try:
    with open('data.json', 'r') as f:
        data = json.load(f)
except json.JSONDecodeError as e:
    print(f"JSON parsing error: {e}")
    print(f"Line {e.lineno}, Column {e.colno}")
    # Option 1: Skip file
    data = []
    # Option 2: Attempt manual repair
    # Option 3: Re-fetch data
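Incomplete writes can also be prevented at the source by writing to a temporary file and renaming it into place. The following is a minimal sketch, assuming the temporary file lives in the same directory as the target; `write_json_atomic` is a hypothetical helper, not a function from the pipeline.

```python
import json
import os
import tempfile


def write_json_atomic(path: str, data) -> None:
    """Write JSON to a temp file, then rename it over the target path."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # atomic when both paths share a filesystem
    except BaseException:
        # Clean up the partial temp file; the old target is left untouched.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

If the process crashes mid-write, readers still see the previous complete file rather than a truncated one.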
Multi-threaded Error Handling
ThreadPoolExecutor Patterns
Many scripts use threading for parallel API calls:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

def fetch_with_retry(item, max_retries=3):
    """Fetch with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            # API call (url and payload are built per item by the calling script)
            response = requests.post(url, json=payload, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                print(f"Failed after {max_retries} attempts: {e}")
                return None
            # Exponential backoff: 1s, then 2s (no sleep after the final attempt)
            time.sleep(2 ** attempt)
    return None

# Execute with thread pool
success_count = 0
error_count = 0
with ThreadPoolExecutor(max_workers=20) as executor:
    future_to_stock = {
        executor.submit(fetch_with_retry, item): item["Symbol"]
        for item in stock_list
    }
    for future in as_completed(future_to_stock):
        symbol = future_to_stock[future]
        try:
            result = future.result()
            if result:
                success_count += 1
            else:
                error_count += 1
        except Exception as e:
            print(f"Unexpected error for {symbol}: {e}")
            error_count += 1
Logging and Diagnostics
Enable Detailed Logging
Add logging to scripts for better debugging:
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Use in scripts
logger.info(f"Processing {symbol}")
logger.warning(f"Missing field 'P/E' for {symbol}")
logger.error(f"API call failed for {symbol}: {error}")
Check Pipeline Logs
# View real-time logs
tail -f pipeline.log
# Search for errors
grep ERROR pipeline.log
# Count failures by type
grep "Network error" pipeline.log | wc -l
Recovery Strategies
Partial Re-runs
If a Phase 2 script fails, you can re-run just that script:
# Re-fetch company filings without re-running full pipeline
python3 fetch_company_filings.py
# Then re-run enrichment phases
python3 advanced_metrics_processor.py
python3 add_corporate_events.py
Checkpoint-based Recovery
Modify scripts to skip already-processed items:
# Example: Skip stocks with existing files
output_path = f"{OUTPUT_DIR}/{symbol}_news.json"
if os.path.exists(output_path) and os.path.getsize(output_path) > 10:
    print(f"Skipping {symbol} (already exists)")
    return "skipped"
# Fetch and save
# ...
Enable checkpoint mode:
# In fetch_company_filings.py (line 12)
FORCE_UPDATE = False # Skip existing files
Common Troubleshooting Scenarios
Scenario 1: “master_isin_map.json not found”
Cause: fetch_dhan_data.py failed or didn’t run
Solution:
python3 fetch_dhan_data.py
# Check output
ls -lh master_isin_map.json
Scenario 2: “Empty fundamental_data.json”
Cause: API endpoint changed or rate limited
Solution:
Check API endpoint in fetch_fundamental_data.py
Test API call manually with curl
Add delays between requests
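The last step, adding delays between requests, can be sketched as a small throttle that enforces a minimum gap between calls. `Throttle` is a hypothetical helper for single-threaded use; the interval that avoids rate limiting on a given API is an assumption to tune by experiment.

```python
import time
from typing import Optional


class Throttle:
    """Enforce a minimum interval between consecutive calls to wait()."""

    def __init__(self, min_interval: float) -> None:
        self.min_interval = min_interval
        self._last: Optional[float] = None

    def wait(self) -> float:
        """Sleep if the previous call was too recent; return seconds slept."""
        slept = 0.0
        if self._last is not None:
            remaining = self.min_interval - (time.monotonic() - self._last)
            if remaining > 0:
                time.sleep(remaining)
                slept = remaining
        self._last = time.monotonic()
        return slept
```

Calling `throttle.wait()` immediately before each request keeps the request rate at or below one per `min_interval` seconds. Sharing one instance across threads would additionally need a lock.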
Scenario 3: “Compression failed”
Cause: Disk full or corrupted JSON file
Solution:
# Check disk space
df -h
# Validate JSON syntax
jq . all_stocks_fundamental_analysis.json > /dev/null
# Manual compression
gzip -9 all_stocks_fundamental_analysis.json
Scenario 4: “OHLCV data missing dates”
Cause: Market holiday or API gap
Solution: OHLCV fetcher auto-fills gaps; verify date range:
import pandas as pd

df = pd.read_csv('ohlcv_data/RELIANCE.csv')
print(df['Date'].min(), df['Date'].max())
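Checking only the first and last date does not reveal holes in between. The sketch below lists weekdays that are absent from a set of dates, using only the standard library. `missing_weekdays` is a hypothetical helper, and it has no exchange calendar, so market holidays show up as gaps and need to be ruled out by hand.

```python
from datetime import date, timedelta
from typing import Iterable, List


def missing_weekdays(dates: Iterable[date]) -> List[date]:
    """Return Monday-to-Friday dates missing between the first and last date."""
    present = set(dates)
    if not present:
        return []
    day, end = min(present), max(present)
    gaps: List[date] = []
    while day <= end:
        if day.weekday() < 5 and day not in present:  # 0-4 are Mon-Fri
            gaps.append(day)
        day += timedelta(days=1)
    return gaps
```

With pandas, the input could come from something like `pd.to_datetime(df['Date']).dt.date`, assuming the CSV has a parseable `Date` column as in the snippet above.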
Monitoring and Alerts
Email Alerts on Failure
import smtplib
from email.mime.text import MIMEText

def send_alert(subject, body):
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = 'sender@example.com'  # placeholder address
    msg['To'] = 'recipient@example.com'  # placeholder address
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login('user', 'password')  # placeholder credentials
        server.send_message(msg)

# In run_full_pipeline.py
if failed > 0:
    send_alert(
        'Pipeline Failures Detected',
        f'{failed} scripts failed. Check logs.'
    )
Health Check Script
#!/bin/bash
# check_pipeline_health.sh
FILE="all_stocks_fundamental_analysis.json.gz"
MAX_AGE_HOURS=36
if [ ! -f "$FILE" ]; then
    echo "ERROR: Output file missing"
    exit 1
fi
AGE=$(( $(date +%s) - $(stat -c %Y "$FILE") ))
AGE_HOURS=$(( AGE / 3600 ))
if [ "$AGE_HOURS" -gt "$MAX_AGE_HOURS" ]; then
    echo "WARNING: Data is $AGE_HOURS hours old"
    exit 1
fi
echo "OK: Data is fresh ($AGE_HOURS hours old)"
exit 0
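The `stat -c %Y` call in the script is specific to GNU coreutils. Where the check has to run on other platforms, the same logic could be written in Python, as in this sketch. `check_health` is a hypothetical function that returns an exit code and message instead of exiting, so it can be reused or tested; the 36 hour threshold mirrors the script above.

```python
import os
import time
from typing import Optional, Tuple


def check_health(path: str, max_age_hours: float = 36.0,
                 now: Optional[float] = None) -> Tuple[int, str]:
    """Return (exit_code, message) based on the file's existence and age."""
    if not os.path.isfile(path):
        return 1, "ERROR: Output file missing"
    current = time.time() if now is None else now
    age_hours = (current - os.path.getmtime(path)) / 3600
    if age_hours > max_age_hours:
        return 1, f"WARNING: Data is {age_hours:.0f} hours old"
    return 0, f"OK: Data is fresh ({age_hours:.0f} hours old)"
```

A wrapper script could print the message and pass the code to `sys.exit` to keep the same contract as the shell version.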
Next Steps
Performance Tuning: Optimize threading, batching, and timeouts
Incremental Updates: Set up automated daily updates with monitoring