Overview
Motia is designed for building AI-powered applications. This guide shows you how to integrate LLMs, build multi-agent systems, and create sophisticated AI workflows.Basic LLM integration
Start by creating a simple AI-powered endpoint:Create an AI endpoint
steps/ai-chat.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
})
export const config = {
name: 'AI Chat',
description: 'Process chat messages with GPT-4',
flows: ['ai-chat'],
triggers: [
{
type: 'http',
method: 'POST',
path: '/chat',
bodySchema: z.object({
message: z.string(),
conversationId: z.string().optional(),
}),
responseSchema: {
200: z.object({
response: z.string(),
conversationId: z.string(),
}),
},
},
],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (
{ request },
{ logger, state }
) => {
const { message, conversationId } = request.body
const convId = conversationId || `conv-${Date.now()}`
logger.info('Processing chat message', { conversationId: convId })
// Retrieve conversation history
const history = await state.get<Array<{role: string, content: string}>>(
'conversations',
convId
) || []
// Add user message
history.push({ role: 'user', content: message })
// Call OpenAI
const completion = await openai.chat.completions.create({
model: 'gpt-4',
messages: history,
})
const response = completion.choices[0].message.content || ''
// Add assistant response to history
history.push({ role: 'assistant', content: response })
// Save conversation
await state.set('conversations', convId, history)
logger.info('Chat response generated', {
conversationId: convId,
messageLength: response.length,
})
return {
status: 200,
body: {
response,
conversationId: convId,
},
}
}
Streaming AI responses
For better UX, stream AI responses as they’re generated:steps/ai-stream.step.ts
import { type Handlers, http, type StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
})
export const config = {
name: 'AI Chat Stream',
description: 'Stream AI responses in real-time',
flows: ['ai-chat'],
triggers: [
http('POST', '/chat/stream', {
bodySchema: z.object({
message: z.string(),
conversationId: z.string().optional(),
}),
}),
],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (
{ request, response },
{ logger, state }
) => {
const { message, conversationId } = request.body
const convId = conversationId || `conv-${Date.now()}`
// Set up SSE
response.status(200)
response.headers({
'content-type': 'text/event-stream',
'cache-control': 'no-cache',
connection: 'keep-alive',
})
try {
// Get conversation history
const history = await state.get<Array<{role: string, content: string}>>(
'conversations',
convId
) || []
history.push({ role: 'user', content: message })
// Stream from OpenAI
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages: history,
stream: true,
})
let fullResponse = ''
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || ''
if (content) {
fullResponse += content
// Stream each token to client
response.stream.write(
`event: token\ndata: ${JSON.stringify({ content })}\n\n`
)
}
}
// Save complete response
history.push({ role: 'assistant', content: fullResponse })
await state.set('conversations', convId, history)
// Send completion event
response.stream.write(
`event: done\ndata: ${JSON.stringify({
conversationId: convId,
})}\n\n`
)
logger.info('Streaming complete', { conversationId: convId })
} catch (error) {
logger.error('AI streaming failed', { error })
response.stream.write(
`event: error\ndata: ${JSON.stringify({
error: 'Failed to generate response',
})}\n\n`
)
}
response.close()
}
Multi-agent orchestration
Build sophisticated workflows where multiple AI agents collaborate:steps/multi-agent.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })
export const config = {
name: 'Research Orchestrator',
description: 'Coordinate multiple AI agents for research tasks',
flows: ['ai-research'],
triggers: [
{
type: 'http',
method: 'POST',
path: '/research',
bodySchema: z.object({
topic: z.string(),
}),
},
],
enqueues: ['research-complete'],
} as const satisfies StepConfig
const AGENTS = {
researcher: {
role: 'You are a research specialist. Gather key facts about the topic.',
model: 'gpt-4',
},
analyst: {
role: 'You are an analyst. Analyze the research and identify patterns.',
model: 'gpt-4',
},
writer: {
role: 'You are a writer. Create a clear summary from the analysis.',
model: 'gpt-4',
},
}
export const handler: Handlers<typeof config> = async (
{ request },
{ logger, state, enqueue }
) => {
const { topic } = request.body
const researchId = `research-${Date.now()}`
logger.info('Starting multi-agent research', { topic, researchId })
try {
// Agent 1: Research
logger.info('Agent 1: Researching')
const researchResponse = await openai.chat.completions.create({
model: AGENTS.researcher.model,
messages: [
{ role: 'system', content: AGENTS.researcher.role },
{ role: 'user', content: `Research this topic: ${topic}` },
],
})
const research = researchResponse.choices[0].message.content
await state.set('research', `${researchId}:research`, {
agent: 'researcher',
content: research,
timestamp: new Date().toISOString(),
})
// Agent 2: Analysis
logger.info('Agent 2: Analyzing')
const analysisResponse = await openai.chat.completions.create({
model: AGENTS.analyst.model,
messages: [
{ role: 'system', content: AGENTS.analyst.role },
{ role: 'user', content: `Analyze this research: ${research}` },
],
})
const analysis = analysisResponse.choices[0].message.content
await state.set('research', `${researchId}:analysis`, {
agent: 'analyst',
content: analysis,
timestamp: new Date().toISOString(),
})
// Agent 3: Writing
logger.info('Agent 3: Writing summary')
const summaryResponse = await openai.chat.completions.create({
model: AGENTS.writer.model,
messages: [
{ role: 'system', content: AGENTS.writer.role },
{
role: 'user',
content: `Create a summary from this analysis: ${analysis}`
},
],
})
const summary = summaryResponse.choices[0].message.content
// Store final result
const result = {
topic,
research,
analysis,
summary,
createdAt: new Date().toISOString(),
}
await state.set('research', researchId, result)
// Notify completion
await enqueue({
topic: 'research-complete',
data: { researchId, topic },
})
logger.info('Multi-agent research complete', { researchId })
return {
status: 200,
body: {
researchId,
summary,
},
}
} catch (error) {
logger.error('Multi-agent research failed', { error, researchId })
return {
status: 500,
body: {
error: 'Research failed',
researchId,
},
}
}
}
Background AI processing
Process AI tasks in the background to avoid blocking API responses:steps/ai-background.step.ts
import { queue, step } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })
const analysisSchema = z.object({
contentId: z.string(),
content: z.string(),
analysisType: z.enum(['sentiment', 'summary', 'classification']),
})
export const stepConfig = {
name: 'AI Content Analyzer',
description: 'Analyze content with AI in the background',
flows: ['ai-analysis'],
triggers: [
queue('analyze-content', { input: analysisSchema }),
],
enqueues: ['analysis-complete'],
}
export const { config, handler } = step(stepConfig, async (_input, ctx) => {
const { contentId, content, analysisType } = ctx.getData()
ctx.logger.info('Starting AI analysis', { contentId, analysisType })
try {
let prompt = ''
switch (analysisType) {
case 'sentiment':
prompt = `Analyze the sentiment of this text. Respond with only: positive, negative, or neutral.\n\n${content}`
break
case 'summary':
prompt = `Provide a concise summary of this text in 2-3 sentences:\n\n${content}`
break
case 'classification':
prompt = `Classify this text into categories. List up to 3 categories:\n\n${content}`
break
}
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: prompt }],
})
const result = response.choices[0].message.content
// Store analysis result
await ctx.state.set('analysis', contentId, {
contentId,
type: analysisType,
result,
analyzedAt: new Date().toISOString(),
})
// Notify completion
await ctx.enqueue({
topic: 'analysis-complete',
data: { contentId, type: analysisType, result },
})
ctx.logger.info('Analysis complete', { contentId, analysisType })
} catch (error) {
ctx.logger.error('Analysis failed', { contentId, error })
throw error // Will trigger retry
}
})
steps/trigger-analysis.step.ts
export const handler: Handlers<typeof config> = async (
{ request },
{ enqueue }
) => {
const { contentId, content, analysisType } = request.body
// Enqueue for background processing
await enqueue({
topic: 'analyze-content',
data: { contentId, content, analysisType },
})
return {
status: 202,
body: {
message: 'Analysis started',
contentId,
},
}
}
Using different LLMs
Motia works with any LLM provider. Here’s an example with Anthropic’s Claude:import Anthropic from '@anthropic-ai/sdk'
const anthropic = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
})
export const handler: Handlers<typeof config> = async (
{ request },
{ logger }
) => {
const { message } = request.body
const response = await anthropic.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [{ role: 'user', content: message }],
})
const reply = response.content[0].text
logger.info('Claude response generated')
return {
status: 200,
body: { response: reply },
}
}
RAG (Retrieval Augmented Generation)
Build RAG systems to enhance AI responses with your own data:import { embed, search } from './vector-db'
export const handler: Handlers<typeof config> = async (
{ request },
{ logger, state }
) => {
const { question } = request.body
// 1. Embed the question
const questionEmbedding = await embed(question)
// 2. Search for relevant documents
const relevantDocs = await search(questionEmbedding, { limit: 3 })
// 3. Build context from documents
const context = relevantDocs
.map(doc => doc.content)
.join('\n\n')
// 4. Generate response with context
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages: [
{
role: 'system',
content: 'Answer questions based on the provided context.',
},
{
role: 'user',
content: `Context:\n${context}\n\nQuestion: ${question}`,
},
],
})
const answer = response.choices[0].message.content
logger.info('RAG response generated', {
docsUsed: relevantDocs.length,
})
return {
status: 200,
body: {
answer,
sources: relevantDocs.map(d => d.id),
},
}
}
Best practices
Handle rate limits
Implement exponential backoff and retry logic for API rate limits.
Cache responses
Cache AI responses when appropriate to reduce costs and latency.
Monitor costs
Track token usage and costs. Set budgets and alerts.
Use background jobs
Process long-running AI tasks in background queues, not inline.
Next steps
Multi-language
Mix TypeScript and Python in one project
Building APIs
Learn more about API patterns and validation