Overview

Framefox provides a powerful event system that allows you to build reactive, decoupled applications. The event dispatcher enables components to communicate without tight coupling.

Event System

The event system consists of:
  • EventDispatcher: Central hub for managing events and listeners
  • Event Listeners: Functions or classes that respond to events
  • Decorators: Convenient event dispatching in middleware
  • Auto-loading: Automatic listener registration
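
To make the dispatcher's role concrete, here is a minimal sketch of the underlying pattern: a mapping from event names to listener callables. `MiniDispatcher` is a hypothetical stand-in written for illustration, not Framefox's `EventDispatcher`. It mirrors only the `add_listener` and `dispatch` calls used in this guide and assumes listeners run in registration order.

```python
from collections import defaultdict
from typing import Callable, Optional

# A listener is any callable that accepts the payload dict.
Listener = Callable[[dict], None]


class MiniDispatcher:
    """Illustrative stand-in, NOT Framefox's EventDispatcher."""

    def __init__(self) -> None:
        # Map each event name to the listeners registered for it.
        self._listeners = defaultdict(list)

    def add_listener(self, event_name: str, listener: Listener) -> None:
        self._listeners[event_name].append(listener)

    def dispatch(self, event_name: str, payload: Optional[dict] = None) -> None:
        # Call every listener for this event, in registration order.
        for listener in self._listeners.get(event_name, []):
            listener(payload or {})


received = []
mini = MiniDispatcher()
mini.add_listener("user.registered", received.append)
mini.dispatch("user.registered", {"user_id": 123})
mini.dispatch("user.deleted", {"user_id": 123})  # no listeners, so nothing runs
```

The code that dispatches `user.registered` never references the listener directly, which is the decoupling the overview describes.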

Basic Usage

Dispatching Events

from framefox.core.events.event_dispatcher import dispatcher

# Dispatch a simple event
dispatcher.dispatch("user.registered", {"user_id": 123, "email": "user@example.com"})

# Dispatch with complex payload
dispatcher.dispatch(
    "order.created",
    {
        "order_id": "ORD-001",
        "customer_id": 456,
        "total": 99.99,
        "items": [{"sku": "PROD-1", "quantity": 2}]
    }
)

Adding Listeners

from framefox.core.events.event_dispatcher import dispatcher

# Function listener
def send_welcome_email(payload):
    user_id = payload.get("user_id")
    email = payload.get("email")
    # Send email logic...
    print(f"Sending welcome email to {email}")

# Register listener
dispatcher.add_listener("user.registered", send_welcome_email)

Event Listeners

Class-Based Listeners

Create organized listener classes:
class OrderListener:
    """Listener for order-related events"""
    
    def on_order_created(self, payload):
        order_id = payload.get("order_id")
        print(f"Order {order_id} created")
        # Process order, update inventory, etc.
    
    def on_order_completed(self, payload):
        order_id = payload.get("order_id")
        print(f"Order {order_id} completed")
        # Send confirmation, generate invoice, etc.
    
    def register_listeners(self, dispatcher):
        """Register all listeners for this class"""
        dispatcher.add_listener("order.created", self.on_order_created)
        dispatcher.add_listener("order.completed", self.on_order_completed)

# Register the listener class
listener = OrderListener()
listener.register_listeners(dispatcher)

Auto-Loading Listeners

Framefox can automatically discover and register listeners:
from framefox.core.events.event_dispatcher import dispatcher

# Load all listeners from a package
dispatcher.load_listeners("myapp.events.listeners")
Project structure:
myapp/
├── events/
│   └── listeners/
│       ├── __init__.py
│       ├── user_listener.py
│       ├── order_listener.py
│       └── payment_listener.py
user_listener.py:
class UserListener:
    def on_user_login(self, payload):
        user = payload.get("user")
        print(f"User {user.email} logged in")
    
    def on_user_logout(self, payload):
        user = payload.get("user")
        print(f"User {user.email} logged out")
    
    def register_listeners(self, dispatcher):
        dispatcher.add_listener("user.login", self.on_user_login)
        dispatcher.add_listener("user.logout", self.on_user_logout)
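
How `load_listeners` discovers classes internally is not described here. The sketch below shows one plausible approach, under the assumption that discovery means "instantiate every class that defines `register_listeners`". `RecordingDispatcher` and `register_module_listeners` are hypothetical names used only for this illustration, and the module is built in memory so the example runs without a package on disk.

```python
import inspect
import types


class RecordingDispatcher:
    """Hypothetical stand-in that only records registrations."""

    def __init__(self):
        self.registered = []

    def add_listener(self, event_name, listener):
        self.registered.append((event_name, listener))


def register_module_listeners(module, dispatcher):
    """Instantiate each class defined in `module` that has register_listeners."""
    count = 0
    for _, cls in inspect.getmembers(module, inspect.isclass):
        # Skip classes that were only imported into the module.
        if cls.__module__ != module.__name__:
            continue
        if callable(getattr(cls, "register_listeners", None)):
            cls().register_listeners(dispatcher)
            count += 1
    return count


# Build a throwaway module in memory to stand in for user_listener.py.
demo_module = types.ModuleType("demo_listeners")
exec(
    '''
class UserListener:
    def on_user_login(self, payload):
        pass

    def register_listeners(self, dispatcher):
        dispatcher.add_listener("user.login", self.on_user_login)
''',
    demo_module.__dict__,
)

demo_dispatcher = RecordingDispatcher()
loaded = register_module_listeners(demo_module, demo_dispatcher)
```

For a real package, the same function could be applied to each module found with `pkgutil.iter_modules` and imported with `importlib.import_module`.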

DispatchEvent Decorator

Use the @DispatchEvent decorator to automatically dispatch events in middleware:
from framefox.core.events.decorator.dispatch_event import DispatchEvent
from starlette.middleware.base import BaseHTTPMiddleware

class CustomMiddleware(BaseHTTPMiddleware):
    @DispatchEvent(
        event_before="kernel.request_received",
        event_after="kernel.request_completed"
    )
    async def dispatch(self, request, call_next):
        # Process request
        response = await call_next(request)
        return response
The decorator automatically dispatches events:
  • Before execution: kernel.request_received with {"request": request}
  • After execution: kernel.request_completed with {"response": response}
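
The before/after behavior described above can be sketched with an ordinary decorator. This is an illustration of the pattern, not Framefox's implementation: `dispatch_around`, `DemoMiddleware`, and the `dispatched` list (standing in for a dispatcher) are hypothetical names, and the payload shapes follow the two bullets above.

```python
import asyncio
import functools

# Stand-in for a dispatcher: records (event_name, payload) tuples.
dispatched = []


def dispatch_around(event_before, event_after):
    """Hypothetical decorator, NOT Framefox's DispatchEvent."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(self, request, call_next):
            # Fire the "before" event with the incoming request.
            dispatched.append((event_before, {"request": request}))
            response = await func(self, request, call_next)
            # Fire the "after" event with the produced response.
            dispatched.append((event_after, {"response": response}))
            return response

        return wrapper

    return decorator


class DemoMiddleware:
    @dispatch_around("kernel.request_received", "kernel.request_completed")
    async def dispatch(self, request, call_next):
        return await call_next(request)


async def fake_call_next(request):
    # Plays the role of the downstream application.
    return f"response to {request}"


result = asyncio.run(DemoMiddleware().dispatch("GET /", fake_call_next))
```

In this sketch the "after" event fires only when the wrapped method returns normally. Whether Framefox also dispatches it when the method raises is not stated here.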

Built-in Events

Framefox dispatches several built-in events:

Kernel Events

# Request lifecycle
"kernel.request_received"    # When request is received
"kernel.request_completed"   # When request is completed

Worker Events

# Task execution
"worker.task.before_execution"  # Before task runs
"worker.task.after_execution"   # After task completes
"worker.task.execution_error"   # When task fails

Listening to Built-in Events

from framefox.core.events.event_dispatcher import dispatcher

class RequestListener:
    def on_request_received(self, payload):
        request = payload.get("request")
        if request:
            method = request.method
            url = request.url.path
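            # request.client can be None (e.g. in some test clients), so guard the lookup
            client = request.client.host if request.client else "unknown"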
            print(f"[Request] {method} {url} from {client}")
    
    def on_request_completed(self, payload):
        response = payload.get("response")
        if response:
            status_code = response.status_code
            print(f"[Response] Status: {status_code}")
    
    def register_listeners(self, dispatcher):
        dispatcher.add_listener("kernel.request_received", self.on_request_received)
        dispatcher.add_listener("kernel.request_completed", self.on_request_completed)

Custom Events

Creating Custom Events

Define your own events for application logic:
from dataclasses import asdict, dataclass
from datetime import datetime

from framefox.core.events.event_dispatcher import dispatcher

# Define event payload structure
@dataclass
class UserRegisteredEvent:
    user_id: int
    email: str
    username: str
    timestamp: datetime

# Dispatch custom event
def register_user(email, username):
    # Create user in database (create_user is your own persistence helper, not shown)
    user_id = create_user(email, username)
    
    # Dispatch event
    event_data = UserRegisteredEvent(
        user_id=user_id,
        email=email,
        username=username,
        timestamp=datetime.now()
    )
    
    # asdict() converts the dataclass into a plain dict payload
    dispatcher.dispatch("user.registered", asdict(event_data))
    
    return user_id
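
Because the payload carries a `datetime`, it is not directly JSON-serializable, which matters if payloads are ever logged or queued. The sketch below shows one way to convert the dataclass to a serializable dict and rebuild it inside a listener. `to_payload` and `from_payload` are hypothetical helper names, not part of Framefox.

```python
from dataclasses import asdict, dataclass
from datetime import datetime


@dataclass
class UserRegisteredEvent:
    user_id: int
    email: str
    username: str
    timestamp: datetime


def to_payload(event):
    """Convert the event to a dict and store the timestamp as an ISO string."""
    payload = asdict(event)
    payload["timestamp"] = event.timestamp.isoformat()
    return payload


def from_payload(payload):
    """Rebuild the typed event from a payload dict, e.g. inside a listener."""
    data = dict(payload)  # copy so the caller's dict is left untouched
    data["timestamp"] = datetime.fromisoformat(data["timestamp"])
    return UserRegisteredEvent(**data)


original = UserRegisteredEvent(1, "user@example.com", "alice", datetime(2024, 1, 1, 12, 0))
payload = to_payload(original)
restored = from_payload(payload)
```

Listeners that prefer attribute access over `payload.get(...)` can call `from_payload` as their first step.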

Multi-Listener Events

Multiple listeners can respond to the same event:
# Email notification
def send_welcome_email(payload):
    email = payload.get("email")
    # Send email...

# Analytics tracking
def track_registration(payload):
    user_id = payload.get("user_id")
    # Track in analytics...

# Audit logging
def log_registration(payload):
    username = payload.get("username")
    # Log to audit trail...

# Register all listeners
dispatcher.add_listener("user.registered", send_welcome_email)
dispatcher.add_listener("user.registered", track_registration)
dispatcher.add_listener("user.registered", log_registration)

Event Naming Conventions

Follow these conventions for event names:
# Format: <entity>.<action>
"user.created"
"user.updated"
"user.deleted"
"user.login"
"user.logout"

"order.created"
"order.shipped"
"order.delivered"
"order.cancelled"

"payment.received"
"payment.failed"
"payment.refunded"

# System events use the kernel prefix
"kernel.request_received"
"kernel.response_sent"
"kernel.exception"

# Worker events use the worker.task prefix
"worker.task.started"
"worker.task.completed"
"worker.task.failed"

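A naming convention is easier to keep when it is checked in code. The following sketch validates names against the dotted, lowercase style shown above. The exact pattern is an assumption inferred from these examples, not a rule enforced by Framefox, and `is_valid_event_name` is a hypothetical helper.

```python
import re

# Two or more dot-separated segments, each lowercase with optional digits/underscores.
EVENT_NAME_PATTERN = re.compile(r"[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)+")


def is_valid_event_name(name: str) -> bool:
    """Return True if `name` follows the <entity>.<action> style."""
    return EVENT_NAME_PATTERN.fullmatch(name) is not None


checks = {
    name: is_valid_event_name(name)
    for name in ["user.created", "worker.task.failed", "UserCreated", "user."]
}
```

A check like this could run in a unit test over every event name the application dispatches.
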
Advanced Patterns

Event Priority

To run listeners in a specific order, register them in that order:
# High priority - execute first
dispatcher.add_listener("order.created", validate_inventory)

# Medium priority
dispatcher.add_listener("order.created", process_payment)

# Low priority - execute last
dispatcher.add_listener("order.created", send_confirmation)
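
When registrations are spread across several modules, keeping them in the right order by hand gets fragile. One option is to collect `(priority, listener)` pairs and register them sorted. This sketch assumes listeners run in registration order, as described above. `register_by_priority` and `OrderedDispatcher` are hypothetical names, and the three listeners are placeholders.

```python
class OrderedDispatcher:
    """Illustrative stand-in that calls listeners in registration order."""

    def __init__(self):
        self._listeners = {}

    def add_listener(self, event_name, listener):
        self._listeners.setdefault(event_name, []).append(listener)

    def dispatch(self, event_name, payload=None):
        for listener in self._listeners.get(event_name, []):
            listener(payload or {})


def register_by_priority(dispatcher, event_name, listeners):
    """Register (priority, listener) pairs; lower numbers run first."""
    for _, listener in sorted(listeners, key=lambda pair: pair[0]):
        dispatcher.add_listener(event_name, listener)


calls = []


def validate_inventory(payload):
    calls.append("validate_inventory")


def process_payment(payload):
    calls.append("process_payment")


def send_confirmation(payload):
    calls.append("send_confirmation")


demo = OrderedDispatcher()
register_by_priority(
    demo,
    "order.created",
    [(30, send_confirmation), (10, validate_inventory), (20, process_payment)],
)
demo.dispatch("order.created", {"order_id": "ORD-001"})
```

Since `sorted` is stable, listeners that share a priority keep the order in which they were listed.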

Asynchronous Event Handling

For long-running operations, keep the listener short and hand the heavy work off to a background task:
from framefox.core.task.task_manager import TaskManager
from framefox.core.di.service_container import ServiceContainer

def on_order_created(payload):
    # Quick synchronous operations
    order_id = payload.get("order_id")
    
    # Dispatch heavy work to background
    container = ServiceContainer()
    task_manager = container.get(TaskManager)
    task_manager.queue_task(
        "myapp.tasks.process_order",
        order_id=order_id
    )

dispatcher.add_listener("order.created", on_order_created)
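
The same hand-off idea can be tried without Framefox's task system. The sketch below uses the standard library's `ThreadPoolExecutor` as a stand-in for a task queue. It illustrates the pattern only and does not replace `TaskManager`. `process_order` is a placeholder for the heavy work.

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
processed = []
pending = []


def process_order(order_id):
    # Placeholder for slow work such as invoicing or inventory updates.
    processed.append(order_id)
    return order_id


def on_order_created(payload):
    # The listener returns immediately; the work runs on a worker thread.
    future = executor.submit(process_order, payload.get("order_id"))
    pending.append(future)


on_order_created({"order_id": "ORD-001"})

# Wait for the background work so the example has a deterministic end state.
results = [future.result(timeout=5) for future in pending]
executor.shutdown(wait=True)
```

A thread pool only survives as long as the process does, so work that must not be lost on restart still belongs in a persistent task queue.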

Event Aggregation

Collect multiple events for batch processing:
class EventAggregator:
    def __init__(self):
        self.events = []
    
    def collect_event(self, payload):
        self.events.append(payload)
        
        # Process in batches of 100
        if len(self.events) >= 100:
            self.process_batch()
    
    def process_batch(self):
        # Bulk insert, batch API calls, etc.
        print(f"Processing {len(self.events)} events")
        self.events.clear()

aggregator = EventAggregator()
dispatcher.add_listener("analytics.track", aggregator.collect_event)
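
With a fixed batch size, events that arrive after the last full batch stay in memory until the threshold is reached again. The variant below adds an explicit `flush` method for that remainder. `FlushingAggregator` and its `handler` parameter are hypothetical additions for illustration.

```python
class FlushingAggregator:
    """Batching listener with an explicit flush for the final partial batch."""

    def __init__(self, batch_size=100, handler=None):
        self.batch_size = batch_size
        # handler receives each batch as a list; the default does nothing.
        self.handler = handler or (lambda batch: None)
        self.events = []

    def collect_event(self, payload):
        self.events.append(payload)
        if len(self.events) >= self.batch_size:
            self.flush()

    def flush(self):
        if not self.events:
            return
        # Swap in a fresh list so the handler owns the batch it receives.
        batch, self.events = self.events, []
        self.handler(batch)


batches = []
agg = FlushingAggregator(batch_size=3, handler=batches.append)
for n in range(7):
    agg.collect_event({"n": n})
agg.flush()  # push out the one leftover event
```

Calling `flush` at shutdown, for example through `atexit.register(agg.flush)`, is one way to avoid losing the final partial batch.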

Best Practices

  1. Event Names: Use clear, descriptive names in the entity.action format
  2. Payload Structure: Keep payloads simple and serializable
  3. Error Handling: Wrap listener code in try-except blocks
  4. Performance: Keep listeners fast; use background tasks for heavy work
  5. Testing: Test listeners independently from event dispatching
  6. Documentation: Document available events and their payload structure
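
For the error-handling recommendation, a small decorator can save repeating try-except in every listener. This is a sketch, with `safe_listener` as a hypothetical helper name. It logs the exception and returns `None` instead of re-raising, which may or may not be the behavior you want for critical listeners.

```python
import functools
import logging

logger = logging.getLogger("events")


def safe_listener(func):
    """Wrap a function listener so its exceptions are logged, not propagated."""

    @functools.wraps(func)
    def wrapper(payload):
        try:
            return func(payload)
        except Exception:
            # logger.exception records the message together with the traceback.
            logger.exception("Listener %s failed", func.__name__)
            return None

    return wrapper


@safe_listener
def flaky(payload):
    raise RuntimeError("boom")


@safe_listener
def healthy(payload):
    return payload["user_id"]


flaky_result = flaky({})
healthy_result = healthy({"user_id": 1})
```

The wrapped functions are registered like any other listener, for example with `dispatcher.add_listener("user.registered", healthy)`.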

Testing Events

from framefox.core.events.event_dispatcher import EventDispatcher

def test_user_registration_event():
    # Create a test dispatcher
    dispatcher = EventDispatcher()
    
    # Track if listener was called
    called = {"value": False}
    
    def test_listener(payload):
        called["value"] = True
        assert payload["email"] == "user@example.com"
    
    # Register and dispatch
    dispatcher.add_listener("user.registered", test_listener)
    dispatcher.dispatch("user.registered", {"email": "user@example.com"})
    
    # Verify the listener ran
    assert called["value"] is True
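
Listeners can also be tested without any dispatcher, as the best practices suggest, by calling the handler method directly with a payload. In this sketch `OrderListener` takes an injected `notifier` callable so the test can observe its output. That constructor argument is a hypothetical design choice, not something the listener classes above require.

```python
class OrderListener:
    """Listener whose side effect goes through an injectable callable."""

    def __init__(self, notifier=print):
        self.notifier = notifier

    def on_order_created(self, payload):
        self.notifier(f"Order {payload.get('order_id')} created")


def test_on_order_created_notifies():
    messages = []
    listener = OrderListener(notifier=messages.append)

    # Call the handler directly: no dispatcher involved.
    listener.on_order_created({"order_id": "ORD-001"})

    assert messages == ["Order ORD-001 created"]


test_on_order_created_notifies()
```

Under pytest the final call is unnecessary, because the runner collects `test_on_order_created_notifies` by name.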
