MercadoLibre API integration patterns - OAuth, webhooks, item sync, order tracking
expires_at - 300 secondshttpx (async HTTP client) | MercadoLibre API v1 | OAuth 2.0 | FastAPI BackgroundTasks
import httpx
import os
from typing import Optional
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class MercadoLibreAPIClient:
"""Client for MercadoLibre API with OAuth handling"""
def __init__(self):
self.base_url = "https://api.mercadolibre.com"
self.client_id = os.getenv("ML_CLIENT_ID")
self.client_secret = os.getenv("ML_CLIENT_SECRET")
self.user_id = os.getenv("ML_USER_ID")
self.refresh_token = os.getenv("ML_REFRESH_TOKEN")
self.access_token: Optional[str] = None
self.token_expires_at: Optional[datetime] = None
async def get_access_token(self) -> str:
"""Get or refresh access token"""
# Return cached token if still valid
if self.access_token and self.token_expires_at:
if datetime.now() < self.token_expires_at:
return self.access_token
# Refresh token
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/oauth/token",
data={
"grant_type": "refresh_token",
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token
}
)
response.raise_for_status()
data = response.json()
self.access_token = data["access_token"]
expires_in = data.get("expires_in", 21600) # 6 hours default
self.token_expires_at = datetime.now() + timedelta(seconds=expires_in - 300)
# Update refresh token if provided
if "refresh_token" in data:
self.refresh_token = data["refresh_token"]
# TODO: Persist new refresh_token to secure storage
return self.access_token
except Exception as e:
logger.error(f"Error refreshing ML token: {e}")
raise
async def get_item(self, item_id: str) -> Optional[dict]:
"""
Fetch single item from MercadoLibre.
Args:
item_id: MLA/MLB item ID
Returns:
Item data or None if not found
"""
try:
token = await self.get_access_token()
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{self.base_url}/items/{item_id}",
headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 404:
logger.warning(f"Item {item_id} not found")
return None
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
logger.warning(f"Rate limit hit for item {item_id}")
else:
logger.error(f"HTTP error fetching item {item_id}: {e}")
return None
except Exception as e:
logger.error(f"Error fetching item {item_id}: {e}")
return None
async def get_items_batch(self, item_ids: list[str]) -> dict[str, dict]:
"""
Fetch multiple items in batch (max 20 per request).
Args:
item_ids: List of item IDs
Returns:
Dict mapping item_id -> item_data
"""
results = {}
if not item_ids:
return results
try:
token = await self.get_access_token()
# ML allows up to 20 items per request
batch_size = 20
for i in range(0, len(item_ids), batch_size):
batch = item_ids[i:i + batch_size]
ids_param = ",".join(batch)
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.get(
f"{self.base_url}/items",
params={"ids": ids_param},
headers={"Authorization": f"Bearer {token}"}
)
response.raise_for_status()
# Response is array of {code, body}
data = response.json()
for item_response in data:
if item_response.get("code") == 200:
body = item_response.get("body")
if body:
results[body["id"]] = body
except Exception as e:
logger.error(f"Error fetching batch: {e}")
return results
async def update_item_price(self, item_id: str, new_price: float) -> bool:
"""
Update item price on MercadoLibre.
Args:
item_id: MLA/MLB item ID
new_price: New price (float)
Returns:
True if successful, False otherwise
"""
try:
token = await self.get_access_token()
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.put(
f"{self.base_url}/items/{item_id}",
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
},
json={"price": new_price}
)
response.raise_for_status()
logger.info(f"Updated price for {item_id} to {new_price}")
return True
except Exception as e:
logger.error(f"Error updating price for {item_id}: {e}")
return False
from fastapi import APIRouter, BackgroundTasks, Request
from app.services.ml_webhook_service import process_ml_notification
router = APIRouter(prefix="/webhooks", tags=["webhooks"])
@router.post("/mercadolibre")
async def ml_webhook(
request: Request,
background_tasks: BackgroundTasks
):
"""
MercadoLibre webhook endpoint.
Responds quickly and queues processing.
"""
try:
# Get raw body for logging
body = await request.json()
# Log received notification
logger.info(f"ML webhook received: {body.get('topic')}")
# Queue background processing
background_tasks.add_task(process_ml_notification, body)
# Respond immediately (ML requires < 3 seconds)
return {"status": "ok"}
except Exception as e:
logger.error(f"Error receiving ML webhook: {e}")
return {"status": "error"}, 500
async def process_ml_notification(notification: dict):
"""
Process ML notification in background.
Notification types:
- orders: New order created
- items: Item updated
- questions: New question
"""
try:
topic = notification.get("topic")
resource = notification.get("resource")
if topic == "orders_v2":
# Fetch full order data
order_id = resource.split("/")[-1]
await sync_order_from_ml(order_id)
elif topic == "items":
# Item was updated (price, stock, etc.)
item_id = resource.split("/")[-1]
await sync_item_from_ml(item_id)
elif topic == "questions":
# New question received
question_id = resource.split("/")[-1]
await handle_new_question(question_id)
else:
logger.warning(f"Unknown webhook topic: {topic}")
except Exception as e:
logger.error(f"Error processing ML notification: {e}")
# MercadoLibre OAuth
ML_CLIENT_ID=your_client_id
ML_CLIENT_SECRET=your_client_secret
ML_USER_ID=your_user_id
ML_REFRESH_TOKEN=your_refresh_token
# Optional
ML_WEBHOOK_SECRET=your_webhook_secret # For signature validation