What are Digital Twins?

In AveniECA, a digital twin is a virtual representation of a real-world entity, process, or system that maintains its own state history and can predict future states based on past experiences. Digital twins are the core abstraction that enables the ECA model’s episodic learning and predictive capabilities.

Key Characteristics

Every digital twin in AveniECA has:
  • Identity: A unique module_id that identifies what the twin represents
  • State History: An Episodic State Space (ESS) storing all past states
  • Input Channel: A Kafka sub_topic for receiving real-time state updates
  • Memory: Sequences that organize states into temporal episodes
  • Prediction: Ability to forecast future states through the Cortex API

Module IDs: Twin Identity

What is a Module ID?

The module_id is a string identifier that defines what type of entity a digital twin represents. It’s used throughout the system to:
  • Route signals to the correct twin
  • Store and retrieve state history
  • Query predictions
  • Group related twins together

Naming Conventions

Use descriptive, lowercase names with underscores for readability:
  • temperature_sensor
  • left_wheel
  • air_conditioner
  • climate_aggregate
# Good module IDs
module_id = "bedroom_temperature"
module_id = "hvac_unit_1"
module_id = "user_activity"

# Avoid
module_id = "temp"  # Too vague
module_id = "BedroomTemperature"  # Use lowercase
module_id = "bedroom-temperature"  # Use underscores, not hyphens

Examples from Real Systems

From the README examples:
  • Individual sensors: "temperature", "air_conditioner", "occupancy", "purifier", "air_quality_index"
  • Vehicles: "left_wheel", "right_wheel"
  • Aggregates: "aggregate001", "climate_aggregate", "gwp_aggregate"

Twin Instances: Multiple Twins of Same Type

While module_id defines the type of twin, you can have multiple instances of the same type:

Using Sub-topics for Instances

Kafka sub_topic allows you to differentiate between multiple twins of the same type:
import os

from avenieca.config.broker import Broker

# First temperature sensor
config_sensor_1 = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="bedroom_temp",  # Instance 1
    group="sensors",
    pub_topic=""
)

# Second temperature sensor
config_sensor_2 = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="living_room_temp",  # Instance 2
    group="sensors",
    pub_topic=""
)
Both could use the same module_id="temperature" when storing in ESS, but stream to different topics.

Using Sequence Instance IDs

For tracking different instances in the API:
from avenieca.api.model import SequenceInsert

# Instance 1 of air_conditioner module
sequence_1 = SequenceInsert(
    module_id="air_conditioner",
    instance_id=1,  # First instance
    status="e"
)

# Instance 2 of air_conditioner module
sequence_2 = SequenceInsert(
    module_id="air_conditioner",
    instance_id=2,  # Second instance
    status="e"
)

Streaming to Digital Twins

Sub-topics: The Twin’s Input Channel

Every digital twin receives state updates through a Kafka topic called a sub_topic (subscriber topic). This is the input channel through which real-world state flows into the twin.
from avenieca.producers import Stream
from avenieca.data import Signal
from avenieca.config.broker import Broker
import os

def capture_state():
    # Read from real-world source
    value = read_sensor()
    return Signal(
        state=[value],
        valence=evaluate(value)
    )

# Configure connection to twin
config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",  # Twin's input channel
    group="vehicle_sensors",
    pub_topic=""  # Not publishing output in this example
)

# Stream to the twin
stream = Stream(config=config, sync_rate=1.0)
stream.publish(capture_state)

Continuous vs Event-Based Updates

# Use Stream for continuous monitoring
from avenieca.producers import Stream

def monitor_temperature():
    temp = read_thermometer()
    return Signal(state=[temp], valence=assess_temp(temp))

stream = Stream(config=config, sync_rate=1.0)  # Every second
stream.publish(monitor_temperature)
# Runs continuously until stopped
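For event-based updates, the idea is to produce a signal only when something noteworthy happens rather than on a fixed sync_rate. The gating logic below is a plain-Python sketch (detect_event and the threshold are illustrative, not SDK API); wiring the resulting signal to a producer follows the same pattern as the Stream example above.

```python
# Event-based sketch: emit only when a reading crosses a threshold,
# instead of publishing every tick.

def detect_event(temp, previous_temp, threshold=30.0):
    """Fire only when the reading rises across the alert threshold."""
    return previous_temp <= threshold < temp

# Example: a rising temperature crosses 30.0 between two readings
readings = [28.5, 29.9, 31.2, 31.5]
events = [
    temp
    for prev, temp in zip(readings, readings[1:])
    if detect_event(temp, prev)
]
# Only the crossing reading (31.2) triggers an event
```

In a real producer, the capture function would return a Signal when detect_event fires and skip publishing otherwise.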

Episodic State Space (ESS)

What is ESS?

The Episodic State Space is where digital twins store their state history. Every signal sent to a twin is stored as an ESS entry, creating a complete record of the twin’s evolution over time.

ESS Structure

from avenieca.api.model import ESSInsert

# Creating an ESS entry for a twin
ess_entry = ESSInsert(
    module_id="air_conditioner",  # Which twin
    state=[25.0],                  # State vector
    valence=10.0,                  # Assessment
    score=4,                       # Importance
    embedding_input=1,             # Semantic link
    context=None                   # Optional metadata
)

# `eca` is an initialized ECA API client (see "Querying Twin History" below)
res, status = eca.ess.create(data=ess_entry)

Querying Twin History

import os

from avenieca.api.eca import ECA
from avenieca.api.model import Config

# Initialize API client (environment variable names are illustrative)
config = Config(
    uri="http://localhost:2580/v1",
    username=os.environ["ECA_USERNAME"],
    password=os.environ["ECA_PASSWORD"]
)
eca = ECA(config)

# Get all states for a twin
states, status = eca.ess.get_all(module_id="air_conditioner")

# Get specific state by ID
state, status = eca.ess.get_one(module_id="air_conditioner", db_id=8)

# Search for similar states
from avenieca.api.model import Search

search_results, status = eca.ess.search(data=Search(
    module_id="air_conditioner",
    state=[18.0],  # Find states similar to this
    limit=5
))
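Here "similar" means nearness in state-vector space. The exact metric ESS applies is internal to the server; the sketch below uses Euclidean distance purely for intuition about how results are ranked (the helper is not part of the SDK).

```python
import math

def euclidean_distance(a, b):
    """Illustrative distance between two state vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

# Rank stored states by closeness to the query state [18.0]
query = [18.0]
stored = {101: [18.5], 102: [25.0], 103: [17.9]}
ranked = sorted(stored, key=lambda k: euclidean_distance(query, stored[k]))
# ranked → [103, 101, 102]: 17.9 is nearest to 18.0, then 18.5, then 25.0
```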

Real-World Use Cases

Smart Building Climate Control

# Individual twins for each component
temperature_twin = {
    "module_id": "temperature",
    "sub_topic": "climate/temperature",
    "state_dim": 1  # [celsius]
}

humidity_twin = {
    "module_id": "humidity",
    "sub_topic": "climate/humidity",
    "state_dim": 1  # [percentage]
}

ac_twin = {
    "module_id": "air_conditioner",
    "sub_topic": "climate/ac",
    "state_dim": 1  # [power_level]
}

# Aggregate twin combines them
climate_aggregate = {
    "module_id": "climate_aggregate",
    "sub_topic": "climate/aggregate",
    "components": ["temperature", "humidity", "air_conditioner"],
    "state_dim": 3  # Combined state vector
}
Benefits:
  • Each component maintains its own history
  • Aggregate learns relationships between components
  • System predicts optimal AC settings based on temperature/humidity patterns

Autonomous Vehicle Fleet

# Digital twins for each vehicle subsystem
vehicle_twins = [
    {"module_id": "left_wheel", "sub_topic": "vehicle/wheel/left"},
    {"module_id": "right_wheel", "sub_topic": "vehicle/wheel/right"},
    {"module_id": "battery", "sub_topic": "vehicle/battery"},
    {"module_id": "motor", "sub_topic": "vehicle/motor"},
]

# Aggregate for whole vehicle behavior
vehicle_aggregate = {
    "module_id": "vehicle_001",
    "sub_topic": "vehicle/aggregate",
    "components": ["left_wheel", "right_wheel", "battery", "motor"]
}
Benefits:
  • Track individual component wear and performance
  • Predict maintenance needs before failures
  • Optimize routes based on learned battery/motor patterns
  • Detect anomalies by comparing to normal state sequences

Industrial Equipment Monitoring

# Twin for each machine
machine_twin = {
    "module_id": "cnc_mill_1",
    "sub_topic": "factory/cnc/mill_1",
    "state": [  # Multi-dimensional state
        "vibration_x",
        "vibration_y",
        "vibration_z",
        "temperature",
        "spindle_speed",
        "power_draw"
    ]
}
Benefits:
  • Continuous health monitoring
  • Early detection of degradation patterns
  • Predictive maintenance scheduling
  • Quality control through state correlation

User Behavior Analytics

# Digital twin for user activity
user_twin = {
    "module_id": "user_activity",
    "sub_topic": "users/activity",
    "state": [
        "session_duration",
        "clicks_per_minute",
        "pages_viewed",
        "feature_engagement"
    ]
}
Benefits:
  • Learn typical user behavior patterns
  • Predict next likely actions
  • Detect unusual behavior (security)
  • Personalize experience based on learned preferences

Individual Modules vs Aggregates

Individual Modules

Purpose: Represent a single, specific aspect or component
from avenieca.data import Signal
from avenieca.api.model import ESSInsert

# Simple individual module
temp_signal = Signal(
    state=[22.5],  # Just temperature
    valence=10.0
)

# Store to ESS
ess = ESSInsert(
    module_id="temperature",
    state=[22.5],
    valence=10.0,
    score=1
)
When to use:
  • Tracking a single sensor or metric
  • Need independent history for a component
  • Component may be part of multiple aggregates

Aggregate Modules

Purpose: Combine multiple modules to learn multi-modal patterns
from avenieca.api.model import ESSResponse, ESSInsert
from typing import List

# Individual module states (embedding_input and context are set here
# because the aggregation loop below reads them)
temp_ess = ESSResponse(
    id=1, module_id="temperature",
    state=[28.0], valence=-90, score=1,
    embedding_input=1, context=None
)

ac_ess = ESSResponse(
    id=2, module_id="air_conditioner",
    state=[75.0], valence=50, score=2,  # Power percentage
    embedding_input=2, context=None
)

# Create aggregate
def create_aggregate(components: List[ESSResponse]):
    aggregate = ESSInsert(
        module_id="climate_aggregate",
        state=[],  # Will be populated
        valence=0.0,
        score=0,
        aggregate_id=[],
        aggregate_valence=[],
        aggregate_score=[],
        aggregate_module_id=[],
        aggregate_shape=[],
        aggregate_context=[],
        aggregate_emb_inp=[]
    )
    
    total_valence = 0.0
    total_score = 0
    
    for component in components:
        # Combine states
        aggregate.state.extend(component.state)
        
        # Track metadata
        aggregate.aggregate_id.append(component.id)
        aggregate.aggregate_module_id.append(component.module_id)
        aggregate.aggregate_valence.append(component.valence)
        aggregate.aggregate_score.append(component.score)
        aggregate.aggregate_shape.append(len(component.state))
        aggregate.aggregate_emb_inp.append(component.embedding_input)
        aggregate.aggregate_context.append(component.context)
        
        total_valence += component.valence
        total_score += component.score
    
    # Calculate aggregate metrics
    n = len(components)
    aggregate.avg_ess_valence = total_valence / n
    aggregate.avg_ess_score = int(total_score / n)
    aggregate.total_ess_score = total_score
    aggregate.valence = total_valence
    
    return aggregate

# Create and store aggregate
agg = create_aggregate([temp_ess, ac_ess])
res, status = eca.ess.create(data=agg)
When to use:
  • Learning relationships between multiple components
  • Predicting how multiple systems evolve together
  • Need holistic understanding of complex system
  • Want to query complete system state efficiently

Aggregate Metadata

Aggregates maintain rich metadata about their components:
  • aggregate_id: Database IDs of component ESS entries
  • aggregate_module_id: Module IDs of each component
  • aggregate_shape: Dimensionality of each component’s state
  • aggregate_valence: Individual valence values
  • aggregate_score: Individual score values
  • avg_ess_valence: Average valence across components
  • avg_ess_score: Average score across components
This allows reconstructing individual component states from the aggregate.
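Because aggregate_shape records each component's dimensionality in order, the flat aggregate state can be sliced back into per-component vectors. A minimal sketch in plain Python (not an SDK call):

```python
def split_aggregate_state(state, shapes):
    """Slice a flat aggregate state vector back into per-component
    vectors using the recorded aggregate_shape list."""
    components = []
    offset = 0
    for shape in shapes:
        components.append(state[offset:offset + shape])
        offset += shape
    return components

# climate_aggregate built from temperature ([28.0]) and AC ([75.0])
flat_state = [28.0, 75.0]
shapes = [1, 1]
parts = split_aggregate_state(flat_state, shapes)
# parts → [[28.0], [75.0]]
```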

Predictions: Twin Intelligence

How Twins Predict Future States

Digital twins use their episodic memory (sequences of past states) to predict likely future states through the Cortex API:
from avenieca.api.model import NextStateRequest

# Request prediction
request = NextStateRequest(
    module_id="climate_aggregate",
    recall=20,   # Look back at last 20 states
    range=20,    # Consider states within range of 20
    n=3,         # Predict 3 most likely next states
    status="e"   # Use sequences with status "e"
)

response, status = eca.cortex.predictions(data=request)

# response.current_state: The twin's current state
# response.next_state: List of n predicted next states

Prediction Use Cases

Anomaly detection: compare the actual next state with the predictions; a large deviation from all of them indicates an anomaly:
predictions, status = eca.cortex.predictions(data=request)
actual_next = get_current_state()

# Check if the actual state matches any prediction
# (is_similar and get_current_state are user-defined helpers)
is_anomaly = not any(
    is_similar(actual_next, pred)
    for pred in predictions.next_state
)
Failure prediction: forecast when equipment will enter failure states:
# Predict the next 10 states (predict_sequence is a user-defined
# helper that rolls Cortex predictions forward)
future = predict_sequence(module_id="machine", steps=10)

# Check if any predicted state has negative valence
failure_imminent = any(
    state.valence < -50 for state in future
)
What-if analysis: test different actions to find the optimal outcome:
# Simulate different AC power levels
# (create_state and predict_outcome are user-defined helpers)
outcomes = []
for power in [25, 50, 75, 100]:
    simulated_state = create_state(ac_power=power)
    prediction = predict_outcome(simulated_state)
    outcomes.append((power, prediction.valence))

# Choose action with best predicted valence
best_action = max(outcomes, key=lambda x: x[1])

Best Practices

Use individual modules for components that:
  • Need independent monitoring
  • May fail separately
  • Require individual history
Use aggregates for:
  • Related components that interact
  • System-level predictions
  • Holistic state queries
Keep state vectors consistent: all signals for a module_id should have:
  • Same state vector dimensionality
  • Same component order
  • Same units/scaling
This ensures proper learning and prediction.
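A lightweight guard on the producer side can catch dimension drift before a malformed state ever reaches the twin. The validator below is a sketch, not part of the SDK:

```python
def validate_state(state, expected_dim):
    """Reject state vectors whose dimensionality drifted."""
    if len(state) != expected_dim:
        raise ValueError(
            f"expected {expected_dim}-dim state, got {len(state)}-dim"
        )
    return state

TEMPERATURE_DIM = 1  # [celsius]

validate_state([22.5], TEMPERATURE_DIM)          # passes
# validate_state([22.5, 45.0], TEMPERATURE_DIM)  # would raise ValueError
```

Call it inside your capture function so a bad reading fails loudly instead of corrupting the twin's history.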
Structure Kafka topics hierarchically:
building/floor1/room101/temperature
building/floor1/room101/humidity
building/floor1/room101/occupancy
Makes management and debugging easier.
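A small helper keeps hierarchical topic names consistent across producers (a sketch of the naming convention above; nothing in the SDK requires it):

```python
def topic_for(*parts):
    """Join path segments into a hierarchical Kafka topic name."""
    return "/".join(parts)

sub_topic = topic_for("building", "floor1", "room101", "temperature")
# sub_topic → "building/floor1/room101/temperature"
```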
Use sequences and timestamps to maintain temporal relationships:
sequence = SequenceInsert(
    module_id="sensor",
    instance_id=1,
    status="e",  # Episode complete
    context="maintenance_period_2024_01"
)

Next Steps

  • Modules: deep dive into module organization
  • Sequences: learn about episodic memory
  • Streaming Guide: start streaming to twins
  • API Reference: complete ESS API documentation
