The task broker is the seam between Aether’s scheduling engine and the workers that execute tasks. The engine decides which tasks to run and when; the broker decides where and how they run. This boundary is the only point in the system where the deployment topology is encoded — swap the broker implementation and the same workflow definitions execute in-process, over a message queue, or across a fleet of remote workers, without touching a single workflow document or executor plugin.

The TaskBroker interface

// broker/broker.go
type TaskBroker interface {
    // Engine side
    Dispatch(ctx context.Context, assignment *TaskAssignment) error
    Cancel(ctx context.Context, taskRunID string) error

    // Worker side
    FetchTask(ctx context.Context, workerID string) (*TaskAssignment, error)
    StartTask(ctx context.Context, taskRunID string, workerID string) error
    CompleteTask(ctx context.Context, result *TaskResult) error

    // Lifecycle
    Close() error
}
The interface is deliberately split into two logical groups, engine-side and worker-side methods, plus Close for lifecycle:
  • Dispatch: the engine calls this when a task becomes ready to run. The broker decides how to deliver the assignment: enqueue it to a channel, push it to Redis, publish it to a message queue, or invoke an HTTP endpoint. The engine does not care which.
  • Cancel: the engine calls this when a workflow is cancelled or a task times out. The broker propagates the signal however it can: cancel a Go context, send a remote kill signal, or mark the task for rejection in a queue.
  • FetchTask, StartTask, CompleteTask: workers call these to receive an assignment, to report that execution has begun, and to report the final TaskResult.
Implementations must be safe for concurrent use by multiple goroutines. In a real deployment, multiple worker goroutines or processes will call FetchTask, StartTask, and CompleteTask simultaneously.

Callback types

Two function types connect the broker to the engine’s internal lifecycle handlers:
// broker/broker.go

// StartHandler is the callback invoked when a task begins execution.
// Engine's OnTaskStarted method satisfies this signature.
type StartHandler func(ctx context.Context, taskRunID string)

// CompletionHandler is the callback invoked when a task finishes execution.
// Engine's OnTaskCompleted method satisfies this signature.
type CompletionHandler func(ctx context.Context, result *TaskResult)
These types are not part of the TaskBroker interface contract. They are convenience types for local implementations that invoke engine callbacks directly. Distributed implementations typically publish events to a message bus instead and let a separate consumer call into the engine.

TaskAssignment: the fat assignment

// broker/broker.go
type TaskAssignment struct {
    TaskRunID     string
    WorkflowRunID string
    TaskName      string
    TemplateName  string
    ExecutorType  string           // e.g. "echo", "http", "shell"
    Inputs        *model.Inputs    // resolved task inputs (nil if none)
    Timeout       string           // e.g. "30m"
    Resources     *model.Resources // resource requirements (nil if none)
    Priority      int
    RetryCount    int              // retries already consumed (0 = first attempt)
}
TaskAssignment carries everything a worker needs to execute a task. Workers are fully self-contained — they never call back into store.Store. This design:
  • Eliminates distributed coupling between workers and the store
  • Makes workers simple, testable, and independently deployable
  • Allows a single broker interface to serve both local and remote topologies

TaskResult

// broker/broker.go
type TaskResult struct {
    TaskRunID     string
    WorkflowRunID string // mirrors TaskAssignment; avoids extra store lookup in the engine
    *model.ExecOutputs
}
WorkflowRunID is mirrored from the assignment so the engine can locate the correct scope for advancing the workflow without an additional store round-trip. Phase and Metrics are not included — they are framework concerns derived by the engine from ExecOutputs.Code.
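Since TaskResult embeds *model.ExecOutputs, fields such as Code are promoted onto the result, but reading them through a nil embedded pointer panics. The sketch below shows one defensive way an engine-side consumer might map Code to a phase. The Phase values, the rule that Code 0 means success, and treating a missing ExecOutputs as failure are assumptions, not Aether's documented behaviour; ExecOutputs is trimmed to the Code field this page mentions.

```go
package main

import "fmt"

// Trimmed stand-in for model.ExecOutputs; only Code is taken from the docs.
type ExecOutputs struct {
	Code int
}

type TaskResult struct {
	TaskRunID     string
	WorkflowRunID string
	*ExecOutputs  // embedded pointer: Code is promoted, but nil-unsafe
}

// Phase and its values are hypothetical.
type Phase string

const (
	PhaseSucceeded Phase = "Succeeded"
	PhaseFailed    Phase = "Failed"
)

// derivePhase maps the exit code to a phase, guarding against nil pointers.
func derivePhase(r *TaskResult) Phase {
	if r == nil || r.ExecOutputs == nil {
		return PhaseFailed // assumption: no outputs is treated as failure
	}
	if r.Code == 0 { // promoted from the embedded *ExecOutputs
		return PhaseSucceeded
	}
	return PhaseFailed
}

func main() {
	ok := &TaskResult{TaskRunID: "t1", ExecOutputs: &ExecOutputs{Code: 0}}
	bad := &TaskResult{TaskRunID: "t2", ExecOutputs: &ExecOutputs{Code: 2}}
	missing := &TaskResult{TaskRunID: "t3"}
	fmt.Println(derivePhase(ok), derivePhase(bad), derivePhase(missing))
}
```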

Local vs distributed broker patterns

Local broker

Routes tasks through an in-process channel. Workers run as goroutines. StartTask and CompleteTask directly invoke the engine’s OnTaskStarted and OnTaskCompleted callbacks. Zero network latency; single process.

Distributed broker

Routes tasks through an external medium (Redis, RabbitMQ, Kafka, gRPC). Dispatch publishes to the queue; a separate consumer process calls FetchTask, executes the task, and calls CompleteTask, which publishes the result to a result queue consumed by the engine. Multiple workers, multiple processes.
The TaskBroker interface encodes no assumptions about which pattern is in use. Both patterns are valid implementations of the same interface.

Engine option

// option.go
func WithTaskBroker(b broker.TaskBroker) Option
WithTaskBroker is required. The engine will fail at startup without a broker.
engine, err := aether.New(
    aether.WithStore(store),
    aether.WithTaskBroker(broker),
    aether.WithIDGenerator(idgen),
    aether.WithExecutor(echoExecutor),
)
if err != nil {
    log.Fatalf("create engine: %v", err)
}

Reference implementation: LocalBroker

The playground LocalBroker is a complete, production-quality local implementation. Study it to understand the threading model and callback contract.
// cmd/playground/local_broker.go

type LocalBroker struct {
    startHandler broker.StartHandler
    handler      broker.CompletionHandler

    mu       sync.Mutex
    closed   bool
    taskCh   chan *broker.TaskAssignment
    taskCtxs map[string]context.Context    // taskRunID → execution context
    cancels  map[string]context.CancelFunc // taskRunID → cancel func

    worker *LocalWorker
}

func NewLocalBroker(startHandler broker.StartHandler, handler broker.CompletionHandler) *LocalBroker {
    return &LocalBroker{
        startHandler: startHandler,
        handler:      handler,
        taskCh:       make(chan *broker.TaskAssignment, 64),
        taskCtxs:     make(map[string]context.Context),
        cancels:      make(map[string]context.CancelFunc),
    }
}

Dispatch

Creates a per-task context (with optional timeout derived from assignment.Timeout), stores the cancel function for later Cancel calls, and sends the assignment to a buffered channel.
func (b *LocalBroker) Dispatch(ctx context.Context, assignment *broker.TaskAssignment) error {
    taskCtx, cancel := context.WithCancel(ctx)
    if assignment.Timeout != "" {
        timeout, err := internal.ParseDuration(assignment.Timeout)
        if err == nil && timeout > 0 {
            cancel()
            taskCtx, cancel = context.WithTimeout(ctx, timeout)
        }
    }
    b.mu.Lock()
    b.taskCtxs[assignment.TaskRunID] = taskCtx
    b.cancels[assignment.TaskRunID] = cancel
    b.mu.Unlock()

    b.taskCh <- assignment
    return nil
}

Cancel

Looks up the cancel function by taskRunID and invokes it, propagating cancellation to the task’s context.
func (b *LocalBroker) Cancel(_ context.Context, taskRunID string) error {
    b.mu.Lock()
    cancel, ok := b.cancels[taskRunID]
    b.mu.Unlock()
    if ok {
        cancel()
    }
    return nil
}

FetchTask

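Blocks on the channel until an assignment arrives or the context is done (cancelled or past its deadline). Once Close has closed the channel and any buffered assignments have been drained, it returns a "broker closed" error.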
func (b *LocalBroker) FetchTask(ctx context.Context, _ string) (*broker.TaskAssignment, error) {
    select {
    case assignment, ok := <-b.taskCh:
        if !ok {
            return nil, fmt.Errorf("broker closed")
        }
        return assignment, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

StartTask and CompleteTask

StartTask directly invokes the StartHandler (the engine’s OnTaskStarted). CompleteTask cleans up per-task state and invokes the CompletionHandler (the engine’s OnTaskCompleted).
func (b *LocalBroker) StartTask(ctx context.Context, taskRunID string, _ string) error {
    if b.startHandler != nil {
        b.startHandler(ctx, taskRunID)
    }
    return nil
}

func (b *LocalBroker) CompleteTask(ctx context.Context, result *broker.TaskResult) error {
    b.CleanupTask(result.TaskRunID)
    if b.handler != nil {
        b.handler(ctx, result)
    }
    return nil
}

Close

Cancels all in-flight task contexts, closes the task channel, and signals the worker to stop. Repeated calls are no-ops.
func (b *LocalBroker) Close() error {
    b.mu.Lock()
    if b.closed {
        // Already closed: closing taskCh a second time would panic.
        b.mu.Unlock()
        return nil
    }
    b.closed = true
    for _, cancel := range b.cancels {
        cancel()
    }
    b.mu.Unlock()
    close(b.taskCh)
    if b.worker != nil {
        b.worker.Stop()
    }
    return nil
}

Thread-safety requirements

Your TaskBroker implementation must protect all shared state with appropriate synchronisation. In LocalBroker, taskCtxs and cancels are protected by mu because Dispatch (engine goroutine) and Cancel (engine goroutine) may race with CleanupTask (worker goroutines calling CompleteTask). The channel itself is safe for concurrent send and receive.
Failing to synchronise the cancel map leads to data races that are difficult to reproduce. Run your tests with the race detector enabled (go test -race ./...).
