Overview
Building reliable AI agents requires robust error handling strategies. Koog provides multiple mechanisms for handling failures, implementing retries, and ensuring fault tolerance at different levels of the agent execution pipeline.

Error Types
Agent Errors
Koog defines several error types:

// Service-level errors
// Categories of failures surfaced by the agent service layer.
enum class AgentServiceErrorType {
    UNEXPECTED_MESSAGE_TYPE, // Invalid message format
    MALFORMED_MESSAGE,       // Parsing errors
    AGENT_NOT_FOUND,         // Agent lookup failure
    UNEXPECTED_ERROR         // Catch-all for unexpected issues
}
// Error model
// Error payload carried by the agent service: a category plus a human-readable message.
// NOTE(review): signature sketch — `asException()` is shown without a body here;
// presumably it wraps this error in a throwable AgentEngineException. Confirm
// against the actual Koog API.
data class AgentServiceError(
    val type: AgentServiceErrorType,
    val message: String
) {
    fun asException(): AgentEngineException
}
Common Failure Scenarios
-
LLM API Failures
- Rate limits (429)
- Timeouts
- Invalid API keys
- Service outages
-
Tool Execution Failures
- Tool not found
- Invalid arguments
- Execution timeout
- External service errors
-
Network Failures
- Connection timeouts
- DNS resolution failures
- TLS/SSL errors
-
Validation Failures
- Schema validation errors
- Type mismatches
- Missing required fields
Retry Strategies
LLM Client Retries
Automatic retry with exponential backoff for LLM calls:

import ai.koog.prompt.executor.clients.retry.RetryingLLMClient
import ai.koog.prompt.executor.clients.retry.RetryConfig
// Create retrying client
val client = AnthropicLLMClient(apiKey)

// Wrap the raw client; callers use the wrapper and get retries transparently.
val retryingClient = RetryingLLMClient(
    delegate = client,
    config = RetryConfig(
        maxAttempts = 3,          // maximum number of retry attempts
        initialDelay = 1.seconds, // delay before the first retry
        maxDelay = 30.seconds,    // cap on the backed-off delay
        backoffMultiplier = 2.0,  // exponential growth factor between retries
        jitterFactor = 0.1        // randomize delays to avoid thundering herd
    )
)

// The executor sees only the retrying wrapper.
val executor = PromptExecutor(retryingClient)
Built-in Retry Configurations
// Conservative: Fewer retries, longer delays
val conservativeClient = client.toRetryingClient(
    RetryConfig.CONSERVATIVE
)

// Aggressive: More retries, shorter delays
val aggressiveClient = client.toRetryingClient(
    RetryConfig.AGGRESSIVE
)

// Default: Balanced approach
val defaultClient = client.toRetryingClient(
    RetryConfig.DEFAULT
)

// Custom configuration
val customClient = client.toRetryingClient(
    RetryConfig(
        maxAttempts = 5,
        initialDelay = 500.milliseconds,
        maxDelay = 60.seconds,
        backoffMultiplier = 2.5,
        jitterFactor = 0.2,
        // Error messages matching any of these patterns are treated as retryable.
        retryablePatterns = listOf(
            Regex("rate limit", RegexOption.IGNORE_CASE),
            Regex("timeout", RegexOption.IGNORE_CASE),
            Regex("503 Service Unavailable"),
            Regex("502 Bad Gateway")
        )
    )
)
Retry Configuration Options
// Full set of tuning options accepted by RetryingLLMClient.
class RetryConfig(
    // Maximum number of retry attempts
    val maxAttempts: Int = 3,
    // Initial delay before first retry
    val initialDelay: Duration = 1.seconds,
    // Maximum delay between retries
    val maxDelay: Duration = 30.seconds,
    // Exponential backoff multiplier
    val backoffMultiplier: Double = 2.0,
    // Jitter factor (0.0 to 1.0) to randomize delays
    val jitterFactor: Double = 0.1,
    // Patterns to match against error messages
    val retryablePatterns: List<Regex> = defaultRetryablePatterns,
    // Extract retry-after hints from errors
    val retryAfterExtractor: RetryAfterExtractor? = null
)
Streaming Retry Behavior
Streaming calls have special retry semantics:

// Only retries connection failures before first token
val stream = retryingClient.executeStreaming(prompt, model, tools)
stream.collect { chunk ->
// Once streaming starts, errors propagate without retry
// This prevents duplicate content
process(chunk)
}
Subgraph Retries
Basic Retry Subgraph
Retry a subgraph until a condition is met:

val retrySearch by subgraphWithRetry<String, SearchResult>(
condition = { result ->
if (result.documents.isNotEmpty()) {
ConditionResult.Approve
} else {
ConditionResult.Reject("No documents found. Try different keywords.")
}
},
maxRetries = 3,
conditionDescription = "Search must return at least one document"
) {
val searchNode by nodeExecuteTool<SearchArgs>(SearchTool)
nodeStart then searchNode then nodeFinish
}
// Usage
nodeStart then retrySearch then processResults
// Result includes metadata
val result: RetrySubgraphResult<SearchResult> = retrySearch.execute("kotlin agents")
println("Success: ${result.success}, Attempts: ${result.retryCount}")
Simple Retry Subgraph
Return output directly without metadata:

val retryLLMCall by subgraphWithRetrySimple<String, Message.Response>(
condition = { response ->
// Approve if we get a tool call
(response is Message.Tool.Call).asConditionResult
},
maxRetries = 2,
strict = true // Throw error if condition not met
) {
val callLLM by nodeLLMRequest("generate_tool_call")
nodeStart then callLLM then nodeFinish
}
// Direct usage
val response: Message.Response = retryLLMCall.execute("Search for Kotlin docs")
Conditional Retry Logic
// Retry a validation subgraph, feeding rejection messages back as corrective hints.
val validateOutput by subgraphWithRetry<Data, ValidatedData>(
    condition = { output ->
        when {
            // Accept valid data immediately.
            output.isValid() -> ConditionResult.Approve
            // Recoverable: reject with actionable feedback for the next attempt.
            output.canBeFixed() -> ConditionResult.Reject(
                "Data validation failed: ${output.errors}. Please fix these issues."
            )
            // NOTE(review): this branch also returns Reject, so retries continue
            // until maxRetries despite the "Stopping retries" wording — confirm
            // whether the framework offers an abort-style result for this case.
            else -> ConditionResult.Reject(
                "Data is invalid and cannot be fixed. Stopping retries."
            )
        }
    },
    maxRetries = 5
) {
    val processNode by node<Data, ValidatedData> { /* ... */ }
    nodeStart then processNode then nodeFinish
}
Try-Catch in Nodes
Node-Level Error Handling
// Node that degrades gracefully: recoverable failures yield a fallback value,
// while unexpected errors are logged and rethrown to the caller.
val robustNode by node<Input, Output>("robust-processor") { input ->
    try {
        // Main processing logic
        val result = processInput(input)
        result
    } catch (e: ValidationException) {
        logger.warn { "Validation failed: ${e.message}" }
        // Return default or fallback value
        Output.default()
    } catch (e: TimeoutException) {
        logger.error { "Processing timeout" }
        // Retry with simpler approach
        processFallback(input)
    } catch (e: Exception) {
        // Must be the last catch clause: anything not handled above is critical.
        logger.error(e) { "Unexpected error" }
        // Propagate critical errors
        throw e
    }
}
Graceful Degradation
// Prefer GPT-4, degrade to GPT-3.5 on rate limits, and serve a cached answer on
// timeouts. Any other exception type propagates unchanged.
val llmWithFallback by node<String, String>("llm-with-fallback") { input ->
    try {
        // Try primary model
        llm.complete(input, model = GPT4)
    } catch (e: RateLimitException) {
        logger.warn { "GPT-4 rate limited, falling back to GPT-3.5" }
        llm.complete(input, model = GPT3_5_TURBO)
    } catch (e: TimeoutException) {
        logger.warn { "LLM timeout, using cached response" }
        cache.getOrDefault(input, "Unable to process request")
    }
}
Pipeline-Level Error Handling
Intercept Failures
Handle errors at different pipeline stages:

val agent = AIAgent("fault-tolerant") {
// Custom error handling feature
install(ErrorRecovery) {
// Handle agent-level failures
onAgentExecutionFailed { context ->
logger.error(context.throwable) { "Agent execution failed" }
notifyAdministrator(context.throwable)
// Optionally return default response
}
// Handle LLM failures
onLLMCallFailed { context ->
when {
context.error.message?.contains("rate limit") == true -> {
logger.warn { "Rate limited, will retry" }
delay(retryDelay)
}
context.error.message?.contains("timeout") == true -> {
logger.warn { "Timeout, switching to faster model" }
switchToFallbackModel()
}
}
}
// Handle tool failures
onToolCallFailed { context ->
logger.error { "Tool ${context.toolName} failed: ${context.error.message}" }
metrics.recordToolFailure(context.toolName)
}
}
}
Custom Error Recovery Feature
/**
 * Agent feature that installs recovery policies for LLM and tool failures:
 * back off on rate limits, switch to a fallback model on timeouts, alert and
 * rethrow on authentication errors, and optionally swallow tool failures.
 */
class ErrorRecovery {
    companion object Feature : AIAgentGraphFeature<ErrorRecoveryConfig, ErrorRecovery> {
        override val key = AIAgentStorageKey<ErrorRecovery>("error-recovery")

        override fun createInitialConfig() = ErrorRecoveryConfig()

        override fun install(
            config: ErrorRecoveryConfig,
            pipeline: AIAgentGraphPipeline
        ): ErrorRecovery {
            // Intercept LLM failures
            pipeline.interceptLLMCallFailed(this) { context ->
                val error = context.error
                when {
                    isRateLimitError(error) -> {
                        // Back off before the framework retries the call.
                        delay(config.rateLimitDelay)
                        // Retry happens automatically
                    }
                    isTimeoutError(error) -> {
                        // Switch to the faster fallback model.
                        // Capture the feature config under a distinct name: inside
                        // writeSession the bare name `config` resolves to the session
                        // receiver's own `config` property and shadows this parameter.
                        val recoveryConfig = config
                        context.context.llm.writeSession {
                            this.config = this.config.copy(
                                model = recoveryConfig.fallbackModel
                            )
                        }
                    }
                    isAuthenticationError(error) -> {
                        // Auth failures are permanent: alert and rethrow, never retry.
                        config.alertHandler?.alert(
                            "Authentication failed for ${context.model.provider}"
                        )
                        throw error
                    }
                    else -> {
                        config.errorLogger?.log(error)
                    }
                }
            }

            // Intercept tool failures
            pipeline.interceptToolCallFailed(this) { context ->
                if (config.continueOnToolFailure) {
                    // Best-effort mode: log and let the agent keep running.
                    logger.warn { "Tool ${context.toolName} failed, continuing" }
                } else {
                    throw context.error.asException()
                }
            }

            return ErrorRecovery()
        }
    }
}
Fallback Strategies
Model Fallback Chain
// Tries each model in order until one succeeds; rethrows the last failure if all fail.
// NOTE(review): `executor` is not declared in this class — it is assumed to be in
// an enclosing scope (e.g. an injected PromptExecutor); consider passing it in
// explicitly so the example is self-contained.
class ModelFallbackChain(private val models: List<LLModel>) {
    suspend fun execute(prompt: Prompt): Message.Assistant {
        var lastError: Throwable? = null
        for (model in models) {
            try {
                // NOTE(review): the unchecked cast throws ClassCastException if the
                // first message is not an assistant message; that lands in the catch
                // below and is treated as a model failure — confirm this is intended.
                return executor.execute(prompt, model).first() as Message.Assistant
            } catch (e: Exception) {
                logger.warn { "Model $model failed: ${e.message}" }
                lastError = e
            }
        }
        // Reached only when every model in the chain failed.
        throw lastError ?: IllegalStateException("All models failed")
    }
}
// Models are tried in declaration order: strongest first, cheapest/fastest last.
val fallbackChain = ModelFallbackChain(listOf(
    GPT4,
    GPT3_5_TURBO,
    CLAUDE_3_HAIKU
))
Service Fallback
// Three-tier degradation: primary service -> fallback service -> cached/default
// results. The cached tier absorbs failures from both services.
val searchWithFallback by node<Query, Results>("search") { query ->
    try {
        // Try primary search service
        primarySearchService.search(query)
    } catch (e: Exception) {
        logger.warn { "Primary search failed, trying fallback" }
        try {
            // Try fallback search service
            fallbackSearchService.search(query)
        } catch (e2: Exception) {
            logger.error { "All search services failed" }
            // Return cached or default results
            cachedSearchResults.getOrDefault(query, emptyResults())
        }
    }
}
Circuit Breaker Pattern
// Circuit breaker: trips OPEN after `failureThreshold` consecutive failures and
// fails fast until `resetTimeout` elapses, then lets a probe through (HALF_OPEN).
// NOTE(review): state is plain `var` with no synchronization — not safe for
// concurrent callers; confirm single-coroutine use or guard state with a Mutex.
class CircuitBreaker(
    private val failureThreshold: Int = 5,
    private val resetTimeout: Duration = 60.seconds
) {
    private var failureCount = 0
    private var state = State.CLOSED
    private var lastFailureTime: Instant? = null

    enum class State { CLOSED, OPEN, HALF_OPEN }

    suspend fun <T> execute(block: suspend () -> T): T {
        when (state) {
            State.OPEN -> {
                // lastFailureTime is always assigned (in onFailure) before the
                // state can become OPEN, so the !! cannot fire here.
                if (Clock.System.now() - lastFailureTime!! > resetTimeout) {
                    state = State.HALF_OPEN
                } else {
                    throw CircuitBreakerOpenException("Circuit breaker is open")
                }
            }
            State.HALF_OPEN -> {
                // Allow one request through
                // NOTE(review): nothing actually limits this to a single probe;
                // all concurrent callers pass while the state is HALF_OPEN.
            }
            State.CLOSED -> {
                // Normal operation
            }
        }
        return try {
            val result = block()
            onSuccess()
            result
        } catch (e: Exception) {
            onFailure()
            throw e
        }
    }

    // Any success fully closes the circuit and clears the failure streak.
    private fun onSuccess() {
        failureCount = 0
        state = State.CLOSED
    }

    // Record a failure and trip OPEN at the threshold. A failure while HALF_OPEN
    // also re-opens, because failureCount is still at/above the threshold.
    private fun onFailure() {
        failureCount++
        lastFailureTime = Clock.System.now()
        if (failureCount >= failureThreshold) {
            state = State.OPEN
        }
    }
}
// Usage
// A single shared breaker instance guards every call made through this node.
val circuitBreaker = CircuitBreaker()

val externalCall by node<Input, Output>("external-call") { input ->
    circuitBreaker.execute {
        externalService.call(input)
    }
}
Timeout Configuration
HTTP Client Timeouts
// Ktor client timeouts; these apply to every request made through this client.
val httpClient = HttpClient(CIO) {
    install(HttpTimeout) {
        requestTimeoutMillis = 30000 // 30 seconds for the entire request
        connectTimeoutMillis = 10000 // 10 seconds to establish a connection
        socketTimeoutMillis = 30000  // 30 seconds of socket inactivity
    }
}
Node Execution Timeouts
// Bound node execution time: withTimeout cancels the block cooperatively and
// throws TimeoutCancellationException once the deadline passes.
val timeoutNode by node<Input, Output>("with-timeout") { input ->
    withTimeout(30.seconds) {
        longRunningOperation(input)
    }
}
Best Practices
1. Categorize Errors
when (error) {
is RetryableError -> retry()
is PermanentError -> fail()
is TransientError -> retryWithBackoff()
}
2. Log Contextual Information
catch (e: Exception) {
logger.error(e) {
"""Agent execution failed:
| Agent ID: $agentId
| Run ID: $runId
| Input: $input
| Stage: ${executionInfo.path()}
""".trimMargin()
}
}
3. Set Appropriate Timeouts
// Short timeout for fast operations
val quickCheck = withTimeout(5.seconds) { /* ... */ }
// Long timeout for complex analysis
val deepAnalysis = withTimeout(5.minutes) { /* ... */ }
4. Monitor Error Rates
pipeline.interceptAgentExecutionFailed(this) { context ->
metrics.recordError(
agentId = context.agentId,
errorType = context.throwable::class.simpleName,
errorMessage = context.throwable.message
)
}
5. Provide Meaningful Error Messages
// ❌ Bad: Generic error
throw Exception("Error")
// ✅ Good: Specific, actionable error
throw ToolExecutionException(
toolName = "search",
message = "Search failed: API key invalid or expired. Please check configuration.",
cause = originalException
)
Testing Error Scenarios
class ErrorHandlingTest {
    @Test
    fun `should retry on rate limit error`() = runTest {
        // Mock fails the first two calls with a retryable error, succeeds on the third.
        var attempts = 0
        val mockClient = object : LLMClient() {
            override suspend fun execute(
                prompt: Prompt,
                model: LLModel,
                tools: List<ToolDescriptor>
            ): List<Message.Response> {
                attempts++
                if (attempts < 3) {
                    throw RateLimitException("Rate limit exceeded")
                }
                return listOf(Message.Assistant("Success"))
            }
        }

        // Default RetryConfig allows up to 3 attempts, so the third call succeeds.
        val retryingClient = RetryingLLMClient(mockClient)
        val result = retryingClient.execute(prompt, model, emptyList())

        assertEquals(3, attempts)
        assertEquals("Success", (result.first() as Message.Assistant).content)
    }
}