Skip to main content

Overview

The Queue API provides endpoints for managing BullMQ media processing jobs and monitoring queue health. Jobs are enqueued via this API and processed by background workers. Base Path: /api/queue Key Features:
  • Enqueue media jobs (transcoding, thumbnails, captions)
  • Real-time queue statistics (depth, waiting, active, completed)
  • Subscription tier enforcement
  • Taxonomy cache management

GET /api/queue/stats

Returns BullMQ queue depths and taxonomy cache statistics. Useful for dashboard observability. Authentication: None (public endpoint)
ok
boolean
Request success status
stats
object
BullMQ queue statistics object containing:
  • waiting - Jobs waiting to be processed
  • active - Currently processing jobs
  • completed - Successfully completed jobs
  • failed - Failed jobs
  • delayed - Jobs scheduled for future execution
taxonomy_cache
object
Taxonomy cache statistics:
  • size - Number of cached entries
  • hits - Cache hit count
  • misses - Cache miss count
Example Request:
curl http://localhost:3001/api/queue/stats
Response:
{
  "ok": true,
  "stats": {
    "waiting": 3,
    "active": 1,
    "completed": 142,
    "failed": 2,
    "delayed": 0
  },
  "taxonomy_cache": {
    "size": 1847,
    "hits": 9234,
    "misses": 128
  }
}
Use Cases:
  • Health check dashboards
  • Queue depth monitoring alerts
  • Performance analytics
  • Worker scaling decisions

POST /api/queue/media

Enqueue a media job that already exists in the Directus media_jobs collection. Authentication: Directus JWT (Bearer token)
You must create the Directus media_jobs record before calling this endpoint. This endpoint only pushes the job into BullMQ so workers can pick it up.

Request Parameters

jobId
string
required
UUID of the existing media_jobs Directus record
operation
string
required
Operation type (e.g., transcoding, thumbnail, caption_generation). Used for tier validation.
input_file_id
string
UUID of the input file from Directus directus_files (optional)
input_url
string
URL of remote input file (optional, alternative to input_file_id)
params
object
Job-specific parameters (e.g., {"format": "mp4", "resolution": "1080p"})
creator_profile_id
string
UUID of the associated platform_connections record (optional)
priority
number
Job priority (1-100, default: 10). Lower numbers = higher priority.

Response

ok
boolean
Request success status
bullJobId
string
BullMQ job ID (UUID) assigned by the queue
jobId
string
Directus media_jobs record UUID (echoed from request)

Example Request

curl -X POST http://localhost:3001/api/queue/media \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." \
  -H "Content-Type: application/json" \
  -d '{
    "jobId": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
    "operation": "transcoding",
    "input_file_id": "a1b2c3d4-5e6f-7a8b-9c0d-1e2f3a4b5c6d",
    "params": {
      "format": "mp4",
      "resolution": "1080p",
      "codec": "h264"
    },
    "creator_profile_id": "d3f21c8e-4a5b-4c9d-8e7f-1a2b3c4d5e6f",
    "priority": 5
  }'
Success Response:
{
  "ok": true,
  "bullJobId": "bull-job-uuid-12345",
  "jobId": "f47ac10b-58cc-4372-a567-0e02b2c3d479"
}

Subscription Tier Enforcement

The endpoint validates if the user’s subscription tier allows the requested operation: From queue.js:88:
const check = await canPerform(userId, operation, _directusAdminClient);
if (!check.allowed) {
  return res.status(403).json({
    error: "tier_limit_reached",
    reason: check.reason,
    message:
      check.reason === "limit_reached"
        ? "You have reached your plan limit for this feature. Upgrade to continue."
        : "This feature is not available on your current plan.",
  });
}
Tier Limit Error Response:
{
  "error": "tier_limit_reached",
  "reason": "limit_reached",
  "message": "You have reached your plan limit for this feature. Upgrade to continue."
}
Tier Feature Unavailable:
{
  "error": "tier_limit_reached",
  "reason": "feature_unavailable",
  "message": "This feature is not available on your current plan."
}

Supported Operations

Common operation values (validated against subscription tier):
  • transcoding - Video format conversion
  • thumbnail - Generate video thumbnail
  • caption_generation - AI caption generation
  • download - Download remote media
  • upload - Upload processing
  • analysis - Media analysis/metadata extraction

POST /api/queue/taxonomy/invalidate

Bust the taxonomy cache (e.g., after admin updates taxonomy_terms in Directus). Authentication: None (internal loopback only)
This endpoint is designed for internal use only. It should be called via localhost after Directus webhooks update taxonomy data.
Example Request:
curl -X POST http://localhost:3001/api/queue/taxonomy/invalidate
Response:
{
  "ok": true,
  "message": "Taxonomy cache invalidated"
}
Use Case: Directus webhook configuration:
Webhook URL: http://localhost:3001/api/queue/taxonomy/invalidate
Trigger: After Create/Update/Delete on taxonomy_terms collection
Method: POST

Queue Architecture

Job Lifecycle

  1. Create Directus Record - Client creates media_jobs record with status=pending
  2. Enqueue - Client calls POST /api/queue/media with jobId
  3. BullMQ Processing - Worker picks up job from queue
  4. Status Updates - Worker updates Directus record (status=processing, status=completed, status=failed)
  5. Client Polling - Client polls Directus media_jobs record for status changes

Queue Configuration

From server/utils/actionBus.js:
const mediaQueue = new Queue('media', {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000,
    },
    removeOnComplete: 100, // Keep last 100 completed jobs
    removeOnFail: 1000,    // Keep last 1000 failed jobs
  },
});

Job Priority Levels

  • 1-3 - Critical (real-time user actions)
  • 4-6 - High (scheduled urgent tasks)
  • 7-10 - Normal (default background processing)
  • 11+ - Low (batch operations, cleanup)

Error Handling

Missing required fields:
{
  "error": "jobId and operation are required"
}
Invalid token:
{
  "error": "Invalid or expired Directus token"
}
Queue enqueue failure:
{
  "ok": false,
  "error": "Redis connection refused"
}
Stats retrieval failure:
{
  "ok": false,
  "error": "Failed to connect to Redis"
}

Monitoring & Observability

Queue Metrics Dashboard

Use the /stats endpoint to build real-time dashboards:
// Poll every 5 seconds
setInterval(async () => {
  const res = await fetch('http://localhost:3001/api/queue/stats');
  const { stats } = await res.json();
  
  console.log(`Queue depth: ${stats.waiting}`);
  console.log(`Active jobs: ${stats.active}`);
  console.log(`Failed jobs: ${stats.failed}`);
}, 5000);

Alerting Thresholds

Recommended alert triggers:
  • High queue depth: waiting > 100 (consider scaling workers)
  • High failure rate: failed / (completed + failed) > 0.1 (10%)
  • Stalled jobs: active > 0 for > 30 minutes (worker crash?)

Integration Example

Complete workflow for enqueuing a transcoding job:
// 1. Upload file to Directus
const formData = new FormData();
formData.append('file', videoFile);

const uploadRes = await fetch('http://127.0.0.1:8055/files', {
  method: 'POST',
  headers: { 'Authorization': `Bearer ${directusToken}` },
  body: formData,
});
const { data: fileData } = await uploadRes.json();
const fileId = fileData.id;

// 2. Create media_jobs record
const jobRes = await fetch('http://127.0.0.1:8055/items/media_jobs', {
  method: 'POST',
  headers: {
    'Authorization': `Bearer ${directusToken}`,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    operation: 'transcoding',
    input_file_id: fileId,
    status: 'pending',
    params: { format: 'mp4', resolution: '1080p' },
  }),
});
const { data: jobData } = await jobRes.json();
const jobId = jobData.id;

// 3. Enqueue in BullMQ
const enqueueRes = await fetch('http://localhost:3001/api/queue/media', {
  method: 'POST',
  headers: {
    'Authorization': `Bearer ${directusToken}`,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    jobId,
    operation: 'transcoding',
    input_file_id: fileId,
    params: { format: 'mp4', resolution: '1080p' },
    priority: 5,
  }),
});
const { ok, bullJobId } = await enqueueRes.json();

console.log(`Job enqueued: ${bullJobId}`);

// 4. Poll for completion
const pollInterval = setInterval(async () => {
  const statusRes = await fetch(`http://127.0.0.1:8055/items/media_jobs/${jobId}`, {
    headers: { 'Authorization': `Bearer ${directusToken}` },
  });
  const { data } = await statusRes.json();
  
  if (data.status === 'completed') {
    console.log('Job completed!', data.output_file_id);
    clearInterval(pollInterval);
  } else if (data.status === 'failed') {
    console.error('Job failed:', data.error);
    clearInterval(pollInterval);
  }
}, 2000);

Next Steps

Captions API

Generate AI-powered captions (one of the queue operations)

Credentials API

Manage encrypted credentials for media jobs

Build docs developers (and LLMs) love