The Stream class provides continuous, time-based streaming to your digital twin. It automatically handles the timing logic, calling your handler function at a fixed interval.
Class Definition
Location: avenieca/producers/stream.py:9
class Stream(Producer):
    def __init__(self, config: Broker, sync_rate: float)
Parameters:
config (Broker): Broker configuration object
sync_rate (float): Time interval in seconds between syncs (supports sub-second values)
Methods
publish
Signature: avenieca/producers/stream.py:25
def publish(self, func, handler_params=None, sync_once=False)
Start the continuous streaming loop. Calls the handler function at each sync interval.
Parameters:
func: Handler function that returns a Signal dataclass
handler_params (optional): Tuple of parameters to pass to the handler
sync_once (optional): If True, runs the sync loop only once (default: False)
Returns: None
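To make the roles of handler_params and sync_once concrete, here is a minimal sketch of a fixed-interval loop. It is not the library's implementation: IntervalLoop and the Signal stand-in are hypothetical names defined only for this illustration, the loop collects signals in a list instead of sending them to Kafka, and passing the tuple to the handler as a single argument is an assumption based on the usage examples on this page.

```python
import time
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Signal:
    """Stand-in for avenieca.data.Signal (hypothetical, illustration only)."""
    state: List[float]
    valence: Optional[float] = None


class IntervalLoop:
    """Hypothetical stand-in that mimics the documented publish() parameters."""

    def __init__(self, sync_rate: float):
        self.sync_rate = sync_rate
        self.sync = True   # setting this to False ends the loop
        self.sent = []     # signals are collected here instead of being sent

    def publish(self, func, handler_params=None, sync_once=False):
        while self.sync:
            # Assumption: the tuple is handed to the handler as one argument
            signal = func() if handler_params is None else func(handler_params)
            self.sent.append(signal)
            if sync_once:
                break
            time.sleep(self.sync_rate)


def handler(params):
    base, multiplier = params
    return Signal(state=[base, base * 2], valence=base * multiplier)


loop = IntervalLoop(sync_rate=0.01)
loop.publish(handler, handler_params=(2.0, 1.5), sync_once=True)
```

With sync_once=True the loop body runs exactly one time, so loop.sent holds a single Signal. Without it, the loop keeps calling the handler until the sync flag is cleared.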
Basic Usage
Simple Continuous Stream
import os
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.producers import Stream
# Define a handler that returns a Signal dataclass
def handler():
    signal = Signal(
        valence=10,
        state=[0.2, 0.3, 0.8]
    )
    return signal
# Initialize broker configuration
config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",  # ECA twin subscriber-topic
    group="test",
    pub_topic=""
)
# Initialize the Stream object with a sync_rate
stream = Stream(config=config, sync_rate=1)
stream.publish(handler) # Streams continuously every 1 second
Handler with Parameters
You can pass parameters to your handler function:
from avenieca.data import Signal
from avenieca.producers import Stream
from avenieca.config.broker import Broker
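def read_sensor(sensor_id):
    # Hypothetical placeholder: replace with your own sensor-reading logic
    return 1.0
def handler_with_params(params):
    # The tuple given as handler_params arrives as a single argument
    sensor_id, multiplier = params
    # Read sensor data
    value = read_sensor(sensor_id)
    return Signal(
        valence=value * multiplier,
        state=[value, value * 2, value * 3]
    )
config = Broker(
    url="localhost:9092",
    sub_topic="sensors",
    group="sensor_group",
    pub_topic=""
)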
stream = Stream(config=config, sync_rate=0.5)
# Pass parameters as a tuple
params = ("sensor_001", 1.5)
stream.publish(handler_with_params, handler_params=params)
Single Sync (One-time)
Use sync_once=True to publish just once:
from avenieca.data import Signal
from avenieca.producers import Stream
from avenieca.config.broker import Broker
def handler():
    return Signal(
        valence=10,
        state=[1.0, 2.0, 3.0]
    )
config = Broker(
    url="localhost:9092",
    sub_topic="test_topic",
    group="test",
    pub_topic=""
)
stream = Stream(config=config, sync_rate=1)
stream.publish(handler, sync_once=True) # Publishes once and exits
Advanced Examples
Dynamic Sensor Streaming
import random
from avenieca.data import Signal
from avenieca.producers import Stream
from avenieca.config.broker import Broker
class SensorMonitor:
    def __init__(self):
        self.reading_count = 0
    def get_reading(self):
        self.reading_count += 1
        # Simulate sensor readings
        temperature = 20 + random.uniform(-5, 5)
        humidity = 50 + random.uniform(-10, 10)
        pressure = 1013 + random.uniform(-20, 20)
        return Signal(
            valence=temperature,
            state=[temperature, humidity, pressure],
            score=self.reading_count
        )
config = Broker(
    url="localhost:9092",
    sub_topic="weather_sensor",
    group="weather",
    pub_topic=""
)
monitor = SensorMonitor()
stream = Stream(config=config, sync_rate=2.0)
stream.publish(monitor.get_reading) # Streams every 2 seconds
High-Frequency Sub-Second Streaming
import time
from avenieca.data import Signal
from avenieca.producers import Stream
from avenieca.config.broker import Broker
def high_frequency_handler():
    # Get precise timestamp
    timestamp = time.time()
    # Simulate high-frequency data
    signal_value = timestamp % 1  # Fractional part
    return Signal(
        valence=signal_value * 100,
        state=[signal_value, signal_value * 2, signal_value * 3]
    )
config = Broker(
    url="localhost:9092",
    sub_topic="high_freq",
    group="hf_group",
    pub_topic=""
)
# Stream at 100ms intervals (0.1 seconds)
stream = Stream(config=config, sync_rate=0.1)
stream.publish(high_frequency_handler)
Controlled Start/Stop
import threading
import time  # needed for time.sleep() below
from avenieca.data import Signal
from avenieca.producers import Stream
from avenieca.config.broker import Broker
def handler():
    return Signal(
        valence=10,
        state=[1.0, 2.0, 3.0]
    )
config = Broker(
    url="localhost:9092",
    sub_topic="controlled",
    group="control",
    pub_topic=""
)
stream = Stream(config=config, sync_rate=1)
# Run stream in a separate thread
stream_thread = threading.Thread(
    target=stream.publish,
    args=(handler,)
)
stream_thread.start()
# Do other work...
time.sleep(10)
# Stop streaming
stream.sync = False # Stops the publish loop
stream_thread.join()
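The start/stop pattern can be tried without a Kafka broker using a stand-in loop. The sketch below assumes only that the loop checks a sync flag once per iteration. FlagLoop is a hypothetical class written for this illustration and is not part of avenieca.

```python
import threading
import time


class FlagLoop:
    """Hypothetical stand-in for a producer whose loop checks a `sync` flag."""

    def __init__(self, sync_rate: float):
        self.sync_rate = sync_rate
        self.sync = True
        self.calls = 0

    def publish(self, func):
        while self.sync:
            func()
            self.calls += 1
            time.sleep(self.sync_rate)


loop = FlagLoop(sync_rate=0.01)
worker = threading.Thread(target=loop.publish, args=(lambda: None,), daemon=True)
worker.start()

time.sleep(0.05)          # let a few iterations run
loop.sync = False         # request shutdown
worker.join(timeout=1.0)  # the loop exits once its current sleep finishes
```

In a loop built this way the flag is only read between iterations, so shutdown can lag by up to one sync interval. Passing a timeout to join() keeps the caller from blocking indefinitely if the worker does not exit.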
Handler Function Requirements
Your handler function must:
- Return a Signal dataclass instance
- Include a state field (list of floats)
- Optionally include valence, score, and emb_inp fields
Valid handler:
def valid_handler():
    return Signal(
        state=[1.0, 2.0, 3.0],
        valence=10.0
    )
Invalid handler (returns dict instead of Signal):
def invalid_handler():
    return {"state": [1.0, 2.0, 3.0]}  # Error!
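A handler can be checked against these requirements before it is handed to publish. The sketch below is one way to do that, not a library feature: check_handler is a hypothetical helper, and the Signal class is a local stand-in with only two of the documented fields so that the example runs without avenieca installed.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Signal:
    """Stand-in for avenieca.data.Signal (hypothetical, illustration only)."""
    state: List[float]
    valence: Optional[float] = None


def check_handler(func, signal_type=Signal):
    """Call the handler once and confirm that it returns a Signal instance."""
    result = func()
    if not isinstance(result, signal_type):
        raise TypeError(
            f"handler returned {type(result).__name__}, "
            f"expected {signal_type.__name__}"
        )
    return result


def valid_handler():
    return Signal(state=[1.0, 2.0, 3.0], valence=10.0)


def invalid_handler():
    return {"state": [1.0, 2.0, 3.0]}  # a dict, not a Signal


checked = check_handler(valid_handler)
```

Calling check_handler(invalid_handler) raises a TypeError, which surfaces the mistake before the streaming loop starts.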
Signal Validation
The Stream producer automatically validates signals before sending (see avenieca/utils/signal.py:24):
- State must not be None or empty
- State must be 1-dimensional
- State values must be int or float
- Accepts state as list, numpy array, or JSON string
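The rules above can be sketched in plain Python as follows. This is an illustration of the listed checks, not the code in avenieca/utils/signal.py: normalize_state is a hypothetical name, and handling array-like inputs through their tolist() method is an assumption made so the sketch does not need NumPy.

```python
import json


def normalize_state(state):
    """Return state as a flat list of numbers, or raise ValueError."""
    if state is None:
        raise ValueError("state must not be None")
    if isinstance(state, str):
        # JSON string input, e.g. "[1, 2.5]"
        state = json.loads(state)
    elif hasattr(state, "tolist"):
        # Array-like input such as a NumPy array
        state = state.tolist()
    if not isinstance(state, (list, tuple)) or len(state) == 0:
        raise ValueError("state must be a non-empty sequence")
    for value in state:
        if isinstance(value, (list, tuple)):
            raise ValueError("state must be 1-dimensional")
        if not isinstance(value, (int, float)):
            raise ValueError("state values must be int or float")
    return list(state)


flat = normalize_state("[1, 2.5]")
```

A nested list such as [[1, 2], [3, 4]] or a list containing strings is rejected with a ValueError.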
Sync Rate Guidelines
Typical sync rates:
sync_rate=1.0 - Once per second (typical for most sensors)
sync_rate=0.1 - 10 times per second (high-frequency data)
sync_rate=5.0 - Every 5 seconds (low-frequency monitoring)
sync_rate=0.01 - 100 times per second (real-time critical data)
Performance considerations:
- Very short sync intervals (sync_rate below 0.01 seconds, i.e. more than 100 syncs per second) may impact performance
- Consider Kafka broker throughput limits
- Monitor consumer processing capabilities
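Because sync_rate is an interval in seconds, the message rate is its reciprocal. The helper functions below are hypothetical and only illustrate the arithmetic when sizing broker and consumer capacity. They assume one message per sync and ignore the time spent inside the handler.

```python
SECONDS_PER_DAY = 86_400


def syncs_per_second(sync_rate: float) -> float:
    """Messages per second for a given interval in seconds."""
    if sync_rate <= 0:
        raise ValueError("sync_rate must be positive")
    return 1.0 / sync_rate


def syncs_per_day(sync_rate: float) -> float:
    """Messages per day for a given interval in seconds."""
    return SECONDS_PER_DAY * syncs_per_second(sync_rate)


for rate in (5.0, 1.0, 0.1, 0.01):
    print(f"sync_rate={rate}: {syncs_per_second(rate):g} msg/s, "
          f"{syncs_per_day(rate):g} msg/day")
```

At sync_rate=0.01 a single stream produces on the order of 8.6 million messages per day, which is why consumer throughput is worth checking before choosing very short intervals.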