Overview

The Stream class is a producer that publishes signals continuously at a fixed sync rate. Use it when you want the library to run the sync loop for you: you provide a handler function that returns the signal data, and the library calls it and publishes the result at each interval.

Constructor

Stream(config: Broker, sync_rate: float)
Creates a new Stream producer instance.
config (Broker, required)
Configuration object containing broker connection details. It has the following fields:
  • url: Broker URL
  • sub_topic: Subscription topic
  • pub_topic: Publishing topic
  • group: Consumer group
  • auto_offset_reset: Offset reset policy (optional, default: "latest")
sync_rate (float, required)
Time interval between syncs in seconds. Sub-second values are supported, e.g. 0.5 for 500 ms.
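
Despite its name, sync_rate is an interval in seconds, not a frequency. If you think in syncs per second, the conversion is a reciprocal. The helper below is a small illustration only; `interval_from_hz` is a hypothetical name and is not part of the library.

```python
def interval_from_hz(frequency_hz: float) -> float:
    """Return the interval in seconds for a target number of syncs per second."""
    if frequency_hz <= 0:
        raise ValueError("frequency_hz must be positive")
    return 1.0 / frequency_hz


# Two syncs per second corresponds to an interval of half a second.
sync_rate = interval_from_hz(2)
```

The resulting value is what would be passed as the sync_rate argument of the constructor.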

Methods

publish()

publish(func, handler_params=None, sync_once=False)
Publishes signals continuously at the configured sync rate by calling the provided handler function.
func (callable, required)
Handler function that returns a Signal object containing the data to publish.
handler_params (tuple, default: None)
Optional tuple of parameters passed to the handler function when it is called.
sync_once (bool, default: False)
If True, runs the sync loop once and then stops. If False, keeps syncing at the configured rate until stream.sync is set to False.
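
One way to picture how these three parameters interact is a simple timed loop. The sketch below is a stand-in model written for illustration, not the library's implementation: `run_sync_loop`, its `emit` callback, and the `state` dict are hypothetical names, and it assumes the handler receives handler_params as a single tuple argument, as the parameterized handler in the Example section does.

```python
import time
from typing import Any, Callable, Optional


def run_sync_loop(
    func: Callable[..., Any],
    emit: Callable[[Any], None],
    sync_rate: float,
    handler_params: Optional[tuple] = None,
    sync_once: bool = False,
    state: Optional[dict] = None,
) -> None:
    """Illustrative model of a publish loop (hypothetical, not avenieca code)."""
    # state["sync"] plays the role of the sync flag described on this page.
    state = state if state is not None else {"sync": True}
    while state["sync"]:
        # Assumption: the parameters are handed to the handler as one tuple.
        signal = func(handler_params) if handler_params is not None else func()
        emit(signal)  # stand-in for sending the signal to the broker
        if sync_once:
            break
        time.sleep(sync_rate)  # wait one interval before the next sync


# With sync_once=True the handler runs a single time and the loop exits.
published = []
run_sync_loop(lambda: {"state": [1.0]}, published.append, sync_rate=0.01, sync_once=True)
```

In this model, sync_once short-circuits the loop after the first call, while clearing the flag lets the current iteration finish before the loop exits.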

Properties

config

Gets or sets the broker configuration.
stream.config  # Get current config
stream.config = new_config  # Set new config

sync

Boolean flag that controls whether the stream should continue syncing. Set to False to stop the sync loop.
stream.sync = False  # Stop syncing
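
If a continuous publish call occupies the calling thread, the flag has to be cleared from somewhere else, such as another thread. The sketch below shows that pattern with a small stand-in class. `FakeStream` is hypothetical and only mimics the sync flag and loop described here, so the pattern should be checked against the real Stream before relying on it.

```python
import threading
import time


class FakeStream:
    """Hypothetical stand-in that mimics the sync flag; not the avenieca class."""

    def __init__(self, sync_rate: float):
        self.sync_rate = sync_rate
        self.sync = True
        self.sent = []

    def publish(self, func):
        # Loop until some other code sets sync to False.
        while self.sync:
            self.sent.append(func())
            time.sleep(self.sync_rate)


stream = FakeStream(sync_rate=0.01)

# Run the blocking loop in a worker thread so the main thread stays free.
worker = threading.Thread(target=stream.publish, args=(lambda: "signal",))
worker.start()

time.sleep(0.05)      # let a few syncs happen
stream.sync = False   # stop syncing
worker.join(timeout=1.0)
```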

sync_rate

The configured sync rate in seconds.
rate = stream.sync_rate  # Get current sync rate

Example

from avenieca import Stream, Broker, Signal

# Configure broker connection
config = Broker(
    url="localhost:9092",
    sub_topic="sensor-data",
    pub_topic="processed-data",
    group="sensor-group"
)

# Create stream with 1-second sync rate
stream = Stream(config=config, sync_rate=1.0)

# Define handler that generates signal data
def sensor_handler():
    # Read sensor data and return as Signal
    sensor_values = [23.5, 45.2, 67.8]
    return Signal(
        state=sensor_values,
        valence=0.85,
        score=95
    )

# Start publishing (runs continuously until stream.sync is set to False)
stream.publish(sensor_handler)

# Or run the handler and publish a single time
stream.publish(sensor_handler, sync_once=True)

# Handler with parameters
def parameterized_handler(params):
    device_id, threshold = params
    # Custom logic using parameters
    return Signal(state=[device_id, threshold])

stream.publish(
    parameterized_handler,
    handler_params=("device-123", 75.0)
)

See also

  • Event - Event-driven producer
  • Consumer - Consume signals from digital twins
