The AggregationPipeline provides a fluent API for performing complex data aggregations similar to MongoDB’s aggregation framework.
Creating a pipeline
Get an aggregation pipeline from a collection:
let users = db.collection("users");
let pipeline = users.aggregate();
Pipeline stages
Match stage
match_
Filter documents in the pipeline.
pub fn match_(self, query: &str) -> Self
query: Query string used to filter documents
Example:
let results = users.aggregate()
.match_("age > 25")
.execute()?;
Group stage
group_by
Group documents by a field value.
pub fn group_by(self, field: &str) -> Self
Example:
let results = users.aggregate()
.group_by("city")
.count("total")
.execute()?;
// Results: [{"_id": "NYC", "total": 150}, {"_id": "LA", "total": 95}, ...]
Accumulator operations
count
Count documents in each group.
pub fn count(self, output_field: &str) -> Self
output_field: Name of the output field for the count
Example:
let results = users.aggregate()
.group_by("department")
.count("employee_count")
.execute()?;
sum
Sum values of a field in each group.
pub fn sum(self, field: &str, output_field: &str) -> Self
field: Name of the field whose values are summed
output_field: Name of the output field for the sum
Example:
let results = sales.aggregate()
.group_by("product_category")
.sum("price", "total_revenue")
.execute()?;
avg
Calculate average of a field in each group.
pub fn avg(self, field: &str, output_field: &str) -> Self
field: Name of the field whose values are averaged
output_field: Name of the output field for the average
Example:
let results = users.aggregate()
.group_by("city")
.avg("age", "average_age")
.execute()?;
min
Find minimum value of a field in each group.
pub fn min(self, field: &str, output_field: &str) -> Self
field: Name of the field whose minimum value is taken
output_field: Name of the output field for the minimum
Example:
let results = products.aggregate()
.group_by("category")
.min("price", "lowest_price")
.execute()?;
max
Find maximum value of a field in each group.
pub fn max(self, field: &str, output_field: &str) -> Self
field: Name of the field whose maximum value is taken
output_field: Name of the output field for the maximum
Example:
let results = products.aggregate()
.group_by("category")
.max("price", "highest_price")
.execute()?;
Sort stage
sort
Sort aggregation results.
pub fn sort(self, field: &str, ascending: bool) -> Self
field: Name of the field to sort by
ascending: true for ascending, false for descending
Example:
let results = users.aggregate()
.group_by("city")
.count("total")
.sort("total", false) // Descending
.execute()?;
Limit and skip stages
limit
Limit the number of results.
pub fn limit(self, n: usize) -> Self
n: Maximum number of results to return
Example:
// Top 5 cities by user count
let results = users.aggregate()
.group_by("city")
.count("total")
.sort("total", false)
.limit(5)
.execute()?;
skip
Skip a number of results.
pub fn skip(self, n: usize) -> Self
n: Number of results to skip
Example:
let results = users.aggregate()
.group_by("city")
.count("total")
.sort("total", false)
.skip(5)
.limit(5)
.execute()?;
Projection stages
project
Include only specified fields.
pub fn project(self, fields: &[&str]) -> Self
fields: Array of field names to include
Example:
let results = users.aggregate()
.group_by("city")
.count("total")
.project(&["_id", "total"])
.execute()?;
exclude
Exclude specified fields.
pub fn exclude(self, fields: &[&str]) -> Self
fields: Array of field names to exclude
Example:
let results = users.aggregate()
.group_by("city")
.count("total")
.avg("age", "avg_age")
.exclude(&["avg_age"])
.execute()?;
Executing the pipeline
execute
Execute the aggregation pipeline.
pub fn execute(self) -> Result<Vec<Value>>
Returns: Result<Vec<Value>> - Aggregation results
Complete examples
Group and count
let results = users.aggregate()
.group_by("city")
.count("user_count")
.execute()?;
// Results:
// [{"_id": "NYC", "user_count": 150},
// {"_id": "LA", "user_count": 95},
// ...]
Multiple accumulators
let results = users.aggregate()
.group_by("city")
.count("total_users")
.avg("age", "average_age")
.min("age", "youngest")
.max("age", "oldest")
.execute()?;
// Results:
// [{"_id": "NYC",
// "total_users": 150,
// "average_age": 32.5,
// "youngest": 18,
// "oldest": 65}, ...]
Filter then aggregate
let results = users.aggregate()
.match_("age >= 18") // Only adults
.group_by("city")
.count("adult_count")
.avg("age", "avg_age")
.execute()?;
Sales analysis
let results = sales.aggregate()
.match_("date >= \"2024-01-01\"")
.group_by("product_category")
.sum("amount", "total_revenue")
.count("num_sales")
.sort("total_revenue", false) // Highest revenue first
.limit(10)
.execute()?;
for result in results {
println!("{}: ${} ({} sales)",
result["_id"],
result["total_revenue"],
result["num_sales"]);
}
User demographics
let results = users.aggregate()
.match_("status is \"active\"")
.group_by("city")
.count("total")
.avg("age", "avg_age")
.sort("total", false)
.limit(20)
.execute()?;
println!("Top 20 cities by active user count:");
for (i, city) in results.iter().enumerate() {
println!("{}. {}: {} users (avg age: {:.1})",
i + 1,
city["_id"],
city["total"],
city["avg_age"]);
}
Complex pipeline
let results = orders.aggregate()
.match_("status is \"completed\" and total > 100")
.group_by("customer_id")
.sum("total", "total_spent")
.count("order_count")
.avg("total", "avg_order_value")
.sort("total_spent", false)
.limit(100)
.project(&["_id", "total_spent", "order_count"])
.execute()?;
println!("Top 100 customers by spending:");
for customer in results {
println!("Customer {}: ${:.2} across {} orders",
customer["_id"],
customer["total_spent"],
customer["order_count"]);
}
Grouping by multiple fields
// group_by takes a single field name, so group by one field (city) and get stats
let results = users.aggregate()
.group_by("city")
.count("users")
.avg("age", "avg_age")
.execute()?;
// Then you could process these results further
for group in results {
println!("{}: {} users, avg age {:.1}",
group["_id"],
group["users"],
group["avg_age"]);
}