The RCA Agent ships with built-in connectors for Prometheus, Elasticsearch, and Jaeger, but the connector system is designed as an open plugin architecture: any observability platform, internal data store, or custom signal source can be integrated by implementing a single abstract base class and registering the connector in two places. The agent discovers connectors at startup, so once your class is registered and the agent is restarted, it is available in both the Streamlit UI source selector and the Python API sources parameter.

Connector Interface

Every connector must extend BaseConnector from app/connectors/base.py and implement two methods: fetch_signals, which retrieves raw signals for a given time window, and health_check, which the agent calls to verify the data source is reachable before dispatching a task.
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List
from app.models import Signal

class BaseConnector(ABC):
    """Base class for all RCA Agent data source connectors."""

    @abstractmethod
    def fetch_signals(
        self,
        start_time: datetime,
        end_time: datetime,
        context: str = "",
    ) -> List[Signal]:
        """Fetch signals from the data source for the given time window."""
        ...

    @abstractmethod
    def health_check(self) -> bool:
        """Return True if the data source is reachable and healthy."""
        ...
The context string passed to fetch_signals is the same incident description the user provided at analysis time. Use it to scope queries to relevant services or namespaces — for example, extracting a service name from the context and filtering your search index accordingly will produce cleaner, more focused signals.
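
As a rough illustration of scoping by context, the sketch below pulls a service-like token out of the incident description and narrows a search string with it. The helper names, the naming pattern (names ending in -api, -service, or -worker), and the service field in the query are all assumptions made for the example, not part of the RCA Agent.

```python
import re
from typing import Optional

# Assumed naming convention: services end in -api, -service, or -worker.
_SERVICE_PATTERN = re.compile(r"\b([a-z0-9]+(?:-[a-z0-9]+)*-(?:api|service|worker))\b")


def extract_service(context: str) -> Optional[str]:
    """Return the first service-like token in the incident description, if any."""
    match = _SERVICE_PATTERN.search(context.lower())
    return match.group(1) if match else None


def build_query(index: str, context: str = "") -> str:
    """Build a search string, narrowing to one service when the context names it."""
    query = f"search index={index}"
    service = extract_service(context)
    if service:
        # "service" is a hypothetical field name; use whatever your index exposes.
        query += f' service="{service}"'
    return query
```

When the context names no recognizable service, build_query falls back to the unscoped search, so a vague incident description widens the query instead of returning nothing.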

The Signal Model

fetch_signals must return a list of Signal objects (defined in app/models.py). The agent’s correlation and LLM ranking steps operate entirely on this standard model, so the quality of your connector’s output directly affects the quality of the hypotheses produced.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| source | str | | Unique identifier for this connector (e.g. "my-splunk"). Appears in evidence excerpts. |
| signal_type | str | | One of "log", "metric", or "trace". |
| timestamp | datetime | | When the signal occurred. Should be timezone-aware UTC. |
| content | str | | Raw signal content: a log line, metric value string, or trace span summary. |
| severity | str | | One of "info", "warning", "error", or "critical". Used to weight signals during correlation. |
| metadata | dict | | Arbitrary key-value pairs for additional context (e.g. {"service": "payment-api", "region": "us-east-1"}). |
The LLM reasoning step receives content and severity as its primary inputs. Richer, more specific content strings produce better hypotheses — prefer full log lines and labeled metric readings over truncated or unlabeled values.
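
To make the point about labeled content concrete, here is a minimal sketch that builds a metric signal with a self-describing content string. The Signal dataclass is a stand-in that mirrors the fields listed above so the snippet runs on its own; its defaults, the metric_signal helper, and the content layout are assumptions, and the real class in app/models.py may differ.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict

ALLOWED_SEVERITIES = ("info", "warning", "error", "critical")


@dataclass
class Signal:
    """Stand-in for app.models.Signal (defaults here are assumptions)."""
    source: str
    signal_type: str
    timestamp: datetime
    content: str
    severity: str = "info"
    metadata: Dict[str, Any] = field(default_factory=dict)


def metric_signal(source: str, name: str, value: float, unit: str,
                  labels: Dict[str, str], timestamp: datetime,
                  severity: str = "info") -> Signal:
    """Build a metric Signal whose content carries the name, value, unit, and labels."""
    if severity not in ALLOWED_SEVERITIES:
        raise ValueError(f"unsupported severity: {severity!r}")
    if timestamp.tzinfo is None:
        # Assumption: naive timestamps are already in UTC.
        timestamp = timestamp.replace(tzinfo=timezone.utc)
    label_text = ", ".join(f"{key}={val}" for key, val in sorted(labels.items()))
    return Signal(
        source=source,
        signal_type="metric",
        timestamp=timestamp.astimezone(timezone.utc),
        content=f"{name} = {value} {unit} ({label_text})",
        severity=severity,
        metadata=dict(labels),
    )
```

A reading built this way says which metric, which service, and which unit it refers to, whereas a bare "0.42" gives the reasoning step nothing to correlate against.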

Implementing a Connector

The example below implements a Splunk connector that searches a configured index and maps each result event to a Signal object. Use it as a reference for your own integration.
import json
import os
import requests
from datetime import datetime
from typing import List
from app.connectors.base import BaseConnector
from app.models import Signal

class SplunkConnector(BaseConnector):
    def __init__(self):
        self.base_url = os.getenv("SPLUNK_URL")
        self.token = os.getenv("SPLUNK_TOKEN")
        self.index = os.getenv("SPLUNK_INDEX", "main")

    def fetch_signals(
        self,
        start_time: datetime,
        end_time: datetime,
        context: str = "",
    ) -> List[Signal]:
        response = requests.get(
            f"{self.base_url}/services/search/jobs/export",
            headers={"Authorization": f"Bearer {self.token}"},
            params={
                "search": f"search index={self.index}",
                # Pass the time window through the endpoint's own parameters
                # as epoch seconds instead of embedding it in the search text.
                "earliest_time": f"{start_time.timestamp():.3f}",
                "latest_time": f"{end_time.timestamp():.3f}",
                "output_mode": "json",
            },
            timeout=30,  # never let a slow search hang the analysis
        )
        response.raise_for_status()
        # The export endpoint streams one JSON object per line; each event
        # is carried under the "result" key.
        signals = []
        for line in response.iter_lines():
            if not line:
                continue
            event = json.loads(line).get("result")
            if event is None:
                continue  # status-only line with no event payload
            signals.append(
                Signal(
                    source="splunk",
                    signal_type="log",
                    timestamp=datetime.fromisoformat(event["_time"]),
                    content=event["_raw"],
                    severity=event.get("severity", "info"),
                )
            )
        return signals

    def health_check(self) -> bool:
        try:
            resp = requests.get(
                f"{self.base_url}/services/server/info",
                headers={"Authorization": f"Bearer {self.token}"},
                timeout=5,
            )
            return resp.status_code == 200
        except requests.RequestException:
            # Connection errors and timeouts both mean "not reachable".
            return False
Save this file as app/connectors/splunk.py. Keep one connector per file and name the file after the connector key you plan to register in the next step.

Registering the Connector

Connector registration happens in two places: the connector registry (so the agent can instantiate your class by name) and agent.yaml (so it appears in the source selector and is included in analyses).

1. Add the class to the connector registry in app/connectors/__init__.py:
from app.connectors.splunk import SplunkConnector

CONNECTOR_REGISTRY = {
    "splunk": SplunkConnector,
    # ... other connectors
}
The dictionary key is the canonical name used everywhere else: in the sources list, in agent.yaml, and in the source field of the Signal objects your connector emits.

2. Enable the connector in agent.yaml:
analysis:
  sources:
    - prometheus
    - elasticsearch
    - splunk   # your new connector
Connectors listed here are included in every analysis run by default (unless the user deselects them in the UI or omits them from the sources parameter in the Python API).
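
The sketch below shows one way the registry key and the sources list could fit together: each requested name is looked up in the registry, instantiated, and kept only if its health check passes. This is an illustration, not the agent's actual startup code; resolve_sources and the two stand-in connector classes are hypothetical.

```python
from typing import Dict, List, Type


class BaseConnector:
    """Stand-in for app.connectors.base.BaseConnector (health check only)."""
    def health_check(self) -> bool:
        raise NotImplementedError


class HealthyConnector(BaseConnector):
    def health_check(self) -> bool:
        return True


class UnreachableConnector(BaseConnector):
    def health_check(self) -> bool:
        return False


# Keys must match the names used in agent.yaml and the sources parameter.
CONNECTOR_REGISTRY: Dict[str, Type[BaseConnector]] = {
    "prometheus": HealthyConnector,
    "splunk": UnreachableConnector,
}


def resolve_sources(sources: List[str]) -> Dict[str, BaseConnector]:
    """Instantiate each requested connector and keep only the reachable ones."""
    unknown = [name for name in sources if name not in CONNECTOR_REGISTRY]
    if unknown:
        raise KeyError(f"unregistered connector(s): {', '.join(unknown)}")
    active: Dict[str, BaseConnector] = {}
    for name in sources:
        connector = CONNECTOR_REGISTRY[name]()
        if connector.health_check():
            active[name] = connector
    return active
```

A name that appears in the sources list but not in the registry fails loudly here, which is the kind of mismatch to look for first if a new connector does not show up.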
Add the environment variables your connector needs to .env (or your secrets manager) before starting the agent. For the Splunk example above, set SPLUNK_URL, SPLUNK_TOKEN, and optionally SPLUNK_INDEX. Missing environment variables will cause fetch_signals to fail at runtime with an unhelpful requests error rather than a clear configuration message — validate them in __init__ and raise a ValueError early if any are absent.
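
A minimal sketch of that early validation follows. It leaves out BaseConnector and the fetch logic so it runs standalone, and the optional env parameter is an assumption added to make the check easy to test; with no argument it reads os.environ, as the Splunk example does.

```python
import os
from typing import Mapping, Optional


class SplunkConnector:
    """Constructor-only sketch: fail fast when required settings are absent."""

    REQUIRED = ("SPLUNK_URL", "SPLUNK_TOKEN")

    def __init__(self, env: Optional[Mapping[str, str]] = None):
        env = os.environ if env is None else env
        # Treat empty strings the same as unset variables.
        missing = [name for name in self.REQUIRED if not env.get(name)]
        if missing:
            raise ValueError(
                "Missing required environment variable(s): " + ", ".join(missing)
            )
        self.base_url = env["SPLUNK_URL"].rstrip("/")
        self.token = env["SPLUNK_TOKEN"]
        self.index = env.get("SPLUNK_INDEX", "main")
```

The error names every missing variable at once, so a misconfigured deployment can be fixed in one pass.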
Before deploying a new connector, write a unit test that mocks the external HTTP call and asserts that fetch_signals returns the expected Signal objects. Use unittest.mock.patch or responses to intercept requests.get and return a fixture payload. A passing test confirms that your timestamp parsing, severity mapping, and Signal field assignments are correct — catching bugs before they corrupt a live incident analysis.
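
The test pattern might look like the sketch below. To stay dependency-free it uses a stand-in fetch_signals function built on the standard library's urllib and patches urllib.request.urlopen; with the Splunk example you would patch requests.get in the same way. The Signal stand-in, the URL, and the fixture payload (a plain JSON array of events) are illustrative assumptions.

```python
import json
import unittest
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List
from unittest.mock import patch


@dataclass
class Signal:
    """Stand-in for app.models.Signal with only the fields this test checks."""
    source: str
    signal_type: str
    timestamp: datetime
    content: str
    severity: str


def fetch_signals(url: str) -> List[Signal]:
    """Hypothetical fetch: one HTTP GET, then map each event to a Signal."""
    with urllib.request.urlopen(url, timeout=10) as response:
        events = json.loads(response.read())
    return [
        Signal(
            source="splunk",
            signal_type="log",
            timestamp=datetime.fromisoformat(event["_time"]),
            content=event["_raw"],
            severity=event.get("severity", "info"),
        )
        for event in events
    ]


# Fixture payload: the second event has no severity, so the default applies.
FIXTURE = json.dumps([
    {"_time": "2024-05-01T12:00:00+00:00", "_raw": "payment-api timeout", "severity": "error"},
    {"_time": "2024-05-01T12:00:05+00:00", "_raw": "retrying request"},
]).encode()


class FetchSignalsTest(unittest.TestCase):
    def test_maps_events_to_signals(self):
        with patch("urllib.request.urlopen") as mock_urlopen:
            # The mocked call is used as a context manager, so configure __enter__.
            mock_urlopen.return_value.__enter__.return_value.read.return_value = FIXTURE
            signals = fetch_signals("https://splunk.example.com/export")

        mock_urlopen.assert_called_once()
        self.assertEqual([s.severity for s in signals], ["error", "info"])
        self.assertEqual(signals[0].timestamp, datetime(2024, 5, 1, 12, tzinfo=timezone.utc))
        self.assertEqual(signals[1].content, "retrying request")
```

Run it with python -m unittest. Because the network call is replaced by the fixture, the test exercises only the mapping logic and needs no live data source.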
