BioAgents uses BullMQ for reliable background job processing. This enables horizontal scaling, job persistence, automatic retries, and real-time progress notifications.
Architecture Overview
Data Flow
- Request - Client sends HTTP request to API server
- Enqueue - API creates job in Redis queue, returns job ID immediately
- Process - Worker picks up job, executes AI workflow
- Notify - Worker publishes progress via Redis Pub/Sub
- Broadcast - API receives notification, broadcasts to WebSocket clients
- Fetch - Client receives notification, fetches results via HTTP
This is the “Notify + Fetch” pattern - notifications are lightweight (just IDs), actual data is fetched via REST API.
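The six steps above can be sketched end-to-end with in-memory stand-ins (a Map for the result store, an array for Pub/Sub). Everything here is illustrative; the real system uses BullMQ and Redis, and these names are invented for the sketch:

```typescript
// In-memory sketch of the Notify + Fetch pattern (illustration only).
type Notification = { type: string; jobId: string };

const results = new Map<string, string>(); // stands in for the database
const notifications: Notification[] = []; // stands in for Redis Pub/Sub

// Steps 1-2: enqueue returns a job ID immediately, no payload.
function enqueue(message: string): string {
  const jobId = `job-${results.size + 1}`;
  // Steps 3-4: processing + notify (done by a separate worker in reality).
  results.set(jobId, `echo: ${message}`);
  notifications.push({ type: "job:completed", jobId }); // IDs only, no data
  return jobId;
}

// Step 6: the client fetches the actual data via the REST API.
function fetchResult(jobId: string): string | undefined {
  return results.get(jobId);
}

const id = enqueue("hello");
const done = notifications.find((n) => n.jobId === id);
console.log(done?.type, fetchResult(id)); // job:completed echo: hello
```

The key property: the notification channel only ever carries identifiers, so a dropped WebSocket message costs nothing that a refetch cannot recover.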
Queue Definitions
BioAgents uses separate queues for different job types:
Chat Queue
For /api/chat requests - conversational AI interactions.
export function getChatQueue(): Queue<ChatJobData, ChatJobResult> {
  return new Queue<ChatJobData, ChatJobResult>("chat", {
    connection: getBullMQConnection(),
    defaultJobOptions: {
      // Retry configuration
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 1000, // 1s → 2s → 4s
      },
      // Job cleanup
      removeOnComplete: {
        age: 3600, // Keep for 1 hour
        count: 1000,
      },
      removeOnFail: {
        age: 86400, // Keep failed for 24 hours
      },
    },
  });
}
Settings:
- Concurrency: 5 (configurable via CHAT_QUEUE_CONCURRENCY)
- Retry: 3 attempts with exponential backoff
- Timeout: None (handled by worker lockDuration)
- Retention: 1 hour (completed), 24 hours (failed)
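BullMQ's built-in exponential backoff doubles the wait between successive retries. A quick sketch of the schedule produced by the `delay: 1000` setting above (the formula here mirrors BullMQ's documented behavior; the helper itself is illustrative):

```typescript
// Exponential backoff schedule: baseMs * 2^(attempt - 1).
function backoffDelay(baseMs: number, attemptsMade: number): number {
  return baseMs * 2 ** (attemptsMade - 1);
}

const delays = [1, 2, 3].map((n) => backoffDelay(1000, n));
console.log(delays); // [ 1000, 2000, 4000 ]
```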
Deep Research Queue
For /api/deep-research/start requests - multi-iteration research workflows.
export function getDeepResearchQueue(): Queue<DeepResearchJobData, DeepResearchJobResult> {
  return new Queue<DeepResearchJobData, DeepResearchJobResult>("deep-research", {
    connection: getBullMQConnection(),
    defaultJobOptions: {
      // Fewer retries for long jobs
      attempts: 2,
      backoff: {
        type: "exponential",
        delay: 5000, // 5s → 10s
      },
      // No timeout - jobs can run 20-30+ minutes
      removeOnComplete: {
        age: 86400, // Keep for 24 hours
        count: 500,
      },
      removeOnFail: {
        age: 604800, // Keep failed for 7 days
      },
    },
  });
}
Settings:
- Concurrency: 3 (configurable via DEEP_RESEARCH_QUEUE_CONCURRENCY)
- Retry: 2 attempts with longer backoff
- Timeout: None (can run indefinitely)
- Retention: 24 hours (completed), 7 days (failed)
Deep research jobs can run for 20-30+ minutes and consume 500MB-1GB of memory per job. Size worker resources accordingly.
Paper Generation Queue
For LaTeX paper generation from research conversations.
Settings:
- Concurrency: 1 (configurable via PAPER_GENERATION_CONCURRENCY)
- Retry: No retries (internal fallback strategies)
- Timeout: None (compilation can take 5-15+ minutes)
- Retention: 24 hours (completed), 7 days (failed)
File Process Queue
For processing uploaded files (PDFs, images, etc.).
Settings:
- Concurrency: 5 (auto-configured)
- Retry: 3 attempts
- Timeout: 2 minutes
- Retention: 1 hour (completed), 24 hours (failed)
Redis Connection
BioAgents uses ioredis for Redis connections:
import Redis from "ioredis";

// BullMQ connection (for queues and workers)
export function getBullMQConnection(): Redis {
  const redisUrl = process.env.REDIS_URL || "redis://localhost:6379";
  return new Redis(redisUrl, {
    maxRetriesPerRequest: null, // Required for BullMQ
    retryStrategy: (times) => {
      if (times > 10) return null;
      return Math.min(times * 200, 5000);
    },
    reconnectOnError: (err) => {
      const targetErrors = ["READONLY", "ECONNRESET", "ETIMEDOUT"];
      return targetErrors.some((e) => err.message.includes(e));
    },
  });
}

// Publisher (for notifications)
export function getPublisher(): Redis {
  return new Redis(process.env.REDIS_URL || "redis://localhost:6379");
}

// Subscriber (for notifications)
export function getSubscriber(): Redis {
  return new Redis(process.env.REDIS_URL || "redis://localhost:6379");
}
Pub/Sub requires separate publisher and subscriber connections: once a Redis connection enters subscriber mode, it can no longer issue regular commands.
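The `retryStrategy` above is a pure function, so its behavior is easy to see in isolation: a linear ramp of 200ms per attempt, capped at 5s, giving up (returning null, which stops reconnection) after 10 attempts:

```typescript
// The retryStrategy from the connection config, extracted for illustration.
function retryDelay(times: number): number | null {
  if (times > 10) return null; // stop reconnecting after 10 attempts
  return Math.min(times * 200, 5000); // 200ms, 400ms, ... capped at 5s
}

console.log(retryDelay(1)); // 200
console.log(retryDelay(10)); // 2000
console.log(retryDelay(11)); // null
```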
Worker Implementation
Workers run in separate processes and pull jobs from Redis:
import { Worker, Job } from "bullmq";
import { getBullMQConnection } from "../connection";

async function processChatJob(
  job: Job<ChatJobData, ChatJobResult>
): Promise<ChatJobResult> {
  const { userId, conversationId, messageId, message } = job.data;

  // Notify job started
  await notifyJobStarted(job.id!, conversationId, messageId);

  try {
    // Process message with AI agent
    const result = await processChatMessage({
      userId,
      conversationId,
      messageId,
      message,
    });

    // Notify completion
    await notifyJobCompleted(job.id!, conversationId, messageId);
    return result;
  } catch (error) {
    await notifyJobFailed(job.id!, conversationId, messageId, error);
    throw error;
  }
}

// Create worker
const worker = new Worker("chat", processChatJob, {
  connection: getBullMQConnection(),
  concurrency: parseInt(process.env.CHAT_QUEUE_CONCURRENCY || "5", 10),
  lockDuration: 180000, // 3 minutes
});

worker.on("completed", (job) => {
  logger.info({ jobId: job.id }, "chat_job_completed");
});

worker.on("failed", (job, err) => {
  logger.error({ jobId: job?.id, err }, "chat_job_failed");
});
Worker Process
Run workers as separate processes:
# Development (with hot reload)
bun run worker:dev
# Production
bun run worker
# Docker
docker compose up -d worker
WebSocket Notifications
Real-time progress updates via Redis Pub/Sub:
Connection
const ws = new WebSocket('wss://api.example.com/api/ws?token=<jwt>');

ws.onopen = () => {
  // Subscribe to conversation
  ws.send(JSON.stringify({
    action: 'subscribe',
    conversationId: '<conversation-id>'
  }));
};
Notification Types
Job processing began:

{
  "type": "job:started",
  "jobId": "123",
  "conversationId": "abc",
  "messageId": "msg-789"
}

Progress update:

{
  "type": "job:progress",
  "jobId": "123",
  "conversationId": "abc",
  "progress": {
    "stage": "literature_search",
    "percent": 45
  }
}

Job finished successfully:

{
  "type": "job:completed",
  "jobId": "123",
  "conversationId": "abc",
  "messageId": "msg-789"
}

Job failed after retries:

{
  "type": "job:failed",
  "jobId": "123",
  "conversationId": "abc",
  "messageId": "msg-789",
  "error": "LLM API error"
}
Client Implementation
ws.onmessage = async (event) => {
  const notification = JSON.parse(event.data);

  switch (notification.type) {
    case 'job:progress':
      updateProgressBar(notification.progress.percent);
      break;
    case 'job:completed': {
      // Fetch the actual message content
      const response = await fetch(`/api/messages/${notification.messageId}`);
      const message = await response.json();
      displayMessage(message);
      break;
    }
    case 'job:failed':
      showError('Processing failed');
      break;
  }
};
Monitoring
Bull Board Dashboard
Access the admin UI at /admin/queues when queue mode is enabled:
Features:
- View queue status and job counts
- Inspect job data and results
- Retry failed jobs manually
- Pause/resume queues
- View job logs and stack traces
Health Check
curl http://localhost:3000/api/health
Response:
{
  "status": "ok",
  "timestamp": "2024-01-15T10:30:00.000Z",
  "jobQueue": {
    "enabled": true,
    "redis": "connected"
  }
}
Queue Metrics
Query queue metrics programmatically:
curl http://localhost:3000/admin/queues/api/queues
{
  "queues": [
    {
      "name": "chat",
      "counts": {
        "active": 2,
        "waiting": 5,
        "completed": 150,
        "failed": 3
      }
    },
    {
      "name": "deep-research",
      "counts": {
        "active": 1,
        "waiting": 0,
        "completed": 25,
        "failed": 1
      }
    }
  ]
}
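The metrics payload above is plain JSON, so it can feed alerting logic directly. A sketch that computes an overall failure rate across queues, using the sample payload from this section (`failureRate` is an illustrative helper, not part of the Bull Board API):

```typescript
// Shape of the Bull Board metrics payload shown above.
type QueueCounts = { active: number; waiting: number; completed: number; failed: number };
type Metrics = { queues: { name: string; counts: QueueCounts }[] };

// Illustrative helper: fraction of finished jobs that failed, across all queues.
function failureRate(m: Metrics): number {
  const completed = m.queues.reduce((s, q) => s + q.counts.completed, 0);
  const failed = m.queues.reduce((s, q) => s + q.counts.failed, 0);
  return failed / (completed + failed);
}

const sample: Metrics = {
  queues: [
    { name: "chat", counts: { active: 2, waiting: 5, completed: 150, failed: 3 } },
    { name: "deep-research", counts: { active: 1, waiting: 0, completed: 25, failed: 1 } },
  ],
};

console.log(failureRate(sample).toFixed(3)); // 4 failures / 179 finished ≈ 0.022
```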
Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| USE_JOB_QUEUE | "false" | Enable the BullMQ job queue (required for queue mode) |
| REDIS_URL | redis://localhost:6379 | Redis connection URL |
| CHAT_QUEUE_CONCURRENCY | 5 | Number of parallel chat jobs per worker |
| DEEP_RESEARCH_QUEUE_CONCURRENCY | 3 | Number of parallel research jobs per worker |
| PAPER_GENERATION_CONCURRENCY | 1 | Number of parallel paper generation jobs per worker |
Redis Memory Configuration
# In docker-compose.yml
redis:
  image: redis:7-alpine
  command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy noeviction
Memory Guidelines:
| Scenario | Memory | Concurrent Jobs |
|---|---|---|
| Development | 256MB | ~50 |
| Light Production | 512MB | ~100 |
| Medium Production | 1GB | ~500 |
| Heavy Production | 2GB+ | 1000+ |
Deep research jobs have larger payloads (literature results, analysis data) and consume more memory per job.
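For sizing, a back-of-envelope estimate of retained-job memory is retained job count times average payload size. The numbers below are illustrative assumptions, not measured figures; in practice, measure with `redis-cli INFO memory`:

```typescript
// Rough Redis memory estimate for retained jobs (illustrative only).
function retainedMemoryMB(jobCount: number, avgPayloadKB: number): number {
  return (jobCount * avgPayloadKB) / 1024;
}

// e.g. 1000 retained chat jobs at an assumed ~50 KB each:
console.log(retainedMemoryMB(1000, 50).toFixed(1)); // 48.8
```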
Troubleshooting
Jobs Stuck in “waiting”
Cause: No workers are running.
Fix:
USE_JOB_QUEUE=true bun run worker
Jobs Stuck in “active”
Cause: Worker crashed mid-job.
Fix: BullMQ automatically detects stalled jobs after stalledInterval (30s by default) and re-queues them for another attempt. Alternatively, retry them manually via Bull Board.
Redis Connection Errors
Cause: Redis not running or network issues.
Fix:
# Check Redis is running
redis-cli ping # Should return PONG
# Check connection URL
echo $REDIS_URL
# Test connection
redis-cli -u $REDIS_URL ping
High Memory Usage
Cause: Too many retained jobs or large payloads.
Fix: Adjust retention settings in src/services/queue/queues.ts:
removeOnComplete: {
  age: 1800, // 30 minutes instead of 1 hour
  count: 500, // Keep fewer jobs
}
Duplicate Job Processing
Cause: Network partition caused BullMQ to think job was stalled.
Fix: Increase lockDuration in worker:
const worker = new Worker('chat', processor, {
  lockDuration: 60000, // 60 seconds instead of 30
});
Next Steps
Docker Deployment
Learn how to deploy with docker-compose
Horizontal Scaling
Scale workers across multiple servers for high throughput