PubSub
A type-safe Redis pub/sub wrapper for real-time messaging with result-based error handling. Built on Bun’s native Redis client.
Import
import { PubSub } from "semola/pubsub";
Class: PubSub
Constructor
new PubSub<T extends Record<string, unknown>>(options: PubSubOptions)
options
PubSubOptions
required
PubSub configuration options
subscriber
Bun.RedisClient
required
Redis client for subscribe/unsubscribe operations. Must be a separate instance from publisher when both subscribing and publishing.
publisher
Bun.RedisClient
required
Redis client for publish operations. Can be the same instance as subscriber if only subscribing or only publishing.
channel
string
required
Channel name to publish/subscribe to
Example:
type UserEvent = {
userId: string;
action: "login" | "logout" | "update";
timestamp: number;
};
// Two separate connections required when both subscribing and publishing
const subscriber = new Bun.RedisClient("redis://localhost:6379");
const publisher = new Bun.RedisClient("redis://localhost:6379");
const pubsub = new PubSub<UserEvent>({
subscriber,
publisher,
channel: "user-events",
});
Methods
publish
Publishes a message to the channel.
publish(message: T): Promise<[Error, null] | [null, number]>
message
T
required
Message object to publish (must be JSON-serializable)
result
[null, number] | [Error, null]
Result tuple containing either:
[null, count] - Success with number of subscribers who received the message
[error, null] - Error with type "SerializationError" or "PublishError"
Example:
const [error, count] = await pubsub.publish({
userId: "123",
action: "login",
timestamp: Date.now(),
});
if (error) {
console.error("Failed to publish:", error.message);
} else {
console.log(`Message delivered to ${count} subscribers`);
}
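Callers that prefer exceptions over tuple checks can convert a result at the boundary. The sketch below is a hypothetical helper, not part of semola: `Result` and `unwrap` are names assumed for illustration, and the tuples are built by hand so the snippet runs without a Redis connection.

```typescript
// Hypothetical helper, not part of semola: the tuple shape mirrors the
// documented return types of publish(), subscribe(), and unsubscribe().
type Result<T> = [Error, null] | [null, T];

// Returns the success value, or throws the error carried by the tuple.
function unwrap<T>(result: Result<T>): T {
  if (result[0] !== null) {
    throw result[0];
  }
  return result[1];
}

// Hand-built tuples standing in for real publish() results.
const delivered: Result<number> = [null, 2];
const rejected: Result<number> = [new Error("connection lost"), null];

const count = unwrap(delivered);

let caught = "";
try {
  unwrap(rejected);
} catch (error) {
  caught = error instanceof Error ? error.message : String(error);
}
```

With a real instance this could be used as `unwrap(await pubsub.publish(message))`, at the cost of losing the explicit error branch at the call site.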
subscribe
Subscribes to messages with a handler function.
subscribe(handler: MessageHandler<T>): Promise<[Error, null] | [null, number]>
handler
MessageHandler<T>
required
Function called for each message received. Type: (message: T, channel: string) => void | Promise<void>
result
[null, number] | [Error, null]
Result tuple containing either:
[null, count] - Success with number of channels subscribed
[error, null] - Error with type "SubscribeError"
Example:
const [error] = await pubsub.subscribe(async (message, channel) => {
console.log(`Received on ${channel}:`, message);
await processMessage(message);
});
if (error) {
console.error("Failed to subscribe:", error.message);
}
unsubscribe
Unsubscribes from the channel and cleans up the handler.
unsubscribe(): Promise<[Error, null] | [null, true]>
result
[null, true] | [Error, null]
Result tuple containing either:
[null, true] - Success
[error, null] - Error with type "UnsubscribeError"
Example:
const [error] = await pubsub.unsubscribe();
if (error) {
console.error("Failed to unsubscribe:", error.message);
}
isActive
Returns whether the PubSub instance is currently subscribed.
isActive(): boolean
result
boolean
true if currently subscribed, false otherwise
Example:
if (pubsub.isActive()) {
console.log("Currently subscribed");
}
Type Definitions
PubSubOptions
type PubSubOptions = {
subscriber: Bun.RedisClient;
publisher: Bun.RedisClient;
channel: string;
};
MessageHandler
type MessageHandler<T> = (
message: T,
channel: string,
) => void | Promise<void>;
channel
string
Channel the message was received on
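The generic parameter `T` is checked only at compile time, and deserialized JSON carries no type information of its own. Assuming the wrapper does not validate payloads at runtime (the source does not say either way), a handler can protect itself with a type predicate. `isUserEvent` below is a hypothetical helper written for the `UserEvent` type from the earlier example.

```typescript
type UserEvent = {
  userId: string;
  action: "login" | "logout" | "update";
  timestamp: number;
};

// Hypothetical runtime guard, not part of semola.
function isUserEvent(value: unknown): value is UserEvent {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  const action = candidate["action"];
  return (
    typeof candidate["userId"] === "string" &&
    (action === "login" || action === "logout" || action === "update") &&
    typeof candidate["timestamp"] === "number"
  );
}

// Simulated payloads, as they would look after JSON deserialization.
const valid: unknown = JSON.parse(
  '{"userId":"123","action":"login","timestamp":1}',
);
const invalid: unknown = JSON.parse('{"userId":123,"action":"delete"}');

const validAccepted = isUserEvent(valid);
const invalidAccepted = isUserEvent(invalid);
```

A handler could call the guard first and skip or log any message that fails it, which matters most when other services publish to the same channel.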
Usage Examples
Basic Channel Subscription
import { PubSub } from "semola/pubsub";
type UserEvent = {
userId: string;
action: "login" | "logout" | "update";
timestamp: number;
};
// Two connections: subscriber mode cannot publish
const subscriber = new Bun.RedisClient("redis://localhost:6379");
const publisher = new Bun.RedisClient("redis://localhost:6379");
const events = new PubSub<UserEvent>({
subscriber,
publisher,
channel: "user-events",
});
// Subscribe to events
await events.subscribe(async (event) => {
console.log(`User ${event.userId} performed ${event.action}`);
await logToDatabase(event);
});
// Publish events
await events.publish({
userId: "123",
action: "login",
timestamp: Date.now(),
});
Error Handling
import { PubSub } from "semola/pubsub";
type Notification = {
notification: string;
};
const subscriber = new Bun.RedisClient("redis://localhost:6379");
const publisher = new Bun.RedisClient("redis://localhost:6379");
const pubsub = new PubSub<Notification>({
subscriber,
publisher,
channel: "notifications",
});
// Subscribe with error handling
const [subscribeError] = await pubsub.subscribe(async (message) => {
// Handler errors are caught automatically;
// subscription remains active even if handler throws
await processNotification(message.notification);
});
if (subscribeError) {
console.error("Failed to subscribe:", subscribeError.message);
process.exit(1);
}
// Publish with error handling
const [publishError, count] = await pubsub.publish({
notification: "Hello!"
});
if (publishError) {
switch (publishError.type) {
case "SerializationError":
console.error("Invalid message format");
break;
case "PublishError":
console.error("Redis connection failed");
break;
}
} else {
console.log(`Delivered to ${count} subscribers`);
}
// Clean up
await pubsub.unsubscribe();
Multiple Instances
import { PubSub } from "semola/pubsub";
const subscriber = new Bun.RedisClient("redis://localhost:6379");
const publisher = new Bun.RedisClient("redis://localhost:6379");
// Separate instances for different channels
const notifications = new PubSub<{ message: string }>({
subscriber,
publisher,
channel: "notifications",
});
const alerts = new PubSub<{ level: string; text: string }>({
subscriber,
publisher,
channel: "alerts",
});
await notifications.subscribe(async (msg) => {
console.log("Notification:", msg.message);
});
await alerts.subscribe(async (msg) => {
console.log(`Alert [${msg.level}]:`, msg.text);
});
// Publish to different channels
await notifications.publish({ message: "Welcome!" });
await alerts.publish({ level: "warning", text: "High CPU usage" });
Real-time Chat
import { PubSub } from "semola/pubsub";
type ChatMessage = {
roomId: string;
userId: string;
username: string;
message: string;
timestamp: number;
};
class ChatRoom {
private pubsub: PubSub<ChatMessage>;
// Stored separately: the documented PubSub API exposes no `options` property
private roomId: string;
constructor(roomId: string) {
this.roomId = roomId;
const subscriber = new Bun.RedisClient("redis://localhost:6379");
const publisher = new Bun.RedisClient("redis://localhost:6379");
this.pubsub = new PubSub<ChatMessage>({
subscriber,
publisher,
channel: `chat:${roomId}`,
});
}
async join(onMessage: (msg: ChatMessage) => void) {
const [error] = await this.pubsub.subscribe(onMessage);
if (error) {
throw new Error(`Failed to join room: ${error.message}`);
}
}
async send(userId: string, username: string, message: string) {
const [error] = await this.pubsub.publish({
roomId: this.roomId,
userId,
username,
message,
timestamp: Date.now(),
});
if (error) {
throw new Error(`Failed to send message: ${error.message}`);
}
}
async leave() {
await this.pubsub.unsubscribe();
}
}
// Usage
const room = new ChatRoom("general");
await room.join((msg) => {
console.log(`[${msg.username}]: ${msg.message}`);
});
await room.send("123", "Alice", "Hello everyone!");
Important Notes
No Message Persistence
Redis pub/sub is ephemeral. Messages are delivered at-most-once and only to active subscribers. If no subscribers are connected, messages are discarded. For guaranteed delivery, consider using Redis Streams instead.
Message Ordering
Messages on a single channel are delivered in order. Pattern subscriptions matching multiple channels have no cross-channel ordering guarantees.
Handler Errors
If your message handler throws, or an async handler rejects, PubSub catches the error. The subscription remains active and continues processing subsequent messages.
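The source does not say what happens to a caught error, so a handler that needs visibility into its own failures may want to report them itself. The sketch below wraps a handler so that both synchronous throws and rejected promises reach a callback. `withErrorReporting` and `ErrorReporter` are hypothetical names, not part of semola; `MessageHandler` is redeclared locally so the snippet is self-contained.

```typescript
type MessageHandler<T> = (message: T, channel: string) => void | Promise<void>;
type ErrorReporter<T> = (error: unknown, message: T, channel: string) => void;

// Hypothetical wrapper, not part of semola.
function withErrorReporting<T>(
  handler: MessageHandler<T>,
  onError: ErrorReporter<T>,
): MessageHandler<T> {
  return (message, channel) => {
    try {
      const result = handler(message, channel);
      if (result instanceof Promise) {
        // Async handler: report rejections without re-throwing.
        return result.catch((error: unknown) => onError(error, message, channel));
      }
      return undefined;
    } catch (error) {
      // Sync handler threw before returning.
      onError(error, message, channel);
      return undefined;
    }
  };
}

const failures: string[] = [];
const safeHandler = withErrorReporting<{ id: number }>(
  (message) => {
    if (message.id < 0) {
      throw new Error(`bad id ${message.id}`);
    }
  },
  (error, _message, channel) => {
    failures.push(`${channel}: ${String(error)}`);
  },
);

// Simulated deliveries: the second message makes the handler throw.
safeHandler({ id: 1 }, "events");
safeHandler({ id: -1 }, "events");
```

The wrapped function has the same `MessageHandler<T>` shape, so it could be passed to `subscribe` in place of the original handler.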
Lifecycle Management
The PubSub class does not manage the Redis client lifecycle. You provide the clients when creating the instance and are responsible for closing them when done:
const subscriber = new Bun.RedisClient("redis://localhost:6379");
const publisher = new Bun.RedisClient("redis://localhost:6379");
const pubsub = new PubSub({ subscriber, publisher, channel: "events" });
// Use pubsub...
// Clean up
await pubsub.unsubscribe();
await subscriber.quit();
await publisher.quit();
Subscriber Mode and Publish
A Redis connection in subscriber mode (after SUBSCRIBE) cannot run PUBLISH. Use two connections (subscriber for subscribe/unsubscribe, publisher for publish) when the same PubSub instance both subscribes and publishes. The same client can be passed for both when only subscribing or only publishing.
JSON Serialization
Messages are automatically serialized to JSON when published and deserialized when received. This ensures type safety but means only JSON-serializable values can be sent. Attempting to publish circular references or other non-serializable values will return a SerializationError.
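To make the serialization limits concrete, the sketch below shows which values `JSON.stringify` rejects and how such a failure can be turned into a result tuple. `trySerialize` is a hypothetical helper for illustration only; it is not necessarily how semola produces a SerializationError.

```typescript
// Hypothetical helper, not part of semola.
type SerializeResult = [Error, null] | [null, string];

function trySerialize(value: unknown): SerializeResult {
  try {
    const json = JSON.stringify(value);
    // JSON.stringify returns undefined for undefined, functions, and symbols.
    if (typeof json !== "string") {
      return [new Error("Value has no JSON representation"), null];
    }
    return [null, json];
  } catch (cause) {
    // Circular references and BigInt values make JSON.stringify throw.
    return [cause instanceof Error ? cause : new Error(String(cause)), null];
  }
}

const ok = trySerialize({ userId: "123", action: "login" });

const circular: Record<string, unknown> = { name: "loop" };
circular["self"] = circular;
const failed = trySerialize(circular);

const missing = trySerialize(undefined);

// A Date serializes without error but arrives as an ISO string, not a Date.
const roundTripped = JSON.parse(JSON.stringify({ at: new Date(0) }));
```

The last case is worth noting: some values serialize successfully yet change type in transit, so a field declared as `Date` in `T` would reach the handler as a string.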
One Handler Per Instance
Each PubSub instance supports a single message handler. If you need multiple handlers for the same channel, create multiple PubSub instances.
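As an alternative to extra instances, several functions can be merged into the single handler slot. The sketch below is one possible approach under that assumption; `combineHandlers` is a hypothetical helper, not part of semola, and `MessageHandler` is redeclared locally so the snippet is self-contained.

```typescript
type MessageHandler<T> = (message: T, channel: string) => void | Promise<void>;

// Hypothetical helper, not part of semola.
function combineHandlers<T>(
  ...handlers: MessageHandler<T>[]
): MessageHandler<T> {
  return async (message, channel) => {
    // Start every handler. allSettled waits for all of them and never
    // rejects, so one failing handler does not stop the others.
    await Promise.allSettled(
      handlers.map(async (handler) => handler(message, channel)),
    );
  };
}

const seen: string[] = [];
const combined = combineHandlers<{ text: string }>(
  (message) => {
    seen.push(`log:${message.text}`);
  },
  () => {
    throw new Error("metrics unavailable");
  },
  (message, channel) => {
    seen.push(`audit:${channel}:${message.text}`);
  },
);

// Simulated delivery: the middle handler fails, the other two still run.
void combined({ text: "hi" }, "alerts");
```

The trade-off is that failures inside individual handlers are discarded here; separate PubSub instances keep each handler's lifecycle independent, at the cost of more Redis connections.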
When to Use Redis Streams
If you need message acknowledgment, guaranteed delivery, message history, or consumer groups, use Redis Streams instead of pub/sub. PubSub is best for real-time, fire-and-forget messaging where occasional message loss is acceptable.