Overview
The @motiadev/stream-client package provides a client-side API for subscribing to real-time data streams. It supports WebSocket connections and offers reactive subscriptions to individual items and to groups of items.
Installation
npm install @motiadev/stream-client
Classes
Stream
The main client class for managing stream connections and subscriptions.
class Stream {
  constructor(adapterFactory: SocketAdapterFactory)

  subscribeItem<TData extends { id: string }>(
    streamName: string,
    groupId: string,
    id: string
  ): StreamItemSubscription<TData>

  subscribeGroup<TData extends { id: string }>(
    streamName: string,
    groupId: string,
    sortKey?: keyof TData
  ): StreamGroupSubscription<TData>

  close(): void
}
Constructor
Creates a new Stream client instance.
adapterFactory (SocketAdapterFactory, required): Factory function that creates socket adapter instances.
type SocketAdapterFactory = () => SocketAdapter
Methods
subscribeItem
Subscribes to a specific item in a stream.
Returns StreamItemSubscription<TData>: a subscription object for the specific item.
subscribeGroup
Subscribes to all items in a group.
sortKey (keyof TData, optional): Key to sort items by.
Returns StreamGroupSubscription<TData>: a subscription object for the group.
close
Closes the stream connection and cleans up all subscriptions.
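To illustrate what sortKey is for, here is a minimal sketch of ordering a group's items by one of their keys. The helper sortByKey is hypothetical and not part of @motiadev/stream-client. How the client actually orders items (ascending or descending, tie-breaking) is not stated on this page, so ascending order is an assumption.

```typescript
// Hypothetical helper, not part of @motiadev/stream-client.
// Returns a new array sorted in ascending order by the given key (assumed order).
function sortByKey<TData extends { id: string }>(
  items: TData[],
  sortKey: keyof TData
): TData[] {
  return [...items].sort((a, b) => {
    const left = a[sortKey]
    const right = b[sortKey]
    // Numbers are compared numerically, everything else as strings.
    if (typeof left === 'number' && typeof right === 'number') {
      return left - right
    }
    return String(left).localeCompare(String(right))
  })
}

const sortedMessages = sortByKey(
  [
    { id: 'b', text: 'second', timestamp: 20 },
    { id: 'a', text: 'first', timestamp: 10 },
  ],
  'timestamp'
)
```

The input array is copied before sorting, so the caller's data is left untouched.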
StreamItemSubscription
Subscription to a single item in a stream.
class StreamItemSubscription<TData extends { id: string }> {
  getState(): TData | null
  addChangeListener(listener: Listener<TData | null>): void
  removeChangeListener(listener: Listener<TData | null>): void
  onEvent(type: string, listener: CustomEventListener): void
  offEvent(type: string, listener: CustomEventListener): void
  onClose(listener: () => void): void
  close(): void
}
getState
Returns the current state of the item.
Returns TData | null: the current item data, or null if the item was deleted or has not loaded yet.
addChangeListener
Adds a listener that is called whenever the item changes.
listener (Listener<TData | null>, required): Callback function receiving the updated item state.
type Listener<TData> = (state: TData | null) => void
removeChangeListener
Removes a previously added change listener.
listener (Listener<TData | null>, required): The listener to remove.
onEvent
Adds a listener for custom events.
type (string): The custom event type to listen for.
listener (CustomEventListener, required): Callback function for the event.
type CustomEventListener<TData> = (event: TData) => void
offEvent
Removes a custom event listener.
listener (CustomEventListener, required): The listener to remove.
onClose
Adds a callback to be called when the subscription is closed.
close
Closes the subscription and stops receiving updates.
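Because getState can return null before the item has loaded, a caller may want to wait for the first real value. The following is a minimal sketch of that pattern using only getState, addChangeListener, and removeChangeListener. The names ItemSubscriptionLike, ItemListener, and waitForItem are hypothetical. The interface lists just the methods the helper uses and is assumed to be structurally compatible with StreamItemSubscription.

```typescript
// Hypothetical structural type: only the methods used by the helper below.
type ItemListener<TData> = (state: TData | null) => void

interface ItemSubscriptionLike<TData> {
  getState(): TData | null
  addChangeListener(listener: ItemListener<TData>): void
  removeChangeListener(listener: ItemListener<TData>): void
}

// Hypothetical helper, not part of @motiadev/stream-client.
// Resolves with the first non-null state, then detaches its listener.
function waitForItem<TData>(sub: ItemSubscriptionLike<TData>): Promise<TData> {
  return new Promise((resolve) => {
    const current = sub.getState()
    if (current !== null) {
      resolve(current)
      return
    }
    const listener: ItemListener<TData> = (state) => {
      if (state !== null) {
        sub.removeChangeListener(listener)
        resolve(state)
      }
    }
    sub.addChangeListener(listener)
  })
}
```

The promise never rejects in this sketch. A real caller would probably combine it with onClose or a timeout so that it does not wait forever.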
StreamGroupSubscription
Subscription to all items in a group.
class StreamGroupSubscription<TData extends { id: string }> {
  getState(): TData[]
  addChangeListener(listener: Listener<TData[]>): void
  removeChangeListener(listener: Listener<TData[]>): void
  onEvent(type: string, listener: CustomEventListener): void
  offEvent(type: string, listener: CustomEventListener): void
  onClose(listener: () => void): void
  close(): void
}
getState
Returns the current state of all items in the group.
Returns TData[]: an array of all items in the group.
addChangeListener
Adds a listener that is called whenever any item in the group changes.
listener (Listener<TData[]>, required): Callback function receiving the updated group state.
removeChangeListener
Removes a previously added change listener.
listener (Listener<TData[]>, required): The listener to remove.
The remaining methods (onEvent, offEvent, onClose, close) work the same as on StreamItemSubscription.
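Pairing addChangeListener with removeChangeListener by hand is easy to get wrong, so one option is a small wrapper that returns an unsubscribe function. This is a minimal sketch under assumptions: GroupSubscriptionLike, GroupListener, and watchGroup are hypothetical names, and the interface is assumed to be structurally compatible with StreamGroupSubscription.

```typescript
// Hypothetical structural type: only the methods used by the helper below.
type GroupListener<TData> = (items: TData[]) => void

interface GroupSubscriptionLike<TData> {
  getState(): TData[]
  addChangeListener(listener: GroupListener<TData>): void
  removeChangeListener(listener: GroupListener<TData>): void
}

// Hypothetical helper, not part of @motiadev/stream-client.
// Emits the current state once, forwards every later change,
// and returns a function that detaches the listener again.
function watchGroup<TData>(
  sub: GroupSubscriptionLike<TData>,
  onChange: GroupListener<TData>
): () => void {
  onChange(sub.getState())
  sub.addChangeListener(onChange)
  return () => sub.removeChangeListener(onChange)
}
```

The returned function fits naturally as the cleanup of an effect or a teardown hook. It only detaches the listener and does not close the subscription.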
Types
BaseMessage
type BaseMessage = {
  streamName: string
  groupId: string
  id?: string
  timestamp: number
}
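When messages arrive as untyped JSON, a runtime check can confirm that a value has the BaseMessage shape before it is used. The sketch below is illustrative only: isBaseMessage is a hypothetical helper, and the actual wire format used by the client is not described on this page.

```typescript
// Copied from the type definition above so the example is self-contained.
type BaseMessage = {
  streamName: string
  groupId: string
  id?: string
  timestamp: number
}

// Hypothetical type guard, not part of @motiadev/stream-client.
function isBaseMessage(value: unknown): value is BaseMessage {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return (
    typeof v['streamName'] === 'string' &&
    typeof v['groupId'] === 'string' &&
    typeof v['timestamp'] === 'number' &&
    // id is optional: absent is fine, but if present it must be a string.
    (v['id'] === undefined || typeof v['id'] === 'string')
  )
}

const parsed: unknown = JSON.parse(
  '{"streamName":"chat","groupId":"room-123","timestamp":1700000000000}'
)
const looksValid = isBaseMessage(parsed)
```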
StreamEvent
type StreamEvent<TData extends { id: string }> =
  | { type: 'create'; data: TData }
  | { type: 'update'; data: TData }
  | { type: 'delete'; data: TData }
  | { type: 'event'; event: CustomEvent }
ItemStreamEvent
type ItemStreamEvent<TData extends { id: string }> =
  | StreamEvent<TData>
  | { type: 'sync'; data: TData }
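One way to read ItemStreamEvent is as the input to a reducer that produces the value returned by getState. The sketch below shows that reading. It is an assumption about how the events map to state, not the library's implementation, and applyItemEvent is a hypothetical name. The CustomEvent type is renamed StreamCustomEvent here only to avoid clashing with the DOM global of the same name.

```typescript
// Types copied from the definitions above so the example is self-contained.
type StreamCustomEvent = { type: string; data: any }

type StreamEvent<TData extends { id: string }> =
  | { type: 'create'; data: TData }
  | { type: 'update'; data: TData }
  | { type: 'delete'; data: TData }
  | { type: 'event'; event: StreamCustomEvent }

type ItemStreamEvent<TData extends { id: string }> =
  | StreamEvent<TData>
  | { type: 'sync'; data: TData }

// Hypothetical reducer: returns the next item state for a given event.
function applyItemEvent<TData extends { id: string }>(
  current: TData | null,
  event: ItemStreamEvent<TData>
): TData | null {
  switch (event.type) {
    case 'sync':
    case 'create':
    case 'update':
      return event.data // the payload replaces the current state
    case 'delete':
      return null // assumed to match "null if deleted" in getState
    case 'event':
      return current // custom events leave the state untouched
  }
}
```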
GroupStreamEvent
type GroupStreamEvent<TData extends { id: string }> =
  | StreamEvent<TData>
  | { type: 'sync'; data: TData[] }
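The group variant differs from the item variant only in that sync carries the whole array. A reducer over GroupStreamEvent could look like the sketch below. This is an assumed interpretation rather than the client's real logic: applyGroupEvent is a hypothetical name, and treating a create for an existing id as a replacement is a choice made for this example. CustomEvent is renamed StreamCustomEvent to avoid clashing with the DOM global.

```typescript
// Types copied from the definitions above so the example is self-contained.
type StreamCustomEvent = { type: string; data: any }

type StreamEvent<TData extends { id: string }> =
  | { type: 'create'; data: TData }
  | { type: 'update'; data: TData }
  | { type: 'delete'; data: TData }
  | { type: 'event'; event: StreamCustomEvent }

type GroupStreamEvent<TData extends { id: string }> =
  | StreamEvent<TData>
  | { type: 'sync'; data: TData[] }

// Hypothetical reducer: returns a new array, never mutating the input.
function applyGroupEvent<TData extends { id: string }>(
  items: TData[],
  event: GroupStreamEvent<TData>
): TData[] {
  switch (event.type) {
    case 'sync':
      return [...event.data] // full snapshot replaces everything
    case 'create':
    case 'update': {
      const incoming = event.data
      const exists = items.some((item) => item.id === incoming.id)
      return exists
        ? items.map((item) => (item.id === incoming.id ? incoming : item))
        : [...items, incoming]
    }
    case 'delete': {
      const removedId = event.data.id
      return items.filter((item) => item.id !== removedId)
    }
    case 'event':
      return items // custom events leave the state untouched
  }
}
```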
CustomEvent
type CustomEvent = {
  type: string
  data: any
}
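To show how a CustomEvent could reach the listeners registered with onEvent and offEvent, here is a minimal registry sketch keyed by the event's type field. The class CustomEventRegistry is hypothetical and not part of the package. Passing event.data (rather than the whole event) to listeners is an assumption based on the usage example on this page. The type is renamed StreamCustomEvent to avoid clashing with the DOM global.

```typescript
// Mirrors the CustomEvent type above.
type StreamCustomEvent = { type: string; data: any }
type EventListenerFn = (data: any) => void

// Hypothetical registry illustrating onEvent/offEvent-style routing.
class CustomEventRegistry {
  private listeners = new Map<string, Set<EventListenerFn>>()

  on(type: string, listener: EventListenerFn): void {
    const set = this.listeners.get(type) ?? new Set<EventListenerFn>()
    set.add(listener)
    this.listeners.set(type, set)
  }

  off(type: string, listener: EventListenerFn): void {
    this.listeners.get(type)?.delete(listener)
  }

  // Calls every listener registered for event.type with event.data.
  dispatch(event: StreamCustomEvent): void {
    this.listeners.get(event.type)?.forEach((listener) => listener(event.data))
  }
}

const registry = new CustomEventRegistry()
const received: string[] = []
const onNotification = (data: any) => { received.push(String(data)) }

registry.on('new-notification', onNotification)
registry.dispatch({ type: 'new-notification', data: 'hello' })
registry.off('new-notification', onNotification)
registry.dispatch({ type: 'new-notification', data: 'ignored' })
```

Removal works only with the same function reference that was registered, which is why the handler is stored in a variable first.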
SocketAdapter
interface SocketAdapter {
  send(message: string): void
  onMessage(callback: (message: string) => void): void
  onOpen(callback: () => void): void
  onClose(callback: () => void): void
  close(): void
  isOpen(): boolean
}
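Because the client depends only on this interface, an in-memory adapter can stand in for a real socket in tests. The sketch below is one possible implementation. FakeSocketAdapter, simulateOpen, simulateMessage, and fakeAdapterFactory are hypothetical names, and nothing here reflects how the client itself uses the adapter internally.

```typescript
// Copied from the interface above so the example is self-contained.
interface SocketAdapter {
  send(message: string): void
  onMessage(callback: (message: string) => void): void
  onOpen(callback: () => void): void
  onClose(callback: () => void): void
  close(): void
  isOpen(): boolean
}

// Hypothetical in-memory adapter for tests; no network is involved.
class FakeSocketAdapter implements SocketAdapter {
  readonly sent: string[] = []
  private open = false
  private closed = false
  private messageCallbacks: Array<(message: string) => void> = []
  private openCallbacks: Array<() => void> = []
  private closeCallbacks: Array<() => void> = []

  send(message: string): void { this.sent.push(message) }
  onMessage(callback: (message: string) => void): void { this.messageCallbacks.push(callback) }
  onOpen(callback: () => void): void { this.openCallbacks.push(callback) }
  onClose(callback: () => void): void { this.closeCallbacks.push(callback) }
  isOpen(): boolean { return this.open }

  close(): void {
    if (this.closed) return // fire close callbacks only once
    this.closed = true
    this.open = false
    this.closeCallbacks.forEach((callback) => callback())
  }

  // Test-only helpers that play the role of the server.
  simulateOpen(): void {
    this.open = true
    this.openCallbacks.forEach((callback) => callback())
  }
  simulateMessage(message: string): void {
    this.messageCallbacks.forEach((callback) => callback(message))
  }
}

// Matches the SocketAdapterFactory shape: () => SocketAdapter
const fakeAdapterFactory = (): SocketAdapter => new FakeSocketAdapter()
```

A test can keep a reference to the FakeSocketAdapter instance, call simulateMessage with a serialized payload, and inspect sent to see what was written to the socket.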
SocketAdapterFactory
type SocketAdapterFactory = () => SocketAdapter
Usage Example
Basic Item Subscription
import { Stream } from '@motiadev/stream-client'

// Create a socket adapter factory (example using the browser WebSocket API).
// The socket is held in a closure instead of being read through `this`,
// so the methods keep working even when they are called detached from the object.
const adapterFactory = () => {
  const ws = new WebSocket('wss://your-server.com/stream')
  return {
    send(msg: string) { ws.send(msg) },
    onMessage(cb: (message: string) => void) {
      ws.addEventListener('message', (e) => cb(e.data))
    },
    onOpen(cb: () => void) { ws.addEventListener('open', () => cb()) },
    onClose(cb: () => void) { ws.addEventListener('close', () => cb()) },
    close() { ws.close() },
    isOpen() { return ws.readyState === WebSocket.OPEN },
  }
}

// Create stream client
const stream = new Stream(adapterFactory)

// Subscribe to a specific item
const subscription = stream.subscribeItem<{ id: string; message: string }>(
  'chat',
  'room-123',
  'message-456'
)

// Listen for changes
subscription.addChangeListener((item) => {
  if (item) {
    console.log('Message updated:', item.message)
  } else {
    console.log('Message deleted')
  }
})

// Get current state (null if the item is deleted or not yet loaded)
const currentState = subscription.getState()

// Clean up
subscription.close()
Group Subscription with Sorting
interface Message {
  id: string
  text: string
  timestamp: number
}

// Subscribe to all messages in a room
const groupSub = stream.subscribeGroup<Message>(
  'chat',
  'room-123',
  'timestamp' // Sort by timestamp
)

// Listen for any changes in the group
groupSub.addChangeListener((messages) => {
  console.log(`${messages.length} messages in room`)
  messages.forEach(msg => {
    console.log(`${msg.timestamp}: ${msg.text}`)
  })
})

// Get all current messages
const allMessages = groupSub.getState()
Custom Events
const subscription = stream.subscribeItem('notifications', 'user-123', 'alerts')

// Listen for custom events
subscription.onEvent('new-notification', (data) => {
  console.log('New notification:', data)
})

// To remove a listener later, keep a reference to the handler
// and pass the same function to offEvent
const handler = (data) => console.log(data)
subscription.onEvent('custom-event', handler)
subscription.offEvent('custom-event', handler)
React Integration Example
import { useEffect, useState } from 'react'
import { Stream } from '@motiadev/stream-client'

// adapterFactory is the socket adapter factory from the basic example above

function ChatRoom({ roomId }) {
  const [messages, setMessages] = useState([])

  useEffect(() => {
    const stream = new Stream(adapterFactory)
    const subscription = stream.subscribeGroup('chat', roomId, 'timestamp')
    subscription.addChangeListener(setMessages)

    // Close the subscription and the connection on unmount or when roomId changes
    return () => {
      subscription.close()
      stream.close()
    }
  }, [roomId])

  return (
    <div>
      {messages.map(msg => (
        <div key={msg.id}>{msg.text}</div>
      ))}
    </div>
  )
}