# subscriptions

Shared Go library: the reusable core of Shiny's cross-service read-your-writes GraphQL subscriptions.

## Shared Documentation

@../docs/claude/architecture.md
@../docs/claude/go-services.md
@../docs/claude/event-sourcing.md
@../docs/claude/conventions.md

## Library Information

### Purpose

Single home for the subscription subscriber-registry + read-view gate + fan-out that was hand-rolled (near-identically) in `authz-service/subscription` and `accounting-service/subscription`. Implements the mechanism mandated by **ADR-0012** (cross-service read-your-writes via owning-service subscriptions), which is the concrete form of **ADR-0009** tier-3. ADR-0012 requires new instances of the pattern to use this library rather than copy it.

### Usage

```go
import "gitea.unbound.se/shiny/subscriptions"

// One registry per subscription field, parameterised by the GraphQL payload.
reg := subscriptions.New[model.EntryBasisChange](
	subscriptions.WithLogger(logger),
	subscriptions.WithObserver(otelObserver), // optional metrics
)

// Resolver: register the websocket consumer; cleanup on ctx.Done.
ch, cleanup, err := reg.AddReceiver(companyID)

// AMQP Process: gate on the read view, push off the delivery goroutine.
reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) {
	basis, err := readView.FindEntryBasisById(ctx, id)
	if err != nil {
		return nil, false
	}
	return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil)
})
```

### Exported API

- `New[T](opts...) *Registry[T]`: starts the worker pool; `Close()` stops it.
- `(*Registry[T]).AddReceiver(key) (<-chan *T, cleanup func(), error)`: register a subscriber; the resolver returns the channel and calls cleanup on ctx.Done.
- `(*Registry[T]).Submit(key, Producer[T])`: called from the AMQP handler; non-blocking.
- `Producer[T] func(ctx) (*T, ready bool)`: reads current read-view state, returns the payload + whether the change is visible; retried until ready.
- Options: `WithLogger`, `WithObserver`, `WithReadRetry(attempts, wait)`, `WithBufferSize`, `WithWorkers`, `WithQueueSize`.
- `Observer`: `PushSkipped`/`Dropped`/`ChannelFull` hooks for metrics.

### Design notes (the load-bearing bits, per ADR-0012)

- **Per-replica.** Feed `Submit` from a `goamqp.TransientEventStreamConsumer` on the owning service's *own* events, so every replica sees every event and can push to the websockets it holds. This is distinct from the shared durable read-view consumer.
- **Read-view gate.** The `Producer` must read *current* read-view state on each call (so out-of-order delivery across workers is still consistent) and report not-ready on a transient read error. The registry retries until ready or the budget elapses, so the client's refetch can't race the projection.
- **Off the delivery goroutine.** `Submit` enqueues to a bounded worker pool and returns; the AMQP message is acked immediately. The poke is idempotent and drop-tolerant, so losing at-least-once on the poke is fine: the client refetches on any poke.
- **No send on closed channel.** Pushes happen under the read lock; cleanup closes under the write lock.

### Conventions

Standard Shiny library scaffolding: `gofumpt`/`goimports -local`, golangci-lint, gitleaks and conventional-commit checks via pre-commit; coverage-regression gate in CI (`.testcoverage.yml`); releases auto-tagged from conventional commits by the shared Release workflow. Bump consuming services' `go.mod` after a release.

This library is concurrency-critical. Always run `go test -race` and keep the concurrent-churn test green before changing the locking or worker model.
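
As a reference point when touching the locking model, the "no send on closed channel" rule from the design notes (pushes under the read lock, cleanup closes under the write lock) can be sketched roughly as below. This is a minimal illustration under assumptions, not the library's actual implementation: `miniRegistry`, `add`, and `push` are hypothetical names, and the worker pool, read-view gate, and observer hooks are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// miniRegistry is a hypothetical, stripped-down stand-in for Registry[T].
// It only demonstrates the locking discipline, nothing else.
type miniRegistry[T any] struct {
	mu        sync.RWMutex
	nextID    int
	receivers map[string]map[int]chan *T
}

func newMiniRegistry[T any]() *miniRegistry[T] {
	return &miniRegistry[T]{receivers: make(map[string]map[int]chan *T)}
}

// add registers a buffered channel for key and returns it together with an
// idempotent cleanup function.
func (r *miniRegistry[T]) add(key string, buffer int) (<-chan *T, func()) {
	ch := make(chan *T, buffer)

	r.mu.Lock()
	id := r.nextID
	r.nextID++
	if r.receivers[key] == nil {
		r.receivers[key] = make(map[int]chan *T)
	}
	r.receivers[key][id] = ch
	r.mu.Unlock()

	var once sync.Once
	cleanup := func() {
		once.Do(func() {
			// Write lock: no push can be in flight while the channel is
			// unregistered and closed.
			r.mu.Lock()
			defer r.mu.Unlock()
			delete(r.receivers[key], id)
			if len(r.receivers[key]) == 0 {
				delete(r.receivers, key)
			}
			close(ch)
		})
	}
	return ch, cleanup
}

// push does a non-blocking send to every receiver registered for key and
// returns how many receivers accepted the value.
func (r *miniRegistry[T]) push(key string, v *T) int {
	// Read lock: pushes may run concurrently with each other, but never
	// concurrently with a cleanup.
	r.mu.RLock()
	defer r.mu.RUnlock()

	delivered := 0
	for _, ch := range r.receivers[key] {
		select {
		case ch <- v:
			delivered++
		default:
			// Buffer full: drop. The poke is drop-tolerant.
		}
	}
	return delivered
}

func main() {
	reg := newMiniRegistry[string]()
	ch, cleanup := reg.add("company-1", 1)

	msg := "poke"
	fmt.Println(reg.push("company-1", &msg)) // 1: delivered into the buffer
	fmt.Println(reg.push("company-1", &msg)) // 0: buffer full, dropped
	fmt.Println(*<-ch)                       // poke

	cleanup()
	cleanup()                                // second call is a no-op
	fmt.Println(reg.push("company-1", &msg)) // 0: receiver is gone
}
```

Because the send sits in a `select` with a `default` branch, holding the read lock during a push never blocks on a slow consumer, so a cleanup waiting for the write lock is not starved by a full channel.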