skillbase/db-mongodb
MongoDB with Motor async driver: schema design for document databases, indexes, aggregation pipelines, migrations, and repository pattern implementation
SKILL.md

You are a senior database engineer specializing in MongoDB schema design, Motor async driver, aggregation pipelines, and index optimization.

This skill covers production MongoDB usage with the Motor async driver in Python/FastAPI projects: client setup, document modeling (embedding vs referencing), index strategy, aggregation pipelines, repository pattern, migrations, and transactions. The goal is to produce schemas driven by access patterns with proper indexes, clean domain/BSON separation in repositories, and safe migration practices. Common mistakes this skill prevents: unbounded array embedding, missing compound indexes, relational thinking applied to document modeling, and MongoDB-coupled domain models.

When performing MongoDB tasks, follow this process:
## Motor client setup

Initialize Motor in the FastAPI lifespan and keep a single `AsyncIOMotorClient` per process. The client maintains its own connection pool, so share it instead of creating one per request:
```python
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo.errors import ConnectionFailure


class MongoConnection:
    def __init__(self, dsn: str, database: str) -> None:
        # One client per process; it owns the connection pool.
        self._client = AsyncIOMotorClient(
            dsn,
            maxPoolSize=50,
            minPoolSize=10,
            serverSelectionTimeoutMS=5000,
            connectTimeoutMS=5000,
            retryWrites=True,
            retryReads=True,
        )
        self._database = database

    @property
    def db(self) -> AsyncIOMotorDatabase:
        return self._client[self._database]

    async def ping(self) -> bool:
        try:
            await self._client.admin.command("ping")
            return True
        except ConnectionFailure:  # also covers ServerSelectionTimeoutError
            return False

    def close(self) -> None:
        self._client.close()
```
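FastAPI lifespan integration:

```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI

# `settings` and `ensure_indexes` (see "Index design") are defined elsewhere in the project.


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    mongo = MongoConnection(dsn=settings.db.dsn, database=settings.db.name)
    try:
        # Explicit check: `assert` statements are stripped when Python runs with -O.
        if not await mongo.ping():
            raise RuntimeError("MongoDB connection failed")
        app.state.mongo = mongo
        await ensure_indexes(mongo.db)
        yield
    finally:
        mongo.close()  # also runs if startup fails


app = FastAPI(lifespan=lifespan)
```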
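A single failed `ping()` aborts startup, which is fragile when the app and the database containers start at the same time. A minimal sketch of a bounded retry loop, assuming a hypothetical `wait_for_mongo` helper that accepts any async `ping` callable (such as `MongoConnection.ping`):

```python
import asyncio
from collections.abc import Awaitable, Callable


async def wait_for_mongo(
    ping: Callable[[], Awaitable[bool]],
    *,
    attempts: int = 5,
    base_delay: float = 0.5,
) -> None:
    """Hypothetical startup helper: call `ping` until it succeeds or attempts run out."""
    for attempt in range(attempts):
        if await ping():
            return
        if attempt < attempts - 1:
            # Exponential backoff between attempts: 0.5 s, 1 s, 2 s, ... with the defaults.
            await asyncio.sleep(base_delay * 2**attempt)
    raise RuntimeError(f"MongoDB not reachable after {attempts} attempts")
```

In the lifespan, `await wait_for_mongo(mongo.ping)` would replace the single `ping()` check. Each `ping()` can itself block for up to `serverSelectionTimeoutMS` (5 s in `MongoConnection`) before returning `False`, so the worst-case startup time is roughly `attempts` times that timeout plus the backoff delays.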
## Schema design — embedding vs referencing
**Embed when:** data is always read together (1:1 or 1:few), the child has no independent access path, the parent document stays well under the 16 MB BSON document limit, and the embedded data is updated infrequently.

**Reference when:** data has an independent lifecycle (1:many, many:many), arrays would grow without bound, multiple entities reference the same child, or the child needs to be queried on its own.
### Embedding — Order with line items:
```python
{
    "_id": ObjectId("..."),
    "user_id": ObjectId("..."),
    "status": "confirmed",
    "items": [
        {"product_id": ObjectId("..."), "name": "Widget", "qty": 2, "price": "9.99"},
        # ... more line items (bounded: an order holds a handful, not thousands)
    ],
    "total": "44.97",
    "created_at": datetime(2026, 3, 10, 12, 0, 0),
}
```
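Because prices are stored as strings, order totals should be computed with `Decimal` rather than `float`, which accumulates binary representation error. A small sketch, assuming line items shaped like the example above (the `order_total` name is hypothetical):

```python
from decimal import Decimal
from typing import Any


def order_total(items: list[dict[str, Any]]) -> str:
    """Hypothetical helper: sum qty * price exactly and return a string for storage."""
    total = sum((Decimal(item["price"]) * item["qty"] for item in items), Decimal("0"))
    # Normalize to two decimal places so values like "44.9700" are never stored.
    return str(total.quantize(Decimal("0.01")))
```

For example, a Widget line (`qty` 2 at `"9.99"`) plus one item at `"24.99"` yields `"44.97"`.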
### Referencing — User with activity log:
```python
# users collection
{"_id": ObjectId("..."), "email": "alice@example.com", "name": "Alice"}

# activities collection: grows without bound and needs independent queries
{"_id": ObjectId("..."), "user_id": ObjectId("..."), "action": "login", "timestamp": datetime(...)}
```
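When a page needs a quick preview of recent activity without a second query, a common compromise is a small, capped copy on the user document while the full log stays in `activities`. A sketch of the update document, assuming a hypothetical `recent_activity` field capped at 10 entries; `$push` with `$each` and a negative `$slice` appends and trims in one atomic update, so the embedded array stays bounded:

```python
from datetime import datetime, timezone
from typing import Any

RECENT_ACTIVITY_CAP = 10  # hypothetical cap that keeps the embedded array bounded


def push_recent_activity(action: str, at: datetime) -> dict[str, Any]:
    """Build an update for users.update_one() that appends one entry and trims old ones."""
    return {
        "$push": {
            "recent_activity": {
                "$each": [{"action": action, "timestamp": at}],
                # A negative $slice keeps only the last N elements after the push.
                "$slice": -RECENT_ACTIVITY_CAP,
            }
        }
    }
```

The caller would pass this to `db["users"].update_one({"_id": user_id}, ...)` alongside the `insert_one` into `activities`, which remains the source of truth.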
### Schema versioning
Add `schema_version` to documents whose shape may evolve, so readers and migrations can tell old and new shapes apart:

```python
{
    "_id": ObjectId("..."),
    "schema_version": 2,
    "email": "alice@example.com",
    "preferences": {"theme": "dark"},
}
```
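Readers can then upgrade old documents lazily on read. A minimal sketch, assuming a hypothetical version 1 shape in which `theme` sat at the top level before moving under `preferences` in version 2, and treating documents written before versioning existed as version 1:

```python
from typing import Any

CURRENT_USER_SCHEMA = 2


def upgrade_user_doc(doc: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of a users document upgraded to CURRENT_USER_SCHEMA (hypothetical v1 shape)."""
    doc = dict(doc)  # shallow copy; never mutate the caller's document
    version = doc.get("schema_version", 1)  # missing field means pre-versioning, i.e. v1
    if version < 2:
        # v1 -> v2: move the top-level "theme" into a "preferences" sub-document.
        theme = doc.pop("theme", None)
        doc["preferences"] = {"theme": theme} if theme is not None else {}
        version = 2
    doc["schema_version"] = version
    return doc
```

A repository's `_to_domain` could call this before mapping, while a later backfill migration rewrites the remaining v1 documents so the upgrade branch can eventually be deleted.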
## Repository implementation
Map between domain models and BSON documents in the adapter layer:

```python
from datetime import UTC, datetime
from typing import Any

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorDatabase

# User, UserId, Page and the UserRepository port come from the domain layer.


class MongoUserRepository(UserRepository):
    def __init__(self, db: AsyncIOMotorDatabase) -> None:
        self._collection = db["users"]

    async def get_by_id(self, user_id: UserId) -> User | None:
        doc = await self._collection.find_one({"_id": ObjectId(str(user_id.value))})
        return self._to_domain(doc) if doc else None

    async def save(self, user: User) -> None:
        doc = self._to_document(user)
        doc_id = doc.pop("_id")  # filter on _id; keep the immutable field out of $set
        await self._collection.update_one({"_id": doc_id}, {"$set": doc}, upsert=True)

    async def find_paginated(self, *, status: str | None = None, page: int = 1, size: int = 20) -> Page[User]:
        query: dict[str, Any] = {}
        if status is not None:
            query["status"] = status

        total = await self._collection.count_documents(query)
        # skip() still walks past every skipped entry, so deep pages get slower.
        cursor = self._collection.find(query).sort("created_at", -1).skip((page - 1) * size).limit(size)
        items = [self._to_domain(doc) async for doc in cursor]
        return Page(items=items, total=total, page=page, size=size)

    @staticmethod
    def _to_domain(doc: dict[str, Any]) -> User:
        return User(id=UserId(value=doc["_id"]), email=doc["email"], name=doc["name"], created_at=doc["created_at"])

    @staticmethod
    def _to_document(user: User) -> dict[str, Any]:
        return {
            "_id": ObjectId(str(user.id.value)),
            "email": user.email,
            "name": user.name,
            "created_at": user.created_at,
            "updated_at": datetime.now(tz=UTC),
        }
```
## Index design
Derive indexes from query patterns and declare them in one central function:

```python
from motor.motor_asyncio import AsyncIOMotorDatabase
from pymongo import ASCENDING, DESCENDING, IndexModel


async def ensure_indexes(db: AsyncIOMotorDatabase) -> None:
    """Safe to call on every startup: re-creating an index with identical keys and
    options is a no-op (changing options under an existing name raises an error)."""
    await db["users"].create_indexes([
        IndexModel([("email", ASCENDING)], unique=True, name="idx_email_unique"),
        IndexModel([("created_at", DESCENDING)], name="idx_created_at"),
    ])

    await db["orders"].create_indexes([
        # Equality fields (user_id, status) first, then the sort key (ESR).
        IndexModel([("user_id", ASCENDING), ("status", ASCENDING), ("created_at", DESCENDING)], name="idx_user_status_date"),
        IndexModel([("status", ASCENDING), ("created_at", DESCENDING)], name="idx_status_date"),
    ])

    await db["activities"].create_indexes([
        IndexModel([("user_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_user_timestamp"),
        # TTL: documents are removed about 90 days after "timestamp" (must be a BSON date).
        IndexModel([("timestamp", ASCENDING)], expireAfterSeconds=90 * 86400, name="idx_ttl_timestamp"),
    ])
```
### Index rules
- **ESR (Equality, Sort, Range)**: in compound indexes, put equality-matched fields first, then sort fields, then range-filtered fields
- **Covered queries**: include every filtered and projected field in the index and exclude `_id` from the projection, so the query is answered from the index alone
- **`unique=True`** for fields that must be unique (email, external_id)
- **TTL indexes** for time-expiring data (sessions, logs, tokens); the indexed field must hold BSON dates
- **Selectivity**: an index on a low-cardinality field alone (e.g., a boolean status) barely narrows the scan, so it is hardly faster than a collection scan; combine such fields with high-cardinality fields in compound indexes
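The ESR ordering is mechanical enough to encode. A minimal sketch with a hypothetical `esr_keys` helper that returns a key list suitable for `IndexModel`, given which fields a query matches by equality, sorts on, and filters by range:

```python
ASCENDING, DESCENDING = 1, -1  # same values as pymongo.ASCENDING / pymongo.DESCENDING


def esr_keys(
    equality: list[str],
    sort: list[tuple[str, int]],
    range_: list[str],
) -> list[tuple[str, int]]:
    """Hypothetical helper: order compound-index keys as Equality, Sort, Range."""
    keys: list[tuple[str, int]] = [(f, ASCENDING) for f in equality]
    keys += sort  # sort fields keep the query's direction
    seen = {f for f, _ in keys}
    # A field already used for equality or sort is not repeated for the range.
    keys += [(f, ASCENDING) for f in range_ if f not in seen]
    return keys
```

For the orders query `{"user_id": ..., "status": ..., "created_at": {"$gte": ...}}` sorted by `created_at` descending, `esr_keys(["user_id", "status"], [("created_at", DESCENDING)], ["created_at"])` produces the same keys as `idx_user_status_date`.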
## Aggregation pipelines
```python
from datetime import datetime
from typing import Any

from motor.motor_asyncio import AsyncIOMotorDatabase

Pipeline = list[dict[str, Any]]


class OrderAnalytics:
    def __init__(self, db: AsyncIOMotorDatabase) -> None:
        self._collection = db["orders"]

    async def revenue_by_status(self, *, since: datetime | None = None) -> list[dict[str, Any]]:
        pipeline: Pipeline = []
        if since is not None:
            # Filter before grouping so $group sees fewer documents.
            pipeline.append({"$match": {"created_at": {"$gte": since}}})

        pipeline.extend([
            {"$group": {
                "_id": "$status",
                # Totals are stored as strings; $toDecimal keeps the arithmetic exact.
                "total_revenue": {"$sum": {"$toDecimal": "$total"}},
                "order_count": {"$sum": 1},
                "avg_order_value": {"$avg": {"$toDecimal": "$total"}},
            }},
            {"$sort": {"total_revenue": -1}},
            {"$project": {
                "_id": 0, "status": "$_id",
                "total_revenue": {"$toString": "$total_revenue"},
                "order_count": 1,
                "avg_order_value": {"$toString": "$avg_order_value"},
            }},
        ])
        return [doc async for doc in self._collection.aggregate(pipeline)]
```
## Data migrations
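Use versioned migration scripts recorded in a `_migrations` collection. Each `up_fn` must be idempotent: the record is written only after `up_fn` succeeds, so a crash in between reruns the migration on the next start. Run migrations from a single process (for example, a deploy step), because two concurrent runners could both execute the same `up_fn`:

```python
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime

from motor.motor_asyncio import AsyncIOMotorDatabase

MigrationFn = Callable[[AsyncIOMotorDatabase], Awaitable[None]]


async def run_migration(db: AsyncIOMotorDatabase, migration_id: str, up_fn: MigrationFn) -> None:
    migrations = db["_migrations"]
    if await migrations.find_one({"_id": migration_id}) is not None:
        return  # already applied
    await up_fn(db)
    # Recorded only after success; the unique _id rejects a second record for the same id.
    await migrations.insert_one({"_id": migration_id, "applied_at": datetime.now(tz=UTC)})
```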
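An `up_fn` becomes idempotent when its filter selects only documents that still need the change. A sketch of a users migration to `schema_version` 2, assuming a hypothetical version 1 shape in which `theme` sat at the top level; it uses an update with an aggregation pipeline (MongoDB 4.2+), and the filter and update are module constants so they can be checked without a server:

```python
from typing import Any

# Matches only documents not yet at version 2 (including pre-versioning ones),
# so rerunning the migration updates nothing.
USERS_V2_FILTER: dict[str, Any] = {
    "$or": [{"schema_version": {"$lt": 2}}, {"schema_version": {"$exists": False}}],
}

# Pipeline-style update: move "theme" under "preferences", stamp the new
# version, then drop the old top-level field.
# Assumes v1 documents have no "preferences" field yet.
USERS_V2_UPDATE: list[dict[str, Any]] = [
    {"$set": {"preferences": {"theme": "$theme"}, "schema_version": 2}},
    {"$unset": "theme"},
]


async def migrate_users_v2(db: Any) -> None:
    """Hypothetical up_fn, e.g. run_migration(db, "users_v2", migrate_users_v2)."""
    await db["users"].update_many(USERS_V2_FILTER, USERS_V2_UPDATE)
```

The `$exists: False` branch matters because `{"$lt": 2}` does not match documents that lack `schema_version` entirely.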
## Bulk operations
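Use `bulk_write` with `ordered=False` for independent batch updates: the server may apply the operations in any order and keeps going past individual failures, which are reported together in a `BulkWriteError` after the batch:

```python
from datetime import UTC, datetime

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorCollection
from pymongo import UpdateOne


async def bulk_update_statuses(collection: AsyncIOMotorCollection, updates: list[tuple[ObjectId, str]]) -> int:
    if not updates:
        return 0  # bulk_write rejects an empty operation list
    now = datetime.now(tz=UTC)  # one timestamp for the whole batch
    operations = [
        UpdateOne({"_id": oid}, {"$set": {"status": status, "updated_at": now}})
        for oid, status in updates
    ]
    result = await collection.bulk_write(operations, ordered=False)
    return result.modified_count
```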
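Because unordered operations may be applied in any order, two updates for the same `_id` in one batch leave a nondeterministic final status. A small sketch, assuming last-wins semantics are what the caller wants (the `dedupe_last_wins` name is hypothetical), that collapses duplicates before the `UpdateOne` list is built:

```python
from collections.abc import Hashable
from typing import TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


def dedupe_last_wins(updates: list[tuple[K, V]]) -> list[tuple[K, V]]:
    """Keep only the last update per key, preserving first-seen key order."""
    latest: dict[K, V] = {}
    for key, value in updates:
        latest[key] = value  # later entries overwrite earlier ones
    return list(latest.items())
```

Usage: `await bulk_update_statuses(collection, dedupe_last_wins(updates))`.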
## Transactions
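Multi-document transactions require a replica set or sharded cluster; a standalone server rejects them. Pass the session to every operation inside the transaction:

```python
from decimal import Decimal

from bson.decimal128 import Decimal128
from motor.motor_asyncio import AsyncIOMotorClient


async def transfer_funds(client: AsyncIOMotorClient, from_id, to_id, amount: Decimal) -> None:
    accounts = client["finance"]["accounts"]
    async with await client.start_session() as session:
        async with session.start_transaction():
            # Balances are stored as Decimal128: $inc cannot add to strings, and floats round.
            # Putting the balance check in the filter makes check-and-debit a single write.
            debit = await accounts.update_one(
                {"_id": from_id, "balance": {"$gte": Decimal128(amount)}},
                {"$inc": {"balance": Decimal128(-amount)}},
                session=session,
            )
            if debit.matched_count == 0:  # unknown account or insufficient balance
                raise InsufficientFundsError(...)  # leaving the block aborts the transaction
            await accounts.update_one({"_id": to_id}, {"$inc": {"balance": Decimal128(amount)}}, session=session)
```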
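A transaction can abort with the `TransientTransactionError` label (for example on a write conflict), and the usual response is to rerun the whole transaction. Motor's `with_transaction()` on the session provides similar retry logic; the sketch below only shows the shape of such a loop, with a hypothetical `run_with_retries` helper that takes the transaction body and a predicate deciding which errors are transient. With PyMongo, that predicate could be `lambda e: isinstance(e, PyMongoError) and e.has_error_label("TransientTransactionError")`:

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def run_with_retries(
    txn: Callable[[], Awaitable[T]],
    is_transient: Callable[[Exception], bool],
    *,
    max_attempts: int = 3,
    backoff: float = 0.05,
) -> T:
    """Hypothetical helper: rerun the whole transaction body while errors are transient."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return await txn()
        except Exception as exc:
            # Re-raise permanent errors (e.g. InsufficientFundsError) and the final failure.
            if attempt >= max_attempts or not is_transient(exc):
                raise
            await asyncio.sleep(backoff * attempt)  # short linear backoff before retrying
            attempt += 1
```

Since `transfer_funds` opens its own session, `run_with_retries(lambda: transfer_funds(client, a, b, amount), is_transient)` starts a fresh transaction on every attempt.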
## Example

User asks: "Design a MongoDB schema for a blog platform with posts, comments, and tags"

**Access patterns:** fetch post with metadata (frequent), list by tag sorted by date (frequent), list comments paginated (frequent), user's recent posts (moderate), full-text search (occasional).

**Schema:**
```python
# posts: embed tags (bounded), reference comments (unbounded)
{
    "_id": ObjectId("..."),
    "schema_version": 1,
    "author_id": ObjectId("..."),
    "slug": "building-async-apis",
    "title": "Building Async APIs with FastAPI",
    "content": "...",
    "tags": ["python", "fastapi", "async"],  # bounded, always read with the post
    "status": "published",
    "comment_count": 42,  # denormalized from the comments collection, kept in sync via $inc
    "published_at": datetime(2026, 3, 14, 10, 0, 0),
    "created_at": datetime(2026, 3, 10, 12, 0, 0),
}

# comments: referenced (unbounded growth)
{
    "_id": ObjectId("..."),
    "post_id": ObjectId("..."),
    "author_name": "Alice",  # denormalized for display
    "body": "Great article!",
    "created_at": datetime(2026, 3, 14, 11, 0, 0),
}
```
**Indexes:**
```python
from motor.motor_asyncio import AsyncIOMotorDatabase
from pymongo import ASCENDING, DESCENDING, TEXT, IndexModel


async def ensure_indexes(db: AsyncIOMotorDatabase) -> None:
    await db["posts"].create_indexes([
        IndexModel([("tags", ASCENDING), ("status", ASCENDING), ("published_at", DESCENDING)], name="idx_tags_status_published"),
        IndexModel([("author_id", ASCENDING), ("published_at", DESCENDING)], name="idx_author_published"),
        IndexModel([("slug", ASCENDING)], unique=True, name="idx_slug_unique"),
        IndexModel([("title", TEXT), ("content", TEXT)], weights={"title": 10, "content": 1}, name="idx_text_search"),
    ])
    await db["comments"].create_indexes([
        IndexModel([("post_id", ASCENDING), ("created_at", ASCENDING)], name="idx_post_date"),
    ])
```
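A quick sanity check is to map each access pattern to the index meant to serve it. A sketch under simplifying assumptions: equality fields must form the index prefix and the sort must follow directly, in index order or fully reversed; range predicates, the text index, and multikey details are ignored. `ACCESS_PATTERNS` and `index_serves` are hypothetical names, and `explain()` on a real deployment remains the authority:

```python
ASCENDING, DESCENDING = 1, -1  # same values as pymongo.ASCENDING / pymongo.DESCENDING

Keys = list[tuple[str, int]]

# Key specs copied from ensure_indexes() above (text index omitted).
INDEX_KEYS: dict[str, Keys] = {
    "idx_tags_status_published": [("tags", ASCENDING), ("status", ASCENDING), ("published_at", DESCENDING)],
    "idx_author_published": [("author_id", ASCENDING), ("published_at", DESCENDING)],
    "idx_slug_unique": [("slug", ASCENDING)],
    "idx_post_date": [("post_id", ASCENDING), ("created_at", ASCENDING)],
}

# Access pattern -> (index expected to serve it, equality-matched fields, sort spec).
ACCESS_PATTERNS: dict[str, tuple[str, list[str], Keys]] = {
    "fetch_post": ("idx_slug_unique", ["slug"], []),
    "list_by_tag": ("idx_tags_status_published", ["tags", "status"], [("published_at", DESCENDING)]),
    "list_comments": ("idx_post_date", ["post_id"], [("created_at", ASCENDING)]),
    "user_recent_posts": ("idx_author_published", ["author_id"], [("published_at", DESCENDING)]),
}


def index_serves(keys: Keys, equality: list[str], sort: Keys) -> bool:
    """True if the equality fields are the index prefix and the sort follows directly,
    in index order or fully reversed (an index can be walked backwards)."""
    n = len(equality)
    if {field for field, _ in keys[:n]} != set(equality):
        return False
    tail = keys[n : n + len(sort)]
    return tail == sort or tail == [(field, -direction) for field, direction in sort]
```

A tag-only query sorted by `published_at` fails this check, because `status` sits between the equality prefix and the sort key in `idx_tags_status_published`.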
## Guidelines
- Design schemas around access patterns, not entity relationships: document DBs optimize for reads, not normalization
- Embed data read together with bounded growth; reference unbounded or independently accessed data. This prevents document bloat and hitting the 16 MB document limit
- Denormalize fields read frequently but updated rarely, and comment which collection owns the canonical value
- Follow ESR (Equality, Sort, Range) for compound index field order, so one index can serve both the filter and the sort without an in-memory sort
- Call `create_indexes()` on every startup: re-creating an existing index with identical keys and options is a no-op, so this is safe and keeps indexes in sync with code
- Use TTL indexes for time-expiring data (sessions, logs, tokens): the server deletes expired documents without cron jobs
- Never store money as floats: store strings and convert with `$toDecimal` in aggregations, or use `Decimal128` for values changed with `$inc` (such as balances)
- Add `schema_version` to collections that may evolve, which enables gradual migrations without downtime
- Use `update_one` with `upsert=True` for save operations: one call handles both insert and update
- Use `bulk_write(ordered=False)` for independent batch operations: the server may apply them in any order and continues past individual failures
- Pass `session` to every operation within a transaction: an operation without it silently runs outside the transaction
- Map between domain models and BSON in the repository layer so domain models stay free of MongoDB dependencies