Skip to main content
The Event class provides event-driven publishing to your digital twin. Unlike the Stream producer, you control when signals are sent, making it ideal for triggered events and irregular updates.

Class Definition

Location: avenieca/producers/event.py:8
class Event(Producer):
    def __init__(self, config: Broker)
Parameters:
  • config (Broker): Broker configuration object

Methods

publish

Signature: avenieca/producers/event.py:21
def publish(self, signal: Signal)
Publish a single signal to the digital twin. Parameters:
  • signal (Signal): Signal dataclass instance to publish
Returns: None

Basic Usage

Simple Event Publishing

import os
from avenieca.config.broker import Broker
from avenieca.data import Signal
from avenieca.producers import Event

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="left_wheel",
    group="test",
    pub_topic=""
)

# Define the signal
signal = Signal(
    valence=9.0,
    state=[0.2, 0.3, 0.8]
)

event = Event(config=config)
event.publish(signal)  # Publishes immediately

Multiple Events

Publish multiple events independently:
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="events",
    group="app",
    pub_topic=""
)

event_producer = Event(config=config)

# Publish first event
event_producer.publish(Signal(
    valence=10.0,
    state=[1.0, 2.0, 3.0],
    score=1
))

# Publish second event
event_producer.publish(Signal(
    valence=15.0,
    state=[4.0, 5.0, 6.0],
    score=2
))

When to Use Event Producer

Use Event Producer for:

User-triggered actions
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="user_actions",
    group="webapp",
    pub_topic=""
)

event = Event(config=config)

def on_button_click(button_id, user_id):
    signal = Signal(
        valence=1.0,
        state=[float(button_id), float(user_id)]
    )
    event.publish(signal)
State changes
from avenieca.data import Signal
from avenieca.producers import Event

class SystemMonitor:
    def __init__(self, event_producer):
        self.event = event_producer
        self.state = "idle"
    
    def change_state(self, new_state):
        old_state = self.state
        self.state = new_state
        
        # Publish state change event
        signal = Signal(
            valence=self.get_state_value(new_state),
            state=[self.encode_state(old_state), self.encode_state(new_state)]
        )
        self.event.publish(signal)
    
    def get_state_value(self, state):
        return {"idle": 0, "running": 10, "error": -10}.get(state, 0)
    
    def encode_state(self, state):
        return float({"idle": 0, "running": 1, "error": 2}.get(state, 0))
Threshold-based alerts
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="alerts",
    group="monitoring",
    pub_topic=""
)

event = Event(config=config)

def check_temperature(temp):
    if temp > 80:
        # Publish alert event only when threshold exceeded
        signal = Signal(
            valence=-100.0,  # High negative valence for alert
            state=[temp, 80.0, temp - 80.0],  # current, threshold, difference
            score=10  # High priority
        )
        event.publish(signal)
External system integration
import json
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="webhooks",
    group="integrations",
    pub_topic=""
)

event = Event(config=config)

def webhook_handler(webhook_data):
    """Handle incoming webhook and publish as event"""
    payload = json.loads(webhook_data)
    
    signal = Signal(
        valence=payload.get("priority", 0),
        state=[
            float(payload.get("value1", 0)),
            float(payload.get("value2", 0)),
            float(payload.get("value3", 0))
        ]
    )
    event.publish(signal)

Stream vs Event Comparison

FeatureStream ProducerEvent Producer
TimingAutomatic (sync_rate)Manual
Use casePeriodic updatesTriggered events
HandlerReturns SignalReceives Signal
ControlLibrary manages timingYou control when to publish
ExampleSensor every 1sButton click
Stream example:
from avenieca.producers import Stream
from avenieca.data import Signal

def handler():
    return Signal(valence=10, state=[1.0, 2.0])

stream = Stream(config=config, sync_rate=1)
stream.publish(handler)  # Calls handler every 1 second
Event example:
from avenieca.producers import Event
from avenieca.data import Signal

event = Event(config=config)
signal = Signal(valence=10, state=[1.0, 2.0])
event.publish(signal)  # Publishes immediately when you call it

Advanced Examples

Batch Event Publishing

from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="batch_events",
    group="batch",
    pub_topic=""
)

event = Event(config=config)

def process_batch(data_list):
    """Process a batch of data and publish each as an event"""
    for idx, data in enumerate(data_list):
        signal = Signal(
            valence=data["valence"],
            state=data["state"],
            score=idx + 1
        )
        event.publish(signal)

# Process batch
batch = [
    {"valence": 10.0, "state": [1.0, 2.0, 3.0]},
    {"valence": 20.0, "state": [4.0, 5.0, 6.0]},
    {"valence": 30.0, "state": [7.0, 8.0, 9.0]}
]
process_batch(batch)

Conditional Publishing

import random
from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="conditional",
    group="filter",
    pub_topic=""
)

event = Event(config=config)

def publish_if_significant(value, threshold=5.0):
    """Only publish events that meet significance threshold"""
    if abs(value) > threshold:
        signal = Signal(
            valence=value,
            state=[value, threshold, abs(value) - threshold]
        )
        event.publish(signal)
        return True
    return False

# Only significant values get published
for _ in range(100):
    value = random.uniform(-10, 10)
    published = publish_if_significant(value, threshold=7.0)
    if published:
        print(f"Published: {value}")

Error Handling

from avenieca.data import Signal
from avenieca.producers import Event
from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="errors",
    group="error_handler",
    pub_topic=""
)

event = Event(config=config)

def safe_publish(data):
    """Publish with error handling"""
    try:
        signal = Signal(
            valence=data.get("valence", 0.0),
            state=data.get("state", [0.0, 0.0, 0.0])
        )
        event.publish(signal)
        return True
    except Exception as e:
        print(f"Failed to publish: {e}")
        return False

# Safely handle potentially invalid data
result = safe_publish({"valence": 10.0, "state": [1.0, 2.0, 3.0]})
print(f"Published: {result}")

Signal Validation

The Event producer validates signals before publishing (see avenieca/producers/event.py:29):
  • Signal must be a Signal dataclass instance
  • State must not be None or empty
  • State must be 1-dimensional
  • State values must be int or float

Controlling Sync

The sync property controls whether publishing is enabled:
from avenieca.producers import Event
from avenieca.data import Signal
from avenieca.config.broker import Broker

config = Broker(
    url="localhost:9092",
    sub_topic="controlled",
    group="control",
    pub_topic=""
)

event = Event(config=config)

# Enable publishing
event.sync = True
event.publish(Signal(valence=10, state=[1.0, 2.0, 3.0]))  # Publishes

# Disable publishing
event.sync = False
event.publish(Signal(valence=10, state=[1.0, 2.0, 3.0]))  # Does not publish

Build docs developers (and LLMs) love