Files
subscriptions/README.md
T

44 lines
2.0 KiB
Markdown
Raw Normal View History

# subscriptions
Shared core for Shiny's cross-service read-your-writes GraphQL subscriptions
(ADR-0009 tier-3, ADR-0012).
An entity shown in the UI is frequently projected from *another* service's
event, so the owning service exposes a GraphQL subscription, drives it from a
per-replica transient AMQP consumer, and pushes a lightweight poke once the
change is visible in its own read view — the client then refetches the
authoritative query. This package is the reusable, type-generic, hardened core
of that pattern, extracted from the hand-rolled copies in `authz-service`
(`availableCompanies`) and `accounting-service` (`entryBasesChanged`).
```go
import "gitea.unbound.se/shiny/subscriptions"
// One registry per subscription, parameterised by the GraphQL payload type.
reg := subscriptions.New[model.EntryBasisChange](subscriptions.WithLogger(logger))
// Resolver: register a websocket consumer (key by company, user, …).
ch, cleanup, _ := reg.AddReceiver(companyID)
go func() { <-ctx.Done(); cleanup() }()
return ch, nil
// AMQP handler: gate the push on the read view, off the delivery goroutine.
reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) {
basis, err := readView.FindEntryBasisById(ctx, id)
if err != nil {
return nil, false // transient read error — keep waiting
}
return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil)
})
```
What the registry owns (so services don't re-roll it): the keyed subscriber map,
non-blocking buffered fan-out (sends under the read lock so a close can't race a
send), a bounded worker pool that runs the read-view gate **off** the AMQP
delivery goroutine, and the retry/timeout budget. What stays in the service: the
event→(key, payload) mapping and the `Producer` read-view closure.
The poke is idempotent and drop-tolerant — the client refetches on any poke — so
the worker acks immediately and a dropped/duplicated poke self-heals. Wire an
`Observer` to surface dropped/skipped pushes as metrics.