The Streaming Helper provides utilities for creating streaming responses, including Server-Sent Events (SSE) and text streaming.
Import
import { stream, streamSSE, streamText } from 'hono/streaming'
import { SSEStreamingApi } from 'hono/streaming'
import type { SSEMessage } from 'hono/streaming'
Functions
stream()
Creates a streaming response with full control over the stream.
function stream(
  c: Context,
  cb: (stream: StreamingApi) => Promise<void>,
  onError?: (e: Error, stream: StreamingApi) => Promise<void>
): Response
cb
(stream: StreamingApi) => Promise<void>
required
Callback function that receives a StreamingApi instance to write data to the stream
onError
(e: Error, stream: StreamingApi) => Promise<void>
Optional error handler called when an error occurs during streaming
Returns
A Response object whose body is a readable stream
Example
import { Hono } from 'hono'
import { stream } from 'hono/streaming'

const app = new Hono()

app.get('/stream', (c) => {
  return stream(c, async (stream) => {
    // Write data to the stream
    await stream.write('Hello ')
    await stream.write('World')
    // The stream closes automatically when the callback completes
  })
})

// With error handling
app.get('/stream-with-error', (c) => {
  return stream(
    c,
    async (stream) => {
      for (let i = 0; i < 10; i++) {
        await stream.write(`Chunk ${i}\n`)
        await stream.sleep(1000) // Wait 1 second
      }
    },
    async (err, stream) => {
      // Called if the callback above throws
      console.error('Stream error:', err)
      await stream.write('Error occurred')
    }
  )
})
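The control flow described above (run the callback, hand a thrown error to onError, then close the stream) can be sketched without Hono. This is a minimal illustration under stated assumptions, not Hono's actual implementation: MemoryStream and runStream are hypothetical names introduced here, and the sketch awaits the callback for simplicity, whereas stream() returns its Response immediately and runs the callback in the background.

```typescript
// Hypothetical in-memory stand-in for StreamingApi; not part of Hono.
class MemoryStream {
  chunks: string[] = []
  closed = false

  async write(input: string): Promise<void> {
    this.chunks.push(input)
  }

  async close(): Promise<void> {
    this.closed = true
  }
}

type StreamCallback = (stream: MemoryStream) => Promise<void>
type StreamErrorHandler = (e: Error, stream: MemoryStream) => Promise<void>

// Runs cb, forwards a thrown Error to onError if one was given,
// and always closes the stream afterwards.
async function runStream(
  cb: StreamCallback,
  onError?: StreamErrorHandler
): Promise<MemoryStream> {
  const stream = new MemoryStream()
  try {
    await cb(stream)
  } catch (e) {
    if (e instanceof Error && onError) {
      await onError(e, stream)
    } else {
      console.error(e)
    }
  } finally {
    await stream.close()
  }
  return stream
}
```

With this sketch, a callback that writes 'a' and then throws new Error('boom'), paired with an onError that writes the error message, leaves the chunks 'a' and the message in the stream, and the stream ends up closed either way.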
streamSSE()
Creates a Server-Sent Events (SSE) streaming response.
function streamSSE(
  c: Context,
  cb: (stream: SSEStreamingApi) => Promise<void>,
  onError?: (e: Error, stream: SSEStreamingApi) => Promise<void>
): Response
cb
(stream: SSEStreamingApi) => Promise<void>
required
Callback function that receives an SSEStreamingApi instance
onError
(e: Error, stream: SSEStreamingApi) => Promise<void>
Optional error handler
Returns
A Response object with SSE headers and a streaming body
Example
import { streamSSE } from 'hono/streaming'

app.get('/sse', (c) => {
  return streamSSE(c, async (stream) => {
    let id = 0
    while (true) {
      // Send one SSE message per second
      await stream.writeSSE({
        data: JSON.stringify({ time: new Date().toISOString() }),
        event: 'time-update',
        id: String(id++)
      })
      await stream.sleep(1000)
      // Stop once the stream has been closed
      if (stream.closed) {
        break
      }
    }
  })
})
streamText()
Creates a text streaming response with appropriate headers.
function streamText(
  c: Context,
  cb: (stream: StreamingApi) => Promise<void>,
  onError?: (e: Error, stream: StreamingApi) => Promise<void>
): Response
cb
(stream: StreamingApi) => Promise<void>
required
Callback function that receives a StreamingApi instance
onError
(e: Error, stream: StreamingApi) => Promise<void>
Optional error handler
Returns
A Response object with a text/plain content type and a streaming body
Example
import { streamText } from 'hono/streaming'

app.get('/text-stream', (c) => {
  return streamText(c, async (stream) => {
    const lines = [
      'First line',
      'Second line',
      'Third line'
    ]
    for (const line of lines) {
      await stream.writeln(line)
      await stream.sleep(500)
    }
  })
})
Classes
StreamingApi
API for controlling a stream.
Methods:
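write(input: string | Uint8Array): Promise<StreamingApi> - Write data to the stream; resolves to the same StreamingApi instance
writeln(input: string): Promise<StreamingApi> - Write data followed by a newline; resolves to the same StreamingApi instance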
sleep(ms: number): Promise<void> - Sleep for specified milliseconds
close(): Promise<void> - Close the stream
abort(): void - Abort the stream
Properties:
closed: boolean - Whether the stream is closed
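Because write() accepts both strings and byte arrays, strings have to be turned into bytes before they reach the underlying stream. The following is a minimal sketch of that normalization, assuming UTF-8 encoding via the standard TextEncoder; toBytes and withNewline are hypothetical helper names used only for illustration, not Hono APIs.

```typescript
const encoder = new TextEncoder()

// Normalizes a chunk to bytes: strings are UTF-8 encoded, byte arrays pass through.
function toBytes(input: string | Uint8Array): Uint8Array {
  return typeof input === 'string' ? encoder.encode(input) : input
}

// Mirrors the idea behind writeln(): the same data with a trailing newline.
function withNewline(input: string): Uint8Array {
  return toBytes(input + '\n')
}
```

Passing a Uint8Array returns the same array untouched, which is why binary payloads can be written without an extra copy in this sketch.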
SSEStreamingApi
Extends StreamingApi with SSE-specific methods.
Methods:
writeSSE(message: SSEMessage): Promise<void> - Write an SSE message
All methods from StreamingApi
Types
SSEMessage
interface SSEMessage {
  data: string | Promise<string>
  event?: string
  id?: string
  retry?: number
}
data
string | Promise<string>
required
The message data. Can be a string or Promise that resolves to a string.
event
string
Optional event type. Cannot contain \r or \n.
id
string
Optional message ID. Cannot contain \r or \n.
retry
number
Optional reconnection time in milliseconds.
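To make the fields concrete, here is a sketch of how such a message could be serialized into the text/event-stream wire format: one "field: value" line per field, one data line per line of payload, and a blank line to end the message. It is an illustration under stated assumptions, not Hono's implementation: formatSSE and ResolvedSSEMessage are hypothetical names, the field order is a choice made here, and data is assumed to be already resolved to a string.

```typescript
// Local copy of the message shape, with data already resolved to a string.
interface ResolvedSSEMessage {
  data: string
  event?: string
  id?: string
  retry?: number
}

// Serializes one message into text/event-stream format.
function formatSSE(message: ResolvedSSEMessage): string {
  // event and id are single-line fields, so line breaks are rejected.
  for (const field of ['event', 'id'] as const) {
    const value = message[field]
    if (value !== undefined && /[\r\n]/.test(value)) {
      throw new Error(`${field} must not contain "\\r" or "\\n"`)
    }
  }
  const lines: string[] = []
  if (message.event !== undefined) lines.push(`event: ${message.event}`)
  // Multi-line data becomes one "data:" field per line.
  for (const line of message.data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`)
  }
  if (message.id !== undefined) lines.push(`id: ${message.id}`)
  if (message.retry !== undefined) lines.push(`retry: ${message.retry}`)
  // A blank line terminates the message.
  return lines.join('\n') + '\n\n'
}
```

For example, formatSSE({ data: 'hello', event: 'greet', id: '1' }) returns 'event: greet\ndata: hello\nid: 1\n\n'.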
Examples
Real-time Updates
import { streamSSE } from 'hono/streaming'

app.get('/updates', (c) => {
  return streamSSE(c, async (stream) => {
    // getUpdatesChannel() is a placeholder for your own async iterable of updates
    const updates = getUpdatesChannel()
    for await (const update of updates) {
      await stream.writeSSE({
        data: JSON.stringify(update),
        event: 'update',
        id: String(update.id) // SSE ids must be strings
      })
      if (stream.closed) break
    }
  })
})
Progress Streaming
import { streamText } from 'hono/streaming'

app.post('/process', (c) => {
  return streamText(c, async (stream) => {
    await stream.writeln('Starting process...')
    for (let i = 0; i <= 100; i += 10) {
      await stream.writeln(`Progress: ${i}%`)
      await stream.sleep(500)
    }
    await stream.writeln('Complete!')
  })
})
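On the receiving side, a client sees this response as a sequence of byte chunks that do not necessarily align with line boundaries. The sketch below shows one way to turn such a body into complete lines using only standard Web Streams and TextDecoder; readLines is a hypothetical helper name, not something Hono provides.

```typescript
// Reads a byte stream and yields complete lines as they arrive.
async function* readLines(body: ReadableStream<Uint8Array>): AsyncGenerator<string> {
  const reader = body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    // stream: true keeps multi-byte characters split across chunks intact
    buffer += decoder.decode(value, { stream: true })
    let newline = buffer.indexOf('\n')
    while (newline !== -1) {
      yield buffer.slice(0, newline)
      buffer = buffer.slice(newline + 1)
      newline = buffer.indexOf('\n')
    }
  }
  buffer += decoder.decode() // flush any bytes still held by the decoder
  if (buffer.length > 0) yield buffer
}
```

A caller would typically pass the body of a fetch() response to readLines and consume it with for await, after checking that the body is not null.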
AI Streaming Response
import { streamText } from 'hono/streaming'

app.post('/ai/chat', async (c) => {
  const { prompt } = await c.req.json()
  return streamText(c, async (stream) => {
    // callAIService() is a placeholder that resolves to an async iterable of tokens
    const response = await callAIService(prompt)
    // Stream the response token by token
    for await (const token of response) {
      await stream.write(token)
    }
  })
})
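The example above only requires that the AI call yields string tokens through an async iterable. For local testing, a stand-in with that shape could look like the sketch below; fakeAIService is a hypothetical name, and the echo reply and 10 ms delay are assumptions chosen purely for illustration.

```typescript
// Hypothetical stand-in for callAIService(); a real client would call a model API.
async function* fakeAIService(prompt: string): AsyncGenerator<string> {
  const reply = `You said: ${prompt}`
  // Split into words, keeping trailing whitespace so the pieces concatenate back.
  const tokens = reply.match(/\S+\s*/g) ?? []
  for (const token of tokens) {
    await new Promise((resolve) => setTimeout(resolve, 10)) // simulate latency
    yield token
  }
}
```

Because an async generator object is itself an async iterable, it can be consumed by the same for await loop used in the handler.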
SSE with Error Recovery
import { streamSSE } from 'hono/streaming'

app.get('/events', (c) => {
  return streamSSE(
    c,
    async (stream) => {
      let lastId = 0
      while (!stream.closed) {
        // fetchNextEvent() is a placeholder; if it throws, onError below is called
        const event = await fetchNextEvent()
        await stream.writeSSE({
          data: JSON.stringify(event),
          id: String(++lastId),
          event: 'message'
        })
      }
    },
    async (err, stream) => {
      // Send an error event to the client
      await stream.writeSSE({
        event: 'error',
        data: err.message
      })
    }
  )
})
streamSSE() automatically sets:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked
streamText() automatically sets:
Content-Type: text/plain
X-Content-Type-Options: nosniff
Transfer-Encoding: chunked
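For comparison, the same headers can be set by hand on a standard Response. The sketch below assumes a runtime with the Web Streams and Fetch globals (ReadableStream, Response, TextEncoder); manualTextStream is a hypothetical name, and the code illustrates what the helper saves you from writing rather than how Hono implements it.

```typescript
// Builds a streaming plain-text Response by hand, mirroring the header list above.
function manualTextStream(chunks: string[]): Response {
  const encoder = new TextEncoder()
  const body = new ReadableStream<Uint8Array>({
    start(controller) {
      // Enqueue every chunk as bytes, then signal the end of the stream.
      for (const chunk of chunks) controller.enqueue(encoder.encode(chunk))
      controller.close()
    }
  })
  return new Response(body, {
    headers: {
      'Content-Type': 'text/plain',
      'X-Content-Type-Options': 'nosniff',
      'Transfer-Encoding': 'chunked'
    }
  })
}
```

An SSE variant would swap in the streamSSE() headers listed above and write event-stream formatted text instead of raw chunks.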