The request/response pattern lets you enqueue a job and await its result, so asynchronous job processing reads like a single call from the caller’s point of view. This is useful when you need the result before you can respond, such as in API requests or user-facing operations.
enqueueAndWait()
The enqueueAndWait() method enqueues a job and returns a promise that resolves with the job’s result, or rejects if the job fails or the timeout elapses:
import { Queue, RedisStorage, TimeoutError, JobFailedError } from '@platformatic/job-queue'
const queue = new Queue<{ url: string }, { status: number; body: string }>({
storage: new RedisStorage({ url: 'redis://localhost:6379' })
})
try {
const result = await queue.enqueueAndWait('fetch-1', { url: 'https://api.example.com' }, {
timeout: 30000, // 30 seconds
maxAttempts: 3,
resultTTL: 300000 // Cache result for 5 minutes
})
console.log('Response:', result.status, result.body)
} catch (error) {
if (error instanceof TimeoutError) {
console.error('Request timed out after 30s')
} else if (error instanceof JobFailedError) {
console.error('Job failed:', error.originalError)
}
}
How It Works
Subscription Before Enqueue
To avoid race conditions, enqueueAndWait() subscribes to job notifications before enqueueing the job:
Producer implementation (from src/producer.ts:93):
async enqueueAndWait(id: string, payload: TPayload, options?: EnqueueAndWaitOptions): Promise<TResult> {
const timeout = options?.timeout ?? 30000
// 1. Subscribe BEFORE enqueue to avoid race conditions
const { promise: resultPromise, resolve: resolveResult, reject: rejectResult } = Promise.withResolvers<TResult>()
const unsubscribe = await this.#storage.subscribeToJob(id, async status => {
if (status === 'completed') {
const result = await this.getResult(id)
if (result !== null) {
resolveResult(result)
}
} else if (status === 'failed') {
const error = await this.#storage.getError(id)
const errorMessage = error ? error.toString() : 'Job failed'
rejectResult(new JobFailedError(id, errorMessage))
}
})
// 2. Now enqueue the job
const enqueueResult = await this.enqueue(id, payload, options)
// 3. Handle immediate completion (cached result)
if (enqueueResult.status === 'completed') {
await unsubscribe()
return enqueueResult.result
}
// 4. Wait for result with timeout
const { promise: timeoutPromise, reject: rejectTimeout } = Promise.withResolvers<never>()
const timeoutId = setTimeout(() => {
rejectTimeout(new TimeoutError(id, timeout))
}, timeout)
try {
return await Promise.race([resultPromise, timeoutPromise])
} finally {
clearTimeout(timeoutId)
await unsubscribe()
}
}
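To see why the ordering matters, here is a minimal in-memory sketch. TinyBus and observesCompletion are hypothetical stand-ins, not part of the library; the only assumption is that the notification channel, like Redis Pub/Sub, does not replay messages to subscribers that register late.

```typescript
type Status = 'completed' | 'failed'

// Hypothetical stand-in for a storage backend's notification channel.
// Messages are delivered only to handlers registered at publish time.
class TinyBus {
  private readonly handlers = new Map<string, Set<(status: Status) => void>>()

  subscribe (id: string, handler: (status: Status) => void): () => void {
    const set = this.handlers.get(id) ?? new Set<(status: Status) => void>()
    set.add(handler)
    this.handlers.set(id, set)
    return () => { set.delete(handler) }
  }

  publish (id: string, status: Status): void {
    this.handlers.get(id)?.forEach(handler => handler(status))
  }
}

// Returns true if the caller observes the 'completed' notification.
function observesCompletion (subscribeFirst: boolean): boolean {
  const bus = new TinyBus()
  let seen = false
  const subscribe = (): void => {
    bus.subscribe('job-1', status => { seen = status === 'completed' })
  }

  if (subscribeFirst) subscribe()
  // A fast worker finishes the job right after it is enqueued.
  bus.publish('job-1', 'completed')
  if (!subscribeFirst) subscribe() // too late: the message is already gone

  return seen
}

console.log(observesCompletion(true))  // true
console.log(observesCompletion(false)) // false
```

With the subscription in place first, a job that completes between the enqueue call and the wait is still observed.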
Notification Mechanism
Each storage backend implements job notifications differently:
RedisStorage uses Redis Pub/Sub:
// Subscribe to job-specific channel
await subscriber.subscribe(`jq:notify:${jobId}`)
// Worker publishes completion
await redis.publish(`jq:notify:${jobId}`, 'completed')
MemoryStorage uses EventEmitter:
this.#notifyEmitter.on(`notify:${id}`, handler)
this.#notifyEmitter.emit(`notify:${id}`, status)
FileStorage uses filesystem watchers:
// Write notification file
await writeFile(`notify/${id}-${timestamp}.notify`, `${id}:completed`)
// Watcher picks it up
for await (const event of watch(notifyPath)) {
// Read and emit notification
}
Timeout Handling
The timeout option specifies how long to wait for the job to complete:
const result = await queue.enqueueAndWait('job-123', payload, {
timeout: 10000 // 10 seconds
})
If the job doesn’t complete within the timeout, a TimeoutError is thrown:
import { TimeoutError } from '@platformatic/job-queue'
try {
await queue.enqueueAndWait('slow-job', payload, { timeout: 5000 })
} catch (error) {
if (error instanceof TimeoutError) {
console.log(`Job ${error.jobId} timed out after ${error.timeoutMs}ms`)
// Job continues processing in the background
}
}
A timeout does not cancel the job. Only the caller stops waiting; the job keeps processing in the background.
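The behaviour can be reproduced without the library. The sketch below uses a hypothetical waitWithTimeout helper and a simulated 50ms job; it illustrates the general promise pattern and is not the library's code.

```typescript
class WaitTimeoutError extends Error {}

// Hypothetical helper: reject after timeoutMs unless `work` settles first.
// Nothing here can stop `work` itself; only the waiting is abandoned.
function waitWithTimeout<T> (work: Promise<T>, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      reject(new WaitTimeoutError(`timed out after ${timeoutMs}ms`))
    }, timeoutMs)
    work.then(
      value => { clearTimeout(timer); resolve(value) },
      error => { clearTimeout(timer); reject(error) }
    )
  })
}

// Simulated job that takes 50ms and records when it finishes.
let jobFinished = false
const slowJob = new Promise<string>(resolve => {
  setTimeout(() => { jobFinished = true; resolve('done') }, 50)
})

async function demoTimeout (): Promise<{ timedOut: boolean, finishedLater: boolean }> {
  let timedOut = false
  try {
    await waitWithTimeout(slowJob, 10) // give up after 10ms
  } catch (error) {
    timedOut = error instanceof WaitTimeoutError
  }
  await slowJob // the job still completes after the caller gave up
  return { timedOut, finishedLater: jobFinished }
}
```

If the work must actually stop, that has to be handled inside the job handler; the waiting side has no way to enforce it.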
Error Handling
enqueueAndWait() rejects with one of two library-specific error types (other errors, such as storage failures, may also propagate):
TimeoutError
Thrown when the job doesn’t complete within the timeout period.
class TimeoutError extends Error {
constructor(public jobId: string, public timeoutMs: number) {
super(`Job ${jobId} timed out after ${timeoutMs}ms`)
}
}
JobFailedError
Thrown when the job fails after all retry attempts.
class JobFailedError extends Error {
constructor(public jobId: string, public originalError: string) {
super(`Job ${jobId} failed: ${originalError}`)
}
}
Example error handling:
import { TimeoutError, JobFailedError } from '@platformatic/job-queue'
try {
const result = await queue.enqueueAndWait('api-request', { url }, { timeout: 10000 })
return { success: true, data: result }
} catch (error) {
if (error instanceof TimeoutError) {
return { success: false, error: 'TIMEOUT', message: 'Request took too long' }
} else if (error instanceof JobFailedError) {
return { success: false, error: 'FAILED', message: error.originalError }
} else {
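// `error` is `unknown` in a TypeScript catch clause, so narrow it before reading .message
return { success: false, error: 'UNKNOWN', message: error instanceof Error ? error.message : String(error) }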
}
}
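When several endpoints share this logic, the mapping can be pulled into one function. The sketch below is one possible shape: toHttpError and the chosen status codes are hypothetical, and the two error classes are local stand-ins with the same fields as those shown above, so that the example runs without the library installed.

```typescript
// Local stand-ins mirroring the shapes shown above. In real code,
// import TimeoutError and JobFailedError from '@platformatic/job-queue'.
class TimeoutError extends Error {
  jobId: string
  timeoutMs: number
  constructor (jobId: string, timeoutMs: number) {
    super(`Job ${jobId} timed out after ${timeoutMs}ms`)
    this.jobId = jobId
    this.timeoutMs = timeoutMs
  }
}

class JobFailedError extends Error {
  jobId: string
  originalError: string
  constructor (jobId: string, originalError: string) {
    super(`Job ${jobId} failed: ${originalError}`)
    this.jobId = jobId
    this.originalError = originalError
  }
}

interface HttpError { statusCode: number, code: string, message: string }

// Hypothetical mapping; pick the status codes that fit your API.
function toHttpError (error: unknown): HttpError {
  if (error instanceof TimeoutError) {
    return { statusCode: 504, code: 'TIMEOUT', message: error.message }
  }
  if (error instanceof JobFailedError) {
    return { statusCode: 500, code: 'FAILED', message: error.originalError }
  }
  // Anything else (for example a storage error) falls through here.
  const message = error instanceof Error ? error.message : String(error)
  return { statusCode: 500, code: 'UNKNOWN', message }
}
```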
Cached Results
If a job with the same ID was already completed and its result is still cached, enqueueAndWait() returns immediately:
// First request - job runs
const result1 = await queue.enqueueAndWait('calc-123', { x: 5, y: 10 }, {
timeout: 30000,
resultTTL: 3600000 // Cache for 1 hour
})
// Takes 2 seconds to compute
// Second request - cached result returned immediately
const result2 = await queue.enqueueAndWait('calc-123', { x: 5, y: 10 }, {
timeout: 30000
})
// Returns instantly with cached result
See Deduplication for more details on result caching.
Use Cases
API Gateway Pattern
Use the queue to offload heavy processing from API handlers:
import Fastify from 'fastify'
import { Queue, RedisStorage, TimeoutError } from '@platformatic/job-queue'
const app = Fastify()
const queue = new Queue({
storage: new RedisStorage({ url: process.env.REDIS_URL })
})
app.post('/api/process', async (request, reply) => {
const { id, data } = request.body
try {
const result = await queue.enqueueAndWait(id, data, {
timeout: 30000,
resultTTL: 300000
})
return { success: true, result }
} catch (error) {
if (error instanceof TimeoutError) {
return reply.code(504).send({ error: 'Processing timeout' })
}
return reply.code(500).send({ error: 'Processing failed' })
}
})
Synchronous RPC Over Queue
Implement RPC-style communication between services:
// Service A (caller)
// Include a call-specific key in the job ID; a fixed ID would be deduplicated across users
const response = await queue.enqueueAndWait('rpc:getUserProfile:123', { userId: 123 }, {
timeout: 5000
})
console.log('User profile:', response)
// Service B (worker)
queue.execute(async (job) => {
if (job.id.startsWith('rpc:getUserProfile')) {
const { userId } = job.payload
return await database.getUser(userId)
}
})
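Because job IDs are also deduplication keys, RPC-style IDs generally need a call-specific part in addition to the method name. One way to build and parse such IDs is sketched below; rpcJobId, parseRpcId, and the rpc:method:key layout are hypothetical conventions, not something the library prescribes.

```typescript
const RPC_PREFIX = 'rpc'

// Build an ID such as "rpc:getUserProfile:123".
function rpcJobId (method: string, key: string): string {
  return `${RPC_PREFIX}:${method}:${key}`
}

// Parse an ID built by rpcJobId(); returns null for any other job ID.
function parseRpcId (id: string): { method: string, key: string } | null {
  const first = id.indexOf(':')
  const second = id.indexOf(':', first + 1)
  if (first === -1 || second === -1 || id.slice(0, first) !== RPC_PREFIX) {
    return null
  }
  // Everything after the second ':' is the key, so keys may contain ':'.
  return { method: id.slice(first + 1, second), key: id.slice(second + 1) }
}

console.log(rpcJobId('getUserProfile', '123')) // rpc:getUserProfile:123
```

A worker can then call parseRpcId(job.id) and dispatch on the method name instead of matching on string prefixes.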
Background Job with Frontend Waiting
Start a background job from the frontend and poll for results:
// Backend endpoint
app.post('/api/generate-report', async (request, reply) => {
const reportId = `report-${Date.now()}`
// Enqueue only; the handler returns without waiting for the job to finish
await queue.enqueue(reportId, request.body, {
maxAttempts: 3,
resultTTL: 3600000 // Keep result for 1 hour
})
return { reportId, status: 'queued' }
})
app.get('/api/generate-report/:id', async (request, reply) => {
const { id } = request.params
// Wait up to 5 seconds for result
try {
const result = await queue.enqueueAndWait(id, {}, { timeout: 5000 })
return { status: 'completed', result }
} catch (error) {
if (error instanceof TimeoutError) {
return { status: 'pending' }
}
return reply.code(500).send({ status: 'failed', error: error instanceof Error ? error.message : String(error) })
}
})
Fire-and-Forget vs Request/Response
Choose between enqueue() and enqueueAndWait() based on your needs:
Use enqueue() when:
- ✅ You don’t need the result immediately
- ✅ The operation is time-consuming (> 30 seconds)
- ✅ You want to return quickly to the caller
- ✅ Failures can be handled asynchronously (via events or retries)
- ✅ You’re building background processing systems
// Fire-and-forget: return immediately
app.post('/api/send-email', async (request, reply) => {
await queue.enqueue(`email-${Date.now()}`, request.body)
return { status: 'queued' }
})
Use enqueueAndWait() when:
- ✅ You need the result to return to the user
- ✅ The operation is reasonably fast (< 30 seconds)
- ✅ You want to reuse existing job deduplication
- ✅ You’re implementing RPC-style communication
- ✅ Failures should be reported immediately
// Request/response: wait for result
app.post('/api/calculate', async (request, reply) => {
const result = await queue.enqueueAndWait(`calc-${Date.now()}`, request.body, {
timeout: 10000
})
return { result }
})
Latency Overhead
The request/response pattern adds latency compared to direct execution:
Direct execution: ~2ms
MemoryStorage: ~5ms (+3ms overhead)
RedisStorage (localhost): ~8ms (+6ms overhead)
RedisStorage (network): ~15ms (+13ms overhead)
The overhead comes from:
- Pub/sub subscription setup
- Message serialization/deserialization
- Network round trips (for Redis)
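Figures like these depend on hardware, network, and payload size, so it is worth measuring in your own environment. The helper below is a rough sketch: median and measureMedianMs are hypothetical names, and the commented usage assumes an already started queue and a payload variable.

```typescript
// Median of a list of samples (does not mutate the input).
function median (samples: number[]): number {
  if (samples.length === 0) throw new Error('median of empty list')
  const sorted = [...samples].sort((a, b) => a - b)
  const mid = Math.floor(sorted.length / 2)
  return sorted.length % 2 === 1
    ? sorted[mid]
    : (sorted[mid - 1] + sorted[mid]) / 2
}

// Time `runs` sequential calls of `operation` and return the median
// duration in milliseconds.
async function measureMedianMs (
  operation: () => Promise<unknown>,
  runs = 100
): Promise<number> {
  const durations: number[] = []
  for (let i = 0; i < runs; i++) {
    const start = performance.now()
    await operation()
    durations.push(performance.now() - start)
  }
  return median(durations)
}

// Usage (assumes an existing, started queue). Use a fresh job ID per run,
// otherwise cached results are measured instead of real round trips:
// let n = 0
// const ms = await measureMedianMs(() => queue.enqueueAndWait(`bench-${n++}`, payload))
```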
Connection Pooling
For high-throughput scenarios, reuse Queue instances:
// ❌ Creates new connections on every request
app.post('/api/process', async (request, reply) => {
const { id, data } = request.body
const queue = new Queue({ storage: new RedisStorage() })
await queue.start()
const result = await queue.enqueueAndWait(id, data)
await queue.stop()
return result
})
// ✅ Reuses a single Queue instance
const queue = new Queue({ storage: new RedisStorage() })
await queue.start()
app.post('/api/process', async (request, reply) => {
const { id, data } = request.body
const result = await queue.enqueueAndWait(id, data)
return result
})
Timeout Tuning
Set timeouts based on your SLA and worker capacity:
// Fast operations (API calls, cache lookups)
await queue.enqueueAndWait(id, data, { timeout: 5000 })
// Medium operations (database queries, file processing)
await queue.enqueueAndWait(id, data, { timeout: 30000 })
// Slow operations (report generation, video encoding)
await queue.enqueueAndWait(id, data, { timeout: 300000 }) // 5 minutes
If operations regularly exceed the timeout, consider switching to fire-and-forget with polling or webhooks.
Duplicate Handling
When using enqueueAndWait() with duplicate job IDs:
// First call - job runs
const promise1 = queue.enqueueAndWait('job-123', payload, { timeout: 10000 })
// Second call (before first completes) - both wait for same job
const promise2 = queue.enqueueAndWait('job-123', payload, { timeout: 10000 })
// Both resolve with the same result when the job completes
const [result1, result2] = await Promise.all([promise1, promise2])
// result1 and result2 hold the same result value
If the job already completed and the result is cached:
// Job completed 5 minutes ago, result still cached
const result = await queue.enqueueAndWait('job-123', payload, { timeout: 10000 })
// Returns immediately with cached result (no subscription needed)
See Deduplication for more details.
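As a mental model for the behaviour above, the following sketch coalesces calls by ID in memory. DedupRunner is hypothetical and deliberately simplified: the library deduplicates through its storage backend and expires results by TTL, whereas this model keeps every outcome forever.

```typescript
type Handler<P, R> = (payload: P) => Promise<R>

// Simplified in-memory model: the first call for an ID starts the work,
// later calls for the same ID share its outcome.
class DedupRunner<P, R> {
  private readonly jobs = new Map<string, Promise<R>>()
  private readonly handler: Handler<P, R>
  executions = 0

  constructor (handler: Handler<P, R>) {
    this.handler = handler
  }

  run (id: string, payload: P): Promise<R> {
    const existing = this.jobs.get(id)
    if (existing !== undefined) return existing // duplicate: wait on the same job
    this.executions++
    const job = this.handler(payload)
    this.jobs.set(id, job)
    return job
  }
}

async function demoDedup (): Promise<{ executions: number, results: number[] }> {
  const runner = new DedupRunner<{ x: number, y: number }, number>(
    async ({ x, y }) => x + y
  )
  const results = await Promise.all([
    runner.run('calc-123', { x: 5, y: 10 }),
    runner.run('calc-123', { x: 5, y: 10 })
  ])
  return { executions: runner.executions, results }
}
```

Both callers receive 15 while the handler runs only once, which is the property that makes duplicate IDs cheap.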
Best Practices
Set reasonable timeouts: Don’t make users wait forever. Use 5-30 second timeouts for user-facing requests.
Use content-addressed IDs: For idempotent operations, hash the input to get consistent job IDs:
import { createHash } from 'node:crypto'
const jobId = createHash('sha256').update(JSON.stringify(payload)).digest('hex')
await queue.enqueueAndWait(jobId, payload)
Cache aggressively: Set a long resultTTL for deterministic operations to reduce load:
await queue.enqueueAndWait(jobId, payload, {
resultTTL: 24 * 60 * 60 * 1000 // 24 hours for pure functions
})
Don’t use for long-running jobs: If a job takes > 30 seconds, use fire-and-forget with polling or webhooks instead.
Handle timeout errors gracefully: Remember that a timeout doesn’t cancel the job; it continues running in the background.
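One caveat for content-addressed IDs: JSON.stringify() output depends on property insertion order, so two payloads with the same fields in a different order hash to different IDs. The sketch below sorts keys before hashing. canonicalize and contentJobId are hypothetical helpers, and the code assumes the payload is plain JSON data (no Date, Map, or undefined at the top level).

```typescript
import { createHash } from 'node:crypto'

// Recursively sort object keys so that key order does not affect the output.
// Assumes plain JSON-compatible data.
function canonicalize (value: unknown): unknown {
  if (Array.isArray(value)) return value.map(canonicalize)
  if (value !== null && typeof value === 'object') {
    const source = value as Record<string, unknown>
    const sorted: Record<string, unknown> = {}
    for (const key of Object.keys(source).sort()) {
      sorted[key] = canonicalize(source[key])
    }
    return sorted
  }
  return value
}

// Derive a job ID from the payload contents, e.g. "calc:<64 hex chars>".
function contentJobId (prefix: string, payload: unknown): string {
  const digest = createHash('sha256')
    .update(JSON.stringify(canonicalize(payload)))
    .digest('hex')
  return `${prefix}:${digest}`
}

// Same fields in a different order produce the same ID.
console.log(contentJobId('calc', { x: 5, y: 10 }) === contentJobId('calc', { y: 10, x: 5 })) // true
```

The prefix keeps IDs from different job types apart even when their payloads happen to be identical.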