MLIP Arena uses Prefect as its workflow engine. Prefect handles task caching, parallel execution, state tracking, and integration with HPC schedulers — without requiring changes to your simulation code.
A @flow orchestrates multiple tasks and is the entry point for execution.
In MLIP Arena, every simulation operation (OPT, EOS, MD, etc.) is a @task. Benchmark scripts wrap those tasks in a @flow to run them in parallel across models and structures.
To run calculations concurrently, call .submit() on the task instead of calling it directly. .submit() returns a PrefectFuture immediately and hands the work to the flow's task runner.

Wrap all .submit() calls inside a @flow so Prefect can track and schedule them:
```python
# .github/README.md (lines 122–136)
from prefect import flow
from mlip_arena.models import MLIPEnum
from mlip_arena.tasks import MD
from mlip_arena.tasks.utils import get_calculator
from ase.build import bulk

atoms = bulk("Cu", "fcc", a=3.6) * (5, 5, 5)


@flow
def run_all_models():
    futures = []
    for model in MLIPEnum:
        future = MD.submit(
            atoms=atoms,
            calculator=get_calculator(model),
            ensemble="nvt",
            total_time=1e3,
            time_step=2,
        )
        futures.append(future)

    return [f.result(raise_on_failure=False) for f in futures]


results = run_all_models()
```
raise_on_failure=False lets you collect all results even if some models fail. For a failed task, Prefect returns the exception instead of raising it, so check each entry in the returned list to identify which models succeeded.
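Once the list comes back, it usually helps to separate successes from failures. The sketch below is one way to do that in plain Python. It assumes a failed task's entry is an exception object, which is how Prefect's result(raise_on_failure=False) is documented to behave; check this against your Prefect version. The helper name split_results and the sample data are hypothetical and not part of MLIP Arena.

```python
def split_results(names, results):
    """Partition results into successes and failures, keyed by name.

    Assumes a failed task is represented by an exception object in
    `results` (hypothetical helper, not part of MLIP Arena or Prefect).
    """
    succeeded, failed = {}, {}
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            failed[name] = result
        else:
            succeeded[name] = result
    return succeeded, failed


# Stand-in data shaped like the list returned by a flow such as run_all_models().
names = ["model_a", "model_b", "model_c"]
results = [{"energy": -3.5}, RuntimeError("out of memory"), {"energy": -3.4}]

succeeded, failed = split_results(names, results)
print("succeeded:", sorted(succeeded))
print("failed:", {name: repr(err) for name, err in failed.items()})
```

If MLIPEnum is a standard Python Enum, the names list could be built as [model.name for model in MLIPEnum], in the same order the tasks were submitted.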
The homonuclear_diatomics flow in mlip_arena/flows/diatomics.py is a production example that parallelizes energy curve calculations across all 118 elements:
```python
# mlip_arena/flows/diatomics.py (lines 260–285)
from prefect import flow, task
from prefect.futures import wait
from mlip_arena.models import REGISTRY, MLIPEnum
from mlip_arena.tasks.utils import get_calculator
from ase.data import chemical_symbols


@task
def homonuclear_diatomic(symbol: str, calculator, out_dir):
    """Calculate the potential energy curve for one diatomic."""
    # ... scans interatomic distances and writes trajectory


@task
def analyze(out_dir):
    """Compute physics metrics from all saved trajectories."""
    # ... returns a DataFrame with conservation, tortuosity, Spearman metrics


@flow
def homonuclear_diatomics(model: str, run_dir=None):
    # ... (out_dir and model_name are defined in code not shown in this excerpt)
    futures = []
    for symbol in chemical_symbols[1:]:  # H through Og
        calculator = get_calculator(model)
        future = homonuclear_diatomic.submit(
            symbol,
            calculator,
            out_dir=out_dir,
        )
        futures.append(future)

    wait(futures)  # block until all 118 calculations complete

    df = analyze(out_dir)
    df["method"] = model_name
    df.to_json(out_dir / "homonuclear-diatomics.json", orient="records")

    return [f.result(raise_on_failure=False) for f in futures]
```
Key patterns in this flow:
- @task decorates the individual per-element calculation.
- @flow wraps the loop and calls .submit() on each task.
- wait(futures) blocks until all futures complete before the analyze task runs.
- Results are collected with raise_on_failure=False to tolerate partial failures.
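The same fan-out, wait, then aggregate shape can be tried without Prefect or MLIP Arena installed. The sketch below swaps in the standard library's concurrent.futures as a stand-in for Prefect's task runner, so it is an analogy and not the flow's actual implementation. The functions energy_curve, analyze, and run are hypothetical toys, and unlike the real analyze task, which reads trajectories from out_dir, this one receives its records directly.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def energy_curve(symbol):
    """Toy stand-in for a per-element calculation (hypothetical)."""
    if symbol == "Xx":
        raise ValueError(f"unknown element: {symbol}")
    return {"symbol": symbol, "n_points": 10}


def analyze(records):
    """Toy stand-in for the aggregate analysis step (hypothetical)."""
    return sorted(r["symbol"] for r in records)


def run(symbols):
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Fan out: submit() returns a future immediately for each element.
        futures = {s: pool.submit(energy_curve, s) for s in symbols}
        # Barrier: block until every future has finished, successfully or not.
        wait(futures.values())

    # Collect without raising: keep the exception in place of a result.
    outcomes = {}
    for s, f in futures.items():
        err = f.exception()
        outcomes[s] = err if err is not None else f.result()

    # Aggregate only the calculations that succeeded.
    good = [v for v in outcomes.values() if not isinstance(v, BaseException)]
    return outcomes, analyze(good)


outcomes, summary = run(["H", "He", "Xx", "Li"])
print(summary)
print({s: type(v).__name__ for s, v in outcomes.items()})
```

The barrier before the aggregate step matters for the same reason as in the Prefect flow: the analysis should only run once every per-element calculation has either finished or failed.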