Skip to main content
The AggregationPipeline provides a fluent API for performing complex data aggregations similar to MongoDB’s aggregation framework.

Creating a pipeline

Get an aggregation pipeline from a collection:
let users = db.collection("users");
let pipeline = users.aggregate();

Pipeline stages

Match stage

match_

Filter the documents flowing through the pipeline. The method is named `match_` because `match` is a reserved keyword in Rust.
pub fn match_(self, query: &str) -> Self
query
&str
required
Query string to filter documents
Example:
let results = users.aggregate()
    .match_("age > 25")
    .execute()?;

Group stage

group_by

Group documents by a field value.
pub fn group_by(self, field: &str) -> Self
field
&str
required
Field to group by
Example:
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .execute()?;

// Results: [{"_id": "NYC", "total": 150}, {"_id": "LA", "total": 95}, ...]

Accumulator operations

count

Count documents in each group.
pub fn count(self, output_field: &str) -> Self
output_field
&str
required
Name of the output field for the count
Example:
let results = users.aggregate()
    .group_by("department")
    .count("employee_count")
    .execute()?;

sum

Sum the values of a field in each group.
pub fn sum(self, field: &str, output_field: &str) -> Self
field
&str
required
Field to sum
output_field
&str
required
Name of the output field for the sum
Example:
let results = sales.aggregate()
    .group_by("product_category")
    .sum("price", "total_revenue")
    .execute()?;

avg

Calculate the average of a field in each group.
pub fn avg(self, field: &str, output_field: &str) -> Self
field
&str
required
Field to average
output_field
&str
required
Name of the output field for the average
Example:
let results = users.aggregate()
    .group_by("city")
    .avg("age", "average_age")
    .execute()?;

min

Find the minimum value of a field in each group.
pub fn min(self, field: &str, output_field: &str) -> Self
field
&str
required
Field to find minimum
output_field
&str
required
Name of the output field for the minimum
Example:
let results = products.aggregate()
    .group_by("category")
    .min("price", "lowest_price")
    .execute()?;

max

Find the maximum value of a field in each group.
pub fn max(self, field: &str, output_field: &str) -> Self
field
&str
required
Field to find maximum
output_field
&str
required
Name of the output field for the maximum
Example:
let results = products.aggregate()
    .group_by("category")
    .max("price", "highest_price")
    .execute()?;

Sort stage

sort

Sort aggregation results.
pub fn sort(self, field: &str, ascending: bool) -> Self
field
&str
required
Field to sort by
ascending
bool
required
`true` for ascending order, `false` for descending order
Example:
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .sort("total", false) // Descending
    .execute()?;

Limit and skip stages

limit

Limit the number of results.
pub fn limit(self, n: usize) -> Self
n
usize
required
Maximum number of results
Example:
// Top 5 cities by user count
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .sort("total", false)
    .limit(5)
    .execute()?;

skip

Skip the first `n` results. Combine with `sort` and `limit` to page through results.
pub fn skip(self, n: usize) -> Self
n
usize
required
Number of results to skip
Example (cities ranked 6–10 by user count):
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .sort("total", false)
    .skip(5)
    .limit(5)
    .execute()?;

Projection stages

project

Include only the specified fields.
pub fn project(self, fields: &[&str]) -> Self
fields
&[&str]
required
Array of field names to include
Example:
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .project(&["_id", "total"])
    .execute()?;

exclude

Exclude the specified fields.
pub fn exclude(self, fields: &[&str]) -> Self
fields
&[&str]
required
Array of field names to exclude
Example:
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .avg("age", "avg_age")
    .exclude(&["avg_age"])
    .execute()?;

Executing the pipeline

execute

Execute the aggregation pipeline.
pub fn execute(self) -> Result<Vec<Value>>
Returns: `Result<Vec<Value>>` — the aggregation results as a vector of documents, or an error if execution fails.

Complete examples

Group and count

let results = users.aggregate()
    .group_by("city")
    .count("user_count")
    .execute()?;

// Results:
// [{"_id": "NYC", "user_count": 150},
//  {"_id": "LA", "user_count": 95},
//  ...]

Multiple accumulators

let results = users.aggregate()
    .group_by("city")
    .count("total_users")
    .avg("age", "average_age")
    .min("age", "youngest")
    .max("age", "oldest")
    .execute()?;

// Results:
// [{"_id": "NYC",
//   "total_users": 150,
//   "average_age": 32.5,
//   "youngest": 18,
//   "oldest": 65}, ...]

Filter then aggregate

let results = users.aggregate()
    .match_("age >= 18") // Only adults
    .group_by("city")
    .count("adult_count")
    .avg("age", "avg_age")
    .execute()?;

Sales analysis

let results = sales.aggregate()
    .match_("date >= \"2024-01-01\"")
    .group_by("product_category")
    .sum("amount", "total_revenue")
    .count("num_sales")
    .sort("total_revenue", false) // Highest revenue first
    .limit(10)
    .execute()?;

for result in results {
    println!("{}: ${} ({} sales)",
        result["_id"],
        result["total_revenue"],
        result["num_sales"]);
}

User demographics

let results = users.aggregate()
    .match_("status is \"active\"")
    .group_by("city")
    .count("total")
    .avg("age", "avg_age")
    .sort("total", false)
    .limit(20)
    .execute()?;

println!("Top 20 cities by active user count:");
for (i, city) in results.iter().enumerate() {
    println!("{}. {}: {} users (avg age: {:.1})",
        i + 1,
        city["_id"],
        city["total"],
        // Convert to f64 so the `{:.1}` precision is actually applied.
        city["avg_age"].as_f64().unwrap_or(0.0));
}

Complex pipeline

let results = orders.aggregate()
    .match_("status is \"completed\" and total > 100")
    .group_by("customer_id")
    .sum("total", "total_spent")
    .count("order_count")
    .avg("total", "avg_order_value")
    .sort("total_spent", false)
    .limit(100)
    .project(&["_id", "total_spent", "order_count"])
    .execute()?;

println!("Top 100 customers by spending:");
for customer in results {
    println!("Customer {}: ${:.2} across {} orders",
        customer["_id"],
        // Convert to f64 so the `{:.2}` precision is actually applied.
        customer["total_spent"].as_f64().unwrap_or(0.0),
        customer["order_count"]);
}

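Grouping and post-processing results

`group_by` takes a single field name. To derive further statistics from the groups, iterate over the returned documents in Rust: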

// Group by city and get stats
let results = users.aggregate()
    .group_by("city")
    .count("users")
    .avg("age", "avg_age")
    .execute()?;

// Then you could process these results further
for group in results {
    println!("{}: {} users, avg age {:.1}",
        group["_id"],
        group["users"],
        // Convert to f64 so the `{:.1}` precision is actually applied.
        group["avg_age"].as_f64().unwrap_or(0.0));
}

Build docs developers (and LLMs) love