Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/8BitTacoSupreme/flowstate/llms.txt

Use this file to discover all available pages before exploring further.

The orchestrator in flowstate/orchestrator.py is the single file that owns pipeline sequencing. It creates the MemoryStore and EventBus, calls write_context_files(), dispatches each tool adapter through _run_step(), runs the discipline audit, then prints a summary and next-step commands. Tools that fail are marked BLOCKED and the pipeline continues — a failure in research does not prevent strategy or discipline from running.

Constants: TOOL_ORDER, STEP_LABELS, STEP_STYLES

Three module-level dicts drive the ordered rendering of every tool in print_status() and the colored step headers printed during run_pipeline():
TOOL_ORDER = ["research", "strategy", "gsd", "discipline"]

STEP_LABELS = {
    "research": "Research",
    "strategy": "Strategy",
    "gsd": "Management",
    "discipline": "Discipline",
}

STEP_STYLES = {
    "research": "cyan",
    "strategy": "yellow",
    "gsd": "green",
    "discipline": "magenta",
}
Adding a new pipeline step means extending all three dicts and adding the corresponding adapter call in run_pipeline().

run_pipeline()

run_pipeline(state, root) is the top-level entry point called by cli.py after the interview completes. It returns the mutated FlowStateModel with all tool statuses and artifact paths populated.
1

Bridge construction and dry-run fallback

_make_bridge(root, dry_run, preferences) constructs a ClaudeBridge from a BridgeConfig. If dry_run=False but the claude CLI is not found on PATH, the orchestrator silently falls back to a dry-run bridge:
if not dry_run and not bridge.available:
    mode_tag = "[red](no claude CLI — falling back to dry-run)[/red]"
    bridge = ClaudeBridge(config=bridge.config, dry_run=True)
2

MemoryStore and EventBus setup

A new MemoryStore (SQLite FTS5) and EventBus are created at the start of every pipeline run. A run_id is generated as a 12-character UUID hex prefix to group all memories from this run:
run_id = uuid4().hex[:12]
memory = MemoryStore(root=root)
bus = EventBus()
for h in create_memory_handlers(memory, root, run_id=run_id):
    bus.register(h)
3

Interview answers stored as a decision memory

Before any tool runs, the interview answers are written to memory.db as a MemoryKind.DECISION entry. This means subsequent pipeline runs can inject prior problem/vision context into research prompts:
memory.add(
    MemoryEntry.create(
        MemoryKind.DECISION,
        (
            f"Core problem: {answers.core_problem}\n"
            f"10x vision: {answers.ten_x_vision}\n"
            f"Architecture: {answers.architecture_pattern}\n"
            f"Research focus: {answers.research_focus}"
        ),
        "Interview answers",
        source="interview",
        tags=["interview", "decision"],
        run_id=run_id,
    )
)
4

Step 1 — Context Generation (deterministic)

write_context_files(state, root) is called directly — not through _run_step() — because it is pure Python and cannot fail with a ToolResult. The five context files are written synchronously. If an exception is raised, it is caught and printed in red, but the pipeline continues. save_state is called immediately after.
console.print("\n[bold blue]1/5 Context Generation[/] — deterministic")
try:
    created = write_context_files(state, root)
    console.print(f"  [green]{len(created)} context files written[/green]")
except Exception as e:
    console.print(f"  [red]Context generation failed: {e}[/red]")
save_state(state, root)
5

Step 2 — Research (ResearchAdapter)

A ResearchAdapter is instantiated with the shared bridge and memory, then dispatched through _run_step(). On success, the artifact path is recorded in state.artifacts["research_report"].
research = ResearchAdapter(root=root, dry_run=dry_run, bridge=bridge, memory=memory)
result = _run_step(
    state, root, "research", 2, 5,
    lambda: research.execute(state.interview),
    bus=bus,
)
if result and result.success:
    for a in result.artifacts:
        state.artifacts["research_report"] = a
6

Step 3 — Strategy (StrategyAdapter)

A StrategyAdapter is instantiated and dispatched. On success, state.artifacts["strategy_report"] is populated. This step always runs regardless of whether research succeeded or failed.
strategy = StrategyAdapter(root=root, dry_run=dry_run, bridge=bridge, memory=memory)
result = _run_step(
    state, root, "strategy", 3, 5,
    lambda: strategy.pressure_test(state.interview),
    bus=bus,
)
7

Step 4 — GSD Management (GSDAdapter)

GSDAdapter.new_project(state) re-runs write_context_files() from inside the adapter (or writes the mock roadmap in dry-run mode). The artifact is recorded in state.artifacts["roadmap"].
gsd = GSDAdapter(root=root, dry_run=dry_run, bridge=bridge, memory=memory)
result = _run_step(state, root, "gsd", 4, 5, lambda: gsd.new_project(state), bus=bus)
8

Step 5 — Discipline Audit (pure Python)

check_setup(root) is called directly (not via _run_step()) because it returns an AuditResult, not a ToolResult. The tool status is manually set to RUNNING then COMPLETED. The audit never marks itself BLOCKED.
console.print("\n[bold magenta]5/5 Discipline[/] — audit")
update_tool(state, "discipline", status=ToolStatus.RUNNING)
save_state(state, root)
audit = check_setup(root)
update_tool(state, "discipline", status=ToolStatus.COMPLETED)
console.print(f"  [green]{audit.summary}[/green]")
save_state(state, root)
9

Close MemoryStore, print summary, show next steps

After all five steps, memory.close() flushes and closes the SQLite connection. _print_summary() renders completed/blocked counts and the list of context files. print_next_steps() from launcher.py detects installed tools and prints suggested commands.
memory.close()
_print_summary(state)
print_next_steps(state, root)

_run_step() — The Generic Step Runner

_run_step() is used for steps 2–4 (research, strategy, gsd). It handles the full lifecycle of a tool execution:
def _run_step(
    state: FlowStateModel,
    root: Path,
    tool_name: str,
    step_num: int,
    total_steps: int,
    execute_fn,
    bus: EventBus | None = None,
) -> None:
The execution flow inside _run_step():
  1. Prints a colored step header using STEP_STYLES and STEP_LABELS
  2. Calls update_tool(state, tool_name, status=ToolStatus.RUNNING) and save_state()
  3. Calls execute_fn() — the adapter method wrapped in a lambda
  4. On success: sets status to COMPLETED, appends each artifact, prints the first 300 chars of output, emits StepCompleted
  5. On failure: sets status to BLOCKED, stores the error string, prints it in red, emits StepFailed
  6. Calls save_state() again — state is always persisted whether the step succeeded or failed
Because save_state() is called both before and after execute_fn(), a crash during a long bridge call will leave the tool in RUNNING state in flowstate.json. Re-running flowstate init --skip-interview will re-execute that step rather than skipping it.
print_status(root) loads flowstate.json and renders a Rich table. It is called by flowstate status:
def print_status(root: Path) -> None:
    state = load_state(root)
    table = Table(title="FlowState Status", border_style="blue")
    table.add_column("Tool", style="bold")
    table.add_column("Phase")
    table.add_column("Status")
    table.add_column("Artifacts")
    table.add_column("Error")
    ...
Status values are colored: READY → dim, RUNNING → yellow, COMPLETED → green, BLOCKED → red. After the table, context files and named artifacts are listed separately, followed by the project name and state file path.

run_phase()

run_phase(state, root, phase) prints the native session command for executing a specific GSD phase. It delegates to launch_command("gsd", phase, root) from launcher.py and does not call any tool itself:
def run_phase(state: FlowStateModel, root: Path, phase: int) -> FlowStateModel:
    from flowstate.launcher import launch_command

    console.print(f"\n[bold blue]Phase {phase} — Native Execution[/bold blue]")
    console.print(
        "\n[yellow]GSD phases run natively inside Claude Code sessions.[/yellow]"
        "\nUse the following command:\n"
    )

    cmd = launch_command("gsd", phase, root)
    console.print(f"  [cyan]{cmd}[/cyan]")
    console.print()

    save_state(state, root)
    return state
The printed command looks like:
cd /path/to/project && claude
 /gsd:plan-phase 1

Bridge Construction and Preferences

_make_bridge() reads three optional preferences from ProjectPreferences when constructing the BridgeConfig. All three are optional — passing None leaves the bridge using its defaults:
def _make_bridge(root: Path, dry_run: bool, preferences=None) -> ClaudeBridge:
    kwargs = {"project_root": root}
    if preferences:
        if preferences.model:
            kwargs["model"] = preferences.model
        if preferences.max_budget_usd is not None:
            kwargs["max_budget_usd"] = preferences.max_budget_usd
        if preferences.effort:
            kwargs["effort"] = preferences.effort
    config = BridgeConfig(**kwargs)
    return ClaudeBridge(config=config, dry_run=dry_run)
The same bridge instance is shared across all three adapters (research, strategy, gsd). This means all three bridge calls in a single pipeline run use the same model, budget, and effort settings.

Build docs developers (and LLMs) love