releaser b8546c1609
subscriptions / vulnerabilities (pull_request) Successful in 1m25s
subscriptions / test (pull_request) Successful in 2m40s
subscriptions / coverage-baseline (pull_request) Has been skipped
pre-commit / pre-commit (pull_request) Successful in 5m21s
chore(release): prepare for v0.1.1
2026-06-20 20:31:25 +00:00
2026-06-20 20:31:25 +00:00

subscriptions

Shared core for Shiny's cross-service read-your-writes GraphQL subscriptions (ADR-0009 tier-3, ADR-0012).

An entity shown in the UI is frequently projected from another service's event, so the owning service exposes a GraphQL subscription, drives it from a per-replica transient AMQP consumer, and pushes a lightweight poke once the change is visible in its own read view — the client then refetches the authoritative query. This package is the reusable, type-generic, hardened core of that pattern, extracted from the hand-rolled copies in authz-service (availableCompanies) and accounting-service (entryBasesChanged).

import "gitea.unbound.se/shiny/subscriptions"

// One registry per subscription, parameterised by the GraphQL payload type.
reg := subscriptions.New[model.EntryBasisChange](subscriptions.WithLogger(logger))

// Resolver: register a websocket consumer (key by company, user, …).
ch, cleanup, _ := reg.AddReceiver(companyID)
go func() { <-ctx.Done(); cleanup() }()
return ch, nil

// AMQP handler: gate the push on the read view, off the delivery goroutine.
reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) {
    basis, err := readView.FindEntryBasisById(ctx, id)
    if err != nil {
        return nil, false // transient read error — keep waiting
    }
    return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil)
})

What the registry owns (so services don't re-roll it): the keyed subscriber map, non-blocking buffered fan-out (sends under the read lock so a close can't race a send), a bounded worker pool that runs the read-view gate off the AMQP delivery goroutine, and the retry/timeout budget. What stays in the service: the event→(key, payload) mapping and the Producer read-view closure.

The poke is idempotent and drop-tolerant — the client refetches on any poke — so the worker acks immediately and a dropped/duplicated poke self-heals. Wire an Observer to surface dropped/skipped pushes as metrics.

S
Description
Shared core for cross-service read-your-writes GraphQL subscriptions (ADR-0012)
Readme 51 KiB
Languages
Go 100%