StreamClient keeps a single dpf process alive and sends jobs to it over stdin one line at a time, reading responses from stdout. This eliminates the process-spawn overhead that Client incurs on every call.
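The underlying mechanism — one long-lived child process, one JSON job per stdin line, one response per stdout line — can be sketched with only the standard library. This is an illustrative stand-in, not the dpf implementation: it uses `cat` as an echo process in place of the Rust engine, so every "response" is just the job line echoed back.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os/exec"
)

// echoRoundTrip starts a single long-lived process (`cat` as a stand-in
// for the dpf engine), writes one JSON job per line to its stdin, and
// reads one response line per job from its stdout. The process is
// spawned once, no matter how many jobs are sent.
func echoRoundTrip(jobs []map[string]any) ([]string, error) {
	cmd := exec.Command("cat") // stand-in: the real client runs dpf --stream
	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	reader := bufio.NewReader(stdout)

	var responses []string
	for _, job := range jobs {
		line, err := json.Marshal(job) // one JSON object per line
		if err != nil {
			return nil, err
		}
		if _, err := stdin.Write(append(line, '\n')); err != nil {
			return nil, err
		}
		resp, err := reader.ReadString('\n') // block until the response line
		if err != nil {
			return nil, err
		}
		responses = append(responses, resp)
	}
	stdin.Close() // EOF tells the child to exit
	return responses, cmd.Wait()
}

func main() {
	resps, err := echoRoundTrip([]map[string]any{
		{"operation": "resize"},
		{"operation": "crop"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d responses over one process\n", len(resps))
}
```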
When to use StreamClient
| Scenario | Recommended client |
|---|---|
| One-off script or CLI tool | Client |
| Web server handling image uploads | StreamClient |
| Background worker processing a queue | StreamClient |
| Multiple sequential operations in a request | StreamClient |
| Isolated, infrequent jobs | Client |
For server workloads where dpf is called repeatedly, StreamClient is the right choice. You start it once at startup and reuse it for the lifetime of the process.
Initialization
Call NewStreamClient with the path to the binary. It starts the Rust engine in --stream mode and returns a ready-to-use client.
```go
sc, err := dpf.NewStreamClient("./bin/dpf")
if err != nil {
	log.Fatalf("failed to start dpf: %v", err)
}
defer sc.Close()
```
Always call sc.Close() when your application shuts down. It closes the stdin pipe and waits for the Rust process to exit cleanly. Skipping this can leave orphaned processes and corrupt in-flight output.
Sending jobs
StreamClient.Execute serializes the job to a single JSON line, writes it to stdin, and reads the response line from stdout:
```go
result, err := sc.Execute(&dpf.ResizeJob{
	Operation: "resize",
	Input:     "photo.png",
	OutputDir: "out/",
	Widths:    []uint32{320, 640, 1024},
})
if err != nil {
	log.Printf("execute failed: %v", err)
	return
}
if !result.Success {
	log.Printf("operation failed: %s", result.Operation)
	return
}
log.Printf("resized in %dms", result.ElapsedMs)
```
All of StreamClient's typed convenience methods follow the same pattern — they call Execute internally:
```go
// Image operations
result, err := sc.Crop(&dpf.CropJob{...})
result, err := sc.Rotate(&dpf.RotateJob{...})
result, err := sc.Watermark(&dpf.WatermarkJob{...})
result, err := sc.Adjust(&dpf.AdjustJob{...})
result, err := sc.Srcset(&dpf.SrcsetJob{...})
result, err := sc.Exif(&dpf.ExifJob{...})

// Video operations
result, err := sc.VideoTranscode(&dpf.VideoTranscodeJob{...})
result, err := sc.VideoResize(&dpf.VideoResizeJob{...})
result, err := sc.VideoTrim(&dpf.VideoTrimJob{...})
result, err := sc.VideoThumbnail(&dpf.VideoThumbnailJob{...})
result, err := sc.VideoProfile(&dpf.VideoProfileJob{...})

// Audio operations
result, err := sc.AudioTranscode(&dpf.AudioTranscodeJob{...})
result, err := sc.AudioNormalize(&dpf.AudioNormalizeJob{...})
result, err := sc.AudioTrim(&dpf.AudioTrimJob{...})
result, err := sc.AudioSilenceTrim(&dpf.AudioSilenceTrimJob{...})
```
Thread safety
StreamClient.Execute acquires a mutex before writing to stdin and reading from stdout, so it is safe to call from multiple goroutines concurrently:
```go
// dpf.go — the mutex protects the stdin/stdout round-trip
func (sc *StreamClient) Execute(job any) (*JobResult, error) {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	// ...
}
```
Jobs are serialized: each goroutine that calls Execute blocks until its response arrives. If you need true parallel execution, use the batch operation to send multiple jobs in a single call.
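The serialization guarantee can be demonstrated without the real client. This sketch (a hypothetical `fakeClient`, not dpf code) guards a fake round-trip with a mutex the same way and counts how many callers are ever inside the critical section at once:

```go
package main

import (
	"fmt"
	"sync"
)

// fakeClient mimics StreamClient's locking: one mutex guards the whole
// write-request/read-response round trip, so calls are serialized.
type fakeClient struct {
	mu          sync.Mutex
	inFlight    int // calls currently inside the critical section
	maxInFlight int // high-water mark; stays at 1 under the mutex
}

func (c *fakeClient) Execute(job string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.inFlight++
	if c.inFlight > c.maxInFlight {
		c.maxInFlight = c.inFlight
	}
	result := "done:" + job // stand-in for the stdin/stdout round trip
	c.inFlight--
	return result
}

func main() {
	c := &fakeClient{}
	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.Execute(fmt.Sprintf("job-%d", i))
		}(i)
	}
	wg.Wait()
	fmt.Println("max concurrent round-trips:", c.maxInFlight) // always 1
}
```

Because the counters are only touched while the mutex is held, the high-water mark never exceeds 1 — each goroutine waits its turn, which is exactly the blocking behavior described above.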
Server integration example
The following pattern initializes StreamClient once at server startup and shares it across request handlers:
```go
type MediaServer struct {
	dpf *dpf.StreamClient
}

func NewMediaServer(binaryPath string) (*MediaServer, error) {
	sc, err := dpf.NewStreamClient(binaryPath)
	if err != nil {
		return nil, err
	}
	return &MediaServer{dpf: sc}, nil
}

func (s *MediaServer) Shutdown() {
	s.dpf.Close()
}

func (s *MediaServer) handleVideoTranscode(params json.RawMessage) (any, error) {
	var req struct {
		Input   string `json:"input"`
		Output  string `json:"output"`
		Codec   string `json:"codec"`
		Bitrate string `json:"bitrate,omitempty"`
	}
	if err := json.Unmarshal(params, &req); err != nil {
		return nil, err
	}
	return s.dpf.VideoTranscode(&dpf.VideoTranscodeJob{
		Input:   req.Input,
		Output:  req.Output,
		Codec:   req.Codec,
		Bitrate: req.Bitrate,
	})
}

func (s *MediaServer) handleAudioNormalize(params json.RawMessage) (any, error) {
	var req struct {
		Input      string  `json:"input"`
		Output     string  `json:"output"`
		TargetLUFS float64 `json:"target_lufs"`
	}
	if err := json.Unmarshal(params, &req); err != nil {
		return nil, err
	}
	return s.dpf.AudioNormalize(&dpf.AudioNormalizeJob{
		Input:      req.Input,
		Output:     req.Output,
		TargetLUFS: req.TargetLUFS,
	})
}
```
Error handling
Handle both the Go-level error and the operation-level Success flag:
```go
result, err := sc.VideoProfile(&dpf.VideoProfileJob{
	Input:   "recording.mp4",
	Output:  "web.mp4",
	Profile: "web-mid",
})
if err != nil {
	// Process died, pipe broken, or JSON was malformed
	log.Printf("stream error: %v", err)
	return
}
if !result.Success {
	log.Printf("transcode failed: operation=%s elapsed=%dms", result.Operation, result.ElapsedMs)
	return
}
log.Printf("transcoded to %s in %dms", result.Outputs[0].Path, result.ElapsedMs)
```
Implementation details
NewStreamClient wires up the stdin/stdout pipes, starts the process, and wraps stdout in a 4 MB buffered reader to handle large responses efficiently:
```go
return &StreamClient{
	cmd:    cmd,
	stdin:  stdin,
	reader: bufio.NewReaderSize(stdout, 4*1024*1024), // 4MB buffer
}, nil
```
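The sized buffer matters because each response arrives as one newline-terminated line, which may be megabytes long. A standalone sketch of that read path (the 2 MB payload here is an arbitrary stand-in for a large JSON response; bufio's ReadString would still return a long line with a smaller buffer, but a large one avoids repeated refills):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// readLargeLine reads a single newline-terminated line of the given size
// through a 4 MB buffered reader, mirroring how the client consumes one
// response line per job from the engine's stdout.
func readLargeLine(size int) (int, error) {
	payload := strings.Repeat("x", size) + "\n" // stand-in for a big JSON response
	r := bufio.NewReaderSize(strings.NewReader(payload), 4*1024*1024)
	line, err := r.ReadString('\n') // returns the whole line, delimiter included
	return len(line), err
}

func main() {
	n, err := readLargeLine(2 * 1024 * 1024)
	if err != nil {
		panic(err)
	}
	fmt.Println("read bytes:", n) // 2 MB payload plus the trailing newline
}
```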
Close closes stdin (which signals the Rust engine to exit) and waits for the process to finish:
```go
func (sc *StreamClient) Close() error {
	sc.stdin.Close()
	return sc.cmd.Wait()
}
```