MLIP Arena uses Prefect as its workflow engine. Prefect handles task caching, parallel execution, state tracking, and integration with HPC schedulers — without requiring changes to your simulation code.

Flows vs tasks in Prefect

Prefect distinguishes two primitives:
| Primitive | Decorator | Purpose |
| --- | --- | --- |
| Task | @task | One unit of work. Supports caching and retry. |
| Flow | @flow | Orchestrates multiple tasks. Entry point for execution. |
In MLIP Arena, every simulation operation (OPT, EOS, MD, etc.) is a @task. Benchmark scripts wrap those tasks in a @flow to run them in parallel across models and structures.

Running tasks directly

You can call any task directly without a flow for single calculations:
from mlip_arena.tasks import MD
from mlip_arena.tasks.utils import get_calculator
from mlip_arena.models import MLIPEnum
from ase.build import bulk

atoms = bulk("Cu", "fcc", a=3.6) * (5, 5, 5)

result = MD(
    atoms=atoms,
    calculator=get_calculator(MLIPEnum["MACE-MP(M)"]),
    ensemble="nvt",
    dynamics="langevin",
    total_time=1e3,  # 1 ps
    time_step=2,     # fs
)
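
As a quick check on the units in the call above, the sketch below converts the run length into a number of integration steps. It assumes, following the inline comments, that both total_time and time_step are given in femtoseconds and that the run takes total_time / time_step steps. n_steps is a hypothetical helper for illustration, not part of mlip_arena.

```python
def n_steps(total_time_fs: float, time_step_fs: float) -> int:
    """Number of MD steps for a run, assuming both arguments are in fs."""
    if time_step_fs <= 0:
        raise ValueError("time_step_fs must be positive")
    return round(total_time_fs / time_step_fs)


steps = n_steps(1e3, 2)  # 1 ps of dynamics at 2 fs per step
print(steps)  # 500
```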

Using .submit() for parallel execution

To run calculations concurrently, call .submit() on the task instead of calling it directly. .submit() returns a PrefectFuture immediately and hands the work to the flow's task runner. Wrap all .submit() calls inside a @flow so Prefect can track and schedule them:
# .github/README.md (lines 122–136)
from prefect import flow
from mlip_arena.models import MLIPEnum
from mlip_arena.tasks import MD
from mlip_arena.tasks.utils import get_calculator
from ase.build import bulk

atoms = bulk("Cu", "fcc", a=3.6) * (5, 5, 5)

@flow
def run_all_models():
    futures = []
    for model in MLIPEnum:
        future = MD.submit(
            atoms=atoms,
            calculator=get_calculator(model),
            ensemble="nvt",
            total_time=1e3,
            time_step=2,
        )
        futures.append(future)

    return [f.result(raise_on_failure=False) for f in futures]

results = run_all_models()
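raise_on_failure=False lets you collect all results even if some models fail: a failed task contributes its exception object to the list instead of raising. Check each entry (for example with isinstance(r, Exception)) to identify which models succeeded.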
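
One way to sort through such a mixed list is sketched below. It assumes that failed entries come back as exception instances and that you kept the model names in submission order (for the flow above, something like [m.name for m in MLIPEnum]). split_results and the sample data are hypothetical stand-ins, not part of mlip_arena or Prefect.

```python
from typing import Any


def split_results(names: list[str], results: list[Any]):
    """Partition results into successes and failures, keyed by name."""
    if len(names) != len(results):
        raise ValueError("names and results must have the same length")
    succeeded: dict[str, Any] = {}
    failed: dict[str, BaseException] = {}
    for name, res in zip(names, results):
        if isinstance(res, BaseException):
            failed[name] = res  # assumption: failures are exception objects
        else:
            succeeded[name] = res
    return succeeded, failed


# Stand-in data in place of real MD results.
names = ["model-a", "model-b", "model-c"]
results = [{"energy": -1.0}, RuntimeError("out of memory"), {"energy": -1.2}]

ok, bad = split_results(names, results)
print(sorted(ok))  # ['model-a', 'model-c']
print(list(bad))   # ['model-b']
```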

A complete parallel benchmark flow

The homonuclear_diatomics flow in mlip_arena/flows/diatomics.py is a production example that parallelizes energy curve calculations across all 118 elements:
# mlip_arena/flows/diatomics.py (lines 260–285)
from pathlib import Path

from ase.data import chemical_symbols
from prefect import flow, task
from prefect.futures import wait

from mlip_arena.models import REGISTRY, MLIPEnum
from mlip_arena.tasks.utils import get_calculator

@task
def homonuclear_diatomic(symbol: str, calculator, out_dir):
    """Calculate the potential energy curve for one diatomic."""
    # ... scans interatomic distances and writes trajectory

@task
def analyze(out_dir):
    """Compute physics metrics from all saved trajectories."""
    # ... returns a DataFrame with conservation, tortuosity, Spearman metrics

@flow
def homonuclear_diatomics(model: str, run_dir=None):
    # Assumed layout: outputs go under <run_dir>/<model>; the excerpt omits this setup.
    out_dir = Path(run_dir or Path.cwd()) / model
    out_dir.mkdir(parents=True, exist_ok=True)

    futures = []
    for symbol in chemical_symbols[1:]:  # H through Og
        calculator = get_calculator(model)
        future = homonuclear_diatomic.submit(
            symbol,
            calculator,
            out_dir=out_dir,
        )
        futures.append(future)
    wait(futures)  # block until all 118 calculations complete

    # analyze reads the trajectory files written by the tasks above
    df = analyze(out_dir)
    df["method"] = model
    df.to_json(out_dir / "homonuclear-diatomics.json", orient="records")
    return [f.result(raise_on_failure=False) for f in futures]
Key patterns in this flow:
  • @task on individual per-element calculations.
  • @flow wraps the loop and calls .submit() on each task.
  • wait(futures) blocks until all futures complete before the analyze task runs.
  • Results are collected with raise_on_failure=False to tolerate partial failures.

Caching behavior

All MLIP Arena tasks use the TASK_SOURCE + INPUTS cache policy:
# mlip_arena/tasks/optimize.py (lines 53–55)
from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE

@task(
    name="OPT",
    cache_policy=TASK_SOURCE + INPUTS,
)
def run(atoms, calculator, **kwargs):  # remaining parameters elided
    ...
This policy derives the cache key by hashing:
  1. The task’s source code (TASK_SOURCE) — cache is invalidated when the task implementation changes.
  2. All input parameters (INPUTS) — separate results are cached for each unique (atoms, calculator, kwargs) combination.
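
The idea behind a combined source-plus-inputs key can be sketched with the standard library alone. This is an illustration of the concept, not Prefect's implementation: cache_key, relax, and relax_v2 are hypothetical names, and the function's compiled bytecode is used as a crude stand-in for its source text.

```python
import hashlib
import json


def cache_key(fn, **inputs) -> str:
    """Hash a function's code together with its keyword inputs."""
    h = hashlib.sha256()
    h.update(fn.__code__.co_code)  # stand-in for TASK_SOURCE
    # Sorted keys make the key independent of keyword order (stand-in for INPUTS).
    h.update(json.dumps(inputs, sort_keys=True, default=repr).encode())
    return h.hexdigest()


def relax(structure, fmax=0.05):
    return structure


def relax_v2(structure, fmax=0.05):
    return structure.upper()  # changed implementation


k_same_a = cache_key(relax, structure="Cu", fmax=0.05)
k_same_b = cache_key(relax, fmax=0.05, structure="Cu")
k_new_input = cache_key(relax, structure="Cu", fmax=0.01)
k_new_code = cache_key(relax_v2, structure="Cu", fmax=0.05)

print(k_same_a == k_same_b)     # True: same code, same inputs
print(k_same_a == k_new_input)  # False: an input changed
print(k_same_a == k_new_code)   # False: the implementation changed
```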

Fresh execution

Pass refresh_cache=True via .with_options() to bypass the cache and re-run a task:
OPT_fresh = OPT.with_options(refresh_cache=True)
result = OPT_fresh(atoms=atoms, calculator=calc)

Persistent results

Pass persist_result=True to write results to a Prefect result backend. EOS uses this for intermediate OPT results:
OPT_cached = OPT.with_options(
    refresh_cache=False,
    persist_result=True,
)

Running on HPC with dask_jobqueue

For large-scale benchmarks, point Prefect's DaskTaskRunner at a dask_jobqueue cluster, which submits worker jobs to SLURM, PBS, or SGE:
1. Install dask-jobqueue

pip install dask-jobqueue prefect-dask
2. Configure a DaskTaskRunner

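from ase.build import bulk
from dask_jobqueue import SLURMCluster
from prefect import flow
from prefect_dask import DaskTaskRunner
from mlip_arena.models import MLIPEnum
from mlip_arena.tasks import MD
from mlip_arena.tasks.utils import get_calculator

atoms = bulk("Cu", "fcc", a=3.6) * (5, 5, 5)

cluster = SLURMCluster(
    cores=8,            # cores per SLURM job
    memory="32GB",      # memory per SLURM job
    walltime="04:00:00",
    job_extra_directives=["-p gpu", "--gres=gpu:1"],
)
cluster.scale(jobs=16)  # submit 16 SLURM jobs

@flow(task_runner=DaskTaskRunner(address=cluster.scheduler_address))
def benchmark_flow():
    futures = []
    for model in MLIPEnum:
        future = MD.submit(
            atoms=atoms,
            calculator=get_calculator(model),
            # ... remaining MD arguments
        )
        futures.append(future)
    return [f.result(raise_on_failure=False) for f in futures]

if __name__ == "__main__":
    benchmark_flow()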
3. Run the flow

python benchmark_flow.py
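Prefect turns each .submit() call into a Dask task. The Dask scheduler then distributes those tasks across the workers running inside the SLURM jobs that dask_jobqueue started, so the number of SLURM jobs is set by cluster.scale(), not by the number of tasks.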
For a practical HPC example, refer to the MD stability benchmark notebook at benchmarks/stability/temperature.ipynb.
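
Before submitting, it can help to work out what the cluster settings above request in total. The sketch below assumes that cores and memory are per-job values, as in dask_jobqueue, and that --gres=gpu:1 gives each job one GPU. total_allocation is a hypothetical helper for illustration.

```python
def total_allocation(jobs: int, cores: int, memory_gb: int, gpus: int) -> dict:
    """Total resources requested when every job asks for the same amount."""
    return {
        "cores": jobs * cores,
        "memory_gb": jobs * memory_gb,
        "gpus": jobs * gpus,
    }


# Values taken from the SLURMCluster configuration above.
request = total_allocation(jobs=16, cores=8, memory_gb=32, gpus=1)
print(request)  # {'cores': 128, 'memory_gb': 512, 'gpus': 16}
```

Comparing these totals with your partition limits is a cheap way to catch a request that the scheduler would leave queued.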

Waiting for futures

Use prefect.futures.wait() to block until a set of futures completes before proceeding:
# mlip_arena/flows/diatomics.py (lines 280–284)
from prefect.futures import wait

futures = [task_fn.submit(item) for item in items]
wait(futures)  # blocks here

result = post_process(out_dir)  # runs after all futures complete
This is necessary when a downstream task (like analyze) depends on the output files written by all upstream tasks.
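
The same fan-out, barrier, then aggregate shape can be tried without Prefect. The sketch below is a standard-library analogy in which concurrent.futures.wait plays the role of prefect.futures.wait. compute and post_process are hypothetical stand-ins for the upstream and downstream tasks.

```python
import json
import tempfile
from concurrent.futures import ThreadPoolExecutor, wait
from pathlib import Path


def compute(symbol: str, out_dir: Path) -> None:
    """Upstream step: write one result file per item."""
    (out_dir / f"{symbol}.json").write_text(json.dumps({"symbol": symbol}))


def post_process(out_dir: Path) -> list[str]:
    """Downstream step: read every file the upstream steps wrote."""
    return sorted(
        json.loads(p.read_text())["symbol"] for p in out_dir.glob("*.json")
    )


with tempfile.TemporaryDirectory() as tmp:
    out_dir = Path(tmp)
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(compute, s, out_dir) for s in ["H", "He", "Li"]]
        wait(futures)  # barrier: every file exists past this line
        summary = post_process(out_dir)

print(summary)  # ['H', 'He', 'Li']
```

Without the wait() call, post_process could run while some files are still missing and silently return a partial summary.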
