Skip to main content

What are Modules?

In AveniECA, a module is an organizational unit that groups related digital twins together. The module_id serves as both an identifier and a namespace for storing and querying state data in the Episodic State Space (ESS).

Module Fundamentals

Module as Type Definition

Think of a module as a “type” or “class” of digital twin. Just as you might have multiple instances of a Python class, you can have multiple instances of a module:
# Module defines the type
module_id = "temperature"

# Multiple instances can exist
instance_1 = {"sub_topic": "bedroom_temp", "instance_id": 1}
instance_2 = {"sub_topic": "kitchen_temp", "instance_id": 2}
instance_3 = {"sub_topic": "garage_temp", "instance_id": 3}
All instances share:
  • The same module_id
  • The same state vector structure
  • The same ESS storage
  • Access to the same prediction models

Module ID Structure

A module_id is a string that appears throughout the AveniECA system:
from avenieca.config.broker import Broker

config = Broker(
    url=kafka_url,
    sub_topic="left_wheel",  # Instance-specific
    group="vehicles",
    pub_topic=""
)
# Signals sent here are associated with a module_id

Module Types

Individual Modules

Purpose: Represent a single, atomic component or measurement. Characteristics:
  • Simple state vectors (often 1-3 dimensions)
  • Independent state history
  • Can be components of aggregates
  • Focused prediction domain
Examples from Real Systems:
# Environmental sensors
modules = [
    {"module_id": "temperature", "state_dim": 1},      # [celsius]
    {"module_id": "humidity", "state_dim": 1},         # [percentage]
    {"module_id": "air_quality_index", "state_dim": 1}, # [aqi]
    {"module_id": "occupancy", "state_dim": 1},        # [people_count]
]

# Equipment components
modules = [
    {"module_id": "air_conditioner", "state_dim": 1},  # [power_level]
    {"module_id": "purifier", "state_dim": 1},         # [speed]
]

# Vehicle subsystems
modules = [
    {"module_id": "left_wheel", "state_dim": 3},       # [speed, temp, pressure]
    {"module_id": "right_wheel", "state_dim": 3},
]
When to use individual modules:
  • Component needs independent monitoring
  • State is simple and well-defined
  • May be part of multiple different aggregates
  • Need granular history for specific component

Aggregate Modules

Purpose: Combine multiple individual modules to learn multi-modal patterns and relationships. Characteristics:
  • Complex state vectors (combination of component states)
  • Rich metadata about components
  • Enable holistic predictions
  • Store relationships between components
Structure Example:
from avenieca.api.model import ESSInsert

# Aggregate combining 5 environmental modules
aggregate = ESSInsert(
    module_id="climate_aggregate",
    
    # Combined state vector
    state=[25.0, 65.0, 10.0, 2.0, 70.0],  # temp, humidity, occupancy, purifier, aqi
    
    # Aggregate-level metrics
    valence=-10.0,           # Overall assessment
    avg_ess_valence=-18.0,   # Average component valence
    total_ess_score=37,      # Sum of component scores
    avg_ess_score=7,         # Average component score
    score=0,
    
    # Component tracking
    aggregate_id=[2, 7, 3, 5, 6],                      # ESS IDs of components
    aggregate_module_id=[                               # Component types
        "air_conditioner",
        "occupancy", 
        "purifier",
        "temperature",
        "air_quality_index"
    ],
    aggregate_shape=[1, 1, 1, 1, 1],                   # Dimensions of each
    aggregate_valence=[90.0, -90.0, 90.0, -90.0, -90.0],  # Individual assessments
    aggregate_score=[18, 6, 28, 2, 1],                  # Individual scores
    aggregate_context=[None, None, None, None, None],
    aggregate_emb_inp=[None, None, None, None, None]
)
When to use aggregates:
  • Components interact or influence each other
  • Need to predict multi-variable outcomes
  • Want to understand system-level behavior
  • Need efficient queries for complete system state

Module Organization Patterns

Hierarchical Organization

Organize modules in a hierarchy from simple to complex:
Individual Sensors → Component Aggregates → System Aggregates
Example: Smart Building
# Level 1: Individual sensors
individual_modules = [
    "temperature",
    "humidity",
    "co2_level",
    "light_level",
    "occupancy"
]

# Level 2: Room aggregate
room_aggregate = {
    "module_id": "room_101_climate",
    "components": ["temperature", "humidity", "co2_level"]
}

# Level 3: Floor aggregate
floor_aggregate = {
    "module_id": "floor_1_climate",
    "components": [
        "room_101_climate",
        "room_102_climate",
        "room_103_climate"
    ]
}

# Level 4: Building aggregate
building_aggregate = {
    "module_id": "building_climate",
    "components": [
        "floor_1_climate",
        "floor_2_climate",
        "floor_3_climate"
    ]
}
Benefits:
  • Predictions at multiple levels of granularity
  • Efficient queries at appropriate scope
  • Clear organization and debugging
  • Scalable to large systems

Functional Organization

Group modules by function or domain:
# Climate control domain
climate_modules = [
    "temperature",
    "humidity",
    "hvac_unit",
    "climate_aggregate"
]

# Air quality domain
air_quality_modules = [
    "co2_sensor",
    "pm25_sensor",
    "voc_sensor",
    "air_purifier",
    "air_quality_aggregate"
]

# Occupancy domain
occupancy_modules = [
    "motion_sensor",
    "door_sensor",
    "camera_count",
    "occupancy_aggregate"
]

# Building-wide aggregate
building_aggregate = {
    "module_id": "building_systems",
    "components": [
        "climate_aggregate",
        "air_quality_aggregate",
        "occupancy_aggregate"
    ]
}
Benefits:
  • Clear separation of concerns
  • Domain experts can focus on relevant modules
  • Independent evolution of domains
  • Easier maintenance and debugging

Creating and Using Modules

Defining a New Module

1

Design the state vector

Determine what information the module needs to track:
# Simple: Single value
state = [temperature_celsius]

# Multi-dimensional: Related values
state = [speed_rpm, temperature_c, vibration_level]

# Complex: System state
state = [sensor1, sensor2, sensor3, actuator1, actuator2]
2

Choose module_id naming

Use descriptive, consistent naming:
# Good
module_id = "hvac_unit_1"
module_id = "temperature_sensor"
module_id = "climate_aggregate"

# Avoid
module_id = "temp"  # Too vague
module_id = "Module1"  # Not descriptive
module_id = "sensor-01"  # Use underscores
3

Implement data capture

Create a function that generates signals:
from avenieca.data import Signal

def capture_hvac_state():
    # Read from hardware/API
    power_level = get_hvac_power()
    fan_speed = get_fan_speed()
    
    # Assess the state
    valence = calculate_efficiency_score(power_level, fan_speed)
    
    return Signal(
        state=[power_level, fan_speed],
        valence=valence,
        score=1
    )
4

Configure streaming or API

Set up Kafka streaming or direct API calls:
from avenieca.producers import Stream
from avenieca.config.broker import Broker

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="hvac/unit_1",  # Instance-specific topic
    group="building_systems",
    pub_topic=""
)

stream = Stream(config=config, sync_rate=5.0)  # Every 5 seconds
stream.publish(capture_hvac_state)

Creating Aggregates

Follow this pattern from the README to create properly structured aggregates:
from avenieca.api.model import ESSResponse, ESSInsert
from typing import List

def create_aggregate_from_ess(
    array_ess: List[ESSResponse], 
    aggregate_insert: ESSInsert
) -> ESSInsert:
    """
    Combine multiple ESS entries into an aggregate.
    
    Args:
        array_ess: List of component ESS entries
        aggregate_insert: Template aggregate to populate
        
    Returns:
        Populated aggregate ready to insert
    """
    total_ess_score = 0
    total_ess_valence = 0.0
    
    for ess in array_ess:
        # Combine state vectors
        aggregate_insert.state.extend(ess.state)
        
        # Track component metadata
        aggregate_insert.aggregate_module_id.append(ess.module_id)
        aggregate_insert.aggregate_id.append(ess.id)
        aggregate_insert.aggregate_context.append(ess.context)
        aggregate_insert.aggregate_valence.append(ess.valence)
        aggregate_insert.aggregate_score.append(ess.score)
        aggregate_insert.aggregate_emb_inp.append(ess.embedding_input)
        aggregate_insert.aggregate_shape.append(len(ess.state))
        
        # Accumulate metrics
        total_ess_score += ess.score
        total_ess_valence += ess.valence
    
    # Calculate aggregate metrics
    n = len(array_ess)
    aggregate_insert.total_ess_score = total_ess_score
    aggregate_insert.avg_ess_score = int(total_ess_score / n)
    aggregate_insert.avg_ess_valence = total_ess_valence / n
    aggregate_insert.valence = total_ess_valence
    
    return aggregate_insert

# Usage
components = [
    ess_temperature,
    ess_humidity,
    ess_air_quality,
    ess_occupancy
]

aggregate_template = ESSInsert(
    module_id="room_climate_aggregate",
    state=[],
    valence=0.0,
    avg_ess_valence=0.0,
    total_ess_score=0,
    avg_ess_score=0,
    score=0,
    aggregate_id=[],
    aggregate_valence=[],
    aggregate_score=[],
    aggregate_module_id=[],
    aggregate_shape=[],
    aggregate_context=[],
    aggregate_emb_inp=[]
)

aggregate = create_aggregate_from_ess(components, aggregate_template)
res, status = eca.ess.create(data=aggregate)

Querying Modules

Get All States for a Module

from avenieca.api.eca import ECA
from avenieca.api.model import Config

config = Config(
    uri="http://localhost:2580/v1",
    username=username,
    password=password
)
eca = ECA(config)

# Get all ESS entries for a module
states, status = eca.ess.get_all(module_id="temperature")

for state in states:
    print(f"State: {state.state}, Valence: {state.valence}")

Search for Similar States

from avenieca.api.model import Search

# Find states similar to a target state
search_results, status = eca.ess.search(data=Search(
    module_id="air_conditioner",
    state=[18.0],  # Target state
    limit=5        # Return top 5 matches
))

for result in search_results:
    print(f"Score: {result.score}, State: {result.ess.state}")

Get States from Sequences

# Get all ESS entries referenced by sequences
sequence_states, status = eca.ess.get_all_sequence(
    module_id="air_conditioner"
)

# Get ESS for a specific sequence
state, status = eca.ess.get_one_sequence(
    module_id="air_conditioner",
    sequence_id=3
)

Find Aggregates Containing a Module

# Find all aggregates that include a specific component
aggregates, status = eca.ess.get_all_aggregates(
    module_id="temperature",           # Component module
    aggregate_module_id="climate_aggregate",  # Aggregate type
    ess_id=8                           # Specific component instance
)

for agg in aggregates:
    print(f"Aggregate contains: {agg.aggregate_module_id}")

Module Predictions

Requesting Predictions

The Cortex API predicts future states for modules:
from avenieca.api.model import NextStateRequest

# Predict next states
request = NextStateRequest(
    module_id="climate_aggregate",
    recall=20,    # Look back 20 states
    range=20,     # Consider states within range of 20
    n=3,          # Return top 3 predictions
    status="e"    # Use sequences with status "e" (episode)
)

response, status = eca.cortex.predictions(data=request)

# Current state
print(f"Current: {response.current_state}")

# Predicted next states (sorted by likelihood)
for i, twin_list in enumerate(response.next_state):
    print(f"Prediction {i+1}:")
    for twin in twin_list.list:
        print(f"  {twin.module_id}: {twin.state}")

Raw Predictions

Get predictions as raw aggregate values:
# Get raw state vectors instead of mapped values
response_raw, status = eca.cortex.predictions_raw(data=request)

for prediction in response_raw.next_state:
    for twin in prediction.list:
        # Raw state vector
        print(f"Module: {twin.module_id}")
        print(f"State vector: {twin.state}")  # List[float]

Best Practices

All ESS entries for a module_id must have:
  • Same state vector dimensionality
  • Same component order
  • Same units and scaling
# Good: Consistent structure
ess1 = ESSInsert(module_id="sensor", state=[temp, humidity, pressure])
ess2 = ESSInsert(module_id="sensor", state=[temp, humidity, pressure])

# Bad: Inconsistent
ess1 = ESSInsert(module_id="sensor", state=[temp, humidity])
ess2 = ESSInsert(module_id="sensor", state=[temp, humidity, pressure])
Choose descriptive names that indicate:
  • What the module represents
  • Its level of abstraction
  • Its domain or context
# Clear and descriptive
"hvac_unit_bedroom"
"temperature_sensor"
"building_climate_aggregate"

# Vague or unclear
"module1"
"temp"
"aggregate"
When creating aggregates:
  • Include related, interacting components
  • Maintain component ordering consistency
  • Validate aggregate metadata accuracy
  • Use the helper function from README
The system validates aggregate structure, so ensure:
  • aggregate_shape matches actual component dimensions
  • aggregate_id references exist in ESS
  • Metric calculations are correct
Design modules at appropriate scope:
  • Too narrow: “temp_reading_1” (overly specific)
  • Good: “temperature” (right level)
  • Too broad: “all_sensors” (too general)
Each module should represent a coherent concept.

Common Patterns

Pattern: Sensor Array

Multiple identical sensors, different locations:
# Same module_id, different instances
for i, location in enumerate(["room_1", "room_2", "room_3"]):
    config = Broker(
        url=kafka_url,
        sub_topic=f"temperature/{location}",
        group="temperature_sensors",
        pub_topic=""
    )
    
    stream = Stream(config=config, sync_rate=10.0)
    stream.publish(lambda: read_temperature(location))

Pattern: Multi-Level Aggregates

Build system understanding hierarchically:
# Level 1: Components
components = [temp_ess, humidity_ess, co2_ess]
room_agg = create_aggregate_from_ess(components, room_template)

# Level 2: Room aggregate
eca.ess.create(data=room_agg)

# Level 3: Floor aggregate (aggregate of aggregates)
floor_components = [room_1_agg, room_2_agg, room_3_agg]
floor_agg = create_aggregate_from_ess(floor_components, floor_template)
eca.ess.create(data=floor_agg)

Pattern: Dynamic Module Creation

Create modules programmatically:
def create_module_for_sensor(sensor_config):
    """
    Dynamically create and configure a module for a sensor.
    """
    module_id = f"{sensor_config['type']}_{sensor_config['location']}"
    
    def capture_state():
        value = sensor_config['read_function']()
        return Signal(
            state=[value],
            valence=sensor_config['assess_function'](value)
        )
    
    config = Broker(
        url=os.environ["KAFKA_URL"],
        sub_topic=f"{sensor_config['type']}/{sensor_config['location']}",
        group=sensor_config['type'],
        pub_topic=""
    )
    
    stream = Stream(config=config, sync_rate=sensor_config['interval'])
    return stream, capture_state

# Use it
sensors = [
    {"type": "temperature", "location": "bedroom", "read_function": read_temp, "assess_function": assess_temp, "interval": 10},
    {"type": "humidity", "location": "bedroom", "read_function": read_humidity, "assess_function": assess_humidity, "interval": 10},
]

for sensor in sensors:
    stream, handler = create_module_for_sensor(sensor)
    stream.publish(handler)

Next Steps

Signals

Learn about the Signal data structure

Digital Twins

Understand digital twin concepts

ESS API

Complete ESS API reference

Cortex API

Prediction API reference

Build docs developers (and LLMs) love