AsyncAbstractRepository

class pydantic_mongo.AsyncAbstractRepository(database: AsyncDatabase)[source]

Bases: BaseAbstractRepository[T]

An asynchronous repository implementation for MongoDB using Pydantic models.

This class provides a high-level interface for performing asynchronous CRUD operations on MongoDB collections using Pydantic models for type safety and data validation.

Example

class UserRepository(AsyncAbstractRepository[User]):
    class Meta:
        collection_name = 'users'

repo = UserRepository(database)
user = await repo.find_one_by_id(user_id)

Generic type T must be a Pydantic model with an ‘id’ field.

class Meta[source]

Bases: object

collection_name: str

async delete(model: T)[source]

Asynchronously delete a model instance from the database.

Parameters:

model – The model instance to delete

Returns:

The result of the delete operation

Return type:

DeleteResult

async delete_by_id(_id: Any)[source]

Asynchronously delete a model instance from the database by its ID.

Parameters:

_id – The ID of the model instance to delete

Returns:

The result of the delete operation

Return type:

DeleteResult
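
PyMongo's DeleteResult exposes a deleted_count attribute, which is the usual way to tell whether anything was actually removed. A minimal sketch, assuming repo is any repository instance with the delete_by_id method documented above; the helper name delete_if_exists is hypothetical:

```python
from typing import Any


async def delete_if_exists(repo: Any, _id: Any) -> bool:
    """Delete by ID and report whether a document was removed."""
    result = await repo.delete_by_id(_id)
    # deleted_count is 0 when no document matched the ID.
    return result.deleted_count > 0
```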

async find_by(query: dict, skip: int | None = None, limit: int | None = None, sort: Sequence[Tuple[str, int]] | None = None, projection: Dict[str, int] | None = None) Iterable[T][source]

Asynchronously find multiple model instances by a MongoDB query.

Parameters:
  • query – MongoDB query dictionary

  • skip – Number of documents to skip

  • limit – Maximum number of documents to return

  • sort – List of (field, direction) tuples for sorting

  • projection – MongoDB projection dictionary

Returns:

Iterator of model instances

Return type:

Iterable[T]
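
The optional arguments combine into ordinary offset paging. A minimal sketch, assuming repo is any repository instance exposing the find_by signature above; the helper name fetch_page and the status and created_at fields are hypothetical:

```python
from typing import Any, List


async def fetch_page(repo: Any, page: int, page_size: int = 20) -> List[Any]:
    """Return one zero-indexed page of active documents, newest first."""
    results = await repo.find_by(
        {"status": "active"},       # plain MongoDB filter document
        skip=page * page_size,      # offset of the first document in the page
        limit=page_size,            # cap on documents returned
        sort=[("created_at", -1)],  # -1 = descending, 1 = ascending
    )
    # find_by is documented as returning an Iterable, so materialize it
    # before indexing or taking its length.
    return list(results)
```

For large collections, the cursor-based paginate method may be a better fit than a growing skip value.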

async find_by_with_output_type(output_type: Type[OutputT], query: dict, skip: int | None = None, limit: int | None = None, sort: Sequence[Tuple[str, int]] | None = None, projection: Dict[str, int] | None = None) Iterable[OutputT][source]

Asynchronously find multiple model instances with custom output type.

This method allows querying with a different output model than the repository’s base model type, useful for projections and transformations.

Parameters:
  • output_type – The Pydantic model class for the output

  • query – MongoDB query dictionary

  • skip – Number of documents to skip

  • limit – Maximum number of documents to return

  • sort – List of (field, direction) tuples for sorting

  • projection – MongoDB projection dictionary

Returns:

Iterator of model instances of the specified output type

Return type:

Iterable[OutputT]
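
A projection and a narrower output model usually go together: the projection trims the fields MongoDB returns, and output_type describes what remains. A sketch under the assumption that the caller supplies a Pydantic model class declaring only the projected fields; list_contacts and the name and email fields are illustrative, not part of the library:

```python
from typing import Any, List, Type


async def list_contacts(repo: Any, summary_model: Type[Any]) -> List[Any]:
    """Fetch only name and email, parsed into the caller's summary model."""
    results = await repo.find_by_with_output_type(
        summary_model,                       # output_type: model for each result
        {"status": "active"},                # query
        projection={"name": 1, "email": 1},  # 1 = include the field
    )
    return list(results)
```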

async find_one_by(query: dict) T | None[source]

Asynchronously find a single model instance by a MongoDB query.

Parameters:

query – MongoDB query dictionary

Returns:

The found model instance or None if not found

Return type:

Optional[T]

Example

user = await repo.find_one_by({"email": "user@example.com"})

async find_one_by_id(_id: Any) T | None[source]

Asynchronously find a single model instance by its ID.

Parameters:

_id – The ID to search for. Must match the type of the model’s ID field (typically ObjectId for MongoDB)

Returns:

The found model instance or None if not found

Return type:

Optional[T]
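
Because a missing document is reported as None rather than as an exception, callers that treat absence as an error need an explicit check. A small sketch; get_or_raise is a hypothetical helper, and the choice of LookupError is an assumption rather than anything the library prescribes:

```python
from typing import Any


async def get_or_raise(repo: Any, _id: Any) -> Any:
    """Return the model with the given ID or raise LookupError."""
    model = await repo.find_one_by_id(_id)
    if model is None:  # documented "not found" result
        raise LookupError(f"no document with id {_id!r}")
    return model
```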

get_collection() AsyncCollection[source]

Get the MongoDB collection associated with this repository.

Returns:

PyMongo AsyncCollection instance

Return type:

AsyncCollection
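
Since get_collection() is a plain (non-async) method, it can be called inline to reach PyMongo operations that the repository does not wrap. A sketch using count_documents, which is a coroutine on PyMongo's AsyncCollection; count_matching is a hypothetical helper name:

```python
from typing import Any, Dict


async def count_matching(repo: Any, query: Dict[str, Any]) -> int:
    """Count documents matching a filter via the underlying collection."""
    collection = repo.get_collection()  # synchronous call: no await here
    return await collection.count_documents(query)  # awaited PyMongo call
```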

async paginate(query: dict, limit: int, after: str | None = None, before: str | None = None, sort: Sequence[Tuple[str, int]] | None = None, projection: Dict[str, int] | None = None) Iterable[Edge[T]][source]

Asynchronously paginate through model instances using cursor-based pagination.

This method implements cursor-based pagination which is more reliable than offset-based pagination for large datasets.

Parameters:
  • query – MongoDB query dictionary

  • limit – Maximum number of documents per page

  • after – Cursor string for fetching next page

  • before – Cursor string for fetching previous page

  • sort – List of (field, direction) tuples for sorting

  • projection – MongoDB projection dictionary

Returns:

Iterator of Edge objects containing model instances and pagination cursors

Return type:

Iterable[Edge[T]]

Example

Get first page:

edges = await repo.paginate({"status": "active"}, limit=10)

Get next page using the last cursor:

next_edges = await repo.paginate(
    {"status": "active"},
    limit=10,
    after=list(edges)[-1].cursor
)
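
The two calls above generalize to a loop that walks every page. A minimal sketch, assuming each Edge exposes the model as edge.node (the cursor attribute appears in the example above; node is an assumption here) and that a page shorter than the limit means there is nothing further; iter_all is a hypothetical helper:

```python
from typing import Any, AsyncIterator, Dict


async def iter_all(
    repo: Any, query: Dict[str, Any], page_size: int = 100
) -> AsyncIterator[Any]:
    """Yield every matching model, fetching one page at a time."""
    after = None
    while True:
        edges = list(await repo.paginate(query, limit=page_size, after=after))
        if not edges:
            return                # empty page: nothing left
        for edge in edges:
            yield edge.node       # assumed attribute holding the model
        if len(edges) < page_size:
            return                # short page: this was the last one
        after = edges[-1].cursor  # resume after the last item seen
```

Consume it with an async for loop, for example: async for user in iter_all(repo, {"status": "active"}).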

async paginate_with_output_type(output_type: Type[OutputT], query: dict, limit: int, after: str | None = None, before: str | None = None, sort: Sequence[Tuple[str, int]] | None = None, projection: Dict[str, int] | None = None) Iterable[Edge[OutputT]][source]

Asynchronously paginate through model instances with a custom output type.

This method implements cursor-based pagination which is more reliable than offset-based pagination for large datasets.

Parameters:
  • output_type – The Pydantic model class for the output

  • query – MongoDB query dictionary

  • limit – Maximum number of documents per page

  • after – Cursor string for fetching next page

  • before – Cursor string for fetching previous page

  • sort – List of (field, direction) tuples for sorting

  • projection – MongoDB projection dictionary

Returns:

Iterator of Edge objects containing model instances and pagination cursors

Return type:

Iterable[Edge[OutputT]]

async save(model: T) InsertOneResult | UpdateResult[source]

Asynchronously save a model instance to the database.

This method will:
  • Insert the model if it doesn’t have an ID

  • Update the model if it has an ID

Parameters:

model – The model instance to save

Returns:

The result of the save operation

Return type:

Union[InsertOneResult, UpdateResult]
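
The insert-or-update rule can be illustrated with a toy in-memory store. This is only a sketch of the documented behavior, not the library's implementation: Note, InMemoryStore, the integer IDs, and the string return values are all stand-ins invented for the example.

```python
import itertools
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Note:
    text: str
    id: Optional[int] = None  # None means "not stored yet"


class InMemoryStore:
    """Toy stand-in for a collection; not the library's implementation."""

    def __init__(self) -> None:
        self.docs: Dict[int, str] = {}
        self._next_id = itertools.count(1)

    async def save(self, note: Note) -> str:
        if note.id is None:
            # No ID yet: insert and record the generated ID on the model.
            note.id = next(self._next_id)
            self.docs[note.id] = note.text
            return "inserted"
        # ID present: overwrite the stored document with that ID.
        self.docs[note.id] = note.text
        return "updated"
```

With the real repository, the same distinction shows up in the return type: an InsertOneResult for the first branch and an UpdateResult for the second.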

async save_many(models: Iterable[T])[source]

Asynchronously save multiple model instances to the database in bulk.

This method optimizes bulk operations by:
  • Grouping models into insert and update operations

  • Performing bulk inserts and updates

Parameters:

models – Iterable of model instances to save
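
The grouping step can be pictured as a partition on whether a model already has an ID. The sketch below illustrates the idea only and is not the library's code; split_for_bulk is a hypothetical helper, and treating a missing or None id as "needs insert" is an assumption:

```python
from typing import Any, Iterable, List, Tuple


def split_for_bulk(models: Iterable[Any]) -> Tuple[List[Any], List[Any]]:
    """Partition models into (to_insert, to_update) by presence of an ID."""
    to_insert: List[Any] = []
    to_update: List[Any] = []
    for model in models:
        if getattr(model, "id", None) is None:
            to_insert.append(model)  # no ID yet: candidate for a bulk insert
        else:
            to_update.append(model)  # has an ID: candidate for a bulk update
    return to_insert, to_update
```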