Implement data ingestion adapters for external data sources, with proper error handling, retries, and idempotent upserts.
What to do: Design the adapter interface and data flow.
Design Pattern:
```python
# src/ingestion/adapters/base.py
from abc import ABC, abstractmethod
from typing import Iterator, Dict, Any


class DataAdapter(ABC):
    """Base class for data ingestion adapters."""

    @abstractmethod
    def fetch(self, **kwargs) -> Iterator[Dict[str, Any]]:
        """Fetch data from source. Yields records."""
        pass

    @abstractmethod
    def validate(self, record: Dict[str, Any]) -> bool:
        """Validate record before storage."""
        pass

    @abstractmethod
    def transform(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Transform record to target schema."""
        pass
```
What to do: Implement data fetching with retries and backoff.
Code Pattern:
```python
# src/ingestion/adapters/example_api.py
import time
from typing import Any, Dict

import requests
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from src.ingestion.adapters.base import DataAdapter


class ExampleAPIAdapter(DataAdapter):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.example.com/v1"

    # Retry wraps the per-page request, not fetch() itself: decorating a
    # generator with @retry has no effect, because calling a generator
    # function only creates the generator -- the HTTP call (and any
    # exception) happens later, during iteration.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type((requests.Timeout, requests.ConnectionError)),
    )
    def _fetch_page(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Fetch one page, retrying on transient network errors."""
        response = requests.get(f"{self.base_url}/data", params=params, timeout=30)
        response.raise_for_status()
        return response.json()

    def fetch(self, start_date: str = None, end_date: str = None):
        """Fetch data with pagination and retries."""
        page = 1
        while True:
            data = self._fetch_page({
                "api_key": self.api_key,
                "page": page,
                "start_date": start_date,
                "end_date": end_date,
            })
            yield from data["results"]
            if not data["has_more"]:
                break
            page += 1
            time.sleep(1)  # Rate limiting: respect API limits
```
What to do: Transform source data to target schema.
Code Pattern:
```python
# Requires python-dateutil for flexible timestamp parsing:
from dateutil.parser import parse as parse_datetime


def transform(self, record: Dict[str, Any]) -> Dict[str, Any]:
    """Transform API record to database schema."""
    return {
        "source_id": record["id"],
        "name": record["name"].strip().title(),
        "created_at": parse_datetime(record["created_at"]),
        "metadata": {
            "source": "example_api",
            "version": record.get("version", "1.0"),
        },
    }
```
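For concreteness, here is the transform applied to a made-up input record (the values are illustrative only):

```python
raw = {"id": 42, "name": "  acme corp ", "created_at": "2026-01-15T08:30:00Z"}
adapter.transform(raw)
# -> {"source_id": 42,
#     "name": "Acme Corp",                       # stripped and title-cased
#     "created_at": datetime(2026, 1, 15, 8, 30, tzinfo=tzutc()),
#     "metadata": {"source": "example_api", "version": "1.0"}}  # version defaulted
```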
What to do: Implement idempotent upsert to database.
Code Pattern:
```python
# src/ingestion/persistence.py
from sqlalchemy.dialects.postgresql import insert


def upsert_records(session, model, records: list, conflict_columns: list):
    """Idempotent upsert - safe to re-run."""
    if not records:
        return 0
    stmt = insert(model).values(records)
    # Update on conflict (upsert)
    update_dict = {
        c.name: c for c in stmt.excluded if c.name not in conflict_columns
    }
    stmt = stmt.on_conflict_do_update(
        index_elements=conflict_columns,
        set_=update_dict,
    )
    result = session.execute(stmt)
    return result.rowcount
```
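For a model with columns `source_id`, `name`, `created_at`, and `metadata`, and `conflict_columns=["source_id"]`, the statement this builds corresponds roughly to the following SQL (a sketch, not captured output):

```sql
INSERT INTO data_model (source_id, name, created_at, metadata)
VALUES (...)
ON CONFLICT (source_id) DO UPDATE SET
    name = EXCLUDED.name,
    created_at = EXCLUDED.created_at,
    metadata = EXCLUDED.metadata;
```

Re-running the same batch rewrites the same rows instead of duplicating them, which is what makes the ingestion idempotent.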
```python
# Usage
adapter = ExampleAPIAdapter(api_key="xxx")
records = []
for record in adapter.fetch(start_date="2026-01-01"):
    if adapter.validate(record):
        records.append(adapter.transform(record))
    # Batch insert
    if len(records) >= 1000:
        upsert_records(session, DataModel, records, ["source_id"])
        records = []

# Final batch
if records:
    upsert_records(session, DataModel, records, ["source_id"])
```
What to do: Implement data quality checks.
Code Pattern:
```python
import logging

logger = logging.getLogger(__name__)  # module-level logger


def validate(self, record: Dict[str, Any]) -> bool:
    """Validate record before storage."""
    required_fields = ["id", "name", "created_at"]
    # Check required fields
    for field in required_fields:
        if field not in record or record[field] is None:
            logger.warning(f"Missing required field: {field}")
            return False
    # Type validation
    if not isinstance(record["id"], (int, str)):
        logger.warning(f"Invalid id type: {type(record['id'])}")
        return False
    # Business rules
    if len(record.get("name", "")) < 1:
        logger.warning("Empty name")
        return False
    return True
```
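The validation step below runs `tests/ingestion/test_example_adapter.py`; that file is not shown in this document, but a hedged sketch of what it could assert, assuming `validate` is implemented as above:

```python
# tests/ingestion/test_example_adapter.py (sketch, not the actual file)
from src.ingestion.adapters.example_api import ExampleAPIAdapter


def test_validate_rejects_bad_records():
    adapter = ExampleAPIAdapter(api_key="test")
    ok = {"id": 1, "name": "Acme", "created_at": "2026-01-01"}
    assert adapter.validate(ok)
    assert not adapter.validate({"id": 1, "name": "Acme"})  # missing created_at
    assert not adapter.validate({**ok, "id": 1.5})          # wrong id type
    assert not adapter.validate({**ok, "name": ""})         # empty name
```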
What to do: Create runnable ingestion script.
Code Pattern:
```python
# scripts/ingest_example.py
import logging
from datetime import datetime

import typer

from src import config            # assumed project config module
from src.database import SessionLocal
from src.ingestion.adapters.example_api import ExampleAPIAdapter
from src.ingestion.persistence import upsert_records
from src.models import DataModel  # assumed location of the ORM model

logger = logging.getLogger(__name__)
app = typer.Typer()


@app.command()
def ingest(
    start_date: str = None,
    end_date: str = None,
    full: bool = False,
):
    """Ingest data from Example API."""
    if full:
        start_date = "2020-01-01"
        end_date = datetime.now().strftime("%Y-%m-%d")

    adapter = ExampleAPIAdapter(api_key=config.EXAMPLE_API_KEY)
    session = SessionLocal()
    try:
        total = 0
        records = []
        for record in adapter.fetch(start_date, end_date):
            if adapter.validate(record):
                records.append(adapter.transform(record))
            if len(records) >= 1000:
                count = upsert_records(session, DataModel, records, ["source_id"])
                total += count
                session.commit()
                records = []
                typer.echo(f"Ingested {total} records...")
        # Final batch
        if records:
            count = upsert_records(session, DataModel, records, ["source_id"])
            total += count
            session.commit()
        typer.echo(f"Complete! Total records: {total}")
    except Exception as e:
        session.rollback()
        logger.error(f"Ingestion failed: {e}")
        raise
    finally:
        session.close()


if __name__ == "__main__":
    app()
```
Validation:
```bash
# Test ingestion
uv run python scripts/ingest_example.py --start-date 2026-01-01

# Run validation
uv run pytest tests/ingestion/test_example_adapter.py -v

# Check data quality
uv run python scripts/validate_data.py

# Check what was ingested
psql -d mydb -c "SELECT COUNT(*) FROM data_model WHERE created_at > '2026-01-01'"
```
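`scripts/validate_data.py` is referenced above but never shown; a minimal sketch of the kind of checks it might run (the model import path and column names are assumptions):

```python
# scripts/validate_data.py (sketch -- adjust imports to your project layout)
from sqlalchemy import func, select

from src.database import SessionLocal
from src.models import DataModel  # assumed location of the ORM model

with SessionLocal() as session:
    total = session.scalar(select(func.count()).select_from(DataModel))
    null_names = session.scalar(
        select(func.count()).select_from(DataModel).where(DataModel.name.is_(None))
    )
    distinct_ids = session.scalar(select(func.count(func.distinct(DataModel.source_id))))
    print(f"rows={total} null_names={null_names} duplicate_source_ids={total - distinct_ids}")
```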
```bash
# Delete bad data (if needed) -- source is stored in the metadata JSON column
psql -d mydb -c "DELETE FROM data_model WHERE metadata->>'source' = 'example_api' AND created_at > '2026-01-01'"

# Or roll back the transaction (if caught early):
# ingestion runs inside a transaction and rolls back on error.
```
Related docs: .agent/CONTEXT.md, .agent/AGENTS.md, docs/data/sources/, docs/knowledge_base.md

Scenario: Ingest daily weather data from an external API.
Pattern:
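A hedged sketch of such an adapter reusing the base interface (the endpoint, field names, and the station/date natural key are assumptions, not a real API):

```python
# Hypothetical daily-weather adapter following the same pattern.
from typing import Any, Dict, Iterator

from src.ingestion.adapters.base import DataAdapter


class WeatherAPIAdapter(DataAdapter):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.weather.example.com/v1"  # assumed URL

    def fetch(self, day: str = None) -> Iterator[Dict[str, Any]]:
        # Same paginated, retried request loop as ExampleAPIAdapter,
        # pointed at an assumed /observations endpoint; elided here.
        raise NotImplementedError

    def validate(self, record: Dict[str, Any]) -> bool:
        return all(record.get(f) is not None for f in ("station_id", "date", "temp_c"))

    def transform(self, record: Dict[str, Any]) -> Dict[str, Any]:
        return {
            # One row per station per day keeps re-ingestion idempotent.
            "source_id": f"{record['station_id']}:{record['date']}",
            "name": str(record["station_id"]),
            "created_at": record["date"],
            "metadata": {"source": "weather_api", "temp_c": record["temp_c"]},
        }
```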
Scenario: Process uploaded CSV files.
Pattern:
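A hedged sketch of the same interface over an uploaded file (the `id`/`name`/`created_at` column names are assumptions):

```python
# Hypothetical CSV-file adapter; streams rows so large uploads
# never load fully into memory.
import csv
from typing import Any, Dict, Iterator

from src.ingestion.adapters.base import DataAdapter


class CSVFileAdapter(DataAdapter):
    def __init__(self, path: str):
        self.path = path

    def fetch(self, **kwargs) -> Iterator[Dict[str, Any]]:
        with open(self.path, newline="", encoding="utf-8") as f:
            yield from csv.DictReader(f)  # one dict per row, keyed by header

    def validate(self, record: Dict[str, Any]) -> bool:
        return bool(record.get("id")) and bool(record.get("name"))

    def transform(self, record: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "source_id": record["id"],
            "name": record["name"].strip().title(),
            "created_at": record.get("created_at"),
            "metadata": {"source": "csv_upload", "file": self.path},
        }
```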
Remember: Always test with small batches first!