Documentation Index Fetch the complete documentation index at: https://mintlify.com/rowboatlabs/rowboat/llms.txt
Use this file to discover all available pages before exploring further.
The agent runtime orchestrates LLM interactions, tool execution, and state management for both interactive and background agents.
Runtime Interface
The core runtime interface provides a single method for triggering agent execution:
interface IAgentRuntime {
trigger ( runId : string ) : Promise < void >;
}
Location: apps/x/packages/core/src/agents/runtime.ts:32
AgentRuntime Class
The main runtime implementation handles:
Run locking - Prevents concurrent execution of the same run
Abort management - Handles user-requested stops
Event streaming - Publishes events to the message bus
State persistence - Saves events to runs repository
const runtime = new AgentRuntime ({
runsRepo: IRunsRepo , // Run persistence
idGenerator: IMonotonicallyIncreasingIdGenerator ,
bus: IBus , // Event publishing
messageQueue: IMessageQueue , // User message queue
modelConfigRepo: IModelConfigRepo ,
runsLock: IRunsLock , // Distributed locking
abortRegistry: IAbortRegistry // Abort signal management
});
Location: apps/x/packages/core/src/agents/runtime.ts:36
Execution Flow
1. Run Initialization
async trigger ( runId : string ): Promise < void > {
// Acquire distributed lock
if (! await this.runsLock.lock( runId )) {
return ; // Another process is running this
}
// Create abort signal
const signal = this . abortRegistry . createForRun ( runId );
// Publish start event
await this.bus.publish({
runId ,
type : "run-processing-start" ,
subflow : []
});
}
Location: apps/x/packages/core/src/agents/runtime.ts:71
2. Main Loop
The runtime executes in a loop until completion:
while ( true ) {
// Check for abort
if ( signal . aborted ) break ;
// Fetch current run state
const run = await this . runsRepo . fetch ( runId );
// Rebuild state from event log
const state = new AgentState ();
for ( const event of run . log ) {
state . ingest ( event );
}
// Stream agent execution
for await ( const event of streamAgent ({
state ,
idGenerator: this . idGenerator ,
runId ,
messageQueue: this . messageQueue ,
modelConfigRepo: this . modelConfigRepo ,
signal ,
abortRegistry: this . abortRegistry
})) {
// Persist and publish events
if ( event . type !== "llm-stream-event" ) {
await this . runsRepo . appendEvents ( runId , [ event ]);
}
await this . bus . publish ( event );
}
// Break if no events generated
if ( ! eventCount ) break ;
}
Location: apps/x/packages/core/src/agents/runtime.ts:83
3. Cleanup
Always executed, even on error:
finally {
this . abortRegistry . cleanup ( runId );
await this . runsLock . release ( runId );
await this . bus . publish ({
runId ,
type: "run-processing-end" ,
subflow: []
});
}
Location: apps/x/packages/core/src/agents/runtime.ts:139
Agent Streaming
The streamAgent function implements the core agent logic:
Location : apps/x/packages/core/src/agents/runtime.ts:672This is the heart of agent execution - it loads the agent, builds tools, runs the LLM loop, and handles tool execution.
Setup Phase
// Load model config
const modelConfig = await modelConfigRepo . getConfig ();
// Load agent definition
const agent = await loadAgent ( state . agentName ! );
// Build tool set
const tools = await buildTools ( agent );
// Create provider and model
const provider = createProvider ( modelConfig . provider );
const knowledgeGraphAgents = [ "note_creation" , "email-draft" , "meeting-prep" ];
const modelId = knowledgeGraphAgents . includes ( state . agentName ! )
? modelConfig . knowledgeGraphModel
: modelConfig . model ;
const model = provider . languageModel ( modelId );
Location: apps/x/packages/core/src/agents/runtime.ts:696
Agent Loop
The agent executes in iterations:
Tool Execution
Exit Conditions
LLM Turn
// Execute pending tool calls
for ( const toolCallId of Object . keys ( state . pendingToolCalls )) {
const toolCall = state . toolCallIdMap [ toolCallId ];
// Skip ask-human (handled separately)
if ( toolCall . toolName === "ask-human" ) continue ;
// Check for denial
if ( state . deniedToolCallIds [ toolCallId ]) {
yield * processEvent ({
runId ,
type: "message" ,
message: {
role: "tool" ,
content: "Permission was denied." ,
toolCallId ,
toolName: toolCall . toolName
},
subflow: []
});
continue ;
}
// Check for pending permission
if ( state . pendingToolPermissionRequests [ toolCallId ]) {
continue ; // Wait for approval
}
// Execute tool
let result ;
if ( agent . tools [ toolCall . toolName ]. type === "agent" ) {
// Recursive agent call
result = await executeSubAgent ( ... );
} else {
// Direct tool execution
result = await execTool ( ... );
}
// Yield result
yield * processEvent ({
runId ,
type: "message" ,
message: {
role: "tool" ,
content: JSON . stringify ( result ),
toolCallId ,
toolName: toolCall . toolName
},
subflow: []
});
}
Location: apps/x/packages/core/src/agents/runtime.ts:717
After LLM response, process tool calls:
if ( message . content instanceof Array ) {
for ( const part of message . content ) {
if ( part . type !== "tool-call" ) continue ;
const underlyingTool = agent . tools [ part . toolName ];
// Handle ask-human
if ( underlyingTool . type === "builtin" &&
underlyingTool . name === "ask-human" ) {
yield * processEvent ({
type: "ask-human-request" ,
toolCallId: part . toolCallId ,
query: part . arguments . question ,
subflow: []
});
}
// Handle command permission
if ( underlyingTool . type === "builtin" &&
underlyingTool . name === "executeCommand" ) {
if ( isBlocked ( part . arguments . command , state . sessionAllowedCommands )) {
yield * processEvent ({
type: "tool-permission-request" ,
toolCall: part ,
subflow: []
});
}
}
// Handle agent call (spawn subflow)
if ( underlyingTool . type === "agent" ) {
yield * processEvent ({
type: "spawn-subflow" ,
agentName: underlyingTool . name ,
toolCallId: part . toolCallId ,
subflow: []
});
yield * processEvent ({
type: "message" ,
message: {
role: "user" ,
content: part . arguments . message
},
subflow: [ part . toolCallId ]
});
}
}
}
Location: apps/x/packages/core/src/agents/runtime.ts:917
Message Building
The StreamStepMessageBuilder assembles assistant messages from LLM stream events:
class StreamStepMessageBuilder {
private parts : AssistantContentPart [] = [];
private textBuffer : string = "" ;
private reasoningBuffer : string = "" ;
ingest ( event : LlmStepStreamEvent ) {
switch ( event . type ) {
case "reasoning-delta" :
this . reasoningBuffer += event . delta ;
break ;
case "text-delta" :
this . textBuffer += event . delta ;
break ;
case "tool-call" :
this . flushBuffers ();
this . parts . push ({
type: "tool-call" ,
toolCallId: event . toolCallId ,
toolName: event . toolName ,
arguments: event . input
});
break ;
}
}
get () : AssistantMessage {
this . flushBuffers ();
return {
role: "assistant" ,
content: this . parts ,
providerOptions: this . providerOptions
};
}
}
Location: apps/x/packages/core/src/agents/runtime.ts:224
Agent Loading
Agents are loaded from Markdown files or special built-in agents:
export async function loadAgent ( id : string ) : Promise < Agent > {
// Built-in copilot
if ( id === "copilot" || id === "rowboatx" ) {
return CopilotAgent ;
}
// Note creation agent (with strictness variants)
if ( id === "note_creation" ) {
const strictness = getNoteCreationStrictness ();
let raw = strictness === "medium" ? noteCreationMediumRaw
: strictness === "low" ? noteCreationLowRaw
: noteCreationHighRaw ;
// Parse frontmatter if present
if ( raw . startsWith ( "---" )) {
const end = raw . indexOf ( " \n ---" , 3 );
const fm = raw . slice ( 3 , end ). trim ();
const content = raw . slice ( end + 4 ). trim ();
const yaml = parse ( fm );
return {
name: id ,
... yaml ,
instructions: content
};
}
}
// Load from repository
const repo = container . resolve < IAgentsRepo >( 'agentsRepo' );
return await repo . fetch ( id );
}
Location: apps/x/packages/core/src/agents/runtime.ts:313
Tools are mapped from agent definitions to executable functions:
async function buildTools ( agent : Agent ) : Promise < ToolSet > {
const tools : ToolSet = {};
for ( const [ name , tool ] of Object . entries ( agent . tools ?? {})) {
// Skip unavailable builtin tools
if ( tool . type === 'builtin' ) {
const builtin = BuiltinTools [ tool . name ];
if ( builtin ?. isAvailable && ! ( await builtin . isAvailable ())) {
continue ;
}
}
tools [ name ] = await mapAgentTool ( tool );
}
return tools ;
}
export async function mapAgentTool ( t : ToolAttachment ) : Promise < Tool > {
switch ( t . type ) {
case "mcp" :
return tool ({
name: t . name ,
description: t . description ,
inputSchema: jsonSchema ( t . inputSchema )
});
case "agent" :
const agent = await loadAgent ( t . name );
return tool ({
name: t . name ,
description: agent . description ,
inputSchema: z . object ({
message: z . string (). describe ( "Message to send to agent" )
})
});
case "builtin" :
if ( t . name === "ask-human" ) {
return tool ({
description: "Ask a human before proceeding" ,
inputSchema: z . object ({
question: z . string ()
})
});
}
const match = BuiltinTools [ t . name ];
return tool ({
description: match . description ,
inputSchema: match . inputSchema
});
}
}
Location: apps/x/packages/core/src/agents/runtime.ts:464
State Management
AgentState maintains execution state and processes events:
class AgentState {
runId : string | null = null ;
agent : Agent | null = null ;
agentName : string | null = null ;
messages : MessageList = [];
lastAssistantMsg : AssistantMessage | null = null ;
// Tool tracking
toolCallIdMap : Record < string , ToolCallPart > = {};
pendingToolCalls : Record < string , true > = {};
// Permission tracking
pendingToolPermissionRequests : Record < string , ToolPermissionRequestEvent > = {};
pendingAskHumanRequests : Record < string , AskHumanRequestEvent > = {};
allowedToolCallIds : Record < string , true > = {};
deniedToolCallIds : Record < string , true > = {};
sessionAllowedCommands : Set < string > = new Set ();
// Subflow tracking (for agent tools)
subflowStates : Record < string , AgentState > = {};
}
Location: apps/x/packages/core/src/agents/runtime.ts:484
ingest ( event : RunEvent ) {
// Route to subflow if needed
if ( event . subflow . length > 0 ) {
const subflowId = event . subflow [ 0 ];
if ( ! this . subflowStates [ subflowId ]) {
this . subflowStates [ subflowId ] = new AgentState ();
}
this . subflowStates [ subflowId ]. ingest ({
... event ,
subflow: event . subflow . slice ( 1 )
});
return ;
}
switch ( event . type ) {
case "start" :
this . runId = event . runId ;
this . agentName = event . agentName ;
break ;
case "message" :
this . messages . push ( event . message );
// Track tool calls
if ( event . message . content instanceof Array ) {
for ( const part of event . message . content ) {
if ( part . type === "tool-call" ) {
this . toolCallIdMap [ part . toolCallId ] = part ;
this . pendingToolCalls [ part . toolCallId ] = true ;
}
}
}
// Clear pending on tool result
if ( event . message . role === "tool" ) {
delete this . pendingToolCalls [ event . message . toolCallId ];
}
break ;
case "tool-permission-response" :
if ( event . response === "approve" ) {
this . allowedToolCallIds [ event . toolCallId ] = true ;
// Add to session allowlist if requested
if ( event . scope === "session" ) {
const names = extractCommandNames ( toolCall . arguments . command );
for ( const name of names ) {
this . sessionAllowedCommands . add ( name );
}
}
} else {
this . deniedToolCallIds [ event . toolCallId ] = true ;
}
delete this . pendingToolPermissionRequests [ event . toolCallId ];
break ;
}
}
Location: apps/x/packages/core/src/agents/runtime.ts:585
Abort Handling
The runtime supports graceful abort:
// Check for abort throughout execution
signal . throwIfAborted ();
// Handle abort in main loop
try {
for await ( const event of streamAgent ({ signal })) {
// Process events
}
} catch ( error ) {
if ( error instanceof Error && error . name === "AbortError" ) {
break ; // Exit cleanly
}
throw error ;
}
// Emit stopped event
if ( signal . aborted ) {
await this . runsRepo . appendEvents ( runId , [{
runId ,
type: "run-stopped" ,
reason: "user-requested" ,
subflow: []
}]);
}
Location: apps/x/packages/core/src/agents/runtime.ts:84
Run Logging
Events are logged to JSONL files for debugging:
class RunLogger {
constructor ( runId : string ) {
this . logFile = path . join ( WorkDir , "runs" , ` ${ runId } .jsonl` );
this . fileHandle = fs . createWriteStream ( this . logFile , {
flags: "a" ,
encoding: "utf8"
});
}
log ( event : RunEvent ) {
if ( event . type !== "llm-stream-event" ) {
this . fileHandle . write ( JSON . stringify ( event ) + " \n " );
}
}
}
Location: apps/x/packages/core/src/agents/runtime.ts:193
Next Steps
Background Agents Learn about scheduled execution
Skills Explore the skill system