Cross-Service Event Delivery (Redis Streams)
Origin
`services/foundation/id/engine/services/identity/internal/service/event_bus.go` describes itself as intentionally simple, noting that a KDB-backed durable queue can replace it later without changing the publisher interface. That "later" has arrived: tickets #091, #092, and #093 need the auth, sso, and keys services to consume `identity.user.erased` cross-process.
Ticket #097 raised three options (gRPC RPC, durable queue, webhook reuse). This spec ratifies **Option B: durable queue via Redis Streams**.
Decision (2026-05-14)
**Redis Streams** is the chosen substrate for cross-service event delivery **until the kdb-stream layer from RFC-001 ships**, at which point the migration is drop-in (same pub/sub semantics).
Rationale:
- **Reuse-first**: `pkg/redisclient/redisclient.go` already exists in the engine; no new infra.
- **Self-hosted-first**: Redis is OSS and self-hostable, and its deployment is already established in ops.
- **Consumer groups**: per-group offsets, pending-entry tracking, and redelivery are built in, and a DLQ is simple to layer on top; each service keeps its own progress.
- **Migration path**: the future kdb-stream uses parallel XADD/XREADGROUP semantics.
- **Drop-in simplicity vs NATS JetStream**: Redis is already in the stack; NATS would require new infra.
- **`webhook_deliveries` reuse rejected**: a semantic stretch (intended for external webhooks), and its polling latency is poor for a cascade.
Contract (R-rules)
R1 — Stream key naming
A single global stream per origin service:
| Stream key | Publisher | Consumers |
|---|---|---|
| `koder:events:id` | `services/foundation/id/engine` (identity) | auth, sso, keys |
| `koder:events:auth` (future) | `services/foundation/id/engine` (auth) | (TBD per event) |
The `koder:events:` prefix is reserved.
R2 — Event envelope
Each XADD carries key-value fields (the native Redis Streams format):

```
XADD koder:events:id * \
  type identity.user.erased \
  event_id evt_<ulid> \
  occurred_at 2026-05-14T12:34:56Z \
  tenant_id tenant-A \
  subject_id user-u123 \
  actor system \
  data '{"erasure_id":"ERS-2026-05-14-abc12345"}'
```

Required fields:
- `type`: event type (e.g. `identity.user.erased`)
- `event_id`: globally unique (ULID); the idempotency key for consumers
- `occurred_at`: ISO 8601, UTC
- `tenant_id`: tenant scope
- `subject_id`: primary subject (`user_id`, `workspace_id`, etc.)
- `actor`: who or what triggered the event (`system`, `user:<id>`, `admin:<id>`)
- `data`: JSON-encoded, event-specific payload
R3 — Consumer group naming
Each consuming service creates its own consumer group, named `<service-name>-<purpose>`. For example:
- `auth-erasure-cascade`
- `sso-erasure-cascade`
- `keys-erasure-cascade`

Group names must be unique within a stream.
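A small helper that builds and checks R3 group names. The allowed character set (lowercase letters, digits, hyphens) is an assumption, since R3 only fixes the `<service>-<purpose>` shape; `GroupName` is a hypothetical name.

```go
package main

import (
	"fmt"
	"regexp"
)

// groupNamePattern: a lowercase service word followed by one or more hyphenated
// purpose words (character set assumed, not mandated by R3).
var groupNamePattern = regexp.MustCompile(`^[a-z][a-z0-9]*(-[a-z0-9]+)+$`)

// GroupName joins service and purpose per R3 and rejects names outside the pattern.
func GroupName(service, purpose string) (string, error) {
	name := service + "-" + purpose
	if !groupNamePattern.MatchString(name) {
		return "", fmt.Errorf("invalid consumer group name %q", name)
	}
	return name, nil
}
```

For example, `GroupName("auth", "erasure-cascade")` yields `auth-erasure-cascade`.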
R4 — Consumer behavior
- Boot-time: `XGROUP CREATE koder:events:id <group> $ MKSTREAM` (start from now; future events only). On restart Redis answers `BUSYGROUP` because the group already exists; treat that as success.
- Loop: `XREADGROUP GROUP <group> <consumer-id> COUNT 100 BLOCK 5000 STREAMS koder:events:id >`
- Per message: deserialize → handle (idempotently: the same `event_id` may be re-delivered)
- Success: `XACK koder:events:id <group> <message_id>`
- Failure: log and leave un-acked; `XPENDING` tracks the entry; retry via `XCLAIM` after a timeout
- Hard failure (>3 retries): `XCLAIM`, then move the entry to the DLQ stream `koder:events:id:dlq` for ops (XADD it to the DLQ, then XACK the original)
R5 — Idempotency contract
Consumers MUST be idempotent per `event_id`. Recommended:
- Maintain a `processed_events` table with `event_id` as the PK and a 24h TTL
- Skip the event if its `event_id` was already processed within that window
- For the erasure cascade: anonymization/deletion are naturally idempotent (re-applying is a no-op)
R6 — Publisher behavior
Publisher (e.g. `ErasureService.ExecuteErasure`):
- Generate `event_id` via ULID
- `XADD koder:events:id MAXLEN ~ 100000 * type ... event_id ... data ...`
- `MAXLEN ~ 100000` (`~` = approximate) trims the stream to roughly the last 100k events; older entries are purged automatically
- On Redis failure: log and emit a fallback on the existing in-process EventBus for same-process consumers; cross-service consumers retry on the next poll once Redis recovers
R7 — TLS + auth
- Production: Redis TLS + AUTH password (env `KODER_REDIS_URL=rediss://user:pass@host:6379/0`)
- Development: plain Redis is OK
- Connection pooling via `pkg/redisclient` (existing)
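A sketch of the R7 production guard, assuming `KODER_REDIS_URL` follows the `redis://` / `rediss://` URL form shown above. `ParseRedisURL` and `RedisConfig` are hypothetical; `pkg/redisclient` may already parse the URL, in which case only the production check matters.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// RedisConfig holds what a client needs from KODER_REDIS_URL.
type RedisConfig struct {
	Addr     string
	Username string
	Password string
	DB       int
	TLS      bool
}

// ParseRedisURL parses redis:// or rediss:// URLs; production=true enforces TLS and an AUTH password.
func ParseRedisURL(raw string, production bool) (RedisConfig, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return RedisConfig{}, err
	}
	if u.Scheme != "redis" && u.Scheme != "rediss" {
		return RedisConfig{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	cfg := RedisConfig{Addr: u.Host, TLS: u.Scheme == "rediss"}
	if u.User != nil {
		cfg.Username = u.User.Username()
		cfg.Password, _ = u.User.Password()
	}
	if db := strings.TrimPrefix(u.Path, "/"); db != "" { // "/0" -> database 0
		n, err := strconv.Atoi(db)
		if err != nil {
			return RedisConfig{}, fmt.Errorf("invalid database %q", db)
		}
		cfg.DB = n
	}
	if production && (!cfg.TLS || cfg.Password == "") {
		return RedisConfig{}, errors.New("production requires rediss:// with an AUTH password")
	}
	return cfg, nil
}
```

Failing fast at boot on a plain `redis://` URL in production is cheaper than discovering an unencrypted connection later.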
R8 — Observability
Publishers and consumers emit `slog` records:
- Publish: `event_emitted {type, event_id, subject_id, stream}`
- Consume start: `event_received {type, event_id, group, consumer}`
- Consume ack: `event_acked {type, event_id, elapsed_ms}`
- Consume fail: `event_failed {type, event_id, error, retry_count}`
- DLQ: `event_dlq {type, event_id, final_error}` (warn level)
Metrics (Prometheus, future): `event_count_total`, `event_lag_seconds`, `event_dlq_total`.
R9 — Migration to kdb-stream
When the kdb-stream layer ships per RFC-001:
- Add a `KODER_EVENT_BUS=kdb-stream` env flag
- New `pkg/eventbus/kdb_stream.go` implementation satisfying the same publisher/consumer interfaces
- A migration script copies in-flight stream state from Redis → kdb-stream
- Cutover: flip the env flag and restart services
- Decommission Redis

This is drop-in because XADD/XREADGROUP have direct parallels in the kdb-stream design.
Implementation
Wave 1 (now) — publisher abstraction in engine
- [ ] `services/foundation/id/engine/pkg/eventbus/` package
- [ ] `Publisher` interface (`Publish` method) + Redis Streams impl
- [ ] `Consumer` interface (`Subscribe` + `Process` methods) + Redis Streams impl
- [ ] Wire the publisher into `ErasureService.ExecuteErasure` (replace the direct `identity.PublishUserErased` call with `bus.Publish`; the in-process EventBus keeps firing for same-process consumers via dual-emit)
Wave 2 — consumers in sibling services
- [ ] `services/auth/internal/listener/erasure_listener.go` (ticket #091)
- [ ] `services/sso/internal/listener/erasure_listener.go` (ticket #092)
- [ ] `services/keys/internal/listener/erasure_listener.go` (ticket #093)
Wave 3 — DLQ + observability + metrics
- [ ] DLQ stream + admin endpoint to inspect/replay
- [ ] Prometheus metrics
Test contract
T1 — Round-trip happy path
Publisher → XADD → Consumer XREADGROUP → handler invoked → XACK. Assert: message acked, handler received correct envelope.
T2 — Consumer idempotency
Same event delivered twice → handler invoked twice, but business state changes once. Assert: anonymization/deletion is idempotent.
T3 — Retry on failure
Handler returns error → message stays un-ack'd → XCLAIM after timeout → retry. Assert: 2nd attempt invokes handler.
T4 — DLQ on persistent failure
Handler fails 3× → message moved to the DLQ stream. Assert: the original entry is acked (no longer pending in the group), and the DLQ has an entry with the error.
T5 — Concurrent consumers
2 consumer instances in same group → messages distributed (load balanced). Assert: each msg processed exactly once across instances.
N1 — Redis down on publish
The publisher's Redis call fails → falls back to the in-process EventBus + retries on the next event. Assert: same-process consumers still work; cross-service consumers catch up once Redis recovers.
N2 — Malformed event
Consumer receives event with missing required field → moves to DLQ immediately, doesn't crash.
Out of scope
- Same-process pub/sub: the existing in-process `EventBus` continues to serve badges, webhooks, etc. This spec covers only CROSS-process delivery.
- Synchronous request/reply (RPC pattern): out of scope; events are fire-and-forget.
- Event sourcing / replay from zero: Wave 3 of the erasure flow (#094) covers replay from the `erasure_requests` table, not from the stream.
References
- Redis Streams docs: https://redis.io/docs/data-types/streams/
- Consumer groups: https://redis.io/docs/data-types/streams/#consumer-groups
- RFC-001 kdb-as-unified-data-plane § Layers (kdb-stream, future)
- services/foundation/id/engine#097 (decision ticket)