描述 ruft 这个全类型安全 Raft 学习库的整体架构、核心类型系统、设计约束与关键模块, 供 AI agent 在修改或扩展代码前快速建立上下文。适用于所有对 ruft 代码的修改、分析、 测试和文档任务。
reference/raft-etcd/(Go 实现)。本库所有原语类型均使用 Newtype 模式包装,禁止混用裸整数:
| Newtype | 内部类型 | 含义 |
|---|---|---|
NodeId (types.rs) | u64 | 节点唯一 ID,Default 为 0 表示本地/无效 |
Index (log/entry.rs) | u64 | 日志索引,提供 .next() |
Term (log/entry.rs) | u64 | Raft 任期,提供 |
.next()Tick (types.rs) | u64 | 逻辑时钟单位,提供 .forward() / .reset() |
规则:函数签名中绝不直接使用
u64代替上述类型,始终使用对应 Newtype。
src/
├── lib.rs — 模块声明,不对外 pub re-export
├── config.rs — 节点启动配置(id, heartbeat_tick, election_tick)
├── types.rs — NodeId、Tick 的 Newtype 定义
├── state.rs — HardState(必须持久化:term, voted_node, committed_index)
├── message.rs — Message 及其 Variant、MessageStaleness 判断
├── node.rs — Node:库对外唯一入口(propose/step/tick/ready/advance)
├── order.rs — 空文件,待实现(可能用于命令/事件排序)
├── context.rs — Context<S>:Raft 状态机核心,注入式纯函数架构
├── context/
│ ├── role.rs — Role 枚举:Leader / Candidate / Follower
│ ├── outbox.rs — Outbox:Context 内部收集待发消息的容器
│ └── inherent.rs — (Context 的 impl 拆分文件,待查看)
├── log.rs — LogView<S>:日志读取视图(持久层 + 内存缓冲层)
├── log/
│ ├── entry.rs — Entry、EntryKey、EntryVariant、Index、Term
│ ├── log_buffer.rs — LogBuffer:未持久化日志的内存缓冲,含冲突处理逻辑
│ └── log_storage.rs — LogStorage trait:异步只读接口,由外部注入
├── progress.rs — Tracker(Leader 视角的复制进度)+ Progress(match/next)
├── progress/
│ └── config.rs — Config:管理 voters 集合
└── utils/
└── trace.rs — Trace trait:tracing 工具扩展
┌─────────────────────────────┐
│ Node(公共 API) │
│ propose / step / tick │
│ ready / advance │
└──────────┬──────────────────┘
│
┌──────────▼──────────────────┐
│ Context<S: LogStorage> │
│ │
│ id, term, voted │
│ role: Role │
│ view: LogView<S> │
│ tracker: Tracker │
│ hard_state: HardState │
│ election/heartbeat timers │
│ outbox: Outbox │
└──────────┬──────────────────┘
│ 读取(async)
┌──────────▼──────────────────┐
│ LogView<S> │
│ ├── LogBuffer(内存) │
│ └── S: LogStorage(磁盘) │
└─────────────────────────────┘
设计要点:
Context 持有完整 Raft 状态,是算法的"副作用沙箱"。tick() / step(msg) 是驱动 Context 的两个唯一入口。Outbox::prepare() 收集,外部统一取走发送。Context 内发生;HardState 的变化通过 prev_hard_state 与当前对比来检测是否需要落盘。LogView<S>
├── buffer: LogBuffer ← 未持久化,优先读取
└── storage: S ← 已持久化,LogStorage trait
(只读,写入由上层决定)
Index 语义:
committed_index ≤ applying_index ≤ applied_index ≤ last_index
LogBuffer::append() 处理三种追加情况:
head == offset + len,直接 extend。head ≤ offset,替换整个 buffer。offset < head < offset + buffer.len(),截断后 append。Message {
src: NodeId, // 0 = 本地消息
src_term: Term, // 0 = 本地消息(不参与 staleness 判断)
dst: Option<NodeId>, // None = 广播
variant: MessageVariant,
}
Message::staleness(current_term) 返回:
Local — src_term == Term::default()Stale — 消息任期 < 当前任期Consistent — 相等Fresh — 消息任期 > 当前任期(触发 term 更新)loop {
node.tick(); // 推进逻辑时钟
node.step(incoming_msg); // 注入网络消息
node.propose(kv_write_data); // 注入客户端写请求
if node.has_ready() {
let rd = node.ready();
// 1. 持久化 HardState + Entries(必须先于发消息)
// 2. 发送 rd.messages 到网络
// 3. 应用 rd.committed_entries 到 KV 状态机
node.advance();
}
}
message.rs 的 MessageVariant enum 内新增变体。Message impl 中添加构造函数 new_xxx()。context.rs 的 step() match 分支处理新消息。cargo clippy 确保 exhaustive 处理。tick() → 分角色分发:Leader 调 tick_heartbeat(),Follower/Candidate 调 tick_election()。step() → 先调 staleness() 判断消息是否过期,再按 MessageVariant 分发处理。self.outbox.prepare(msg) 缓存,不要直接发送。#[derive(Debug)]
struct MemStorage { entries: Vec<Entry> }
#[async_trait]
impl LogStorage for MemStorage {
async fn load_entries(&self, head: u64, tail: u64) -> StorageResult<Vec<Entry>> { ... }
async fn term_of(&self, index: u64) -> StorageResult<Term> { ... }
async fn last_index(&self) -> StorageResult<Index> { ... }
}
cargo build # 编译
cargo test # 运行所有单元测试
cargo clippy # Lint 检查(项目要求 clippy 通过)
测试风格:小规模、确定性、与实现文件放在同一 mod tests {} 块中。
| 文件 | 内容 |
|---|---|
docs/001-structure.md | 架构设想:Handler、Engine 分离模式 |
AGENT.md | 项目定位和约束说明 |
reference/raft-etcd/ | etcd 原版 Go 实现参考 |
reference/raft-etcd/docs/raft-kv-implementation-guide.md | KV 实现指引 |