AsyncAbstractRepository
- class pydantic_mongo.AsyncAbstractRepository(database: AsyncDatabase)
Bases:
BaseAbstractRepository[T]
An asynchronous repository implementation for MongoDB using Pydantic models.
This class provides a high-level interface for performing asynchronous CRUD operations on MongoDB collections using Pydantic models for type safety and data validation.
Example
class UserRepository(AsyncAbstractRepository[User]):
    class Meta:
        collection_name = 'users'

repo = UserRepository(database)
user = await repo.find_one_by_id(user_id)
Generic type T must be a Pydantic model with an ‘id’ field.
- async delete(model: T)
Asynchronously delete a model instance from the database.
- Parameters:
model – The model instance to delete
- Returns:
The result of the delete operation
- Return type:
DeleteResult
- async delete_by_id(_id: Any)
Asynchronously delete a model instance from the database by its ID.
- Parameters:
_id – The ID of the model instance to delete
- Returns:
The result of the delete operation
- Return type:
DeleteResult
- async find_by(query: dict, skip: int | None = None, limit: int | None = None, sort: Sequence[Tuple[str, int]] | None = None, projection: Dict[str, int] | None = None) -> Iterable[T]
Asynchronously find multiple model instances by a MongoDB query.
- Parameters:
query – MongoDB query dictionary
skip – Number of documents to skip
limit – Maximum number of documents to return
sort – List of (field, direction) tuples for sorting
projection – MongoDB projection dictionary
- Returns:
Iterator of model instances
- Return type:
Iterable[T]
- async find_by_with_output_type(output_type: Type[OutputT], query: dict, skip: int | None = None, limit: int | None = None, sort: Sequence[Tuple[str, int]] | None = None, projection: Dict[str, int] | None = None) -> Iterable[OutputT]
Asynchronously find multiple model instances with a custom output type.
This method allows querying with a different output model than the repository’s base model type, useful for projections and transformations.
- Parameters:
output_type – The Pydantic model class for the output
query – MongoDB query dictionary
skip – Number of documents to skip
limit – Maximum number of documents to return
sort – List of (field, direction) tuples for sorting
projection – MongoDB projection dictionary
- Returns:
Iterator of model instances of the specified output type
- Return type:
Iterable[OutputT]
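The idea of returning a narrower output type than the repository's base model can be sketched without a database. The snippet below is a minimal illustration only, not the library's implementation: `UserSummary` and `project_to` are hypothetical names, dataclasses stand in for Pydantic models, and plain dicts stand in for projected MongoDB documents.

```python
from dataclasses import dataclass, fields
from typing import Iterable, List, Type, TypeVar

OutputT = TypeVar("OutputT")


@dataclass
class UserSummary:
    """Hypothetical output type that exposes only a subset of the stored fields."""
    name: str


def project_to(output_type: Type[OutputT], docs: Iterable[dict]) -> List[OutputT]:
    """Build one output_type instance per document, keeping only its declared fields."""
    wanted = [f.name for f in fields(output_type)]
    return [output_type(**{key: doc[key] for key in wanted}) for doc in docs]


# Stand-in for documents returned by a query.
docs = [{"name": "Ada", "email": "ada@example.com", "age": 36}]
summaries = project_to(UserSummary, docs)
print(summaries)  # [UserSummary(name='Ada')]
```

In this sketch the output type decides which fields survive, which is the role the projection and output_type parameters play together in the method above.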
- async find_one_by(query: dict) -> T | None
Asynchronously find a single model instance by a MongoDB query.
- Parameters:
query – MongoDB query dictionary
- Returns:
The found model instance or None if not found
- Return type:
Optional[T]
Example
user = await repo.find_one_by({"email": "user@example.com"})
- async find_one_by_id(_id: Any) -> T | None
Asynchronously find a single model instance by its ID.
- Parameters:
_id – The ID to search for. Must match the type of the model’s ID field (typically ObjectId for MongoDB)
- Returns:
The found model instance or None if not found
- Return type:
Optional[T]
- get_collection() -> AsyncCollection
Get the MongoDB collection associated with this repository.
- Returns:
PyMongo AsyncCollection instance
- Return type:
AsyncCollection
- async paginate(query: dict, limit: int, after: str | None = None, before: str | None = None, sort: Sequence[Tuple[str, int]] | None = None, projection: Dict[str, int] | None = None) -> Iterable[Edge[T]]
Asynchronously paginate through model instances using cursor-based pagination.
This method implements cursor-based pagination which is more reliable than offset-based pagination for large datasets.
- Parameters:
query – MongoDB query dictionary
limit – Maximum number of documents per page
after – Cursor string for fetching next page
before – Cursor string for fetching previous page
sort – List of (field, direction) tuples for sorting
projection – MongoDB projection dictionary
- Returns:
Iterator of Edge objects containing model instances and pagination cursors
- Return type:
Iterable[Edge[T]]
Example
Get first page:
edges = await repo.paginate({"status": "active"}, limit=10)
Get next page using the last cursor:
next_edges = await repo.paginate(
    {"status": "active"},
    limit=10,
    after=list(edges)[-1].cursor,
)
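To illustrate why cursor-based pagination tends to be more stable than offset-based pagination, here is a minimal in-memory sketch. It is not the library's implementation: the `Edge` dataclass, `paginate_by_cursor`, and `paginate_by_offset` are hypothetical stand-ins, the cursor is simply the stringified id, and the functions are synchronous for brevity.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Edge:
    """Hypothetical stand-in: a document plus the cursor that points at it."""
    node: dict
    cursor: str


def paginate_by_cursor(docs: List[dict], limit: int, after: Optional[str] = None) -> List[Edge]:
    """Return up to `limit` documents whose id sorts strictly after the cursor."""
    ordered = sorted(docs, key=lambda d: d["id"])
    if after is not None:
        ordered = [d for d in ordered if d["id"] > int(after)]
    return [Edge(node=d, cursor=str(d["id"])) for d in ordered[:limit]]


def paginate_by_offset(docs: List[dict], limit: int, skip: int = 0) -> List[dict]:
    """Return `limit` documents after skipping the first `skip` in id order."""
    ordered = sorted(docs, key=lambda d: d["id"])
    return ordered[skip:skip + limit]


docs = [{"id": i} for i in (10, 20, 30, 40)]
first_page = paginate_by_cursor(docs, limit=2)  # ids 10 and 20

# A new document is inserted before the second page is requested.
docs.append({"id": 5})

next_page = paginate_by_cursor(docs, limit=2, after=first_page[-1].cursor)
next_offset = paginate_by_offset(docs, limit=2, skip=2)

print([e.node["id"] for e in next_page])  # [30, 40]
print([d["id"] for d in next_offset])     # [20, 30]: id 20 appears again
```

The cursor page resumes from the last document actually seen, while the offset page shifts when the underlying data changes between requests.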
- async paginate_with_output_type(output_type: Type[OutputT], query: dict, limit: int, after: str | None = None, before: str | None = None, sort: Sequence[Tuple[str, int]] | None = None, projection: Dict[str, int] | None = None) -> Iterable[Edge[OutputT]]
Asynchronously paginate through model instances with a custom output type.
This method implements cursor-based pagination which is more reliable than offset-based pagination for large datasets.
- Parameters:
output_type – The Pydantic model class for the output
query – MongoDB query dictionary
limit – Maximum number of documents per page
after – Cursor string for fetching next page
before – Cursor string for fetching previous page
sort – List of (field, direction) tuples for sorting
projection – MongoDB projection dictionary
- Returns:
Iterator of Edge objects containing model instances and pagination cursors
- Return type:
Iterable[Edge[OutputT]]
- async save(model: T) -> InsertOneResult | UpdateResult
Asynchronously save a model instance to the database.
This method will:
- Insert the model if it doesn’t have an ID
- Update the model if it has an ID
- Parameters:
model – The model instance to save
- Returns:
The result of the save operation
- Return type:
Union[InsertOneResult, UpdateResult]
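The insert-or-update rule described above can be sketched with an in-memory stand-in. This is an illustration under stated assumptions, not the library's code: `InMemoryRepository` and the dataclass `User` are hypothetical, the return value is a plain string rather than an InsertOneResult or UpdateResult, and the sketch assumes the generated ID is written back to the model on insert.

```python
import asyncio
import itertools
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class User:
    name: str
    id: Optional[int] = None


class InMemoryRepository:
    """Hypothetical stand-in for a repository; stores documents in a dict."""

    def __init__(self) -> None:
        self._docs: Dict[int, dict] = {}
        self._next_id = itertools.count(1)

    async def save(self, model: User) -> str:
        if model.id is None:
            # No ID yet: insert and write the generated ID back to the model.
            model.id = next(self._next_id)
            self._docs[model.id] = {"name": model.name}
            return "inserted"
        # ID present: overwrite the stored document for that ID.
        self._docs[model.id] = {"name": model.name}
        return "updated"


async def main() -> Tuple[str, str, Optional[int]]:
    repo = InMemoryRepository()
    user = User(name="Ada")
    first = await repo.save(user)   # user has no ID, so this inserts
    user.name = "Ada Lovelace"
    second = await repo.save(user)  # user now has an ID, so this updates
    return first, second, user.id


outcomes = asyncio.run(main())
print(outcomes)  # ('inserted', 'updated', 1)
```

Saving the same instance twice therefore takes both branches: the first call inserts, the second updates.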
- async save_many(models: Iterable[T])
Asynchronously save multiple model instances to the database in bulk.
This method optimizes bulk operations by:
- Grouping models into insert and update operations
- Performing bulk inserts and updates
- Parameters:
models – Iterable of model instances to save
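The grouping step can be sketched as a simple partition by whether a model already has an ID. The helper name `split_for_bulk_save` and the dataclass `User` are hypothetical, and the actual bulk write calls are left out; this only shows the partitioning idea, not how the library performs it.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class User:
    name: str
    id: Optional[int] = None


def split_for_bulk_save(models: Iterable[User]) -> Tuple[List[User], List[User]]:
    """Partition models into (to_insert, to_update) based on whether an ID is set."""
    to_insert: List[User] = []
    to_update: List[User] = []
    for model in models:
        if model.id is None:
            to_insert.append(model)
        else:
            to_update.append(model)
    return to_insert, to_update


batch = [User("Ada"), User("Grace", id=7), User("Alan")]
to_insert, to_update = split_for_bulk_save(batch)
print([u.name for u in to_insert])  # ['Ada', 'Alan']
print([u.name for u in to_update])  # ['Grace']
```

Each group can then be sent in a single bulk call instead of one round trip per model.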