
Overview

OpenShorts uses an 11-stage pipeline to transform long-form videos into viral vertical clips. The entire workflow is AI-powered and runs automatically once you submit a video.

Processing Pipeline

The complete pipeline processes videos through these stages:
1. Ingest: Downloads YouTube videos via yt-dlp or processes local uploads with size validation (2 GB limit)
2. Transcription: Uses faster-whisper with CPU optimization (INT8 quantization) to generate word-level timestamps
3. Scene Detection: Applies PySceneDetect’s ContentDetector to identify scene boundaries for smooth transitions
4. AI Analysis: Gemini 2.5 Flash analyzes the transcript to identify 3–15 viral moments (15–60 seconds each)
5. FFmpeg Extraction: Precisely cuts selected segments using absolute timestamps with 0.2–0.4 s padding for natural flow
6. AI Cropping: Dual-mode vertical reframing:
   • TRACK mode: single-subject tracking with MediaPipe + YOLOv8
   • GENERAL mode: multi-person scenes with a blurred-background layout
7. Effects/Subtitles: Optional AI-generated FFmpeg filters for dynamic zooms, color grading, and word-level subtitles
8. Hook Overlay: Adds viral text hooks with custom fonts and positioning
9. Voice Dubbing: Optional ElevenLabs AI translation to 30+ languages with voice cloning
10. S3 Backup: Silent background upload of clips and metadata to AWS S3
11. Social Distribution: One-click async posting to TikTok, Instagram, and YouTube via the Upload-Post API
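At a high level, the stages run in a fixed order, with the optional ones simply skipped. A minimal sketch of that orchestration (the stage names and `run_pipeline` helper are illustrative, not the actual main.py API):

```python
# Illustrative stage order only; the real orchestration lives in main.py.
PIPELINE_STAGES = [
    "ingest", "transcription", "scene_detection", "ai_analysis",
    "ffmpeg_extraction", "ai_cropping", "effects_subtitles",
    "hook_overlay", "voice_dubbing", "s3_backup", "social_distribution",
]

def run_pipeline(job, handlers):
    """Apply each stage handler in order, threading the job dict through."""
    for stage in PIPELINE_STAGES:
        handler = handlers.get(stage)
        if handler:  # Optional stages (dubbing, effects) may have no handler
            job = handler(job)
    return job
```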

Input Methods

OpenShorts supports two input methods: downloading from a YouTube URL, or uploading a local file (validated against the 2 GB limit). For YouTube URLs:
# main.py:452-579
# Downloads using yt-dlp with anti-bot detection
python main.py -u "https://youtube.com/watch?v=..." -o output/
Features:
  • Multi-client fallback (tv_embed, android, mweb, web)
  • Cookie support via YOUTUBE_COOKIES env var
  • Automatic H.264 codec selection for compatibility
  • Retry logic (10 retries, 10 fragment retries)
  • Socket timeout: 30 seconds
Example:
docker compose exec backend python main.py \
  -u "https://www.youtube.com/watch?v=dQw4w9WgXcQ" \
  -o /app/output
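The features listed above map onto standard yt-dlp options. A hedged sketch of what such a configuration looks like (the exact values and format string in main.py:452-579 may differ):

```python
# Illustrative yt-dlp options mirroring the listed features;
# the exact configuration in main.py may differ.
import os

ydl_opts = {
    # Prefer H.264 (avc1) video for broad device compatibility
    "format": "bestvideo[vcodec^=avc1]+bestaudio/best",
    # Multi-client fallback for anti-bot resilience
    "extractor_args": {
        "youtube": {"player_client": ["tv_embed", "android", "mweb", "web"]}
    },
    "retries": 10,
    "fragment_retries": 10,
    "socket_timeout": 30,
}

# Cookie support via the YOUTUBE_COOKIES env var
# (assumed here to hold a path to a Netscape-format cookies file)
cookies = os.environ.get("YOUTUBE_COOKIES")
if cookies:
    ydl_opts["cookiefile"] = cookies
```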

Scene Detection & Analysis

The system uses PySceneDetect to intelligently segment videos:
# main.py:423-433
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector

def detect_scenes(video_path):
    video_manager = VideoManager([video_path])
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector())  # Default threshold (27.0)
    video_manager.set_downscale_factor()  # Auto-downscale for faster detection
    video_manager.start()
    scene_manager.detect_scenes(frame_source=video_manager)
    scene_list = scene_manager.get_scene_list()  # (start, end) FrameTimecode pairs
    fps = video_manager.get_framerate()
    video_manager.release()
    return scene_list, fps
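Downstream steps (Gemini analysis, FFmpeg cuts) work in absolute seconds rather than frames. A minimal conversion helper, assuming the scene list has been reduced to plain frame-index pairs:

```python
def scenes_to_seconds(scene_frames, fps):
    """Convert (start_frame, end_frame) pairs to (start_s, end_s) in seconds."""
    return [(start / fps, end / fps) for start, end in scene_frames]
```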

Scene Strategy Analysis

After detection, each scene is analyzed to determine optimal framing:
# main.py:375-421
def analyze_scenes_strategy(video_path, scenes):
    # Samples 3 frames per scene (start, middle, end)
    # Counts faces using MediaPipe
    # Returns 'TRACK' or 'GENERAL' strategy for each scene
    
    if avg_faces > 1.2 or avg_faces < 0.5:
        strategies.append('GENERAL')  # Multiple people or no faces
    else:
        strategies.append('TRACK')     # Single subject tracking
Performance Tip: Scene detection runs at downscaled resolution for speed. The ContentDetector uses default sensitivity which works well for most content.
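The decision thresholds from the excerpt can be isolated into a small pure function (a sketch; the real `analyze_scenes_strategy` also samples frames and runs MediaPipe face detection):

```python
def pick_strategy(avg_faces):
    """Map the average face count of a scene to a framing strategy."""
    # More than ~1 face (multiple people) or fewer than 0.5 (no reliable
    # face) -> blurred-background GENERAL layout; otherwise TRACK the subject.
    if avg_faces > 1.2 or avg_faces < 0.5:
        return "GENERAL"
    return "TRACK"
```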

Job Queue & Concurrency

The FastAPI backend manages processing jobs with a sophisticated queue system:
# app.py:28-39
MAX_CONCURRENT_JOBS = int(os.environ.get("MAX_CONCURRENT_JOBS", "5"))
MAX_FILE_SIZE_MB = 2048  # 2GB limit
JOB_RETENTION_SECONDS = 3600  # 1 hour retention

job_queue = asyncio.Queue()
jobs: Dict[str, Dict] = {}
concurrency_semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS)

Queue Worker

# app.py:113-144
async def process_queue():
    while True:
        job_id = await job_queue.get()
        await concurrency_semaphore.acquire()
        asyncio.create_task(run_job_wrapper(job_id))

async def run_job_wrapper(job_id):
    try:
        job = jobs.get(job_id)
        if job:
            await run_job(job_id, job)
    finally:
        concurrency_semaphore.release()
        job_queue.task_done()
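The semaphore caps how many jobs run at once while the queue drains in FIFO order. A self-contained simulation of the same pattern (stand-alone demo, not the app.py code verbatim):

```python
import asyncio

MAX_CONCURRENT_JOBS = 2  # Small limit so the bound is easy to observe

async def demo(n_jobs=5):
    job_queue = asyncio.Queue()
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS)
    running = 0  # Jobs currently executing
    peak = 0     # Highest concurrency observed
    done = []

    async def run_job(job_id):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # Stand-in for real processing
        running -= 1
        done.append(job_id)

    async def run_job_wrapper(job_id):
        try:
            await run_job(job_id)
        finally:
            semaphore.release()
            job_queue.task_done()

    async def process_queue():
        while True:
            job_id = await job_queue.get()
            await semaphore.acquire()  # Blocks once the limit is reached
            asyncio.create_task(run_job_wrapper(job_id))

    for i in range(n_jobs):
        job_queue.put_nowait(i)
    worker = asyncio.create_task(process_queue())
    await job_queue.join()  # Waits until every job called task_done()
    worker.cancel()
    return peak, sorted(done)

peak, done = asyncio.run(demo())
```

Even with five queued jobs, `peak` never exceeds the semaphore limit.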

Job Status Tracking

# Job states
jobs[job_id] = {
    'status': 'queued',        # queued -> processing -> completed/failed
    'logs': [],                # Real-time processing logs
    'cmd': cmd,                # Shell command being executed
    'env': env,                # Environment with API keys
    'output_dir': job_output_dir,
    'result': {                # Populated when completed
        'clips': [...],
        'cost_analysis': {...}
    }
}
To change the concurrency limit, override it in your .env file:
# .env file
MAX_CONCURRENT_JOBS=10
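Note that the job record holds internal fields such as `cmd` and `env` (which contains API keys), so any status endpoint should expose only a safe subset. A hedged sketch of such a filter (the actual app.py response shape may differ):

```python
PUBLIC_FIELDS = ("status", "logs", "result")  # Never expose 'cmd' or 'env'

def public_job_view(job):
    """Strip internal fields (shell command, env with API keys) from a job record."""
    return {key: job[key] for key in PUBLIC_FIELDS if key in job}
```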

AI Viral Moment Detection

Gemini analyzes the transcript using a strict prompt contract:
# main.py:31-68 - Gemini Prompt Template
GEMINI_PROMPT_TEMPLATE = """
You are a senior short-form video editor. Read the ENTIRE transcript and 
word-level timestamps to choose the 3–15 MOST VIRAL moments for TikTok/IG 
Reels/YouTube Shorts. Each clip must be between 15 and 60 seconds long.

⚠️ FFMPEG TIME CONTRACT — STRICT REQUIREMENTS:
- Return timestamps in ABSOLUTE SECONDS from the start of the video
- Only NUMBERS with decimal point, up to 3 decimals (examples: 0, 1.250, 17.350)
- Ensure 0 ≤ start < end ≤ VIDEO_DURATION_SECONDS
- Each clip between 15 and 60 s (inclusive)
- Prefer starting 0.2–0.4 s BEFORE the hook and ending 0.2–0.4 s AFTER the payoff
- Use silence moments for natural cuts; never cut mid-word
"""

Response Format

// main.py:56-67
{
  "shorts": [
    {
      "start": 12.340,
      "end": 37.900,
      "video_description_for_tiktok": "<description>",
      "video_description_for_instagram": "<description>",
      "video_title_for_youtube_short": "<title 100 chars max>",
      "viral_hook_text": "<SHORT punchy text (max 10 words)>"
    }
  ]
}
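Because the prompt is a contract rather than a guarantee, the returned timestamps should be validated before FFmpeg extraction. A minimal checker for the rules above (illustrative; not the main.py implementation):

```python
MIN_CLIP_S, MAX_CLIP_S = 15.0, 60.0  # Clip length bounds from the prompt contract

def filter_valid_shorts(shorts, video_duration):
    """Keep only clips that satisfy the FFmpeg time contract."""
    valid = []
    for clip in shorts:
        start, end = float(clip["start"]), float(clip["end"])
        if not (0 <= start < end <= video_duration):
            continue  # Out of bounds or inverted
        if not (MIN_CLIP_S <= end - start <= MAX_CLIP_S):
            continue  # Too short or too long
        valid.append(clip)
    return valid
```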

Cost Analysis

# main.py:836-869
# Gemini 2.5 Flash Pricing (Dec 2025)
# Input: $0.10 per 1M tokens
# Output: $0.40 per 1M tokens

if usage:
    prompt_tokens = usage.prompt_token_count
    output_tokens = usage.candidates_token_count
    
    input_cost = (prompt_tokens / 1_000_000) * 0.10
    output_cost = (output_tokens / 1_000_000) * 0.40
    total_cost = input_cost + output_cost
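The same arithmetic as a stand-alone helper, with a worked example at the published rates:

```python
INPUT_RATE = 0.10 / 1_000_000   # USD per input token (Gemini 2.5 Flash, Dec 2025)
OUTPUT_RATE = 0.40 / 1_000_000  # USD per output token

def gemini_cost(prompt_tokens, output_tokens):
    """Estimate the USD cost of one analysis call."""
    return prompt_tokens * INPUT_RATE + output_tokens * OUTPUT_RATE

# A 50k-token transcript with a 2k-token response:
# 0.005 + 0.0008, about $0.0058 per video analyzed
cost = gemini_cost(50_000, 2_000)
```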
Important: The AI analysis requires the GEMINI_API_KEY environment variable. If it is missing, the system fails gracefully and converts the entire video instead.

Processing Complete Video

To skip AI analysis and process the entire video:
# main.py:898-899
python main.py -i input.mp4 -o output.mp4 --skip-analysis
This mode:
  • Skips transcription and Gemini analysis
  • Processes the entire video as a single clip
  • Applies vertical cropping with subject tracking
  • Faster for simple conversions

Configuration Options

Key constants you can modify:
# main.py:29
ASPECT_RATIO = 9 / 16  # Output aspect ratio (vertical)

# main.py:169-178 - Speaker tracking
stabilization_frames=15    # Frames needed to confirm new speaker
cooldown_frames=30         # Minimum frames before switching

# main.py:104 - Camera safe zone
self.safe_zone_radius = self.crop_width * 0.25  # 25% safe zone
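How these constants interact, as a small worked example (variable names mirror the constants above; the actual cropper in main.py does more, including subject tracking):

```python
ASPECT_RATIO = 9 / 16  # Vertical output

def vertical_crop_size(src_width, src_height):
    """Size of a full-height 9:16 crop window inside the source frame."""
    crop_height = src_height
    crop_width = crop_height * ASPECT_RATIO
    return crop_width, crop_height

# For a 1920x1080 source, the crop window is 607.5 x 1080 px,
# and the 25% safe zone radius is ~152 px around the tracked subject.
crop_width, crop_height = vertical_crop_size(1920, 1080)
safe_zone_radius = crop_width * 0.25
```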

Real-Time Progress

Monitor processing through the logs:
# app.py:176-188
def enqueue_output(out, job_id):
    for line in iter(out.readline, b''):
        decoded_line = line.decode('utf-8').strip()
        if decoded_line:
            print(f"📝 [Job Output] {decoded_line}")
            if job_id in jobs:
                jobs[job_id]['logs'].append(decoded_line)
Logs include:
  • 📥 Download progress
  • 🎙️ Transcription updates
  • 🤖 Gemini analysis status
  • 🎬 Scene detection results
  • ✂️ Frame processing progress
  • ✅ Completion status

Next Steps

After processing completes:
