決済データの照合(リコンシリエーション)パターン。Stripeと自社DBの整合性検証、 リアルタイム照合とバッチ照合の使い分け、キューベースの非同期同期、 不整合検出・自動修復・アラート設計を網羅する。決済データの不整合は 会計上の問題に直結するため、検出と通知の仕組みが運用上不可欠である。
import Stripe from "stripe";
const stripe = new Stripe(process.env.STRIPE_SECRET_KEY!);
// Webhook受信時にリアルタイムで照合
async function reconcileOnWebhook(event: Stripe.Event): Promise<void> {
if (event.type === "payment_intent.succeeded") {
const paymentIntent = event.data.object as Stripe.PaymentIntent;
await reconcilePaymentIntent(paymentIntent);
}
}
async function reconcilePaymentIntent(
paymentIntent: Stripe.PaymentIntent,
): Promise<ReconciliationResult> {
const orderId = paymentIntent.metadata.order_id;
if (!orderId) {
return recordDiscrepancy({
type: "missing_metadata",
stripeObjectId: paymentIntent.id,
stripeObjectType: "payment_intent",
description: "PaymentIntentにorder_idメタデータが未設定",
severity: "high",
});
}
const order = await db
.select()
.from(orders)
.where(eq(orders.id, orderId))
.get();
if (!order) {
return recordDiscrepancy({
type: "missing_local_record",
stripeObjectId: paymentIntent.id,
stripeObjectType: "payment_intent",
localId: orderId,
description: `注文 ${orderId} がDBに存在しない`,
severity: "critical",
});
}
// 金額照合
if (order.amount !== paymentIntent.amount) {
return recordDiscrepancy({
type: "amount_mismatch",
stripeObjectId: paymentIntent.id,
stripeObjectType: "payment_intent",
localId: orderId,
expected: order.amount,
actual: paymentIntent.amount,
description: `金額不一致: DB=${order.amount}, Stripe=${paymentIntent.amount}`,
severity: "critical",
});
}
// 通貨照合
if (order.currency !== paymentIntent.currency) {
return recordDiscrepancy({
type: "currency_mismatch",
stripeObjectId: paymentIntent.id,
stripeObjectType: "payment_intent",
localId: orderId,
expected: order.currency,
actual: paymentIntent.currency,
description: `通貨不一致: DB=${order.currency}, Stripe=${paymentIntent.currency}`,
severity: "critical",
});
}
// ステータス照合
const expectedStatus = mapStripeStatusToLocal(paymentIntent.status);
if (order.status !== expectedStatus) {
return recordDiscrepancy({
type: "status_mismatch",
stripeObjectId: paymentIntent.id,
stripeObjectType: "payment_intent",
localId: orderId,
expected: expectedStatus,
actual: order.status,
description: `ステータス不一致: DB=${order.status}, 期待=${expectedStatus}`,
severity: "high",
autoFixable: true,
});
}
return { status: "matched", stripeObjectId: paymentIntent.id };
}
function mapStripeStatusToLocal(
stripeStatus: Stripe.PaymentIntent.Status,
): string {
const mapping: Record<string, string> = {
succeeded: "paid",
requires_payment_method: "pending",
requires_confirmation: "pending",
requires_action: "pending_auth",
processing: "processing",
canceled: "canceled",
requires_capture: "authorized",
};
return mapping[stripeStatus] ?? "unknown";
}
// discrepancies テーブル
// CREATE TABLE discrepancies (
// id TEXT PRIMARY KEY,
// type TEXT NOT NULL,
// severity TEXT NOT NULL, -- critical | high | medium | low
// stripe_object_id TEXT NOT NULL,
// stripe_object_type TEXT NOT NULL,
// local_id TEXT,
// expected TEXT,
// actual TEXT,
// description TEXT NOT NULL,
// auto_fixable BOOLEAN DEFAULT FALSE,
// status TEXT DEFAULT 'open', -- open | investigating | resolved | ignored
// resolved_at TIMESTAMP,
// resolved_by TEXT,
// resolution_note TEXT,
// created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
// );
interface Discrepancy {
type: string;
severity: "critical" | "high" | "medium" | "low";
stripeObjectId: string;
stripeObjectType: string;
localId?: string;
expected?: unknown;
actual?: unknown;
description: string;
autoFixable?: boolean;
}
interface ReconciliationResult {
status: "matched" | "discrepancy";
stripeObjectId: string;
discrepancyId?: string;
}
async function recordDiscrepancy(
discrepancy: Discrepancy,
): Promise<ReconciliationResult> {
// 同一の不整合が既に記録されていないか確認
const existing = await db
.select()
.from(discrepancies)
.where(
and(
eq(discrepancies.stripeObjectId, discrepancy.stripeObjectId),
eq(discrepancies.type, discrepancy.type),
eq(discrepancies.status, "open"),
),
)
.get();
if (existing) {
return {
status: "discrepancy",
stripeObjectId: discrepancy.stripeObjectId,
discrepancyId: existing.id,
};
}
const id = crypto.randomUUID();
await db.insert(discrepancies).values({
id,
...discrepancy,
expected: discrepancy.expected ? String(discrepancy.expected) : null,
actual: discrepancy.actual ? String(discrepancy.actual) : null,
autoFixable: discrepancy.autoFixable ?? false,
});
// severity に応じたアラート
if (discrepancy.severity === "critical") {
await notifyAlertChannel({
title: "決済データ不整合(Critical)",
description: discrepancy.description,
stripeObjectId: discrepancy.stripeObjectId,
});
}
return {
status: "discrepancy",
stripeObjectId: discrepancy.stripeObjectId,
discrepancyId: id,
};
}
// 日次バッチ照合: Stripeの全PaymentIntentとDBの注文を突合
async function dailyReconciliationJob(): Promise<{
totalChecked: number;
matched: number;
discrepancies: number;
}> {
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
yesterday.setHours(0, 0, 0, 0);
const today = new Date();
today.setHours(0, 0, 0, 0);
let totalChecked = 0;
let matched = 0;
let discrepancyCount = 0;
// Stripe側: 前日のPaymentIntentを全件取得
const stripePayments = new Map<string, Stripe.PaymentIntent>();
for await (const pi of stripe.paymentIntents.list({
created: {
gte: Math.floor(yesterday.getTime() / 1000),
lt: Math.floor(today.getTime() / 1000),
},
limit: 100,
})) {
stripePayments.set(pi.id, pi);
}
// DB側: 前日の注文を全件取得
const localOrders = await db
.select()
.from(orders)
.where(
and(
gte(orders.createdAt, yesterday),
lt(orders.createdAt, today),
),
);
// 1. DB注文 → Stripe照合
for (const order of localOrders) {
totalChecked++;
if (!order.paymentIntentId) {
// PaymentIntent未作成の注文(カート放棄等)はスキップ
continue;
}
const stripePi = stripePayments.get(order.paymentIntentId);
if (!stripePi) {
discrepancyCount++;
await recordDiscrepancy({
type: "missing_stripe_record",
stripeObjectId: order.paymentIntentId,
stripeObjectType: "payment_intent",
localId: order.id,
description: `Stripeに PaymentIntent ${order.paymentIntentId} が存在しない`,
severity: "critical",
});
continue;
}
const result = await reconcilePaymentIntent(stripePi);
if (result.status === "matched") {
matched++;
} else {
discrepancyCount++;
}
// 照合済みとしてマーク
stripePayments.delete(order.paymentIntentId);
}
// 2. Stripe → DB照合(DB側にレコードがないStripe決済)
for (const [piId, pi] of stripePayments) {
// metadata にorder_idがある場合のみ対象(テスト決済等は除外)
if (pi.metadata.order_id) {
totalChecked++;
discrepancyCount++;
await recordDiscrepancy({
type: "orphaned_stripe_record",
stripeObjectId: piId,
stripeObjectType: "payment_intent",
localId: pi.metadata.order_id,
description: `Stripeの PaymentIntent ${piId} に対応するDB注文が不明`,
severity: "high",
});
}
}
// 照合結果サマリーを保存
await db.insert(reconciliationRuns).values({
id: crypto.randomUUID(),
runDate: yesterday,
totalChecked,
matched,
discrepancies: discrepancyCount,
status: discrepancyCount > 0 ? "has_discrepancies" : "clean",
});
return { totalChecked, matched, discrepancies: discrepancyCount };
}
// 自動修復可能な不整合を処理
async function autoFixDiscrepancies(): Promise<{
fixed: number;
failed: number;
}> {
const fixableItems = await db
.select()
.from(discrepancies)
.where(
and(
eq(discrepancies.autoFixable, true),
eq(discrepancies.status, "open"),
),
)
.limit(100);
let fixed = 0;
let failed = 0;
for (const item of fixableItems) {
try {
await applyAutoFix(item);
await db
.update(discrepancies)
.set({
status: "resolved",
resolvedAt: new Date(),
resolvedBy: "auto_fix",
resolutionNote: "自動修復により解決",
})
.where(eq(discrepancies.id, item.id));
fixed++;
} catch (error) {
console.error(`Auto-fix failed for discrepancy ${item.id}:`, error);
failed++;
}
}
return { fixed, failed };
}
async function applyAutoFix(discrepancy: typeof discrepancies.$inferSelect): Promise<void> {
switch (discrepancy.type) {
case "status_mismatch": {
// Stripeの状態をDBに反映
const pi = await stripe.paymentIntents.retrieve(discrepancy.stripeObjectId);
const correctStatus = mapStripeStatusToLocal(pi.status);
await db
.update(orders)
.set({ status: correctStatus })
.where(eq(orders.id, discrepancy.localId!));
break;
}
default:
throw new Error(`Auto-fix not supported for type: ${discrepancy.type}`);
}
}
// 照合結果の集計
async function getReconciliationSummary(params: {
from: Date;
to: Date;
}): Promise<{
totalRuns: number;
cleanRuns: number;
totalDiscrepancies: number;
openDiscrepancies: number;
discrepanciesBySeverity: Record<string, number>;
discrepanciesByType: Record<string, number>;
}> {
const runs = await db
.select()
.from(reconciliationRuns)
.where(
and(
gte(reconciliationRuns.runDate, params.from),
lte(reconciliationRuns.runDate, params.to),
),
);
const openItems = await db
.select({
type: discrepancies.type,
severity: discrepancies.severity,
count: sql<number>`count(*)`,
})
.from(discrepancies)
.where(eq(discrepancies.status, "open"))
.groupBy(discrepancies.type, discrepancies.severity);
const bySeverity: Record<string, number> = {};
const byType: Record<string, number> = {};
let totalOpen = 0;
for (const item of openItems) {
bySeverity[item.severity] = (bySeverity[item.severity] ?? 0) + item.count;
byType[item.type] = (byType[item.type] ?? 0) + item.count;
totalOpen += item.count;
}
return {
totalRuns: runs.length,
cleanRuns: runs.filter((r) => r.status === "clean").length,
totalDiscrepancies: runs.reduce((sum, r) => sum + r.discrepancies, 0),
openDiscrepancies: totalOpen,
discrepanciesBySeverity: bySeverity,
discrepanciesByType: byType,
};
}
// 照合イベントをキューに投入(大量データ処理時)
interface ReconciliationTask {
type: "reconcile_payment_intent" | "reconcile_subscription" | "reconcile_invoice";
stripeObjectId: string;
priority: number;
}
async function enqueueReconciliation(task: ReconciliationTask): Promise<void> {
await db.insert(reconciliationQueue).values({
id: crypto.randomUUID(),
taskType: task.type,
stripeObjectId: task.stripeObjectId,
priority: task.priority,
status: "pending",
scheduledAt: new Date(),
});
}
// キューワーカー
async function processReconciliationQueue(): Promise<void> {
const batch = await db
.select()
.from(reconciliationQueue)
.where(eq(reconciliationQueue.status, "pending"))
.orderBy(asc(reconciliationQueue.priority), asc(reconciliationQueue.scheduledAt))
.limit(50);
for (const task of batch) {
// 楽観ロック
const updated = await db
.update(reconciliationQueue)
.set({ status: "processing" })
.where(
and(
eq(reconciliationQueue.id, task.id),
eq(reconciliationQueue.status, "pending"),
),
)
.returning()
.get();
if (!updated) continue;
try {
switch (task.taskType) {
case "reconcile_payment_intent": {
const pi = await stripe.paymentIntents.retrieve(task.stripeObjectId);
await reconcilePaymentIntent(pi);
break;
}
// 他のタスクタイプ...
}
await db
.update(reconciliationQueue)
.set({ status: "completed", completedAt: new Date() })
.where(eq(reconciliationQueue.id, task.id));
} catch (error) {
await db
.update(reconciliationQueue)
.set({ status: "failed" })
.where(eq(reconciliationQueue.id, task.id));
console.error(`Reconciliation task failed: ${task.id}`, error);
}
}
}