This skill should be used when developing FastAPI endpoints, Celery background tasks, or database interactions for Mix Analyzer. Triggers on requests like "add a new API endpoint", "create a background task", "add database model", or "modify the upload route".
This skill documents the concrete patterns to follow: Pydantic response schemas, FastAPI route and dependency wiring, async SQLAlchemy sessions, Celery background tasks with retries, WebSocket progress streaming, and Alembic migrations.
backend/app/
├── api/routes/ # FastAPI endpoints
│ ├── analysis.py # GET/DELETE analysis
│ ├── upload.py # POST upload
│ ├── stats.py # GET statistics
│ ├── health.py # GET health check
│ ├── websocket.py # WS real-time updates
│ └── commercial.py # External API integrations
├── workers/
│ ├── celery_app.py # Celery configuration
│ └── tasks/
│ ├── audio_processing.py # Main analysis task
│ └── cleanup.py # File cleanup task
├── models/
│ └── analysis.py # SQLAlchemy models
├── services/
│ ├── database.py # Database session management
│ ├── redis.py # Redis client
│ └── webhooks.py # Webhook service
└── schemas/
└── analysis.py # Pydantic schemas
In schemas/analysis.py (or create new schema file):
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
class AnalysisResponse(BaseModel):
    """Response schema for the analysis endpoints.

    Field names mirror the ``Analysis`` ORM model so instances can be
    built directly from ORM rows.
    """

    id: str
    filename: str
    status: str
    progress: int = 0
    overall_score: Optional[float] = None
    # default_factory gives every model instance its own dict rather than
    # declaring a shared `{}` literal as the default.
    results: dict = Field(default_factory=dict)
    created_at: datetime
    completed_at: Optional[datetime] = None

    # Pydantic v2 configuration. `from_attributes` is the v2 spelling, so
    # the deprecated v1-style inner `class Config` is replaced with
    # `model_config` (a plain dict is accepted).
    model_config = {"from_attributes": True}
In api/routes/ (e.g., api/routes/analysis.py):
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.analysis import Analysis
from app.services.database import async_session
from app.schemas.analysis import AnalysisResponse
router = APIRouter(prefix="/api/v1", tags=["analysis"])


@router.get("/analysis/{analysis_id}", response_model=AnalysisResponse)
async def get_analysis(analysis_id: str):
    """Look up one analysis row by primary key and return it as a dict.

    Responds 404 when no row matches ``analysis_id``.
    """
    async with async_session() as session:
        row = (
            await session.execute(
                select(Analysis).where(Analysis.id == analysis_id)
            )
        ).scalar_one_or_none()
        if row is None:
            raise HTTPException(status_code=404, detail="Analysis not found")
        return row.to_dict()
In app/main.py:
# Register every feature router on the app; each router carries its own
# prefix and tags (see the APIRouter definitions under api/routes/).
from app.api.routes import analysis, upload, stats, health
app.include_router(analysis.router)
app.include_router(upload.router)
app.include_router(stats.router)
app.include_router(health.router)
# services/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import sessionmaker

# NOTE(review): DATABASE_URL is not defined in this snippet — presumably it
# comes from app.config settings; confirm before copying verbatim.
engine = create_async_engine(DATABASE_URL, echo=False)

# SQLAlchemy 2.0 provides async_sessionmaker, which is typed for
# AsyncSession and removes the legacy `sessionmaker(class_=AsyncSession)`
# indirection. expire_on_commit=False keeps ORM attributes readable after
# commit (needed when serializing rows post-commit).
async_session = async_sessionmaker(engine, expire_on_commit=False)
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_db():
    """Yield an AsyncSession that commits on success and rolls back on error.

    The session itself is closed by the ``async with async_session()``
    block regardless of outcome.
    """
    async with async_session() as db:
        try:
            yield db
            # Reached only when the caller's block finished cleanly.
            await db.commit()
        except Exception:
            await db.rollback()
            raise
@router.post("/items")
async def create_item():
    """Example route showing the get_db() context-manager usage pattern."""
    async with get_db() as db:
        # Database operations go here; commit/rollback is handled by get_db().
        pass
# models/analysis.py
from typing import Optional

from sqlalchemy import String, Float, Integer, DateTime, Text
from sqlalchemy.orm import Mapped, mapped_column

from app.services.database import Base, CompatJSONB as JSONB
class Analysis(Base):
    """ORM model for one uploaded mix analysis (table ``analyses``)."""

    __tablename__ = "analyses"
    # UUID string primary key; 36 chars fits the canonical hyphenated form.
    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    filename: Mapped[str] = mapped_column(String(255), nullable=False)
    # Lifecycle state; defaults to "pending" (the cleanup task later marks
    # expired rows "deleted").
    status: Mapped[str] = mapped_column(String(20), default="pending")
    # JSONB for flexible data storage. NOTE(review): CompatJSONB presumably
    # falls back to a portable JSON type on non-Postgres backends — confirm
    # in services/database.py. `Optional` requires `from typing import
    # Optional` in this module's header.
    custom_data: Mapped[Optional[dict]] = mapped_column(JSONB)

    def to_dict(self) -> dict:
        """Serialize the row to a plain dict for API responses."""
        return {
            "id": self.id,
            "filename": self.filename,
            "status": self.status,
            # ...
        }
# workers/tasks/cleanup.py
from celery import shared_task
from datetime import datetime, timezone, timedelta
import os
from app.config import settings
from app.services.database import async_session
from app.models.analysis import Analysis
@shared_task(
    bind=True,
    name="cleanup_expired",
    autoretry_for=(Exception,),
    max_retries=3,
    retry_backoff=True,
)
def cleanup_expired_task(self):
    """Remove expired analysis files (Celery entry point).

    Celery workers run synchronous task functions, so the async
    implementation is driven with asyncio.run(); retries on failure are
    handled by autoretry_for above.
    """
    # Local import: the original snippet called asyncio.run without ever
    # importing asyncio.
    import asyncio

    return asyncio.run(_cleanup_expired_async())
async def _cleanup_expired_async():
    """Async implementation of cleanup.

    Deletes the stored upload for every expired, not-yet-deleted analysis,
    marks those rows "deleted", and returns {"deleted_files": <count>}.
    """
    # Local import: `select` was used but never imported in this module's
    # header as shown.
    from sqlalchemy import select

    async with async_session() as session:
        now = datetime.now(timezone.utc)
        # Find expired analyses that have not already been cleaned up.
        stmt = select(Analysis).where(
            Analysis.expires_at < now,
            Analysis.status != "deleted"
        )
        result = await session.execute(stmt)
        expired = result.scalars().all()
        deleted_count = 0
        for analysis in expired:
            # Delete the file if it is still on disk; count only real removals.
            filepath = os.path.join(settings.upload_dir, analysis.id)
            if os.path.exists(filepath):
                os.remove(filepath)
                deleted_count += 1
            # Mark as deleted even when the file was already gone, so the row
            # is not re-scanned on the next run.
            analysis.status = "deleted"
        # Single commit persists all status updates together.
        await session.commit()
        return {"deleted_files": deleted_count}
# workers/tasks/audio_processing.py (excerpt)
async def _publish_progress(analysis_id: str, data: dict):
    """Publish progress to Redis for WebSocket consumers.

    Publishes on the per-analysis pub/sub channel and also caches the
    latest message with a 1-hour TTL — presumably so late-connecting
    clients can read the current state; confirm against the WebSocket
    route.
    """
    try:
        # Redis may be unconfigured (e.g. in tests); progress is optional.
        if redis_client.client is None:
            return
        channel = f"analysis:{analysis_id}:progress"
        # NOTE(review): `json` must be imported at module level in the real
        # file — this is an excerpt.
        message = json.dumps(data)
        # Pipeline batches the publish + cache-set into one round trip.
        async with redis_client.client.pipeline(transaction=False) as pipe:
            pipe.publish(channel, message)
            pipe.set(f"analysis:{analysis_id}:status", message, ex=3600)
            await pipe.execute()
    except Exception:
        pass  # Don't fail analysis if Redis fails (deliberate best-effort).
# In API route
from app.workers.tasks.audio_processing import process_audio_task
@router.post("/upload")
async def upload_file(file: UploadFile):
    """Accept an upload and queue the analysis task (abridged example)."""
    # ... save file ...
    # NOTE(review): `analysis` and `filepath` are created by the elided
    # save step above.
    # Queue background task; .delay() enqueues without blocking the request.
    process_audio_task.delay(
        analysis_id=analysis.id,
        filepath=filepath,
        modules=["all"]  # or specific modules
    )
    return {"analysis_id": analysis.id, "status": "pending"}
from fastapi import HTTPException
# Standard error responses: raise HTTPException with the matching status code.
# Not found
raise HTTPException(status_code=404, detail="Analysis not found")
# Validation error
raise HTTPException(status_code=400, detail="Invalid file format")
# Server error
raise HTTPException(status_code=500, detail="Internal server error")
@shared_task(
    autoretry_for=(Exception,),  # Retry on any exception
    max_retries=2,
    retry_backoff=True,  # Exponential backoff between attempts...
    retry_backoff_max=30,  # ...capped at 30 seconds
    # NOTE(review): dont_autoretry_for requires Celery >= 5.3 — confirm the
    # pinned Celery version.
    dont_autoretry_for=(ValueError, FileNotFoundError),  # Don't retry these
)
def my_task():
    """Template showing Celery auto-retry configuration."""
    pass
# api/routes/websocket.py
from fastapi import WebSocket, WebSocketDisconnect
from app.services.redis import redis_client
@router.websocket("/ws/analysis/{analysis_id}")
async def websocket_endpoint(websocket: WebSocket, analysis_id: str):
    """Stream analysis progress events to a WebSocket client.

    Subscribes to the Redis channel the worker publishes on and relays each
    message; stops after a terminal "completed"/"failed" event.
    """
    # Local import: this excerpt did not show a module-level json import.
    import json

    await websocket.accept()
    # Bind before the try block: the original referenced `pubsub` in
    # `finally` unconditionally, which raised NameError whenever accept()
    # or pubsub() failed before assignment.
    pubsub = None
    try:
        # Subscribe to Redis channel
        pubsub = redis_client.client.pubsub()
        await pubsub.subscribe(f"analysis:{analysis_id}:progress")
        async for message in pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                await websocket.send_json(data)
                # Terminal states end the stream.
                if data.get("type") in ("completed", "failed"):
                    break
    except WebSocketDisconnect:
        pass  # Client disconnected; nothing left to relay.
    finally:
        if pubsub is not None:
            await pubsub.unsubscribe(f"analysis:{analysis_id}:progress")
from fastapi import UploadFile, File
from app.config import settings
import hashlib
@router.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """Validate an uploaded audio file, persist it, and return its id.

    Raises HTTP 400 for unsupported content types or oversized payloads.
    """
    # Local imports: this snippet's header showed neither os nor uuid.
    import os
    import uuid

    # Validate file type
    allowed_types = ["audio/mpeg", "audio/wav", "audio/flac", "audio/ogg"]
    if file.content_type not in allowed_types:
        raise HTTPException(400, "Unsupported file type")
    # Validate file size. This reads the whole body into memory; acceptable
    # while max_file_size_bytes is modest — stream to disk if the limit grows.
    content = await file.read()
    if len(content) > settings.max_file_size_bytes:
        raise HTTPException(400, "File too large")
    # Calculate hash for deduplication.
    # TODO: look up file_hash against existing analyses to short-circuit
    # duplicate uploads — currently computed but unused.
    file_hash = hashlib.sha256(content).hexdigest()
    # The original referenced analysis_id without ever defining it
    # (NameError at runtime); generate one here. String(36) in the model
    # matches the canonical hyphenated UUID form.
    analysis_id = str(uuid.uuid4())
    # Save file
    filepath = os.path.join(settings.upload_dir, f"{analysis_id}_{file.filename}")
    with open(filepath, "wb") as f:
        f.write(content)
    return {"analysis_id": analysis_id}
# Create migration
cd backend
alembic revision --autogenerate -m "Description of change"
# Apply migrations
alembic upgrade head
# Rollback one migration
alembic downgrade -1