// Package subscriptions provides a reusable, type-generic registry for fanning // out change notifications to in-process GraphQL subscription consumers. // // It is the shared core of Shiny's cross-service read-your-writes pattern // (ADR-0012): an entity shown in the UI is frequently projected from another // service's event, so the owning service exposes a GraphQL subscription, drives // it from a per-replica transient AMQP consumer, and pushes a notification once // the change is visible in its own read view — the client then refetches the // authoritative query. Two services hand-rolled this (authz-service's // availableCompanies, accounting-service's entryBasesChanged); this package is // the extracted, hardened core so further cases reuse it instead of copying it. // // # Wiring requirement (do not get this wrong) // // Submit MUST be fed from a per-replica goamqp.TransientEventStreamConsumer (an // exclusive, randomly-named queue) bound to the owning service's OWN events, so // every replica receives every event and can push to the websockets it holds. // This is necessarily a DIFFERENT consumer from the shared, durable read-view // projection consumer (a work-queue, where exactly one replica handles each // event). Wiring Submit to a shared/durable consumer silently breaks delivery in // a multi-replica deployment: the one replica that handles an event usually does // not hold the subscriber's websocket, so the poke is lost with no error. The // library cannot enforce this (it is transport-agnostic) — the caller must. // // # Concurrency model // // - AddReceiver registers a subscriber (one per websocket) and returns a // buffered channel plus a cleanup func that must be called when the // subscription ends. // - Submit is called from the AMQP event handler. 
It does NOT block that // handler on the read view: it hands the work to a per-key worker that waits // (with a budget) for the read view to reflect the change — via the caller's // [Producer] — and only then pushes. Acking the AMQP message immediately is // safe because notifications are idempotent and drop-tolerant (see below). // - Work is sharded by key, so all events for one key are processed FIFO by a // single worker (preserving per-key order even for payloads the client // consumes directly), while distinct keys run in parallel (so one lagging // read view only delays its own key's shard, not everything). // // Pushes happen while holding the read lock, and cleanup closes a subscriber's // channel under the write lock, so a send can never race a close ("send on // closed channel"). A full subscriber buffer drops the notification rather than // blocking a slow consumer. // // # Payload contract // // T should be a lightweight notification the client reacts to by refetching the // authoritative query (a poke such as {id, removed}), not authoritative state // the client consumes as the source of truth. Notifications may be dropped (full // queue or buffer) and the push is best-effort (no AMQP requeue on a persistent // read failure), so reliability comes from the client's idempotent refetch. // Per-key FIFO ordering is preserved, so a payload the client does consume // directly is at least delivered in event order for a given key — but it can // still be dropped, so a refetch-on-receipt design is strongly preferred. package subscriptions import ( "context" "errors" "hash/fnv" "log/slog" "sync" "time" "github.com/google/uuid" ) // ErrEmptyKey is returned by AddReceiver when the subscription key is empty, // which almost always indicates an unpopulated id at the call site. 
var ErrEmptyKey = errors.New("subscriptions: empty subscription key")

// Producer reads the owning service's read view and returns the payload to push
// together with whether the change is yet visible there.
//
// The registry calls a Producer repeatedly until it reports ready (or a bounded
// budget elapses), so the client's refetch can never race ahead of the
// projection. A Producer MUST read current read-view state on each call (rather
// than capturing the event's historical state). On a transient read error it
// should return (nil, false) to keep waiting — the event is already durable, so
// only the projection is lagging.
//
// Convergence precondition: "retry until ready" only terminates as ready (rather
// than burning the whole budget then skipping) if the read view the Producer
// gates on is made consistent by the SAME ordered aggregate stream as the
// triggering event. Gating on a row populated by a different aggregate's events
// can fail to converge.
type Producer[T any] func(ctx context.Context) (payload *T, ready bool)

// Observer receives best-effort notifications about pushes, for
// metrics/observability. Its methods may be called concurrently. The default is
// a no-op; wire an implementation (e.g. OTel counters) via [WithObserver].
//
// Implementations should be cheap and must not call back into the Registry:
// ChannelFull is invoked while the registry holds its subscriber read lock, so
// re-entering (e.g. AddReceiver or a cleanup func) would deadlock.
type Observer interface {
	// Pushed reports that a change was gated and delivered to the key's
	// subscribers — the denominator for a skip/drop rate.
	Pushed(key string)
	// PushSkipped reports that the read view never reflected the change within
	// the retry budget, so the push was skipped.
	PushSkipped(key string)
	// Dropped reports that the worker queue was full, so the notification was
	// dropped before it could be gated.
	Dropped(key string)
	// ChannelFull reports that a subscriber's buffer was full, so its
	// notification was dropped.
	ChannelFull(key string)
}

// noopObserver is the default [Observer]; every method discards its input.
type noopObserver struct{}

// Compile-time check that noopObserver satisfies Observer.
var _ Observer = noopObserver{}

func (noopObserver) Pushed(string)      {}
func (noopObserver) PushSkipped(string) {}
func (noopObserver) Dropped(string)     {}
func (noopObserver) ChannelFull(string) {}

// subscriber is one registered receiver: a unique id (its key in the per-key
// subscriber map) and the buffered channel notifications are sent on.
type subscriber[T any] struct {
	id      string
	channel chan *T
}

// job is one queued gated push: the key whose subscribers receive the payload
// and the Producer a worker polls until the read view is ready.
type job[T any] struct {
	key     string
	produce Producer[T]
}

// Registry fans out notifications of type T to in-process subscribers keyed by
// an arbitrary string (e.g. a company id or a user email). The zero value is
// not usable; construct one with [New].
type Registry[T any] struct {
	logger     *slog.Logger
	obs        Observer
	bufferSize int           // capacity of each subscriber channel
	retries    int           // extra read-view reads after the first
	retryWait  time.Duration // pause between read-view reads

	// mu guards subscribers. Sends happen under the read lock and channel
	// closes under the write lock, so a send never hits a closed channel.
	mu          sync.RWMutex
	subscribers map[string]map[string]*subscriber[T] // key -> subscriber id -> subscriber

	shards []chan job[T] // one job queue per worker; a key always maps to the same shard

	baseCtx   context.Context    // parent of every read-view wait; cancelled by Close
	cancel    context.CancelFunc // cancels baseCtx
	done      chan struct{}      // closed by Close to stop workers and Submit
	wg        sync.WaitGroup     // tracks running workers
	closeOnce sync.Once          // makes Close idempotent
}

// config collects option values before New copies them into a Registry.
type config struct {
	logger     *slog.Logger
	obs        Observer
	bufferSize int
	retries    int
	retryWait  time.Duration
	workers    int
	queueSize  int
}

// Option configures a [Registry].
type Option func(*config)

// WithLogger sets the structured logger. Defaults to [slog.Default]. A nil
// logger is ignored.
func WithLogger(l *slog.Logger) Option {
	return func(c *config) {
		if l != nil {
			c.logger = l
		}
	}
}

// WithObserver wires a metrics observer. Defaults to a no-op. A nil observer is
// ignored.
func WithObserver(o Observer) Option {
	return func(c *config) {
		if o != nil {
			c.obs = o
		}
	}
}

// WithReadRetry tunes how long a worker waits for the read view to reflect a
// change before giving up on the push: up to attempts re-reads spaced by wait
// (so total reads = attempts+1; default 25 × 200ms ≈ 5s). The read-view consumer
// and this subscription consumer are independent consumers of the same event, so
// a freshly-projected change may not be visible on the first read; retrying
// avoids pushing a notification the client would refetch ahead of.
//
// attempts == 0 means a single read with no retries. A negative attempts, or a
// non-positive wait, is ignored and leaves that setting unchanged.
func WithReadRetry(attempts int, wait time.Duration) Option { return func(c *config) { if attempts >= 0 { c.retries = attempts } if wait > 0 { c.retryWait = wait } } } // WithBufferSize sets each subscriber channel's buffer (default 20). A full // buffer drops the notification rather than blocking. func WithBufferSize(n int) Option { return func(c *config) { if n > 0 { c.bufferSize = n } } } // WithWorkers sets the number of key-shard workers (default 4). Each key is // handled FIFO by exactly one worker; more workers spread distinct keys over // more goroutines so a lagging read view delays only its own shard. func WithWorkers(n int) Option { return func(c *config) { if n > 0 { c.workers = n } } } // WithQueueSize sets each shard's job-queue depth (default 64). A full queue // drops the notification (reported via [Observer.Dropped]) rather than blocking // the AMQP delivery goroutine. func WithQueueSize(n int) Option { return func(c *config) { if n > 0 { c.queueSize = n } } } // New builds a Registry and starts its key-shard workers. Call [Registry.Close] // to stop them (optional; they exit with the process otherwise). func New[T any](opts ...Option) *Registry[T] { c := &config{ logger: slog.Default(), obs: noopObserver{}, bufferSize: 20, retries: 25, retryWait: 200 * time.Millisecond, workers: 4, queueSize: 64, } for _, o := range opts { o(c) } ctx, cancel := context.WithCancel(context.Background()) r := &Registry[T]{ logger: c.logger, obs: c.obs, bufferSize: c.bufferSize, retries: c.retries, retryWait: c.retryWait, subscribers: make(map[string]map[string]*subscriber[T]), shards: make([]chan job[T], c.workers), baseCtx: ctx, cancel: cancel, done: make(chan struct{}), } r.wg.Add(c.workers) for i := range r.shards { r.shards[i] = make(chan job[T], c.queueSize) go r.worker(r.shards[i]) } return r } // AddReceiver registers a subscriber for the given key. It returns the channel // to stream and a cleanup func that MUST be called when the subscription ends // (e.g. 
from the resolver's ctx.Done) to close the channel and release the // registration. cleanup is idempotent. Returns [ErrEmptyKey] for an empty key. func (r *Registry[T]) AddReceiver(key string) (<-chan *T, func(), error) { if key == "" { return nil, nil, ErrEmptyKey } s := &subscriber[T]{ id: uuid.NewString(), channel: make(chan *T, r.bufferSize), } r.mu.Lock() if r.subscribers[key] == nil { r.subscribers[key] = make(map[string]*subscriber[T]) } r.subscribers[key][s.id] = s total := len(r.subscribers[key]) r.mu.Unlock() r.logger.Info("subscription registered", "key", key, "subscription_id", s.id, "total_subscriptions", total) cleanup := func() { r.removeReceiver(key, s.id) } return s.channel, cleanup, nil } func (r *Registry[T]) removeReceiver(key, id string) { r.mu.Lock() defer r.mu.Unlock() subs := r.subscribers[key] if subs == nil { return } s, ok := subs[id] if !ok { return } close(s.channel) delete(subs, id) remaining := len(subs) if remaining == 0 { delete(r.subscribers, key) } r.logger.Info("subscription removed", "key", key, "subscription_id", id, "remaining_subscriptions", remaining) } // Submit schedules a gated push for the given key. It returns immediately: when // the key is empty or has no subscribers it does nothing, otherwise it enqueues // work for that key's shard worker (dropping, with an [Observer.Dropped], only if // the shard queue is full). produce is invoked on the worker, not on the calling // goroutine. func (r *Registry[T]) Submit(key string, produce Producer[T]) { if key == "" || !r.hasSubscribers(key) { return } shard := r.shards[shardIndex(key, len(r.shards))] select { case shard <- job[T]{key: key, produce: produce}: case <-r.done: // shutting down default: r.logger.Warn("subscription job queue full; dropping notification", "key", key) r.obs.Dropped(key) } } // Close stops the worker pool and waits for in-flight gating to finish. 
It // cancels any in-flight read-view wait so workers return promptly rather than // blocking for the full retry budget. Queued-but-unstarted notifications are // dropped (safe — they are drop-tolerant). Idempotent; Submit after Close is a // no-op. func (r *Registry[T]) Close() { r.closeOnce.Do(func() { r.cancel() close(r.done) r.wg.Wait() }) } func (r *Registry[T]) hasSubscribers(key string) bool { r.mu.RLock() defer r.mu.RUnlock() return len(r.subscribers[key]) > 0 } func (r *Registry[T]) worker(shard <-chan job[T]) { defer r.wg.Done() for { select { case j := <-shard: r.handle(j) case <-r.done: return } } } func (r *Registry[T]) handle(j job[T]) { // Bound the read-view wait so a hung read or a never-reflected event (e.g. // redelivery of an event for a since-deleted entity) can't pin a worker. // Derived from baseCtx so Close cancels in-flight waits promptly. budget := time.Duration(r.retries+1) * r.retryWait ctx, cancel := context.WithTimeout(r.baseCtx, budget) defer cancel() payload, ok := r.await(ctx, j.produce) if !ok { r.logger.Warn("change not visible in read view after retries; subscription push skipped", "key", j.key) r.obs.PushSkipped(j.key) return } r.obs.Pushed(j.key) r.push(j.key, payload) } func (r *Registry[T]) await(ctx context.Context, produce Producer[T]) (*T, bool) { for attempt := 0; ; attempt++ { if payload, ready := produce(ctx); ready { return payload, true } if attempt >= r.retries { return nil, false } select { case <-ctx.Done(): return nil, false case <-time.After(r.retryWait): } } } // push delivers payload to every subscriber of key. The sends happen under the // read lock: sendNonBlocking never blocks (it drops on a full buffer), so // holding the lock is cheap, and it prevents removeReceiver — which takes the // write lock to close a channel — from closing a channel out from under an // in-flight send. 
func (r *Registry[T]) push(key string, payload *T) { r.mu.RLock() defer r.mu.RUnlock() for _, s := range r.subscribers[key] { select { case s.channel <- payload: default: r.logger.Warn("subscription channel full; dropping notification", "key", key) r.obs.ChannelFull(key) } } } func shardIndex(key string, n int) int { h := fnv.New32a() _, _ = h.Write([]byte(key)) return int(h.Sum32() % uint32(n)) //nolint:gosec // modulo keeps the result in [0,n) }