Change Data Capture - architecture, entrypoints, bytecode emission, sync engine integration, tests
CDC tracks INSERT/UPDATE/DELETE changes on database tables by writing change records into a
dedicated CDC table (turso_cdc by default). It is per-connection, enabled via PRAGMA, and
operates at the bytecode generation (translate) layer. The sync engine consumes CDC records
to push local changes to the remote.
User SQL (INSERT/UPDATE/DELETE/DDL)
        |
        v
┌─────────────────────────────────────────────────┐
│  Translate layer (core/translate/)              │
│  ┌───────────────────────────────────────────┐  │
│  │ prepare_cdc_if_necessary()                │  │
│  │   - checks CaptureDataChangesInfo         │  │
│  │   - opens CDC table cursor (OpenWrite)    │  │
│  │   - skips if target == CDC table itself   │  │
│  └───────────────────────────────────────────┘  │
│  ┌───────────────────────────────────────────┐  │
│  │ emit_cdc_insns()                          │  │
│  │   - writes (change_id, change_time,       │  │
│  │     change_type, table_name, id,          │  │
│  │     before, after, updates) into CDC tbl  │  │
│  └───────────────────────────────────────────┘  │
│  + emit_cdc_full_record()                       │
│  + emit_cdc_patch_record()                      │
└─────────────────────────────────────────────────┘
        |
        v
CDC table (turso_cdc or custom name)
        |
        v
┌─────────────────────────────────────────────────┐
│  Sync engine (sync/engine/)                     │
│  DatabaseTape reads CDC table → DatabaseChange  │
│  → apply/revert → push to remote                │
└─────────────────────────────────────────────────┘
CaptureDataChangesMode + CaptureDataChangesInfo - core/lib.rs
CDC behavior is controlled by two types:
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

enum CaptureDataChangesMode {
    Id,     // capture only rowid
    Before, // capture before-image
    After,  // capture after-image
    Full,   // before + after + updates
}

struct CaptureDataChangesInfo {
    mode: CaptureDataChangesMode,
    table: String,               // CDC table name
    version: Option<CdcVersion>, // schema version (V1 or V2)
}
The connection stores Option<CaptureDataChangesInfo> — None means CDC is off.
Key methods on CdcVersion:
- has_commit_record(): self >= V2, gates COMMIT record emission
- Display/FromStr: round-trips "v1" ↔ V1, "v2" ↔ V2

Key methods on CaptureDataChangesInfo:
- parse(value: &str, version: Option<CdcVersion>): parses PRAGMA argument "<mode>[,<table_name>]", returns None for "off"
- cdc_version(): returns CdcVersion (panics if version is None). Single accessor replacing old is_v1()/is_v2()/version() methods.
- has_before() / has_after() / has_updates(): mode capability checks
- mode_name(): returns mode as string

Convenience trait CaptureDataChangesExt on Option<CaptureDataChangesInfo> provides:
- has_before() / has_after() / has_updates(): delegates to inner, returns false for None
- table(): returns Option<&str>, None when CDC is off

Default table name: turso_cdc (constant TURSO_CDC_DEFAULT_TABLE_NAME)
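The parsing and capability checks listed above can be sketched roughly as follows. This is a minimal illustration, not the code in core/lib.rs: the `Mode`, `Info`, and `InfoExt` names are hypothetical stand-ins, the lowercase mode strings other than "off" and "full" are assumed, the `Result` return type is an assumption, and the version argument is left out for brevity.

```rust
// Hypothetical stand-ins for CaptureDataChangesMode / CaptureDataChangesInfo.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Id,
    Before,
    After,
    Full,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct Info {
    mode: Mode,
    table: String,
}

impl Info {
    /// Parses "<mode>[,<table_name>]". Ok(None) means CDC is off.
    /// Mode spellings and the error type are assumptions for this sketch.
    fn parse(value: &str) -> Result<Option<Info>, String> {
        let (mode, table) = match value.split_once(',') {
            Some((m, t)) => (m.trim(), t.trim()),
            None => (value.trim(), "turso_cdc"), // default table name
        };
        let mode = match mode {
            "off" => return Ok(None),
            "id" => Mode::Id,
            "before" => Mode::Before,
            "after" => Mode::After,
            "full" => Mode::Full,
            other => return Err(format!("unknown CDC mode: {other}")),
        };
        Ok(Some(Info { mode, table: table.to_string() }))
    }

    fn has_before(&self) -> bool {
        matches!(self.mode, Mode::Before | Mode::Full)
    }
    fn has_after(&self) -> bool {
        matches!(self.mode, Mode::After | Mode::Full)
    }
    fn has_updates(&self) -> bool {
        matches!(self.mode, Mode::Full)
    }
}

/// Convenience trait on Option<Info>: every check is false when CDC is off.
trait InfoExt {
    fn has_before(&self) -> bool;
    fn has_after(&self) -> bool;
    fn has_updates(&self) -> bool;
    fn table(&self) -> Option<&str>;
}

impl InfoExt for Option<Info> {
    fn has_before(&self) -> bool {
        self.as_ref().map_or(false, |i| i.has_before())
    }
    fn has_after(&self) -> bool {
        self.as_ref().map_or(false, |i| i.has_after())
    }
    fn has_updates(&self) -> bool {
        self.as_ref().map_or(false, |i| i.has_updates())
    }
    fn table(&self) -> Option<&str> {
        self.as_ref().map(|i| i.table.as_str())
    }
}
```

With the extension trait, call sites in the translate layer can ask `info.has_before()` directly on the optional value without first matching on whether CDC is enabled.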
v1 schema:
CREATE TABLE turso_cdc (
    change_id   INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time INTEGER,   -- unixepoch()
    change_type INTEGER,   -- 1=INSERT, 0=UPDATE, -1=DELETE
    table_name  TEXT,
    id          <untyped>, -- rowid of changed row
    before      BLOB,      -- binary record (before-image)
    after       BLOB,      -- binary record (after-image)
    updates     BLOB       -- binary record of per-column changes
);
v2 schema:
CREATE TABLE turso_cdc (
    change_id     INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time   INTEGER,   -- unixepoch()
    change_type   INTEGER,   -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
    table_name    TEXT,
    id            <untyped>, -- rowid of changed row
    before        BLOB,      -- binary record (before-image)
    after         BLOB,      -- binary record (after-image)
    updates       BLOB,      -- binary record of per-column changes
    change_txn_id INTEGER    -- transaction ID (groups rows into transactions)
);
v2 adds:
- change_txn_id column: groups CDC rows by transaction. Assigned via conn_txn_id(candidate) opcode which get-or-sets a per-connection transaction ID.
- change_type=2 (COMMIT) records: mark transaction boundaries. Emitted once per statement in autocommit mode, or on explicit COMMIT.

The CDC table is created at runtime by the InitCdcVersion opcode via CREATE TABLE IF NOT EXISTS.
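To illustrate how a consumer might use the v2 COMMIT records, here is a small sketch that splits an ordered run of CDC rows into committed transactions. The `CdcRow` struct and `split_transactions` function are hypothetical and carry only the columns needed for the example; this is not how the sync engine is actually implemented.

```rust
/// Hypothetical, trimmed-down view of a v2 CDC row.
#[derive(Debug, Clone, PartialEq)]
struct CdcRow {
    change_id: i64,
    change_type: i64, // 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
    change_txn_id: i64,
}

const CHANGE_TYPE_COMMIT: i64 = 2;

/// Splits rows (assumed to be ordered by change_id) at COMMIT records.
/// Returns the committed groups and any trailing rows that have no
/// COMMIT record yet (still pending).
fn split_transactions(rows: &[CdcRow]) -> (Vec<Vec<CdcRow>>, Vec<CdcRow>) {
    let mut committed = Vec::new();
    let mut current = Vec::new();
    for row in rows {
        if row.change_type == CHANGE_TYPE_COMMIT {
            // A COMMIT record closes the group collected so far.
            committed.push(std::mem::take(&mut current));
        } else {
            current.push(row.clone());
        }
    }
    (committed, current)
}
```

Rows inside one committed group are expected to share the same change_txn_id, which gives a consumer a second way to cross-check the grouping.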
When CDC is first enabled, a version tracking table is created:
CREATE TABLE turso_cdc_version (
    table_name TEXT PRIMARY KEY,
    version    TEXT NOT NULL
);
Current version: CDC_VERSION_CURRENT = CdcVersion::V2 (defined in core/lib.rs, re-exported from core/translate/pragma.rs)
The InitCdcVersion opcode detects v1 vs v2 by checking whether the CDC table already exists before creating it. A CDC table that already exists gets CdcVersion::V1 inserted into the version table.
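The detection rule can be written down as a small decision function. This is a sketch under assumptions: `resolve_version` is a hypothetical helper, and the branch that records CDC_VERSION_CURRENT for a freshly created table is inferred from the surrounding description rather than confirmed by the source.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

/// `recorded`: the version row already stored in turso_cdc_version, if any.
/// `table_existed`: whether the CDC table was present before
/// CREATE TABLE IF NOT EXISTS ran.
fn resolve_version(recorded: Option<CdcVersion>, table_existed: bool) -> CdcVersion {
    match recorded {
        // An existing version row is preserved (INSERT OR IGNORE).
        Some(v) => v,
        // A pre-existing CDC table with no version row is treated as v1.
        None if table_existed => CdcVersion::V1,
        // Assumption: a newly created table is recorded at the current version.
        None => CDC_VERSION_CURRENT,
    }
}
```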
DatabaseChange - sync/engine/src/types.rs:229-249
Sync engine's Rust representation of a CDC row. Has into_apply() and into_revert() methods
for forward/backward replay.
OperationMode - core/translate/emitter.rs
Used by emit_cdc_insns() to determine change_type value:
- INSERT → 1
- UPDATE / SELECT → 0
- DELETE → -1
- COMMIT → 2 (v2 only, emitted by emit_cdc_commit_insns)

Set: core/translate/pragma.rs
- Parses the PRAGMA argument via CaptureDataChangesInfo::parse() with CDC_VERSION_CURRENT
- Emits the InitCdcVersion opcode: all CDC setup (table creation, version tracking, state change) happens at execution time

Get (read current mode): core/translate/pragma.rs
- Returns three columns: mode, table, version
- CDC off: ("off", NULL, NULL)
- CDC on: (mode_name, table, version)

Pragma registration: core/pragma.rs, CaptureDataChangesConn (and deprecated alias UnstableCaptureDataChangesConn) with columns ["mode", "table", "version"]
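As a rough illustration of the Get path, the row returned by the pragma can be derived from the optional CDC state as below. The `Info` struct and `pragma_row` function are hypothetical, and the strings stand in for whatever value types the real implementation uses.

```rust
/// Hypothetical, string-only view of the connection's CDC state.
#[derive(Debug, Clone)]
struct Info {
    mode_name: String,
    table: String,
    version: Option<String>,
}

/// Builds the (mode, table, version) row; None models SQL NULL.
fn pragma_row(info: &Option<Info>) -> (String, Option<String>, Option<String>) {
    match info {
        // CDC off: ("off", NULL, NULL)
        None => ("off".to_string(), None, None),
        // CDC on: (mode_name, table, version)
        Some(i) => (i.mode_name.clone(), Some(i.table.clone()), i.version.clone()),
    }
}
```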
Field: core/connection.rs — capture_data_changes: RwLock<Option<CaptureDataChangesInfo>>
Getter: get_capture_data_changes_info() — returns read guard
Setter: set_capture_data_changes_info(opts: Option<CaptureDataChangesInfo>)
Default: initialized as None (CDC off)
Field: core/vdbe/builder.rs — capture_data_changes_info: Option<CaptureDataChangesInfo>
Accessor: capture_data_changes_info() — returns &Option<CaptureDataChangesInfo>
Passed from: core/translate/mod.rs — read from connection when creating builder
Field: core/vdbe/mod.rs — capture_data_changes: Option<CaptureDataChangesInfo>
Set from: PrepareContext::from_connection() — clones from connection.get_capture_data_changes_info()
InitCdcVersion opcode - core/vdbe/execute.rs
Always emitted by PRAGMA SET. Handles all CDC setup at execution time:
- If CDC is being turned off: stores None in state.pending_cdc_info, returns early
- Creates the CDC table (CREATE TABLE IF NOT EXISTS <cdc_table_name> ...) with the v2 schema, including the change_txn_id column
- Creates the version table (CREATE TABLE IF NOT EXISTS turso_cdc_version ...)
- Records the version with INSERT OR IGNORE to preserve existing version rows.
- Stores the new CaptureDataChangesInfo in state.pending_cdc_info

The connection's CDC state is not applied in the opcode. Instead, pending_cdc_info is applied in halt() only after the transaction commits successfully. This ensures atomicity: if any step fails and the transaction rolls back, the connection's CDC state remains unchanged.
All table creation is done via nested conn.prepare()/run_ignore_rows() calls rather than bytecode emission, because the PRAGMA plan can't contain DML against tables that don't exist yet in the schema.
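The deferred state change can be pictured with the following sketch. All types here are simplified stand-ins, and the nested `Option<Option<_>>` used to distinguish "no change requested" from "turn CDC off" is an assumption made for the example, not necessarily how ProgramState models it.

```rust
#[derive(Debug, Clone, PartialEq)]
struct CdcInfo {
    mode: String,
    table: String,
}

/// Stand-in for the connection: None means CDC is off.
#[derive(Default)]
struct Connection {
    capture_data_changes: Option<CdcInfo>,
}

#[derive(Default)]
struct ProgramState {
    /// None: this program did not touch CDC state.
    /// Some(None): turn CDC off. Some(Some(info)): turn CDC on.
    pending_cdc_info: Option<Option<CdcInfo>>,
}

impl ProgramState {
    /// Called when the program halts; `committed` tells whether the
    /// surrounding transaction committed successfully.
    fn halt(&mut self, conn: &mut Connection, committed: bool) {
        let pending = self.pending_cdc_info.take();
        if committed {
            if let Some(new_state) = pending {
                // Only now does the connection observe the new CDC state.
                conn.capture_data_changes = new_state;
            }
        }
        // On rollback the pending value is dropped and nothing changes.
    }
}
```

Keeping the new state in the program rather than on the connection means a failed PRAGMA never leaves the connection claiming CDC is on while its tables were rolled back.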
These are the core CDC code generation functions:
| Function | Purpose |
|---|---|
| prepare_cdc_if_necessary() | Opens CDC table cursor if CDC is active and target != CDC table |
| emit_cdc_full_record() | Reads all columns from cursor into a MakeRecord (for before/after image) |
| emit_cdc_patch_record() | Builds record from in-flight register values (for after-image of INSERT/UPDATE) |
| emit_cdc_insns() | Writes a single CDC row per changed row (INSERT/UPDATE/DELETE). Called per-row inside DML loops. |
| emit_cdc_commit_insns() | Writes a COMMIT record (change_type=2) into CDC table (v2 only). Raw emission, no autocommit check. |
| emit_cdc_autocommit_commit() | End-of-statement COMMIT emission. Checks is_autocommit() at runtime and only emits COMMIT if in autocommit mode. v2 only. |
Per-row call sites use emit_cdc_insns() (no COMMIT). End-of-statement sites call emit_cdc_autocommit_commit() which checks is_autocommit() at runtime:
- Autocommit mode: emits a COMMIT record at the end of the statement
- Explicit transaction (BEGIN...COMMIT): skips per-statement COMMIT; the explicit COMMIT statement emits the COMMIT record via emit_cdc_commit_insns()

This ensures multi-row statements like INSERT INTO t VALUES (1),(2),(3) produce one COMMIT at the end, not one per row.
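The emission rule can be simulated with a toy model. This sketch does not emit bytecode; the `Stmt` enum and `cdc_change_types` function are hypothetical, and every changed row is recorded as change_type 1 (as if it were an INSERT) to keep the example small.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Stmt {
    Begin,
    Commit,
    /// A DML statement that changes `rows` rows.
    Dml { rows: usize },
}

/// Returns the change_type values a script would write into the CDC table
/// under the v2 rules: 1 per changed row, 2 for each COMMIT record.
fn cdc_change_types(script: &[Stmt]) -> Vec<i64> {
    let mut out = Vec::new();
    let mut autocommit = true;
    for stmt in script {
        match *stmt {
            Stmt::Begin => autocommit = false,
            Stmt::Commit => {
                // The explicit COMMIT statement emits the COMMIT record.
                out.push(2);
                autocommit = true;
            }
            Stmt::Dml { rows } => {
                // One CDC row per changed row, never a COMMIT per row.
                out.extend(std::iter::repeat(1).take(rows));
                if autocommit {
                    // End-of-statement COMMIT only in autocommit mode.
                    out.push(2);
                }
            }
        }
    }
    out
}
```

In this model a three-row INSERT in autocommit mode and the same three rows spread over two statements inside BEGIN...COMMIT both produce three row records followed by a single COMMIT record.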
INSERT - core/translate/insert.rs
- emit_cdc_insns() after insert, and before delete for REPLACE/conflict
- emit_cdc_autocommit_commit() in emit_epilogue() after the insert loop

UPDATE - core/translate/emitter.rs
- emit_cdc_insns()
- emit_cdc_autocommit_commit() after the update loop

DELETE - core/translate/emitter.rs
- emit_cdc_insns()
- emit_cdc_autocommit_commit() after the delete loop

UPSERT - core/translate/upsert.rs
- emit_cdc_insns() for all three cases: pure insert, update after conflict, replace

Schema changes (DDL) - core/translate/schema.rs
- emit_cdc_insns() (insert into sqlite_schema) + emit_cdc_autocommit_commit()
- emit_cdc_insns() per-row in metadata loop + emit_cdc_autocommit_commit() after loop
- emit_cdc_insns() + emit_cdc_autocommit_commit() (core/translate/schema.rs)
- emit_cdc_insns() per-row + emit_cdc_autocommit_commit() after loop (core/translate/index.rs)

DDL in explicit transactions (BEGIN; CREATE TABLE t(x); COMMIT) does NOT emit per-statement COMMIT; the autocommit check prevents it.

Other call sites:
- core/translate/update.rs: sets cdc_update_alter_statement on the update plan when CDC has updates mode
- core/translate/view.rs: passes None for CDC cursor
- core/translate/trigger.rs: passes None for CDC cursor
- core/translate/subquery.rs: cdc_cursor_id: None

table_columns_json_array(table_name) - core/function.rs, core/vdbe/execute.rs
Returns JSON array of column names for a table. Used to interpret binary records.

bin_record_json_object(columns_json, blob) - core/function.rs, core/vdbe/execute.rs
Decodes a binary record (from before/after/updates columns) into a JSON object using column names.
The sync engine is the primary consumer of CDC data.
sync/engine/src/database_tape.rs
- DEFAULT_CDC_TABLE_NAME = "turso_cdc", DEFAULT_CDC_MODE = "full"
- CDC_PRAGMA_NAME = "capture_data_changes_conn"
- connect() sets CDC pragma and caches cdc_version from turso_cdc_version table. Must be called before iterate_changes().
- cdc_version: RwLock<Option<CdcVersion>> is set by connect() and read by iterate_changes(). Panics if not set.
- DatabaseChangesIterator reads CDC table in batches, emits DatabaseTapeOperation. For v2, real COMMIT records from the table are emitted. For v1, a synthetic Commit is appended at end of batch. ignore_schema_changes: true (default) filters out sqlite_schema row changes but not COMMIT records.

sync/engine/src/database_sync_operations.rs
- SELECT COUNT(*) FROM turso_cdc WHERE change_id > ?

sync/engine/src/database_sync_engine.rs
- open_db() calls main_tape.connect(coro) to ensure CDC is set up and version is cached before any iterate_changes() calls.
- During apply_changes, checks if CDC table existed, re-creates it after sync

sync/engine/src/database_replay_generator.rs
- Requires the updates column to be populated (full mode)

All bindings expose cdc_operations as part of sync stats:
| Binding | File |
|---|---|
| Python | bindings/python/src/turso_sync.rs |
| JavaScript | bindings/javascript/sync/src/lib.rs |
| JS (generator) | bindings/javascript/sync/src/generator.rs |
| Go | bindings/go/bindings_sync.go |
| React Native | bindings/react-native/src/types.ts |
| SDK Kit (C header) | sync/sdk-kit/turso_sync.h |
| SDK Kit (Rust) | sync/sdk-kit/src/bindings.rs |
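Returning to DatabaseChangesIterator in the sync engine: the difference between v1 and v2 batches, and the effect of ignore_schema_changes, can be sketched as below. `TapeOp`, `Row`, and `batch_to_ops` are hypothetical simplifications of DatabaseTapeOperation and the real iterator, and skipping the synthetic commit for an empty batch is an assumption.

```rust
#[derive(Debug, Clone, PartialEq)]
enum TapeOp {
    Change { table: String },
    Commit,
}

#[derive(Debug, Clone)]
struct Row {
    change_type: i64, // 2 = COMMIT (v2 only)
    table_name: String,
}

/// Converts one batch of CDC rows into tape operations.
/// `has_commit_record` is true for v2 tables, false for v1.
fn batch_to_ops(rows: &[Row], has_commit_record: bool, ignore_schema_changes: bool) -> Vec<TapeOp> {
    let mut ops = Vec::new();
    for row in rows {
        if row.change_type == 2 {
            // Real COMMIT records are always passed through, never filtered.
            ops.push(TapeOp::Commit);
            continue;
        }
        if ignore_schema_changes && row.table_name == "sqlite_schema" {
            continue; // drop schema row changes only
        }
        ops.push(TapeOp::Change { table: row.table_name.clone() });
    }
    if !has_commit_record && !rows.is_empty() {
        // v1 has no COMMIT rows, so a synthetic commit closes the batch.
        ops.push(TapeOp::Commit);
    }
    ops
}
```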
- tests/integration/functions/test_cdc.rs: covers all modes, CRUD, transactions, schema changes, version table, backward compatibility. Registered in tests/integration/functions/mod.rs.
- sync/engine/src/database_tape.rs: CDC table reads, tape iteration, replay of schema changes.
- bindings/javascript/sync/packages/{wasm,native}/promise.test.ts

Run: cargo test -- test_cdc (integration) or cargo test -p turso_sync_engine -- database_tape (sync engine).
- cli/manuals/cdc.md: accessible via .manual cdc in the REPL
- docs/manual.md: CDC section linked in TOC

- Changes to the CDC table itself and the turso_cdc_version table are never captured (checked in prepare_cdc_if_necessary).
- Schema changes are captured as row changes to the sqlite_schema table.
- The CDC schema version is stored in the turso_cdc_version table and carried in CaptureDataChangesInfo.version for future schema evolution.
- The new CDC state is held in pending_cdc_info in ProgramState and applied only at Halt. If the PRAGMA's disk writes fail and the transaction rolls back, the connection state stays unchanged.
- Per-statement COMMIT records go through emit_cdc_autocommit_commit(), which checks is_autocommit() at runtime. In explicit transactions, only the final COMMIT emits a COMMIT CDC record.
- v1 CDC tables (pre-existing tables with no entry in turso_cdc_version) are detected by checking table existence before creation. Existing tables get CdcVersion::V1 inserted into the version table.
- CdcVersion enum with #[repr(u8)] and Ord/PartialOrd enables feature gating via integer comparison (has_commit_record() = self >= V2). Display/FromStr handles database round-trip.
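The last point can be made concrete with a short sketch of the version enum. The enum definition follows the one shown earlier; the bodies of the `Display` and `FromStr` impls and the `String` error type are assumptions written for illustration, not copied from core/lib.rs.

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

impl CdcVersion {
    /// Feature gate by ordering: COMMIT records exist from v2 onward.
    fn has_commit_record(self) -> bool {
        self >= CdcVersion::V2
    }
}

impl fmt::Display for CdcVersion {
    /// Renders the text stored in the version table ("v1" / "v2").
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            CdcVersion::V1 => "v1",
            CdcVersion::V2 => "v2",
        })
    }
}

impl FromStr for CdcVersion {
    type Err = String; // assumed error type for this sketch

    /// Parses the text read back from the version table.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "v1" => Ok(CdcVersion::V1),
            "v2" => Ok(CdcVersion::V2),
            other => Err(format!("unknown CDC version: {other}")),
        }
    }
}
```

Because the derived ordering follows the discriminants, a later version added after V2 would pass the `has_commit_record()` gate without any change to the comparison.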