The Stream class provides continuous, time-based streaming to your digital twin. It handles the timing logic automatically, calling your handler function at a fixed interval.
Class Definition
Location: avenieca/producers/stream.py:9
- config (Broker): Broker configuration object
- sync_rate (float): Time interval in seconds between syncs (supports sub-second values)
Methods
publish
Signature: avenieca/producers/stream.py:25
- func: Handler function that returns a Signal dataclass
- handler_params (optional): Tuple of parameters to pass to the handler
- sync_once (optional): If True, runs the sync loop only once (default: False)
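The interaction of these three parameters can be pictured with a small stand-in loop. This is a sketch only, not the code in avenieca/producers/stream.py: run_sync_loop, send, max_syncs, and read_sensor are hypothetical names introduced here, and the sketch assumes handler_params is unpacked positionally into the handler.

```python
import time


def run_sync_loop(func, handler_params=(), sync_once=False,
                  sync_rate=1.0, send=print, max_syncs=None):
    """Hypothetical stand-in for a fixed-interval publish loop.

    send stands in for the broker call, and max_syncs exists only so the
    sketch can stop on its own. Both are assumptions, not SDK parameters.
    """
    syncs = 0
    while True:
        signal = func(*handler_params)  # assumed: tuple unpacked positionally
        send(signal)
        syncs += 1
        if sync_once or (max_syncs is not None and syncs >= max_syncs):
            return syncs
        time.sleep(sync_rate)  # wait one interval before the next sync


def read_sensor(sensor_id, scale):
    # Placeholder handler; a real handler would return a Signal instance.
    return {"state": [0.2 * scale, 0.3 * scale], "sensor": sensor_id}


sent = []
run_sync_loop(read_sensor, handler_params=("left_wheel", 2.0),
              sync_once=True, send=sent.append)
```

With sync_once=True the handler is called exactly once and the loop returns. The stand-in sleeps a full interval after each sync and does not compensate for handler run time; whether the SDK does so is not stated here.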
Basic Usage
Simple Continuous Stream
Handler with Parameters
You can pass parameters to your handler function through handler_params.
Single Sync (One-time)
Set sync_once=True to publish just once.
Advanced Examples
Dynamic Sensor Streaming
High-Frequency Sub-Second Streaming
Controlled Start/Stop
Handler Function Requirements
Your handler function must:
- Return a Signal dataclass instance
- Include a state field (list of floats)
- Optionally include valence, score, and emb_inp fields
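A handler that meets these requirements might look like the sketch below. The Signal class defined here is a local stand-in so the example runs on its own; its field names come from the list above, while the field types and defaults are assumptions. In real code, import Signal from the SDK instead. temperature_handler is a hypothetical name.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Signal:
    """Local stand-in for the SDK's Signal dataclass (field types assumed)."""
    state: List[float]
    valence: Optional[float] = None
    score: Optional[float] = None
    emb_inp: Optional[int] = None


def temperature_handler(celsius: float, humidity: float) -> Signal:
    # state is a flat list of floats; valence is optional and set here
    # only to show an optional field in use.
    return Signal(state=[float(celsius), float(humidity)], valence=10.0)


signal = temperature_handler(21.5, 40)
```

Casting each reading to float keeps the state list uniform even when a sensor reports whole numbers.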
Signal Validation
The Stream producer automatically validates signals before sending (see avenieca/utils/signal.py:24):
- State must not be None or empty
- State must be 1-dimensional
- State values must be int or float
- Accepts state as list, numpy array, or JSON string
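The four rules above can be sketched as a standalone function. This is an illustration, not the code at avenieca/utils/signal.py:24. validate_state is a hypothetical name, raising ValueError is an assumed failure mode, and NumPy arrays are handled by duck typing on tolist() so the example needs no third-party imports.

```python
import json


def validate_state(state):
    """Return state as a flat list of numbers, or raise ValueError.

    Hypothetical helper that mirrors the documented rules; the SDK's own
    validation may differ in details and in the errors it raises.
    """
    if state is None:
        raise ValueError("state must not be None")
    if isinstance(state, str):
        # JSON string input; malformed JSON raises a ValueError subclass
        state = json.loads(state)
    elif hasattr(state, "tolist"):
        # NumPy-style arrays expose tolist()
        state = state.tolist()
    if not isinstance(state, list) or len(state) == 0:
        raise ValueError("state must be a non-empty list")
    for value in state:
        if isinstance(value, (list, tuple)):
            raise ValueError("state must be 1-dimensional")
        if not isinstance(value, (int, float)):
            raise ValueError("state values must be int or float")
    return state


clean = validate_state("[1, 2.5, 3]")
```

Running a check like this inside your handler during development can surface malformed state before it reaches the producer.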
Sync Rate Guidelines
Typical sync rates:
- sync_rate=1.0 - Once per second (default for most sensors)
- sync_rate=0.1 - 10 times per second (high-frequency data)
- sync_rate=5.0 - Every 5 seconds (low-frequency monitoring)
- sync_rate=0.01 - 100 times per second (real-time critical data)
Because sync_rate is an interval, smaller values mean more frequent syncs. When choosing a value:
- Very short intervals (sync_rate < 0.01, more than 100 syncs per second) may impact performance
- Consider Kafka broker throughput limits
- Monitor consumer processing capabilities
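To relate an interval to broker load, a rough back-of-the-envelope estimate can help. The helpers below are hypothetical and not part of the SDK; payload_bytes and streams are assumed inputs that you would measure or choose for your own deployment.

```python
def syncs_per_second(sync_rate: float) -> float:
    """Convert a sync interval in seconds to a message frequency."""
    if sync_rate <= 0:
        raise ValueError("sync_rate must be a positive number of seconds")
    return 1.0 / sync_rate


def estimated_bytes_per_second(sync_rate: float, payload_bytes: int,
                               streams: int = 1) -> float:
    """Rough producer-side load: frequency x payload size x stream count."""
    return syncs_per_second(sync_rate) * payload_bytes * streams


# Print the frequency implied by each of the typical intervals.
for rate in (5.0, 1.0, 0.1, 0.01):
    print(f"sync_rate={rate}: about {syncs_per_second(rate):g} syncs/s")
```

The estimate ignores protocol overhead, batching, and compression, so treat it as a lower-effort sanity check and not as a capacity plan.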
Related Classes
- Event Producer - For event-driven publishing
- Consumer - For consuming streamed data
- Configuration - Broker configuration options