- Accepts research queries via API
- Performs iterative web searches
- Analyzes and synthesizes information
- Streams results in real-time
- Stores research sessions
View source code
Complete source code on GitHub
Architecture
The research agent uses a multi-step workflow:
- Query endpoint: Accepts research questions
- Research planner: Breaks down query into sub-questions
- Web searcher: Performs searches for each sub-question
- Content analyzer: Analyzes search results with LLM
- Synthesizer: Combines findings into final report
Step 1: Create the query endpoint
Accept research queries and initialize the research session:

steps/research-query.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
// Step config for the research entry point: exposes POST /research with a
// zod-validated body and declares that this step publishes 'research.started'
// (consumed by the ResearchPlanner step).
export const config = {
name: 'ResearchQuery',
description: 'Accept research queries and initialize research session',
triggers: [
{
type: 'http',
method: 'POST',
path: '/research',
// Request validation: query length 10-500; depth and maxSources are
// optional and fall back to 'medium' / 10.
bodySchema: z.object({
query: z.string().min(10).max(500),
depth: z.enum(['shallow', 'medium', 'deep']).default('medium'),
maxSources: z.number().min(3).max(20).default(10),
}),
responseSchema: {
200: z.object({
sessionId: z.string(),
status: z.string(),
streamUrl: z.string(),
}),
},
},
],
enqueues: ['research.started'],
// 'as const satisfies' keeps literal types while still checking the shape
// against StepConfig.
} as const satisfies StepConfig
// HTTP handler for POST /research: creates a research session, seeds it in
// state, emits the first stream event, and kicks off the planner via queue.
//
// Request body (already validated by bodySchema): { query, depth, maxSources }.
// Returns 200 with { sessionId, status, streamUrl } for the client to follow.
export const handler: Handlers<typeof config> = async (
  request,
  { enqueue, state, logger, streams }
) => {
  const { query, depth, maxSources } = request.body

  // Use a UUID for the random part: Math.random().toString(36) is not
  // collision-resistant, so concurrent requests in the same millisecond
  // could previously collide. Relies on the global Web Crypto API
  // (Node >= 19 or any modern runtime).
  const sessionId = `research-${Date.now()}-${crypto.randomUUID()}`

  logger.info('Starting research session', { sessionId, query })

  // Initialize research session in state.
  await state.set('research-sessions', sessionId, {
    query,
    depth,
    maxSources,
    status: 'planning',
    startedAt: new Date().toISOString(),
    subQuestions: [],
    findings: [],
  })

  // Stream initial status so subscribers see the session immediately.
  await streams.research.append(sessionId, {
    type: 'started',
    query,
    timestamp: new Date().toISOString(),
  })

  // Hand off to the ResearchPlanner step.
  await enqueue({
    topic: 'research.started',
    data: { sessionId, query, depth, maxSources },
  })

  return {
    status: 200,
    body: {
      sessionId,
      status: 'started',
      streamUrl: `/streams/research/${sessionId}`,
    },
  }
}
Step 2: Research planner
Use an LLM to break down the query into sub-questions:

steps/research-planner.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
// Payload published on 'research.started' by the ResearchQuery step.
const inputSchema = z.object({
sessionId: z.string(),
query: z.string(),
// NOTE(review): the upstream bodySchema declares depth as
// z.enum(['shallow', 'medium', 'deep']); plain z.string() here loses that
// constraint — consider tightening to the same enum.
depth: z.string(),
maxSources: z.number(),
})
// Step config: consumes 'research.started' from the queue and may publish
// 'research.search' jobs (one per generated sub-question).
export const config = {
name: 'ResearchPlanner',
description: 'Break down research query into sub-questions',
triggers: [
{
type: 'queue',
topic: 'research.started',
input: inputSchema,
},
],
enqueues: ['research.search'],
} as const satisfies StepConfig
// Module-level OpenAI client: OPENAI_API_KEY is read once at import time,
// so the env var must be set before this step module is loaded.
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
})
export const handler: Handlers<typeof config> = async (
input,
{ enqueue, state, logger, streams }
) => {
const { sessionId, query, depth, maxSources } = input
logger.info('Planning research', { sessionId, query })
// Stream planning status
await streams.research.append(sessionId, {
type: 'planning',
message: 'Breaking down research query...',
timestamp: new Date().toISOString(),
})
// Get sub-questions from LLM
const subQuestionCount = depth === 'deep' ? 5 : depth === 'medium' ? 3 : 2
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages: [
{
role: 'system',
content: `You are a research assistant. Break down research queries into ${subQuestionCount} focused sub-questions that will help answer the main question comprehensively.`,
},
{
role: 'user',
content: `Main question: ${query}\n\nProvide ${subQuestionCount} sub-questions in JSON format: {"questions": ["q1", "q2", ...]}`,
},
],
})
const result = JSON.parse(response.choices[0].message.content)
const subQuestions = result.questions
// Update state with sub-questions
await state.update('research-sessions', sessionId, {
subQuestions,
status: 'searching',
})
// Stream sub-questions
await streams.research.append(sessionId, {
type: 'sub_questions',
questions: subQuestions,
timestamp: new Date().toISOString(),
})
// Enqueue search for each sub-question
for (const [index, subQuestion] of subQuestions.entries()) {
await enqueue({
topic: 'research.search',
data: {
sessionId,
subQuestion,
index,
maxSources: Math.ceil(maxSources / subQuestions.length),
},
})
}
logger.info('Research plan created', {
sessionId,
subQuestionCount: subQuestions.length,
})
}
Step 3: Web searcher
Perform web searches for each sub-question:

steps/web-searcher.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import axios from 'axios'
// Payload published on 'research.search' by the ResearchPlanner step:
// one sub-question plus its per-question source budget.
const inputSchema = z.object({
sessionId: z.string(),
subQuestion: z.string(),
// Position of this sub-question in the research plan (0-based).
index: z.number(),
maxSources: z.number(),
})
// Step config: consumes 'research.search' jobs and may publish
// 'research.analyze' jobs (one per completed search).
export const config = {
name: 'WebSearcher',
description: 'Perform web searches for research questions',
triggers: [
{
type: 'queue',
topic: 'research.search',
input: inputSchema,
},
],
enqueues: ['research.analyze'],
} as const satisfies StepConfig
// Queue handler for 'research.search': runs a web search for one
// sub-question, streams progress to subscribers, and forwards the raw hits
// to the ContentAnalyzer step.
export const handler: Handlers<typeof config> = async (
  input,
  { enqueue, logger, streams }
) => {
  const { sessionId, subQuestion, index, maxSources } = input
  // Fresh ISO timestamp per event (not hoisted — each event gets its own time).
  const stamp = () => new Date().toISOString()

  logger.info('Searching web', { sessionId, subQuestion, index })

  // Tell stream subscribers which sub-question is being searched.
  await streams.research.append(sessionId, {
    type: 'searching',
    question: subQuestion,
    index,
    timestamp: stamp(),
  })

  // Delegate to the search provider (Brave Search API helper below).
  const results = await performWebSearch(subQuestion, maxSources)

  // Publish how many hits came back for this sub-question.
  await streams.research.append(sessionId, {
    type: 'search_results',
    question: subQuestion,
    index,
    resultsCount: results.length,
    timestamp: stamp(),
  })

  // Hand the results to the ContentAnalyzer step.
  await enqueue({
    topic: 'research.analyze',
    data: { sessionId, subQuestion, index, results },
  })

  logger.info('Search completed', { sessionId, index, resultsCount: results.length })
}
// Query the Brave Search web endpoint and normalize hits to
// { title, url, description }.
//
// @param query - search terms
// @param limit - maximum number of results to request
// @returns normalized results; empty array when Brave returns no web hits
// @throws Error when BRAVE_API_KEY is not configured
async function performWebSearch(query: string, limit: number) {
  const apiKey = process.env.BRAVE_API_KEY
  if (!apiKey) {
    // Fail fast with a clear message instead of an opaque 401 from Brave.
    throw new Error('BRAVE_API_KEY is not set')
  }

  const response = await axios.get('https://api.search.brave.com/res/v1/web/search', {
    headers: {
      'X-Subscription-Token': apiKey,
    },
    params: {
      q: query,
      count: limit,
    },
  })

  // The `web.results` section may be absent for queries with no organic
  // hits — guard so that case yields [] rather than a TypeError.
  const hits = response.data?.web?.results ?? []
  return hits.map((result: any) => ({
    title: result.title,
    url: result.url,
    description: result.description,
  }))
}
Step 4: Content analyzer
Analyze search results with an LLM:

steps/content-analyzer.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
// Payload published on 'research.analyze' by the WebSearcher step:
// the sub-question plus its normalized search hits.
const inputSchema = z.object({
sessionId: z.string(),
subQuestion: z.string(),
index: z.number(),
results: z.array(
z.object({
title: z.string(),
url: z.string(),
description: z.string(),
})
),
})
// Step config: consumes 'research.analyze' jobs and, once every sub-question
// has a finding, publishes a single 'research.synthesize' job.
export const config = {
name: 'ContentAnalyzer',
description: 'Analyze search results with LLM',
triggers: [
{
type: 'queue',
topic: 'research.analyze',
input: inputSchema,
},
],
enqueues: ['research.synthesize'],
} as const satisfies StepConfig
// Module-level OpenAI client: OPENAI_API_KEY is read once at import time,
// so the env var must be set before this step module is loaded.
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
})
// Queue handler for 'research.analyze': summarizes one sub-question's search
// hits with the LLM, appends the finding to the session, streams it, and —
// when every sub-question has a finding — triggers synthesis.
export const handler: Handlers<typeof config> = async (
  input,
  { enqueue, state, logger, streams }
) => {
  const { sessionId, subQuestion, index, results } = input
  logger.info('Analyzing content', { sessionId, subQuestion, index })

  // Stream analysis status.
  await streams.research.append(sessionId, {
    type: 'analyzing',
    question: subQuestion,
    index,
    timestamp: new Date().toISOString(),
  })

  // Number the sources so the model can reference them.
  const resultsText = results
    .map((r, i) => `[${i + 1}] ${r.title}\n${r.description}\nSource: ${r.url}`)
    .join('\n\n')

  const response = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [
      {
        role: 'system',
        content:
          'You are a research analyst. Analyze search results and extract key findings relevant to the research question.',
      },
      {
        role: 'user',
        content: `Research question: ${subQuestion}\n\nSearch results:\n${resultsText}\n\nProvide a concise summary of key findings (2-3 paragraphs).`,
      },
    ],
  })

  // message.content is string | null — fail loudly (so the queue can retry)
  // rather than storing a null finding.
  const analysis = response.choices[0]?.message?.content
  if (!analysis) {
    throw new Error(`Analyzer LLM returned an empty response for session ${sessionId}`)
  }

  const session = await state.get('research-sessions', sessionId)
  if (!session) {
    // Session may have expired or been deleted; nothing sensible to update.
    throw new Error(`Research session not found: ${sessionId}`)
  }

  // NOTE(review): this read-modify-write is not atomic. If multiple analyzer
  // jobs for the same session run concurrently, findings can be lost and the
  // completion check below can fire twice or never. Use an atomic append or
  // counter if the state layer offers one — confirm against the Motia state API.
  const updatedFindings = [...(session.findings || []), { subQuestion, index, analysis }]
  await state.update('research-sessions', sessionId, {
    findings: updatedFindings,
  })

  // Stream the finding.
  await streams.research.append(sessionId, {
    type: 'finding',
    question: subQuestion,
    index,
    analysis,
    timestamp: new Date().toISOString(),
  })

  // All sub-questions answered -> trigger synthesis (see NOTE above).
  if (updatedFindings.length === session.subQuestions.length) {
    await enqueue({
      topic: 'research.synthesize',
      data: { sessionId },
    })
  }

  logger.info('Analysis completed', { sessionId, index })
}
Step 5: Synthesizer
Combine the findings into a final report:

steps/synthesizer.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
// Payload published on 'research.synthesize': just the session id — the
// accumulated findings are read back from state.
const inputSchema = z.object({
sessionId: z.string(),
})
// Step config: consumes 'research.synthesize'. Terminal step of the
// workflow — it publishes no further topics.
export const config = {
name: 'Synthesizer',
description: 'Synthesize research findings into final report',
triggers: [
{
type: 'queue',
topic: 'research.synthesize',
input: inputSchema,
},
],
} as const satisfies StepConfig
// Module-level OpenAI client: OPENAI_API_KEY is read once at import time,
// so the env var must be set before this step module is loaded.
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
})
// Queue handler for 'research.synthesize': combines all per-question findings
// into a final report via the LLM, marks the session completed, and streams
// the report to subscribers.
export const handler: Handlers<typeof config> = async (
  input,
  { state, logger, streams }
) => {
  const { sessionId } = input
  logger.info('Synthesizing research', { sessionId })

  // Get the research session; guard against a missing/expired session
  // instead of crashing on a null property access below.
  const session = await state.get('research-sessions', sessionId)
  if (!session) {
    throw new Error(`Research session not found: ${sessionId}`)
  }

  // Stream synthesis status.
  await streams.research.append(sessionId, {
    type: 'synthesizing',
    message: 'Creating final report...',
    timestamp: new Date().toISOString(),
  })

  // Findings arrive in completion order; sort a copy by sub-question index
  // so the report deterministically follows the original plan.
  const findingsText = [...session.findings]
    .sort((a: any, b: any) => a.index - b.index)
    .map((f: any) => `Q: ${f.subQuestion}\n\nFindings:\n${f.analysis}`)
    .join('\n\n---\n\n')

  // Generate the final report.
  const response = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [
      {
        role: 'system',
        content:
          'You are a research writer. Synthesize research findings into a comprehensive, well-structured report.',
      },
      {
        role: 'user',
        content: `Original question: ${session.query}\n\nResearch findings:\n\n${findingsText}\n\nCreate a comprehensive report with introduction, key findings, and conclusion.`,
      },
    ],
  })

  // message.content is string | null — fail loudly rather than completing
  // the session with a null report.
  const report = response.choices[0]?.message?.content
  if (!report) {
    throw new Error(`Synthesizer LLM returned an empty response for session ${sessionId}`)
  }

  // Mark the session completed.
  await state.update('research-sessions', sessionId, {
    report,
    status: 'completed',
    completedAt: new Date().toISOString(),
  })

  // Stream the final report.
  await streams.research.append(sessionId, {
    type: 'completed',
    report,
    timestamp: new Date().toISOString(),
  })

  logger.info('Research completed', { sessionId })
}
Frontend integration
Connect to the research stream:

components/ResearchViewer.tsx
import { useMotiaStream } from 'motia-stream-client-react'
import { useEffect, useState } from 'react'
// Live view of one research session: subscribes to the 'research' stream and
// renders status, per-question findings, and the final report as they arrive.
export function ResearchViewer({ sessionId }: { sessionId: string }) {
  const [status, setStatus] = useState('Initializing...')
  // Minimal structural type for what we render; events may carry more fields.
  const [findings, setFindings] = useState<Array<{ question: string; analysis: string }>>([])
  const [report, setReport] = useState<string | null>(null)

  const stream = useMotiaStream({
    url: 'http://localhost:3000',
    streamName: 'research',
    groupId: sessionId,
  })

  // Register listeners in an effect. Calling stream.on() directly in the
  // render body (as before) re-subscribes on every render, so each finding
  // would be appended once per accumulated listener.
  useEffect(() => {
    stream.on('sub_questions', (event) => {
      setStatus(`Researching ${event.questions.length} topics...`)
    })
    stream.on('finding', (event) => {
      setFindings((prev) => [...prev, event])
    })
    stream.on('completed', (event) => {
      setReport(event.report)
      setStatus('Research completed')
    })
    // NOTE(review): if the client exposes an unsubscribe (e.g. stream.off or
    // a disposer returned by on()), call it in a cleanup function here —
    // confirm against the motia-stream-client-react API.
  }, [stream])

  return (
    <div>
      <h2>Status: {status}</h2>
      <div>
        {findings.map((f, i) => (
          <div key={i}>
            <h3>{f.question}</h3>
            <p>{f.analysis}</p>
          </div>
        ))}
      </div>
      {report && (
        <div>
          <h2>Final Report</h2>
          <div>{report}</div>
        </div>
      )}
    </div>
  )
}
What you learned
AI agents
Build multi-step AI workflows
Real-time streaming
Stream progress updates to frontend
Queue-based workflows
Chain Steps for complex processing
State management
Store and update research sessions
Next steps
Streaming chatbot
Build a real-time AI chatbot
Gmail automation
Automate email workflows