FastAPIのSQLAlchemy + SQL Server実装パターン。AsyncSession DI・接続設定・論理削除・Clockファクトリ・トランザクション・マイグレーション(Alembic)・SQL Server固有の制限をカバーする。リポジトリの実装・マイグレーションの作成・データベースレイヤーの設定時に使用する。
基本ルールは api-design.instructions.md を参照。
delete_flg == 0 フィルタを漏らしていないか(論理削除済みデータの混入)text() を使う場合、バインドパラメータを使用しているか(SQLインジェクション対策)selectinload で一括取得しているか)# app/database.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from app.config import settings
engine = create_async_engine(
settings.database_url,
pool_size=10,
max_overflow=5,
pool_pre_ping=True,
)
async_session_factory = async_sessionmaker(engine, expire_on_commit=False)
async def get_async_session() -> AsyncSession:
async with async_session_factory() as session:
yield session
mssql+aioodbc は内部で pyodbc をスレッドプール上で実行する擬似非同期ドライバである。
真のノンブロッキング I/O ではないため、以下の点に注意すること。
| 項目 | 内容 |
|---|---|
| スレッドプール上限 | create_async_engine の pool_size + max_overflow がスレッド数の上限になる。デフォルト(10+5=15)を超える同時接続はキュー待ちになる |
| 推奨設定(AP: 6コア・16GB) | pool_size=10, max_overflow=5 を基本とし、負荷試験で調整すること |
pool_pre_ping=True 必須 | 長時間接続が切れた場合に再接続する。本番環境では必ず有効にすること |
# 推奨: AP サーバー(6コア・16GB)向け設定
engine = create_async_engine(
settings.database_url,
pool_size=10, # 常時保持する接続数
max_overflow=5, # 瞬間的な超過を許容する最大追加接続数
pool_timeout=30, # 接続取得待ちタイムアウト(秒)
pool_recycle=1800, # 接続を 30 分で再生成(SQL Server の接続タイムアウト対策)
pool_pre_ping=True, # 切断済み接続の自動検出・再接続
)
.env に定義(コード直書き禁止):
DATABASE_URL="mssql+aioodbc://sa:YourPassword123@localhost:1433/task_manager?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"
| 項目 | PostgreSQL との差異 |
|---|---|
| JSON 型 | 非対応。String 型で代替 |
| UUID デフォルト値 | default=uuid4 をモデルで設定 |
| autoincrement | 使用可能。本プロジェクトは文字列 ID 採用 |
| 複合ユニーク制約 | UniqueConstraint 使用可能 |
| ケースセンシティビティ | Collation 依存。デフォルトは大小文字不区別の場合あり |
# app/models/task.py
from sqlalchemy import Column, String, Integer, DateTime
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class Task(Base):
__tablename__ = "Task"
id: str = Column(String(36), primary_key=True)
title: str = Column(String(200), nullable=False)
status: str = Column(String(50), nullable=False, default="OPEN")
delete_flg: int = Column("DELETE_FLG", Integer, nullable=False, default=0)
created_at: DateTime = Column(DateTime, nullable=False)
updated_at: DateTime = Column(DateTime, nullable=False)
created_by_id: str = Column(String(36), nullable=False)
updated_by_id: str = Column(String(36), nullable=False)
本プロジェクトは DELETE_FLG カラム(delete_flg)で管理。deleted_at: DateTime? は不使用。
# 有効レコードのみ(必ず delete_flg == 0)
async def find_many(self, session: AsyncSession) -> Result[list[Task]]:
stmt = select(Task).where(Task.delete_flg == 0)
result = await session.execute(stmt)
return Ok(value=list(result.scalars().all()))
# 論理削除(updated_at/updated_by_id も更新)
async def soft_delete(self, session: AsyncSession, task_id: str, user_id: str, clock: Clock) -> Result[None]:
stmt = (
update(Task)
.where(Task.id == task_id)
.values(delete_flg=1, updated_at=clock.now(), updated_by_id=user_id)
)
await session.execute(stmt)
await session.commit()
return Ok(value=None)
delete_flg フィルタ漏れ = L1 違反。 Repository は原則
where(delete_flg == 0)を内包。
datetime.now() 直接使用禁止→ Clock 経由。
# app/common/clock.py
from datetime import datetime, timezone
from abc import ABC, abstractmethod
class Clock(ABC):
@abstractmethod
def now(self) -> datetime: ...
class SystemClock(Clock):
def now(self) -> datetime:
return datetime.now(timezone.utc)
# Repository メソッド内でセッションを使用
async def create(self, session: AsyncSession, task: Task) -> Result[Task]:
try:
session.add(task)
await session.commit()
await session.refresh(task)
return Ok(value=task)
except Exception as e:
await session.rollback()
return Err(error=AppError(type="INTERNAL", message="DB error", details=str(e)))
# Service 層でトランザクション制御
async def create_with_dependencies(
self, dto: CreateTaskDto, scope: OrganizationScope
) -> Result[Task]:
async with self.session.begin():
task_result = await self.task_repo.create(self.session, task)
if not task_result.ok:
return task_result
dep_result = await self.dep_repo.create(self.session, dep)
if not dep_result.ok:
return dep_result
return task_result
# マイグレーション作成
alembic revision --autogenerate -m "add_task_table"
# マイグレーション適用
alembic upgrade head
# ロールバック
alembic downgrade -1
[alembic]
script_location = alembic
sqlalchemy.url = %(DATABASE_URL)s
from sqlalchemy.ext.asyncio import create_async_engine
from app.models import Base
target_metadata = Base.metadata
# NG: N+1
tasks = await session.execute(select(Task))
for task in tasks.scalars():
deps = await session.execute(
select(TaskDependency).where(TaskDependency.task_id == task.id)
)
# OK: selectinload で一括取得
from sqlalchemy.orm import selectinload
stmt = select(Task).options(selectinload(Task.dependencies)).where(Task.delete_flg == 0)
result = await session.execute(stmt)
# NG: 文字列結合
await session.execute(text(f"SELECT * FROM Task WHERE id = '{user_input}'"))
# OK: バインドパラメータ
await session.execute(text("SELECT * FROM Task WHERE id = :id"), {"id": user_input})