Cross-Service Event Delivery (Redis Streams)

ratified-2026-05-14

Origin

services/foundation/id/engine/services/identity/internal/service/event_bus.go documents itself as "intentionally simple — a KDB-backed durable queue can replace it later without changing the publisher interface." That "later" has arrived: tickets #091, #092, and #093 need the auth, sso, and keys services to consume identity.user.erased cross-process.

Ticket #097 raised three options (gRPC RPC, durable queue, webhook reuse). This spec ratifies Option B — Durable queue via Redis Streams.

Decision (2026-05-14)

Redis Streams is the substrate chosen for cross-service event delivery until the kdb-stream layer from RFC-001 ships, at which point the migration is drop-in (same pub/sub semantics).

Reasons:

  1. Reuse-first — pkg/redisclient/redisclient.go already exists in the engine; no new infrastructure
  2. Self-hosted-first — Redis is OSS and self-hostable, and its deployment is already established in ops
  3. Consumer groups — per-group offsets, pending-entry tracking, and redelivery via XCLAIM are built in, so each service tracks its own progress; the DLQ is layered on top (see R4)
  4. Migration path — the future kdb-stream uses parallel XADD/XREADGROUP semantics
  5. Simplicity vs NATS JetStream — Redis is already in the stack; NATS would require new infrastructure
  6. webhook_deliveries reuse rejected — semantic stretch (it is intended for external webhooks, and its polling latency is too high for a cascade)

Contract (R-rules)

R1 — Stream key naming

One global stream per origin service:

Stream key                    Publisher                                 Consumers
koder:events:id               services/foundation/id/engine (identity)  auth, sso, keys
koder:events:auth (future)    services/foundation/id/engine (auth)      (TBD per event)

The koder:events: prefix is reserved.

R2 — Event envelope

Each XADD carries key-value fields (Redis Streams native format):

XADD koder:events:id * \
  type identity.user.erased \
  event_id evt_<ulid> \
  occurred_at 2026-05-14T12:34:56Z \
  tenant_id tenant-A \
  subject_id user-u123 \
  actor system \
  data '{"erasure_id":"ERS-2026-05-14-abc12345"}'

Required fields:

  • type — event type (e.g. identity.user.erased)
  • event_id — globally unique (ULID); idempotency key for consumers
  • occurred_at — ISO 8601 UTC
  • tenant_id — tenant scope
  • subject_id — primary subject (user_id, workspace_id, etc.)
  • actor — who/what triggered (system, user:<id>, admin:<id>)
  • data — JSON-encoded event-specific payload

R3 — Consumer group naming

Each consuming service creates its own consumer group:

<service-name>-<purpose>
e.g.:
  auth-erasure-cascade
  sso-erasure-cascade
  keys-erasure-cascade

Pattern: <service>-<purpose>. Group names must be unique within a stream.
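The naming rules in R1 and R3 are simple enough to centralise in two helpers. The sketch below is one possible shape: StreamKey, GroupName, and the character-set check are assumptions for illustration, since the spec does not define which characters a service or purpose name may contain.

```go
package main

import (
	"fmt"
	"regexp"
)

// eventsPrefix is the reserved prefix from R1.
const eventsPrefix = "koder:events:"

// namePart is an assumed validation rule (lowercase letters, digits,
// hyphens); the spec itself does not define a character set.
var namePart = regexp.MustCompile(`^[a-z][a-z0-9-]*$`)

// StreamKey returns the stream key for an origin service,
// e.g. "koder:events:id".
func StreamKey(origin string) (string, error) {
	if !namePart.MatchString(origin) {
		return "", fmt.Errorf("invalid origin service %q", origin)
	}
	return eventsPrefix + origin, nil
}

// GroupName returns "<service>-<purpose>", e.g. "auth-erasure-cascade".
func GroupName(service, purpose string) (string, error) {
	if !namePart.MatchString(service) || !namePart.MatchString(purpose) {
		return "", fmt.Errorf("invalid group parts %q, %q", service, purpose)
	}
	return service + "-" + purpose, nil
}

func main() {
	key, _ := StreamKey("id")
	group, _ := GroupName("auth", "erasure-cascade")
	fmt.Println(key, group)
}
```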

R4 — Consumer behavior

  1. Boot-time: XGROUP CREATE koder:events:id <group> $ MKSTREAM (start from now; future events only); a BUSYGROUP error on restart means the group already exists and can be ignored
  2. Loop: XREADGROUP GROUP <group> <consumer-id> COUNT 100 BLOCK 5000 STREAMS koder:events:id >
  3. Per message: deserialize → handle (idempotent — same event_id may be re-delivered)
  4. Success: XACK koder:events:id <group> <message_id>
  5. Failure: log + leave un-ack'd; XPENDING tracks; retry via XCLAIM after timeout
  6. Hard failure (>3 retries): XCLAIM, XADD a copy to the DLQ stream koder:events:id:dlq for ops, then XACK the original

R5 — Idempotency contract

Consumers MUST be idempotent per event_id. Recommended:

  • Maintain processed_events table with event_id PK + TTL of 24h
  • Skip if event_id already processed within window
  • For erasure cascade: anonymization/deletion are naturally idempotent (reapplying = no-op)

R6 — Publisher behavior

Publisher (e.g. ErasureService.ExecuteErasure):

  1. Generate event_id via ULID
  2. XADD koder:events:id MAXLEN ~ 100000 * type ... event_id ... data ...
  3. MAXLEN ~ 100000 (~ requests approximate trimming) keeps roughly the latest 100k events; older entries are evicted automatically
  4. On Redis failure: log + fall back to the existing in-process EventBus for same-process consumers; cross-service consumers receive the event on a later poll only after Redis recovers and the publish is retried (see N1)
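The failure path in step 4 can be sketched behind two small interfaces. Everything named here is hypothetical: StreamAdder stands in for whatever pkg/redisclient exposes for XADD, LocalBus stands in for the existing in-process EventBus, and the fakes exist only to exercise the logic without a Redis server.

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
)

// StreamAdder is a hypothetical seam over the Redis client; Add stands for
// "XADD <stream> MAXLEN ~ <maxLen> * <fields...>".
type StreamAdder interface {
	Add(stream string, maxLen int64, fields []string) error
}

// LocalBus is a hypothetical stand-in for the existing in-process EventBus.
type LocalBus interface {
	Emit(fields []string)
}

// Publisher wires a stream writer to its in-process fallback.
type Publisher struct {
	Stream string
	MaxLen int64
	Redis  StreamAdder
	Local  LocalBus
}

// Publish tries the stream first. On failure it logs, emits on the
// in-process bus so same-process consumers still see the event, and
// returns the error so the caller can decide whether to retry.
func (p Publisher) Publish(fields []string) error {
	err := p.Redis.Add(p.Stream, p.MaxLen, fields)
	if err != nil {
		slog.Error("redis publish failed", "stream", p.Stream, "error", err)
		p.Local.Emit(fields)
	}
	return err
}

// fakeRedis and fakeBus are test doubles used only by this sketch.
type fakeRedis struct {
	down  bool
	added int
}

func (f *fakeRedis) Add(string, int64, []string) error {
	if f.down {
		return errors.New("redis unavailable")
	}
	f.added++
	return nil
}

type fakeBus struct{ emitted int }

func (b *fakeBus) Emit([]string) { b.emitted++ }

func main() {
	redis, bus := &fakeRedis{down: true}, &fakeBus{}
	p := Publisher{Stream: "koder:events:id", MaxLen: 100000, Redis: redis, Local: bus}
	err := p.Publish([]string{"type", "identity.user.erased"})
	fmt.Println("error:", err, "fallback emits:", bus.emitted)
}
```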

R7 — TLS + auth

  • Production: Redis TLS + AUTH password (env KODER_REDIS_URL=rediss://user:pass@host:6379/0)
  • Development: plain Redis OK
  • Connection pooling via pkg/redisclient (existing)

R8 — Observability

Publishers + consumers emit slog:

  • Publish: event_emitted {type, event_id, subject_id, stream}
  • Consume start: event_received {type, event_id, group, consumer}
  • Consume ack: event_acked {type, event_id, elapsed_ms}
  • Consume fail: event_failed {type, event_id, error, retry_count}
  • DLQ: event_dlq {type, event_id, final_error} (warn level)

Metrics (Prometheus, future): event_count_total, event_lag_seconds, event_dlq_total.

R9 — Migration to kdb-stream

When the kdb-stream layer ships per RFC-001:

  1. Add KODER_EVENT_BUS=kdb-stream env flag
  2. New pkg/eventbus/kdb_stream.go impl satisfying same publisher/consumer interfaces
  3. Migration script copies in-flight stream state from Redis → kdb-stream
  4. Cutover: env flip + restart services
  5. Redis decommission

Drop-in because XADD/XREADGROUP have direct parallels in the kdb-stream design.
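The env-flag cutover in steps 1 and 4 amounts to choosing an implementation at startup. A minimal sketch of that selection follows. SelectBackend and the Backend type are hypothetical, and "redis-streams" is an assumed name for the current default, since the spec only names the kdb-stream value.

```go
package main

import (
	"fmt"
	"os"
)

// Backend identifies which event bus implementation to construct.
type Backend string

const (
	BackendRedis     Backend = "redis-streams" // assumed name for the current default
	BackendKDBStream Backend = "kdb-stream"    // value named in R9
)

// SelectBackend maps the KODER_EVENT_BUS value to a backend. An unset
// flag keeps the Redis Streams implementation; unknown values are
// rejected so a typo cannot silently change delivery behavior.
func SelectBackend(flag string) (Backend, error) {
	switch flag {
	case "", string(BackendRedis):
		return BackendRedis, nil
	case string(BackendKDBStream):
		return BackendKDBStream, nil
	default:
		return "", fmt.Errorf("unknown KODER_EVENT_BUS value %q", flag)
	}
}

func main() {
	backend, err := SelectBackend(os.Getenv("KODER_EVENT_BUS"))
	if err != nil {
		fmt.Println("startup error:", err)
		return
	}
	fmt.Println("event bus backend:", backend)
}
```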

Implementation

Wave 1 (now) — publisher abstraction in engine

  • [ ] services/foundation/id/engine/pkg/eventbus/ package
  • [ ] Publisher interface (Publish method) + Redis Streams impl
  • [ ] Consumer interface (Subscribe + Process methods) + Redis Streams impl
  • [ ] Wire publisher in ErasureService.ExecuteErasure (replace direct identity.PublishUserErased with bus.Publish; in-process EventBus keeps firing for same-process consumers via dual-emit)

Wave 2 — consumers in sibling services

  • [ ] services/auth/internal/listener/erasure_listener.go (ticket #091)
  • [ ] services/sso/internal/listener/erasure_listener.go (ticket #092)
  • [ ] services/keys/internal/listener/erasure_listener.go (ticket #093)

Wave 3 — DLQ + observability + metrics

  • [ ] DLQ stream + admin endpoint to inspect/replay
  • [ ] Prometheus metrics

Test contract

T1 — Round-trip happy path

Publisher → XADD → Consumer XREADGROUP → handler invoked → XACK. Assert: message acked, handler received correct envelope.

T2 — Consumer idempotency

Same event delivered twice → handler invoked twice but business state changes once. Assert: anonymization/deletion idempotent.

T3 — Retry on failure

Handler returns error → message stays un-ack'd → XCLAIM after timeout → retry. Assert: 2nd attempt invokes handler.

T4 — DLQ on persistent failure

Handler fails 3× → message moved to DLQ stream. Assert: original entry is acked (no longer pending in the group), DLQ has entry with error.

T5 — Concurrent consumers

2 consumer instances in same group → messages distributed (load balanced). Assert: each msg processed exactly once across instances.

N1 — Redis down on publish

Publisher Redis call fails → falls back to in-process EventBus + retry on next event. Assert: same-process consumers still work; cross-service consumers catch up once Redis recovers.

N2 — Malformed event

Consumer receives event with missing required field → moves to DLQ immediately, doesn't crash.
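On the consumer side, the malformed-event check can run before any handler is invoked. The sketch below assumes the stream entry has already been read into a map of strings. MissingFields and Route are hypothetical names, and a real client library may hand back a differently typed map.

```go
package main

import "fmt"

// requiredFields lists the R2 envelope fields in spec order.
var requiredFields = []string{
	"type", "event_id", "occurred_at", "tenant_id", "subject_id", "actor", "data",
}

// MissingFields returns the required fields that are absent or empty.
func MissingFields(values map[string]string) []string {
	var missing []string
	for _, name := range requiredFields {
		if values[name] == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

// Route decides where a freshly read entry goes: malformed entries skip
// the retry path and go straight to the DLQ, as N2 requires.
func Route(values map[string]string) string {
	if len(MissingFields(values)) > 0 {
		return "dlq"
	}
	return "handle"
}

func main() {
	entry := map[string]string{
		"type":        "identity.user.erased",
		"event_id":    "evt_example",
		"occurred_at": "2026-05-14T12:34:56Z",
		"subject_id":  "user-u123",
		"actor":       "system",
		"data":        `{"erasure_id":"ERS-2026-05-14-abc12345"}`,
	}
	// tenant_id is deliberately absent.
	fmt.Println(Route(entry), MissingFields(entry))
}
```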

Out of scope

  • Same-process pub/sub: the existing in-process EventBus continues to serve badges, webhooks, etc. This spec only covers CROSS-process delivery.
  • Synchronous request/reply (RPC pattern) — out of scope; events are fire-and-forget.
  • Event sourcing / replay-from-zero — Wave 3 of erasure flow (#094 covers replay from erasure_requests table; not from stream).

References

Source: ../home/koder/dev/koder/meta/docs/stack/specs/messaging/cross-service-events.kmd