Engine is the only struct exposed at the top level of the aether package. Its surface area is intentionally small: you construct it with functional options, call Start to activate background services, submit workflows, query their state, and tear down with Stop. Beyond the four required dependencies listed under New, every capability is opt-in, and the engine makes no assumptions about your infrastructure topology.
import "github.com/BabySid/aether"

Constructor

New

func New(opts ...Option) (*Engine, error)
Creates a new Engine with the given options. Returns ErrValidation if any required dependency is missing.
Required options (omitting any of these causes New to return an error):
WithStore(s store.Store)
Option
required
Sets the state persistence backend. All workflow and task run state is written here. See Store Interface.
WithExecutor(plugin executor.Plugin)
Option
required
Registers an executor plugin. Call multiple times to register multiple types. At least one executor must be registered. See Executor Interface.
WithIDGenerator(gen idgen.Generator)
Option
required
Sets the unique ID generator used to produce RunID values for workflow and task runs.
WithTaskBroker(b broker.TaskBroker)
Option
required
Sets the task dispatch bridge between the engine and workers. See Broker Interface.
Optional options:
WithExecutorRegistry(reg *executor.Registry)
Option
Sets a pre-built executor registry. Use when you already have an *executor.Registry shared with a broker and want to avoid registering each plugin twice.
WithExprEvaluator(eval expr.Evaluator)
Option
Sets the expression evaluator used for when conditions, repeatCondition expressions, phaseConditions, and retry expressions. Without this, expression fields are ignored (no conditional branching).
WithArtifactStore(a artifact.Repository)
Option
Sets the artifact repository. Currently stored but not yet wired into task execution; reserved for the artifact upload/download feature.
WithSecretStore(s secret.Provider)
Option
Sets the secret provider used to resolve valueFrom.secretKeyRef references in workflow parameters.
WithHookNotifier(h hook.Notifier)
Option
Sets the hook notification backend. When configured, the engine calls the notifier when workflow-level and task-level hooks fire.
WithErrorSink(s errsink.ErrorSink)
Option
Sets the error observation sink. The engine reports internal non-fatal errors (hook failures, store errors in void callbacks) here for external monitoring. Scheduling behaviour is identical whether or not a sink is configured.
WithTimeoutWatcher(w timeout.Watcher)
Option
Sets the timeout watchdog. Required to enforce spec.timeout and per-task timeout fields. The watchdog calls OnTaskTimeout and OnWorkflowTimeout when deadlines are exceeded.
WithVarsSource(p vars.Source)
Option
Registers a global variable source available in all workflow templates. Call multiple times to layer multiple providers (per-call providers have higher priority). Intended for stable, run-independent data such as system metadata.
WithWorkerRegistry(r worker.Registry)
Option
Sets the worker registration and discovery backend. Used by distributed broker implementations to route tasks to workers that support a given executor type.
WithCronScheduler(s cron.Scheduler)
Option
Sets the cron scheduling backend. Required to use any *CronWorkflow method. Without it, those methods return ErrNotSupported.
Example:
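// myPlugin (an executor.Plugin) and myBroker (a broker.TaskBroker) are assumed to be constructed elsewhere.
engine, err := aether.New(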
    aether.WithStore(memstore.New()),
    aether.WithExecutor(myPlugin),
    aether.WithIDGenerator(idgen.NewAtomic()),
    aether.WithTaskBroker(myBroker),
    aether.WithTimeoutWatcher(timeout.NewPolling(5*time.Second)),
    aether.WithCronScheduler(cron.NewScheduler()),
)
if err != nil {
    log.Fatal(err)
}
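The way New rejects a missing required dependency can be pictured with a minimal functional-options sketch. This is not aether's source: the engine struct, the option type, withStore, withBroker, and the string placeholders are all hypothetical stand-ins, and errValidation only mirrors the documented ErrValidation sentinel.

```go
package main

import (
	"errors"
	"fmt"
)

// errValidation stands in for aether.ErrValidation in this sketch.
var errValidation = errors.New("validation error")

// engine is hypothetical; the real Engine's fields are not documented here.
type engine struct {
	store  string // placeholder for store.Store
	broker string // placeholder for broker.TaskBroker
}

// option is a function that mutates the engine under construction.
type option func(*engine)

func withStore(s string) option  { return func(e *engine) { e.store = s } }
func withBroker(b string) option { return func(e *engine) { e.broker = b } }

// newEngine applies every option, then rejects a configuration
// that is missing a required dependency.
func newEngine(opts ...option) (*engine, error) {
	e := &engine{}
	for _, opt := range opts {
		opt(e)
	}
	if e.store == "" {
		return nil, fmt.Errorf("%w: store is required", errValidation)
	}
	if e.broker == "" {
		return nil, fmt.Errorf("%w: task broker is required", errValidation)
	}
	return e, nil
}

func main() {
	_, err := newEngine(withStore("mem")) // broker omitted on purpose
	fmt.Println(errors.Is(err, errValidation)) // prints "true"
}
```

Validating after all options have run means the order in which options are passed does not matter.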

Workflow lifecycle

Submit

func (e *Engine) Submit(ctx context.Context, wf *model.Workflow) (string, error)
Validates, persists, and dispatches a workflow for execution. Returns the runID string that identifies this specific execution. The call is non-blocking: it returns as soon as the entrypoint task has been dispatched to the broker.
All validation (API version, kind, entrypoint resolution, template nesting depth, hook template references) occurs before any state is written to the store. If validation fails, the store is not modified.
ctx
context.Context
required
Request context. Used for store and broker calls.
wf
*model.Workflow
required
The workflow definition to execute. Must not be nil.
runID
string
System-generated unique identifier for this workflow execution. Pass to Get, Resume, and Cancel.
error
error
ErrValidation if the workflow document is structurally invalid. Store or broker errors are returned unwrapped.

Get

func (e *Engine) Get(ctx context.Context, workflowID string) (*WorkflowExecution, error)
Retrieves the current execution state of a workflow run. Non-blocking.
ctx
context.Context
required
Request context.
workflowID
string
required
The runID returned by Submit.
*WorkflowExecution
struct
Read-only snapshot of the workflow run state. See WorkflowExecution below for field documentation.
error
error
store.ErrNotFound (wrapped) if the workflowID does not exist.
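Because Submit and Get are both non-blocking, a caller that needs the final result has to poll. The sketch below shows one way to do that against a small getter interface. The execution struct, the getter interface, fakeEngine, and the set of phases treated as terminal are assumptions made for illustration, not part of the aether API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// execution is a trimmed, hypothetical stand-in for WorkflowExecution.
type execution struct {
	Status string
}

// getter captures only Get, so the sketch runs without a real engine.
type getter interface {
	Get(ctx context.Context, workflowID string) (*execution, error)
}

// terminal lists the phases this sketch treats as final (an assumption).
var terminal = map[string]bool{
	"Succeeded": true, "Failed": true, "Error": true,
	"Timeout": true, "Skipped": true, "Cancelled": true,
}

// waitForCompletion polls Get until the run is terminal or ctx ends.
func waitForCompletion(ctx context.Context, g getter, runID string, every time.Duration) (*execution, error) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		exec, err := g.Get(ctx, runID)
		if err != nil {
			return nil, err
		}
		if terminal[exec.Status] {
			return exec, nil
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-ticker.C:
		}
	}
}

// fakeEngine reports "Running" twice, then "Succeeded".
type fakeEngine struct{ calls int }

func (f *fakeEngine) Get(ctx context.Context, id string) (*execution, error) {
	f.calls++
	if f.calls < 3 {
		return &execution{Status: "Running"}, nil
	}
	return &execution{Status: "Succeeded"}, nil
}

// demo runs the poller against the fake and reports what happened.
func demo() (string, int) {
	f := &fakeEngine{}
	exec, err := waitForCompletion(context.Background(), f, "run-1", time.Millisecond)
	if err != nil {
		return "", f.calls
	}
	return exec.Status, f.calls
}

func main() {
	status, calls := demo()
	fmt.Println(status, calls) // prints "Succeeded 3"
}
```

Passing a context with a deadline bounds the total wait, since the loop returns ctx.Err() once the context ends.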

Resume

func (e *Engine) Resume(ctx context.Context, workflowID string, taskID string, payload map[string]any) error
Re-dispatches a suspended task with an incremental payload. The payload is merged (last-writer-wins) into the task’s accumulated inputs so the executor receives the full history of all Resume payloads on top of the original resolved inputs. The executor then decides whether to remain suspended (return ExecCodeSuspended again) or complete. Resume is a no-op if the task is not in PhaseSuspended (e.g. already timed out, cancelled, or completed by a concurrent Resume).
ctx
context.Context
required
Request context.
workflowID
string
required
The runID of the workflow that owns the suspended task.
taskID
string
required
The RunID of the suspended TaskRun (available from WorkflowExecution.Tasks).
payload
map[string]any
required
Key/value pairs merged into the task’s inputs. Keys in payload overwrite matching parameter names; absent keys are left unchanged.
error
error
ErrInvalidState if the task does not belong to the given workflowID. store.ErrNotFound if either ID is invalid.
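The last-writer-wins merge described for payload can be illustrated with a plain map overlay. This is a sketch of the semantics only; mergePayload is a hypothetical helper, and the engine's actual input representation (model.Inputs) is not shown here.

```go
package main

import "fmt"

// mergePayload copies inputs and overlays payload on top (last writer wins).
// Keys absent from payload keep their existing values.
// The inputs map itself is not mutated.
func mergePayload(inputs, payload map[string]any) map[string]any {
	merged := make(map[string]any, len(inputs)+len(payload))
	for k, v := range inputs {
		merged[k] = v
	}
	for k, v := range payload {
		merged[k] = v
	}
	return merged
}

func main() {
	inputs := map[string]any{"reviewer": "alice", "approved": false}
	// Two successive Resume payloads, applied in order.
	inputs = mergePayload(inputs, map[string]any{"comment": "needs changes"})
	inputs = mergePayload(inputs, map[string]any{"approved": true})
	fmt.Println(inputs["reviewer"], inputs["comment"], inputs["approved"])
	// prints "alice needs changes true"
}
```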

Cancel

func (e *Engine) Cancel(ctx context.Context, workflowID string) error
Cancels a running workflow. Non-blocking. For each non-terminal task:
  • Running / Suspended tasks: a cancellation signal is sent to the broker, then the task is transitioned to PhaseCancelled.
  • Created / Ready tasks: transitioned directly to PhaseCancelled (no broker signal needed since dispatch has not completed).
The workflow run itself is then marked Cancelled. Workflow-level and task-level onCancel hooks fire after cancellation.
ctx
context.Context
required
Request context.
workflowID
string
required
The runID to cancel.
error
error
ErrInvalidState if the workflow is already in a terminal phase. store.ErrNotFound if the ID is invalid.
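The per-task rules above reduce to a small decision function. The following sketch encodes them; the phase type and constants mirror the names in the phase table, while cancelTask itself is a hypothetical helper rather than an aether function.

```go
package main

import "fmt"

type phase string

const (
	phaseCreated   phase = "Created"
	phaseReady     phase = "Ready"
	phaseRunning   phase = "Running"
	phaseSuspended phase = "Suspended"
	phaseSucceeded phase = "Succeeded"
	phaseCancelled phase = "Cancelled"
)

// cancelTask returns the phase after cancellation and whether the
// broker needs a cancellation signal for this task.
func cancelTask(p phase) (next phase, signalBroker bool) {
	switch p {
	case phaseRunning, phaseSuspended:
		return phaseCancelled, true // signal first, then transition
	case phaseCreated, phaseReady:
		return phaseCancelled, false // no broker signal needed
	default:
		return p, false // already terminal: left unchanged
	}
}

func main() {
	for _, p := range []phase{phaseRunning, phaseReady, phaseSucceeded} {
		next, signal := cancelTask(p)
		fmt.Println(p, "->", next, "signal:", signal)
	}
}
```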

Background services

Start

func (e *Engine) Start(ctx context.Context) error
Launches the engine’s background services (timeout watchdog, cron scheduler). Returns immediately; all goroutines run until Stop is called or the parent ctx is cancelled. Pair every Start call with a Stop call.
If a timeout.Watcher is configured, Start begins the watchdog loop that periodically scans for tasks and workflows that have exceeded their deadlines.
If a cron.Scheduler is configured, Start re-registers all non-suspended CronWorkflows loaded from the store (crash recovery) and begins the cron loop.
ctx
context.Context
required
Parent context. When cancelled, all background goroutines exit.
error
error
Returned if the timeout watcher or cron scheduler fails to start.

Stop

func (e *Engine) Stop()
Shuts down all background services started by Start. Safe to call concurrently or multiple times; only the first call takes effect. Stops the timeout watcher and cron scheduler if configured.
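A common way to get "only the first call takes effect" behaviour in Go is sync.Once. The sketch below shows that pattern. Whether the engine uses sync.Once internally is not stated in this reference, and the services type with its fields is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// services is a hypothetical holder for background goroutines.
type services struct {
	stopOnce sync.Once
	done     chan struct{} // closed to tell goroutines to exit
	stops    int           // counts how often shutdown actually ran
}

func newServices() *services {
	return &services{done: make(chan struct{})}
}

// Stop is safe to call repeatedly and from multiple goroutines.
func (s *services) Stop() {
	s.stopOnce.Do(func() {
		s.stops++
		close(s.done) // closing twice would panic; sync.Once prevents that
	})
}

func main() {
	s := newServices()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Stop()
		}()
	}
	wg.Wait()
	fmt.Println(s.stops) // prints "1"
}
```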

Task callbacks (broker integration)

These methods implement the broker.StartHandler and broker.CompletionHandler callback signatures. Local broker implementations invoke them directly; distributed brokers call them via a message consumer.

OnTaskStarted

func (e *Engine) OnTaskStarted(ctx context.Context, taskRunID string)
Invoked when a worker begins executing a task. Transitions the leaf task from Ready to Running, records StartedAt in task metrics, then walks ancestor DAG/Loop containers and the WorkflowRun upward from Ready to Running. Fires the workflow-level onStart hook (once per workflow) and the task-level onStart hook. Both the leaf transition and the ancestor walk use token-based optimistic locking so duplicate or concurrent invocations are idempotent.
ctx
context.Context
required
Request context.
taskRunID
string
required
The RunID of the started task.
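Token-based optimistic locking makes a duplicate callback harmless: the second writer presents a stale token and its update is rejected. Here is a minimal in-memory sketch of the idea. The memStore type, the integer token, and the transition method are assumptions; the real store interface and token format are not described in this section.

```go
package main

import (
	"fmt"
	"sync"
)

// taskRecord pairs a phase with a version token.
type taskRecord struct {
	phase string
	token int
}

// memStore is a hypothetical in-memory store.
type memStore struct {
	mu    sync.Mutex
	tasks map[string]taskRecord
}

// transition applies from -> to only if the caller's token and expected
// phase still match what is stored. It reports whether the write applied.
func (s *memStore) transition(id, from, to string, token int) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	rec, ok := s.tasks[id]
	if !ok || rec.token != token || rec.phase != from {
		return false
	}
	// Bump the token so any other holder of the old token is rejected.
	s.tasks[id] = taskRecord{phase: to, token: token + 1}
	return true
}

func main() {
	s := &memStore{tasks: map[string]taskRecord{"t1": {phase: "Ready", token: 7}}}
	fmt.Println(s.transition("t1", "Ready", "Running", 7)) // prints "true"
	fmt.Println(s.transition("t1", "Ready", "Running", 7)) // prints "false"
}
```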

OnTaskCompleted

func (e *Engine) OnTaskCompleted(ctx context.Context, result *broker.TaskResult)
Invoked when a task finishes execution. Performs the following in order:
  1. Guards against duplicate callbacks (ignores if task is not Running or Suspended).
  2. Derives Phase from ExecOutputs.Code, optionally overridden by phaseConditions.
  3. Handles the suspend fast-path (ExecCodeSuspended → PhaseSuspended, accumulates partial outputs, fires onSuspend hook, returns early).
  4. Evaluates the retry policy; if a retry is needed, resets the task to Created and re-dispatches.
  5. Computes Metrics (FinishedAt, Duration, Retries).
  6. Merges executor outputs with template-declared output defaults.
  7. Persists the final state via UpdateTaskRun.
  8. Fires task-level phase hooks.
  9. Calls advanceScope to dispatch newly-unblocked tasks or finalize the scope.
ctx
context.Context
required
Request context.
result
*broker.TaskResult
required
The execution result from the worker. See TaskResult.
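Steps 1 to 3 of the sequence above (duplicate guard, phase derivation, suspend fast-path) can be sketched as a pure function. The phase names follow the phase table. The execCode type and its string values are hypothetical, phaseConditions overrides are left out, and treating unknown codes as Error is a choice made for this sketch only.

```go
package main

import "fmt"

type phase string
type execCode string

const (
	phaseRunning   phase = "Running"
	phaseSuspended phase = "Suspended"
	phaseSucceeded phase = "Succeeded"
	phaseFailed    phase = "Failed"
	phaseError     phase = "Error"
	phaseTimeout   phase = "Timeout"

	// The string values below are placeholders, not aether's constants.
	codeSucceeded execCode = "succeeded"
	codeFailed    execCode = "failed"
	codeError     execCode = "error"
	codeTimeout   execCode = "timeout"
	codeSuspended execCode = "suspended"
)

// derivePhase returns the phase implied by an executor code.
// ok is false when the callback should be ignored as a duplicate.
func derivePhase(current phase, code execCode) (next phase, ok bool) {
	// Step 1: only Running or Suspended tasks accept a completion.
	if current != phaseRunning && current != phaseSuspended {
		return current, false
	}
	// Step 2: map the executor code to a phase.
	switch code {
	case codeSucceeded:
		return phaseSucceeded, true
	case codeFailed:
		return phaseFailed, true
	case codeTimeout:
		return phaseTimeout, true
	case codeSuspended:
		return phaseSuspended, true // step 3: suspend fast-path
	case codeError:
		return phaseError, true
	default:
		return phaseError, true // unknown code: treated as a system error here
	}
}

func main() {
	fmt.Println(derivePhase(phaseRunning, codeSucceeded))   // prints "Succeeded true"
	fmt.Println(derivePhase(phaseSucceeded, codeSucceeded)) // prints "Succeeded false"
}
```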

Watchdog callbacks

OnTaskTimeout

func (e *Engine) OnTaskTimeout(ctx context.Context, taskRunID string)
Invoked by the timeout.Watcher when a task run has exceeded its deadline. Idempotent: if the task is already terminal the call is a no-op. Sends a best-effort cancellation signal to the broker, then calls OnTaskCompleted with ExecCodeTimeout — reusing the same completion path for hooks, retry checks, metrics, and scope advancement.
ctx
context.Context
required
Request context.
taskRunID
string
required
The timed-out task’s RunID.

OnWorkflowTimeout

func (e *Engine) OnWorkflowTimeout(ctx context.Context, workflowRunID string)
Invoked by the timeout.Watcher when a workflow run has exceeded its deadline. Idempotent and safe for concurrent invocation from multiple engine instances. Marks all non-terminal task runs as Timeout, sends cancellation signals to the broker for each, then marks the workflow run as Timeout. Fires workflow-level onExit hooks.
ctx
context.Context
required
Request context.
workflowRunID
string
required
The timed-out workflow’s RunID.

CronWorkflow operations

All methods in this section require WithCronScheduler to be configured. Without it they return ErrNotSupported.

SubmitCronWorkflow

func (e *Engine) SubmitCronWorkflow(ctx context.Context, cw *model.CronWorkflow) (string, error)
Validates and registers a CronWorkflow for periodic execution. Returns the system-generated cronID. The definition is stored as an immutable JSON snapshot; only scheduling fields can be updated later via UpdateCronWorkflow.
ctx
context.Context
required
Request context.
cw
*model.CronWorkflow
required
Must not be nil.
cronID
string
System-generated unique identifier. Pass to GetCronWorkflow, UpdateCronWorkflow, and DeleteCronWorkflow.
error
error
ErrValidation if the document fails validation. ErrNotSupported if no cron scheduler is configured.

GetCronWorkflow

func (e *Engine) GetCronWorkflow(ctx context.Context, cronID string) (*CronWorkflowExecution, error)
Returns the execution state of a CronWorkflow and all WorkflowRuns it has triggered.
ctx
context.Context
required
Request context.
cronID
string
required
The ID returned by SubmitCronWorkflow.
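*CronWorkflowExecution
struct
Read-only snapshot of the cron workflow and the runs it has triggered. See CronWorkflowExecution below for field documentation.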

UpdateCronWorkflow

func (e *Engine) UpdateCronWorkflow(ctx context.Context, cronID string, cw *model.CronWorkflow) error
Updates the mutable scheduling fields of a registered CronWorkflow (schedule, timezone, concurrencyPolicy, suspend, startAt, endAt, startingDeadlineSeconds, history limits). Re-registers the schedule with the cron backend automatically. WorkflowSpec is immutable. Attempting to change it returns ErrValidation with a message indicating that a delete-and-resubmit is required.
ctx
context.Context
required
Request context.
cronID
string
required
The ID of the registered cron workflow.
cw
*model.CronWorkflow
required
Updated definition. Must not be nil.
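One plausible way to enforce an immutable WorkflowSpec is to compare JSON snapshots of the stored and updated specs. The sketch below illustrates that idea only. The cronWorkflow struct, its JSON field names, and checkUpdate are hypothetical, and the engine's real comparison logic is not documented here.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// errValidation stands in for aether.ErrValidation.
var errValidation = errors.New("validation error")

// cronWorkflow is a trimmed, hypothetical shape of model.CronWorkflow.
type cronWorkflow struct {
	Schedule     string         `json:"schedule"`
	Suspend      bool           `json:"suspend"`
	WorkflowSpec map[string]any `json:"workflowSpec"`
}

// checkUpdate rejects an update whose WorkflowSpec differs from the
// stored snapshot. json.Marshal sorts map keys, so equal maps always
// produce equal bytes.
func checkUpdate(stored, updated cronWorkflow) error {
	before, err := json.Marshal(stored.WorkflowSpec)
	if err != nil {
		return err
	}
	after, err := json.Marshal(updated.WorkflowSpec)
	if err != nil {
		return err
	}
	if !bytes.Equal(before, after) {
		return fmt.Errorf("%w: workflowSpec is immutable; delete and resubmit", errValidation)
	}
	return nil
}

func main() {
	stored := cronWorkflow{
		Schedule:     "0 * * * *",
		WorkflowSpec: map[string]any{"entrypoint": "main"},
	}

	reschedule := stored
	reschedule.Schedule = "*/5 * * * *" // scheduling field: allowed
	fmt.Println(checkUpdate(stored, reschedule)) // prints "<nil>"

	respec := stored
	respec.WorkflowSpec = map[string]any{"entrypoint": "other"}
	fmt.Println(errors.Is(checkUpdate(stored, respec), errValidation)) // prints "true"
}
```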

DeleteCronWorkflow

func (e *Engine) DeleteCronWorkflow(ctx context.Context, cronID string) error
Stops scheduling and removes a CronWorkflow. WorkflowRun records previously created by this cron workflow are not cascade-deleted.
ctx
context.Context
required
Request context.
cronID
string
required
The ID to delete.
error
error
store.ErrNotFound if the ID does not exist.

Return types

WorkflowExecution

The read-only return type of Engine.Get. Internal store fields (Token, raw Workflow JSON, UpdatedAt) are deliberately excluded.
type WorkflowExecution struct {
    RunID     string
    Status    model.Phase
    Message   string
    Outputs   *model.Outputs
    Metrics   *model.Metrics
    CreatedAt time.Time
    Progress  string
    Tasks     []TaskExecution
}
RunID
string
Unique identifier for this workflow execution.
Status
model.Phase
Current phase. Empty string ("") when not yet set (before first OnTaskStarted). See Phase values.
Message
string
Human-readable status message. Set on cancellation, error, or timeout.
Outputs
*model.Outputs
Workflow-level outputs. nil until the workflow finalizes. The Phase, Metrics, and Parameters embedded in Outputs reflect the final state.
Metrics
*model.Metrics
Timing metrics: StartedAt, FinishedAt, Duration (Go duration string), Retries.
CreatedAt
time.Time
Wall clock time when Submit persisted the workflow run.
Progress
string
"<completed>/<total>" task count string. Empty when no tasks exist. A task is counted as completed when it reaches any terminal phase.
Tasks
[]TaskExecution
All task runs in creation order, including intermediate DAG and Loop container runs. See TaskExecution below.
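The Progress string can be reproduced from the Tasks slice. The sketch below computes it from a list of phase strings. The set of phases counted as terminal is an assumption drawn from the phase table, and progress is a hypothetical helper.

```go
package main

import "fmt"

// terminal lists the phases this sketch counts as completed (an assumption).
var terminal = map[string]bool{
	"Succeeded": true, "Failed": true, "Error": true,
	"Timeout": true, "Skipped": true, "Cancelled": true,
}

// progress formats "<completed>/<total>", or "" when there are no tasks.
func progress(statuses []string) string {
	if len(statuses) == 0 {
		return ""
	}
	done := 0
	for _, s := range statuses {
		if terminal[s] {
			done++
		}
	}
	return fmt.Sprintf("%d/%d", done, len(statuses))
}

func main() {
	fmt.Println(progress([]string{"Succeeded", "Running", "Skipped"})) // prints "2/3"
}
```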

TaskExecution

The read-only view of a single task run within a workflow.
type TaskExecution struct {
    RunID         string
    WorkflowRunID string
    ParentRunID   string
    Depth         int
    Scope         string
    TaskName      string
    TemplateName  string
    TemplateType  string
    CreatedAt     time.Time
    Status        model.Phase
    Message       string
    Inputs        *model.Inputs
    Outputs       *model.Outputs
    Metrics       *model.Metrics
    RetryCount    int
}
RunID
string
Unique identifier for this task run.
WorkflowRunID
string
Parent workflow run ID.
ParentRunID
string
ID of the parent container task run (DAG or Loop). Empty string for top-level scope tasks.
Depth
int
Tree depth from the root scope. 0 = top-level.
Scope
string
Direct-parent path segment, e.g. "main-pipeline/" or "batch-review.loop[0]/". Used as part of the idempotency key in CreateTaskRun.
TaskName
string
Task name within the current scope. Unique among siblings.
TemplateName
string
Name of the referenced template definition.
TemplateType
string
One of "dag", "task", or "loop".
CreatedAt
time.Time
Wall clock creation time.
Status
model.Phase
Current phase of this task run.
Message
string
Human-readable status message.
Inputs
*model.Inputs
Resolved task inputs at dispatch time. For suspended tasks, this also accumulates successive Resume payloads merged on top of the original inputs.
Outputs
*model.Outputs
Task outputs after completion.
Metrics
*model.Metrics
Task timing metrics.
RetryCount
int
Number of retries already consumed. 0 = first attempt, 1 = first retry.
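Because Tasks is a flat slice, ParentRunID and Depth are what let a client rebuild the hierarchy. A minimal sketch of rendering an indented tree follows. The taskExecution struct is a trimmed, hypothetical copy of the fields above, and renderTree assumes every RunID is non-empty and unique.

```go
package main

import (
	"fmt"
	"strings"
)

// taskExecution keeps only the fields needed to rebuild the tree.
type taskExecution struct {
	RunID       string
	ParentRunID string
	TaskName    string
	Depth       int
}

// renderTree groups tasks by parent and walks from the top-level scope
// (empty ParentRunID), indenting each line by Depth.
func renderTree(tasks []taskExecution) []string {
	children := make(map[string][]taskExecution)
	for _, t := range tasks {
		children[t.ParentRunID] = append(children[t.ParentRunID], t)
	}
	var lines []string
	var walk func(parent string)
	walk = func(parent string) {
		for _, t := range children[parent] {
			lines = append(lines, strings.Repeat("  ", t.Depth)+t.TaskName)
			walk(t.RunID)
		}
	}
	walk("")
	return lines
}

func main() {
	tasks := []taskExecution{
		{RunID: "a", TaskName: "pipeline", Depth: 0},
		{RunID: "b", ParentRunID: "a", TaskName: "build", Depth: 1},
		{RunID: "c", ParentRunID: "a", TaskName: "test", Depth: 1},
	}
	for _, line := range renderTree(tasks) {
		fmt.Println(line)
	}
}
```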

CronWorkflowExecution

The read-only return type of Engine.GetCronWorkflow.
type CronWorkflowExecution struct {
    ID   string
    Runs []WorkflowExecution
}
ID
string
The cronID returned by SubmitCronWorkflow.
Runs
[]WorkflowExecution
All WorkflowRuns triggered by this cron workflow, each as a full WorkflowExecution snapshot.

Phase values

The model.Phase type is a string with the following values:
| Phase | Description |
| --- | --- |
| "Created" | Task persisted; awaiting scheduling (dependencies not yet met). |
| "Ready" | Engine has committed to execute; Broker.Dispatch has been called. |
| "Running" | Worker has called StartTask; real execution has begun. |
| "Suspended" | Executor returned ExecCodeSuspended; awaiting Resume. |
| "Succeeded" | Completed successfully (ExecCodeSucceeded). |
| "Failed" | Business-level failure (ExecCodeFailed). |
| "Error" | System-level failure (ExecCodeError). |
| "Timeout" | Exceeded the configured deadline (ExecCodeTimeout). |
| "Skipped" | when condition evaluated to false. Engine-set only. |
| "Cancelled" | User called Cancel or workflow was cancelled. Engine-set only. |
PhaseSkipped and PhaseCancelled are set exclusively by the engine and never appear in ExecOutputs. Executor plugins must not attempt to return these codes.

Error variables

var (
    ErrInvalidState = errors.New("invalid state")
    ErrValidation   = errors.New("validation error")
    ErrNotSupported = errors.New("not supported")
)
ErrInvalidState
error
Returned when an operation is invalid for the current state, e.g. cancelling an already-terminal workflow or resuming a task that belongs to a different workflow run.
ErrValidation
error
Returned by New when a required dependency is missing, and by Submit, SubmitCronWorkflow, and UpdateCronWorkflow when the workflow document fails structural or semantic validation.
ErrNotSupported
error
Returned by CronWorkflow methods when WithCronScheduler was not provided to New.
Use errors.Is to match these sentinel errors through wrapped chains:
if errors.Is(err, aether.ErrValidation) {
    // handle validation failure
}
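To see why errors.Is is preferred over == here, the following self-contained sketch wraps a sentinel with fmt.Errorf and %w, then classifies it. The lowercase sentinels mirror the three documented variables; classify and wrapped are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the three documented sentinel errors.
var (
	errInvalidState = errors.New("invalid state")
	errValidation   = errors.New("validation error")
	errNotSupported = errors.New("not supported")
)

// classify maps an error to a short label by walking its wrap chain.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errValidation):
		return "validation"
	case errors.Is(err, errInvalidState):
		return "invalid state"
	case errors.Is(err, errNotSupported):
		return "not supported"
	default:
		return "other"
	}
}

// wrapped simulates a caller adding context on top of a sentinel.
func wrapped() error {
	return fmt.Errorf("submit workflow: %w", errValidation)
}

func main() {
	err := wrapped()
	fmt.Println(err == errValidation) // prints "false": direct comparison misses wrapped errors
	fmt.Println(classify(err))        // prints "validation"
}
```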
