BioAgents uses BullMQ for reliable background job processing. This enables horizontal scaling, job persistence, automatic retries, and real-time progress notifications.

Architecture Overview

Data Flow

  1. Request - Client sends HTTP request to API server
  2. Enqueue - API creates job in Redis queue, returns job ID immediately
  3. Process - Worker picks up job, executes AI workflow
  4. Notify - Worker publishes progress via Redis Pub/Sub
  5. Broadcast - API receives notification, broadcasts to WebSocket clients
  6. Fetch - Client receives notification, fetches results via HTTP
This is the “Notify + Fetch” pattern: notifications are lightweight (just IDs); the actual data is fetched via the REST API.
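In the API layer, the enqueue step can be as small as adding the job and returning its ID. A sketch (route framing and response shape are illustrative, not the exact BioAgents handler):

import { getChatQueue } from "./queues"; // hypothetical path

// POST /api/chat - enqueue the job and return its ID immediately
export async function handleChatRequest(data: ChatJobData) {
  const job = await getChatQueue().add("chat-message", data);
  // The client subscribes over WebSocket and fetches results once notified
  return { jobId: job.id };
}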

Queue Definitions

BioAgents uses separate queues for different job types:

Chat Queue

For /api/chat requests - conversational AI interactions.
import { Queue } from "bullmq";
import { getBullMQConnection } from "./connection"; // see Redis Connection below

export function getChatQueue(): Queue<ChatJobData, ChatJobResult> {
  return new Queue<ChatJobData, ChatJobResult>("chat", {
    connection: getBullMQConnection(),
    defaultJobOptions: {
      // Retry configuration
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 1000, // 1s → 2s → 4s
      },
      // Job cleanup
      removeOnComplete: {
        age: 3600, // Keep for 1 hour
        count: 1000,
      },
      removeOnFail: {
        age: 86400, // Keep failed for 24 hours
      },
    },
  });
}
Settings:
  • Concurrency: 5 (configurable via CHAT_QUEUE_CONCURRENCY)
  • Retry: 3 attempts with exponential backoff
  • Timeout: None (handled by worker lockDuration)
  • Retention: 1 hour (completed), 24 hours (failed)
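Job options can also be overridden per call to Queue.add; for example, an explicit jobId deduplicates submissions while a job with that ID still exists. A sketch (the ID scheme is illustrative):

const job = await getChatQueue().add(
  "chat-message",
  { userId, conversationId, messageId, message },
  {
    jobId: `chat:${messageId}`, // the same ID is not enqueued twice while it exists
    priority: 1, // optional: lower numbers are processed first
  }
);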

Deep Research Queue

For /api/deep-research/start requests - multi-iteration research workflows.
export function getDeepResearchQueue(): Queue<DeepResearchJobData, DeepResearchJobResult> {
  return new Queue<DeepResearchJobData, DeepResearchJobResult>("deep-research", {
    connection: getBullMQConnection(),
    defaultJobOptions: {
      // Fewer retries for long jobs
      attempts: 2,
      backoff: {
        type: "exponential",
        delay: 5000, // 5s → 10s
      },
      // No timeout - jobs can run 20-30+ minutes
      removeOnComplete: {
        age: 86400, // Keep for 24 hours
        count: 500,
      },
      removeOnFail: {
        age: 604800, // Keep failed for 7 days
      },
    },
  });
}
Settings:
  • Concurrency: 3 (configurable via DEEP_RESEARCH_QUEUE_CONCURRENCY)
  • Retry: 2 attempts with longer backoff
  • Timeout: None (can run indefinitely)
  • Retention: 24 hours (completed), 7 days (failed)
Deep research jobs can run for 20-30+ minutes and consume 500MB-1GB of memory per job. Size worker resources accordingly.
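Inside the processor, long-running jobs can emit the progress payloads shown under Notification Types below via BullMQ's job.updateProgress. A sketch (stage names and helpers are illustrative):

async function processDeepResearchJob(
  job: Job<DeepResearchJobData, DeepResearchJobResult>
): Promise<DeepResearchJobResult> {
  await job.updateProgress({ stage: "literature_search", percent: 10 });
  const papers = await searchLiterature(job.data); // hypothetical helper
  await job.updateProgress({ stage: "analysis", percent: 60 });
  return analyzeResults(papers); // hypothetical helper
}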

Paper Generation Queue

For LaTeX paper generation from research conversations.

Settings:
  • Concurrency: 1 (configurable via PAPER_GENERATION_CONCURRENCY)
  • Retry: No retries (internal fallback strategies)
  • Timeout: None (compilation can take 5-15+ minutes)
  • Retention: 24 hours (completed), 7 days (failed)
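The queue definition isn't shown here, but given these settings it presumably mirrors the patterns above with retries disabled. A sketch (queue and type names are assumptions, not verbatim from the codebase):

export function getPaperGenerationQueue(): Queue<PaperJobData, PaperJobResult> {
  return new Queue<PaperJobData, PaperJobResult>("paper-generation", {
    connection: getBullMQConnection(),
    defaultJobOptions: {
      attempts: 1, // no retries - fallback strategies run inside the job itself
      removeOnComplete: { age: 86400 }, // keep for 24 hours
      removeOnFail: { age: 604800 }, // keep failed for 7 days
    },
  });
}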

File Process Queue

For processing uploaded files (PDFs, images, etc.).

Settings:
  • Concurrency: 5 (auto-configured)
  • Retry: 3 attempts
  • Timeout: 2 minutes
  • Retention: 1 hour (completed), 24 hours (failed)

Redis Connection

BioAgents uses ioredis for Redis connections:
import Redis from "ioredis";

// BullMQ connection (for queues and workers)
export function getBullMQConnection(): Redis {
  const redisUrl = process.env.REDIS_URL || "redis://localhost:6379";
  
  return new Redis(redisUrl, {
    maxRetriesPerRequest: null, // Required for BullMQ
    retryStrategy: (times) => {
      if (times > 10) return null;
      return Math.min(times * 200, 5000);
    },
    reconnectOnError: (err) => {
      const targetErrors = ["READONLY", "ECONNRESET", "ETIMEDOUT"];
      return targetErrors.some((e) => err.message.includes(e));
    },
  });
}

// Publisher (for notifications)
export function getPublisher(): Redis {
  return new Redis(process.env.REDIS_URL || "redis://localhost:6379");
}

// Subscriber (for notifications)
export function getSubscriber(): Redis {
  return new Redis(process.env.REDIS_URL || "redis://localhost:6379");
}
Pub/Sub requires separate connections for publisher and subscriber - once a connection enters subscribe mode, Redis does not allow it to issue regular commands.
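The notify* helpers used by the worker below can be thin wrappers around PUBLISH. A minimal sketch, assuming a per-conversation channel scheme (the real channel names may differ):

import { getPublisher } from "./connection";

// Reuse a single publisher connection instead of opening one per call
const publisher = getPublisher();

export async function notifyJobStarted(
  jobId: string,
  conversationId: string,
  messageId: string
): Promise<void> {
  await publisher.publish(
    `notifications:${conversationId}`, // hypothetical channel name
    JSON.stringify({ type: "job:started", jobId, conversationId, messageId })
  );
}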

Worker Implementation

Workers run in separate processes and pull jobs from Redis using blocking reads:
import { Worker, Job } from "bullmq";
import { getBullMQConnection } from "../connection";

async function processChatJob(
  job: Job<ChatJobData, ChatJobResult>
): Promise<ChatJobResult> {
  const { userId, conversationId, messageId, message } = job.data;
  
  // Notify job started
  await notifyJobStarted(job.id!, conversationId, messageId);
  
  try {
    // Process message with AI agent
    const result = await processChatMessage({
      userId,
      conversationId,
      messageId,
      message,
    });
    
    // Notify completion
    await notifyJobCompleted(job.id!, conversationId, messageId);
    
    return result;
  } catch (error) {
    await notifyJobFailed(job.id!, conversationId, messageId, error);
    throw error;
  }
}

// Create worker
const worker = new Worker("chat", processChatJob, {
  connection: getBullMQConnection(),
  concurrency: parseInt(process.env.CHAT_QUEUE_CONCURRENCY || "5"),
  lockDuration: 180000, // 3 minutes
});

worker.on("completed", (job) => {
  logger.info({ jobId: job.id }, "chat_job_completed");
});

worker.on("failed", (job, err) => {
  logger.error({ jobId: job?.id, err }, "chat_job_failed");
});
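Workers should also shut down gracefully so long-running jobs aren't abandoned mid-flight (see Best Practices). BullMQ's worker.close() stops fetching new jobs and waits for active ones to finish. A minimal sketch extending the worker above:

process.on("SIGTERM", async () => {
  logger.info("worker_shutting_down");
  await worker.close(); // resolves once active jobs have completed
  process.exit(0);
});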

Worker Process

Run workers as separate processes:
# Development (with hot reload)
bun run worker:dev

# Production
bun run worker

# Docker
docker compose up -d worker

WebSocket Notifications

Real-time progress updates via Redis Pub/Sub:

Connection

const ws = new WebSocket('wss://api.example.com/api/ws?token=<jwt>');

ws.onopen = () => {
  // Subscribe to conversation
  ws.send(JSON.stringify({
    action: 'subscribe',
    conversationId: '<conversation-id>'
  }));
};

Notification Types

job:started - Sent when job processing begins:
{
  "type": "job:started",
  "jobId": "123",
  "conversationId": "abc",
  "messageId": "msg-789"
}

job:progress - Sent on progress updates:
{
  "type": "job:progress",
  "jobId": "123",
  "conversationId": "abc",
  "progress": {
    "stage": "literature_search",
    "percent": 45
  }
}

job:completed - Sent when a job finishes successfully:
{
  "type": "job:completed",
  "jobId": "123",
  "conversationId": "abc",
  "messageId": "msg-789"
}

job:failed - Sent when a job fails after all retries:
{
  "type": "job:failed",
  "jobId": "123",
  "conversationId": "abc",
  "messageId": "msg-789",
  "error": "LLM API error"
}

Client Implementation

ws.onmessage = async (event) => {
  const notification = JSON.parse(event.data);
  
  switch (notification.type) {
    case 'job:progress':
      updateProgressBar(notification.progress.percent);
      break;
      
    case 'job:completed': {
      // Fetch the actual message content
      const response = await fetch(`/api/messages/${notification.messageId}`);
      const message = await response.json();
      displayMessage(message);
      break;
    }
      
    case 'job:failed':
      showError('Processing failed');
      break;
  }
};

Monitoring

Bull Board Dashboard

Access the admin UI at /admin/queues when queue mode is enabled.
Features:
  • View queue status and job counts
  • Inspect job data and results
  • Retry failed jobs manually
  • Pause/resume queues
  • View job logs and stack traces
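Bull Board is wired up through its adapter API; a sketch using the Express adapter (BioAgents' actual mounting may differ):

import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ExpressAdapter } from "@bull-board/express";

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/admin/queues");

createBullBoard({
  queues: [
    new BullMQAdapter(getChatQueue()),
    new BullMQAdapter(getDeepResearchQueue()),
  ],
  serverAdapter,
});

// Assuming an Express `app`
app.use("/admin/queues", serverAdapter.getRouter());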

Health Check

curl http://localhost:3000/api/health
Response:
{
  "status": "ok",
  "timestamp": "2024-01-15T10:30:00.000Z",
  "jobQueue": {
    "enabled": true,
    "redis": "connected"
  }
}

Queue Metrics

Query queue metrics programmatically:
curl http://localhost:3000/admin/queues/api/queues
{
  "queues": [
    {
      "name": "chat",
      "counts": {
        "active": 2,
        "waiting": 5,
        "completed": 150,
        "failed": 3
      }
    },
    {
      "name": "deep-research",
      "counts": {
        "active": 1,
        "waiting": 0,
        "completed": 25,
        "failed": 1
      }
    }
  ]
}
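The same counts are available in-process from BullMQ, e.g. for a custom metrics exporter. A sketch:

const counts = await getChatQueue().getJobCounts(
  "active",
  "waiting",
  "completed",
  "failed"
);
// => { active: 2, waiting: 5, completed: 150, failed: 3 }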

Configuration

Environment Variables

  • USE_JOB_QUEUE (boolean, required, default: "false") - Enable the BullMQ job queue (required for queue mode)
  • REDIS_URL (string, required) - Redis connection URL (e.g., redis://localhost:6379)
  • CHAT_QUEUE_CONCURRENCY (number, default: "5") - Number of parallel chat jobs per worker
  • DEEP_RESEARCH_QUEUE_CONCURRENCY (number, default: "3") - Number of parallel research jobs per worker
  • PAPER_GENERATION_CONCURRENCY (number, default: "1") - Number of parallel paper generation jobs per worker
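Put together, a queue-mode configuration might look like this (values are illustrative):

# .env
USE_JOB_QUEUE=true
REDIS_URL=redis://localhost:6379
CHAT_QUEUE_CONCURRENCY=5
DEEP_RESEARCH_QUEUE_CONCURRENCY=3
PAPER_GENERATION_CONCURRENCY=1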

Redis Memory Configuration

# In docker-compose.yml
redis:
  image: redis:7-alpine
  command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy noeviction
Memory Guidelines:

Scenario             Memory   Concurrent Jobs
Development          256MB    ~50
Light Production     512MB    ~100
Medium Production    1GB      ~500
Heavy Production     2GB+     1000+
Deep research jobs have larger payloads (literature results, analysis data) and consume more memory per job.

Troubleshooting

Jobs Stuck in “waiting”

Cause: No workers are running. Fix:
USE_JOB_QUEUE=true bun run worker

Jobs Stuck in “active”

Cause: Worker crashed mid-job. Fix: BullMQ automatically detects stalled jobs after stalledInterval (30s by default) and retries them, up to maxStalledCount times, before failing them. Or manually retry via Bull Board.
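Both the check interval and the retry budget for stalled jobs are worker options; a sketch showing BullMQ's defaults:

const worker = new Worker("chat", processChatJob, {
  connection: getBullMQConnection(),
  stalledInterval: 30000, // how often to check for stalled jobs (default 30s)
  maxStalledCount: 1, // stalled retries before the job is failed (default 1)
});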

Redis Connection Errors

Cause: Redis not running or network issues. Fix:
# Check Redis is running
redis-cli ping  # Should return PONG

# Check connection URL
echo $REDIS_URL

# Test connection
redis-cli -u $REDIS_URL ping

High Memory Usage

Cause: Too many retained jobs or large payloads. Fix: Adjust retention settings in src/services/queue/queues.ts:
removeOnComplete: {
  age: 1800,   // 30 minutes instead of 1 hour
  count: 500,  // Keep fewer jobs
}

Duplicate Job Processing

Cause: A network partition or an overlong job caused BullMQ to consider the job stalled, so a second worker picked it up. Fix: Increase lockDuration in the worker:
const worker = new Worker('chat', processor, {
  lockDuration: 60000,  // 60 seconds instead of 30
});

Best Practices

  • Use queue mode for production deployments
  • Configure appropriate concurrency based on resources
  • Set stop_grace_period to allow long-running jobs to complete
  • Enable Redis persistence with appendonly yes
  • Monitor queue depth and scale workers accordingly
  • Set up alerts for failed jobs
  • Use managed Redis (Upstash, ElastiCache) for high availability
  • Configure job retention to prevent memory bloat
  • Test graceful shutdown behavior
  • Implement idempotent job handlers (see the sketch below)
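For the last point, a minimal sketch of an idempotent handler; findExistingResult is a hypothetical lookup against durable storage:

async function processChatJob(
  job: Job<ChatJobData, ChatJobResult>
): Promise<ChatJobResult> {
  // A retried job may have partially or fully completed on an earlier attempt;
  // checking durable state first makes reprocessing a no-op.
  const existing = await findExistingResult(job.data.messageId); // hypothetical
  if (existing) return existing;

  return processChatMessage(job.data);
}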

Next Steps

Docker Deployment

Learn how to deploy with docker-compose

Horizontal Scaling

Scale workers across multiple servers for high throughput
