Documentation Index Fetch the complete documentation index at: https://mintlify.com/66HEX/frame/llms.txt
Use this file to discover all available pages before exploring further.
Frame’s batch processing system allows you to queue multiple conversion tasks and process them concurrently with full control over execution.
Task Queue System
The conversion manager implements a concurrent task queue with configurable concurrency limits:
Architecture (manager.rs:35-41):
pub struct ConversionManager {
pub ( crate ) sender : mpsc :: Sender < ManagerMessage >,
max_concurrency : Arc < AtomicUsize >,
active_tasks : Arc < Mutex < HashMap < String , u32 >>>,
cancelled_tasks : Arc < Mutex < HashSet < String >>>,
}
Components
Task Queue: FIFO queue (VecDeque) holding pending conversions
Active Tasks: HashMap tracking currently running tasks and their PIDs
Cancelled Tasks: HashSet marking tasks for cancellation
Message Channel: Async channel for queue management messages
Concurrency Control
Default Concurrency
Frame processes 2 tasks concurrently by default:
Configuration (types.rs:3):
pub const DEFAULT_MAX_CONCURRENCY : usize = 2 ;
Updating Max Concurrency
manager . update_max_concurrency ( 4 ) ? ; // Process up to 4 tasks simultaneously
Implementation (manager.rs:252-264):
pub fn update_max_concurrency ( & self , value : usize ) -> Result <(), ConversionError > {
if value == 0 {
return Err ( ConversionError :: InvalidInput (
"Max concurrency must be at least 1" . to_string (),
));
}
self . max_concurrency . store ( value , Ordering :: SeqCst );
let tx = self . sender . clone ();
tauri :: async_runtime :: spawn ( async move {
let _ = tx . send ( ManagerMessage :: ConcurrencyUpdated ) . await ;
});
Ok (())
}
Changing concurrency triggers immediate queue processing to start additional tasks if slots are available.
Querying Current Concurrency
let current = manager . current_max_concurrency ();
Implementation (manager.rs:248-250):
pub fn current_max_concurrency ( & self ) -> usize {
self . max_concurrency . load ( Ordering :: SeqCst )
}
Task States
Tasks progress through multiple states:
enum FileStatus {
IDLE = 'IDLE' , // Not queued
QUEUED = 'QUEUED' , // Waiting in queue
CONVERTING = 'CONVERTING' , // Currently processing
PAUSED = 'PAUSED' , // Paused (process suspended)
COMPLETED = 'COMPLETED' , // Successfully completed
ERROR = 'ERROR' // Failed with error
}
Type definition (types.ts:6-13):
export enum FileStatus {
IDLE = 'IDLE' ,
QUEUED = 'QUEUED' ,
CONVERTING = 'CONVERTING' ,
PAUSED = 'PAUSED' ,
COMPLETED = 'COMPLETED' ,
ERROR = 'ERROR'
}
Queue Operations
Enqueueing Tasks
Tasks are added to the queue via message passing:
Implementation (manager.rs:60-82):
ManagerMessage :: Enqueue ( task ) => {
let task = * task ;
{
let mut cancelled = cancelled_tasks_loop . lock () . unwrap ();
cancelled . remove ( & task . id);
}
if running_tasks . contains_key ( & task . id) || queued_ids . contains ( & task . id) {
continue ; // Prevent duplicate tasks
}
queued_ids . insert ( task . id . clone ());
queue . push_back ( task );
ConversionManager :: process_queue (
& app ,
& tx_clone ,
& mut queue ,
& mut queued_ids ,
& mut running_tasks ,
Arc :: clone ( & limiter ),
Arc :: clone ( & cancelled_tasks_loop ),
) . await ;
}
Processing Queue
The queue processor starts tasks up to the concurrency limit:
Implementation (manager.rs:201-246):
async fn process_queue (
app : & AppHandle ,
tx : & mpsc :: Sender < ManagerMessage >,
queue : & mut VecDeque < ConversionTask >,
queued_ids : & mut HashSet < String >,
running_tasks : & mut HashMap < String , ()>,
max_concurrency : Arc < AtomicUsize >,
cancelled_tasks : Arc < Mutex < HashSet < String >>>,
) {
let limit = max_concurrency . load ( Ordering :: SeqCst ) . max ( 1 );
while running_tasks . len () < limit {
if let Some ( task ) = queue . pop_front () {
queued_ids . remove ( & task . id);
let is_cancelled = {
let mut cancelled = cancelled_tasks . lock () . unwrap ();
cancelled . remove ( & task . id)
};
if is_cancelled {
continue ;
}
running_tasks . insert ( task . id . clone (), ());
let app_clone = app . clone ();
let tx_worker = tx . clone ();
let task_clone = task . clone ();
tauri :: async_runtime :: spawn ( async move {
if let Err ( e ) = run_ffmpeg_worker ( app_clone , tx_worker . clone (), task_clone . clone ()) . await {
let _ = tx_worker . send ( ManagerMessage :: TaskError ( task_clone . id, e )) . await ;
} else {
let _ = tx_worker . send ( ManagerMessage :: TaskCompleted ( task_clone . id)) . await ;
}
});
} else {
break ;
}
}
}
Task Control Operations
Pause Task
Suspend a running task by sending SIGSTOP (Unix) or using NtSuspendProcess (Windows):
manager . pause_task ( & task_id ) ? ;
Unix implementation (manager.rs:266-288):
#[cfg(unix)]
pub fn pause_task ( & self , id : & str ) -> Result <(), ConversionError > {
let tasks = self . active_tasks . lock () . unwrap ();
if let Some ( & pid ) = tasks . get ( id ) {
if pid == 0 {
return Err ( ConversionError :: TaskNotFound ( id . to_string ()));
}
unsafe {
if libc :: kill ( pid as libc :: pid_t , libc :: SIGSTOP ) != 0 {
return Err ( ConversionError :: Shell ( "Failed to send SIGSTOP" . to_string ()));
}
}
Ok (())
} else {
Err ( ConversionError :: TaskNotFound ( id . to_string ()))
}
}
Windows implementation (manager.rs:374-409):
#[cfg(windows)]
unsafe fn windows_suspend_resume ( pid : u32 , suspend : bool ) -> Result <(), ConversionError > {
let process_handle = OpenProcess ( PROCESS_SUSPEND_RESUME , false , pid )
. map_err ( | e | ConversionError :: Shell ( format! ( "Failed to open process: {}" , e ))) ? ;
let ntdll = GetModuleHandleA ( s! ( "ntdll.dll" )) . map_err ( | e | {
let _ = CloseHandle ( process_handle );
ConversionError :: Shell ( format! ( "Failed to get ntdll handle: {}" , e ))
}) ? ;
let fn_name = if suspend {
s! ( "NtSuspendProcess" )
} else {
s! ( "NtResumeProcess" )
};
let func_ptr = GetProcAddress ( ntdll , fn_name );
if let Some ( func ) = func_ptr {
let func : extern "system" fn ( HANDLE ) -> i32 = std :: mem :: transmute ( func );
let status = func ( process_handle );
let _ = CloseHandle ( process_handle );
if status != 0 {
return Err ( ConversionError :: Shell (
format! ( "NtSuspendProcess/NtResumeProcess failed with status: {}" , status )
));
}
Ok (())
} else {
let _ = CloseHandle ( process_handle );
Err ( ConversionError :: Shell (
"Could not find NtSuspendProcess/NtResumeProcess in ntdll" . to_string (),
))
}
}
Resume Task
Resume a paused task by sending SIGCONT (Unix) or using NtResumeProcess (Windows):
manager . resume_task ( & task_id ) ? ;
Unix implementation (manager.rs:290-314):
#[cfg(unix)]
pub fn resume_task ( & self , id : & str ) -> Result <(), ConversionError > {
let tasks = self . active_tasks . lock () . unwrap ();
if let Some ( & pid ) = tasks . get ( id ) {
if pid == 0 {
return Err ( ConversionError :: TaskNotFound ( id . to_string ()));
}
unsafe {
if libc :: kill ( pid as libc :: pid_t , libc :: SIGCONT ) != 0 {
return Err ( ConversionError :: Shell ( "Failed to send SIGCONT" . to_string ()));
}
}
Ok (())
} else {
Err ( ConversionError :: TaskNotFound ( id . to_string ()))
}
}
Cancel Task
Terminate a running task and clean up temporary files:
manager . cancel_task ( & task_id ) ? ;
Implementation (manager.rs:316-333):
pub fn cancel_task ( & self , id : & str ) -> Result <(), ConversionError > {
{
let mut cancelled = self . cancelled_tasks . lock () . unwrap ();
cancelled . insert ( id . to_string ());
}
let tasks = self . active_tasks . lock () . unwrap ();
if let Some ( & pid ) = tasks . get ( id ) {
if pid > 0 {
ConversionManager :: terminate_process ( pid ) ? ;
}
ConversionManager :: cleanup_temp_upscale_dir ( id );
Ok (())
} else {
ConversionManager :: cleanup_temp_upscale_dir ( id );
Ok (())
}
}
Cleanup (manager.rs:335-340):
fn cleanup_temp_upscale_dir ( id : & str ) {
let temp_dir = std :: env :: temp_dir () . join ( format! ( "frame_upscale_{}" , id ));
if temp_dir . exists () {
let _ = std :: fs :: remove_dir_all ( & temp_dir );
}
}
Unix termination (manager.rs:342-351):
#[cfg(unix)]
fn terminate_process ( pid : u32 ) -> Result <(), ConversionError > {
unsafe {
let _ = libc :: kill ( pid as libc :: pid_t , libc :: SIGCONT );
if libc :: kill ( pid as libc :: pid_t , libc :: SIGKILL ) != 0 {
return Err ( ConversionError :: Shell ( "Failed to send SIGKILL" . to_string ()));
}
}
Ok (())
}
Windows termination (manager.rs:353-371):
#[cfg(windows)]
fn terminate_process ( pid : u32 ) -> Result <(), ConversionError > {
unsafe {
let _ = windows_suspend_resume ( pid , false );
let process_handle = OpenProcess (
windows :: Win32 :: System :: Threading :: PROCESS_TERMINATE ,
false ,
pid ,
) . map_err ( | e | {
ConversionError :: Shell ( format! ( "Failed to open process for termination: {}" , e ))
}) ? ;
let _ = windows :: Win32 :: System :: Threading :: TerminateProcess ( process_handle , 1 );
let _ = CloseHandle ( process_handle );
}
Ok (())
}
Manager Messages
The queue communicates via async messages:
Message types (manager.rs:27-33):
pub enum ManagerMessage {
Enqueue ( Box < ConversionTask >),
ConcurrencyUpdated ,
TaskStarted ( String , u32 ),
TaskCompleted ( String ),
TaskError ( String , ConversionError ),
}
Message Flow
Enqueue: Add task to queue and trigger processing
TaskStarted: Register PID for process control
TaskCompleted: Remove from active tasks, process next in queue
TaskError: Emit error event, remove from active tasks, process next
ConcurrencyUpdated: Re-process queue with new limit
Task Lifecycle
Progress Tracking
The manager emits events for UI updates:
Event types (types.rs:183-209):
#[derive( Clone , Serialize )]
pub struct ProgressPayload {
pub id : String ,
pub progress : f64 ,
}
#[derive( Clone , Serialize )]
pub struct StartedPayload {
pub id : String ,
}
#[derive( Clone , Serialize )]
pub struct CompletedPayload {
pub id : String ,
pub output_path : String ,
}
#[derive( Clone , Serialize )]
pub struct ErrorPayload {
pub id : String ,
pub error : String ,
}
#[derive( Clone , Serialize )]
pub struct LogPayload {
pub id : String ,
pub line : String ,
}
Event emission (manager.rs:149-166):
ManagerMessage :: TaskError ( id , err ) => {
eprintln! ( "Task {} failed: {}" , id , err );
let _ = app . emit (
"conversion-log" ,
LogPayload {
id : id . clone (),
line : format! ( "[ERROR] {}" , err ),
},
);
let _ = app . emit (
"conversion-error" ,
ErrorPayload {
id : id . clone (),
error : err . to_string (),
},
);
// ...
}
Error Handling
When a task fails, it is removed from active tasks and the queue continues processing:
Error flow (manager.rs:149-188):
ManagerMessage :: TaskError ( id , err ) => {
eprintln! ( "Task {} failed: {}" , id , err );
let _ = app . emit ( "conversion-log" , LogPayload {
id : id . clone (),
line : format! ( "[ERROR] {}" , err ),
});
let _ = app . emit ( "conversion-error" , ErrorPayload {
id : id . clone (),
error : err . to_string (),
});
running_tasks . remove ( & id );
{
let mut cancelled = cancelled_tasks_loop . lock () . unwrap ();
cancelled . remove ( & id );
}
{
let mut tasks = active_tasks_loop . lock () . unwrap ();
tasks . remove ( & id );
}
ConversionManager :: process_queue (
& app ,
& tx_clone ,
& mut queue ,
& mut queued_ids ,
& mut running_tasks ,
Arc :: clone ( & limiter ),
Arc :: clone ( & cancelled_tasks_loop ),
) . await ;
}
Failed tasks do not block the queue. Other tasks continue processing independently.
Thread Safety
The manager uses thread-safe primitives:
Arc AtomicUsize: Atomic concurrency limit shared across threads
Arc Mutex HashMap: Thread-safe active task registry
Arc Mutex HashSet: Thread-safe cancellation tracking
mpsc::Sender: Lock-free async message passing
Performance Considerations
Concurrency Recommendations
Software encoding (libx264, libx265, vp9):
Set concurrency to number of CPU cores / 4
Example: 16-core CPU → max 4 concurrent tasks
Prevents CPU oversubscription and thermal throttling
Hardware encoding (NVENC, VideoToolbox):
Set concurrency to 2-4
GPU encoders have dedicated hardware units
Limited by VRAM and hardware encoder count
Combination of CPU and GPU tasks:
Set concurrency to 3-4
GPU tasks use encoder while CPU tasks use cores
Monitor system resources and adjust
Real-ESRGAN processing:
Set concurrency to 1-2
Very VRAM intensive (4-16GB per task)
Frame manages per-task thread concurrency automatically
Resource Management
Default conservative setting (2 concurrent tasks):
Prevents system overload
Ensures responsive UI
Safe for most hardware configurations
Increasing concurrency:
// For powerful workstations
manager . update_max_concurrency ( 4 ) ? ;
// For servers with multiple GPUs
manager . update_max_concurrency ( 8 ) ? ;
Example: Batch Processing Workflow
import { conversionService } from '$lib/services/conversion' ;
// Prepare files
const files = [
{ id: '1' , path: '/path/to/video1.mp4' , config: preset1Config },
{ id: '2' , path: '/path/to/video2.mkv' , config: preset2Config },
{ id: '3' , path: '/path/to/video3.avi' , config: preset1Config },
];
// Set concurrency
await conversionService . setMaxConcurrency ( 3 );
// Enqueue all tasks
for ( const file of files ) {
await conversionService . convert ( file . id , file . path , file . config );
}
// Listen for progress
conversionService . on ( 'progress' , ( payload ) => {
console . log ( `Task ${ payload . id } : ${ payload . progress } %` );
});
// Listen for completion
conversionService . on ( 'completed' , ( payload ) => {
console . log ( `Task ${ payload . id } completed: ${ payload . output_path } ` );
});
// Listen for errors
conversionService . on ( 'error' , ( payload ) => {
console . error ( `Task ${ payload . id } failed: ${ payload . error } ` );
});
// Pause a specific task
await conversionService . pauseTask ( '1' );
// Resume it later
await conversionService . resumeTask ( '1' );
// Cancel a task
await conversionService . cancelTask ( '2' );
Related Features
Video Conversion Configure individual task settings
Presets Apply consistent settings across batch
AI Upscaling Batch upscale multiple videos