This guide will get you from zero to working code with both Kafka streaming and the REST API.

Prerequisites

Completed installation and environment setup
Access to a running AveniECA instance
KAFKA_URL, USERNAME, and PASSWORD environment variables configured
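The environment variables above can be exported in your shell before running any of the examples; the values below are placeholders, not real defaults:

```shell
# Placeholder values - replace with your actual broker address and credentials
export KAFKA_URL="localhost:9092"
export USERNAME="your-avenieca-username"
export PASSWORD="your-avenieca-password"
```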

Stream State to Your Twin

The most common use case is continuously streaming state updates to a digital twin through Kafka.

Continuous Streaming

Stream data at a fixed rate using the Stream producer:
1. Import required modules

import os
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.producers import Stream

2. Define a handler function

The handler should return a Signal dataclass with your state data:
def handler():
    # Your state vector (e.g., sensor readings, coordinates, etc.)
    signal = Signal(
        state=[0.2, 0.3, 0.8],  # State vector
        valence=10,              # Evaluative value (-100 to +100)
    )
    return signal
The state field must be a list of floats. The valence field is optional but recommended for learning evaluative patterns.
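A small standalone check like the one below (a sketch, not part of the SDK) can catch malformed signals before they reach the producer. It enforces the two constraints just described: state is a list of floats, and valence stays within -100 to +100.

```python
def validate_signal(state, valence=None):
    """Raise ValueError if a state/valence pair violates the constraints above."""
    # state must be a list of numbers; ints are coerced to floats
    if not isinstance(state, list) or not all(
        isinstance(x, (int, float)) for x in state
    ):
        raise ValueError("state must be a list of floats")
    # valence is optional, but when given must be in the -100..+100 range
    if valence is not None and not -100 <= valence <= 100:
        raise ValueError("valence must be between -100 and +100")
    return [float(x) for x in state]

validate_signal([0.2, 0.3, 0.8], valence=10)  # passes
```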

3. Configure the broker connection

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",  # Your twin's subscriber topic
    group="test",            # Consumer group name
    pub_topic=""             # Leave empty for producers
)

4. Start streaming

# Initialize stream with 1-second sync rate
stream = Stream(config=config, sync_rate=1)

# Start publishing (this will run continuously)
stream.publish(handler)
stream.publish() runs in an infinite loop. Use Ctrl+C to stop, or run in a separate thread for production applications.
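One way to run the producer off the main thread is a daemon thread with a stop event. The sketch below uses a stand-in publish loop so it runs on its own; with the real SDK you would target stream.publish(handler) instead, and the stop-event shutdown shown here is a general pattern, not an SDK feature:

```python
import threading
import time

published = []

def publish_loop(handler, stop_event, sync_rate=1.0):
    # Stand-in for stream.publish: call the handler at a fixed rate
    while not stop_event.is_set():
        published.append(handler())
        time.sleep(sync_rate)

stop = threading.Event()
worker = threading.Thread(
    target=publish_loop,
    args=(lambda: {"state": [0.2, 0.3, 0.8], "valence": 10}, stop, 0.01),
    daemon=True,  # daemon=True so the thread won't block interpreter exit
)
worker.start()

time.sleep(0.05)  # let a few iterations run
stop.set()
worker.join()
```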

Complete Streaming Example

import os
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.producers import Stream

def handler():
    """Generate state signal from your data source"""
    signal = Signal(
        state=[0.2, 0.3, 0.8],
        valence=10,
    )
    return signal

# Configure broker
config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",
    group="test",
    pub_topic=""
)

# Stream at 1 Hz
stream = Stream(config=config, sync_rate=1)
stream.publish(handler)
Expected Output: The stream will continuously publish signals to Kafka. You won’t see output unless you add logging, but your AveniECA instance will receive the state updates.
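If you do want visibility into what is being streamed, the handler itself is a natural place to log. This sketch uses a plain dict as a stand-in for the Signal dataclass so it runs without the SDK; in your code, build and return the real Signal:

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
log = logging.getLogger("stream")

def handler():
    # In real code this would be Signal(state=..., valence=...); a dict stands in here
    signal = {"state": [0.2, 0.3, 0.8], "valence": 10}
    log.info("publishing state=%s valence=%s", signal["state"], signal["valence"])
    return signal

handler()
```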

Consume Messages from Kafka

Receive and process messages from your twin’s output topic:
consumer_example.py
import os
from avenieca.config.broker import Broker
from avenieca.consumer import Consumer
from avenieca.utils.signal import get_state_as_list

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",
    group="test",
    pub_topic=""
)

# Define handler to process incoming messages
def handler(data):
    valence = data["valence"]

    # data["state"] arrives as a raw string; get_state_as_list converts it
    # in place to a list of floats
    get_state_as_list(data)
    state_list = data["state"]

    print(f"Received - Valence: {valence}, State: {state_list}")

client = Consumer(config=config)
client.consume(handler, True)  # True = auto-commit offsets
Expected Output:
Received - Valence: 10, State: [0.2, 0.3, 0.8]
Received - Valence: 10, State: [0.2, 0.3, 0.8]
...

Use the REST API

Interact with AveniECA’s cognitive functions using the REST API.

Initialize the API Client

import os
from avenieca.api.model import Config
from avenieca.api.eca import ECA

# Configure API client
config = Config(
    uri="http://localhost:2580/v1",
    username=os.getenv("USERNAME"),
    password=os.getenv("PASSWORD")
)

eca = ECA(config)

Create and Query ESS Entries

Store episodic states and search for similar patterns:
from avenieca.api.model import ESSInsert

# Create an ESS entry
ess = ESSInsert(
    module_id="air_conditioner",
    state=[11],
    valence=10.0,
    score=4,
    embedding_input=1,
    context=None,
)

res, status = eca.ess.create(data=ess)
print(f"Created ESS entry: {res.id}")
print(f"Status code: {status}")
Expected Output:
Created ESS entry: 42
Status code: 201

Get Next-State Predictions

Use the Cortex API to predict future states based on learned sequences:
predictions.py
from avenieca.api.model import NextStateRequest

# Request next-state predictions
nsr = NextStateRequest(
    module_id="aggregate001",
    recall=20,    # Number of past states to consider
    range=20,     # Search range for similar patterns
    n=1,          # Number of predictions to return
    status="e",   # Sequence status filter
)

res, status = eca.cortex.predictions(data=nsr)

print(f"Current state: {res.current_state}")
print(f"Predicted next states: {res.next_state}")
Expected Output:
Current state: [Twin(aggregate_id=5, ess_id=42, module_id='temperature', state='[25.0]')]
Predicted next states: [[Twin(...), Twin(...)]]

Work with Documents and Retrieval

Store documents with embeddings so they can later be retrieved with natural-language queries:
from avenieca.api.model import DocumentInsert

# Create and embed a document
document = DocumentInsert(
    doc_id="001",
    text="The temperature was 28°C on May 3rd at 1pm",
    embed=True,  # Generate embeddings for retrieval
)

res, status = eca.document.create(data=document)
print(f"Document created: {res.id}")
Expected Output:
Document created: 1

Create Sequences

Track temporal sequences of states:
sequences.py
from avenieca.api.model import SequenceInsert

# Create a sequence
sequence = SequenceInsert(
    module_id="air_conditioner",
    instance_id=10,
    status="e",   # 'e' = ended, 'n' = new, 'sk' = skipped
    context=None,
)

res, status = eca.sequence.create(data=sequence)
print(f"Sequence created: {res.id}")

# Get all sequences for a module
res, status = eca.sequence.get_all(module_id="air_conditioner")
for seq in res:
    print(f"Sequence {seq.id}: instance={seq.instance_id}, status={seq.status}")
Expected Output:
Sequence created: 3
Sequence 1: instance=5, status=e
Sequence 2: instance=8, status=e
Sequence 3: instance=10, status=e

Complete Example: IoT Sensor System

Here’s a real-world example combining streaming and API usage:
iot_system.py
import os
import time
import random
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.producers import Stream
from avenieca.api.model import Config, ESSInsert
from avenieca.api.eca import ECA

# Configure streaming
kafka_config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="temperature_sensor",
    group="iot_system",
    pub_topic=""
)

# Configure REST API
api_config = Config(
    uri="http://localhost:2580/v1",
    username=os.getenv("USERNAME"),
    password=os.getenv("PASSWORD")
)
eca = ECA(api_config)

# Simulate temperature sensor
def get_temperature():
    temp = 20 + random.uniform(-5, 5)
    valence = -abs(temp - 22) * 10  # Prefer 22°C
    
    signal = Signal(
        state=[temp],
        valence=valence,
    )
    
    # Also store in ESS every 10th reading
    if random.random() < 0.1:
        ess = ESSInsert(
            module_id="temperature",
            state=[temp],
            valence=valence,
            score=1,
        )
        res, status = eca.ess.create(data=ess)
        print(f"Stored ESS entry {res.id}: temp={temp:.1f}°C")
    
    return signal

# Start streaming
stream = Stream(config=kafka_config, sync_rate=1)
stream.publish(get_temperature)
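The valence rule in get_temperature is a simple distance penalty from the 22 °C setpoint. Factoring it out makes the mapping easy to sanity-check; the clamp to the documented -100 to +100 range is an added safeguard, not part of the original example:

```python
def temperature_valence(temp, setpoint=22.0, weight=10.0):
    # Penalize deviation from the setpoint, clamped to the valid valence range
    raw = -abs(temp - setpoint) * weight
    return max(-100.0, min(100.0, raw))

temperature_valence(22.0)  # → 0.0 (at the setpoint)
temperature_valence(25.0)  # → -30.0
```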

Next Steps

Now that you have the basics working, explore more advanced features:

API Reference

Complete API documentation for all endpoints

Streaming Guide

Advanced streaming patterns and best practices

Aggregates

Combine multiple states into aggregate memories

Embeddings

Work with semantic embeddings and similarity search
