Error Handling Patterns
All AveniECA API methods return a tuple of (response, status_code). Always check both to handle errors gracefully.
Basic Error Handling
from avenieca.api.model import Error, AggregateError
# Create the ESS, then branch on the outcome of the call.
res, status = eca.ess.create(data=ess)

if status == 201:
    # Success - res is ESSResponse
    print(f"Created ESS with ID {res.id}")
elif isinstance(res, Error):
    # Validation failure: report every message carried by the Error object.
    print("Validation errors:")
    for error in res.errors:
        print(f" - {error}")
elif isinstance(res, AggregateError):
    # Handle aggregate-specific errors
    print("Aggregate validation failed")
    if res.field_length_errors:
        print(f"Field length errors: {res.field_length_errors}")
else:
    # Fallback for unexpected errors
    print(f"Request failed with status {status}: {res}")
Robust Error Handler
Create a reusable error handler for all API calls:
from typing import Tuple, Any
from avenieca.api.model import Error, AggregateError
def handle_response(
    res: Any,
    status: int,
    expected_status: int = 200,
    operation: str = "API operation"
) -> Tuple[Any, bool]:
    """
    Compare an API result with the status code the caller expected.

    When the status differs, a diagnostic naming ``operation`` is printed,
    followed by whatever detail the response object carries (the messages of
    an ``Error``, the populated fields of an ``AggregateError``, or the raw
    response otherwise).

    Returns: (response, success: bool) - the response is passed through
    unchanged in both cases.
    """
    # Guard clause: a matching status is the success path, nothing to report.
    if status == expected_status:
        return res, True

    print(f"{operation} failed with status {status}")
    if isinstance(res, Error):
        for message in res.errors:
            print(f" Error: {message}")
    elif isinstance(res, AggregateError):
        # Only the fields that are actually populated are reported.
        if res.field_length_errors:
            print(f" Field length errors: {res.field_length_errors}")
        if res.invalid_ess_db_ids:
            print(f" Invalid ESS IDs: {res.invalid_ess_db_ids}")
        if res.incorrect_avg_ess_score:
            print(f" Score calculation: {res.incorrect_avg_ess_score}")
    else:
        print(f" Response: {res}")
    return res, False
# Usage
res, status = eca.ess.create(data=ess)

# ESS creation answers 201 on success, so override the default expectation of 200.
ess_response, success = handle_response(
    res,
    status,
    expected_status=201,
    operation="ESS creation",
)

# Only touch the response's id once the call is known to have succeeded.
if success:
    print(f"ESS created: {ess_response.id}")
Retry Logic
Implement exponential backoff for transient failures:
import time
from typing import Callable, Tuple, Any
def retry_with_backoff(
    operation: Callable[[], Tuple[Any, int]],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    retryable_status_codes: set = {500, 502, 503, 504}
) -> Tuple[Any, int]:
    """
    Retry an operation with exponential backoff.

    Args:
        operation: Zero-argument callable returning ``(response, status_code)``.
        max_retries: Total number of attempts (not additional retries); must
            be at least 1.
        initial_delay: Seconds to wait before the second attempt.
        backoff_factor: Multiplier applied to the delay after every wait.
        retryable_status_codes: Status codes worth retrying. The default set
            is only read, never mutated, so sharing it between calls is safe.

    Returns:
        The ``(response, status_code)`` of the last attempt made.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1; without at least
            one attempt there would be no result to return.

    Exceptions raised by ``operation`` itself are not caught and propagate
    to the caller.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    delay = initial_delay
    for attempt in range(1, max_retries + 1):
        res, status = operation()

        # Stop on success, on any status not listed as retryable (not only
        # 4xx client errors), or when this was the final attempt.
        if (
            status < 400
            or status not in retryable_status_codes
            or attempt == max_retries
        ):
            return res, status

        # Retryable failure with attempts left - wait, then grow the delay.
        print(f"Attempt {attempt} failed with status {status}")
        print(f"Retrying in {delay}s...")
        time.sleep(delay)
        delay *= backoff_factor
# Usage
# The call is wrapped in a zero-argument lambda so that retry_with_backoff
# can invoke it again on every attempt.
res, status = retry_with_backoff(
    lambda: eca.ess.create(data=ess),
    max_retries=3
)
Connection Management
Authentication Best Practices
Cache the session token instead of re-authenticating on every request:
import os
from avenieca.api.model import Config
from avenieca.api.eca import ECA
class ECAClient:
    """Singleton wrapper that builds one ECA client on first use and reuses it."""

    # The single shared ECAClient instance; set by the first construction.
    _instance = None
    # The cached ECA client; stays None until get_client() is first called.
    _eca = None

    def __new__(cls):
        """Return the shared instance, creating it on the first call only."""
        instance = cls._instance
        if instance is None:
            instance = cls._instance = super().__new__(cls)
        return instance

    def get_client(self) -> ECA:
        """
        Return the cached ECA client, building it from the environment once.

        Reads ECA_URI (default ``http://localhost:2580/v1``), USERNAME and
        PASSWORD. Missing credentials are passed on as ``None``.
        """
        if self._eca is not None:
            return self._eca

        self._eca = ECA(
            Config(
                uri=os.getenv("ECA_URI", "http://localhost:2580/v1"),
                username=os.getenv("USERNAME"),
                password=os.getenv("PASSWORD")
            )
        )
        return self._eca
# Usage - always get the same authenticated client
# ECAClient() hands back the same instance every time, so all callers end up
# sharing the one ECA client that get_client() caches.
client = ECAClient()
eca = client.get_client()
# Example request made through the shared client.
res, status = eca.ess.get_all(module_id="temperature")
Environment Configuration
Use environment variables for all configuration:
import os
from dataclasses import dataclass
@dataclass
class AppConfig:
    """Application configuration from environment."""
    # Base URI of the ECA API (ECA_URI).
    eca_uri: str
    # Credentials for the ECA API (USERNAME / PASSWORD).
    # NOTE: the generated dataclass repr includes the password - avoid
    # printing or logging instances.
    username: str
    password: str
    # Kafka bootstrap address (KAFKA_URL).
    kafka_url: str

    @classmethod
    def from_env(cls) -> "AppConfig":
        """
        Build the configuration from environment variables.

        Returns:
            An AppConfig populated from ECA_URI, USERNAME, PASSWORD and
            KAFKA_URL.

        Raises:
            ValueError: If any of those variables is unset or empty. The
                message lists the environment variable names that are
                missing.
        """
        # Field name -> environment variable that supplies it.
        # NOTE(review): USERNAME is also set by the operating system on
        # Windows; a prefixed name (e.g. ECA_USERNAME) would avoid picking
        # up the login name by accident - confirm before renaming.
        env_names = {
            'eca_uri': 'ECA_URI',
            'username': 'USERNAME',
            'password': 'PASSWORD',
            'kafka_url': 'KAFKA_URL',
        }
        values = {name: os.getenv(env) for name, env in env_names.items()}

        # Report the variable names the operator has to set, not the
        # internal field names.
        missing = [env_names[name] for name, value in values.items() if not value]
        if missing:
            raise ValueError(
                f"Missing required environment variables: {', '.join(missing)}"
            )
        return cls(**values)
# Usage
# Load and validate the environment once; from_env() raises ValueError if a
# required variable is missing.
config = AppConfig.from_env()
# Hand only the ECA-related settings to the client configuration.
eca_config = Config(
    uri=config.eca_uri,
    username=config.username,
    password=config.password
)
eca = ECA(eca_config)
Streaming Performance Tips
Optimize Sync Rate
Choose appropriate sync rates based on your use case:
from avenieca.config.broker import Broker
from avenieca.producers import Stream
# NOTE(review): judging by the frequencies quoted below, sync_rate is the
# interval between syncs in seconds (0.1 -> 10 Hz) - confirm against the
# Stream documentation.
# broker_config is assumed to be a Broker configuration created elsewhere.
# High-frequency sensor data (10 Hz)
stream_fast = Stream(config=broker_config, sync_rate=0.1)
# Medium-frequency monitoring (1 Hz)
stream_medium = Stream(config=broker_config, sync_rate=1.0)
# Low-frequency state updates (every 10 seconds)
stream_slow = Stream(config=broker_config, sync_rate=10.0)
Performance tip: `sync_rate` is the interval between syncs in seconds, so smaller values mean more frequent syncs and more load. Start with a longer interval (1-5 seconds) and shorten it only if needed.
Efficient Signal Handlers
Keep handler functions lightweight:
from avenieca.data import Signal
import numpy as np
# Good: Fast, minimal computation
def efficient_handler():
    """Return a Signal built from constants; no computation happens here."""
    signal = Signal(valence=10, state=[0.2, 0.3, 0.8])
    return signal
# Bad: Slow, heavy computation in handler
def slow_handler():
    """Anti-pattern example: the expensive work runs inside the handler itself."""
    # Avoid expensive operations
    samples = np.random.random(1000)
    result = complex_algorithm(samples)  # Don't do this!
    return Signal(valence=result.mean(), state=result.tolist())
# Better: Pre-compute or use separate thread
def better_handler(shared_state):
    """
    Build a Signal from values that were computed ahead of time.

    ``shared_state`` is a mapping with 'valence' and 'state' entries; it is
    expected to be kept up to date elsewhere (e.g. by a background thread),
    so the handler itself only performs two lookups.
    """
    valence = shared_state['valence']
    state = shared_state['state']
    return Signal(valence=valence, state=state)
Batch Processing
For bulk operations, send the requests in a loop and track which ones succeed or fail:
from typing import List
from avenieca.api.model import ESSInsert
def create_ess_batch(ess_list: List[ESSInsert], eca):
    """
    Create every ESS in ``ess_list`` one request at a time, tracking failures.

    Returns a ``(created, failed)`` pair: ``created`` holds the responses of
    the calls that answered 201, ``failed`` holds
    ``(index, ess, response, status)`` tuples for all other calls.
    A summary and one line per failure are printed.
    """
    created = []
    failed = []

    for position, candidate in enumerate(ess_list):
        response, status_code = eca.ess.create(data=candidate)
        if status_code != 201:
            # Keep everything needed to retry or inspect the failure later.
            failed.append((position, candidate, response, status_code))
            continue
        created.append(response)

    print(f"Created {len(created)} ESS, {len(failed)} failed")

    # Log failures for retry
    for position, _, _, status_code in failed:
        print(f"Failed ESS {position}: status {status_code}")

    return created, failed
# Usage
# Three ESS for the same module, each with a one-dimensional state.
ess_batch = [
    ESSInsert(module_id="temp", state=[20.0], valence=50, score=5),
    ESSInsert(module_id="temp", state=[21.0], valence=55, score=6),
    ESSInsert(module_id="temp", state=[22.0], valence=60, score=7),
]
# created: responses of the successful calls;
# failed: (index, ess, response, status) tuples for the calls that did not return 201.
created, failed = create_ess_batch(ess_batch, eca)
State Vector Design
Keep State Vectors Consistent
Maintain the same dimensionality within a module:
# Good: Consistent 3D state vector for temperature module
# temp, humidity and pressure stand for sensor readings obtained elsewhere;
# the trailing comments show example values.
state_1 = [temp, humidity, pressure] # [20.5, 65.0, 1013.25]
state_2 = [temp, humidity, pressure] # [22.0, 60.0, 1012.80]
# Bad: Inconsistent dimensions
# (state_1 has two entries while state_2 has three, so the two vectors of the
# same module no longer line up position by position)
state_1 = [temp, humidity] # [20.5, 65.0]
state_2 = [temp, humidity, pressure] # [22.0, 60.0, 1012.80]
Normalize State Values
# Normalize values to similar ranges for better similarity search:
def normalize_state(temp, humidity, pressure):
    """
    Scale raw sensor readings so that each nominal range maps onto 0-1.

    Returns ``[temp, humidity, pressure]`` in normalized form. Readings
    outside their nominal range produce values outside 0-1; nothing is
    clamped.
    """
    normalized = [
        temp / 50.0,                 # Temperature: 0-50°C -> 0-1
        humidity / 100.0,            # Humidity: 0-100% -> 0-1
        (pressure - 950.0) / 100.0,  # Pressure: 950-1050 hPa -> 0-1
    ]
    return normalized
# Usage
# Normalize first so that every dimension of the stored state shares the
# same 0-1 scale.
state = normalize_state(temp=22.5, humidity=65, pressure=1013.25)
ess = ESSInsert(
    module_id="weather",
    state=state,
    valence=50,
    score=10
)
Document State Semantics
Maintain documentation of what each state dimension represents:
# state_schemas.py
# Per-module description of the state vector: how many dimensions it has and,
# for each position, its name, unit and the inclusive [min, max] range of
# raw values accepted by validate_state().
MODULE_SCHEMAS = {
    "weather": {
        "dimensions": 3,
        "schema": [
            {"index": 0, "name": "temperature", "unit": "celsius", "range": [0, 50]},
            {"index": 1, "name": "humidity", "unit": "percent", "range": [0, 100]},
            {"index": 2, "name": "pressure", "unit": "hPa", "range": [950, 1050]},
        ]
    },
    "air_conditioner": {
        "dimensions": 1,
        "schema": [
            {"index": 0, "name": "setpoint", "unit": "celsius", "range": [16, 30]},
        ]
    }
}


def validate_state(module_id: str, state: list) -> bool:
    """
    Validate state matches module schema.

    Args:
        module_id: Key into MODULE_SCHEMAS.
        state: Raw (un-normalized) state vector to check.

    Returns:
        True when the module is known, the vector has the declared number of
        dimensions and every value lies inside its inclusive range. In every
        other case the reason is printed and False is returned - including
        unknown modules and values that cannot be compared with the numeric
        bounds.
    """
    schema = MODULE_SCHEMAS.get(module_id)
    if not schema:
        # Report the reason, like every other failure path does.
        print(f"No schema registered for module '{module_id}'")
        return False

    if len(state) != schema["dimensions"]:
        print(f"Expected {schema['dimensions']} dimensions, got {len(state)}")
        return False

    # Fields are matched to values by position; the length check above keeps
    # the two sequences aligned for a consistent schema.
    for field, value in zip(schema["schema"], state):
        min_val, max_val = field["range"]
        try:
            in_range = min_val <= value <= max_val
        except TypeError:
            # e.g. a string or None slipped into the vector.
            print(f"{field['name']} is not numeric: {value!r}")
            return False
        if not in_range:
            print(f"{field['name']} out of range: {value} not in [{min_val}, {max_val}]")
            return False
    return True
Production Considerations
Logging
Implement structured logging for debugging:
import logging
import json
from datetime import datetime
# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Application-wide logger used by the helpers below.
logger = logging.getLogger('avenieca_app')


def log_api_call(operation: str, module_id: str, status: int, details: dict = None):
    """
    Log API calls with structured data.

    The entry is serialized to JSON and logged at ERROR level when ``status``
    is 400 or above, at INFO level otherwise.

    Args:
        operation: Name of the API operation, e.g. "ess.create".
        module_id: Module the call was made for.
        status: Status code returned by the call.
        details: Optional extra JSON-serializable data; defaults to ``{}``.

    Raises:
        TypeError: If ``details`` contains values ``json.dumps`` cannot
            serialize.
    """
    # Local import: only ``datetime`` itself is imported at file level.
    from datetime import timezone

    log_entry = {
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and
        # returns a naive value that carries no offset.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "operation": operation,
        "module_id": module_id,
        "status": status,
        "success": status < 400,
        "details": details or {}
    }
    if status >= 400:
        logger.error(json.dumps(log_entry))
    else:
        logger.info(json.dumps(log_entry))
# Usage
res, status = eca.ess.create(data=ess)
# res.id is only read when the call returned 201; for any other status the
# logged ess_id is None.
log_api_call(
    operation="ess.create",
    module_id=ess.module_id,
    status=status,
    details={"ess_id": res.id if status == 201 else None}
)
Monitoring
Track key metrics:
from collections import defaultdict
from datetime import datetime
import time
class MetricsCollector:
    """In-memory store for named counters and timing samples."""

    def __init__(self):
        # Missing metrics start at 0 / an empty sample list on first use.
        self.counters = defaultdict(int)
        self.timings = defaultdict(list)

    def increment(self, metric: str, value: int = 1):
        """Add ``value`` (default 1) to the counter named ``metric``."""
        self.counters[metric] = self.counters[metric] + value

    def record_timing(self, metric: str, duration: float):
        """Append one duration sample to the series named ``metric``."""
        samples = self.timings[metric]
        samples.append(duration)

    def get_stats(self):
        """
        Return a snapshot of all metrics.

        The result has a "counters" dict and a "timings" dict; each timing
        entry carries count, mean, min and max of its samples. Series without
        samples are left out.
        """
        timing_stats = {
            metric: {
                "count": len(samples),
                "mean": sum(samples) / len(samples),
                "min": min(samples),
                "max": max(samples),
            }
            for metric, samples in self.timings.items()
            if samples
        }
        return {"counters": dict(self.counters), "timings": timing_stats}
metrics = MetricsCollector()

# Track API calls
# perf_counter() is monotonic, so the measured duration cannot be distorted
# (or turn negative) by system clock adjustments the way time.time() can.
start = time.perf_counter()
res, status = eca.ess.create(data=ess)
duration = time.perf_counter() - start

metrics.increment("ess.create.total")
if status == 201:
    metrics.increment("ess.create.success")
else:
    metrics.increment("ess.create.error")
metrics.record_timing("ess.create.duration", duration)

# Periodically log metrics
print(json.dumps(metrics.get_stats(), indent=2))
Health Checks
Implement health checks for your application:
import requests
def check_eca_health(config) -> bool:
    """
    Check if ECA API is accessible.

    Args:
        config: Object carrying the API base URI, either as ``uri`` (as on
            the ECA client configuration) or as ``eca_uri`` (as on
            AppConfig).

    Returns:
        True when ``GET <base-uri>/health`` answers 200 within 5 seconds;
        False otherwise. Failures are logged, never raised.
    """
    # Accept both attribute spellings so that the application config and the
    # client config can be passed interchangeably.
    base_uri = getattr(config, "uri", None) or getattr(config, "eca_uri", None)
    if not base_uri:
        logger.error("ECA health check failed: config provides no ECA URI")
        return False

    try:
        # Try a lightweight endpoint; strip a trailing slash so the path
        # does not end up as "//health".
        response = requests.get(
            f"{str(base_uri).rstrip('/')}/health",
            timeout=5
        )
    except Exception as e:
        # Deliberately broad: a health probe reports any failure as
        # "unhealthy" instead of crashing the caller.
        logger.error(f"ECA health check failed: {e}")
        return False
    return response.status_code == 200
def check_kafka_health(config) -> bool:
    """
    Report whether a Kafka producer can be created for ``config.kafka_url``.

    Any exception - including a failed import of the kafka package - is
    logged and turned into False.
    """
    try:
        from kafka import KafkaProducer

        probe = KafkaProducer(
            bootstrap_servers=[config.kafka_url],
            request_timeout_ms=5000
        )
        # The producer is only a connectivity probe; release it right away.
        probe.close()
    except Exception as exc:
        logger.error(f"Kafka health check failed: {exc}")
        return False
    else:
        return True
# Run health checks on startup
# Each probe runs in order; the first one that fails aborts startup with its
# own message.
for _health_check, _failure_message in (
    (check_eca_health, "ECA API is not available"),
    (check_kafka_health, "Kafka is not available"),
):
    if not _health_check(config):
        raise RuntimeError(_failure_message)
Testing Strategies
Unit Testing
import unittest
from unittest.mock import Mock, patch
from avenieca.api.model import ESSInsert, ESSResponse
class TestESSOperations(unittest.TestCase):
    """Unit tests that run without a live ECA API."""

    def setUp(self):
        # Fresh insert payload for every test.
        self.ess_insert = ESSInsert(
            module_id="test_module", state=[1.0, 2.0, 3.0], valence=50.0, score=10
        )

    @patch('avenieca.api.eca.ECA')
    def test_create_ess_success(self, mock_eca):
        """A mocked create() hands back the prepared response and a 201."""
        # Mock successful response
        expected = ESSResponse(
            id=1, module_id="test_module", state=[1.0, 2.0, 3.0], valence=50.0, score=10
        )
        mock_eca.ess.create.return_value = (expected, 201)

        # Test
        res, status = mock_eca.ess.create(data=self.ess_insert)

        # Assert
        self.assertEqual(status, 201)
        self.assertEqual(res.id, 1)
        self.assertEqual(res.module_id, "test_module")

    def test_validate_state_dimensions(self):
        """validate_state accepts a 3-value weather state and rejects a 2-value one."""
        # Test state validation
        valid = validate_state("weather", [25.0, 65.0, 1013.25])
        self.assertTrue(valid)

        invalid = validate_state("weather", [25.0, 65.0])
        self.assertFalse(invalid)


# Run the tests when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
Integration Testing
import pytest
import os
from avenieca.api.model import Config, ESSInsert
from avenieca.api.eca import ECA
@pytest.fixture
def eca_client():
    """
    Provide an ECA client configured from the TEST_* environment variables.

    TEST_ECA_URI falls back to ``http://localhost:2580/v1``; TEST_USERNAME
    and TEST_PASSWORD are passed through as-is (``None`` when unset).
    """
    uri = os.getenv("TEST_ECA_URI", "http://localhost:2580/v1")
    username = os.getenv("TEST_USERNAME")
    password = os.getenv("TEST_PASSWORD")
    return ECA(Config(uri=uri, username=username, password=password))
def test_ess_lifecycle(eca_client):
    """
    Exercise create, read and update of one ESS through the given client.

    NOTE(review): no delete step is performed, so the ESS created in module
    "test_lifecycle" is left behind after the test - add cleanup if the
    API offers a delete call.
    """
    module_id = "test_lifecycle"

    # Create
    create_res, create_status = eca_client.ess.create(
        data=ESSInsert(module_id=module_id, state=[10.0], valence=50.0, score=5)
    )
    assert create_status == 201
    ess_id = create_res.id

    # Read
    read_res, read_status = eca_client.ess.get_one(module_id=module_id, db_id=ess_id)
    assert read_status == 200
    assert read_res.state == [10.0]

    # Update
    update_res, update_status = eca_client.ess.update(
        module_id=module_id,
        db_id=ess_id,
        data=ESSInsert(module_id=module_id, state=[20.0], valence=60.0, score=6),
    )
    assert update_status == 200
    assert update_res.state == [20.0]
Next Steps
API Reference
Explore all available API methods
Aggregates
Build complex multi-module states
Predictions
Master Cortex prediction patterns