Overview

Koog provides comprehensive observability capabilities to monitor AI agents in production. Track execution flows, measure performance, debug issues, and gain insights into agent behavior using OpenTelemetry, custom tracing, and metrics.

OpenTelemetry Integration

Installation

Add the OpenTelemetry feature to your agent (JVM only):
import ai.koog.agents.features.opentelemetry.feature.OpenTelemetry

val agent = AIAgent("production-agent") {
    install(OpenTelemetry) {
        setServiceInfo(
            serviceName = "my-ai-service",
            serviceVersion = "1.0.0"
        )
        
        // Configure exporters (Jaeger accepts OTLP on port 4317)
        addSpanExporter(
            OtlpGrpcSpanExporter.builder()
                .setEndpoint("http://localhost:4317")
                .build()
        )
        
        // Enable verbose mode for detailed telemetry
        setVerbose(true)
    }
}

Configuration Options

install(OpenTelemetry) {
    // Service identification
    setServiceInfo(
        serviceName = "ai-agent-service",
        serviceVersion = "2.1.0"
    )
    
    // Add custom exporters
    addSpanExporter(OtlpGrpcSpanExporter.builder()
        .setEndpoint("http://localhost:4317")
        .build())
    
    // Configure span processors
    addSpanProcessor { exporter ->
        BatchSpanProcessor.builder(exporter)
            .setMaxQueueSize(2048)
            .setMaxExportBatchSize(512)
            .build()
    }
    
    // Add resource attributes
    addResourceAttributes(mapOf(
        AttributeKey.stringKey("environment") to "production",
        AttributeKey.stringKey("region") to "us-west-2",
        AttributeKey.stringKey("version") to "1.0.0"
    ))
    
    // Configure sampling
    setSampler(Sampler.traceIdRatioBased(0.1)) // Sample 10%
    
    // Enable verbose telemetry
    setVerbose(true)
}

Using a Custom SDK

Integrate with an existing OpenTelemetry setup:
val sdk = OpenTelemetrySdk.builder()
    .setTracerProvider(tracerProvider)
    .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
    .build()

install(OpenTelemetry) {
    setSdk(sdk)
}

Span Hierarchy

OpenTelemetry creates a hierarchical span structure:
📊 Create Agent Span (ai.koog.agent.create)
  └─ 🚀 Invoke Agent Span (ai.koog.agent.invoke)
      ├─ 🎯 Strategy Span (ai.koog.agent.strategy)
      │   ├─ 📦 Subgraph Execute Span (ai.koog.agent.subgraph.execute)
      │   │   ├─ 🔵 Node Execute Span (ai.koog.agent.node.execute)
      │   │   │   └─ 🤖 Inference Span (ai.koog.llm.inference)
      │   │   └─ 🔵 Node Execute Span
      │   │       └─ 🛠️ Execute Tool Span (ai.koog.tool.execute)
      │   └─ 📦 Subgraph Execute Span
      └─ 🎯 Strategy Span

Span Types

Agent Spans
  • ai.koog.agent.create - Agent creation and configuration
  • ai.koog.agent.invoke - Single agent execution run
Strategy Spans
  • ai.koog.agent.strategy - Strategy execution (graph, functional, planner)
Graph Spans (Graph agents only)
  • ai.koog.agent.subgraph.execute - Subgraph execution
  • ai.koog.agent.node.execute - Individual node execution
LLM Spans
  • ai.koog.llm.inference - LLM API call with prompt and response
Tool Spans
  • ai.koog.tool.execute - Tool execution with arguments and result

Span Attributes

Spans include rich metadata following OpenTelemetry semantic conventions:
// Agent attributes
gen_ai.agent.id = "my-agent"
gen_ai.agent.run_id = "run-12345"

// LLM attributes
gen_ai.system = "openai"
gen_ai.request.model = "gpt-4"
gen_ai.request.max_tokens = 2000
gen_ai.request.temperature = 0.7
gen_ai.response.finish_reasons = ["stop"]
gen_ai.usage.prompt_tokens = 150
gen_ai.usage.completion_tokens = 75

// Tool attributes
gen_ai.tool.name = "search_documents"
gen_ai.tool.description = "Search through documentation"
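
The gen_ai.usage.* attributes make per-request token accounting straightforward. A minimal sketch of summing usage across a run's inference spans (the `TokenUsage` holder is hypothetical, standing in for attributes read off collected spans):

```kotlin
// Hypothetical holder for the gen_ai.usage.* attributes of one inference span.
data class TokenUsage(val promptTokens: Long, val completionTokens: Long)

// Total tokens consumed across all inference spans of a run,
// e.g. for budget enforcement or cost reporting.
fun totalTokens(spans: List<TokenUsage>): Long =
    spans.sumOf { it.promptTokens + it.completionTokens }

fun main() {
    // Values from the attribute example above: 150 prompt + 75 completion.
    println(totalTokens(listOf(TokenUsage(150, 75)))) // 225
}
```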

Span Events

Important events are recorded within spans:
// Message events
gen_ai.system.message
gen_ai.user.message  
gen_ai.assistant.message
gen_ai.tool.message

// Choice events (tool calls)
gen_ai.choice

Tracing Feature

Installation

The Tracing feature provides lightweight debugging (all platforms):
import ai.koog.agents.features.tracing.feature.Tracing
import ai.koog.agents.features.tracing.writer.*

val agent = AIAgent("debug-agent") {
    install(Tracing) {
        // Log to console
        addMessageProcessor(TraceFeatureMessageLogWriter(logger))
        
        // Write to file
        addMessageProcessor(
            TraceFeatureMessageFileWriter(
                outputFile = File("agent-trace.jsonl"),
                sinkProvider = FileSystem.SYSTEM::sink
            )
        )
        
        // Send to remote endpoint
        addMessageProcessor(
            TraceFeatureMessageRemoteWriter(
                endpoint = "https://trace-collector.example.com/traces"
            )
        )
    }
}

Trace Output Format

Traces are emitted as JSON Lines events (one object per line, as in the .jsonl file above):
{
  "type": "AgentStarting",
  "timestamp": "2024-03-05T10:15:30.123Z",
  "agentId": "my-agent",
  "runId": "run-12345",
  "model": "gpt-4",
  "messages": [
    {"role": "system", "content": "You are a helpful assistant"},
    {"role": "user", "content": "What is Kotlin?"}
  ]
}

{
  "type": "LLMCallStarting",
  "timestamp": "2024-03-05T10:15:30.456Z",
  "eventId": "llm-call-1",
  "model": "gpt-4",
  "provider": "openai"
}

{
  "type": "LLMCallCompleted",
  "timestamp": "2024-03-05T10:15:32.789Z",
  "eventId": "llm-call-1",
  "responses": [
    {"role": "assistant", "content": "Kotlin is a modern programming language..."}
  ],
  "durationMs": 2333
}
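
The `durationMs` field can be cross-checked against the timestamps of the paired events sharing an `eventId`; a small self-contained sketch using `java.time` (the function name is illustrative):

```kotlin
import java.time.Duration
import java.time.Instant

// Elapsed milliseconds between two ISO-8601 trace timestamps,
// e.g. LLMCallStarting vs. LLMCallCompleted for the same eventId.
fun elapsedMs(start: String, end: String): Long =
    Duration.between(Instant.parse(start), Instant.parse(end)).toMillis()

fun main() {
    // Timestamps taken from the two LLM call events above.
    println(elapsedMs("2024-03-05T10:15:30.456Z", "2024-03-05T10:15:32.789Z")) // 2333
}
```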

Custom Trace Processors

Implement custom trace handling:
class MetricsTraceProcessor : FeatureMessageProcessor<TraceFeatureEventMessage> {
    override suspend fun processMessage(message: TraceFeatureEventMessage) {
        when (message) {
            is TraceFeatureEventMessage.LLMCallCompleted -> {
                metricsCollector.recordLatency(
                    provider = message.model.provider,
                    latencyMs = message.durationMs
                )
            }
            is TraceFeatureEventMessage.ToolCallCompleted -> {
                metricsCollector.recordToolUsage(
                    tool = message.toolName
                )
            }
            else -> {} // other trace events are not needed for metrics
        }
    }
}

install(Tracing) {
    addMessageProcessor(MetricsTraceProcessor())
}

Metrics Collection

Custom Metrics Feature

Create a custom feature for metrics:
class AgentMetrics {
    private val requestCounter = AtomicLong(0)
    private val errorCounter = AtomicLong(0)
    private val latencies = ConcurrentLinkedQueue<Long>()
    
    companion object Feature : AIAgentGraphFeature<MetricsConfig, AgentMetrics> {
        override val key = AIAgentStorageKey<AgentMetrics>("agent-metrics")
        
        override fun createInitialConfig() = MetricsConfig()
        
        override fun install(
            config: MetricsConfig,
            pipeline: AIAgentGraphPipeline
        ): AgentMetrics {
            val metrics = AgentMetrics()
            
            // Track requests
            pipeline.interceptAgentStarting(this) { context ->
                metrics.requestCounter.incrementAndGet()
                context.storage.set(createStorageKey("start_time"), Clock.System.now())
            }
            
            // Track latency
            pipeline.interceptAgentCompleted(this) { context ->
                val startTime = context.storage.get<Instant>(createStorageKey("start_time"))
                    ?: return@interceptAgentCompleted
                val duration = (Clock.System.now() - startTime).inWholeMilliseconds
                metrics.latencies.add(duration)
                
                config.metricsRegistry?.recordHistogram(
                    "agent.execution.duration",
                    duration.toDouble(),
                    "agentId" to context.agentId
                )
            }
            
            // Track errors
            pipeline.interceptAgentExecutionFailed(this) {
                metrics.errorCounter.incrementAndGet()
                
                config.metricsRegistry?.incrementCounter(
                    "agent.execution.errors",
                    "agentId" to it.agentId,
                    "error" to it.throwable.message.orEmpty()
                )
            }
            
            return metrics
        }
    }
    
    fun getStats(): MetricsStats {
        val sorted = latencies.sorted() // snapshot once so the stats are consistent
        return MetricsStats(
            totalRequests = requestCounter.get(),
            totalErrors = errorCounter.get(),
            avgLatencyMs = if (sorted.isEmpty()) 0.0 else sorted.average(),
            p95LatencyMs = if (sorted.isEmpty()) 0L else sorted[(sorted.size - 1) * 95 / 100]
        )
    }
}

Prometheus Integration

import io.prometheus.client.Counter
import io.prometheus.client.Histogram

class PrometheusMetrics {
    private val requestCounter = Counter.build()
        .name("agent_requests_total")
        .help("Total agent requests")
        .labelNames("agent_id", "status")
        .register()
    
    private val latencyHistogram = Histogram.build()
        .name("agent_execution_duration_seconds")
        .help("Agent execution duration")
        .labelNames("agent_id")
        .register()
    
    companion object Feature : AIAgentGraphFeature<PrometheusConfig, PrometheusMetrics> {
        override fun install(
            config: PrometheusConfig,
            pipeline: AIAgentGraphPipeline
        ): PrometheusMetrics {
            val metrics = PrometheusMetrics()
            
            // Record the start time so the completion hook can compute a duration
            pipeline.interceptAgentStarting(this) { context ->
                context.storage.set(createStorageKey("start_time"), Clock.System.now())
            }
            
            pipeline.interceptAgentCompleted(this) { context ->
                metrics.requestCounter
                    .labels(context.agentId, "success")
                    .inc()
                
                val startTime = context.storage.get<Instant>(createStorageKey("start_time"))
                    ?: return@interceptAgentCompleted
                val duration = Clock.System.now() - startTime
                metrics.latencyHistogram
                    .labels(context.agentId)
                    .observe(duration.inWholeMilliseconds / 1000.0) // keep sub-second precision
            }
            
            pipeline.interceptAgentExecutionFailed(this) { context ->
                metrics.requestCounter
                    .labels(context.agentId, "error")
                    .inc()
            }
            
            return metrics
        }
    }
}

Logging

Structured Logging

import io.github.oshai.kotlinlogging.KotlinLogging

private val logger = KotlinLogging.logger {}

val agent = AIAgent("logged-agent") {
    // Koog uses kotlin-logging internally
    
    // Custom logging in nodes
    node<String, String>("process") { input ->
        logger.info { "Processing input: $input" }
        val result = process(input)
        logger.debug { "Result: $result" }
        result
    }
}

Configure Logging Backend

// Logback configuration (logback.xml)
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    
    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <file>logs/agent.log</file>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
    </appender>
    
    <logger name="ai.koog" level="DEBUG" />
    <logger name="ai.koog.agents.core" level="INFO" />
    
    <root level="INFO">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILE" />
    </root>
</configuration>

Monitoring Dashboards

Jaeger Dashboard

View distributed traces in Jaeger:
# Run Jaeger locally
docker run -d --name jaeger \
  -p 16686:16686 \
  -p 4317:4317 \
  jaegertracing/all-in-one:latest

# Access UI at http://localhost:16686

Grafana Dashboard

Create custom dashboards with Prometheus metrics:
{
  "dashboard": {
    "title": "AI Agent Metrics",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [{
          "expr": "rate(agent_requests_total[5m])"
        }]
      },
      {
        "title": "P95 Latency",
        "targets": [{
          "expr": "histogram_quantile(0.95, rate(agent_execution_duration_seconds_bucket[5m]))"
        }]
      },
      {
        "title": "Error Rate",
        "targets": [{
          "expr": "rate(agent_requests_total{status=\"error\"}[5m])"
        }]
      }
    ]
  }
}

Best Practices

1. Use Sampling in Production

install(OpenTelemetry) {
    // Sample 10% of requests to reduce overhead
    setSampler(Sampler.traceIdRatioBased(0.1))
}
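
Conceptually, ratio-based sampling derives a deterministic keep/drop decision from the trace ID itself, so every span of a trace gets the same decision. A simplified sketch of the idea (not the actual OpenTelemetry implementation):

```kotlin
import kotlin.math.abs

// Simplified sketch: interpret the trace ID's low 64 bits as a Long and
// keep the trace when that value falls below ratio * Long.MAX_VALUE.
// Deterministic per trace ID, so a trace is sampled consistently end to end.
fun shouldSample(traceIdLowBits: Long, ratio: Double): Boolean {
    val upperBound = (ratio * Long.MAX_VALUE).toLong()
    return abs(traceIdLowBits) < upperBound
}
```

With ratio = 0.1, roughly one in ten trace IDs falls under the bound; the same ID always yields the same answer, so parent and child spans never disagree.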

2. Add Contextual Attributes

pipeline.interceptAgentStarting(this) { context ->
    val span = Span.current()
    span.setAttribute("user.id", context.userId)
    span.setAttribute("request.id", context.requestId)
    span.setAttribute("environment", "production")
}

3. Monitor Key Metrics

Track these essential metrics:
  • Request rate (requests/second)
  • Error rate (errors/total requests)
  • P50, P95, P99 latency
  • Token usage per request
  • Tool execution frequency
  • Agent success rate
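
Given a window of per-request records, these metrics can be computed directly; a minimal sketch with a hypothetical `RequestRecord` type and nearest-rank percentiles:

```kotlin
// Hypothetical per-request record collected by a metrics hook.
data class RequestRecord(val latencyMs: Long, val isError: Boolean)

// Nearest-rank percentile over an already-sorted latency list.
fun percentile(sorted: List<Long>, p: Int): Long =
    sorted[(sorted.size - 1) * p / 100]

// Fraction of requests that failed (0.0 for an empty window).
fun errorRate(records: List<RequestRecord>): Double =
    if (records.isEmpty()) 0.0
    else records.count { it.isError }.toDouble() / records.size

fun main() {
    // 100 requests with latencies 1..100 ms; every 20th request fails.
    val records = (1..100).map { RequestRecord(it.toLong(), it % 20 == 0) }
    val sorted = records.map { it.latencyMs }.sorted()
    println(errorRate(records))     // 0.05
    println(percentile(sorted, 95)) // 95
}
```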

4. Set Up Alerts

# Prometheus alerting rules
groups:
  - name: agent_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(agent_requests_total{status="error"}[5m]) / rate(agent_requests_total[5m]) > 0.05
        for: 5m
        annotations:
          summary: "High agent error rate"
          
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(agent_execution_duration_seconds_bucket[5m])) > 10
        for: 5m
        annotations:
          summary: "Agent P95 latency > 10s"

5. Use Correlation IDs

val agent = AIAgent("tracked-agent") {
    node<Request, Response>("process") { request ->
        // Fall back to a generated ID when the caller did not send one
        val correlationId = request.headers["X-Correlation-ID"] ?: UUID.randomUUID().toString()
        logger.info { "Processing request $correlationId" }
        
        Span.current().setAttribute("correlation.id", correlationId)
        // ...
    }
}

Debugging in Production

Enable Verbose Mode Temporarily

// Use feature flags or environment variables
val verboseMode = System.getenv("ENABLE_VERBOSE_TELEMETRY") == "true"

install(OpenTelemetry) {
    setVerbose(verboseMode)
}

Query Traces by Attributes

Find specific executions in Jaeger:
// Find all errors for a specific agent
service="my-ai-service" AND gen_ai.agent.id="production-agent" AND error=true

// Find slow requests
service="my-ai-service" AND duration>5s

// Find requests using specific tools
service="my-ai-service" AND gen_ai.tool.name="database_query"

Performance Impact

OpenTelemetry Overhead

  • Sampling (10%): ~2-5% overhead
  • Full sampling: ~10-15% overhead
  • Verbose mode: Additional 5-10% overhead

Optimization Tips

install(OpenTelemetry) {
    // Use batch processing
    addSpanProcessor { exporter ->
        BatchSpanProcessor.builder(exporter)
            .setMaxQueueSize(2048)      // Buffer spans
            .setMaxExportBatchSize(512) // Batch exports
            .setExporterTimeout(Duration.ofSeconds(30))
            .build()
    }
    
    // Disable verbose mode in production
    setVerbose(false)
    
    // Use appropriate sampling
    setSampler(Sampler.traceIdRatioBased(0.1))
}
