Platformatic Job Queue uses job IDs and result caching to prevent duplicate job processing. When you enqueue a job with an ID that already exists, the queue returns the existing state or cached result instead of creating a duplicate.
How Deduplication Works
Every job has a unique identifier (the id parameter in enqueue()). The storage backend tracks job state in a jobs registry:
await queue.enqueue('email-123', { to: 'user@example.com' })
// Job added to queue
await queue.enqueue('email-123', { to: 'user@example.com' })
// Returns { status: 'duplicate', existingState: 'queued' }
Job states (from src/types.ts:21):
type MessageState = 'queued' | 'processing' | 'failing' | 'completed' | 'failed'
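The storage excerpts on this page store state as a string such as `queued:${timestamp}`, and the producer runs a `parseState` helper over it. The library's own helper is not shown here, so the following is only a minimal sketch of what such a parser could look like, assuming a `status:timestamp` layout. The names `parseStateSketch` and `isTerminal` are hypothetical.

```typescript
type MessageState = 'queued' | 'processing' | 'failing' | 'completed' | 'failed'

const KNOWN_STATES: readonly string[] = ['queued', 'processing', 'failing', 'completed', 'failed']

// Hypothetical parser: assumes the stored state looks like "<status>:<timestamp>".
function parseStateSketch (raw: string): { status: MessageState, timestamp: number } {
  const separator = raw.indexOf(':')
  const status = separator === -1 ? raw : raw.slice(0, separator)
  if (KNOWN_STATES.indexOf(status) === -1) {
    throw new Error(`Unknown job state: ${status}`)
  }
  // A state without a timestamp part yields NaN rather than a made-up value.
  const timestamp = separator === -1 ? Number.NaN : Number(raw.slice(separator + 1))
  return { status: status as MessageState, timestamp }
}

// Terminal states are the ones for which a result or error can be cached.
function isTerminal (status: MessageState): boolean {
  return status === 'completed' || status === 'failed'
}
```

Splitting on the first colon only keeps the parser tolerant of any extra data that might follow the timestamp.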
Atomic Duplicate Check
The enqueue operation atomically checks for duplicates and creates the job if it doesn’t exist:
Producer implementation (from src/producer.ts:51):
async enqueue(id: string, payload: TPayload, options?: EnqueueOptions): Promise<EnqueueResult<TResult>> {
const message: QueueMessage<TPayload> = {
id,
payload,
createdAt: timestamp,
attempts: 0,
maxAttempts: options?.maxAttempts ?? this.#maxRetries,
resultTTL: options?.resultTTL ?? this.#resultTTL
}
const serialized = this.#payloadSerde.serialize(message)
const existingState = await this.#storage.enqueue(id, serialized, timestamp)
if (existingState) {
const { status } = parseState(existingState)
if (status === 'completed') {
// Return cached result immediately
const result = await this.getResult(id)
if (result !== null) {
return { status: 'completed', result }
}
}
return { status: 'duplicate', existingState: status }
}
return { status: 'queued' }
}
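The excerpt above returns one of three shapes, which callers can tell apart by the `status` field. As a hedged illustration, the sketch below models those shapes as a discriminated union and handles each one. `EnqueueResultSketch` and `describeEnqueue` are hypothetical names inferred from the excerpt, not the library's exported types.

```typescript
type JobState = 'queued' | 'processing' | 'failing' | 'completed' | 'failed'

// Shapes inferred from the enqueue() excerpt; the real type lives in the library.
type EnqueueResultSketch<TResult> =
  | { status: 'queued' }
  | { status: 'completed', result: TResult }
  | { status: 'duplicate', existingState: JobState }

// Hypothetical helper: turns an enqueue outcome into a message for logs or API responses.
function describeEnqueue<TResult> (id: string, outcome: EnqueueResultSketch<TResult>): string {
  switch (outcome.status) {
    case 'queued':
      return `${id}: accepted as a new job`
    case 'completed':
      return `${id}: served from cache (${JSON.stringify(outcome.result)})`
    case 'duplicate':
      return `${id}: already known, currently ${outcome.existingState}`
  }
}
```

Because the switch covers every member of the union, the compiler flags any new status that is added to the type but not handled.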
Storage Backend Implementation
Each storage backend implements atomic duplicate checking differently:
RedisStorage (Lua script redis-scripts/enqueue.lua):
-- Check if job exists
local existing = redis.call('HGET', KEYS[1], ARGV[1])
if existing then
return existing -- Return existing state
end
-- Create job atomically
redis.call('HSET', KEYS[1], ARGV[1], ARGV[3]) -- Set state
redis.call('RPUSH', KEYS[2], ARGV[2]) -- Add to queue
return false
MemoryStorage (from src/storage/memory.ts:68):
const existing = this.#jobs.get(id)
if (existing) return existing
this.#jobs.set(id, `queued:${timestamp}`)
this.#queue.push(message)
return null
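To see the check-and-create step in isolation, here is a small stand-in modeled on the MemoryStorage excerpt. It is a simplified sketch, not the library class: `TinyMemoryStorage` and `pending()` are hypothetical names, and the message is kept as a plain string.

```typescript
// Simplified stand-in for the MemoryStorage excerpt; not the library class.
class TinyMemoryStorage {
  private readonly jobs = new Map<string, string>()
  private readonly queue: string[] = []

  // Returns the existing state for a known id, or null after creating the job.
  enqueue (id: string, message: string, timestamp: number): string | null {
    const existing = this.jobs.get(id)
    if (existing) return existing
    this.jobs.set(id, `queued:${timestamp}`)
    this.queue.push(message)
    return null
  }

  // Number of messages waiting in the queue.
  pending (): number {
    return this.queue.length
  }
}

const demoStorage = new TinyMemoryStorage()
const firstAttempt = demoStorage.enqueue('email-123', '{"to":"user@example.com"}', 1000)
const secondAttempt = demoStorage.enqueue('email-123', '{"to":"user@example.com"}', 2000)
console.log(firstAttempt, secondAttempt, demoStorage.pending())
```

The check and the insert are atomic here only because the method is synchronous: no `await` sits between `get` and `set`, so no other enqueue can interleave on the same event loop.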
FileStorage (from src/storage/file.ts:298):
try {
const existing = await readFile(jobFile, 'utf8')
return existing // Job already exists
} catch {
// Create job state atomically with fast-write-atomic
await this.#writeFileAtomic(jobFile, state)
}
Result Caching
When a job completes, its result is cached for a configurable TTL (time-to-live). Subsequent enqueues with the same ID return the cached result immediately.
const queue = new Queue({
storage,
resultTTL: 3600000 // Cache results for 1 hour (default)
})
// First enqueue - job runs
const result1 = await queue.enqueue('calc-123', { x: 5, y: 10 })
// { status: 'queued' }
// ... job completes with result { sum: 15 }
// Second enqueue - cached result returned
const result2 = await queue.enqueue('calc-123', { x: 5, y: 10 })
// { status: 'completed', result: { sum: 15 } }
Per-Job TTL Override
You can override the default resultTTL for specific jobs:
// Cache this result for 5 minutes instead of 1 hour
await queue.enqueue('short-lived-123', payload, {
resultTTL: 5 * 60 * 1000
})
// Cache this result for 24 hours
await queue.enqueueAndWait('long-lived-456', payload, {
resultTTL: 24 * 60 * 60 * 1000
})
The resultTTL passed at enqueue time is stored in the job message and applied when the job completes. A duplicate enqueue does not create a new message, so the TTL from the first accepted enqueue stays in effect.
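The fallback between the per-job value and the queue default follows the `options?.resultTTL ?? this.#resultTTL` expression in the producer excerpt. The sketch below isolates that rule; `EnqueueOptionsSketch` and `resolveResultTTL` are hypothetical names used for illustration only.

```typescript
interface EnqueueOptionsSketch {
  resultTTL?: number
}

// Mirrors the `??` fallback from the producer excerpt: only a missing
// (undefined or null) per-job value falls back to the queue default.
function resolveResultTTL (queueDefaultMs: number, options?: EnqueueOptionsSketch): number {
  return options?.resultTTL ?? queueDefaultMs
}
```

Note that `??` does not treat `0` as missing, so an explicit `resultTTL: 0` is passed through rather than replaced by the default. How the library treats a zero TTL is not covered on this page.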
Error Caching
Failed jobs also cache their errors with the same TTL:
// First attempt - job fails after max retries
await queue.enqueue('failing-job', payload)
// ... job fails with MaxRetriesError
// Second attempt - reported as a duplicate; the enqueue result does not carry the error
const result = await queue.enqueue('failing-job', payload)
// { status: 'duplicate', existingState: 'failed' }
// The cached error is available through getStatus()
const status = await queue.getStatus('failing-job')
// {
// id: 'failing-job',
// state: 'failed',
// error: { message: '...' }
// }
Updating Result TTL
You can extend or reduce the TTL of cached results and errors using updateResultTTL():
// Extend TTL to 2 hours
const update = await queue.updateResultTTL('job-123', 2 * 60 * 60 * 1000)
if (update.status === 'updated') {
console.log('TTL updated successfully')
} else if (update.status === 'not_terminal') {
console.log('Job has not reached a terminal state, cannot update TTL yet')
} else if (update.status === 'not_found') {
console.log('Job does not exist')
} else if (update.status === 'missing_payload') {
console.log('Job is terminal but result/error already expired')
}
Return type (from src/types.ts:47):
type UpdateResultTTLResult =
| { status: 'updated' }
| { status: 'not_found' }
| { status: 'not_terminal' } // Job is still queued/processing
| { status: 'missing_payload' } // Job is terminal but payload expired
Producer implementation (from src/producer.ts:199):
async updateResultTTL(id: string, ttlMs: number): Promise<UpdateResultTTLResult> {
const state = await this.#storage.getJobState(id)
if (!state) return { status: 'not_found' }
const { status } = parseState(state)
if (status !== 'completed' && status !== 'failed') {
return { status: 'not_terminal' }
}
if (status === 'completed') {
const existingResult = await this.#storage.getResult(id)
if (!existingResult) return { status: 'missing_payload' }
await this.#storage.setResult(id, existingResult, ttlMs)
return { status: 'updated' }
}
// Failed job
const existingError = await this.#storage.getError(id)
if (!existingError) return { status: 'missing_payload' }
await this.#storage.setError(id, existingError, ttlMs)
return { status: 'updated' }
}
Dynamic TTL with AfterExecution Hook
The afterExecution hook allows you to dynamically adjust TTL based on job results:
const queue = new Queue<{ url: string }, { body: string; cacheControl?: string }>({
storage,
resultTTL: 60000, // Default 1 minute
afterExecution: async (context) => {
if (context.status === 'completed') {
// Parse Cache-Control header from HTTP response
const cacheControl = context.result?.cacheControl
if (cacheControl) {
const maxAge = Number.parseInt(cacheControl.match(/max-age=(\d+)/)?.[1] ?? '60', 10)
context.ttl = maxAge * 1000 // Convert to milliseconds
}
}
if (context.status === 'failed') {
// Cache errors for shorter time
context.ttl = 5000 // 5 seconds
}
}
})
AfterExecutionContext (from src/types.ts:98):
interface AfterExecutionContext<TPayload, TResult> {
id: string
payload: TPayload
attempts: number
maxAttempts: number
createdAt: number
status: 'completed' | 'failed'
result?: TResult
error?: Error
ttl: number // Mutable: modify to change TTL
workerId: string
startedAt: number
finishedAt: number
durationMs: number
}
The hook runs after job execution but before persisting the terminal state, giving you a chance to modify the TTL.
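The header parsing inside the hook can be pulled into a small pure function, which makes it easier to test on its own. This is a sketch under stated assumptions: `ttlFromCacheControl` is a hypothetical helper, it only looks at `max-age`, and it ignores directives such as `no-store`.

```typescript
// Hypothetical helper: converts a Cache-Control header value into a TTL in milliseconds.
// Falls back to `fallbackMs` when the header is missing or has no usable max-age.
function ttlFromCacheControl (header: string | undefined, fallbackMs: number): number {
  if (!header) return fallbackMs
  // Match max-age as its own directive (start of string, or after a comma or space).
  const match = /(?:^|[,\s])max-age=(\d+)/i.exec(header)
  if (!match) return fallbackMs
  const seconds = Number.parseInt(match[1] ?? '', 10)
  // Guard against absurdly long digit strings that overflow safe integers.
  return Number.isSafeInteger(seconds) ? seconds * 1000 : fallbackMs
}
```

Inside an `afterExecution` hook this could be used as `context.ttl = ttlFromCacheControl(context.result?.cacheControl, context.ttl)`, which leaves the TTL unchanged when the header gives no guidance.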
Cache Invalidation
There is no explicit cache invalidation API. To “invalidate” a cached result:
Option 1: Use Different Job IDs
Include a version or timestamp in the job ID:
const jobId = `fetch-${url}-${Date.now()}`
await queue.enqueue(jobId, { url })
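A raw `Date.now()` suffix changes on practically every call, so it turns deduplication off for that job altogether. One possible variation, offered here as an assumption rather than a library feature, is to round the timestamp down to a fixed window so that enqueues inside the same window still share an ID. `windowedJobId` is a hypothetical helper.

```typescript
// Hypothetical helper: the same URL within the same time window maps to the same job ID.
function windowedJobId (url: string, windowMs: number, now: number = Date.now()): string {
  // Every timestamp inside one window floors to the same bucket number.
  const bucket = Math.floor(now / windowMs)
  return `fetch-${url}-${bucket}`
}
```

With a five-minute window, `windowedJobId(url, 5 * 60 * 1000)` produces a fresh ID, and therefore a fresh job, at most once per window per URL.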
Option 2: Wait for TTL Expiry
Set a short TTL for results that may change:
await queue.enqueue('weather-nyc', { city: 'NYC' }, {
resultTTL: 5 * 60 * 1000 // 5 minutes
})
Option 3: Cancel and Re-enqueue
If the job is queued or failing (not completed), you can cancel and re-enqueue:
const cancelResult = await queue.cancel('job-123')
if (cancelResult.status === 'cancelled') {
await queue.enqueue('job-123', newPayload)
}
You cannot cancel jobs that are processing or completed. The cancel operation returns the current status.
Deduplication Patterns
Content-Addressed IDs
Use a hash of the payload as the job ID to deduplicate based on content:
import { createHash } from 'node:crypto'
function hashPayload(payload: unknown): string {
const json = JSON.stringify(payload)
return createHash('sha256').update(json).digest('hex')
}
const payload = { url: 'https://api.example.com' }
const jobId = hashPayload(payload)
await queue.enqueue(jobId, payload)
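One caveat with hashing `JSON.stringify` output is that it depends on property order: `{ a: 1, b: 2 }` and `{ b: 2, a: 1 }` serialize differently and would get different job IDs. A minimal sketch of a key-order-independent variant follows. `stableStringify` and `contentJobId` are hypothetical helpers, and the sketch assumes plain JSON data (no Dates, Maps or class instances).

```typescript
import { createHash } from 'node:crypto'

// Serializes plain JSON data with object keys sorted, so key order does not affect the output.
function stableStringify (value: unknown): string {
  if (value === null || typeof value !== 'object') {
    // JSON.stringify returns undefined for undefined and functions; encode those as null.
    return JSON.stringify(value) ?? 'null'
  }
  if (Array.isArray(value)) {
    return `[${value.map((item) => stableStringify(item)).join(',')}]`
  }
  const entries = Object.entries(value as Record<string, unknown>)
    .filter(([, v]) => v !== undefined) // same as JSON.stringify: skip undefined properties
    .sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0))
    .map(([k, v]) => `${JSON.stringify(k)}:${stableStringify(v)}`)
  return `{${entries.join(',')}}`
}

// Hypothetical helper: derives a job ID from the payload content.
function contentJobId (payload: unknown): string {
  return createHash('sha256').update(stableStringify(payload)).digest('hex')
}
```

Payloads that differ only in property order now map to the same 64-character hex ID, so they deduplicate against each other.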
Idempotency Keys
Use client-provided idempotency keys:
app.post('/api/send-email', async (req, reply) => {
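const idempotencyKey = req.headers['idempotency-key']
// Header values can be a string, a string[] or undefined; accept a single non-empty string only
if (typeof idempotencyKey !== 'string' || idempotencyKey.length === 0) {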
return reply.code(400).send({ error: 'Idempotency-Key required' })
}
const result = await queue.enqueue(idempotencyKey, req.body)
if (result.status === 'completed') {
// Return cached result
return reply.send({ result: result.result, cached: true })
}
return reply.code(202).send({ id: idempotencyKey, status: result.status })
})
Entity-Based IDs
Use entity IDs with operation names:
// Only one "send-welcome-email" job per user
const welcomeJobId = `send-welcome-email:${userId}`
await queue.enqueue(welcomeJobId, { userId, email })
// Only one "generate-report" job per report
const reportJobId = `generate-report:${reportId}`
await queue.enqueue(reportJobId, { reportId })
Best Practices
Choose meaningful job IDs: Use IDs that reflect the operation and its uniqueness constraints.
Set appropriate TTLs: Balance between cache hit rates and stale data. Use shorter TTLs for frequently changing data.
Use content-addressed IDs for pure functions: If the result depends only on the input, hash the input for the job ID.
Avoid random IDs if you want deduplication: UUIDs and timestamps defeat deduplication since every enqueue gets a unique ID.
Consider clock skew in distributed systems: If job IDs embed timestamps and multiple servers enqueue jobs, keep their clocks synchronized or use a centralized ID generator. Otherwise the same logical job can end up with different IDs.
Storage Considerations
MemoryStorage
- Results and errors are stored in Map objects with in-memory TTL tracking
- Cleanup runs every second to remove expired entries
- All data is lost on process restart
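The points above can be illustrated with a small model of a TTL-tracked Map plus a sweep step. This is a simplified sketch, not the MemoryStorage implementation: `ExpiringMap` is a hypothetical class, and the clock is passed in explicitly so the behaviour is deterministic.

```typescript
// Simplified model of a TTL-tracked Map; `now` is injected so behaviour is easy to test.
class ExpiringMap<V> {
  private readonly entries = new Map<string, { value: V, expiresAt: number }>()

  set (key: string, value: V, ttlMs: number, now: number): void {
    this.entries.set(key, { value, expiresAt: now + ttlMs })
  }

  // Returns undefined for missing or expired keys.
  get (key: string, now: number): V | undefined {
    const entry = this.entries.get(key)
    if (!entry || entry.expiresAt <= now) return undefined
    return entry.value
  }

  // Removes expired entries and returns how many were dropped.
  sweep (now: number): number {
    let removed = 0
    this.entries.forEach((entry, key) => {
      if (entry.expiresAt <= now) {
        this.entries.delete(key)
        removed++
      }
    })
    return removed
  }

  count (): number {
    return this.entries.size
  }
}
```

A periodic cleanup like the one described above would amount to calling `sweep(Date.now())` from a timer, for example `setInterval(() => cache.sweep(Date.now()), 1000)`.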
RedisStorage
- Results and errors are stored in Redis hashes with 8-byte TTL headers
- Expired entries are lazy-deleted on read (no background cleanup)
- Data persists across restarts (depends on Redis configuration)
TTL encoding (from src/storage/redis.ts:82):
#encodeExpiringValue(value: Buffer, ttlMs: number): Buffer {
const buffer = Buffer.allocUnsafe(8 + value.length)
buffer.writeBigInt64BE(BigInt(Date.now() + ttlMs)) // Expiry timestamp
value.copy(buffer, 8) // Payload
return buffer
}
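The read side is not shown on this page. Assuming the layout from the excerpt (an 8-byte big-endian expiry timestamp followed by the payload), a lazy-expiry read could be sketched as below. `decodeExpiringValue` is a hypothetical counterpart, the encoder is restated as a standalone function with an injectable clock, and treating `expiresAt <= now` as expired is an assumption.

```typescript
import { Buffer } from 'node:buffer'

// Same layout as the excerpt: 8-byte big-endian expiry timestamp, then the payload.
function encodeExpiringValue (value: Buffer, ttlMs: number, now: number = Date.now()): Buffer {
  const buffer = Buffer.allocUnsafe(8 + value.length)
  buffer.writeBigInt64BE(BigInt(now + ttlMs)) // Expiry timestamp at offset 0
  value.copy(buffer, 8) // Payload after the header
  return buffer
}

// Hypothetical read side: returns null when the entry is malformed or expired.
function decodeExpiringValue (encoded: Buffer, now: number = Date.now()): Buffer | null {
  if (encoded.length < 8) return null // Too short to contain the header
  const expiresAt = Number(encoded.readBigInt64BE(0))
  if (expiresAt <= now) return null // Caller may delete the key at this point
  return encoded.subarray(8)
}
```

Returning `null` for an expired entry is where lazy deletion would hook in: the caller can remove the hash field when it sees `null` for a key that exists.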
FileStorage
- Results stored as id.result + id.ttl file pairs
- TTL cleanup runs every second (only on the elected leader)
- Data persists across restarts