Follow these patterns when implementing activity emission and audit logging in OptAIC. Use for emitting ActivityEnvelopes on mutations (create, update, delete, execute), designing payloads, and ensuring audit compliance.
Guide for implementing audit-compliant activity emission in OptAIC services.
Apply when:
If it changes state, it MUST emit an activity.
All mutations must emit activities in the service layer (not API handlers or models).
ActivityEnvelope(
tenant_id=UUID,
actor_principal_id=UUID,
resource_id=UUID,
resource_type=str, # "signal", "dataset", "portfolio"
action=str, # "signal.created", "run.completed"
visibility=str, # "private"|"resource"|"scope"|"tenant"
payload=dict, # Action-specific data
delivery_channels=list, # Where to publish
correlation_id=UUID # Links related activities
)
Use pattern: <resource>.<verb>
signal.registered signal.validated signal.promoted
dataset.created dataset.previewed dataset.refresh_started
dataset.refresh_completed dataset.refresh_failed
pipeline_def.submitted pipeline_def.deployed
pipeline_instance.created pipeline.run_started pipeline.run_completed
experiment.created experiment.updated
experiment.run_completed experiment.run_failed
expression.evaluated macro.saved
run.started run.completed run.failed run.cancelled
backtest.started backtest.completed backtest.failed
training.started training.completed training.failed
inference.started inference.completed inference.failed
optimization.started optimization.completed optimization.failed
monitoring.started monitoring.completed monitoring.alert
portfolio.rebalanced portfolio.constraints_updated
portfolio.weights_computed portfolio.optimization_started
promotion.requested promotion.approved promotion.merged promotion.rejected
guardrails.validated guardrails.blocked guardrails.warned
monitoring.drift_detected monitoring.performance_alert
monitoring.data_quality_alert monitoring.threshold_breach
await record_activity_with_outbox(
session=self.session,
envelope=ActivityEnvelope(
action="signal.created",
actor_principal_id=self.actor_id,
tenant_id=self.tenant_id,
resource_id=resource.id,
resource_type="signal",
payload={"signal_type": dto.signal_type}
)
)
from libs.core.activity import tx_activity
result, activity = await tx_activity(db, envelope, domain_fn)
Include: Changed fields, related IDs, computed metrics, status transitions Exclude: Passwords, API keys, large blobs, PII beyond necessity
Link related activities in workflows:
correlation_id = uuid4()
# Use same correlation_id throughout workflow
await emit("promotion.requested", correlation_id=correlation_id)
await emit("guardrails.validated", correlation_id=correlation_id)
await emit("promotion.merged", correlation_id=correlation_id)
Activities are processed by the outbox worker which publishes to Centrifugo for real-time WebSocket delivery.
| Type | Who | Mechanism | Opt-in? |
|---|---|---|---|
| Implicit | Owner + Delegators | Query Resource + RoleBinding | Automatic |
| Explicit | Subscribers | Query Subscription table | User opts in |
# Outbox worker builds watchers set: