Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/hypertekorg/hyperstack/llms.txt

Use this file to discover all available pages before exploring further.

HyperStack streams implement Rust’s Stream trait and provide composable operators for filtering and transforming data.

Stream Types

UseStream<T>

Simplest stream type - emits merged entity values directly (filters out deletes):
let stream: UseStream<OreRound> = hs.views.latest().listen();
// Stream<Item = T>

EntityStream<T>

Full update stream with operation types:
let stream: EntityStream<OreRound> = hs.views.latest().watch();
// Stream<Item = Update<T>>

RichEntityStream<T>

Detailed stream with before/after diffs:
let stream: RichEntityStream<OreRound> = hs.views.latest().watch_rich();
// Stream<Item = RichUpdate<T>>

Filter Operators

filter()

Filter stream items by predicate:
use futures_util::StreamExt;

// Filter rounds with high motherlode
let stream = hs.views.list()
    .listen()
    .filter(|round| round.state.motherlode.unwrap_or(0) > 1000);

while let Some(round) = stream.next().await {
    println!("High motherlode round: {:?}", round);
}
Signature: fn filter<F>(self, predicate: F) -> FilteredStream<Self, T, F>
Where: F: FnMut(&T) -> bool

filter_map()

Filter and transform in one step:
// Extract only the motherlode value from active rounds
let stream = hs.views.list()
    .listen()
    .filter_map(|round| {
        if round.state.expires_at.is_some() {
            round.state.motherlode
        } else {
            None
        }
    });

while let Some(motherlode) = stream.next().await {
    println!("Active motherlode: {}", motherlode);
}
Signature: fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, T, U, F>
Where: F: FnMut(T) -> Option<U>

map()

Transform each stream item:
// Convert rounds to their IDs
let stream = hs.views.list()
    .listen()
    .map(|round| round.id.round_id.unwrap_or(0));

while let Some(round_id) = stream.next().await {
    println!("Round ID: {}", round_id);
}
Signature: fn map<U, F>(self, f: F) -> MapStream<Self, T, U, F>
Where: F: FnMut(T) -> U

Chaining Operators

All operators return streams that support further chaining:
let stream = hs.views.list()
    .listen()
    .filter(|round| round.state.motherlode.is_some())
    .map(|round| round.state.motherlode.unwrap())
    .filter(|&motherlode| motherlode > 1000);

while let Some(motherlode) = stream.next().await {
    println!("High motherlode: {}", motherlode);
}

Working with Updates

Filter by operation type

let stream = hs.views.list()
    .watch()
    .filter(|update| !update.is_delete());

while let Some(update) = stream.next().await {
    if let Some(data) = update.data() {
        println!("Non-delete update: {:?}", data);
    }
}

Extract data from updates

let stream = hs.views.list()
    .watch()
    .filter_map(|update| update.into_data());

while let Some(round) = stream.next().await {
    println!("Round data: {:?}", round);
}

Transform updates

let stream = hs.views.list()
    .watch()
    .map(|update| update.map(|round| round.id.round_id));

while let Some(update) = stream.next().await {
    println!("Update key: {}, ID: {:?}", update.key(), update.data());
}

Working with RichUpdates

Filter by change type

let stream = hs.views.list()
    .watch_rich()
    .filter(|update| update.is_updated());

while let Some(update) = stream.next().await {
    if let RichUpdate::Updated { key, before, after, .. } = update {
        println!("Updated {}: {:?} -> {:?}", key, before, after);
    }
}

Check for specific field changes

let stream = hs.views.list()
    .watch_rich()
    .filter(|update| update.has_patch_field("motherlode"));

while let Some(update) = stream.next().await {
    println!("Motherlode changed: {:?}", update);
}

Extract patch information

let stream = hs.views.list()
    .watch_rich()
    .filter_map(|update| {
        update.patch().map(|p| (update.key().to_string(), p.clone()))
    });

while let Some((key, patch)) = stream.next().await {
    println!("Patch for {}: {:?}", key, patch);
}

Key Filtering

Filter streams to specific keys:
// Single key (StateView)
let stream = hs.views.state().listen("round_123");

// Multiple keys
let stream = hs.views.list().watch_keys(&["round_1", "round_2"]);

Stream Composition

Combining multiple streams

use futures_util::stream::{self, StreamExt};

let round_stream = hs.views.ore_round.latest().listen();
let treasury_stream = hs.views.ore_treasury.list().listen();

let mut combined = stream::select(round_stream, treasury_stream);

while let Some(either) = combined.next().await {
    // Handle either type
}

Buffering and batching

use futures_util::StreamExt;

let stream = hs.views.list()
    .listen()
    .chunks(10); // Buffer 10 items

while let Some(batch) = stream.next().await {
    println!("Batch of {} items", batch.len());
}

Advanced Patterns

Rate limiting

use futures_util::StreamExt;
use tokio::time::{interval, Duration};

let stream = hs.views.list().listen();
let mut throttle = interval(Duration::from_secs(1));

let mut throttled = stream.zip(throttle.map(|_| ()));

while let Some((item, _)) = throttled.next().await {
    println!("Item: {:?}", item);
}

Debouncing

use futures_util::StreamExt;
use tokio::time::{sleep, Duration};

let mut stream = hs.views.latest().listen();
let mut last_item = None;

loop {
    tokio::select! {
        Some(item) = stream.next() => {
            last_item = Some(item);
        }
        _ = sleep(Duration::from_millis(500)) => {
            if let Some(item) = last_item.take() {
                println!("Debounced item: {:?}", item);
            }
        }
    }
}

Error handling

use futures_util::StreamExt;

let stream = hs.views.list()
    .listen()
    .map(|round| -> Result<_, anyhow::Error> {
        // Process with potential errors
        Ok(round)
    })
    .filter_map(|result| async move {
        match result {
            Ok(item) => Some(item),
            Err(e) => {
                eprintln!("Stream error: {}", e);
                None
            }
        }
    });

Server-Side Filtering

Use builder methods for server-side filtering:
let stream = hs.views.list()
    .listen()
    .take(10)              // Server-side: limit to 10 items
    .skip(5)               // Server-side: skip first 5
    .filter("status", "active") // Server-side: filter by status
    .filter(|r| r.state.motherlode.is_some()); // Client-side: additional filter
Server-side filters:
  • take(n) - Limit results
  • skip(n) - Skip results
  • filter(key, value) - Field equality
Client-side filters:
  • .filter() - Custom predicates
  • .filter_map() - Transform and filter
  • .map() - Transform

Performance Tips

  1. Use server-side filters first - Reduce network traffic
  2. Chain operators efficiently - Avoid unnecessary allocations
  3. Use filter_map over filter().map() - Combine operations
  4. Consider buffering - Use chunks() for batch processing
  5. Handle backpressure - Use tokio channels when needed

Best Practices

  • Use listen() for simple entity streams (deletes filtered)
  • Use watch() when you need to handle all operations
  • Use watch_rich() for detailed change tracking
  • Prefer server-side filtering (take, skip, filter) when possible
  • Chain client-side operators for complex transformations
  • Handle errors gracefully in stream processing

Build docs developers (and LLMs) love