Skip to main content

Overview

The Event class is a producer for event-driven syncing. Use this class when you want to handle the outer syncing logic yourself and manually publish signals as events occur.

Constructor

Event(config: Broker)
Creates a new Event producer instance.
config
Broker
required
Configuration object containing broker connection details. Must include:
  • url: Broker URL
  • sub_topic: Subscription topic
  • pub_topic: Publishing topic
  • group: Consumer group
  • auto_offset_reset: Offset reset policy (default: “latest”)

Methods

publish()

publish(signal: Signal)
Publishes a signal once to a digital twin. Call this method whenever you have new event data to publish.
signal
Signal
required
Signal dataclass containing the data to publish. Must include:
  • state: List of float values representing the signal state
  • valence: Optional float value
  • score: Optional integer value
  • emb_inp: Optional integer value
Returns: Result of the send operation

Properties

config

Gets or sets the broker configuration.
event.config  # Get current config
event.config = new_config  # Set new config

sync

Boolean flag that controls whether the event producer is active. Set to False to disable publishing.
event.sync = False  # Disable publishing

Example

from avenieca import Event, Broker, Signal

# Configure broker connection
config = Broker(
    url="localhost:9092",
    sub_topic="user-events",
    pub_topic="processed-events",
    group="event-group"
)

# Create event producer
event = Event(config=config)

# Publish when events occur
def on_user_action(user_id, action_data):
    signal = Signal(
        state=action_data,
        valence=0.92,
        score=100
    )
    event.publish(signal)

# Example: Button click event
button_data = [1.0, 0.0, 0.5]  # x, y, intensity
signal = Signal(
    state=button_data,
    valence=0.8,
    score=85,
    emb_inp=1
)
event.publish(signal)

# Example: Webhook handler
def webhook_handler(payload):
    # Process webhook payload
    processed_data = process_webhook(payload)
    
    signal = Signal(
        state=processed_data['values'],
        valence=processed_data.get('valence'),
        score=processed_data.get('score')
    )
    
    event.publish(signal)

# Example: IoT device trigger
def on_motion_detected(sensor_readings):
    signal = Signal(
        state=sensor_readings,
        valence=1.0
    )
    event.publish(signal)
  • Stream - Time-based continuous producer
  • Consumer - Consume signals from digital twins
  • Signal - Signal data structure

Build docs developers (and LLMs) love