Executors are the primary extension point for Aether task logic. Every leaf task in a workflow is dispatched to an executor — a small Go struct that implements three methods and returns a typed result. The engine schedules and coordinates; executors do the work. This strict separation means you can add new task types without touching the scheduling core, and you can swap, version, or distribute executors independently.

The Plugin interface

Every executor must implement executor.Plugin:
// executor/executor.go
type Plugin interface {
    // Type returns the unique executor type identifier (e.g. "http", "shell").
    Type() string

    // Schema declares the executor's contract. Use SchemaOf[Config, Output]() to
    // derive it from struct types rather than writing field names by hand.
    Schema() model.ExecutorSchema

    // Execute runs the task. Use OutputFrom for type-safe output construction.
    Execute(ctx context.Context, req *ExecuteRequest) (*model.ExecOutputs, error)
}
Type() is the string that appears in the workflow JSON under executor.type. It must be unique across all registered plugins. Schema() is called once at registration time and cached — its result is stored in the executor registry and optionally persisted to the schema store. Execute() is called for every task dispatch.

ExecuteRequest

The engine bundles everything an executor needs into a single ExecuteRequest. Executors are self-contained — they never call back into the store.
// executor/executor.go
type ExecuteRequest struct {
    // Identifiers — useful for structured logging and distributed tracing.
    TaskRunID     string
    WorkflowRunID string
    TaskName      string
    TemplateName  string

    // Runtime inputs and constraints.
    Inputs     *model.Inputs
    Resources  *model.Resources
    Timeout    string // e.g. "30m"; empty means no deadline beyond ctx
    RetryCount int    // number of retries already consumed (0 = first attempt)
}
  • TaskRunID (string): Unique ID of this task run. Use it as a correlation key in logs and traces.
  • WorkflowRunID (string): ID of the parent workflow run. Useful when an executor needs to emit metrics scoped to a run.
  • TaskName (string): The task’s name within its DAG scope, as declared in the workflow document.
  • TemplateName (string): The name of the referenced template. Equals TaskName for inline tasks.
  • Inputs (*model.Inputs): Resolved input parameters after interpolation and expression evaluation. May be nil when the task declares no inputs.
  • Resources (*model.Resources): CPU, memory, and GPU requirements declared on the task. May be nil.
  • Timeout (string): Human-readable duration string (e.g. "30m", "2h"). Empty when the task has no deadline. Pass this to subprocesses or remote API calls so they can enforce their own deadline independently of the Go context.
  • RetryCount (int): Number of retries already consumed. 0 means this is the first attempt; 1 means the first retry.
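As an illustration of how an executor might honour Timeout, the sketch below turns the string into a context deadline. It assumes the value is in a form that Go's time.ParseDuration accepts (the documented examples "30m" and "2h" are). The helper names withRequestTimeout and hasDeadline are hypothetical and not part of Aether.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// withRequestTimeout is a hypothetical helper, not part of Aether. It derives
// a context bounded by a duration string such as "30m". An empty string means
// "no deadline beyond ctx", so ctx is returned unchanged with a no-op cancel.
func withRequestTimeout(ctx context.Context, timeout string) (context.Context, context.CancelFunc, error) {
	if timeout == "" {
		return ctx, func() {}, nil
	}
	d, err := time.ParseDuration(timeout)
	if err != nil {
		return nil, nil, fmt.Errorf("invalid timeout %q: %w", timeout, err)
	}
	child, cancel := context.WithTimeout(ctx, d)
	return child, cancel, nil
}

// hasDeadline reports whether the context derived from timeout has a deadline.
func hasDeadline(timeout string) (bool, error) {
	ctx, cancel, err := withRequestTimeout(context.Background(), timeout)
	if err != nil {
		return false, err
	}
	defer cancel()
	_, ok := ctx.Deadline()
	return ok, nil
}

func main() {
	for _, timeout := range []string{"30m", ""} {
		ok, err := hasDeadline(timeout)
		fmt.Printf("timeout=%q deadline=%v err=%v\n", timeout, ok, err)
	}
}
```

Inside Execute, the same helper would be called with req.Timeout, and the returned context passed to whatever performs the work.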

ExecOutputs

Executors return *model.ExecOutputs:
// model/common.go
type ExecOutputs struct {
    Code       int         `json:"code,omitempty"`
    Message    string      `json:"message,omitempty"`
    Parameters []Parameter `json:"parameters,omitempty"`
    Artifacts  []Artifact  `json:"artifacts,omitempty"`
}
Code must be one of the ExecCode constants. The engine maps this code to a Phase value — executors never write Phase directly.

ExecCode values

Constant | Value | Engine maps to
ExecCodeSucceeded | 0 | PhaseSucceeded
ExecCodeSuspended | 1 | PhaseSuspended
ExecCodeFailed | 2 | PhaseFailed
ExecCodeError | 3 | PhaseError
ExecCodeTimeout | 4 | PhaseTimeout
PhaseSkipped and PhaseCancelled are set exclusively by the engine and are never valid return codes for an executor.
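The mapping in the table can be pictured as a simple switch. This is a sketch with local stand-in types, not the engine's code: the Phase string values and the handling of unknown codes (reported through the second return value) are assumptions.

```go
package main

import "fmt"

// Local stand-ins that mirror the table; the real constants live in Aether's
// model package.
type ExecCode int

const (
	ExecCodeSucceeded ExecCode = iota // 0
	ExecCodeSuspended                 // 1
	ExecCodeFailed                    // 2
	ExecCodeError                     // 3
	ExecCodeTimeout                   // 4
)

// Phase string values are assumptions made for this illustration.
type Phase string

const (
	PhaseSucceeded Phase = "Succeeded"
	PhaseSuspended Phase = "Suspended"
	PhaseFailed    Phase = "Failed"
	PhaseError     Phase = "Error"
	PhaseTimeout   Phase = "Timeout"
)

// phaseFor maps an executor return code to a phase. The boolean is false for
// codes outside the table; how the real engine treats those is not specified.
func phaseFor(code ExecCode) (Phase, bool) {
	switch code {
	case ExecCodeSucceeded:
		return PhaseSucceeded, true
	case ExecCodeSuspended:
		return PhaseSuspended, true
	case ExecCodeFailed:
		return PhaseFailed, true
	case ExecCodeError:
		return PhaseError, true
	case ExecCodeTimeout:
		return PhaseTimeout, true
	}
	return "", false
}

func main() {
	for code := ExecCodeSucceeded; code <= ExecCodeTimeout; code++ {
		phase, _ := phaseFor(code)
		fmt.Printf("%d -> %s\n", code, phase)
	}
}
```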

Declaring the executor schema

Use executor.SchemaOf to derive an ExecutorSchema from Go struct types. It reflects over the struct fields and their json and desc tags to populate the schema automatically, so no field names are written by hand as strings.
// executor/schema.go
func SchemaOf[C, O any](execType, version, description string) model.ExecutorSchema
  • C — the config struct that represents the executor’s accepted inputs.
  • O — the output struct that represents the executor’s produced parameters. Use executor.DynamicOutputs when outputs are determined at runtime.
Example:
type HttpConfig struct {
    URL     string `json:"url"     desc:"Target endpoint URL"`
    Method  string `json:"method"  desc:"HTTP method (GET, POST, ...)"`
    Body    string `json:"body"    desc:"Request body (optional)"`
}

type HttpOutput struct {
    StatusCode int    `json:"status_code" desc:"HTTP response status code"`
    Body       string `json:"body"        desc:"Response body as string"`
}

func (e *HttpExecutor) Schema() model.ExecutorSchema {
    return executor.SchemaOf[HttpConfig, HttpOutput](
        "http", "1.0", "Makes an HTTP request and returns the response",
    )
}
For executors whose outputs depend on runtime inputs, use DynamicOutputs:
func (e *EchoExecutor) Schema() model.ExecutorSchema {
    return executor.SchemaOf[executor.DynamicOutputs, executor.DynamicOutputs](
        "echo", "1.0", "Echoes inputs and produces declared outputs",
    )
}
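To make the tag-driven derivation concrete, here is a minimal sketch of how field names and descriptions can be read from a struct type with the standard reflect package. It illustrates the general technique only. FieldSpec and fieldsOf are hypothetical names, and the real SchemaOf may collect different information.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// FieldSpec is a hypothetical stand-in for one entry of an executor schema.
type FieldSpec struct {
	Name string // taken from the json tag
	Type string // Go kind of the field, e.g. "string" or "int"
	Desc string // taken from the desc tag
}

type HttpConfig struct {
	URL    string `json:"url" desc:"Target endpoint URL"`
	Method string `json:"method" desc:"HTTP method (GET, POST, ...)"`
	Body   string `json:"body" desc:"Request body (optional)"`
}

// fieldsOf lists the exported, json-tagged fields of struct type T.
// It returns nil when T is not a struct.
func fieldsOf[T any]() []FieldSpec {
	t := reflect.TypeOf((*T)(nil)).Elem()
	if t.Kind() != reflect.Struct {
		return nil
	}
	specs := make([]FieldSpec, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		tag, ok := f.Tag.Lookup("json")
		if !ok || !f.IsExported() {
			continue
		}
		// Drop tag options such as ",omitempty".
		name, _, _ := strings.Cut(tag, ",")
		if name == "" || name == "-" {
			continue
		}
		specs = append(specs, FieldSpec{
			Name: name,
			Type: f.Type.Kind().String(),
			Desc: f.Tag.Get("desc"),
		})
	}
	return specs
}

func main() {
	for _, s := range fieldsOf[HttpConfig]() {
		fmt.Printf("%s (%s): %s\n", s.Name, s.Type, s.Desc)
	}
}
```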

Type-safe output construction with OutputFrom

executor.OutputFrom converts a flat output struct to *model.ExecOutputs by reflecting its fields. The json tag on each field becomes the Parameter.Name.
// executor/bind.go
func OutputFrom(output any) (*model.ExecOutputs, error)
Rules:
  • Input must be a flat struct — no embedded (anonymous) fields, no nested structs.
  • Fields tagged json:"-" are skipped.
  • All field values are JSON-serialised.
type HttpOutput struct {
    StatusCode int    `json:"status_code"`
    Body       string `json:"body"`
}

func (e *HttpExecutor) Execute(ctx context.Context, req *executor.ExecuteRequest) (*model.ExecOutputs, error) {
    // ... perform HTTP call ...
    out, err := executor.OutputFrom(HttpOutput{StatusCode: resp.StatusCode, Body: string(respBody)})
    if err != nil {
        return nil, err
    }
    return out, nil
}
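The rules above can be sketched with reflect and encoding/json. This is not the library's implementation. Parameter is a local stand-in whose Value type is assumed to be raw JSON, and flatParams is a hypothetical name. The fallback to the Go field name when no json tag is present is also an assumption.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
	"strings"
)

// Parameter is a local stand-in; the Value type is an assumption.
type Parameter struct {
	Name  string
	Value json.RawMessage
}

type HttpOutput struct {
	StatusCode int    `json:"status_code"`
	Body       string `json:"body"`
	Debug      string `json:"-"` // skipped because of the "-" tag
}

// flatParams converts a flat struct into parameters: one per exported field,
// named after its json tag, with the value JSON-serialised.
func flatParams(output any) ([]Parameter, error) {
	v := reflect.ValueOf(output)
	if v.Kind() != reflect.Struct {
		return nil, errors.New("output must be a struct value")
	}
	t := v.Type()
	var params []Parameter
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.Anonymous || f.Type.Kind() == reflect.Struct {
			return nil, fmt.Errorf("field %s: embedded and nested structs are not allowed", f.Name)
		}
		if !f.IsExported() {
			continue
		}
		name, _, _ := strings.Cut(f.Tag.Get("json"), ",")
		if name == "-" {
			continue
		}
		if name == "" {
			name = f.Name // assumption: fall back to the Go field name
		}
		raw, err := json.Marshal(v.Field(i).Interface())
		if err != nil {
			return nil, fmt.Errorf("field %s: %w", f.Name, err)
		}
		params = append(params, Parameter{Name: name, Value: raw})
	}
	return params, nil
}

func main() {
	params, err := flatParams(HttpOutput{StatusCode: 200, Body: "ok", Debug: "trace"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, p := range params {
		fmt.Printf("%s = %s\n", p.Name, p.Value)
	}
}
```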

Reading inputs with BindInputs

executor.BindInputs is the counterpart to OutputFrom. It maps model.Inputs.Parameters into a destination struct by matching each parameter’s Name against the json tag of the corresponding field.
// executor/bind.go
func BindInputs(inputs *model.Inputs, dst any) error
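func (e *HttpExecutor) Execute(ctx context.Context, req *executor.ExecuteRequest) (*model.ExecOutputs, error) {
    var cfg HttpConfig
    if err := executor.BindInputs(req.Inputs, &cfg); err != nil {
        // Report binding failures through the result code, not as a Go error.
        return &model.ExecOutputs{Code: model.ExecCodeError, Message: err.Error()}, nil
    }
    // cfg.URL, cfg.Method, cfg.Body are now populated.
    // ... perform the HTTP call with cfg and build the outputs ...
    return &model.ExecOutputs{Code: model.ExecCodeSucceeded}, nil
}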
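One way to picture name-to-tag matching is to collect the parameters into a JSON object keyed by name and let encoding/json decode it into the destination struct. The sketch below does exactly that with local stand-in types. It is not how BindInputs is necessarily implemented, and bindParams and rawParam are hypothetical names. Note that encoding/json matches keys case-insensitively, which the real function may not do.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins; field names and the raw-JSON Value type are assumptions.
type Parameter struct {
	Name  string
	Value json.RawMessage
}

type Inputs struct {
	Parameters []Parameter
}

type HttpConfig struct {
	URL    string `json:"url"`
	Method string `json:"method"`
	Body   string `json:"body"`
}

// rawParam builds a parameter from a name and a raw JSON literal.
func rawParam(name, rawJSON string) Parameter {
	return Parameter{Name: name, Value: json.RawMessage(rawJSON)}
}

// bindParams copies parameters into dst, which must be a pointer to a struct.
// A nil inputs value leaves dst untouched.
func bindParams(inputs *Inputs, dst any) error {
	if inputs == nil {
		return nil
	}
	obj := make(map[string]json.RawMessage, len(inputs.Parameters))
	for _, p := range inputs.Parameters {
		obj[p.Name] = p.Value
	}
	data, err := json.Marshal(obj)
	if err != nil {
		return fmt.Errorf("encode inputs: %w", err)
	}
	if err := json.Unmarshal(data, dst); err != nil {
		return fmt.Errorf("bind inputs: %w", err)
	}
	return nil
}

func main() {
	in := &Inputs{Parameters: []Parameter{
		rawParam("url", `"https://example.com"`),
		rawParam("method", `"GET"`),
	}}
	var cfg HttpConfig
	if err := bindParams(in, &cfg); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```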

The executor Registry

executor.Registry manages registered plugins, routing dispatched tasks to the correct implementation.
// executor/executor.go
type Registry struct { /* ... */ }

func NewRegistry() *Registry
func (r *Registry) Register(plugin Plugin) error
func (r *Registry) RegisterSchema(schema model.ExecutorSchema)
func (r *Registry) Get(executorType string) (Plugin, bool)
func (r *Registry) GetSchema(executorType string) (model.ExecutorSchema, bool)
func (r *Registry) Types() []string
func (r *Registry) Schemas() []model.ExecutorSchema
Register returns an error if the type is already registered. It also caches the plugin’s Schema() result. RegisterSchema registers a schema without a plugin instance — used by distributed brokers to propagate remote worker schemas to the master, enabling schema-aware validation even when executors run on remote machines.
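The duplicate check described for Register can be sketched with a map guarded by a mutex. This is a simplified stand-in, not the Aether type. Plugin is trimmed to the one method the sketch needs, and the locking and the sorted order of Types are assumptions.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Plugin is trimmed to the method this sketch needs.
type Plugin interface {
	Type() string
}

// registry is a hypothetical, simplified stand-in for executor.Registry.
type registry struct {
	mu      sync.RWMutex
	plugins map[string]Plugin
}

func newRegistry() *registry {
	return &registry{plugins: make(map[string]Plugin)}
}

// Register stores the plugin under its type and rejects duplicates.
func (r *registry) Register(p Plugin) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.plugins[p.Type()]; exists {
		return fmt.Errorf("executor type %q is already registered", p.Type())
	}
	r.plugins[p.Type()] = p
	return nil
}

// Get looks up a plugin by executor type.
func (r *registry) Get(executorType string) (Plugin, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	p, ok := r.plugins[executorType]
	return p, ok
}

// Types returns the registered type names in sorted order.
func (r *registry) Types() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	types := make([]string, 0, len(r.plugins))
	for t := range r.plugins {
		types = append(types, t)
	}
	sort.Strings(types)
	return types
}

// stubPlugin is a minimal plugin whose type is its own string value.
type stubPlugin string

func (s stubPlugin) Type() string { return string(s) }

func main() {
	reg := newRegistry()
	fmt.Println(reg.Register(stubPlugin("http")))
	fmt.Println(reg.Register(stubPlugin("http")))
	fmt.Println(reg.Types())
}
```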

Engine options

WithExecutor

Registers a single executor plugin. Call it once per executor type. A duplicate type name is silently ignored when the option is applied and is caught later, at workflow validation.
engine, err := aether.New(
    aether.WithExecutor(newHttpExecutor()),
    aether.WithExecutor(newShellExecutor()),
)
if err != nil {
    log.Fatal(err)
}

WithExecutorRegistry

Accepts a pre-built *executor.Registry. Use this when you share a registry between the engine and a broker, avoiding double-registration.
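reg := executor.NewRegistry()
if err := reg.Register(newHttpExecutor()); err != nil {
    log.Fatal(err) // Register fails when the type is already registered
}

engine, err := aether.New(
    aether.WithExecutorRegistry(reg),
)
if err != nil {
    log.Fatal(err)
}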

The fat-assignment principle

ExecuteRequest carries every piece of information the executor needs. Workers executing tasks never call back into the store. This makes executors:
  • Independently deployable — a remote worker process needs no store connection.
  • Easily testable — call Execute with a hand-crafted ExecuteRequest in any unit test.
  • Topology-agnostic — local goroutine, separate process, or remote machine: the executor code is identical.
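The testability point can be shown with a small sketch: because the request is a plain struct, a test only has to build one by hand and call Execute. The types below are local stand-ins trimmed to a few fields, and attemptExecutor and executeOnce are hypothetical names used for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins, trimmed to the fields this sketch uses.
type ExecuteRequest struct {
	TaskRunID  string
	TaskName   string
	RetryCount int
}

type ExecOutputs struct {
	Code    int
	Message string
}

// attemptExecutor is a hypothetical executor that depends only on its request.
type attemptExecutor struct{}

func (attemptExecutor) Execute(_ context.Context, req *ExecuteRequest) (*ExecOutputs, error) {
	return &ExecOutputs{
		Code:    0, // 0 corresponds to ExecCodeSucceeded
		Message: fmt.Sprintf("%s: attempt %d", req.TaskName, req.RetryCount+1),
	}, nil
}

// executeOnce builds a request by hand and calls Execute directly. No engine,
// store, or broker is involved.
func executeOnce(taskName string, retryCount int) (*ExecOutputs, error) {
	req := &ExecuteRequest{
		TaskRunID:  "tr-test",
		TaskName:   taskName,
		RetryCount: retryCount,
	}
	return attemptExecutor{}.Execute(context.Background(), req)
}

func main() {
	out, err := executeOnce("build", 1)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.Code, out.Message)
}
```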

Triggering suspension

An executor can pause a task mid-execution and wait for an external signal by returning ExecCodeSuspended:
func (e *ApprovalExecutor) Execute(ctx context.Context, req *executor.ExecuteRequest) (*model.ExecOutputs, error) {
    if !isResumed(req.Inputs) {
        // First call: register for approval and suspend
        registerApprovalRequest(req.TaskRunID)
        return &model.ExecOutputs{
            Code:    model.ExecCodeSuspended,
            Message: "awaiting human approval",
        }, nil
    }
    // Resume call: inputs contain the reviewer's decision
    decision := extractDecision(req.Inputs)
    return executor.OutputFrom(ApprovalOutput{Approved: decision})
}
When the external event arrives, call engine.Resume(ctx, taskRunID, additionalInputs). The engine merges the new inputs on top of the original resolved inputs (last-writer-wins) and re-dispatches the task.
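The last-writer-wins merge can be sketched as follows. This illustrates the stated rule rather than the engine's code: Parameter is a local stand-in whose Value is simplified to a string, mergeParameters is a hypothetical name, and keeping the original parameter order is an assumption.

```go
package main

import "fmt"

// Parameter is a local stand-in; Value is simplified to raw JSON text.
type Parameter struct {
	Name  string
	Value string
}

// mergeParameters overlays extra on top of original. When both contain a
// parameter with the same name, the value from extra replaces the original
// one in place. New names are appended.
func mergeParameters(original, extra []Parameter) []Parameter {
	merged := make([]Parameter, 0, len(original)+len(extra))
	index := make(map[string]int, len(original)+len(extra))
	for _, group := range [][]Parameter{original, extra} {
		for _, p := range group {
			if i, seen := index[p.Name]; seen {
				merged[i] = p // the later writer wins
				continue
			}
			index[p.Name] = len(merged)
			merged = append(merged, p)
		}
	}
	return merged
}

func main() {
	original := []Parameter{{"reviewer", `"alice"`}, {"approved", "false"}}
	extra := []Parameter{{"approved", "true"}, {"comment", `"looks good"`}}
	for _, p := range mergeParameters(original, extra) {
		fmt.Printf("%s = %s\n", p.Name, p.Value)
	}
}
```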

Complete example: EchoExecutor

The playground’s EchoExecutor is a working reference implementation. It echoes inputs as outputs and supports three control parameters (suspend, fail-count, and outputs):
// cmd/playground/echo.go

type EchoExecutor struct{}

func (e *EchoExecutor) Type() string { return "echo" }

func (e *EchoExecutor) Schema() model.ExecutorSchema {
    return executor.SchemaOf[executor.DynamicOutputs, executor.DynamicOutputs](
        "echo", "1.0", "Echoes inputs and produces declared outputs",
    )
}

func (e *EchoExecutor) Execute(_ context.Context, req *executor.ExecuteRequest) (*model.ExecOutputs, error) {
    var suspend bool
    var failCount int
    var declaredOutputs []echoOutput
    var params []model.Parameter

    if req.Inputs != nil {
        for _, p := range req.Inputs.Parameters {
            switch p.Name {
            case "suspend":
                _ = json.Unmarshal(p.Value, &suspend)
            case "fail-count":
                _ = json.Unmarshal(p.Value, &failCount)
            case "outputs":
                _ = json.Unmarshal(p.Value, &declaredOutputs)
            default:
                params = append(params, p)
            }
        }
    }

    if suspend && !isResumed(req.Inputs) {
        return &model.ExecOutputs{
            Code:    model.ExecCodeSuspended,
            Message: "suspended; awaiting external resume signal",
        }, nil
    }

    if failCount > 0 && req.RetryCount < failCount {
        return &model.ExecOutputs{
            Code:    model.ExecCodeFailed,
            Message: fmt.Sprintf("simulated failure on attempt %d", req.RetryCount+1),
        }, nil
    }

    // merge declared outputs and return
    return &model.ExecOutputs{Parameters: params}, nil
}
Control parameters in the echo executor’s workflow JSON:
{
  "executor": { "type": "echo" },
  "inputs": {
    "parameters": [
      { "name": "suspend",    "value": true },
      { "name": "fail-count", "value": 2 },
      { "name": "outputs",    "value": [{ "name": "approved", "type": "bool", "value": true }] }
    ]
  }
}
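The fail-count check in the listing (failCount > 0 && req.RetryCount < failCount) means that a value of 2 fails the first two attempts and lets the third one through, provided the task's retry policy allows at least two retries. The sketch below replays that arithmetic in isolation. The function names are hypothetical, and the suspend branch, which runs before this check, is left out.

```go
package main

import "fmt"

// simulatedFailure mirrors the fail-count condition from the echo executor.
func simulatedFailure(failCount, retryCount int) bool {
	return failCount > 0 && retryCount < failCount
}

// firstSuccessfulAttempt returns the 1-based attempt number on which the
// condition stops failing, assuming retries are never exhausted.
func firstSuccessfulAttempt(failCount int) int {
	for retry := 0; ; retry++ {
		if !simulatedFailure(failCount, retry) {
			return retry + 1
		}
	}
}

func main() {
	for _, failCount := range []int{0, 1, 2} {
		fmt.Printf("fail-count=%d succeeds on attempt %d\n",
			failCount, firstSuccessfulAttempt(failCount))
	}
}
```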

Step-by-step: implementing a minimal executor

1. Define config and output structs

Create flat structs with json and desc tags for automatic schema derivation.
type GreetConfig struct {
    Name string `json:"name" desc:"Name to greet"`
}

type GreetOutput struct {
    Greeting string `json:"greeting" desc:"The greeting message"`
}

2. Implement the Plugin interface

Pass your structs to SchemaOf as type parameters, and use BindInputs and OutputFrom for type-safe I/O.
type GreetExecutor struct{}

func (g *GreetExecutor) Type() string { return "greet" }

func (g *GreetExecutor) Schema() model.ExecutorSchema {
    return executor.SchemaOf[GreetConfig, GreetOutput](
        "greet", "1.0", "Returns a personalised greeting",
    )
}

func (g *GreetExecutor) Execute(ctx context.Context, req *executor.ExecuteRequest) (*model.ExecOutputs, error) {
    var cfg GreetConfig
    if err := executor.BindInputs(req.Inputs, &cfg); err != nil {
        return &model.ExecOutputs{Code: model.ExecCodeError, Message: err.Error()}, nil
    }
    return executor.OutputFrom(GreetOutput{
        Greeting: "Hello, " + cfg.Name + "!",
    })
}

3. Register with the engine

Pass the executor to aether.New via WithExecutor.
engine, err := aether.New(
    aether.WithStore(store),
    aether.WithTaskBroker(broker),
    aether.WithIDGenerator(idgen),
    aether.WithExecutor(&GreetExecutor{}),
)

4. Reference in workflow JSON

Set executor.type to the string returned by Type().
{
  "name": "say-hello",
  "executor": { "type": "greet" },
  "inputs": {
    "parameters": [{ "name": "name", "value": "Aether" }]
  }
}
