SAW is structured as a FastAPI service that accepts raw or structured log data, routes it through a five-agent orchestration pipeline, and persists every triage result (including agent runs, actions, and tasks) in SQLite. Each agent owns one discrete stage of the workflow and writes to a shared ExecutionContext dataclass that threads through the entire request lifecycle. Agents are not hardwired to one another: the CoordinatorAgent plans the execution order at runtime, so the same codebase can handle log triage, incident follow-ups, and analyst task commands through different step plans without duplicating logic.

System layers

HTTP client
    │
    ▼
FastAPI / Uvicorn  ──  Pydantic request validation
    │
    ▼
CoordinatorAgent   ──  plan_workflow_tool() → step list
    │
    ├─▶ DetectionAgent   (analyze_logs → detect_threat → validate_schema → compute_risk)
    ├─▶ RiskAgent        (update_events → evaluate_escalation → decision_engine → optional ADK review)
    ├─▶ MitigationAgent  (mitigate → create_action → create_task)
    └─▶ AuditAgent       (update_incident → persist workspace)
            │
            ▼
        SQLite storage

The five agents

CoordinatorAgent is the entry point for every request. It calls plan_workflow_tool() with the request_type to produce an ordered list of steps, then iterates over that list and dispatches each step to the correct sub-agent. It also owns the final aggregate() call that assembles the response payload.

DetectionAgent normalizes the raw log with analyze_logs(), runs detect_threat() against the extracted payload, validates the output with validate_schema(), and computes a numeric risk_score via compute_risk(). It writes the classified threat, including confidence_bucket, detection_mode, and risk_breakdown, into ExecutionContext.classification.

RiskAgent reads the classification from context, records the event in the in-memory attack_memory store via update_events(), evaluates burst and sustained escalation patterns with evaluate_escalation(), and calls decision_engine() to produce one of three outcomes: EXECUTE, OBSERVE, or IGNORE. When the confidence bucket is LOW or MEDIUM, it optionally requests an ADK advisory from the Gemini-backed ASAAgent.

MitigationAgent acts on the decision from RiskAgent. For EXECUTE decisions, it calls mitigate() to apply control-plane actions (IP blocks, WAF rules, rate limits) and creates a high-priority follow-up task. For OBSERVE decisions, it creates a medium-priority analyst investigation task. For IGNORE decisions, it takes no action.

AuditAgent always runs last, even if an earlier agent fails. It calls update_incident() to persist the final status, decision, and full trace snapshot to SQLite, then assembles the complete incident workspace (incident record, tasks, actions, and agent run history) into ExecutionContext.artifacts["workspace"].
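The mapping from the three decision outcomes to MitigationAgent behavior can be sketched roughly as follows. This is a minimal illustration, not SAW's actual code: plan_mitigation, MitigationResult, the "block_ip" action type, and the task dictionaries are hypothetical names chosen for the example. Only the EXECUTE / OBSERVE / IGNORE outcomes and the high and medium task priorities come from the description above.

```python
from dataclasses import dataclass, field


@dataclass
class MitigationResult:
    """Hypothetical container for what a mitigation stage produced."""
    actions: list = field(default_factory=list)
    tasks: list = field(default_factory=list)


def plan_mitigation(outcome: str, threat: str) -> MitigationResult:
    """Map a decision outcome to follow-up work (illustrative only)."""
    result = MitigationResult()
    if outcome == "EXECUTE":
        # Stand-in for mitigate(): record one control-plane action,
        # then queue a high-priority follow-up task.
        result.actions.append({"type": "block_ip", "threat": threat})
        result.tasks.append({"priority": "high", "title": f"Follow up on {threat}"})
    elif outcome == "OBSERVE":
        # No automatic action; an analyst investigates instead.
        result.tasks.append({"priority": "medium", "title": f"Investigate {threat}"})
    elif outcome != "IGNORE":
        # Anything outside the three documented outcomes is rejected.
        raise ValueError(f"unknown decision outcome: {outcome!r}")
    return result
```

Keeping IGNORE as an explicit no-op branch, rather than a silent default, means an unexpected outcome string surfaces as an error instead of being quietly dropped.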

ExecutionContext: the shared state carrier

Every agent reads from and writes to a single ExecutionContext dataclass instance created at the start of each request. You never pass data between agents as function arguments; instead, agents populate named fields that downstream agents read.

from dataclasses import dataclass, field

@dataclass
class ExecutionContext:
    request_id: str
    incident_id: str
    request_type: str
    raw_input: str
    normalized_input: str
    user_id: str = "demo"
    session_id: str | None = None
    source: str = "assistant_api"
    analysis: dict = field(default_factory=dict)       # DetectionAgent output
    classification: dict = field(default_factory=dict) # DetectionAgent + RiskAgent
    decision: dict = field(default_factory=dict)       # RiskAgent output
    actions: list = field(default_factory=list)        # MitigationAgent output
    tasks: list = field(default_factory=list)          # MitigationAgent output
    agent_messages: list = field(default_factory=list) # agent run trail
    artifacts: dict = field(default_factory=dict)      # workspace, event_id, memory
    trace: dict = field(default_factory=dict)          # per-stage debug data
    plan: list = field(default_factory=list)           # step plan from CoordinatorAgent
    agent_results: dict = field(default_factory=dict)  # per-agent status snapshot
    metadata: dict = field(default_factory=dict)       # request-specific extra data
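To make the "populate named fields" pattern concrete, here is a small sketch of two stages communicating only through a shared context object. It uses a trimmed stand-in (MiniContext) rather than the full ExecutionContext. The detection heuristic, the risk threshold of 80, and the function names detection_stage and risk_stage are assumptions for illustration, not SAW's real logic.

```python
from dataclasses import dataclass, field


@dataclass
class MiniContext:
    """Trimmed stand-in for ExecutionContext with only the fields used here."""
    raw_input: str
    classification: dict = field(default_factory=dict)
    decision: dict = field(default_factory=dict)


def detection_stage(ctx: MiniContext) -> None:
    # Hypothetical heuristic; the real detect_threat() logic is not shown here.
    suspicious = "union select" in ctx.raw_input.lower()
    ctx.classification = {
        "threat": "sql_injection" if suspicious else "none",
        "risk_score": 90 if suspicious else 5,
    }


def risk_stage(ctx: MiniContext) -> None:
    # Reads what the detection stage wrote; the threshold is an assumption.
    score = ctx.classification.get("risk_score", 0)
    ctx.decision = {"outcome": "EXECUTE" if score >= 80 else "IGNORE"}


# Neither stage receives the other's output as an argument;
# both see only the shared context.
ctx = MiniContext(raw_input="GET /items?id=1 UNION SELECT password FROM users")
detection_stage(ctx)
risk_stage(ctx)
print(ctx.decision["outcome"])
```

Because each stage takes the same single argument, stages can be reordered or skipped by a planner without changing their signatures.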

Data flow

A log_triage request moves through the pipeline in a fixed sequence:
  1. FastAPI validates the request body and calls coordinator_agent.handle().
  2. CoordinatorAgent calls plan_workflow_tool("log_triage") and gets back a five-step plan.
  3. DetectionAgent normalizes the log, classifies the threat, and stores the result in context.classification.
  4. RiskAgent evaluates escalation, runs decision_engine(), and stores the result in context.decision.
  5. MitigationAgent applies actions or creates tasks based on context.decision.
  6. AuditAgent persists the full incident workspace to SQLite.
  7. CoordinatorAgent calls aggregate() and returns the structured response to the API caller.
AuditAgent is always deferred to run last, even if a previous agent fails. This guarantees that partial results and failure details are always persisted to the incident record.
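One way to get the "audit always runs last" guarantee is to catch failures inside the step loop and call the audit step unconditionally afterwards. The sketch below assumes a plain dict as the context and a hypothetical run_plan helper. Whether SAW stops at the first failing agent or continues with the remaining steps is not stated here, so the early break is an assumption.

```python
from typing import Callable

Step = Callable[[dict], None]


def run_plan(steps: list[tuple[str, Step]], audit: Step, ctx: dict) -> dict:
    """Run planned steps in order, then always run the audit step."""
    ctx.setdefault("agent_results", {})
    for name, step in steps:
        try:
            step(ctx)
            ctx["agent_results"][name] = "ok"
        except Exception as exc:
            # Record the failure and stop (assumed behavior); audit still runs.
            ctx["agent_results"][name] = f"failed: {exc}"
            break
    audit(ctx)
    return ctx


# Hypothetical stages: the second one fails on purpose.
def detection(ctx: dict) -> None:
    ctx["classification"] = {"threat": "brute_force"}


def risk(ctx: dict) -> None:
    raise RuntimeError("escalation store unavailable")


def mitigation(ctx: dict) -> None:
    ctx["actions"] = ["block_ip"]


def audit(ctx: dict) -> None:
    # Stand-in for persisting to SQLite: snapshot the per-agent status.
    ctx["persisted"] = dict(ctx["agent_results"])


result = run_plan(
    [("detection", detection), ("risk", risk), ("mitigation", mitigation)],
    audit,
    {},
)
```

In this run the audit snapshot contains the successful detection status and the risk failure message, while the mitigation step never executes.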

Learn more

Multi-agent pipeline

Step-by-step walkthrough of what each agent does, the plan JSON structure, and escalation logic.

Hybrid execution

How SAW chooses between deterministic heuristics and LLM-assisted Gemini analysis.