Use this file to discover all available pages before exploring further.
Aether requires you to provide four interface implementations before the engine will accept a workflow: a store.Store for state persistence, a broker.TaskBroker for task dispatch, at least one executor.Plugin for execution logic, and an idgen.Generator for unique IDs. This guide walks through each one using the same in-memory implementations the playground CLI uses for integration testing, so every snippet below is taken directly from working code.
1
Install the module
Add the module to your Go project. The core engine has zero external dependencies, so this adds nothing to your transitive dependency tree beyond the Aether packages themselves.
go get github.com/BabySid/aether
Verify the import path resolves:
go list github.com/BabySid/aether
2
Implement an executor plugin
An executor.Plugin is what actually runs your task logic. It exposes a Type() string (matched against executor.type in the workflow JSON) and an Execute method that receives resolved inputs and returns outputs.The echo executor below is the same one used throughout the playground examples. It reads an outputs input parameter to discover what to return, making it useful for testing any workflow shape without real backend logic:
package mainimport ( "context" "encoding/json" "log" "github.com/BabySid/aether/executor" "github.com/BabySid/aether/model")// EchoExecutor echoes its inputs back as outputs.// Control parameters: "outputs" declares what the executor should return.type EchoExecutor struct{}func (e *EchoExecutor) Type() string { return "echo" }func (e *EchoExecutor) Schema() model.ExecutorSchema { return executor.SchemaOf[executor.DynamicOutputs, executor.DynamicOutputs]( "echo", "1.0", "Echoes inputs and produces declared outputs", )}func (e *EchoExecutor) Execute(_ context.Context, req *executor.ExecuteRequest) (*model.ExecOutputs, error) { var declaredOutputs []struct { Name string `json:"name"` Type string `json:"type"` Value json.RawMessage `json:"value,omitempty"` } if req.Inputs != nil { for _, p := range req.Inputs.Parameters { if p.Name == "outputs" { _ = json.Unmarshal(p.Value, &declaredOutputs) } } } var params []model.Parameter for _, out := range declaredOutputs { params = append(params, model.Parameter{ Name: out.Name, Type: out.Type, Value: out.Value, }) } log.Printf("[echo] task=%s outputs=%d", req.TaskName, len(params)) return &model.ExecOutputs{Parameters: params}, nil}
The executor.Plugin interface is the only place your domain logic lives. The engine never calls your business logic directly — it only dispatches task assignments to the broker, and the broker hands them to workers that call Execute.
3
Implement the local broker
The broker.TaskBroker bridges the engine and your workers. The engine calls Dispatch when a task is ready; the broker delivers it to a worker; the worker calls StartTask and CompleteTask to report progress back.The channel-based LocalBroker below is modeled directly on the playground implementation:
package mainimport ( "context" "fmt" "sync" "github.com/BabySid/aether/broker")// LocalBroker routes tasks between the engine and local goroutine workers.type LocalBroker struct { startHandler broker.StartHandler handler broker.CompletionHandler mu sync.Mutex closed bool taskCh chan *broker.TaskAssignment cancels map[string]context.CancelFunc}func NewLocalBroker( start broker.StartHandler, complete broker.CompletionHandler,) *LocalBroker { return &LocalBroker{ startHandler: start, handler: complete, taskCh: make(chan *broker.TaskAssignment, 64), cancels: make(map[string]context.CancelFunc), }}func (b *LocalBroker) Dispatch(ctx context.Context, a *broker.TaskAssignment) error { taskCtx, cancel := context.WithCancel(ctx) b.mu.Lock() if b.closed { b.mu.Unlock() cancel() return fmt.Errorf("broker is closed") } b.cancels[a.TaskRunID] = cancel b.mu.Unlock() select { case b.taskCh <- a: return nil case <-ctx.Done(): cancel() return ctx.Err() } _ = taskCtx}func (b *LocalBroker) Cancel(_ context.Context, taskRunID string) error { b.mu.Lock() cancel, ok := b.cancels[taskRunID] b.mu.Unlock() if ok { cancel() } return nil}func (b *LocalBroker) FetchTask(ctx context.Context, _ string) (*broker.TaskAssignment, error) { select { case a, ok := <-b.taskCh: if !ok { return nil, fmt.Errorf("broker closed") } return a, nil case <-ctx.Done(): return nil, ctx.Err() }}func (b *LocalBroker) StartTask(ctx context.Context, taskRunID string, _ string) error { if b.startHandler != nil { b.startHandler(ctx, taskRunID) } return nil}func (b *LocalBroker) CompleteTask(ctx context.Context, result *broker.TaskResult) error { b.mu.Lock() delete(b.cancels, result.TaskRunID) b.mu.Unlock() if b.handler != nil { b.handler(ctx, result) } return nil}
The startHandler and handler callbacks wire directly back into the engine’s OnTaskStarted and OnTaskCompleted methods. This is the only coupling between the broker and the engine — callbacks, not direct method calls.
4
Implement an ID generator
The idgen.Generator produces unique string IDs for workflow runs and task runs. The atomic counter below matches the playground’s AtomicIDGen:
For production use, replace this with a UUID generator or a distributed ID scheme such as Snowflake, depending on your consistency requirements.
5
Wire the engine
With all four required dependencies ready, construct the engine using functional options. The var eng *aether.Engine is declared before NewLocalBroker so that the callbacks can close over eng once it is assigned:
package mainimport ( "context" "log" "time" aether "github.com/BabySid/aether" "github.com/BabySid/aether/broker" "github.com/BabySid/aether/executor")func buildEngine(ctx context.Context) (*aether.Engine, chan struct{}) { // Register executors in a registry shared with the broker. reg := executor.NewRegistry() _ = reg.Register(&EchoExecutor{}) memStore := NewMemoryStore() // your store.Store implementation finishCh := make(chan struct{}, 64) // eng is declared before NewLocalBroker so the callbacks can close over it. var eng *aether.Engine brok := NewLocalBroker( func(ctx context.Context, taskRunID string) { eng.OnTaskStarted(ctx, taskRunID) }, func(ctx context.Context, result *broker.TaskResult) { eng.OnTaskCompleted(ctx, result) select { case finishCh <- struct{}{}: default: } }, ) // Start a goroutine that pulls tasks from the broker and executes them. go func() { for { assignment, err := brok.FetchTask(ctx, "local-worker") if err != nil { return // context cancelled or broker closed } go func(a *broker.TaskAssignment) { _ = brok.StartTask(ctx, a.TaskRunID, "local-worker") plugin, _ := reg.Get(a.ExecutorType) outputs, execErr := plugin.Execute(ctx, &executor.ExecuteRequest{ TaskRunID: a.TaskRunID, TaskName: a.TaskName, TemplateName: a.TemplateName, Inputs: a.Inputs, }) result := &broker.TaskResult{ TaskRunID: a.TaskRunID, WorkflowRunID: a.WorkflowRunID, } if execErr != nil { result.ExecOutputs = &model.ExecOutputs{ Code: model.ExecCodeError, Message: execErr.Error(), } } else { result.ExecOutputs = outputs } _ = brok.CompleteTask(ctx, result) }(assignment) } }() var err error eng, err = aether.New( aether.WithStore(memStore), aether.WithExecutorRegistry(reg), aether.WithIDGenerator(NewAtomicIDGen()), aether.WithTaskBroker(brok), ) if err != nil { log.Fatalf("create engine: %v", err) } if err := eng.Start(ctx); err != nil { log.Fatalf("start engine: %v", err) } return eng, finishCh}
eng.Start(ctx) must be called before submitting workflows. It launches background services including the timeout watchdog (if configured) and the cron scheduler (if configured). Pair every Start call with a deferred eng.Stop().
6
Define a workflow document
The following JSON matches 01-single-task.json from the playground examples. It submits a single greet task using the echo executor, passes username from workflow arguments, and declares five typed output parameters:
The valueFrom.parameter field uses a dot-path reference to the workflow-level argument username. At dispatch time the engine resolves this reference and injects the resolved value into the task’s inputs.
7
Submit and observe the result
Parse the workflow document, submit it to the engine, then poll engine.Get until the workflow reaches a terminal phase:
package mainimport ( "context" "encoding/json" "fmt" "log" "os" "time" aether "github.com/BabySid/aether" "github.com/BabySid/aether/model")func main() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() eng, finishCh := buildEngine(ctx) defer eng.Stop() // Load and parse the workflow JSON. raw, err := os.ReadFile("single-task.json") if err != nil { log.Fatalf("read workflow: %v", err) } var wf model.Workflow if err := json.Unmarshal(raw, &wf); err != nil { log.Fatalf("parse workflow: %v", err) } // Submit the workflow. Returns immediately with a run ID. runID, err := eng.Submit(ctx, &wf) if err != nil { log.Fatalf("submit: %v", err) } log.Printf("submitted runID=%s", runID) // Poll until the workflow reaches a terminal phase. ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() for { exec, err := eng.Get(ctx, runID) if err != nil { log.Fatalf("get: %v", err) } if exec.Status.IsTerminal() { printExecution(exec) return } select { case <-ctx.Done(): log.Printf("timed out; last phase: %s", exec.Status) return case <-finishCh: // drain any extra signals for len(finishCh) > 0 { <-finishCh } case <-ticker.C: } }}func printExecution(exec *aether.WorkflowExecution) { fmt.Printf("Workflow %s finished: phase=%s progress=%s\n", exec.RunID, exec.Status, exec.Progress) for _, t := range exec.Tasks { fmt.Printf(" task=%s phase=%s\n", t.TaskName, t.Status) if t.Outputs != nil { for _, p := range t.Outputs.Parameters { fmt.Printf(" %s (%s) = %s\n", p.Name, p.Type, string(p.Value)) } } }}
A completed execution from engine.Get looks like this:
engine.Get is non-blocking and returns the current snapshot of the workflow and all its task runs. The WorkflowExecution.Tasks slice is ordered by creation time, so DAG tasks appear in the order they were dispatched.
engine.Get returns a *aether.WorkflowExecution, which is the read-only projection of the workflow’s current state. All store-internal fields (tokens, raw JSON, deadline timestamps) are excluded — only the fields useful for observing and reporting are exposed:
type WorkflowExecution struct { RunID string Status model.Phase // "" until first transition Message string // human-readable status message Outputs *model.Outputs // workflow-level outputs, nil until finalized Metrics *model.Metrics // workflow-level timing metrics CreatedAt time.Time Progress string // "completed/total", empty when no tasks yet Tasks []TaskExecution // all task runs, in creation order}
Each entry in Tasks is a TaskExecution:
type TaskExecution struct { RunID string WorkflowRunID string ParentRunID string // "" for top-level scope tasks Depth int Scope string TaskName string TemplateName string TemplateType string // "task", "dag", or "loop" CreatedAt time.Time Status model.Phase Message string Inputs *model.Inputs Outputs *model.Outputs Metrics *model.Metrics RetryCount int}