- Real-time AI responses using Server-Sent Events (SSE)
- Conversation history management
- Multi-user support with isolated sessions
- Stream-based real-time updates
View the complete source code on GitHub.
What you’ll build
A chatbot system with:
- HTTP endpoint for sending messages
- Streaming LLM responses
- Persistent conversation history
- Real-time UI updates via Motia streams
Architecture
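The flow: a client POSTs a message to /chat, the ChatMessage step persists it and enqueues a chat.process job, the ProcessMessage step streams tokens from OpenAI, and each chunk is pushed to the browser over the chat stream.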
Step 1: Create chat endpoint
Create steps/chat-message.step.ts to receive messages:
steps/chat-message.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
export const config = {
name: 'ChatMessage',
description: 'Receive chat messages and enqueue for processing',
triggers: [
{
type: 'http',
method: 'POST',
path: '/chat',
bodySchema: z.object({
message: z.string().min(1).max(2000),
conversationId: z.string().optional(),
userId: z.string(),
}),
responseSchema: {
200: z.object({
conversationId: z.string(),
messageId: z.string(),
status: z.string(),
}),
},
},
],
enqueues: ['chat.process'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (
request,
{ enqueue, state, logger, streams }
) => {
const { message, conversationId, userId } = request.body
const convId = conversationId || `conv-${Date.now()}-${Math.random().toString(36).substring(7)}`
const messageId = `msg-${Date.now()}`
logger.info('Received chat message', { convId, messageId, userId })
// Get or create conversation
let conversation = await state.get('conversations', convId)
if (!conversation) {
conversation = {
id: convId,
userId,
messages: [],
createdAt: new Date().toISOString(),
}
await state.set('conversations', convId, conversation)
}
// Add user message to conversation
conversation.messages.push({
id: messageId,
role: 'user',
content: message,
timestamp: new Date().toISOString(),
})
await state.update('conversations', convId, {
messages: conversation.messages,
})
// Stream user message to frontend
await streams.chat.append(convId, {
type: 'user_message',
messageId,
content: message,
timestamp: new Date().toISOString(),
})
// Enqueue for AI processing
await enqueue({
topic: 'chat.process',
data: {
conversationId: convId,
messageId,
message,
history: conversation.messages,
},
})
return {
status: 200,
body: {
conversationId: convId,
messageId,
status: 'processing',
},
}
}
Step 2: Process with streaming
Create steps/process-message.step.ts for AI processing with streaming:
steps/process-message.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
const inputSchema = z.object({
conversationId: z.string(),
messageId: z.string(),
message: z.string(),
history: z.array(
z.object({
role: z.enum(['user', 'assistant']),
content: z.string(),
})
),
})
export const config = {
name: 'ProcessMessage',
description: 'Process chat messages with streaming AI responses',
triggers: [
{
type: 'queue',
topic: 'chat.process',
input: inputSchema,
},
],
} as const satisfies StepConfig
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
})
export const handler: Handlers<typeof config> = async (
input,
{ state, logger, streams }
) => {
const { conversationId, messageId, message, history } = input
logger.info('Processing message with AI', { conversationId, messageId })
// Stream thinking status
await streams.chat.append(conversationId, {
type: 'assistant_thinking',
timestamp: new Date().toISOString(),
})
// Prepare conversation history for OpenAI
const messages = history.map((msg) => ({
role: msg.role as 'user' | 'assistant',
content: msg.content,
}))
// Stream response from OpenAI
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages: [
{
role: 'system',
content: 'You are a helpful assistant. Provide clear, concise, and friendly responses.',
},
...messages,
],
stream: true,
})
let fullResponse = ''
const assistantMessageId = `msg-${Date.now()}`
// Stream each chunk to frontend
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || ''
if (content) {
fullResponse += content
// Stream chunk to frontend
await streams.chat.append(conversationId, {
type: 'assistant_chunk',
messageId: assistantMessageId,
content,
timestamp: new Date().toISOString(),
})
}
}
// Stream completion
await streams.chat.append(conversationId, {
type: 'assistant_complete',
messageId: assistantMessageId,
fullResponse,
timestamp: new Date().toISOString(),
})
  // Update conversation history (guard against missing or expired state)
  const conversation = await state.get('conversations', conversationId)
  if (conversation) {
    conversation.messages.push({
      id: assistantMessageId,
      role: 'assistant',
      content: fullResponse,
      timestamp: new Date().toISOString(),
    })
    await state.update('conversations', conversationId, {
      messages: conversation.messages,
      updatedAt: new Date().toISOString(),
    })
  }
logger.info('Message processed', {
conversationId,
assistantMessageId,
responseLength: fullResponse.length,
})
}
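One caveat: the handler forwards the full history to OpenAI on every turn, so long conversations will eventually exceed the model's context window. A minimal sketch of a message-count cap (the MAX_HISTORY value is illustrative; a token-based budget would be more precise):

// Illustrative cap on how many past messages are sent to the model.
const MAX_HISTORY = 20

function trimHistory<T>(history: T[]): T[] {
  // Keep only the most recent MAX_HISTORY messages.
  return history.slice(-MAX_HISTORY)
}

// Inside the handler, before mapping history into OpenAI messages:
// const messages = trimHistory(history).map((msg) => ({ ... }))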
Step 3: Configure the chat stream
Create steps/chat.stream.ts for real-time updates:
steps/chat.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'
const chatEventSchema = z.discriminatedUnion('type', [
z.object({
type: z.literal('user_message'),
messageId: z.string(),
content: z.string(),
timestamp: z.string(),
}),
z.object({
type: z.literal('assistant_thinking'),
timestamp: z.string(),
}),
z.object({
type: z.literal('assistant_chunk'),
messageId: z.string(),
content: z.string(),
timestamp: z.string(),
}),
z.object({
type: z.literal('assistant_complete'),
messageId: z.string(),
fullResponse: z.string(),
timestamp: z.string(),
}),
])
export const config: StreamConfig = {
name: 'chat',
schema: chatEventSchema,
baseConfig: { storageType: 'default' },
onJoin: async (subscription, context) => {
context.logger.info('User joined chat', {
conversationId: subscription.groupId,
})
return { unauthorized: false }
},
onLeave: async (subscription, context) => {
context.logger.info('User left chat', {
conversationId: subscription.groupId,
})
},
}
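The onJoin hook is the natural place for access control, since it can reject a subscription by returning unauthorized: true. A minimal sketch, assuming the hook context exposes the same state API as step handlers and that a getUserId helper (hypothetical, not part of this tutorial) can resolve the caller's identity:

// Hypothetical authorization check: only the conversation owner may join.
onJoin: async (subscription, context) => {
  const userId = getUserId(subscription) // hypothetical identity helper
  const conversation = await context.state.get('conversations', subscription.groupId)
  if (!conversation || conversation.userId !== userId) {
    return { unauthorized: true } // reject the subscription
  }
  return { unauthorized: false }
}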
Step 4: Alternative - SSE endpoint
For direct SSE without Motia streams, create steps/chat-sse.step.ts:
steps/chat-sse.step.ts
import { type Handlers, http, type StepConfig } from 'motia'
import OpenAI from 'openai'
export const config = {
name: 'ChatSSE',
description: 'Stream chat responses via Server-Sent Events',
triggers: [http('POST', '/chat/stream')],
} as const satisfies StepConfig
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
})
export const handler: Handlers<typeof config> = async (
{ request, response },
{ logger }
) => {
logger.info('SSE chat request received')
// Set SSE headers
response.status(200)
response.headers({
'content-type': 'text/event-stream',
'cache-control': 'no-cache',
connection: 'keep-alive',
})
// Parse request body
const chunks: string[] = []
for await (const chunk of request.requestBody.stream) {
chunks.push(Buffer.from(chunk).toString('utf-8'))
}
const body = JSON.parse(chunks.join(''))
const { message, history = [] } = body
// Stream thinking event
response.stream.write(
`event: thinking\ndata: ${JSON.stringify({ status: 'processing' })}\n\n`
)
// Get streaming response from OpenAI
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages: [
{
role: 'system',
content: 'You are a helpful assistant.',
},
...history,
{ role: 'user', content: message },
],
stream: true,
})
// Stream each chunk
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || ''
if (content) {
response.stream.write(`event: chunk\ndata: ${JSON.stringify({ content })}\n\n`)
}
}
// Stream done event
response.stream.write(`event: done\ndata: ${JSON.stringify({ complete: true })}\n\n`)
response.close()
}
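Note that the browser's EventSource API only supports GET, so a client consumes this POST endpoint by reading the response body directly. A minimal fetch-based sketch (the streamChat name is illustrative):

// Read the POST-based SSE endpoint with fetch; EventSource is GET-only.
async function streamChat(message: string, onChunk: (text: string) => void) {
  const res = await fetch('http://localhost:3000/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  })
  const reader = res.body!.getReader()
  const decoder = new TextDecoder()
  let buffer = ''
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
    const events = buffer.split('\n\n') // SSE events end with a blank line
    buffer = events.pop() ?? '' // keep any partial event for the next read
    for (const evt of events) {
      const data = evt.split('\n').find((line) => line.startsWith('data: '))
      if (evt.startsWith('event: chunk') && data) {
        onChunk(JSON.parse(data.slice(6)).content)
      }
    }
  }
}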
Frontend: React integration
Connect to the chat stream:
components/ChatWindow.tsx
import { useMotiaStream } from 'motia-stream-client-react'
import { useEffect, useState } from 'react'
interface Message {
id: string
role: 'user' | 'assistant'
content: string
timestamp: string
}
export function ChatWindow({ conversationId }: { conversationId: string }) {
const [messages, setMessages] = useState<Message[]>([])
const [currentAssistantMessage, setCurrentAssistantMessage] = useState('')
const [isThinking, setIsThinking] = useState(false)
const stream = useMotiaStream({
url: 'http://localhost:3000',
streamName: 'chat',
groupId: conversationId,
})
  useEffect(() => {
    // Register handlers once per stream instance; calling stream.on on
    // every render would attach duplicate listeners.
    stream.on('user_message', (event) => {
      setMessages((prev) => [
        ...prev,
        {
          id: event.messageId,
          role: 'user',
          content: event.content,
          timestamp: event.timestamp,
        },
      ])
    })
    stream.on('assistant_thinking', () => {
      setIsThinking(true)
      setCurrentAssistantMessage('')
    })
    stream.on('assistant_chunk', (event) => {
      setIsThinking(false)
      setCurrentAssistantMessage((prev) => prev + event.content)
    })
    stream.on('assistant_complete', (event) => {
      setMessages((prev) => [
        ...prev,
        {
          id: event.messageId,
          role: 'assistant',
          content: event.fullResponse,
          timestamp: event.timestamp,
        },
      ])
      setCurrentAssistantMessage('')
    })
  }, [stream])
return (
<div>
<div className="messages">
{messages.map((msg) => (
<div key={msg.id} className={`message ${msg.role}`}>
<strong>{msg.role}:</strong> {msg.content}
</div>
))}
{isThinking && <div className="thinking">Assistant is thinking...</div>}
{currentAssistantMessage && (
<div className="message assistant streaming">
<strong>assistant:</strong> {currentAssistantMessage}
</div>
)}
</div>
</div>
)
}
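The component only renders events arriving on the stream; sending a message is an ordinary POST to the endpoint from Step 1. A minimal sketch (the sendMessage name is illustrative):

// POST a message to /chat; returns the conversationId so follow-up
// messages continue the same conversation.
async function sendMessage(
  message: string,
  userId: string,
  conversationId?: string
): Promise<string> {
  const res = await fetch('http://localhost:3000/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message, userId, conversationId }),
  })
  const body = await res.json()
  return body.conversationId
}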
Testing the chatbot
Send a message:
curl -X POST http://localhost:3000/chat \
-H "Content-Type: application/json" \
-d '{
"message": "Hello, how are you?",
"userId": "user-123"
}'
Response:
{
"conversationId": "conv-1234567890-abc123",
"messageId": "msg-1234567890",
"status": "processing"
}
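To continue the same conversation, include the returned conversationId in the next request:
curl -X POST http://localhost:3000/chat \
  -H "Content-Type: application/json" \
  -d '{
    "message": "Can you elaborate?",
    "conversationId": "conv-1234567890-abc123",
    "userId": "user-123"
  }'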
What you learned
- Real-time streaming: stream AI responses to the client as they are generated
- Server-Sent Events: use SSE for one-way server-to-client streaming
- Conversation state: manage conversation history with the state API
- Motia streams: push real-time updates to subscribed clients
Next steps
- Gmail automation: build email workflow automation
- AI agents guide: learn advanced AI agent patterns