
RFC-001 — CRDT Runtime

Field  | Value
Author | Koder Engineering
Date   | 2026-04-15
Status | Draft
Module | platform/crdt

Table of Contents

  1. Summary
  2. Motivation
  3. Background — CRDTs Primer
  4. Types Implemented
  5. Rust API
  6. Serialization — Protobuf Schemas
  7. Integration Points
  8. Correctness Guarantees
  9. Property-Based Tests
  10. Performance Considerations
  11. Alternatives Considered
  12. Roadmap
  13. Open Questions

1. Summary

This RFC proposes a purpose-built *CRDT (Conflict-free Replicated Data Type) runtime* for the Koder platform. The library, housed at platform/crdt, provides four fundamental CRDT primitives (G-Counter, LWW-Register, OR-Set, and Vector Clock) with a unified Rust API, Protobuf serialization, and first-class integration hooks for Koder Hub, Talk, and Kmail.

The runtime guarantees that any two replicas of a CRDT value that have exchanged their states converge to the same result, regardless of message delay, duplication, or reordering. This makes it possible to operate multi-cluster deployments without a coordination protocol (no distributed locks, and no Paxos/Raft round for every write).


2. Motivation

2.1 Current State

Koder Hub, Talk, and Kmail are each deployed across multiple geographic clusters (Europe, Americas, Southeast Asia). Within a cluster, consistency is provided by kdb-next (TiKV substrate, linearizable per key). Across clusters, replication is asynchronous.

This means that concurrent writes to the same logical entity from different clusters (for example, a package download counter incremented in Frankfurt and São Paulo simultaneously, or a channel membership list modified concurrently in Singapore and London) result in *conflicts*, which the current replication layer resolves with "last writer wins by wall clock", a strategy that is:

  • *Non-deterministic* across nodes with clock skew.
  • *Data-losing* for set-like structures (a concurrent add and remove can lose the add).
  • *Invisible* to operators: conflicts are silently resolved, never surfaced.

2.2 Concrete Failure Modes

Scenario                   | Component | Problem
Package download counter   | Store     | Two clusters increment independently; only one increment is kept on sync
App rating (float average) | Store     | Concurrent rating updates from two regions → one is silently discarded
Channel member list        | Talk      | User A adds member M in cluster 1; user B removes M in cluster 2 concurrently; outcome is non-deterministic
Message delivery flag      | Talk      | "Delivered" and "Read" flags set from two different nodes → inconsistent state
Email read state           | Kmail     | Opened on mobile (cluster A) and web (cluster B) near-simultaneously → read state flickers

2.3 Why Not OT (Operational Transformation)?

Operational Transformation requires a central coordinator to sequence operations. This contradicts the goal of a peer-equal cluster architecture, in which no cluster is the designated primary.

2.4 Why Not a Database with Serializable Isolation?

Cross-cluster serializable isolation requires a distributed consensus round on every write (median latency: 200-400 ms for an intercontinental quorum). At the scale of Store package installs (millions/day) and Talk messages (billions/day), this is untenable in both cost and latency.

2.5 Solution

CRDTs provide *convergence without coordination*. Each write is local, fast, and always succeeds. Merging happens lazily at sync time and is guaranteed to produce the same result regardless of the order in which partial states arrive.


3. Background — CRDTs Primer

A *state-based CRDT* (or CvRDT) is a data type equipped with:

  1. A *partial order* on its state space that forms a join-semilattice.
  2. A *merge* (join) operation that computes the least upper bound of two states.
  3. A *bottom* element (the initial empty state).

Formally, for any states a, b, c:

merge(a, a) = a                  # idempotency
merge(a, b) = merge(b, a)        # commutativity
merge(a, merge(b, c)) = merge(merge(a, b), c)  # associativity

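These three properties ensure that any two replicas that have received the same set of updates, in any order and with any duplication, hold the same state. Local updates must also be inflationary, that is, they may only move a replica's state upward in the partial order, so that a later merge can never undo them.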


4. Types Implemented

4.1 G-Counter

*Purpose:* A counter that only grows (no decrements).

*State:* A map from node ID to u64; each node maintains its own local count.

GCounter = Map<NodeId, u64>

increment(node_id) → GCounter:
    self[node_id] += 1

value() → u64:
    sum(self.values())

merge(a, b) → GCounter:
    { node_id: max(a[node_id], b[node_id]) for all node_ids in a ∪ b }

*Use cases in Koder:*

  • Package download counts in Store (each cluster node increments its local slice).
  • Install counts per package version.
  • Page view counters on Store catalog pages.

*Why G-Counter and not a plain integer?* If two clusters both increment a plain integer from value N, each writes N+1, and merging their states (by max) yields N+1 instead of N+2. G-Counter partitions responsibility by node, so every increment is unambiguously attributable to exactly one entry.
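A minimal std-only sketch of the G-Counter described above. It assumes node IDs are plain `String`s (the RFC's `NodeId` newtype is omitted) and that a missing map entry reads as 0; `merge` is shown as an inherent method rather than through the `CrdtValue` trait.

```rust
use std::collections::HashMap;

/// Grow-only counter: each node increments only its own entry.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct GCounter {
    counts: HashMap<String, u64>,
}

impl GCounter {
    /// Record one increment on behalf of `node`.
    pub fn increment(&mut self, node: &str) {
        *self.counts.entry(node.to_string()).or_insert(0) += 1;
    }

    /// Logical value: the sum of all per-node counts.
    pub fn count(&self) -> u64 {
        self.counts.values().sum()
    }

    /// Join: per-node maximum over the union of node IDs.
    pub fn merge(&self, other: &Self) -> Self {
        let mut out = self.clone();
        for (node, &n) in &other.counts {
            let e = out.counts.entry(node.clone()).or_insert(0);
            *e = (*e).max(n);
        }
        out
    }
}
```

Because each node only ever raises its own entry, taking the per-node maximum never drops an increment made elsewhere, which is exactly what the plain-integer approach gets wrong.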

4.2 LWW-Register (Last-Write-Wins Register)

*Purpose:* A single mutable value where the most recent write (by logical timestamp) wins.

*State:* A tuple (value: T, timestamp: HybridLogicalClock).

LwwRegister<T> = { value: T, timestamp: HLC }

write(value, hlc) → LwwRegister<T>:
    { value, timestamp: hlc }

read() → T:
    self.value

merge(a, b) → LwwRegister<T>:
    if a.timestamp >= b.timestamp { a } else { b }

*Timestamp choice:* We use *Hybrid Logical Clocks (HLC)*, not wall clocks. An HLC combines physical time with a logical counter (with the node ID as a final tie-breaker), yielding a total order that respects causality without requiring tightly synchronized clocks. Maximum tolerated clock skew: ±500 ms (configurable).
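The RFC does not spell out the HLC update rules. The sketch below follows the standard hybrid logical clock algorithm (Kulkarni et al., "Logical Physical Clocks"). `HlcClock`, `now`, `observe`, and `max_drift_ms` are hypothetical names; wall-clock readings are passed in explicitly so the behavior is deterministic, and the ±500 ms tolerance is modeled as rejecting remote timestamps that are too far ahead of the local wall clock.

```rust
/// HLC timestamp; derived Ord compares (physical_ms, logical, node_id) in order.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Hlc {
    pub physical_ms: u64,
    pub logical: u32,
    pub node_id: String,
}

/// Per-node clock state (hypothetical helper, not part of the RFC's API).
pub struct HlcClock {
    node_id: String,
    last: (u64, u32), // (physical_ms, logical) of the last issued timestamp
    max_drift_ms: u64,
}

impl HlcClock {
    pub fn new(node_id: &str, max_drift_ms: u64) -> Self {
        HlcClock { node_id: node_id.to_string(), last: (0, 0), max_drift_ms }
    }

    /// Timestamp a local event (e.g. a register write) given the current wall clock.
    pub fn now(&mut self, wall_ms: u64) -> Hlc {
        let (l, c) = self.last;
        // Wall clock moved forward: adopt it; otherwise bump the logical counter.
        self.last = if wall_ms > l { (wall_ms, 0) } else { (l, c + 1) };
        self.stamp()
    }

    /// Fold in a timestamp received from another node.
    pub fn observe(&mut self, remote: &Hlc, wall_ms: u64) -> Result<Hlc, String> {
        if remote.physical_ms > wall_ms + self.max_drift_ms {
            return Err(format!("remote clock ahead by {} ms", remote.physical_ms - wall_ms));
        }
        let (l, c) = self.last;
        let new_l = l.max(remote.physical_ms).max(wall_ms);
        let new_c = if new_l == l && new_l == remote.physical_ms {
            c.max(remote.logical) + 1
        } else if new_l == l {
            c + 1
        } else if new_l == remote.physical_ms {
            remote.logical + 1
        } else {
            0 // wall clock is strictly ahead of both
        };
        self.last = (new_l, new_c);
        Ok(self.stamp())
    }

    fn stamp(&self) -> Hlc {
        Hlc { physical_ms: self.last.0, logical: self.last.1, node_id: self.node_id.clone() }
    }
}
```

Even when a node's wall clock steps backwards, successive timestamps keep increasing, and a receiver always issues a timestamp greater than the one it observed.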

*Use cases in Koder:*

  • App ratings (each user's last rating for a package).
  • User profile fields (display name, avatar URL, bio).
  • Package metadata fields (description, homepage URL) when the author updates them from two devices.
  • Kmail read state (last-modified timestamp of read/unread flag).

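*Caveat:* LWW discards concurrent writes; only the one with the highest timestamp survives. This is acceptable for ratings and profile fields, where each write replaces the whole value, but not for set membership, where concurrent changes to different members must all be kept. For sets, use OR-Set.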
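A minimal sketch of the LWW-Register merge. It assumes the timestamp is a `(physical_ms, logical, node_id)` tuple, whose lexicographic ordering matches the field order of the `Hlc` struct in Section 5.3, and uses `None` as bottom. Unlike the pseudocode's `write`, this hypothetical version ignores a local write that carries an older timestamp than the current one.

```rust
/// (physical_ms, logical, node_id); tuples compare lexicographically.
pub type Ts = (u64, u32, String);

#[derive(Clone, Debug, PartialEq)]
pub struct LwwRegister<T: Clone> {
    entry: Option<(T, Ts)>,
}

impl<T: Clone> LwwRegister<T> {
    pub fn bottom() -> Self {
        LwwRegister { entry: None }
    }

    /// Local write, applied through merge so an older timestamp never wins.
    pub fn write(&mut self, value: T, ts: Ts) {
        let incoming = LwwRegister { entry: Some((value, ts)) };
        *self = self.merge(&incoming);
    }

    pub fn read(&self) -> Option<&T> {
        self.entry.as_ref().map(|(v, _)| v)
    }

    /// Keep the entry with the larger timestamp; None (bottom) loses to anything.
    pub fn merge(&self, other: &Self) -> Self {
        match (&self.entry, &other.entry) {
            (Some((_, a)), Some((_, b))) if b > a => other.clone(),
            (None, _) => other.clone(),
            _ => self.clone(),
        }
    }
}
```

Because the node ID is part of the timestamp, two distinct writes never compare equal, so the result does not depend on which replica performs the merge.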

4.3 OR-Set (Observed-Remove Set)

*Purpose:* A set supporting both add and remove, where a concurrent add and remove of the same element resolves to *add wins* (the "observed-remove" semantics).

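*State:* For each element, the set of unique tags created by its adds, plus the set of tags its removes have observed (tombstones). An element is in the set if and only if it has at least one add tag that has not been tombstoned.

ORSet<E> = Map<E, (adds: Set<Tag>, removes: Set<Tag>)>     where Tag = UUID (random, per-add)

add(element) → ORSet<E>:
    self[element].adds ∪= { new_uuid() }

remove(element) → ORSet<E>:
    self[element].removes ∪= self[element].adds    # tombstone all currently observed tags

contains(element) → bool:
    (self[element].adds \ self[element].removes).len() > 0

merge(a, b) → ORSet<E>:
    { element: (a[element].adds ∪ b[element].adds, a[element].removes ∪ b[element].removes)
      for all elements in a ∪ b }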

*Why "add wins"?* When user A adds member M and user B removes M concurrently, A's add generates a new tag that B has not observed, so B's remove cannot cover it. After merge, M still has at least one tag that B's remove did not cover (the one from A's add), so M is in the set. This is the desired behavior for channel membership: an add should not be silently lost.
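A minimal std-only sketch of an add-wins OR-Set that keeps removed tags as per-element tombstones, so a remove also survives merging with a replica that has not yet seen it. Two assumptions: the caller supplies each tag (the RFC generates a random UUID per add; a hypothetical `(replica, sequence)` pair is used here to avoid a `uuid` dependency), and `merge` is an inherent method rather than the `CrdtValue` trait.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// (replica id, per-replica sequence number): unique, like the RFC's UUID tags.
pub type Tag = (String, u64);

#[derive(Clone, Debug, PartialEq)]
pub struct OrSet<E: Eq + Hash + Clone> {
    adds: HashMap<E, HashSet<Tag>>,    // tags created by add()
    removes: HashMap<E, HashSet<Tag>>, // tombstones: tags observed by remove()
}

impl<E: Eq + Hash + Clone> OrSet<E> {
    pub fn new() -> Self {
        OrSet { adds: HashMap::new(), removes: HashMap::new() }
    }

    /// Add `element` under a fresh, globally unique tag.
    pub fn add(&mut self, element: E, tag: Tag) {
        self.adds.entry(element).or_default().insert(tag);
    }

    /// Tombstone every add tag of `element` that this replica has observed.
    pub fn remove(&mut self, element: &E) {
        if let Some(tags) = self.adds.get(element) {
            self.removes.entry(element.clone()).or_default().extend(tags.iter().cloned());
        }
    }

    /// Present iff at least one add tag has not been tombstoned.
    pub fn contains(&self, element: &E) -> bool {
        let empty = HashSet::new();
        let removed = self.removes.get(element).unwrap_or(&empty);
        self.adds.get(element).map_or(false, |tags| tags.iter().any(|t| !removed.contains(t)))
    }

    /// Join: pointwise union of add tags and of tombstones.
    pub fn merge(&self, other: &Self) -> Self {
        let mut out = self.clone();
        for (e, tags) in &other.adds {
            out.adds.entry(e.clone()).or_default().extend(tags.iter().cloned());
        }
        for (e, tags) in &other.removes {
            out.removes.entry(e.clone()).or_default().extend(tags.iter().cloned());
        }
        out
    }
}
```

Because both components only grow, every operation is inflationary and merge is a plain union. The cost is that tombstones accumulate, which is why the Storage note below calls for a GC pass.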

*Use cases in Koder:*

  • Channel member lists in Talk.
  • App categories/tags in Store (admin adds tag from cluster 1; author removes from cluster 2).
  • Mailing list members in Kmail.
  • Installed package list per user device.

*Storage note:* OR-Set state grows with the number of add operations. A periodic tombstone GC pass is required (see Phase 2 of the roadmap).

4.4 Vector Clock

*Purpose:* A mechanism for capturing the causal ordering of events ("happened before" relationships) across nodes, without relying on synchronized wall clocks.

*State:* A map from node ID to a u64 logical counter.

VectorClock = Map<NodeId, u64>

tick(node_id) → VectorClock:
    self[node_id] += 1

happens_before(a, b) → bool:
    ∀ node_id: a[node_id] <= b[node_id]  AND  a ≠ b

concurrent(a, b) → bool:
    NOT happens_before(a, b) AND NOT happens_before(b, a) AND a ≠ b

merge(a, b) → VectorClock:
    { node_id: max(a[node_id], b[node_id]) for all node_ids in a ∪ b }

*Use cases in Koder:*

  • Message ordering in Talk: each message carries a vector clock; clients render in causal order, not arrival order.
  • Detecting concurrent edits to package manifests in Store.
  • Event ordering in audit logs.

*Note:* Vector clocks provide a *partial order*, not a total order. For a total order (e.g., rendering a chat timeline), break ties between concurrent events with a deterministic secondary key, such as the HLC timestamp that LWW-Register uses.
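A minimal sketch of the vector clock operations, assuming `String` node IDs and treating a missing entry as 0. Here `happens_before` is written as "pointwise ≤ but not pointwise ≥", which matches the pseudocode's `a ≠ b` condition under that convention, and two equal clocks are not reported as concurrent.

```rust
use std::collections::HashMap;

/// Vector clock keyed by node ID; a missing entry reads as 0.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct VectorClock {
    clocks: HashMap<String, u64>,
}

impl VectorClock {
    /// Record one local event on `node`.
    pub fn tick(&mut self, node: &str) {
        *self.clocks.entry(node.to_string()).or_insert(0) += 1;
    }

    fn get(&self, node: &str) -> u64 {
        self.clocks.get(node).copied().unwrap_or(0)
    }

    /// Pointwise self <= other.
    fn le(&self, other: &Self) -> bool {
        self.clocks.iter().all(|(node, &n)| n <= other.get(node))
    }

    /// self <= other pointwise, and strictly smaller in at least one entry.
    pub fn happens_before(&self, other: &Self) -> bool {
        self.le(other) && !other.le(self)
    }

    /// Neither clock is pointwise <= the other.
    pub fn concurrent_with(&self, other: &Self) -> bool {
        !self.le(other) && !other.le(self)
    }

    /// Join: per-node maximum.
    pub fn merge(&self, other: &Self) -> Self {
        let mut out = self.clone();
        for (node, &n) in &other.clocks {
            let e = out.clocks.entry(node.clone()).or_insert(0);
            *e = (*e).max(n);
        }
        out
    }
}
```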


5. Rust API

The library exposes two core traits and four concrete types. All types are Send + Sync + 'static; for the generic LwwRegister<T> and OrSet<E>, this holds when the type parameter does.

5.1 Trait CrdtValue

/// A state-based CRDT value.
///
/// `T` is the "read value" type — the type returned by `value()`.
/// The CRDT state itself may differ from T (e.g., GCounter state is a
/// map, but its value() is a u64).
pub trait CrdtValue<T>: Clone + Debug + PartialEq {
    /// Return the identity element (bottom of the join-semilattice).
    /// `merge(bottom(), x) == x` for all x.
    fn bottom() -> Self;

    /// Return true if this value equals bottom().
    fn is_bottom(&self) -> bool;

    /// Compute the least upper bound of self and other.
    /// Must satisfy commutativity, associativity, and idempotency.
    fn merge(&self, other: &Self) -> Self;

    /// Extract the logical value from the CRDT state.
    fn value(&self) -> T;
}

*ontract (informally):*

  • merge(x, x) == x — idempotency
  • merge(x, y) == merge(y, x) — commutativity
  • merge(x, merge(y, z)) == merge(merge(x, y), z) — associativity
  • merge(bottom(), x) == x — bottom identity

These are verified by property-based tests (see Section 9).

5.2 Trait CrdtStore

/// Persistent storage backend for CRDT values.
///
/// Implementations include: in-memory (for tests), kdb-next (production).
#[async_trait]
pub trait CrdtStore: Send + Sync {
    type Error: std::error::Error + Send + Sync + 'static;

    /// Load a CRDT state by key.
    /// Returns `None` if the key does not exist (equivalent to bottom()).
    async fn load<C>(&self, key: &CrdtKey) -> Result<Option<C>, Self::Error>
    where
        C: CrdtValue<C::ReadValue> + DeserializeOwned,
        C: HasReadValue;

    /// Persist a CRDT state. Must be called after every local mutation.
    async fn save<C>(&self, key: &CrdtKey, value: &C) -> Result<(), Self::Error>
    where
        C: CrdtValue<C::ReadValue> + Serialize,
        C: HasReadValue;

    /// Atomically merge a remote state into the stored state.
    ///
    /// Equivalent to:
    ///   let current = load(key).await?.unwrap_or(C::bottom());
    ///   let merged = current.merge(&remote);
    ///   save(key, &merged).await
    ///
    /// But performed atomically (compare-and-swap on the storage layer).
    async fn merge_remote<C>(
        &self,
        key: &CrdtKey,
        remote: &C,
    ) -> Result<C, Self::Error>
    where
        C: CrdtValue<C::ReadValue> + Serialize + DeserializeOwned,
        C: HasReadValue;
}
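The atomicity requirement on `merge_remote` can be illustrated with a synchronous, single-type, in-memory store that performs load, merge, and save under one lock, with a mutex standing in for the storage layer's compare-and-swap. This is a hypothetical sketch: `Join`, `InMemoryStore`, `MaxU64`, and the `String` keys stand in for `CrdtValue`, the RFC's in-memory `CrdtStore`, a real CRDT type, and `CrdtKey`, and `async` and serialization are omitted.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Stand-in for CrdtValue: only what merge_remote needs.
pub trait Join: Clone {
    fn bottom() -> Self;
    fn merge(&self, other: &Self) -> Self;
}

/// Hypothetical in-memory store holding a single CRDT type.
pub struct InMemoryStore<C: Join> {
    data: Mutex<HashMap<String, C>>,
}

impl<C: Join> InMemoryStore<C> {
    pub fn new() -> Self {
        InMemoryStore { data: Mutex::new(HashMap::new()) }
    }

    pub fn load(&self, key: &str) -> Option<C> {
        self.data.lock().unwrap().get(key).cloned()
    }

    /// Load, merge, and save while holding the lock, so concurrent callers
    /// cannot interleave between the read and the write (no lost merges).
    pub fn merge_remote(&self, key: &str, remote: &C) -> C {
        let mut data = self.data.lock().unwrap();
        let current = data.get(key).cloned().unwrap_or_else(C::bottom);
        let merged = current.merge(remote);
        data.insert(key.to_string(), merged.clone());
        merged
    }
}

/// Example CRDT for the store: a max-register over u64.
#[derive(Clone, Debug, PartialEq)]
pub struct MaxU64(pub u64);

impl Join for MaxU64 {
    fn bottom() -> Self {
        MaxU64(0)
    }
    fn merge(&self, other: &Self) -> Self {
        MaxU64(self.0.max(other.0))
    }
}
```

A production backend would replace the mutex with a read, a merge, and a compare-and-swap that is retried on conflict; the visible contract is the same.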

5.3 Type Signatures

/// Node identifier, unique per cluster node. Derives Ord because Hlc
/// (below) derives Ord and contains a NodeId field.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct NodeId(pub String);

/// Hybrid Logical Clock timestamp.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Hlc {
    pub physical_ms: u64,  // milliseconds since Unix epoch
    pub logical: u32,      // tie-breaker counter
    pub node_id: NodeId,   // for total ordering within the same ms+logical
}

/// ---- G-Counter ----

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GCounter {
    counts: HashMap<NodeId, u64>,
}

impl GCounter {
    pub fn increment(&mut self, node: &NodeId);
    pub fn count(&self) -> u64;  // sum of all per-node counts
}

impl CrdtValue<u64> for GCounter { ... }

/// ---- LWW-Register ----

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LwwRegister<T: Clone + Debug + PartialEq + Serialize + DeserializeOwned> {
    value: Option<T>,
    timestamp: Option<Hlc>,
}

impl<T: ...> LwwRegister<T> {
    pub fn write(&mut self, value: T, hlc: Hlc);
    pub fn read(&self) -> Option<&T>;
}

impl<T: ...> CrdtValue<Option<T>> for LwwRegister<T> { ... }

/// ---- OR-Set ----

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct OrSet<E: Clone + Debug + Eq + Hash + Serialize + DeserializeOwned> {
    entries: HashMap<E, HashSet<Uuid>>,     // add tags per element
    tombstones: HashMap<E, HashSet<Uuid>>,  // tags observed by remove()
}

impl<E: ...> OrSet<E> {
    pub fn add(&mut self, element: E);
    pub fn remove(&mut self, element: &E);
    pub fn contains(&self, element: &E) -> bool;
    pub fn iter(&self) -> impl Iterator<Item = &E>;
    pub fn len(&self) -> usize;
}

impl<E: ...> CrdtValue<HashSet<E>> for OrSet<E> { ... }

/// ---- Vector Clock ----

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct VectorClock {
    clocks: HashMap<NodeId, u64>,
}

impl VectorClock {
    pub fn tick(&mut self, node: &NodeId);
    pub fn happens_before(&self, other: &VectorClock) -> bool;
    pub fn concurrent_with(&self, other: &VectorClock) -> bool;
    pub fn dominates(&self, other: &VectorClock) -> bool;  // other.happens_before(self) || self == other
}

impl CrdtValue<HashMap<NodeId, u64>> for VectorClock { ... }

6. Serialization — Protobuf Schemas

All CRDT types serialize to Protobuf for wire transfer (cross-cluster sync, kdb-next storage). Schema files live at platform/crdt/proto/.

6.1 crdt_common.proto

syntax = "proto3";
package koder.crdt.v1;

message NodeId {
  string id = 1;
}

message Hlc {
  uint64 physical_ms = 1;
  uint32 logical     = 2;
  NodeId node_id     = 3;
}

6.2 g_counter.proto

syntax = "proto3";
package koder.crdt.v1;
import "crdt_common.proto";

message GCounterEntry {
  NodeId node_id = 1;
  uint64 count   = 2;
}

message GCounter {
  repeated GCounterEntry entries = 1;
}

6.3 lww_register.proto

syntax = "proto3";
package koder.crdt.v1;
import "crdt_common.proto";

message LwwRegister {
  bytes value     = 1;  // serialized inner value (type-erased)
  Hlc   timestamp = 2;
  // type_url mirrors google.protobuf.Any semantics for inner value
  string type_url = 3;
}

6.4 or_set.proto

syntax = "proto3";
package koder.crdt.v1;

message OrSetEntry {
  bytes          element      = 1;  // serialized element
  repeated bytes tags         = 2;  // add tags: UUIDs as 16-byte arrays
  repeated bytes removed_tags = 3;  // tombstoned tags (observed by remove)
}

message OrSet {
  repeated OrSetEntry entries = 1;
}

6.5 vector_clock.proto

syntax = "proto3";
package koder.crdt.v1;
import "crdt_common.proto";

message VectorClockEntry {
  NodeId node_id = 1;
  uint64 counter = 2;
}

message VectorClock {
  repeated VectorClockEntry entries = 1;
}

6.6 Code Generation

Protobuf bindings are generated at build time via build.rs using prost-build. No checked-in generated code; the platform/crdt/src/proto/ module is gitignored and regenerated on every build.


7. Integration Points

7.1 Koder Hub

Store lives at platform/store. CRDT integration is additive: existing kdb-next read/write paths remain unchanged for non-replicated data.

*Download Counters*

Each Store backend node calls GCounter::increment(node_id) on every confirmed download event. The CRDT state is stored in kdb-next under the key schemes below. Cross-cluster sync pushes the full GCounter state via a background gRPC stream; on receipt, CrdtStore::merge_remote is called.

Key scheme:  store:crdt:g-counter:{package_id}:downloads
Key scheme:  store:crdt:g-counter:{package_id}:{version}:downloads

*Ratings*

Each user's rating is stored as LwwRegister<f32> keyed by (package_id, user_id). Aggregate rating is computed on read as the arithmetic mean of all LWW values for the package.
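A sketch of the read-time aggregation, assuming each user's register has already been read into an `Option<f32>` (`None` for a bottom register, i.e. a user who never rated). `aggregate_rating` is a hypothetical helper name.

```rust
/// Arithmetic mean of all set ratings; None if nobody has rated yet.
pub fn aggregate_rating(ratings: &[Option<f32>]) -> Option<f32> {
    let rated: Vec<f32> = ratings.iter().flatten().copied().collect();
    if rated.is_empty() {
        None
    } else {
        Some(rated.iter().sum::<f32>() / rated.len() as f32)
    }
}
```

Computing the mean on read keeps every stored value a plain register. Averaging two averages is not associative, so storing a merged average would not converge to the true mean.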

Key scheme:  store:crdt:lww:{package_id}:rating:{user_id}

*App Listings (categories and tags)*

Package category membership uses OR-Set to allow concurrent editorial operations:

Key scheme:  store:crdt:or-set:{package_id}:categories
Key scheme:  store:crdt:or-set:{package_id}:tags

7.2 Koder Talk

Talk lives at platform/talk. CRDT integration targets two subsystems.

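*Message Delivery State*

Each message has a delivery state per recipient: {sent, delivered, read}. It is modeled as LwwRegister<DeliveryState>, so the latest state update wins. Because delivery states only move forward (sent → delivered → read), LWW gives the intended result as long as each later state is written with a later HLC timestamp, which holds whenever the writer of the later state has already observed the earlier one.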

Key scheme:  talk:crdt:lww:msg:{message_id}:delivery:{recipient_id}

*Channel Membership*

Channel member list modeled as OrSet<UserId>. Concurrent add and remove of the same user resolves to "add wins", which is the desired behavior (joining takes precedence over being removed in a race).

Key scheme:  talk:crdt:or-set:channel:{channel_id}:members

*Message Causal Ordering*

Each message carries a VectorClock representing its causal history. Clients use this to sort messages and detect gaps (missing causal predecessors trigger a re-sync request).
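One way to implement the gap detection is the classic causal-delivery check. The sketch assumes, beyond what the RFC states, that each sending node ticks its own entry exactly once per message and that the receiver's clock covers only messages it has already delivered. `Clock`, `Delivery`, and `classify` are hypothetical names.

```rust
use std::collections::HashMap;

/// Vector clock as a plain map; a missing entry reads as 0.
pub type Clock = HashMap<String, u64>;

fn get(clock: &Clock, node: &str) -> u64 {
    clock.get(node).copied().unwrap_or(0)
}

#[derive(Debug, PartialEq)]
pub enum Delivery {
    /// All causal predecessors are present: render now, then merge the clocks.
    Deliver,
    /// Already delivered from this sender: drop.
    Duplicate,
    /// Some predecessor is missing: buffer and request a re-sync.
    Gap,
}

/// Classify a message stamped `msg` by `sender` against the receiver's clock `local`.
pub fn classify(msg: &Clock, sender: &str, local: &Clock) -> Delivery {
    let seen = get(local, sender);
    let stamped = get(msg, sender);
    if stamped <= seen {
        Delivery::Duplicate
    } else if stamped == seen + 1
        && msg.iter().all(|(node, &n)| node == sender || n <= get(local, node))
    {
        Delivery::Deliver
    } else {
        Delivery::Gap
    }
}
```

A message is deliverable only if it is the next one from its sender and the sender had not seen anything from other nodes that the receiver is still missing; every other non-duplicate indicates a gap.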

7.3 Kmail

Kmail lives at platform/kmail (formerly platform/mail).

*Read State*

Each email's read/unread flag is modeled as LwwRegister<ReadState> where ReadState = { read: bool, read_at: Option<Hlc> }. Opening the email on any device updates the register; the most recent timestamp wins.

Key scheme:  kmail:crdt:lww:msg:{message_id}:read:{user_id}

8. Correctness Guarantees

The CRDT runtime makes the following guarantees, each checked by the property-based tests in Section 9:

8.1 Convergence

Any two replicas that have exchanged all states will hold equal values, regardless of the order in which states were exchanged.

*Proof sketch:* merge computes the least upper bound (join) of its arguments. Because join is commutative, associative, and idempotent, merging any collection of states, in any order, grouping, or multiplicity, yields the join of the distinct states in that collection. Two replicas that have received the same set of states therefore compute the same join, and hence hold the same value.
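The argument can be checked concretely on the per-node-max join used by G-Counter and the vector clock. This hypothetical sketch defines the join on a `BTreeMap` state and folds a list of states from bottom (the empty map); the test folds three states in all six orders and with duplicates.

```rust
use std::collections::BTreeMap;

/// A G-Counter / vector-clock state: node ID -> count (missing = 0).
type State = BTreeMap<&'static str, u64>;

/// Join: per-node maximum.
fn join(a: &State, b: &State) -> State {
    let mut out = a.clone();
    for (node, &n) in b {
        let e = out.entry(*node).or_insert(0);
        *e = (*e).max(n);
    }
    out
}

/// Merge states left to right, starting from bottom (the empty map).
fn join_all(states: &[&State]) -> State {
    states.iter().fold(State::new(), |acc, s| join(&acc, s))
}
```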

8.2 Commutativity

merge(a, b) == merge(b, a) for all states a, b.

This means: it does not matter which replica receives the other's state first.

8.3 Associativity

merge(a, merge(b, c)) == merge(merge(a, b), c) for all a, b, c.

This means: partial merges can be batched in any grouping.

8.4 Idempotency

merge(a, a) == a for all a.

This means: receiving the same state update twice has no effect. Duplicate delivery from the transport layer is safe.

8.5 Monotonicity

∀ a, b: a ≤ merge(a, b) under the partial order.

This means: merging never removes information. A value never regresses to a previous state.


9. Property-Based Tests

Property-based tests use the proptest crate. All four types are tested against all four algebraic laws.

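// Example: G-Counter checked against all four algebraic laws.
// Assumes arb_g_counter() returns a proptest Strategy<Value = GCounter>
// and that CrdtValue and GCounter are exported at the crate root.
use proptest::prelude::*;
use crate::{CrdtValue, GCounter};
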
proptest! {
    #[test]
    fn g_counter_merge_idempotent(a in arb_g_counter()) {
        assert_eq!(a.merge(&a), a);
    }

    #[test]
    fn g_counter_merge_commutative(a in arb_g_counter(), b in arb_g_counter()) {
        assert_eq!(a.merge(&b), b.merge(&a));
    }

    #[test]
    fn g_counter_merge_associative(
        a in arb_g_counter(),
        b in arb_g_counter(),
        c in arb_g_counter(),
    ) {
        assert_eq!(a.merge(&b.merge(&c)), a.merge(&b).merge(&c));
    }

    #[test]
    fn g_counter_merge_bottom_identity(a in arb_g_counter()) {
        assert_eq!(a.merge(&GCounter::bottom()), a);
        assert_eq!(GCounter::bottom().merge(&a), a);
    }
}

The same four tests are generated for LwwRegister, OrSet, and VectorClock via a macro assert_crdt_laws!(Type, arb_fn).
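The `assert_crdt_laws!` macro itself is not shown in this RFC. As a dependency-free illustration of the same four checks, the sketch below tests every pair and triple drawn from a fixed list of sample states instead of random `proptest` inputs. `Crdt`, `check_crdt_laws`, `Max`, and `Sum` are hypothetical names; `Sum` shows how a merge that double-counts is caught.

```rust
use std::fmt::Debug;

/// Minimal stand-in for the RFC's CrdtValue trait (value() omitted).
pub trait Crdt: Clone + Debug + PartialEq {
    fn bottom() -> Self;
    fn merge(&self, other: &Self) -> Self;
}

/// Check idempotency, bottom identity, commutativity, and associativity
/// over all combinations of `samples`; report the first violation.
pub fn check_crdt_laws<C: Crdt>(samples: &[C]) -> Result<(), String> {
    for a in samples {
        if a.merge(a) != *a {
            return Err(format!("idempotency fails for {:?}", a));
        }
        if C::bottom().merge(a) != *a || a.merge(&C::bottom()) != *a {
            return Err(format!("bottom identity fails for {:?}", a));
        }
        for b in samples {
            if a.merge(b) != b.merge(a) {
                return Err(format!("commutativity fails for {:?}, {:?}", a, b));
            }
            for c in samples {
                if a.merge(&b.merge(c)) != a.merge(b).merge(c) {
                    return Err(format!("associativity fails for {:?}, {:?}, {:?}", a, b, c));
                }
            }
        }
    }
    Ok(())
}

/// Lawful example: max over u64, bottom = 0.
#[derive(Clone, Debug, PartialEq)]
pub struct Max(pub u64);
impl Crdt for Max {
    fn bottom() -> Self { Max(0) }
    fn merge(&self, o: &Self) -> Self { Max(self.0.max(o.0)) }
}

/// Broken example: sum is commutative and associative but not idempotent.
#[derive(Clone, Debug, PartialEq)]
pub struct Sum(pub u64);
impl Crdt for Sum {
    fn bottom() -> Self { Sum(0) }
    fn merge(&self, o: &Self) -> Self { Sum(self.0 + o.0) }
}
```

`Sum` is the "plain integer counter" failure from Section 4.1 in algebraic form: delivering the same state twice counts it twice.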

*Additional tests:*

  • OR-Set: add-then-remove removes the element; concurrent add and remove preserves it (add-wins).
  • LWW-Register: higher HLC timestamp always wins regardless of arrival order.
  • Vector Clock: happens_before is transitive; concurrent detection is symmetric.
  • GCounter: after each of K nodes performs N increments and all states are merged, value() equals N*K (no lost or double-counted increments).

10. Performance Considerations

10.1 State Size

Type                             | State size
GCounter (N nodes)               | O(N): one u64 per node
LwwRegister                      | O(size of inner value) + O(1) for the HLC
OrSet (E elements, A total adds) | O(A): grows with add history
VectorClock (N nodes)            | O(N)

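For a cluster of up to 100 nodes, GCounter and VectorClock states stay in the low kilobytes. Each entry is one node-ID string plus one varint-encoded u64, so node-ID length dominates: with 16-byte node IDs, a 100-entry state encodes to roughly 2.5-3 KB in Protobuf.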

OR-Set state grows with total adds. Mitigation: a periodic GC pass discards tombstoned tags and drops elements left with no tags at all. GC is safe only once all replicas have acknowledged the tombstone; until then, a replica that has not seen the remove could reintroduce the removed tags on merge.

10.2 Merge Cost

All merge operations are linear in the size of the two input states: O(N) in the number of map entries for GCounter and VectorClock, and O(total tags) for OR-Set. For typical workloads (≤100 nodes, ≤10K OR-Set elements), merge time is expected to be under 1 ms.

10.3 Network Cost

Full-state sync is used (as opposed to delta-state CRDTs). This is acceptable for Phases 1-2 given the small state sizes. Delta-CRDTs are a potential Phase 5 optimization if bandwidth becomes a bottleneck.


11. Alternatives Considered

11.1 Automerge (existing CRDT library)

Automerge is a mature CRDT library targeting JSON-like documents. Rejected because:

  • It targets document-level CRDTs; we need primitive types that compose into our existing data model.
  • Its Rust implementation has significant overhead for simple counter/register use cases.
  • We need tight Protobuf integration for cross-cluster sync; Automerge uses its own binary format.

11.2 Riak-style DT module

The Riak DT library (Erlang) inspired this design. Adapting it to Rust is essentially what this RFC proposes.

11.3 CRDTs in the Database Layer (kdb-next)

Embedding CRDTs in kdb-next (TiKV) via custom merge operators was considered. Rejected because:

  • TiKV merge operators are append-only logs, not full join-semilattice operations.
  • Business logic (e.g., OR-Set tag management) would bleed into the storage layer.
  • Testability is much worse when merge logic lives inside the database.

12. Roadmap

Phase 1 — Primitives (GCounter + LWWRegister)

Target: 2026-05-15

Deliverables:

  • platform/crdt crate with GCounter and LWWRegister.
  • CrdtValue and CrdtStore traits.
  • In-memory CrdtStore implementation for tests.
  • Property-based tests for both types.
  • Protobuf schemas for both types + codegen.

Ticket: backlog/pending/001-g-counter-lww-register.md

Phase 2 — OR-Set + Vector Clock

Target: 2026-06-15

Deliverables:

  • OR-Set and VectorClock implementations.
  • OR-Set GC mechanism (mark-and-sweep, requires distributed acknowledgement).
  • Property-based tests for both types.
  • Protobuf schemas.

Ticket: backlog/pending/002-or-set-vector-clock.md

Phase 3 — kdb-next CrdtStore + Store Integration

Target: 2026-07-15

Deliverables:

  • kdb-next backend for CrdtStore (using TiKV atomic CAS for merge_remote).
  • Store download counters migrated to GCounter.
  • Store ratings migrated to LwwRegister.
  • App categories/tags migrated to OrSet.
  • Integration tests with real kdb-next cluster.

Tickets: backlog/pending/004-crdt-store-trait.md, backlog/pending/005-store-integration.md

Phase 4 — Talk Integration

Target: 2026-08-15

Deliverables:

  • Talk channel membership migrated to OrSet.
  • Message delivery state migrated to LwwRegister.
  • Message VectorClock field added and used for ordering.
  • End-to-end test: two clients on different clusters converge after sync.

Ticket: backlog/pending/006-talk-integration.md

Phase 5 (Future) — Delta-CRDTs

Convert to delta-state CRDTs to reduce sync bandwidth. Only needed at multi-billion-events/day scale.


13. Open Questions

  1. *OR-Set GC coordination:* How do we ensure all replicas have acknowledged tombstones before GC? Proposed: use a VectorClock to track acknowledgement across nodes. Details TBD in a follow-up RFC.
  2. *HLC max drift:* The current setting is ±500 ms. Is this tight enough for LWW correctness? We need measurements of actual clock skew across clusters.
  3. *CrdtStore for Kmail:* Kmail uses Stalwart Mail Server internally. Do we integrate at the Stalwart plugin level or at the Kmail application layer above it?
  4. *Cross-org isolation:* Should CRDTs from different tenant organizations ever merge? The answer is almost certainly no; key namespacing must include the tenant ID.
  5. *Schema evolution:* When the inner type of LwwRegister changes (e.g., DeliveryState gains a new variant), how do old nodes deserialize new states? This needs a versioned type_url scheme (already sketched in Section 6.3).

Source: ../home/koder/dev/koder/meta/docs/stack/rfcs/crdt-RFC-001-crdt-runtime.md