All long-running compute operations in the Hedge Fund Backend (model training, hyperparameter tuning, feature generation, backtest execution, parameter sweeps, signal generation, walk-forward validation, and CPCV validation) can be dispatched to the Celery worker pool. When a job is dispatched asynchronously, the API returns immediately with a task_id; the client then polls a lightweight status endpoint until the task completes. This design keeps the FastAPI event loop free for interactive requests while expensive CPU/GPU work runs in isolated worker processes.

Available Async Endpoints

Every compute-heavy domain exposes a dedicated async route alongside its synchronous counterpart:
| Endpoint | Celery task name | Description |
| --- | --- | --- |
| POST /api/v1/models/{id}/train/async | training.train_model | Train a registered ML model on assembled feature data |
| POST /api/v1/models/tune/async | training.tune_model | Run an Optuna hyperparameter search over a defined parameter space |
| POST /api/v1/features/{id}/generate | features.generate | Compute and persist a feature dataset for a symbol/timeframe window |
| POST /api/v1/backtests/{id}/execute/async | backtests.execute | Execute a single backtest run against vectorbt or Backtrader |
| POST /api/v1/backtests/sweep | backtests.sweep | Run a parameter grid sweep and return a ranked leaderboard |
| POST /api/v1/signals/generate/async | signals.generate | Generate trade signals from a trained model over a date range |
| POST /api/v1/validation/walk-forward/async | validation.walk_forward | Walk-forward validation with configurable rolling/expanding splits |
| POST /api/v1/validation/cpcv/async | validation.cpcv | Combinatorially Purged Cross-Validation (CPCV) with PBO and deflated Sharpe |

All async endpoints return the same envelope immediately:
{
  "task_id": "b3d2e1f0-84a2-4c3b-9f1e-2a7d5c8b9e04",
  "status": "PENDING"
}

Polling Task Status

Use GET /api/v1/tasks/{task_id} to check progress. The response shape varies by state:
{
  "task_id": "b3d2e1f0-84a2-4c3b-9f1e-2a7d5c8b9e04",
  "status": "PENDING",
  "result": null,
  "error": null
}
In the PENDING state shown above, the task is queued but has not been picked up by a worker yet.
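
A polling client usually benefits from a deadline and a growing delay between requests. The following is a minimal sketch of one way to structure that, not the backend's own client. The names poll_task and fetch_status and the default intervals are illustrative assumptions; fetch_status stands for any callable that returns the parsed JSON body of GET /api/v1/tasks/{task_id}. Treating REVOKED as terminal assumes the API passes through Celery's standard state names.

```python
import time
from typing import Any, Callable, Dict

# Assumption: the status field uses Celery's standard state names.
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def poll_task(
    fetch_status: Callable[[], Dict[str, Any]],
    timeout: float = 600.0,
    initial_delay: float = 1.0,
    max_delay: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Call fetch_status until the task reaches a terminal state.

    Raises TimeoutError if the next wait would pass the deadline.
    """
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        data = fetch_status()
        if data["status"] in TERMINAL_STATES:
            return data
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"task still {data['status']} after {timeout}s")
        sleep(delay)
        delay = min(delay * 2, max_delay)  # exponential backoff, capped
```

With httpx, the callable could be as simple as lambda: httpx.get(f"{BASE}/tasks/{task_id}").json(). Passing sleep as a parameter keeps the helper easy to exercise in tests without real waiting.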

Celery Worker Setup

1. Install dependencies
   Workers require the same requirements.txt as the API server. In Docker this is handled automatically by Dockerfile.worker. For local development:
   pip install -r requirements.txt
2. Start the worker process
   celery -A app.workers.celery_app worker --loglevel=info
   The worker will connect to CELERY_BROKER_URL and begin consuming tasks from all registered modules.
3. Verify the worker is connected
   Open a second terminal and run:
   celery -A app.workers.celery_app inspect active

Worker Configuration

The Celery application is configured in app/workers/celery_app.py:
from celery import Celery
from app.core.config import get_settings

settings = get_settings()

celery_app = Celery(
    "research_layer",
    broker=settings.CELERY_BROKER_URL,       # redis://localhost:6379/1
    backend=settings.CELERY_RESULT_BACKEND,  # redis://localhost:6379/2
    include=[
        "app.workers.training_tasks",
        "app.workers.feature_tasks",
        "app.workers.signal_tasks",
        "app.workers.backtest_tasks",
        "app.workers.sweep_tasks",
        "app.workers.validation_tasks",
    ],
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,
    worker_prefetch_multiplier=1,  # One task at a time per worker — safe for GPU/memory
)
Key settings to note:
  • task_acks_late=True: the task message is acknowledged only after the task finishes, so a task interrupted by a worker crash can be redelivered rather than lost. A task may therefore run more than once and should be idempotent.
  • worker_prefetch_multiplier=1: each worker process reserves at most one task at a time rather than prefetching a batch, which limits memory pressure on large training jobs.
  • task_track_started=True: enables the STARTED state so polling clients can distinguish “queued” from “running”.
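
On the client side, the extra STARTED state can be folded into a coarse phase for display or logging. The sketch below assumes the status field carries Celery's built-in state names unchanged; the phase function and the phase labels are illustrative, not part of the API.

```python
# Assumption: the API returns Celery's built-in state names as-is.
_PHASES = {
    "PENDING": "queued",      # not yet picked up (Celery also reports this for unknown IDs)
    "STARTED": "running",     # only reported because task_track_started=True
    "RETRY": "retrying",
    "SUCCESS": "succeeded",
    "FAILURE": "failed",
    "REVOKED": "cancelled",
}


def phase(status: str) -> str:
    """Map a raw task state to a coarse, human-readable phase."""
    return _PHASES.get(status.upper(), "unknown")
```

Because Celery itself treats any unrecognised task ID as PENDING, a task that stays "queued" indefinitely may indicate a mistyped ID rather than a busy worker pool.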

Available Task Modules

| Module | Task names registered |
| --- | --- |
| app.workers.training_tasks | training.train_model, training.tune_model |
| app.workers.feature_tasks | features.generate |
| app.workers.backtest_tasks | backtests.execute |
| app.workers.signal_tasks | signals.generate |
| app.workers.validation_tasks | validation.walk_forward, validation.cpcv |
| app.workers.sweep_tasks | backtests.sweep |

Full Async Workflow Example

The following example dispatches a model training job, polls until it completes, then fetches the trained model record:
import time
import httpx

BASE = "http://localhost:8000/api/v1"
model_id = "your-model-id"  # placeholder: ID of a model already registered via the API

# 1. Dispatch the training task
resp = httpx.post(f"{BASE}/models/{model_id}/train/async", json={
    "feature_ids": ["feat-uuid-1", "feat-uuid-2"],
    "symbol": "AAPL",
    "timeframe": "1d",
    "start_date": "2022-01-01",
    "end_date": "2024-01-01",
    "target_horizon": 1,
})
resp.raise_for_status()
task_id = resp.json()["task_id"]
print(f"Task dispatched: {task_id}")

# 2. Poll until terminal state
while True:
    status_resp = httpx.get(f"{BASE}/tasks/{task_id}")
    status_resp.raise_for_status()
    data = status_resp.json()
    state = data["status"]

    print(f"  status={state}")

    if state == "SUCCESS":
        print("Training complete!")
        print(f"  artifact_uri: {data['result']['artifact_uri']}")
        print(f"  cv_metrics:   {data['result']['cv_metrics']}")
        break
    elif state == "FAILURE":
        raise RuntimeError(f"Training failed: {data['error']}")

    # Back off before next poll
    time.sleep(3)

# 3. Fetch the updated model record
model_resp = httpx.get(f"{BASE}/models/{model_id}")
model_resp.raise_for_status()
print(model_resp.json())
Control how many tasks run in parallel by setting --concurrency when starting the worker. The Docker image default is 2. On a GPU machine with limited VRAM you may want --concurrency=1; on a high-CPU inference node you can increase it safely:
celery -A app.workers.celery_app worker --loglevel=info --concurrency=8
For heterogeneous workloads, consider routing GPU-intensive training tasks to a dedicated queue with its own worker pool, using Celery’s task_routes setting together with the worker’s -Q option.
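
As a rough sketch of what such routing could look like, the settings below send every task whose name starts with training. to a separate queue. The queue names gpu and default are hypothetical; the project does not define them. task_routes and task_default_queue are standard Celery settings, and glob patterns such as training.* are supported as route keys.

```python
# Hypothetical queue names; the project does not define these.
GPU_QUEUE = "gpu"
DEFAULT_QUEUE = "default"

routing_settings = {
    # Tasks without an explicit route go to this queue.
    "task_default_queue": DEFAULT_QUEUE,
    # Glob pattern: matches training.train_model and training.tune_model.
    "task_routes": {
        "training.*": {"queue": GPU_QUEUE},
    },
}

# In app/workers/celery_app.py this could be applied with:
#   celery_app.conf.update(**routing_settings)
```

Under these assumptions, a GPU worker would be started with -Q gpu --concurrency=1 and a general-purpose worker with -Q default, each using the same celery -A app.workers.celery_app worker command shown earlier.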
