Monitors Excel spreadsheets (.xlsx/.xls) and extracts key sales metrics (MTD, YTD, Year End) to generate internal live reports. Use when the user needs to parse sales reports, track revenue figures, pull MTD/YTD data from a spreadsheet, build a sales dashboard, or automate extraction of sales performance data from Excel files into a database.
This agent watches a directory for Excel workbooks, parses sales metrics from named sheets (MTD, YTD, Year End), maps flexible column schemas via fuzzy matching, validates rows, bulk-inserts results into PostgreSQL, and logs every import. It emits a completion event for downstream agents and dashboards.
~$) and wait for write completion before processingWorkbooks typically contain one or more sheets named after the metric type. Example:
Sheet: "MTD"
| rep_email | rep_name | revenue | units | quota | deals |
|-----------------------|--------------|-----------|-------|-----------|-------|
| [email protected] | Alice Smith | $124,500 | 83 | $100,000 | 12 |
| [email protected] | Bob Jones | $98,200 | 61 | $120,000 | 9 |
Sheet: "YTD"
| rep_email | rep_name | total_sales | qty | annual_quota |
|-----------------------|--------------|-------------|-----|--------------|
| [email protected] | Alice Smith | $1,340,000 | 890 | $1,200,000 |
Column names vary — use fuzzy matching (see below).
Use watchdog (Observer + FileSystemEventHandler) to watch WATCH_DIR for .xlsx/.xls creates and modifications. Before processing, poll file size at 1-second intervals (up to 10 retries) and proceed only when size stabilises. Skip filenames starting with ~$.
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
WATCH_DIR = "/data/sales_imports"
STABLE_RETRIES = 10
STABLE_INTERVAL = 1 # seconds
class SalesFileHandler(FileSystemEventHandler):
def on_created(self, event):
if not event.is_directory:
self._handle(event.src_path)
def on_modified(self, event):
if not event.is_directory:
self._handle(event.src_path)
def _handle(self, path):
if not path.endswith((".xlsx", ".xls")):
return
if os.path.basename(path).startswith("~$"):
return
if self._wait_until_stable(path):
process_workbook(path)
def _wait_until_stable(self, path):
prev_size = -1
for _ in range(STABLE_RETRIES):
size = os.path.getsize(path)
if size == prev_size:
return True
prev_size = size
time.sleep(STABLE_INTERVAL)
return False # never stabilised; skip
observer = Observer()
observer.schedule(SalesFileHandler(), WATCH_DIR, recursive=False)
observer.start()
Use rapidfuzz with fuzz.token_sort_ratio at threshold ≥ 80. Iterate all aliases per canonical name and keep the highest-scoring match.
Canonical → alias list:
revenue → ["revenue", "sales", "total_sales", "amount", "gross_sales"]units → ["units", "qty", "quantity", "volume"]quota → ["quota", "target", "annual_quota", "monthly_quota"]deals → ["deals", "opportunities", "closed_won"]rep_email → ["rep_email", "email", "email_address"]rep_name → ["rep_name", "name", "full_name", "salesperson"]from rapidfuzz import fuzz
COLUMN_ALIASES = {
"revenue": ["revenue", "sales", "total_sales", "amount", "gross_sales"],
"units": ["units", "qty", "quantity", "volume"],
"quota": ["quota", "target", "annual_quota", "monthly_quota"],
"deals": ["deals", "opportunities", "closed_won"],
"rep_email": ["rep_email", "email", "email_address"],
"rep_name": ["rep_name", "name", "full_name", "salesperson"],
}
FUZZY_THRESHOLD = 80
def map_columns(actual_columns: list[str]) -> dict[str, str]:
"""Return {canonical_name: actual_column} for all matched columns."""
mapping = {}
for canonical, aliases in COLUMN_ALIASES.items():
best_score, best_col = 0, None
for col in actual_columns:
score = max(fuzz.token_sort_ratio(col.lower(), alias) for alias in aliases)
if score > best_score:
best_score, best_col = score, col
if best_score >= FUZZY_THRESHOLD:
mapping[canonical] = best_col
return mapping
Match sheet name (lowercased) against keyword groups:
MTD → ["mtd", "month to date", "monthly"]YTD → ["ytd", "year to date", "annual"]YEAR_END → ["year end", "yearend", "projection", "forecast"]"UNKNOWN"METRIC_KEYWORDS = {
"MTD": ["mtd", "month to date", "monthly"],
"YTD": ["ytd", "year to date", "annual"],
"YEAR_END": ["year end", "yearend", "projection", "forecast"],
}
def detect_metric_type(sheet_name: str) -> str:
lower = sheet_name.lower()
for metric, keywords in METRIC_KEYWORDS.items():
if any(kw in lower for kw in keywords):
return metric
return "UNKNOWN"
For each row:
$ and commas from currency fields, cast to float; return None on failurerep_email / rep_name; log a warning and skip otherwisequota_attainment = round(revenue / quota, 4) when both are non-nullrep_email to lowercase, strippedimport logging
def parse_currency(value) -> float | None:
try:
return float(str(value).replace("$", "").replace(",", "").strip())
except (ValueError, TypeError):
return None
def extract_row(raw: dict, col_map: dict, metric_type: str, source_file: str) -> dict | None:
get = lambda key: raw.get(col_map[key]) if key in col_map else None
rep_email = (get("rep_email") or "").strip().lower() or None
rep_name = (get("rep_name") or "").strip() or None
if not rep_email and not rep_name:
logging.warning("Skipping row — no rep_email or rep_name: %s", raw)
return None
revenue = parse_currency(get("revenue"))
quota = parse_currency(get("quota"))
quota_attainment = round(revenue / quota, 4) if revenue and quota else None
return {
"rep_email": rep_email,
"rep_name": rep_name,
"metric_type": metric_type,
"revenue": revenue,
"units": get("units"),
"quota": quota,
"deals": get("deals"),
"quota_attainment": quota_attainment,
"source_file": source_file,
}
CREATE TABLE sales_metrics (
id SERIAL PRIMARY KEY,
rep_email TEXT,
rep_name TEXT,
metric_type TEXT NOT NULL, -- MTD | YTD | YEAR_END
revenue NUMERIC(14,2),
units INTEGER,
quota NUMERIC(14,2),
deals INTEGER,
quota_attainment NUMERIC(6,4),
source_file TEXT NOT NULL,
imported_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE import_log (
id SERIAL PRIMARY KEY,
source_file TEXT NOT NULL,
status TEXT NOT NULL, -- processing | success | failed
rows_processed INTEGER,
rows_failed INTEGER,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
error_message TEXT
);
Use psycopg2.extras.execute_values for bulk inserts within a single transaction. After the insert, verify the row count matches and rollback on mismatch.
import psycopg2
import psycopg2.extras
INSERT_SQL = """
INSERT INTO sales_metrics
(rep_email, rep_name, metric_type, revenue, units, quota, deals,
quota_attainment, source_file)
VALUES %s
"""
def bulk_insert(conn, records: list[dict], source_file: str):
tuples = [
(r["rep_email"], r["rep_name"], r["metric_type"], r["revenue"],
r["units"], r["quota"], r["deals"], r["quota_attainment"], r["source_file"])
for r in records
]
with conn: # auto-commit / rollback context
with conn.cursor() as cur:
psycopg2.extras.execute_values(cur, INSERT_SQL, tuples)
# Post-insert verification
cur.execute(
"SELECT COUNT(*) FROM sales_metrics WHERE source_file = %s",
(source_file,)
)
inserted = cur.fetchone()[0]
if inserted != len(records):
raise ValueError(
f"Count mismatch: expected {len(records)}, found {inserted}"
)
1. File detected in watch directory
│
▼
2. Log import as "processing" in import_log (record started_at)
│
▼
3. Read workbook with openpyxl/pandas; iterate sheets
│
▼
4. Detect metric type per sheet name
│
▼
5. Map columns via fuzzy matching; parse & clean each row
│
▼ [VALIDATION CHECKPOINT]
│ - Confirm at least one of rep_email / rep_name present
│ - Confirm revenue is numeric after currency stripping
│ - Log & skip invalid rows; track rows_failed count
▼
6. Bulk insert validated records in a single transaction
│
▼ [POST-INSERT VERIFICATION]
│ - SELECT COUNT(*) WHERE source_file = <this file>
│ - Confirm inserted count == len(valid_records)
│ - ROLLBACK and raise alert if counts mismatch
▼
7. Update import_log: status="success", rows_processed, rows_failed, completed_at
│
▼
8. Emit completion event for downstream agents / dashboards
On any exception during process_workbook:
logging.errorimport_log with status="failed", partial counts, and error_messagepsycopg2 context manager automatically rolls back the transaction on exception