422 lines
14 KiB
Go
422 lines
14 KiB
Go
|
|
// Package subscriptions provides a reusable, type-generic registry for fanning
|
|||
|
|
// out change notifications to in-process GraphQL subscription consumers.
|
|||
|
|
//
|
|||
|
|
// It is the shared core of Shiny's cross-service read-your-writes pattern
|
|||
|
|
// (ADR-0012): an entity shown in the UI is frequently projected from another
|
|||
|
|
// service's event, so the owning service exposes a GraphQL subscription, drives
|
|||
|
|
// it from a per-replica transient AMQP consumer, and pushes a notification once
|
|||
|
|
// the change is visible in its own read view — the client then refetches the
|
|||
|
|
// authoritative query. Two services hand-rolled this (authz-service's
|
|||
|
|
// availableCompanies, accounting-service's entryBasesChanged); this package is
|
|||
|
|
// the extracted, hardened core so further cases reuse it instead of copying it.
|
|||
|
|
//
|
|||
|
|
// # Wiring requirement (do not get this wrong)
|
|||
|
|
//
|
|||
|
|
// Submit MUST be fed from a per-replica goamqp.TransientEventStreamConsumer (an
|
|||
|
|
// exclusive, randomly-named queue) bound to the owning service's OWN events, so
|
|||
|
|
// every replica receives every event and can push to the websockets it holds.
|
|||
|
|
// This is necessarily a DIFFERENT consumer from the shared, durable read-view
|
|||
|
|
// projection consumer (a work-queue, where exactly one replica handles each
|
|||
|
|
// event). Wiring Submit to a shared/durable consumer silently breaks delivery in
|
|||
|
|
// a multi-replica deployment: the one replica that handles an event usually does
|
|||
|
|
// not hold the subscriber's websocket, so the poke is lost with no error. The
|
|||
|
|
// library cannot enforce this (it is transport-agnostic) — the caller must.
|
|||
|
|
//
|
|||
|
|
// # Concurrency model
|
|||
|
|
//
|
|||
|
|
//   - AddReceiver registers a subscriber (one per websocket) and returns a
//     buffered channel plus a cleanup func that must be called when the
//     subscription ends.
//   - Submit is called from the AMQP event handler. It does NOT block that
//     handler on the read view: it hands the work to the worker for the key's
//     shard, which waits (with a budget) for the read view to reflect the
//     change — via the caller's [Producer] — and only then pushes. Acking the
//     AMQP message immediately is safe because notifications are idempotent
//     and drop-tolerant (see below).
//   - Work is sharded by key, so all events for one key are processed FIFO by a
//     single worker (preserving per-key order even for payloads the client
//     consumes directly), while keys in different shards run in parallel (so
//     one lagging read view only delays its own key's shard, not everything).
|
|||
|
|
//
|
|||
|
|
// Pushes happen while holding the read lock, and cleanup closes a subscriber's
|
|||
|
|
// channel under the write lock, so a send can never race a close ("send on
|
|||
|
|
// closed channel"). A full subscriber buffer drops the notification rather than
|
|||
|
|
// blocking a slow consumer.
|
|||
|
|
//
|
|||
|
|
// # Payload contract
|
|||
|
|
//
|
|||
|
|
// T should be a lightweight notification the client reacts to by refetching the
|
|||
|
|
// authoritative query (a poke such as {id, removed}), not authoritative state
|
|||
|
|
// the client consumes as the source of truth. Notifications may be dropped (full
|
|||
|
|
// queue or buffer) and the push is best-effort (no AMQP requeue on a persistent
|
|||
|
|
// read failure), so reliability comes from the client's idempotent refetch.
|
|||
|
|
// Per-key FIFO ordering is preserved, so a payload the client does consume
|
|||
|
|
// directly is at least delivered in event order for a given key — but it can
|
|||
|
|
// still be dropped, so a refetch-on-receipt design is strongly preferred.
|
|||
|
|
package subscriptions
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"context"
|
|||
|
|
"errors"
|
|||
|
|
"hash/fnv"
|
|||
|
|
"log/slog"
|
|||
|
|
"sync"
|
|||
|
|
"time"
|
|||
|
|
|
|||
|
|
"github.com/google/uuid"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
var (
	// ErrEmptyKey is what AddReceiver reports when asked to subscribe with an
	// empty key; in practice that means the caller's id was never populated.
	ErrEmptyKey = errors.New("subscriptions: empty subscription key")
)
|
|||
|
|
|
|||
|
|
// Producer is the caller-supplied read of the owning service's read view. It
// returns the payload to push and reports whether the triggering change is
// visible there yet.
//
// The registry keeps calling a Producer until it reports ready or a bounded
// retry budget runs out, which stops the client's refetch from racing ahead of
// the projection. Each call MUST read the current read-view state rather than
// replaying state captured from the event. When a read fails transiently,
// return (nil, false) to keep waiting: the event itself is already durable, so
// only the projection can be behind.
//
// Convergence precondition: the wait can only end as ready (instead of using
// up the whole budget and then skipping the push) when the row the Producer
// checks is made consistent by the SAME ordered aggregate stream as the
// triggering event. Gating on a row that a different aggregate's events
// populate may never converge.
type Producer[T any] func(ctx context.Context) (payload *T, ready bool)
|
|||
|
|
|
|||
|
|
// Observer is an optional sink for best-effort push telemetry (metrics,
// tracing). Implementations must tolerate concurrent calls. When none is
// configured the registry uses a no-op; supply one with [WithObserver].
type Observer interface {
	// Pushed reports a change that became visible in the read view and is
	// being pushed to the key's current subscribers — the denominator for
	// skip/drop rates. Individual subscribers may still miss it (see
	// ChannelFull).
	Pushed(key string)

	// PushSkipped reports that the retry budget ran out before the read view
	// reflected the change, so nothing was pushed.
	PushSkipped(key string)

	// Dropped reports that the key's shard queue was full, so the notification
	// was discarded before any gating happened.
	Dropped(key string)

	// ChannelFull reports that one subscriber's buffer was full, so that
	// subscriber's copy of the notification was discarded.
	ChannelFull(key string)
}
|
|||
|
|
|
|||
|
|
// noopObserver is the default Observer: it discards every callback.
type noopObserver struct{}

// Pushed discards the notification.
func (noopObserver) Pushed(_ string) {}

// PushSkipped discards the notification.
func (noopObserver) PushSkipped(_ string) {}

// Dropped discards the notification.
func (noopObserver) Dropped(_ string) {}

// ChannelFull discards the notification.
func (noopObserver) ChannelFull(_ string) {}
|
|||
|
|
|
|||
|
|
// subscriber is one registered receiver (one per websocket). id identifies
// this registration so it can be removed; channel is the buffered channel the
// subscription streams from.
type subscriber[T any] struct {
	id      string
	channel chan *T
}
|
|||
|
|
|
|||
|
|
type job[T any] struct {
|
|||
|
|
key string
|
|||
|
|
produce Producer[T]
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Registry fans out notifications of type T to in-process subscribers keyed by
|
|||
|
|
// an arbitrary string (e.g. a company id or a user email). The zero value is
|
|||
|
|
// not usable; construct one with [New].
|
|||
|
|
type Registry[T any] struct {
|
|||
|
|
logger *slog.Logger
|
|||
|
|
obs Observer
|
|||
|
|
bufferSize int
|
|||
|
|
retries int
|
|||
|
|
retryWait time.Duration
|
|||
|
|
|
|||
|
|
mu sync.RWMutex
|
|||
|
|
subscribers map[string]map[string]*subscriber[T]
|
|||
|
|
|
|||
|
|
shards []chan job[T]
|
|||
|
|
baseCtx context.Context
|
|||
|
|
cancel context.CancelFunc
|
|||
|
|
done chan struct{}
|
|||
|
|
wg sync.WaitGroup
|
|||
|
|
closeOnce sync.Once
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
type config struct {
|
|||
|
|
logger *slog.Logger
|
|||
|
|
obs Observer
|
|||
|
|
bufferSize int
|
|||
|
|
retries int
|
|||
|
|
retryWait time.Duration
|
|||
|
|
workers int
|
|||
|
|
queueSize int
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Option configures a [Registry].
|
|||
|
|
type Option func(*config)
|
|||
|
|
|
|||
|
|
// WithLogger sets the structured logger. Defaults to [slog.Default].
|
|||
|
|
func WithLogger(l *slog.Logger) Option {
|
|||
|
|
return func(c *config) {
|
|||
|
|
if l != nil {
|
|||
|
|
c.logger = l
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// WithObserver wires a metrics observer. Defaults to a no-op.
|
|||
|
|
func WithObserver(o Observer) Option {
|
|||
|
|
return func(c *config) {
|
|||
|
|
if o != nil {
|
|||
|
|
c.obs = o
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// WithReadRetry tunes how long a worker waits for the read view to reflect a
|
|||
|
|
// change before giving up on the push: up to attempts re-reads spaced by wait
|
|||
|
|
// (so total reads = attempts+1; default 25 × 200ms ≈ 5s). The read-view consumer
|
|||
|
|
// and this subscription consumer are independent consumers of the same event, so
|
|||
|
|
// a freshly-projected change may not be visible on the first read; retrying
|
|||
|
|
// avoids pushing a notification the client would refetch ahead of. Non-positive
|
|||
|
|
// values are ignored (attempts clamps to ≥0).
|
|||
|
|
func WithReadRetry(attempts int, wait time.Duration) Option {
|
|||
|
|
return func(c *config) {
|
|||
|
|
if attempts >= 0 {
|
|||
|
|
c.retries = attempts
|
|||
|
|
}
|
|||
|
|
if wait > 0 {
|
|||
|
|
c.retryWait = wait
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// WithBufferSize sets each subscriber channel's buffer (default 20). A full
|
|||
|
|
// buffer drops the notification rather than blocking.
|
|||
|
|
func WithBufferSize(n int) Option {
|
|||
|
|
return func(c *config) {
|
|||
|
|
if n > 0 {
|
|||
|
|
c.bufferSize = n
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// WithWorkers sets the number of key-shard workers (default 4). Each key is
|
|||
|
|
// handled FIFO by exactly one worker; more workers spread distinct keys over
|
|||
|
|
// more goroutines so a lagging read view delays only its own shard.
|
|||
|
|
func WithWorkers(n int) Option {
|
|||
|
|
return func(c *config) {
|
|||
|
|
if n > 0 {
|
|||
|
|
c.workers = n
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// WithQueueSize sets each shard's job-queue depth (default 64). A full queue
|
|||
|
|
// drops the notification (reported via [Observer.Dropped]) rather than blocking
|
|||
|
|
// the AMQP delivery goroutine.
|
|||
|
|
func WithQueueSize(n int) Option {
|
|||
|
|
return func(c *config) {
|
|||
|
|
if n > 0 {
|
|||
|
|
c.queueSize = n
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// New builds a Registry and starts its key-shard workers. Call [Registry.Close]
|
|||
|
|
// to stop them (optional; they exit with the process otherwise).
|
|||
|
|
func New[T any](opts ...Option) *Registry[T] {
|
|||
|
|
c := &config{
|
|||
|
|
logger: slog.Default(),
|
|||
|
|
obs: noopObserver{},
|
|||
|
|
bufferSize: 20,
|
|||
|
|
retries: 25,
|
|||
|
|
retryWait: 200 * time.Millisecond,
|
|||
|
|
workers: 4,
|
|||
|
|
queueSize: 64,
|
|||
|
|
}
|
|||
|
|
for _, o := range opts {
|
|||
|
|
o(c)
|
|||
|
|
}
|
|||
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|||
|
|
r := &Registry[T]{
|
|||
|
|
logger: c.logger,
|
|||
|
|
obs: c.obs,
|
|||
|
|
bufferSize: c.bufferSize,
|
|||
|
|
retries: c.retries,
|
|||
|
|
retryWait: c.retryWait,
|
|||
|
|
subscribers: make(map[string]map[string]*subscriber[T]),
|
|||
|
|
shards: make([]chan job[T], c.workers),
|
|||
|
|
baseCtx: ctx,
|
|||
|
|
cancel: cancel,
|
|||
|
|
done: make(chan struct{}),
|
|||
|
|
}
|
|||
|
|
r.wg.Add(c.workers)
|
|||
|
|
for i := range r.shards {
|
|||
|
|
r.shards[i] = make(chan job[T], c.queueSize)
|
|||
|
|
go r.worker(r.shards[i])
|
|||
|
|
}
|
|||
|
|
return r
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// AddReceiver registers a subscriber for the given key. It returns the channel
|
|||
|
|
// to stream and a cleanup func that MUST be called when the subscription ends
|
|||
|
|
// (e.g. from the resolver's ctx.Done) to close the channel and release the
|
|||
|
|
// registration. cleanup is idempotent. Returns [ErrEmptyKey] for an empty key.
|
|||
|
|
func (r *Registry[T]) AddReceiver(key string) (<-chan *T, func(), error) {
|
|||
|
|
if key == "" {
|
|||
|
|
return nil, nil, ErrEmptyKey
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
s := &subscriber[T]{
|
|||
|
|
id: uuid.NewString(),
|
|||
|
|
channel: make(chan *T, r.bufferSize),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
r.mu.Lock()
|
|||
|
|
if r.subscribers[key] == nil {
|
|||
|
|
r.subscribers[key] = make(map[string]*subscriber[T])
|
|||
|
|
}
|
|||
|
|
r.subscribers[key][s.id] = s
|
|||
|
|
total := len(r.subscribers[key])
|
|||
|
|
r.mu.Unlock()
|
|||
|
|
|
|||
|
|
r.logger.Info("subscription registered",
|
|||
|
|
"key", key, "subscription_id", s.id, "total_subscriptions", total)
|
|||
|
|
|
|||
|
|
cleanup := func() { r.removeReceiver(key, s.id) }
|
|||
|
|
return s.channel, cleanup, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (r *Registry[T]) removeReceiver(key, id string) {
|
|||
|
|
r.mu.Lock()
|
|||
|
|
defer r.mu.Unlock()
|
|||
|
|
|
|||
|
|
subs := r.subscribers[key]
|
|||
|
|
if subs == nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
s, ok := subs[id]
|
|||
|
|
if !ok {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
close(s.channel)
|
|||
|
|
delete(subs, id)
|
|||
|
|
remaining := len(subs)
|
|||
|
|
if remaining == 0 {
|
|||
|
|
delete(r.subscribers, key)
|
|||
|
|
}
|
|||
|
|
r.logger.Info("subscription removed",
|
|||
|
|
"key", key, "subscription_id", id, "remaining_subscriptions", remaining)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Submit schedules a gated push for the given key. It returns immediately: when
|
|||
|
|
// the key is empty or has no subscribers it does nothing, otherwise it enqueues
|
|||
|
|
// work for that key's shard worker (dropping, with an [Observer.Dropped], only if
|
|||
|
|
// the shard queue is full). produce is invoked on the worker, not on the calling
|
|||
|
|
// goroutine.
|
|||
|
|
func (r *Registry[T]) Submit(key string, produce Producer[T]) {
|
|||
|
|
if key == "" || !r.hasSubscribers(key) {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
shard := r.shards[shardIndex(key, len(r.shards))]
|
|||
|
|
select {
|
|||
|
|
case shard <- job[T]{key: key, produce: produce}:
|
|||
|
|
case <-r.done:
|
|||
|
|
// shutting down
|
|||
|
|
default:
|
|||
|
|
r.logger.Warn("subscription job queue full; dropping notification", "key", key)
|
|||
|
|
r.obs.Dropped(key)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Close stops the worker pool and waits for in-flight gating to finish. It
|
|||
|
|
// cancels any in-flight read-view wait so workers return promptly rather than
|
|||
|
|
// blocking for the full retry budget. Queued-but-unstarted notifications are
|
|||
|
|
// dropped (safe — they are drop-tolerant). Idempotent; Submit after Close is a
|
|||
|
|
// no-op.
|
|||
|
|
func (r *Registry[T]) Close() {
|
|||
|
|
r.closeOnce.Do(func() {
|
|||
|
|
r.cancel()
|
|||
|
|
close(r.done)
|
|||
|
|
r.wg.Wait()
|
|||
|
|
})
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (r *Registry[T]) hasSubscribers(key string) bool {
|
|||
|
|
r.mu.RLock()
|
|||
|
|
defer r.mu.RUnlock()
|
|||
|
|
return len(r.subscribers[key]) > 0
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (r *Registry[T]) worker(shard <-chan job[T]) {
|
|||
|
|
defer r.wg.Done()
|
|||
|
|
for {
|
|||
|
|
select {
|
|||
|
|
case j := <-shard:
|
|||
|
|
r.handle(j)
|
|||
|
|
case <-r.done:
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (r *Registry[T]) handle(j job[T]) {
|
|||
|
|
// Bound the read-view wait so a hung read or a never-reflected event (e.g.
|
|||
|
|
// redelivery of an event for a since-deleted entity) can't pin a worker.
|
|||
|
|
// Derived from baseCtx so Close cancels in-flight waits promptly.
|
|||
|
|
budget := time.Duration(r.retries+1) * r.retryWait
|
|||
|
|
ctx, cancel := context.WithTimeout(r.baseCtx, budget)
|
|||
|
|
defer cancel()
|
|||
|
|
|
|||
|
|
payload, ok := r.await(ctx, j.produce)
|
|||
|
|
if !ok {
|
|||
|
|
r.logger.Warn("change not visible in read view after retries; subscription push skipped",
|
|||
|
|
"key", j.key)
|
|||
|
|
r.obs.PushSkipped(j.key)
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
r.obs.Pushed(j.key)
|
|||
|
|
r.push(j.key, payload)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (r *Registry[T]) await(ctx context.Context, produce Producer[T]) (*T, bool) {
|
|||
|
|
for attempt := 0; ; attempt++ {
|
|||
|
|
if payload, ready := produce(ctx); ready {
|
|||
|
|
return payload, true
|
|||
|
|
}
|
|||
|
|
if attempt >= r.retries {
|
|||
|
|
return nil, false
|
|||
|
|
}
|
|||
|
|
select {
|
|||
|
|
case <-ctx.Done():
|
|||
|
|
return nil, false
|
|||
|
|
case <-time.After(r.retryWait):
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// push delivers payload to every subscriber of key. The sends happen under the
|
|||
|
|
// read lock: sendNonBlocking never blocks (it drops on a full buffer), so
|
|||
|
|
// holding the lock is cheap, and it prevents removeReceiver — which takes the
|
|||
|
|
// write lock to close a channel — from closing a channel out from under an
|
|||
|
|
// in-flight send.
|
|||
|
|
func (r *Registry[T]) push(key string, payload *T) {
|
|||
|
|
r.mu.RLock()
|
|||
|
|
defer r.mu.RUnlock()
|
|||
|
|
for _, s := range r.subscribers[key] {
|
|||
|
|
select {
|
|||
|
|
case s.channel <- payload:
|
|||
|
|
default:
|
|||
|
|
r.logger.Warn("subscription channel full; dropping notification", "key", key)
|
|||
|
|
r.obs.ChannelFull(key)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// shardIndex maps key to a shard in [0, n) using the 32-bit FNV-1a hash, so a
// given key always lands on the same shard. n must be positive.
func shardIndex(key string, n int) int {
	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(key)) // hash.Hash.Write never returns an error
	sum := hasher.Sum32()
	return int(sum % uint32(n)) //nolint:gosec // modulo keeps the result in [0,n)
}
|