
Overview

Wrappers execute as separate Python processes that fetch data from a source and send it to RabbitMQ. Execution behavior depends on the source type: file sources (CSV/XLSX) are processed once as a batch, while API sources are monitored continuously.

Execution Modes

File-Based Execution (CSV/XLSX)

File wrappers perform one-time execution:
  1. Read File: The wrapper reads the entire CSV or Excel file from disk.
  2. Parse Data: The data is parsed into time-series data points, each with a timestamp and a value.
  3. Send to RabbitMQ: All data points are sent to the data queue in batches.
  4. Complete: The wrapper marks itself as completed and exits.
Status progression: pending → generating → executing → completed
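The four file-wrapper steps can be sketched in Python as follows. This is a minimal illustration, not the generated wrapper code: it handles CSV only, and the column names `timestamp` and `value`, the batch size of 100, and the `send_batch` callback (a stand-in for the RabbitMQ publish) are all assumptions.

```python
import csv
import io
from datetime import datetime
from typing import Callable, Iterator, TextIO

Point = tuple[datetime, float]  # (timestamp, value)

def parse_points(source: TextIO) -> list[Point]:
    """Read File + Parse Data: load every row, assuming 'timestamp' (ISO 8601) and 'value' columns."""
    return [
        (datetime.fromisoformat(row["timestamp"]), float(row["value"]))
        for row in csv.DictReader(source)
    ]

def batched(points: list[Point], size: int) -> Iterator[list[Point]]:
    """Yield consecutive slices of at most `size` points."""
    for start in range(0, len(points), size):
        yield points[start:start + size]

def run_file_wrapper(source: TextIO, send_batch: Callable[[list[Point]], None], batch_size: int = 100) -> int:
    """Send to RabbitMQ + Complete: publish all points in batches, then return the count."""
    points = parse_points(source)
    for batch in batched(points, batch_size):
        send_batch(batch)  # stand-in for publishing to the RabbitMQ data queue
    return len(points)

sample = io.StringIO("timestamp,value\n2026-03-03T10:00:00,1.5\n2026-03-03T10:05:00,2.0\n")
batches: list[list[Point]] = []
print(run_file_wrapper(sample, batches.append, batch_size=1))  # 2
```

Because the whole file is read before anything is sent, the wrapper has a natural end point, which is why file wrappers reach the completed status.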

API-Based Execution

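API wrappers run continuously in two phases. The historical phase backfills existing data, after which the wrapper enters the continuous phase and polls for data newer than high_water_mark. The historical phase runs as follows: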
  1. Fetch Historical Data: Retrieves all available historical data from the API.
  2. Send Data: Sends the historical data points to RabbitMQ.
  3. Set High Water Mark: Records the newest timestamp sent as high_water_mark.
  4. Checkpoint: Updates the database with the checkpoint state for resumability.
Status: remains in the executing state until stopped.
API wrappers use checkpointing to resume from the last checkpointed data point after a restart, which limits data loss and duplication to the window since the last checkpoint.
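A minimal Python sketch of this control flow, assuming hypothetical callables `fetch_historical`, `fetch_since`, `send` and `save_checkpoint` in place of the real API client, RabbitMQ publisher and database update. The poll interval and the `max_polls` limit (which lets the loop end in tests) are also assumptions; the real wrapper keeps polling until it is stopped.

```python
import time
from datetime import datetime
from typing import Callable, Optional

Point = tuple[datetime, float]  # (timestamp, value)

def run_api_wrapper(
    fetch_historical: Callable[[], list[Point]],
    fetch_since: Callable[[Optional[datetime]], list[Point]],
    send: Callable[[list[Point]], None],
    save_checkpoint: Callable[[dict], None],
    poll_interval: float = 60.0,
    max_polls: Optional[int] = None,  # None = poll until the process is stopped
) -> Optional[datetime]:
    # Historical phase: fetch, send, set the high water mark, checkpoint
    history = fetch_historical()
    if history:
        send(history)
    hwm = max((ts for ts, _ in history), default=None)
    # History is fully sent, so a restart should resume in the continuous phase
    save_checkpoint({"phase": "continuous", "high_water_mark": hwm})

    # Continuous phase: send only points strictly newer than the high water mark
    polls = 0
    while max_polls is None or polls < max_polls:
        fresh = [p for p in fetch_since(hwm) if hwm is None or p[0] > hwm]
        if fresh:
            send(fresh)
            hwm = max(ts for ts, _ in fresh)
            save_checkpoint({"phase": "continuous", "high_water_mark": hwm})
        polls += 1
        if max_polls is None or polls < max_polls:
            time.sleep(poll_interval)
    return hwm
```

The checkpoint is written only after `send` returns, so a crash between the two steps can cause a re-send on restart rather than a silent gap.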

Executing a Wrapper

Automatic Execution

Wrappers execute automatically after generation:
curl -X POST "http://localhost:8000/api/v1/wrappers/generate" \
  -H "Content-Type: application/json" \
  -d '{
    "source_type": "API",
    "source_config": {...},
    "metadata": {...}
  }'
Generation includes automatic execution, so no additional step is needed.

Manual Execution

Execute an already-generated wrapper manually:
curl -X POST "http://localhost:8000/api/v1/wrappers/{wrapper_id}/execute"
Response:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "success": true,
  "message": "Wrapper executed successfully",
  "data_points_sent": 150,
  "execution_time": "2026-03-03T11:25:30Z"
}
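For scripting, the same call can be made from Python with only the standard library. A minimal sketch, assuming the service runs at http://localhost:8000 as in the examples; `ExecuteResult`, `build_execute_request`, `parse_execute_response` and `execute_wrapper` are illustrative names, not part of the service.

```python
import json
import urllib.request
from dataclasses import dataclass, fields

BASE_URL = "http://localhost:8000/api/v1"

@dataclass
class ExecuteResult:
    wrapper_id: str
    success: bool
    message: str
    data_points_sent: int
    execution_time: str

def build_execute_request(wrapper_id: str, base_url: str = BASE_URL) -> urllib.request.Request:
    # POST with an empty body, matching the curl example above
    return urllib.request.Request(f"{base_url}/wrappers/{wrapper_id}/execute", data=b"", method="POST")

def parse_execute_response(body: str) -> ExecuteResult:
    payload = json.loads(body)
    # Keep only the fields documented in the response example
    return ExecuteResult(**{f.name: payload[f.name] for f in fields(ExecuteResult)})

def execute_wrapper(wrapper_id: str) -> ExecuteResult:
    with urllib.request.urlopen(build_execute_request(wrapper_id), timeout=30) as resp:
        return parse_execute_response(resp.read().decode("utf-8"))
```

Splitting request building and response parsing from the network call keeps both parts testable without a running service.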

Process Management

Wrappers run as separate processes managed by the WrapperProcessManager.

Process Lifecycle

  1. Process Creation: A new Python process is spawned with the generated wrapper code.
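# Internal process creation (simplified)
import subprocess

subprocess.Popen(
    # -u: unbuffered stdout/stderr so log files update as the wrapper runs
    ["python", "-u", wrapper_file_path, wrapper_id, source_config_json],
    env=env_with_credentials,  # environment that includes the source credentials
    stdout=log_file,           # open handle for {wrapper_id}_stdout.log
    stderr=error_log,          # open handle for {wrapper_id}_stderr.log
)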
  2. Process Monitoring: The process manager checks process health every 30 seconds.
  3. Log Management: Output is captured to log files:
      • /app/wrapper_logs/{wrapper_id}_stdout.log
      • /app/wrapper_logs/{wrapper_id}_stderr.log
  4. Health Checks: Process status is monitored and reported via the health endpoints.
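The 30-second health check can be pictured as a loop over tracked `subprocess.Popen` handles that uses `poll()` to spot processes that have exited. This is a sketch under assumptions: the `procs` dictionary keyed by wrapper ID, and the `check_processes` and `monitor_forever` names, are hypothetical and not the WrapperProcessManager's actual interface.

```python
import subprocess
import time

CHECK_INTERVAL_SECONDS = 30

def check_processes(procs: dict[str, subprocess.Popen]) -> dict[str, int]:
    """Return {wrapper_id: exit_code} for exited processes and stop tracking them."""
    exited = {}
    for wrapper_id, proc in list(procs.items()):
        code = proc.poll()  # None while the process is still running
        if code is not None:
            exited[wrapper_id] = code
            del procs[wrapper_id]
    return exited

def monitor_forever(procs: dict[str, subprocess.Popen]) -> None:
    while True:
        for wrapper_id, code in check_processes(procs).items():
            print(f"wrapper {wrapper_id} exited with code {code}")
        time.sleep(CHECK_INTERVAL_SECONDS)
```

`poll()` never blocks, so one monitor loop can check many wrappers without waiting on any of them.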

Process Adoption on Restart

When the service restarts (e.g., during deployment), the process manager automatically adopts orphaned wrapper processes by scanning /proc for Python processes running wrapper files.
This ensures continuous execution even during service updates.
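A sketch of the /proc scan on Linux, assuming wrapper processes were started with the argument layout shown under Process Creation (`python -u <wrapper_file> <wrapper_id> <source_config_json>`). The wrapper code directory `/app/wrappers` and the `find_wrapper_processes` name are hypothetical; the document does not say where wrapper files live.

```python
import os
from pathlib import Path

def parse_cmdline(raw: bytes) -> list[str]:
    """/proc/<pid>/cmdline holds argv as NUL-separated, NUL-terminated strings."""
    return [arg.decode("utf-8", "replace") for arg in raw.rstrip(b"\0").split(b"\0")]

def find_wrapper_processes(proc_root: str = "/proc", wrapper_dir: str = "/app/wrappers") -> dict[str, int]:
    """Map wrapper_id -> pid for Python processes running a file from wrapper_dir."""
    found: dict[str, int] = {}
    for entry in Path(proc_root).iterdir():
        if not entry.name.isdigit():  # skip non-PID entries such as /proc/self
            continue
        try:
            argv = parse_cmdline((entry / "cmdline").read_bytes())
        except OSError:  # process exited meanwhile, or access denied
            continue
        # Expected layout: python -u <wrapper_file> <wrapper_id> <source_config_json>
        if (
            len(argv) >= 4
            and "python" in os.path.basename(argv[0])
            and Path(argv[2]).parent == Path(wrapper_dir)
        ):
            found[argv[3]] = int(entry.name)
    return found
```

The `proc_root` parameter exists so the scan can be pointed at a fake directory tree in tests.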

Stopping a Wrapper

Stop a running wrapper gracefully:
curl -X POST "http://localhost:8000/api/v1/wrappers/{wrapper_id}/stop"
Response:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "success": true,
  "message": "Wrapper stopped successfully"
}
Stopping proceeds in five steps:
  1. SIGTERM Sent: A graceful termination signal (SIGTERM) is sent to the process.
  2. Wait for Cleanup: The system waits up to 10 seconds for the process to clean up and exit.
  3. Force Kill if Needed: If the process has not exited, SIGKILL is sent to force termination.
  4. Status Update: The database status is updated to stopped or completed.
  5. Cleanup: The process is removed from tracking; its logs and code are preserved.
Stopping an API wrapper discards its in-memory state. When restarted, the wrapper resumes from its last checkpoint (high_water_mark), but data between the last checkpoint and the stop time may be missed.
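The SIGTERM, wait, SIGKILL sequence maps directly onto `subprocess.Popen`. A minimal sketch, assuming a POSIX system and a `Popen` handle for the wrapper; `stop_process` is a hypothetical name, and the database update and tracking cleanup (steps 4 and 5) are left out.

```python
import signal
import subprocess

def stop_process(proc: subprocess.Popen, grace_seconds: float = 10.0) -> int:
    """Send SIGTERM, wait up to grace_seconds, then SIGKILL; return the exit code."""
    proc.send_signal(signal.SIGTERM)  # graceful termination request
    try:
        return proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX; cannot be caught or ignored
        return proc.wait()
```

On POSIX, a process ended by a signal reports the negated signal number as its return code (for example -15 for SIGTERM), which lets the caller tell a graceful exit from a forced one.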

Checkpointing and Resumability

API wrappers support resumability through checkpointing:

Checkpoint Fields

| Field | Type | Description |
|---|---|---|
| phase | string | Current execution phase: historical or continuous |
| high_water_mark | datetime | Newest data point timestamp ever sent |
| low_water_mark | datetime | Oldest data point timestamp ever sent |
| data_points_count | integer | Total data points sent since creation |
| last_data_sent | datetime | Timestamp of the last successful data transmission |
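One way to keep these fields consistent is a small dataclass that is updated only after a batch is confirmed sent. This is an illustrative sketch: the `Checkpoint` class and its `record_batch` method are hypothetical, and the real service persists these fields in its database.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class Checkpoint:
    phase: str = "historical"  # "historical" or "continuous"
    high_water_mark: Optional[datetime] = None
    low_water_mark: Optional[datetime] = None
    data_points_count: int = 0
    last_data_sent: Optional[datetime] = None

    def record_batch(self, timestamps: list[datetime], sent_at: datetime) -> None:
        """Update the checkpoint after a batch has been sent successfully."""
        if not timestamps:
            return
        newest, oldest = max(timestamps), min(timestamps)
        if self.high_water_mark is None or newest > self.high_water_mark:
            self.high_water_mark = newest
        if self.low_water_mark is None or oldest < self.low_water_mark:
            self.low_water_mark = oldest
        self.data_points_count += len(timestamps)
        self.last_data_sent = sent_at
```

The water marks only ever widen, so re-recording an older batch cannot move high_water_mark backwards.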

Resume After Restart

When the service restarts, executing wrappers automatically resume:
# Service startup resumes executing wrappers
await wrapper_service.restart_executing_wrappers()
Wrappers resume from their last checkpoint:
  • If in historical phase: Continue fetching historical data
  • If in continuous phase: Poll for new data since high_water_mark
Checkpointing prevents data from being lost or duplicated across service restarts and temporary failures, except for data in the window between the last checkpoint and the interruption.
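The resume rule can be sketched as a dispatch on the stored phase. This sketch assumes the checkpoint is available as a dict and uses hypothetical `fetch_historical` and `fetch_since` callables that return (timestamp, value) tuples; how the historical fetch picks up where it left off is not specified here and is left to `fetch_historical`.

```python
from datetime import datetime
from typing import Callable, Optional

Point = tuple[datetime, float]  # (timestamp, value)

def resume_wrapper(
    checkpoint: dict,
    fetch_historical: Callable[[], list[Point]],
    fetch_since: Callable[[Optional[datetime]], list[Point]],
) -> tuple[str, list[Point]]:
    """Return the phase to resume in and the points to send next."""
    if checkpoint.get("phase") == "continuous":
        hwm = checkpoint["high_water_mark"]
        # Points at or before the high water mark were already sent; skip them
        return "continuous", [p for p in fetch_since(hwm) if hwm is None or p[0] > hwm]
    # Historical phase: keep fetching historical data
    return "historical", fetch_historical()
```

The strict `>` comparison is what prevents the point stored as high_water_mark from being sent twice.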

Monitoring Execution

Check if Actively Executing

curl "http://localhost:8000/api/v1/wrappers/{wrapper_id}/health"
Response:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "health_status": "HEALTHY",
  "is_actively_executing": true
}

Health Statuses

| Status | Description |
|---|---|
| HEALTHY | Process is running normally |
| STALLED | Process has not sent data within the expected timeframe |
| DEGRADED | Process is running but experiencing issues |
| CRASHED | Process terminated unexpectedly |
| UNKNOWN | Process state cannot be determined |
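When polling the health endpoint from a script, mapping the status strings to an enum catches unexpected values early. A sketch: `HealthStatus`, `needs_alert`, and the choice to alert only on STALLED and CRASHED are illustrative assumptions, not part of the service.

```python
from enum import Enum

class HealthStatus(str, Enum):
    HEALTHY = "HEALTHY"
    STALLED = "STALLED"
    DEGRADED = "DEGRADED"
    CRASHED = "CRASHED"
    UNKNOWN = "UNKNOWN"

# Statuses that should trigger an alert; this grouping is an assumption
ALERT_STATUSES = {HealthStatus.STALLED, HealthStatus.CRASHED}

def needs_alert(health: dict) -> bool:
    """Decide whether a /health response body warrants an alert."""
    try:
        status = HealthStatus(health.get("health_status", "UNKNOWN"))
    except ValueError:
        status = HealthStatus.UNKNOWN  # value not listed in the table above
    return status in ALERT_STATUSES
```

Treating unrecognized values as UNKNOWN keeps the check from crashing if new statuses are added later.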

Get Detailed Monitoring

curl "http://localhost:8000/api/v1/wrappers/{wrapper_id}/monitoring"
Response:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "health_status": "HEALTHY",
  "monitoring_details": {
    "process_id": 12345,
    "exit_code": null,
    "uptime_seconds": 3600.5,
    "started_at": "2026-03-03T10:30:00Z",
    "last_health_check": "2026-03-03T11:30:00Z",
    "log_files": {
      "stdout": "/app/wrapper_logs/a1b2c3d4-e5f6-7890-abcd-ef1234567890_stdout.log",
      "stderr": "/app/wrapper_logs/a1b2c3d4-e5f6-7890-abcd-ef1234567890_stderr.log"
    }
  }
}

Execution Logs

View real-time execution logs:
curl "http://localhost:8000/api/v1/wrappers/{wrapper_id}/logs?limit=100"
Response:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "logs": [
    "[STDOUT] Starting wrapper execution...",
    "[STDOUT] Fetching data from API: https://api.example.com/data",
    "[STDOUT] Retrieved 150 data points",
    "[STDOUT] Sent 150 data points to RabbitMQ",
    "[STDOUT] Updated high_water_mark: 2026-03-03T11:00:00Z",
    "[STDOUT] Entering continuous polling mode..."
  ],
  "total_lines": 6
}
Logs include both stdout and stderr, with each line prefixed [STDOUT] or [STDERR]. The limit parameter sets how many of the most recent lines to return (default: 200).
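Because every line carries its stream label, a client can separate the two streams after fetching. A minimal sketch; `logs_url` and `split_streams` are hypothetical helpers, and lines without a recognized label are dropped.

```python
from urllib.parse import urlencode

def logs_url(wrapper_id: str, limit: int = 200, base_url: str = "http://localhost:8000/api/v1") -> str:
    """Build the logs endpoint URL with the limit query parameter."""
    return f"{base_url}/wrappers/{wrapper_id}/logs?{urlencode({'limit': limit})}"

def split_streams(lines: list[str]) -> dict[str, list[str]]:
    """Group log lines by their [STDOUT]/[STDERR] label, stripping the label."""
    streams: dict[str, list[str]] = {"stdout": [], "stderr": []}
    for line in lines:
        for label, key in (("[STDOUT] ", "stdout"), ("[STDERR] ", "stderr")):
            if line.startswith(label):
                streams[key].append(line[len(label):])
                break
    return streams
```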

Error Handling

Execution Errors

Error:
{
  "detail": "Wrapper not found"
}
Solution: Verify the wrapper_id is correct and the wrapper exists.
Error:
{
  "detail": "Execution error: Wrapper file not found"
}
Solution: The wrapper code file may have been deleted. Regenerate the wrapper.
Error:
{
  "detail": "Service unavailable: Database connection timeout"
}
Solution: The service is experiencing connectivity issues. Retry after a moment.
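For the transient service-unavailable case, a client-side retry with exponential backoff is a common pattern. A sketch, assuming the service signals this condition with HTTP 503 (the status code is not stated above); `with_retries` is a hypothetical helper. Errors such as "Wrapper not found" are re-raised immediately because retrying will not fix them.

```python
import time
import urllib.error
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retries(call: Callable[[], T], attempts: int = 4, base_delay: float = 1.0) -> T:
    """Retry `call` on HTTP 503, sleeping base_delay * 2**attempt between tries."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return call()
        except urllib.error.HTTPError as err:
            # Only retry transient unavailability, and never after the last attempt
            if err.code != 503 or attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")
```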

Crashed Wrappers

When a wrapper crashes, the system:
  1. Detects process termination via exit code
  2. Captures last log lines (stdout and stderr)
  3. Updates database status to error
  4. Stores error message with crash logs
  5. Cleans up process resources
Crashed Wrapper Status:
{
  "wrapper_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "status": "error",
  "error_message": "Process crashed. Last logs:\n[STDERR] ConnectionError: Failed to connect to API\n[STDERR] Retrying in 5 seconds...\n[STDERR] Max retries exceeded",
  "completed_at": "2026-03-03T11:45:00Z"
}
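Capturing the last log lines and composing the stored error message can be sketched as reading the tail of each log file. The file names follow the /app/wrapper_logs/{wrapper_id}_stdout.log and _stderr.log pattern described under Process Lifecycle; the `crash_message` helper and the default of three lines per stream are assumptions.

```python
from collections import deque
from pathlib import Path

def tail(path: Path, n: int) -> list[str]:
    """Return the last n lines of a file, or an empty list if it does not exist."""
    if not path.exists():
        return []
    with path.open(encoding="utf-8", errors="replace") as fh:
        return [line.rstrip("\n") for line in deque(fh, maxlen=n)]

def crash_message(log_dir: Path, wrapper_id: str, n: int = 3) -> str:
    """Build an error message in the format of the example above."""
    stdout = tail(log_dir / f"{wrapper_id}_stdout.log", n)
    stderr = tail(log_dir / f"{wrapper_id}_stderr.log", n)
    lines = [f"[STDOUT] {line}" for line in stdout] + [f"[STDERR] {line}" for line in stderr]
    return "Process crashed. Last logs:\n" + "\n".join(lines)
```

`deque(fh, maxlen=n)` streams through the file and keeps only the final n lines, so large logs are not loaded into memory at once.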

Best Practices

Monitor Health Regularly

Set up periodic health checks for API wrappers to catch issues early.

Review Logs on Errors

Check stdout and stderr logs to diagnose execution problems.

Use Checkpoints

For API wrappers, rely on automatic checkpointing for resilience.

Graceful Stops

Always use the stop endpoint rather than killing processes directly.

Next Steps

Monitoring

Deep dive into monitoring wrapper health and performance

Generating Wrappers

Learn how to generate wrappers for different data sources

API Reference

View complete wrapper execution API documentation

File Upload

Upload CSV/XLSX files for wrapper generation
