Orchestrator
The Orchestrator is the core coordination engine that owns the poll loop, runtime state, and all dispatch decisions.
Implementation
defmodule SymphonyElixir.Orchestrator do
  use GenServer

  defmodule State do
    defstruct [
      :poll_interval_ms,
      :max_concurrent_agents,
      :next_poll_due_at_ms,
      :poll_check_in_progress,
      running: %{},
      completed: MapSet.new(),
      claimed: MapSet.new(),
      retry_attempts: %{},
      codex_totals: nil,
      codex_rate_limits: nil
    ]
  end
end
Reference: elixir/lib/symphony_elixir/orchestrator.ex:24-41
Responsibilities
Poll Management: Schedules ticks every polling.interval_ms, validates config, and fetches candidates.
Dispatch Decisions: Sorts issues by priority, checks eligibility, and enforces concurrency limits.
Reconciliation: Detects stalled sessions, refreshes tracker state, and terminates ineligible workers.
Retry Scheduling: Applies exponential backoff for failures and continuation retries for active issues.
Poll Loop
The orchestrator runs a continuous poll loop:
def handle_info(:tick, state) do
  state = refresh_runtime_config(state)
  state = %{state | poll_check_in_progress: true}
  notify_dashboard()
  schedule_poll_cycle_start()
  {:noreply, state}
end

def handle_info(:run_poll_cycle, state) do
  state = maybe_dispatch(state) # Reconcile + validate + dispatch
  now_ms = System.monotonic_time(:millisecond)
  next_poll_due_at_ms = now_ms + state.poll_interval_ms
  schedule_tick(state.poll_interval_ms)
  {:noreply, %{state | poll_check_in_progress: false, next_poll_due_at_ms: next_poll_due_at_ms}}
end
Reference: elixir/lib/symphony_elixir/orchestrator.ex:68-89
Candidate Selection
An issue is eligible for dispatch when all of the following hold:
It has the required fields: id, identifier, title, state
Its state is in active_states and not in terminal_states
It is not already in the running map
It is not already in the claimed set
A global concurrency slot is available: max_concurrent_agents - running_count > 0
A per-state concurrency slot is available (if configured)
Blocker rule for Todo: if the state is “Todo”, no non-terminal blockers exist
defp should_dispatch_issue?(
       %Issue{} = issue,
       %State{running: running, claimed: claimed} = state,
       active_states,
       terminal_states
     ) do
  candidate_issue?(issue, active_states, terminal_states) and
    !todo_issue_blocked_by_non_terminal?(issue, terminal_states) and
    !MapSet.member?(claimed, issue.id) and
    !Map.has_key?(running, issue.id) and
    available_slots(state) > 0 and
    state_slots_available?(issue, running)
end
Reference: elixir/lib/symphony_elixir/orchestrator.ex:473-485
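The two slot checks can be illustrated with a minimal sketch. It is not the orchestrator's actual code: the module name, the shape of the running entries, and the per_state_limits map are assumptions made for illustration.

```elixir
defmodule SlotSketch do
  # `running` maps issue_id => %{state: tracker_state}; this entry shape is assumed.
  def available_slots(max_concurrent_agents, running) do
    max(max_concurrent_agents - map_size(running), 0)
  end

  # `per_state_limits` is a hypothetical map such as %{"In Progress" => 2}.
  # A state without a configured limit is bounded only by the global limit.
  def state_slots_available?(issue_state, running, per_state_limits) do
    case Map.fetch(per_state_limits, issue_state) do
      :error ->
        true

      {:ok, limit} ->
        used = Enum.count(running, fn {_id, entry} -> entry.state == issue_state end)
        used < limit
    end
  end
end
```

With two running workers and max_concurrent_agents set to 3, one global slot remains, but an issue can still be held back if its own state has reached its per-state limit.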
Dispatch Sorting
Issues are sorted by:
Priority (ascending: 1..4 preferred, null sorts last)
Created at (oldest first)
Identifier (lexicographic tie-breaker)
defp sort_issues_for_dispatch(issues) when is_list(issues) do
  Enum.sort_by(issues, fn
    %Issue{} = issue ->
      {priority_rank(issue.priority), issue_created_at_sort_key(issue), issue.identifier || issue.id || ""}
  end)
end

defp priority_rank(priority) when is_integer(priority) and priority in 1..4, do: priority
defp priority_rank(_priority), do: 5
Reference: elixir/lib/symphony_elixir/orchestrator.ex:453-464
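A small worked example may make the ordering concrete. The sketch below uses plain maps instead of the Issue struct, and created_key/1 is an assumed stand-in for issue_created_at_sort_key/1, whose real implementation is not shown here.

```elixir
defmodule SortSketch do
  def sort(issues) do
    Enum.sort_by(issues, fn issue ->
      {priority_rank(issue.priority), created_key(issue.created_at), issue.identifier || ""}
    end)
  end

  defp priority_rank(priority) when is_integer(priority) and priority in 1..4, do: priority
  defp priority_rank(_priority), do: 5

  # Timestamps are converted to integers because comparing DateTime structs
  # with the default term ordering is not chronological.
  defp created_key(%DateTime{} = created_at), do: DateTime.to_unix(created_at, :microsecond)

  # Assumption: a missing timestamp sorts after every real one
  # (in Erlang term ordering, any atom is greater than any number).
  defp created_key(_missing), do: :infinity
end

{:ok, january, 0} = DateTime.from_iso8601("2024-01-01T00:00:00Z")
{:ok, february, 0} = DateTime.from_iso8601("2024-02-01T00:00:00Z")

issues = [
  %{identifier: "ABC-3", priority: nil, created_at: january},
  %{identifier: "ABC-2", priority: 2, created_at: february},
  %{identifier: "ABC-1", priority: 2, created_at: january},
  %{identifier: "ABC-4", priority: 1, created_at: february}
]

issues |> SortSketch.sort() |> Enum.map(& &1.identifier)
# => ["ABC-4", "ABC-1", "ABC-2", "ABC-3"]
```

Priority 1 wins regardless of age, the two priority-2 issues are ordered oldest first, and the issue without a priority is dispatched last.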
Retry Backoff
Symphony uses two different retry strategies depending on exit reason.
Continuation retry (normal exit, issue still active):
delay = 1000ms (fixed short delay)
attempt = 1
Failure retry (abnormal exit, error, timeout, stall):
delay = min(10000 * 2^min(attempt-1, 10), max_retry_backoff_ms)
attempt increments: 1, 2, 3, ...
defp retry_delay(attempt, metadata) when is_integer(attempt) and attempt > 0 do
  if metadata[:delay_type] == :continuation and attempt == 1 do
    @continuation_retry_delay_ms # 1000
  else
    failure_retry_delay(attempt)
  end
end

defp failure_retry_delay(attempt) do
  # The exponent is capped at 10; <<< is the left-shift operator from Bitwise.
  max_delay_power = min(attempt - 1, 10)
  min(@failure_retry_base_ms * (1 <<< max_delay_power), Config.max_retry_backoff_ms())
end
Reference: elixir/lib/symphony_elixir/orchestrator.ex:821-832
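To see the failure backoff as numbers, the standalone sketch below reproduces the formula with the 10000 ms base from the text. The module name is made up, and the 300_000 ms cap is an assumed value for max_retry_backoff_ms, not a documented default.

```elixir
defmodule BackoffSketch do
  import Bitwise

  @base_ms 10_000

  # max_backoff_ms is passed in explicitly here; the real code reads it from Config.
  def failure_delay(attempt, max_backoff_ms) when is_integer(attempt) and attempt > 0 do
    power = min(attempt - 1, 10)
    min(@base_ms * (1 <<< power), max_backoff_ms)
  end
end

# With an assumed cap of 300_000 ms (5 minutes):
for attempt <- 1..6, do: BackoffSketch.failure_delay(attempt, 300_000)
# => [10000, 20000, 40000, 80000, 160000, 300000]
```

The delay doubles on each attempt until the configured cap takes over. Because the exponent is also capped at 10, the uncapped delay never exceeds 10000 * 1024 ms even for very large attempt numbers.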
Reconciliation
Reconciliation has two parts: stall detection and tracker state refresh.
Part A: Stall Detection
defp reconcile_stalled_running_issues(%State{} = state) do
  timeout_ms = Config.codex_stall_timeout_ms()

  cond do
    timeout_ms <= 0 -> state # Disabled
    map_size(state.running) == 0 -> state
    true ->
      now = DateTime.utc_now()

      Enum.reduce(state.running, state, fn {issue_id, running_entry}, state_acc ->
        restart_stalled_issue(state_acc, issue_id, running_entry, now, timeout_ms)
      end)
  end
end
Elapsed time is computed from last_codex_timestamp (if any event has been seen) or from started_at.
Reference: elixir/lib/symphony_elixir/orchestrator.ex:367-384
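The elapsed-time rule can be sketched as a pure function. This is an illustration only: the entry is a plain map, the function name is hypothetical, and whether the real comparison is strict (>) or inclusive (>=) is an assumption.

```elixir
defmodule StallSketch do
  # Returns true when no Codex activity has been seen for longer than timeout_ms.
  def stalled?(entry, %DateTime{} = now, timeout_ms) when timeout_ms > 0 do
    # Prefer the last Codex event; fall back to the worker start time.
    case Map.get(entry, :last_codex_timestamp) || Map.get(entry, :started_at) do
      %DateTime{} = reference -> DateTime.diff(now, reference, :millisecond) > timeout_ms
      _no_reference -> false
    end
  end

  # A timeout of zero or less disables stall detection.
  def stalled?(_entry, _now, _timeout_ms), do: false
end
```

A worker that started ten minutes ago but emitted an event ten seconds ago is not stalled under a five-minute timeout, while a worker that has never emitted an event is measured from started_at.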
Part B: Tracker State Refresh
defp reconcile_running_issues(%State{} = state) do
  state = reconcile_stalled_running_issues(state)
  running_ids = Map.keys(state.running)

  if running_ids == [] do
    state
  else
    case Tracker.fetch_issue_states_by_ids(running_ids) do
      {:ok, issues} ->
        reconcile_running_issue_states(issues, state, active_state_set(), terminal_state_set())

      {:error, reason} ->
        Logger.debug("Failed to refresh: #{inspect(reason)}; keeping active workers")
        state
    end
  end
end
For each running issue:
Terminal state → terminate worker, clean workspace
Active state → update in-memory issue snapshot
Neither → terminate worker (no cleanup)
Reference: elixir/lib/symphony_elixir/orchestrator.ex:236-258
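The three outcomes amount to a small decision function. The sketch below is illustrative: the return atoms are made up, and exact string matching on state names is an assumption.

```elixir
defmodule ReconcileSketch do
  # active_states and terminal_states are MapSets of tracker state names.
  def action(issue_state, active_states, terminal_states) do
    cond do
      MapSet.member?(terminal_states, issue_state) -> {:terminate, :cleanup_workspace}
      MapSet.member?(active_states, issue_state) -> :update_snapshot
      true -> {:terminate, :keep_workspace}
    end
  end
end
```

The terminal check comes first, so a state listed in both sets is treated as terminal, consistent with the eligibility rule that an issue must be active and not terminal.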
Token Accounting
The orchestrator tracks cumulative token usage across all sessions:
defp integrate_codex_update(running_entry, %{event: event, timestamp: timestamp} = update) do
  token_delta = extract_token_delta(running_entry, update)

  {
    Map.merge(running_entry, %{
      codex_input_tokens: running_entry.codex_input_tokens + token_delta.input_tokens,
      codex_output_tokens: running_entry.codex_output_tokens + token_delta.output_tokens,
      codex_total_tokens: running_entry.codex_total_tokens + token_delta.total_tokens,
      # ... other fields
    }),
    token_delta
  }
end
Reference: elixir/lib/symphony_elixir/orchestrator.ex:987-1015
Token deltas are extracted from absolute totals (thread/tokenUsage/updated payloads) to avoid double-counting. Delta-style payloads are ignored for aggregate totals.
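Deriving a delta from absolute totals can be sketched as follows. This is not the actual extract_token_delta/2: the function name, the map shapes, and the clamping of negative differences to zero are assumptions.

```elixir
defmodule TokenDeltaSketch do
  @keys [:input_tokens, :output_tokens, :total_tokens]

  # `last_reported` holds the absolute totals seen in the previous payload;
  # `absolute` holds the totals from the current payload.
  def delta(last_reported, absolute) do
    Map.new(@keys, fn key ->
      previous = Map.get(last_reported, key, 0)
      current = Map.get(absolute, key, previous)
      # Clamp at zero so a repeated or out-of-order payload never subtracts tokens.
      {key, max(current - previous, 0)}
    end)
  end
end
```

Because each delta is the difference between consecutive absolute totals, replaying the same payload contributes zero, which is what prevents double-counting.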
Workspace Manager
The Workspace Manager creates isolated per-issue directories and runs lifecycle hooks.
Implementation
defmodule SymphonyElixir.Workspace do
  @spec create_for_issue(map() | String.t() | nil) :: {:ok, Path.t()} | {:error, term()}
  def create_for_issue(issue_or_identifier) do
    # The step that binds issue_context is omitted from this excerpt.
    safe_id = safe_identifier(issue_context.issue_identifier)
    workspace = workspace_path_for_issue(safe_id)

    with :ok <- validate_workspace_path(workspace),
         {:ok, created?} <- ensure_workspace(workspace),
         :ok <- maybe_run_after_create_hook(workspace, issue_context, created?) do
      {:ok, workspace}
    end
  end
end
Reference: elixir/lib/symphony_elixir/workspace.ex:11-30
Workspace Path Sanitization
Security-critical: Issue identifiers are sanitized before use in filesystem paths.
defp safe_identifier(identifier) do
  String.replace(identifier || "issue", ~r/[^a-zA-Z0-9._-]/, "_")
end
Only [A-Za-z0-9._-] are allowed. All other characters become _.
Reference: elixir/lib/symphony_elixir/workspace.ex:115-117
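A few sample inputs show what the replacement does. The function body is the one shown above; it is wrapped in a hypothetical public module so it can be called directly.

```elixir
defmodule SanitizeSketch do
  def safe_identifier(identifier) do
    String.replace(identifier || "issue", ~r/[^a-zA-Z0-9._-]/, "_")
  end
end

SanitizeSketch.safe_identifier("ABC-123")         # => "ABC-123"
SanitizeSketch.safe_identifier("../etc/passwd")   # => ".._etc_passwd"
SanitizeSketch.safe_identifier("feat: new thing") # => "feat__new_thing"
SanitizeSketch.safe_identifier(nil)               # => "issue"
```

Since dots are allowed, an identifier consisting only of ".." passes through unchanged. Sanitization alone therefore does not stop traversal, and the resulting path still has to pass a separate root check.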
Path Validation
Workspace paths must pass strict safety checks:
defp validate_workspace_path(workspace) when is_binary(workspace) do
  expanded_workspace = Path.expand(workspace)
  root = Path.expand(Config.workspace_root())
  root_prefix = root <> "/"

  cond do
    expanded_workspace == root ->
      {:error, {:workspace_equals_root, expanded_workspace, root}}

    String.starts_with?(expanded_workspace <> "/", root_prefix) ->
      ensure_no_symlink_components(expanded_workspace, root)

    true ->
      {:error, {:workspace_outside_root, expanded_workspace, root}}
  end
end
Reference: elixir/lib/symphony_elixir/workspace.ex:213-228
Symlink escape prevention: Each path component is checked with File.lstat/1 to detect symlinks that might escape the workspace root.
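One way such a component walk could look is sketched below. It is not the project's ensure_no_symlink_components/2: the error tuples and the decision to accept components that do not exist yet are assumptions. Both arguments are expected to be expanded already, with path strictly below root.

```elixir
defmodule SymlinkSketch do
  def ensure_no_symlink_components(path, root) do
    path
    |> Path.relative_to(root)
    |> Path.split()
    |> Enum.reduce_while({:ok, root}, fn component, {:ok, current} ->
      next = Path.join(current, component)

      # File.lstat/1 reports on the link itself instead of following it.
      case File.lstat(next) do
        {:ok, %File.Stat{type: :symlink}} ->
          {:halt, {:error, {:symlink_component, next}}}

        {:ok, _stat} ->
          {:cont, {:ok, next}}

        # A component that does not exist yet cannot be a symlink.
        {:error, :enoent} ->
          {:cont, {:ok, next}}

        {:error, reason} ->
          {:halt, {:error, {:lstat_failed, next, reason}}}
      end
    end)
    |> case do
      {:ok, _checked_path} -> :ok
      error -> error
    end
  end
end
```

The prefix check on expanded paths is purely textual, so a link placed inside the root could still point elsewhere. Checking every component between the root and the workspace closes that gap.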
Lifecycle Hooks
Hooks are shell scripts executed in the workspace directory:
after_create: Runs only when the workspace directory is newly created. Failure aborts workspace creation.
before_run: Runs before each agent attempt. Failure aborts the current attempt.
after_run: Runs after each attempt (success, failure, or timeout). Failure is logged and ignored.
before_remove: Runs before workspace deletion. Failure is logged and ignored.
defp run_hook(command, workspace, issue_context, hook_name) do
  timeout_ms = Config.workspace_hooks()[:timeout_ms]

  task = Task.async(fn ->
    System.cmd("sh", ["-lc", command], cd: workspace, stderr_to_stdout: true)
  end)

  case Task.yield(task, timeout_ms) do
    {:ok, {_output, 0}} -> :ok
    {:ok, {output, status}} -> {:error, {:workspace_hook_failed, hook_name, status, output}}
    nil ->
      Task.shutdown(task, :brutal_kill)
      {:error, {:workspace_hook_timeout, hook_name, timeout_ms}}
  end
end
Reference: elixir/lib/symphony_elixir/workspace.ex:166-187
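The per-hook failure semantics can be summarized in a short sketch. The module, the function name, and the log wording are hypothetical. Only the split between fatal and ignored hooks comes from the descriptions above.

```elixir
defmodule HookPolicySketch do
  require Logger

  # Failures of these hooks abort the surrounding operation.
  @fatal_hooks [:after_create, :before_run]

  def apply_policy(_hook_name, :ok), do: :ok

  def apply_policy(hook_name, {:error, _reason} = error) when hook_name in @fatal_hooks do
    error
  end

  # after_run and before_remove failures are logged and then ignored.
  def apply_policy(hook_name, {:error, reason}) do
    Logger.warning("Workspace hook #{hook_name} failed: #{inspect(reason)}; ignoring")
    :ok
  end
end
```

Passing the result of a hook run through a function like this keeps the abort-or-ignore decision in one place instead of spreading it across every call site.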
Agent Runner
The Agent Runner executes a single issue in an isolated workspace by launching Codex app-server.
Implementation
defmodule SymphonyElixir.AgentRunner do
  @spec run(map(), pid() | nil, keyword()) :: :ok | no_return()
  def run(issue, codex_update_recipient, opts) do
    case Workspace.create_for_issue(issue) do
      {:ok, workspace} ->
        try do
          with :ok <- Workspace.run_before_run_hook(workspace, issue),
               :ok <- run_codex_turns(workspace, issue, codex_update_recipient, opts) do
            :ok
          end
        after
          Workspace.run_after_run_hook(workspace, issue)
        end
    end
  end
end
Reference: elixir/lib/symphony_elixir/agent_runner.ex:10-33
Multi-Turn Sessions
The Agent Runner can execute multiple turns on the same Codex thread within a single worker run, up to agent.max_turns.
defp do_run_codex_turns(app_session, workspace, issue, recipient, opts, fetcher, turn_number, max_turns) do
  prompt = build_turn_prompt(issue, opts, turn_number, max_turns)

  with {:ok, turn_session} <- AppServer.run_turn(app_session, prompt, issue, on_message: handler) do
    case continue_with_issue?(issue, fetcher) do
      {:continue, refreshed_issue} when turn_number < max_turns ->
        # Start next turn on same thread
        do_run_codex_turns(app_session, workspace, refreshed_issue, recipient, opts, fetcher, turn_number + 1, max_turns)

      {:continue, _} ->
        Logger.info("Reached max_turns with issue still active; returning to orchestrator")
        :ok

      {:done, _} ->
        :ok
    end
  end
end
Reference: elixir/lib/symphony_elixir/agent_runner.ex:62-101
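The control flow of the turn loop can be isolated from Codex and the tracker with a small sketch. Here run_turn and still_active? are hypothetical callbacks standing in for the app-server turn and the issue refresh, and the tagged return values are illustrative only, since the excerpt above returns :ok in both terminating branches.

```elixir
defmodule TurnLoopSketch do
  # run_turn: fn turn_number -> :ok end
  # still_active?: fn -> boolean end, consulted after each completed turn
  def run(run_turn, still_active?, max_turns, turn_number \\ 1) do
    :ok = run_turn.(turn_number)

    cond do
      not still_active?.() -> {:done, turn_number}
      turn_number < max_turns -> run(run_turn, still_active?, max_turns, turn_number + 1)
      true -> {:max_turns_reached, turn_number}
    end
  end
end
```

The loop ends either because the issue left the active states or because max_turns was reached. In the second case the worker exits normally while the issue is still active, which is the situation the orchestrator's continuation retry covers.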
Prompt Construction
First turn: Full task prompt rendered from the WORKFLOW.md template plus issue data
Continuation turns: Short guidance message
defp build_turn_prompt(_issue, _opts, turn_number, max_turns) when turn_number > 1 do
  """
  Continuation guidance:
  - The previous Codex turn completed normally, but the Linear issue is still active.
  - This is continuation turn ##{turn_number} of #{max_turns}.
  - Resume from current workspace state instead of restarting from scratch.
  - The original task instructions are already in this thread, so do not restate them.
  - Focus on remaining ticket work.
  """
end
Reference: elixir/lib/symphony_elixir/agent_runner.ex:105-115
Tracker Client
The Tracker Client provides a normalized interface to issue tracker APIs.
Adapter Pattern
defmodule SymphonyElixir.Tracker do
  @callback fetch_candidate_issues() :: {:ok, [term()]} | {:error, term()}
  @callback fetch_issues_by_states([String.t()]) :: {:ok, [term()]} | {:error, term()}
  @callback fetch_issue_states_by_ids([String.t()]) :: {:ok, [term()]} | {:error, term()}

  @spec adapter() :: module()
  def adapter do
    case Config.tracker_kind() do
      "memory" -> SymphonyElixir.Tracker.Memory
      _ -> SymphonyElixir.Linear.Adapter
    end
  end
end
Reference: elixir/lib/symphony_elixir/tracker.ex:1-46
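To illustrate what implementing the behaviour involves, here is a minimal sketch of an adapter backed by a fixed list. It is not SymphonyElixir.Tracker.Memory. The behaviour is redeclared under a hypothetical name so the example is self-contained, and the issue data and active states are invented.

```elixir
defmodule TrackerBehaviourSketch do
  # Same three callbacks as the behaviour above, redeclared for this example.
  @callback fetch_candidate_issues() :: {:ok, [term()]} | {:error, term()}
  @callback fetch_issues_by_states([String.t()]) :: {:ok, [term()]} | {:error, term()}
  @callback fetch_issue_states_by_ids([String.t()]) :: {:ok, [term()]} | {:error, term()}
end

defmodule StaticTrackerSketch do
  @behaviour TrackerBehaviourSketch

  # Invented sample data; a real adapter would query the tracker.
  @issues [
    %{id: "1", identifier: "ABC-1", state: "Todo"},
    %{id: "2", identifier: "ABC-2", state: "In Progress"},
    %{id: "3", identifier: "ABC-3", state: "Done"}
  ]
  @active_states ["Todo", "In Progress"]

  @impl true
  def fetch_candidate_issues, do: fetch_issues_by_states(@active_states)

  @impl true
  def fetch_issues_by_states(states) do
    {:ok, Enum.filter(@issues, &(&1.state in states))}
  end

  @impl true
  def fetch_issue_states_by_ids(ids) do
    {:ok, Enum.filter(@issues, &(&1.id in ids))}
  end
end
```

Because callers only see the three callbacks, an adapter like this can replace the Linear-backed one in tests without changes to the orchestrator.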
Linear Adapter
The Linear adapter normalizes GraphQL responses into the issue model:
%Issue{
  id: String.t(),               # Tracker-internal ID
  identifier: String.t(),       # Human-readable key (e.g. "ABC-123")
  title: String.t(),
  description: String.t() | nil,
  priority: integer() | nil,    # Lower = higher priority
  state: String.t(),
  branch_name: String.t() | nil,
  url: String.t() | nil,
  labels: [String.t()],         # Normalized to lowercase
  blocked_by: [blocker_ref],
  created_at: DateTime.t() | nil,
  updated_at: DateTime.t() | nil
}
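A normalization step of this kind might look like the sketch below. The input shape (string keys, a nested state name, labels under nodes, an ISO 8601 createdAt) is an assumption about the decoded GraphQL response, not a confirmed schema, and only a subset of the fields is mapped.

```elixir
defmodule NormalizeSketch do
  # `node` is a decoded JSON object with string keys (assumed shape).
  def normalize(node) when is_map(node) do
    %{
      id: node["id"],
      identifier: node["identifier"],
      title: node["title"],
      state: get_in(node, ["state", "name"]),
      priority: if(is_integer(node["priority"]), do: node["priority"]),
      labels: normalize_labels(get_in(node, ["labels", "nodes"])),
      created_at: parse_datetime(node["createdAt"])
    }
  end

  # Labels are lowercased so later comparisons are case-insensitive.
  defp normalize_labels(nodes) do
    nodes
    |> List.wrap()
    |> Enum.map(&String.downcase(&1["name"]))
  end

  defp parse_datetime(value) when is_binary(value) do
    case DateTime.from_iso8601(value) do
      {:ok, datetime, _offset} -> datetime
      {:error, _reason} -> nil
    end
  end

  defp parse_datetime(_value), do: nil
end
```

Missing or malformed optional values become nil instead of raising, which matches the nullable fields in the issue model.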
Query Operations
Candidate Issues: Filters by project.slugId and active_states, paginated with a page size of 50.
State Refresh: Fetches the current state for specific issue IDs (reconciliation).
Terminal Cleanup: Fetches issues in terminal states (startup cleanup).
Codex App Server Client
The Codex App Server Client manages the JSON-RPC protocol over stdio.
Session Lifecycle
# 1. Start session
{:ok, session} = AppServer.start_session(workspace)

# 2. Send protocol messages
send_initialize(port)    # id: 1
send_initialized(port)
send_thread_start(port)  # id: 2

# 3. Run turn
send_turn_start(port)    # id: 3
await_turn_completion(port, on_message, tool_executor, auto_approve)

# 4. Stop session
AppServer.stop_session(session)
Reference: elixir/lib/symphony_elixir/codex/app_server.ex:38-64
Dynamic tools are handled via SymphonyElixir.Codex.DynamicTool.
Approval Policy
Approval behavior is implementation-defined via codex.approval_policy:
High-trust: Auto-approve command execution and file changes
User-input: Treat as a hard failure (non-interactive session)
Unsupported tools: Return a failure result and continue the session
Next Steps
Workflow Lifecycle: End-to-end flow from issue polling to cleanup
Workspace Isolation: Safety mechanisms and lifecycle hooks