Cliente PostgreSQL async de alto rendimiento para el backend FastAPI del pipeline KYC
asyncpg es un driver PostgreSQL asíncrono de alto rendimiento para Python, diseñado para maximizar el throughput en aplicaciones asyncio como el backend FastAPI del sistema KYC. Proporciona acceso directo al protocolo binario de PostgreSQL sin capas intermedias, ofreciendo latencias significativamente menores que los drivers síncronos en operaciones de lectura/escritura de sesiones de verificación.
Usa esta skill cuando necesites configurar el acceso directo async a PostgreSQL desde el backend FastAPI del pipeline de verificación. Pertenece al database_agent y se enfoca exclusivamente en el driver asyncpg, sin incluir pooling externo ni ORM.
pip install asyncpg
import asyncpg
async def get_connection():
conn = await asyncpg.connect(
host='localhost',
port=5432,
user='verifid_app',
password='secure_password',
database='verifid_kyc',
timeout=10,
statement_cache_size=100
)
return conn
import asyncpg
async def create_pool():
pool = await asyncpg.create_pool(
dsn='postgresql://verifid_app:password@localhost:5432/verifid_kyc',
min_size=5,
max_size=20,
max_inactive_connection_lifetime=300,
command_timeout=30
)
return pool
async def create_verification_session(pool, session_data: dict):
async with pool.acquire() as conn:
row = await conn.fetchrow(
'''
INSERT INTO kyc.verification_sessions
(status, ip_address, device_fingerprint)
VALUES ($1, $2::inet, $3)
RETURNING session_id, created_at
''',
session_data['status'],
session_data['ip_address'],
session_data['device_fingerprint']
)
return dict(row)
async def complete_verification(pool, session_id: str, score: float, reasons: list):
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute(
'''
UPDATE kyc.verification_sessions
SET status = $1, confidence_score = $2,
reasons = $3::jsonb, completed_at = now()
WHERE session_id = $4
''',
'verified' if score > 0.85 else 'rejected',
score, json.dumps(reasons), session_id
)
await conn.execute(
'''
INSERT INTO kyc.audit_logs (session_id, module_name, module_score, details)
VALUES ($1, 'decision_engine', $2, $3::jsonb)
''',
session_id, score, json.dumps({'reasons': reasons})
)
import json
async def init_connection(conn):
await conn.set_type_codec(
'jsonb',
encoder=json.dumps,
decoder=json.loads,
schema='pg_catalog'
)
from fastapi import FastAPI
app = FastAPI()
@app.on_event("startup")
async def startup():
app.state.db_pool = await asyncpg.create_pool(
dsn='postgresql://verifid_app:password@localhost:5432/verifid_kyc',
min_size=5,
max_size=20,
init=init_connection
)
@app.on_event("shutdown")
async def shutdown():
await app.state.db_pool.close()
pgbouncer, y para ORM async consultar sqlalchemy_async.$1, $2) en lugar de interpolación de strings para prevenir SQL injection en los endpoints de verificación.