Aether requires four interface implementations before the engine will accept a workflow: a store.Store for state persistence, a broker.TaskBroker for task dispatch, at least one executor.Plugin for execution logic, and an idgen.Generator for unique IDs. This guide walks through the executor, the broker, and the ID generator using the same in-memory implementations the playground CLI uses for integration testing, so the snippets below are based on working code. The store appears only as NewMemoryStore in the wiring step; supply your own store.Store implementation there.
1

Install the module

Add the module to your Go project. The core engine has zero external dependencies, so this adds nothing to your transitive dependency tree beyond the Aether packages themselves.
go get github.com/BabySid/aether
Verify the import path resolves:
go list github.com/BabySid/aether
2

Implement an executor plugin

An executor.Plugin is what actually runs your task logic. It exposes a Type() string (matched against executor.type in the workflow JSON) and an Execute method that receives resolved inputs and returns outputs.
The echo executor below is the same one used throughout the playground examples. It reads an outputs input parameter to discover what to return, making it useful for testing any workflow shape without real backend logic:
package main

import (
    "context"
    "encoding/json"
    "log"

    "github.com/BabySid/aether/executor"
    "github.com/BabySid/aether/model"
)

// EchoExecutor echoes its inputs back as outputs.
// Control parameters: "outputs" declares what the executor should return.
type EchoExecutor struct{}

func (e *EchoExecutor) Type() string { return "echo" }

func (e *EchoExecutor) Schema() model.ExecutorSchema {
    return executor.SchemaOf[executor.DynamicOutputs, executor.DynamicOutputs](
        "echo", "1.0", "Echoes inputs and produces declared outputs",
    )
}

func (e *EchoExecutor) Execute(_ context.Context, req *executor.ExecuteRequest) (*model.ExecOutputs, error) {
    var declaredOutputs []struct {
        Name  string          `json:"name"`
        Type  string          `json:"type"`
        Value json.RawMessage `json:"value,omitempty"`
    }

    if req.Inputs != nil {
        for _, p := range req.Inputs.Parameters {
            if p.Name == "outputs" {
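                // Surface malformed "outputs" JSON instead of silently echoing nothing.
                if err := json.Unmarshal(p.Value, &declaredOutputs); err != nil {
                    return nil, err
                }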
            }
        }
    }

    var params []model.Parameter
    for _, out := range declaredOutputs {
        params = append(params, model.Parameter{
            Name:  out.Name,
            Type:  out.Type,
            Value: out.Value,
        })
    }

    log.Printf("[echo] task=%s outputs=%d", req.TaskName, len(params))
    return &model.ExecOutputs{Parameters: params}, nil
}
The executor.Plugin interface is the only place your domain logic lives. The engine never calls your business logic directly — it only dispatches task assignments to the broker, and the broker hands them to workers that call Execute.
3

Implement the local broker

The broker.TaskBroker bridges the engine and your workers. The engine calls Dispatch when a task is ready; the broker delivers it to a worker; the worker calls StartTask and CompleteTask to report progress back.
The channel-based LocalBroker below is modeled directly on the playground implementation:
package main

import (
    "context"
    "fmt"
    "sync"

    "github.com/BabySid/aether/broker"
)

// LocalBroker routes tasks between the engine and local goroutine workers.
type LocalBroker struct {
    startHandler broker.StartHandler
    handler      broker.CompletionHandler

    mu      sync.Mutex
    closed  bool
    taskCh  chan *broker.TaskAssignment
    cancels map[string]context.CancelFunc
}

func NewLocalBroker(
    start broker.StartHandler,
    complete broker.CompletionHandler,
) *LocalBroker {
    return &LocalBroker{
        startHandler: start,
        handler:      complete,
        taskCh:       make(chan *broker.TaskAssignment, 64),
        cancels:      make(map[string]context.CancelFunc),
    }
}

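func (b *LocalBroker) Dispatch(ctx context.Context, a *broker.TaskAssignment) error {
    // The derived context is unused in this in-memory broker; only its cancel
    // func is stored so Cancel has something to call for the task run.
    _, cancel := context.WithCancel(ctx)
    b.mu.Lock()
    if b.closed {
        b.mu.Unlock()
        cancel()
        return fmt.Errorf("broker is closed")
    }
    b.cancels[a.TaskRunID] = cancel
    b.mu.Unlock()
    select {
    case b.taskCh <- a:
        return nil
    case <-ctx.Done():
        // Enqueue failed: drop the bookkeeping entry before returning.
        b.mu.Lock()
        delete(b.cancels, a.TaskRunID)
        b.mu.Unlock()
        cancel()
        return ctx.Err()
    }
}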

func (b *LocalBroker) Cancel(_ context.Context, taskRunID string) error {
    b.mu.Lock()
    cancel, ok := b.cancels[taskRunID]
    b.mu.Unlock()
    if ok {
        cancel()
    }
    return nil
}

func (b *LocalBroker) FetchTask(ctx context.Context, _ string) (*broker.TaskAssignment, error) {
    select {
    case a, ok := <-b.taskCh:
        if !ok {
            return nil, fmt.Errorf("broker closed")
        }
        return a, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

func (b *LocalBroker) StartTask(ctx context.Context, taskRunID string, _ string) error {
    if b.startHandler != nil {
        b.startHandler(ctx, taskRunID)
    }
    return nil
}

func (b *LocalBroker) CompleteTask(ctx context.Context, result *broker.TaskResult) error {
    b.mu.Lock()
    delete(b.cancels, result.TaskRunID)
    b.mu.Unlock()
    if b.handler != nil {
        b.handler(ctx, result)
    }
    return nil
}
The startHandler and handler callbacks wire directly back into the engine’s OnTaskStarted and OnTaskCompleted methods. This is the only coupling between the broker and the engine — callbacks, not direct method calls.
4

Implement an ID generator

The idgen.Generator produces unique string IDs for workflow runs and task runs. The atomic counter below matches the playground’s AtomicIDGen:
package main

import (
    "fmt"
    "sync/atomic"

    "github.com/BabySid/aether/idgen"
)

// AtomicIDGen generates monotonically increasing string IDs.
type AtomicIDGen struct {
    counter atomic.Uint64
}

func NewAtomicIDGen() *AtomicIDGen { return &AtomicIDGen{} }

func (g *AtomicIDGen) Generate(_ idgen.Context) string {
    return fmt.Sprintf("%d", g.counter.Add(1))
}
For production use, replace this with a UUID generator or a distributed ID scheme such as Snowflake, depending on your consistency requirements.
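As one possible starting point for the UUID option, the sketch below builds a random (version 4) UUID string using only the standard library. The newUUID function is a hypothetical helper, not part of Aether; how you adapt it to Generate, which returns only a string, and what you do on an entropy error are left as design decisions.

```go
package main

import (
    "crypto/rand"
    "fmt"
)

// newUUID (hypothetical helper) returns a random version 4 UUID in the
// canonical 8-4-4-4-12 hexadecimal form.
func newUUID() (string, error) {
    var b [16]byte
    if _, err := rand.Read(b[:]); err != nil {
        return "", fmt.Errorf("read random bytes: %w", err)
    }
    b[6] = (b[6] & 0x0f) | 0x40 // set the version nibble to 4
    b[8] = (b[8] & 0x3f) | 0x80 // set the RFC 4122 variant bits
    return fmt.Sprintf("%x-%x-%x-%x-%x",
        b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

func main() {
    id, err := newUUID()
    if err != nil {
        panic(err)
    }
    fmt.Println(id)
}
```

Unlike the atomic counter, IDs produced this way are not ordered and do not restart at 1 when the process restarts, which matters once the store outlives a single run.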
5

Wire the engine

With all four required dependencies ready, construct the engine using functional options. The var eng *aether.Engine is declared before NewLocalBroker so that the callbacks can close over eng once it is assigned:
package main

import (
    "context"
    "log"

    aether "github.com/BabySid/aether"
    "github.com/BabySid/aether/broker"
    "github.com/BabySid/aether/executor"
    "github.com/BabySid/aether/model"
)

func buildEngine(ctx context.Context) (*aether.Engine, chan struct{}) {
    // Register executors in a registry shared with the broker.
    reg := executor.NewRegistry()
    _ = reg.Register(&EchoExecutor{})

    memStore := NewMemoryStore() // your store.Store implementation

    finishCh := make(chan struct{}, 64)

    // eng is declared before NewLocalBroker so the callbacks can close over it.
    var eng *aether.Engine

    brok := NewLocalBroker(
        func(ctx context.Context, taskRunID string) {
            eng.OnTaskStarted(ctx, taskRunID)
        },
        func(ctx context.Context, result *broker.TaskResult) {
            eng.OnTaskCompleted(ctx, result)
            select {
            case finishCh <- struct{}{}:
            default:
            }
        },
    )

    // Start a goroutine that pulls tasks from the broker and executes them.
    go func() {
        for {
            assignment, err := brok.FetchTask(ctx, "local-worker")
            if err != nil {
                return // context cancelled or broker closed
            }
            go func(a *broker.TaskAssignment) {
                _ = brok.StartTask(ctx, a.TaskRunID, "local-worker")
                plugin, _ := reg.Get(a.ExecutorType)
                outputs, execErr := plugin.Execute(ctx, &executor.ExecuteRequest{
                    TaskRunID:    a.TaskRunID,
                    TaskName:     a.TaskName,
                    TemplateName: a.TemplateName,
                    Inputs:       a.Inputs,
                })
                result := &broker.TaskResult{
                    TaskRunID:     a.TaskRunID,
                    WorkflowRunID: a.WorkflowRunID,
                }
                if execErr != nil {
                    result.ExecOutputs = &model.ExecOutputs{
                        Code:    model.ExecCodeError,
                        Message: execErr.Error(),
                    }
                } else {
                    result.ExecOutputs = outputs
                }
                _ = brok.CompleteTask(ctx, result)
            }(assignment)
        }
    }()

    var err error
    eng, err = aether.New(
        aether.WithStore(memStore),
        aether.WithExecutorRegistry(reg),
        aether.WithIDGenerator(NewAtomicIDGen()),
        aether.WithTaskBroker(brok),
    )
    if err != nil {
        log.Fatalf("create engine: %v", err)
    }

    if err := eng.Start(ctx); err != nil {
        log.Fatalf("start engine: %v", err)
    }

    return eng, finishCh
}
eng.Start(ctx) must be called before submitting workflows. It launches background services including the timeout watchdog (if configured) and the cron scheduler (if configured). Pair every Start call with a deferred eng.Stop().
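The wiring above relies on a Go closure capturing the variable eng rather than its value at the time the closure is created. The sketch below shows that late-binding pattern in isolation; stubEngine, stubBroker, and wire are hypothetical names used only for illustration.

```go
package main

import "fmt"

// stubEngine is a hypothetical stand-in that records started task IDs.
type stubEngine struct{ started []string }

func (e *stubEngine) OnTaskStarted(id string) { e.started = append(e.started, id) }

// stubBroker is a hypothetical stand-in that only holds a start callback.
type stubBroker struct{ onStart func(id string) }

// wire builds the broker first and the engine second, as in buildEngine.
func wire() (*stubBroker, *stubEngine) {
    var eng *stubEngine // nil until assigned below

    b := &stubBroker{onStart: func(id string) {
        // eng is read when the callback runs, not when the closure is built.
        eng.OnTaskStarted(id)
    }}

    eng = &stubEngine{}
    return b, eng
}

func main() {
    b, eng := wire()
    b.onStart("1")
    fmt.Println(eng.started) // [1]
}
```

The pattern is safe only if no callback fires before the assignment; invoking onStart while eng is still nil would panic. In buildEngine this holds because no task can be dispatched until the engine exists and a workflow has been submitted.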
6

Define a workflow document

The following JSON matches 01-single-task.json from the playground examples. It submits a single greet task using the echo executor, passes username from workflow arguments, and declares five typed output parameters:
{
  "apiVersion": "aether/v1",
  "kind": "Workflow",
  "metadata": {
    "name": "single-task-all-types"
  },
  "spec": {
    "entrypoint": "greet",
    "arguments": {
      "parameters": [
        { "name": "username", "type": "string", "value": "Alice" }
      ]
    },
    "templates": [
      {
        "task": {
          "name": "greet",
          "inputs": {
            "parameters": [
              {
                "name": "username",
                "type": "string",
                "valueFrom": { "parameter": "workflow.arguments.parameters.username" }
              },
              {
                "name": "outputs",
                "value": [
                  { "name": "message", "type": "string", "value": "Hello, Alice!" },
                  { "name": "code",    "type": "int",    "value": 200 },
                  { "name": "ok",      "type": "bool",   "value": true },
                  { "name": "tags",    "type": "array",  "value": ["greet", "demo"] },
                  { "name": "meta",    "type": "object", "value": { "version": "1.0" } }
                ]
              }
            ]
          },
          "executor": { "type": "echo" },
          "outputs": {
            "parameters": [
              { "name": "message", "type": "string" },
              { "name": "code",    "type": "int" },
              { "name": "ok",      "type": "bool" },
              { "name": "tags",    "type": "array" },
              { "name": "meta",    "type": "object" }
            ]
          }
        }
      }
    ]
  }
}
The valueFrom.parameter field uses a dot-path reference to the workflow-level argument username. At dispatch time the engine resolves this reference and injects the resolved value into the task’s inputs.
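To make the reference format concrete, the sketch below resolves a workflow.arguments.parameters.<name> path against a map of argument values. It is an illustration under assumptions, not the engine's resolver: resolveArg is a hypothetical function, values are kept as raw JSON text, and other reference forms are rejected.

```go
package main

import (
    "fmt"
    "strings"
)

// argPrefix is the reference prefix used in the workflow document above.
const argPrefix = "workflow.arguments.parameters."

// resolveArg (hypothetical) maps a dot-path reference to the raw JSON text of
// the matching workflow argument.
func resolveArg(ref string, args map[string]string) (string, error) {
    if !strings.HasPrefix(ref, argPrefix) {
        return "", fmt.Errorf("unsupported reference %q", ref)
    }
    name := strings.TrimPrefix(ref, argPrefix)
    if name == "" {
        return "", fmt.Errorf("reference %q has no parameter name", ref)
    }
    v, ok := args[name]
    if !ok {
        return "", fmt.Errorf("unknown workflow argument %q", name)
    }
    return v, nil
}

func main() {
    args := map[string]string{"username": `"Alice"`}
    v, err := resolveArg("workflow.arguments.parameters.username", args)
    if err != nil {
        panic(err)
    }
    fmt.Println(v)
}
```

Failing loudly on an unknown name is a deliberate choice in this sketch: a silently empty input is much harder to trace than a rejected reference.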
7

Submit and observe the result

Save the workflow document above as single-task.json, parse it, submit it to the engine, then poll eng.Get until the workflow reaches a terminal phase:
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"

    aether "github.com/BabySid/aether"
    "github.com/BabySid/aether/model"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    eng, finishCh := buildEngine(ctx)
    defer eng.Stop()

    // Load and parse the workflow JSON.
    raw, err := os.ReadFile("single-task.json")
    if err != nil {
        log.Fatalf("read workflow: %v", err)
    }
    var wf model.Workflow
    if err := json.Unmarshal(raw, &wf); err != nil {
        log.Fatalf("parse workflow: %v", err)
    }

    // Submit the workflow. Returns immediately with a run ID.
    runID, err := eng.Submit(ctx, &wf)
    if err != nil {
        log.Fatalf("submit: %v", err)
    }
    log.Printf("submitted runID=%s", runID)

    // Poll until the workflow reaches a terminal phase.
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()

    for {
        exec, err := eng.Get(ctx, runID)
        if err != nil {
            log.Fatalf("get: %v", err)
        }
        if exec.Status.IsTerminal() {
            printExecution(exec)
            return
        }
        select {
        case <-ctx.Done():
            log.Printf("timed out; last phase: %s", exec.Status)
            return
        case <-finishCh:
            // drain any extra signals
            for len(finishCh) > 0 {
                <-finishCh
            }
        case <-ticker.C:
        }
    }
}

func printExecution(exec *aether.WorkflowExecution) {
    fmt.Printf("Workflow %s finished: phase=%s progress=%s\n",
        exec.RunID, exec.Status, exec.Progress)
    for _, t := range exec.Tasks {
        fmt.Printf("  task=%s phase=%s\n", t.TaskName, t.Status)
        if t.Outputs != nil {
            for _, p := range t.Outputs.Parameters {
                fmt.Printf("    %s (%s) = %s\n", p.Name, p.Type, string(p.Value))
            }
        }
    }
}
For a completed run of the workflow above, printExecution prints output like this:
Workflow 1 finished: phase=Succeeded progress=1/1
  task=greet phase=Succeeded
    message (string) = "Hello, Alice!"
    code (int) = 200
    ok (bool) = true
    tags (array) = ["greet","demo"]
    meta (object) = {"version":"1.0"}
engine.Get is non-blocking and returns the current snapshot of the workflow and all its task runs. The WorkflowExecution.Tasks slice is ordered by creation time, so DAG tasks appear in the order they were dispatched.

What a WorkflowExecution contains

engine.Get returns a *aether.WorkflowExecution, which is the read-only projection of the workflow’s current state. All store-internal fields (tokens, raw JSON, deadline timestamps) are excluded — only the fields useful for observing and reporting are exposed:
type WorkflowExecution struct {
    RunID     string
    Status    model.Phase    // "" until first transition
    Message   string         // human-readable status message
    Outputs   *model.Outputs // workflow-level outputs, nil until finalized
    Metrics   *model.Metrics // workflow-level timing metrics
    CreatedAt time.Time
    Progress  string          // "completed/total", empty when no tasks yet
    Tasks     []TaskExecution // all task runs, in creation order
}
Each entry in Tasks is a TaskExecution:
type TaskExecution struct {
    RunID         string
    WorkflowRunID string
    ParentRunID   string // "" for top-level scope tasks
    Depth         int
    Scope         string
    TaskName      string
    TemplateName  string
    TemplateType  string // "task", "dag", or "loop"
    CreatedAt     time.Time
    Status        model.Phase
    Message       string
    Inputs        *model.Inputs
    Outputs       *model.Outputs
    Metrics       *model.Metrics
    RetryCount    int
}
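
The Progress field is documented above as a "completed/total" string that is empty before any tasks exist. If you need the two numbers separately, for a progress bar or a metric, a small parser along these lines may be enough; parseProgress is a hypothetical helper and assumes exactly that format.

```go
package main

import (
    "fmt"
    "strconv"
    "strings"
)

// parseProgress (hypothetical) splits a "completed/total" string into its two
// counts. An empty string is treated as 0 of 0.
func parseProgress(s string) (completed, total int, err error) {
    if s == "" {
        return 0, 0, nil
    }
    left, right, ok := strings.Cut(s, "/")
    if !ok {
        return 0, 0, fmt.Errorf("progress %q: want completed/total", s)
    }
    if completed, err = strconv.Atoi(left); err != nil {
        return 0, 0, fmt.Errorf("progress %q: %w", s, err)
    }
    if total, err = strconv.Atoi(right); err != nil {
        return 0, 0, fmt.Errorf("progress %q: %w", s, err)
    }
    return completed, total, nil
}

func main() {
    c, t, err := parseProgress("1/1")
    if err != nil {
        panic(err)
    }
    fmt.Printf("%d of %d tasks done\n", c, t) // 1 of 1 tasks done
}
```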

Next steps

Architecture

Understand the scheduler internals, scope tree, and all ten extension interfaces

DAG workflows

Build multi-step pipelines with parallel execution and parameter passing

Executor plugin

Implement a production-grade executor with full schema validation

Store interface

Replace the in-memory store with a persistent database-backed implementation
