Skip to main content

Overview

The media worker is a BullMQ-based background processor that handles all FFmpeg, ImageMagick, and yt-dlp operations. It polls the media_jobs collection every 1.5 seconds and executes operations asynchronously. Source: media-worker/index.js
pm2 name: media-worker
Queue: Redis-backed BullMQ (polls media_jobs collection)

Architecture

Polling System

Directus media_jobs collection
  ↓ (poll every 1.5s)
BullMQ worker picks jobs with status=queued

Execute operation (ffmpeg/imagemagick/yt-dlp)

Upload output to Directus /files

Create media_job_outputs record

Update job status=completed
Concurrency: 1 (configurable via MEDIA_WORKER_CONCURRENCY)

Environment Variables

VariableDefaultPurpose
DIRECTUS_URLhttp://127.0.0.1:8055Directus API base
DIRECTUS_ADMIN_TOKENagentx-directus-admin-static-2026Admin auth token
CREDENTIALS_ENC_KEY_B64AES-256-GCM key for decrypting download credentials
MEDIA_WORKER_IDmedia-worker-1Worker instance ID
MEDIA_WORKER_POLL_MS1500Poll interval (ms)
MEDIA_TMP_DIR/tmp/agentx-mediaTemp directory for processing

Operations

Watermarking

Operation: apply_watermark
Supported formats: Images (JPG, PNG, WebP) and videos (MP4)

Parameters

{
  "watermark_file_id": "abc123",
  "position": "br",  // tl | tr | bl | br | center
  "opacity": 70,      // 0-100
  "margin": 20,       // pixels from edge
  "format": "auto"    // output format (auto = same as input)
}

Image Watermarking (ImageMagick)

  • Scales watermark to 15% of source image width
  • Uses composite command with -dissolve for opacity
  • Preserves aspect ratio

Video Watermarking (FFmpeg)

  • Converts watermark to half-transparent PNG
  • Applies overlay filter: [1:v]scale=iw*0.15:-1[wm];[0:v][wm]overlay=...
  • Re-encodes with H.264 (preset: veryfast, CRF: 22)
async function runApplyWatermark(job, workDir) {
  const { watermark_file_id, position = "br", opacity = 70, margin = 20 } = job.params;
  
  // Download source and watermark
  await downloadDirectusFile(job.input_file_id, srcPath);
  await downloadDirectusFile(watermark_file_id, wmPath);
  
  // Detect if video or image
  const isVideo = mimeType.startsWith("video/");
  
  if (isVideo) {
    // FFmpeg overlay with alpha blending
    await execFileAsync("ffmpeg", [
      "-y", "-i", srcPath, "-i", wmAlphaPath,
      "-filter_complex", `[1:v]scale=iw*0.15:-1[wm];[0:v][wm]overlay=${overlayPos}`,
      "-c:v", "libx264", "-preset", "veryfast", "-crf", "22",
      outPath
    ]);
  } else {
    // ImageMagick composite
    await execFileAsync("composite", [
      "-gravity", gravity, "-geometry", `+${margin}+${margin}`,
      "-dissolve", String(opacity), wmScaledPath, srcPath, outPath
    ]);
  }
}

Teaser Clip Generation

Operation: create_teaser
Use case: Generate preview clips for social media (TikTok, Instagram Reels, X)

Parameters

{
  "duration": 15,           // seconds
  "start_time": 0,          // offset in seconds
  "crf": 23,                // quality (lower = better)
  "preset": "veryfast",     // FFmpeg preset
  "add_blur_end": true      // blur last 2s as paywall tease
}

Blur Effect

When add_blur_end: true, the worker:
  1. Extracts the full teaser duration
  2. Splits into clean segment (0 to duration-2s) and blur segment (last 2s)
  3. Applies progressive Gaussian blur: gblur=sigma=20
  4. Concatenates both segments
async function runCreateTeaser(job, workDir) {
  const { duration = 15, start_time = 0, add_blur_end = true } = job.params;
  
  if (add_blur_end) {
    const blurStart = Math.max(0, duration - 2);
    
    // Extract full teaser
    await execFileAsync("ffmpeg", [
      "-y", "-i", inPath,
      "-ss", String(start_time), "-t", String(duration),
      "-c:v", "libx264", "-preset", "veryfast", rawPath
    ]);
    
    // Apply blur to last 2 seconds
    await execFileAsync("ffmpeg", [
      "-y", "-i", rawPath,
      "-vf", `split[in1][in2];[in1]trim=end=${cleanEnd}[clean];[in2]trim=start=${blurStart},gblur=sigma=20[blur];[clean][blur]concat=n=2:v=1:a=0[v]`,
      "-map", "[v]", "-c:v", "libx264", outPath
    ]);
  }
}

Video Compression

Operation: compress_video
Use case: Reduce file size for platform uploads

Parameters

{
  "crf": 28,           // 18-28 recommended (28 = high compression)
  "preset": "medium"   // ultrafast | veryfast | medium | slow
}
Codec: H.264 (libx264)
Audio: Copy (no re-encode)

Metadata Stripping

Operations: strip_metadata
Use case: Remove EXIF/GPS/device data before sharing content

Image Stripping (Sharp or ImageMagick)

Primary: sharp().withMetadata(false)
Fallback: convert +profile "*" (removes all embedded profiles/EXIF)

Video Stripping (FFmpeg)

Command: ffmpeg -i input.mp4 -map_metadata -1 -c copy output.mp4
Removes: Global metadata, stream metadata, chapter metadata
Auto-strip: The download_video and scrape_profile operations automatically strip metadata from all downloaded files before upload to Directus.
async function stripVideoMetadata(inputPath, outputPath) {
  await execFileAsync("ffmpeg", [
    "-y", "-i", inputPath,
    "-map_metadata", "-1",  // Remove all metadata
    "-c:v", "copy",          // No re-encode
    "-c:a", "copy",
    outputPath
  ]);
}

async function stripImageMetadata(inputPath, outputPath) {
  if (sharp) {
    await sharp(inputPath).withMetadata(false).toFile(outputPath);
  } else {
    // Fallback: ImageMagick
    await execFileAsync("convert", [inputPath, "+profile", "*", outputPath]);
  }
}

Download Video (yt-dlp)

Operation: download_video
Supported platforms: YouTube, TikTok, Instagram, X, Reddit, OnlyFans, Fansly, Pornhub, XVideos, and 10+ more

Parameters

{
  "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]",
  "extract_audio": false,
  "audio_format": "mp3",
  "no_playlist": true,
  "max_filesize": "500m"
}

Credential Injection

The worker automatically injects cookies or username/password from the download_credentials collection:
  1. Maps input URL domain → platform slug (e.g., onlyfans.comonlyfans)
  2. Fetches active credentials scoped to creator profile
  3. Decrypts credentials using CREDENTIALS_ENC_KEY_B64
  4. Adds --cookies cookies.txt or -u username -p password to yt-dlp args
async function runDownloadVideo(job, workDir) {
  const { format, extract_audio, max_filesize } = job.params;
  const { credArgs, cleanup } = await buildCredArgs(job.input_url, workDir, job.creator_profile_id);
  
  const args = ["--no-warnings", "--no-progress", "-o", outTemplate, ...credArgs];
  
  if (extract_audio) {
    args.push("-x", "--audio-format", audio_format);
  } else {
    args.push("-f", format, "--merge-output-format", "mp4");
  }
  
  await execFileAsync("yt-dlp", args);
  
  // Auto-strip metadata before upload
  if (videoExts.has(dlExt)) {
    await stripVideoMetadata(rawOutputPath, strippedOutputPath);
  }
}

Image Operations

Resize Image

{
  "width": 1920,
  "height": 1080,
  "format": "jpg"
}
Command: convert input -resize 1920x1080 output.jpg

Crop Image

{
  "width": 1080,
  "height": 1080,
  "x": 0,
  "y": 0,
  "format": "jpg"
}
Command: convert input -crop 1080x1080+0+0 +repage output.jpg

Convert Image

{
  "format": "webp",
  "quality": 85
}
Command: convert input -quality 85 output.webp

Steganographic Watermarking

Operation: apply_steganographic_watermark
Use case: Embed invisible buyer fingerprints for leak tracking
Requires sharp npm package. Falls back to copying file unchanged if sharp is unavailable.

Parameters

{
  "user_id": "abc123",     // Directus user ID
  "content_id": "def456",  // Content reference
  "output_format": "jpg"
}

How It Works

  1. Embeds { userId, timestamp, contentId } in red channel LSBs
  2. Offset determined by userId hash (unique per buyer)
  3. Invisible to naked eye (~0.4% pixel change)
  4. Survives JPEG compression and social media re-encoding
Implementation: server/utils/steganography.js (lazy-loaded by worker)

Job Management

Enqueue a Job (from Dashboard)

import { queue } from '@/utils/api';

await queue.create({
  operation: 'apply_watermark',
  input_file_id: mediaItem.id,
  params: {
    watermark_file_id: 'watermark-logo-id',
    position: 'br',
    opacity: 70,
    margin: 20
  }
});

Check Job Status

const { data } = await collections.read('media_jobs', {
  filter: { id: { _eq: jobId } },
  fields: 'id,status,progress,error_message'
});

// status: queued | processing | completed | failed
// progress: 0-100

Operations Registry

All 17 operations registered in media-worker/index.js:1161-1184:
OperationCategoryTools
convert_imageImageImageMagick
resize_imageImageImageMagick
crop_imageImageImageMagick
resize_videoVideoFFmpeg
crop_videoVideoFFmpeg
compress_videoVideoFFmpeg
download_videoDownloadyt-dlp
ytdlp_infoDownloadyt-dlp
trim_videosVideoFFmpeg
join_videosVideoFFmpeg
apply_watermarkProtectionFFmpeg + ImageMagick
create_teaserVideoFFmpeg
strip_metadataPrivacyFFmpeg + Sharp/ImageMagick
apply_steganographic_watermarkProtectionSharp
scrape_profileScrapingStagehand (see Platform Scraping)
publish_postPublishingStagehand (see Post Scheduling)

Logs & Debugging

pm2 logs media-worker --lines 100

# Follow logs in real-time
pm2 logs media-worker -f

Build docs developers (and LLMs) love