Overview
Wrappers execute as separate Python processes that fetch data from sources and send it to RabbitMQ. The execution behavior differs based on the data source type, supporting both one-time batch processing and continuous monitoring.
Execution Modes
File-Based Execution (CSV/XLSX)
File wrappers perform a one-time execution:
Read File
The wrapper reads the entire CSV or Excel file from disk.
Parse Data
Data is parsed into time-series data points with timestamps and values.
Send to RabbitMQ
All data points are sent to the data queue in batches.
Complete
Wrapper marks itself as completed and exits.
Status progression: pending → generating → executing → completed
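The file-based steps above can be sketched roughly as follows. This is a minimal illustration, not the generated wrapper code: the column names timestamp and value, the batch size, and the send_batch callback (a stand-in for publishing to the RabbitMQ data queue) are all assumptions.

```python
import csv
import io
from datetime import datetime


def parse_points(csv_text):
    """Parse CSV rows into time-series points (timestamp + value)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        {
            "timestamp": datetime.fromisoformat(row["timestamp"]),
            "value": float(row["value"]),
        }
        for row in reader
    ]


def batches(points, size):
    """Yield consecutive slices of at most `size` points."""
    for start in range(0, len(points), size):
        yield points[start:start + size]


def run_file_wrapper(csv_text, send_batch, batch_size=2):
    """Read, parse, send in batches, then return the number of points sent."""
    points = parse_points(csv_text)
    for batch in batches(points, batch_size):
        send_batch(batch)  # hypothetical stand-in for the RabbitMQ publish
    return len(points)


SAMPLE = (
    "timestamp,value\n"
    "2026-03-03T10:00:00,1.5\n"
    "2026-03-03T10:05:00,2.0\n"
    "2026-03-03T10:10:00,2.5\n"
)

sent = []
total = run_file_wrapper(SAMPLE, sent.append)
print(total, [len(batch) for batch in sent])  # prints: 3 [2, 1]
```

Because the whole file is read and sent in one pass, the process has nothing left to do afterwards, which is why the status ends at completed.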
API-Based Execution
API wrappers run continuously with two phases:
Historical Phase
Fetch Historical Data
Retrieves all available historical data from the API.
Send Data
Sends historical data points to RabbitMQ.
Set High Water Mark
Records the newest timestamp as high_water_mark.
Checkpoint
Updates database with checkpoint state for resumability.
Continuous Phase
Poll for New Data
Periodically queries the API for data newer than high_water_mark.
Send New Data
Sends only new data points to avoid duplicates.
Update High Water Mark
Updates high_water_mark with the newest timestamp received.
Sleep and Repeat
Waits before the next poll (configurable interval).
Status: Remains in the executing state until stopped.
API wrappers use checkpointing to resume from the last successful data point after restarts, which limits data loss and duplication.
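One iteration of the continuous phase might look like the sketch below. The names poll_once, fetch_since and send are hypothetical, and the fake API is only there to make the example runnable; a real wrapper would call its data source and sleep for the configured interval between iterations.

```python
from datetime import datetime


def poll_once(fetch_since, send, high_water_mark):
    """Fetch, drop points that were already sent, send the rest, advance the mark."""
    points = fetch_since(high_water_mark)
    # Strictly newer than the mark, so a boundary point is never sent twice.
    new_points = [p for p in points if p["timestamp"] > high_water_mark]
    if new_points:
        send(new_points)
        high_water_mark = max(p["timestamp"] for p in new_points)
    return high_water_mark


# Fake data source for illustration: returns points at or after `since`.
DATA = [
    {"timestamp": datetime(2026, 3, 3, 11, 0), "value": 1.0},
    {"timestamp": datetime(2026, 3, 3, 11, 5), "value": 2.0},
    {"timestamp": datetime(2026, 3, 3, 11, 10), "value": 3.0},
]


def fake_fetch(since):
    return [p for p in DATA if p["timestamp"] >= since]


sent = []
mark = datetime(2026, 3, 3, 11, 0)      # mark left by the historical phase
mark = poll_once(fake_fetch, sent.extend, mark)
mark = poll_once(fake_fetch, sent.extend, mark)  # nothing new on the second poll
print(len(sent), mark.isoformat())  # prints: 2 2026-03-03T11:10:00
```

Filtering on the client side as well as in the query is a defensive choice: some APIs treat the "since" parameter as inclusive, which would otherwise resend the newest point on every poll.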
Executing a Wrapper
Automatic Execution
Wrappers execute automatically after generation:
curl -X POST "http://localhost:8000/api/v1/wrappers/generate" \
  -H "Content-Type: application/json" \
  -d '{
    "source_type": "API",
    "source_config": {...},
    "metadata": {...}
  }'
Generation includes automatic execution, so no additional step is needed.
Manual Execution
Execute an already-generated wrapper manually:
curl -X POST "http://localhost:8000/api/v1/wrappers/{wrapper_id}/execute"
Response:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "success": true,
  "message": "Wrapper executed successfully",
  "data_points_sent": 150,
  "execution_time": "2026-03-03T11:25:30Z"
}
Process Management
Wrappers run as separate processes managed by the WrapperProcessManager.
Process Lifecycle
Process Creation
A new Python process is spawned with the generated wrapper code.
# Internal process creation (simplified)
subprocess.Popen(
    ["python", "-u", wrapper_file_path, wrapper_id, source_config_json],
    env=env_with_credentials,
    stdout=log_file,
    stderr=error_log,
)
Process Monitoring
The process manager checks process health every 30 seconds.
Log Management
Output is captured to log files:
/app/wrapper_logs/{wrapper_id}_stdout.log
/app/wrapper_logs/{wrapper_id}_stderr.log
Health Checks
Process status is monitored and reported via health endpoints.
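The lifecycle above can be illustrated with a small, self-contained sketch. It is not the WrapperProcessManager implementation: spawn_wrapper is a hypothetical helper, the wrapper code is passed inline with -c instead of a generated file, and a temporary directory stands in for /app/wrapper_logs.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def spawn_wrapper(code, wrapper_id, log_dir):
    """Start an unbuffered Python child with stdout/stderr captured to log files."""
    stdout_path = log_dir / f"{wrapper_id}_stdout.log"
    stderr_path = log_dir / f"{wrapper_id}_stderr.log"
    with open(stdout_path, "w") as out, open(stderr_path, "w") as err:
        # The child keeps its own copies of the file descriptors, so the
        # parent can close its handles as soon as the process has started.
        return subprocess.Popen(
            [sys.executable, "-u", "-c", code, wrapper_id],
            stdout=out,
            stderr=err,
        )


log_dir = Path(tempfile.mkdtemp())  # stand-in for /app/wrapper_logs
proc = spawn_wrapper(
    "import sys; print('started', sys.argv[1])", "demo-wrapper", log_dir
)

# A health check would call proc.poll() periodically; None means still running.
exit_code = proc.wait(timeout=30)
stdout_text = (log_dir / "demo-wrapper_stdout.log").read_text()
print(exit_code, stdout_text.strip())  # prints: 0 started demo-wrapper
```

The -u flag matters for log capture: without it, Python buffers stdout when it is redirected to a file, so recent lines may not appear in the log until the buffer fills or the process exits.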
Process Adoption on Restart
When the service restarts (e.g., during deployment), the process manager automatically adopts orphaned wrapper processes by scanning /proc for Python processes running wrapper files.
This ensures continuous execution even during service updates.
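A scan of this kind could be sketched as follows, assuming a Linux /proc layout where each numeric directory holds a NUL-separated cmdline file. The function name find_wrapper_pids and the wrapper directory /app/wrappers are hypothetical; the example builds a fake proc tree so it runs anywhere.

```python
import tempfile
from pathlib import Path


def find_wrapper_pids(proc_root, wrapper_dir):
    """Map PID -> wrapper file for Python processes running a file under wrapper_dir."""
    found = {}
    for entry in proc_root.iterdir():
        if not entry.name.isdigit():
            continue  # skip non-process entries such as "self" or "meminfo"
        try:
            raw = (entry / "cmdline").read_bytes()
        except OSError:
            continue  # process exited mid-scan or is not readable
        args = [a.decode(errors="replace") for a in raw.split(b"\0") if a]
        if not args or "python" not in Path(args[0]).name:
            continue
        for arg in args[1:]:
            if arg.startswith(wrapper_dir) and arg.endswith(".py"):
                found[int(entry.name)] = arg
                break
    return found


# Fake /proc tree for illustration.
fake_proc = Path(tempfile.mkdtemp())
(fake_proc / "101").mkdir()
(fake_proc / "101" / "cmdline").write_bytes(
    b"python\0-u\0/app/wrappers/abc.py\0abc\0"
)
(fake_proc / "102").mkdir()
(fake_proc / "102" / "cmdline").write_bytes(b"bash\0")
(fake_proc / "self").mkdir()

adopted = find_wrapper_pids(fake_proc, "/app/wrappers")
print(adopted)  # prints: {101: '/app/wrappers/abc.py'}
```

On a real host the call would be find_wrapper_pids(Path("/proc"), ...). An adopted process is no longer a child of the restarted service, so it has to be tracked by PID instead of through a Popen handle.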
Stopping a Wrapper
Stop a running wrapper gracefully:
curl -X POST "http://localhost:8000/api/v1/wrappers/{wrapper_id}/stop"
Response:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "success": true,
  "message": "Wrapper stopped successfully"
}
SIGTERM Sent
A graceful termination signal (SIGTERM) is sent to the process.
Wait for Cleanup
The system waits up to 10 seconds for the process to clean up.
Force Kill if Needed
If the process doesn’t exit, SIGKILL is sent to force termination.
Status Update
Database status is updated to stopped or completed.
Cleanup
Process is removed from tracking (logs and code are preserved).
Stopping an API wrapper loses the in-memory state. The wrapper can resume from the last checkpoint (high_water_mark) but may miss data between the last checkpoint and stop time.
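The terminate, wait, then force-kill sequence can be sketched with the standard library as shown below. stop_process is a hypothetical helper and the sleeping child stands in for a running wrapper; the database update and tracking cleanup are omitted.

```python
import subprocess
import sys


def stop_process(proc, grace_seconds=10.0):
    """Ask the process to exit, wait for cleanup, and force-kill on timeout."""
    proc.terminate()  # sends SIGTERM on POSIX
    try:
        return proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()  # sends SIGKILL on POSIX
        return proc.wait()


# A long-running child standing in for an API wrapper.
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = stop_process(proc, grace_seconds=10.0)
print("exited with", exit_code)
```

On POSIX a negative return code means the process was ended by that signal number, which makes it possible to tell a graceful exit from a forced one.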
Checkpointing and Resumability
API wrappers support resumability through checkpointing:
Checkpoint Fields
Field              Type      Description
phase              string    Current execution phase: historical or continuous
high_water_mark    datetime  Newest data point timestamp ever sent
low_water_mark     datetime  Oldest data point timestamp ever sent
data_points_count  integer   Total data points sent since creation
last_data_sent     datetime  Timestamp of last successful data transmission
Resume After Restart
When the service restarts, executing wrappers automatically resume:
# Service startup resumes executing wrappers
await wrapper_service.restart_executing_wrappers()
Wrappers resume from their last checkpoint:
If in historical phase: Continue fetching historical data
If in continuous phase: Poll for new data since high_water_mark
Checkpointing lets wrappers pick up from the last recorded state after service restarts or temporary failures, which keeps data loss and duplication to a minimum.
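As a rough illustration, the checkpoint fields and the resume decision could be modeled like this. The Checkpoint class, its record method, and resume_plan are hypothetical names; only the field names and the two phases come from the table above.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Checkpoint:
    phase: str  # "historical" or "continuous"
    high_water_mark: Optional[datetime] = None
    low_water_mark: Optional[datetime] = None
    data_points_count: int = 0
    last_data_sent: Optional[datetime] = None

    def record(self, timestamps, sent_at):
        """Fold a successfully sent batch into the checkpoint."""
        if not timestamps:
            return
        newest, oldest = max(timestamps), min(timestamps)
        if self.high_water_mark is None or newest > self.high_water_mark:
            self.high_water_mark = newest
        if self.low_water_mark is None or oldest < self.low_water_mark:
            self.low_water_mark = oldest
        self.data_points_count += len(timestamps)
        self.last_data_sent = sent_at


def resume_plan(checkpoint):
    """Return (phase to resume, timestamp to poll from or None)."""
    if checkpoint.phase == "historical":
        return ("historical", None)
    return ("continuous", checkpoint.high_water_mark)


t1 = datetime(2026, 3, 3, 10, 0)
t2 = datetime(2026, 3, 3, 11, 0)
cp = Checkpoint(phase="historical")
cp.record([t1, t2], sent_at=datetime(2026, 3, 3, 11, 25, 30))
print(resume_plan(cp))  # prints: ('historical', None)

cp.phase = "continuous"
print(resume_plan(cp)[1].isoformat())  # prints: 2026-03-03T11:00:00
```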
Monitoring Execution
Check if Actively Executing
curl "http://localhost:8000/api/v1/wrappers/{wrapper_id}/health"
Response:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "health_status": "HEALTHY",
  "is_actively_executing": true
}
Health Statuses
Status    Description
HEALTHY   Process is running normally
STALLED   Process hasn’t sent data in expected timeframe
DEGRADED  Process is running but experiencing issues
CRASHED   Process has terminated unexpectedly
UNKNOWN   Cannot determine process state
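A client that polls the health endpoint might reduce the response to a single alert decision, as in this sketch. The rule itself is an assumption suited to API wrappers that are expected to run continuously (a completed file wrapper would legitimately not be executing), and needs_attention is a hypothetical helper.

```python
import json


def needs_attention(health):
    """True when the status is anything but HEALTHY or the wrapper is not executing."""
    if health.get("health_status") != "HEALTHY":
        return True
    return not health.get("is_actively_executing", False)


healthy = json.loads(
    '{"wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",'
    ' "health_status": "HEALTHY", "is_actively_executing": true}'
)
stalled = dict(healthy, health_status="STALLED")

print(needs_attention(healthy), needs_attention(stalled))  # prints: False True
```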
Get Detailed Monitoring
curl "http://localhost:8000/api/v1/wrappers/{wrapper_id}/monitoring"
Response:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "health_status": "HEALTHY",
  "monitoring_details": {
    "process_id": 12345,
    "exit_code": null,
    "uptime_seconds": 3600.5,
    "started_at": "2026-03-03T10:30:00Z",
    "last_health_check": "2026-03-03T11:30:00Z",
    "log_files": {
      "stdout": "/app/wrapper_logs/a1b2c3d4-e5f6-7890-abcd-ef1234567890_stdout.log",
      "stderr": "/app/wrapper_logs/a1b2c3d4-e5f6-7890-abcd-ef1234567890_stderr.log"
    }
  }
}
Execution Logs
View real-time execution logs:
curl "http://localhost:8000/api/v1/wrappers/{wrapper_id}/logs?limit=100"
Response:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "logs": [
    "[STDOUT] Starting wrapper execution...",
    "[STDOUT] Fetching data from API: https://api.example.com/data",
    "[STDOUT] Retrieved 150 data points",
    "[STDOUT] Sent 150 data points to RabbitMQ",
    "[STDOUT] Updated high_water_mark: 2026-03-03T11:00:00Z",
    "[STDOUT] Entering continuous polling mode..."
  ],
  "total_lines": 6
}
Logs include both stdout and stderr, labeled accordingly. The limit parameter controls how many recent lines to return (default: 200).
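Since each line carries a stream label, a client can separate the two streams after fetching the logs. The sketch below assumes the "[STDOUT] " and "[STDERR] " prefixes shown in the sample responses; split_streams is a hypothetical helper.

```python
def split_streams(lines):
    """Group labeled log lines by stream and strip the label prefix."""
    streams = {"STDOUT": [], "STDERR": []}
    for line in lines:
        for label in streams:
            prefix = f"[{label}] "
            if line.startswith(prefix):
                streams[label].append(line[len(prefix):])
                break
    return streams


logs = [
    "[STDOUT] Starting wrapper execution...",
    "[STDOUT] Retrieved 150 data points",
    "[STDERR] ConnectionError: Failed to connect to API",
]
streams = split_streams(logs)
print(len(streams["STDOUT"]), streams["STDERR"])
# prints: 2 ['ConnectionError: Failed to connect to API']
```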
Error Handling
Execution Errors
Error: {"detail": "Wrapper not found"}
Solution: Verify the wrapper_id is correct and the wrapper exists.
Error: {"detail": "Execution error: Wrapper file not found"}
Solution: The wrapper code file may have been deleted. Regenerate the wrapper.
Service unavailable (503)
Error: {"detail": "Service unavailable: Database connection timeout"}
Solution: The service is experiencing connectivity issues. Retry after a moment.
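Retrying on a 503 can be sketched as a small helper with exponential backoff. Everything here is illustrative: call_with_retry is a hypothetical name, the attempt count and delays are arbitrary, and the request callable stands in for an HTTP call that returns a (status, body) pair.

```python
import time


def call_with_retry(request, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call request() until it returns a non-503 status or attempts run out."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        status, body = request()
        if status != 503:
            return status, body
        if attempt < attempts - 1:
            sleep(base_delay * 2 ** attempt)  # 0.5s, 1.0s, 2.0s, ...
    return status, body


# Simulated responses: two 503s followed by a success.
responses = iter([
    (503, {"detail": "Service unavailable: Database connection timeout"}),
    (503, {"detail": "Service unavailable: Database connection timeout"}),
    (200, {"success": True}),
])
delays = []
status, body = call_with_retry(lambda: next(responses), sleep=delays.append)
print(status, delays)  # prints: 200 [0.5, 1.0]
```

Only the 503 case is retried here. A 404 for an unknown wrapper_id or a missing wrapper file will not resolve on its own, so retrying those would only delay the real fix.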
Crashed Wrappers
When a wrapper crashes, the system:
Detects process termination via exit code
Captures last log lines (stdout and stderr)
Updates database status to error
Stores error message with crash logs
Cleans up process resources
Crashed Wrapper Status:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "status": "error",
  "error_message": "Process crashed. Last logs:\n[STDERR] ConnectionError: Failed to connect to API\n[STDERR] Retrying in 5 seconds...\n[STDERR] Max retries exceeded",
  "completed_at": "2026-03-03T11:45:00Z"
}
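The detection and log-capture steps can be approximated as follows. This is a sketch under assumptions: crash_record is a hypothetical helper, a non-zero exit code is treated as a crash, only stderr is captured, and the database update is left out.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def crash_record(proc, stderr_log, max_lines=3):
    """Build an error record from the exit code and the tail of the stderr log."""
    exit_code = proc.poll()
    if exit_code is None or exit_code == 0:
        return None  # still running, or exited cleanly
    tail = stderr_log.read_text(errors="replace").splitlines()[-max_lines:]
    labeled = "\n".join(f"[STDERR] {line}" for line in tail)
    return {
        "status": "error",
        "error_message": f"Process crashed. Last logs:\n{labeled}",
    }


stderr_log = Path(tempfile.mkdtemp()) / "demo_stderr.log"
with open(stderr_log, "w") as err:
    # Child that writes one line to stderr and exits with a failure code.
    proc = subprocess.Popen(
        [
            sys.executable,
            "-c",
            "import sys; print('Max retries exceeded', file=sys.stderr); sys.exit(1)",
        ],
        stderr=err,
    )
proc.wait(timeout=30)

record = crash_record(proc, stderr_log)
print(record["status"])
print(record["error_message"])
```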
Best Practices
Monitor Health Regularly: Set up periodic health checks for API wrappers to catch issues early.
Review Logs on Errors: Check stdout and stderr logs to diagnose execution problems.
Use Checkpoints: For API wrappers, rely on automatic checkpointing for resilience.
Graceful Stops: Always use the stop endpoint rather than killing processes directly.
Next Steps
Monitoring: Deep dive into monitoring wrapper health and performance
Generating Wrappers: Learn how to generate wrappers for different data sources
API Reference: View complete wrapper execution API documentation
File Upload: Upload CSV/XLSX files for wrapper generation