feat: type-generic registry for cross-service read-your-writes subscriptions
The shared core of Shiny's cross-service read-your-writes GraphQL subscriptions (ADR-0012), extracted and hardened from the near-identical hand-rolled handlers in authz-service (availableCompanies) and accounting-service (entryBasesChanged) before a third copy is written. Registry[T] owns the keyed subscriber map, non-blocking buffered fan-out (sends under the read lock so a close can't race a send), a key-sharded worker pool that runs the read-view gate OFF the AMQP delivery goroutine (preserving per-key FIFO order while distinct keys run in parallel), the bounded retry/timeout budget, and Observer metric hooks. Services supply only the event->key+payload mapping, the read-view Producer closure, and the per-replica transient-consumer wiring. Reviewed pre-publish (Go + Event Sourcing + Architecture). 99% coverage, race-clean. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,82 @@
|
||||
# subscriptions
|
||||
|
||||
Shared Go library: the reusable core of Shiny's cross-service read-your-writes
|
||||
GraphQL subscriptions.
|
||||
|
||||
## Shared Documentation
|
||||
|
||||
@../docs/claude/architecture.md
|
||||
@../docs/claude/go-services.md
|
||||
@../docs/claude/event-sourcing.md
|
||||
@../docs/claude/conventions.md
|
||||
|
||||
## Library Information
|
||||
|
||||
### Purpose
|
||||
|
||||
Single home for the subscription subscriber-registry + read-view gate + fan-out
|
||||
that was hand-rolled (near-identically) in `authz-service/subscription` and
|
||||
`accounting-service/subscription`. Implements the mechanism mandated by
|
||||
**ADR-0012** (cross-service read-your-writes via owning-service subscriptions),
|
||||
which is the concrete form of **ADR-0009** tier-3. ADR-0012 requires new
|
||||
instances of the pattern to use this library rather than copy it.
|
||||
|
||||
### Usage
|
||||
|
||||
```go
|
||||
import "gitea.unbound.se/shiny/subscriptions"
|
||||
|
||||
// One registry per subscription field, parameterised by the GraphQL payload.
|
||||
reg := subscriptions.New[model.EntryBasisChange](
|
||||
subscriptions.WithLogger(logger),
|
||||
subscriptions.WithObserver(otelObserver), // optional metrics
|
||||
)
|
||||
|
||||
// Resolver — register the websocket consumer; cleanup on ctx.Done.
|
||||
ch, cleanup, err := reg.AddReceiver(companyID)
|
||||
|
||||
// AMQP Process — gate on the read view, push off the delivery goroutine.
|
||||
reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) {
|
||||
basis, err := readView.FindEntryBasisById(ctx, id)
|
||||
if err != nil { return nil, false }
|
||||
return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil)
|
||||
})
|
||||
```
|
||||
|
||||
### Exported API
|
||||
|
||||
- `New[T](opts...) *Registry[T]` — starts the worker pool; `Close()` stops it.
|
||||
- `(*Registry[T]).AddReceiver(key) (<-chan *T, cleanup func(), error)` — register
|
||||
a subscriber; the resolver returns the channel and calls cleanup on ctx.Done.
|
||||
- `(*Registry[T]).Submit(key, Producer[T])` — from the AMQP handler; non-blocking.
|
||||
- `Producer[T] func(ctx) (*T, ready bool)` — reads current read-view state,
|
||||
returns the payload + whether the change is visible; retried until ready.
|
||||
- Options: `WithLogger`, `WithObserver`, `WithReadRetry(attempts, wait)`,
|
||||
`WithBufferSize`, `WithWorkers`, `WithQueueSize`.
|
||||
- `Observer` — `PushSkipped`/`Dropped`/`ChannelFull` hooks for metrics.
|
||||
|
||||
### Design notes (the load-bearing bits, per ADR-0012)
|
||||
|
||||
- **Per-replica.** Feed `Submit` from a `goamqp.TransientEventStreamConsumer` on
|
||||
the owning service's *own* events, so every replica sees every event and can
|
||||
push to the websockets it holds — distinct from the shared durable read-view
|
||||
consumer.
|
||||
- **Read-view gate.** The `Producer` must read *current* read-view state on each
|
||||
call (so out-of-order delivery across workers is still consistent) and report
|
||||
not-ready on a transient read error. The registry retries until ready or the
|
||||
budget elapses, so the client's refetch can't race the projection.
|
||||
- **Off the delivery goroutine.** `Submit` enqueues to a bounded worker pool and
|
||||
returns; the AMQP message is acked immediately. The poke is idempotent and
|
||||
drop-tolerant, so losing at-least-once on the poke is fine — the client
|
||||
refetches on any poke.
|
||||
- **No send on closed channel.** Pushes happen under the read lock; cleanup
|
||||
closes under the write lock.
|
||||
|
||||
### Conventions
|
||||
|
||||
Standard Shiny library scaffolding: `gofumpt`/`goimports -local`, golangci-lint,
|
||||
gitleaks and conventional-commit checks via pre-commit; coverage-regression gate
|
||||
in CI (`.testcoverage.yml`); releases auto-tagged from conventional commits by
|
||||
the shared Release workflow. Bump consuming services' `go.mod` after a release.
|
||||
This library is concurrency-critical — always run `go test -race` and keep the
|
||||
concurrent-churn test green before changing the locking or worker model.
|
||||
Reference in New Issue
Block a user