Skip to main content

Overview

Building reliable AI agents requires robust error handling strategies. Koog provides multiple mechanisms for handling failures, implementing retries, and ensuring fault tolerance at different levels of the agent execution pipeline.

Error Types

Agent Errors

Koog defines several error types:
// Service-level errors
enum class AgentServiceErrorType {
    UNEXPECTED_MESSAGE_TYPE,  // Invalid message format
    MALFORMED_MESSAGE,        // Parsing errors
    AGENT_NOT_FOUND,          // Agent lookup failure
    UNEXPECTED_ERROR          // Catch-all for unexpected issues
}

// Error model
data class AgentServiceError(
    val type: AgentServiceErrorType,
    val message: String
) {
    fun asException(): AgentEngineException
}

Common Failure Scenarios

  1. LLM API Failures
    • Rate limits (429)
    • Timeouts
    • Invalid API keys
    • Service outages
  2. Tool Execution Failures
    • Tool not found
    • Invalid arguments
    • Execution timeout
    • External service errors
  3. Network Failures
    • Connection timeouts
    • DNS resolution failures
    • TLS/SSL errors
  4. Validation Failures
    • Schema validation errors
    • Type mismatches
    • Missing required fields

Retry Strategies

LLM Client Retries

Automatic retry with exponential backoff for LLM calls:
import ai.koog.prompt.executor.clients.retry.RetryingLLMClient
import ai.koog.prompt.executor.clients.retry.RetryConfig

// Create retrying client
val client = AnthropicLLMClient(apiKey)
val retryingClient = RetryingLLMClient(
    delegate = client,
    config = RetryConfig(
        maxAttempts = 3,
        initialDelay = 1.seconds,
        maxDelay = 30.seconds,
        backoffMultiplier = 2.0,
        jitterFactor = 0.1
    )
)

val executor = PromptExecutor(retryingClient)

Built-in Retry Configurations

// Conservative: Fewer retries, longer delays
val conservativeClient = client.toRetryingClient(
    RetryConfig.CONSERVATIVE
)

// Aggressive: More retries, shorter delays
val aggressiveClient = client.toRetryingClient(
    RetryConfig.AGGRESSIVE
)

// Default: Balanced approach
val defaultClient = client.toRetryingClient(
    RetryConfig.DEFAULT
)

// Custom configuration
val customClient = client.toRetryingClient(
    RetryConfig(
        maxAttempts = 5,
        initialDelay = 500.milliseconds,
        maxDelay = 60.seconds,
        backoffMultiplier = 2.5,
        jitterFactor = 0.2,
        retryablePatterns = listOf(
            Regex("rate limit", RegexOption.IGNORE_CASE),
            Regex("timeout", RegexOption.IGNORE_CASE),
            Regex("503 Service Unavailable"),
            Regex("502 Bad Gateway")
        )
    )
)

Retry Configuration Options

class RetryConfig(
    // Maximum number of retry attempts
    val maxAttempts: Int = 3,
    
    // Initial delay before first retry
    val initialDelay: Duration = 1.seconds,
    
    // Maximum delay between retries
    val maxDelay: Duration = 30.seconds,
    
    // Exponential backoff multiplier
    val backoffMultiplier: Double = 2.0,
    
    // Jitter factor (0.0 to 1.0) to randomize delays
    val jitterFactor: Double = 0.1,
    
    // Patterns to match against error messages
    val retryablePatterns: List<Regex> = defaultRetryablePatterns,
    
    // Extract retry-after hints from errors
    val retryAfterExtractor: RetryAfterExtractor? = null
)

Streaming Retry Behavior

Streaming calls have special retry semantics:
// Only retries connection failures before first token
val stream = retryingClient.executeStreaming(prompt, model, tools)

stream.collect { chunk ->
    // Once streaming starts, errors propagate without retry
    // This prevents duplicate content
    process(chunk)
}

Subgraph Retries

Basic Retry Subgraph

Retry a subgraph until a condition is met:
val retrySearch by subgraphWithRetry<String, SearchResult>(
    condition = { result ->
        if (result.documents.isNotEmpty()) {
            ConditionResult.Approve
        } else {
            ConditionResult.Reject("No documents found. Try different keywords.")
        }
    },
    maxRetries = 3,
    conditionDescription = "Search must return at least one document"
) {
    val searchNode by nodeExecuteTool<SearchArgs>(SearchTool)
    nodeStart then searchNode then nodeFinish
}

// Usage
nodeStart then retrySearch then processResults

// Result includes metadata
val result: RetrySubgraphResult<SearchResult> = retrySearch.execute("kotlin agents")
println("Success: ${result.success}, Attempts: ${result.retryCount}")

Simple Retry Subgraph

Return output directly without metadata:
val retryLLMCall by subgraphWithRetrySimple<String, Message.Response>(
    condition = { response ->
        // Approve if we get a tool call
        (response is Message.Tool.Call).asConditionResult
    },
    maxRetries = 2,
    strict = true  // Throw error if condition not met
) {
    val callLLM by nodeLLMRequest("generate_tool_call")
    nodeStart then callLLM then nodeFinish
}

// Direct usage
val response: Message.Response = retryLLMCall.execute("Search for Kotlin docs")

Conditional Retry Logic

val validateOutput by subgraphWithRetry<Data, ValidatedData>(
    condition = { output ->
        when {
            output.isValid() -> ConditionResult.Approve
            output.canBeFixed() -> ConditionResult.Reject(
                "Data validation failed: ${output.errors}. Please fix these issues."
            )
            else -> ConditionResult.Reject(
                "Data is invalid and cannot be fixed. Stopping retries."
            )
        }
    },
    maxRetries = 5
) {
    val processNode by node<Data, ValidatedData> { /* ... */ }
    nodeStart then processNode then nodeFinish
}

Try-Catch in Nodes

Node-Level Error Handling

val robustNode by node<Input, Output>("robust-processor") { input ->
    try {
        // Main processing logic
        val result = processInput(input)
        result
    } catch (e: ValidationException) {
        logger.warn { "Validation failed: ${e.message}" }
        // Return default or fallback value
        Output.default()
    } catch (e: TimeoutException) {
        logger.error { "Processing timeout" }
        // Retry with simpler approach
        processFallback(input)
    } catch (e: Exception) {
        logger.error(e) { "Unexpected error" }
        // Propagate critical errors
        throw e
    }
}

Graceful Degradation

val llmWithFallback by node<String, String>("llm-with-fallback") { input ->
    try {
        // Try primary model
        llm.complete(input, model = GPT4)
    } catch (e: RateLimitException) {
        logger.warn { "GPT-4 rate limited, falling back to GPT-3.5" }
        llm.complete(input, model = GPT3_5_TURBO)
    } catch (e: TimeoutException) {
        logger.warn { "LLM timeout, using cached response" }
        cache.getOrDefault(input, "Unable to process request")
    }
}

Pipeline-Level Error Handling

Intercept Failures

Handle errors at different pipeline stages:
val agent = AIAgent("fault-tolerant") {
    // Custom error handling feature
    install(ErrorRecovery) {
        // Handle agent-level failures
        onAgentExecutionFailed { context ->
            logger.error(context.throwable) { "Agent execution failed" }
            notifyAdministrator(context.throwable)
            // Optionally return default response
        }
        
        // Handle LLM failures
        onLLMCallFailed { context ->
            when {
                context.error.message?.contains("rate limit") == true -> {
                    logger.warn { "Rate limited, will retry" }
                    delay(retryDelay)
                }
                context.error.message?.contains("timeout") == true -> {
                    logger.warn { "Timeout, switching to faster model" }
                    switchToFallbackModel()
                }
            }
        }
        
        // Handle tool failures
        onToolCallFailed { context ->
            logger.error { "Tool ${context.toolName} failed: ${context.error.message}" }
            metrics.recordToolFailure(context.toolName)
        }
    }
}

Custom Error Recovery Feature

class ErrorRecovery {
    companion object Feature : AIAgentGraphFeature<ErrorRecoveryConfig, ErrorRecovery> {
        override val key = AIAgentStorageKey<ErrorRecovery>("error-recovery")
        
        override fun createInitialConfig() = ErrorRecoveryConfig()
        
        override fun install(
            config: ErrorRecoveryConfig,
            pipeline: AIAgentGraphPipeline
        ): ErrorRecovery {
            // Intercept LLM failures
            pipeline.interceptLLMCallFailed(this) { context ->
                val error = context.error
                
                when {
                    isRateLimitError(error) -> {
                        delay(config.rateLimitDelay)
                        // Retry happens automatically
                    }
                    
                    isTimeoutError(error) -> {
                        // Switch to faster model
                        context.context.llm.writeSession {
                            this.config = config.copy(
                                model = config.fallbackModel
                            )
                        }
                    }
                    
                    isAuthenticationError(error) -> {
                        // Don't retry auth errors
                        config.alertHandler?.alert(
                            "Authentication failed for ${context.model.provider}"
                        )
                        throw error
                    }
                    
                    else -> {
                        config.errorLogger?.log(error)
                    }
                }
            }
            
            // Intercept tool failures
            pipeline.interceptToolCallFailed(this) { context ->
                if (config.continueOnToolFailure) {
                    logger.warn { "Tool ${context.toolName} failed, continuing" }
                    // Don't propagate error
                } else {
                    throw context.error.asException()
                }
            }
            
            return ErrorRecovery()
        }
    }
}

Fallback Strategies

Model Fallback Chain

class ModelFallbackChain(private val models: List<LLModel>) {
    suspend fun execute(prompt: Prompt): Message.Assistant {
        var lastError: Throwable? = null
        
        for (model in models) {
            try {
                return executor.execute(prompt, model).first() as Message.Assistant
            } catch (e: Exception) {
                logger.warn { "Model $model failed: ${e.message}" }
                lastError = e
            }
        }
        
        throw lastError ?: IllegalStateException("All models failed")
    }
}

val fallbackChain = ModelFallbackChain(listOf(
    GPT4,
    GPT3_5_TURBO,
    CLAUDE_3_HAIKU
))

Service Fallback

val searchWithFallback by node<Query, Results>("search") { query ->
    try {
        // Try primary search service
        primarySearchService.search(query)
    } catch (e: Exception) {
        logger.warn { "Primary search failed, trying fallback" }
        try {
            // Try fallback search service
            fallbackSearchService.search(query)
        } catch (e2: Exception) {
            logger.error { "All search services failed" }
            // Return cached or default results
            cachedSearchResults.getOrDefault(query, emptyResults())
        }
    }
}

Circuit Breaker Pattern

class CircuitBreaker(
    private val failureThreshold: Int = 5,
    private val resetTimeout: Duration = 60.seconds
) {
    private var failureCount = 0
    private var state = State.CLOSED
    private var lastFailureTime: Instant? = null
    
    enum class State { CLOSED, OPEN, HALF_OPEN }
    
    suspend fun <T> execute(block: suspend () -> T): T {
        when (state) {
            State.OPEN -> {
                if (Clock.System.now() - lastFailureTime!! > resetTimeout) {
                    state = State.HALF_OPEN
                } else {
                    throw CircuitBreakerOpenException("Circuit breaker is open")
                }
            }
            State.HALF_OPEN -> {
                // Allow one request through
            }
            State.CLOSED -> {
                // Normal operation
            }
        }
        
        return try {
            val result = block()
            onSuccess()
            result
        } catch (e: Exception) {
            onFailure()
            throw e
        }
    }
    
    private fun onSuccess() {
        failureCount = 0
        state = State.CLOSED
    }
    
    private fun onFailure() {
        failureCount++
        lastFailureTime = Clock.System.now()
        
        if (failureCount >= failureThreshold) {
            state = State.OPEN
        }
    }
}

// Usage
val circuitBreaker = CircuitBreaker()

val externalCall by node<Input, Output>("external-call") { input ->
    circuitBreaker.execute {
        externalService.call(input)
    }
}

Timeout Configuration

HTTP Client Timeouts

val httpClient = HttpClient(CIO) {
    install(HttpTimeout) {
        requestTimeoutMillis = 30000  // 30 seconds
        connectTimeoutMillis = 10000  // 10 seconds
        socketTimeoutMillis = 30000   // 30 seconds
    }
}

Node Execution Timeouts

val timeoutNode by node<Input, Output>("with-timeout") { input ->
    withTimeout(30.seconds) {
        longRunningOperation(input)
    }
}

Best Practices

1. Categorize Errors

when (error) {
    is RetryableError -> retry()
    is PermanentError -> fail()
    is TransientError -> retryWithBackoff()
}

2. Log Contextual Information

catch (e: Exception) {
    logger.error(e) {
        """Agent execution failed:
        |  Agent ID: $agentId
        |  Run ID: $runId
        |  Input: $input
        |  Stage: ${executionInfo.path()}
        """.trimMargin()
    }
}

3. Set Appropriate Timeouts

// Short timeout for fast operations
val quickCheck = withTimeout(5.seconds) { /* ... */ }

// Long timeout for complex analysis
val deepAnalysis = withTimeout(5.minutes) { /* ... */ }

4. Monitor Error Rates

pipeline.interceptAgentExecutionFailed(this) { context ->
    metrics.recordError(
        agentId = context.agentId,
        errorType = context.throwable::class.simpleName,
        errorMessage = context.throwable.message
    )
}

5. Provide Meaningful Error Messages

// ❌ Bad: Generic error
throw Exception("Error")

// ✅ Good: Specific, actionable error
throw ToolExecutionException(
    toolName = "search",
    message = "Search failed: API key invalid or expired. Please check configuration.",
    cause = originalException
)

Testing Error Scenarios

class ErrorHandlingTest {
    @Test
    fun `should retry on rate limit error`() = runTest {
        var attempts = 0
        
        val mockClient = object : LLMClient() {
            override suspend fun execute(
                prompt: Prompt,
                model: LLModel,
                tools: List<ToolDescriptor>
            ): List<Message.Response> {
                attempts++
                if (attempts < 3) {
                    throw RateLimitException("Rate limit exceeded")
                }
                return listOf(Message.Assistant("Success"))
            }
        }
        
        val retryingClient = RetryingLLMClient(mockClient)
        val result = retryingClient.execute(prompt, model, emptyList())
        
        assertEquals(3, attempts)
        assertEquals("Success", (result.first() as Message.Assistant).content)
    }
}

Resources

Build docs developers (and LLMs) love