Test distributed systems with reproducible deterministic simulation using turmoil
Read this when testing distributed systems that need reproducible failures, simulated time, and controllable network faults.
Key insight: drive every source of non-determinism (time, randomness, network, I/O) from a single seed, and every run behaves identically.
| Pillar | Problem | Solution |
|---|---|---|
| Execution | Thread scheduling is non-deterministic | Single-threaded async executor |
| Entropy | RNGs use system entropy | Seeded PRNG throughout |
| Time | Real clocks advance unpredictably | Simulated clock, manual advancement |
| I/O | Network has variable latency/loss | In-memory simulated network |
| Library | Purpose | Use When |
|---|---|---|
| turmoil | Network simulation for Tokio | Standard TCP/UDP patterns |
| madsim | Full libc interception | Need to catch all entropy/time sources |
| proptest | Property-based input generation | Generating test scenarios |
[features]
default = []
# Swaps the real network types for turmoil's simulated ones.
simulation = ["turmoil"]

[dependencies]
tokio = { version = "1", features = ["full"] }
rand = "0.8"
# A feature can only enable an optional regular dependency, so turmoil must be
# declared here (not only under dev-dependencies) for `simulation` to resolve.
turmoil = { version = "0.6", optional = true }

[dev-dependencies]
# Always available to tests, whether or not `simulation` is enabled.
turmoil = "0.6"
# `full` does not include `test-util`, which `start_paused = true` and
# `tokio::time::pause` require.
tokio = { version = "1", features = ["full", "test-util"] }
use turmoil::Builder;
use std::time::Duration;
/// Runs a single-connection echo server and one client inside a turmoil
/// simulation and checks that the client reads back exactly what it wrote.
#[test]
fn test_echo_server() {
    // `read` and `write_all` are extension-trait methods; without these traits
    // in scope the calls below do not resolve.
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    let mut sim = Builder::new()
        .simulation_duration(Duration::from_secs(60))
        .build();

    // Server: accept one connection and echo every chunk until the peer closes.
    sim.host("server", || async {
        let listener = turmoil::net::TcpListener::bind("0.0.0.0:8080").await?;
        let (mut socket, _) = listener.accept().await?;
        let mut buf = [0u8; 1024];
        loop {
            let n = socket.read(&mut buf).await?;
            // A zero-length read means the client closed its side.
            if n == 0 {
                break;
            }
            socket.write_all(&buf[..n]).await?;
        }
        Ok(())
    });

    // Client: send a greeting and expect the same bytes back.
    sim.client("client", async {
        let mut socket = turmoil::net::TcpStream::connect("server:8080").await?;
        socket.write_all(b"hello").await?;
        let mut buf = [0u8; 1024];
        let n = socket.read(&mut buf).await?;
        assert_eq!(&buf[..n], b"hello");
        Ok(())
    });

    sim.run().unwrap();
}
fn run_with_seed<F>(test_fn: F)
where
F: FnOnce(u64) + std::panic::UnwindSafe
{
let seed: u64 = std::env::var("TEST_SEED")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or_else(|| rand::random());
println!("TEST_SEED={}", seed);
let result = std::panic::catch_unwind(|| test_fn(seed));
if result.is_err() {
eprintln!("Reproduce with: TEST_SEED={} cargo test", seed);
std::panic::resume_unwind(result.unwrap_err());
}
}
use std::collections::HashMap;
use std::hash::{BuildHasher, Hasher};
/// 64-bit FNV-1a hasher with a fixed starting state, so equal inputs hash to
/// equal values on every run and in every process.
#[derive(Clone, Debug)]
pub struct DeterministicHasher {
    state: u64,
}

impl DeterministicHasher {
    /// FNV-1a 64-bit offset basis (14695981039346656037).
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    /// FNV 64-bit prime (1099511628211).
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
}

impl Default for DeterministicHasher {
    /// Starts from the FNV offset basis.
    ///
    /// A derived `Default` would start at zero, where any run of zero bytes
    /// hashes to zero (`0 ^ 0 == 0`, `0 * prime == 0`).
    fn default() -> Self {
        Self {
            state: Self::FNV_OFFSET_BASIS,
        }
    }
}

impl Hasher for DeterministicHasher {
    /// Returns the hash of all bytes written so far.
    fn finish(&self) -> u64 {
        self.state
    }

    /// Folds `bytes` into the state: XOR each byte in, then multiply by the
    /// FNV prime with wrapping arithmetic.
    fn write(&mut self, bytes: &[u8]) {
        for &byte in bytes {
            self.state ^= u64::from(byte);
            self.state = self.state.wrapping_mul(Self::FNV_PRIME);
        }
    }
}
/// Stateless factory that hands out [`DeterministicHasher`]s which all start
/// from the same fixed state.
#[derive(Clone, Default)]
pub struct DeterministicBuildHasher;

impl BuildHasher for DeterministicBuildHasher {
    type Hasher = DeterministicHasher;

    /// Creates a hasher whose state begins at the fixed constant below.
    fn build_hasher(&self) -> DeterministicHasher {
        const INITIAL_STATE: u64 = 14695981039346656037;
        DeterministicHasher {
            state: INITIAL_STATE,
        }
    }
}

/// `HashMap` that uses [`DeterministicBuildHasher`] in place of the standard
/// library's default `RandomState`.
type DetHashMap<K, V> = HashMap<K, V, DeterministicBuildHasher>;
/// Shows paused tokio time: an hour-long sleep finishes without waiting an
/// hour, while the runtime's clock still reports the full hour.
///
/// `start_paused` requires tokio's `test-util` feature.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_timeout() {
    let one_hour = Duration::from_secs(3600);
    let began = tokio::time::Instant::now();

    // The clock is paused, so this sleep is simulated rather than waited out.
    tokio::time::sleep(one_hour).await;

    // The simulated clock has still moved forward by the full duration.
    assert_eq!(began.elapsed(), one_hour);
}
// Without the `simulation` feature: re-export tokio's TCP types.
#[cfg(not(feature = "simulation"))]
pub use tokio::net::{TcpListener, TcpStream};
// With the `simulation` feature: re-export turmoil's TCP types under the same
// names, so code importing from this module needs no changes.
#[cfg(feature = "simulation")]
pub use turmoil::net::{TcpListener, TcpStream};
// Every message is delayed by a latency between 100 ms and 500 ms.
let mut sim = Builder::new()
    .min_message_latency(Duration::from_millis(100))
    .max_message_latency(Duration::from_millis(500))
    .build();

// NOTE(review): "node-a" and "node-b" are presumably registered with
// `sim.host(..)` / `sim.client(..)` before this point (not shown here).

// Network partition
sim.partition("node-a", "node-b");

// Run during partition: `Sim` has no `run_until`, so step the simulation
// until 30 simulated seconds have elapsed (or every client has finished).
let partition_window = Duration::from_secs(30);
while sim.elapsed() < partition_window {
    // `step` returns `Ok(true)` once all clients have completed.
    if sim.step().unwrap() {
        break;
    }
}

// Heal partition
sim.repair("node-a", "node-b");
sim.run().unwrap();
// BAD ❌ - Bypasses simulated time