The PubSubAdapter trait defines the interface for pub/sub messaging in III. PubSub adapters enable event-driven architectures by allowing functions to publish and subscribe to topics.

Trait Definition

#[async_trait]
pub trait PubSubAdapter: Send + Sync + 'static {
    async fn publish(&self, topic: &str, pubsub_data: Value);
    async fn subscribe(&self, topic: &str, id: &str, function_id: &str);
    async fn unsubscribe(&self, topic: &str, id: &str);
}
Source: /workspace/source/src/modules/pubsub/mod.rs:21

Methods

publish

async fn publish(&self, topic: &str, pubsub_data: Value)
Publishes an event to a topic. Every function subscribed to the topic is invoked with the event data.
Parameters:
  • topic - The topic name to publish to
  • pubsub_data - JSON value containing the event data
Note: This method returns no value, so it cannot report failures to the caller. Implementations should handle errors internally and log them.

subscribe

async fn subscribe(&self, topic: &str, id: &str, function_id: &str)
Subscribes a function to receive events from a topic.
Parameters:
  • topic - The topic name to subscribe to
  • id - Unique subscription identifier
  • function_id - The function to invoke when events are published
Behavior:
  • When an event is published to the topic, the specified function is called with the event data
  • Multiple subscriptions to the same topic are supported
  • Subscriptions persist until explicitly unsubscribed

unsubscribe

async fn unsubscribe(&self, topic: &str, id: &str)
Removes a subscription from a topic.
Parameters:
  • topic - The topic name to unsubscribe from
  • id - The subscription identifier to remove
Behavior:
  • Stops the function from receiving events for this topic
  • If this is the last subscription for the topic, the adapter may release the resources held for that topic
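
The subscribe and unsubscribe semantics described above can be modeled with a small synchronous registry. This is only a sketch using the standard library: `SubscriptionRegistry` and `targets` are hypothetical names, not part of III, and the choice to let a repeated `id` replace the earlier entry is an assumption about what "unique subscription identifier" implies.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory registry: topic -> (subscription id -> function id).
#[derive(Default)]
struct SubscriptionRegistry {
    topics: HashMap<String, HashMap<String, String>>,
}

impl SubscriptionRegistry {
    /// Registers `function_id` under `id`; re-using an `id` replaces the old entry
    /// (an assumption of this sketch, not documented III behavior).
    fn subscribe(&mut self, topic: &str, id: &str, function_id: &str) {
        self.topics
            .entry(topic.to_string())
            .or_default()
            .insert(id.to_string(), function_id.to_string());
    }

    /// Removes one subscription and drops the topic entry once it is empty.
    fn unsubscribe(&mut self, topic: &str, id: &str) {
        let now_empty = match self.topics.get_mut(topic) {
            Some(subs) => {
                subs.remove(id);
                subs.is_empty()
            }
            None => false,
        };
        if now_empty {
            self.topics.remove(topic);
        }
    }

    /// Function ids that a publish to `topic` would invoke, sorted for stable output.
    fn targets(&self, topic: &str) -> Vec<String> {
        let mut ids: Vec<String> = self
            .topics
            .get(topic)
            .map(|subs| subs.values().cloned().collect())
            .unwrap_or_default();
        ids.sort();
        ids
    }
}
```

Keying the inner map by subscription id makes `unsubscribe` a direct removal and prevents one id from being registered twice on the same topic.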

Available Adapters

RedisAdapter

Redis-based pub/sub for distributed event messaging across multiple engine instances.
modules: {
  pubsub: {
    adapter: "modules::pubsub::RedisAdapter",
    config: {
      redis_url: "redis://localhost:6379"
    }
  }
}
Features:
  • Distributed messaging across engine instances
  • Persistent connections with automatic reconnection
  • Asynchronous event handling
  • Per-topic subscription tasks
Source: /workspace/source/src/modules/pubsub/adapters/redis_adapter.rs

LocalAdapter

In-memory pub/sub for single-instance deployments and development.
modules: {
  pubsub: {
    adapter: "modules::pubsub::LocalAdapter"
  }
}
Features:
  • Zero external dependencies
  • Low latency event delivery
  • Perfect for development and testing
  • Events only delivered within the same process
Source: /workspace/source/src/modules/pubsub/adapters/local_adapter.rs

Example Implementation

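use async_trait::async_trait;
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
// `PubSubAdapter` and `Engine` come from the III engine crate;
// their import paths are omitted here.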

struct CustomPubSubAdapter {
    subscriptions: Arc<RwLock<HashMap<String, Vec<(String, String)>>>>,
    engine: Arc<Engine>,
}

#[async_trait]
impl PubSubAdapter for CustomPubSubAdapter {
    async fn publish(&self, topic: &str, event_data: Value) {
        let subs = self.subscriptions.read().await;
        
        if let Some(subscribers) = subs.get(topic) {
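            for (_id, function_id) in subscribers {
                let engine = Arc::clone(&self.engine);
                let function_id = function_id.clone();
                let event_data = event_data.clone();
                
                // Spawn one task per subscriber so a slow handler cannot block the publisher.
                tokio::spawn(async move {
                    // The result is discarded for brevity; a production adapter
                    // should log failures (for example with tracing::error!).
                    let _ = engine.call(&function_id, event_data).await;
                });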
            }
        }
    }

    async fn subscribe(&self, topic: &str, id: &str, function_id: &str) {
        let mut subs = self.subscriptions.write().await;
        subs.entry(topic.to_string())
            .or_default()
            .push((id.to_string(), function_id.to_string()));
    }

    async fn unsubscribe(&self, topic: &str, id: &str) {
        let mut subs = self.subscriptions.write().await;
        
        if let Some(subscribers) = subs.get_mut(topic) {
            subscribers.retain(|(sub_id, _)| sub_id != id);
            
            // Clean up empty topics
            if subscribers.is_empty() {
                subs.remove(topic);
            }
        }
    }
}

Usage Example

Defining a function that subscribes to events:
export default {
  async handler(event) {
    console.log("Received event:", event);
    return { processed: true };
  },
  
  triggers: [
    {
      type: "subscribe",
      topic: "user.created"
    }
  ]
}
Publishing events from another function:
export default {
  async handler({ pubsub }) {
    await pubsub.publish({
      topic: "user.created",
      data: {
        userId: "user-123",
        email: "user@example.com",
        timestamp: new Date().toISOString()
      }
    });
    
    return { published: true };
  }
}

Implementation Notes

Error Handling

The publish method doesn’t return errors. Implementations should:
  • Log errors internally using tracing::error!
  • Continue processing other subscribers if one fails
  • Not block the publisher on delivery failures
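
The first two points above (log the failure, keep going) can be sketched with the standard library alone. `deliver_to_all` and its `invoke` callback are hypothetical stand-ins for the adapter's delivery loop and the engine call. `eprintln!` is used where a real adapter would call `tracing::error!`. The sketch is synchronous, so it does not cover the third point about not blocking the publisher.

```rust
/// Delivers one event to every subscriber and returns how many deliveries succeeded.
/// `invoke` is a hypothetical stand-in for the engine call that runs a function.
fn deliver_to_all<F>(topic: &str, function_ids: &[&str], invoke: F) -> usize
where
    F: Fn(&str) -> Result<(), String>,
{
    let mut delivered = 0;
    for &function_id in function_ids {
        match invoke(function_id) {
            Ok(()) => delivered += 1,
            // Log and continue: one failing subscriber must not stop the others.
            // A real adapter would use `tracing::error!` instead of `eprintln!`.
            Err(err) => eprintln!(
                "pubsub delivery failed: topic={} function={} error={}",
                topic, function_id, err
            ),
        }
    }
    delivered
}
```

Returning the success count is only a convenience for this sketch; the trait's `publish` returns nothing, so an adapter would typically surface the count through logs or metrics.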

Concurrency

Subscriber functions are typically invoked concurrently:
  • Use tokio::spawn to invoke functions asynchronously
  • Multiple events can be processed simultaneously
  • Consider rate limiting for high-volume topics
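
One simple form of limiting for high-volume topics is to cap how many handler invocations run at once. The sketch below shows the idea with standard-library threads and a hand-rolled counting semaphore. `Semaphore` and `run_bounded` are hypothetical names, and the sleep stands in for handler work. In a Tokio-based adapter the same role would usually be played by `tokio::sync::Semaphore`. Note that this bounds concurrency and does not enforce a rate per second.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore built from std primitives.
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Runs `events` simulated handler invocations with at most `limit` in flight
/// and returns the highest concurrency observed.
fn run_bounded(events: usize, limit: usize) -> usize {
    let semaphore = Arc::new(Semaphore::new(limit));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let workers: Vec<_> = (0..events)
        .map(|_| {
            let semaphore = Arc::clone(&semaphore);
            let in_flight = Arc::clone(&in_flight);
            let peak = Arc::clone(&peak);
            thread::spawn(move || {
                semaphore.acquire();
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5)); // stand-in for handler work
                in_flight.fetch_sub(1, Ordering::SeqCst);
                semaphore.release();
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

Because the in-flight counter is incremented only after a permit is taken and decremented before it is returned, the observed peak can never exceed `limit`.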

Cleanup

Implementations should:
  • Track active subscriptions and spawned tasks
  • Abort tasks when unsubscribing
  • Clean up resources when topics have no subscribers
  • Implement proper shutdown on adapter destruction
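
The cleanup points above can be illustrated with a tracker that owns one worker per subscription. This is a sketch under stated assumptions: it uses standard-library threads and channels in place of Tokio tasks, and `WorkerTracker` and `Worker` are hypothetical names. Closing the channel lets the worker drain and exit, whereas a Tokio adapter would more likely call `JoinHandle::abort`, which cancels the task without draining.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// One running subscription: the channel feeding it and the worker draining it.
struct Worker {
    sender: Sender<String>,
    handle: JoinHandle<usize>,
}

/// Hypothetical tracker keyed by (topic, subscription id).
#[derive(Default)]
struct WorkerTracker {
    workers: HashMap<(String, String), Worker>,
}

impl WorkerTracker {
    fn subscribe(&mut self, topic: &str, id: &str) {
        let (sender, receiver) = mpsc::channel::<String>();
        // The worker ends once its sender is dropped and returns how many events it saw.
        let handle = thread::spawn(move || receiver.iter().count());
        let key = (topic.to_string(), id.to_string());
        if let Some(old) = self.workers.insert(key, Worker { sender, handle }) {
            Self::stop(old); // replacing a subscription must not leak its worker
        }
    }

    fn publish(&self, topic: &str, event: &str) {
        for ((worker_topic, _), worker) in &self.workers {
            if worker_topic.as_str() == topic {
                let _ = worker.sender.send(event.to_string());
            }
        }
    }

    /// Stops the worker for one subscription; returns the events it handled, if it existed.
    fn unsubscribe(&mut self, topic: &str, id: &str) -> Option<usize> {
        let key = (topic.to_string(), id.to_string());
        self.workers.remove(&key).map(Self::stop)
    }

    fn stop(worker: Worker) -> usize {
        drop(worker.sender); // closing the channel ends the worker loop
        worker.handle.join().unwrap_or(0)
    }
}

impl Drop for WorkerTracker {
    /// Shutdown path: stop every remaining worker when the tracker is dropped.
    fn drop(&mut self) {
        for (_, worker) in self.workers.drain() {
            Self::stop(worker);
        }
    }
}
```

Storing the handle next to the sender means unsubscribing and shutdown share one code path, so no worker outlives the subscription that created it.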
