Skip to main content

Error Handling Patterns

All AveniECA API methods return a tuple of (response, status_code). Always check both to handle errors gracefully.

Basic Error Handling

from avenieca.api.model import Error, AggregateError

res, status = eca.ess.create(data=ess)

# Branch on the outcome: success first, then each known error shape.
if status == 201:
    # Success - res is ESSResponse
    print(f"Created ESS with ID {res.id}")
elif isinstance(res, Error):
    # Structured validation failure: one message per failed check.
    print("Validation errors:")
    for error in res.errors:
        print(f"  - {error}")
elif isinstance(res, AggregateError):
    # Handle aggregate-specific errors
    print("Aggregate validation failed")
    if res.field_length_errors:
        print(f"Field length errors: {res.field_length_errors}")
else:
    # Fallback for unexpected errors
    print(f"Request failed with status {status}: {res}")

Robust Error Handler

Create a reusable error handler for all API calls:
from typing import Tuple, Any
from avenieca.api.model import Error, AggregateError

def handle_response(
    res: Any,
    status: int,
    expected_status: int = 200,
    operation: str = "API operation"
) -> Tuple[Any, bool]:
    """Check an API (response, status) pair against the expected status.

    On failure, prints a human-readable summary of any validation
    errors carried by the response object.

    Returns:
        Tuple of (response, success flag).
    """
    # Fast path: the call succeeded, nothing to report.
    if status == expected_status:
        return res, True

    print(f"{operation} failed with status {status}")

    if isinstance(res, Error):
        # Generic validation error: one message per failed check.
        for error in res.errors:
            print(f"  Error: {error}")
    elif isinstance(res, AggregateError):
        # Aggregate-specific diagnostics, printed only when present.
        if res.field_length_errors:
            print(f"  Field length errors: {res.field_length_errors}")
        if res.invalid_ess_db_ids:
            print(f"  Invalid ESS IDs: {res.invalid_ess_db_ids}")
        if res.incorrect_avg_ess_score:
            print(f"  Score calculation: {res.incorrect_avg_ess_score}")
    else:
        # Unknown payload type - dump it verbatim.
        print(f"  Response: {res}")

    return res, False

# Usage
# NOTE(review): assumes `eca` (authenticated client) and `ess` (an
# ESSInsert payload) are already defined, as in the earlier examples.
res, status = eca.ess.create(data=ess)
# 201 is the expected status for a successful create.
ess_response, success = handle_response(
    res, status,
    expected_status=201,
    operation="ESS creation"
)

if success:
    # On success the response exposes the new record's database id.
    print(f"ESS created: {ess_response.id}")

Retry Logic

Implement exponential backoff for transient failures:
import time
from typing import Callable, Tuple, Any

def retry_with_backoff(
    operation: Callable[[], Tuple[Any, int]],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    retryable_status_codes: frozenset = frozenset({500, 502, 503, 504})
) -> Tuple[Any, int]:
    """
    Retry an operation with exponential backoff.

    Calls *operation* up to ``max_retries`` times.  Results with a
    status below 400, or with a non-retryable error status, are
    returned immediately; retryable server errors are retried after an
    exponentially growing delay.

    Args:
        operation: Zero-argument callable returning (response, status_code).
        max_retries: Maximum number of attempts; must be at least 1.
        initial_delay: Seconds to wait before the first retry.
        backoff_factor: Multiplier applied to the delay after each retry.
        retryable_status_codes: Status codes worth retrying.  The default
            is a frozenset to avoid the shared-mutable-default pitfall.

    Returns:
        The (response, status_code) of the last attempt.

    Raises:
        ValueError: If max_retries is less than 1 (previously this case
            crashed with UnboundLocalError at the final return).
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    delay = initial_delay

    for attempt in range(max_retries):
        res, status = operation()

        # Success - return immediately
        if status < 400:
            return res, status

        # Client error (4xx) or any other non-retryable code - don't retry
        if status not in retryable_status_codes:
            return res, status

        # Retryable server error - back off, unless this was the last attempt
        if attempt < max_retries - 1:
            print(f"Attempt {attempt + 1} failed with status {status}")
            print(f"Retrying in {delay}s...")
            time.sleep(delay)
            delay *= backoff_factor

    return res, status

# Usage
# Wrap the API call in a zero-argument lambda so retry_with_backoff
# can re-invoke it on transient (5xx) failures.
res, status = retry_with_backoff(
    lambda: eca.ess.create(data=ess),
    max_retries=3
)

Connection Management

Authentication Best Practices

Cache the session token instead of re-authenticating on every request:
import os
from avenieca.api.model import Config
from avenieca.api.eca import ECA

class ECAClient:
    """Process-wide singleton wrapper around an authenticated ECA client.

    Repeated instantiation always yields the same object, and the
    underlying ECA connection is built lazily on first use so the
    session is established once and then reused.
    """

    _instance = None  # the single shared ECAClient instance
    _eca = None       # lazily-created ECA connection, shared via the class

    def __new__(cls):
        # Create the one shared instance on first construction only.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def get_client(self) -> ECA:
        # Build the authenticated client once; later calls reuse it.
        if self._eca is None:
            settings = Config(
                uri=os.getenv("ECA_URI", "http://localhost:2580/v1"),
                username=os.getenv("USERNAME"),
                password=os.getenv("PASSWORD")
            )
            self._eca = ECA(settings)
        return self._eca

# Usage - always get the same authenticated client
client = ECAClient()
eca = client.get_client()

# The cached client reuses its session across calls.
res, status = eca.ess.get_all(module_id="temperature")

Environment Configuration

Use environment variables for all configuration:
import os
from dataclasses import dataclass

@dataclass
class AppConfig:
    """Application configuration loaded from environment variables."""

    eca_uri: str    # base URI of the ECA API
    username: str   # ECA account name
    password: str   # ECA account password
    kafka_url: str  # Kafka bootstrap server address

    @classmethod
    def from_env(cls):
        """Build an AppConfig from the environment.

        Raises:
            ValueError: if any required variable is unset or empty.
        """
        values = {
            'eca_uri': os.getenv('ECA_URI'),
            'username': os.getenv('USERNAME'),
            'password': os.getenv('PASSWORD'),
            'kafka_url': os.getenv('KAFKA_URL'),
        }

        # Empty strings count as missing, same as unset variables.
        absent = [name for name, value in values.items() if not value]
        if absent:
            raise ValueError(
                f"Missing required environment variables: {', '.join(absent)}"
            )

        return cls(**values)

# Usage
config = AppConfig.from_env()  # raises ValueError if anything is missing

# Map the validated app settings onto the ECA client config.
eca_config = Config(
    uri=config.eca_uri,
    username=config.username,
    password=config.password
)
eca = ECA(eca_config)

Streaming Performance Tips

Optimize Sync Rate

Choose appropriate sync rates based on your use case:
from avenieca.config.broker import Broker
from avenieca.producers import Stream

# NOTE(review): sync_rate appears to be the publish interval in seconds
# (0.1 -> ~10 Hz) - confirm against the Stream API reference.
# High-frequency sensor data (10 Hz)
stream_fast = Stream(config=broker_config, sync_rate=0.1)

# Medium-frequency monitoring (1 Hz)
stream_medium = Stream(config=broker_config, sync_rate=1.0)

# Low-frequency state updates (every 10 seconds)
stream_slow = Stream(config=broker_config, sync_rate=10.0)
Performance tip: shorter sync intervals (higher publishing frequency) increase load. Start with a longer interval (1-5 seconds) and shorten it only if your use case requires it.

Efficient Signal Handlers

Keep handler functions lightweight:
from avenieca.data import Signal
import numpy as np

# Good: Fast, minimal computation
def efficient_handler():
    # Handlers run on every sync tick, so a constant-cost return
    # keeps the stream on schedule.
    return Signal(
        valence=10,
        state=[0.2, 0.3, 0.8]
    )

# Bad: Slow, heavy computation in handler
def slow_handler():
    # Avoid expensive operations
    # NOTE(review): `complex_algorithm` is illustrative only and is not
    # defined in this document.
    data = np.random.random(1000)
    processed = complex_algorithm(data)  # Don't do this!
    return Signal(
        valence=processed.mean(),
        state=processed.tolist()
    )

# Better: Pre-compute or use separate thread
def better_handler(shared_state):
    # Read from shared state updated by another thread
    # (expects 'valence' and 'state' keys to be kept fresh elsewhere).
    return Signal(
        valence=shared_state['valence'],
        state=shared_state['state']
    )

Batch Processing

For bulk operations, batch your requests:
from typing import List
from avenieca.api.model import ESSInsert

def create_ess_batch(ess_list: List[ESSInsert], eca):
    """Create multiple ESS records one-by-one, tracking failures.

    Each failure is recorded as (index, payload, response, status) so
    callers can inspect or retry the failed entries afterwards.
    """
    created, failed = [], []

    for index, entry in enumerate(ess_list):
        res, status = eca.ess.create(data=entry)
        if status == 201:
            created.append(res)
        else:
            failed.append((index, entry, res, status))

    print(f"Created {len(created)} ESS, {len(failed)} failed")

    # Log failures for retry
    for idx, ess, error, status in failed:
        print(f"Failed ESS {idx}: status {status}")

    return created, failed

# Usage
ess_batch = [
    ESSInsert(module_id="temp", state=[20.0], valence=50, score=5),
    ESSInsert(module_id="temp", state=[21.0], valence=55, score=6),
    ESSInsert(module_id="temp", state=[22.0], valence=60, score=7),
]

# `failed` holds (index, payload, response, status) tuples for retries.
created, failed = create_ess_batch(ess_batch, eca)

State Vector Design

Keep State Vectors Consistent

Maintain the same dimensionality within a module:
# Good: Consistent 3D state vector for temperature module
# (every ESS in a module carries the same components in the same order)
state_1 = [temp, humidity, pressure]  # [20.5, 65.0, 1013.25]
state_2 = [temp, humidity, pressure]  # [22.0, 60.0, 1012.80]

# Bad: Inconsistent dimensions
state_1 = [temp, humidity]           # [20.5, 65.0]
state_2 = [temp, humidity, pressure]  # [22.0, 60.0, 1012.80]

Normalize State Values

Normalize values to similar ranges for better similarity search:
def normalize_state(temp, humidity, pressure):
    """
    Normalize sensor values to 0-1 range.

    Each sensor is scaled against its expected operating range:
    temperature 0-50 C, humidity 0-100 %, pressure 950-1050 hPa.
    """
    return [
        temp / 50.0,                 # Temperature: 0-50C -> 0-1
        humidity / 100.0,            # Humidity: 0-100% -> 0-1
        (pressure - 950.0) / 100.0,  # Pressure: 950-1050 hPa -> 0-1
    ]

# Usage
state = normalize_state(temp=22.5, humidity=65, pressure=1013.25)
# All three components now share the 0-1 range, which keeps any single
# sensor from dominating similarity search.
ess = ESSInsert(
    module_id="weather",
    state=state,
    valence=50,
    score=10
)

Document State Semantics

Maintain documentation of what each state dimension represents:
# state_schemas.py
# Registry describing what each state dimension means, per module.
MODULE_SCHEMAS = {
    "weather": {
        "dimensions": 3,
        "schema": [
            {"index": 0, "name": "temperature", "unit": "celsius", "range": [0, 50]},
            {"index": 1, "name": "humidity", "unit": "percent", "range": [0, 100]},
            {"index": 2, "name": "pressure", "unit": "hPa", "range": [950, 1050]},
        ]
    },
    "air_conditioner": {
        "dimensions": 1,
        "schema": [
            {"index": 0, "name": "setpoint", "unit": "celsius", "range": [16, 30]},
        ]
    }
}

def validate_state(module_id: str, state: list) -> bool:
    """Validate a state vector against its module's registered schema.

    Returns False (printing a diagnostic where applicable) when the
    module is unknown, the dimensionality differs, or any component
    falls outside its documented range.
    """
    schema = MODULE_SCHEMAS.get(module_id)
    if not schema:
        return False

    if len(state) != schema["dimensions"]:
        print(f"Expected {schema['dimensions']} dimensions, got {len(state)}")
        return False

    # Pair each component with its field description.
    for value, field in zip(state, schema["schema"]):
        min_val, max_val = field["range"]
        if not (min_val <= value <= max_val):
            print(f"{field['name']} out of range: {value} not in [{min_val}, {max_val}]")
            return False

    return True

Production Considerations

Logging

Implement structured logging for debugging:
import logging
import json
from datetime import datetime

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('avenieca_app')

def log_api_call(operation: str, module_id: str, status: int, details: dict = None):
    """Log API calls with structured data.

    Emits one JSON document per call so downstream log tooling can
    parse fields instead of scraping free text.  Statuses >= 400 log
    at ERROR, everything else at INFO.

    Args:
        operation: Name of the API operation, e.g. "ess.create".
        module_id: Module the call targeted.
        status: HTTP status code returned by the call.
        details: Optional extra fields to attach to the log entry.
    """
    # Use a timezone-aware UTC timestamp: datetime.utcnow() is
    # deprecated (Python 3.12+) and yields a naive value with no
    # offset information in its isoformat() output.
    from datetime import timezone

    log_entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "operation": operation,
        "module_id": module_id,
        "status": status,
        "success": status < 400,
        "details": details or {}
    }

    # getLogger returns the same registry-backed 'avenieca_app' logger
    # configured at module setup.
    log = logging.getLogger('avenieca_app')
    serialized = json.dumps(log_entry)
    if status >= 400:
        log.error(serialized)
    else:
        log.info(serialized)

# Usage
res, status = eca.ess.create(data=ess)
log_api_call(
    operation="ess.create",
    module_id=ess.module_id,
    status=status,
    # Only a successful create carries a response object with an id.
    details={"ess_id": res.id if status == 201 else None}
)

Monitoring

Track key metrics:
from collections import defaultdict
from datetime import datetime
import time

class MetricsCollector:
    """Collect application metrics.

    Holds monotonically increasing counters plus raw timing samples,
    and summarizes both on demand via get_stats().
    """

    def __init__(self):
        self.counters = defaultdict(int)   # metric name -> running total
        self.timings = defaultdict(list)   # metric name -> duration samples

    def increment(self, metric: str, value: int = 1):
        """Add *value* to the named counter (default 1)."""
        self.counters[metric] += value

    def record_timing(self, metric: str, duration: float):
        """Append one duration sample for the named metric."""
        self.timings[metric].append(duration)

    def get_stats(self):
        """Return a snapshot dict with counters and timing summaries."""
        timing_summary = {
            metric: {
                "count": len(samples),
                "mean": sum(samples) / len(samples),
                "min": min(samples),
                "max": max(samples),
            }
            for metric, samples in self.timings.items()
            if samples  # skip metrics with no recorded samples
        }
        return {"counters": dict(self.counters), "timings": timing_summary}

metrics = MetricsCollector()

# Track API calls
start = time.time()
res, status = eca.ess.create(data=ess)
duration = time.time() - start

metrics.increment("ess.create.total")
if status == 201:
    metrics.increment("ess.create.success")
else:
    metrics.increment("ess.create.error")
metrics.record_timing("ess.create.duration", duration)

# Periodically log metrics
# NOTE(review): this line requires `import json`, which this snippet's
# import block does not include.
print(json.dumps(metrics.get_stats(), indent=2))

Health Checks

Implement health checks for your application:
import requests

def check_eca_health(config) -> bool:
    """Check if ECA API is accessible.

    Probes the /health endpoint with a short timeout; any exception
    (connection refused, DNS failure, timeout) is logged and reported
    as unhealthy rather than propagated.
    """
    try:
        probe = requests.get(
            f"{config.uri}/health",
            timeout=5
        )
    except Exception as e:
        logger.error(f"ECA health check failed: {e}")
        return False
    return probe.status_code == 200

def check_kafka_health(config) -> bool:
    """Check if Kafka is accessible.

    Opens and immediately closes a throwaway producer connection; any
    failure (import error, unreachable broker, timeout) is logged and
    reported as unhealthy rather than propagated.
    """
    try:
        from kafka import KafkaProducer
        probe = KafkaProducer(
            bootstrap_servers=[config.kafka_url],
            request_timeout_ms=5000
        )
        probe.close()
    except Exception as e:
        logger.error(f"Kafka health check failed: {e}")
        return False
    return True

# Run health checks on startup
# Fail fast: abort startup if either dependency is unreachable.
if not check_eca_health(config):
    raise RuntimeError("ECA API is not available")

if not check_kafka_health(config):
    raise RuntimeError("Kafka is not available")

Testing Strategies

Unit Testing

import unittest
from unittest.mock import Mock, patch
from avenieca.api.model import ESSInsert, ESSResponse

class TestESSOperations(unittest.TestCase):
    """Unit tests for ESS payload handling and state validation."""

    def setUp(self):
        # Representative insert payload shared by the tests below.
        self.ess_insert = ESSInsert(
            module_id="test_module",
            state=[1.0, 2.0, 3.0],
            valence=50.0,
            score=10
        )
    
    @patch('avenieca.api.eca.ECA')
    def test_create_ess_success(self, mock_eca):
        # Mock successful response
        # NOTE(review): the assertions below exercise the configured mock
        # itself, not any production code path - they pin the expected
        # (response, status) contract shape only.
        mock_response = ESSResponse(
            id=1,
            module_id="test_module",
            state=[1.0, 2.0, 3.0],
            valence=50.0,
            score=10
        )
        mock_eca.ess.create.return_value = (mock_response, 201)
        
        # Test
        res, status = mock_eca.ess.create(data=self.ess_insert)
        
        # Assert
        self.assertEqual(status, 201)
        self.assertEqual(res.id, 1)
        self.assertEqual(res.module_id, "test_module")
    
    def test_validate_state_dimensions(self):
        # Test state validation
        # NOTE(review): relies on validate_state / MODULE_SCHEMAS from the
        # "Document State Semantics" snippet being importable here.
        valid = validate_state("weather", [25.0, 65.0, 1013.25])
        self.assertTrue(valid)
        
        invalid = validate_state("weather", [25.0, 65.0])
        self.assertFalse(invalid)

if __name__ == '__main__':
    unittest.main()

Integration Testing

import pytest
import os
from avenieca.api.model import Config, ESSInsert
from avenieca.api.eca import ECA

@pytest.fixture
def eca_client():
    """Fixture providing authenticated ECA client."""
    # Reads TEST_-prefixed variables so integration runs can target a
    # dedicated environment; defaults to a local instance.
    config = Config(
        uri=os.getenv("TEST_ECA_URI", "http://localhost:2580/v1"),
        username=os.getenv("TEST_USERNAME"),
        password=os.getenv("TEST_PASSWORD")
    )
    return ECA(config)

def test_ess_lifecycle(eca_client):
    """Test the ESS create-read-update cycle against a live API.

    NOTE(review): despite the original description, no delete step is
    performed, so repeated runs leave "test_lifecycle" records behind -
    consider cleaning up the created ESS in a teardown.
    """
    
    # Create
    ess_insert = ESSInsert(
        module_id="test_lifecycle",
        state=[10.0],
        valence=50.0,
        score=5
    )
    create_res, create_status = eca_client.ess.create(data=ess_insert)
    assert create_status == 201
    ess_id = create_res.id
    
    # Read back the record just created
    read_res, read_status = eca_client.ess.get_one(
        module_id="test_lifecycle",
        db_id=ess_id
    )
    assert read_status == 200
    assert read_res.state == [10.0]
    
    # Update the same record in place
    updated_ess = ESSInsert(
        module_id="test_lifecycle",
        state=[20.0],
        valence=60.0,
        score=6
    )
    update_res, update_status = eca_client.ess.update(
        module_id="test_lifecycle",
        db_id=ess_id,
        data=updated_ess
    )
    assert update_status == 200
    assert update_res.state == [20.0]

Next Steps

API Reference

Explore all available API methods

Aggregates

Build complex multi-module states

Predictions

Master Cortex prediction patterns

Build docs developers (and LLMs) love