Skip to main content
Streaming allows you to process large query results one document at a time without loading everything into memory. This is essential for data exports, migrations, and batch processing.

Understanding Streaming

Streaming uses async generators to yield documents one at a time, making it memory-efficient for large datasets.

Memory Usage Comparison

// ❌ Regular get() - loads everything into memory
// The entire result set is materialized as one array before the loop starts.
const allUsers = await userRepo.query().get();
// Memory: ~1MB per 1000 users with all data in memory
for (const user of allUsers) {
  await processUser(user);
}

// ✅ Streaming - processes one at a time
// stream() returns an async iterable; each iteration yields a single document.
for await (const user of userRepo.query().stream()) {
  await processUser(user);
}
// Memory: ~1KB (only current user in memory)
Streaming is perfect for processing thousands of documents without running out of memory.

Basic Streaming

Stream All Documents

// Process all users one at a time
// Consume the async iterable with for await...of; the loop body runs to
// completion for each document before the next one is yielded.
for await (const user of userRepo.query().stream()) {
  console.log(`Processing user: ${user.name}`);
  await sendEmail(user.email);
}

Stream with Counter

// Running count of documents handled so far.
let processed = 0;

for await (const user of userRepo.query().stream()) {
  await processUser(user);
  processed++;
  
  // Log every 100 documents: shows liveness without flooding the console.
  if (processed % 100 === 0) {
    console.log(`Processed ${processed} users...`);
  }
}

console.log(`Total processed: ${processed} users`);

Streaming with Filters

Filter Before Streaming

// Stream only active users
// Filtering in the query means non-matching documents are never fetched.
for await (const user of userRepo.query()
  .where('status', '==', 'active')
  .stream()) {
  await sendNewsletter(user.email);
}

Multiple Filters

// Stream premium users created this month
// Multiple where() clauses combine as AND; both range filters target the
// same field ('createdAt'), which Firestore allows.
for await (const user of userRepo.query()
  .where('plan', '==', 'premium')
  .where('createdAt', '>=', startOfMonth)
  .where('createdAt', '<=', endOfMonth)
  .stream()) {
  await sendPremiumPerks(user);
}

Ordered Streaming

// Stream users in chronological order
// orderBy is applied by the query, so documents arrive already sorted.
for await (const user of userRepo.query()
  .orderBy('createdAt', 'asc')
  .stream()) {
  console.log(`User ${user.name} joined on ${user.createdAt}`);
}

Real-World Use Cases

Email Campaign

class EmailCampaignService {
  /**
   * Streams every subscribed, active user and sends the campaign email,
   * continuing past individual send failures.
   *
   * @param campaignId - Identifier of the campaign being sent.
   * @returns Counts of successful and failed sends.
   */
  async sendCampaign(campaignId: string) {
    const counters = { sent: 0, failed: 0 };

    console.log('Starting email campaign...');

    const recipients = userRepo.query()
      .where('subscribed', '==', true)
      .where('status', '==', 'active')
      .stream();

    for await (const user of recipients) {
      try {
        await emailService.send({
          to: user.email,
          subject: 'Special Offer',
          template: 'campaign',
          data: { name: user.name }
        });

        counters.sent += 1;

        // Periodic progress line so long-running campaigns show liveness.
        if (counters.sent % 100 === 0) {
          console.log(`Sent ${counters.sent} emails...`);
        }
      } catch (error) {
        // Log and keep going — one bad address must not abort the campaign.
        console.error(`Failed to send to ${user.email}:`, error);
        counters.failed += 1;
      }
    }

    console.log(`Campaign complete: ${counters.sent} sent, ${counters.failed} failed`);
    return { sent: counters.sent, failed: counters.failed };
  }
}

CSV Export

import { createWriteStream } from 'fs';

class ExportService {
  /**
   * Streams every user (newest first) into a CSV file.
   *
   * Fixes vs the naive version:
   * - All fields are quoted and embedded quotes doubled (RFC 4180), so
   *   names/emails containing commas or quotes no longer corrupt rows.
   * - write() backpressure is honored: when the internal buffer is full we
   *   wait for 'drain', keeping memory flat for very large exports.
   * - The completion message is only logged after the file is fully flushed.
   *
   * @param filePath - Destination path for the CSV file.
   */
  async exportUsersToCSV(filePath: string) {
    const out = createWriteStream(filePath);

    // Quote a field and escape embedded quotes per RFC 4180.
    const csvField = (value: unknown) =>
      `"${String(value).replace(/"/g, '""')}"`;

    // write() returns false when the buffer is full; pause until 'drain'
    // so we don't accumulate the whole export in memory.
    const write = (chunk: string) =>
      out.write(chunk)
        ? Promise.resolve()
        : new Promise<void>(resolve => out.once('drain', resolve));

    // Write CSV header
    await write('ID,Name,Email,Status,Created At\n');

    let count = 0;

    for await (const user of userRepo.query()
      .orderBy('createdAt', 'desc')
      .stream()) {
      // Write one escaped CSV row per user.
      await write(
        [user.id, user.name, user.email, user.status, user.createdAt]
          .map(csvField)
          .join(',') + '\n'
      );

      count++;

      if (count % 1000 === 0) {
        console.log(`Exported ${count} users...`);
      }
    }

    // end() is asynchronous — wait for the final flush before reporting.
    await new Promise<void>((resolve, reject) => {
      out.on('error', reject);
      out.end(resolve);
    });
    console.log(`Export complete: ${count} users written to ${filePath}`);
  }
}

JSON Export

import { createWriteStream } from 'fs';

class DataExportService {
  /**
   * Streams all completed orders into a JSON array file.
   *
   * Fixes vs the naive version:
   * - write() backpressure is honored (wait for 'drain' when the buffer is
   *   full), so the export stays memory-flat even for huge collections.
   * - Waits for the stream to finish flushing before logging completion.
   *
   * @param filePath - Destination path for the JSON file.
   */
  async exportToJSON(filePath: string) {
    const stream = createWriteStream(filePath);

    // write() returns false when the internal buffer is full; pause until
    // 'drain' instead of letting the buffer grow unbounded.
    const write = (chunk: string) =>
      stream.write(chunk)
        ? Promise.resolve()
        : new Promise<void>(resolve => stream.once('drain', resolve));

    await write('[\n');

    let first = true;
    let count = 0;

    for await (const order of orderRepo.query()
      .where('status', '==', 'completed')
      .stream()) {
      // Comma-separate entries; no trailing comma after the last one.
      if (!first) {
        await write(',\n');
      }

      await write(JSON.stringify(order, null, 2));
      first = false;
      count++;

      if (count % 500 === 0) {
        console.log(`Exported ${count} orders...`);
      }
    }

    await write('\n]');

    // end() is asynchronous — wait for the final flush before reporting.
    await new Promise<void>((resolve, reject) => {
      stream.on('error', reject);
      stream.end(resolve);
    });

    console.log(`Exported ${count} orders`);
  }
}

Data Migration

class MigrationService {
  /**
   * One-pass data migration: streams every user, derives the v2 fields,
   * and writes each document back in place. Individual failures are
   * counted but do not stop the run.
   */
  async migrateUserData() {
    const stats = { migrated: 0, errors: 0 };

    for await (const user of userRepo.query().stream()) {
      try {
        // Derive the new-format fields from the legacy document.
        // NOTE(review): assumes firstName/lastName are present on legacy
        // docs — confirm before running against production data.
        const fullName = `${user.firstName} ${user.lastName}`;
        const displayName = user.username || user.email.split('@')[0];

        // Update in place with the transformed document.
        await userRepo.update(user.id, {
          ...user,
          fullName,
          displayName,
          version: 2
        });

        stats.migrated += 1;

        if (stats.migrated % 100 === 0) {
          console.log(`Migrated ${stats.migrated} users...`);
        }
      } catch (error) {
        // Record the failure and continue with the next user.
        console.error(`Failed to migrate user ${user.id}:`, error);
        stats.errors += 1;
      }
    }

    console.log(`Migration complete: ${stats.migrated} migrated, ${stats.errors} errors`);
  }
}

Batch Processing with Delays

class BatchProcessor {
  /**
   * Streams users and syncs them to the external API in fixed-size batches,
   * pausing between batches to respect the API's rate limit.
   */
  async processWithRateLimit() {
    const batchSize = 10;
    const delayMs = 1000; // pause between batches (1 second)

    let processed = 0;
    let pending: User[] = [];

    const sleep = (ms: number) =>
      new Promise(resolve => setTimeout(resolve, ms));

    for await (const user of userRepo.query().stream()) {
      pending.push(user);
      if (pending.length < batchSize) {
        continue; // keep accumulating until the batch is full
      }

      await this.processBatch(pending);
      processed += pending.length;
      console.log(`Processed ${processed} users`);

      pending = [];

      // Throttle: wait before pulling the next batch from the stream.
      await sleep(delayMs);
    }

    // Flush the final partial batch, if any.
    if (pending.length > 0) {
      await this.processBatch(pending);
      processed += pending.length;
    }

    console.log(`Total processed: ${processed} users`);
  }

  /** Syncs each user in the batch sequentially to the external API. */
  private async processBatch(users: User[]) {
    for (const user of users) {
      await externalAPI.syncUser(user);
    }
  }
}

Audit Log Generation

class AuditService {
  /**
   * Builds an aggregate audit report for the given creation-date window by
   * streaming users and orders, so no full result set is held in memory.
   *
   * @param startDate - Inclusive window start (compared against createdAt).
   * @param endDate - Inclusive window end.
   * @returns Counts, completed-order revenue, and per-status order counts.
   */
  async generateAuditReport(startDate: string, endDate: string) {
    const report = {
      users: 0,
      orders: 0,
      totalRevenue: 0,
      byStatus: {} as Record<string, number>
    };

    // Count users created in the window (document contents are unused).
    const userStream = userRepo.query()
      .where('createdAt', '>=', startDate)
      .where('createdAt', '<=', endDate)
      .stream();
    for await (const _user of userStream) {
      report.users += 1;
    }

    // Tally orders: total count, revenue from completed ones, and a
    // per-status breakdown.
    const orderStream = orderRepo.query()
      .where('createdAt', '>=', startDate)
      .where('createdAt', '<=', endDate)
      .stream();
    for await (const order of orderStream) {
      report.orders += 1;

      if (order.status === 'completed') {
        report.totalRevenue += order.total;
      }

      const previous = report.byStatus[order.status] ?? 0;
      report.byStatus[order.status] = previous + 1;
    }

    return report;
  }
}

Advanced Streaming Patterns

Parallel Processing with Streaming

class ParallelProcessor {
  /**
   * Streams all users and syncs each to the external API, keeping at most
   * `concurrency` sync calls in flight at once.
   *
   * Fix vs the naive version: after `Promise.race()` you do not know WHICH
   * promise settled, so removing the most-recently-added one (the original
   * `findIndex(p => p === promise)`) could evict a still-pending task from
   * tracking and let the in-flight count exceed the limit. Here each task
   * removes itself from the set when it settles.
   *
   * @param concurrency - Maximum number of concurrent syncs (default 5).
   */
  async processInParallel(concurrency: number = 5) {
    // Tasks currently in flight; each deletes itself on settle, so after
    // a race the set contains only pending work.
    const inFlight = new Set<Promise<void>>();
    let processed = 0;

    for await (const user of userRepo.query().stream()) {
      const task = this.processUser(user).then(() => {
        processed++;
        if (processed % 100 === 0) {
          console.log(`Processed ${processed} users`);
        }
      });

      // finally() fires on success or failure, guaranteeing cleanup.
      const tracked: Promise<void> = task.finally(() => {
        inFlight.delete(tracked);
      });
      inFlight.add(tracked);

      // Back off from the stream while all slots are busy.
      if (inFlight.size >= concurrency) {
        await Promise.race(inFlight);
      }
    }

    // Wait for the tail of in-flight work.
    await Promise.all(inFlight);
    console.log(`Total processed: ${processed}`);
  }

  /** Syncs a single user to the external API. */
  private async processUser(user: User) {
    await externalAPI.syncUser(user);
  }
}

Streaming with Error Collection

class RobustProcessor {
  /**
   * Processes every user, collecting per-user failures instead of letting
   * one bad document abort the stream. Failures are written to errors.json
   * so the affected users can be retried later.
   *
   * Fix vs the naive version: under strict TS (useUnknownInCatchVariables)
   * the catch variable is `unknown`, so `error.message` does not compile —
   * narrow with `instanceof Error` first.
   *
   * NOTE(review): `fs` here must be the promise API — ensure the file
   * imports `{ promises as fs } from 'fs'` (or from 'fs/promises').
   *
   * @returns The processed count and the number of errors collected.
   */
  async processWithErrorTracking() {
    const errors: Array<{ userId: string; error: string }> = [];
    let processed = 0;

    for await (const user of userRepo.query().stream()) {
      try {
        await processUser(user);
        processed++;
      } catch (error) {
        // Narrow `unknown` before reading .message; stringify anything else.
        errors.push({
          userId: user.id,
          error: error instanceof Error ? error.message : String(error)
        });
      }
    }

    console.log(`Processed: ${processed}, Errors: ${errors.length}`);

    if (errors.length > 0) {
      // Persist the failure list for a later retry pass.
      await fs.writeFile(
        'errors.json',
        JSON.stringify(errors, null, 2)
      );
    }

    return { processed, errors: errors.length };
  }
}

Streaming with Progress Tracking

class ProgressTracker {
  /**
   * Processes all users while reporting progress percentage, throughput,
   * and estimated time remaining.
   *
   * Fixes vs the naive version:
   * - Guards against division by zero: `total` may be 0 (empty collection)
   *   and `elapsed` may be 0 on the first instants of the run, which
   *   produced NaN% and Infinity rates.
   * - Logs every 100 documents (and on the final one) instead of on every
   *   single document, which flooded stdout.
   */
  async processWithProgress() {
    // Total count up front enables % and ETA (costs one aggregate read).
    const total = await userRepo.query().count();

    let processed = 0;
    const startTime = Date.now();

    for await (const user of userRepo.query().stream()) {
      await processUser(user);
      processed++;

      // Throttle logging: every 100 docs, plus the final document.
      if (processed % 100 !== 0 && processed !== total) {
        continue;
      }

      const progress = total > 0 ? (processed / total) * 100 : 100;
      const elapsed = (Date.now() - startTime) / 1000;
      const rate = elapsed > 0 ? processed / elapsed : 0;
      const remaining = rate > 0 ? (total - processed) / rate : 0;

      console.log(
        `Progress: ${progress.toFixed(1)}% ` +
        `(${processed}/${total}) ` +
        `Rate: ${rate.toFixed(1)}/s ` +
        `ETA: ${remaining.toFixed(0)}s`
      );
    }
  }
}

Streaming vs Get: When to Use Each

Use Streaming When:

Large Datasets

Processing thousands of documents

Memory Constrained

Running in limited memory environments

Long Operations

Processing takes time per document

Data Export

Exporting data to files

Use Get When:

// ✅ Small result sets
const recentUsers = await userRepo.query()
  .orderBy('createdAt', 'desc')
  .limit(10)
  .get();

// ✅ Need all data in memory
// Fix: build the map from `allUsers` (declared above) — the original
// referenced an undeclared `users`, which is also shadowed by the `const
// users` below and would throw a temporal-dead-zone error.
const allUsers = await userRepo.query().get();
const userMap = new Map(allUsers.map(u => [u.id, u]));

// ✅ Need array operations
const users = await userRepo.query().get();
// Copy before sorting so the fetched array's order is not mutated in place.
const sorted = [...users].sort((a, b) => b.score - a.score);

Performance Considerations

Firestore Costs

// Both cost the same in document reads
// Firestore bills per document fetched, regardless of delivery mechanism.

// Stream 10,000 documents
for await (const doc of repo.query().stream()) {
  await process(doc);
}
// Cost: 10,000 document reads
// Memory: ~1KB at a time

// Get 10,000 documents
const docs = await repo.query().get();
for (const doc of docs) {
  await process(doc);
}
// Cost: 10,000 document reads (same)
// Memory: ~10MB all at once
Streaming and get() have the same Firestore read costs. The difference is memory usage and processing patterns.

Network Efficiency

// Streaming fetches results in batches internally, so it is not one
// network round-trip per document.
for await (const doc of repo.query().stream()) {
  // The Firestore SDK fetches ~100-500 docs per network request,
  // then yields them to this loop one by one.
  await process(doc);
}

Error Handling

Graceful Error Handling

// Two layers of handling: the inner try/catch tolerates per-document
// failures; the outer one catches failures of the stream itself.
try {
  for await (const user of userRepo.query().stream()) {
    try {
      await processUser(user);
    } catch (error) {
      // Log but continue processing
      console.error(`Error processing user ${user.id}:`, error);
    }
  }
} catch (error) {
  // Fatal error with stream itself
  console.error('Stream error:', error);
}

Stop Streaming on Error

// Breaking out of a for await...of loop closes the underlying stream,
// so no further documents are fetched after the break.
for await (const user of userRepo.query().stream()) {
  const result = await processUser(user);
  
  if (result.criticalError) {
    console.error('Critical error, stopping stream');
    break; // Stop processing
  }
}

Best Practices

1. Use Streaming for Large Datasets

Always use streaming when processing >1000 documents.
// ✅ Good - memory efficient: one document in memory at a time
for await (const doc of repo.query().stream()) {
  await process(doc);
}

// ❌ Bad - loads 10,000 docs in memory before the loop even starts
const docs = await repo.query().get();
for (const doc of docs) {
  await process(doc);
}
2. Add Progress Logging

Keep track of progress for long-running operations.
// Count iterations and log periodically so long runs show liveness.
let processed = 0;
for await (const doc of repo.query().stream()) {
  await process(doc);
  processed++;
  if (processed % 100 === 0) {
    console.log(`Processed ${processed}...`);
  }
}
3. Handle Errors Gracefully

Don’t let one error stop the entire stream.
// Catch per-document errors inside the loop so the stream keeps going.
for await (const doc of repo.query().stream()) {
  try {
    await process(doc);
  } catch (error) {
    console.error(`Error processing ${doc.id}:`, error);
    // Continue processing
  }
}
4. Filter Before Streaming

Use query filters to reduce documents processed.
// ✅ Filter in query - only matching documents are fetched and billed
for await (const user of userRepo.query()
  .where('status', '==', 'active')
  .stream()) {
  await process(user);
}

// ❌ Filter in code - every document is fetched (and billed), then most
// are discarded client-side
for await (const user of userRepo.query().stream()) {
  if (user.status === 'active') {
    await process(user);
  }
}

Next Steps

Queries

Build queries to filter streamed data

Pagination

Alternative approach for processing large datasets

Bulk Operations

Batch operations for efficiency

Performance

Optimize streaming performance

Build docs developers (and LLMs) love