Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/BabySid/aether/llms.txt

Use this file to discover all available pages before exploring further.

The broker.TaskBroker interface is the single bridge between the Aether engine (scheduler) and workers (executors). It manages the full lifecycle of task distribution: dispatch from the engine side, and fetch/start/complete from the worker side. Implementations decide the transport — goroutines, a message queue, Redis, gRPC, or any other mechanism.
Implementations must be safe for concurrent use by multiple goroutines. The engine calls Dispatch and Cancel from scheduling goroutines, and workers call FetchTask, StartTask, and CompleteTask concurrently.

TaskBroker interface

type TaskBroker interface {
    // Engine side
    Dispatch(ctx context.Context, assignment *TaskAssignment) error
    Cancel(ctx context.Context, taskRunID string) error

    // Worker side
    FetchTask(ctx context.Context, workerID string) (*TaskAssignment, error)
    StartTask(ctx context.Context, taskRunID string, workerID string) error
    CompleteTask(ctx context.Context, result *TaskResult) error

    // Lifecycle
    Close() error
}

Engine-side methods

Submits a task for execution. The implementation decides how and where the task runs.
  • Local broker: starts a goroutine that calls the executor directly
  • Distributed broker: enqueues to a message queue or HTTP endpoint
assignment
*TaskAssignment
required
The fat task assignment carrying all information the worker needs.
Sends a cancellation signal to a running or queued task. The implementation decides how to propagate the signal — context cancellation, a remote API call, or a queue message.Cancel is best-effort. The engine always updates the store regardless of whether Cancel succeeds.
taskRunID
string
required
The task run ID to cancel.

Worker-side methods

Pulls a pending task for execution. This is typically a blocking or long-poll call.Returns (nil, context.DeadlineExceeded) or (nil, context.Canceled) when the context expires before a task is available.
workerID
string
required
Identifies the calling worker for affinity or logging purposes.
Reports that a worker has begun executing a task. Must be called before any actual computation starts. The implementation delivers this event to the engine — either by directly invoking the StartHandler (local) or publishing to a queue (distributed).The engine transitions the task and its ancestor containers from Ready to Running when this is received.
Reports the final execution result after the task finishes. The implementation delivers this to the engine — either directly via CompletionHandler or through a queue.
result
*TaskResult
required
The task result including outputs and exit code.

TaskAssignment

The “fat assignment” struct carries everything a worker needs to execute a task. Workers never need to call back into the store.
type TaskAssignment struct {
    TaskRunID     string
    WorkflowRunID string
    TaskName      string
    TemplateName  string
    ExecutorType  string           // e.g. "echo", "http", "shell"
    Inputs        *model.Inputs    // resolved task inputs (nil if none)
    Timeout       string           // e.g. "30m"; empty = no deadline beyond ctx
    Resources     *model.Resources // resource requirements (nil if none)
    Priority      int
    RetryCount    int              // retries consumed; 0 = first attempt
}
TaskRunID
string
Unique ID of this task run. Pass to StartTask and embed in TaskResult.
ExecutorType
string
The executor type identifier. The worker uses this to route to the correct executor.Plugin.
Inputs
*model.Inputs
Fully resolved task inputs including parameters and secrets. Nil if the task declares no inputs.
RetryCount
int
Number of retries already consumed. 0 = first attempt. Passed through to ExecuteRequest.RetryCount.

TaskResult

The message a worker sends to the engine when a task finishes.
type TaskResult struct {
    TaskRunID     string
    WorkflowRunID string     // mirrors TaskAssignment.WorkflowRunID
    *model.ExecOutputs
}
Phase and Metrics are deliberately absent — the engine derives Phase from ExecOutputs.Code and records Metrics itself in OnTaskStarted / OnTaskCompleted.

Callback types

These types are convenience helpers for local (in-process) broker implementations. They are not part of the TaskBroker interface contract.
// StartHandler — satisfied by engine.OnTaskStarted
type StartHandler func(ctx context.Context, taskRunID string)

// CompletionHandler — satisfied by engine.OnTaskCompleted
type CompletionHandler func(ctx context.Context, result *TaskResult)
A local broker implementation typically accepts these callbacks at construction time and calls them directly from StartTask and CompleteTask.

Local broker pattern

type LocalBroker struct {
    onStart    broker.StartHandler
    onComplete broker.CompletionHandler
    // ... queue, cancel map, etc.
}

func NewLocalBroker(onStart broker.StartHandler, onComplete broker.CompletionHandler) *LocalBroker {
    return &LocalBroker{onStart: onStart, onComplete: onComplete}
}

func (b *LocalBroker) StartTask(ctx context.Context, taskRunID, workerID string) error {
    b.onStart(ctx, taskRunID) // directly invokes engine.OnTaskStarted
    return nil
}

func (b *LocalBroker) CompleteTask(ctx context.Context, result *broker.TaskResult) error {
    b.onComplete(ctx, result) // directly invokes engine.OnTaskCompleted
    return nil
}

Inject with WithTaskBroker

broker := NewLocalBroker(engine.OnTaskStarted, engine.OnTaskCompleted)

engine, err := aether.New(
    aether.WithTaskBroker(broker), // required
    // ...
)

Build docs developers (and LLMs) love