Streaming allows you to process large query results one document at a time without loading everything into memory. This is essential for data exports, migrations, and batch processing.
Understanding Streaming
Streaming uses async generators to yield documents one at a time, making it memory-efficient for large datasets.
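To make the mechanics concrete, here is a minimal sketch of how a `stream()` method can be built from an async generator. This is a hypothetical illustration, not this library's actual internals: `Doc`, `fetchPage`, and the page size are stand-ins.

```typescript
// Hypothetical document type for illustration
interface Doc { id: string; name: string }

// A stream() built on an async generator: pages are fetched lazily,
// and documents are yielded one at a time, so only the current
// document (plus the current page) is held in memory.
async function* stream(fetchPage: (cursor: number) => Promise<Doc[]>): AsyncGenerator<Doc> {
  let cursor = 0;
  while (true) {
    const page = await fetchPage(cursor);
    if (page.length === 0) return; // no more documents
    for (const doc of page) {
      yield doc; // hand one document to the consumer
    }
    cursor += page.length;
  }
}

// Fake data source standing in for Firestore
const data: Doc[] = Array.from({ length: 5 }, (_, i) => ({ id: String(i), name: `user-${i}` }));
const fetchPage = async (cursor: number) => data.slice(cursor, cursor + 2);

async function demo(): Promise<string[]> {
  const names: string[] = [];
  for await (const doc of stream(fetchPage)) {
    names.push(doc.name);
  }
  return names;
}
```

The consumer's `for await` loop looks identical whether the source holds five documents or five million; only the generator's paging changes.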
Memory Usage Comparison
```typescript
// ❌ Regular get() - loads everything into memory
const allUsers = await userRepo.query().get();
// Memory: ~1MB per 1,000 users, all held at once
for (const user of allUsers) {
  await processUser(user);
}

// ✅ Streaming - processes one document at a time
for await (const user of userRepo.query().stream()) {
  await processUser(user);
}
// Memory: ~1KB (only the current user in memory)
```
Streaming is perfect for processing thousands of documents without running out of memory.
Basic Streaming
Stream All Documents
```typescript
// Process all users one at a time
for await (const user of userRepo.query().stream()) {
  console.log(`Processing user: ${user.name}`);
  await sendEmail(user.email);
}
```
Stream with Counter
```typescript
let processed = 0;

for await (const user of userRepo.query().stream()) {
  await processUser(user);
  processed++;

  if (processed % 100 === 0) {
    console.log(`Processed ${processed} users...`);
  }
}

console.log(`Total processed: ${processed} users`);
```
Streaming with Filters
Filter Before Streaming
```typescript
// Stream only active users
for await (const user of userRepo.query()
  .where('status', '==', 'active')
  .stream()) {
  await sendNewsletter(user.email);
}
```
Multiple Filters
```typescript
// Stream premium users created this month
for await (const user of userRepo.query()
  .where('plan', '==', 'premium')
  .where('createdAt', '>=', startOfMonth)
  .where('createdAt', '<=', endOfMonth)
  .stream()) {
  await sendPremiumPerks(user);
}
```
Ordered Streaming
```typescript
// Stream users in chronological order
for await (const user of userRepo.query()
  .orderBy('createdAt', 'asc')
  .stream()) {
  console.log(`User ${user.name} joined on ${user.createdAt}`);
}
```
Real-World Use Cases
Email Campaign
```typescript
class EmailCampaignService {
  async sendCampaign(campaignId: string) {
    let sent = 0;
    let failed = 0;

    console.log('Starting email campaign...');

    for await (const user of userRepo.query()
      .where('subscribed', '==', true)
      .where('status', '==', 'active')
      .stream()) {
      try {
        await emailService.send({
          to: user.email,
          subject: 'Special Offer',
          template: 'campaign',
          data: { name: user.name }
        });
        sent++;

        if (sent % 100 === 0) {
          console.log(`Sent ${sent} emails...`);
        }
      } catch (error) {
        console.error(`Failed to send to ${user.email}:`, error);
        failed++;
      }
    }

    console.log(`Campaign complete: ${sent} sent, ${failed} failed`);
    return { sent, failed };
  }
}
```
CSV Export
```typescript
import { createWriteStream } from 'fs';

class ExportService {
  async exportUsersToCSV(filePath: string) {
    const stream = createWriteStream(filePath);

    // Write CSV header
    stream.write('ID,Name,Email,Status,Created At\n');

    let count = 0;

    for await (const user of userRepo.query()
      .orderBy('createdAt', 'desc')
      .stream()) {
      // Write CSV row
      stream.write(
        `${user.id},"${user.name}",${user.email},${user.status},${user.createdAt}\n`
      );
      count++;

      if (count % 1000 === 0) {
        console.log(`Exported ${count} users...`);
      }
    }

    stream.end();
    console.log(`Export complete: ${count} users written to ${filePath}`);
  }
}
```
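Note that the naive string interpolation above breaks if a field contains a comma, a double quote, or a newline. A small RFC 4180-style escaping helper avoids that; the `csvEscape` and `csvRow` names here are our own, not part of the library.

```typescript
// Quote a field per RFC 4180: wrap it in double quotes when it
// contains a comma, quote, or newline, and double any embedded quotes.
function csvEscape(value: string): string {
  if (/[",\n]/.test(value)) {
    return `"${value.replace(/"/g, '""')}"`;
  }
  return value;
}

// Build one CSV row from a list of fields
function csvRow(fields: string[]): string {
  return fields.map(csvEscape).join(',') + '\n';
}
```

In the export loop you would write `stream.write(csvRow([user.id, user.name, user.email, user.status, String(user.createdAt)]))` instead of the hand-built template literal.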
JSON Export
```typescript
import { createWriteStream } from 'fs';

class DataExportService {
  async exportToJSON(filePath: string) {
    const stream = createWriteStream(filePath);
    stream.write('[\n');

    let first = true;
    let count = 0;

    for await (const order of orderRepo.query()
      .where('status', '==', 'completed')
      .stream()) {
      if (!first) {
        stream.write(',\n');
      }
      stream.write(JSON.stringify(order, null, 2));
      first = false;
      count++;

      if (count % 500 === 0) {
        console.log(`Exported ${count} orders...`);
      }
    }

    stream.write('\n]');
    stream.end();
    console.log(`Exported ${count} orders`);
  }
}
```
Data Migration
```typescript
class MigrationService {
  async migrateUserData() {
    let migrated = 0;
    let errors = 0;

    for await (const user of userRepo.query().stream()) {
      try {
        // Transform old data to the new format
        const migratedData = {
          ...user,
          fullName: `${user.firstName} ${user.lastName}`,
          displayName: user.username || user.email.split('@')[0],
          version: 2
        };

        // Update in place
        await userRepo.update(user.id, migratedData);
        migrated++;

        if (migrated % 100 === 0) {
          console.log(`Migrated ${migrated} users...`);
        }
      } catch (error) {
        console.error(`Failed to migrate user ${user.id}:`, error);
        errors++;
      }
    }

    console.log(`Migration complete: ${migrated} migrated, ${errors} errors`);
  }
}
```
Batch Processing with Delays
```typescript
class BatchProcessor {
  async processWithRateLimit() {
    let processed = 0;
    const batchSize = 10;
    const delayMs = 1000; // 1 second delay between batches

    let batch: User[] = [];

    for await (const user of userRepo.query().stream()) {
      batch.push(user);

      if (batch.length >= batchSize) {
        await this.processBatch(batch);
        processed += batch.length;
        console.log(`Processed ${processed} users`);
        batch = [];

        // Wait before the next batch
        await new Promise(resolve => setTimeout(resolve, delayMs));
      }
    }

    // Process remaining items
    if (batch.length > 0) {
      await this.processBatch(batch);
      processed += batch.length;
    }

    console.log(`Total processed: ${processed} users`);
  }

  private async processBatch(users: User[]) {
    // Process a batch of users
    for (const user of users) {
      await externalAPI.syncUser(user);
    }
  }
}
```
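The accumulate-and-flush logic above can be factored into a reusable helper that chunks any async iterable. This is a sketch of our own (the `chunks` name is not a library API), shown with a small in-memory source:

```typescript
// Group an async iterable into arrays of up to `size` items.
async function* chunks<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length >= size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // flush the final partial batch
}

// Small async source standing in for a document stream
async function* numbers(n: number): AsyncGenerator<number> {
  for (let i = 0; i < n; i++) yield i;
}

async function demoChunks(): Promise<number[][]> {
  const out: number[][] = [];
  for await (const batch of chunks(numbers(7), 3)) {
    out.push(batch);
  }
  return out;
}
```

With this helper the rate-limited loop collapses to `for await (const batch of chunks(userRepo.query().stream(), 10)) { ... }`.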
Audit Log Generation
```typescript
class AuditService {
  async generateAuditReport(startDate: string, endDate: string) {
    const report = {
      users: 0,
      orders: 0,
      totalRevenue: 0,
      byStatus: {} as Record<string, number>
    };

    // Count users created in the period
    for await (const user of userRepo.query()
      .where('createdAt', '>=', startDate)
      .where('createdAt', '<=', endDate)
      .stream()) {
      report.users++;
    }

    // Tally orders and revenue
    for await (const order of orderRepo.query()
      .where('createdAt', '>=', startDate)
      .where('createdAt', '<=', endDate)
      .stream()) {
      report.orders++;

      if (order.status === 'completed') {
        report.totalRevenue += order.total;
      }

      report.byStatus[order.status] =
        (report.byStatus[order.status] || 0) + 1;
    }

    return report;
  }
}
```
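The `byStatus` tally is an instance of a generic counting fold over an async iterable, which can be pulled out and reused across reports. The `countBy` helper below is our own sketch, demonstrated against a tiny in-memory source:

```typescript
// Count items from an async iterable, bucketed by a key function.
async function countBy<T>(
  source: AsyncIterable<T>,
  key: (item: T) => string
): Promise<Record<string, number>> {
  const counts: Record<string, number> = {};
  for await (const item of source) {
    const k = key(item);
    counts[k] = (counts[k] || 0) + 1;
  }
  return counts;
}

// Fake order stream for demonstration
async function* orders(): AsyncGenerator<{ status: string }> {
  for (const s of ['completed', 'pending', 'completed']) yield { status: s };
}
```

In the audit service this would read `report.byStatus = await countBy(orderRepo.query().stream(), o => o.status)`, at the cost of a second pass if you also need the revenue sum.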
Advanced Streaming Patterns
Parallel Processing with Streaming
```typescript
class ParallelProcessor {
  async processInParallel(concurrency: number = 5) {
    const queue: Promise<void>[] = [];
    let processed = 0;

    for await (const user of userRepo.query().stream()) {
      const promise = this.processUser(user).then(() => {
        processed++;
        if (processed % 100 === 0) {
          console.log(`Processed ${processed} users`);
        }
        // Remove this promise from the queue once it settles
        queue.splice(queue.indexOf(promise), 1);
      });
      queue.push(promise);

      // Wait for a slot to free up if the queue is full
      if (queue.length >= concurrency) {
        await Promise.race(queue);
      }
    }

    // Wait for the remaining work
    await Promise.all(queue);
    console.log(`Total processed: ${processed}`);
  }

  private async processUser(user: User) {
    await externalAPI.syncUser(user);
  }
}
```
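The queue bookkeeping above generalizes to a standalone helper that caps in-flight work over any async iterable. This is a sketch of our own (`forEachConcurrent` is not a library API), with a demo that records how many workers ran at once:

```typescript
// Run an async worker over an async iterable with at most
// `limit` invocations in flight at any moment.
async function forEachConcurrent<T>(
  source: AsyncIterable<T>,
  limit: number,
  worker: (item: T) => Promise<void>
): Promise<void> {
  const inFlight = new Set<Promise<void>>();
  for await (const item of source) {
    const p = worker(item).finally(() => inFlight.delete(p));
    inFlight.add(p);
    if (inFlight.size >= limit) {
      await Promise.race(inFlight); // wait for one slot to free up
    }
  }
  await Promise.all(inFlight); // drain the remainder
}

async function demoConcurrent(): Promise<{ done: number; maxInFlight: number }> {
  let active = 0;
  let maxActive = 0;
  let done = 0;
  async function* items(): AsyncGenerator<number> {
    for (let i = 0; i < 10; i++) yield i;
  }
  await forEachConcurrent(items(), 3, async () => {
    active++;
    maxActive = Math.max(maxActive, active);
    await new Promise(r => setTimeout(r, 5));
    active--;
    done++;
  });
  return { done, maxInFlight: maxActive };
}
```

Using a `Set` and `finally` means each promise removes itself on completion, so `Promise.race` always races only work that is genuinely still pending.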
Streaming with Error Collection
```typescript
import { writeFile } from 'fs/promises';

class RobustProcessor {
  async processWithErrorTracking() {
    const errors: Array<{ userId: string; error: string }> = [];
    let processed = 0;

    for await (const user of userRepo.query().stream()) {
      try {
        await processUser(user);
        processed++;
      } catch (error) {
        errors.push({
          userId: user.id,
          error: error instanceof Error ? error.message : String(error)
        });
      }
    }

    console.log(`Processed: ${processed}, Errors: ${errors.length}`);

    if (errors.length > 0) {
      // Write errors to a file
      await writeFile('errors.json', JSON.stringify(errors, null, 2));
    }

    return { processed, errors: errors.length };
  }
}
```
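Under strict TypeScript settings a caught `error` has type `unknown`, so reading `error.message` directly does not compile. The `instanceof` narrowing used above can be pulled into a tiny helper (the `errorMessage` name is ours):

```typescript
// Narrow an unknown caught value to a readable message.
function errorMessage(error: unknown): string {
  if (error instanceof Error) return error.message;
  return String(error);
}
```

This keeps catch blocks short and handles non-Error throws (strings, numbers) without crashing the error logger itself.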
Streaming with Progress Tracking
```typescript
class ProgressTracker {
  async processWithProgress() {
    // Get the total count first (optional)
    const total = await userRepo.query().count();
    let processed = 0;
    const startTime = Date.now();

    for await (const user of userRepo.query().stream()) {
      await processUser(user);
      processed++;

      // Calculate progress
      const progress = (processed / total) * 100;
      const elapsed = (Date.now() - startTime) / 1000;
      const rate = processed / elapsed;
      const remaining = (total - processed) / rate;

      console.log(
        `Progress: ${progress.toFixed(1)}% ` +
        `(${processed}/${total}) ` +
        `Rate: ${rate.toFixed(1)}/s ` +
        `ETA: ${remaining.toFixed(0)}s`
      );
    }
  }
}
```
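The rate and ETA arithmetic in the loop can be isolated into a pure function, which makes it easy to verify: rate is items per second, and the ETA is the remaining items divided by that rate. The `eta` helper below is our own naming:

```typescript
// Estimate throughput and time remaining from progress counts.
function eta(processed: number, total: number, elapsedSeconds: number) {
  const rate = processed / elapsedSeconds;            // items per second
  const remainingSeconds = (total - processed) / rate; // at the current rate
  const percent = (processed / total) * 100;
  return { rate, remainingSeconds, percent };
}
```

For example, 50 of 100 items in 10 seconds gives a rate of 5/s, 50% complete, and 10 seconds remaining. Guard against `elapsedSeconds` of 0 on the very first iteration.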
Streaming vs Get: When to Use Each
Use Streaming When:
- Large datasets: processing thousands of documents
- Memory-constrained environments: running with limited memory
- Long operations: processing takes significant time per document
- Data export: writing data out to files
Use Get When:
```typescript
// ✅ Small result sets
const recentUsers = await userRepo.query()
  .orderBy('createdAt', 'desc')
  .limit(10)
  .get();

// ✅ Need all data in memory
const allUsers = await userRepo.query().get();
const userMap = new Map(allUsers.map(u => [u.id, u]));

// ✅ Need array operations
const users = await userRepo.query().get();
const sorted = users.sort((a, b) => b.score - a.score);
```
Firestore Costs
```typescript
// Both cost the same in document reads

// Stream 10,000 documents
for await (const doc of repo.query().stream()) {
  await process(doc);
}
// Cost: 10,000 document reads
// Memory: ~1KB at a time

// Get 10,000 documents
const docs = await repo.query().get();
for (const doc of docs) {
  await process(doc);
}
// Cost: 10,000 document reads (same)
// Memory: ~10MB all at once
```
Streaming and get() have the same Firestore read costs. The difference is memory usage and processing patterns.
Network Efficiency
```typescript
// Streaming fetches results in batches internally
for await (const doc of repo.query().stream()) {
  // The Firestore SDK fetches ~100-500 docs at a time
  // and yields them one by one
  await process(doc);
}
```
Error Handling
Graceful Error Handling
```typescript
try {
  for await (const user of userRepo.query().stream()) {
    try {
      await processUser(user);
    } catch (error) {
      // Log but continue processing
      console.error(`Error processing user ${user.id}:`, error);
    }
  }
} catch (error) {
  // Fatal error with the stream itself
  console.error('Stream error:', error);
}
```
Stop Streaming on Error
```typescript
for await (const user of userRepo.query().stream()) {
  const result = await processUser(user);

  if (result.criticalError) {
    console.error('Critical error, stopping stream');
    break; // Stop processing
  }
}
```
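Breaking out of a `for await` loop does more than skip the remaining iterations: the runtime calls the generator's `return()`, which closes the underlying stream and runs any `finally` cleanup in the generator. A self-contained demonstration of that behavior:

```typescript
async function demoBreak(): Promise<{ yielded: number; cleanedUp: boolean }> {
  let cleanedUp = false;

  // Generator with cleanup logic, standing in for a document stream
  async function* source(): AsyncGenerator<number> {
    try {
      for (let i = 0; i < 1000; i++) yield i;
    } finally {
      cleanedUp = true; // runs when the consumer breaks out early
    }
  }

  let yielded = 0;
  for await (const n of source()) {
    yielded++;
    if (n === 4) break; // stop early; the generator is closed
  }
  return { yielded, cleanedUp };
}
```

The same applies to `return` and thrown exceptions inside the loop body, so breaking out early does not leak the stream.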
Best Practices
Use Streaming for Large Datasets
Always use streaming when processing more than about 1,000 documents.

```typescript
// ✅ Good - memory efficient
for await (const doc of repo.query().stream()) {
  await process(doc);
}

// ❌ Bad - loads 10,000 docs into memory
const docs = await repo.query().get();
for (const doc of docs) {
  await process(doc);
}
```
Add Progress Logging
Keep track of progress for long-running operations.

```typescript
let processed = 0;

for await (const doc of repo.query().stream()) {
  await process(doc);
  processed++;

  if (processed % 100 === 0) {
    console.log(`Processed ${processed}...`);
  }
}
```
Handle Errors Gracefully
Don't let one error stop the entire stream.

```typescript
for await (const doc of repo.query().stream()) {
  try {
    await process(doc);
  } catch (error) {
    console.error(`Error processing ${doc.id}:`, error);
    // Continue processing
  }
}
```
Filter Before Streaming
Use query filters to reduce the number of documents fetched and processed.

```typescript
// ✅ Filter in the query
for await (const user of userRepo.query()
  .where('status', '==', 'active')
  .stream()) {
  await process(user);
}

// ❌ Filter in code - still reads (and pays for) every document
for await (const user of userRepo.query().stream()) {
  if (user.status === 'active') {
    await process(user);
  }
}
```
Next Steps
- Queries: build queries to filter streamed data
- Pagination: an alternative approach for processing large datasets
- Bulk Operations: batch operations for efficiency
- Performance: optimize streaming performance