Overview
Koog provides comprehensive observability capabilities to monitor AI agents in production. Track execution flows, measure performance, debug issues, and gain insights into agent behavior using OpenTelemetry, custom tracing, and metrics.
OpenTelemetry Integration
Installation
Add the OpenTelemetry feature to your agent (JVM only):
import ai.koog.agents.features.opentelemetry.feature.OpenTelemetry
val agent = AIAgent("production-agent") {
install(OpenTelemetry) {
setServiceInfo(
serviceName = "my-ai-service",
serviceVersion = "1.0.0"
)
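// Configure exporters. Jaeger ingests OTLP, so use the OTLP gRPC exporter
// (import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter)
addSpanExporter(OtlpGrpcSpanExporter.builder().setEndpoint("http://localhost:4317").build())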
// Enable verbose mode for detailed telemetry
setVerbose(true)
}
}
Configuration Options
install(OpenTelemetry) {
// Service identification
setServiceInfo(
serviceName = "ai-agent-service",
serviceVersion = "2.1.0"
)
// Add custom exporters
addSpanExporter(OtlpGrpcSpanExporter.builder()
.setEndpoint("http://localhost:4317")
.build())
// Configure span processors
addSpanProcessor { exporter ->
BatchSpanProcessor.builder(exporter)
.setMaxQueueSize(2048)
.setMaxExportBatchSize(512)
.build()
}
// Add resource attributes
addResourceAttributes(mapOf(
AttributeKey.stringKey("environment") to "production",
AttributeKey.stringKey("region") to "us-west-2",
AttributeKey.stringKey("version") to "1.0.0"
))
// Configure sampling
setSampler(Sampler.traceIdRatioBased(0.1)) // Sample 10%
// Enable verbose telemetry
setVerbose(true)
}
Using Custom SDK
Integrate with existing OpenTelemetry setup:
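// Build the tracer provider referenced below; here it batches spans to a local OTLP endpoint
val tracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(
BatchSpanProcessor.builder(
OtlpGrpcSpanExporter.builder().setEndpoint("http://localhost:4317").build()
).build()
)
.build()
val sdk = OpenTelemetrySdk.builder()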
.setTracerProvider(tracerProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
.build()
install(OpenTelemetry) {
setSdk(sdk)
}
Span Hierarchy
The OpenTelemetry feature emits spans in the following hierarchy:
📊 Create Agent Span (ai.koog.agent.create)
└─ 🚀 Invoke Agent Span (ai.koog.agent.invoke)
   ├─ 🎯 Strategy Span (ai.koog.agent.strategy)
   │  ├─ 📦 Subgraph Execute Span (ai.koog.agent.subgraph.execute)
   │  │  ├─ 🔵 Node Execute Span (ai.koog.agent.node.execute)
   │  │  │  └─ 🤖 Inference Span (ai.koog.llm.inference)
   │  │  └─ 🔵 Node Execute Span
   │  │     └─ 🛠️ Execute Tool Span (ai.koog.tool.execute)
   │  └─ 📦 Subgraph Execute Span
   └─ 🎯 Strategy Span
Span Types
Agent Spans
ai.koog.agent.create - Agent creation and configuration
ai.koog.agent.invoke - Single agent execution run
Strategy Spans
ai.koog.agent.strategy - Strategy execution (graph, functional, planner)
Graph Spans (Graph agents only)
ai.koog.agent.subgraph.execute - Subgraph execution
ai.koog.agent.node.execute - Individual node execution
LLM Spans
ai.koog.llm.inference - LLM API call with prompt and response
Tool Spans
ai.koog.tool.execute - Tool execution with arguments and result
Span Attributes
Spans include rich metadata following OpenTelemetry semantic conventions:
// Agent attributes
gen_ai.agent.id = "my-agent"
gen_ai.agent.run_id = "run-12345"
// LLM attributes
gen_ai.system = "openai"
gen_ai.request.model = "gpt-4"
gen_ai.request.max_tokens = 2000
gen_ai.request.temperature = 0.7
gen_ai.response.finish_reasons = ["stop"]
gen_ai.usage.prompt_tokens = 150
gen_ai.usage.completion_tokens = 75
// Tool attributes
gen_ai.tool.name = "search_documents"
gen_ai.tool.description = "Search through documentation"
Span Events
Important events are recorded within spans:
// Message events
gen_ai.system.message
gen_ai.user.message
gen_ai.assistant.message
gen_ai.tool.message
// Choice events (tool calls)
gen_ai.choice
Tracing Feature
Installation
The Tracing feature provides lightweight debugging (all platforms):
import ai.koog.agents.features.tracing.feature.Tracing
import ai.koog.agents.features.tracing.writer.*
val agent = AIAgent("debug-agent") {
install(Tracing) {
// Log to console
addMessageProcessor(TraceFeatureMessageLogWriter(logger))
// Write to file
addMessageProcessor(
TraceFeatureMessageFileWriter(
outputFile = File("agent-trace.jsonl"),
sinkProvider = FileSystem.SYSTEM::sink
)
)
// Send to remote endpoint
addMessageProcessor(
TraceFeatureMessageRemoteWriter(
endpoint = "https://trace-collector.example.com/traces"
)
)
}
}
Traces are emitted as JSON events:
{
"type": "AgentStarting",
"timestamp": "2024-03-05T10:15:30.123Z",
"agentId": "my-agent",
"runId": "run-12345",
"model": "gpt-4",
"messages": [
{"role": "system", "content": "You are a helpful assistant"},
{"role": "user", "content": "What is Kotlin?"}
]
}
{
"type": "LLMCallStarting",
"timestamp": "2024-03-05T10:15:30.456Z",
"eventId": "llm-call-1",
"model": "gpt-4",
"provider": "openai"
}
{
"type": "LLMCallCompleted",
"timestamp": "2024-03-05T10:15:32.789Z",
"eventId": "llm-call-1",
"responses": [
{"role": "assistant", "content": "Kotlin is a modern programming language..."}
],
"durationMs": 2333
}
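Because a start event and its completion share an eventId, per-call latency can also be recovered from the timestamps alone. The following is a minimal sketch, assuming the JSON lines have already been decoded into a hypothetical TraceEvent class (not a Koog type); it pairs each LLMCallStarting event with its LLMCallCompleted event:

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical, already-decoded view of one trace line; not a Koog type.
data class TraceEvent(val type: String, val eventId: String?, val timestamp: Instant)

// Returns the latency in milliseconds per eventId for every start/completion pair found.
fun llmCallLatencies(events: List<TraceEvent>): Map<String, Long> {
    val starts = mutableMapOf<String, Instant>()
    val latencies = linkedMapOf<String, Long>()
    for (event in events) {
        val id = event.eventId ?: continue // events without an eventId cannot be paired
        when (event.type) {
            "LLMCallStarting" -> starts[id] = event.timestamp
            "LLMCallCompleted" -> starts.remove(id)?.let { startedAt ->
                latencies[id] = Duration.between(startedAt, event.timestamp).toMillis()
            }
        }
    }
    return latencies
}
```

Applied to the two LLM events shown above, the timestamps 10:15:30.456 and 10:15:32.789 yield 2333 ms, which matches the durationMs field of the completion event.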
Custom Trace Processors
Implement custom trace handling:
class MetricsTraceProcessor : FeatureMessageProcessor<TraceFeatureEventMessage> {
override suspend fun processMessage(message: TraceFeatureEventMessage) {
when (message) {
is TraceFeatureEventMessage.LLMCallCompleted -> {
metricsCollector.recordLatency(
provider = message.model.provider,
latencyMs = message.durationMs
)
}
is TraceFeatureEventMessage.ToolCallCompleted -> {
metricsCollector.recordToolUsage(
tool = message.toolName
)
}
else -> Unit // Ignore all other trace events
}
}
}
install(Tracing) {
addMessageProcessor(MetricsTraceProcessor())
}
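The processor above calls a metricsCollector that the snippet does not define. One possible shape for it is sketched below; the class name InMemoryMetricsCollector and its read methods are assumptions made for illustration and are not part of Koog:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder

// Hypothetical collector backing the `metricsCollector` reference; not part of Koog.
class InMemoryMetricsCollector {
    private val latencyTotalsMs = ConcurrentHashMap<String, LongAdder>()
    private val latencyCounts = ConcurrentHashMap<String, LongAdder>()
    private val toolCalls = ConcurrentHashMap<String, LongAdder>()

    // Accumulates total latency and call count per provider.
    fun recordLatency(provider: String, latencyMs: Long) {
        latencyTotalsMs.computeIfAbsent(provider) { LongAdder() }.add(latencyMs)
        latencyCounts.computeIfAbsent(provider) { LongAdder() }.increment()
    }

    // Counts one invocation of the named tool.
    fun recordToolUsage(tool: String) {
        toolCalls.computeIfAbsent(tool) { LongAdder() }.increment()
    }

    // Mean latency for a provider, or null if nothing was recorded for it.
    fun meanLatencyMs(provider: String): Double? {
        val count = latencyCounts[provider]?.sum() ?: return null
        val total = latencyTotalsMs[provider]?.sum() ?: return null
        return if (count == 0L) null else total.toDouble() / count
    }

    fun toolCallCount(tool: String): Long = toolCalls[tool]?.sum() ?: 0L
}
```

LongAdder keeps writes cheap under contention. The total and the count are updated separately, so a mean read during concurrent writes may be slightly stale, which is usually acceptable for dashboards.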
Metrics Collection
Custom Metrics Feature
Create a custom feature for metrics:
class AgentMetrics {
private val requestCounter = AtomicLong(0)
private val errorCounter = AtomicLong(0)
private val latencies = ConcurrentLinkedQueue<Long>()
companion object Feature : AIAgentGraphFeature<MetricsConfig, AgentMetrics> {
override val key = AIAgentStorageKey<AgentMetrics>("agent-metrics")
override fun createInitialConfig() = MetricsConfig()
override fun install(
config: MetricsConfig,
pipeline: AIAgentGraphPipeline
): AgentMetrics {
val metrics = AgentMetrics()
// Track requests
pipeline.interceptAgentStarting(this) {
metrics.requestCounter.incrementAndGet()
storage.set(createStorageKey("start_time"), Clock.System.now())
}
// Track latency
pipeline.interceptAgentCompleted(this) { context ->
val startTime = context.storage.get<Instant>(createStorageKey("start_time"))
val duration = (Clock.System.now() - startTime!!).inWholeMilliseconds
metrics.latencies.add(duration)
config.metricsRegistry?.recordHistogram(
"agent.execution.duration",
duration.toDouble(),
"agentId" to context.agentId
)
}
// Track errors
pipeline.interceptAgentExecutionFailed(this) {
metrics.errorCounter.incrementAndGet()
config.metricsRegistry?.incrementCounter(
"agent.execution.errors",
"agentId" to it.agentId,
"error" to it.throwable.message.orEmpty()
)
}
return metrics
}
}
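fun getStats(): MetricsStats {
// Snapshot once so the average and the percentile are computed over the same samples
val sorted = latencies.sorted()
return MetricsStats(
totalRequests = requestCounter.get(),
totalErrors = errorCounter.get(),
// With no samples, report 0 instead of NaN or an out-of-bounds index
avgLatencyMs = if (sorted.isEmpty()) 0.0 else sorted.average(),
p95LatencyMs = sorted.getOrElse(sorted.size * 95 / 100) { 0L }
)
}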
}
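The percentile lookup in getStats can be factored into a standalone helper. The sketch below uses the nearest-rank definition, which is one common convention and not necessarily the one a metrics backend applies; the function name percentile is an assumption:

```kotlin
// Nearest-rank percentile: the smallest sample such that at least `percent`%
// of all samples are less than or equal to it. Returns null for no samples.
fun percentile(samples: Collection<Long>, percent: Int): Long? {
    require(percent in 1..100) { "percent must be in 1..100" }
    if (samples.isEmpty()) return null
    val sorted = samples.sorted()
    // Integer form of ceil(percent / 100 * n), which avoids floating-point rounding
    val rank = (percent * sorted.size + 99) / 100
    return sorted[rank - 1]
}
```

Returning null for an empty collection forces callers to decide how "no data yet" should be reported instead of silently emitting a misleading zero.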
Prometheus Integration
import io.prometheus.client.Counter
import io.prometheus.client.Histogram
class PrometheusMetrics {
private val requestCounter = Counter.build()
.name("agent_requests_total")
.help("Total agent requests")
.labelNames("agent_id", "status")
.register()
private val latencyHistogram = Histogram.build()
.name("agent_execution_duration_seconds")
.help("Agent execution duration")
.labelNames("agent_id")
.register()
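companion object Feature : AIAgentGraphFeature<PrometheusConfig, PrometheusMetrics> {
// Same required members as in the AgentMetrics feature above
override val key = AIAgentStorageKey<PrometheusMetrics>("prometheus-metrics")
override fun createInitialConfig() = PrometheusConfig()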
override fun install(
config: PrometheusConfig,
pipeline: AIAgentGraphPipeline
): PrometheusMetrics {
val metrics = PrometheusMetrics()
pipeline.interceptAgentCompleted(this) { context ->
metrics.requestCounter
.labels(context.agentId, "success")
.inc()
val duration = /* calculate duration */
// Keep sub-second precision: inWholeSeconds would truncate 1.9 s to 1
metrics.latencyHistogram
.labels(context.agentId)
.observe(duration.inWholeMilliseconds / 1000.0)
}
pipeline.interceptAgentExecutionFailed(this) { context ->
metrics.requestCounter
.labels(context.agentId, "error")
.inc()
}
return metrics
}
}
}
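The duration placeholder in the completion handler needs a start time recorded when the run begins. A minimal sketch of one way to do this with kotlin.time is shown below; RunTimer, its runId key, and toFractionalSeconds are hypothetical helpers, and wiring them into the pipeline interceptors is left to the feature:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.TimeMark
import kotlin.time.TimeSource

// Hypothetical helper: remembers a monotonic start mark per run identifier.
class RunTimer(private val timeSource: TimeSource = TimeSource.Monotonic) {
    private val starts = ConcurrentHashMap<String, TimeMark>()

    // Call when a run starts.
    fun start(runId: String) {
        starts[runId] = timeSource.markNow()
    }

    // Call when a run completes or fails; returns null if the run was never started.
    fun stop(runId: String): Duration? = starts.remove(runId)?.elapsedNow()
}

// Prometheus duration histograms conventionally use fractional seconds.
fun Duration.toFractionalSeconds(): Double = toDouble(DurationUnit.SECONDS)
```

A monotonic time source is used instead of wall-clock time so that clock adjustments cannot produce negative or inflated durations. Removing the mark in stop keeps the map from growing with every run.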
Logging
Structured Logging
import io.github.oshai.kotlinlogging.KotlinLogging
private val logger = KotlinLogging.logger {}
val agent = AIAgent("logged-agent") {
// Koog uses kotlin-logging internally
// Custom logging in nodes
node<String, String>("process") { input ->
logger.info { "Processing input: $input" }
val result = process(input)
logger.debug { "Result: $result" }
result
}
}
// Logback configuration (logback.xml)
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>logs/agent.log</file>
<encoder class="net.logstash.logback.encoder.LogstashEncoder" />
</appender>
<logger name="ai.koog" level="DEBUG" />
<logger name="ai.koog.agents.core" level="INFO" />
<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</root>
</configuration>
Monitoring Dashboards
Jaeger Dashboard
View distributed traces in Jaeger:
# Run Jaeger locally
docker run -d --name jaeger \
-p 16686:16686 \
-p 4317:4317 \
jaegertracing/all-in-one:latest
# Access UI at http://localhost:16686
Grafana Dashboard
Create custom dashboards with Prometheus metrics:
{
"dashboard": {
"title": "AI Agent Metrics",
"panels": [
{
"title": "Request Rate",
"targets": [{
"expr": "rate(agent_requests_total[5m])"
}]
},
{
"title": "P95 Latency",
"targets": [{
"expr": "histogram_quantile(0.95, sum by (le) (rate(agent_execution_duration_seconds_bucket[5m])))"
}]
},
{
"title": "Error Rate",
"targets": [{
"expr": "rate(agent_requests_total{status=\"error\"}[5m])"
}]
}
]
}
}
Best Practices
1. Use Sampling in Production
install(OpenTelemetry) {
// Sample 10% of requests to reduce overhead
setSampler(Sampler.traceIdRatioBased(0.1))
}
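Ratio-based sampling is deterministic per trace: the decision is derived from the trace ID, so every service that sees the same trace makes the same choice. The sketch below illustrates the idea in plain Kotlin; it is not the SDK's implementation, and the function name shouldSample is an assumption:

```kotlin
// Illustrative trace-ID ratio sampling: a trace is kept when the low 63 bits of
// its ID fall below ratio * Long.MAX_VALUE. This is not the OpenTelemetry SDK's code.
fun shouldSample(traceIdHex: String, ratio: Double): Boolean {
    require(traceIdHex.length == 32) { "trace ID must be 32 hex characters" }
    require(ratio in 0.0..1.0) { "ratio must be in 0.0..1.0" }
    if (ratio == 0.0) return false
    if (ratio == 1.0) return true
    // Use the lower 8 bytes of the ID and clear the sign bit to get a non-negative value
    val low63 = traceIdHex.substring(16).toULong(16).toLong() and Long.MAX_VALUE
    val upperBound = (ratio * Long.MAX_VALUE).toLong()
    return low63 < upperBound
}
```

Because the decision depends only on the trace ID and the ratio, raising the ratio later keeps every trace that was previously sampled and adds new ones.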
2. Add Contextual Attributes
pipeline.interceptAgentStarting(this) { context ->
val span = Span.current()
span.setAttribute("user.id", context.userId)
span.setAttribute("request.id", context.requestId)
span.setAttribute("environment", "production")
}
3. Monitor Key Metrics
Track these essential metrics:
- Request rate (requests/second)
- Error rate (errors/total requests)
- P50, P95, P99 latency
- Token usage per request
- Tool execution frequency
- Agent success rate
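Several of these metrics are simple ratios over counters. As a rough sketch, assuming hypothetical names RunCounts and requestsPerSecond, they can be derived as follows:

```kotlin
// Hypothetical snapshot of completed runs over some observation window.
data class RunCounts(val succeeded: Long, val failed: Long) {
    val total: Long get() = succeeded + failed

    // Errors divided by total requests; 0.0 when there is no traffic.
    val errorRate: Double get() = if (total == 0L) 0.0 else failed.toDouble() / total

    // Successful runs divided by total requests; 0.0 when there is no traffic.
    val successRate: Double get() = if (total == 0L) 0.0 else succeeded.toDouble() / total
}

// Average request rate over a window of the given length in seconds.
fun requestsPerSecond(requests: Long, windowSeconds: Long): Double {
    require(windowSeconds > 0) { "windowSeconds must be positive" }
    return requests.toDouble() / windowSeconds
}
```

Guarding the zero-traffic case avoids NaN values, which many dashboards and alert expressions handle poorly.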
4. Set Up Alerts
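# Prometheus alerting rules
groups:
  - name: agent_alerts
    rules:
      - alert: HighErrorRate
        # Fraction of failed requests over the last 5 minutes exceeds 5%
        expr: sum(rate(agent_requests_total{status="error"}[5m])) / sum(rate(agent_requests_total[5m])) > 0.05
        for: 5m
        annotations:
          summary: "High agent error rate"
      - alert: HighLatency
        # histogram_quantile needs the per-bucket rates, aggregated by the le label
        expr: histogram_quantile(0.95, sum by (le) (rate(agent_execution_duration_seconds_bucket[5m]))) > 10
        for: 5m
        annotations:
          summary: "Agent P95 latency > 10s"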
5. Use Correlation IDs
val agent = AIAgent("tracked-agent") {
node<Request, Response>("process") { request ->
// Fall back to a fixed marker so logs and spans never carry a null ID
val correlationId = request.headers["X-Correlation-ID"] ?: "unknown"
logger.info { "Processing request $correlationId" }
Span.current().setAttribute("correlation.id", correlationId)
// ...
}
}
Debugging in Production
Enable Verbose Mode Temporarily
// Use feature flags or environment variables
val verboseMode = System.getenv("ENABLE_VERBOSE_TELEMETRY") == "true"
install(OpenTelemetry) {
setVerbose(verboseMode)
}
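Reading telemetry settings in one place makes it easier to validate them before they reach the feature configuration. The following sketch assumes a hypothetical TELEMETRY_SAMPLE_RATIO variable alongside ENABLE_VERBOSE_TELEMETRY, and a hypothetical TelemetrySettings holder:

```kotlin
// Hypothetical settings holder; the values would be passed to setVerbose and setSampler.
data class TelemetrySettings(val verbose: Boolean, val sampleRatio: Double)

// Parses settings from environment variables, falling back to safe production defaults.
// TELEMETRY_SAMPLE_RATIO is an assumed variable name, not one defined by Koog.
fun telemetrySettings(env: Map<String, String> = System.getenv()): TelemetrySettings {
    // Anything other than "true" (case-insensitive) leaves verbose mode off
    val verbose = env["ENABLE_VERBOSE_TELEMETRY"]?.trim().equals("true", ignoreCase = true)
    // Unparseable or missing ratios fall back to 10%; out-of-range values are clamped
    val ratio = env["TELEMETRY_SAMPLE_RATIO"]?.trim()
        ?.toDoubleOrNull()
        ?.takeIf { !it.isNaN() }
        ?.coerceIn(0.0, 1.0)
        ?: 0.1
    return TelemetrySettings(verbose, ratio)
}
```

Unlike the exact string comparison above, this sketch accepts "TRUE" and surrounding whitespace, which is a deliberate leniency choice; a misconfigured value still degrades to verbose mode off and 10% sampling.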
Query Traces by Attributes
Find specific executions in the Jaeger UI by selecting the service (my-ai-service), then filtering with the Tags field, which uses logfmt syntax, and the Min Duration field:
// Find all errors for a specific agent (Tags)
gen_ai.agent.id=production-agent error=true
// Find slow requests (Min Duration)
5s
// Find requests using specific tools (Tags)
gen_ai.tool.name=database_query
OpenTelemetry Overhead
- Sampling (10%): ~2-5% overhead
- Full sampling: ~10-15% overhead
- Verbose mode: Additional 5-10% overhead
Optimization Tips
install(OpenTelemetry) {
// Use batch processing
addSpanProcessor { exporter ->
BatchSpanProcessor.builder(exporter)
.setMaxQueueSize(2048) // Buffer spans
.setMaxExportBatchSize(512) // Batch exports
.setExporterTimeout(Duration.ofSeconds(30))
.build()
}
// Disable verbose mode in production
setVerbose(false)
// Use appropriate sampling
setSampler(Sampler.traceIdRatioBased(0.1))
}