Aether’s hexagonal architecture defines ten extension interfaces beyond the executor, store, and broker. Each interface isolates a single infrastructure concern behind a minimal Go contract. All of them are wired into the engine via functional options, and all are optional except where noted. This page covers each interface, its method signatures, the circumstances under which you would implement it, and the corresponding WithXxx option.

idgen.Generator — unique ID generation

Required. Every WorkflowRun and TaskRun needs a globally unique string ID. The engine calls Generate whenever it creates one of these records.
```go
// idgen/idgen.go

type Context struct {
    WorkflowRunID string // non-empty when generating a TaskRun ID
    WorkflowKind  string // "Workflow" or "CronWorkflow"
    TaskName      string // non-empty when generating a TaskRun ID
    TemplateName  string // non-empty when generating a TaskRun ID
}

type Generator interface {
    Generate(ctx Context) string
}
```
The Context argument lets implementations embed structured information into the ID, for example a UUID that encodes the workflow kind, or a Snowflake ID that embeds the task name as a prefix. Implementations that only need globally unique strings can ignore ctx entirely.

When to implement: Always, because WithIDGenerator is required. Common choices are UUIDs (github.com/google/uuid), Snowflake IDs, or a monotonic counter for testing.

Engine option:
```go
// option.go
func WithIDGenerator(gen idgen.Generator) Option
```
Playground example — AtomicIDGen:
```go
// cmd/playground/atomic_idgen.go

type AtomicIDGen struct {
    counter atomic.Uint64
}

func NewAtomicIDGen() *AtomicIDGen { return &AtomicIDGen{} }

func (g *AtomicIDGen) Generate(_ idgen.Context) string {
    return fmt.Sprintf("%d", g.counter.Add(1))
}
```
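AtomicIDGen generates monotonically increasing string IDs starting from "1". It is safe for concurrent use because atomic.Uint64.Add is an atomic read-modify-write operation. The counter lives only in process memory, so IDs restart at "1" after a process restart and collide across engine replicas. In production, replace it with a UUID or a distributed ID scheme that is unique across engine replicas.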
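As a standard-library-only alternative to AtomicIDGen, the sketch below generates random version 4 UUIDs with crypto/rand. It is an illustration, not part of Aether: RandomUUIDGen is a hypothetical name, Context and Generator are redeclared locally to mirror idgen/idgen.go so the block compiles on its own, and prefixing task-run IDs with TaskName is just one possible use of ctx.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// Context and Generator mirror idgen/idgen.go so this sketch compiles on its own.
type Context struct {
	WorkflowRunID string
	WorkflowKind  string
	TaskName      string
	TemplateName  string
}

type Generator interface {
	Generate(ctx Context) string
}

// RandomUUIDGen is a hypothetical generator that returns random version 4 UUIDs.
type RandomUUIDGen struct{}

var _ Generator = RandomUUIDGen{} // compile-time interface check

func (RandomUUIDGen) Generate(ctx Context) string {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		// Generate has no error return, so fail loudly instead of risking a duplicate ID.
		panic(fmt.Sprintf("idgen: crypto/rand failed: %v", err))
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set version 4
	b[8] = (b[8] & 0x3f) | 0x80 // set the RFC 4122 variant
	id := fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
	if ctx.TaskName != "" {
		// Example use of ctx: task-run IDs carry the task name as a readable prefix.
		return ctx.TaskName + "-" + id
	}
	return id
}
```

Random UUIDs need no coordination between engine replicas. If you also want IDs that sort by creation time, a time-ordered scheme such as UUIDv7 or Snowflake is the usual next step.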

expr.Evaluator: expression evaluation

Optional. The engine uses an Evaluator for when conditions (task-level guards), repeatCondition (loop termination), phaseConditions (custom phase mapping), and valueFrom.expression (parameter computation). Without an evaluator, all expression fields must be absent or the engine returns an error.
```go
// expr/expr.go

type Evaluator interface {
    Eval(ctx context.Context, expr string, env map[string]any) (any, error)
}
```
env is a flat map[string]any containing all variables in scope at the call site: workflow inputs, task outputs, loop iteration variables, and any values contributed by registered vars.Source implementations. The return value is interpreted as a boolean for when and repeatCondition, or used as a string or numeric value for valueFrom.expression.

When to implement: Any workflow that uses when, repeatCondition, phaseConditions, or valueFrom.expression requires an evaluator. Common choices: expr-lang/expr, google/cel-go, or a simple custom parser for constrained DSLs.

Engine option:
```go
// option.go
func WithExprEvaluator(eval expr.Evaluator) Option
```
Playground example — SimpleEvaluator:
```go
// cmd/playground/simple_eval.go

type SimpleEvaluator struct{}

func (e *SimpleEvaluator) Eval(_ context.Context, expr string, env map[string]any) (any, error) {
    expr = strings.TrimSpace(expr)
    if expr == "true" {
        return true, nil
    }
    if expr == "false" {
        return false, nil
    }
    // Comparisons resolve both operands, then compare their %v string forms.
    if parts := strings.SplitN(expr, "==", 2); len(parts) == 2 {
        left := simpleResolve(strings.TrimSpace(parts[0]), env)
        right := simpleResolve(strings.TrimSpace(parts[1]), env)
        return fmt.Sprintf("%v", left) == fmt.Sprintf("%v", right), nil
    }
    if parts := strings.SplitN(expr, "!=", 2); len(parts) == 2 {
        left := simpleResolve(strings.TrimSpace(parts[0]), env)
        right := simpleResolve(strings.TrimSpace(parts[1]), env)
        return fmt.Sprintf("%v", left) != fmt.Sprintf("%v", right), nil
    }
    return nil, fmt.Errorf("unsupported expression: %s", expr)
}
```
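SimpleEvaluator handles true, false, a == b, and a != b. Tokens wrapped in single or double quotes are treated as string literals; bare tokens are resolved from env. Because both operands are compared by their fmt %v string form, a numeric env value 3 is equal to the literal "3".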
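The simpleResolve helper that SimpleEvaluator calls is not shown above. The following is a hypothetical reconstruction that follows the described rules (quoted tokens are string literals, bare tokens are looked up in env). Returning an unknown bare token as its own text is an assumption, made so that numeric literals such as 3 still compare sensibly.

```go
package main

// simpleResolve is a hypothetical reconstruction of the playground helper.
// A token wrapped in matching single or double quotes is a string literal;
// any other token is looked up in env.
func simpleResolve(token string, env map[string]any) any {
	if n := len(token); n >= 2 {
		first, last := token[0], token[n-1]
		if (first == '"' || first == '\'') && first == last {
			return token[1 : n-1]
		}
	}
	if v, ok := env[token]; ok {
		return v
	}
	// Assumption: an unknown bare token stands for itself, so a numeric
	// literal such as 3 is compared as the text "3".
	return token
}
```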

hook.Notifier: lifecycle hooks

Optional. The engine calls Notify at key moments in a workflow or task lifecycle. Implement Notifier to integrate with external systems such as webhooks, message buses, observability platforms, or audit logs.
```go
// hook/hook.go

type Notifier interface {
    Notify(ctx context.Context, event *Event) error
}

type Event struct {
    HookType      Type
    Scope         Scope
    WorkflowRunID string
    WorkflowName  string
    TaskRunID     string // only for ScopeTask
    TaskName      string // only for ScopeTask
    Template      string
    Context       map[string]any
}
```
Hook types:

| Constant | Fires when |
|---|---|
| hook.OnStart | A workflow or task begins |
| hook.OnSuccess | A workflow or task succeeds |
| hook.OnFailure | A workflow or task fails (business failure) |
| hook.OnError | A workflow or task errors (system failure) |
| hook.OnSuspend | A task is suspended (ExecCodeSuspended) |
| hook.OnResume | A suspended task is resumed |
| hook.OnCancel | A workflow or task is cancelled |
| hook.OnExit | A workflow exits (terminal, regardless of outcome) |
Scope values: hook.ScopeWorkflow or hook.ScopeTask.

Fan-out with CompositeNotifier: To send hooks to multiple destinations, compose notifiers:
```go
notifier := hook.CompositeNotifier{
    slackNotifier,
    datadogNotifier,
    auditLogNotifier,
}
engine, err := aether.New(
    // ...plus the required WithStore, WithTaskBroker, and WithIDGenerator options
    aether.WithHookNotifier(notifier),
)
if err != nil {
    log.Fatal(err)
}
```
CompositeNotifier.Notify calls every member and joins their errors, so one member's failure does not prevent the others from running.

Engine option:
```go
// option.go
func WithHookNotifier(h hook.Notifier) Option
```
When to implement: When you need real-time notifications about workflow and task lifecycle events for monitoring, alerting, or downstream automation.

timeout.Watcher: deadline detection

Optional. The Watcher detects when tasks or workflow runs exceed their configured deadlines and emits TimeoutEvent values for the engine to consume. The engine only reacts to these events; it never polls the store for expired deadlines itself.
```go
// timeout/watcher.go

type Kind int

const (
    KindTask     Kind = iota // a TaskRun has exceeded its deadline
    KindWorkflow             // a WorkflowRun has exceeded its deadline
)

type TimeoutEvent struct {
    Kind  Kind
    RunID string // TaskRun.RunID or WorkflowRun.RunID
}

type Watcher interface {
    Start(ctx context.Context) error
    Stop()
    Events() <-chan TimeoutEvent
}
```
Design invariants from the source:
  • Events are delivered at-least-once. The engine’s timeout handler is idempotent.
  • The watcher does not modify any state — it only discovers and emits.
  • Deadline data lives in the store (store.TaskRun.Deadline, store.WorkflowRun.Deadline), so the watcher is stateless and survives engine restarts without losing pending timeouts.
Engine option:
```go
// option.go
func WithTimeoutWatcher(w timeout.Watcher) Option
```
When to implement: When any workflow or task uses the timeout field. Without a Watcher, timeout fields are stored but deadlines are never enforced.

Playground example: PollingWatcher
```go
// cmd/playground/polling_watcher.go

type PollingWatcher struct {
    store    *MemoryStore
    interval time.Duration
    events   chan timeout.TimeoutEvent
    cancel   context.CancelFunc
}

// Start launches the scan loop in the background and returns immediately.
func (w *PollingWatcher) Start(ctx context.Context) error {
    innerCtx, cancel := context.WithCancel(ctx)
    w.cancel = cancel
    go w.loop(innerCtx)
    return nil
}

// Stop cancels the scan loop; it is a no-op if Start was never called.
func (w *PollingWatcher) Stop() {
    if w.cancel != nil {
        w.cancel()
    }
}

func (w *PollingWatcher) Events() <-chan timeout.TimeoutEvent {
    return w.events
}
```
PollingWatcher scans the store at a fixed interval, comparing each active entity’s Deadline to time.Now(). It emits events to a buffered channel (capacity 256) and drops events if the buffer is full (consumer too slow).
Production deployments should replace PollingWatcher with a distributed timeout mechanism backed by Redis sorted sets, a database cron job with leader election, or a dedicated deadline service — something that survives engine process restarts and works across replicas.

artifact.Repository: artifact storage

Optional. Artifacts are binary blobs (files, archives, reports) produced or consumed by tasks. Implement Repository to integrate with object storage, the local filesystem, or an HTTP endpoint.
```go
// artifact/artifact.go

type Repository interface {
    // Type returns the storage type identifier (e.g. "oss", "local", "http").
    Type() string

    // Upload uploads an artifact.
    Upload(ctx context.Context, config json.RawMessage, data io.Reader) error

    // Download downloads an artifact.
    Download(ctx context.Context, config json.RawMessage) (io.ReadCloser, error)
}
```
config is opaque JSON parsed from the workflow's artifact.source.config field. The model layer defines three config shapes (OSSSourceConfig, LocalSourceConfig, and HTTPSourceConfig), but your implementation may define its own schema.

Engine option:
```go
// option.go
func WithArtifactStore(a artifact.Repository) Option
```
When to implement: When workflows use artifact inputs or outputs (artifacts fields on tasks or templates). The engine currently stores the configured Repository but does not yet call it during execution; the interface is reserved for the upcoming artifact upload/download feature.

secret.Provider: secret resolution

Optional. Implement Provider to resolve secretKeyRef parameter values at task dispatch time. This lets workflow authors reference secrets by name and key without embedding credential values in workflow documents.
```go
// secret/secret.go

type Provider interface {
    Get(ctx context.Context, name string, key string) (string, error)
}
```
name is the secret's logical name (e.g. "my-db-creds"); key is the field within that secret (e.g. "password"). This maps naturally to Kubernetes Secrets, HashiCorp Vault, AWS Secrets Manager, or any key-value secret store.

A workflow parameter using a secretKeyRef looks like:
```json
{
  "name": "db-password",
  "valueFrom": {
    "secretKeyRef": {
      "name": "my-db-creds",
      "key": "password"
    }
  }
}
```
Engine option:
```go
// option.go
func WithSecretStore(s secret.Provider) Option
```
When to implement: Whenever workflow parameters reference secretKeyRef. Without a provider, resolution of these parameters will fail.

vars.Source: template variables

Optional. Source contributes a flat key→value map to the workflow evaluation environment. Variables from registered sources are available as {{namespace.key}} placeholders in workflow template parameters.
```go
// vars/vars.go

type Source interface {
    // Namespace returns a short identifier for this source (e.g. "system", "tenant").
    Namespace() string

    // Vars returns the flat key→value map contributed by this source.
    // Called once per evaluation context build. The map must not be mutated after return.
    Vars() map[string]any
}
```
Vars() is called each time the evaluation environment is built, so it may return dynamically computed values. Engine-level sources have lower priority than per-call sources: if a per-call source such as WorkflowArgsSource and an engine-level custom source produce the same key, the per-call value wins.

Built-in: SystemSource:
```go
// vars/vars.go

type SystemSource struct{}

func (s *SystemSource) Namespace() string { return "system" }

func (s *SystemSource) Vars() map[string]any {
    return map[string]any{
        "system.os":   runtime.GOOS,
        "system.arch": runtime.GOARCH,
    }
}
```
Register it to make {{system.os}} and {{system.arch}} available in all workflow templates.

Custom example: tenant context
```go
type TenantSource struct {
    TenantID string
    Tier     string
}

func (s *TenantSource) Namespace() string { return "tenant" }

func (s *TenantSource) Vars() map[string]any {
    return map[string]any{
        "tenant.id":   s.TenantID,
        "tenant.tier": s.Tier,
    }
}
```
Engine option:
```go
// option.go
func WithVarsSource(p vars.Source) Option
```
Call WithVarsSource multiple times to register multiple sources. Sources are injected into every variable resolution call for every workflow run.
```go
engine, err := aether.New(
    // ...plus the required WithStore, WithTaskBroker, and WithIDGenerator options
    aether.WithVarsSource(&vars.SystemSource{}),
    aether.WithVarsSource(&TenantSource{TenantID: "acme", Tier: "enterprise"}),
)
if err != nil {
    log.Fatal(err)
}
```
When to implement: When workflows need access to runtime environment data, deployment metadata, or any stable per-engine context that does not change between task executions.
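The precedence rule for vars sources (engine-level sources first, per-call sources last, last writer wins) can be sketched as a simple merge. This is an illustration, not the engine's resolver: Source mirrors vars.Source, mapSource and buildEnv are hypothetical names, and the ordering among sources within the same group is an assumption.

```go
package main

// Source mirrors vars.Source so this sketch compiles on its own.
type Source interface {
	Namespace() string
	Vars() map[string]any
}

// mapSource is a hypothetical Source backed by a fixed map.
type mapSource struct {
	ns   string
	vals map[string]any
}

func (s mapSource) Namespace() string    { return s.ns }
func (s mapSource) Vars() map[string]any { return s.vals }

// buildEnv merges engine-level sources first and per-call sources last, so a
// per-call value overwrites an engine-level value for the same key. Vars() is
// called on every build; the returned maps are read, never mutated. Within one
// group, later sources overwrite earlier ones (an assumption of this sketch).
func buildEnv(engineSources, callSources []Source) map[string]any {
	env := make(map[string]any)
	for _, group := range [][]Source{engineSources, callSources} {
		for _, src := range group {
			for k, v := range src.Vars() {
				env[k] = v
			}
		}
	}
	return env
}
```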

worker.Registry: worker discovery

Optional. In distributed deployments, workers register themselves with the registry at startup to announce their identity and supported executor types. The engine uses the registry to find eligible workers for type-based task routing.
```go
// worker/worker.go

type Registry interface {
    // Engine side
    Get(ctx context.Context, workerID string) (*Info, error)
    List(ctx context.Context) ([]*Info, error)
    ListByExecutorType(ctx context.Context, executorType string) ([]*Info, error)

    // Worker side
    Register(ctx context.Context, info *Info) error
    Unregister(ctx context.Context, workerID string) error
    Heartbeat(ctx context.Context, workerID string, meta map[string]any) error
}

type Info struct {
    ID            string
    ExecutorTypes []string
    Schemas       []model.ExecutorSchema
    Tags          map[string]string
    RegisteredAt  time.Time
}
```
Workers call Register at startup with their Info (ID, supported executor types, and full schemas for each type). The engine can then call ListByExecutorType to route tasks to capable workers. Heartbeat keeps registrations alive; implementations may remove stale workers that miss heartbeats.

worker.Registry manages worker identity and capabilities. Task lifecycle (dispatch, fetch, complete) is still handled by broker.TaskBroker. These two interfaces are intentionally separate.

Engine option:
```go
// option.go
func WithWorkerRegistry(r worker.Registry) Option
```
When to implement: In distributed deployments where the broker needs to route tasks to specific workers by executor type, or where you need to track which workers are alive and what they support.

errsink.ErrorSink: error reporting

Optional. ErrorSink is a non-blocking observation outlet for engine-internal errors. It never influences scheduling decisions; it exists purely to feed external monitoring and alerting systems.
```go
// errsink/errsink.go

type Severity int

const (
    SeverityInfo     Severity = iota // expected contention (e.g. token mismatch)
    SeverityWarning                  // non-critical (hook failures, ancestor marking)
    SeverityError                    // critical path errors (dispatch failure)
    SeverityCritical                 // may cause workflow to hang permanently
)

type ErrorContext struct {
    WorkflowRunID string
    TaskRunID     string
    Operation     string // e.g. "advanceScope", "dispatchLeafTask", "hook.onSuccess"
    Severity      Severity
}

type ErrorSink interface {
    OnError(ctx context.Context, err error, ec ErrorContext)
}
```
Severity guide:

| Level | Meaning | Example |
|---|---|---|
| SeverityInfo | Expected, benign | Token mismatch from concurrent writers |
| SeverityWarning | Non-critical | Hook notification failure |
| SeverityError | Scheduling-path failure | dispatchLeafTask error |
| SeverityCritical | Potential permanent hang | advanceScope failure without fallback |

Engine option:
```go
// option.go
func WithErrorSink(s errsink.ErrorSink) Option
```
When to implement: In production systems where you want structured visibility into engine-internal errors. Without a sink, errors raised in callbacks that have no error return value (void callbacks) and on non-critical paths are silently discarded.

Implementation requirements:
  • Must be safe for concurrent use.
  • Must return quickly. Offload long-running work (HTTP calls, disk I/O) to a background goroutine or a buffered channel to avoid blocking the engine’s scheduling loop.

cron.Scheduler: CronWorkflow scheduling

Optional. Scheduler provides the scheduling backend for CronWorkflow resources. Without it, SubmitCronWorkflow, GetCronWorkflow, and related engine methods return ErrNotSupported.
```go
// cron/scheduler.go

type Scheduler interface {
    Start(ctx context.Context) error
    Stop()
    Add(id string, schedule string, timezone string, callback func()) error
    Remove(id string)
}
```
Methods:
  • Start: Launches the scheduler's background services and returns immediately; background work runs until Stop is called. Call it only once; idempotent implementations may log a warning on repeated calls.
  • Stop: Shuts down background services. Safe to call multiple times; only the first call takes effect.
  • Add: Registers a cron entry. id is the CronWorkflow's system-generated ID; schedule is a standard five-field cron expression; timezone is an IANA time zone name (e.g. "UTC", "Asia/Shanghai"); callback is invoked on each schedule match. The engine supplies callback, which submits a new WorkflowRun.
  • Remove: Unregisters a cron entry by ID. No-op if the ID is not found.

Engine lifecycle integration: Engine.Start calls Scheduler.Start; Engine.Stop calls Scheduler.Stop. During startup, the engine calls ListCronWorkflows on the store and re-registers each active cron entry with Add, which restores schedules after a crash or restart.

Engine option:
```go
// option.go
func WithCronScheduler(s cron.Scheduler) Option
```
When to implement: When you use CronWorkflow resources to trigger workflows on a schedule. For a single engine process, a library such as robfig/cron is sufficient. With multiple engine replicas, use a distributed scheduler (for example, one backed by Redis) with leader election, so each schedule match triggers only one WorkflowRun.
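For tests, a Scheduler that never fires on its own is often more useful than a clock-driven one. This hypothetical ManualScheduler validates the field count and the IANA time zone in Add, records the callback, and exposes a Fire method so a test can simulate a schedule match deterministically. It only counts fields and does not parse cron syntax. time.LoadLocation needs time zone data on the host (or an import of time/tzdata) for names other than "UTC".

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"
)

// ManualScheduler is a hypothetical cron.Scheduler test double. It validates
// and records entries but never fires on its own; tests call Fire instead.
type ManualScheduler struct {
	mu      sync.Mutex
	entries map[string]func()
}

func NewManualScheduler() *ManualScheduler {
	return &ManualScheduler{entries: make(map[string]func())}
}

// Start and Stop have no background work to manage in a test double.
func (s *ManualScheduler) Start(ctx context.Context) error { return nil }
func (s *ManualScheduler) Stop()                           {}

// Add checks for five fields and a loadable IANA time zone, then records the callback.
func (s *ManualScheduler) Add(id string, schedule string, timezone string, callback func()) error {
	if n := len(strings.Fields(schedule)); n != 5 {
		return fmt.Errorf("cron entry %q: expected 5 schedule fields, got %d", id, n)
	}
	if _, err := time.LoadLocation(timezone); err != nil {
		return fmt.Errorf("cron entry %q: %w", id, err)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.entries[id] = callback // re-adding an id replaces the previous entry
	return nil
}

// Remove is a no-op for unknown IDs, matching the documented contract.
func (s *ManualScheduler) Remove(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.entries, id)
}

// Fire runs the callback for id as if its schedule had matched and reports
// whether the entry existed. The lock is released before the callback runs.
func (s *ManualScheduler) Fire(id string) bool {
	s.mu.Lock()
	callback, ok := s.entries[id]
	s.mu.Unlock()
	if ok {
		callback()
	}
	return ok
}
```

Releasing the lock before invoking the callback lets a callback call Add or Remove without deadlocking.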

Summary: required vs optional

These three options must be provided; aether.New will return an error without them.
| Interface | Option |
|---|---|
| store.Store | WithStore |
| broker.TaskBroker | WithTaskBroker |
| idgen.Generator | WithIDGenerator |
