Architectural patterns including hexagonal architecture, CQRS, and event-driven design. Load when making architectural decisions or reviewing system design.
This document describes how to structure applications using functional domain modeling principles.
Topics covered:
For domain modeling patterns, see domain-modeling.md. For theoretical foundations, see theoretical-foundations.md.
Structure applications in concentric layers with domain logic at the center and infrastructure at the edges.
┌─────────────────────────────────────┐
│ Infrastructure (I/O, external) │
│ ┌───────────────────────────────┐ │
│ │ Application (workflows) │ │
│ │ ┌─────────────────────────┐ │ │
│ │ │ Domain (types, logic) │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────┘ │ │
│ └───────────────────────────────┘ │
└─────────────────────────────────────┘
Layer responsibilities:
Domain layer (innermost):
Application layer (middle):
Infrastructure layer (outermost):
Dependencies point inward only:
Infrastructure → Application → Domain
(I/O) (workflows) (pure logic)
Never:
Domain → Infrastructure ⊘
Domain → Application ⊘
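Concretely, the dependency rule means the domain module imports nothing from the outer layers; infrastructure is handed in as a function at the edge. A minimal sketch (module and function names are illustrative, not from a specific codebase):

```python
from dataclasses import dataclass
from typing import Callable

# Domain (innermost): pure types and logic, no I/O imports
@dataclass(frozen=True)
class Reading:
    value: float

def is_valid(reading: Reading, threshold: float) -> bool:
    """Pure domain rule: no knowledge of files, databases, or networks."""
    return reading.value >= threshold

# Application (middle): orchestrates domain logic via injected functions
def filter_valid(
    load: Callable[[], list[Reading]],  # infrastructure passed in, not imported
    threshold: float,
) -> list[Reading]:
    return [r for r in load() if is_valid(r, threshold)]

# Infrastructure (outermost): supplies the I/O, depends inward on Reading
def load_from_csv(path: str) -> list[Reading]:
    with open(path) as f:
        return [Reading(float(line)) for line in f if line.strip()]

# Wiring happens at the edge; the domain never sees the file:
# results = filter_valid(lambda: load_from_csv("readings.csv"), threshold=0.5)
```

Because the application layer only depends on the `Callable` shape, tests can substitute an in-memory loader without touching any file.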
This ensures:
Domain layer:
- Measurement types (Temperature, Pressure, QualityScore)
- Calibration models (CalibrationParams, CalibratedData)
- Validation rules (quality thresholds, range checks)
- State transitions (RawObservations → CalibratedData → ProcessedResults)
Application layer:
- process_observations workflow
- calibrate_measurements workflow
- validate_results workflow
- Coordinates domain logic with I/O dependencies
Infrastructure layer:
- File readers (CSV, HDF5, NetCDF)
- Database clients (save/load results)
- Cloud storage (S3, GCS)
- Plotting/visualization
- CLI argument parsing
Domain layer:
- Model architecture types
- Training state machines (Untrained → Training → Converged → Validated)
- Hyperparameter types with constraints
- Convergence criteria
Application layer:
- train_model workflow
- validate_model workflow
- deploy_model workflow
- Orchestrates training steps
Infrastructure layer:
- GPU/TPU allocation
- Checkpoint storage
- Metrics logging (Wandb, TensorBoard)
- Model serving infrastructure
- Data loaders
Domain layer:
- Order types (UnvalidatedOrder → ValidatedOrder → PricedOrder)
- Product codes, quantities
- Pricing rules
- Validation logic
Application layer:
- place_order workflow
- validate_order workflow
- price_order workflow
- Coordinates validation, pricing, acknowledgment
Infrastructure layer:
- Database access
- Email sending
- Payment gateway
- Address validation service
- Product catalog
Each architecture layer can be understood as a module algebra with its own signature, implementation, and composition rules. This perspective formalizes the layer separation and clarifies composition.
Each layer defines a signature (interface) hiding implementation details. Domain layer signatures use pure types; infrastructure signatures may include effects.
// Domain layer signature - pure
trait AccountDomain {
fn validate_account(data: AccountData) -> Result<Account, DomainError>;
fn apply_interest(account: Account, rate: Rate) -> Account;
}
// Infrastructure layer signature - effectful
trait AccountRepository {
fn find(&self, id: AccountId) -> impl Future<Output = Result<Account, DbError>>;
fn save(&self, account: &Account) -> impl Future<Output = Result<(), DbError>>;
}
Each layer implements its signature, potentially using services from inner layers. The domain layer is pure; outer layers add effects.
Layer boundaries are crossed via natural transformations (functors mapping between algebras). The application layer composes domain and infrastructure through these transformations.
// Application layer composes domain + infrastructure
async fn process_account(
id: AccountId,
repo: &impl AccountRepository,
) -> Result<Account, AppError> {
let account = repo.find(id).await.map_err(AppError::from)?;
let validated = AccountDomain::validate_account(account.into())?;
let updated = AccountDomain::apply_interest(validated, current_rate());
repo.save(&updated).await.map_err(AppError::from)?;
Ok(updated)
}
See also: domain-modeling.md#module-algebra-for-domain-services for the full signature/algebra/interpreter pattern.
Model domain processes as pipelines of transformations where each step:
Input → [Step1] → [Step2] → [Step3] → Output
↓dep ↓dep ↓dep
Service1 Service2 Service3
def workflow(
dependency1: Service1, # External dependencies first
dependency2: Service2,
config: Config, # Configuration
input_data: InputType # Input data last
) -> Result[OutputType, Error]: # Explicit error handling
"""
Workflow description.
Dependencies:
- dependency1: Purpose of this dependency
- dependency2: Purpose of this dependency
Returns:
- Ok(OutputType): On success
- Error(...): On failure with reason
"""
# Implementation
Key aspects:
# Individual steps with dependencies
def calibrate(
calibration_model: CalibrationModel,
quality_threshold: float,
raw: RawObservations
) -> Result[CalibratedData, CalibrationError]:
...
def infer(
inference_algorithm: InferenceAlgorithm,
calibrated: CalibratedData
) -> Result[InferredResults, InferenceError]:
...
def validate(
validation_metrics: ValidationMetrics,
inferred: InferredResults
) -> Result[ValidatedResults, ValidationError]:
...
# Composed pipeline
def process_observations(
calibration_model: CalibrationModel,
quality_threshold: float,
inference_algorithm: InferenceAlgorithm,
validation_metrics: ValidationMetrics,
raw: RawObservations
) -> Result[ValidatedResults, ProcessingError]:
"""Complete processing pipeline."""
return (
calibrate(calibration_model, quality_threshold, raw)
.map_error(ProcessingError.from_calibration)
.bind(lambda cal: infer(inference_algorithm, cal))
.map_error(ProcessingError.from_inference)
.bind(lambda inf: validate(validation_metrics, inf))
.map_error(ProcessingError.from_validation)
)
See also: railway-oriented-programming.md for Result composition details
Pass dependencies as function parameters rather than using dependency injection frameworks or global state.
Benefits:
from functools import partial
# Full signature with all dependencies
def process_data(
calibration_model: CalibrationModel, # dependency 1
quality_threshold: QualityThreshold, # dependency 2
inference_algo: InferenceAlgorithm, # dependency 3
data: RawObservations # input
) -> Result[ProcessedData, ProcessingError]:
"""Process observations with given dependencies."""
...
# Create specialized version with default dependencies
process_with_defaults = partial(
process_data,
default_calibration_model,
strict_quality_threshold,
standard_inference_algo
)
# Now just: RawObservations → Result[ProcessedData, Error]
result = process_with_defaults(observations)
# Alternative specialization for testing
process_for_testing = partial(
process_data,
mock_calibration_model,
permissive_threshold,
fast_test_algo
)
Define dependencies as simple function types, not heavyweight interfaces:
# Instead of interface with many methods:
class IProductCatalog(Protocol): ⊘
def get_product(self, code: ProductCode) -> Product: ...
def list_products(self) -> List[Product]: ...
def update_stock(self, code: ProductCode, qty: int) -> None: ...
# ... many methods
# Use specific function types:
GetProductPrice = Callable[[ProductCode], Price] ●
CheckProductExists = Callable[[ProductCode], bool]
def price_order(
get_price: GetProductPrice, # Only what we need
check_exists: CheckProductExists,
order: ValidatedOrder
) -> Result[PricedOrder, PricingError]:
...
Advantages:
For workflows with many shared dependencies, use Reader pattern:
@dataclass(frozen=True)
class Dependencies:
"""Shared dependencies for workflows."""
calibration_model: CalibrationModel
quality_threshold: float
inference_algo: InferenceAlgorithm
database: Database
def process_observations(
deps: Dependencies,
raw: RawObservations
) -> Result[ValidatedResults, Error]:
"""Workflow using shared dependencies."""
return (
calibrate(deps.calibration_model, deps.quality_threshold, raw)
.bind(lambda cal: infer(deps.inference_algo, cal))
.bind(lambda inf: save_results(deps.database, inf))
)
# Specialize with partial application
process = partial(process_observations, production_dependencies)
# Or create Reader
from expression import Reader
def process_observations_reader(
raw: RawObservations
) -> Reader[Dependencies, Result[ValidatedResults, Error]]:
return Reader(lambda deps: process_observations(deps, raw))
See also: theoretical-foundations.md#monad-transformers
Beyond simple function parameters, Kleisli arrows (A -> M B) compose effectful computations with embedded dependencies.
The Reader pattern above corresponds to working in the Kleisli category of the Reader (environment-passing) monad.
// Kleisli-style service composition (sketch: boxed closures so the alias compiles)
type ServiceOp<A> = Box<dyn Fn(&ServiceContext) -> Result<A, Error>>;

fn find_account(id: AccountId) -> ServiceOp<Account> {
    Box::new(move |ctx| ctx.account_repo.find(id))
}

fn validate_balance(account: Account, amount: Amount) -> ServiceOp<Account> {
    Box::new(move |_ctx| {
        if account.balance >= amount {
            Ok(account.clone())
        } else {
            Err(Error::InsufficientBalance)
        }
    })
}

// Compose via Kleisli composition (>=>): each arrow runs against the same
// context (AccountId and Amount assumed Copy)
fn withdraw(id: AccountId, amount: Amount) -> ServiceOp<Account> {
    Box::new(move |ctx| {
        find_account(id)(ctx)
            .and_then(|a| validate_balance(a, amount)(ctx))
            .and_then(|a| debit(a, amount)(ctx))
    })
}
This pattern:
See also: railway-oriented-programming.md#combining-result-with-other-effects for monad transformer patterns.
Commands represent intent to perform an action:
@dataclass(frozen=True)
class ProcessObservationsCommand:
"""Command to process raw observations."""
observations: RawObservations
timestamp: datetime
user_id: UserId
request_id: RequestId # For idempotency, logging
@dataclass(frozen=True)
class TrainModelCommand:
"""Command to train a model."""
training_data: TrainingDataset
architecture: ModelArchitecture
hyperparameters: HyperParameters
timestamp: datetime
user_id: UserId
Characteristics:
Events represent facts about what happened:
@dataclass(frozen=True)
class ObservationsProcessed:
"""Event: observations were successfully processed."""
results: ValidatedResults
processing_time: timedelta
timestamp: datetime
@dataclass(frozen=True)
class ProcessingFailed:
"""Event: processing failed."""
error: ProcessingError
timestamp: datetime
# Union of possible events
ProcessingEvent = ObservationsProcessed | ProcessingFailed
@dataclass(frozen=True)
class ModelTrained:
"""Event: model training completed."""
model_id: ModelId
final_metrics: Metrics
checkpoint_path: Path
timestamp: datetime
Characteristics:
def process_observations_workflow(
deps: Dependencies,
command: ProcessObservationsCommand
) -> AsyncResult[list[ProcessingEvent], WorkflowError]:
"""
Process observations workflow.
Input: ProcessObservationsCommand
Output: List of events (ObservationsProcessed, etc.)
Effects: Async I/O, may fail
"""
...
# At application boundary (HTTP API, message queue):
async def handle_command(command_json: dict) -> Response:
# Deserialize command
command = ProcessObservationsCommand.from_dict(command_json)
# Execute workflow
result = await process_observations_workflow(deps, command)
# Handle result
match result:
case Ok(events):
# Publish events, return success
for event in events:
await event_bus.publish(event)
return Response(status=200, body={"events": events})
case Error(error):
# Log error, return failure
logger.error(f"Workflow failed: {error}")
return Response(status=500, body={"error": str(error)})
See also:
domain-modeling.md#workflows-as-type-safe-pipelines
event-sourcing.md for full event sourcing patterns where events become the authoritative source of truth

Document all effects in function signatures:
# No effects (pure function)
def calculate_quality(
measurement: Measurement,
baseline: Baseline
) -> QualityScore:
...
# May fail
def validate_measurement(
measurement: Measurement
) -> Result[ValidatedMeasurement, ValidationError]:
...
# Async I/O
async def fetch_calibration_data(
experiment_id: ExperimentId
) -> CalibrationData:
...
# Async + may fail
async def save_results(
database: Database,
results: Results
) -> AsyncResult[SaveConfirmation, DatabaseError]:
...
# Multiple effects combined
async def process_with_retry(
max_retries: int,
operation: Callable[[T], AsyncResult[U, E]],
input: T
) -> AsyncResult[U, E]:
...
# Result + Async
AsyncResult[T, E] = Async[Result[T, E]]
# Option + Async
AsyncOption[T] = Async[Option[T]]
# Result + Option (rare, usually use Result with error for "not found")
ResultOption[T, E] = Result[Option[T], E]
Type aliases document the effects a computation requires. This makes effect requirements explicit in signatures.
// Effect signature documents: this operation needs DB access and may fail
type DbOperation<A> = Pin<Box<dyn Future<Output = Result<A, DbError>>>>;
// Effect signature documents: needs config and DB, may fail with app errors
type AppOperation<A> = Box<dyn Fn(&Config) -> Pin<Box<dyn Future<Output = Result<A, AppError>>>>>;
// Pure domain operations have no effect signature
fn calculate_interest(principal: Amount, rate: Rate, days: u32) -> Amount { ... }
Effect signatures serve as documentation:
This connects to the module algebra pattern where signatures declare capabilities.
See also: domain-modeling.md#module-algebra-for-domain-services for signature/algebra/interpreter patterns.
- Use AsyncResult instead of spelling out Async[Result[T, E]]
- Prefer a concrete AsyncResult over a generic transformer

Example of effect overload (avoid):
# Too many effects - hard to reason about
ReaderStateAsyncResultOption[Config, State, T, E] = ... ⊘
Better approach:
# Limit to essential effects
WorkflowResult[T] = AsyncResult[T, WorkflowError] ●
# Pass config/state as parameters
def workflow(
config: Config,
initial_state: State,
input: Input
) -> WorkflowResult[Output]:
...
See also:
When integrating code across languages (Python ↔ Rust ↔ TypeScript):
# Python wrapper for Rust function
def calibrate_rust(
params: CalibrationParams,
data: ndarray
) -> Result[CalibratedData, CalibrationError]:
"""
Call Rust calibration function.
Effect transformation:
- Rust: Result<CalibratedData, CalibrationError>
- Python: Result[CalibratedData, CalibrationError]
Serialization:
- params: JSON
- data: NumPy → zero-copy buffer
- result: JSON
"""
try:
# Serialize inputs
params_json = params.to_json()
data_buffer = data.tobytes()
# Call Rust via FFI/subprocess
result_json = _rust_calibrate(params_json, data_buffer)
# Deserialize output
if result_json["ok"]:
return Ok(CalibratedData.from_json(result_json["value"]))
else:
return Error(CalibrationError.from_json(result_json["error"]))
except Exception as e:
# Infrastructure error (FFI failure, serialization error)
return Error(CalibrationError(f"FFI failed: {e}"))
Use schema to generate types in multiple languages:
// calibration.schema.json
{
"CalibrationParams": {
"type": "object",
"properties": {
"baseline": {"type": "number"},
"threshold": {"type": "number"}
},
"required": ["baseline", "threshold"]
},
"CalibrationError": {
"type": "object",
"oneOf": [
{"type": "object", "properties": {"InvalidParams": {"type": "string"}}},
{"type": "object", "properties": {"InsufficientData": {"type": "string"}}}
]
}
}
Generate code:
Architectural decisions should align with the strategic importance of different domains rather than applying uniform approaches across the entire system. The Core/Supporting/Generic domain classification described in strategic-domain-analysis.md directly influences architectural choices about integration patterns, deployment topology, and team boundaries.
Core domains warrant the most sophisticated architectural treatment because they represent competitive advantage. These contexts receive dedicated infrastructure, independent deployment pipelines, and careful isolation from other system components. The hexagonal architecture patterns described earlier in this document apply most rigorously to core domains, with explicit ports and adapters ensuring that domain logic remains insulated from infrastructure concerns.
Supporting sub-domains receive solid but less elaborate architectural treatment. These contexts often share infrastructure with other supporting contexts, deploy together in service groups, and use standardized integration patterns. The workflow pipeline patterns remain appropriate, but the level of isolation and custom infrastructure investment is reduced compared to core domains.
Generic sub-domains receive minimal custom architecture, favoring integration with third-party services or shared platform capabilities. These contexts are often thin adapter layers that translate between internal domain models and external service interfaces. The dependency injection patterns enable swapping providers without modifying domain logic, but the domain logic itself is minimal.
Bounded contexts, detailed in bounded-context-design.md, serve as the fundamental architectural unit in domain-driven systems. Each context represents a deployment unit, an ownership boundary, and an integration interface.
The context relationship patterns (Partnership, Customer-Supplier, Anti-Corruption Layer) directly influence architectural decisions about synchronous versus asynchronous communication, data replication versus service calls, and tight versus loose coupling. Anti-Corruption Layers manifest as adapter modules that translate between external types and internal domain types, implementing the functor mappings that preserve semantic relationships across context boundaries.
When multiple contexts are owned by the same team, they may share deployment infrastructure while maintaining logical separation. When contexts are owned by different teams, deployment independence becomes more valuable, trading some efficiency for reduced coordination overhead.
Architectural patterns should support the communication patterns implied by team structure. Conway's Law ensures that architecture and organization will eventually align, so deliberate design should anticipate this alignment rather than fighting it.
Stream-aligned teams owning core domain contexts need architectural patterns that maximize autonomy and minimize dependencies on other teams. Platform teams providing generic capabilities need architectural patterns that emphasize stability, backward compatibility, and clear versioning. Enabling teams helping multiple teams adopt new patterns need architectural patterns that are portable and technology-agnostic.
The Team Topologies interaction modes (Collaboration, X-as-a-Service, Facilitating) map to context relationship patterns. Collaboration implies Partnership or Shared Kernel relationships with tight coupling. X-as-a-Service implies Open Host Service with stable, versioned interfaces. Facilitating implies enabling teams help stream-aligned teams design their own context architectures.
strategic-domain-analysis.md for detailed Core/Supporting/Generic classification frameworks and investment prioritization.
bounded-context-design.md for context relationship patterns, the Bounded Context Canvas, and the algebraic interpretation of Anti-Corruption Layers as functors between type algebras.
discovery-process.md for how strategic and organizational analysis fits into the broader discovery workflow.
Observability instrumentation follows the same layered discipline as other cross-cutting concerns in hexagonal/onion architecture. Telemetry is a side effect and belongs at the boundaries, not in the domain core.
The domain layer contains pure computation with no telemetry side effects. Domain functions take inputs and return outputs, including Result types for errors. Observability for domain behavior is achieved by the application layer wrapping domain calls in spans and recording the domain's return values as span attributes. The domain itself does not import tracing, logging, or metrics libraries. This preserves the domain's testability and composability, the same properties that motivate keeping I/O at the boundaries.
The application layer handles workflow orchestration with span creation.
Each workflow or use case creates a parent span that encompasses the entire operation.
Correlation IDs (trace_id) flow through the application layer via context propagation.
Business-meaningful events such as command acceptance, aggregate state changes, and notification delivery are recorded as span events or structured log entries at this layer.
Error classification happens here: domain errors become span attributes with appropriate status, while infrastructure errors trigger different observability treatment (see preferences-railway-oriented-programming).
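The application-layer wrapping can be sketched as follows. A minimal recorder stands in for an OpenTelemetry tracer so the example stays self-contained; all names are illustrative:

```python
from contextlib import contextmanager
from dataclasses import dataclass, field

# Stand-in for an OpenTelemetry tracer: records span names and attributes
@dataclass
class SpanRecorder:
    spans: list = field(default_factory=list)

    @contextmanager
    def span(self, name: str):
        attrs: dict = {}
        self.spans.append((name, attrs))
        yield attrs

# Domain (pure): no telemetry imports, just input -> output
def apply_discount(total: float, rate: float) -> float:
    return round(total * (1.0 - rate), 2)

# Application: wraps the pure call in a span, records results as attributes
def price_workflow(tracer: SpanRecorder, total: float, rate: float) -> float:
    with tracer.span("price_workflow") as attrs:
        discounted = apply_discount(total, rate)
        attrs["order.total"] = total
        attrs["order.discounted"] = discounted
        return discounted
```

The domain function never sees the tracer; swapping the recorder for a real OpenTelemetry tracer changes only the application layer's wiring.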
The infrastructure layer performs I/O operations with automatic instrumentation. Database calls, HTTP requests, message queue operations, and cache interactions emit spans via automatic instrumentation through OpenTelemetry wrappers. These spans are children of the application layer's workflow span, creating a hierarchical trace. Health endpoints, metrics exposition, and telemetry export configuration live here.
The presentation layer creates request/response spans at the outer boundary. HTTP middleware creates the root span for each request, capturing method, path, status code, and duration. This is typically auto-instrumented and requires no manual code. Health check endpoints covering liveness, readiness, and startup probes are presentation-layer concerns.
The key principle is that observability instrumentation mirrors the dependency rule. Inner layers are unaware of telemetry; outer layers add observability context progressively. The domain remains a pure function from the perspective of instrumentation.
This layered approach to observability also clarifies where to draw the line on structured logging versus tracing. Structured log entries at the application layer carry the trace_id and span_id from the current context, enabling correlation between logs and traces without the domain layer needing awareness of either mechanism. Infrastructure-layer spans provide timing and error data that would otherwise require manual log instrumentation, reducing the surface area where developers must remember to add observability code.
Cross-reference preferences-observability-engineering for the foundational observability model and preferences-production-readiness for operational readiness requirements.
In the ideal case, all systems—regardless of language—would integrate as a coherent monad transformer stack in the category of functional effects.
Ideal properties:
- bind/flatMap operations satisfy monad laws across language boundaries when composing multi-language systems
- Effects lift uniformly between layers (liftIO, liftState, etc.)
- Each language implements one layer of the stack (Rust at the bottom with IO/Result, TypeScript in the middle with StateT s (EitherT e IO), Python on top with ReaderT config (StateT s (EitherT e IO)))
- Result/Either/Option types for error handling that composes vertically through all layers rather than runtime exceptions

Practical path to ideal:
Start with:
Progress toward:
5. Effect systems (Effect-TS, ZIO) where available
6. Monad transformers for 2-3 effects
7. Indexed monads for critical safety properties
8. Full monad transformer stack for complete effect tracking
See also: theoretical-foundations.md#indexed-monad-transformer-stacks-in-practice