Skip to main content
This example demonstrates building an AI research agent that:
  • Accepts research queries via API
  • Performs iterative web searches
  • Analyzes and synthesizes information
  • Streams results in real-time
  • Stores research sessions

View source code

Complete source code on GitHub

Architecture

The research agent uses a multi-step workflow:
  1. Query endpoint: Accepts research questions
  2. Research planner: Breaks down query into sub-questions
  3. Web searcher: Performs searches for each sub-question
  4. Content analyzer: Analyzes search results with LLM
  5. Synthesizer: Combines findings into final report

Step 1: Create the query endpoint

Accept research queries and initialize the research session:
steps/research-query.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'ResearchQuery',
  description: 'Accept research queries and initialize research session',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/research',
      bodySchema: z.object({
        query: z.string().min(10).max(500),
        depth: z.enum(['shallow', 'medium', 'deep']).default('medium'),
        maxSources: z.number().min(3).max(20).default(10),
      }),
      responseSchema: {
        200: z.object({
          sessionId: z.string(),
          status: z.string(),
          streamUrl: z.string(),
        }),
      },
    },
  ],
  enqueues: ['research.started'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  request,
  { enqueue, state, logger, streams }
) => {
  const { query, depth, maxSources } = request.body
  const sessionId = `research-${Date.now()}-${Math.random().toString(36).substring(7)}`

  logger.info('Starting research session', { sessionId, query })

  // Initialize research session in state
  await state.set('research-sessions', sessionId, {
    query,
    depth,
    maxSources,
    status: 'planning',
    startedAt: new Date().toISOString(),
    subQuestions: [],
    findings: [],
  })

  // Stream initial status
  await streams.research.append(sessionId, {
    type: 'started',
    query,
    timestamp: new Date().toISOString(),
  })

  // Start research planning
  await enqueue({
    topic: 'research.started',
    data: { sessionId, query, depth, maxSources },
  })

  return {
    status: 200,
    body: {
      sessionId,
      status: 'started',
      streamUrl: `/streams/research/${sessionId}`,
    },
  }
}

Step 2: Research planner

Use an LLM to break down the query into sub-questions:
steps/research-planner.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'

const inputSchema = z.object({
  sessionId: z.string(),
  query: z.string(),
  depth: z.string(),
  maxSources: z.number(),
})

export const config = {
  name: 'ResearchPlanner',
  description: 'Break down research query into sub-questions',
  triggers: [
    {
      type: 'queue',
      topic: 'research.started',
      input: inputSchema,
    },
  ],
  enqueues: ['research.search'],
} as const satisfies StepConfig

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})

export const handler: Handlers<typeof config> = async (
  input,
  { enqueue, state, logger, streams }
) => {
  const { sessionId, query, depth, maxSources } = input

  logger.info('Planning research', { sessionId, query })

  // Stream planning status
  await streams.research.append(sessionId, {
    type: 'planning',
    message: 'Breaking down research query...',
    timestamp: new Date().toISOString(),
  })

  // Get sub-questions from LLM
  const subQuestionCount = depth === 'deep' ? 5 : depth === 'medium' ? 3 : 2

  const response = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [
      {
        role: 'system',
        content: `You are a research assistant. Break down research queries into ${subQuestionCount} focused sub-questions that will help answer the main question comprehensively.`,
      },
      {
        role: 'user',
        content: `Main question: ${query}\n\nProvide ${subQuestionCount} sub-questions in JSON format: {"questions": ["q1", "q2", ...]}`,
      },
    ],
  })

  const result = JSON.parse(response.choices[0].message.content)
  const subQuestions = result.questions

  // Update state with sub-questions
  await state.update('research-sessions', sessionId, {
    subQuestions,
    status: 'searching',
  })

  // Stream sub-questions
  await streams.research.append(sessionId, {
    type: 'sub_questions',
    questions: subQuestions,
    timestamp: new Date().toISOString(),
  })

  // Enqueue search for each sub-question
  for (const [index, subQuestion] of subQuestions.entries()) {
    await enqueue({
      topic: 'research.search',
      data: {
        sessionId,
        subQuestion,
        index,
        maxSources: Math.ceil(maxSources / subQuestions.length),
      },
    })
  }

  logger.info('Research plan created', {
    sessionId,
    subQuestionCount: subQuestions.length,
  })
}

Step 3: Web searcher

Perform web searches for each sub-question:
steps/web-searcher.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import axios from 'axios'

const inputSchema = z.object({
  sessionId: z.string(),
  subQuestion: z.string(),
  index: z.number(),
  maxSources: z.number(),
})

export const config = {
  name: 'WebSearcher',
  description: 'Perform web searches for research questions',
  triggers: [
    {
      type: 'queue',
      topic: 'research.search',
      input: inputSchema,
    },
  ],
  enqueues: ['research.analyze'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { enqueue, logger, streams }
) => {
  const { sessionId, subQuestion, index, maxSources } = input

  logger.info('Searching web', { sessionId, subQuestion, index })

  // Stream search status
  await streams.research.append(sessionId, {
    type: 'searching',
    question: subQuestion,
    index,
    timestamp: new Date().toISOString(),
  })

  // Perform web search (using Brave Search API, Serper, etc.)
  const results = await performWebSearch(subQuestion, maxSources)

  // Stream search results
  await streams.research.append(sessionId, {
    type: 'search_results',
    question: subQuestion,
    index,
    resultsCount: results.length,
    timestamp: new Date().toISOString(),
  })

  // Enqueue for analysis
  await enqueue({
    topic: 'research.analyze',
    data: {
      sessionId,
      subQuestion,
      index,
      results,
    },
  })

  logger.info('Search completed', {
    sessionId,
    index,
    resultsCount: results.length,
  })
}

async function performWebSearch(query: string, limit: number) {
  // Example using Brave Search API
  const response = await axios.get('https://api.search.brave.com/res/v1/web/search', {
    headers: {
      'X-Subscription-Token': process.env.BRAVE_API_KEY,
    },
    params: {
      q: query,
      count: limit,
    },
  })

  return response.data.web.results.map((result: any) => ({
    title: result.title,
    url: result.url,
    description: result.description,
  }))
}

Step 4: Content analyzer

Analyze search results with LLM:
steps/content-analyzer.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'

const inputSchema = z.object({
  sessionId: z.string(),
  subQuestion: z.string(),
  index: z.number(),
  results: z.array(
    z.object({
      title: z.string(),
      url: z.string(),
      description: z.string(),
    })
  ),
})

export const config = {
  name: 'ContentAnalyzer',
  description: 'Analyze search results with LLM',
  triggers: [
    {
      type: 'queue',
      topic: 'research.analyze',
      input: inputSchema,
    },
  ],
  enqueues: ['research.synthesize'],
} as const satisfies StepConfig

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})

export const handler: Handlers<typeof config> = async (
  input,
  { enqueue, state, logger, streams }
) => {
  const { sessionId, subQuestion, index, results } = input

  logger.info('Analyzing content', { sessionId, subQuestion, index })

  // Stream analysis status
  await streams.research.append(sessionId, {
    type: 'analyzing',
    question: subQuestion,
    index,
    timestamp: new Date().toISOString(),
  })

  // Analyze with LLM
  const resultsText = results
    .map((r, i) => `[${i + 1}] ${r.title}\n${r.description}\nSource: ${r.url}`)
    .join('\n\n')

  const response = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [
      {
        role: 'system',
        content:
          'You are a research analyst. Analyze search results and extract key findings relevant to the research question.',
      },
      {
        role: 'user',
        content: `Research question: ${subQuestion}\n\nSearch results:\n${resultsText}\n\nProvide a concise summary of key findings (2-3 paragraphs).`,
      },
    ],
  })

  const analysis = response.choices[0].message.content

  // Update state with findings
  const session = await state.get('research-sessions', sessionId)
  const updatedFindings = [...(session.findings || []), { subQuestion, index, analysis }]

  await state.update('research-sessions', sessionId, {
    findings: updatedFindings,
  })

  // Stream finding
  await streams.research.append(sessionId, {
    type: 'finding',
    question: subQuestion,
    index,
    analysis,
    timestamp: new Date().toISOString(),
  })

  // Check if all sub-questions analyzed
  if (updatedFindings.length === session.subQuestions.length) {
    await enqueue({
      topic: 'research.synthesize',
      data: { sessionId },
    })
  }

  logger.info('Analysis completed', { sessionId, index })
}

Step 5: Synthesizer

Combine findings into final report:
steps/synthesizer.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'

const inputSchema = z.object({
  sessionId: z.string(),
})

export const config = {
  name: 'Synthesizer',
  description: 'Synthesize research findings into final report',
  triggers: [
    {
      type: 'queue',
      topic: 'research.synthesize',
      input: inputSchema,
    },
  ],
} as const satisfies StepConfig

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})

export const handler: Handlers<typeof config> = async (
  input,
  { state, logger, streams }
) => {
  const { sessionId } = input

  logger.info('Synthesizing research', { sessionId })

  // Get research session
  const session = await state.get('research-sessions', sessionId)

  // Stream synthesis status
  await streams.research.append(sessionId, {
    type: 'synthesizing',
    message: 'Creating final report...',
    timestamp: new Date().toISOString(),
  })

  // Combine findings
  const findingsText = session.findings
    .map((f: any) => `Q: ${f.subQuestion}\n\nFindings:\n${f.analysis}`)
    .join('\n\n---\n\n')

  // Generate final report
  const response = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [
      {
        role: 'system',
        content:
          'You are a research writer. Synthesize research findings into a comprehensive, well-structured report.',
      },
      {
        role: 'user',
        content: `Original question: ${session.query}\n\nResearch findings:\n\n${findingsText}\n\nCreate a comprehensive report with introduction, key findings, and conclusion.`,
      },
    ],
  })

  const report = response.choices[0].message.content

  // Update state
  await state.update('research-sessions', sessionId, {
    report,
    status: 'completed',
    completedAt: new Date().toISOString(),
  })

  // Stream final report
  await streams.research.append(sessionId, {
    type: 'completed',
    report,
    timestamp: new Date().toISOString(),
  })

  logger.info('Research completed', { sessionId })
}

Frontend integration

Connect to the research stream:
components/ResearchViewer.tsx
import { useMotiaStream } from 'motia-stream-client-react'
import { useState } from 'react'

export function ResearchViewer({ sessionId }: { sessionId: string }) {
  const [status, setStatus] = useState('Initializing...')
  const [findings, setFindings] = useState([])
  const [report, setReport] = useState(null)

  const stream = useMotiaStream({
    url: 'http://localhost:3000',
    streamName: 'research',
    groupId: sessionId,
  })

  stream.on('sub_questions', (event) => {
    setStatus(`Researching ${event.questions.length} topics...`)
  })

  stream.on('finding', (event) => {
    setFindings((prev) => [...prev, event])
  })

  stream.on('completed', (event) => {
    setReport(event.report)
    setStatus('Research completed')
  })

  return (
    <div>
      <h2>Status: {status}</h2>
      <div>
        {findings.map((f, i) => (
          <div key={i}>
            <h3>{f.question}</h3>
            <p>{f.analysis}</p>
          </div>
        ))}
      </div>
      {report && (
        <div>
          <h2>Final Report</h2>
          <div>{report}</div>
        </div>
      )}
    </div>
  )
}

What you learned

AI agents

Build multi-step AI workflows

Real-time streaming

Stream progress updates to frontend

Queue-based workflows

Chain Steps for complex processing

State management

Store and update research sessions

Next steps

Streaming chatbot

Build a real-time AI chatbot

Gmail automation

Automate email workflows

Build docs developers (and LLMs) love