Create a production-ready real-time dashboard that streams live metrics, events, and status updates to connected clients. This example demonstrates how to use Motia’s stream triggers and SSE capabilities to build responsive, scalable dashboards.

Overview

This example shows how to:
  • Stream real-time metrics using Server-Sent Events (SSE)
  • Update dashboard data with stream triggers
  • Aggregate and process metrics in real-time
  • Handle concurrent client connections efficiently
  • Build responsive UI with live data updates

Architecture

The real-time dashboard consists of four components:
1. Metrics Collector
   Collects system metrics, user activity, and business events via HTTP endpoints and cron jobs.
2. Stream Processor
   Processes and aggregates metrics in real-time using Motia’s stream triggers.
3. SSE Broadcaster
   Streams dashboard updates to connected clients using Server-Sent Events.
4. Dashboard State Manager
   Maintains current dashboard state and handles client subscriptions.

Implementation

Step 1: Metrics Collection

Create endpoints to receive metrics:
import { type Handlers, type StepConfig } from 'motia'
import { z } from 'zod'

const metricSchema = z.object({
  type: z.enum(['page_view', 'user_action', 'api_call', 'error', 'performance']),
  value: z.number(),
  userId: z.string().optional(),
  metadata: z.record(z.any()).optional(),
})

export const config = {
  name: 'Collect Metrics',
  description: 'Receives and stores metrics events',
  flows: ['real-time-dashboard'],
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/metrics/track',
      bodySchema: metricSchema,
    },
  ],
  enqueues: ['metrics.received'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { request },
  { logger, streams, enqueue }
) => {
  const { type, value, userId, metadata } = request.body

  logger.info('Metric received', { type, value, userId })

  // Create metric event
  const metricId = `${type}-${Date.now()}-${Math.random().toString(36).slice(2)}`
  const metric = {
    id: metricId,
    type,
    value,
    userId,
    metadata,
    timestamp: Date.now(),
  }

  // Store in metrics stream - this will trigger real-time updates
  await streams.metrics.set('live-metrics', metricId, metric)

  // Enqueue for further processing
  await enqueue({
    topic: 'metrics.received',
    data: metric,
  })

  return {
    status: 200,
    body: { success: true, metricId },
  }
}

Step 2: Define Metrics Stream

Create a stream configuration for metrics:
import type { StreamConfig } from 'motia'
import { z } from 'zod'

const metricsSchema = z.object({
  id: z.string(),
  type: z.string(),
  value: z.number(),
  userId: z.string().optional(),
  metadata: z.record(z.any()).optional(),
  timestamp: z.number(),
})

export const config: StreamConfig = {
  baseConfig: { storageType: 'default' },
  name: 'metrics',
  schema: metricsSchema,
}

export type MetricsStreamItem = z.infer<typeof metricsSchema>

Step 3: Real-Time Aggregation

Aggregate metrics using stream triggers:
import { type Handlers, type StepConfig, type StreamTriggerInput } from 'motia'
import type { MetricsStreamItem } from './metrics.stream'

export const config = {
  name: 'Aggregate Metrics',
  description: 'Aggregates metrics in real-time',
  flows: ['real-time-dashboard'],
  triggers: [
    {
      type: 'stream',
      streamName: 'metrics',
      groupId: 'live-metrics',
      condition: (input: StreamTriggerInput<MetricsStreamItem>) => {
        return input.event.type === 'set'
      },
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  request,
  { logger, state }
) => {
  const metric = request.event.data as MetricsStreamItem

  logger.info('Aggregating metric', { type: metric.type, value: metric.value })

  // Update aggregated stats
  const statsKey = 'dashboard-stats'
  
  await state.update('dashboard', statsKey, [
    { type: 'increment', path: `metrics.${metric.type}.count`, by: 1 },
    { type: 'increment', path: `metrics.${metric.type}.total`, by: metric.value },
    { type: 'set', path: `metrics.${metric.type}.lastUpdate`, value: metric.timestamp },
  ])

  // Update per-minute buckets for time-series data
  const minuteBucket = Math.floor(metric.timestamp / 60000) // 1-minute buckets
  
  await state.update('time-series', `${metric.type}-${minuteBucket}`, [
    { type: 'increment', path: 'count', by: 1 },
    { type: 'increment', path: 'sum', by: metric.value },
    { type: 'set', path: 'timestamp', value: minuteBucket * 60000 },
  ])

  logger.info('Metric aggregated', { type: metric.type, bucket: minuteBucket })
}

Step 4: SSE Dashboard Stream

Stream dashboard updates to clients:
import { type Handlers, http, type StepConfig } from 'motia'

export const config = {
  name: 'Dashboard Stream',
  description: 'Streams real-time dashboard updates via SSE',
  flows: ['real-time-dashboard'],
  triggers: [http('GET', '/dashboard/stream')],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { response },
  { logger, state, streams }
) => {
  logger.info('Dashboard stream client connected')

  // Set SSE headers
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  })

  // Send initial dashboard state
  const dashboardStats = await state.get('dashboard', 'dashboard-stats') || {
    metrics: {},
    connectedClients: 0,
  }

  response.stream.write(
    `event: init\ndata: ${JSON.stringify(dashboardStats)}\n\n`
  )

  // Set up interval to send periodic updates
  const intervalId = setInterval(async () => {
    try {
      // Fetch latest stats
      const stats = await state.get('dashboard', 'dashboard-stats')
      
      // Get recent time-series data
      const now = Date.now()
      const recentBuckets = []
      
      for (let i = 0; i < 10; i++) {
        const bucket = Math.floor((now - i * 60000) / 60000)
        const data = await state.get('time-series', `page_view-${bucket}`)
        if (data) {
          recentBuckets.push(data)
        }
      }

      const update = {
        stats,
        timeSeries: recentBuckets,
        timestamp: now,
      }

      response.stream.write(
        `event: update\ndata: ${JSON.stringify(update)}\n\n`
      )
    } catch (error) {
      logger.error('Error sending dashboard update', { error })
    }
  }, 2000) // Update every 2 seconds

  // Clean up on connection close
  response.onClose(() => {
    clearInterval(intervalId)
    logger.info('Dashboard stream client disconnected')
  })
}
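The handler above assembles the SSE frames by hand. The wire format is plain text: an optional `event:` line naming the event, a `data:` line carrying the payload, and a blank line terminating the frame. A small helper (a sketch, not part of the example) makes that explicit:

```typescript
// Build one Server-Sent Events frame: an `event:` line, a `data:` line with
// the JSON payload, and a blank line that terminates the frame.
function sseFrame(event: string, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`
}
```

With this helper, the writes above become `response.stream.write(sseFrame('update', update))`.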

Step 5: System Metrics Collector

Collect system metrics periodically:
import { type Handlers, cron, type StepConfig } from 'motia'
import os from 'os'

export const config = {
  name: 'System Metrics',
  description: 'Collects system performance metrics',
  flows: ['real-time-dashboard'],
  triggers: [cron('*/30 * * * * *')], // Every 30 seconds
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  _,
  { logger, streams }
) => {
  logger.info('Collecting system metrics')

  const metrics = {
    cpu: os.loadavg()[0],
    memory: {
      total: os.totalmem(),
      free: os.freemem(),
      used: os.totalmem() - os.freemem(),
      usagePercent: ((os.totalmem() - os.freemem()) / os.totalmem()) * 100,
    },
    uptime: os.uptime(),
    timestamp: Date.now(),
  }

  // Store CPU metric
  await streams.metrics.set('live-metrics', `cpu-${Date.now()}`, {
    id: `cpu-${Date.now()}`,
    type: 'performance',
    value: metrics.cpu,
    metadata: { metric: 'cpu_load' },
    timestamp: metrics.timestamp,
  })

  // Store memory metric
  await streams.metrics.set('live-metrics', `memory-${Date.now()}`, {
    id: `memory-${Date.now()}`,
    type: 'performance',
    value: metrics.memory.usagePercent,
    metadata: { metric: 'memory_usage' },
    timestamp: metrics.timestamp,
  })

  logger.info('System metrics collected', {
    cpu: metrics.cpu.toFixed(2),
    memoryUsage: `${metrics.memory.usagePercent.toFixed(1)}%`,
  })
}

Client Integration

JavaScript/TypeScript Client

class DashboardClient {
  private eventSource: EventSource | null = null
  private onUpdateCallback: ((data: any) => void) | null = null

  connect(onUpdate: (data: any) => void, onError?: (error: any) => void) {
    this.onUpdateCallback = onUpdate

    this.eventSource = new EventSource('http://localhost:3111/dashboard/stream')

    this.eventSource.addEventListener('init', (event) => {
      const data = JSON.parse(event.data)
      console.log('Dashboard initialized:', data)
      onUpdate(data)
    })

    this.eventSource.addEventListener('update', (event) => {
      const data = JSON.parse(event.data)
      onUpdate(data)
    })

    this.eventSource.onerror = (error) => {
      console.error('Dashboard stream error:', error)
      onError?.(error)
    }
  }

  disconnect() {
    if (this.eventSource) {
      this.eventSource.close()
      this.eventSource = null
    }
  }

  async trackMetric(type: string, value: number, metadata?: Record<string, any>) {
    await fetch('http://localhost:3111/metrics/track', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ type, value, metadata }),
    })
  }
}

// Usage
const dashboard = new DashboardClient()

dashboard.connect(
  (data) => {
    console.log('Dashboard update:', data)
    updateUI(data)
  },
  (error) => console.error('Error:', error)
)

// Track a metric
await dashboard.trackMetric('page_view', 1, { page: '/home' })

React Dashboard Component

import { useEffect, useState } from 'react'

interface DashboardData {
  stats: any
  timeSeries: any[]
  timestamp: number
}

function RealtimeDashboard() {
  const [data, setData] = useState<DashboardData | null>(null)
  const [connected, setConnected] = useState(false)

  useEffect(() => {
    const dashboard = new DashboardClient()

    dashboard.connect(
      (update) => {
        setData(update)
        setConnected(true)
      },
      (error) => {
        console.error('Connection error:', error)
        setConnected(false)
      }
    )

    return () => dashboard.disconnect()
  }, [])

  return (
    <div className="dashboard">
      <div className="status">
        <span className={connected ? 'connected' : 'disconnected'}>
          {connected ? '● Live' : '○ Disconnected'}
        </span>
      </div>

      {data && (
        <>
          <div className="metrics-grid">
            {Object.entries(data.stats.metrics || {}).map(([type, stats]: any) => (
              <div key={type} className="metric-card">
                <h3>{type.replace('_', ' ')}</h3>
                <div className="metric-value">{stats.count}</div>
                <div className="metric-total">Total: {stats.total}</div>
              </div>
            ))}
          </div>

          <div className="time-series-chart">
            <h3>Activity (Last 10 minutes)</h3>
            {/* Render chart with data.timeSeries */}
          </div>
        </>
      )}
    </div>
  )
}

Key Features

Real-Time Updates

Instant metric updates streamed to all connected clients using SSE, with automatic reconnection on disconnect.
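`EventSource` retries transient drops on its own, but once it reaches `readyState === EventSource.CLOSED` (for example after a non-200 response) the browser stops retrying. One common way to resume is capped exponential backoff; the schedule below is an illustrative sketch, not a Motia API:

```typescript
// Capped exponential backoff: 1s, 2s, 4s, ... up to a 30s ceiling.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs)
}
```

In the `DashboardClient` below, `onerror` could check for `CLOSED`, call `disconnect()`, and schedule `connect()` again with `setTimeout(..., backoffDelay(attempt++))`, resetting `attempt` on a successful `init` event.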

Time-Series Aggregation

Efficient bucketing and aggregation of metrics for historical analysis and trend visualization.
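Because each bucket written in Step 3 stores only `{ count, sum }`, derived values such as the per-minute average are computed at read time. A tiny helper (hypothetical, for chart rendering):

```typescript
// Average value for one { count, sum } bucket as written in Step 3;
// returns 0 for an empty bucket instead of NaN.
function bucketAverage(bucket: { count: number; sum: number }): number {
  return bucket.count === 0 ? 0 : bucket.sum / bucket.count
}
```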

Concurrent Connections

Handle thousands of simultaneous dashboard clients with efficient SSE streaming.

Automatic Cleanup

Periodic cleanup of old time-series data to manage storage and maintain performance.
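The example itself does not implement this cleanup; a cron-triggered step would need to enumerate the stale keys to delete. One sketch of the key arithmetic, reusing the `${type}-${minuteBucket}` format from Step 3 (the retention and lookback parameters are illustrative):

```typescript
// Enumerate time-series keys older than the retention window so a cleanup
// job can delete them. Keys follow the `${type}-${minuteBucket}` format used
// in Step 3; lookbackMinutes bounds how far back the scan goes.
function staleBucketKeys(
  now: number,
  retentionMinutes: number,
  types: string[],
  lookbackMinutes: number
): string[] {
  const cutoffBucket = Math.floor(now / 60000) - retentionMinutes
  const keys: string[] = []
  for (const type of types) {
    for (let bucket = cutoffBucket - lookbackMinutes; bucket < cutoffBucket; bucket++) {
      keys.push(`${type}-${bucket}`)
    }
  }
  return keys
}
```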

Flexible Metrics

Track any type of metric: page views, API calls, errors, performance metrics, and business events.

Testing

Test the dashboard stream:
# Connect to dashboard stream
curl http://localhost:3111/dashboard/stream

# Track a metric
curl -X POST http://localhost:3111/metrics/track \
  -H "Content-Type: application/json" \
  -d '{
    "type": "page_view",
    "value": 1,
    "metadata": { "page": "/dashboard" }
  }'

Advanced Features

Custom Alerts

Trigger alerts when metrics exceed thresholds:
export const config = {
  name: 'Metric Alerts',
  triggers: [
    {
      type: 'state',
      condition: (input: StateTriggerInput) => {
        return (input.new_value?.metrics?.error?.count ?? 0) > 100
      },
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  request,
  { logger, enqueue }
) => {
  logger.error('High error rate detected!')
  
  await enqueue({
    topic: 'alerts.send',
    data: {
      type: 'error_spike',
      severity: 'critical',
      message: 'Error rate exceeded threshold',
    },
  })
}

Custom Time Windows

Aggregate metrics over custom time windows:
const windowSizes = [60, 300, 3600] // 1min, 5min, 1hour in seconds

for (const window of windowSizes) {
  const bucket = Math.floor(metric.timestamp / (window * 1000))
  await state.update('time-series', `${metric.type}-${window}s-${bucket}`, [
    { type: 'increment', path: 'count', by: 1 },
    { type: 'increment', path: 'sum', by: metric.value },
  ])
}
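When rendering these aggregates, the window start time can be recovered from the key itself. This helper is a sketch that assumes metric types contain no hyphens (as with `page_view` above):

```typescript
// Recover the window start timestamp (ms) from a `${type}-${window}s-${bucket}`
// key as written above. Assumes the metric type contains no hyphens.
function windowStart(key: string): number {
  const parts = key.split('-')
  const bucket = Number(parts[parts.length - 1])
  const windowSeconds = Number(parts[parts.length - 2].replace('s', ''))
  return bucket * windowSeconds * 1000
}
```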

Multi-Tenant Dashboards

Isolate metrics by organization or tenant:
const tenantId = request.headers['x-tenant-id']
const metricKey = `${tenantId}-${metric.type}-${bucket}`

await state.update('time-series', metricKey, updates)

Production Considerations

  1. Scalability: Use Redis for distributed state and metrics storage
  2. Data Retention: Implement automatic cleanup of old time-series data
  3. Connection Limits: Monitor and limit concurrent SSE connections
  4. Buffering: Buffer metrics during high traffic to prevent overwhelming storage
  5. Monitoring: Track dashboard performance and connection health
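A minimal sketch of consideration 4: buffer incoming metrics and flush them in batches, so a traffic burst produces a few large writes instead of one per metric. The class and size threshold are illustrative, not a Motia API:

```typescript
// Collect items in memory and hand them to `flush` in batches of `maxSize`,
// turning bursts of traffic into a small number of larger writes.
class MetricBuffer<T> {
  private items: T[] = []

  constructor(
    private readonly maxSize: number,
    private readonly flush: (batch: T[]) => void
  ) {}

  add(item: T): void {
    this.items.push(item)
    if (this.items.length >= this.maxSize) this.drain()
  }

  // Flush whatever is pending, e.g. on a timer or at shutdown.
  drain(): void {
    if (this.items.length === 0) return
    const batch = this.items
    this.items = []
    this.flush(batch)
  }
}
```

In the Step 1 handler, a buffer like this could sit in front of `streams.metrics.set`; a production version would also drain on a timer so small trickles of metrics are not held indefinitely.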

Next Steps

  • Add custom visualization components
  • Implement user-specific dashboard views
  • Build alerting and notification system
  • Add data export capabilities
  • Integrate with monitoring tools (Datadog, Grafana)
