The Stream class provides continuous, time-based streaming to your digital twin. It automatically handles the timing logic, calling your handler function at a fixed interval.

Class Definition

Location: avenieca/producers/stream.py:9
class Stream(Producer):
    def __init__(self, config: Broker, sync_rate: float)
Parameters:
  • config (Broker): Broker configuration object
  • sync_rate (float): Time interval in seconds between syncs (supports sub-second values)

Methods

publish

Location: avenieca/producers/stream.py:25
def publish(self, func, handler_params=None, sync_once=False)
Start the continuous streaming loop, calling the handler function at each sync interval.
Parameters:
  • func: Handler function that returns a Signal dataclass
  • handler_params (optional): Tuple of parameters to pass to the handler
  • sync_once (optional): If True, runs the sync loop only once (default: False)
Returns: None
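Conceptually, publish runs a timing loop around your handler. The sketch below is an illustrative stand-in for that loop, not the library's actual implementation (which lives in avenieca/producers/stream.py); the send parameter here replaces the real broker publish step so the sketch is self-contained.

```python
import time

def run_loop(func, sync_rate, handler_params=None, sync_once=False, send=print):
    """Call `func` every `sync_rate` seconds and hand its result to `send`."""
    while True:
        # Mirror the documented calling convention: handler_params is a tuple
        # passed as a single argument when provided.
        signal = func(handler_params) if handler_params is not None else func()
        send(signal)  # the real Stream publishes the Signal to the broker here
        if sync_once:
            break
        time.sleep(sync_rate)
```

With sync_once=True the loop fires the handler exactly once and returns, matching the documented behavior of publish.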

Basic Usage

Simple Continuous Stream

import os
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.producers import Stream

# Define a handler that returns a Signal dataclass
def handler():
    signal = Signal(
        valence=10,
        state=[0.2, 0.3, 0.8]
    )
    return signal

# Initialize broker configuration
config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",  # ECA twin subscriber-topic
    group="test",
    pub_topic=""
)

# Initialize the Stream object with a sync_rate
stream = Stream(config=config, sync_rate=1)
stream.publish(handler)  # Streams continuously every 1 second

Handler with Parameters

You can pass parameters to your handler function:
from avenieca.data import Signal
from avenieca.producers import Stream
from avenieca.config.broker import Broker

def handler_with_params(params):
    sensor_id, multiplier = params
    
    # Read sensor data
    value = read_sensor(sensor_id)
    
    return Signal(
        valence=value * multiplier,
        state=[value, value * 2, value * 3]
    )

config = Broker(
    url="localhost:9092",
    sub_topic="sensors",
    group="sensor_group",
    pub_topic=""
)

stream = Stream(config=config, sync_rate=0.5)

# Pass parameters as a tuple
params = ("sensor_001", 1.5)
stream.publish(handler_with_params, handler_params=params)

Single Sync (One-time)

Use sync_once=True to publish just once:
from avenieca.data import Signal
from avenieca.producers import Stream
from avenieca.config.broker import Broker

def handler():
    return Signal(
        valence=10,
        state=[1.0, 2.0, 3.0]
    )

config = Broker(
    url="localhost:9092",
    sub_topic="test_topic",
    group="test",
    pub_topic=""
)

stream = Stream(config=config, sync_rate=1)
stream.publish(handler, sync_once=True)  # Publishes once and exits

Advanced Examples

Dynamic Sensor Streaming

import random
from avenieca.data import Signal
from avenieca.producers import Stream
from avenieca.config.broker import Broker

class SensorMonitor:
    def __init__(self):
        self.reading_count = 0
    
    def get_reading(self):
        self.reading_count += 1
        
        # Simulate sensor readings
        temperature = 20 + random.uniform(-5, 5)
        humidity = 50 + random.uniform(-10, 10)
        pressure = 1013 + random.uniform(-20, 20)
        
        return Signal(
            valence=temperature,
            state=[temperature, humidity, pressure],
            score=self.reading_count
        )

config = Broker(
    url="localhost:9092",
    sub_topic="weather_sensor",
    group="weather",
    pub_topic=""
)

monitor = SensorMonitor()
stream = Stream(config=config, sync_rate=2.0)
stream.publish(monitor.get_reading)  # Streams every 2 seconds

High-Frequency Sub-Second Streaming

import time
from avenieca.data import Signal
from avenieca.producers import Stream
from avenieca.config.broker import Broker

def high_frequency_handler():
    # Get precise timestamp
    timestamp = time.time()
    
    # Simulate high-frequency data
    signal_value = timestamp % 1  # Fractional part
    
    return Signal(
        valence=signal_value * 100,
        state=[signal_value, signal_value * 2, signal_value * 3]
    )

config = Broker(
    url="localhost:9092",
    sub_topic="high_freq",
    group="hf_group",
    pub_topic=""
)

# Stream at 100ms intervals (0.1 seconds)
stream = Stream(config=config, sync_rate=0.1)
stream.publish(high_frequency_handler)

Controlled Start/Stop

import threading
import time
from avenieca.data import Signal
from avenieca.producers import Stream
from avenieca.config.broker import Broker

def handler():
    return Signal(
        valence=10,
        state=[1.0, 2.0, 3.0]
    )

config = Broker(
    url="localhost:9092",
    sub_topic="controlled",
    group="control",
    pub_topic=""
)

stream = Stream(config=config, sync_rate=1)

# Run stream in a separate thread
stream_thread = threading.Thread(
    target=stream.publish,
    args=(handler,)
)
stream_thread.start()

# Do other work...
time.sleep(10)

# Stop streaming
stream.sync = False  # Stops the publish loop
stream_thread.join()
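The same start/stop pattern can be expressed without the library, with a threading.Event standing in for the stream.sync flag. All names in this sketch are illustrative, not avenieca API.

```python
import threading
import time

def loop(running: threading.Event, out: list):
    """Tick until `running` is cleared, like publish() until sync is False."""
    while running.is_set():
        out.append("tick")  # stands in for publishing a Signal
        time.sleep(0.01)

running = threading.Event()
running.set()
ticks = []
t = threading.Thread(target=loop, args=(running, ticks))
t.start()

time.sleep(0.05)   # do other work...

running.clear()    # plays the role of `stream.sync = False`
t.join()
```

Using an Event rather than a bare attribute makes the stop signal explicit and thread-safe, but the mechanics are the same: the worker thread checks a flag on each iteration and exits cleanly when it flips.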

Handler Function Requirements

Your handler function must:
  1. Return a Signal dataclass instance
  2. Include a state field (list of floats)
  3. Optionally include valence, score, and emb_inp fields
Valid handler:
def valid_handler():
    return Signal(
        state=[1.0, 2.0, 3.0],
        valence=10.0
    )
Invalid handler (returns dict instead of Signal):
def invalid_handler():
    return {"state": [1.0, 2.0, 3.0]}  # Error!

Signal Validation

The Stream producer automatically validates signals before sending (see avenieca/utils/signal.py:24):
  • State must not be None or empty
  • State must be 1-dimensional
  • State values must be int or float
  • Accepts state as list, numpy array, or JSON string
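The rules above can be sketched as a standalone check. This is an approximation of the validation in avenieca/utils/signal.py, not a copy of it; numpy arrays are handled by duck typing (anything iterable into scalars), and numpy integer scalars may need extra handling that is omitted here.

```python
import json

def validate_state(state):
    """Approximate the documented state checks; returns the state as a list."""
    if state is None:
        raise ValueError("state must not be None or empty")
    if isinstance(state, str):
        state = json.loads(state)      # accept the JSON-string form
    values = list(state)               # works for lists and numpy arrays
    if not values:
        raise ValueError("state must not be None or empty")
    for v in values:
        if isinstance(v, (list, tuple)):
            raise ValueError("state must be 1-dimensional")
        if not isinstance(v, (int, float)):
            raise ValueError("state values must be int or float")
    return values
```

A handler that returns a Signal whose state fails these checks will be rejected before anything is sent to the broker.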

Sync Rate Guidelines

Typical sync rates:
  • sync_rate=1.0 - Once per second (default for most sensors)
  • sync_rate=0.1 - 10 times per second (high-frequency data)
  • sync_rate=5.0 - Every 5 seconds (low-frequency monitoring)
  • sync_rate=0.01 - 100 times per second (real-time critical data)
Performance considerations:
  • Very low sync rates (< 0.01s) may impact performance
  • Consider Kafka broker throughput limits
  • Monitor consumer processing capabilities
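When sizing a sync_rate against broker limits, it helps to translate it into message volume. The helper below is a back-of-the-envelope planning aid, not part of the avenieca library.

```python
def daily_message_count(sync_rate: float) -> int:
    """Messages published per day at one message every `sync_rate` seconds."""
    return round(86_400 / sync_rate)

# sync_rate=0.01 (100 messages/second) already implies 8.64 million
# messages per day per stream, before any consumer-side processing.
print(daily_message_count(0.01))
```

Running this kind of arithmetic for each stream, multiplied by the number of twins, gives a quick sanity check against Kafka throughput and consumer capacity.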
