Using aggregations

Aggregations allow for more complex queries and data processing. You can use the get_collection() method to access the underlying PyMongo collection and perform raw aggregation operations, then map the results to a custom model.

from pydantic import BaseModel
from pydantic_mongo import AbstractRepository

# Custom model for aggregation results
class UserStats(BaseModel):
    role: str
    average_age: float
    total_count: int

class UserRepository(AbstractRepository[User]):
    class Meta:
        collection_name = 'users'

    def get_user_stats(self) -> list[UserStats]:
        """Get statistics about users grouped by role."""
        pipeline = [
            {"$group": {
                "_id": "$role",
                "average_age": {"$avg": "$age"},
                "total_count": {"$sum": 1}
            }},
            {"$project": {
                "role": "$_id",
                "average_age": 1,
                "total_count": 1,
                "_id": 0
            }}
        ]
        collection = self.get_collection()
        results = collection.aggregate(pipeline)
        return [UserStats(**doc) for doc in results]

# Use the custom method
repo = UserRepository(database)
stats = repo.get_user_stats()
for stat in stats:
    print(f"Role: {stat.role}, Avg Age: {stat.average_age}, Count: {stat.total_count}")