Simple AI chatbot
Build a basic chatbot with streaming responses:// steps/chat.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })
export const config = {
name: 'Chat',
triggers: [
{
type: 'http',
method: 'POST',
path: '/chat',
bodySchema: z.object({
message: z.string(),
conversationId: z.string(),
}),
},
],
enqueues: [],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request, response }, { logger, state }) => {
const { message, conversationId } = request.body
// Get conversation history
const history = await state.get('conversations', conversationId) || { messages: [] }
// Add user message
history.messages.push({ role: 'user', content: message })
logger.info('Processing chat message', { conversationId, message })
// Set up SSE streaming
response.status(200)
response.headers({
'content-type': 'text/event-stream',
'cache-control': 'no-cache',
'connection': 'keep-alive',
})
// Stream AI response
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages: history.messages,
stream: true,
})
let fullResponse = ''
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || ''
if (content) {
fullResponse += content
response.stream.write(`data: ${JSON.stringify({ content })}\n\n`)
}
}
// Save assistant message
history.messages.push({ role: 'assistant', content: fullResponse })
await state.set('conversations', conversationId, history)
response.stream.write(`data: [DONE]\n\n`)
response.close()
}
Multi-step AI agent
Build an agent that performs research in multiple steps:Step 1: Receive research request
Create an API endpoint that starts the research:
// steps/start-research.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
export const config = {
name: 'StartResearch',
triggers: [
{
type: 'http',
method: 'POST',
path: '/research',
bodySchema: z.object({
query: z.string(),
depth: z.number().optional(),
}),
},
],
enqueues: ['research.gather'],
flows: ['ai-research'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request }, { enqueue, logger }) => {
const { query, depth = 3 } = request.body
const researchId = `research-${Date.now()}`
logger.info('Starting research', { researchId, query, depth })
await enqueue({
topic: 'research.gather',
data: {
researchId,
query,
depth,
step: 1,
},
})
return {
status: 200,
body: { researchId, status: 'started' },
}
}
Step 2: Gather information
Use an AI agent to gather information:
// steps/gather-info.step.ts
import { queue } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })
const researchSchema = z.object({
researchId: z.string(),
query: z.string(),
depth: z.number(),
step: z.number(),
})
export const config = {
name: 'GatherInfo',
triggers: [queue('research.gather', { input: researchSchema })],
enqueues: ['research.analyze'],
flows: ['ai-research'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, { logger, state, enqueue }) => {
const { researchId, query, depth, step } = input
logger.info('Gathering information', { researchId, step })
// Use AI to generate search queries
const completion = await openai.chat.completions.create({
model: 'gpt-4',
messages: [
{
role: 'system',
content: 'You are a research assistant. Generate 3 specific search queries to research this topic.',
},
{
role: 'user',
content: query,
},
],
})
const queries = completion.choices[0].message.content.split('\n').filter(Boolean)
// Perform web searches (using your preferred search API)
const results = await Promise.all(
queries.map(q => searchWeb(q))
)
// Store results
await state.set('research-data', `${researchId}-step-${step}`, {
queries,
results,
timestamp: new Date().toISOString(),
})
// Enqueue analysis
await enqueue({
topic: 'research.analyze',
data: {
researchId,
query,
depth,
step,
},
})
}
Step 3: Analyze and synthesize
Analyze gathered information and create a report:
// steps/analyze-research.step.ts
import { queue } from 'motia'
import OpenAI from 'openai'
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })
export const config = {
name: 'AnalyzeResearch',
triggers: [queue('research.analyze', { input: researchSchema })],
enqueues: ['research.complete'],
flows: ['ai-research'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, { logger, state, enqueue }) => {
const { researchId, query, depth, step } = input
logger.info('Analyzing research', { researchId, step })
// Get all research data
const allSteps = await state.list(`research-data`, {
prefix: `${researchId}-`,
})
// Synthesize findings with AI
const completion = await openai.chat.completions.create({
model: 'gpt-4',
messages: [
{
role: 'system',
content: 'You are a research analyst. Synthesize the following research into a comprehensive report.',
},
{
role: 'user',
content: JSON.stringify(allSteps),
},
],
})
const report = completion.choices[0].message.content
// Store final report
await state.set('research-reports', researchId, {
query,
report,
completedAt: new Date().toISOString(),
})
// Continue or complete
if (step < depth) {
await enqueue({
topic: 'research.gather',
data: {
researchId,
query: report, // Use report to guide next research step
depth,
step: step + 1,
},
})
} else {
await enqueue({
topic: 'research.complete',
data: { researchId },
})
}
}
AI agent with tool calling
Build an agent that can call external tools:import OpenAI from 'openai'
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })
const tools = [
{
type: 'function',
function: {
name: 'search_web',
description: 'Search the web for information',
parameters: {
type: 'object',
properties: {
query: { type: 'string', description: 'The search query' },
},
required: ['query'],
},
},
},
{
type: 'function',
function: {
name: 'get_weather',
description: 'Get current weather for a location',
parameters: {
type: 'object',
properties: {
location: { type: 'string', description: 'City name' },
},
required: ['location'],
},
},
},
]
export const handler: Handlers<typeof config> = async ({ request }, { logger, state }) => {
const { message, conversationId } = request.body
const history = await state.get('conversations', conversationId) || { messages: [] }
history.messages.push({ role: 'user', content: message })
let response = await openai.chat.completions.create({
model: 'gpt-4',
messages: history.messages,
tools,
})
let message = response.choices[0].message
// Handle tool calls
while (message.tool_calls) {
history.messages.push(message)
for (const toolCall of message.tool_calls) {
const functionName = toolCall.function.name
const args = JSON.parse(toolCall.function.arguments)
logger.info('Calling tool', { functionName, args })
let result
if (functionName === 'search_web') {
result = await searchWeb(args.query)
} else if (functionName === 'get_weather') {
result = await getWeather(args.location)
}
history.messages.push({
role: 'tool',
tool_call_id: toolCall.id,
content: JSON.stringify(result),
})
}
// Get next response
response = await openai.chat.completions.create({
model: 'gpt-4',
messages: history.messages,
tools,
})
message = response.choices[0].message
}
history.messages.push(message)
await state.set('conversations', conversationId, history)
return {
status: 200,
body: { message: message.content },
}
}
Agent memory with state
Use state management for persistent agent memory:export const handler: Handlers<typeof config> = async ({ request }, { state, logger }) => {
const { message, userId } = request.body
// Get user memory
const memory = await state.get('user-memory', userId) || {
preferences: {},
history: [],
context: {},
}
// Add to history
memory.history.push({
message,
timestamp: new Date().toISOString(),
})
// Use memory in AI prompt
const completion = await openai.chat.completions.create({
model: 'gpt-4',
messages: [
{
role: 'system',
content: `User preferences: ${JSON.stringify(memory.preferences)}`,
},
...memory.history.slice(-10).map(h => ({
role: 'user',
content: h.message,
})),
],
})
// Update memory
await state.set('user-memory', userId, memory)
return {
status: 200,
body: { message: completion.choices[0].message.content },
}
}
Real-world example: ChessArena.ai
Check out ChessArena.ai - a production AI agent built with Motia:- Multi-agent LLM evaluation (OpenAI, Claude, Gemini, Grok)
- Python engine integration (Stockfish chess evaluation)
- Real-time streaming with live move updates
- TypeScript APIs to Python processors
- Full observability and tracing
Related concepts
Real-time streaming
Stream AI responses in real-time
Workflows
Build multi-step agent workflows
State management
Store agent memory and context
Multi-language
Combine TypeScript and Python