The store is Aether’s single source of truth. Every phase transition, input resolution, output merge, and timeout deadline is written through store.Store. The engine never caches state locally — it reads what it needs, mutates through the store, and trusts that the store’s version of reality is authoritative. This makes the engine restartable and, in distributed deployments, replaceable behind any consistent backend.

The Store interface

store.Store is an aggregated interface that composes four focused sub-interfaces plus a lifecycle method:
// store/store.go
type Store interface {
    WorkflowRunStore
    TaskRunStore
    SchemaStore
    CronWorkflowStore
    Close() error
}
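You must implement all four sub-interfaces and Close. A type that is missing any of their methods does not satisfy store.Store, so code that passes it as a store.Store will fail to compile.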
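A common Go idiom for surfacing a partial implementation early is a compile-time assertion placed next to the type. The sketch below is illustrative only: RunStore, SchemaStore, and demoStore are hypothetical, trimmed stand-ins, not the real store package.

```go
package main

import "fmt"

// Stand-in sub-interfaces for illustration; the real ones have more methods.
type RunStore interface {
	GetRun(id string) (string, error)
}

type SchemaStore interface {
	ListSchemas() []string
}

// Store composes the sub-interfaces plus a lifecycle method, like store.Store.
type Store interface {
	RunStore
	SchemaStore
	Close() error
}

// demoStore is a hypothetical implementation.
type demoStore struct{ closed bool }

func (s *demoStore) GetRun(id string) (string, error) { return "run:" + id, nil }
func (s *demoStore) ListSchemas() []string             { return nil }
func (s *demoStore) Close() error                      { s.closed = true; return nil }

// Compile-time check: the build fails on this line if demoStore lacks any method.
var _ Store = (*demoStore)(nil)

func main() {
	var s Store = &demoStore{}
	v, _ := s.GetRun("42")
	fmt.Println(v, s.Close())
}
```

Removing any one method from demoStore turns the assertion line into a compile error that names the missing method.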

Sentinel errors

Two sentinel errors signal expected, non-exceptional conditions:
var ErrNotFound = errors.New("not found")
var ErrTokenMismatch = errors.New("token mismatch")
Return errors that wrap these values (using fmt.Errorf("...: %w", store.ErrNotFound)) so callers can use errors.Is to distinguish them from infrastructure failures.
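As a minimal sketch of that wrapping pattern, the example below redeclares the two sentinels locally and uses a hypothetical getRun lookup backed by a map. Only errors.Is and the %w verb are standard library behavior; everything else is assumed for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for store.ErrNotFound and store.ErrTokenMismatch.
var (
	ErrNotFound      = errors.New("not found")
	ErrTokenMismatch = errors.New("token mismatch")
)

// getRun is a hypothetical lookup that wraps the sentinel with context.
func getRun(runs map[string]string, id string) (string, error) {
	v, ok := runs[id]
	if !ok {
		return "", fmt.Errorf("workflow run %q: %w", id, ErrNotFound)
	}
	return v, nil
}

// isNotFound shows how a caller separates the expected case from real failures.
func isNotFound(err error) bool { return errors.Is(err, ErrNotFound) }

// isTokenMismatch does the same for optimistic-locking conflicts.
func isTokenMismatch(err error) bool { return errors.Is(err, ErrTokenMismatch) }

func main() {
	_, err := getRun(map[string]string{}, "wf-1")
	switch {
	case isNotFound(err):
		fmt.Println("expected condition:", err)
	case err != nil:
		fmt.Println("infrastructure failure:", err)
	}
}
```

Because the sentinel is wrapped rather than replaced, the message keeps its context ("workflow run ...") while errors.Is still matches.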

Optimistic concurrency: the Token model

All mutable records carry an opaque uint64 field called Token. The framework passes the token it read from the last Get call into every subsequent Update call unchanged. The store implementation decides what this means:
  • A monotonic counter (e.g. increment on every write) gives strict optimistic locking: an update that carries a stale token returns ErrTokenMismatch.
  • A no-op (always accept) is valid for single-threaded or test implementations.
MemoryStore uses a monotonic counter, incrementing Token on every successful update and returning ErrTokenMismatch if the token passed by the caller does not match the current stored value.
The token is opaque to the engine: it never interprets the numeric value. Your implementation defines the semantics — the engine only guarantees that it echoes back whatever token it last received.
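The monotonic-counter variant can be sketched as follows. This is a simplified illustration with a hypothetical single-record counterStore, not the MemoryStore code; the record type and method names are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Stand-in for store.ErrTokenMismatch.
var ErrTokenMismatch = errors.New("token mismatch")

// record is a hypothetical mutable record carrying an opaque token.
type record struct {
	Value int
	Token uint64
}

// counterStore is a hypothetical single-record store with a monotonic token.
type counterStore struct {
	mu  sync.Mutex
	rec record
}

// Get returns a copy of the record, including the current token.
func (s *counterStore) Get() record {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.rec
}

// Update writes only if the caller's token matches, then bumps the token.
func (s *counterStore) Update(r record) (record, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if r.Token != s.rec.Token {
		return record{}, fmt.Errorf("update: %w", ErrTokenMismatch)
	}
	s.rec.Value = r.Value
	s.rec.Token++
	return s.rec, nil
}

func main() {
	s := &counterStore{}
	stale := s.Get() // both readers see token 0
	fresh := s.Get()

	fresh.Value = 1
	_, _ = s.Update(fresh) // succeeds; stored token becomes 1

	stale.Value = 2
	_, err := s.Update(stale) // rejected: the caller still holds token 0
	fmt.Println(errors.Is(err, ErrTokenMismatch), s.Get().Value) // prints "true 1"
}
```

A caller that hits the mismatch would typically re-read the record, reapply its change, and retry with the fresh token.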

WorkflowRunStore

Manages the lifecycle of workflow run records.
type WorkflowRunStore interface {
    CreateWorkflowRun(ctx context.Context, run *WorkflowRun) error
    GetWorkflowRun(ctx context.Context, runID string) (*WorkflowRun, error)
    UpdateWorkflowRun(ctx context.Context, run *WorkflowRun) (*WorkflowRun, error)
    ListActiveWorkflowRuns(ctx context.Context) ([]*WorkflowRun, error)
    DeleteWorkflowRun(ctx context.Context, runID string) error
    ListWorkflowRunsByCronID(ctx context.Context, cronWorkflowID string) ([]*WorkflowRun, error)
}
  • CreateWorkflowRun (method): Stores the initial workflow run record. RunID must be unique; duplicates should return an error. Workflow (the raw JSON) and CreatedAt are set here and never modified again.
  • GetWorkflowRun (method): Retrieves a workflow run by ID. Returns ErrNotFound (wrapped) if missing. Return a copy; the caller may mutate the returned struct.
  • UpdateWorkflowRun (method): Persists mutable fields. Only non-nil pointer fields in run are written; nil fields are left unchanged (nil-field partial update convention). Must check run.Token against the stored token and return ErrTokenMismatch on mismatch. Returns the post-update state.
  • ListActiveWorkflowRuns (method): Returns all non-terminal workflow runs that have a Deadline set. Used by the timeout watchdog to detect overdue workflows.
  • DeleteWorkflowRun (method): Removes a workflow run and all its associated TaskRun records. Returns ErrNotFound if the run does not exist. Implementations must cascade-delete child task runs.
  • ListWorkflowRunsByCronID (method): Returns all WorkflowRun records created by the given CronWorkflow. Used by the cron controller for concurrency policy checks and history cleanup.

The WorkflowRun persistent model

// store/store.go
type WorkflowRun struct {
    // Immutable — set at creation, never modified
    RunID          string
    Workflow       json.RawMessage // raw workflow JSON snapshot
    CreatedAt      time.Time
    CronWorkflowID string // non-empty when triggered by a CronWorkflow

    // Mutable — nil means "do not modify" in UpdateWorkflowRun
    Status   *model.Phase
    Message  *string
    Outputs  *model.Outputs
    Metrics  *model.Metrics
    Deadline *time.Time

    // Internal control
    Token     uint64
    UpdatedAt time.Time
}

TaskRunStore

Manages individual task execution records.
type TaskRunStore interface {
    CreateTaskRun(ctx context.Context, run *TaskRun) error
    GetTaskRun(ctx context.Context, taskRunID string) (*TaskRun, error)
    UpdateTaskRun(ctx context.Context, run *TaskRun) (*TaskRun, error)
    ListTaskRuns(ctx context.Context, workflowRunID string) ([]*TaskRun, error)
    ListTaskRunsByParent(ctx context.Context, workflowRunID string, parentRunID string) ([]*TaskRun, error)
    ListActiveTaskRuns(ctx context.Context) ([]*TaskRun, error)
}

CreateTaskRun idempotency contract

CreateTaskRun is idempotent by composite key: (workflowRunID, parentRunID, scope, taskName). If a task run with the same four-field key already exists, the implementation must return nil without inserting anything, rather than returning an error or creating a duplicate. The engine relies on this contract for concurrent-safe scheduling: the scheduler may attempt to create the same task run multiple times under concurrent completion events.
Violating the idempotency contract — returning an error or inserting a duplicate on a repeated CreateTaskRun call — will cause the engine to double-dispatch tasks or stall a workflow run permanently. This is the most critical correctness requirement of the TaskRunStore interface.
The Scope field is part of the key because loop iterations share the same TaskName (the body template) but each receives a unique scope string (e.g. "poll-job.loop[0]/", "poll-job.loop[1]/").
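One way to honor the contract is to use the four fields directly as a map key. The sketch below assumes hypothetical taskKey, taskRun, and taskStore types and is not taken from MemoryStore; a SQL backend would more likely rely on a unique index over the same four columns.

```go
package main

import (
	"fmt"
	"sync"
)

// taskKey is the four-field composite key. A struct of comparable fields
// can be used as a Go map key directly.
type taskKey struct {
	WorkflowRunID, ParentRunID, Scope, TaskName string
}

// taskRun is a trimmed, hypothetical stand-in for store.TaskRun.
type taskRun struct {
	RunID string
	Key   taskKey
}

type taskStore struct {
	mu    sync.Mutex
	byKey map[taskKey]*taskRun
}

func newTaskStore() *taskStore {
	return &taskStore{byKey: make(map[taskKey]*taskRun)}
}

// Create inserts the run unless its composite key already exists.
// A repeated call is a silent no-op: nil error, no duplicate.
func (s *taskStore) Create(r taskRun) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.byKey[r.Key]; exists {
		return nil
	}
	cp := r
	s.byKey[r.Key] = &cp
	return nil
}

// Len reports how many distinct task runs are stored.
func (s *taskStore) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.byKey)
}

// RunIDFor returns the RunID stored under a key, or "" if absent.
func (s *taskStore) RunIDFor(k taskKey) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if r, ok := s.byKey[k]; ok {
		return r.RunID
	}
	return ""
}

func main() {
	s := newTaskStore()
	iter0 := taskKey{"wf-1", "loop-run", "poll-job.loop[0]/", "poll-job"}
	iter1 := taskKey{"wf-1", "loop-run", "poll-job.loop[1]/", "poll-job"}

	_ = s.Create(taskRun{RunID: "a", Key: iter0})
	_ = s.Create(taskRun{RunID: "b", Key: iter0}) // same key: ignored
	_ = s.Create(taskRun{RunID: "c", Key: iter1}) // different scope: new run
	fmt.Println(s.Len(), s.RunIDFor(iter0)) // prints "2 a"
}
```

The two loop iterations share a TaskName but differ in Scope, so they are stored as separate runs, while the repeated create for iteration 0 leaves the first record in place.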
  • ListTaskRunsByParent (method): Returns task runs sharing the same parent container. parentRunID="" returns top-level tasks. This is the core scheduling query: advanceScope calls it to enumerate siblings within a single DAG or loop scope.
  • ListActiveTaskRuns (method): Returns all non-terminal task runs (including pending ones) that have a Deadline set. Pending tasks are included because the deadline is written at dispatch time, before OnTaskStarted transitions the run to Running.
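The filter behind ListActiveTaskRuns, and the way a watchdog might consume it, can be sketched like this. The phase names, the taskRun shape, and the overdueAt helper are all assumptions made for illustration; the real model.Phase values and watchdog logic may differ.

```go
package main

import (
	"fmt"
	"time"
)

// phase and its values are hypothetical stand-ins for model.Phase.
type phase string

const (
	pending   phase = "Pending"
	running   phase = "Running"
	succeeded phase = "Succeeded"
	failed    phase = "Failed"
)

// taskRun is a trimmed stand-in for store.TaskRun.
type taskRun struct {
	RunID    string
	Status   phase
	Deadline *time.Time
}

func terminal(p phase) bool { return p == succeeded || p == failed }

// deadlineAt builds a *time.Time from Unix seconds.
func deadlineAt(sec int64) *time.Time {
	t := time.Unix(sec, 0)
	return &t
}

// listActive keeps non-terminal runs that have a deadline; pending runs qualify.
func listActive(all []taskRun) []taskRun {
	var out []taskRun
	for _, r := range all {
		if !terminal(r.Status) && r.Deadline != nil {
			out = append(out, r)
		}
	}
	return out
}

// overdueAt returns the IDs of active runs whose deadline lies before nowSec.
func overdueAt(all []taskRun, nowSec int64) []string {
	now := time.Unix(nowSec, 0)
	var ids []string
	for _, r := range listActive(all) {
		if r.Deadline.Before(now) {
			ids = append(ids, r.RunID)
		}
	}
	return ids
}

func main() {
	runs := []taskRun{
		{"t1", pending, deadlineAt(100)},   // dispatched but not yet started
		{"t2", running, deadlineAt(500)},   // still within its deadline
		{"t3", succeeded, deadlineAt(100)}, // terminal: ignored
		{"t4", running, nil},               // no deadline: ignored
	}
	fmt.Println(overdueAt(runs, 200)) // prints [t1]
}
```

Dropping pending runs from listActive would hide t1, which is exactly the case where a task was dispatched and never picked up.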

The TaskRun persistent model

// store/store.go
type TaskRun struct {
    // Immutable
    RunID         string
    WorkflowRunID string
    ParentRunID   string // parent container TaskRun ID ("" = top-level scope)
    Depth         int    // tree depth (0 = top-level), created as parent.Depth + 1
    Scope         string // direct-parent path segment, e.g. "main-pipeline/"
    TaskName      string // task name within current scope
    TemplateName  string // referenced template name
    TemplateType  string // "dag" / "task" / "loop"
    CreatedAt     time.Time

    // Mutable — nil means "do not modify" in UpdateTaskRun
    Inputs     *model.Inputs
    Status     *model.Phase
    Message    *string
    Outputs    *model.Outputs
    Metrics    *model.Metrics
    RetryCount *int
    Deadline   *time.Time

    // Internal control
    Token     uint64
    UpdatedAt time.Time
}
TaskRun records form a parent-child tree via ParentRunID that mirrors the template nesting structure. This tree enables scoped scheduling (advanceScope only looks at sibling TaskRuns), variable isolation (each scope sees only its own sibling outputs), and upward propagation (when all children of a container complete, the container is finalized).
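The sibling query and the "all children complete" check can be illustrated with a small sketch. The taskRun shape, the Done flag (standing in for a terminal Status), and both helper functions are hypothetical; treating a container with no children as incomplete is also an assumption of this example.

```go
package main

import "fmt"

// taskRun is a trimmed stand-in for store.TaskRun.
type taskRun struct {
	RunID, WorkflowRunID, ParentRunID string
	Done                              bool // hypothetical stand-in for a terminal Status
}

// byParent returns the runs that share one parent container within a workflow
// run. parentRunID == "" selects the top-level scope.
func byParent(all []taskRun, workflowRunID, parentRunID string) []taskRun {
	var out []taskRun
	for _, r := range all {
		if r.WorkflowRunID == workflowRunID && r.ParentRunID == parentRunID {
			out = append(out, r)
		}
	}
	return out
}

// containerComplete reports whether a container has children and all are done.
func containerComplete(all []taskRun, workflowRunID, containerID string) bool {
	children := byParent(all, workflowRunID, containerID)
	if len(children) == 0 {
		return false
	}
	for _, c := range children {
		if !c.Done {
			return false
		}
	}
	return true
}

func main() {
	runs := []taskRun{
		{"dag", "wf-1", "", false}, // top-level container
		{"a", "wf-1", "dag", true}, // children of the container
		{"b", "wf-1", "dag", true},
		{"other", "wf-2", "", true}, // belongs to a different workflow run
	}
	fmt.Println(len(byParent(runs, "wf-1", "")), containerComplete(runs, "wf-1", "dag"))
	// prints "1 true"
}
```

Each query touches only one scope, which is why a store that indexes task runs by (workflowRunID, parentRunID) can answer it without scanning the whole tree.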

SchemaStore

Persists executor schemas for crash-recovery and distributed schema propagation.
type SchemaStore interface {
    UpsertSchema(ctx context.Context, workerID string, schema model.ExecutorSchema) error
    ListSchemas(ctx context.Context) ([]SchemaRecord, error)
    DeleteSchema(ctx context.Context, execType, workerID string) error
}

type SchemaRecord struct {
    WorkerID string
    Schema   model.ExecutorSchema
}
UpsertSchema inserts or overwrites a schema. workerID="" denotes an orphan record loaded during recovery. DeleteSchema accepts optional execType and workerID filters: an empty string places no restriction on that field, and the two filters combine with AND semantics. This supports both GC eviction of stale executor types and cleanup when a worker goes offline.
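The filter semantics of DeleteSchema can be sketched as a predicate over records. The schemaRecord shape (with an ExecType field taken from the schema) and the executor type names are assumptions for illustration. Under this reading, passing two empty filters matches every record, so a real implementation may want to guard that case.

```go
package main

import "fmt"

// schemaRecord is a trimmed stand-in; ExecType is assumed to come from the schema.
type schemaRecord struct {
	ExecType string
	WorkerID string
}

// matches applies both filters with AND semantics; "" places no restriction.
func matches(r schemaRecord, execType, workerID string) bool {
	return (execType == "" || r.ExecType == execType) &&
		(workerID == "" || r.WorkerID == workerID)
}

// deleteSchemas returns the records that survive the delete.
func deleteSchemas(recs []schemaRecord, execType, workerID string) []schemaRecord {
	kept := make([]schemaRecord, 0, len(recs))
	for _, r := range recs {
		if !matches(r, execType, workerID) {
			kept = append(kept, r)
		}
	}
	return kept
}

func main() {
	recs := []schemaRecord{{"http", "w1"}, {"http", "w2"}, {"shell", "w1"}}
	fmt.Println(deleteSchemas(recs, "", "w1"))     // worker w1 went offline
	fmt.Println(deleteSchemas(recs, "http", ""))   // evict the http executor type
	fmt.Println(deleteSchemas(recs, "http", "w1")) // only the (http, w1) record
}
```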

CronWorkflowStore

Manages persistent records for scheduled (cron-based) workflows.
type CronWorkflowStore interface {
    CreateCronWorkflow(ctx context.Context, record *CronWorkflowRecord) error
    GetCronWorkflow(ctx context.Context, cronID string) (*CronWorkflowRecord, error)
    UpdateCronWorkflow(ctx context.Context, record *CronWorkflowRecord) error
    DeleteCronWorkflow(ctx context.Context, cronID string) error
    ListCronWorkflows(ctx context.Context) ([]*CronWorkflowRecord, error)
}
ListCronWorkflows is called during engine.Start() to re-register all cron schedules after a crash or restart.
type CronWorkflowRecord struct {
    // Immutable
    CronID       string
    CronWorkflow json.RawMessage // original CronWorkflow JSON
    CreatedAt    time.Time

    // Mutable
    Status    *CronWorkflowStatus
    Token     uint64
    UpdatedAt time.Time
}

type CronWorkflowStatus struct {
    LastScheduleTime   *time.Time `json:"lastScheduleTime,omitempty"`
    LastSubmittedRunID string     `json:"lastSubmittedRunID,omitempty"`
}

The nil-field partial update convention

Both UpdateWorkflowRun and UpdateTaskRun apply a nil-means-skip rule: only pointer fields that are non-nil in the update struct are written to the store. Nil fields are left unchanged. This avoids accidental overwrites when only a subset of the mutable fields needs updating. Example (from MemoryStore):
// store/memory_store.go (simplified)
if run.Status != nil {
    s := *run.Status
    existing.Status = &s
}
if run.Message != nil {
    s := *run.Message
    existing.Message = &s
}
// Deadline, Outputs, Metrics follow the same pattern
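From the caller's side, the convention means building a sparse struct that sets only the fields to change. The sketch below is a self-contained illustration with a hypothetical run type and generic ptr and clone helpers (Go 1.18+); none of these names come from Aether.

```go
package main

import "fmt"

// run is a trimmed stand-in for store.WorkflowRun, with string fields for brevity.
type run struct {
	Status  *string
	Message *string
}

// ptr returns a pointer to a copy of v, handy for building sparse updates.
func ptr[T any](v T) *T { return &v }

// clone copies the pointed-to value so the store never aliases caller memory.
func clone[T any](p *T) *T {
	if p == nil {
		return nil
	}
	v := *p
	return &v
}

// apply writes only the non-nil fields of upd into existing.
func apply(existing *run, upd run) {
	if upd.Status != nil {
		existing.Status = clone(upd.Status)
	}
	if upd.Message != nil {
		existing.Message = clone(upd.Message)
	}
}

func main() {
	stored := run{Status: ptr("Running"), Message: ptr("step 1")}

	upd := run{Status: ptr("Succeeded")} // Message is nil, so it is skipped
	apply(&stored, upd)

	*upd.Status = "mutated by caller" // does not leak into the stored record
	fmt.Println(*stored.Status, "|", *stored.Message) // prints "Succeeded | step 1"
}
```

Note that clone is a shallow copy of the pointed-to value. Fields that contain maps or slices would need a deeper copy to stay isolated from the caller.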

Engine option

// option.go
func WithStore(s store.Store) Option
WithStore is required. The engine will panic at New() time if no store is provided.
engine, err := aether.New(
    aether.WithStore(NewMemoryStore()),
    // ... other options
)

Reference implementation: MemoryStore

The playground MemoryStore is a fully compliant, thread-safe in-memory implementation. It demonstrates every contract described above and is suitable for unit tests and single-process deployments.
// cmd/playground/memory_store.go

type MemoryStore struct {
    mu            sync.RWMutex
    workflowRuns  map[string]*store.WorkflowRun
    taskRuns      map[string]*store.TaskRun
    taskIndex     map[string][]*store.TaskRun   // workflowRunID → task runs
    parentIndex   map[string][]*store.TaskRun   // "wfRunID:parentRunID" → task runs
    schemas       map[string]store.SchemaRecord // "execType::workerID" → record
    cronWorkflows map[string]*store.CronWorkflowRecord
}

func NewMemoryStore() *MemoryStore {
    return &MemoryStore{
        workflowRuns:  make(map[string]*store.WorkflowRun),
        taskRuns:      make(map[string]*store.TaskRun),
        taskIndex:     make(map[string][]*store.TaskRun),
        parentIndex:   make(map[string][]*store.TaskRun),
        schemas:       make(map[string]store.SchemaRecord),
        cronWorkflows: make(map[string]*store.CronWorkflowRecord),
    }
}
Key implementation details visible in MemoryStore:
  • Token is incremented on every successful Update call (existing.Token++).
  • UpdateWorkflowRun and UpdateTaskRun check existing.Token != run.Token and return store.ErrTokenMismatch on mismatch.
  • CreateTaskRun calls taskExistsLocked to check the composite key before inserting, returning nil (no error) on a duplicate.
  • All Get methods return copies of stored structs, including deep copies of pointer fields, to prevent callers from mutating internal state.
  • DeleteWorkflowRun cascade-deletes all TaskRun records for the workflow run from taskRuns, taskIndex, and parentIndex.
MemoryStore also records a full-state Snapshot after every mutating operation, enabling the playground’s HTML report to replay store history step-by-step — a useful pattern for debugging and audit logging.
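To make the cascade-delete detail from the list above concrete, here is a reduced sketch that keeps the same three task maps in sync. It is not the MemoryStore source: memStore, addTask, and deleteWorkflowRun are hypothetical, and locking is omitted for brevity, whereas a real implementation would hold its write lock for the whole operation.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for store.ErrNotFound.
var ErrNotFound = errors.New("not found")

type taskRun struct{ RunID, WorkflowRunID, ParentRunID string }

// memStore is a hypothetical, unsynchronized miniature of the task maps.
type memStore struct {
	workflowRuns map[string]bool
	taskRuns     map[string]*taskRun
	taskIndex    map[string][]*taskRun // workflowRunID -> task runs
	parentIndex  map[string][]*taskRun // "wfRunID:parentRunID" -> task runs
}

func newMemStore() *memStore {
	return &memStore{
		workflowRuns: make(map[string]bool),
		taskRuns:     make(map[string]*taskRun),
		taskIndex:    make(map[string][]*taskRun),
		parentIndex:  make(map[string][]*taskRun),
	}
}

// addTask registers a task run in the primary map and both indexes.
func (s *memStore) addTask(t *taskRun) {
	s.taskRuns[t.RunID] = t
	s.taskIndex[t.WorkflowRunID] = append(s.taskIndex[t.WorkflowRunID], t)
	key := t.WorkflowRunID + ":" + t.ParentRunID
	s.parentIndex[key] = append(s.parentIndex[key], t)
}

// deleteWorkflowRun removes the run and every trace of its task runs.
func (s *memStore) deleteWorkflowRun(runID string) error {
	if !s.workflowRuns[runID] {
		return fmt.Errorf("delete workflow run %q: %w", runID, ErrNotFound)
	}
	for _, t := range s.taskIndex[runID] {
		delete(s.taskRuns, t.RunID)
		// Every entry under this key belongs to runID, so drop the whole key.
		delete(s.parentIndex, runID+":"+t.ParentRunID)
	}
	delete(s.taskIndex, runID)
	delete(s.workflowRuns, runID)
	return nil
}

func main() {
	s := newMemStore()
	s.workflowRuns["wf-1"] = true
	s.addTask(&taskRun{"t1", "wf-1", ""})
	s.addTask(&taskRun{"t2", "wf-1", "t1"})

	fmt.Println(s.deleteWorkflowRun("wf-1"), len(s.taskRuns), len(s.parentIndex))
	fmt.Println(errors.Is(s.deleteWorkflowRun("wf-1"), ErrNotFound))
}
```

Walking taskIndex first gives the delete a complete list of children, so no secondary index is left pointing at a removed task run.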
