Skip to main content
StreamClient keeps a single dpf process alive and sends jobs to it over stdin one line at a time, reading responses from stdout. This eliminates the process-spawn overhead that Client incurs on every call.

When to use StreamClient

ScenarioRecommended client
One-off script or CLI toolClient
Web server handling image uploadsStreamClient
Background worker processing a queueStreamClient
Multiple sequential operations in a requestStreamClient
Isolated, infrequent jobsClient
For server workloads where dpf is called repeatedly, StreamClient is the right choice. You start it once at startup and reuse it for the lifetime of the process.

Initialization

Call NewStreamClient with the path to the binary. It starts the Rust engine in --stream mode and returns a ready-to-use client.
sc, err := dpf.NewStreamClient("./bin/dpf")
if err != nil {
    log.Fatalf("failed to start dpf: %v", err)
}
defer sc.Close()
Always call sc.Close() when your application shuts down. It closes the stdin pipe and waits for the Rust process to exit cleanly. Skipping this can leave orphaned processes and corrupt in-flight output.

Sending jobs

StreamClient.Execute serializes the job to a single JSON line, writes it to stdin, and reads the response line from stdout:
result, err := sc.Execute(&dpf.ResizeJob{
    Operation: "resize",
    Input:     "photo.png",
    OutputDir: "out/",
    Widths:    []uint32{320, 640, 1024},
})
if err != nil {
    log.Printf("execute failed: %v", err)
    return
}
if !result.Success {
    log.Printf("operation failed: %s", result.Operation)
    return
}
log.Printf("resized in %dms", result.ElapsedMs)
All typed convenience methods from StreamClient follow the same pattern — they call Execute internally:
// Image operations
result, err := sc.Crop(&dpf.CropJob{...})
result, err := sc.Rotate(&dpf.RotateJob{...})
result, err := sc.Watermark(&dpf.WatermarkJob{...})
result, err := sc.Adjust(&dpf.AdjustJob{...})
result, err := sc.Srcset(&dpf.SrcsetJob{...})
result, err := sc.Exif(&dpf.ExifJob{...})

// Video operations
result, err := sc.VideoTranscode(&dpf.VideoTranscodeJob{...})
result, err := sc.VideoResize(&dpf.VideoResizeJob{...})
result, err := sc.VideoTrim(&dpf.VideoTrimJob{...})
result, err := sc.VideoThumbnail(&dpf.VideoThumbnailJob{...})
result, err := sc.VideoProfile(&dpf.VideoProfileJob{...})

// Audio operations
result, err := sc.AudioTranscode(&dpf.AudioTranscodeJob{...})
result, err := sc.AudioNormalize(&dpf.AudioNormalizeJob{...})
result, err := sc.AudioTrim(&dpf.AudioTrimJob{...})
result, err := sc.AudioSilenceTrim(&dpf.AudioSilenceTrimJob{...})

Thread safety

StreamClient.Execute acquires a mutex before writing to stdin and reading from stdout, so it is safe to call from multiple goroutines concurrently:
// dpf.go — the mutex protects the stdin/stdout round-trip
func (sc *StreamClient) Execute(job any) (*JobResult, error) {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    // ...
}
Jobs are serialized: each goroutine that calls Execute blocks until its response arrives. If you need true parallel execution, use the batch operation to send multiple jobs in a single call.

Server integration example

The following pattern initializes StreamClient once at server startup and shares it across request handlers:
type MediaServer struct {
    dpf *dpf.StreamClient
}

func NewMediaServer(binaryPath string) (*MediaServer, error) {
    sc, err := dpf.NewStreamClient(binaryPath)
    if err != nil {
        return nil, err
    }
    return &MediaServer{dpf: sc}, nil
}

func (s *MediaServer) Shutdown() {
    s.dpf.Close()
}

func (s *MediaServer) handleVideoTranscode(params json.RawMessage) (any, error) {
    var req struct {
        Input   string `json:"input"`
        Output  string `json:"output"`
        Codec   string `json:"codec"`
        Bitrate string `json:"bitrate,omitempty"`
    }
    if err := json.Unmarshal(params, &req); err != nil {
        return nil, err
    }
    return s.dpf.VideoTranscode(&dpf.VideoTranscodeJob{
        Input:   req.Input,
        Output:  req.Output,
        Codec:   req.Codec,
        Bitrate: req.Bitrate,
    })
}

func (s *MediaServer) handleAudioNormalize(params json.RawMessage) (any, error) {
    var req struct {
        Input      string  `json:"input"`
        Output     string  `json:"output"`
        TargetLUFS float64 `json:"target_lufs"`
    }
    if err := json.Unmarshal(params, &req); err != nil {
        return nil, err
    }
    return s.dpf.AudioNormalize(&dpf.AudioNormalizeJob{
        Input:      req.Input,
        Output:     req.Output,
        TargetLUFS: req.TargetLUFS,
    })
}

Error handling

Handle both the Go-level error and the operation-level Success flag:
result, err := sc.VideoProfile(&dpf.VideoProfileJob{
    Input:   "recording.mp4",
    Output:  "web.mp4",
    Profile: "web-mid",
})
if err != nil {
    // Process died, pipe broken, or JSON was malformed
    log.Printf("stream error: %v", err)
    return
}
if !result.Success {
    log.Printf("transcode failed: operation=%s elapsed=%dms", result.Operation, result.ElapsedMs)
    return
}
log.Printf("transcoded to %s in %dms", result.Outputs[0].Path, result.ElapsedMs)

Implementation details

NewStreamClient wires up the stdin/stdout pipes, starts the process, and wraps stdout in a 4 MB buffered reader to handle large responses efficiently:
return &StreamClient{
    cmd:    cmd,
    stdin:  stdin,
    reader: bufio.NewReaderSize(stdout, 4*1024*1024), // 4MB buffer
}, nil
Close closes stdin (which signals the Rust engine to exit) and waits for the process to finish:
func (sc *StreamClient) Close() error {
    sc.stdin.Close()
    return sc.cmd.Wait()
}

Build docs developers (and LLMs) love