Knows how to build automation slices
An AUTOMATION slice contains processors that react to events and trigger commands automatically. This represents event-driven automation logic - the "glue" between different parts of the system.
Automation slices are typically embedded within STATE_CHANGE slices (as processors array), but the automation pattern itself is distinct.
Automation processors appear in the processors array of a slice:
{
"sliceType": "STATE_CHANGE",
"title": "slice: <Name>",
"processors": [
{
"id": "...",
"title": "Admin Assignment",
"type": "AUTOMATION",
"fields": [...],
"aggregate": "default",
"dependencies": [
{
"type": "OUTBOUND",
"title": "Assign Admin to Organization",
"elementType": "COMMAND"
}
]
}
]
}
Source Event → @EventHandler Processor → CommandGateway.send() → Target Command
↓
(optional) QueryGateway.query() → ReadModel (for data lookup)
Key building blocks:
- @EventHandler to receive the triggering event
- CommandGateway to send commands
- @DisallowReplay to prevent re-triggering on event replay
Look for events that trigger the processor. The processor's fields often match event fields, indicating which event triggers it.
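Check the processor's dependencies: each OUTBOUND dependency with elementType COMMAND names a command the processor must send.
If the source event is not explicit in the dependencies, look at field matching between the processor and the events in the slice.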
Location: src/main/kotlin/de/alex/<module>/<processorname>/internal/<ProcessorName>Processor.kt
Basic Template (Event → Command):
package de.alex.<module>.<processorname>.internal
import de.alex.common.Processor
import de.alex.events.<SourceEvent>Event
import de.alex.<module>.ProcessingGroups
import de.alex.<module>.domain.commands.<commandname>.<TargetCommand>Command
import mu.KotlinLogging
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.config.ProcessingGroup
import org.axonframework.eventhandling.DisallowReplay
import org.axonframework.eventhandling.EventHandler
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
/*
Boardlink: https://miro.com/app/board/uXjVJzSW8-Y=/?moveToWidget=<processor.id>
*/
/**
 * Automation processor template: handles <SourceEvent>Event and sends
 * <TargetCommand>Command built from the event's fields.
 */
@ProcessingGroup(ProcessingGroups.<MODULE>)
@Component
class <ProcessorName>Processor : Processor {
var logger = KotlinLogging.logger {}
// Field-injected by Spring; kept as a settable property so a caller can assign it directly.
@Autowired
lateinit var commandGateway: CommandGateway
// @DisallowReplay: this handler is not invoked while events are replayed,
// so a replay does not send the command a second time.
@DisallowReplay
@EventHandler
fun on(event: <SourceEvent>Event) {
// The value returned by send() is discarded: the handler does not wait for the command result.
commandGateway.send<<TargetCommand>Command>(
<TargetCommand>Command(
// Map event fields to command fields
// Use processor.fields[].mapping to determine field mappings
aggregateId = event.aggregateId,
<field> = event.<eventField>
)
)
}
}
Template with MetaData (for lawFirmId context):
// Variant of the basic handler that also attaches MetaData keyed by LAW_FIRM_ID_HEADER to the command.
@DisallowReplay
@EventHandler
fun on(event: <SourceEvent>Event) {
commandGateway.send<<TargetCommand>Command>(
<TargetCommand>Command(
aggregateId = event.aggregateId,
// ... field mappings
),
// The event's aggregateId is passed as the law firm id.
// NOTE(review): presumably only correct when the source event belongs to the law firm aggregate — confirm per event.
MetaData.with(LAW_FIRM_ID_HEADER, event.aggregateId)
)
}
Additional imports needed:
import de.alex.auth.LAW_FIRM_ID_HEADER
import org.axonframework.messaging.MetaData
Template with QueryGateway (for read model lookup):
package de.alex.<module>.<processorname>.internal
import de.alex.common.Processor
import de.alex.events.<SourceEvent>Event
import de.alex.<module>.ProcessingGroups
import de.alex.<module>.domain.commands.<commandname>.<TargetCommand>Command
import de.alex.<module>.<readmodel>.<ReadModel>ReadModelQuery
import de.alex.<module>.<readmodel>.<ReadModel>ReadModel
import mu.KotlinLogging
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.queryhandling.QueryGateway
import org.axonframework.config.ProcessingGroup
import org.axonframework.eventhandling.DisallowReplay
import org.axonframework.eventhandling.EventHandler
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
/**
 * Automation processor template that looks up a read model before sending
 * <TargetCommand>Command, so the command can combine event data and read model data.
 */
@ProcessingGroup(ProcessingGroups.<MODULE>)
@Component
class <ProcessorName>Processor : Processor {
var logger = KotlinLogging.logger {}
@Autowired
lateinit var commandGateway: CommandGateway
@Autowired
lateinit var queryGateway: QueryGateway
// @DisallowReplay: this handler is not invoked while events are replayed.
@DisallowReplay
@EventHandler
fun on(event: <SourceEvent>Event) {
// Query for additional data if needed
// join() blocks until the query has completed; a null result ends the handler
// here, so no command is sent when the read model is missing.
val readModel = queryGateway.query(
<ReadModel>ReadModelQuery(event.aggregateId),
<ReadModel>ReadModel::class.java
).join() ?: return
// The value returned by send() is discarded: the handler does not wait for the command result.
commandGateway.send<<TargetCommand>Command>(
<TargetCommand>Command(
aggregateId = event.aggregateId,
<field> = readModel.data.<field>
// Mix event and read model data
)
)
}
}
When processor fields have a mapping attribute:
{
"name": "userId",
"type": "String",
"mapping": "owner"
}
This means: the command field userId is populated from the event's owner field.
Example:
// Event has: owner: String
// Processor field: userId mapping: owner
// Command expects: userId: String
// The explicit type argument matches the other templates; without it Kotlin
// cannot infer the gateway's result type when the returned future is discarded.
commandGateway.send<SomeCommand>(
    SomeCommand(
        userId = event.owner // Map using the mapping attribute
    )
)
If processor has multiple OUTBOUND dependencies to different commands, emit multiple commands:
// Handler that emits one command per OUTBOUND command dependency of the processor.
@DisallowReplay
@EventHandler
fun on(event: <SourceEvent>Event) {
// First command
// The value returned by send() is discarded, so the second send below does not
// wait for this one — NOTE(review): handling order is presumably not guaranteed; confirm if it matters.
commandGateway.send<FirstCommand>(
FirstCommand(...)
)
// Second command (may be conditional)
if (event.someCondition) {
commandGateway.send<SecondCommand>(
SecondCommand(...)
)
}
}
CRITICAL: Always use @DisallowReplay on automation event handlers to prevent commands from being sent again when events are replayed:
// Minimal illustration: @DisallowReplay sits next to @EventHandler on the automation handler.
@DisallowReplay
@EventHandler
fun on(event: SomeEvent) {
// This won't execute during replays
}
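Place @DisallowReplay either on each individual @EventHandler method (as shown above) or on the processor class, where it applies to all handlers in that class.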
Ensure the processor uses the correct processing group for its module:
@ProcessingGroup(ProcessingGroups.<MODULE>)
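This determines which event processor the handlers of this class are assigned to, and therefore which processor configuration applies to them.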
Processors should handle errors gracefully:
// Critical automation: a failed command must fail the event handler so that
// the event ends up in dead-letter handling.
@DisallowReplay
@EventHandler
fun on(event: SomeEvent) {
    try {
        // sendAndWait blocks until the command has been handled and throws if it
        // failed. send() only returns a CompletableFuture, so a failed command
        // would never reach the catch block below.
        commandGateway.sendAndWait<Any?>(
            SomeCommand(...)
        )
    } catch (e: Exception) {
        // Pass the exception itself so the stack trace is logged, not only the message.
        logger.error(e) { "Failed to process event ${event.aggregateId}: ${e.message}" }
        throw e // Re-throw to trigger DLQ handling
    }
}
For non-critical automations, you may catch and log without re-throwing:
// Non-critical automation: a failed command is logged and the event is
// treated as processed.
@DisallowReplay
@EventHandler
fun on(event: SomeEvent) {
    try {
        // sendAndWait surfaces a command failure as an exception; with send()
        // the failure stays inside the returned future and is never logged.
        commandGateway.sendAndWait<Any?>(SomeCommand(...))
    } catch (e: Exception) {
        // Pass the exception itself so the stack trace is kept in the log.
        logger.warn(e) { "Non-critical automation failed: ${e.message}" }
        // Don't re-throw - allow processing to continue
    }
}
Input:
{
"title": "Admin Assignment",
"type": "AUTOMATION",
"fields": [
{"name": "aggregateId", "type": "UUID"},
{"name": "name", "type": "String"},
{"name": "userId", "type": "String", "mapping": "owner"},
{"name": "email", "type": "String"}
],
"dependencies": [
{
"type": "OUTBOUND",
"title": "Assign Admin to Organization",
"elementType": "COMMAND"
}
]
}
Source Event: LawFirmCreatedEvent (determined by field matching)
Generated Code:
package de.alex.lawfirm.assignadmin.internal
import de.alex.auth.LAW_FIRM_ID_HEADER
import de.alex.common.Processor
import de.alex.events.LawFirmCreatedEvent
import de.alex.lawfirm.ProcessingGroups
import de.alex.lawfirm.domain.commands.assignadmin.AssignAdminToOrganizationCommand
import mu.KotlinLogging
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.config.ProcessingGroup
import org.axonframework.eventhandling.DisallowReplay
import org.axonframework.eventhandling.EventHandler
import org.axonframework.messaging.MetaData
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
/**
 * Reacts to [LawFirmCreatedEvent] by sending an [AssignAdminToOrganizationCommand]
 * for the law firm, tagged with the law firm id as command metadata.
 */
@ProcessingGroup(ProcessingGroups.LAWFIRM)
@Component
class AdminAssignmentProcessor : Processor {
    var logger = KotlinLogging.logger {}

    @Autowired
    lateinit var commandGateway: CommandGateway

    /** Not invoked during replays, so a replay does not assign the admin again. */
    @DisallowReplay
    @EventHandler
    fun on(event: LawFirmCreatedEvent) {
        val lawFirmId = event.aggregateId
        val assignAdmin = AssignAdminToOrganizationCommand(
            lawFirmId, // aggregateId
            event.email, // email
            event.owner // userId (mapped from owner)
        )
        val lawFirmContext = MetaData.with(LAW_FIRM_ID_HEADER, lawFirmId)
        commandGateway.send<AssignAdminToOrganizationCommand>(assignAdmin, lawFirmContext)
    }
}
For time-based automations (like "Auto Deactivate License"), you may need a scheduler in addition to or instead of event handlers:
/**
 * Time-based automation: periodically queries for expired items and sends a
 * DeactivateCommand for each of them.
 */
@Component
class ScheduledAutomationProcessor : Processor {
    @Autowired
    lateinit var commandGateway: CommandGateway

    @Autowired
    lateinit var queryGateway: QueryGateway

    /** Runs on the cron schedule below; does nothing when the query yields no read model. */
    @Scheduled(cron = "0 0 * * * *") // Every hour
    fun processExpiredItems() {
        // join() blocks until the query has completed. The result can be null,
        // so guard it the same way the QueryGateway processor template does.
        val expiredItems = queryGateway.query(
            ExpiredItemsQuery(),
            ExpiredItemsReadModel::class.java
        ).join() ?: return

        expiredItems.data.forEach { item ->
            commandGateway.send<DeactivateCommand>(
                DeactivateCommand(item.aggregateId)
            )
        }
    }
}
Location: src/test/kotlin/de/alex/<module>/<ProcessorName>ProcessorTest.kt
package de.alex.<module>
import de.alex.BaseIntegrationTest
import de.alex.events.<SourceEvent>Event
import de.alex.<module>.domain.commands.<commandname>.<TargetCommand>Command
import de.alex.<module>.<processorname>.internal.<ProcessorName>Processor
import io.mockk.mockk
import io.mockk.verify
import org.axonframework.commandhandling.gateway.CommandGateway
import org.junit.jupiter.api.Test
import java.util.UUID
/**
 * Verifies that the processor turns a <SourceEvent>Event into a
 * <TargetCommand>Command carrying the mapped fields.
 */
class <ProcessorName>ProcessorTest : BaseIntegrationTest() {
    @Test
    fun `should send command when event received`() {
        // Relaxed mock: send() needs no explicit stubbing.
        val gatewayMock = mockk<CommandGateway>(relaxed = true)
        val processor = <ProcessorName>Processor().apply {
            commandGateway = gatewayMock
        }
        val event = <SourceEvent>Event(
            aggregateId = UUID.randomUUID(),
            // ... other fields
        )

        processor.on(event)

        verify {
            gatewayMock.send<<TargetCommand>Command>(
                // send() accepts Any, so the matcher must be typed explicitly;
                // otherwise `it` is Any and the command's fields are not accessible.
                match<<TargetCommand>Command> {
                    it.aggregateId == event.aggregateId
                    // ... verify field mappings
                }
            )
        }
    }
}
Processor triggers a command that emits another event, enabling event chains:
Event A → Processor A → Command B → Event B → Processor B → Command C → ...
Processor queries read model to get additional data before sending command:
Event → Processor → Query ReadModel → Enrich + Send Command
Processor applies business logic to determine if/what command to send:
// Routes the event to a command based on its status; unknown statuses are only logged.
// @DisallowReplay is required on every automation handler so replays do not re-send commands.
@DisallowReplay
@EventHandler
fun on(event: SomeEvent) {
    when (event.status) {
        // Explicit type arguments: Kotlin cannot infer the gateway's result type
        // when the returned future is discarded.
        "APPROVED" -> commandGateway.send<ApproveCommand>(ApproveCommand(...))
        "REJECTED" -> commandGateway.send<RejectCommand>(RejectCommand(...))
        else -> logger.info { "No action for status ${event.status}" }
    }
}
Processor coordinates between different aggregates:
// Fans one event out to commands targeting two different aggregates.
// @DisallowReplay is required on every automation handler so replays do not re-send commands.
@DisallowReplay
@EventHandler
fun on(event: OrderCreatedEvent) {
    // Explicit type arguments: Kotlin cannot infer the gateway's result type
    // when the returned future is discarded.
    // Update inventory aggregate
    commandGateway.send<ReserveInventoryCommand>(ReserveInventoryCommand(event.productId, event.quantity))
    // Notify customer aggregate
    commandGateway.send<NotifyCustomerCommand>(NotifyCustomerCommand(event.customerId, event.orderId))
}