RFC-001 — CRDT Runtime
| Field | Value |
|---|---|
| Author | Koder Engineering |
| Date | 2026 |
| Status | Draft |
| Module | platform/crdt |
Table of Contents
- Summary
- Motivation
- Background — CRDTs Primer
- Types Implemented
- 4.1 G-Counter
- 4.2 LWW-Register
- 4.3 OR-Set
- 4.4 Vector Clock
- Rust API
- 5.1 Trait CrdtValue
- 5.2 Trait CrdtStore
- 5.3 Type Signatures
- Serialization — Protobuf Schemas
- Integration Points
- 7.1 Koder Hub
- 7.2 Koder Talk
- 7.3 Kmail
- Correctness Guarantees
- Property-Based Tests
- Performance Considerations
- Alternatives Considered
- Roadmap
- Open Questions
1. Summary
This RFC proposes a purpose-built *CRDT (Conflict-free Replicated Data Type) runtime* for the Koder platform. The library, housed at platform/crdt, provides four fundamental CRDT primitives (G-Counter, LWW-Register, OR-Set, and Vector Clock) with a unified Rust API, Protobuf serialization, and first-class integration hooks for Koder Hub, Talk, and Kmail.
The runtime guarantees that any two replicas of any CRDT value converge to the same result after exchanging their states, regardless of message delay, duplication, or reordering. This makes it possible to operate across multi-cluster deployments without a coordination protocol (no distributed locks, no Paxos/Raft round for every write).
2. Motivation
2.1 Current State
Koder Hub, Talk, and Kmail are each deployed across multiple geographic clusters (Europe, Americas, Southeast Asia). Within a cluster, consistency is provided by kdb-next (TiKV substrate, linearizable per-key). Across clusters, replication is asynchronous.
This means that concurrent writes to the same logical entity from different clusters (a package download counter incremented in Frankfurt and São Paulo simultaneously, a channel membership list modified concurrently in Singapore and London) result in *conflicts*. The current replication layer resolves these with "last writer wins by wall clock", a strategy that is:
- *Non-deterministic* across nodes with clock skew.
- *Data-losing* for set-like structures (a concurrent add and remove can lose the add).
- *Invisible* to operators: conflicts are silently resolved, never surfaced.
2.2 Concrete Failure Modes
| Scenario | Component | Problem |
|---|---|---|
| Package download counter | Store | Two clusters increment independently; only one increment is kept on sync |
| App rating (float average) | Store | Concurrent rating updates from two regions → one is silently discarded |
| Channel member list | Talk | User A adds member M in cluster 1; user B removes M in cluster 2 concurrently; outcome is non-deterministic |
| Message delivery flag | Talk | "Delivered" and "Read" flags set from two different nodes → inconsistent state |
| Email read state | Kmail | Opened on mobile (cluster A) and web (cluster B) near-simultaneously → read state flickers |
2.3 Why Not OT (Operational Transformation)?
In practice, Operational Transformation systems rely on a central server to sequence operations. This contradicts the goal of a cluster-peer-equal architecture, in which no cluster is the designated primary.
2.4 Why Not a Database with Serializable Isolation?
Cross-cluster serializable isolation requires a distributed consensus round on every write (median latency: 200-400 ms for an intercontinental quorum). At the scale of Store package installs (millions/day) and Talk messages (billions/day), this is untenable in both cost and latency.
2.5 Solution
CRDTs provide *convergence without coordination*. Each write is local, fast, and always succeeds. Merging happens lazily at sync time and is guaranteed to produce the same result regardless of the order in which partial states arrive.
3. Background — CRDTs Primer
A *state-based CRDT* (or CvRDT) is a data type equipped with:
- A *partial order* (join-semilattice) on its state space.
- A *merge* (join) operation that computes the least upper bound of two states.
- A *bottom* element (the initial empty state).
- Local update operations that only move a state upward in the partial order (inflations), so a later merge never undoes an update.
Formally, for any states a, b, c:

```
merge(a, a) = a                                  # idempotency
merge(a, b) = merge(b, a)                        # commutativity
merge(a, merge(b, c)) = merge(merge(a, b), c)    # associativity
```

These three properties ensure that any two replicas that have received the same set of updates, in any order and with any number of duplicates, converge to the same state.
4. Types Implemented
4.1 G-Counter
*Purpose:* A counter that only grows (no decrements).
*State:* A map from node ID to u64; each node maintains its own local count.

```
GCounter = Map<NodeId, u64>

increment(node_id) → GCounter:
    self[node_id] += 1

value() → u64:
    sum(self.values())

merge(a, b) → GCounter:
    { node_id: max(a[node_id], b[node_id]) for all node_ids in a ∪ b }
```

*Use cases in Koder:*
- Package download counts in Store (each cluster node increments its local slice).
- Install counts per package version.
- Page view counters on Store catalog pages.
*Why GCounter and not a plain integer?* Two clusters that increment a plain integer from value N will both write N+1, and merging their states (by max) yields N+1 instead of N+2. G-Counter partitions responsibility by node, making every increment unambiguously attributable.
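The G-Counter pseudocode maps almost directly onto Rust. What follows is a minimal std-only sketch, not the platform/crdt implementation: NodeId is simplified to a plain String, and only increment, value, and merge are shown.

```rust
use std::collections::HashMap;

/// Sketch of a G-Counter: one monotonically growing slot per node.
/// Node IDs are plain Strings here (simplification of the RFC's NodeId).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GCounter {
    counts: HashMap<String, u64>,
}

impl GCounter {
    /// A node only ever bumps its own slot, so increments are never confused.
    pub fn increment(&mut self, node: &str) {
        *self.counts.entry(node.to_string()).or_insert(0) += 1;
    }

    /// The counter's value is the sum of every node's slot.
    pub fn value(&self) -> u64 {
        self.counts.values().sum()
    }

    /// Pointwise max; a node missing from one side counts as 0.
    pub fn merge(&self, other: &GCounter) -> GCounter {
        let mut counts = self.counts.clone();
        for (node, &n) in &other.counts {
            let slot = counts.entry(node.clone()).or_insert(0);
            *slot = (*slot).max(n);
        }
        GCounter { counts }
    }
}
```

If Frankfurt and São Paulo each increment once from the same empty state, the merged value is 2. That is the N+2 outcome a plain integer merged by max cannot produce.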
4.2 LWW-Register (Last-Write-Wins Register)
*Purpose:* A single mutable value where the most recent write (by logical timestamp) wins.
*State:* A tuple (value: T, timestamp: HybridLogicalClock).

```
LwwRegister<T> = { value: T, timestamp: HLC }

write(value, hlc) → LwwRegister<T>:
    { value, timestamp: hlc }

read() → T:
    self.value

merge(a, b) → LwwRegister<T>:
    if a.timestamp >= b.timestamp { a } else { b }
```

*Timestamp choice:* We use *Hybrid Logical Clocks (HLC)*, not wall clocks. An HLC combines physical time with a logical counter. With the node ID as a final tie-breaker (see the Hlc type in Section 5.3), it yields a total order without strict clock synchronization. Because two distinct writes never share a timestamp, the merge above is commutative. Clock skew tolerance: ±500 ms (configurable).
*Use cases in Koder:*
- App ratings (each user's last rating for a package).
- User profile fields (display name, avatar URL, bio).
- Package metadata fields (description, homepage URL) when the author updates them from two devices.
- Kmail read state (last-modified timestamp of read/unread flag).
*Caveat:* LWW discards concurrent writes; only the write with the highest timestamp survives. This is acceptable for ratings and profile fields, where the latest value is the intended one. It is not acceptable for set membership; for sets, use OR-Set.
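Below is a std-only sketch of the register together with the HLC rule for local events. It makes several simplifying assumptions: the node ID is a String, the wall clock is passed in as `wall_ms` rather than read from the system, and both the receive-side HLC rule and the ±500 ms skew bound are omitted. It illustrates the semantics described above and is not the crate's code.

```rust
use std::cmp::max;

/// Simplified HLC timestamp (node ID as a String). The derived Ord compares
/// fields in declaration order: physical_ms, then logical, then node_id.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Hlc {
    pub physical_ms: u64,
    pub logical: u32,
    pub node_id: String,
}

impl Hlc {
    /// HLC rule for a local or send event: take the max of the last timestamp
    /// and the wall clock, and bump `logical` if physical time did not advance.
    /// The clock therefore never runs backwards, even if the wall clock does.
    pub fn tick(&self, wall_ms: u64) -> Hlc {
        let physical_ms = max(self.physical_ms, wall_ms);
        let logical = if physical_ms == self.physical_ms { self.logical + 1 } else { 0 };
        Hlc { physical_ms, logical, node_id: self.node_id.clone() }
    }
}

/// LWW-Register; a `None` timestamp represents bottom (never written).
#[derive(Clone, Debug, PartialEq)]
pub struct LwwRegister<T> {
    value: Option<T>,
    timestamp: Option<Hlc>,
}

impl<T: Clone> LwwRegister<T> {
    pub fn new() -> Self {
        LwwRegister { value: None, timestamp: None }
    }

    /// Design choice in this sketch: a write older than the stored one is
    /// ignored, so the register never moves backwards.
    pub fn write(&mut self, value: T, hlc: Hlc) {
        if self.timestamp.as_ref().map_or(true, |ts| hlc > *ts) {
            self.value = Some(value);
            self.timestamp = Some(hlc);
        }
    }

    pub fn read(&self) -> Option<&T> {
        self.value.as_ref()
    }

    /// Keep the side with the higher timestamp. Option orders None below
    /// Some, so merging with bottom returns the other side unchanged.
    pub fn merge(&self, other: &Self) -> Self {
        if self.timestamp >= other.timestamp { self.clone() } else { other.clone() }
    }
}
```

Suppose London and Singapore both write in the same millisecond with equal logical counters. The node ID then breaks the tie, so every replica keeps the same value regardless of the order in which the states arrive.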
4.3 OR-Set (Observed-Remove Set)
*Purpose:* A set supporting both add and remove, where a concurrent add and remove of the same element resolves to *add wins* (the "observed-remove" semantics).
*State:* A map from each element to the set of unique tags created by its adds, plus a set of tombstoned (removed) tags. An element is in the set if and only if it has at least one tag that has not been tombstoned.

```
ORSet<E> = { adds: Map<E, Set<Tag>>, removed: Set<Tag> }
    where Tag = UUID (random, per-add)

add(element) → ORSet<E>:
    self.adds[element] ∪= { new_uuid() }

remove(element) → ORSet<E>:
    self.removed ∪= self.adds[element]    # tombstone all currently observed tags

contains(element) → bool:
    (self.adds[element] \ self.removed).len() > 0

merge(a, b) → ORSet<E>:
    { adds:    { element: a.adds[element] ∪ b.adds[element] for all elements in a ∪ b },
      removed: a.removed ∪ b.removed }
```

*Why "add wins"?* When user A adds member M and user B removes M concurrently, A's add generates a new tag that B has not observed, so B's remove cannot tombstone it. After merge, M still has at least one live tag (the one from A's add), so M is in the set. This is the desired behavior for channel membership: an add should not be silently lost. Tombstones are what make removes stick. Because merge takes unions, a remove that merely deleted tags would be undone by the next merge with a replica that still held them.
*Use cases in Koder:*
- Channel member lists in Talk.
- App categories/tags in Store (admin adds tag from cluster 1; author removes from cluster 2).
- Mailing list members in Kmail.
- Installed package list per user device.
*Storage note:* OR-Set state grows with the number of add operations, since removed tags are retained as tombstones. A tombstone GC pass is required periodically (see Phase 2 roadmap).
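Below is a std-only sketch of an OR-Set with tombstones, which is the state the storage note and the GC roadmap item refer to. To avoid external crates, the RFC's random UUID tags are replaced with caller-supplied (replica, sequence) pairs. These are unique as long as each replica numbers its own adds. The `Tag` alias is hypothetical.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Stand-in for the RFC's UUID tag: (replica id, per-replica sequence number).
pub type Tag = (String, u64);

#[derive(Clone, Debug, PartialEq)]
pub struct OrSet<E: Eq + Hash> {
    adds: HashMap<E, HashSet<Tag>>, // every add tag ever observed, per element
    removed: HashSet<Tag>,          // tombstones: tags covered by some remove
}

impl<E: Eq + Hash + Clone> OrSet<E> {
    pub fn new() -> Self {
        OrSet { adds: HashMap::new(), removed: HashSet::new() }
    }

    /// `tag` must be globally unique (the RFC draws a random UUID per add).
    pub fn add(&mut self, element: E, tag: Tag) {
        self.adds.entry(element).or_default().insert(tag);
    }

    /// Tombstone only the tags this replica has observed. A concurrent add
    /// elsewhere carries an unseen tag, so it survives the merge (add wins).
    pub fn remove(&mut self, element: &E) {
        if let Some(tags) = self.adds.get(element) {
            self.removed.extend(tags.iter().cloned());
        }
    }

    /// Present iff at least one add tag has not been tombstoned.
    pub fn contains(&self, element: &E) -> bool {
        self.adds
            .get(element)
            .map_or(false, |tags| tags.iter().any(|t| !self.removed.contains(t)))
    }

    /// Union both the add tags and the tombstones.
    pub fn merge(&self, other: &Self) -> Self {
        let mut out = self.clone();
        for (e, tags) in &other.adds {
            out.adds.entry(e.clone()).or_default().extend(tags.iter().cloned());
        }
        out.removed.extend(other.removed.iter().cloned());
        out
    }
}
```

Consider the channel-membership race. Replica A re-adds M while replica B removes it. B's tombstones cover only the tags B had seen, so M survives the merge on both sides. A later remove that has observed every tag does take M out.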
4.4 Vector Clock
*Purpose:* A mechanism for capturing the causal ordering of events (happened-before relationships) across nodes, without relying on synchronized wall clocks.
*State:* A map from node ID to a u64 logical counter.

```
VectorClock = Map<NodeId, u64>        # missing entries read as 0

tick(node_id) → VectorClock:
    self[node_id] += 1

happens_before(a, b) → bool:
    ∀ node_id: a[node_id] <= b[node_id] AND a ≠ b

concurrent(a, b) → bool:
    a ≠ b AND NOT happens_before(a, b) AND NOT happens_before(b, a)

merge(a, b) → VectorClock:
    { node_id: max(a[node_id], b[node_id]) for all node_ids in a ∪ b }
```

*Use cases in Koder:*
- Message ordering in Talk: each message carries a vector clock; clients render in causal order, not arrival order.
- Detecting concurrent edits to package manifests in Store.
- Event ordering in audit logs.
*Note:* Vector clocks provide a *partial order*, not a total order. For a total order (e.g., rendering a chat timeline), break ties between concurrent events deterministically, for example by comparing their HLC timestamps as LwwRegister does.
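The vector clock operations fit in a short std-only sketch, with NodeId simplified to String. The pointwise comparison treats missing entries as 0. Two equal clocks are reported as neither ordered nor concurrent.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, Default, PartialEq)]
pub struct VectorClock {
    clocks: HashMap<String, u64>,
}

impl VectorClock {
    pub fn tick(&mut self, node: &str) {
        *self.clocks.entry(node.to_string()).or_insert(0) += 1;
    }

    /// Counter for `node`, with missing entries read as 0.
    fn entry_of(&self, node: &str) -> u64 {
        self.clocks.get(node).copied().unwrap_or(0)
    }

    /// Pointwise <=. Only self's keys need checking: any key present only
    /// in `other` compares as 0 <= n.
    fn le(&self, other: &VectorClock) -> bool {
        self.clocks.iter().all(|(n, &c)| c <= other.entry_of(n))
    }

    /// a -> b iff a <= b pointwise and the two clocks differ.
    pub fn happens_before(&self, other: &VectorClock) -> bool {
        self.le(other) && !other.le(self)
    }

    /// Neither clock is <= the other (equal clocks are not concurrent).
    pub fn concurrent_with(&self, other: &VectorClock) -> bool {
        !self.le(other) && !other.le(self)
    }

    /// Pointwise max, the same join as the G-Counter.
    pub fn merge(&self, other: &VectorClock) -> VectorClock {
        let mut clocks = self.clocks.clone();
        for (n, &c) in &other.clocks {
            let e = clocks.entry(n.clone()).or_insert(0);
            *e = (*e).max(c);
        }
        VectorClock { clocks }
    }
}
```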
5. Rust API
The library exposes two core traits and four concrete types. All types are Send + Sync + 'static (generic types only when their type parameters are).
5.1 Trait CrdtValue
```rust
use std::fmt::Debug;

/// A state-based CRDT value.
///
/// `T` is the "read value" type: the type returned by `value()`.
/// The CRDT state itself may differ from T (e.g., GCounter state is a
/// map, but its value() is a u64).
pub trait CrdtValue<T>: Clone + Debug + PartialEq {
    /// Return the identity element (bottom of the join-semilattice).
    /// `merge(bottom(), x) == x` for all x.
    fn bottom() -> Self;

    /// Return true if this value equals bottom().
    fn is_bottom(&self) -> bool;

    /// Compute the least upper bound of self and other.
    /// Must satisfy commutativity, associativity, and idempotency.
    fn merge(&self, other: &Self) -> Self;

    /// Extract the logical value from the CRDT state.
    fn value(&self) -> T;
}
```

*Contract (informally):*
- merge(x, x) == x: idempotency
- merge(x, y) == merge(y, x): commutativity
- merge(x, merge(y, z)) == merge(merge(x, y), z): associativity
- merge(bottom(), x) == x: bottom identity
These are verified by property-based tests (see Section 9).
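The four contract laws can be checked on concrete values with a small generic helper. This sketch restates the CrdtValue trait so it compiles on its own. The names check_laws, MaxRegister, and SumRegister are hypothetical and used only for illustration; they are not part of the proposed API.

```rust
use std::fmt::Debug;

/// Restatement of the CrdtValue trait (Section 5.1) so this sketch is self-contained.
pub trait CrdtValue<T>: Clone + Debug + PartialEq {
    fn bottom() -> Self;
    fn is_bottom(&self) -> bool;
    fn merge(&self, other: &Self) -> Self;
    fn value(&self) -> T;
}

/// Hypothetical toy CRDT: keeps the largest u64 ever written.
#[derive(Clone, Debug, PartialEq)]
pub struct MaxRegister(pub u64);

impl CrdtValue<u64> for MaxRegister {
    fn bottom() -> Self { MaxRegister(0) }
    fn is_bottom(&self) -> bool { self.0 == 0 }
    fn merge(&self, other: &Self) -> Self { MaxRegister(self.0.max(other.0)) }
    fn value(&self) -> u64 { self.0 }
}

/// Hypothetical counter-example: "merging" by addition double-counts.
#[derive(Clone, Debug, PartialEq)]
pub struct SumRegister(pub u64);

impl CrdtValue<u64> for SumRegister {
    fn bottom() -> Self { SumRegister(0) }
    fn is_bottom(&self) -> bool { self.0 == 0 }
    fn merge(&self, other: &Self) -> Self { SumRegister(self.0 + other.0) }
    fn value(&self) -> u64 { self.0 }
}

/// Checks the four contract laws on one triple of states and returns the
/// name of the first law that fails, if any.
pub fn check_laws<T, C: CrdtValue<T>>(x: &C, y: &C, z: &C) -> Result<(), &'static str> {
    if x.merge(x) != *x {
        return Err("idempotency");
    }
    if x.merge(y) != y.merge(x) {
        return Err("commutativity");
    }
    if x.merge(&y.merge(z)) != x.merge(y).merge(z) {
        return Err("associativity");
    }
    if <C as CrdtValue<T>>::bottom().merge(x) != *x {
        return Err("bottom identity");
    }
    Ok(())
}
```

MaxRegister passes. SumRegister fails idempotency, which is the same flaw that rules out a plain integer counter in Section 4.1. Because T appears only in the trait bound, callers name it explicitly, e.g. check_laws::<u64, _>(&a, &b, &c).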
5.2 Trait CrdtStore
```rust
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
// CrdtValue is the trait from Section 5.1; CrdtKey is the crate's storage-key type.

/// Associates a CRDT type with its read-value type (e.g. GCounter → u64),
/// so the bounds below can name `CrdtValue<C::ReadValue>`.
pub trait HasReadValue {
    type ReadValue;
}

/// Persistent storage backend for CRDT values.
///
/// Implementations include: in-memory (for tests), kdb-next (production).
#[async_trait]
pub trait CrdtStore: Send + Sync {
    type Error: std::error::Error + Send + Sync + 'static;

    /// Load a CRDT state by key.
    /// Returns `None` if the key does not exist (equivalent to bottom()).
    async fn load<C>(&self, key: &CrdtKey) -> Result<Option<C>, Self::Error>
    where
        C: CrdtValue<C::ReadValue> + DeserializeOwned + Send,
        C: HasReadValue;

    /// Persist a CRDT state. Must be called after every local mutation.
    async fn save<C>(&self, key: &CrdtKey, value: &C) -> Result<(), Self::Error>
    where
        C: CrdtValue<C::ReadValue> + Serialize + Sync,
        C: HasReadValue;

    /// Atomically merge a remote state into the stored state and return the result.
    ///
    /// Equivalent to:
    /// let current = load(key).await?.unwrap_or_else(C::bottom);
    /// let merged = current.merge(&remote);
    /// save(key, &merged).await
    ///
    /// but performed atomically (compare-and-swap on the storage layer).
    async fn merge_remote<C>(
        &self,
        key: &CrdtKey,
        remote: &C,
    ) -> Result<C, Self::Error>
    where
        C: CrdtValue<C::ReadValue> + Serialize + DeserializeOwned + Send + Sync,
        C: HasReadValue;
}
```

5.3 Type Signatures
```rust
// API surface only: method bodies and the elided `T: ...` / `E: ...` bounds
// are omitted, so this listing documents signatures rather than compiling as-is.
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;

use serde::{de::DeserializeOwned, Deserialize, Serialize};
use uuid::Uuid;

/// Node identifier: unique per cluster node.
/// PartialOrd/Ord are required because Hlc derives Ord over its node_id field.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct NodeId(pub String);

/// Hybrid Logical Clock timestamp.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Hlc {
    pub physical_ms: u64, // milliseconds since Unix epoch
    pub logical: u32,     // tie-breaker counter
    pub node_id: NodeId,  // for total ordering within the same ms+logical
}

/// ---- G-Counter ----
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GCounter {
    counts: HashMap<NodeId, u64>,
}

impl GCounter {
    pub fn increment(&mut self, node: &NodeId);
    pub fn count(&self) -> u64; // sum of all per-node counts
}

impl CrdtValue<u64> for GCounter { ... }

/// ---- LWW-Register ----
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LwwRegister<T: Clone + Debug + PartialEq + Serialize + DeserializeOwned> {
    value: Option<T>,
    timestamp: Option<Hlc>,
}

impl<T: ...> LwwRegister<T> {
    pub fn write(&mut self, value: T, hlc: Hlc);
    pub fn read(&self) -> Option<&T>;
}

impl<T: ...> CrdtValue<Option<T>> for LwwRegister<T> { ... }

/// ---- OR-Set ----
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct OrSet<E: Clone + Debug + Eq + Hash + Serialize + DeserializeOwned> {
    entries: HashMap<E, HashSet<Uuid>>, // add tags observed per element
    tombstones: HashSet<Uuid>,          // tags removed by remove()
}

impl<E: ...> OrSet<E> {
    pub fn add(&mut self, element: E);
    pub fn remove(&mut self, element: &E); // tombstones every observed tag of element
    pub fn contains(&self, element: &E) -> bool;
    pub fn iter(&self) -> impl Iterator<Item = &E>;
    pub fn len(&self) -> usize;
}

impl<E: ...> CrdtValue<HashSet<E>> for OrSet<E> { ... }

/// ---- Vector Clock ----
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct VectorClock {
    clocks: HashMap<NodeId, u64>,
}

impl VectorClock {
    pub fn tick(&mut self, node: &NodeId);
    pub fn happens_before(&self, other: &VectorClock) -> bool;
    pub fn concurrent_with(&self, other: &VectorClock) -> bool;
    pub fn dominates(&self, other: &VectorClock) -> bool; // happens_before || eq
}

impl CrdtValue<HashMap<NodeId, u64>> for VectorClock { ... }
```

6. Serialization — Protobuf Schemas
All CRDT types serialize to Protobuf for wire transfer (cross-cluster sync, kdb-next storage). Schema files live at platform/crdt/proto/.
6.1 crdt_common.proto
```protobuf
syntax = "proto3";
package koder.crdt.v1;

message NodeId {
  string id = 1;
}

message Hlc {
  uint64 physical_ms = 1;
  uint32 logical = 2;
  NodeId node_id = 3;
}
```

6.2 g_counter.proto

```protobuf
syntax = "proto3";
package koder.crdt.v1;
import "crdt_common.proto";

message GCounterEntry {
  NodeId node_id = 1;
  uint64 count = 2;
}

message GCounter {
  repeated GCounterEntry entries = 1;
}
```

6.3 lww_register.proto

```protobuf
syntax = "proto3";
package koder.crdt.v1;
import "crdt_common.proto";

message LwwRegister {
  bytes value = 1;    // serialized inner value (type-erased)
  Hlc timestamp = 2;
  // type_url mirrors google.protobuf.Any semantics for inner value
  string type_url = 3;
}
```

6.4 or_set.proto

```protobuf
syntax = "proto3";
package koder.crdt.v1;

message OrSetEntry {
  bytes element = 1;        // serialized element
  repeated bytes tags = 2;  // add tags: UUIDs as 16-byte arrays
}

message OrSet {
  repeated OrSetEntry entries = 1;
  repeated bytes tombstones = 2;  // removed tags: UUIDs as 16-byte arrays
}
```

6.5 vector_clock.proto

```protobuf
syntax = "proto3";
package koder.crdt.v1;
import "crdt_common.proto";

message VectorClockEntry {
  NodeId node_id = 1;
  uint64 counter = 2;
}

message VectorClock {
  repeated VectorClockEntry entries = 1;
}
```

6.6 Code Generation
Protobuf bindings are generated at build time via build.rs using prost-build. No checked-in generated code; the platform/crdt/src/proto/ module is gitignored and regenerated on every build.
7. Integration Points
7.1 Koder Hub
Store lives at platform/store. CRDT integration is additive: existing kdb-next read/write paths remain unchanged for non-replicated data.
*Download Counters*
Each Store backend node calls GCounter::increment(node_id) on every confirmed download event. The CRDT state is stored in kdb-next under the key store:pkg:{package_id}:downloads. Cross-cluster sync pushes the full GCounter state via a background gRPC stream. On receive, CrdtStore::merge_remote is called.
Key scheme: store:crdt:g-counter:{package_id}:downloads
Key scheme: store:crdt:g-counter:{package_id}:{version}:downloads
*Ratings*
Each user's rating is stored as LwwRegister<f32> keyed by (package_id, user_id). Aggregate rating is computed on read as the arithmetic mean of all LWW values for the package.
Key scheme: store:crdt:lww:{package_id}:rating:{user_id}
*App Listings (categories and tags)*
Package category membership uses OR-Set to allow concurrent editorial operations:
Key scheme: store:crdt:or-set:{package_id}:categories
Key scheme: store:crdt:or-set:{package_id}:tags
7.2 Koder Talk
Talk lives at platform/talk. CRDT integration targets two subsystems.
*Message Delivery State*
Each message has a delivery state per recipient: {sent, delivered, read}. It is modeled as LwwRegister<DeliveryState>, so the latest state update wins. Delivery states only move forward (sent → delivered → read). LWW therefore gives the correct result as long as each transition is stamped with a later HLC than the state it supersedes. Otherwise, a "delivered" update stamped after a concurrent "read" would overwrite it.
Key scheme: talk:crdt:lww:msg:{message_id}:delivery:{recipient_id}
*Channel Membership*
Channel member list modeled as OrSet<UserId>. Concurrent add and remove of the same user resolves to "add wins", which is the desired behavior (joining takes precedence over being removed in a race).
Key scheme: talk:crdt:or-set:channel:{channel_id}:members
*Message Causal Ordering*
Each message carries a VectorClock representing its causal history. Clients use this to sort messages and detect gaps (missing causal predecessors trigger a re-sync request).
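Gap detection can be sketched with the standard causal-delivery rule for vector clocks. A message from sender s can be rendered once its s entry is exactly one past the last s message rendered locally and all of its other entries are already covered. The sketch assumes that each sender ticks its own entry once per message and that the client's local clock is the merge of all rendered messages' clocks. The names Clock, Delivery, and check_delivery are hypothetical.

```rust
use std::collections::HashMap;

/// A vector clock as a plain map; missing entries count as 0.
pub type Clock = HashMap<String, u64>;

/// Outcome of checking an incoming message against what has been rendered.
#[derive(Debug, PartialEq)]
pub enum Delivery {
    /// All causal predecessors are present: render now.
    Ready,
    /// Already rendered (e.g. redelivered by the transport).
    Duplicate,
    /// Nodes whose earlier messages are still missing: request a re-sync.
    Gap(Vec<String>),
}

fn entry(clock: &Clock, node: &str) -> u64 {
    clock.get(node).copied().unwrap_or(0)
}

pub fn check_delivery(local: &Clock, sender: &str, msg: &Clock) -> Delivery {
    let seen = entry(local, sender);
    let sent = entry(msg, sender);
    if sent <= seen {
        return Delivery::Duplicate;
    }
    let mut missing = Vec::new();
    // The sender's own entry must be exactly one past what we have rendered.
    if sent > seen + 1 {
        missing.push(sender.to_string());
    }
    // Every other entry must already be covered locally.
    for (node, &n) in msg {
        if node.as_str() != sender && n > entry(local, node) {
            missing.push(node.clone());
        }
    }
    missing.sort();
    if missing.is_empty() { Delivery::Ready } else { Delivery::Gap(missing) }
}
```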
7.3 Kmail
Kmail lives at platform/kmail (formerly platform/mail).
*Read State*
Each email's read/unread flag is modeled as LwwRegister<ReadState> where ReadState = { read: bool, read_at: Option<Hlc> }. Opening the email on any device updates the register; the most recent timestamp wins.
Key scheme: kmail:crdt:lww:msg:{message_id}:read:{user_id}
8. Correctness Guarantees
The CRDT runtime makes the following guarantees, each checked by the property-based tests in Section 9:
8.1 Convergence
Any two replicas that have exchanged all states will hold equal values, regardless of the order in which states were exchanged.
*Proof sketch:* Each replica's state is the join (least upper bound) of the bottom element and every state it has merged. Because merge is commutative, associative, and idempotent, this join depends only on the set of states received, not on their order or on how many times each was delivered. Two replicas that have exchanged all states have received the same set, so they hold the same least upper bound.
8.2 Commutativity
merge(a, b) == merge(b, a) for all states a, b.
This means: it does not matter which replica receives the other's state first.
8.3 Associativity
merge(a, merge(b, c)) == merge(merge(a, b), c) for all a, b, c.
This means: partial merges can be batched in any grouping.
8.4 Idempotency
merge(a, a) == a for all a.
This means: receiving the same state update twice has no effect. Duplicate delivery from the transport layer is safe.
8.5 Monotonicity
∀ a, b: a ≤ merge(a, b) under the partial order.
This means: merging never removes information. A value never regresses to a previous state.
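As a concrete illustration of 8.1 through 8.4 (one example, not a proof), the pointwise-max join shared by G-Counter and VectorClock can be replayed over different delivery sequences. The sketch uses only std and plain String keys. The `replay` helper is hypothetical.

```rust
use std::collections::HashMap;

pub type State = HashMap<String, u64>;

/// Pointwise-max join (the G-Counter / VectorClock merge).
pub fn merge(a: &State, b: &State) -> State {
    let mut out = a.clone();
    for (k, &v) in b {
        let e = out.entry(k.clone()).or_insert(0);
        *e = (*e).max(v);
    }
    out
}

/// Fold a delivery sequence into a replica that starts at bottom (empty map).
pub fn replay(deliveries: &[&State]) -> State {
    deliveries.iter().fold(State::new(), |acc, s| merge(&acc, s))
}
```

Delivering the same three states in a different order, with two of them duplicated, yields an identical state.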
9. Property-Based Tests
Property-based tests use the proptest crate. All four types are tested against all four algebraic laws.
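```rust
use proptest::prelude::*;
// Assumes a test module inside platform/crdt, so GCounter's private
// `counts` field, NodeId, and the CrdtValue trait are all in scope.
use super::*;

/// Random G-Counter states over a small node-ID alphabet, so that
/// independently generated states share some nodes.
fn arb_g_counter() -> impl Strategy<Value = GCounter> {
    prop::collection::hash_map("[a-e]", 0u64..1_000, 0..5).prop_map(|m| GCounter {
        counts: m.into_iter().map(|(k, v)| (NodeId(k), v)).collect(),
    })
}

// G-Counter: idempotency, commutativity, associativity, bottom identity.
proptest! {
    #[test]
    fn g_counter_merge_idempotent(a in arb_g_counter()) {
        prop_assert_eq!(a.merge(&a), a);
    }

    #[test]
    fn g_counter_merge_commutative(a in arb_g_counter(), b in arb_g_counter()) {
        prop_assert_eq!(a.merge(&b), b.merge(&a));
    }

    #[test]
    fn g_counter_merge_associative(
        a in arb_g_counter(),
        b in arb_g_counter(),
        c in arb_g_counter(),
    ) {
        prop_assert_eq!(a.merge(&b.merge(&c)), a.merge(&b).merge(&c));
    }

    #[test]
    fn g_counter_merge_bottom_identity(a in arb_g_counter()) {
        prop_assert_eq!(a.merge(&GCounter::bottom()), a.clone());
        prop_assert_eq!(GCounter::bottom().merge(&a), a);
    }
}
```

The same four tests are generated for LwwRegister, OrSet, and VectorClock via a macro assert_crdt_laws!(Type, arb_fn).
*Additional tests:*
- OR-Set: add-then-remove removes the element; a concurrent add and remove preserves the element (add wins).
- LWW-Register: the higher HLC timestamp always wins, regardless of arrival order.
- Vector Clock: happens_before is transitive; concurrent detection is symmetric.
- G-Counter: after N increments on each of K different nodes, merged in any order, value() equals N*K (no double-counting).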
10. Performance Considerations
10.1 State Size
| Type | State size |
|---|---|
| GCounter (N nodes) | O(N) — one u64 per node |
| LwwRegister | O(size of inner value) + O(1) for HLC |
| OrSet (E elements, A total adds) | O(A) — grows with add history |
| VectorClock (N nodes) | O(N) |
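For a cluster of up to 100 nodes, GCounter and VectorClock states hold at most 100 entries: 800 bytes of u64 counters plus one node ID per entry.
OR-Set state grows with total adds. Mitigation: a periodic GC pass discards tombstoned tags, together with any element left with no tags. GC is safe only once all replicas have acknowledged the tombstone. Before that point, a replica that has not yet seen the remove could reintroduce the tag on its next merge.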
10.2 Merge Cost
All merge operations are linear in the size of the two input states: O(N) in the number of node entries for GCounter and VectorClock, and in the total number of tags for OrSet. For typical workloads (≤100 nodes, ≤10K OR-Set elements), merge time is under 1 ms.
10.3 Network Cost
Full-state sync is used (as opposed to delta-state CRDTs). This is acceptable for Phases 1-2 given the small state sizes. Delta-CRDTs are a potential Phase 5 optimization if bandwidth becomes a bottleneck.
11. Alternatives Considered
11.1 Automerge (existing CRDT library)
Automerge is a mature CRDT library targeting JSON-like documents. Rejected because:
- It targets document-level CRDTs; we need primitive types that compose into our existing data model.
- Its Rust implementation has significant overhead for simple counter/register use cases.
- We need tight Protobuf integration for cross-cluster sync; Automerge uses its own binary format.
11.2 Riak-style DT module
The Riak DT library (Erlang) inspired this design. Adapting it to Rust is essentially what this RFC proposes.
11.3 CRDTs in the Database Layer (kdb-next)
Embedding CRDTs in kdb-next (TiKV) via custom merge operators was considered. Rejected because:
- TiKV merge operators are append-only logs, not full join-semilattice operations.
- Business logic (e.g., OR-Set tag management) would bleed into the storage layer.
- Testability is much worse when merge logic lives inside the database.
12. Roadmap
Phase 1 — Primitives (G-Counter + LWW-Register)
Target: 2026-05-15
Deliverables:
- platform/crdt crate with GCounter and LwwRegister.
- CrdtValue and CrdtStore traits.
- In-memory CrdtStore implementation for tests.
- Property-based tests for both types.
- Protobuf schemas for both types + codegen.
Ticket: backlog/pending/001-g-counter-lww-register.md
Phase 2 — OR-Set + Vector Clock
Target: 2026-06-15
Deliverables:
- OR-Set and VectorClock implementations.
- OR-Set GC mechanism (mark-and-sweep, requires distributed acknowledgement).
- Property-based tests for both types.
- Protobuf schemas.
Ticket: backlog/pending/002-or-set-vector-clock.md
Phase 3 — kdb-next CrdtStore + Store Integration
Target: 2026-07-15
Deliverables:
- kdb-next backend for CrdtStore (using TiKV atomic CAS for merge_remote).
- Store download counters migrated to GCounter.
- Store ratings migrated to LwwRegister.
- App categories/tags migrated to OrSet.
- Integration tests with real kdb-next cluster.
Tickets: backlog/pending/004-crdt-store-trait.md, backlog/pending/005-store-integration.md
Phase 4 — Talk Integration
Target: 2026-08-15
Deliverables:
- Talk channel membership migrated to OrSet.
- Message delivery state migrated to LwwRegister.
- Message VectorClock field added and used for ordering.
- End-to-end test: two clients on different clusters converge after sync.
Ticket: backlog/pending/006-talk-integration.md
Phase 5 (Future) — Delta-CRDTs
Convert to delta-state CRDTs to reduce sync bandwidth. Only needed at multi-billion-events/day scale.
13. Open Questions
- *OR-Set GC coordination:* How do we ensure all replicas have acknowledged tombstones before GC? Proposed: use a VectorClock to track acknowledgement across nodes. Details TBD in a follow-up RFC.
- *HLC max drift:* The current setting is ±500 ms. Is this tight enough for LWW correctness? This needs measurement of actual clock skew across clusters.
- *CrdtStore for Kmail:* Kmail uses Stalwart Mail Server internally. Do we integrate at the Stalwart plugin level or at the Kmail application layer above it?
- *Cross-org isolation:* Should CRDTs from different tenant organizations ever merge? The answer is almost certainly no; key namespacing must include the tenant ID.
- *Schema evolution:* When the inner type of LwwRegister changes (e.g., DeliveryState gains a new variant), how do old nodes deserialize new states? This needs a versioned type_url scheme (already sketched in Section 6.3).