
skillbase/db-mongodb

MongoDB with Motor async driver: schema design for document databases, indexes, aggregation pipelines, migrations, and repository pattern implementation

SKILL.md

You are a senior database engineer specializing in MongoDB schema design, Motor async driver, aggregation pipelines, and index optimization.

This skill covers production MongoDB usage with the Motor async driver in Python/FastAPI projects: client setup, document modeling (embedding vs referencing), index strategy, aggregation pipelines, repository pattern, migrations, and transactions. The goal is to produce schemas driven by access patterns with proper indexes, clean domain/BSON separation in repositories, and safe migration practices. Common mistakes this skill prevents: unbounded array embedding, missing compound indexes, relational thinking applied to document modeling, and MongoDB-coupled domain models.

When performing MongoDB tasks, follow this process:

## Motor client setup

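Initialize Motor in the FastAPI lifespan and keep a single client instance per process. The client owns a connection pool, so creating clients per request wastes connections and setup time:
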
```python
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo.errors import ConnectionFailure


class MongoConnection:
    def __init__(self, dsn: str, database: str) -> None:
        self._client = AsyncIOMotorClient(
            dsn,
            maxPoolSize=50,
            minPoolSize=10,
            serverSelectionTimeoutMS=5000,
            connectTimeoutMS=5000,
            retryWrites=True,
            retryReads=True,
        )
        self._database = database

    @property
    def db(self) -> AsyncIOMotorDatabase:
        return self._client[self._database]

    async def ping(self) -> bool:
        try:
            await self._client.admin.command("ping")
            return True
        except ConnectionFailure:
            return False

    def close(self) -> None:
        self._client.close()
```
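
FastAPI lifespan integration:

```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # `settings` and `ensure_indexes` are application-defined
    mongo = MongoConnection(dsn=settings.db.dsn, database=settings.db.name)
    if not await mongo.ping():  # not `assert`: assertions are stripped under python -O
        raise RuntimeError("MongoDB connection failed")
    app.state.mongo = mongo
    await ensure_indexes(mongo.db)
    try:
        yield
    finally:
        mongo.close()
```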

## Schema design — embedding vs referencing

**Embed when:** data is always read together (1:1 or 1:few), the child has no independent access, the document stays well under the 16MB BSON document limit, and the embedded data is updated infrequently.

**Reference when:** data has an independent lifecycle (1:many, many:many), arrays grow without bound, multiple entities reference the same child, or the child needs to be queried on its own.

### Embedding — Order with line items:

```python
{
    "_id": ObjectId("..."),
    "user_id": ObjectId("..."),
    "status": "confirmed",
    "items": [  # bounded: an order has a handful of line items, always read with the order
        {"product_id": ObjectId("..."), "name": "Widget", "qty": 2, "price": "9.99"},
        # ... remaining line items
    ],
    "total": "44.97",  # sum over all line items, stored as a string to avoid float rounding
    "created_at": datetime(2026, 3, 10, 12, 0, 0),
}
```
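Because `price` and `total` are stored as strings, code that computes `total` should do the arithmetic in `decimal.Decimal` and convert back to a string. A minimal sketch, assuming line items shaped like the `items` entries above (the `order_total` helper is hypothetical, and rounding to two places assumes a two-decimal currency):

```python
from decimal import ROUND_HALF_UP, Decimal
from typing import Any


def order_total(items: list[dict[str, Any]]) -> str:
    """Sum qty * price over line items with exact decimal arithmetic.

    Prices arrive as strings (e.g. "9.99"), so no float ever enters the total.
    """
    total = sum((Decimal(item["price"]) * item["qty"] for item in items), Decimal("0"))
    # Normalize to two decimal places before storing the string
    return str(total.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))
```

Called on the single `Widget` line above, it returns `"19.98"`; the stored `total` is then written from this value rather than computed in floating point.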

### Referencing — User with activity log:

```python
# users collection
{"_id": ObjectId("..."), "email": "alice@example.com", "name": "Alice"}

# activities collection — grows unboundedly, needs independent queries
{"_id": ObjectId("..."), "user_id": ObjectId("..."), "action": "login", "timestamp": datetime(...)}
```
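When you want the most recent children embedded for fast reads while the full history stays referenced, one common middle ground is to cap the embedded array on every write. MongoDB's `$push` accepts `$each` together with `$slice`, and a negative `$slice` keeps only the last N elements. A minimal sketch that builds such an update document, assuming a hypothetical `recent_activities` array on the user document (both the field and the `push_capped` name are illustrative):

```python
from typing import Any


def push_capped(field: str, item: dict[str, Any], cap: int) -> dict[str, Any]:
    """Build an update that appends `item` to `field` and keeps only the newest `cap` entries."""
    if cap <= 0:
        raise ValueError("cap must be positive")
    # $slice requires $each; a negative value keeps the last `cap` elements
    return {"$push": {field: {"$each": [item], "$slice": -cap}}}


update = push_capped("recent_activities", {"action": "login"}, cap=20)
# With Motor: await db["users"].update_one({"_id": user_id}, update)
```

The complete log still lives in `activities`; the embedded copy stays bounded however active the user is.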

### Schema versioning

Add `schema_version` to documents whose shape may evolve, so application code and migrations can tell which layout a given document uses:

```python
{"_id": ObjectId("..."), "schema_version": 2, "email": "alice@example.com", "preferences": {"theme": "dark"}}
```
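One way to use `schema_version` is to upgrade documents lazily as they are read, so old and new shapes can coexist during a rollout. A minimal sketch, assuming a hypothetical version 1 layout that stored `theme` at the top level (that v1 shape and the `upgrade_user_doc` name are illustrative, not part of the schema above); documents without the field are treated as version 1:

```python
from typing import Any

CURRENT_VERSION = 2


def _v1_to_v2(doc: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical v1 -> v2 step: move a top-level `theme` into `preferences`."""
    upgraded = {key: value for key, value in doc.items() if key != "theme"}
    upgraded["preferences"] = {"theme": doc["theme"]} if "theme" in doc else {}
    upgraded["schema_version"] = 2
    return upgraded


# Maps each version to the function that lifts a document one version up
UPGRADES = {1: _v1_to_v2}


def upgrade_user_doc(doc: dict[str, Any]) -> dict[str, Any]:
    """Apply upgrade steps until `doc` reaches CURRENT_VERSION; never mutates the input."""
    version = doc.get("schema_version", 1)  # documents written before versioning count as v1
    while version < CURRENT_VERSION:
        doc = UPGRADES[version](doc)
        version = doc["schema_version"]
    return doc
```

A repository could run this before mapping to the domain model; the next save then writes the upgraded shape back, so the collection converges gradually.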

## Repository implementation

Map between domain models and BSON documents in the adapter layer:

```python
from datetime import UTC, datetime
from typing import Any

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorDatabase

# User, UserId, UserRepository and Page are defined in the domain/application layers


class MongoUserRepository(UserRepository):
    def __init__(self, db: AsyncIOMotorDatabase) -> None:
        self._collection = db["users"]

    async def get_by_id(self, user_id: UserId) -> User | None:
        doc = await self._collection.find_one({"_id": ObjectId(str(user_id.value))})
        return self._to_domain(doc) if doc else None

    async def save(self, user: User) -> None:
        doc = self._to_document(user)
        doc_id = doc.pop("_id")  # match on _id rather than $set-ting the immutable field
        await self._collection.update_one({"_id": doc_id}, {"$set": doc}, upsert=True)

    async def find_paginated(self, *, status: str | None = None, page: int = 1, size: int = 20) -> Page[User]:
        query: dict[str, Any] = {}
        if status is not None:
            query["status"] = status

        total = await self._collection.count_documents(query)
        # _id breaks ties between equal created_at values, so skip/limit pages are deterministic
        cursor = (
            self._collection.find(query)
            .sort([("created_at", -1), ("_id", -1)])
            .skip((page - 1) * size)
            .limit(size)
        )
        items = [self._to_domain(doc) async for doc in cursor]
        return Page(items=items, total=total, page=page, size=size)

    @staticmethod
    def _to_domain(doc: dict[str, Any]) -> User:
        # str() keeps the BSON ObjectId type out of the domain model
        return User(id=UserId(value=str(doc["_id"])), email=doc["email"], name=doc["name"], created_at=doc["created_at"])

    @staticmethod
    def _to_document(user: User) -> dict[str, Any]:
        return {
            "_id": ObjectId(str(user.id.value)),
            "email": user.email,
            "name": user.name,
            "created_at": user.created_at,
            "updated_at": datetime.now(tz=UTC),
        }
```

## Index design

Create indexes from your query patterns, and define them all in one centralized function:

```python
from motor.motor_asyncio import AsyncIOMotorDatabase
from pymongo import ASCENDING, DESCENDING, IndexModel


async def ensure_indexes(db: AsyncIOMotorDatabase) -> None:
    """Safe to call on every startup: creating an index that already exists with the same spec is a no-op.

    An existing index with the same name or keys but different options raises OperationFailure.
    """
    await db["users"].create_indexes([
        IndexModel([("email", ASCENDING)], unique=True, name="idx_email_unique"),
        IndexModel([("created_at", DESCENDING)], name="idx_created_at"),
    ])

    await db["orders"].create_indexes([
        IndexModel([("user_id", ASCENDING), ("status", ASCENDING), ("created_at", DESCENDING)], name="idx_user_status_date"),
        IndexModel([("status", ASCENDING), ("created_at", DESCENDING)], name="idx_status_date"),
    ])

    await db["activities"].create_indexes([
        IndexModel([("user_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_user_timestamp"),
        # TTL: the background monitor (runs about every 60 s) deletes documents ~90 days
        # after `timestamp`; the field must hold a BSON date
        IndexModel([("timestamp", ASCENDING)], expireAfterSeconds=90 * 86400, name="idx_ttl_timestamp"),
    ])
```

### Index rules

- **ESR (Equality-Sort-Range)**: in compound indexes, put equality fields first, then sort fields, then range fields
- **Covered queries**: include every filtered and projected field in the index and exclude `_id` from the projection (unless the index contains it), so the query is answered from the index alone
- **`unique=True`** for fields that must be unique (email, external_id)
- **TTL indexes** for time-expiring data (sessions, logs, tokens)
- **Low-cardinality fields**: don't index them on their own (e.g., a boolean or a status with a few values); such an index is barely more selective than a collection scan. Include them in compound indexes alongside high-cardinality fields instead
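The ESR rule can be applied mechanically once each query field is classified. A small sketch, assuming you classify the fields yourself; it returns keys in the `(field, direction)` form `IndexModel` accepts, using `1`/`-1` (the values of pymongo's `ASCENDING`/`DESCENDING`). The `esr_keys` helper is hypothetical:

```python
def esr_keys(
    equality: list[str],
    sort: list[tuple[str, int]],
    range_fields: list[str],
) -> list[tuple[str, int]]:
    """Order compound-index keys as Equality, then Sort, then Range."""
    # Direction is irrelevant for equality and range-only fields, so use ascending
    candidates = [(f, 1) for f in equality] + list(sort) + [(f, 1) for f in range_fields]
    keys: list[tuple[str, int]] = []
    seen: set[str] = set()
    for field, direction in candidates:
        if field not in seen:  # a field already placed earlier keeps its earlier position
            keys.append((field, direction))
            seen.add(field)
    return keys


# {"user_id": X, "status": "confirmed"} sorted by created_at desc -> same keys as idx_user_status_date
orders_keys = esr_keys(["user_id", "status"], [("created_at", -1)], [])
```

When the sort field is also the range field (e.g. activities since a date, newest first), it is placed once at its sort position, which matches `idx_user_timestamp`.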

## Aggregation pipelines

Keep analytics pipelines in dedicated query classes, and place `$match` as early as possible so fewer documents flow through later stages and the filter can use an index:

```python
from datetime import datetime
from typing import Any

from motor.motor_asyncio import AsyncIOMotorDatabase

Pipeline = list[dict[str, Any]]


class OrderAnalytics:
    def __init__(self, db: AsyncIOMotorDatabase) -> None:
        self._collection = db["orders"]

    async def revenue_by_status(self, *, since: datetime | None = None) -> list[dict[str, Any]]:
        pipeline: Pipeline = []
        if since is not None:
            pipeline.append({"$match": {"created_at": {"$gte": since}}})

        pipeline.extend([
            # Totals are stored as strings; $toDecimal converts them for exact arithmetic
            {"$group": {
                "_id": "$status",
                "total_revenue": {"$sum": {"$toDecimal": "$total"}},
                "order_count": {"$sum": 1},
                "avg_order_value": {"$avg": {"$toDecimal": "$total"}},
            }},
            {"$sort": {"total_revenue": -1}},
            {"$project": {
                "_id": 0, "status": "$_id",
                "total_revenue": {"$toString": "$total_revenue"},
                "order_count": 1,
                "avg_order_value": {"$toString": "$avg_order_value"},
            }},
        ])
        return [doc async for doc in self._collection.aggregate(pipeline)]
```
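Embedded arrays are aggregated with `$unwind`, which emits one document per array element. A sketch of a revenue-per-product query over the order line items from the embedding example, assuming only `confirmed` orders should count (that filter and the `revenue_by_product_pipeline` name are illustrative). Writing it as a plain function that returns the pipeline also makes its shape unit-testable without a server:

```python
from typing import Any

Pipeline = list[dict[str, Any]]


def revenue_by_product_pipeline(limit: int = 10) -> Pipeline:
    """Top products by revenue, computed from each order's embedded `items` array."""
    return [
        {"$match": {"status": "confirmed"}},  # assumption: only confirmed orders count
        # One document per line item; orders with an empty or missing `items` are dropped
        {"$unwind": "$items"},
        {"$group": {
            "_id": "$items.product_id",
            # price is a string, so convert before multiplying by qty
            "revenue": {"$sum": {"$multiply": [{"$toDecimal": "$items.price"}, "$items.qty"]}},
            "units": {"$sum": "$items.qty"},
        }},
        {"$sort": {"revenue": -1}},
        {"$limit": limit},
        {"$project": {"_id": 0, "product_id": "$_id", "revenue": {"$toString": "$revenue"}, "units": 1}},
    ]
```

With Motor: `[doc async for doc in db["orders"].aggregate(revenue_by_product_pipeline())]`.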
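
## Data migrations

Use versioned, idempotent migration scripts, and record each applied migration in a `_migrations` collection so it runs only once. Run them from a single process (for example, a deploy step): two instances starting at the same moment can both pass the `find_one` check, and the second `insert_one` then fails with a duplicate key error.

```python
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime

from motor.motor_asyncio import AsyncIOMotorDatabase


async def run_migration(
    db: AsyncIOMotorDatabase, migration_id: str, up_fn: Callable[[AsyncIOMotorDatabase], Awaitable[None]]
) -> None:
    migrations = db["_migrations"]
    if await migrations.find_one({"_id": migration_id}) is not None:
        return  # already applied
    # up_fn must be idempotent: a crash before insert_one re-runs it on the next start
    await up_fn(db)
    await migrations.insert_one({"_id": migration_id, "applied_at": datetime.now(tz=UTC)})
```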
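`run_migration` handles one script; a runner usually keeps an ordered registry and applies whatever is missing, in order. A minimal sketch of that selection step, assuming migration ids are listed in the order they must run and the applied ids have already been read from `_migrations` (the registry layout and the `pending` helper are hypothetical):

```python
from collections.abc import Sequence


def pending(registry: Sequence[str], applied: set[str]) -> list[str]:
    """Return registry ids not yet applied, preserving registry order.

    Raises if `applied` holds an id the code no longer knows about, which
    often means a migration was renamed or deleted after it ran.
    """
    unknown = applied - set(registry)
    if unknown:
        raise ValueError(f"applied migrations missing from registry: {sorted(unknown)}")
    return [mid for mid in registry if mid not in applied]


REGISTRY = ["0001_add_schema_version", "0002_move_theme_to_preferences"]  # illustrative ids
# Inside an async function: applied = {doc["_id"] async for doc in db["_migrations"].find({}, {"_id": 1})}
```

Each returned id would then be passed to `run_migration` together with its `up_fn`, one at a time.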
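
## Bulk operations

Use `bulk_write` with `ordered=False` for batch operations. The server may apply the writes in any order and keeps going after an individual write fails; failures are reported together in a `BulkWriteError` once the batch finishes:

```python
from datetime import UTC, datetime

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorCollection
from pymongo import UpdateOne


async def bulk_update_statuses(collection: AsyncIOMotorCollection, updates: list[tuple[ObjectId, str]]) -> int:
    if not updates:
        return 0  # bulk_write raises InvalidOperation on an empty list
    now = datetime.now(tz=UTC)  # one timestamp for the whole batch
    operations = [
        UpdateOne({"_id": oid}, {"$set": {"status": s, "updated_at": now}})
        for oid, s in updates
    ]
    result = await collection.bulk_write(operations, ordered=False)
    return result.modified_count
```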
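PyMongo already splits a large `bulk_write` into batches the server accepts, but building millions of `UpdateOne` objects at once still costs memory. When updates arrive from a large iterator, chunking them first keeps memory bounded. A minimal sketch (the `chunked` helper is hypothetical; on Python 3.12+ `itertools.batched` does the same job, yielding tuples instead of lists):

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield lists of at most `size` items, consuming `items` lazily."""
    if size < 1:
        raise ValueError("size must be >= 1")
    iterator = iter(items)
    while chunk := list(islice(iterator, size)):
        yield chunk


# Inside an async function:
# for batch in chunked(update_stream, 1000):
#     await bulk_update_statuses(collection, batch)
```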
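
## Transactions

Multi-document transactions require a replica set or sharded cluster. Pass `session` to every operation inside the transaction; leaving the `start_transaction()` block normally commits, and an exception aborts:

```python
from decimal import Decimal

from bson.decimal128 import Decimal128
from motor.motor_asyncio import AsyncIOMotorClient


async def transfer_funds(client: AsyncIOMotorClient, from_id, to_id, amount: Decimal) -> None:
    async with await client.start_session() as session:
        async with session.start_transaction():
            db = client["finance"]
            from_account = await db["accounts"].find_one({"_id": from_id}, session=session)
            # Balances are stored as Decimal128, which $inc updates without float rounding
            if from_account is None or from_account["balance"].to_decimal() < amount:
                raise InsufficientFundsError(...)  # app-defined; raising aborts the transaction
            await db["accounts"].update_one({"_id": from_id}, {"$inc": {"balance": Decimal128(-amount)}}, session=session)
            await db["accounts"].update_one({"_id": to_id}, {"$inc": {"balance": Decimal128(amount)}}, session=session)
```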
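A transaction can abort with an error labeled `TransientTransactionError` (for example, a write conflict), and the whole transaction is then safe to retry. Motor's `ClientSession.with_transaction` performs this retry for you; if you keep the explicit `start_transaction` block, a small wrapper can do it instead. A sketch, assuming the raised errors expose PyMongo's `has_error_label()` method (the `retry_transient` name, attempt limit, and backoff are illustrative):

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def retry_transient(fn: Callable[[], Awaitable[T]], *, attempts: int = 3) -> T:
    """Re-run `fn` from scratch when it fails with a TransientTransactionError label."""
    for attempt in range(1, attempts + 1):
        try:
            return await fn()
        except Exception as exc:
            has_label = getattr(exc, "has_error_label", None)
            transient = callable(has_label) and has_label("TransientTransactionError")
            if not transient or attempt == attempts:
                raise
            await asyncio.sleep(0.05 * attempt)  # brief backoff before the next attempt
    raise ValueError("attempts must be >= 1")


# Usage: await retry_transient(lambda: transfer_funds(client, from_id, to_id, Decimal("10.00")))
```

Each retry calls `transfer_funds` again, which opens a fresh session and transaction, so no partial state from the failed attempt is reused.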

## Example

User asks: "Design a MongoDB schema for a blog platform with posts, comments, and tags"

**Access patterns:** fetch a post with its metadata (frequent), list posts by tag sorted by date (frequent), list a post's comments with pagination (frequent), list a user's recent posts (moderate), full-text search (occasional).

**Schema:**

```python
# posts — embed tags (bounded), reference comments (unbounded)
{
    "_id": ObjectId("..."),
    "schema_version": 1,
    "author_id": ObjectId("..."),
    "slug": "building-async-apis",
    "title": "Building Async APIs with FastAPI",
    "content": "...",
    "tags": ["python", "fastapi", "async"],  # bounded, always read with post
    "status": "published",
    "comment_count": 42,  # denormalized; comments collection is canonical, kept in sync via $inc
    "published_at": datetime(2026, 3, 14, 10, 0, 0),
    "created_at": datetime(2026, 3, 10, 12, 0, 0),
}

# comments — referenced (unbounded growth)
{
    "_id": ObjectId("..."),
    "post_id": ObjectId("..."),
    "author_name": "Alice",  # denormalized for display; canonical value lives in the user record
    "body": "Great article!",
    "created_at": datetime(2026, 3, 14, 11, 0, 0),
}
```

**Indexes:**

```python
from motor.motor_asyncio import AsyncIOMotorDatabase
from pymongo import ASCENDING, DESCENDING, TEXT, IndexModel


async def ensure_indexes(db: AsyncIOMotorDatabase) -> None:
    await db["posts"].create_indexes([
        # list by tag, newest first (ESR: tags + status equality, published_at sort)
        IndexModel([("tags", ASCENDING), ("status", ASCENDING), ("published_at", DESCENDING)], name="idx_tags_status_published"),
        # a user's recent posts
        IndexModel([("author_id", ASCENDING), ("published_at", DESCENDING)], name="idx_author_published"),
        IndexModel([("slug", ASCENDING)], unique=True, name="idx_slug_unique"),
        # full-text search; title matches weigh 10x content matches
        IndexModel([("title", TEXT), ("content", TEXT)], weights={"title": 10, "content": 1}, name="idx_text_search"),
    ])
    await db["comments"].create_indexes([
        # paginated comments per post, oldest first
        IndexModel([("post_id", ASCENDING), ("created_at", ASCENDING)], name="idx_post_date"),
    ])
```
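Each frequent access pattern should map onto one of these indexes. A sketch of the matching query specs as plain `(filter, sort)` values, which makes it easy to check that filter and sort line up with each index's key order (the function names are hypothetical; pass the results to Motor's `find()` and `.sort()`):

```python
from typing import Any

Query = tuple[dict[str, Any], list[tuple[str, int]]]  # (filter, sort)


def posts_by_tag(tag: str) -> Query:
    # Equality on tags and status, then sort on published_at: idx_tags_status_published
    return {"tags": tag, "status": "published"}, [("published_at", -1)]


def recent_posts_by_author(author_id: Any) -> Query:
    # author_id equality, published_at sort: idx_author_published
    return {"author_id": author_id}, [("published_at", -1)]


def comments_for_post(post_id: Any) -> Query:
    # post_id equality, created_at sort: idx_post_date
    return {"post_id": post_id}, [("created_at", 1)]


def search_posts(text: str) -> tuple[dict[str, Any], dict[str, Any], list[tuple[str, Any]]]:
    # $text uses idx_text_search; textScore ranks results by weighted relevance
    projection = {"score": {"$meta": "textScore"}, "title": 1, "slug": 1}
    return {"$text": {"$search": text}, "status": "published"}, projection, [("score", {"$meta": "textScore"})]


# flt, sort = posts_by_tag("python"); cursor = db["posts"].find(flt).sort(sort).limit(20)
```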

## Rules

- Design schemas around access patterns, not entity relationships — document databases reward shaping documents for the reads you perform rather than normalizing them
- Embed data that is read together and has bounded growth; reference unbounded or independently accessed data — prevents document bloat and hitting the 16MB document limit
- Denormalize fields read frequently but updated rarely — comment which collection owns the canonical value
- Follow ESR (Equality-Sort-Range) for compound index field order — lets the index serve the sort and keeps the range scan bounded, avoiding in-memory sorts
- Call `create_indexes()` on every startup — re-creating an identical index is a no-op, so this is safe and keeps indexes in sync with code (an index with the same name or keys but different options raises an error)
- Use TTL indexes for time-expiring data (sessions, logs, tokens) — automates cleanup without cron jobs
- Store monetary values as strings converted with `$toDecimal` in aggregations, or as `Decimal128` when a field is updated with `$inc` or needs numeric range queries; never as floats — prevents floating-point precision loss
- Add `schema_version` to collections that may evolve — enables gradual migrations without downtime
- Use `update_one` with `upsert=True` for save operations — handles both insert and update in one call
- Use `bulk_write(ordered=False)` for batch operations — the server may apply writes in any order and continues past individual failures, reporting them together in a `BulkWriteError`
- Pass `session` to every operation inside a transaction — an operation without `session=` runs outside the transaction, silently
- Map between domain models and BSON in the repository layer — domain models stay free of MongoDB dependencies