Async Rust patterns: Future/Poll contract, Pin/Unpin, tokio tasks, select!/join!, cancellation safety, structured concurrency, async traits, streams. Load for async fn, .await, tokio::spawn, futures, channels.
Async runtime internals, concurrency composition, and task management patterns for Rust.
| Library | ctx7 ID | Use For |
|---|---|---|
| tokio | /websites/rs_tokio | Async runtime, tasks, channels |
| futures | /websites/rs_futures | Future combinators, streams |
| tokio-util | /websites/rs_tokio-util | Codec, framing, compat layers |
| pin-project | /taiki-e/pin-project | Safe pin projections |
Future::poll contract, Pin/Unpin semantics, and async state machine internals.
The Future::poll contract has three invariants:
Return Pending only after registering a waker. If you return Poll::Pending without calling cx.waker().wake_by_ref() or storing the waker for later notification, the executor has no way to know when to re-poll. The future is never woken and appears hung.
Never return Pending without waker registration. This causes a busy-loop: the executor keeps polling, gets Pending, and has no wake signal, so it either spins or gives up.
Never poll after Ready. Once a future returns Poll::Ready(value), polling again is a logic error. The future may panic or return garbage. Use FusedFuture or .fuse() if you need safe repeated polling.
Incorrect (Pending without waker registration):
impl Future for MyFuture {
type Output = i32;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.ready {
Poll::Ready(42)
} else {
// BUG: no waker registered — this future will never wake
Poll::Pending
}
}
}
Correct (waker stored for later notification):
impl Future for MyFuture {
type Output = i32;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.ready {
Poll::Ready(42)
} else {
// Store waker so the background task can call wake() when ready
self.get_mut().waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
Every async fn compiles to an anonymous enum state machine. Each .await is a yield point that creates a new variant. All variables captured across .await points are stored in the future struct.
[u8; 65536] across an .await is 64KB+ per instance.std::mem::size_of_val(&future) to profile future sizes.Box::pin(large_async_fn())..await — drop or move them before yielding.Incorrect (large buffer captured across .await):
async fn process() {
let buffer = [0u8; 65536]; // 64KB lives in the future struct
let result = network_call().await; // buffer held across .await
use_buffer(&buffer, result);
}
// Spawning 1000 tasks = 64MB just for buffers
for _ in 0..1000 {
tokio::spawn(process());
}
Correct (scope buffer before .await or box the future):
async fn process() {
let result = network_call().await;
let buffer = [0u8; 65536]; // Allocated AFTER .await, not captured across yield
use_buffer(&buffer, result);
}
// Or box the future to move it to heap with a single pointer on the stack
async fn process_boxed() {
let buffer = [0u8; 65536];
let fut = Box::pin(async move {
let result = network_call().await;
use_buffer(&buffer, result);
});
fut.await;
}
Tokio runtime configuration, spawn_blocking, and task lifecycle management.
Dropping a JoinHandle detaches the task — it does NOT cancel it. The task continues running in the background, potentially leaking resources, holding connections, or writing to closed channels.
Cancellation strategies:
AbortHandle::abort() — forcefully cancels the task at the next .await point. The JoinHandle returns JoinError::is_cancelled.CancellationToken — cooperative cancellation. The task checks token.is_cancelled() or token.cancelled().await at strategic points.select! — cancels by dropping at the .await yield point. Only safe if the future is cancellation-safe.Incorrect (assuming drop cancels):
async fn leak_example() {
let handle = tokio::spawn(async {
// This task runs FOREVER even after handle is dropped
loop {
do_work().await;
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
drop(handle); // BUG: task is detached, not cancelled — runs until process exits
}
Correct (cooperative cancellation with CancellationToken):
use tokio_util::sync::CancellationToken;
async fn managed_example() {
let token = CancellationToken::new();
let task_token = token.clone();
let handle = tokio::spawn(async move {
loop {
tokio::select! {
_ = task_token.cancelled() => {
// Clean up resources
break;
}
_ = do_work() => {}
}
}
});
// Later: signal cancellation
token.cancel();
handle.await.unwrap(); // Task exits cleanly
}
Composing futures with select!, join!, try_join!, and collection types.
tokio::select! polls multiple futures and completes when the FIRST one resolves. All other branches are dropped immediately, cancelling them mid-execution.
Key rules:
.await point. Any partial progress is lost unless the future is cancellation-safe.biased for deterministic priority: without it, branches are polled in random order. With biased, the first matching branch always wins..fuse() on completed futures to prevent re-polling a finished future (returns Pending forever after Ready).Incorrect (future recreated each loop iteration):
loop {
tokio::select! {
// BUG: timeout is recreated every iteration — never fires
_ = tokio::time::sleep(Duration::from_secs(30)) => {
println!("timeout");
break;
}
msg = rx.recv() => {
process(msg);
}
}
}
Correct (future pinned outside loop):
let timeout = tokio::time::sleep(Duration::from_secs(30));