Documentation Index
Fetch the complete documentation index at: https://mintlify.com/aveni-hub/avenieca-python/llms.txt
Use this file to discover all available pages before exploring further.
The Event class provides event-driven publishing to your digital twin. Unlike the Stream producer, you control when signals are sent, making it ideal for triggered events and irregular updates.
Class Definition
Location: avenieca/producers/event.py:8
class Event(Producer):
def __init__(self, config: Broker)
Parameters:
config (Broker): Broker configuration object
Methods
publish
Signature: avenieca/producers/event.py:21
def publish(self, signal: Signal)
Publish a single signal to the digital twin.
Parameters:
signal (Signal): Signal dataclass instance to publish
Returns: None
Basic Usage
Simple Event Publishing
import os
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.producers import Event
config = Broker(
url=os.environ["KAFKA_URL"],
sub_topic="left_wheel",
group="test",
pub_topic=""
)
# Define the signal
signal = Signal(
valence=9.0,
state=[0.2, 0.3, 0.8]
)
event = Event(config=config)
event.publish(signal) # Publishes immediately
Multiple Events
Publish multiple events independently:
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="events",
group="app",
pub_topic=""
)
event_producer = Event(config=config)
# Publish first event
event_producer.publish(Signal(
valence=10.0,
state=[1.0, 2.0, 3.0],
score=1
))
# Publish second event
event_producer.publish(Signal(
valence=15.0,
state=[4.0, 5.0, 6.0],
score=2
))
When to Use Event Producer
Use Event Producer for:
User-triggered actions
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="user_actions",
group="webapp",
pub_topic=""
)
event = Event(config=config)
def on_button_click(button_id, user_id):
signal = Signal(
valence=1.0,
state=[float(button_id), float(user_id)]
)
event.publish(signal)
State changes
from avenieca.data import Signal
from avenieca.producers import Event
class SystemMonitor:
def __init__(self, event_producer):
self.event = event_producer
self.state = "idle"
def change_state(self, new_state):
old_state = self.state
self.state = new_state
# Publish state change event
signal = Signal(
valence=self.get_state_value(new_state),
state=[self.encode_state(old_state), self.encode_state(new_state)]
)
self.event.publish(signal)
def get_state_value(self, state):
return {"idle": 0, "running": 10, "error": -10}.get(state, 0)
def encode_state(self, state):
return float({"idle": 0, "running": 1, "error": 2}.get(state, 0))
Threshold-based alerts
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="alerts",
group="monitoring",
pub_topic=""
)
event = Event(config=config)
def check_temperature(temp):
if temp > 80:
# Publish alert event only when threshold exceeded
signal = Signal(
valence=-100.0, # High negative valence for alert
state=[temp, 80.0, temp - 80.0], # current, threshold, difference
score=10 # High priority
)
event.publish(signal)
External system integration
import json
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="webhooks",
group="integrations",
pub_topic=""
)
event = Event(config=config)
def webhook_handler(webhook_data):
"""Handle incoming webhook and publish as event"""
payload = json.loads(webhook_data)
signal = Signal(
valence=payload.get("priority", 0),
state=[
float(payload.get("value1", 0)),
float(payload.get("value2", 0)),
float(payload.get("value3", 0))
]
)
event.publish(signal)
Stream vs Event Comparison
| Feature | Stream Producer | Event Producer |
|---|
| Timing | Automatic (sync_rate) | Manual |
| Use case | Periodic updates | Triggered events |
| Handler | Returns Signal | Receives Signal |
| Control | Library manages timing | You control when to publish |
| Example | Sensor every 1s | Button click |
Stream example:
from avenieca.producers import Stream
from avenieca.data import Signal
def handler():
return Signal(valence=10, state=[1.0, 2.0])
stream = Stream(config=config, sync_rate=1)
stream.publish(handler) # Calls handler every 1 second
Event example:
from avenieca.producers import Event
from avenieca.data import Signal
event = Event(config=config)
signal = Signal(valence=10, state=[1.0, 2.0])
event.publish(signal) # Publishes immediately when you call it
Advanced Examples
Batch Event Publishing
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="batch_events",
group="batch",
pub_topic=""
)
event = Event(config=config)
def process_batch(data_list):
"""Process a batch of data and publish each as an event"""
for idx, data in enumerate(data_list):
signal = Signal(
valence=data["valence"],
state=data["state"],
score=idx + 1
)
event.publish(signal)
# Process batch
batch = [
{"valence": 10.0, "state": [1.0, 2.0, 3.0]},
{"valence": 20.0, "state": [4.0, 5.0, 6.0]},
{"valence": 30.0, "state": [7.0, 8.0, 9.0]}
]
process_batch(batch)
Conditional Publishing
import random
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="conditional",
group="filter",
pub_topic=""
)
event = Event(config=config)
def publish_if_significant(value, threshold=5.0):
"""Only publish events that meet significance threshold"""
if abs(value) > threshold:
signal = Signal(
valence=value,
state=[value, threshold, abs(value) - threshold]
)
event.publish(signal)
return True
return False
# Only significant values get published
for _ in range(100):
value = random.uniform(-10, 10)
published = publish_if_significant(value, threshold=7.0)
if published:
print(f"Published: {value}")
Error Handling
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="errors",
group="error_handler",
pub_topic=""
)
event = Event(config=config)
def safe_publish(data):
"""Publish with error handling"""
try:
signal = Signal(
valence=data.get("valence", 0.0),
state=data.get("state", [0.0, 0.0, 0.0])
)
event.publish(signal)
return True
except Exception as e:
print(f"Failed to publish: {e}")
return False
# Safely handle potentially invalid data
result = safe_publish({"valence": 10.0, "state": [1.0, 2.0, 3.0]})
print(f"Published: {result}")
Signal Validation
The Event producer validates signals before publishing (see avenieca/producers/event.py:29):
- Signal must be a Signal dataclass instance
- State must not be None or empty
- State must be 1-dimensional
- State values must be int or float
Controlling Sync
The sync property controls whether publishing is enabled:
from avenieca.producers import Event
from avenieca.data import Signal
from avenieca.config.broker import Broker
config = Broker(
url="localhost:9092",
sub_topic="controlled",
group="control",
pub_topic=""
)
event = Event(config=config)
# Enable publishing
event.sync = True
event.publish(Signal(valence=10, state=[1.0, 2.0, 3.0])) # Publishes
# Disable publishing
event.sync = False
event.publish(Signal(valence=10, state=[1.0, 2.0, 3.0])) # Does not publish