Receive real-time updates from Interactor via webhooks (push) or Server-Sent Events (pull). Use when building real-time UIs, monitoring credential changes, tracking workflow progress, or streaming AI chat responses.
Receive real-time updates from Interactor via webhooks (push to your server) or Server-Sent Events (pull from browser/client).
interactor-auth skill)| Use Case | Recommended | Reason |
|---|---|---|
| Backend notifications | Webhooks | Server-to-server, reliable delivery |
| Credential status changes | Webhooks | Background processing, no UI needed |
| Workflow completion | Webhooks | Trigger backend actions |
| Real-time chat UI | SSE | Low latency, browser-native |
| Live workflow progress | SSE | Visual feedback for users |
| Streaming AI responses | SSE | Token-by-token display |
General Rule: Use webhooks for backend-to-backend, SSE for frontend real-time updates.
Webhooks push events to your server when things happen in Interactor.
curl https://core.interactor.com/api/v1/webhooks/event-types \
-H "Authorization: Bearer <token>"
Response:
{
"data": {
"event_types": [
"credential.created",
"credential.refreshed",
"credential.expired",
"credential.revoked",
"workflow.instance.created",
"workflow.instance.completed",
"workflow.instance.failed",
"workflow.instance.halted",
"agent.room.message",
"agent.room.closed"
]
}
}
Note: Additional events like
workflow.instance.resumedand tool invocation events are available via SSE streams only. See the SSE section for details.
| Category | Webhook Events | Description |
|---|---|---|
| Credentials | credential.created, credential.refreshed, credential.expired, credential.revoked | OAuth token lifecycle |
| Workflows | workflow.instance.created, workflow.instance.completed, workflow.instance.failed, workflow.instance.halted | Workflow execution status |
| Agents | agent.room.message, agent.room.closed | AI chat events |
SSE-Only Events:
workflow.instance.resumed,tool_use,tool_resultare available via Server-Sent Events streams only.
Interactor follows these principles for webhook payload changes:
| Change Type | Versioning | Example |
|---|---|---|
| New optional fields | Non-breaking, no version bump | Adding metadata field to events |
| New event types | Non-breaking, subscribe to receive | credential.metadata_updated |
| Field type changes | Breaking, announced 90 days ahead | amount from string to number |
| Field removal | Breaking, announced 90 days ahead | Removing deprecated fields |
| Payload restructure | New API version (v2) | Complete payload format change |
Best practices for forward compatibility:
metadata?: Record<string, unknown>/webhooks/:id/test endpoint after updates| Event | Trigger | Delivery | Typical Handler Action |
|---|---|---|---|
credential.created | User completes OAuth flow | Webhook | Log for audit, update UI state |
credential.refreshed | Token auto-refreshed | Webhook | Log for audit (usually no action needed) |
credential.expired | Refresh token failed | Webhook | Notify user to reconnect, disable features |
credential.revoked | User revoked via provider | Webhook | Notify user to reconnect, disable features |
workflow.instance.created | Workflow started | Webhook | Track in analytics, show in dashboard |
workflow.instance.halted | Workflow needs user input | Webhook | Notify user, show input form |
workflow.instance.completed | Workflow finished successfully | Webhook | Process results, update records |
workflow.instance.failed | Workflow error | Webhook | Alert ops, log error details |
agent.room.message | AI sent complete message | Webhook | Forward to push notification or websocket |
agent.room.closed | Chat session ended | Webhook | Log analytics, cleanup resources |
state_changed | Workflow state transition | SSE | Update progress UI |
workflow_data_updated | Workflow data modified | SSE | Refresh displayed data |
halted | Workflow needs input | SSE | Show input form |
resumed | User provided input | SSE | Update UI, show processing |
completed | Workflow finished | SSE | Show completion, redirect |
message | Complete message received | SSE | Display in chat |
message_start | AI started responding | SSE | Show typing indicator |
message_delta | Token received | SSE | Append to streaming message |
message_end | AI finished message | SSE | Finalize message, enable input |
tool_use | AI invoked a tool | SSE | Show tool activity indicator |
tool_result | Tool returned result | SSE | Display tool result (optional) |
heartbeat | Connection keepalive | SSE | Reset connection health timer |
Webhook management requires specific permissions in Interactor:
| Action | Required Permission | Who Has It |
|---|---|---|
| List webhooks | webhooks:read | Admin, Developer |
| Create webhook | webhooks:write | Admin, Developer |
| Update webhook | webhooks:write | Admin, Developer |
| Delete webhook | webhooks:delete | Admin only |
| Regenerate secret | webhooks:write | Admin, Developer |
| View delivery history | webhooks:read | Admin, Developer |
API Token Scopes:
When creating API tokens for webhook management, request these scopes:
webhooks - Full webhook management (read + write + delete)webhooks:read - Read-only access to webhook configurationwebhooks:write - Create and update (no delete)# Token with full webhook access
curl -X POST https://core.interactor.com/api/v1/tokens \
-H "Authorization: Bearer <admin_token>" \
-d '{"name": "Webhook Manager", "scopes": ["webhooks"]}'
Security Note: Webhook secrets are only shown once at creation and regeneration. Store them securely in environment variables or a secrets manager.
curl -X POST https://core.interactor.com/api/v1/webhooks \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{
"url": "https://yourapp.com/webhooks/interactor",
"events": [
"credential.expired",
"credential.revoked",
"workflow.instance.completed",
"workflow.instance.halted"
],
"enabled": true
}'
Response:
{
"data": {
"id": "wh_abc",
"url": "https://yourapp.com/webhooks/interactor",
"secret": "whsec_xyz_SAVE_THIS",
"events": [
"credential.expired",
"credential.revoked",
"workflow.instance.completed",
"workflow.instance.halted"
],
"enabled": true,
"created_at": "2026-01-20T12:00:00Z"
}
}
CRITICAL: Save the
secret- you'll need it to verify webhook signatures. It's only shown once!
curl https://core.interactor.com/api/v1/webhooks \
-H "Authorization: Bearer <token>"
Response:
{
"data": {
"webhooks": [
{
"id": "wh_abc",
"url": "https://yourapp.com/webhooks/interactor",
"events": ["credential.expired", "workflow.instance.completed"],
"enabled": true,
"created_at": "2026-01-20T12:00:00Z",
"last_delivery_at": "2026-01-20T12:30:00Z",
"last_delivery_status": "delivered"
}
]
}
}
curl https://core.interactor.com/api/v1/webhooks/wh_abc \
-H "Authorization: Bearer <token>"
curl -X PUT https://core.interactor.com/api/v1/webhooks/wh_abc \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{
"events": ["credential.created", "credential.expired"],
"url": "https://yourapp.com/webhooks/v2/interactor"
}'
curl -X POST https://core.interactor.com/api/v1/webhooks/wh_abc/toggle \
-H "Authorization: Bearer <token>"
curl -X DELETE https://core.interactor.com/api/v1/webhooks/wh_abc \
-H "Authorization: Bearer <token>"
If your secret is compromised:
curl -X POST https://core.interactor.com/api/v1/webhooks/wh_abc/regenerate-secret \
-H "Authorization: Bearer <token>"
Response:
{
"data": {
"id": "wh_abc",
"secret": "whsec_NEW_SECRET_SAVE_THIS",
"regenerated_at": "2026-01-20T12:00:00Z"
}
}
CRITICAL: The new secret is only shown once. Update your webhook handler immediately with the new secret.
See delivery history and debug issues:
curl https://core.interactor.com/api/v1/webhooks/wh_abc/events \
-H "Authorization: Bearer <token>"
Response:
{
"data": {
"events": [
{
"id": "evt_123",
"type": "workflow.instance.completed",
"delivered_at": "2026-01-20T12:00:00Z",
"status": "delivered",
"response_code": 200,
"response_time_ms": 145
},
{
"id": "evt_122",
"type": "credential.expired",
"delivered_at": "2026-01-20T11:55:00Z",
"status": "failed",
"response_code": 500,
"retry_count": 2
}
]
}
}
Send a test event to verify your endpoint:
curl -X POST https://core.interactor.com/api/v1/webhooks/wh_abc/test \
-H "Authorization: Bearer <token>"
All webhook events follow this structure:
{
"id": "evt_abc123",
"type": "workflow.instance.completed",
"timestamp": "2026-01-20T12:00:00Z",
"data": {
"instance_id": "inst_xyz",
"workflow_name": "approval_workflow",
"namespace": "user_123",
"status": "completed",
"output": {
"approved": true,
"amount": 5000
}
}
}
credential.created:
{
"id": "evt_001",
"type": "credential.created",
"timestamp": "2026-01-20T12:00:00Z",
"data": {
"credential_id": "cred_abc",
"service_id": "google_calendar",
"service_name": "Google Calendar",
"namespace": "user_123",
"scopes": ["calendar.readonly", "calendar.events"]
}
}
credential.refreshed:
{
"id": "evt_002",
"type": "credential.refreshed",
"timestamp": "2026-01-20T12:00:00Z",
"data": {
"credential_id": "cred_abc",
"service_id": "google_calendar",
"service_name": "Google Calendar",
"namespace": "user_123",
"expires_at": "2026-01-20T13:00:00Z"
}
}
credential.expired:
{
"id": "evt_003",
"type": "credential.expired",
"timestamp": "2026-01-20T12:00:00Z",
"data": {
"credential_id": "cred_abc",
"service_id": "google_calendar",
"service_name": "Google Calendar",
"namespace": "user_123",
"reason": "refresh_token_invalid"
}
}
credential.revoked:
{
"id": "evt_004",
"type": "credential.revoked",
"timestamp": "2026-01-20T12:00:00Z",
"data": {
"credential_id": "cred_abc",
"service_id": "google_calendar",
"service_name": "Google Calendar",
"namespace": "user_123",
"reason": "user_revoked_access"
}
}
workflow.instance.created:
{
"id": "evt_005",
"type": "workflow.instance.created",
"timestamp": "2026-01-20T12:00:00Z",
"data": {
"instance_id": "inst_xyz",
"workflow_name": "approval_workflow",
"workflow_id": "wf_abc",
"namespace": "user_123",
"initial_input": {
"request_id": "req_456",
"amount": 5000
}
}
}
workflow.instance.halted:
{
"id": "evt_006",
"type": "workflow.instance.halted",
"timestamp": "2026-01-20T12:00:00Z",
"data": {
"instance_id": "inst_xyz",
"workflow_name": "approval_workflow",
"namespace": "user_123",
"current_state": "await_approval",
"halting_presentation": {
"type": "form",
"title": "Approval Required",
"fields": [...]
}
}
}
workflow.instance.completed:
{
"id": "evt_007",
"type": "workflow.instance.completed",
"timestamp": "2026-01-20T12:00:00Z",
"data": {
"instance_id": "inst_xyz",
"workflow_name": "approval_workflow",
"namespace": "user_123",
"final_state": "approved",
"workflow_data": {
"request_id": "req_456",
"approved": true,
"amount": 5000
}
}
}
workflow.instance.failed:
{
"id": "evt_008",
"type": "workflow.instance.failed",
"timestamp": "2026-01-20T12:00:00Z",
"data": {
"instance_id": "inst_xyz",
"workflow_name": "approval_workflow",
"namespace": "user_123",
"failed_state": "process_payment",
"error": {
"code": "payment_declined",
"message": "Card was declined by issuer"
}
}
}
agent.room.message:
{
"id": "evt_009",
"type": "agent.room.message",
"timestamp": "2026-01-20T12:00:00Z",
"data": {
"room_id": "room_xyz",
"assistant_id": "asst_abc",
"namespace": "user_123",
"message_id": "msg_123",
"role": "assistant",
"content": "Here's what I found about your billing question..."
}
}
agent.room.closed:
{
"id": "evt_010",
"type": "agent.room.closed",
"timestamp": "2026-01-20T12:00:00Z",
"data": {
"room_id": "room_xyz",
"assistant_id": "asst_abc",
"namespace": "user_123",
"reason": "user_closed",
"message_count": 15,
"duration_seconds": 300
}
}
Note: Not all events require explicit handlers. For example,
credential.createdandcredential.refreshedare often only logged for audit purposes, whileworkflow.instance.createdmay only need tracking in analytics systems.
CRITICAL: Always verify signatures to ensure webhooks came from Interactor.
Webhooks include two headers for verification:
X-Interactor-Signature: sha256=<64 hex characters>
X-Interactor-Timestamp: 2026-01-20T12:00:00Z
Example:
X-Interactor-Signature: sha256=a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2
X-Interactor-Timestamp: 2026-01-20T12:00:00Z
Format validation: The signature header MUST match the format
sha256=followed by exactly 64 lowercase hexadecimal characters. Reject any other format.
CRITICAL: Always validate the timestamp to prevent replay attacks.
X-Interactor-Timestamp as ISO8601|now - timestamp| > allowed_skew (recommended: 5 minutes)const MAX_TIMESTAMP_SKEW_SECONDS = 300; // 5 minutes
function validateTimestamp(timestampHeader: string | undefined): boolean {
if (!timestampHeader) return false;
const timestamp = new Date(timestampHeader);
if (isNaN(timestamp.getTime())) return false;
const now = Date.now();
const diff = Math.abs(now - timestamp.getTime());
return diff <= MAX_TIMESTAMP_SKEW_SECONDS * 1000;
}
When rotating webhook secrets, you may have a period where both old and new secrets are valid:
function verifyWithMultipleSecrets(
payload: string,
signatureHeader: string,
secrets: string[]
): boolean {
for (const secret of secrets) {
if (isValidSignature(signatureHeader, Buffer.from(payload), secret)) {
return true;
}
}
return false;
}
// During rotation, configure both secrets:
const WEBHOOK_SECRETS = [
process.env.INTERACTOR_WEBHOOK_SECRET!, // Current secret
process.env.INTERACTOR_WEBHOOK_SECRET_PREVIOUS!, // Previous secret (optional)
].filter(Boolean);
Rotation procedure:
/regenerate-secret endpointINTERACTOR_WEBHOOK_SECRETINTERACTOR_WEBHOOK_SECRET_PREVIOUS for 24-48 hoursimport crypto from 'crypto';
import express from 'express';
const app = express();
const MAX_TIMESTAMP_SKEW_MS = 5 * 60 * 1000; // 5 minutes
/**
* Validates the X-Interactor-Timestamp header to prevent replay attacks.
* Returns true if the timestamp is within the allowed skew window.
*/
function validateTimestamp(timestampHeader: string | undefined): boolean {
if (!timestampHeader) return false;
const timestamp = new Date(timestampHeader);
if (isNaN(timestamp.getTime())) return false;
const diff = Math.abs(Date.now() - timestamp.getTime());
return diff <= MAX_TIMESTAMP_SKEW_MS;
}
/**
* Validates and verifies the webhook signature using timing-safe comparison.
* Properly handles the sha256= prefix and validates hex format.
*
* IMPORTANT: This function never throws - it returns false for any invalid input.
*/
function isValidSignature(
signatureHeader: string | undefined,
payload: Buffer,
secret: string
): boolean {
// Guard: header must exist
if (!signatureHeader) return false;
// Guard: header must match exact format sha256=<64 hex chars>
const match = signatureHeader.match(/^sha256=([0-9a-f]{64})$/);
if (!match) return false;
const providedSignature = match[1];
// Compute expected signature
const expectedSignature = crypto
.createHmac('sha256', secret)
.update(payload)
.digest('hex');
// Convert to buffers for timing-safe comparison
// Both are now guaranteed to be 64 hex chars = 32 bytes when decoded
const providedBuffer = Buffer.from(providedSignature, 'hex');
const expectedBuffer = Buffer.from(expectedSignature, 'hex');
// Length check (should always pass given regex, but defense in depth)
if (providedBuffer.length !== expectedBuffer.length) return false;
return crypto.timingSafeEqual(providedBuffer, expectedBuffer);
}
// IMPORTANT: Use raw body for signature verification
app.post(
'/webhooks/interactor',
express.raw({ type: 'application/json' }),
(req, res) => {
const signatureHeader = req.headers['x-interactor-signature'] as string;
const timestampHeader = req.headers['x-interactor-timestamp'] as string;
const payload = req.body; // Keep as Buffer
// Step 1: Validate timestamp (prevent replay attacks)
if (!validateTimestamp(timestampHeader)) {
console.warn('Webhook rejected: invalid or stale timestamp');
return res.status(401).json({ error: 'invalid_timestamp' });
}
// Step 2: Verify signature
if (!isValidSignature(signatureHeader, payload, process.env.INTERACTOR_WEBHOOK_SECRET!)) {
console.warn('Webhook rejected: invalid signature');
return res.status(401).json({ error: 'invalid_signature' });
}
// Step 3: Parse and handle event
let event: WebhookEvent;
try {
event = JSON.parse(payload.toString());
} catch {
return res.status(400).json({ error: 'invalid_json' });
}
console.log(`Received event: ${event.type} (${event.id})`);
// Handle asynchronously - respond immediately
handleWebhookEvent(event).catch((err) => {
console.error(`Failed to process event ${event.id}:`, err);
});
// Always respond quickly (< 5 seconds)
res.status(200).json({ received: true });
}
);
async function handleWebhookEvent(event: WebhookEvent) {
switch (event.type) {
case 'credential.expired':
case 'credential.revoked':
// Notify user to reconnect their account
await notifyUserToReconnect(
event.data.namespace,
event.data.service_name
);
break;
case 'workflow.instance.halted':
// Notify user they have a pending approval
await notifyUserOfPendingApproval(
event.data.namespace,
event.data.instance_id,
event.data.halting_presentation
);
break;
case 'workflow.instance.completed':
// Process completed workflow
await processCompletedWorkflow(
event.data.instance_id,
event.data.workflow_data
);
break;
case 'workflow.instance.failed':
// Handle workflow failure
await handleWorkflowFailure(
event.data.namespace,
event.data.instance_id,
event.data.error,
event.data.failed_state
);
break;
case 'agent.room.message':
// Forward message to real-time channel (if not using SSE)
await forwardMessageToClient(
event.data.namespace,
event.data.room_id,
event.data.message_id,
event.data.content
);
break;
}
}
// Webhook event types for type safety
type WebhookEventType =
| 'credential.created'
| 'credential.refreshed'
| 'credential.expired'
| 'credential.revoked'
| 'workflow.instance.created'
| 'workflow.instance.completed'
| 'workflow.instance.failed'
| 'workflow.instance.halted'
| 'agent.room.message'
| 'agent.room.closed';
// SSE-only event types (not available via webhooks)
type SSEEventType =
| 'state_changed'
| 'workflow_data_updated'
| 'halted'
| 'resumed'
| 'completed'
| 'message'
| 'message_start'
| 'message_delta'
| 'message_end'
| 'tool_use'
| 'tool_result'
| 'heartbeat';
interface WebhookEvent<T = Record<string, unknown>> {
id: string;
type: WebhookEventType;
timestamp: string;
data: T;
}
// Specific payload types for each event
interface CredentialEventData {
credential_id: string;
service_id: string;
service_name?: string;
namespace: string;
reason?: string;
scopes?: string[];
expires_at?: string; // For credential.refreshed
}
interface WorkflowEventData {
instance_id: string;
workflow_name: string;
workflow_id?: string;
namespace: string;
status?: 'created' | 'running' | 'halted' | 'completed' | 'failed';
current_state?: string;
final_state?: string;
failed_state?: string;
error?: { code: string; message: string };
initial_input?: Record<string, unknown>;
workflow_data?: Record<string, unknown>;
output?: Record<string, unknown>;
halting_presentation?: Record<string, unknown>;
}
interface AgentMessageEventData {
room_id: string;
assistant_id: string;
namespace: string;
message_id: string;
role: 'user' | 'assistant';
content: string;
}
interface AgentRoomClosedEventData {
room_id: string;
assistant_id: string;
namespace: string;
reason: 'user_closed' | 'timeout' | 'error';
message_count: number;
duration_seconds: number;
}
import hmac
import hashlib
import os
import re
from datetime import datetime, timezone, timedelta
from flask import Flask, request, jsonify
app = Flask(__name__)
MAX_TIMESTAMP_SKEW = timedelta(minutes=5)
SIGNATURE_PATTERN = re.compile(r'^sha256=([0-9a-f]{64})$')
def validate_timestamp(timestamp_header: str | None) -> bool:
"""Validate timestamp to prevent replay attacks."""
if not timestamp_header:
return False
try:
timestamp = datetime.fromisoformat(timestamp_header.replace('Z', '+00:00'))
now = datetime.now(timezone.utc)
return abs(now - timestamp) <= MAX_TIMESTAMP_SKEW
except (ValueError, TypeError):
return False
def is_valid_signature(signature_header: str | None, payload: bytes, secret: str) -> bool:
"""
Validate and verify webhook signature with timing-safe comparison.
Returns False for any invalid input - never raises exceptions.
"""
if not signature_header:
return False
# Validate format: sha256=<64 hex chars>
match = SIGNATURE_PATTERN.match(signature_header)
if not match:
return False
provided_signature = match.group(1)
expected_signature = hmac.new(
secret.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(provided_signature, expected_signature)
@app.route('/webhooks/interactor', methods=['POST'])
def handle_webhook():
signature_header = request.headers.get('X-Interactor-Signature')
timestamp_header = request.headers.get('X-Interactor-Timestamp')
payload = request.get_data()
# Step 1: Validate timestamp (prevent replay attacks)
if not validate_timestamp(timestamp_header):
print('Webhook rejected: invalid or stale timestamp')
return jsonify({'error': 'invalid_timestamp'}), 401
# Step 2: Verify signature
if not is_valid_signature(signature_header, payload, os.environ['INTERACTOR_WEBHOOK_SECRET']):
print('Webhook rejected: invalid signature')
return jsonify({'error': 'invalid_signature'}), 401
# Step 3: Parse and handle event
try:
event = request.get_json()
except Exception:
return jsonify({'error': 'invalid_json'}), 400
print(f"Received event: {event['type']} ({event['id']})")
# Handle asynchronously (use Celery, RQ, or similar in production)
handle_webhook_event(event)
# Always respond quickly (< 5 seconds)
return jsonify({'received': True}), 200
def handle_webhook_event(event: dict):
event_type = event['type']
data = event['data']
if event_type in ['credential.expired', 'credential.revoked']:
notify_user_to_reconnect(data['namespace'], data.get('service_name'))
elif event_type == 'workflow.instance.halted':
notify_user_of_pending_approval(
data['namespace'],
data['instance_id'],
data.get('halting_presentation')
)
elif event_type == 'workflow.instance.completed':
process_completed_workflow(data['instance_id'], data.get('workflow_data'))
elif event_type == 'workflow.instance.failed':
handle_workflow_failure(
data['namespace'],
data['instance_id'],
data.get('error'),
data.get('failed_state')
)
elif event_type == 'agent.room.message':
forward_message_to_client(
data['namespace'],
data['room_id'],
data['message_id'],
data['content']
)
defmodule MyAppWeb.WebhookController do
use MyAppWeb, :controller
import Plug.Conn, only: [get_req_header: 2]
# Maximum body size for webhooks (1MB should be plenty)
@max_body_length 1_048_576
# Maximum timestamp skew (5 minutes in seconds)
@max_timestamp_skew 300
# Regex to validate signature format: sha256=<64 hex chars>
@signature_pattern ~r/^sha256=([0-9a-f]{64})$/
def interactor(conn, _params) do
signature_header = get_req_header(conn, "x-interactor-signature") |> List.first()
timestamp_header = get_req_header(conn, "x-interactor-timestamp") |> List.first()
with {:ok, payload, conn} <- read_body(conn, length: @max_body_length),
:ok <- validate_timestamp(timestamp_header),
secret <- Application.fetch_env!(:my_app, :interactor_webhook_secret),
:ok <- verify_signature(payload, signature_header, secret),
{:ok, event} <- Jason.decode(payload) do
# Handle asynchronously to respond quickly
Task.start(fn -> handle_event(event) end)
conn
|> put_status(200)
|> json(%{received: true})
else
{:more, _partial, conn} ->
conn
|> put_status(413)
|> json(%{error: "payload_too_large"})
{:error, :invalid_timestamp} ->
conn
|> put_status(401)
|> json(%{error: "invalid_timestamp"})
{:error, :invalid_signature} ->
conn
|> put_status(401)
|> json(%{error: "invalid_signature"})
{:error, _reason} ->
conn
|> put_status(400)
|> json(%{error: "invalid_json"})
end
end
defp validate_timestamp(nil), do: {:error, :invalid_timestamp}
defp validate_timestamp(timestamp_header) do
case DateTime.from_iso8601(timestamp_header) do
{:ok, timestamp, _offset} ->
now = DateTime.utc_now()
diff = abs(DateTime.diff(now, timestamp, :second))
if diff <= @max_timestamp_skew do
:ok
else
{:error, :invalid_timestamp}
end
{:error, _} ->
{:error, :invalid_timestamp}
end
end
defp verify_signature(_payload, nil, _secret), do: {:error, :invalid_signature}
defp verify_signature(payload, signature_header, secret) do
case Regex.run(@signature_pattern, signature_header) do
[_, provided_hex] ->
expected_hex =
:crypto.mac(:hmac, :sha256, secret, payload)
|> Base.encode16(case: :lower)
if Plug.Crypto.secure_compare(provided_hex, expected_hex) do
:ok
else
{:error, :invalid_signature}
end
_ ->
{:error, :invalid_signature}
end
end
defp handle_event(%{"type" => "credential.expired", "data" => data}) do
MyApp.Notifications.notify_reconnect(data["namespace"], data["service_name"])
end
defp handle_event(%{"type" => "workflow.instance.halted", "data" => data}) do
MyApp.Notifications.notify_pending_approval(
data["namespace"],
data["instance_id"],
data["halting_presentation"]
)
end
defp handle_event(%{"type" => "workflow.instance.completed", "data" => data}) do
MyApp.Workflows.process_completed(data["instance_id"], data["workflow_data"])
end
defp handle_event(%{"type" => "workflow.instance.failed", "data" => data}) do
MyApp.Workflows.handle_failure(
data["namespace"],
data["instance_id"],
data["error"],
data["failed_state"]
)
end
defp handle_event(%{"type" => "agent.room.message", "data" => data}) do
MyApp.Chat.forward_message(
data["namespace"],
data["room_id"],
data["message_id"],
data["content"]
)
end
defp handle_event(_event), do: :ok
end
Interactor retries failed webhook deliveries with exponential backoff:
| Attempt | Delay | Total Time |
|---|---|---|
| 1 | Immediate | 0 |
| 2 | 1 minute | 1 min |
| 3 | 5 minutes | 6 min |
| 4 | 30 minutes | 36 min |
| 5 | 2 hours | 2h 36min |
After 5 failed attempts, the webhook is disabled. Re-enable via the toggle endpoint.
Your webhook handler's HTTP response determines Interactor's retry behavior:
| HTTP Status | Interactor Behavior | Your Action |
|---|---|---|
200-299 | ✅ Success - no retry | Event processed successfully |
400 | ❌ Permanent failure - no retry | Bad request, fix your handler |
401 | ❌ Permanent failure - no retry | Signature invalid, check secret |
403 | ❌ Permanent failure - no retry | Forbidden, check permissions |
404 | ❌ Permanent failure - no retry | Endpoint not found, check URL |
408 | 🔄 Retry with backoff | Request timeout, respond faster |
429 | 🔄 Retry with backoff | Rate limited, will retry later |
500 | 🔄 Retry with backoff | Server error, will retry |
502-504 | 🔄 Retry with backoff | Gateway/timeout, will retry |
| Timeout (>30s) | 🔄 Retry with backoff | No response received, will retry |
| Connection refused | 🔄 Retry with backoff | Server unreachable, will retry |
Important: Return
200 OKimmediately, then process asynchronously. If you return4xxerrors for transient issues, Interactor won't retry.
// Example: Idempotent event processing with Redis
const IDEMPOTENCY_TTL = 7 * 24 * 60 * 60; // 7 days in seconds
async function handleWebhookEvent(event: WebhookEvent) {
const idempotencyKey = `webhook:processed:${event.id}`;
// Atomic check-and-set to prevent race conditions
const wasSet = await redis.set(idempotencyKey, Date.now(), 'NX', 'EX', IDEMPOTENCY_TTL);
if (!wasSet) {
console.log(`Event ${event.id} already processed, skipping`);
return;
}
try {
await processEvent(event);
console.log(`Successfully processed event ${event.id}`);
} catch (error) {
// Delete the key so retry can process it
await redis.del(idempotencyKey);
throw error;
}
}
Idempotency Storage Recommendations:
| Storage | TTL | Use Case |
|---|---|---|
| Redis | 7 days | High-throughput, distributed systems |
| PostgreSQL | 30 days | Audit trail needed, lower throughput |
| In-memory | Session | Development/testing only |
For events that repeatedly fail processing, implement a DLQ:
const MAX_PROCESS_ATTEMPTS = 3;
const DLQ_KEY = 'webhook:dlq';
async function handleWebhookEvent(event: WebhookEvent) {
const attemptKey = `webhook:attempts:${event.id}`;
const attempts = await redis.incr(attemptKey);
await redis.expire(attemptKey, 24 * 60 * 60); // 24 hour window
try {
await processEvent(event);
await redis.del(attemptKey);
} catch (error) {
if (attempts >= MAX_PROCESS_ATTEMPTS) {
// Move to DLQ for manual review
await redis.rpush(DLQ_KEY, JSON.stringify({
event,
error: error.message,
failedAt: new Date().toISOString(),
attempts
}));
await redis.del(attemptKey);
console.error(`Event ${event.id} moved to DLQ after ${attempts} attempts`);
// Alert operations team
await alertOps(`Webhook event ${event.id} failed ${attempts} times`);
} else {
console.warn(`Event ${event.id} failed (attempt ${attempts}/${MAX_PROCESS_ATTEMPTS})`);
throw error; // Will be retried by Interactor
}
}
}
// Periodic DLQ processor (run via cron)
async function processDLQ() {
while (true) {
const item = await redis.lpop(DLQ_KEY);
if (!item) break;
const { event, error, failedAt } = JSON.parse(item);
console.log(`DLQ item: ${event.id} failed at ${failedAt}: ${error}`);
// Manual review or automated recovery logic
}
}
Track webhook health with these metrics:
Prometheus Metrics Example:
import { Counter, Histogram, Gauge } from 'prom-client';
// Webhook metrics
const webhookReceived = new Counter({
name: 'interactor_webhook_received_total',
help: 'Total webhooks received',
labelNames: ['event_type', 'status'] // status: success, invalid_signature, invalid_timestamp, processing_error
});
const webhookProcessingDuration = new Histogram({
name: 'interactor_webhook_processing_duration_seconds',
help: 'Webhook processing duration in seconds',
labelNames: ['event_type'],
buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5]
});
const webhookDLQSize = new Gauge({
name: 'interactor_webhook_dlq_size',
help: 'Current size of the dead-letter queue'
});
// Usage in handler
app.post('/webhooks/interactor', async (req, res) => {
const timer = webhookProcessingDuration.startTimer();
try {
// ... validation ...
if (!isValidSignature(...)) {
webhookReceived.inc({ event_type: 'unknown', status: 'invalid_signature' });
return res.status(401).json({ error: 'invalid_signature' });
}
const event = JSON.parse(payload.toString());
webhookReceived.inc({ event_type: event.type, status: 'success' });
await handleWebhookEvent(event);
timer({ event_type: event.type });
res.status(200).json({ received: true });
} catch (error) {
webhookReceived.inc({ event_type: 'unknown', status: 'processing_error' });
timer({ event_type: 'error' });
throw error;
}
});
Key Metrics to Monitor:
| Metric | Alert Threshold | Description |
|---|---|---|
webhook_received_total{status="invalid_signature"} | >5 in 5min | Possible secret mismatch or attack |
webhook_processing_duration_seconds | p99 >5s | Risk of timeout, scale handlers |
webhook_dlq_size | >0 | Events need manual review |
webhook_received_total{status="processing_error"} | >10 in 5min | Handler bugs, investigate logs |
For real-time streaming in browsers and clients.
Stream updates for a specific workflow instance:
curl -N https://core.interactor.com/api/v1/workflows/instances/inst_xyz/stream \
-H "Authorization: Bearer <token>" \
-H "Accept: text/event-stream"
Events: