Advanced Kotlin coroutines patterns for AmethystMultiplatform. Use when working with: (1) Structured concurrency (supervisorScope, coroutineScope), (2) Advanced Flow operators (flatMapLatest, combine, merge, shareIn, stateIn), (3) Channels and callbackFlow, (4) Dispatcher management and context switching, (5) Exception handling (CoroutineExceptionHandler, SupervisorJob), (6) Testing async code (runTest, Turbine), (7) Nostr relay connection pools and subscriptions, (8) Backpressure handling in event streams. Delegates to kotlin-expert for basic StateFlow/SharedFlow patterns. Complements nostr-expert for relay communication.
Expert guidance for complex async operations in Amethyst: relay pools, event streams, structured concurrency, and testing.
Async Architecture in Amethyst:
Relay Pool (supervisorScope)
├── Relay 1 (launch) → callbackFlow → Events
├── Relay 2 (launch) → callbackFlow → Events
└── Relay 3 (launch) → callbackFlow → Events
↓
merge() → distinctBy(id) → shareIn
↓
Multiple Collectors (ViewModels, Services)
Key principles:
Use for advanced async patterns:
Delegate to kotlin-expert for:
// Real pattern from NostrClientStaticReqAsStateFlow.kt
fun INostrClient.reqAsFlow(
relay: NormalizedRelayUrl,
filters: List<Filter>,
): Flow<List<Event>> = callbackFlow {
val subId = RandomInstance.randomChars(10)
var hasBeenLive = false
val eventIds = mutableSetOf<HexKey>()
var currentEvents = listOf<Event>()
val listener = object : IRequestListener {
override fun onEvent(event: Event, ...) {
if (event.id !in eventIds) {
currentEvents = if (hasBeenLive) {
// After EOSE: prepend
listOf(event) + currentEvents
} else {
// Before EOSE: append
currentEvents + event
}
eventIds.add(event.id)
trySend(currentEvents)
}
}
override fun onEose(...) {
hasBeenLive = true
}
}
openReqSubscription(subId, mapOf(relay to filters), listener)
awaitClose { close(subId) }
}
Key techniques:
suspend fun connectToRelays(relays: List<Relay>) = supervisorScope {
relays.forEach { relay ->
launch {
try {
relay.connect()
relay.subscribe(filters).collect { event ->
eventChannel.send(event)
}
} catch (e: IOException) {
Log.e("Relay", "Connection failed: ${relay.url}", e)
// Other relays continue
}
}
}
}
Why supervisorScope:
// Real pattern from PushNotificationReceiverService.kt
class MyService : Service() {
val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
Log.e("Service", "Caught: ${throwable.message}", throwable)
}
private val scope = CoroutineScope(
Dispatchers.IO + SupervisorJob() + exceptionHandler
)
override fun onDestroy() {
scope.cancel()
super.onDestroy()
}
}
Pattern benefits:
// Real pattern from ConnectivityFlow.kt
val status = callbackFlow {
val networkCallback = object : NetworkCallback() {
override fun onAvailable(network: Network) {
trySend(ConnectivityStatus.Active(...))
}
override fun onLost(network: Network) {
trySend(ConnectivityStatus.Off)
}
}
connectivityManager.registerCallback(networkCallback)
// Initial state
activeNetwork?.let { trySend(ConnectivityStatus.Active(...)) }
awaitClose {
connectivityManager.unregisterCallback(networkCallback)
}
}
.distinctUntilChanged()
.debounce(200) // Stabilize flapping
.flowOn(Dispatchers.IO)
Key patterns:
fun observeFromRelays(
relays: List<NormalizedRelayUrl>,
filters: List<Filter>
): Flow<Event> =
relays.map { relay ->
client.reqAsFlow(relay, filters)
.flatMapConcat { it.asFlow() }
}.merge()
.distinctBy { it.id }
Flow:
Flow<List<Event>>Flow<Event>For comprehensive coverage of Flow operators:
| Operator | Use Case | Example |
|---|---|---|
| flatMapLatest | Cancel previous, switch to new | Search (cancel old query) |
| combine | Latest from ALL flows | combine(account, settings, connectivity) |
| merge | Single stream from multiple | merge(relay1, relay2, relay3) |
| shareIn | Multiple collectors, single upstream | Share expensive computation |
| stateIn | StateFlow from Flow | ViewModel state |
| buffer(DROP_OLDEST) | High-frequency streams | Real-time event feed |
| conflate | Latest only | UI updates |
| debounce | Wait for quiet period | Search input |
For complete relay-specific patterns: → See relay-patterns.md
Covers:
For comprehensive testing patterns: → See testing-coroutines.md
Quick testing pattern:
@Test
fun `relay subscription receives events`() = runTest {
val client = FakeNostrClient()
client.reqAsFlow(relay, filters).test {
assertEquals(emptyList(), awaitItem())
client.sendEvent(event1)
assertEquals(listOf(event1), awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
Testing tools:
runTest - Virtual time, auto cleanup.test {} - Flow assertionsadvanceTimeBy() - Control timeSteps:
Example: Add subscription for specific event kind
fun observeKind(kind: Int): Flow<Event> = callbackFlow {
val listener = object : IRequestListener {
override fun onEvent(event: Event, ...) {
if (event.kind == kind) {
trySend(event)
}
}
}
client.subscribe(listener)
awaitClose { client.unsubscribe(listener) }
}
Steps:
Example: Reconnect relays on connectivity
connectivityFlow
.flatMapLatest { status ->
when (status) {
Active -> relayPool.observeEvents()
else -> emptyFlow()
}
}
.catch { e -> Log.e("Error", e) }
.collect { event -> handleEvent(event) }
Steps:
Example: Share relay subscription
val events: SharedFlow<Event> = client
.reqAsFlow(relay, filters)
.flatMapConcat { it.asFlow() }
.shareIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
replay = 0
)
❌ Using GlobalScope
GlobalScope.launch { /* Leaks, no structured concurrency */ }
✅ Use scoped coroutines
viewModelScope.launch { /* Cancelled with ViewModel */ }
❌ Forgetting awaitClose
callbackFlow {
registerCallback()
// Missing cleanup!
}
✅ Always cleanup
callbackFlow {
registerCallback()
awaitClose { unregisterCallback() }
}
❌ Blocking in Flow
flow.map { Thread.sleep(1000); process(it) }
✅ Suspend, don't block
flow.map { delay(1000); process(it) }.flowOn(Dispatchers.Default)
❌ Ignoring backpressure
fastProducer.collect { slowConsumer(it) } // Blocks producer!
✅ Handle backpressure
fastProducer
.buffer(64, BufferOverflow.DROP_OLDEST)
.collect { slowConsumer(it) }
Use kotlin-expert for:
Use nostr-expert for:
This skill provides:
Need async operation?
├─ Simple ViewModel state update → kotlin-expert (StateFlow)
├─ Android callback → This skill (callbackFlow)
├─ Multiple concurrent operations → This skill (supervisorScope)
├─ Complex Flow transformation → This skill (references/advanced-flow-operators.md)
├─ Relay subscription → This skill (references/relay-patterns.md)
└─ Testing async code → This skill (references/testing-coroutines.md)