Skip to main content

Overview

The Resource Service is a FastAPI-based microservice that manages sustainability indicators through an automated wrapper generation and execution system. It leverages AI to dynamically create data collection wrappers and manages their lifecycle from creation to continuous execution.

Core Components

The system is built on several key architectural components that work together to provide a complete data collection and management solution.

1. FastAPI Application Layer

The main application entry point (app/main.py) orchestrates the service lifecycle:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from routes import router as api_router
from dependencies.rabbitmq import rabbitmq_client

app = FastAPI()
app.include_router(api_router)
The application uses an asynccontextmanager lifespan to handle startup and shutdown events, ensuring proper initialization and cleanup of resources.
Key Responsibilities:
  • CORS configuration for cross-origin requests
  • API route registration
  • Lifecycle management (startup/shutdown)
  • Dependency injection setup

2. Database Layer (MongoDB)

MongoDB stores all persistent data including resources, wrappers, and execution metadata. Primary Collections:
Stores information about sustainability indicators:
  • wrapper_id: Associated wrapper identifier
  • name: Resource display name
  • type: Resource classification
  • startPeriod: Earliest data point timestamp
  • endPeriod: Latest data point timestamp
  • deleted: Soft delete flag
Manages wrapper lifecycle and configuration:
  • wrapper_id: Unique identifier (UUID)
  • status: Current execution state
  • generated_code: AI-generated Python code
  • source_type: Data source (API, CSV, XLSX)
  • metadata: Indicator metadata
  • execution_log: Historical events
  • high_water_mark/low_water_mark: Checkpoint timestamps

3. Message Queue (RabbitMQ)

RabbitMQ provides asynchronous task processing and inter-service communication.
class RabbitMQClient:
    def __init__(self, url: str, pool_size: int = 5):
        self.url = url
        self.pool_size = pool_size
        self.connection = None
        self.channel_pool = None
Queue Types:
Queue NamePurposeMessage Format
wrapper_creation_queueAsync wrapper generation tasks{"wrapper_id": "uuid"}
resource_dataData points from wrappers{"resource_id": "id", "points": [...]}
resource_deletedResource deletion notifications{"resource_id": "id", "wrapper_id": "uuid"}
collected_dataRaw collected dataSource-specific formats
The RabbitMQ client uses a channel pool pattern (default 5 channels) to efficiently handle concurrent message publishing and consumption.

4. AI Wrapper Generator

The WrapperGenerator service uses Google’s Gemini AI to create customized data collection wrappers.
class WrapperGenerator:
    def __init__(
        self,
        gemini_api_key: str,
        rabbitmq_url: str,
        debug_mode: bool = False,
        model_name: str = "gemini-2.5-flash"
    ):
        self.client = genai.Client(api_key=gemini_api_key)
        self.model_name = model_name
        self.prompt_manager = PromptManager()
Generation Process:
  1. Extract sample data from source (CSV/XLSX/API)
  2. Build contextual prompt with indicator metadata
  3. Call Gemini with tool support (for API sources)
  4. Validate generated Python code
  5. Retry on linting errors
  6. Save to /app/generated_wrappers/{wrapper_id}.py
The generator includes automatic code validation to prevent syntax errors and placeholder code from being saved. If validation fails, the system automatically retries with error feedback to the AI.

5. Wrapper Process Manager

Manages the execution lifecycle of generated wrappers as separate Python processes. Capabilities:
  • Launch wrappers as background processes
  • Monitor process health and status
  • Stop running wrappers
  • Resume execution after service restart
  • Track execution metrics

6. Resource Service

Provides CRUD operations for resources with wrapper integration.
async def create_resource(resource_data: ResourceCreate) -> Optional[dict]:
    wrapper_id = resource_data.wrapper_id
    
    # Validate wrapper exists
    wrapper = await db.generated_wrappers.find_one({"wrapper_id": wrapper_id})
    if not wrapper:
        raise ValueError(f"Wrapper with ID '{wrapper_id}' does not exist")
    
    # Create resource
    resource_dict = deserialize(resource_data.dict())
    resource_dict["deleted"] = False
    resource_dict["startPeriod"] = None
    resource_dict["endPeriod"] = None
    result = await db.resources.insert_one(resource_dict)

Component Interaction Flow

Wrapper Creation Flow

Data Collection Flow

Configuration

The service uses Pydantic Settings for configuration management:
class Settings(BaseSettings):
    # Database
    MONGO_URI: str = "mongodb://localhost:27017"
    
    # Message Queue
    RABBITMQ_URL: str = "amqp://guest:guest@rabbitmq/"
    
    # AI Generation
    GEMINI_API_KEY: str
    GEMINI_MODEL_NAME: str = "gemini-1.5-flash"
    
    # Application
    ORIGINS: str = "localhost"
All settings can be overridden via environment variables. See the .env.example file for a complete list of available configuration options.

Docker Deployment

The service is containerized with Docker Compose:
services:
  resource-service:
    build: .
    environment:
      - MONGO_URI=mongodb://resource-mongo:27017/resources
      - GEMINI_API_KEY=${GEMINI_API_KEY}
    volumes:
      - resource_generated_wrappers:/app/generated_wrappers
      - resource_wrapper_logs:/app/wrapper_logs
      - resource_prompts:/app/prompts
    networks:
      - resource-network

  resource-mongo:
    image: mongo:latest
    volumes:
      - resource_db:/data/db
    networks:
      - resource-network
Persistent Volumes:
  • resource_generated_wrappers: Stores generated wrapper Python files
  • resource_wrapper_logs: Execution logs for debugging
  • resource_prompts: AI prompt/response debug data
  • resource_db: MongoDB data directory

Health Monitoring

The service includes comprehensive health checks:
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    await rabbitmq_client.connect()
    await rabbitmq_client.start_consumers()
    await wrapper_process_manager.start_monitoring()
    await wrapper_service.restart_executing_wrappers()
    
    try:
        yield
    finally:
        # Shutdown
        await wrapper_process_manager.stop_monitoring()
        await rabbitmq_client.close()
The service automatically restarts continuous wrappers that were executing before a service restart, ensuring no data collection gaps.

API Structure

Routes are organized by domain:
router.include_router(resource_router, prefix="/resources")
router.include_router(wrapper_router, prefix="/resources/wrappers")
router.include_router(file_router, prefix="/resources/wrappers/files")
router.include_router(health_router, prefix="/health")

Next Steps

Resources

Learn about resource structure and management

Wrappers

Understand wrapper generation and execution

Data Sources

Explore supported data source types

API Reference

View complete API documentation

Build docs developers (and LLMs) love