Skip to main content
The AveniECA Python SDK provides Kafka-based streaming capabilities for real-time communication with digital twins. The streaming module consists of producers for sending data and consumers for receiving data.

Architecture

The streaming architecture follows a producer-consumer pattern:
┌─────────────┐         ┌──────────────┐         ┌─────────────┐
│   Stream    │────────▶│     Kafka    │────────▶│  Consumer   │
│  Producer   │         │    Broker    │         │             │
└─────────────┘         │              │         └─────────────┘
                        │              │
┌─────────────┐         │              │
│   Event     │────────▶│  Topics:     │
│  Producer   │         │  - sub_topic │
└─────────────┘         │  - pub_topic │
                        └──────────────┘

Core Components

Producers

AveniECA provides two producer types for different use cases: Stream Producer - For continuous, time-based streaming:
  • Automatically syncs data at a fixed interval (sync_rate)
  • Handles timing logic internally
  • Best for sensors, telemetry, or periodic updates
  • See Stream Producer for details
Event Producer - For event-driven publishing:
  • Manual control over when data is sent
  • Publish individual signals on-demand
  • Best for triggered events, user actions, or irregular updates
  • See Event Producer for details

Consumer

The Consumer class receives messages from Kafka topics:
  • Processes messages using custom handler functions
  • Supports both continuous and one-time consumption
  • Provides utilities for state data conversion
  • See Consumer for details

When to Use Stream vs Event Producers

Use Stream Producer when:

  • You need periodic, time-based updates
  • Sampling sensor data at regular intervals
  • Want the library to handle timing automatically
  • Data source provides continuous readings
Example: Temperature sensor reading every second

Use Event Producer when:

  • Updates are triggered by events
  • You control the timing externally
  • Publishing individual, discrete events
  • Integration with event-driven systems
Example: User clicks a button, system state changes

Signal Data Structure

Both producers work with the Signal dataclass:
from avenieca.data import Signal

signal = Signal(
    state=[0.2, 0.3, 0.8],  # Required: list of floats
    valence=10.0,            # Optional: valence score
    score=5,                 # Optional: integer score
    emb_inp=1                # Optional: embedding input ID
)
Signal Fields:
  • state (required): List of float values representing the state vector
  • valence (optional): Float value for valence scoring
  • score (optional): Integer score value
  • emb_inp (optional): Embedding input identifier

Configuration

All streaming components require a Broker configuration object:
from avenieca.config.broker import Broker
import os

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",  # Topic for subscribing (producing to twin)
    pub_topic="output",      # Topic for publishing (consuming from twin)
    group="test",            # Consumer group ID
    auto_offset_reset="latest"  # Offset reset policy
)
See Configuration for complete details on broker settings.

Quick Start Examples

Stream continuous data

from avenieca.producers import Stream
from avenieca.data import Signal
from avenieca.config.broker import Broker

def handler():
    return Signal(
        valence=10,
        state=[0.2, 0.3, 0.8]
    )

config = Broker(
    url="localhost:9092",
    sub_topic="sensor_data",
    group="sensors",
    pub_topic=""
)

stream = Stream(config=config, sync_rate=1.0)
stream.publish(handler)

Publish a single event

from avenieca.producers import Event
from avenieca.data import Signal
from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="events",
    group="app",
    pub_topic=""
)

signal = Signal(
    valence=9.0,
    state=[0.2, 0.3, 0.8]
)

event = Event(config=config)
event.publish(signal)

Consume messages

from avenieca.consumer import Consumer
from avenieca.config.broker import Broker

def handler(data):
    print(f"Received valence: {data['valence']}")
    print(f"Received state: {data['state']}")

config = Broker(
    url="localhost:9092",
    sub_topic="",
    pub_topic="output",
    group="consumers"
)

consumer = Consumer(config=config)
consumer.consume(handler)

Next Steps

Build docs developers (and LLMs) love