# subscriptions
Shared Go library: the reusable core of Shiny's cross-service read-your-writes
GraphQL subscriptions.
## Shared Documentation
@../docs/claude/architecture.md
@../docs/claude/go-services.md
@../docs/claude/event-sourcing.md
@../docs/claude/conventions.md
## Library Information
### Purpose
Single home for the subscription subscriber-registry + read-view gate + fan-out
that was hand-rolled (near-identically) in `authz-service/subscription` and
`accounting-service/subscription`. Implements the mechanism mandated by
**ADR-0012** (cross-service read-your-writes via owning-service subscriptions),
which is the concrete form of **ADR-0009** tier-3. ADR-0012 requires new
instances of the pattern to use this library rather than copy it.
### Usage
```go
import "gitea.unbound.se/shiny/subscriptions"

// One registry per subscription field, parameterised by the GraphQL payload.
reg := subscriptions.New[model.EntryBasisChange](
	subscriptions.WithLogger(logger),
	subscriptions.WithObserver(otelObserver), // optional metrics
)

// Resolver: register the websocket consumer; call cleanup on ctx.Done.
ch, cleanup, err := reg.AddReceiver(companyID)

// AMQP Process: gate on the read view, push off the delivery goroutine.
reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) {
	basis, err := readView.FindEntryBasisById(ctx, id)
	if err != nil {
		return nil, false // transient read error: report not-ready so the registry retries
	}
	// Ready once the read view agrees with the event: absent if removed, present otherwise.
	return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil)
})
```
### Exported API
- `New[T](opts...) *Registry[T]` — starts the worker pool; `Close()` stops it.
- `(*Registry[T]).AddReceiver(key) (<-chan *T, cleanup func(), error)` — register
a subscriber; the resolver returns the channel and calls cleanup on ctx.Done.
- `(*Registry[T]).Submit(key, Producer[T])` — from the AMQP handler; non-blocking.
- `Producer[T] func(ctx) (*T, ready bool)` — reads current read-view state,
returns the payload + whether the change is visible; retried until ready.
- Options: `WithLogger`, `WithObserver`, `WithReadRetry(attempts, wait)`,
`WithBufferSize`, `WithWorkers`, `WithQueueSize`.
- `Observer` provides `PushSkipped`/`Dropped`/`ChannelFull` hooks for metrics.
### Design notes (the load-bearing bits, per ADR-0012)
- **Per-replica.** Feed `Submit` from a `goamqp.TransientEventStreamConsumer` on
the owning service's *own* events, so every replica sees every event and can
push to the websockets it holds — distinct from the shared durable read-view
consumer.
- **Read-view gate.** The `Producer` must read *current* read-view state on each
call (so out-of-order delivery across workers is still consistent) and report
not-ready on a transient read error. The registry retries until ready or the
budget elapses, so the client's refetch can't race the projection.
- **Off the delivery goroutine.** `Submit` enqueues to a bounded worker pool and
returns; the AMQP message is acked immediately. The poke is idempotent and
drop-tolerant, so losing at-least-once on the poke is fine — the client
refetches on any poke.
- **No send on closed channel.** Pushes happen under the read lock; cleanup
closes under the write lock.
### Conventions
Standard Shiny library scaffolding: `gofumpt`/`goimports -local`, golangci-lint,
gitleaks and conventional-commit checks via pre-commit; coverage-regression gate
in CI (`.testcoverage.yml`); releases auto-tagged from conventional commits by
the shared Release workflow. Bump consuming services' `go.mod` after a release.
This library is concurrency-critical — always run `go test -race` and keep the
concurrent-churn test green before changing the locking or worker model.