Workflows in Mastra provide type-safe, composable task execution with built-in error handling, state management, and suspend/resume capabilities. They enable building complex execution flows with branching logic and parallel operations.
Core Concept
Workflows are:
Graph-based: Chain steps with .then(), .branch(), and .parallel()
Type-safe: Full TypeScript inference across step inputs and outputs
Resumable: Suspend execution and resume from any step
Composable: Nest workflows within workflows
Observable: Track execution with observability spans
Basic Workflow
Create a simple sequential workflow:
import { createWorkflow, createStep } from '@mastra/core/workflows';
import { z } from 'zod';

// Step 1: check that the input looks like an email address.
// The email is passed through so the next step can use it.
const validateStep = createStep({
  id: 'validate',
  inputSchema: z.object({ email: z.string() }),
  outputSchema: z.object({ email: z.string(), valid: z.boolean() }),
  execute: async ({ email }) => {
    return { email, valid: email.includes('@') };
  }
});

// Step 2: persist the user. `db` stands in for your application's database client.
const saveStep = createStep({
  id: 'save',
  inputSchema: z.object({ email: z.string(), valid: z.boolean() }),
  outputSchema: z.object({ saved: z.boolean() }),
  execute: async ({ email, valid }) => {
    if (!valid) throw new Error('Invalid email');
    await db.users.create({ email });
    return { saved: true };
  }
});

const workflow = createWorkflow({
  id: 'user-signup',
  inputSchema: z.object({ email: z.string() })
})
  .then(validateStep)
  .then(saveStep)
  .commit();

// Execute workflow
const run = await workflow.createRun({
  inputData: { email: 'user@example.com' }
});
const result = await run.start();
console.log(result.output); // { saved: true }
Workflow Configuration
interface WorkflowConfig<TInputSchema, TOutputSchema> {
  id: string;                        // Unique identifier
  inputSchema: TInputSchema;         // Zod schema for input
  outputSchema?: TOutputSchema;      // Optional output schema
  stateSchema?: ZodSchema;           // Workflow state schema
  type?: 'default' | 'processor';    // Workflow type
  options?: {
    validateInputs?: boolean;        // Validate step inputs
    tracingPolicy?: TracingPolicy;   // Observability config
  };
}
Chaining Steps
Sequential Execution
Use .then() to chain steps sequentially:
const workflow = createWorkflow({
  id: 'data-pipeline',
  inputSchema: z.object({ rawData: z.string() })
})
  .then(parseStep)      // Step 1: Parse data
  .then(validateStep)   // Step 2: Validate
  .then(transformStep)  // Step 3: Transform
  .then(saveStep)       // Step 4: Save
  .commit();
Each step receives the output of the previous step.
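The output-to-input handoff can be pictured with a small plain-TypeScript sketch. It does not use Mastra and is not how the library is implemented; the `Step` type, the `chain` helper, and the two example steps are hypothetical names introduced only to show the data flow.

```typescript
// A step is modeled as an async function from one value to the next.
type Step<I, O> = (input: I) => Promise<O>;

// Compose two steps so the first step's output becomes the second step's input.
function chain<A, B, C>(first: Step<A, B>, second: Step<B, C>): Step<A, C> {
  return async (input: A) => second(await first(input));
}

// Hypothetical steps mirroring the signup example.
const validate: Step<{ email: string }, { email: string; valid: boolean }> =
  async ({ email }) => ({ email, valid: email.includes('@') });

const save: Step<{ email: string; valid: boolean }, { saved: boolean }> =
  async ({ valid }) => {
    if (!valid) throw new Error('Invalid email');
    return { saved: true };
  };

// The compiler rejects the chain if the types of adjacent steps do not line up.
const signup = chain(validate, save);

signup({ email: 'user@example.com' }).then((output) => console.log(output));
```

Because the intermediate type `B` must match on both sides, a step that drops a field the next step needs shows up as a compile-time error rather than a runtime failure.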
Dynamic Mapping
Map specific fields from previous steps:
import { mapVariable } from '@mastra/core/workflows';

const workflow = createWorkflow({
  id: 'extract-process',
  inputSchema: z.object({ url: z.string() })
})
  .then(fetchStep) // Returns { data, metadata }
  .then(
    processStep, // Only needs 'data' field
    {
      data: mapVariable({ step: fetchStep, path: 'data' })
    }
  )
  .commit();
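Conceptually, a mapping like the one above names a previous step and a path into its output. The following plain-TypeScript sketch shows one way such a mapping could be resolved. It is an illustration only: `VariableRef`, `getPath`, and `resolveMapping` are hypothetical helpers, and the dot-separated path syntax is an assumption, not a statement about how `mapVariable` works internally.

```typescript
// Outputs of already-finished steps, keyed by step id.
type StepResults = Record<string, unknown>;

// A reference to one field of one step's output.
type VariableRef = { stepId: string; path: string };

// Walk a dot-separated path ("metadata.status") through nested objects.
function getPath(value: unknown, path: string): unknown {
  return path.split('.').reduce<unknown>((current, key) => {
    if (current !== null && typeof current === 'object') {
      return (current as Record<string, unknown>)[key];
    }
    return undefined;
  }, value);
}

// Build the next step's input by resolving every reference in the mapping.
function resolveMapping(
  mapping: Record<string, VariableRef>,
  results: StepResults,
): Record<string, unknown> {
  const input: Record<string, unknown> = {};
  for (const key of Object.keys(mapping)) {
    const ref = mapping[key];
    input[key] = getPath(results[ref.stepId], ref.path);
  }
  return input;
}

const results: StepResults = {
  fetch: { data: 'payload', metadata: { status: 200 } },
};

// Only the 'data' field of the fetch step is forwarded.
const processInput = resolveMapping(
  { data: { stepId: 'fetch', path: 'data' } },
  results,
);
console.log(processInput);
```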
Branching
Conditionally execute different steps:
const workflow = createWorkflow({
  id: 'user-approval',
  inputSchema: z.object({ userId: z.string() })
})
  .then(checkUserStep) // Returns { approved: boolean }
  .branch({
    // Condition function
    when: ({ checkUserStep }) => checkUserStep.approved,
    // Steps to run if true
    then: [approveStep, notifyStep],
    // Steps to run if false (optional)
    otherwise: [rejectStep]
  })
  .commit();
Multiple Branches
const workflow = createWorkflow({
  id: 'payment-router',
  inputSchema: z.object({ amount: z.number(), method: z.string() })
})
  .then(validatePaymentStep)
  .branch({
    when: ({ validatePaymentStep }) =>
      validatePaymentStep.method === 'card',
    then: [processCardStep]
  })
  .branch({
    when: ({ validatePaymentStep }) =>
      validatePaymentStep.method === 'bank',
    then: [processBankStep]
  })
  .commit();
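The branch semantics described above (run `then` when the condition holds, `otherwise` when it does not) can be sketched in plain TypeScript. This is not Mastra code: `Branch`, `selectSteps`, and `route` are hypothetical names, steps are represented by their ids, and the sketch assumes that a branch without `otherwise` runs nothing when its condition is false.

```typescript
// A branch description, with steps represented by their ids for brevity.
interface Branch<T> {
  when: (result: T) => boolean;
  then: string[];
  otherwise?: string[];
}

// Pick the steps a single branch would run for a given previous result.
function selectSteps<T>(result: T, branch: Branch<T>): string[] {
  if (branch.when(result)) return branch.then;
  return branch.otherwise ?? []; // assumption: no fallback means nothing runs
}

// Evaluate several chained branches in order and collect the selected steps.
function route<T>(result: T, branches: Branch<T>[]): string[] {
  const selected: string[] = [];
  for (const branch of branches) {
    selected.push(...selectSteps(result, branch));
  }
  return selected;
}

// Single branch with a fallback, as in the approval example.
const rejected = selectSteps(
  { approved: false },
  { when: (r) => r.approved, then: ['approve', 'notify'], otherwise: ['reject'] },
);

// Two chained branches, as in the payment router example.
const paymentBranches: Branch<{ method: string }>[] = [
  { when: (r) => r.method === 'card', then: ['process-card'] },
  { when: (r) => r.method === 'bank', then: ['process-bank'] },
];
const bankRoute = route({ method: 'bank' }, paymentBranches);

console.log(rejected, bankRoute);
```

Under this reading, an input that matches neither branch (for example a method of 'cash') selects no steps at all, so a router like this may want an explicit fallback branch.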
Parallel Execution
Run multiple steps concurrently:
const workflow = createWorkflow({
  id: 'data-enrichment',
  inputSchema: z.object({ userId: z.string() })
})
  .then(fetchUserStep) // Get user data
  .parallel([
    fetchOrdersStep,      // Fetch orders concurrently
    fetchPreferencesStep, // Fetch preferences concurrently
    fetchActivityStep     // Fetch activity concurrently
  ])
  .then(combineDataStep) // Combine results
  .commit();
All parallel steps receive the same input and execute concurrently.
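The fan-out behavior can be illustrated with `Promise.all` in plain TypeScript. This is a sketch of the idea, not Mastra's implementation: `runParallel` and `makeFetchStep` are hypothetical helpers, and returning the results keyed by step id is an assumption made for the example.

```typescript
type Step<I, O> = (input: I) => Promise<O>;

// Start every step with the same input, then wait for all of them.
// Keying the results by step id is an assumption made for this sketch.
async function runParallel<I, O>(
  input: I,
  steps: Record<string, Step<I, O>>,
): Promise<Record<string, O>> {
  const ids = Object.keys(steps);
  const outputs = await Promise.all(ids.map((id) => steps[id](input)));
  const results: Record<string, O> = {};
  ids.forEach((id, index) => {
    results[id] = outputs[index];
  });
  return results;
}

// Records when each step starts and ends, to make the interleaving visible.
const log: string[] = [];

function makeFetchStep(id: string): Step<{ userId: string }, string> {
  return async ({ userId }) => {
    log.push(`start:${id}`);
    await Promise.resolve(); // yield control, standing in for real I/O
    log.push(`end:${id}`);
    return `${id} for ${userId}`;
  };
}

const enrichment = runParallel(
  { userId: 'u1' },
  { orders: makeFetchStep('orders'), preferences: makeFetchStep('preferences') },
);

enrichment.then((results) => console.log(results, log));
```

Both steps log their start before either logs its end, which is the observable difference from chaining them with sequential calls.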
Agent Steps
Wrap agents as workflow steps:
import { Agent } from '@mastra/core/agent';

const summaryAgent = new Agent({
  id: 'summarizer',
  instructions: 'Summarize the content concisely',
  model: 'openai/gpt-5'
});

const workflow = createWorkflow({
  id: 'content-pipeline',
  inputSchema: z.object({ url: z.string() })
})
  .then(fetchContentStep)
  .then(
    createStep(summaryAgent, {
      structuredOutput: {
        schema: z.object({
          summary: z.string(),
          keyPoints: z.array(z.string())
        })
      }
    }),
    { prompt: mapVariable({ step: fetchContentStep, path: 'content' }) }
  )
  .commit();
Use tools as workflow steps:
import { createTool } from '@mastra/core/tools';

const weatherTool = createTool({
  id: 'get-weather',
  description: 'Get weather for location',
  inputSchema: z.object({ location: z.string() }),
  execute: async ({ location }) => {
    // `fetchWeather` stands in for your own weather lookup
    return await fetchWeather(location);
  }
});

const workflow = createWorkflow({
  id: 'weather-report',
  inputSchema: z.object({ city: z.string() })
});

// Chain after the declaration: the mapping references `workflow` itself,
// which cannot be read inside its own initializer.
workflow
  .then(
    createStep(weatherTool),
    { location: mapVariable({ initData: workflow, path: 'city' }) }
  )
  .commit();
State Management
Maintain workflow state across steps:
const workflow = createWorkflow({
  id: 'stateful-workflow',
  inputSchema: z.object({ items: z.array(z.string()) }),
  stateSchema: z.object({ processed: z.number() })
})
  .then(createStep({
    id: 'process',
    inputSchema: z.object({ items: z.array(z.string()) }),
    outputSchema: z.object({ processed: z.number() }),
    execute: async ({ items }, { state, setState }) => {
      // Add this batch to the running total kept in workflow state
      const processed = (state?.processed || 0) + items.length;
      setState({ processed });
      return { processed };
    }
  }))
  .commit();

const run = await workflow.createRun({
  inputData: { items: ['a', 'b', 'c'] },
  initialState: { processed: 0 }
});
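To make the idea of state shared across steps concrete, here is a plain-TypeScript sketch with no Mastra dependency. `RunState`, `StepContext`, and `runSteps` are hypothetical names, and the sketch assumes that `setState` replaces the whole state object; the real library may merge or validate state differently.

```typescript
interface RunState {
  processed: number;
}

// What each step receives besides its input: the current state and a setter.
interface StepContext {
  state: RunState;
  setState: (next: RunState) => void;
}

type Step = (items: string[], context: StepContext) => void;

// Run steps in order; each one sees the state left behind by the previous one.
function runSteps(steps: Step[], items: string[], initialState: RunState): RunState {
  let state = initialState;
  for (const step of steps) {
    step(items, {
      state,
      setState: (next) => {
        state = next; // assumption: setState replaces the state wholesale
      },
    });
  }
  return state;
}

// Adds the batch size to the running total.
const countItems: Step = (items, { state, setState }) => {
  setState({ processed: state.processed + items.length });
};

// Two passes over three items leave a total of six.
const finalState = runSteps([countItems, countItems], ['a', 'b', 'c'], { processed: 0 });
console.log(finalState);
```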
Suspend and Resume
Suspend execution for human approval or external input:
const approvalStep = createStep({
  id: 'approval',
  inputSchema: z.object({ amount: z.number() }),
  outputSchema: z.object({ approved: z.boolean() }),
  suspendSchema: z.object({ approvalId: z.string() }),
  resumeSchema: z.object({ approved: z.boolean() }),
  execute: async ({ amount }, { suspend, resumeData }) => {
    // If resuming, use resume data
    if (resumeData) {
      return { approved: resumeData.approved };
    }
    // Otherwise suspend for approval.
    // `createApprovalRequest` stands in for your own approval system.
    const approvalId = await createApprovalRequest(amount);
    await suspend({ approvalId });
    // Fallback value; on resume the branch above supplies the real decision
    return { approved: false };
  }
});

const workflow = createWorkflow({
  id: 'payment-approval',
  inputSchema: z.object({ amount: z.number() })
})
  .then(approvalStep)
  .then(processPaymentStep)
  .commit();

// Start workflow
const run = await workflow.createRun({
  inputData: { amount: 10000 }
});
await run.start(); // Suspends at approval step

// Later, resume with approval decision
await run.resume({
  resumeData: { approved: true }
});
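The suspend/resume cycle can be modeled as a small state machine. The sketch below is plain TypeScript and makes no claim about Mastra's internals: `StepResult`, `approvalStep`, and `ApprovalRun` are hypothetical, and it assumes that resuming re-invokes the suspended step with the resume data supplied.

```typescript
// A step either finishes with output or suspends with a payload for the caller.
type StepResult =
  | { status: 'done'; output: { approved: boolean } }
  | { status: 'suspended'; payload: { approvalId: string } };

function approvalStep(
  input: { amount: number },
  resumeData?: { approved: boolean },
): StepResult {
  // When resume data is present, the decision has already been made.
  if (resumeData) {
    return { status: 'done', output: { approved: resumeData.approved } };
  }
  // Otherwise hand back an id the approver can refer to, and pause.
  return { status: 'suspended', payload: { approvalId: `approval-${input.amount}` } };
}

class ApprovalRun {
  status: 'pending' | 'suspended' | 'completed' = 'pending';
  private input: { amount: number };

  constructor(input: { amount: number }) {
    this.input = input;
  }

  start(): StepResult {
    return this.advance();
  }

  resume(resumeData: { approved: boolean }): StepResult {
    if (this.status !== 'suspended') throw new Error('Run is not suspended');
    return this.advance(resumeData);
  }

  // Assumption: resuming re-invokes the step, this time with resume data.
  private advance(resumeData?: { approved: boolean }): StepResult {
    const result = approvalStep(this.input, resumeData);
    this.status = result.status === 'done' ? 'completed' : 'suspended';
    return result;
  }
}

const approvalRun = new ApprovalRun({ amount: 10000 });
const first = approvalRun.start();                      // suspends
const second = approvalRun.resume({ approved: true });  // completes
console.log(first, second, approvalRun.status);
```

Guarding `resume` with a status check keeps a caller from resuming a run that never suspended or has already completed.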
Error Handling
Handle step failures with retries:
const retryableStep = createStep(
  {
    id: 'fetch-api',
    execute: async () => {
      const response = await fetch('https://api.example.com/data');
      if (!response.ok) throw new Error('API failed');
      return response.json();
    }
  },
  { retries: 3 } // Retry up to 3 times
);

const workflow = createWorkflow({
  id: 'api-workflow',
  inputSchema: z.object({})
})
  .then(retryableStep)
  .commit();

const run = await workflow.createRun({ inputData: {} });
try {
  const result = await run.start();
  console.log(result.output);
} catch (error) {
  console.error('Workflow failed:', error);
}
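A retry policy like `{ retries: 3 }` can be sketched as a small wrapper in plain TypeScript. This is an illustration, not Mastra's retry logic: `withRetries` and `flakyFetch` are hypothetical, and the sketch assumes "retry up to 3 times" means one initial attempt plus at most three more, with no delay between attempts.

```typescript
// Call fn once, then up to `retries` more times if it keeps failing.
async function withRetries<T>(fn: () => Promise<T>, retries: number): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error; // remember the failure and try again
    }
  }
  // Every attempt failed: surface the most recent error to the caller.
  throw lastError;
}

// Hypothetical flaky operation: fails on the first two calls, then succeeds.
let calls = 0;
async function flakyFetch(): Promise<string> {
  calls += 1;
  if (calls < 3) throw new Error('API failed');
  return 'data';
}

const fetched = withRetries(flakyFetch, 3);
fetched.then((value) => console.log(value, `after ${calls} calls`));
```

Retries suit transient failures such as timeouts. A production version would usually add a delay or backoff between attempts, which this sketch leaves out.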
Nested Workflows
Compose workflows within workflows:
const dataWorkflow = createWorkflow({
  id: 'data-processing',
  inputSchema: z.object({ data: z.string() })
})
  .then(parseStep)
  .then(validateStep)
  .commit();

const mainWorkflow = createWorkflow({
  id: 'main',
  inputSchema: z.object({ rawData: z.string() })
})
  .then(fetchStep)
  .then(
    dataWorkflow, // Nest the data workflow
    { data: mapVariable({ step: fetchStep, path: 'content' }) }
  )
  .then(saveStep)
  .commit();
Streaming Output
Stream workflow progress in real-time:
const run = await workflow.createRun({
  inputData: { query: 'search term' }
});
const stream = await run.streamStart();

for await (const chunk of stream) {
  if (chunk.type === 'step-start') {
    console.log(`Starting step: ${chunk.stepId}`);
  }
  if (chunk.type === 'step-finish') {
    console.log(`Finished step: ${chunk.stepId}`, chunk.output);
  }
}
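The shape of such a stream can be imitated with an async generator. The sketch below is plain TypeScript and only mirrors the two chunk types used above; `Chunk`, `NamedStep`, `streamRun`, and the demo steps are hypothetical, and the real stream may emit other chunk types and fields.

```typescript
// The two chunk shapes consumed in the loop above.
type Chunk =
  | { type: 'step-start'; stepId: string }
  | { type: 'step-finish'; stepId: string; output: unknown };

type NamedStep = { id: string; run: (input: unknown) => Promise<unknown> };

// Run steps in order, yielding a chunk before and after each one.
async function* streamRun(steps: NamedStep[], input: unknown): AsyncGenerator<Chunk> {
  let current = input;
  for (const step of steps) {
    yield { type: 'step-start', stepId: step.id };
    current = await step.run(current);
    yield { type: 'step-finish', stepId: step.id, output: current };
  }
}

// Hypothetical steps: a search that returns one hit, then a count of the hits.
const demoSteps: NamedStep[] = [
  { id: 'search', run: async (query) => [`result for ${String(query)}`] },
  { id: 'count', run: async (hits) => (hits as string[]).length },
];

async function main(): Promise<void> {
  for await (const chunk of streamRun(demoSteps, 'search term')) {
    if (chunk.type === 'step-start') {
      console.log(`Starting step: ${chunk.stepId}`);
    }
    if (chunk.type === 'step-finish') {
      console.log(`Finished step: ${chunk.stepId}`, chunk.output);
    }
  }
}

main();
```

Because the generator yields before awaiting each step, a consumer sees the start of a step as soon as it begins rather than after the whole run finishes.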
Workflow Runs
Manage workflow execution:
// Create run
const run = await workflow.createRun({
  inputData: { email: 'user@example.com' },
  initialState: { count: 0 },
  requestContext: ctx
});

// Start execution
const result = await run.start();

// Get run status
const status = await run.getStatus();
console.log(status); // 'running' | 'completed' | 'failed' | 'suspended'

// Cancel run
await run.cancel();

// Resume suspended run
await run.resume({ resumeData: { approved: true } });
Observability
Workflows automatically create spans for observability:
const workflow = createWorkflow({
  id: 'traced-workflow',
  inputSchema: z.object({ data: z.string() }),
  options: {
    tracingPolicy: {
      internal: false // Mark spans as user-facing
    }
  }
})
  .then(step1)
  .then(step2)
  .commit();
Spans include:
Workflow execution span
Step execution spans
Step input/output data
Timing information
Best Practices
Use clear, descriptive IDs for better observability:
// Good
createStep({ id: 'validate-email', ... })
createStep({ id: 'send-notification', ... })

// Avoid
createStep({ id: 'step1', ... })
createStep({ id: 's2', ... })
Handle errors at appropriate levels
Use retries for transient failures and a branch for failures you expect and want to handle:
const workflow = createWorkflow(...)
  .then(fetchStep) // fetchStep is created with { retries: 3 }, as in Error Handling
  .branch({
    when: ({ fetchStep }) => fetchStep.status === 'failed',
    then: [logErrorStep, notifyStep]
  })
  .commit();
Use parallel steps for independent operations
Execute independent operations concurrently:
// Good: Parallel execution
workflow.parallel([
  fetchUserStep,
  fetchOrdersStep,
  fetchPreferencesStep
])

// Avoid: Sequential when unnecessary
workflow
  .then(fetchUserStep)
  .then(fetchOrdersStep)
  .then(fetchPreferencesStep)
Use state for complex workflows
Store workflow state for multi-step coordination:
const workflow = createWorkflow({
  stateSchema: z.object({
    attempts: z.number(),
    lastError: z.string().optional()
  })
})
  .then(createStep({
    execute: async (input, { state, setState }) => {
      const attempts = (state?.attempts || 0) + 1;
      setState({ attempts });
      // ...
    }
  }))
  .commit();