Aggregation pipelines allow you to process and analyze documents through a series of stages, similar to MongoDB’s aggregation framework.

Pipeline basics

What is aggregation?

Aggregation pipelines transform documents through multiple stages:
  1. Match - Filter documents
  2. Group - Group by field and calculate aggregates
  3. Sort - Order results
  4. Limit/Skip - Paginate results
  5. Project - Select or exclude fields
Each stage processes the output of the previous stage.
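
Conceptually, the pipeline behaves like the following plain-Rust sketch (std only, no library involved; the data and stage implementations are illustrative, not how the engine works internally):

```rust
use std::collections::HashMap;

/// match -> group -> sort -> limit over (city, age, active) records.
fn top_cities(users: &[(&'static str, u32, bool)], n: usize) -> Vec<(&'static str, u32)> {
    // Stage 1 (match): keep only active users
    let matched: Vec<(&'static str, u32, bool)> =
        users.iter().copied().filter(|&(_, _, active)| active).collect();

    // Stage 2 (group): count users per city
    let mut groups: HashMap<&'static str, u32> = HashMap::new();
    for &(city, _, _) in &matched {
        *groups.entry(city).or_insert(0) += 1;
    }

    // Stage 3 (sort): highest count first, city name as tiebreak
    let mut counts: Vec<(&'static str, u32)> = groups.into_iter().collect();
    counts.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(b.0)));

    // Stage 4 (limit): top n groups only
    counts.truncate(n);
    counts
}

fn main() {
    let users = [
        ("NYC", 30, true),
        ("NYC", 40, true),
        ("LA", 25, false),
        ("LA", 35, true),
        ("SF", 50, true),
    ];
    // Each stage consumed the previous stage's output
    println!("{:?}", top_cities(&users, 2)); // [("NYC", 2), ("LA", 1)]
}
```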

Simple aggregation

use jasonisnthappy::Database;
use serde_json::json;

let db = Database::open("my.db")?;
let users = db.collection("users");

// Count users by city
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .execute()?;

for result in results {
    println!("{}: {} users", result["_id"], result["total"]);
}

Match stage

Filter documents before later stages run, using the same query syntax as find().
// Only aggregate active users
let results = users.aggregate()
    .match_("status is \"active\"")
    .group_by("city")
    .count("total")
    .execute()?;
Use match_ early in the pipeline to reduce the number of documents processed in later stages.

Group stage

Group documents by field values and apply aggregation functions.

Count

Count documents in each group.
let results = users.aggregate()
    .group_by("city")
    .count("user_count")
    .execute()?;

// Output:
// { "_id": "NYC", "user_count": 150 }
// { "_id": "LA", "user_count": 89 }

Sum

Sum numeric values across the group.
let sales = db.collection("sales");

// Total revenue by product
let results = sales.aggregate()
    .group_by("product_id")
    .sum("amount", "total_revenue")
    .execute()?;

// Output:
// { "_id": "prod_123", "total_revenue": 15420.50 }

Average

Calculate the mean of numeric values.
// Average age by city
let results = users.aggregate()
    .group_by("city")
    .avg("age", "avg_age")
    .execute()?;

// Output:
// { "_id": "NYC", "avg_age": 32.5 }
// { "_id": "LA", "avg_age": 28.3 }

Min and Max

Find minimum and maximum values.
// Youngest and oldest user by city
let results = users.aggregate()
    .group_by("city")
    .min("age", "youngest")
    .max("age", "oldest")
    .execute()?;

// Output:
// { "_id": "NYC", "youngest": 22, "oldest": 65 }

Multiple accumulators

Combine multiple aggregation functions.
let results = sales.aggregate()
    .group_by("product_id")
    .count("num_sales")
    .sum("amount", "total_revenue")
    .avg("amount", "avg_sale_price")
    .min("amount", "min_sale")
    .max("amount", "max_sale")
    .execute()?;

// Output:
// {
//   "_id": "prod_123",
//   "num_sales": 45,
//   "total_revenue": 15420.50,
//   "avg_sale_price": 342.68,
//   "min_sale": 29.99,
//   "max_sale": 999.99
// }

Sort stage

Order results by field values.
// Top cities by user count
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .sort("total", false)  // false = descending
    .limit(10)
    .execute()?;

// Ascending sort
let results = users.aggregate()
    .group_by("age")
    .count("count")
    .sort("age", true)  // true = ascending
    .execute()?;

Limit and Skip stages

Implement pagination on aggregated results.
let page_size = 20;
let page = 2;  // zero-based page index (the third page of results)

let results = users.aggregate()
    .group_by("city")
    .count("total")
    .sort("total", false)
    .skip(page * page_size)
    .limit(page_size)
    .execute()?;
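
The pagination arithmetic boils down to skip = page * page_size with a zero-based page index. A tiny helper makes it explicit (the helper is ours, not part of the library's API):

```rust
/// Returns (skip, limit) for a zero-based page index.
/// Hypothetical helper, not part of the library's API.
fn page_window(page: usize, page_size: usize) -> (usize, usize) {
    (page * page_size, page_size)
}

fn main() {
    // Page 2 (the third page) with 20 results per page skips the first 40
    println!("{:?}", page_window(2, 20)); // (40, 20)
}
```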

Project stage

Select or exclude fields from results.
let results = users.aggregate()
    .match_("age > 25")
    .project(&["name", "email", "city"])
    .execute()?;

// Only returns name, email, city, and _id

Complete pipeline example

Combine multiple stages for complex analysis.
use jasonisnthappy::Database;
use serde_json::json;

let db = Database::open("analytics.db")?;
let orders = db.collection("orders");

// Analyze sales by region for premium customers
let results = orders.aggregate()
    // Stage 1: Filter to completed orders from premium customers
    .match_("status is \"completed\" and customer_tier is \"premium\"")
    // Stage 2: Group by region and calculate metrics
    .group_by("region")
    .count("num_orders")
    .sum("total_amount", "revenue")
    .avg("total_amount", "avg_order_value")
    // Stage 3: Sort by revenue (highest first)
    .sort("revenue", false)
    // Stage 4: Top 10 regions only
    .limit(10)
    // Stage 5: Clean output (exclude internal fields)
    .exclude(&["_internal"])
    .execute()?;

for region in results {
    println!("Region: {}", region["_id"]);
    println!("  Orders: {}", region["num_orders"]);
    println!("  Revenue: ${:.2}", region["revenue"].as_f64().unwrap());
    println!("  Avg Order: ${:.2}", region["avg_order_value"].as_f64().unwrap());
    println!();
}

Real-world examples

E-commerce analytics

Daily sales report

let daily_sales = orders.aggregate()
    .match_("created_at >= \"2024-01-01\" and status is \"completed\"")
    .group_by("date")  // Assuming a precomputed date field
    .count("orders")
    .sum("amount", "revenue")
    .avg("amount", "avg_order")
    .sort("date", true)
    .execute()?;
Top products

let top_products = order_items.aggregate()
    .group_by("product_id")
    .count("units_sold")
    .sum("price", "revenue")
    .sort("revenue", false)
    .limit(20)
    .execute()?;
Customer lifetime value

let customer_ltv = orders.aggregate()
    .match_("status is \"completed\"")
    .group_by("customer_id")
    .count("num_orders")
    .sum("total_amount", "lifetime_value")
    .avg("total_amount", "avg_order_value")
    .sort("lifetime_value", false)
    .execute()?;

User analytics

// Active users by signup month
let signups = users.aggregate()
    .match_("status is \"active\"")
    .group_by("signup_month")
    .count("new_users")
    .sort("signup_month", true)
    .execute()?;

// User distribution by age group
let age_distribution = users.aggregate()
    .group_by("age_group")  // e.g., "18-24", "25-34", etc.
    .count("users")
    .sort("users", false)
    .execute()?;

Event tracking

let events = db.collection("events");

// Most common event types in a time window
let event_summary = events.aggregate()
    .match_("timestamp > 1704067200")  // events since 2024-01-01 00:00 UTC
    .group_by("event_type")
    .count("occurrences")
    .sort("occurrences", false)
    .limit(10)
    .execute()?;

Performance tips

Use match_ early: Filter documents before expensive operations.
// Good: filter first
let results = users.aggregate()
    .match_("status is \"active\"")
    .group_by("city")
    .count("total")
    .execute()?;

// Bad: group all documents then filter
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .match_("total > 10")  // Doesn't work: match_ filters input documents and can't see computed fields like "total"
    .execute()?;
Create indexes on grouped fields:
// Index the field you group by
db.create_index("users", "city_idx", "city", false)?;

// Now grouping by city is faster
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .execute()?;
Use projection to reduce memory:
// Only load needed fields
let results = users.aggregate()
    .project(&["city", "age"])  // Don't load other fields
    .group_by("city")
    .avg("age", "avg_age")
    .execute()?;

Aggregation vs queries

When to use aggregation vs simple queries:
Use case            Method                Example
Get documents       find()                users.find("age > 25")?
Count total         count()               users.count()?
Count with filter   count_with_query()    users.count_with_query(Some("active"))?
Group and count     aggregate()           users.aggregate().group_by("city").count("n").execute()?
Calculate stats     aggregate()           users.aggregate().avg("age", "avg").execute()?
Complex analysis    aggregate()           Multi-stage pipelines
Aggregation is more powerful but has overhead. Use simple queries when possible.

Common patterns

Counting groups

// How many users in each city?
let city_counts = users.aggregate()
    .group_by("city")
    .count("count")
    .sort("count", false)
    .execute()?;

Finding outliers

// Cities with unusually high average age
let results = users.aggregate()
    .group_by("city")
    .avg("age", "avg_age")
    .count("sample_size")
    .sort("avg_age", false)
    .execute()?;

// Filter in application code: avg_age > global_average
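
That application-side step might look like this (std only; CityStats mirrors one output row from the pipeline above, and the weighted-average threshold is our choice, not library behavior):

```rust
/// One output row from the group/avg/count pipeline above.
struct CityStats {
    city: &'static str,
    avg_age: f64,
    sample_size: u32,
}

/// Cities whose average age exceeds the weighted global average.
fn outlier_cities(rows: &[CityStats]) -> Vec<&'static str> {
    let total: u32 = rows.iter().map(|r| r.sample_size).sum();
    let global_avg: f64 = rows
        .iter()
        .map(|r| r.avg_age * r.sample_size as f64)
        .sum::<f64>()
        / total as f64;

    rows.iter()
        .filter(|r| r.avg_age > global_avg)
        .map(|r| r.city)
        .collect()
}

fn main() {
    let rows = [
        CityStats { city: "NYC", avg_age: 32.5, sample_size: 100 },
        CityStats { city: "LA", avg_age: 28.3, sample_size: 100 },
        CityStats { city: "Sun City", avg_age: 61.0, sample_size: 50 },
    ];
    println!("{:?}", outlier_cities(&rows)); // ["Sun City"]
}
```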

Category summaries

// Sales by category
let category_sales = products.aggregate()
    .group_by("category")
    .sum("units_sold", "total_units")
    .sum("revenue", "total_revenue")
    .avg("price", "avg_price")
    .execute()?;

Time-based aggregation

// Assuming you have an "hour" field
let hourly_traffic = events.aggregate()
    .group_by("hour")
    .count("events")
    .sort("hour", true)
    .execute()?;
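
If documents only carry a Unix timestamp, an "hour" field can be derived in application code before insert. A minimal sketch of that bucketing (UTC, ignoring leap seconds):

```rust
/// Hour of day (0-23) in UTC for a Unix timestamp, ignoring leap seconds.
fn hour_of_day(timestamp: i64) -> i64 {
    timestamp.rem_euclid(86_400) / 3_600
}

fn main() {
    // 1704067200 is 2024-01-01 00:00:00 UTC
    println!("{}", hour_of_day(1_704_067_200));             // 0
    println!("{}", hour_of_day(1_704_067_200 + 5 * 3_600)); // 5
}
```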

Limitations

Current limitations:
  • Grouping field must exist in documents (no computed fields)
  • Group by only supports single field (not multiple fields)
  • No $lookup for joins (use application-level joins)
  • All data loads into memory (not suitable for huge result sets)
For advanced use cases, consider:
  • Pre-aggregating data in your application
  • Using indexes to speed up filtering
  • Processing in batches
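
Since group_by only accepts a single field, multi-field grouping can be emulated by storing a derived composite field at insert time, or by grouping in memory as sketched below (std only; the field names are illustrative):

```rust
use std::collections::HashMap;

/// Emulates a two-field group-by with a composite (region, tier) key.
fn group_totals<'a>(orders: &[(&'a str, &'a str, f64)]) -> Vec<((&'a str, &'a str), f64)> {
    let mut totals: HashMap<(&str, &str), f64> = HashMap::new();
    for &(region, tier, amount) in orders {
        *totals.entry((region, tier)).or_insert(0.0) += amount;
    }
    // Sort by composite key for stable output
    let mut rows: Vec<_> = totals.into_iter().collect();
    rows.sort_by(|a, b| a.0.cmp(&b.0));
    rows
}

fn main() {
    let orders = [
        ("east", "premium", 100.0),
        ("east", "premium", 50.0),
        ("east", "basic", 20.0),
        ("west", "premium", 75.0),
    ];
    for ((region, tier), total) in group_totals(&orders) {
        println!("{}/{}: {}", region, tier, total);
    }
}
```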

Next steps

Querying

Master the query language

Indexes

Speed up aggregations with indexes

Performance

Optimize aggregation performance

CRUD operations

Insert data to aggregate
