

The Hedge Fund Backend is designed as a single-responsibility Research Layer service within a larger microservice architecture. It owns the full lifecycle of quantitative strategy research — from feature computation and model training through backtesting and validation — and delegates market data ingestion to an upstream service and live portfolio execution to a downstream one. Internally the platform is structured as a series of concentric layers: a thin FastAPI routing layer, a set of stateless engine pipelines that implement the business logic, a plugin registry that decouples compute from infrastructure, and a durable storage tier spread across PostgreSQL, S3/MinIO, Redis, and MLflow. Understanding this separation makes it straightforward to extend any single layer without touching the others.

Project Layout

The entire application lives under app/, which is partitioned by responsibility rather than by feature:
backend/app/
├── api/                  REST routers, one sub-package per resource
│   ├── strategies/       strategies CRUD + promotion_router
│   ├── features/         features CRUD + generation_router
│   ├── models/           models CRUD + training_router
│   ├── backtests/        backtests CRUD + sweep_router
│   ├── experiments/      experiments CRUD + compare endpoint
│   ├── signals/          signal generation endpoints
│   ├── validation/       walk-forward + CPCV validation
│   ├── tracking/         MLflow run tracking endpoints
│   ├── news/             news ingestion + FinBERT sentiment
│   └── agents/           AI research agent endpoints

├── core/                 Cross-cutting infrastructure
│   ├── config.py         pydantic-settings Settings class
│   ├── storage.py        boto3/MinIO S3 client wrapper
│   ├── cache.py          Redis cache client
│   └── mlflow_client.py  MLflow tracking client wrapper

├── db/                   Database layer
│   ├── session.py        async SQLAlchemy engine + session factory
│   ├── mixins.py         declarative base with UUID PK mixin
│   ├── crud_base.py      generic CRUDRepository (no per-resource boilerplate)
│   └── models_registry.py  single import point for Alembic autogenerate

├── domain/               ORM models + Pydantic schemas, one sub-package per resource
│   ├── strategy/         Strategy ORM + Create/Update/Read schemas
│   ├── feature/          Feature + FeatureDataset ORM + schemas
│   ├── model/            MLModel + Experiment ORM + schemas
│   ├── signal/           Signal ORM + schemas
│   └── backtest/         Backtest ORM + schemas

├── engines/              Stateless pipeline implementations
│   ├── feature_engine/        FeaturePipeline, FeatureStore, versioning, market_data_client
│   ├── backtest_engine/       BacktestPipeline, VectorBTAdapter, BacktraderAdapter, metrics, storage
│   ├── model_training_engine/ Trainer, CV splitters, Optuna tuner, AutoML, dataset assembler
│   ├── signal_engine/         Signal generation pipeline
│   ├── news_engine/           FinBERT sentiment pipeline
│   ├── validation_engine/     Walk-forward + CPCV validation
│   ├── tracking/              MLflow tracking service
│   └── promotion/             Strategy promotion service

├── plugins/              Abstract interfaces + plugin registries
│   ├── base.py           BaseFeature, BaseModel, BaseSignalGenerator, BaseBacktestEngine ABCs
│   ├── registry.py       PluginRegistry (maps key → class)
│   ├── features/         Feature plugin implementations (technical, statistical, news_sentiment)
│   ├── models/           Model plugin implementations (xgboost, lightgbm, catboost, lstm, rf)
│   └── signals/          Signal plugin implementations (threshold, long_short, ranking, portfolio)

├── workers/              Celery task definitions
│   ├── celery_app.py     shared Celery application instance
│   ├── feature_tasks.py  feature generation tasks
│   ├── training_tasks.py model train + tune tasks
│   ├── backtest_tasks.py backtest execution task
│   ├── signal_tasks.py   signal generation tasks
│   ├── validation_tasks.py walk-forward + CPCV validation tasks
│   └── sweep_tasks.py    parameter sweep fan-out task

├── events/               Event bus (Kafka / NATS / noop)
│   ├── bus.py            consumer/producer factory
│   └── handlers.py       dispatch() — routes inbound events to engine calls

└── agents/               AI research agent framework
    └── ...               LLM-orchestrated multi-step research workflows
The interactive Swagger UI is available at /docs once the server is running. It documents every registered endpoint with full request/response schemas and a built-in request builder.

Request Lifecycle

Every synchronous API call follows a consistent path through the application:
HTTP Request
    │
    ▼
FastAPI Router  (app/api/<resource>/router.py)
    │  path/query params validated by Pydantic v2
    ▼
Engine / Service  (app/engines/<engine>/pipeline.py)
    │  pure business logic, no HTTP context
    ├─── Plugin Registry lookup  (app/plugins/registry.py)
    │       │  resolves plugin key → concrete class
    │       ▼
    │    Plugin.compute()   (app/plugins/<type>/*.py)
    │       │  feature calculation / model inference / signal generation
    │       ▼
    ├─── Postgres write  (app/db/crud_base.py via SQLAlchemy async session)
    │       domain model row created/updated
    │
    └─── S3/MinIO write  (app/core/storage.py)
            Parquet artifact persisted (feature arrays, equity curves, trade lists)
    │
    ▼
JSON Response  (Pydantic Read schema)
Routers are intentionally thin: they own request parsing, dependency injection (database session, settings), and response serialisation — nothing else. All domain logic lives in the engine layer.
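The split between a thin router and a stateless engine can be illustrated without any web framework. The following is a minimal sketch, not the project's code: FeatureRequest, FeatureRead, FeatureEngine, and create_feature_route are hypothetical names, and plain dataclasses stand in for the Pydantic schemas.

```python
from dataclasses import asdict, dataclass


@dataclass
class FeatureRequest:
    """Hypothetical request schema (stands in for a Pydantic Create schema)."""
    symbol: str
    window: int


@dataclass
class FeatureRead:
    """Hypothetical response schema (stands in for a Pydantic Read schema)."""
    symbol: str
    window: int
    status: str


class FeatureEngine:
    """Engine layer: business logic only, no knowledge of HTTP."""

    def generate(self, symbol: str, window: int) -> FeatureRead:
        if window <= 0:
            raise ValueError("window must be positive")
        return FeatureRead(symbol=symbol.upper(), window=window, status="COMPLETED")


def create_feature_route(payload: dict, engine: FeatureEngine) -> dict:
    """Router layer: parse the request, delegate, serialise the response."""
    request = FeatureRequest(**payload)                       # request parsing
    result = engine.generate(request.symbol, request.window)  # delegation to the engine
    return asdict(result)                                     # response serialisation
```

Because the engine never sees the raw payload or the response format, it can be called unchanged from a Celery task or an event handler.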

Async Task Flow

Compute-heavy operations — model training, hyperparameter tuning, feature generation, backtest execution, and parameter sweeps — are dispatched to the Celery worker pool so that API calls return immediately with a task_id rather than blocking for minutes:
POST /api/v1/models/{id}/train/async
    │
    ▼
FastAPI router dispatches Celery task
    │  returns {"task_id": "<uuid>", "status": "PENDING"}
    ▼
Celery worker (app/workers/training_tasks.py)
    │  executes full engine pipeline asynchronously
    │  reads/writes Postgres + S3 + MLflow
    ▼
Result stored in Redis result backend
    │
    ▼
Client polls  GET /api/v1/tasks/{task_id}
    │  {"task_id": "...", "status": "SUCCESS", "result": {...}}
The GET /api/v1/tasks/{task_id} endpoint is generic — it works for training, tuning, feature generation, backtest, and sweep tasks without any per-task router code.
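# app/main.py: generic task-status polling
from celery.result import AsyncResult
from fastapi import FastAPI

# Import path assumed from the project layout (app/workers/celery_app.py).
from app.workers.celery_app import celery_app

app = FastAPI()


@app.get("/api/v1/tasks/{task_id}", tags=["tasks"])
async def get_task_status(task_id: str):
    # Look up the task in the Celery result backend by its id.
    result = AsyncResult(task_id, app=celery_app)
    response = {"task_id": task_id, "status": result.status}
    if result.ready():
        if result.successful():
            response["result"] = result.result
        else:
            # On failure, result.result holds the raised exception.
            response["error"] = str(result.result)
    return response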
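
On the client side, the polling step might look like the sketch below. It assumes the status field carries Celery's standard state names (SUCCESS, FAILURE, and REVOKED are the terminal ones). poll_task and its parameters are hypothetical, and the HTTP call is injected as fetch_status so the loop stays independent of any particular HTTP library.

```python
import time
from typing import Callable

# Celery's terminal task states; any other state means the task is still in flight.
READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def poll_task(
    fetch_status: Callable[[str], dict],
    task_id: str,
    interval: float = 2.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch_status(task_id) until the task reaches a terminal state."""
    deadline = time.monotonic() + timeout
    while True:
        body = fetch_status(task_id)
        if body["status"] in READY_STATES:
            return body
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still {body['status']} after {timeout}s")
        sleep(interval)
```

In practice fetch_status would wrap a GET on /api/v1/tasks/{task_id} and return the decoded JSON body; the caller then inspects "result" or "error" on the returned dictionary.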

Storage Layers

The platform uses four complementary storage technologies, each selected for what it does best:

PostgreSQL: relational domain state

All domain model rows live in Postgres: strategies, feature definitions, FeatureDataset version records, ML model metadata, experiment runs, and backtest configuration and metrics. SQLAlchemy 2.0’s async engine (asyncpg driver) means no blocking I/O on the main event loop. Alembic manages all schema migrations.

| Table | Purpose |
| --- | --- |
| strategies | Strategy definitions + universe + timeframe |
| features | Feature plugin configurations |
| feature_datasets | Per-symbol/timeframe/date-range versioned instances |
| ml_models | Model plugin configs, status, S3 artifact path |
| experiments | Experiment containers linking runs to strategies |
| backtests | Backtest configs, engine choice, JSONB metrics |

S3 / MinIO: binary artifact storage

Large binary objects are stored in two dedicated buckets:
  • feature-store — computed feature arrays as Parquet files, keyed by the SHA-256 content hash for deduplication.
  • research-artifacts — trained model files (XGBoost .ubj, LightGBM .txt, PyTorch .pt), equity curves, trade lists, drawdown series, and MLflow artifact directories.
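
Content-hash keying for the feature-store bucket can be sketched as follows. This assumes the hash is taken over the serialised Parquet bytes, which the text does not state explicitly. feature_object_key and put_if_absent are hypothetical helpers, the key layout is illustrative, and a plain dict stands in for the bucket.

```python
import hashlib
from typing import Dict, Tuple


def feature_object_key(payload: bytes, prefix: str = "features") -> str:
    """Derive an object key from the SHA-256 digest of the payload bytes."""
    digest = hashlib.sha256(payload).hexdigest()
    return f"{prefix}/{digest}.parquet"


def put_if_absent(store: Dict[str, bytes], payload: bytes) -> Tuple[str, bool]:
    """Write the payload under its content key and return (key, written).

    `store` is an in-memory stand-in for the feature-store bucket.
    """
    key = feature_object_key(payload)
    if key in store:
        return key, False  # identical content already stored: nothing to upload
    store[key] = payload
    return key, True
```

Identical feature arrays map to the same key, so recomputing an unchanged feature does not create a second object.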
The app/core/storage.py boto3 wrapper is the single access point for all S3 operations across engines.

Redis: cache and message broker

Redis serves two roles simultaneously:
  • Feature read cache (app/core/cache.py): recently-accessed feature Parquet payloads are cached in Redis so that repeated calls within a research session skip the S3 round-trip.
  • Celery broker + result backend: three separate Redis databases are used — db 0 for the application cache, db 1 for the Celery task broker queue, and db 2 for the Celery result backend.
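
The feature read cache follows a read-through pattern, sketched below under stated assumptions: DictCache is an in-memory stand-in for the Redis client, read_feature and load_from_s3 are hypothetical names, and expiry (TTL) handling is left out.

```python
from typing import Callable, Dict, Optional


class DictCache:
    """In-memory stand-in for the Redis client (hypothetical interface)."""

    def __init__(self) -> None:
        self._data: Dict[str, bytes] = {}

    def get(self, key: str) -> Optional[bytes]:
        return self._data.get(key)

    def set(self, key: str, value: bytes) -> None:
        self._data[key] = value


def read_feature(key: str, cache: DictCache, load_from_s3: Callable[[str], bytes]) -> bytes:
    """Serve from the cache on a hit; otherwise load from S3 and populate the cache."""
    cached = cache.get(key)
    if cached is not None:
        return cached
    payload = load_from_s3(key)
    cache.set(key, payload)
    return payload
```

The second read of the same key never reaches load_from_s3, which is the S3 round-trip the cache is meant to avoid.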
MLflow: experiment governance

MLflow tracks every training and tuning run: hyperparameters, CV metrics (MSE, MAE, directional accuracy per fold), and final model performance. Artifacts are stored to s3://research-artifacts/mlflow via the S3 artifact store. The GET /api/v1/experiments/compare endpoint diffs metrics across up to 10 runs and highlights the best performer per metric.
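
Selecting the best performer per metric can be sketched as below. This is an illustration rather than the endpoint's implementation: the metric names and the direction of each metric (errors minimised, everything else maximised) are assumptions, and best_run_per_metric is a hypothetical helper.

```python
from typing import Dict

# Assumed direction per metric: error metrics are minimised, all others maximised.
LOWER_IS_BETTER = {"mse", "mae"}
MAX_RUNS = 10  # the compare endpoint accepts up to 10 runs


def best_run_per_metric(runs: Dict[str, Dict[str, float]]) -> Dict[str, str]:
    """Return {metric: run_id of the best run} for every metric seen in any run."""
    if len(runs) > MAX_RUNS:
        raise ValueError(f"at most {MAX_RUNS} runs can be compared")
    metrics = sorted({name for values in runs.values() for name in values})
    best: Dict[str, str] = {}
    for metric in metrics:
        # Only runs that actually logged this metric compete for it.
        candidates = {rid: vals[metric] for rid, vals in runs.items() if metric in vals}
        pick = min if metric in LOWER_IS_BETTER else max
        best[metric] = pick(candidates, key=candidates.get)
    return best
```

For two runs where one has the lower MSE and the other the higher directional accuracy, the result names a different winner for each metric, which is why the comparison is reported per metric rather than as a single ranking.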

Event Bus

On application startup, an event consumer (a Kafka consumer when EVENT_BACKEND is kafka) is registered to listen for events published by the upstream Market Data Layer:
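# app/main.py: startup hook
# Import paths assumed from the project layout (app/events/bus.py, app/events/handlers.py).
from app.events.bus import get_consumer
from app.events.handlers import dispatch


@app.on_event("startup")
async def _startup():
    # Subscribe to dataset events from the upstream Market Data Layer.
    consumer = get_consumer()
    consumer.start(
        topics=["market.datasetcreated", "market.datasetupdated"],
        handler=dispatch,
    )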
The dispatch handler in app/events/handlers.py routes each inbound event to the appropriate engine call — for example, a market.datasetcreated event can trigger an automatic feature regeneration for any strategy that depends on the affected dataset. The event backend is configurable via the EVENT_BACKEND environment variable:
| Value | Behaviour |
| --- | --- |
| kafka | Real Kafka consumer/producer via kafka-python-ng |
| nats | NATS JetStream consumer/producer |
| noop | No-op stub: all publish/consume calls are silent (safe for local dev without Kafka) |
Set EVENT_BACKEND=noop in your local .env if you do not want to run a Kafka broker during development. The API will function normally; only the reactive event-driven feature regeneration will be disabled.
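
Backend selection of this kind is commonly implemented as a small factory keyed on the environment variable. The sketch below is an assumption about the shape of such a factory, not the contents of app/events/bus.py: the optional backend argument, the "noop" fallback default, and the NoopConsumer class are hypothetical, and the Kafka and NATS entries are deliberately left unimplemented.

```python
import os
from typing import Callable, Dict, List


class NoopConsumer:
    """Silent stub: accepts subscriptions but never delivers events."""

    def start(self, topics: List[str], handler: Callable[[dict], None]) -> None:
        self.topics = list(topics)  # recorded only so the stub can be inspected


def _unavailable(name: str) -> Callable[[], object]:
    """Placeholder factory for backends that this sketch does not implement."""
    def factory() -> object:
        raise NotImplementedError(f"{name} backend is not part of this sketch")
    return factory


# Maps EVENT_BACKEND values to consumer factories.
_BACKENDS: Dict[str, Callable[[], object]] = {
    "kafka": _unavailable("kafka"),
    "nats": _unavailable("nats"),
    "noop": NoopConsumer,
}


def get_consumer(backend: str = "") -> object:
    """Pick a consumer from the argument or the EVENT_BACKEND variable."""
    name = (backend or os.environ.get("EVENT_BACKEND", "noop")).lower()
    if name not in _BACKENDS:
        raise ValueError(f"unknown EVENT_BACKEND: {name!r}")
    return _BACKENDS[name]()
```

Because the no-op consumer exposes the same start(topics=..., handler=...) call as a real one, the startup hook does not need to branch on the configured backend.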

External Dependencies

The Research Layer integrates with two external services at runtime:

Market Data Layer (MARKET_DATA_URL)

The FeaturePipeline fetches OHLCV price data and raw news articles from the Market Data Layer via app/engines/feature_engine/market_data_client.py (an httpx-based async HTTP client). The base URL is configured via the MARKET_DATA_URL environment variable (default: http://localhost:8001). All feature computation is gated on this client: if the Market Data Layer is unavailable, feature generation tasks will fail gracefully and mark the FeatureDataset row as FAILED.

Portfolio Layer (strategy promotion)

When a strategy passes validation, it can be promoted to the Portfolio Layer via POST /api/v1/strategies/{id}/promote. The promotion router (app/api/strategies/promotion_router.py) serialises the strategy configuration and forwards it to the Portfolio Layer’s ingestion endpoint, closing the loop from research to live execution.
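
Returning to the Market Data Layer dependency, the graceful-failure behaviour can be sketched as follows. FeatureDatasetRow is a hypothetical stand-in for the ORM row, fetch_ohlcv stands in for the market data client, and every status name other than FAILED is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class FeatureDatasetRow:
    """Hypothetical stand-in for the FeatureDataset ORM row."""
    symbol: str
    status: str = "PENDING"
    error: str = ""


def generate_features(
    row: FeatureDatasetRow,
    fetch_ohlcv: Callable[[str], List[dict]],
) -> FeatureDatasetRow:
    """Fetch prices and mark the row COMPLETED, or FAILED if the upstream call errors."""
    try:
        bars = fetch_ohlcv(row.symbol)
    except (ConnectionError, TimeoutError) as exc:
        # Upstream is unavailable: record the failure instead of crashing the task.
        row.status = "FAILED"
        row.error = str(exc)
        return row
    # Feature computation on `bars` would run here; it is omitted from this sketch.
    row.status = "COMPLETED"
    return row
```

Recording the error on the row rather than re-raising keeps the failure visible to API clients that later read the dataset's status.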

Plugin Architecture

The plugin system is the extensibility backbone of the platform. Every compute-heavy operation — feature calculation, model inference, signal generation, backtest simulation — is implemented as a plugin that satisfies a typed abstract base class. The PluginRegistry maps string keys to plugin classes, and engines resolve the correct plugin at runtime using these keys.
PluginRegistry  (app/plugins/registry.py)
        │   Base ABCs defined in app/plugins/base.py

        ├── Feature Plugins      (BaseFeature)
        │     ├── Technical        app/plugins/features/technical.py
        │     ├── Statistical      app/plugins/features/statistical.py
        │     ├── NewsSentiment    app/plugins/features/news_sentiment.py
        │     └── <YourPlugin>     key: "<your_key>"

        ├── Model Plugins        (BaseModel)
        │     ├── XGBoostModel     key: "xgboost"
        │     ├── LightGBMModel    key: "lightgbm"
        │     ├── CatBoostModel    key: "catboost"
        │     ├── RandomForest     key: "random_forest"
        │     └── LSTMModel        key: "lstm"

        ├── Signal Plugins       (BaseSignalGenerator)
        │     ├── ThresholdSignal  key: "threshold"
        │     ├── LongShortSignal  key: "long_short"
        │     ├── RankingSignal    key: "ranking"
        │     └── PortfolioSignal  key: "portfolio"

        └── Backtest Engines     (BaseBacktestEngine)
              ├── VectorBTAdapter   key: "vectorbt"
              └── BacktraderAdapter key: "backtrader"
To add a new plugin, implement the appropriate abstract base class from app/plugins/base.py, place the module in the relevant sub-package under app/plugins/, and call registry.register("<your_key>", YourPluginClass). No changes to routers, engines, or any other core module are required.
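
A minimal sketch of the register-and-resolve cycle is shown below. The class names BaseFeature and PluginRegistry and the register call come from the text; the compute() signature, the get() method, the duplicate-key check, and the SimpleReturns plugin are assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Type


class BaseFeature(ABC):
    """Stand-in for the ABC in app/plugins/base.py; the compute() signature is assumed."""

    @abstractmethod
    def compute(self, closes: List[float]) -> List[float]:
        ...


class PluginRegistry:
    """Maps string keys to plugin classes."""

    def __init__(self) -> None:
        self._plugins: Dict[str, Type[BaseFeature]] = {}

    def register(self, key: str, cls: Type[BaseFeature]) -> None:
        if key in self._plugins:
            raise KeyError(f"plugin key already registered: {key}")
        self._plugins[key] = cls

    def get(self, key: str) -> Type[BaseFeature]:
        return self._plugins[key]


class SimpleReturns(BaseFeature):
    """Hypothetical plugin: one-period simple returns from closing prices."""

    def compute(self, closes: List[float]) -> List[float]:
        return [b / a - 1.0 for a, b in zip(closes, closes[1:])]


registry = PluginRegistry()
registry.register("simple_returns", SimpleReturns)

# An engine resolves the class by key at runtime and instantiates it.
feature = registry.get("simple_returns")()
```

The engine only ever sees the key and the base-class interface, which is what lets new plugins be added without touching routers or engines.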
Consult the Plugin Architecture guide for a step-by-step walkthrough of implementing and registering a custom feature plugin, including the required method signatures and how to surface plugin-specific hyperparameter search spaces to the Optuna tuner.
