Implement incremental sync strategies for Navan BOOKING and TRANSACTION data with ETL pipeline patterns. Use when setting up production data pipelines, debugging sync drift, or adding real-time event processing. Trigger with "navan data sync", "navan incremental sync", "navan ETL pipeline".
This skill provides production-grade sync strategies for Navan data. The two primary tables have fundamentally different sync models: BOOKING requires weekly full-refresh with merge-upsert logic (every record is re-imported, keyed by UUID), while TRANSACTION is incremental and append-only. Real-time use cases require webhook callbacks for event-driven processing. This skill covers all three tiers — scheduled full-refresh, incremental watermark-based sync, and real-time webhooks — along with Airbyte connector configuration and idempotent SQL upsert patterns.
navan-install-auth)NAVAN_CLIENT_ID, NAVAN_CLIENT_SECRET, NAVAN_BASE_URLThe BOOKING table is re-imported weekly by Navan. Every record is refreshed, so your sync must use merge-upsert logic to avoid duplicates while capturing updates.
const tokenRes = await fetch(`${process.env.NAVAN_BASE_URL}/ta-auth/oauth/token`, {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.NAVAN_CLIENT_ID!,
client_secret: process.env.NAVAN_CLIENT_SECRET!,
}),
});
const { access_token } = await tokenRes.json();
const headers = { Authorization: `Bearer ${access_token}` };
// Full extraction — paginate through all bookings for weekly refresh
let allBookings: any[] = [];
let page = 0;
const size = 50;
while (true) {
const res = await fetch(
`${process.env.NAVAN_BASE_URL}/v1/bookings?page=${page}&size=${size}`,
{ headers }
);
const { data } = await res.json();
if (!data || !data.length) break;
allBookings.push(...data);
if (data.length < size) break;
page++;
}
console.log(`Extracted ${allBookings.length} bookings for full refresh`);
SQL merge-upsert pattern (PostgreSQL):
-- Staging table receives raw API data
CREATE TABLE IF NOT EXISTS navan_booking_staging (
uuid TEXT PRIMARY KEY,
traveler_email TEXT,
origin TEXT,
destination TEXT,
start_date DATE,
end_date DATE,
total_cost NUMERIC(12,2),
currency TEXT DEFAULT 'USD',
department TEXT,
cost_center TEXT,
status TEXT,
in_policy BOOLEAN,
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ,
synced_at TIMESTAMPTZ DEFAULT NOW()
);
-- Merge-upsert: insert new records, update changed records
INSERT INTO navan_booking AS b
SELECT * FROM navan_booking_staging s
ON CONFLICT (uuid) DO UPDATE SET
traveler_email = EXCLUDED.traveler_email,
origin = EXCLUDED.origin,
destination = EXCLUDED.destination,
start_date = EXCLUDED.start_date,
end_date = EXCLUDED.end_date,
total_cost = EXCLUDED.total_cost,
status = EXCLUDED.status,
in_policy = EXCLUDED.in_policy,
updated_at = EXCLUDED.updated_at,
synced_at = NOW()
WHERE b.updated_at < EXCLUDED.updated_at;
TRANSACTION data is append-only. Use watermark-based sync to pull only new records.
// Track high-watermark for incremental pulls
interface SyncState {
lastSyncDate: string; // ISO date of last successful sync
lastTransactionId: string;
}
async function loadSyncState(): Promise<SyncState> {
const fs = await import('fs');
try {
return JSON.parse(fs.readFileSync('.navan-sync-state.json', 'utf-8'));
} catch {
return { lastSyncDate: '2025-01-01', lastTransactionId: '' };
}
}
async function saveSyncState(state: SyncState) {
const fs = await import('fs');
fs.writeFileSync('.navan-sync-state.json', JSON.stringify(state, null, 2));
}
// Pull bookings since last watermark
const state = await loadSyncState();
const today = new Date().toISOString().split('T')[0];
const txnRes = await fetch(
`${process.env.NAVAN_BASE_URL}/v1/bookings` +
`?createdFrom=${state.lastSyncDate}&createdTo=${today}&page=0&size=50`,
{ headers }
);
const { data: transactions } = await txnRes.json();
// Filter out already-seen transactions
const newTxns = transactions.filter(
(t: any) => t.transaction_id > state.lastTransactionId
);
console.log(`New transactions since ${state.lastSyncDate}: ${newTxns.length}`);
// Update watermark after successful load
if (newTxns.length > 0) {
await saveSyncState({
lastSyncDate: today,
lastTransactionId: newTxns[newTxns.length - 1].transaction_id,
});
}
import { createServer } from 'http';
import { createHmac } from 'crypto';
// Webhook handler for real-time Navan events
const server = createServer(async (req, res) => {
if (req.method !== 'POST' || req.url !== '/navan/webhook') {
res.writeHead(404);
res.end();
return;
}
const chunks: Buffer[] = [];
for await (const chunk of req) chunks.push(chunk as Buffer);
const body = Buffer.concat(chunks).toString();
// Verify webhook signature
const signature = req.headers['x-navan-signature'] as string;
const expected = createHmac('sha256', process.env.NAVAN_WEBHOOK_SECRET!)
.update(body)
.digest('hex');
if (signature !== expected) {
console.error('Invalid webhook signature');
res.writeHead(401);
res.end('Unauthorized');
return;
}
const event = JSON.parse(body);
console.log(`Webhook event: ${event.type}`);
switch (event.type) {
case 'booking.created':
console.log(`New booking: ${event.data.uuid}`);
break;
case 'booking.updated':
console.log(`Booking updated: ${event.data.uuid}`);
break;
case 'booking.cancelled':
console.log(`Booking cancelled: ${event.data.uuid}`);
break;
case 'expense.submitted':
console.log(`Expense submitted: ${event.data.transaction_id}`);
break;
case 'expense.approved':
console.log(`Expense approved: ${event.data.transaction_id}`);
break;
default:
console.log(`Unknown event type: ${event.type}`);
}
res.writeHead(200);
res.end('OK');
});
server.listen(3000, () => console.log('Webhook listener on :3000'));
# Airbyte source-navan connector (v0.0.42)
# Production sync mode configuration