
PubSub

A type-safe Redis pub/sub wrapper for real-time messaging with result-based error handling. Built on Bun’s native Redis client.

Import

import { PubSub } from "semola/pubsub";

Class: PubSub

Constructor

new PubSub<T extends Record<string, unknown>>(options: PubSubOptions)
options (PubSubOptions, required): PubSub configuration options
options.subscriber (Bun.RedisClient, required): Redis client for subscribe/unsubscribe operations. Must be a separate instance from publisher when both subscribing and publishing.
options.publisher (Bun.RedisClient, required): Redis client for publish operations. Can be the same instance as subscriber if only subscribing or only publishing.
options.channel (string, required): Channel name to publish/subscribe to
Example:
type UserEvent = {
  userId: string;
  action: "login" | "logout" | "update";
  timestamp: number;
};

// Two separate connections required when both subscribing and publishing
const subscriber = new Bun.RedisClient("redis://localhost:6379");
const publisher = new Bun.RedisClient("redis://localhost:6379");

const pubsub = new PubSub<UserEvent>({
  subscriber,
  publisher,
  channel: "user-events",
});

Methods

publish

Publishes a message to the channel.
publish(message: T): Promise<[Error, null] | [null, number]>
message (T, required): Message object to publish (must be JSON-serializable)
result ([null, number] | [Error, null]): Result tuple containing either:
  • [null, count] - Success with number of subscribers who received the message
  • [error, null] - Error with type "SerializationError" or "PublishError"
Example:
const [error, count] = await pubsub.publish({
  userId: "123",
  action: "login",
  timestamp: Date.now(),
});

if (error) {
  console.error("Failed to publish:", error.message);
} else {
  console.log(`Message delivered to ${count} subscribers`);
}
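
The result tuple lends itself to a small helper that turns it into a log line. The sketch below is an illustration only: `PublishFailure`, `PublishResult`, and `describePublish` are hypothetical names, and the shape of the error object (an `Error` carrying a `type` field) is assumed from the error types listed above rather than taken from the library's source.

```typescript
// Hypothetical local types mirroring the documented result tuple (assumption).
type PublishFailure = Error & { type: "SerializationError" | "PublishError" };
type PublishResult = [PublishFailure, null] | [null, number];

// Turn a publish result into a human-readable summary.
function describePublish(result: PublishResult): string {
  const [error, count] = result;
  if (error) {
    return `publish failed (${error.type}): ${error.message}`;
  }
  // No error, so this is the success branch and count is the subscriber count.
  return count === 0
    ? "published, but no subscribers were listening"
    : `delivered to ${count} subscriber(s)`;
}
```

A count of 0 is still a success: Redis accepted the message, but nobody was subscribed to receive it.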

subscribe

Subscribes to messages with a handler function.
subscribe(handler: MessageHandler<T>): Promise<[Error, null] | [null, number]>
handler (MessageHandler<T>, required): Function called for each message received. Type: (message: T, channel: string) => void | Promise<void>
result ([null, number] | [Error, null]): Result tuple containing either:
  • [null, count] - Success with number of channels subscribed
  • [error, null] - Error with type "SubscribeError"
Example:
const [error] = await pubsub.subscribe(async (message, channel) => {
  console.log(`Received on ${channel}:`, message);
  await processMessage(message);
});

if (error) {
  console.error("Failed to subscribe:", error.message);
}

unsubscribe

Unsubscribes from the channel and cleans up the handler.
unsubscribe(): Promise<[Error, null] | [null, true]>
result ([null, true] | [Error, null]): Result tuple containing either:
  • [null, true] - Success
  • [error, null] - Error with type "UnsubscribeError"
Example:
const [error] = await pubsub.unsubscribe();

if (error) {
  console.error("Failed to unsubscribe:", error.message);
}

isActive

Returns whether the PubSub instance is currently subscribed.
isActive(): boolean
active (boolean): true if currently subscribed, false otherwise
Example:
if (pubsub.isActive()) {
  console.log("Currently subscribed");
}

Type Definitions

PubSubOptions

type PubSubOptions = {
  subscriber: Bun.RedisClient;
  publisher: Bun.RedisClient;
  channel: string;
};

MessageHandler

type MessageHandler<T> = (
  message: T,
  channel: string,
) => void | Promise<void>;
message (T): Parsed message object
channel (string): Channel the message was received on
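
TypeScript types are erased at runtime, so `T` describes the payload a handler expects rather than enforcing it. If other producers can publish to the same channel, a handler may want to check the shape itself. The following is a minimal sketch under that assumption; `isUserEvent` and `handleEvent` are hypothetical helpers and not part of semola.

```typescript
type UserEvent = {
  userId: string;
  action: "login" | "logout" | "update";
  timestamp: number;
};

// Hypothetical runtime check: confirms a parsed value has the UserEvent shape.
function isUserEvent(value: unknown): value is UserEvent {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.userId === "string" &&
    (candidate.action === "login" ||
      candidate.action === "logout" ||
      candidate.action === "update") &&
    typeof candidate.timestamp === "number"
  );
}

// Handler that ignores payloads that do not match the expected shape.
function handleEvent(message: unknown, channel: string): void {
  if (!isUserEvent(message)) {
    console.warn(`Ignoring malformed message on ${channel}`);
    return;
  }
  console.log(`User ${message.userId} performed ${message.action}`);
}
```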

Usage Examples

Basic Channel Subscription

import { PubSub } from "semola/pubsub";

type UserEvent = {
  userId: string;
  action: "login" | "logout" | "update";
  timestamp: number;
};

// Two connections: subscriber mode cannot publish
const subscriber = new Bun.RedisClient("redis://localhost:6379");
const publisher = new Bun.RedisClient("redis://localhost:6379");

const events = new PubSub<UserEvent>({
  subscriber,
  publisher,
  channel: "user-events",
});

// Subscribe to events
await events.subscribe(async (event) => {
  console.log(`User ${event.userId} performed ${event.action}`);
  await logToDatabase(event);
});

// Publish events
await events.publish({
  userId: "123",
  action: "login",
  timestamp: Date.now(),
});

Error Handling

import { PubSub } from "semola/pubsub";

type Notification = {
  notification: string;
};

const subscriber = new Bun.RedisClient("redis://localhost:6379");
const publisher = new Bun.RedisClient("redis://localhost:6379");

const pubsub = new PubSub<Notification>({
  subscriber,
  publisher,
  channel: "notifications",
});

// Subscribe with error handling
const [subscribeError] = await pubsub.subscribe(async (message) => {
  // Handler errors are caught automatically;
  // subscription remains active even if handler throws
  await processNotification(message.notification);
});

if (subscribeError) {
  console.error("Failed to subscribe:", subscribeError.message);
  process.exit(1);
}

// Publish with error handling
const [publishError, count] = await pubsub.publish({ 
  notification: "Hello!" 
});

if (publishError) {
  switch (publishError.type) {
    case "SerializationError":
      console.error("Invalid message format");
      break;
    case "PublishError":
      console.error("Redis connection failed");
      break;
  }
} else {
  console.log(`Delivered to ${count} subscribers`);
}

// Clean up
await pubsub.unsubscribe();

Multiple Instances

import { PubSub } from "semola/pubsub";

const subscriber = new Bun.RedisClient("redis://localhost:6379");
const publisher = new Bun.RedisClient("redis://localhost:6379");

// Separate instances for different channels
const notifications = new PubSub<{ message: string }>({
  subscriber,
  publisher,
  channel: "notifications",
});

const alerts = new PubSub<{ level: string; text: string }>({
  subscriber,
  publisher,
  channel: "alerts",
});

await notifications.subscribe(async (msg) => {
  console.log("Notification:", msg.message);
});

await alerts.subscribe(async (msg) => {
  console.log(`Alert [${msg.level}]:`, msg.text);
});

// Publish to different channels
await notifications.publish({ message: "Welcome!" });
await alerts.publish({ level: "warning", text: "High CPU usage" });

Real-time Chat

import { PubSub } from "semola/pubsub";

type ChatMessage = {
  roomId: string;
  userId: string;
  username: string;
  message: string;
  timestamp: number;
};

class ChatRoom {
  private pubsub: PubSub<ChatMessage>;
  
  // Keep the room ID on the instance so send() can include it in each message
  constructor(private readonly roomId: string) {
    const subscriber = new Bun.RedisClient("redis://localhost:6379");
    const publisher = new Bun.RedisClient("redis://localhost:6379");
    
    this.pubsub = new PubSub<ChatMessage>({
      subscriber,
      publisher,
      channel: `chat:${roomId}`,
    });
  }
  
  async join(onMessage: (msg: ChatMessage) => void) {
    const [error] = await this.pubsub.subscribe(onMessage);
    if (error) {
      throw new Error(`Failed to join room: ${error.message}`);
    }
  }
  
  async send(userId: string, username: string, message: string) {
    const [error] = await this.pubsub.publish({
      // Use the stored room ID; the PubSub API above documents no public `options` property
      roomId: this.roomId,
      userId,
      username,
      message,
      timestamp: Date.now(),
    });
    
    if (error) {
      throw new Error(`Failed to send message: ${error.message}`);
    }
  }
  
  async leave() {
    await this.pubsub.unsubscribe();
  }
}

// Usage
const room = new ChatRoom("general");

await room.join((msg) => {
  console.log(`[${msg.username}]: ${msg.message}`);
});

await room.send("123", "Alice", "Hello everyone!");

Important Notes

No Message Persistence

Redis pub/sub is ephemeral. Messages are delivered at-most-once and only to active subscribers. If no subscribers are connected, messages are discarded. For guaranteed delivery, consider using Redis Streams instead.

Message Ordering

Messages on a single channel are delivered in order. Pattern subscriptions matching multiple channels have no cross-channel ordering guarantees.

Handler Errors

If your message handler throws an error, PubSub catches it, so the subscription remains active and continues processing subsequent messages.
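
Because handler errors are caught on your behalf, you may want to report them yourself before they reach the wrapper. The sketch below shows one way to do that; `withErrorReporting` is a hypothetical helper, and the local `MessageHandler` type simply restates the signature documented above.

```typescript
type MessageHandler<T> = (message: T, channel: string) => void | Promise<void>;

// Hypothetical wrapper: reports handler failures to a callback of your choice.
function withErrorReporting<T>(
  handler: MessageHandler<T>,
  onError: (error: unknown, message: T, channel: string) => void,
): MessageHandler<T> {
  return (message, channel) => {
    try {
      const result = handler(message, channel);
      if (result instanceof Promise) {
        // Async handler: report rejections instead of propagating them
        return result.catch((error: unknown) => onError(error, message, channel));
      }
      return undefined;
    } catch (error) {
      // Sync handler threw
      onError(error, message, channel);
      return undefined;
    }
  };
}
```

The wrapped function has the same signature as the original handler, so it can be passed to subscribe in its place.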

Lifecycle Management

The PubSub class does not manage the Redis client lifecycle. You provide the clients when creating the instance and are responsible for closing them when done:
const subscriber = new Bun.RedisClient("redis://localhost:6379");
const publisher = new Bun.RedisClient("redis://localhost:6379");
const pubsub = new PubSub({ subscriber, publisher, channel: "events" });

// Use pubsub...

// Clean up
await pubsub.unsubscribe();
await subscriber.quit();
await publisher.quit();

Subscriber Mode and Publish

A Redis connection in subscriber mode (after SUBSCRIBE) cannot run PUBLISH. Use two connections (subscriber for subscribe/unsubscribe, publisher for publish) when the same PubSub instance both subscribes and publishes. The same client can be passed for both when only subscribing or only publishing.

JSON Serialization

Messages are automatically serialized to JSON when published and parsed back into objects when received, so only JSON-serializable values can be sent. Publishing a value that cannot be serialized, such as an object with circular references, returns a SerializationError.
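
To see which values would fail before they reach publish, a pre-flight check along these lines can help. This is a sketch that uses plain `JSON.stringify`; `trySerialize` and `SerializeResult` are hypothetical names, and the library's own serialization step may differ in detail.

```typescript
type SerializeResult = [Error, null] | [null, string];

// Hypothetical pre-flight check that mirrors the result-tuple style used above.
function trySerialize(value: unknown): SerializeResult {
  try {
    const json = JSON.stringify(value);
    // JSON.stringify returns undefined for values such as functions or undefined
    if (typeof json !== "string") {
      return [new Error("Value has no JSON representation"), null];
    }
    return [null, json];
  } catch (error) {
    // JSON.stringify throws for circular references and BigInt values
    return [error instanceof Error ? error : new Error(String(error)), null];
  }
}
```

Note that `JSON.stringify` silently drops object properties whose values are `undefined` or functions, so a message can serialize successfully and still arrive with fields missing.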

One Handler Per Instance

Each PubSub instance supports a single message handler. If you need multiple handlers for the same channel, create multiple PubSub instances.
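
As an alternative to creating several instances, multiple callbacks can be composed into the single handler an instance accepts. The helper below is a sketch, not part of semola: `combineHandlers` is a hypothetical name, and the local `MessageHandler` type restates the documented signature.

```typescript
type MessageHandler<T> = (message: T, channel: string) => void | Promise<void>;

// Hypothetical helper: fans one incoming message out to several handlers.
function combineHandlers<T>(...handlers: MessageHandler<T>[]): MessageHandler<T> {
  return async (message, channel) => {
    await Promise.all(
      handlers.map(async (handler) => {
        try {
          await handler(message, channel);
        } catch (error) {
          // Isolate failures so one handler cannot affect the others
          console.error(`Handler failed on ${channel}:`, error);
        }
      }),
    );
  };
}
```

All handlers start as soon as the message arrives and run concurrently; if they must run in a fixed sequence, a plain `for` loop with `await` would be the simpler choice.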

When to Use Redis Streams

If you need message acknowledgment, guaranteed delivery, message history, or consumer groups, use Redis Streams instead of pub/sub. PubSub is best for real-time, fire-and-forget messaging where occasional message loss is acceptable.
