warp-md tools support NDJSON (Newline-Delimited JSON) streaming for real-time progress monitoring. Agents can parse events from stderr to track operation status, report progress to users, and estimate completion times.
Why streaming?
  • Long-running operations - Molecular packing can take minutes
  • User feedback - Agents can report “45% complete” instead of just spinning
  • Timeout detection - Detect stuck processes via lack of events
  • Resource monitoring - Track iteration counts, convergence rates

Enabling Streaming

Add the --stream flag to any warp-md CLI command:
warp-pack --config pack.yaml --stream
warp-pep build -s ACDEFG --stream
warp-md run config.json --stream ndjson
Events are emitted to stderr (one JSON object per line), leaving stdout clean for the actual output.
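
Because each event is one JSON object on its own line, a consumer can decode stderr line by line. The sketch below is one tolerant way to do that. The helper name `parse_event` is hypothetical, and the choice to ignore blank or non-JSON lines is an assumption for illustration, not documented warp-md behavior.

```python
import json
from typing import Optional


def parse_event(line: str) -> Optional[dict]:
    """Decode one stderr line; return None if it is not an event object.

    Hypothetical helper: skipping non-JSON lines is an assumption made
    so that stray diagnostic text cannot crash the reader.
    """
    line = line.strip()
    if not line:
        return None
    try:
        obj = json.loads(line)
    except json.JSONDecodeError:
        return None
    # Events shown in this document are JSON objects with an "event" key.
    if isinstance(obj, dict) and "event" in obj:
        return obj
    return None


sample = '{"event":"phase_started","phase":"template_load","total_molecules":150}'
event = parse_event(sample)
print(event["event"], event["phase"])
```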

Event Reference

warp-pack Events

| Event | Fields | Description |
| --- | --- | --- |
| pack_started | total_molecules, box_size, output_path | Initial configuration |
| phase_started | phase, total_molecules, max_iterations | Phase begins |
| molecule_placed | molecule_index, total_molecules, progress_pct | Core placement progress |
| gencan_iteration | iteration, obj_value, pg_sup, progress_pct, eta_ms | Optimization iteration |
| phase_complete | phase, elapsed_ms, final_obj_value | Phase finished |
| pack_complete | total_atoms, elapsed_ms, profile_ms | Final result |
| error | code, message | Error occurred |

Phases

  • template_load - Loading input structures
  • core_placement - Initial molecule placement
  • movebad - Stochastic refinement passes
  • gencan - Gradient-based optimization
  • relax - Final overlap relaxation
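
The phase events can be folded into a small summary of which phases have finished and which one is still running. The following is a minimal sketch using lines modeled on this document's sample warp-pack stream. The function name `summarize_phases` is hypothetical.

```python
import json

# Sample lines modeled on the warp-pack example stream in this document.
SAMPLE = """\
{"event":"phase_started","phase":"template_load","total_molecules":150}
{"event":"phase_complete","phase":"template_load","elapsed_ms":125}
{"event":"phase_started","phase":"core_placement","total_molecules":150}
{"event":"phase_complete","phase":"core_placement","elapsed_ms":5420}
{"event":"phase_started","phase":"gencan","max_iterations":1000}
"""


def summarize_phases(lines):
    """Return (finished, current).

    finished maps each completed phase to its elapsed_ms;
    current is the phase that has started but not completed, or None.
    """
    finished = {}
    current = None
    for line in lines:
        if not line.strip():
            continue
        event = json.loads(line)
        if event["event"] == "phase_started":
            current = event["phase"]
        elif event["event"] == "phase_complete":
            finished[event["phase"]] = event["elapsed_ms"]
            if current == event["phase"]:
                current = None
    return finished, current


finished, current = summarize_phases(SAMPLE.splitlines())
print(finished, current)
```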

warp-pep Events

| Event | Fields | Description |
| --- | --- | --- |
| operation_started | operation, total_residues, total_mutations | Build/mutate starts |
| mutation_complete | mutation_index, mutation_spec, progress_pct | Each mutation |
| operation_complete | total_atoms, elapsed_ms | Final result |
| error | code, message | Error occurred |

warp-md run Events

| Event | Fields | Description |
| --- | --- | --- |
| run_started | run_id, analysis_count, progress_pct, eta_ms | Config run begins |
| analysis_started | index, analysis, out, progress_pct | Single analysis starts |
| analysis_completed | index, analysis, status, timing_ms | Single analysis done |
| analysis_failed | index, analysis, error | Single analysis failed |
| run_completed | final_envelope | All analyses complete |
| run_failed | final_envelope | Run failed |
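
Per-analysis outcomes can be collected by keying on the `index` field shared by `analysis_completed` and `analysis_failed`. The sketch below assumes only the fields listed in the table. The function name, the `"failed"` status label, and the third sample line are invented for illustration.

```python
import json

# The first two lines follow this document's sample stream. The
# analysis_failed line is invented here to exercise the failure branch.
SAMPLE = [
    '{"event":"analysis_completed","index":0,"analysis":"rg","status":"ok","timing_ms":1250}',
    '{"event":"analysis_completed","index":1,"analysis":"rmsd","status":"ok","timing_ms":2100}',
    '{"event":"analysis_failed","index":2,"analysis":"example","error":"illustrative failure"}',
]


def collect_results(lines):
    """Map analysis index -> outcome dict built from run events."""
    results = {}
    for line in lines:
        if not line.strip():
            continue
        event = json.loads(line)
        kind = event.get("event")
        if kind == "analysis_completed":
            results[event["index"]] = {
                "analysis": event["analysis"],
                "status": event["status"],
                "timing_ms": event["timing_ms"],
            }
        elif kind == "analysis_failed":
            results[event["index"]] = {
                "analysis": event["analysis"],
                "status": "failed",  # label chosen for this sketch
                "error": event["error"],
            }
    return results


results = collect_results(SAMPLE)
failed = [r["analysis"] for r in results.values() if r["status"] == "failed"]
print(results, failed)
```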

Example Stream Output

warp-pack

{"event":"pack_started","total_molecules":150,"box_size":[50,50,50],"output_path":"out.pdb"}
{"event":"phase_started","phase":"template_load","total_molecules":150}
{"event":"phase_complete","phase":"template_load","elapsed_ms":125}
{"event":"phase_started","phase":"core_placement","total_molecules":150}
{"event":"molecule_placed","molecule_index":10,"total_molecules":150,"progress_pct":6.7}
{"event":"molecule_placed","molecule_index":20,"total_molecules":150,"progress_pct":13.3}
{"event":"phase_complete","phase":"core_placement","elapsed_ms":5420}
{"event":"phase_started","phase":"gencan","max_iterations":1000}
{"event":"gencan_iteration","iteration":10,"max_iterations":1000,"obj_value":1.234e-2,"pg_sup":0.15,"progress_pct":1.0,"eta_ms":495000}
{"event":"gencan_iteration","iteration":100,"max_iterations":1000,"obj_value":2.1e-3,"pg_sup":0.02,"progress_pct":10.0,"eta_ms":45000}
{"event":"phase_complete","phase":"gencan","elapsed_ms":45000,"final_obj_value":1.5e-4}
{"event":"pack_complete","total_atoms":4500,"total_molecules":150,"elapsed_ms":52000,"profile_ms":{"templates":125,"place_core":5420,"gencan":45000,"relax":0}}

warp-md run

{"event":"run_started","run_id":"batch-001","analysis_count":3,"completed":0,"total":3,"progress_pct":0.0}
{"event":"analysis_started","index":0,"analysis":"rg","out":"outputs/rg.npz","progress_pct":0.0}
{"event":"analysis_completed","index":0,"analysis":"rg","status":"ok","timing_ms":1250,"progress_pct":33.3}
{"event":"analysis_started","index":1,"analysis":"rmsd","out":"outputs/rmsd.npz","progress_pct":33.3}
{"event":"analysis_completed","index":1,"analysis":"rmsd","status":"ok","timing_ms":2100,"progress_pct":66.7}
{"event":"run_completed","final_envelope":{"status":"ok","exit_code":0,"analysis_count":3}}

Python Integration

Basic Event Parsing

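import json
import subprocess

# Events arrive on stderr. stdout is discarded because this example never
# reads it; an unread stdout pipe can fill up and block the child process.
proc = subprocess.Popen(
    ["warp-pack", "--config", "pack.yaml", "--stream"],
    stderr=subprocess.PIPE,
    stdout=subprocess.DEVNULL,
    text=True,
)

# Process events as they arrive, one JSON object per line.
for line in proc.stderr:
    if not line.strip():
        continue  # skip blank lines
    event = json.loads(line)
    event_type = event["event"]

    if event_type == "gencan_iteration":
        pct = event["progress_pct"]
        print(f"Optimization: {pct:.1f}%")
    elif event_type == "pack_complete":
        atoms = event["total_atoms"]
        print(f"Done! {atoms} atoms")

proc.wait()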

Real-Time Progress Tracking

import json
import subprocess
import sys

def run_with_progress(cmd):
    """Run command with real-time progress reporting."""
    # stdout is discarded because it is never read here; an unread pipe
    # could fill up and block the child process.
    proc = subprocess.Popen(
        cmd,
        stderr=subprocess.PIPE,
        stdout=subprocess.DEVNULL,
        text=True,
    )

    for line in proc.stderr:
        if not line.strip():
            continue  # skip blank lines
        event = json.loads(line)
        event_type = event["event"]

        if event_type == "molecule_placed":
            idx = event["molecule_index"]
            total = event["total_molecules"]
            pct = event["progress_pct"]
            print(f"\rPlaced {idx}/{total} ({pct:.1f}%)", end="", file=sys.stderr)

        elif event_type == "gencan_iteration":
            iteration = event["iteration"]
            obj_val = event["obj_value"]
            pct = event["progress_pct"]
            eta_ms = event.get("eta_ms")
            # eta_ms may be absent, so format the ETA only when it is present.
            eta = f"{eta_ms / 1000:.1f}s" if eta_ms is not None else "unknown"
            print(f"\rIter {iteration}: f={obj_val:.2e} ({pct:.1f}%) ETA: {eta}",
                  end="", file=sys.stderr)

        elif event_type == "pack_complete":
            atoms = event["total_atoms"]
            elapsed = event["elapsed_ms"] / 1000
            print(f"\nComplete: {atoms} atoms in {elapsed:.1f}s", file=sys.stderr)

    proc.wait()
    return proc.returncode

# Usage
run_with_progress(["warp-pack", "--config", "pack.json", "--stream"])

Agent Framework Integration

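import json
import subprocess

from langchain_core.tools import StructuredTool

def pack_molecules(config: str, output: str) -> str:
    """Pack molecules with progress tracking."""
    proc = subprocess.Popen(
        ["warp-pack", "--config", config, "--output", output, "--stream"],
        stderr=subprocess.PIPE,
        stdout=subprocess.DEVNULL,  # never read here, so avoid an unread pipe
        text=True,
    )

    error = None
    for line in proc.stderr:
        if not line.strip():
            continue  # skip blank lines
        event = json.loads(line)
        if event["event"] == "molecule_placed":
            print(f"Progress: {event['progress_pct']:.1f}%")
        elif event["event"] == "error":
            error = event  # keep the structured error for the tool result

    # Report failure to the agent instead of always claiming success.
    returncode = proc.wait()
    if error is not None:
        return f"Packing failed ({error['code']}: {error['message']})"
    if returncode != 0:
        return f"Packing failed (exit code {returncode})"
    return f"Packed to {output}"

tool = StructuredTool.from_function(
    func=pack_molecules,
    name="warp_pack_streaming",
    description="Molecular packing with real-time progress",
)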

Error Handling

All errors emit structured events:
{"event":"error","code":"E_CONFIG_VALIDATION","message":"invalid YAML: ..."}
{"event":"error","code":"E_PLACEMENT_FAILED","message":"failed to place structure water.pdb"}

Error Codes

| Code | Meaning | Recovery |
| --- | --- | --- |
| E_CONFIG_VALIDATION | Invalid config file | Fix YAML/JSON syntax |
| E_PLACEMENT_FAILED | Cannot place molecule | Increase box size or reduce molecule count |
| E_FILE_NOT_FOUND | Missing input file | Check file paths |
| E_INTERNAL | Unexpected error | Report bug |
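
An agent can turn an `error` event into an exception that carries the recovery hint from the table above. This is a minimal sketch. The class `WarpStreamError`, the function `raise_on_error`, and the `E_UNKNOWN` fallback are hypothetical names, not part of warp-md.

```python
import json

# Recovery hints copied from the error code table in this document.
RECOVERY_HINTS = {
    "E_CONFIG_VALIDATION": "Fix YAML/JSON syntax",
    "E_PLACEMENT_FAILED": "Increase box size or reduce molecule count",
    "E_FILE_NOT_FOUND": "Check file paths",
    "E_INTERNAL": "Report bug",
}


class WarpStreamError(RuntimeError):
    """Hypothetical exception wrapping a structured error event."""

    def __init__(self, code, message):
        self.code = code
        self.hint = RECOVERY_HINTS.get(code, "No documented recovery")
        super().__init__(f"{code}: {message} (suggested: {self.hint})")


def raise_on_error(event):
    """Raise WarpStreamError if the event is an error event; otherwise do nothing."""
    if event.get("event") == "error":
        # "E_UNKNOWN" is a placeholder for events that lack a code.
        raise WarpStreamError(event.get("code", "E_UNKNOWN"), event.get("message", ""))


line = '{"event":"error","code":"E_PLACEMENT_FAILED","message":"failed to place structure water.pdb"}'
try:
    raise_on_error(json.loads(line))
except WarpStreamError as exc:
    caught = exc
    print(exc)
```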

Best Practices

Always Stream

Use --stream in agent contexts to enable monitoring and timeout detection.
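
Timeout detection can be sketched as a wrapper that raises when no line arrives within a chosen window. The helper name `iter_lines_with_stall_timeout` and the timeout value are assumptions. A suitable window depends on how often a given command emits events.

```python
import queue
import threading


def iter_lines_with_stall_timeout(stream, stall_timeout_s):
    """Yield lines from stream; raise TimeoutError if none arrives in time.

    A background thread reads the stream so the consumer can wait on a
    queue with a timeout instead of blocking on the stream itself.
    """
    lines = queue.Queue()
    eof = object()  # sentinel marking end of stream

    def pump():
        for line in stream:
            lines.put(line)
        lines.put(eof)

    threading.Thread(target=pump, daemon=True).start()
    while True:
        try:
            item = lines.get(timeout=stall_timeout_s)
        except queue.Empty:
            raise TimeoutError(f"no event for {stall_timeout_s}s") from None
        if item is eof:
            return
        yield item


# Works with any iterable of lines, including a process's stderr pipe.
print(list(iter_lines_with_stall_timeout(['{"event":"a"}\n', '{"event":"b"}\n'], 1.0)))
```

In practice the `stream` argument would be `proc.stderr` from `subprocess.Popen`, and the caller would terminate the process (for example with `proc.kill()`) when `TimeoutError` is raised.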

Parse Line-by-Line

Don’t buffer entire output - process events as they arrive.

Monitor ETA

Use eta_ms fields to estimate completion time for users.
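
Since `eta_ms` is in milliseconds, it usually needs converting before it is shown to a user. The helper below is one possible formatting. The name `format_eta` and the output wording are illustrative choices, and it treats a missing value as unknown.

```python
def format_eta(eta_ms):
    """Format an eta_ms value (milliseconds) as a short human-readable string."""
    if eta_ms is None:
        return "ETA unknown"
    total_s = int(round(eta_ms / 1000))
    minutes, seconds = divmod(total_s, 60)
    if minutes:
        return f"ETA {minutes}m {seconds:02d}s"
    return f"ETA {seconds}s"


# Values taken from the sample gencan_iteration events in this document.
print(format_eta(495000))
print(format_eta(45000))
print(format_eta(None))
```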

Check Final Event

Confirm success via the final event of the stream: pack_complete for warp-pack, operation_complete for warp-pep, or run_completed for warp-md run.
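
One way to apply this check is to scan the stream for a terminal event and treat any `error` or `run_failed` event as failure. The sketch below uses only event names listed in this document. The function name `stream_succeeded` is hypothetical, and it treats a stream with no terminal event as unsuccessful.

```python
import json

# Terminal event names as listed in the event tables of this document.
SUCCESS_EVENTS = {"pack_complete", "operation_complete", "run_completed"}
FAILURE_EVENTS = {"error", "run_failed"}


def stream_succeeded(lines):
    """Return True only if a success event was seen and no failure event was."""
    saw_success = False
    for line in lines:
        if not line.strip():
            continue
        kind = json.loads(line).get("event")
        if kind in FAILURE_EVENTS:
            return False
        if kind in SUCCESS_EVENTS:
            saw_success = True
    return saw_success


ok_stream = ['{"event":"pack_complete","total_atoms":4500,"elapsed_ms":52000}']
bad_stream = ['{"event":"error","code":"E_INTERNAL","message":"example"}']
print(stream_succeeded(ok_stream), stream_succeeded(bad_stream))
```

Combining this with the process exit code gives a stricter check than either signal alone.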

Performance Considerations

  • Streaming overhead: less than 1% CPU, minimal memory
  • Event frequency: ~1-10 events/second during optimization
  • No performance impact on core computation

