Motia provides built-in state management through the StateManager API. State is scoped, persistent across executions, and accessible from any step handler via ctx.state.
Overview
State in Motia is organized by scopes (groups) and keys:
scope: user-sessions
├─ key: session-abc123 → { userId: '42', expiresAt: 1234567890 }
├─ key: session-xyz789 → { userId: '88', expiresAt: 1234567999 }
└─ key: session-def456 → { userId: '15', expiresAt: 1234568000 }
scope: rate-limits
├─ key: user-42 → { count: 10, resetAt: 1234567890 }
└─ key: user-88 → { count: 5, resetAt: 1234567899 }
Scopes provide logical grouping and isolation. Choose meaningful scope names like user-sessions, feature-flags, or rate-limits.
StateManager interface
/**
 * State API exposed to step handlers as `ctx.state`.
 * Every value is addressed by a scope (group) plus a key within that scope.
 */
interface InternalStateManager {
/** Retrieve the value stored under `scope`/`key`; resolves to null if not found. */
get < T >( scope : string , key : string ) : Promise < T | null >
/** Store `value` under `scope`/`key`. The value must be JSON-serializable. */
set < T >( scope : string , key : string , value : T ) : Promise < StreamSetResult < T > | null >
/** Apply partial update operations to the stored value; all operations in `ops` are applied together. */
update < T >( scope : string , key : string , ops : UpdateOp []) : Promise < StreamSetResult < T > | null >
/** Remove the value under `scope`/`key` and resolve to the previous value. */
delete < T >( scope : string , key : string ) : Promise < T | null >
/** Retrieve all values in `scope` (values only; keys are not part of the result). */
list < T >( scope : string ) : Promise < T []>
/** Delete all values in `scope`. */
clear ( scope : string ) : Promise < void >
/** Get the IDs of all scopes (groups). */
listGroups () : Promise < string []>
}
class InternalStateManager ( Protocol ):
async def get ( self , scope : str , key : str ) -> Any | None : ...
async def set ( self , scope : str , key : str , value : Any) -> Any: ...
async def update ( self , scope : str , key : str , ops : list[dict[ str , Any]]) -> Any: ...
async def delete ( self , scope : str , key : str ) -> Any | None : ...
async def list ( self , scope : str ) -> list[Any]: ...
async def clear ( self , scope : str ) -> None : ...
async def list_groups ( self ) -> list[ str ]: ...
Get
Retrieve a value by scope and key:
import { step , http } from 'motia'
export default step ({
name: 'get-session' ,
triggers: [ http ( 'GET' , '/session/:sessionId' )]
}, async ( input , ctx ) => {
const { sessionId } = input . request . pathParams
const session = await ctx . state . get < Session >( 'user-sessions' , sessionId )
if ( ! session ) {
return { status: 404 , body: { error: 'Session not found' } }
}
return { status: 200 , body: session }
} )
interface Session {
userId : string
expiresAt : number
data : Record < string , unknown >
}
from motia import step, http
default = step({
'name' : 'get-session' ,
'triggers' : [http( 'GET' , '/session/ {sessionId} ' )]
}, async def handler( input , ctx):
session_id = input .request.path_params[ 'sessionId' ]
session = await ctx.state.get( 'user-sessions' , session_id)
if not session:
return { 'status' : 404 , 'body' : { 'error' : 'Session not found' }}
return { 'status' : 200 , 'body' : session}
)
scope: The scope (group) to read from
key: The key to look up within the scope
Returns: The stored value or null if not found
Set
Store a value with scope and key:
import { step, http } from 'motia'

/**
 * POST /session — creates a one-hour session for the user named in the
 * request body and stores it in the `user-sessions` scope.
 *
 * Responds with 201 and the new session ID and expiry timestamp (ms).
 */
export default step({
  name: 'create-session',
  triggers: [http('POST', '/session')]
}, async (input, ctx) => {
  const { userId } = input.request.body
  // NOTE: generateSessionId is not defined on this page; it is assumed to be
  // an application-provided helper that returns a unique string.
  const sessionId = generateSessionId()

  // Read the clock once so createdAt and expiresAt are exactly one hour apart.
  const createdAt = Date.now()
  const expiresAt = createdAt + 3600000 // 1 hour

  await ctx.state.set('user-sessions', sessionId, {
    // list() returns values without their keys, so keep the key inside the
    // value; jobs that iterate the scope need it to delete individual entries.
    id: sessionId,
    userId,
    expiresAt,
    createdAt,
    data: {}
  })

  return {
    status: 201,
    body: { sessionId, expiresAt }
  }
})
from motia import step, http
from time import time
default = step({
'name' : 'create-session' ,
'triggers' : [http( 'POST' , '/session' )]
}, async def handler( input , ctx):
user_id = input .request.body[ 'userId' ]
session_id = generate_session_id()
expires_at = int (time() * 1000 ) + 3600000 # 1 hour
await ctx.state.set( 'user-sessions' , session_id, {
'userId' : user_id,
'expiresAt' : expires_at,
'createdAt' : int (time() * 1000 ),
'data' : {}
})
return {
'status' : 201 ,
'body' : { 'sessionId' : session_id, 'expiresAt' : expires_at}
}
)
scope: The scope (group) to store in
key: The key to store the value under
value: The value to store (must be JSON-serializable)
State values are automatically serialized. Store plain objects, arrays, strings, numbers, or booleans.
Update
Apply partial updates without reading the entire value:
import { step , http } from 'motia'
export default step ({
name: 'update-session' ,
triggers: [ http ( 'PATCH' , '/session/:sessionId' )]
}, async ( input , ctx ) => {
const { sessionId } = input . request . pathParams
const updates = input . request . body
await ctx . state . update ( 'user-sessions' , sessionId , [
{ type: 'set' , path: 'data.lastActivity' , value: Date . now () },
{ type: 'set' , path: 'data.pageViews' , value: updates . pageViews }
])
return { status: 200 , body: { updated: true } }
} )
from motia import step, http
from time import time
default = step({
'name' : 'update-session' ,
'triggers' : [http( 'PATCH' , '/session/ {sessionId} ' )]
}, async def handler( input , ctx):
session_id = input .request.path_params[ 'sessionId' ]
updates = input .request.body
await ctx.state.update( 'user-sessions' , session_id, [
{ 'type' : 'set' , 'path' : 'data.lastActivity' , 'value' : int (time() * 1000 )},
{ 'type' : 'set' , 'path' : 'data.pageViews' , 'value' : updates[ 'pageViews' ]}
])
return { 'status' : 200 , 'body' : { 'updated' : True }}
)
Update operations
type UpdateOp =
| { type : 'set' ; path : string ; value : unknown }
| { type : 'delete' ; path : string }
| { type : 'increment' ; path : string ; value : number }
// Examples:
await ctx . state . update ( 'counters' , 'user-42' , [
// Set a field
{ type : 'set' , path : 'name' , value : 'Alice' },
// Set a nested field
{ type : 'set' , path : 'profile.age' , value : 30 },
// Delete a field
{ type : 'delete' , path : 'oldField' },
// Increment a counter
{ type : 'increment' , path : 'loginCount' , value : 1 }
])
# Examples:
await ctx.state.update( 'counters' , 'user-42' , [
# Set a field
{ 'type' : 'set' , 'path' : 'name' , 'value' : 'Alice' },
# Set a nested field
{ 'type' : 'set' , 'path' : 'profile.age' , 'value' : 30 },
# Delete a field
{ 'type' : 'delete' , 'path' : 'oldField' },
# Increment a counter
{ 'type' : 'increment' , 'path' : 'loginCount' , 'value' : 1 }
])
Update operations are atomic. All operations in the array are applied together.
Delete
Remove a value and return the previous value:
import { step, http } from 'motia'

/**
 * DELETE /session/:sessionId — removes a session from the `user-sessions`
 * scope.
 *
 * Responds with 404 when no session was stored under the given ID, otherwise
 * logs the deleted session's owner and responds with 200.
 */
export default step({
  name: 'delete-session',
  triggers: [http('DELETE', '/session/:sessionId')]
}, async (input, ctx) => {
  const { sessionId } = input.request.pathParams

  // delete() resolves to the removed value. The explicit <Session> type
  // argument is required: without it T cannot be inferred and falls back to
  // `unknown`, so `deleted.userId` below would not type-check.
  const deleted = await ctx.state.delete<Session>('user-sessions', sessionId)
  if (!deleted) {
    return { status: 404, body: { error: 'Session not found' } }
  }

  ctx.logger.info('Session deleted', { sessionId, userId: deleted.userId })
  return { status: 200, body: { deleted: true } }
})
from motia import step, http
default = step({
'name' : 'delete-session' ,
'triggers' : [http( 'DELETE' , '/session/ {sessionId} ' )]
}, async def handler( input , ctx):
session_id = input .request.path_params[ 'sessionId' ]
deleted = await ctx.state.delete( 'user-sessions' , session_id)
if not deleted:
return { 'status' : 404 , 'body' : { 'error' : 'Session not found' }}
ctx.logger.info( 'Session deleted' ,
extra = { 'sessionId' : session_id, 'userId' : deleted[ 'userId' ]})
return { 'status' : 200 , 'body' : { 'deleted' : True }}
)
List
Retrieve all values in a scope:
import { step , http } from 'motia'
export default step ({
name: 'list-sessions' ,
triggers: [ http ( 'GET' , '/sessions' )]
}, async ( input , ctx ) => {
const sessions = await ctx . state . list < Session >( 'user-sessions' )
// Filter expired sessions
const now = Date . now ()
const active = sessions . filter ( s => s . expiresAt > now )
return {
status: 200 ,
body: {
total: sessions . length ,
active: active . length ,
sessions: active
}
}
} )
from motia import step, http
from time import time
default = step({
'name' : 'list-sessions' ,
'triggers' : [http( 'GET' , '/sessions' )]
}, async def handler( input , ctx):
sessions = await ctx.state.list( 'user-sessions' )
# Filter expired sessions
now = int (time() * 1000 )
active = [s for s in sessions if s.get( 'expiresAt' , 0 ) > now]
return {
'status' : 200 ,
'body' : {
'total' : len (sessions),
'active' : len (active),
'sessions' : active
}
}
)
list() returns all items in a scope. For large scopes, consider pagination or filtering strategies.
Clear
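Delete all values in a scope with `await ctx.state.clear('user-sessions')`. To remove only some entries, list the scope and delete them individually, as the cleanup job below does for expired sessions: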
import { step, cron } from 'motia'

/**
 * Shape of the values this job expects in the `user-sessions` scope.
 *
 * list() resolves to the stored values only (T[]), not their keys, so a value
 * can be deleted individually only if it carries its own key.
 * NOTE(review): assumes each session was stored with an `id` field equal to
 * its state key — confirm that whatever writes the session includes it.
 */
type StoredSession = Session & { id: string }

/**
 * Hourly cron job that deletes every session whose `expiresAt` timestamp (ms)
 * is at or before the current time, then logs how many were removed.
 */
export default step({
  name: 'cleanup-expired-sessions',
  triggers: [cron('0 * * * *')] // Every hour
}, async (_, ctx) => {
  const sessions = await ctx.state.list<StoredSession>('user-sessions')
  const now = Date.now()

  let expiredCount = 0
  for (const session of sessions) {
    // Skip sessions that are still valid.
    if (session.expiresAt > now) continue

    await ctx.state.delete('user-sessions', session.id)
    expiredCount++
  }

  ctx.logger.info('Cleaned up expired sessions', { expiredCount })
})
from motia import step, cron
from time import time
default = step({
'name' : 'cleanup-expired-sessions' ,
'triggers' : [cron( '0 * * * *' )] # Every hour
}, async def handler(_, ctx):
sessions = await ctx.state.list( 'user-sessions' )
now = int (time() * 1000 )
expired_count = 0
for session in sessions:
if session.get( 'expiresAt' , 0 ) <= now:
await ctx.state.delete( 'user-sessions' , session[ 'id' ])
expired_count += 1
ctx.logger.info( 'Cleaned up expired sessions' ,
extra = { 'expiredCount' : expired_count})
)
List groups
Get all scope IDs:
import { step , http } from 'motia'
export default step ({
name: 'list-all-scopes' ,
triggers: [ http ( 'GET' , '/admin/scopes' )]
}, async ( input , ctx ) => {
const groups = await ctx . state . listGroups ()
return { status: 200 , body: { scopes: groups } }
} )
from motia import step, http
default = step({
'name' : 'list-all-scopes' ,
'triggers' : [http( 'GET' , '/admin/scopes' )]
}, async def handler( input , ctx):
groups = await ctx.state.list_groups()
return { 'status' : 200 , 'body' : { 'scopes' : groups}}
)
Use cases
Rate limiting
import { step , http } from 'motia'
export default step ({
name: 'rate-limited-api' ,
triggers: [ http ( 'POST' , '/api/action' )]
}, async ( input , ctx ) => {
const userId = input . request . headers [ 'x-user-id' ] as string
const scope = 'rate-limits'
// Get current rate limit
const limit = await ctx . state . get < RateLimit >( scope , userId )
const now = Date . now ()
const windowMs = 60000 // 1 minute
if ( limit && limit . resetAt > now ) {
if ( limit . count >= 100 ) {
return {
status: 429 ,
body: { error: 'Rate limit exceeded' }
}
}
await ctx . state . update ( scope , userId , [
{ type: 'increment' , path: 'count' , value: 1 }
])
} else {
await ctx . state . set ( scope , userId , {
count: 1 ,
resetAt: now + windowMs
})
}
// Process the request
return { status: 200 , body: { success: true } }
} )
interface RateLimit {
count : number
resetAt : number
}
Session management
import { step, http } from 'motia'

/**
 * GET /protected — serves protected content to callers that present a valid,
 * unexpired session ID in the `x-session-id` header.
 *
 * Responds with 401 when the header is missing, the session does not exist,
 * or the session has expired; otherwise records the access time and
 * responds with 200.
 */
export default step({
  name: 'check-session',
  triggers: [http('GET', '/protected')]
}, async (input, ctx) => {
  const sessionId = input.request.headers['x-session-id'] as string
  if (!sessionId) {
    return { status: 401, body: { error: 'No session' } }
  }

  const session = await ctx.state.get<Session>('user-sessions', sessionId)
  // A session is expired once expiresAt <= now, matching the list and cleanup
  // examples (active: expiresAt > now; expired: expiresAt <= now).
  if (!session || session.expiresAt <= Date.now()) {
    return { status: 401, body: { error: 'Invalid or expired session' } }
  }

  // Update last activity. Session declares no top-level `lastActivity`, so
  // write it under the free-form `data` record, as the update example does.
  await ctx.state.update('user-sessions', sessionId, [
    { type: 'set', path: 'data.lastActivity', value: Date.now() }
  ])

  return {
    status: 200,
    body: { userId: session.userId, data: 'protected content' }
  }
})
Feature flags
import { step , http } from 'motia'
export default step ({
name: 'check-feature' ,
triggers: [ http ( 'GET' , '/feature/:featureName' )]
}, async ( input , ctx ) => {
const { featureName } = input . request . pathParams
const userId = input . request . headers [ 'x-user-id' ] as string
const flag = await ctx . state . get < FeatureFlag >( 'feature-flags' , featureName )
if ( ! flag ?. enabled ) {
return { status: 200 , body: { enabled: false } }
}
// Check user-specific rollout
if ( flag . rolloutPercent < 100 ) {
const hash = simpleHash ( userId + featureName )
const userPercent = ( hash % 100 )
if ( userPercent >= flag . rolloutPercent ) {
return { status: 200 , body: { enabled: false } }
}
}
return { status: 200 , body: { enabled: true } }
} )
interface FeatureFlag {
enabled : boolean
rolloutPercent : number
}
Best practices
Choose meaningful scopes: Use descriptive scope names that reflect the data domain, such as user-sessions, rate-limits, or cache.
Design for scale: Be mindful of scope size. Large scopes may impact list() and clear() performance.
Handle missing values: Always check for null returns from get(). Missing state is a normal condition.
Use update for concurrency: Prefer update() over read-modify-write patterns to avoid race conditions.
Clean up expired data: Use cron triggers to periodically clean up old or expired state.
Next steps
Triggers: Learn about state triggers
Streams: Compare state with streams
Context API: Explore the full FlowContext
Handlers: Handler patterns with state