
Orchestrator

The Orchestrator is the core coordination engine that owns the poll loop, runtime state, and all dispatch decisions.

Implementation

defmodule SymphonyElixir.Orchestrator do
  use GenServer
  
  defmodule State do
    defstruct [
      :poll_interval_ms,
      :max_concurrent_agents,
      :next_poll_due_at_ms,
      :poll_check_in_progress,
      running: %{},
      completed: MapSet.new(),
      claimed: MapSet.new(),
      retry_attempts: %{},
      codex_totals: nil,
      codex_rate_limits: nil
    ]
  end
end
Reference: elixir/lib/symphony_elixir/orchestrator.ex:24-41

Responsibilities

Poll Management

Schedules ticks every polling.interval_ms, validates config, fetches candidates

Dispatch Decisions

Sorts issues by priority, checks eligibility, enforces concurrency limits

Reconciliation

Detects stalled sessions, refreshes tracker state, terminates ineligible workers

Retry Scheduling

Exponential backoff for failures, continuation retries for active issues

Poll Loop

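The orchestrator runs a continuous poll loop split across two messages. :tick refreshes the runtime config, marks a poll as in progress, and schedules the cycle. :run_poll_cycle reconciles, validates, and dispatches, then schedules the next tick: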
def handle_info(:tick, state) do
  state = refresh_runtime_config(state)
  state = %{state | poll_check_in_progress: true}
  notify_dashboard()
  schedule_poll_cycle_start()
  {:noreply, state}
end

def handle_info(:run_poll_cycle, state) do
  state = maybe_dispatch(state)  # Reconcile + validate + dispatch
  now_ms = System.monotonic_time(:millisecond)
  next_poll_due_at_ms = now_ms + state.poll_interval_ms
  schedule_tick(state.poll_interval_ms)
  {:noreply, %{state | poll_check_in_progress: false, next_poll_due_at_ms: next_poll_due_at_ms}}
end
Reference: elixir/lib/symphony_elixir/orchestrator.ex:68-89
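The same two-message pattern can be reproduced in a minimal standalone GenServer. This is a sketch using only OTP: PollLoopSketch, its :work_fun option (a stand-in for maybe_dispatch/1), and the immediate send/2 (a stand-in for schedule_poll_cycle_start/0, whose body isn't shown) are all hypothetical.

```elixir
defmodule PollLoopSketch do
  use GenServer

  # Hypothetical stand-in for the orchestrator poll loop.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def cycles(pid), do: GenServer.call(pid, :cycles)

  @impl true
  def init(opts) do
    state = %{
      poll_interval_ms: Keyword.fetch!(opts, :poll_interval_ms),
      # work_fun plays the role of maybe_dispatch/1; it receives the cycle count.
      work_fun: Keyword.get(opts, :work_fun, fn _cycle -> :ok end),
      poll_check_in_progress: false,
      next_poll_due_at_ms: nil,
      cycles: 0
    }

    # Kick off the first poll immediately.
    send(self(), :tick)
    {:ok, state}
  end

  @impl true
  def handle_info(:tick, state) do
    # Phase 1: flag the poll as in progress and hand off to the cycle message.
    send(self(), :run_poll_cycle)
    {:noreply, %{state | poll_check_in_progress: true}}
  end

  def handle_info(:run_poll_cycle, state) do
    # Phase 2: do the work, then schedule the next tick one interval from now.
    state.work_fun.(state.cycles)
    now_ms = System.monotonic_time(:millisecond)
    Process.send_after(self(), :tick, state.poll_interval_ms)

    {:noreply,
     %{
       state
       | cycles: state.cycles + 1,
         poll_check_in_progress: false,
         next_poll_due_at_ms: now_ms + state.poll_interval_ms
     }}
  end

  @impl true
  def handle_call(:cycles, _from, state), do: {:reply, state.cycles, state}
end
```

Because the next :tick is scheduled only after a cycle finishes, a slow cycle delays the next poll instead of letting cycles overlap. The orchestrator excerpt has the same property, since schedule_tick/1 is called at the end of :run_poll_cycle.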

Candidate Selection

An issue is eligible for dispatch when all of the following hold:
  1. It has the required fields: id, identifier, title, state
  2. Its state is in active_states and not in terminal_states
  3. It is not already in the running map
  4. It is not already in the claimed set
  5. A global concurrency slot is available: max_concurrent_agents - running_count > 0
  6. A per-state concurrency slot is available (if per-state limits are configured)
  7. Blocker rule for Todo: if the state is “Todo”, no blocker may be in a non-terminal state
defp should_dispatch_issue?(
  %Issue{} = issue,
  %State{running: running, claimed: claimed} = state,
  active_states,
  terminal_states
) do
  candidate_issue?(issue, active_states, terminal_states) and
    !todo_issue_blocked_by_non_terminal?(issue, terminal_states) and
    !MapSet.member?(claimed, issue.id) and
    !Map.has_key?(running, issue.id) and
    available_slots(state) > 0 and
    state_slots_available?(issue, running)
end
Reference: elixir/lib/symphony_elixir/orchestrator.ex:473-485
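To see how the seven rules compose, they can be restated as one standalone predicate over plain maps. This is a sketch, not the orchestrator's code. EligibilitySketch, the ctx map, :blocked_by entries carrying a :state, running entries carrying a :state, and the per_state_limits map are all assumptions. The per_state_limits map stands in for state_slots_available?/2, whose body isn't shown. State names are compared exactly here; the real code may normalize them.

```elixir
defmodule EligibilitySketch do
  # Hypothetical restatement of the dispatch rules over plain maps.
  @required [:id, :identifier, :title, :state]

  def eligible?(issue, ctx) do
    has_required_fields?(issue) and
      issue.state in ctx.active_states and
      issue.state not in ctx.terminal_states and
      not todo_blocked?(issue, ctx.terminal_states) and
      not Map.has_key?(ctx.running, issue.id) and
      not MapSet.member?(ctx.claimed, issue.id) and
      ctx.max_concurrent_agents - map_size(ctx.running) > 0 and
      state_slot_free?(issue, ctx)
  end

  defp has_required_fields?(issue), do: Enum.all?(@required, &(Map.get(issue, &1) != nil))

  # Rule 7: a "Todo" issue waits while any blocker is in a non-terminal state.
  defp todo_blocked?(%{state: "Todo"} = issue, terminal_states) do
    Enum.any?(Map.get(issue, :blocked_by, []), &(&1.state not in terminal_states))
  end

  defp todo_blocked?(_issue, _terminal_states), do: false

  # Rule 6: only enforced when a limit is configured for this state.
  defp state_slot_free?(issue, ctx) do
    case Map.fetch(ctx.per_state_limits, issue.state) do
      :error ->
        true

      {:ok, limit} ->
        in_state = Enum.count(ctx.running, fn {_id, entry} -> entry.state == issue.state end)
        in_state < limit
    end
  end
end
```

The checks are chained with and, so evaluation stops at the first failing rule; the required-fields check comes first so later rules never read a missing key.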

Dispatch Sorting

Issues are sorted by:
  1. Priority, ascending: values 1..4 keep their rank; nil or any other value sorts last
  2. Creation time, oldest first
  3. Identifier, lexicographic, as the final tie-breaker
defp sort_issues_for_dispatch(issues) when is_list(issues) do
  Enum.sort_by(issues, fn
    %Issue{} = issue ->
      {priority_rank(issue.priority), issue_created_at_sort_key(issue), issue.identifier || issue.id || ""}
  end)
end

defp priority_rank(priority) when is_integer(priority) and priority in 1..4, do: priority
defp priority_rank(_priority), do: 5
Reference: elixir/lib/symphony_elixir/orchestrator.ex:453-464
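The sort relies on Elixir's term ordering of tuples, which compares elements left to right. Below is a standalone sketch. issue_created_at_sort_key/1 isn't shown, so the created-at key here is an assumption: DateTime values become integer microseconds, and nil sorts after every timestamp. The conversion matters because plain term ordering of DateTime structs compares their fields structurally, not chronologically.

```elixir
defmodule DispatchSortSketch do
  # Priorities 1..4 keep their value; nil or anything else ranks last (5).
  def priority_rank(p) when is_integer(p) and p in 1..4, do: p
  def priority_rank(_), do: 5

  # Hypothetical created-at key: {0, unix_us} for known timestamps, {1, 0} for nil,
  # so undated issues sort after all dated ones.
  def created_at_key(%DateTime{} = dt), do: {0, DateTime.to_unix(dt, :microsecond)}
  def created_at_key(nil), do: {1, 0}

  def sort(issues) do
    Enum.sort_by(issues, fn issue ->
      {priority_rank(issue.priority), created_at_key(issue.created_at),
       issue.identifier || issue.id || ""}
    end)
  end
end
```

Note that a priority of 0 also falls through to rank 5 and sorts with the unprioritized issues.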

Retry Backoff

Symphony uses two retry strategies, chosen by how the worker exited.
Continuation retry (normal exit, issue still active):
delay = 1000 ms (fixed short delay)
attempt = 1
Failure retry (abnormal exit, error, timeout, stall):
delay = min(10000 * 2^min(attempt - 1, 10), max_retry_backoff_ms)
attempt increments: 1, 2, 3, ...
Only a continuation retry at attempt 1 gets the fixed delay; any other attempt uses the failure formula.
defp retry_delay(attempt, metadata) when is_integer(attempt) and attempt > 0 do
  if metadata[:delay_type] == :continuation and attempt == 1 do
    @continuation_retry_delay_ms  # 1000
  else
    failure_retry_delay(attempt)
  end
end

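defp failure_retry_delay(attempt) do
  # <<< is Bitwise.<<</2, so the enclosing module needs `import Bitwise`.
  # The exponent is capped at 10, so before the config cap the delay tops out at base * 1024.
  max_delay_power = min(attempt - 1, 10)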
  min(@failure_retry_base_ms * (1 <<< max_delay_power), Config.max_retry_backoff_ms())
end
Reference: elixir/lib/symphony_elixir/orchestrator.ex:821-832
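Below is a standalone version of the backoff, taking its numbers from the formulas above: a 10 000 ms failure base and a 1 000 ms continuation delay. Passing max_ms as an argument instead of calling Config.max_retry_backoff_ms(), and using :continuation/:failure atoms instead of the metadata keyword, are assumptions of this sketch.

```elixir
defmodule BackoffSketch do
  import Bitwise

  @continuation_retry_delay_ms 1_000
  @failure_retry_base_ms 10_000

  # A continuation retry (normal exit, issue still active) on attempt 1 gets the short fixed delay.
  def retry_delay(1, :continuation, _max_ms), do: @continuation_retry_delay_ms

  # Everything else backs off exponentially: the exponent is capped at 10,
  # then the result is capped at max_ms.
  def retry_delay(attempt, _delay_type, max_ms) when is_integer(attempt) and attempt > 0 do
    power = min(attempt - 1, 10)
    min(@failure_retry_base_ms * (1 <<< power), max_ms)
  end
end
```

With a hypothetical cap of 300 000 ms, consecutive failures wait 10 s, 20 s, 40 s, 80 s, and 160 s, then 300 s for every later attempt.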

Reconciliation

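Reconciliation has two parts: stall detection and a tracker state refresh. Stall detection restarts any running session whose time since its last Codex activity exceeds the configured stall timeout (Config.codex_stall_timeout_ms/0); a timeout of 0 or less disables it: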
defp reconcile_stalled_running_issues(%State{} = state) do
  timeout_ms = Config.codex_stall_timeout_ms()
  
  cond do
    timeout_ms <= 0 -> state  # Disabled
    map_size(state.running) == 0 -> state
    true ->
      now = DateTime.utc_now()
      Enum.reduce(state.running, state, fn {issue_id, running_entry}, state_acc ->
        restart_stalled_issue(state_acc, issue_id, running_entry, now, timeout_ms)
      end)
  end
end
Elapsed time is computed from last_codex_timestamp if any Codex event has been seen, otherwise from started_at.
Reference: elixir/lib/symphony_elixir/orchestrator.ex:367-384
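The elapsed-time rule on its own can be sketched as a pure function. The entry shape is an assumption, since restart_stalled_issue/5 isn't shown: a map with :started_at and an optional :last_codex_timestamp, both DateTime. The strict > comparison is also an assumption.

```elixir
defmodule StallSketch do
  # Hypothetical check: elapsed time runs from the most recent Codex event when one
  # has been seen, otherwise from the session start.
  def stalled?(entry, now, timeout_ms) when timeout_ms > 0 do
    since = Map.get(entry, :last_codex_timestamp) || entry.started_at
    DateTime.diff(now, since, :millisecond) > timeout_ms
  end

  # A timeout of 0 or less disables stall detection.
  def stalled?(_entry, _now, _timeout_ms), do: false
end
```

Measuring from the last event rather than the start means a long session that keeps producing output is never treated as stalled.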
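The entry point, reconcile_running_issues/1, runs stall detection first and then refreshes the running issues' states from the tracker. If the tracker request fails, the workers are left running:
defp reconcile_running_issues(%State{} = state) do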
  state = reconcile_stalled_running_issues(state)
  running_ids = Map.keys(state.running)
  
  if running_ids == [] do
    state
  else
    case Tracker.fetch_issue_states_by_ids(running_ids) do
      {:ok, issues} ->
        reconcile_running_issue_states(issues, state, active_state_set(), terminal_state_set())
      {:error, reason} ->
        Logger.debug("Failed to refresh: #{inspect(reason)}; keeping active workers")
        state
    end
  end
end
For each running issue:
  • Terminal state → terminate worker, clean workspace
  • Active state → update in-memory issue snapshot
  • Neither → terminate worker (no cleanup)
Reference: elixir/lib/symphony_elixir/orchestrator.ex:236-258
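The per-issue decision above reduces to a three-way classification. In this sketch, ReconcileSketch and the action atoms are hypothetical labels for the three outcomes; checking terminal states first is an assumption that matches the list order.

```elixir
defmodule ReconcileSketch do
  # Hypothetical mapping from a refreshed tracker state to the action taken
  # for a running issue.
  def action(state, active_states, terminal_states) do
    cond do
      state in terminal_states -> :terminate_and_clean_workspace
      state in active_states -> :update_snapshot
      true -> :terminate_without_cleanup
    end
  end
end
```

Only the terminal case removes the workspace; an issue moved to some other non-active state loses its worker but keeps its directory.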

Token Accounting

The orchestrator tracks cumulative token usage across all sessions:
defp integrate_codex_update(running_entry, %{event: event, timestamp: timestamp} = update) do
  token_delta = extract_token_delta(running_entry, update)
  
  {
    Map.merge(running_entry, %{
      codex_input_tokens: running_entry.codex_input_tokens + token_delta.input_tokens,
      codex_output_tokens: running_entry.codex_output_tokens + token_delta.output_tokens,
      codex_total_tokens: running_entry.codex_total_tokens + token_delta.total_tokens,
      # ... other fields
    }),
    token_delta
  }
end
Reference: elixir/lib/symphony_elixir/orchestrator.ex:987-1015
Token deltas are extracted from absolute totals (thread/tokenUsage/updated payloads) to avoid double-counting. Delta-style payloads are ignored for aggregate totals.
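Below is a sketch of deriving deltas from absolute totals. The last map is the most recent absolute totals seen for the session, and reported is a new absolute payload. Both map shapes, and flooring each delta at zero so that a reset or out-of-order payload never subtracts, are assumptions. extract_token_delta/2 isn't shown.

```elixir
defmodule TokenDeltaSketch do
  @keys [:input_tokens, :output_tokens, :total_tokens]

  # Hypothetical: the delta is the increase of each absolute total since the last
  # payload, floored at zero.
  def delta(last, reported) do
    Map.new(@keys, fn key ->
      {key, max(Map.get(reported, key, 0) - Map.get(last, key, 0), 0)}
    end)
  end
end
```

For example, if a session reports absolute totals of 140 and then 210, summing the payloads would count 350 tokens. Summing the deltas instead (140 from an empty baseline, then 70) gives the correct 210.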

Workspace Manager

The Workspace Manager creates isolated per-issue directories and runs lifecycle hooks.

Implementation

defmodule SymphonyElixir.Workspace do
  @spec create_for_issue(map() | String.t() | nil) :: {:ok, Path.t()} | {:error, term()}
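  def create_for_issue(issue_or_identifier) do
    # issue_context: normalized map built from issue_or_identifier (construction elided in this excerpt)
    safe_id = safe_identifier(issue_context.issue_identifier)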
    workspace = workspace_path_for_issue(safe_id)
    
    with :ok <- validate_workspace_path(workspace),
         {:ok, created?} <- ensure_workspace(workspace),
         :ok <- maybe_run_after_create_hook(workspace, issue_context, created?) do
      {:ok, workspace}
    end
  end
end
Reference: elixir/lib/symphony_elixir/workspace.ex:11-30

Workspace Path Sanitization

Security-critical: Issue identifiers are sanitized before use in filesystem paths.
defp safe_identifier(identifier) do
  String.replace(identifier || "issue", ~r/[^a-zA-Z0-9._-]/, "_")
end
Only characters in [A-Za-z0-9._-] are kept; every other character is replaced with _.
Reference: elixir/lib/symphony_elixir/workspace.ex:115-117
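The rule is easy to check against a few inputs. SafeIdSketch is a hypothetical wrapper around the same expression. Note that "." and ".." pass through unchanged, because dots are allowed. Assuming the workspace path is the root joined with the sanitized identifier, sanitization alone therefore cannot stop a path from resolving to the root or its parent; the path validation checks (workspace_equals_root, workspace_outside_root) catch those cases.

```elixir
defmodule SafeIdSketch do
  # Same rule as safe_identifier/1: keep [A-Za-z0-9._-], replace everything else with "_".
  def safe_identifier(identifier) do
    String.replace(identifier || "issue", ~r/[^a-zA-Z0-9._-]/, "_")
  end
end
```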

Path Validation

Workspace paths must pass strict safety checks:
defp validate_workspace_path(workspace) when is_binary(workspace) do
  expanded_workspace = Path.expand(workspace)
  root = Path.expand(Config.workspace_root())
  root_prefix = root <> "/"
  
  cond do
    expanded_workspace == root ->
      {:error, {:workspace_equals_root, expanded_workspace, root}}
    
    String.starts_with?(expanded_workspace <> "/", root_prefix) ->
      ensure_no_symlink_components(expanded_workspace, root)
    
    true ->
      {:error, {:workspace_outside_root, expanded_workspace, root}}
  end
end
Reference: elixir/lib/symphony_elixir/workspace.ex:213-228
Symlink escape prevention: Each path component is checked with File.lstat/1 to detect symlinks that might escape the workspace root.
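The prefix check is subtle in two places, and a reduced version makes both visible. First, comparing against root <> "/" rather than root rejects sibling directories that merely share a prefix (/srv/ws-evil versus /srv/ws). Second, because the workspace also gets a trailing "/", the root itself would pass the prefix test, so it must be rejected by the equality check first. RootCheckSketch and its bare error atoms are hypothetical, and the File.lstat/1 symlink walk is omitted.

```elixir
defmodule RootCheckSketch do
  # Mirrors the cond in validate_workspace_path/1, minus the symlink walk.
  def check(workspace, root) do
    expanded = Path.expand(workspace)
    root = Path.expand(root)

    cond do
      expanded == root -> {:error, :workspace_equals_root}
      String.starts_with?(expanded <> "/", root <> "/") -> :ok
      true -> {:error, :workspace_outside_root}
    end
  end
end
```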

Lifecycle Hooks

Hooks are shell scripts executed in the workspace directory:

after_create

Runs only when the workspace directory is newly created. Failure aborts workspace creation.

before_run

Runs before each agent attempt. Failure aborts the current attempt.

after_run

Runs after each attempt (success, failure, or timeout). Failure is logged and ignored.

before_remove

Runs before workspace deletion. Failure is logged and ignored.
defp run_hook(command, workspace, issue_context, hook_name) do
  timeout_ms = Config.workspace_hooks()[:timeout_ms]
  
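  # Run the hook in a login shell from the workspace directory, merging stderr into stdout.
  task = Task.async(fn ->
    System.cmd("sh", ["-lc", command], cd: workspace, stderr_to_stdout: true)
  end)

  # Wait up to timeout_ms, then kill the task. Task.shutdown/2 still returns {:ok, result}
  # if the hook finished in the meantime, so a late result is not misreported as a timeout.
  case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
    {:ok, {_output, 0}} -> :ok
    {:ok, {output, status}} -> {:error, {:workspace_hook_failed, hook_name, status, output}}
    nil -> {:error, {:workspace_hook_timeout, hook_name, timeout_ms}}
  end
end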
Reference: elixir/lib/symphony_elixir/workspace.ex:166-187
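The four hook descriptions amount to a simple failure policy: after_create and before_run propagate errors, while after_run and before_remove log them and carry on. Here is that policy as a sketch. HookPolicySketch and the atom hook names are hypothetical. The result argument is assumed to be whatever run_hook/4 returns (:ok or an {:error, reason} tuple).

```elixir
defmodule HookPolicySketch do
  require Logger

  # Hooks whose failure aborts the surrounding operation.
  @fatal_hooks [:after_create, :before_run]

  def apply_policy(_hook, :ok), do: :ok

  # Fatal hooks pass the error through so the caller aborts.
  def apply_policy(hook, {:error, _reason} = error) when hook in @fatal_hooks, do: error

  # All other hooks: log the failure and continue.
  def apply_policy(hook, {:error, reason}) do
    Logger.warning("#{hook} hook failed, ignoring: #{inspect(reason)}")
    :ok
  end
end
```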

Agent Runner

The Agent Runner executes a single issue in an isolated workspace by launching the Codex app-server.

Implementation

defmodule SymphonyElixir.AgentRunner do
  @spec run(map(), pid() | nil, keyword()) :: :ok | no_return()
  def run(issue, codex_update_recipient, opts) do
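    # This excerpt has no {:error, _} clause, so a workspace failure raises CaseClauseError (the spec allows no_return()).
    case Workspace.create_for_issue(issue) do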
      {:ok, workspace} ->
        try do
          with :ok <- Workspace.run_before_run_hook(workspace, issue),
               :ok <- run_codex_turns(workspace, issue, codex_update_recipient, opts) do
            :ok
          end
        after
          Workspace.run_after_run_hook(workspace, issue)
        end
    end
  end
end
Reference: elixir/lib/symphony_elixir/agent_runner.ex:10-33

Multi-Turn Sessions

The Agent Runner can execute multiple turns on the same Codex thread within a single worker run, up to agent.max_turns.
defp do_run_codex_turns(app_session, workspace, issue, recipient, opts, fetcher, turn_number, max_turns) do
  prompt = build_turn_prompt(issue, opts, turn_number, max_turns)
  
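  # handler: the on_message callback, presumably forwarding Codex updates to recipient (construction elided in this excerpt)
  with {:ok, turn_session} <- AppServer.run_turn(app_session, prompt, issue, on_message: handler) do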
    case continue_with_issue?(issue, fetcher) do
      {:continue, refreshed_issue} when turn_number < max_turns ->
        # Start next turn on same thread
        do_run_codex_turns(app_session, workspace, refreshed_issue, recipient, opts, fetcher, turn_number + 1, max_turns)
      
      {:continue, _} ->
        Logger.info("Reached max_turns with issue still active; returning to orchestrator")
        :ok
      
      {:done, _} ->
        :ok
    end
  end
end
Reference: elixir/lib/symphony_elixir/agent_runner.ex:62-101
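The recursion above can be exercised in isolation with stub functions. In this sketch, run_turn and fetch_state are hypothetical callbacks standing in for AppServer.run_turn/4 and continue_with_issue?/2. The loop returns a tagged tuple instead of :ok so the turn count is observable.

```elixir
defmodule TurnLoopSketch do
  # run_turn.(issue, turn) executes one turn; fetch_state.(issue) re-reads the issue
  # and returns {:continue, issue} or {:done, issue}.
  def run(issue, run_turn, fetch_state, max_turns) do
    loop(issue, run_turn, fetch_state, 1, max_turns)
  end

  defp loop(issue, run_turn, fetch_state, turn, max_turns) do
    :ok = run_turn.(issue, turn)

    case fetch_state.(issue) do
      # Still active and turns remain: next turn on the same thread.
      {:continue, refreshed} when turn < max_turns ->
        loop(refreshed, run_turn, fetch_state, turn + 1, max_turns)

      # Still active but out of turns: hand back to the orchestrator.
      {:continue, _} ->
        {:max_turns_reached, turn}

      {:done, _} ->
        {:done, turn}
    end
  end
end
```

When the loop stops at max_turns with the issue still active, the worker exits normally, which is the case the orchestrator's short continuation retry covers.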

Prompt Construction

First turn: the full task prompt, rendered from the WORKFLOW.md template with the issue data.
Continuation turns: a short guidance message instead of the full prompt:
defp build_turn_prompt(_issue, _opts, turn_number, max_turns) when turn_number > 1 do
  """
  Continuation guidance:
  
  - The previous Codex turn completed normally, but the Linear issue is still active.
  - This is continuation turn ##{turn_number} of #{max_turns}.
  - Resume from current workspace state instead of restarting from scratch.
  - The original task instructions are already in this thread, so do not restate them.
  - Focus on remaining ticket work.
  """
end
Reference: elixir/lib/symphony_elixir/agent_runner.ex:105-115

Tracker Client

The Tracker Client provides a normalized interface to issue tracker APIs.

Adapter Pattern

defmodule SymphonyElixir.Tracker do
  @callback fetch_candidate_issues() :: {:ok, [term()]} | {:error, term()}
  @callback fetch_issues_by_states([String.t()]) :: {:ok, [term()]} | {:error, term()}
  @callback fetch_issue_states_by_ids([String.t()]) :: {:ok, [term()]} | {:error, term()}
  
  @spec adapter() :: module()
  def adapter do
    case Config.tracker_kind() do
      "memory" -> SymphonyElixir.Tracker.Memory
      _ -> SymphonyElixir.Linear.Adapter
    end
  end
end
Reference: elixir/lib/symphony_elixir/tracker.ex:1-46
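The adapter pattern can be shown with a cut-down behaviour, one in-memory implementation, and a kind-string resolver. The orchestrator calls Tracker.fetch_issue_states_by_ids/1 directly, so Tracker presumably forwards each call to adapter(), though those lines aren't shown. TrackerSketch, TrackerSketch.Memory, TrackerSketch.Remote, and the fixed issue data are all hypothetical.

```elixir
defmodule TrackerSketch do
  # One of the real callback shapes, with plain maps for issues.
  @callback fetch_issue_states_by_ids([String.t()]) :: {:ok, [map()]} | {:error, term()}

  # Hypothetical resolver: picks an adapter from a kind string, as adapter/0 does.
  def adapter("memory"), do: TrackerSketch.Memory
  def adapter(_kind), do: TrackerSketch.Remote
end

defmodule TrackerSketch.Memory do
  @behaviour TrackerSketch

  # Fixed data standing in for an in-memory tracker.
  @issues [%{id: "1", state: "Todo"}, %{id: "2", state: "Done"}]

  @impl true
  def fetch_issue_states_by_ids(ids), do: {:ok, Enum.filter(@issues, &(&1.id in ids))}
end

defmodule TrackerSketch.Remote do
  @behaviour TrackerSketch

  @impl true
  def fetch_issue_states_by_ids(_ids), do: {:error, :not_configured}
end
```

Because callers depend only on the behaviour, an in-memory adapter like this can replace the Linear adapter in tests without touching the orchestrator.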

Linear Adapter

The Linear adapter normalizes GraphQL responses into the issue model:
@type t :: %Issue{
  id: String.t(),              # Tracker-internal ID
  identifier: String.t(),      # Human-readable key (e.g. "ABC-123")
  title: String.t(),
  description: String.t() | nil,
  priority: integer() | nil,   # Lower = higher priority
  state: String.t(),
  branch_name: String.t() | nil,
  url: String.t() | nil,
  labels: [String.t()],        # Normalized to lowercase
  blocked_by: [blocker_ref()], # Blocker reference type (definition not shown)
  created_at: DateTime.t() | nil,
  updated_at: DateTime.t() | nil
}
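As an illustration of the normalization step, here is a sketch covering a subset of the fields. The input keys are assumptions about a decoded GraphQL node, not taken from the adapter source: camelCase "createdAt", a nested "state" object with "name", and "labels" with "nodes". The sketch returns a plain map rather than %Issue{} to stay self-contained.

```elixir
defmodule NormalizeSketch do
  # Hypothetical input: %{"id" => ..., "identifier" => ..., "state" => %{"name" => ...},
  #   "labels" => %{"nodes" => [%{"name" => ...}]}, "createdAt" => ISO 8601 string}
  def normalize(node) do
    label_nodes = get_in(node, ["labels", "nodes"]) || []

    %{
      id: node["id"],
      identifier: node["identifier"],
      title: node["title"],
      priority: node["priority"],
      state: get_in(node, ["state", "name"]),
      # Labels are lowercased, matching the model above.
      labels: for(%{"name" => name} <- label_nodes, do: String.downcase(name)),
      created_at: parse_datetime(node["createdAt"])
    }
  end

  # Missing or unparseable timestamps become nil rather than raising.
  defp parse_datetime(nil), do: nil

  defp parse_datetime(value) do
    case DateTime.from_iso8601(value) do
      {:ok, dt, _offset} -> dt
      {:error, _} -> nil
    end
  end
end
```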

Query Operations

Candidate Issues

Filters by project.slugId and active_states, paginated with page size 50

State Refresh

Fetches current state for specific issue IDs (reconciliation)

Terminal Cleanup

Fetches issues in terminal states (startup cleanup)
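Candidate fetches are paginated 50 at a time. The sketch below shows a generic cursor-pagination loop. fetch_page and its {:ok, nodes, %{has_next_page: ..., end_cursor: ...}} return shape are hypothetical stand-ins for the adapter's GraphQL request, loosely modeled on Relay-style pageInfo (hasNextPage, endCursor).

```elixir
defmodule PaginateSketch do
  # fetch_page.(cursor, page_size) returns {:ok, nodes, page_info} or {:error, reason};
  # a nil cursor requests the first page.
  def fetch_all(fetch_page, page_size \\ 50), do: collect(fetch_page, page_size, nil, [])

  defp collect(fetch_page, page_size, cursor, pages) do
    case fetch_page.(cursor, page_size) do
      {:ok, nodes, %{has_next_page: true, end_cursor: next_cursor}} ->
        collect(fetch_page, page_size, next_cursor, [nodes | pages])

      # Last page: restore page order and flatten.
      {:ok, nodes, _page_info} ->
        {:ok, [nodes | pages] |> Enum.reverse() |> Enum.concat()}

      # Any page failing fails the whole fetch.
      {:error, _reason} = error ->
        error
    end
  end
end
```

Pages are prepended and reversed once at the end, which avoids repeated list appends.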

Codex App Server Client

The Codex App Server Client manages the JSON-RPC protocol over stdio.

Session Lifecycle

# 1. Start session
{:ok, session} = AppServer.start_session(workspace)

# 2. Send protocol messages (internal helpers writing to the session's port)
send_initialize(port)     # request, id: 1
send_initialized(port)    # no id
send_thread_start(port)   # request, id: 2

# 3. Run a turn
send_turn_start(port)     # request, id: 3
await_turn_completion(port, on_message, tool_executor, auto_approve)

# 4. Stop session
AppServer.stop_session(session)
Reference: elixir/lib/symphony_elixir/codex/app_server.ex:38-64
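The id comments imply the usual JSON-RPC split: requests carry an id and expect a matching response, while a message without an id gets no reply. The sketch below builds the startup sequence as plain maps and picks a response out of interleaved traffic. The method strings are assumptions inferred from the helper names (for example send_thread_start to "thread/start"). The empty params are placeholders, and HandshakeSketch is hypothetical.

```elixir
defmodule HandshakeSketch do
  # Hypothetical startup sequence; method names and params are placeholders.
  def startup_messages do
    [
      %{"id" => 1, "method" => "initialize", "params" => %{}},
      # No id: a notification, so no response is expected.
      %{"method" => "initialized"},
      %{"id" => 2, "method" => "thread/start", "params" => %{}},
      %{"id" => 3, "method" => "turn/start", "params" => %{}}
    ]
  end

  # Find the response for a request id, skipping notifications and other responses.
  def await_response(incoming, id) do
    Enum.find_value(incoming, {:error, :no_response}, fn
      %{"id" => ^id, "result" => result} -> {:ok, result}
      %{"id" => ^id, "error" => error} -> {:error, error}
      _other -> nil
    end)
  end
end
```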

Tool Execution

Dynamic tools are handled via SymphonyElixir.Codex.DynamicTool:
One such tool lets the agent execute raw GraphQL queries against Linear using Symphony’s configured auth.
# Tool input
%{
  "query" => "mutation { issueUpdate(id: \"...\", input: { stateId: \"...\" }) { success } }",
  "variables" => %{}
}

# Tool result
%{"success" => true, "data" => %{...}}
Reference: SPEC.md:1060-1092

Approval Policy

Approval behavior is implementation-defined and configured via codex.approval_policy:
  • High-trust: auto-approve command execution and file changes
  • User-input requests: treated as a hard failure, since the session is non-interactive
  • Unsupported tool calls: return a failure result and continue the session
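The "unsupported tool" rule means an unknown tool name produces a result rather than an exception, so the turn keeps going. Below is a sketch of that dispatch. ToolDispatchSketch, the name-to-function registry, the placeholder tool name "example_graphql", and the error result shape are all hypothetical; SymphonyElixir.Codex.DynamicTool's actual interface isn't shown here.

```elixir
defmodule ToolDispatchSketch do
  # tools: map of tool name => executor function taking the decoded arguments.
  def execute(tools, name, arguments) do
    case Map.fetch(tools, name) do
      {:ok, executor} ->
        executor.(arguments)

      # Unsupported tool: report failure to the agent and keep the session alive.
      :error ->
        %{"success" => false, "error" => "unsupported tool: #{name}"}
    end
  end
end
```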

Next Steps

Workflow Lifecycle

End-to-end flow from issue polling to cleanup

Workspace Isolation

Safety mechanisms and lifecycle hooks
