Overview
The Workflow class provides type-safe, composable task execution with built-in error handling, state management, and observability. Workflows are composed of steps that can be chained, parallelized, or conditionally executed.
Type Parameters
TEngineType
any
default: "DefaultEngineType"
The execution engine type
TSteps
Array of step types in the workflow
TWorkflowId
The workflow identifier type
The previous step’s output schema type
TRequestContext
Record<string, any>
default: "unknown"
The request context schema type
Creating Workflows
createWorkflow
createWorkflow<TWorkflowId, TState, TInput, TOutput, TSteps, TRequestContext>(
  params: WorkflowConfig<TWorkflowId, TState, TInput, TOutput, TSteps, TRequestContext>
): Workflow<DefaultEngineType, TSteps, TWorkflowId, TState, TInput, TOutput, TInput, TRequestContext>
Creates a new workflow with the specified configuration.
params
Workflow configuration
id
Unique identifier for the workflow
inputSchema
SchemaWithValidation<TInput>
required
Zod schema defining the input structure
outputSchema
SchemaWithValidation<TOutput>
required
Zod schema defining the output structure
stateSchema
SchemaWithValidation<TState>
Zod schema for workflow state
requestContextSchema
SchemaWithValidation<TRequestContext>
Zod schema for request context validation
description
Description of what the workflow does
Reference to the Mastra runtime instance
Pre-defined steps for the workflow
retryConfig
{ attempts?: number; delay?: number }
Retry configuration for failed steps
type
'default' | 'processor'
default: "'default'"
Type of workflow: 'processor' for processor workflows, 'default' otherwise
Additional workflow options
Policy for trace generation
Whether to validate inputs with schemas
shouldPersistSnapshot
(params: { stepResults: Record<string, StepResult>; workflowStatus: WorkflowRunStatus }) => boolean
Function to determine if snapshots should be persisted
onFinish
(result: WorkflowFinishCallbackResult) => Promise<void> | void
Called when workflow execution completes (success, failed, suspended, or tripwire)
onError
(errorInfo: WorkflowErrorCallbackInfo) => Promise<void> | void
Called only when workflow execution fails (failed or tripwire status)
A new workflow builder instance
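The difference between onFinish and onError can be modelled in a few lines of plain TypeScript. This is a sketch of the rule as stated in this reference, not Mastra's implementation: `RunStatus`, `dispatchCallbacks`, and `firedFor` are hypothetical names, the callbacks receive only a status instead of the real result objects, and the order in which the two callbacks fire is an assumption.

```typescript
// Hypothetical status union, built from the statuses named in this reference.
type RunStatus = "success" | "failed" | "suspended" | "tripwire";

interface Callbacks {
  onFinish?: (status: RunStatus) => void;
  onError?: (status: RunStatus) => void;
}

// Model of the dispatch rule: onFinish fires for every terminal status,
// onError fires only for "failed" and "tripwire".
// Assumption: onFinish is invoked before onError.
function dispatchCallbacks(status: RunStatus, callbacks: Callbacks): void {
  callbacks.onFinish?.(status);
  if (status === "failed" || status === "tripwire") {
    callbacks.onError?.(status);
  }
}

// Helper that records which callbacks fired for a given status.
function firedFor(status: RunStatus): string[] {
  const fired: string[] = [];
  dispatchCallbacks(status, {
    onFinish: () => fired.push("onFinish"),
    onError: () => fired.push("onError"),
  });
  return fired;
}
```

Under this model, a suspended run triggers only onFinish, so cleanup that must run on every outcome belongs there, while alerting logic fits onError.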
Properties
The unique identifier for the workflow
Description of the workflow
inputSchema
SchemaWithValidation<TInput>
Schema for validating input
outputSchema
SchemaWithValidation<TOutput>
Schema for validating output
stateSchema
SchemaWithValidation<TState> | undefined
Schema for workflow state
requestContextSchema
SchemaWithValidation<TRequestContext> | undefined
Schema for request context
steps
Record<string, StepWithComponent>
The steps in the workflow
The execution engine type (always ‘default’)
Whether the workflow has been committed (finalized)
retryConfig
{ attempts?: number; delay?: number }
Retry configuration
Workflow options including tracing and callbacks
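To make the retryConfig shape concrete, here is a rough, self-contained model of a retry loop driven by `{ attempts, delay }`. It is not Mastra's retry logic. Two things are assumptions, because this reference does not state them: that `attempts` counts retries after the first failure, and that `delay` is in milliseconds. `runWithRetry` is a hypothetical helper.

```typescript
interface RetryConfig {
  attempts?: number;
  delay?: number;
}

// Assumption: `attempts` = number of retries after the first failure.
// Assumption: `delay` is a wait in milliseconds between tries.
async function runWithRetry<T>(
  task: () => Promise<T>,
  config: RetryConfig = {}
): Promise<T> {
  const retries = config.attempts ?? 0;
  const delayMs = config.delay ?? 0;
  let lastError: unknown;

  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      // Wait only if another try will follow.
      if (attempt < retries && delayMs > 0) {
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  // All tries failed: surface the last error.
  throw lastError;
}
```

With `{ attempts: 2 }`, a task that fails twice and then succeeds resolves on its third invocation under these assumptions.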
Methods
Building Workflows
then
then<TStep extends Step>(
  step: TStep
): Workflow<...>
Adds a sequential step to the workflow.
The step to add to the workflow chain
The workflow builder for method chaining
parallel
parallel<TSteps extends Step[]>(
  ...steps: TSteps
): Workflow<...>
Adds steps that execute in parallel.
The steps to execute in parallel
The workflow builder for method chaining
branch
branch<TSteps extends Step[]>(config: {
  conditions: ConditionFunction[];
  steps: TSteps;
}): Workflow<...>
Adds conditional branching to the workflow.
config
Branch configuration
conditions
ConditionFunction[]
required
Array of condition functions that determine which step to execute
steps
Steps to potentially execute (one per condition)
The workflow builder for method chaining
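The pairing of conditions and steps can be illustrated with a small stand-alone model. It assumes that the condition at index i guards the step at index i and that every step whose condition returns true is selected. This reference does not say whether only the first match runs, so treat that as an open question. `Condition`, `BranchStep`, and `selectBranches` are hypothetical names, not Mastra types.

```typescript
// Simplified stand-ins for ConditionFunction and Step.
type Condition<I> = (input: I) => boolean;

interface BranchStep<I, O> {
  id: string;
  run: (input: I) => O;
}

// Returns the steps whose paired condition (same index) is true.
// Assumption: all matching steps are selected, not just the first.
function selectBranches<I, O>(
  conditions: Condition<I>[],
  steps: BranchStep<I, O>[],
  input: I
): BranchStep<I, O>[] {
  if (conditions.length !== steps.length) {
    throw new Error("conditions and steps must have the same length");
  }
  return steps.filter((_, index) => conditions[index](input));
}

// Example: route a number to a "large" or "negative" handler.
const branchSteps: BranchStep<number, string>[] = [
  { id: "large", run: (n) => `large:${n}` },
  { id: "negative", run: (n) => `negative:${n}` },
];
const branchConditions: Condition<number>[] = [(n) => n > 3, (n) => n < 0];

const selectedIds = selectBranches(branchConditions, branchSteps, 5).map((s) => s.id);
```

Here `selectedIds` contains only "large", because 5 satisfies the first condition and not the second.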
loop
loop<TStep extends Step>(config: {
  step: TStep;
  condition: LoopConditionFunction;
  type: 'dowhile' | 'dountil';
}): Workflow<...>
Adds a loop to the workflow.
config
Loop configuration
step
The step to execute in the loop
condition
LoopConditionFunction
required
Condition function that determines if the loop should continue
type
'dowhile' | 'dountil'
required
Loop type: 'dowhile' repeats the step while the condition is true; 'dountil' repeats the step until the condition becomes true
The workflow builder for method chaining
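The two loop types differ only in how the condition result is interpreted. The following plain TypeScript model shows this. It assumes the step runs at least once and that the condition is evaluated on the step's latest output, which the "do" prefix suggests but this reference does not confirm. `runLoop` and its `maxIterations` guard are hypothetical and not part of the Mastra API.

```typescript
type LoopType = "dowhile" | "dountil";

// Runs `step` repeatedly, starting from `initial`.
// Assumption: the step executes once before the condition is first checked.
function runLoop<T>(
  step: (value: T) => T,
  condition: (value: T) => boolean,
  type: LoopType,
  initial: T,
  maxIterations = 1000 // hypothetical safety guard against endless loops
): T {
  let value = initial;
  let iterations = 0;
  let keepGoing = true;

  do {
    value = step(value);
    iterations++;
    const conditionMet = condition(value);
    // dowhile: continue while true. dountil: continue while false.
    keepGoing = type === "dowhile" ? conditionMet : !conditionMet;
  } while (keepGoing && iterations < maxIterations);

  return value;
}

const increment = (n: number) => n + 1;
const whileResult = runLoop(increment, (n) => n < 3, "dowhile", 0);
const untilResult = runLoop(increment, (n) => n >= 3, "dountil", 0);
```

Both calls stop at 3: the first keeps going while the value is below 3, the second keeps going until the value reaches 3.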
foreach
foreach<TStep extends Step>(config: {
  step: TStep;
  concurrency?: number;
}): Workflow<...>
Adds a foreach loop that iterates over array items.
config
Foreach configuration
step
The step to execute for each item
concurrency
Number of concurrent executions
The workflow builder for method chaining
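One common way to implement a concurrency limit like the one foreach exposes is a fixed pool of workers pulling items from a shared index. The sketch below shows that technique in isolation. It is not Mastra's scheduler, `forEachWithConcurrency` is a hypothetical helper, and the default of 1 is an assumption since this reference does not state the default.

```typescript
// Applies `step` to every item with at most `concurrency` calls in flight.
// Results keep the order of the input array.
async function forEachWithConcurrency<I, O>(
  items: I[],
  step: (item: I) => Promise<O>,
  concurrency = 1 // assumed default
): Promise<O[]> {
  const results: O[] = new Array(items.length);
  let next = 0;

  // Each worker claims the next unprocessed index until none remain.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await step(items[index]);
    }
  }

  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Because JavaScript runs these workers on a single thread, claiming an index with `next++` needs no locking. The limit only bounds how many asynchronous calls are pending at once.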
sleep
sleep(config: {
  id: string;
  duration?: number;
  fn?: ExecuteFunction;
}): Workflow<...>
Adds a sleep/delay step to the workflow.
config
Sleep configuration
id
Unique identifier for the sleep step
duration
Duration in milliseconds to sleep
fn
Optional function to compute dynamic sleep duration
The workflow builder for method chaining
sleepUntil
sleepUntil(config: {
  id: string;
  date?: Date;
  fn?: ExecuteFunction;
}): Workflow<...>
Adds a step that sleeps until a specific date and time.
config
Sleep until configuration
id
Unique identifier for the sleep step
fn
Optional function to compute dynamic target date
The workflow builder for method chaining
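As a rough illustration of how a sleepUntil configuration could be turned into a wait time, the helper below resolves the target date and converts it to milliseconds. Several things here are assumptions: `fn` is simplified to a synchronous zero-argument function (the real type is ExecuteFunction), `fn` takes precedence over `date` when both are given, and a target in the past yields a zero wait. `millisecondsUntilWake` is a hypothetical name.

```typescript
// Simplified stand-in for the sleepUntil config.
interface SleepUntilConfig {
  id: string;
  date?: Date;
  fn?: () => Date; // simplification of ExecuteFunction
}

// Returns how long to wait, in milliseconds, before the step wakes.
// Assumption: `fn` wins over `date`; past targets mean no wait.
function millisecondsUntilWake(
  config: SleepUntilConfig,
  now: Date = new Date()
): number {
  const target = config.fn ? config.fn() : config.date;
  if (!target) {
    throw new Error(`sleepUntil step "${config.id}" needs a date or fn`);
  }
  return Math.max(0, target.getTime() - now.getTime());
}
```

Passing `now` explicitly keeps the helper deterministic, which makes the date arithmetic easy to check in isolation.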
commit
Finalizes the workflow and makes it executable.
The finalized workflow ready for execution
Executing Workflows
createRun
createRun(options: {
  runId?: string;
  resourceId?: string;
  requestContext?: RequestContext;
}): Promise<Run>
Creates a new workflow run instance.
options
Run creation options
runId
Custom run ID (auto-generated if not provided)
resourceId
Resource ID for multi-tenant scenarios
requestContext
Request context for the run
A new workflow run instance
execute
execute(params: {
  inputData: TInput;
  initialState?: TState;
  requestContext?: RequestContext;
  resourceId?: string;
  runId?: string;
  tracingOptions?: TracingOptions;
}): Promise<WorkflowResult<TOutput>>
Executes the workflow with the given input and returns the result.
params
Execution parameters
inputData
Input data for the workflow
resourceId
Resource ID for multi-tenant scenarios
The workflow execution result
stream
stream(params: {
  inputData: TInput;
  initialState?: TState;
  requestContext?: RequestContext;
  resourceId?: string;
  runId?: string;
  tracingOptions?: TracingOptions;
}): Promise<MastraWorkflowStream>
Executes the workflow and streams the results.
Same as execute parameters
A stream of workflow execution events
Workflow Management
listWorkflowRuns
listWorkflowRuns(params?: StorageListWorkflowRunsInput): Promise<WorkflowRuns>
Lists workflow runs with optional filtering.
params
StorageListWorkflowRunsInput
Filter parameters
Filter runs created after this date
Filter runs created before this date
Page number for pagination
Items per page, or false to fetch all
Object containing runs array and total count
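The filter and pagination behaviour described above can be modelled over an in-memory array. The field names `fromDate`, `toDate`, `page`, and `perPage`, and the result keys `runs` and `total`, are hypothetical, since this reference lists the descriptions without the parameter names. Zero-based page numbering and inclusive date bounds are also assumptions.

```typescript
interface RunRecord {
  runId: string;
  createdAt: Date;
}

// Hypothetical parameter names; only the descriptions come from the reference.
interface ListParams {
  fromDate?: Date;
  toDate?: Date;
  page?: number; // assumed zero-based
  perPage?: number | false; // false fetches all matching runs
}

function listRuns(
  all: RunRecord[],
  params: ListParams = {}
): { runs: RunRecord[]; total: number } {
  const { fromDate, toDate, page = 0, perPage = false } = params;

  // Apply the date filters first (bounds assumed inclusive).
  const matching = all.filter((run) => {
    const created = run.createdAt.getTime();
    if (fromDate && created < fromDate.getTime()) return false;
    if (toDate && created > toDate.getTime()) return false;
    return true;
  });

  // total counts all matches, not just the returned page.
  if (perPage === false) {
    return { runs: matching, total: matching.length };
  }
  const start = page * perPage;
  return { runs: matching.slice(start, start + perPage), total: matching.length };
}
```

Returning `total` alongside the page lets a caller compute the number of pages without a second query.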
getWorkflowRunById
getWorkflowRunById(params: {
  runId: string;
  fields?: WorkflowStateField[];
}): Promise<WorkflowState | null>
Gets a specific workflow run by ID.
params
Query parameters
fields
Optional fields to include: 'result', 'error', 'payload', 'steps', 'activeStepsPath', 'serializedStepGraph'
The workflow run state, or null if not found
deleteWorkflowRun
deleteWorkflowRun(runId: string): Promise<void>
Deletes a workflow run.
Serialization
serialize
serialize(): WorkflowInfo
Serializes the workflow to a JSON-compatible format.
Serialized workflow information including steps, schemas, and configuration
Example
import { createWorkflow, createStep } from '@mastra/core/workflows';
import { z } from 'zod';

// Define a workflow
const processDataWorkflow = createWorkflow({
  id: 'process-data',
  description: 'Processes user data through multiple steps',
  inputSchema: z.object({
    userId: z.string(),
    data: z.any()
  }),
  outputSchema: z.object({
    processed: z.boolean(),
    result: z.any()
  })
})
  .then(
    createStep({
      id: 'validate',
      description: 'Validate input data',
      inputSchema: z.object({ userId: z.string(), data: z.any() }),
      outputSchema: z.object({ valid: z.boolean() }),
      execute: async ({ inputData }) => {
        // Placeholder check: always reports the input as valid
        return { valid: true };
      }
    })
  )
  .then(
    createStep({
      id: 'transform',
      description: 'Transform data',
      // Input matches the output of the 'validate' step
      inputSchema: z.object({ valid: z.boolean() }),
      // Output matches the workflow's outputSchema, since this is the last step
      outputSchema: z.object({ processed: z.boolean(), result: z.any() }),
      execute: async ({ inputData }) => {
        return {
          processed: inputData.valid,
          result: { ...inputData, timestamp: Date.now() }
        };
      }
    })
  )
  .commit();

// Execute the workflow
const result = await processDataWorkflow.execute({
  inputData: {
    userId: '123',
    data: { name: 'John' }
  }
});

// The final step's output has the shape { processed: true, result: ... }
console.log(result);