HyperStack streams implement the Stream trait from the futures crate, so they work with StreamExt combinators, and they provide their own composable operators for filtering and transforming data.
Stream Types
UseStream<T>
Simplest stream type - emits merged entity values directly (filters out deletes):
let stream: UseStream<OreRound> = hs.views.latest().listen();
// Stream<Item = T>
EntityStream<T>
Full update stream with operation types:
let stream: EntityStream<OreRound> = hs.views.latest().watch();
// Stream<Item = Update<T>>
RichEntityStream<T>
Detailed stream with before/after diffs:
let stream: RichEntityStream<OreRound> = hs.views.latest().watch_rich();
// Stream<Item = RichUpdate<T>>
Filter Operators
filter()
Filter stream items by predicate:
use futures_util::StreamExt;
// Filter rounds with high motherlode
// (the binding must be `mut` because next() takes &mut self)
let mut stream = hs.views.list()
    .listen()
    .filter(|round| round.state.motherlode.unwrap_or(0) > 1000);
while let Some(round) = stream.next().await {
    println!("High motherlode round: {:?}", round);
}
Signature: fn filter<F>(self, predicate: F) -> FilteredStream<Self, T, F>
Where: F: FnMut(&T) -> bool
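Because the predicate bound is FnMut rather than Fn, the closure may keep mutable state between calls. The sketch below illustrates this with a standard-library iterator as a synchronous stand-in for a stream. Round and changed_motherlodes are hypothetical names introduced for the example and are not part of the HyperStack SDK.

```rust
/// Hypothetical, simplified stand-in for an entity such as OreRound.
#[derive(Debug, Clone, PartialEq)]
pub struct Round {
    pub round_id: u64,
    pub motherlode: Option<u64>,
}

/// Keeps a round only when its motherlode differs from the previous round's.
/// The closure mutates `last`, which is why the predicate must be `FnMut`.
pub fn changed_motherlodes(rounds: Vec<Round>) -> Vec<Round> {
    // Outer Option: "nothing seen yet"; inner Option: the field's own value.
    let mut last: Option<Option<u64>> = None;
    rounds
        .into_iter()
        .filter(|round| {
            let changed = last != Some(round.motherlode);
            last = Some(round.motherlode);
            changed
        })
        .collect()
}
```

The same shape should carry over to a stream predicate, assuming the SDK's filter() calls the closure once per item in arrival order.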
filter_map()
Filter and transform in one step:
// Extract only the motherlode value from active rounds
let mut stream = hs.views.list()
    .listen()
    .filter_map(|round| {
        // A round counts as active when it has an expiry set
        if round.state.expires_at.is_some() {
            round.state.motherlode
        } else {
            None
        }
    });
while let Some(motherlode) = stream.next().await {
    println!("Active motherlode: {}", motherlode);
}
Signature: fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, T, U, F>
Where: F: FnMut(T) -> Option<U>
map()
Transform each stream item:
// Convert rounds to their IDs (0 when the ID is missing)
let mut stream = hs.views.list()
    .listen()
    .map(|round| round.id.round_id.unwrap_or(0));
while let Some(round_id) = stream.next().await {
    println!("Round ID: {}", round_id);
}
Signature: fn map<U, F>(self, f: F) -> MapStream<Self, T, U, F>
Where: F: FnMut(T) -> U
Chaining Operators
All operators return streams that support further chaining:
let mut stream = hs.views.list()
    .listen()
    // Keep rounds that have a motherlode, so the unwrap() below cannot panic
    .filter(|round| round.state.motherlode.is_some())
    .map(|round| round.state.motherlode.unwrap())
    .filter(|&motherlode| motherlode > 1000);
while let Some(motherlode) = stream.next().await {
    println!("High motherlode: {}", motherlode);
}
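The filter, map, filter chain above can usually be collapsed into a single filter_map call, which also removes the unwrap(). The sketch below compares the two forms on standard-library iterators as a synchronous analogue of the stream operators. Both function names are hypothetical.

```rust
/// Chained form: filter -> map -> filter, mirroring the stream example.
pub fn high_motherlodes_chained(motherlodes: &[Option<u64>]) -> Vec<u64> {
    motherlodes
        .iter()
        .copied()
        .filter(|m| m.is_some())
        .map(|m| m.unwrap())
        .filter(|&m| m > 1000)
        .collect()
}

/// Single-step form: one `filter_map` call and no `unwrap()`.
pub fn high_motherlodes_filter_map(motherlodes: &[Option<u64>]) -> Vec<u64> {
    motherlodes
        .iter()
        .copied()
        // `Option::filter` keeps the value only if it passes the threshold.
        .filter_map(|m| m.filter(|&value| value > 1000))
        .collect()
}
```

Both functions return the same values for any input; the second form simply avoids the intermediate adapters.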
Working with Updates
Filter by operation type
// Drop delete operations, keep everything else
let mut stream = hs.views.list()
    .watch()
    .filter(|update| !update.is_delete());
while let Some(update) = stream.next().await {
    if let Some(data) = update.data() {
        println!("Non-delete update: {:?}", data);
    }
}
// Unwrap each update into its entity data, skipping updates that carry none
let mut stream = hs.views.list()
    .watch()
    .filter_map(|update| update.into_data());
while let Some(round) = stream.next().await {
    println!("Round data: {:?}", round);
}
// Transform the data inside each update while keeping its key
let mut stream = hs.views.list()
    .watch()
    .map(|update| update.map(|round| round.id.round_id));
while let Some(update) = stream.next().await {
    println!("Update key: {}, ID: {:?}", update.key(), update.data());
}
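To make the helper methods used above concrete, here is a minimal sketch of an update type that supports them. It is a hypothetical model: the method names key, is_delete, data, into_data and map come from the examples above, while the variant names and fields are assumptions, and the real Update<T> may differ.

```rust
/// Hypothetical model of an update tagged with its operation type.
#[derive(Debug, Clone, PartialEq)]
pub enum Update<T> {
    Upsert { key: String, data: T },
    Delete { key: String },
}

impl<T> Update<T> {
    /// Key of the entity this update refers to.
    pub fn key(&self) -> &str {
        match self {
            Update::Upsert { key, .. } | Update::Delete { key } => key,
        }
    }

    pub fn is_delete(&self) -> bool {
        matches!(self, Update::Delete { .. })
    }

    /// Borrows the payload, if the operation carries one.
    pub fn data(&self) -> Option<&T> {
        match self {
            Update::Upsert { data, .. } => Some(data),
            Update::Delete { .. } => None,
        }
    }

    /// Consumes the update and returns the payload, if any.
    pub fn into_data(self) -> Option<T> {
        match self {
            Update::Upsert { data, .. } => Some(data),
            Update::Delete { .. } => None,
        }
    }

    /// Transforms the payload while preserving the key and operation type.
    pub fn map<U>(self, f: impl FnOnce(T) -> U) -> Update<U> {
        match self {
            Update::Upsert { key, data } => Update::Upsert { key, data: f(data) },
            Update::Delete { key } => Update::Delete { key },
        }
    }
}
```

Under this model, filter_map(Update::into_data) over updates yields only the payloads of non-delete operations, which matches how listen() is described earlier.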
Working with RichUpdates
Filter by change type
// Keep only updates to existing entities
let mut stream = hs.views.list()
    .watch_rich()
    .filter(|update| update.is_updated());
while let Some(update) = stream.next().await {
    if let RichUpdate::Updated { key, before, after, .. } = update {
        println!("Updated {}: {:?} -> {:?}", key, before, after);
    }
}
Check for specific field changes
// Keep only updates whose patch touches the motherlode field
let mut stream = hs.views.list()
    .watch_rich()
    .filter(|update| update.has_patch_field("motherlode"));
while let Some(update) = stream.next().await {
    println!("Motherlode changed: {:?}", update);
}
// Extract (key, patch) pairs, skipping updates without a patch
let mut stream = hs.views.list()
    .watch_rich()
    .filter_map(|update| {
        update.patch().map(|p| (update.key().to_string(), p.clone()))
    });
while let Some((key, patch)) = stream.next().await {
    println!("Patch for {}: {:?}", key, patch);
}
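The following sketch models how is_updated(), patch() and has_patch_field() could fit together. Only the Updated { key, before, after, .. } shape and the method names are taken from the examples above. The other variants, the patch field, and the Patch representation (a map from field name to new value) are assumptions made for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical patch representation: changed field name -> new value.
pub type Patch = BTreeMap<String, String>;

/// Hypothetical model of a rich update with before/after state.
#[derive(Debug, Clone, PartialEq)]
pub enum RichUpdate<T> {
    Created { key: String, data: T },
    Updated { key: String, before: T, after: T, patch: Option<Patch> },
    Deleted { key: String },
}

impl<T> RichUpdate<T> {
    pub fn is_updated(&self) -> bool {
        matches!(self, RichUpdate::Updated { .. })
    }

    /// Returns the patch, which in this model only `Updated` can carry.
    pub fn patch(&self) -> Option<&Patch> {
        match self {
            RichUpdate::Updated { patch, .. } => patch.as_ref(),
            _ => None,
        }
    }

    /// True when a patch exists and mentions `field`.
    pub fn has_patch_field(&self, field: &str) -> bool {
        self.patch().map_or(false, |p| p.contains_key(field))
    }
}
```

In this model has_patch_field() is false for creates and deletes, so filtering on it implicitly keeps only Updated items.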
Key Filtering
Filter streams to specific keys:
// Single key (StateView)
let stream = hs.views.state().listen("round_123");
// Multiple keys
let stream = hs.views.list().watch_keys(&["round_1", "round_2"]);
Stream Composition
Combining multiple streams
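use futures_util::stream::{self, StreamExt};
// select needs both streams to share one Item type, so wrap each in an enum
// (OreTreasury is an assumed type name for the ore_treasury view)
enum Event { Round(OreRound), Treasury(OreTreasury) }
let round_stream = hs.views.ore_round.latest().listen().map(Event::Round);
let treasury_stream = hs.views.ore_treasury.list().listen().map(Event::Treasury);
let mut combined = stream::select(round_stream, treasury_stream);
while let Some(event) = combined.next().await {
    match event {
        Event::Round(_round) => { /* handle round */ }
        Event::Treasury(_treasury) => { /* handle treasury */ }
    }
}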
Buffering and batching
use futures_util::StreamExt;
let mut stream = hs.views.list()
    .listen()
    .chunks(10); // Collect items into Vecs of up to 10
while let Some(batch) = stream.next().await {
    println!("Batch of {} items", batch.len());
}
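The batching behaviour can be previewed with the standard library's slice chunks(), used here as a synchronous analogue of the stream adapter. The function name batches is hypothetical. As with the stream version, the final batch may be shorter than the requested size when the input ends partway through a batch.

```rust
/// Splits `items` into batches of at most `size` elements.
/// Panics if `size` is 0, because `slice::chunks` does.
pub fn batches<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> {
    items.chunks(size).map(|chunk| chunk.to_vec()).collect()
}
```

On a live stream that never ends, chunks(10) yields nothing until ten items have arrived. If latency matters more than batch size, the ready_chunks adapter in futures_util, which yields whatever is immediately available up to the capacity, may be a better fit.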
Advanced Patterns
Rate limiting
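use futures_util::StreamExt;
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream; // from the tokio-stream crate
let stream = hs.views.list().listen();
// tokio's Interval is not itself a Stream; wrap it so it can be zipped
let throttle = IntervalStream::new(interval(Duration::from_secs(1)));
// Each item is paired with one tick, so at most one item passes per second
let mut throttled = stream.zip(throttle);
while let Some((item, _)) = throttled.next().await {
    println!("Item: {:?}", item);
}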
Debouncing
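use futures_util::StreamExt;
use tokio::time::{sleep, Duration};
let mut stream = hs.views.latest().listen();
let mut last_item = None;
loop {
    tokio::select! {
        next = stream.next() => match next {
            // A new item restarts the quiet period: sleep() is recreated on every iteration
            Some(item) => last_item = Some(item),
            // Stream ended: stop instead of waking on the timer forever
            None => break,
        },
        _ = sleep(Duration::from_millis(500)) => {
            // No item for 500 ms: emit the most recent one, if any
            if let Some(item) = last_item.take() {
                println!("Debounced item: {:?}", item);
            }
        }
    }
}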
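The effect of the debounce loop is easier to see on recorded data. The sketch below is a pure function over time-ordered (timestamp in milliseconds, value) pairs that returns what a debouncer with the given quiet period would emit, namely the last value of every burst. The function name and the timestamped input format are hypothetical and exist only for illustration.

```rust
/// Returns the last value of every burst in `events`, where a burst ends
/// once `quiet_ms` pass without a new event. `events` must be time-ordered.
pub fn debounce<T: Clone>(events: &[(u64, T)], quiet_ms: u64) -> Vec<T> {
    let mut emitted = Vec::new();
    for (index, (time, value)) in events.iter().enumerate() {
        let burst_ended = match events.get(index + 1) {
            // The next event arrives only after the quiet period has elapsed.
            Some((next_time, _)) => next_time.saturating_sub(*time) >= quiet_ms,
            // The final event is always followed by silence.
            None => true,
        };
        if burst_ended {
            emitted.push(value.clone());
        }
    }
    emitted
}
```

For events at 0, 100, 200, 900 and 1000 ms with a 500 ms quiet period, only the values from 200 ms and 1000 ms are emitted.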
Error handling
use futures_util::StreamExt;
let stream = hs.views.list()
    .listen()
    .map(|round| -> Result<_, anyhow::Error> {
        // Process with potential errors
        Ok(round)
    })
    // filter_map takes a plain closure returning Option (see its signature above)
    .filter_map(|result| match result {
        Ok(item) => Some(item),
        Err(e) => {
            // Log and drop failed items so the stream keeps running
            eprintln!("Stream error: {}", e);
            None
        }
    });
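A variation on the pattern above collects the error messages instead of printing them, which makes the behaviour testable. This is a sketch on a standard-library iterator rather than a stream, and keep_ok is a hypothetical helper name.

```rust
use std::fmt::Display;

/// Separates successful items from error messages, preserving order.
pub fn keep_ok<T, E: Display>(results: Vec<Result<T, E>>) -> (Vec<T>, Vec<String>) {
    let mut errors = Vec::new();
    let items = results
        .into_iter()
        .filter_map(|result| match result {
            Ok(item) => Some(item),
            Err(e) => {
                // Record the failure and drop the item from the output.
                errors.push(e.to_string());
                None
            }
        })
        .collect();
    (items, errors)
}
```

Dropping failed items keeps the pipeline alive, but the caller should still inspect the collected errors, since silently discarded items are easy to miss.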
Server-Side Filtering
Use builder methods for server-side filtering:
let stream = hs.views.list()
    .listen()
    .take(10) // Server-side: limit to 10 items
    .skip(5) // Server-side: skip first 5
    .filter("status", "active") // Server-side: filter by status
    .filter(|r| r.state.motherlode.is_some()); // Client-side: additional filter
Server-side filters:
take(n) - Limit results
skip(n) - Skip results
filter(key, value) - Field equality
Client-side filters:
.filter() - Custom predicates
.filter_map() - Transform and filter
.map() - Transform
- Use server-side filters first - Reduce network traffic
- Chain operators efficiently - Avoid unnecessary allocations
- Use filter_map over filter().map() - Combine operations
- Consider buffering - Use chunks() for batch processing
- Handle backpressure - Use tokio channels when needed
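As an illustration of the backpressure tip, the sketch below uses the standard library's bounded sync_channel as a stand-in for a bounded tokio channel: the producer blocks once the buffer is full, so it cannot outrun the consumer. The function name and parameters are hypothetical. In async code the equivalent is a bounded tokio::sync::mpsc channel, whose send waits when the buffer is full.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Sends `count` items through a channel that queues at most `capacity`
/// items, and returns them in the order the consumer received them.
pub fn run_bounded_pipeline(count: u32, capacity: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let producer = thread::spawn(move || {
        for item in 0..count {
            // Blocks here whenever `capacity` items are already waiting.
            if tx.send(item).is_err() {
                break; // receiver was dropped
            }
        }
        // `tx` is dropped here, which ends the consumer's iteration.
    });
    let received: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

A small capacity bounds memory use at the cost of stalling the producer more often. The right value depends on how bursty the stream is.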
Best Practices
- Use listen() for simple entity streams (deletes filtered)
- Use watch() when you need to handle all operations
- Use watch_rich() for detailed change tracking
- Prefer server-side filtering (take, skip, filter) when possible
- Chain client-side operators for complex transformations
- Handle errors gracefully in stream processing