The store.Store interface is the single source of truth for all Aether state. It aggregates four sub-interfaces — WorkflowRunStore, TaskRunStore, SchemaStore, and CronWorkflowStore — into one type that is injected into the engine via WithStore(). Implementations define the persistence strategy: in-memory, SQL, Redis, or any other backend.
The engine uses a Token-based optimistic concurrency model. Every UpdateWorkflowRun and UpdateTaskRun call carries an opaque uint64 Token on the run argument. Implementations that enforce concurrency control compare this token against the stored value and return ErrTokenMismatch if they differ. Single-threaded stores may ignore the token entirely.

Store (aggregate interface)

type Store interface {
    WorkflowRunStore
    TaskRunStore
    SchemaStore
    CronWorkflowStore
    Close() error
}
Close() releases any resources held by the store (connections, goroutines, etc.).

WorkflowRunStore

Manages the lifecycle of workflow-level run records.
type WorkflowRunStore interface {
    CreateWorkflowRun(ctx context.Context, run *WorkflowRun) error
    GetWorkflowRun(ctx context.Context, runID string) (*WorkflowRun, error)
    UpdateWorkflowRun(ctx context.Context, run *WorkflowRun) (*WorkflowRun, error)
    ListActiveWorkflowRuns(ctx context.Context) ([]*WorkflowRun, error)
    DeleteWorkflowRun(ctx context.Context, runID string) error
    ListWorkflowRunsByCronID(ctx context.Context, cronWorkflowID string) ([]*WorkflowRun, error)
}
CreateWorkflowRun
Stores the initial workflow run record. Called once at submit time with immutable data.
run (*WorkflowRun, required): The initial run record. RunID, Workflow (raw JSON), and CreatedAt are set by the engine.

GetWorkflowRun
Retrieves a workflow run by ID. Returns ErrNotFound if it does not exist.
runID (string, required): The workflow run ID returned by Engine.Submit.

UpdateWorkflowRun
Persists mutable fields. Only non-nil pointer fields in the run argument are written; nil fields are left unchanged. Returns the post-update state on success. Uses Token-based optimistic locking: implementations return ErrTokenMismatch on conflict.

ListActiveWorkflowRuns
Returns all non-terminal workflow runs that have a Deadline set. Used by the timeout watchdog.

DeleteWorkflowRun
Removes a workflow run and all its associated TaskRun records. Returns ErrNotFound if the run does not exist.

ListWorkflowRunsByCronID
Returns all WorkflowRun records created by a given CronWorkflow. Used for concurrency policy checks and history cleanup.
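The nil-field partial update convention used by UpdateWorkflowRun can be sketched as follows. The `run` type is a hypothetical stand-in with only two mutable fields, and `applyPartial` and `ptr` are illustrative helpers, not part of Aether's API.

```go
package main

import "fmt"

// run is a hypothetical stand-in for WorkflowRun with two mutable fields.
type run struct {
	RunID   string
	Status  *string
	Message *string
}

// applyPartial copies only the non-nil mutable fields from patch onto stored.
// Nil fields in patch leave the stored value untouched.
func applyPartial(stored, patch *run) {
	if patch.Status != nil {
		v := *patch.Status // copy so the store does not alias caller memory
		stored.Status = &v
	}
	if patch.Message != nil {
		v := *patch.Message
		stored.Message = &v
	}
}

// ptr returns a pointer to v; handy for building patches from literals.
func ptr[T any](v T) *T { return &v }

func main() {
	stored := &run{RunID: "run-1", Status: ptr("Running"), Message: ptr("started")}

	// Only Status is set in the patch, so Message keeps its stored value.
	applyPartial(stored, &run{RunID: "run-1", Status: ptr("Succeeded")})

	fmt.Println(*stored.Status, *stored.Message) // prints "Succeeded started"
}
```

One consequence of this convention is that a caller cannot clear a field by passing nil. Whether Aether offers another way to reset a field is not stated in the source.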

TaskRunStore

Manages task run records within a workflow.
type TaskRunStore interface {
    CreateTaskRun(ctx context.Context, run *TaskRun) error
    GetTaskRun(ctx context.Context, taskRunID string) (*TaskRun, error)
    UpdateTaskRun(ctx context.Context, run *TaskRun) (*TaskRun, error)
    ListTaskRuns(ctx context.Context, workflowRunID string) ([]*TaskRun, error)
    ListTaskRunsByParent(ctx context.Context, workflowRunID, parentRunID string) ([]*TaskRun, error)
    ListActiveTaskRuns(ctx context.Context) ([]*TaskRun, error)
}
Note: CreateTaskRun must be idempotent on the composite key (workflowRunID, parentRunID, scope, taskName). If a record with the same key already exists, implementations MUST return nil, without reporting an error or creating a duplicate. The engine relies on this for concurrent-safe scheduling.

CreateTaskRun
Persists a new task run. Idempotent on the composite key: repeated calls with the same key are a no-op. This is critical for loop scheduling, where iterations share the same taskName but have unique Scope values.

GetTaskRun
Retrieves a single task run by its unique RunID.

UpdateTaskRun
Persists mutable fields using the nil-field partial update convention: only non-nil pointer fields are written. Immutable fields (RunID, WorkflowRunID, ParentRunID, Scope, TaskName, etc.) are always ignored. Returns the post-update state on success.

ListTaskRuns
Returns all task runs for a given workflow run, in creation order.

ListTaskRunsByParent
Returns task runs sharing the same parent container. Passing parentRunID="" returns top-level tasks. This is the core query for scoped DAG scheduling: advanceScope uses it to find sibling tasks.

ListActiveTaskRuns
Returns all non-terminal task runs that have a Deadline set. Used by the timeout watchdog.
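A minimal in-memory sketch of the idempotent create and the parent-scoped listing described above. All names here (`taskKey`, `taskRun`, `memTasks`, `create`, `listByParent`) are hypothetical stand-ins, and returning results in creation order from `listByParent` is an assumption. The source only guarantees that order for ListTaskRuns.

```go
package main

import (
	"fmt"
	"sync"
)

// taskKey mirrors the composite key (workflowRunID, parentRunID, scope, taskName).
type taskKey struct {
	WorkflowRunID, ParentRunID, Scope, TaskName string
}

// taskRun is a hypothetical, trimmed-down stand-in for TaskRun.
type taskRun struct {
	RunID string
	Key   taskKey
}

// memTasks is a hypothetical in-memory task store.
type memTasks struct {
	mu    sync.Mutex
	byKey map[taskKey]*taskRun
	order []*taskRun // records in creation order
}

// create stores r unless a record with the same composite key exists.
// A duplicate key returns nil and keeps the first record.
func (s *memTasks) create(r *taskRun) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.byKey[r.Key]; exists {
		return nil
	}
	s.byKey[r.Key] = r
	s.order = append(s.order, r)
	return nil
}

// listByParent returns the runs of one workflow that share parentRunID.
// An empty parentRunID selects the top-level scope.
func (s *memTasks) listByParent(workflowRunID, parentRunID string) []*taskRun {
	s.mu.Lock()
	defer s.mu.Unlock()
	var out []*taskRun
	for _, r := range s.order {
		if r.Key.WorkflowRunID == workflowRunID && r.Key.ParentRunID == parentRunID {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	s := &memTasks{byKey: map[taskKey]*taskRun{}}
	k := taskKey{WorkflowRunID: "wf-1", Scope: "main-pipeline/", TaskName: "build"}

	_ = s.create(&taskRun{RunID: "t-1", Key: k})
	_ = s.create(&taskRun{RunID: "t-2", Key: k}) // same key: no-op, t-1 is kept

	top := s.listByParent("wf-1", "")
	fmt.Println(len(top), top[0].RunID) // prints "1 t-1"
}
```

Holding the mutex across the existence check and the insert is what makes concurrent duplicate creates safe in this sketch. A database-backed store would more likely rely on a unique constraint over the same four columns.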

SchemaStore

Persists executor schemas for recovery after a master restart.
type SchemaStore interface {
    UpsertSchema(ctx context.Context, workerID string, schema model.ExecutorSchema) error
    ListSchemas(ctx context.Context) ([]SchemaRecord, error)
    DeleteSchema(ctx context.Context, execType, workerID string) error
}
In UpsertSchema, workerID="" marks a schema as an orphan (loaded during recovery). DeleteSchema treats execType and workerID as optional filters: an empty string places no restriction on that dimension.
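The optional-filter semantics of DeleteSchema can be sketched as a pure function over a slice. `schemaRecord` and `deleteSchemas` are hypothetical names, and the fields shown are assumptions, since the source does not list the fields of SchemaRecord.

```go
package main

import "fmt"

// schemaRecord is a hypothetical stand-in for SchemaRecord.
type schemaRecord struct {
	ExecType string
	WorkerID string
}

// deleteSchemas returns the records that remain after removing every record
// matching both filters. An empty filter matches any value on that dimension.
func deleteSchemas(records []schemaRecord, execType, workerID string) []schemaRecord {
	kept := make([]schemaRecord, 0, len(records))
	for _, r := range records {
		matchType := execType == "" || r.ExecType == execType
		matchWorker := workerID == "" || r.WorkerID == workerID
		if matchType && matchWorker {
			continue // record is selected for deletion
		}
		kept = append(kept, r)
	}
	return kept
}

func main() {
	records := []schemaRecord{
		{ExecType: "shell", WorkerID: "w1"},
		{ExecType: "shell", WorkerID: "w2"},
		{ExecType: "http", WorkerID: "w1"},
	}

	// Delete every "shell" schema regardless of worker.
	fmt.Println(deleteSchemas(records, "shell", "")) // prints "[{http w1}]"

	// Delete everything registered by worker w1, regardless of type.
	fmt.Println(deleteSchemas(records, "", "w1")) // prints "[{shell w2}]"
}
```

Under this reading, passing two empty strings deletes every record. It also means an empty workerID cannot be used to target only orphan schemas, so check the actual implementation before relying on either behavior.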

CronWorkflowStore

Manages CronWorkflow persistence.
type CronWorkflowStore interface {
    CreateCronWorkflow(ctx context.Context, record *CronWorkflowRecord) error
    GetCronWorkflow(ctx context.Context, cronID string) (*CronWorkflowRecord, error)
    UpdateCronWorkflow(ctx context.Context, record *CronWorkflowRecord) error
    DeleteCronWorkflow(ctx context.Context, cronID string) error
    ListCronWorkflows(ctx context.Context) ([]*CronWorkflowRecord, error)
}
DeleteCronWorkflow does NOT cascade-delete associated WorkflowRun records. ListCronWorkflows is called during Engine.Start() to re-register schedules after a crash.

Persistent model types

WorkflowRun

type WorkflowRun struct {
    // Immutable (set at creation)
    RunID          string
    Workflow       json.RawMessage // raw workflow JSON snapshot
    CreatedAt      time.Time
    CronWorkflowID string          // non-empty when created by a CronWorkflow trigger

    // Mutable (nil = do not modify in UpdateWorkflowRun)
    Status   *model.Phase
    Message  *string
    Outputs  *model.Outputs
    Metrics  *model.Metrics
    Deadline *time.Time  // absolute timeout; nil = no workflow-level deadline

    // Internal
    Token     uint64
    UpdatedAt time.Time
}

TaskRun

type TaskRun struct {
    // Immutable (set at creation)
    RunID         string
    WorkflowRunID string
    ParentRunID   string   // parent container TaskRun ID; "" = top-level scope
    Depth         int      // tree depth; 0 = top-level
    Scope         string   // path segment, e.g. "main-pipeline/"
    TaskName      string   // task name within current scope
    TemplateName  string   // referenced template name
    TemplateType  string   // "dag" / "task" / "loop"
    CreatedAt     time.Time

    // Mutable (nil = do not modify in UpdateTaskRun)
    Inputs     *model.Inputs
    Status     *model.Phase
    Message    *string
    Outputs    *model.Outputs
    Metrics    *model.Metrics
    RetryCount *int       // retries consumed; 0 = first attempt
    Deadline   *time.Time // absolute timeout; nil = no task-level deadline

    // Internal
    Token     uint64
    UpdatedAt time.Time
}

Error sentinels

ErrNotFound
error
Returned when a requested resource does not exist.
ErrTokenMismatch
error
Returned when an Update call’s token does not match the stored token. Signals normal optimistic-lock contention — not an infrastructure failure. The engine treats this as a no-op for concurrent updates.

Inject with WithStore

engine, err := aether.New(
    aether.WithStore(myStore), // required
    // ...
)
