The @repo/streams package provides a Redis streams wrapper for managing the uptime check job queue.

Overview

This package exports:
  • xAddBulk() - Publish website check jobs to stream
  • xReadGroup() - Read fresh messages from stream
  • xAutoClaimStale() - Reclaim stale messages from PEL
  • xAckBulk() - Acknowledge processed messages
  • xPendingInfo() - Get pending entry list (PEL) status
  • xForceAckStalePel() - Force-clear stuck messages
  • ensureConsumerGroup() - Create consumer group if not exists

Configuration

Required environment variables:
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_USERNAME=default
REDIS_PASSWORD=
STREAM_NAME=website_checks
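The package reads these variables at startup. As a minimal sketch (the actual validation inside @repo/streams may differ; `loadStreamConfig` is a hypothetical helper, with defaults mirroring the example values above):

```typescript
// Hypothetical config loader: apply defaults, validate the port,
// and fail fast on clearly invalid input.
function loadStreamConfig(env: Record<string, string | undefined>) {
  const port = Number(env.REDIS_PORT ?? "6379");
  if (!Number.isInteger(port) || port <= 0) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  return {
    host: env.REDIS_HOST ?? "localhost",
    port,
    username: env.REDIS_USERNAME ?? "default",
    password: env.REDIS_PASSWORD ?? "",
    streamName: env.STREAM_NAME ?? "website_checks",
  };
}
```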

Architecture

Better Uptime uses Redis Streams with consumer groups for reliable job processing:
Publisher → Redis Stream → Worker (Consumer Group)
                               │  delivered but unacknowledged
                               ▼
                     Pending Entry List (PEL)
                               │  idle past threshold
                               ▼
                          Auto-Claim

Publishing Messages

Bulk Add

From the publisher app:
apps/publisher/src/index.ts
import { xAddBulk } from "@repo/streams";

// Fetch active websites from database
const websites = await prismaClient.website.findMany({
  where: { isActive: true },
  select: { url: true, id: true },
});

// Publish all websites to stream
await xAddBulk(websites.map(w => ({ 
  url: w.url, 
  id: w.id 
})));
The function batches messages in chunks of 250:
packages/streams/src/index.ts
export async function xAddBulk(websites: WebsiteEvent[]) {
  const batchSize = 250;
  
  for (let i = 0; i < websites.length; i += batchSize) {
    const batch = websites.slice(i, i + batchSize);
    
    const multi = client.multi();
    for (const website of batch) {
      multi.xAdd(STREAM_NAME, "*", { 
        url: website.url, 
        id: website.id 
      });
    }
    await multi.exec();
    
    // Trim stream to ~8000 messages
    await client.sendCommand([
      "XTRIM", STREAM_NAME, "MAXLEN", "~", "8000"
    ]);
  }
}

Consuming Messages

Read Fresh Messages

From the worker app:
apps/worker/src/index.ts
import { xReadGroup } from "@repo/streams";

const messages = await xReadGroup({
  consumerGroup: REGION_ID,  // e.g., "iad"
  workerId: WORKER_ID,        // e.g., "worker-1"
});

// Returns: MessageType[]
// [
//   {
//     id: "1234567890-0",
//     event: {
//       url: "https://example.com",
//       id: "website_123"
//     }
//   },
//   ...
// ]
The function uses Redis XREADGROUP with server-side blocking:
packages/streams/src/index.ts
const response = await client.xReadGroup(
  consumerGroup,
  workerId,
  {
    key: STREAM_NAME,
    id: ">",  // Only new messages
  },
  {
    COUNT: 5,      // Batch size
    BLOCK: 1000,   // Block for 1 second if no messages
  }
);
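The raw reply entries have an `{ id, message }` shape, which the package maps into the `MessageType` shape shown earlier. A sketch of that mapping (`toMessages` is a hypothetical name; the package's actual transformation may differ):

```typescript
// Shape of one raw entry as returned by the client's xReadGroup
interface RawEntry {
  id: string;
  message: Record<string, string>;
}

type MessageType = { id: string; event: { url: string; id: string } };

// Map raw stream entries into the package's MessageType
function toMessages(entries: RawEntry[]): MessageType[] {
  return entries.map(({ id, message }) => ({
    id,
    event: { url: message.url, id: message.id },
  }));
}
```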

Consumer Group Pattern

The worker creates a consumer group on startup:
packages/streams/src/index.ts
export async function ensureConsumerGroup(
  consumerGroup: string
): Promise<void> {
  try {
    await client.xGroupCreate(
      STREAM_NAME, 
      consumerGroup, 
      "0",  // Start from beginning
      { MKSTREAM: true }  // Create stream if not exists
    );
  } catch (error) {
    // error is unknown in a TypeScript catch clause; narrow before use
    if (error instanceof Error && error.message.includes("BUSYGROUP")) {
      return; // Group already exists
    }
    throw error;
  }
}
}

Pending Entry List (PEL)

Auto-Claim Stale Messages

Messages that are delivered but never acknowledged remain in the pending entry list (PEL). Once they have been idle past a threshold, workers reclaim them:
apps/worker/src/index.ts
const reclaimed = await xAutoClaimStale({
  consumerGroup: REGION_ID,
  workerId: WORKER_ID,
  minIdleMs: 300_000,      // 5 minutes
  count: 5,                // Small batch to prevent starving fresh work
  maxTotalReclaim: 10,     // Max 10 per cycle
});
The function uses Redis XAUTOCLAIM with cursor pagination:
packages/streams/src/index.ts
export async function xAutoClaimStale(
  options: AutoClaimOptions
): Promise<MessageType[]> {
  // maxTotalReclaim is optional; treat it as unbounded when omitted
  const maxTotal = options.maxTotalReclaim ?? Infinity;
  let startId = xAutoClaimCursors.get(options.consumerGroup) || "0-0";
  const allMessages: MessageType[] = [];
  
  while (allMessages.length < maxTotal) {
    const result = await client.xAutoClaim(
      STREAM_NAME,
      options.consumerGroup,
      options.workerId,
      options.minIdleMs,
      startId,
      { COUNT: options.count }
    );
    
    const { messages, nextId } = result;
    if (messages.length === 0) break;
    
    allMessages.push(...messages);
    startId = nextId;
  }
  
  // Persist the cursor so the next cycle resumes where this one stopped
  xAutoClaimCursors.set(options.consumerGroup, startId);
  
  return allMessages;
}

PEL Monitoring

Check PEL status:
packages/streams/src/index.ts
import { xPendingInfo } from "@repo/streams";

const pelInfo = await xPendingInfo("iad");

// Returns:
// {
//   pending: 42,
//   oldestIdleMs: 120000,  // 2 minutes
//   consumers: [
//     { name: "worker-1", pending: 30 },
//     { name: "worker-2", pending: 12 }
//   ]
// }
Real usage in worker:
apps/worker/src/index.ts
setInterval(async () => {
  const pelInfo = await xPendingInfo(REGION_ID);
  
  if (pelInfo.oldestIdleMs > 3_600_000) {  // 1 hour
    console.warn("PEL has messages idle >1 hour, force-clearing...");
    
    const cleared = await xForceAckStalePel({
      consumerGroup: REGION_ID,
      minIdleMs: 3_600_000,
      maxCount: 50,
    });
    
    console.warn(`Force-cleared ${cleared} stuck messages`);
  }
}, 5 * 60 * 1000); // Every 5 minutes

Acknowledging Messages

Bulk ACK

After processing messages:
apps/worker/src/index.ts
import { xAckBulk } from "@repo/streams";

// Process messages...
const processedIds = messages.map(m => m.id);

// Acknowledge all at once
await xAckBulk({
  consumerGroup: REGION_ID,
  eventIds: processedIds,
});
The function uses Promise.allSettled to handle partial failures:
packages/streams/src/index.ts
export async function xAckBulk(options: AckBulkOptions) {
  await Promise.allSettled(
    options.eventIds.map(eventId =>
      xAck({ 
        consumerGroup: options.consumerGroup, 
        streamId: eventId 
      })
    )
  );
}
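Putting the pieces together, a worker cycle reads fresh messages, reclaims stale ones, processes everything, and ACKs only what succeeded so failures stay in the PEL. The sketch below is a hypothetical wiring (names like `runCycle` and the injected `Deps` are illustrative, not part of the package); dependencies are injected so the flow can be exercised without Redis:

```typescript
type Msg = { id: string; event: { url: string; id: string } };

interface Deps {
  read: () => Promise<Msg[]>;       // e.g. xReadGroup(...)
  reclaim: () => Promise<Msg[]>;    // e.g. xAutoClaimStale(...)
  process: (msg: Msg) => Promise<void>;
  ack: (ids: string[]) => Promise<void>; // e.g. xAckBulk(...)
}

async function runCycle(deps: Deps): Promise<number> {
  const fresh = await deps.read();
  const stale = await deps.reclaim();

  // ACK only successfully processed messages; failures remain in
  // the PEL and are picked up later by auto-claim.
  const done: string[] = [];
  for (const msg of [...fresh, ...stale]) {
    try {
      await deps.process(msg);
      done.push(msg.id);
    } catch {
      // leave unACKed for reclaim
    }
  }
  if (done.length > 0) await deps.ack(done);
  return done.length;
}
```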

Types

WebsiteEvent

packages/streams/src/index.ts
export interface WebsiteEvent {
  url: string;
  id: string;
}

MessageType

packages/streams/src/index.ts
type MessageType = {
  id: string;
  event: {
    url: string;
    id: string;
  };
};

ReadGroupOptions

packages/streams/src/index.ts
export interface ReadGroupOptions {
  consumerGroup: string;
  workerId: string;
}

AutoClaimOptions

packages/streams/src/index.ts
export interface AutoClaimOptions {
  consumerGroup: string;
  workerId: string;
  minIdleMs: number;
  count: number;
  maxTotalReclaim?: number;
}

PendingInfo

packages/streams/src/index.ts
export interface PendingInfo {
  pending: number;
  oldestIdleMs: number | null;
  consumers: Array<{
    name: string;
    pending: number;
  }>;
}

Error Handling

Timeout Protection

All Redis operations have timeouts:
packages/streams/src/index.ts
const XREADGROUP_TIMEOUT_MS = 5_000;
const XAUTOCLAIM_TIMEOUT_MS = 10_000;
const REDIS_COMMAND_TIMEOUT_MS = 10_000;

function withRedisTimeout<T>(
  promise: Promise<T>,
  timeoutMs: number,
  label: string
): Promise<T> {
  return Promise.race([
    promise,
    // Promise<never> keeps the race's result typed as Promise<T>
    new Promise<never>((_, reject) =>
      setTimeout(
        () => reject(new Error(`[Redis] ${label} timed out after ${timeoutMs}ms`)),
        timeoutMs
      )
    ),
  ]);
}
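The same race pattern can be demonstrated standalone (`withTimeout` below is a local stand-in for `withRedisTimeout`, not the package export): a promise that settles before the deadline wins, while one that never settles rejects with the labeled timeout error.

```typescript
// Local stand-in for withRedisTimeout: race the operation
// against a rejecting timer.
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
  return Promise.race([
    promise,
    new Promise<never>((_, reject) =>
      setTimeout(() => reject(new Error(`[Redis] ${label} timed out after ${ms}ms`)), ms)
    ),
  ]);
}

// Settles before the deadline → resolves normally
const fast = withTimeout(Promise.resolve("ok"), 50, "fast");

// Never settles → rejects at the deadline; capture the message
const slow = withTimeout(new Promise<string>(() => {}), 10, "slow")
  .catch((e: Error) => e.message);
```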

Exponential Backoff

Failed operations use exponential backoff:
packages/streams/src/index.ts
const redisFailureState = new Map<string, {
  failures: number;
  lastFailureAt: number;
}>();

const MAX_BACKOFF_MS = 5000;
const BASE_BACKOFF_MS = 100;

const failureState = redisFailureState.get("xReadGroup");
if (failureState && failureState.failures > 0) {
  const backoffMs = Math.min(
    BASE_BACKOFF_MS * Math.pow(2, failureState.failures - 1),
    MAX_BACKOFF_MS
  );
  
  if (Date.now() - failureState.lastFailureAt < backoffMs) {
    return []; // Skip this cycle
  }
}
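Extracted as a pure function (a hypothetical refactor for illustration), the delay schedule from these constants is 100ms, 200ms, 400ms, ... capped at 5s:

```typescript
const BASE_BACKOFF_MS = 100;
const MAX_BACKOFF_MS = 5000;

// Delay before retrying after the given number of consecutive failures
function backoffDelay(failures: number): number {
  return Math.min(BASE_BACKOFF_MS * Math.pow(2, failures - 1), MAX_BACKOFF_MS);
}
```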

Malformed Message Handling

Messages missing required fields are auto-ACKed:
packages/streams/src/index.ts
const malformedMessageIds: string[] = [];
const validMessages: StreamMessage[] = [];

for (const streamMessage of response[0].messages) {
  if (streamMessage.message.url && streamMessage.message.id) {
    validMessages.push(streamMessage);
  } else {
    malformedMessageIds.push(streamMessage.id);
    console.warn(
      `Malformed message ${streamMessage.id}: missing url or id, will ACK to clear from PEL`
    );
  }
}

if (malformedMessageIds.length > 0) {
  await Promise.allSettled(
    malformedMessageIds.map(msgId =>
      client.xAck(STREAM_NAME, consumerGroup, msgId)
    )
  );
}

Connection Management

Redis client with automatic reconnection:
packages/streams/src/index.ts
const redisClient = createClient({
  username: REDIS_USERNAME,
  password: REDIS_PASSWORD,
  socket: {
    host: REDIS_HOST,
    port: Number(REDIS_PORT),
    keepAlive: true,
    keepAliveInitialDelay: 15_000,
    connectTimeout: 10_000,
    reconnectStrategy: (retries) => {
      const delay = Math.min(retries * 200, 30_000);
      if (retries % 10 === 0) {
        console.warn(
          `Redis: Reconnecting in ${delay}ms (attempt ${retries})`
        );
      }
      return delay;
    },
  },
  pingInterval: 30_000,
});

Location

packages/streams/src/index.ts
