Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/samkit511/SAW---Security-Analyst-Workspace/llms.txt

Use this file to discover all available pages before exploring further.

When SAW receives a triage request, it doesn’t execute a fixed script. Instead, the CoordinatorAgent calls plan_workflow_tool() at runtime to produce an ordered step list for the specific request_type, then iterates over it and dispatches each step to the appropriate sub-agent. All agents share a single ExecutionContext instance — one agent writes to a field, and the next agent reads it. This section walks through every agent’s responsibilities in a log_triage request, which is the most complete flow.

Step plan structure

plan_workflow_tool() returns a plan object with a steps array. Each entry names the agent, the step label, and a human-readable purpose:
def plan_workflow_tool(request_type: str, ...) -> dict:
    if request_type == "log_triage":
        steps = [
            {"agent": "CoordinatorAgent", "step": "plan",   "purpose": "Decompose the incident workflow"},
            {"agent": "DetectionAgent",   "step": "detect", "purpose": "Normalize log and classify threat"},
            {"agent": "RiskAgent",        "step": "assess", "purpose": "Evaluate risk, escalation, and decision"},
            {"agent": "MitigationAgent",  "step": "act",    "purpose": "Apply mitigation and create analyst tasks"},
            {"agent": "AuditAgent",       "step": "record", "purpose": "Persist collaboration and explainability"},
        ]
For incident_followup requests, the plan omits DetectionAgent and runs only Coordinator → Risk → Mitigation → Audit. For task_command requests, it runs Coordinator → Mitigation → Audit.

Pipeline walkthrough

1

CoordinatorAgent: plan and route

CoordinatorAgent.handle() creates the ExecutionContext, calls self.plan() to get the step list, records the plan step, and then iterates the remaining steps — skipping itself and deferring AuditAgent to the very end.
context.plan = self.plan(context)
# plan() calls plan_workflow_tool(context.request_type, ...)
# returns the steps list
For each step, CoordinatorAgent.route() resolves the agent name to the correct instance and calls await agent.run(context). If any agent raises an exception, the loop records a failure entry in context.trace["agent_orchestration"]["failures"] and breaks — but AuditAgent still runs.
2

DetectionAgent: normalize, classify, score

DetectionAgent.run() calls three tools in sequence and populates context.analysis and context.classification:
analysis = analyze_logs(context.normalized_input)
classification = validate_schema(detect_threat(analysis["payload"]))
base_risk_score = compute_risk(classification["confidence"], classification["severity"])
  • analyze_logs() extracts ip, path, method, and payload from the raw log using key-value parsing with a regex fallback.
  • detect_threat() first tries heuristic_detect() for a fast deterministic match; if nothing matches, it calls the Gemini API (in HYBRID mode).
  • validate_schema() normalizes the output, enforces allowed type and severity values, and runs calibrate_confidence() to map raw confidence to a discrete bucket.
  • compute_risk() multiplies calibrated confidence by severity weight (LOW=1, MEDIUM=2, HIGH=3).
DetectionAgent also creates a persisted event record via create_event() and stores the event ID in context.artifacts["event_id"].
3

RiskAgent: escalation check and decision

RiskAgent.run() reads context.classification and drives two sub-processes:Event memory and escalation
memory = update_events(
    source_ip,
    time.time(),
    severity=context.classification["severity"],
    threat_type=context.classification["threat_type"],
)
escalation = evaluate_escalation(memory)
update_events() appends the current event to the in-memory attack_memory store keyed by source IP and prunes events older than 60 seconds. evaluate_escalation() then checks two thresholds:
  • Burst: 3 or more events from the same IP in the last 15 seconds → profile = "burst"
  • Sustained: 5 or more events in 60 seconds with a weighted severity score ≥ 9 → profile = "sustained"
burst_active = len(events_15s) >= 3
sustained_active = len(events_60s) >= 5 and weighted_score >= 9
If either condition is true, context.classification["effective_risk_score"] is overridden to 3.0 (the maximum) and behavior is set to "Aggressive Attacker".Decision engine
decision = decision_engine(context.classification, escalated=escalation["status"])
decision_engine() compares the effective risk score against two configurable thresholds:
Risk scoreDecisionMeaning
≥ 2.5 (or escalated)EXECUTEApply mitigations immediately
≥ 1.5 and < 2.5OBSERVECreate analyst task, log for review
< 1.5IGNORELow-risk or false positive, no action
ADK advisory (optional)When confidence_bucket is LOW or MEDIUM, or detection_mode is "fallback", RiskAgent calls asa_agent.run() to request a Gemini-backed review:
if context.classification.get("confidence_bucket") in {"LOW", "MEDIUM"} \
        or context.classification.get("detection_mode") == "fallback":
    decision["adk_review"] = await self._review_with_adk(context, decision)
    if decision["adk_review"].get("override_ready"):
        decision["decision"] = decision["adk_review"]["recommended_decision"]
        decision["decision_source"] = "ADKCoordinator"
        decision["decision_authority"] = "llm_override"
The ADK review can only override the decision to one of EXECUTE, OBSERVE, or IGNORE. High-confidence deterministic results are never subject to override.
4

MitigationAgent: act on the decision

MitigationAgent.run() reads context.decision["decision"] and takes one of three paths:EXECUTE — calls mitigate() with the threat type and context, persists each action with create_action(), then creates a HIGH-priority follow-up task:
mitigation_result = mitigate(
    {"type": context.classification["threat_type"], ...},
    {"ip": source_ip, "applied_actions": memory["applied_actions"],
     "confidence": ..., "escalated": ...},
)
Mitigation is idempotent: mitigate() tracks applied action keys per IP so duplicate actions are never re-applied in the same session.OBSERVE — creates a MEDIUM-priority investigation task without applying any control-plane actions.IGNORE — records the step as completed with no actions or tasks.All results are stored in context.actions and context.tasks.
5

AuditAgent: persist the incident workspace

AuditAgent.run() always runs last, regardless of earlier failures. It calls update_incident() to write the final status, decision, threat type, source IP, and full trace_snapshot to the SQLite incident record:
update_incident(
    context.incident_id,
    status=incident_status,        # "ACTIONED" or "OPEN"
    summary=incident_summary,
    threat_type=context.classification.get("threat_type"),
    decision=context.decision.get("decision"),
    source_ip=context.analysis.get("ip"),
    metadata=metadata_payload,     # includes trace_snapshot for replay
)
It then fetches the full workspace — incident, tasks, actions, and agent run history — and stores it in context.artifacts["workspace"]. This workspace is returned in the final API response under both workspace and trace.incident_workspace.

ExecutionContext field reference

FieldWritten byRead by
analysisDetectionAgentRiskAgent, MitigationAgent, AuditAgent
classificationDetectionAgentRiskAgent, MitigationAgent, AuditAgent
decisionRiskAgentMitigationAgent, AuditAgent
actionsMitigationAgentAuditAgent, aggregator
tasksMitigationAgent, RiskAgent (ADK)AuditAgent, aggregator
traceAll agentsAuditAgent (persists as trace_snapshot)
artifactsDetectionAgent, MitigationAgent, AuditAgentaggregator
planCoordinatorAgentCoordinatorAgent (loop), aggregator
agent_messagesAll agents via _record()aggregator
agent_resultsAll agents via _record()aggregator
All agents inherit _record() from BaseRoleAgent. Every call to _record() writes an agent run row to SQLite via create_agent_run() and appends an entry to context.agent_messages. This gives you a complete, ordered trace of what each agent did, even when a step fails.

Build docs developers (and LLMs) love