From da4e7df6cee6c158cde1da1fa99bea537f2fba63 Mon Sep 17 00:00:00 2001 From: Joakim Olsson Date: Tue, 16 Jun 2026 14:22:34 +0200 Subject: [PATCH] feat: type-generic registry for cross-service read-your-writes subscriptions The shared core of Shiny's cross-service read-your-writes GraphQL subscriptions (ADR-0012), extracted and hardened from the near-identical hand-rolled handlers in authz-service (availableCompanies) and accounting-service (entryBasesChanged) before a third copy is written. Registry[T] owns the keyed subscriber map, non-blocking buffered fan-out (sends under the read lock so a close can't race a send), a key-sharded worker pool that runs the read-view gate OFF the AMQP delivery goroutine (preserving per-key FIFO order while distinct keys run in parallel), the bounded retry/timeout budget, and Observer metric hooks. Services supply only the event->key+payload mapping, the read-view Producer closure, and the per-replica transient-consumer wiring. Reviewed pre-publish (Go + Event Sourcing + Architecture). 99% coverage, race-clean. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .editorconfig | 11 + .gitea/workflows/ci.yaml | 107 ++++++++ .gitea/workflows/pre-commit.yaml | 25 ++ .gitea/workflows/release.yaml | 9 + .gitignore | 5 + .golangci.yml | 22 ++ .pre-commit-config.yaml | 39 +++ .testcoverage.yml | 13 + .version | 3 + CHANGELOG.md | 11 + CLAUDE.md | 82 ++++++ README.md | 43 ++++ cliff.toml | 80 ++++++ go.mod | 14 + go.sum | 12 + renovate.json | 6 + subscriptions.go | 421 +++++++++++++++++++++++++++++++ subscriptions_test.go | 383 ++++++++++++++++++++++++++++ 18 files changed, 1286 insertions(+) create mode 100644 .editorconfig create mode 100644 .gitea/workflows/ci.yaml create mode 100644 .gitea/workflows/pre-commit.yaml create mode 100644 .gitea/workflows/release.yaml create mode 100644 .gitignore create mode 100644 .golangci.yml create mode 100644 .pre-commit-config.yaml create mode 100644 .testcoverage.yml create mode 100644 .version create mode 100644 CHANGELOG.md create mode 100644 CLAUDE.md create mode 100644 README.md create mode 100644 cliff.toml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 renovate.json create mode 100644 subscriptions.go create mode 100644 subscriptions_test.go diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..04cd3ad --- /dev/null +++ b/.editorconfig @@ -0,0 +1,11 @@ +root = true + +[*] +end_of_line = lf +insert_final_newline = true +charset = utf-8 +trim_trailing_whitespace = true + +[*.go] +indent_style = tab +indent_size = 2 diff --git a/.gitea/workflows/ci.yaml b/.gitea/workflows/ci.yaml new file mode 100644 index 0000000..e42acf9 --- /dev/null +++ b/.gitea/workflows/ci.yaml @@ -0,0 +1,107 @@ +name: subscriptions + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + test: + if: gitea.event_name == 'pull_request' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version: 'stable' + - name: Format check + run: | + go 
install mvdan.cc/gofumpt@latest + test -z "$(gofumpt -l .)" + - name: Run tests + run: go test -race -coverprofile=coverage.txt ./... + + - name: Filter test files from coverage + run: | + grep -v -E '_test\.go:' coverage.txt > coverage.filtered.txt || true + mv coverage.filtered.txt coverage.txt + + - name: Check coverage + id: coverage + run: | + go install github.com/vladopajic/go-test-coverage/v2@latest + go-test-coverage --config ./.testcoverage.yml --github-action-output + - name: Restore baseline coverage + uses: actions/cache/restore@v5 + with: + path: coverage-baseline.txt + key: coverage-baseline-${{ gitea.run_id }} + restore-keys: | + coverage-baseline- + - name: Compare coverage + run: | + CURRENT="${{ steps.coverage.outputs.total-coverage }}" + if [ -f coverage-baseline.txt ]; then + BASE=$(cat coverage-baseline.txt) + echo "Base coverage: ${BASE}%" + echo "Current coverage: ${CURRENT}%" + if [ "$(echo "$CURRENT < $BASE" | bc -l)" -eq 1 ]; then + echo "::error::Coverage decreased from ${BASE}% to ${CURRENT}%" + exit 1 + fi + echo "Coverage maintained or improved: ${BASE}% -> ${CURRENT}%" + else + echo "No baseline coverage found yet, skipping comparison" + echo "Current coverage: ${CURRENT}%" + fi + - name: Post coverage comment + env: + GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }} + GITEA_URL: ${{ gitea.server_url }} + run: | + COVERAGE="${{ steps.coverage.outputs.total-coverage }}" + curl -X POST "${GITEA_URL}/api/v1/repos/${{ gitea.repository }}/issues/${{ gitea.event.pull_request.number }}/comments" \ + -H "Authorization: token ${GITEA_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{\"body\": \"## Coverage Report\n\nTotal coverage: **${COVERAGE}%**\"}" + + coverage-baseline: + # Records main's coverage into the Actions cache for the next PR's + # regression gate to read. Post-merge only, not a required check, blocks + # nothing (cf. ADR-0010). 
+ if: gitea.event_name == 'push' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version: 'stable' + - name: Compute coverage + id: coverage + run: | + go install github.com/vladopajic/go-test-coverage/v2@latest + go test -coverprofile=coverage.txt ./... + grep -v -E '_test\.go:' coverage.txt > coverage.filtered.txt || true + mv coverage.filtered.txt coverage.txt + go-test-coverage --config ./.testcoverage.yml --github-action-output + - name: Write baseline file + run: echo "${{ steps.coverage.outputs.total-coverage }}" > coverage-baseline.txt + - name: Save baseline to cache + uses: actions/cache/save@v5 + with: + path: coverage-baseline.txt + key: coverage-baseline-${{ gitea.run_id }} + + vulnerabilities: + if: gitea.event_name == 'pull_request' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version: 'stable' + - name: Check vulnerabilities + run: | + go install golang.org/x/vuln/cmd/govulncheck@latest + govulncheck ./... 
diff --git a/.gitea/workflows/pre-commit.yaml b/.gitea/workflows/pre-commit.yaml new file mode 100644 index 0000000..e427748 --- /dev/null +++ b/.gitea/workflows/pre-commit.yaml @@ -0,0 +1,25 @@ +name: pre-commit +permissions: read-all + +on: + pull_request: + push: + branches: + - main + +jobs: + pre-commit: + runs-on: ubuntu-latest + env: + SKIP: no-commit-to-branch + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version: stable + - uses: actions/setup-python@v6 + with: + python-version: '3.14' + - name: Install goimports + run: go install golang.org/x/tools/cmd/goimports@latest + - uses: pre-commit/action@v3.0.1 diff --git a/.gitea/workflows/release.yaml b/.gitea/workflows/release.yaml new file mode 100644 index 0000000..ef6ec99 --- /dev/null +++ b/.gitea/workflows/release.yaml @@ -0,0 +1,9 @@ +name: Release + +on: + push: + branches: [main] + +jobs: + release: + uses: unboundsoftware/shared-workflows/.gitea/workflows/Release.yml@main diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ab2f1aa --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.idea +.claude +/release +coverage.txt +coverage-baseline.txt diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..5381cc5 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,22 @@ +version: "2" +run: + allow-parallel-runners: true +linters: + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + paths: + - third_party$ + - builtin$ + - examples$ +formatters: + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..02cc8ec --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,39 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v6.0.0 + 
hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + args: + - --allow-multiple-documents + - id: check-added-large-files +- repo: https://github.com/alessandrojcm/commitlint-pre-commit-hook + rev: v9.25.0 + hooks: + - id: commitlint + stages: [ commit-msg ] + additional_dependencies: [ '@commitlint/config-conventional' ] +- repo: https://github.com/dnephin/pre-commit-golang + rev: v0.5.1 + hooks: + - id: go-mod-tidy + - id: go-imports + args: + - -local + - gitea.unbound.se/shiny/subscriptions +- repo: https://github.com/lietu/go-pre-commit + rev: v1.0.0 + hooks: + - id: go-test + - id: gofumpt +- repo: https://github.com/golangci/golangci-lint + rev: v2.12.2 + hooks: + - id: golangci-lint-full +- repo: https://github.com/gitleaks/gitleaks + rev: v8.30.1 + hooks: + - id: gitleaks diff --git a/.testcoverage.yml b/.testcoverage.yml new file mode 100644 index 0000000..7aec8b6 --- /dev/null +++ b/.testcoverage.yml @@ -0,0 +1,13 @@ +# Coverage configuration for go-test-coverage +# https://github.com/vladopajic/go-test-coverage + +profile: coverage.txt + +threshold: + file: 0 + package: 0 + total: 0 + +exclude: + paths: + - _test\.go$ diff --git a/.version b/.version new file mode 100644 index 0000000..557859c --- /dev/null +++ b/.version @@ -0,0 +1,3 @@ +{ + "version": "v0.1.0" +} diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..30c79b3 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,11 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [0.1.0] - 2026-06-16 + +### 🚀 Features + +- Initial version: type-generic `Registry[T]` for cross-service read-your-writes GraphQL subscriptions (ADR-0012) — keyed subscriber map, non-blocking fan-out (sends under the read lock), a bounded worker pool that runs the read-view gate off the AMQP delivery goroutine, and `Observer` metric hooks. Extracted and hardened from the hand-rolled `authz-service` and `accounting-service` copies. 
+ + diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..399aa0a --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,82 @@ +# subscriptions + +Shared Go library: the reusable core of Shiny's cross-service read-your-writes +GraphQL subscriptions. + +## Shared Documentation + +@../docs/claude/architecture.md +@../docs/claude/go-services.md +@../docs/claude/event-sourcing.md +@../docs/claude/conventions.md + +## Library Information + +### Purpose + +Single home for the subscription subscriber-registry + read-view gate + fan-out +that was hand-rolled (near-identically) in `authz-service/subscription` and +`accounting-service/subscription`. Implements the mechanism mandated by +**ADR-0012** (cross-service read-your-writes via owning-service subscriptions), +which is the concrete form of **ADR-0009** tier-3. ADR-0012 requires new +instances of the pattern to use this library rather than copy it. + +### Usage + +```go +import "gitea.unbound.se/shiny/subscriptions" + +// One registry per subscription field, parameterised by the GraphQL payload. +reg := subscriptions.New[model.EntryBasisChange]( + subscriptions.WithLogger(logger), + subscriptions.WithObserver(otelObserver), // optional metrics +) + +// Resolver — register the websocket consumer; cleanup on ctx.Done. +ch, cleanup, err := reg.AddReceiver(companyID) + +// AMQP Process — gate on the read view, push off the delivery goroutine. +reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) { + basis, err := readView.FindEntryBasisById(ctx, id) + if err != nil { return nil, false } + return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil) +}) +``` + +### Exported API + +- `New[T](opts...) *Registry[T]` — starts the worker pool; `Close()` stops it. +- `(*Registry[T]).AddReceiver(key) (<-chan *T, cleanup func(), error)` — register + a subscriber; the resolver returns the channel and calls cleanup on ctx.Done. 
+- `(*Registry[T]).Submit(key, Producer[T])` — from the AMQP handler; non-blocking. +- `Producer[T] func(ctx) (*T, ready bool)` — reads current read-view state, + returns the payload + whether the change is visible; retried until ready. +- Options: `WithLogger`, `WithObserver`, `WithReadRetry(attempts, wait)`, + `WithBufferSize`, `WithWorkers`, `WithQueueSize`. +- `Observer` — `Pushed`/`PushSkipped`/`Dropped`/`ChannelFull` hooks for metrics. + +### Design notes (the load-bearing bits, per ADR-0012) + +- **Per-replica.** Feed `Submit` from a `goamqp.TransientEventStreamConsumer` on + the owning service's *own* events, so every replica sees every event and can + push to the websockets it holds — distinct from the shared durable read-view + consumer. +- **Read-view gate.** The `Producer` must read *current* read-view state on each + call (so out-of-order delivery across workers is still consistent) and report + not-ready on a transient read error. The registry retries until ready or the + budget elapses, so the client's refetch can't race the projection. +- **Off the delivery goroutine.** `Submit` enqueues to a bounded worker pool and + returns; the AMQP message is acked immediately. The poke is idempotent and + drop-tolerant, so losing at-least-once on the poke is fine — the client + refetches on any poke. +- **No send on closed channel.** Pushes happen under the read lock; cleanup + closes under the write lock. + +### Conventions + +Standard Shiny library scaffolding: `gofumpt`/`goimports -local`, golangci-lint, +gitleaks and conventional-commit checks via pre-commit; coverage-regression gate +in CI (`.testcoverage.yml`); releases auto-tagged from conventional commits by +the shared Release workflow. Bump consuming services' `go.mod` after a release. +This library is concurrency-critical — always run `go test -race` and keep the +concurrent-churn test green before changing the locking or worker model. 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..ed1e440 --- /dev/null +++ b/README.md @@ -0,0 +1,43 @@ +# subscriptions + +Shared core for Shiny's cross-service read-your-writes GraphQL subscriptions +(ADR-0009 tier-3, ADR-0012). + +An entity shown in the UI is frequently projected from *another* service's +event, so the owning service exposes a GraphQL subscription, drives it from a +per-replica transient AMQP consumer, and pushes a lightweight poke once the +change is visible in its own read view — the client then refetches the +authoritative query. This package is the reusable, type-generic, hardened core +of that pattern, extracted from the hand-rolled copies in `authz-service` +(`availableCompanies`) and `accounting-service` (`entryBasesChanged`). + +```go +import "gitea.unbound.se/shiny/subscriptions" + +// One registry per subscription, parameterised by the GraphQL payload type. +reg := subscriptions.New[model.EntryBasisChange](subscriptions.WithLogger(logger)) + +// Resolver: register a websocket consumer (key by company, user, …). +ch, cleanup, _ := reg.AddReceiver(companyID) +go func() { <-ctx.Done(); cleanup() }() +return ch, nil + +// AMQP handler: gate the push on the read view, off the delivery goroutine. +reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) { + basis, err := readView.FindEntryBasisById(ctx, id) + if err != nil { + return nil, false // transient read error — keep waiting + } + return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil) +}) +``` + +What the registry owns (so services don't re-roll it): the keyed subscriber map, +non-blocking buffered fan-out (sends under the read lock so a close can't race a +send), a bounded worker pool that runs the read-view gate **off** the AMQP +delivery goroutine, and the retry/timeout budget. What stays in the service: the +event→(key, payload) mapping and the `Producer` read-view closure. 
+ +The poke is idempotent and drop-tolerant — the client refetches on any poke — so +the AMQP handler acks immediately and a dropped/duplicated poke self-heals. Wire an +`Observer` to surface dropped/skipped pushes as metrics. diff --git a/cliff.toml b/cliff.toml new file mode 100644 index 0000000..ac04085 --- /dev/null +++ b/cliff.toml @@ -0,0 +1,80 @@ +# git-cliff ~ default configuration file +# https://git-cliff.org/docs/configuration +# +# Lines starting with "#" are comments. +# Configuration options are organized into tables and keys. +# See documentation for more information on available options. + +[changelog] +# template for the changelog header +header = """ +# Changelog\n +All notable changes to this project will be documented in this file.\n +""" +# template for the changelog body +# https://keats.github.io/tera/docs/#introduction +body = """ +{% if version %}\ + ## [{{ version | trim_start_matches(pat="v") }}] - {{ timestamp | date(format="%Y-%m-%d") }} +{% else %}\ + ## [unreleased] +{% endif %}\ +{% for group, commits in commits | group_by(attribute="group") %} + ### {{ group | striptags | trim | upper_first }} + {% for commit in commits %} + - {% if commit.scope %}*({{ commit.scope }})* {% endif %}\ + {% if commit.breaking %}[**breaking**] {% endif %}\ + {{ commit.message | upper_first }}\ + {% endfor %} +{% endfor %}\n +""" +# template for the changelog footer +footer = """ + +""" +# remove the leading and trailing whitespace +trim = true +# postprocessors +postprocessors = [ + # { pattern = '', replace = "https://github.com/orhun/git-cliff" }, # replace repository URL +] +# render body even when there are no releases to process +# render_always = true +# output file path +# output = "test.md" + +[git] +# parse the commits based on https://www.conventionalcommits.org +conventional_commits = true +# filter out the commits that are not conventional +filter_unconventional = true +# process each line of a commit as an individual commit +split_commits = false +# 
regex for preprocessing the commit messages +commit_preprocessors = [ + # Replace issue numbers + #{ pattern = '\((\w+\s)?#([0-9]+)\)', replace = "([#${2}](/issues/${2}))"}, + # Check spelling of the commit with https://github.com/crate-ci/typos + # If the spelling is incorrect, it will be automatically fixed. + #{ pattern = '.*', replace_command = 'typos --write-changes -' }, +] +# regex for parsing and grouping commits +commit_parsers = [ + { message = "^feat", group = "🚀 Features" }, + { message = "^fix", group = "🐛 Bug Fixes" }, + { message = "^doc", group = "📚 Documentation" }, + { message = "^perf", group = "⚡ Performance" }, + { message = "^refactor", group = "🚜 Refactor" }, + { message = "^style", group = "🎨 Styling" }, + { message = "^test", group = "🧪 Testing" }, + { message = "^chore\\(release\\): prepare for", skip = true }, + { message = "^chore|^ci", group = "⚙️ Miscellaneous Tasks" }, + { body = ".*security", group = "🛡️ Security" }, + { message = "^revert", group = "◀️ Revert" }, +] +# filter out the commits that are not matched by commit parsers +filter_commits = false +# sort the tags topologically +topo_order = false +# sort the commits inside sections by oldest/newest order +sort_commits = "oldest" diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..0134165 --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module gitea.unbound.se/shiny/subscriptions + +go 1.25 + +require ( + github.com/google/uuid v1.6.0 + github.com/stretchr/testify v1.11.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..42976e0 --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.6.0 
h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/renovate.json b/renovate.json new file mode 100644 index 0000000..5db72dd --- /dev/null +++ b/renovate.json @@ -0,0 +1,6 @@ +{ + "$schema": "https://docs.renovatebot.com/renovate-schema.json", + "extends": [ + "config:recommended" + ] +} diff --git a/subscriptions.go b/subscriptions.go new file mode 100644 index 0000000..4b52bed --- /dev/null +++ b/subscriptions.go @@ -0,0 +1,421 @@ +// Package subscriptions provides a reusable, type-generic registry for fanning +// out change notifications to in-process GraphQL subscription consumers. +// +// It is the shared core of Shiny's cross-service read-your-writes pattern +// (ADR-0012): an entity shown in the UI is frequently projected from another +// service's event, so the owning service exposes a GraphQL subscription, drives +// it from a per-replica transient AMQP consumer, and pushes a notification once +// the change is visible in its own read view — the client then refetches the +// authoritative query. 
Two services hand-rolled this (authz-service's +// availableCompanies, accounting-service's entryBasesChanged); this package is +// the extracted, hardened core so further cases reuse it instead of copying it. +// +// # Wiring requirement (do not get this wrong) +// +// Submit MUST be fed from a per-replica goamqp.TransientEventStreamConsumer (an +// exclusive, randomly-named queue) bound to the owning service's OWN events, so +// every replica receives every event and can push to the websockets it holds. +// This is necessarily a DIFFERENT consumer from the shared, durable read-view +// projection consumer (a work-queue, where exactly one replica handles each +// event). Wiring Submit to a shared/durable consumer silently breaks delivery in +// a multi-replica deployment: the one replica that handles an event usually does +// not hold the subscriber's websocket, so the poke is lost with no error. The +// library cannot enforce this (it is transport-agnostic) — the caller must. +// +// # Concurrency model +// +// - AddReceiver registers a subscriber (one per websocket) and returns a +// buffered channel plus a cleanup func that must be called when the +// subscription ends. +// - Submit is called from the AMQP event handler. It does NOT block that +// handler on the read view: it hands the work to the key's shard worker, which +// waits (with a budget) for the read view to reflect the change — via the +// caller's [Producer] — and only then pushes. Acking the AMQP message immediately is +// safe because notifications are idempotent and drop-tolerant (see below). +// - Work is sharded by key, so all events for one key are processed FIFO by a +// single worker (preserving per-key order even for payloads the client +// consumes directly), while distinct keys run in parallel (so one lagging +// read view only delays its own key's shard, not everything). 
+// +// Pushes happen while holding the read lock, and cleanup closes a subscriber's +// channel under the write lock, so a send can never race a close ("send on +// closed channel"). A full subscriber buffer drops the notification rather than +// blocking a slow consumer. +// +// # Payload contract +// +// T should be a lightweight notification the client reacts to by refetching the +// authoritative query (a poke such as {id, removed}), not authoritative state +// the client consumes as the source of truth. Notifications may be dropped (full +// queue or buffer) and the push is best-effort (no AMQP requeue on a persistent +// read failure), so reliability comes from the client's idempotent refetch. +// Per-key FIFO ordering is preserved, so a payload the client does consume +// directly is at least delivered in event order for a given key — but it can +// still be dropped, so a refetch-on-receipt design is strongly preferred. +package subscriptions + +import ( + "context" + "errors" + "hash/fnv" + "log/slog" + "sync" + "time" + + "github.com/google/uuid" +) + +// ErrEmptyKey is returned by AddReceiver when the subscription key is empty, +// which almost always indicates an unpopulated id at the call site. +var ErrEmptyKey = errors.New("subscriptions: empty subscription key") + +// Producer reads the owning service's read view and returns the payload to push +// together with whether the change is yet visible there. +// +// The registry calls a Producer repeatedly until it reports ready (or a bounded +// budget elapses), so the client's refetch can never race ahead of the +// projection. A Producer MUST read current read-view state on each call (rather +// than capturing the event's historical state). On a transient read error it +// should return (nil, false) to keep waiting — the event is already durable, so +// only the projection is lagging. 
+// +// Convergence precondition: "retry until ready" only terminates as ready (rather +// than burning the whole budget then skipping) if the read view the Producer +// gates on is made consistent by the SAME ordered aggregate stream as the +// triggering event. Gating on a row populated by a different aggregate's events +// can fail to converge. +type Producer[T any] func(ctx context.Context) (payload *T, ready bool) + +// Observer receives best-effort notifications about pushes, for +// metrics/observability. Its methods may be called concurrently. The default is +// a no-op; wire an implementation (e.g. OTel counters) via [WithObserver]. +type Observer interface { + // Pushed reports that a change passed the read-view gate and is being fanned + // out to the key's subscribers — the denominator for a skip/drop rate. It is + // reported once per change, before the per-subscriber sends, so an individual + // subscriber can still miss the notification (see ChannelFull). + Pushed(key string) + // PushSkipped reports that the read view never reflected the change within + // the retry budget, so the push was skipped. + PushSkipped(key string) + // Dropped reports that the worker queue was full, so the notification was + // dropped before it could be gated. + Dropped(key string) + // ChannelFull reports that a subscriber's buffer was full, so its + // notification was dropped. + ChannelFull(key string) +} + +// noopObserver is the default Observer; every hook does nothing. +type noopObserver struct{} + +func (noopObserver) Pushed(string) {} +func (noopObserver) PushSkipped(string) {} +func (noopObserver) Dropped(string) {} +func (noopObserver) ChannelFull(string) {} + +// subscriber is one registered receiver: its unique id and the buffered channel +// its notifications are sent on. +type subscriber[T any] struct { + id string + channel chan *T +} + +// job is one unit of queued work: the key to notify and the Producer that gates +// the push. +type job[T any] struct { + key string + produce Producer[T] +} + +// Registry fans out notifications of type T to in-process subscribers keyed by +// an arbitrary string (e.g. a company id or a user email). The zero value is +// not usable; construct one with [New]. 
+type Registry[T any] struct { + logger *slog.Logger + obs Observer + bufferSize int + retries int + retryWait time.Duration + + mu sync.RWMutex // guards subscribers + subscribers map[string]map[string]*subscriber[T] // key -> subscriber id -> subscriber + + shards []chan job[T] + baseCtx context.Context + cancel context.CancelFunc + done chan struct{} + wg sync.WaitGroup + closeOnce sync.Once +} + +// config collects the option values that New copies into a Registry. +type config struct { + logger *slog.Logger + obs Observer + bufferSize int + retries int + retryWait time.Duration + workers int + queueSize int +} + +// Option configures a [Registry]. +type Option func(*config) + +// WithLogger sets the structured logger. Defaults to [slog.Default]. +func WithLogger(l *slog.Logger) Option { + return func(c *config) { + if l != nil { + c.logger = l + } + } +} + +// WithObserver wires a metrics observer. Defaults to a no-op. +func WithObserver(o Observer) Option { + return func(c *config) { + if o != nil { + c.obs = o + } + } +} + +// WithReadRetry tunes how long a worker waits for the read view to reflect a +// change before giving up on the push: up to attempts re-reads spaced by wait +// (so total reads = attempts+1; default 25 × 200ms ≈ 5s). The read-view consumer +// and this subscription consumer are independent consumers of the same event, so +// a freshly-projected change may not be visible on the first read; retrying +// avoids pushing a notification the client would refetch ahead of. A negative +// attempts or a non-positive wait is ignored (the default is kept); attempts +// may be 0, meaning a single read with no retries. +func WithReadRetry(attempts int, wait time.Duration) Option { + return func(c *config) { + if attempts >= 0 { + c.retries = attempts + } + if wait > 0 { + c.retryWait = wait + } + } +} + +// WithBufferSize sets each subscriber channel's buffer (default 20). A full +// buffer drops the notification rather than blocking. +func WithBufferSize(n int) Option { + return func(c *config) { + if n > 0 { + c.bufferSize = n + } + } +} + +// WithWorkers sets the number of key-shard workers (default 4). 
Each key is +// handled FIFO by exactly one worker; more workers spread distinct keys over +// more goroutines so a lagging read view delays only its own shard. A +// non-positive n is ignored. +func WithWorkers(n int) Option { + return func(c *config) { + if n > 0 { + c.workers = n + } + } +} + +// WithQueueSize sets each shard's job-queue depth (default 64). A full queue +// drops the notification (reported via [Observer.Dropped]) rather than blocking +// the AMQP delivery goroutine. A non-positive n is ignored. +func WithQueueSize(n int) Option { + return func(c *config) { + if n > 0 { + c.queueSize = n + } + } +} + +// New builds a Registry and starts its key-shard workers. Call [Registry.Close] +// to stop them (optional; they exit with the process otherwise). +func New[T any](opts ...Option) *Registry[T] { + // Start from the defaults; each Option then overrides the fields it owns. + c := &config{ + logger: slog.Default(), + obs: noopObserver{}, + bufferSize: 20, + retries: 25, + retryWait: 200 * time.Millisecond, + workers: 4, + queueSize: 64, + } + for _, o := range opts { + o(c) + } + // baseCtx parents every read-view wait, so Close can cancel in-flight gating. + ctx, cancel := context.WithCancel(context.Background()) + r := &Registry[T]{ + logger: c.logger, + obs: c.obs, + bufferSize: c.bufferSize, + retries: c.retries, + retryWait: c.retryWait, + subscribers: make(map[string]map[string]*subscriber[T]), + shards: make([]chan job[T], c.workers), + baseCtx: ctx, + cancel: cancel, + done: make(chan struct{}), + } + // Count every worker before starting any, so Close's wg.Wait covers them all. + r.wg.Add(c.workers) + for i := range r.shards { + r.shards[i] = make(chan job[T], c.queueSize) + go r.worker(r.shards[i]) + } + return r +} + +// AddReceiver registers a subscriber for the given key. It returns the channel +// to stream and a cleanup func that MUST be called when the subscription ends +// (e.g. from the resolver's ctx.Done) to close the channel and release the +// registration. cleanup is idempotent. Returns [ErrEmptyKey] for an empty key. 
+func (r *Registry[T]) AddReceiver(key string) (<-chan *T, func(), error) { + if key == "" { + return nil, nil, ErrEmptyKey + } + + // A random per-subscriber id lets several subscribers share one key and lets + // cleanup remove exactly this one. + s := &subscriber[T]{ + id: uuid.NewString(), + channel: make(chan *T, r.bufferSize), + } + + r.mu.Lock() + if r.subscribers[key] == nil { + r.subscribers[key] = make(map[string]*subscriber[T]) + } + r.subscribers[key][s.id] = s + total := len(r.subscribers[key]) + r.mu.Unlock() + + r.logger.Info("subscription registered", + "key", key, "subscription_id", s.id, "total_subscriptions", total) + + cleanup := func() { r.removeReceiver(key, s.id) } + return s.channel, cleanup, nil +} + +// removeReceiver closes and unregisters subscriber id under key, deleting the +// key's entry once it has no subscribers left. It is a no-op when the +// subscriber is already gone, which is what makes cleanup idempotent. The write +// lock is held throughout, so the close cannot interleave with the sends push +// performs under the read lock. +func (r *Registry[T]) removeReceiver(key, id string) { + r.mu.Lock() + defer r.mu.Unlock() + + subs := r.subscribers[key] + if subs == nil { + return + } + s, ok := subs[id] + if !ok { + return + } + close(s.channel) + delete(subs, id) + remaining := len(subs) + if remaining == 0 { + delete(r.subscribers, key) + } + r.logger.Info("subscription removed", + "key", key, "subscription_id", id, "remaining_subscriptions", remaining) +} + +// Submit schedules a gated push for the given key. It returns immediately: when +// the key is empty or has no subscribers it does nothing, otherwise it enqueues +// work for that key's shard worker (dropping, with an [Observer.Dropped], only if +// the shard queue is full). produce is invoked on the worker, not on the calling +// goroutine. +func (r *Registry[T]) Submit(key string, produce Producer[T]) { + if key == "" || !r.hasSubscribers(key) { + return + } + // Every job for a key goes to the same shard, which keeps per-key FIFO order. + shard := r.shards[shardIndex(key, len(r.shards))] + // Non-blocking enqueue: a full shard queue falls through to default and the + // notification is dropped instead of stalling the caller. + select { + case shard <- job[T]{key: key, produce: produce}: + case <-r.done: + // shutting down + default: + r.logger.Warn("subscription job queue full; dropping notification", "key", key) + r.obs.Dropped(key) + } +} + +// Close stops the worker pool and waits for in-flight gating to finish. It +// cancels any in-flight read-view wait so workers return promptly rather than +// blocking for the full retry budget. 
Queued-but-unstarted notifications are +// dropped (safe β€” they are drop-tolerant). Idempotent; Submit after Close is a +// no-op. +func (r *Registry[T]) Close() { + r.closeOnce.Do(func() { + r.cancel() + close(r.done) + r.wg.Wait() + }) +} + +func (r *Registry[T]) hasSubscribers(key string) bool { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.subscribers[key]) > 0 +} + +func (r *Registry[T]) worker(shard <-chan job[T]) { + defer r.wg.Done() + for { + select { + case j := <-shard: + r.handle(j) + case <-r.done: + return + } + } +} + +func (r *Registry[T]) handle(j job[T]) { + // Bound the read-view wait so a hung read or a never-reflected event (e.g. + // redelivery of an event for a since-deleted entity) can't pin a worker. + // Derived from baseCtx so Close cancels in-flight waits promptly. + budget := time.Duration(r.retries+1) * r.retryWait + ctx, cancel := context.WithTimeout(r.baseCtx, budget) + defer cancel() + + payload, ok := r.await(ctx, j.produce) + if !ok { + r.logger.Warn("change not visible in read view after retries; subscription push skipped", + "key", j.key) + r.obs.PushSkipped(j.key) + return + } + r.obs.Pushed(j.key) + r.push(j.key, payload) +} + +func (r *Registry[T]) await(ctx context.Context, produce Producer[T]) (*T, bool) { + for attempt := 0; ; attempt++ { + if payload, ready := produce(ctx); ready { + return payload, true + } + if attempt >= r.retries { + return nil, false + } + select { + case <-ctx.Done(): + return nil, false + case <-time.After(r.retryWait): + } + } +} + +// push delivers payload to every subscriber of key. The sends happen under the +// read lock: sendNonBlocking never blocks (it drops on a full buffer), so +// holding the lock is cheap, and it prevents removeReceiver β€” which takes the +// write lock to close a channel β€” from closing a channel out from under an +// in-flight send. 
+func (r *Registry[T]) push(key string, payload *T) { + r.mu.RLock() + defer r.mu.RUnlock() + for _, s := range r.subscribers[key] { + select { + case s.channel <- payload: + default: + r.logger.Warn("subscription channel full; dropping notification", "key", key) + r.obs.ChannelFull(key) + } + } +} + +func shardIndex(key string, n int) int { + h := fnv.New32a() + _, _ = h.Write([]byte(key)) + return int(h.Sum32() % uint32(n)) //nolint:gosec // modulo keeps the result in [0,n) +} diff --git a/subscriptions_test.go b/subscriptions_test.go new file mode 100644 index 0000000..4d16d3b --- /dev/null +++ b/subscriptions_test.go @@ -0,0 +1,383 @@ +package subscriptions + +import ( + "context" + "fmt" + "io" + "log/slog" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type ping struct { + ID string +} + +func quiet() Option { + return WithLogger(slog.New(slog.NewTextHandler(io.Discard, nil))) +} + +// recordObserver records the metric callbacks; safe for concurrent use. 
+type recordObserver struct { + mu sync.Mutex + pushed []string + skipped []string + dropped []string + channelFull []string +} + +func (o *recordObserver) Pushed(k string) { o.add(&o.pushed, k) } +func (o *recordObserver) PushSkipped(k string) { o.add(&o.skipped, k) } +func (o *recordObserver) Dropped(k string) { o.add(&o.dropped, k) } +func (o *recordObserver) ChannelFull(k string) { o.add(&o.channelFull, k) } + +func (o *recordObserver) add(dst *[]string, k string) { + o.mu.Lock() + defer o.mu.Unlock() + *dst = append(*dst, k) +} + +func (o *recordObserver) count(get func(*recordObserver) []string) int { + o.mu.Lock() + defer o.mu.Unlock() + return len(get(o)) +} + +func (o *recordObserver) pushedCount() int { + return o.count(func(r *recordObserver) []string { return r.pushed }) +} + +func (o *recordObserver) skippedCount() int { + return o.count(func(r *recordObserver) []string { return r.skipped }) +} + +func (o *recordObserver) droppedCount() int { + return o.count(func(r *recordObserver) []string { return r.dropped }) +} + +func (o *recordObserver) channelFullCount() int { + return o.count(func(r *recordObserver) []string { return r.channelFull }) +} + +func ready(p *ping) Producer[ping] { + return func(context.Context) (*ping, bool) { return p, true } +} + +func never() Producer[ping] { + return func(context.Context) (*ping, bool) { return nil, false } +} + +func recv(t *testing.T, ch <-chan *ping) *ping { + t.Helper() + select { + case v := <-ch: + return v + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for a push") + return nil + } +} + +func assertNoPush(t *testing.T, ch <-chan *ping) { + t.Helper() + select { + case v := <-ch: + t.Fatalf("unexpected push: %+v", v) + case <-time.After(100 * time.Millisecond): + } +} + +func TestSubmit_pushesWhenReady(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + ch, cleanup, err := r.AddReceiver("c1") + require.NoError(t, err) + defer cleanup() + + r.Submit("c1", ready(&ping{ID: 
"eb1"})) + + assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch)) +} + +func TestSubmit_noSubscribers_doesNotEnqueue(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + var calls int + r.Submit("c1", func(context.Context) (*ping, bool) { + calls++ + return &ping{ID: "x"}, true + }) + + // No subscribers β†’ Submit returns on the fast path without enqueuing, so the + // producer is never invoked. + assert.Equal(t, 0, calls) +} + +func TestSubmit_emptyKey_isNoop(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + var calls int + r.Submit("", func(context.Context) (*ping, bool) { + calls++ + return nil, true + }) + assert.Equal(t, 0, calls) +} + +func TestSubmit_lagThenReady(t *testing.T) { + r := New[ping](quiet(), WithReadRetry(50, time.Millisecond)) + defer r.Close() + + ch, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + var mu sync.Mutex + calls := 0 + r.Submit("c1", func(context.Context) (*ping, bool) { + mu.Lock() + defer mu.Unlock() + calls++ + // Not visible on the first read; visible thereafter. 
+ return &ping{ID: "eb1"}, calls > 1 + }) + + assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch)) +} + +func TestSubmit_neverReady_skipsAndReports(t *testing.T) { + obs := &recordObserver{} + r := New[ping](quiet(), WithObserver(obs), WithReadRetry(2, time.Millisecond)) + defer r.Close() + + ch, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + r.Submit("c1", never()) + + assert.Eventually(t, func() bool { return obs.skippedCount() == 1 }, time.Second, 5*time.Millisecond) + assert.Equal(t, 0, obs.pushedCount()) + assertNoPush(t, ch) +} + +func TestObserver_pushedOnDelivery(t *testing.T) { + obs := &recordObserver{} + r := New[ping](quiet(), WithObserver(obs)) + defer r.Close() + + ch, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + r.Submit("c1", ready(&ping{ID: "eb1"})) + recv(t, ch) + + assert.Eventually(t, func() bool { return obs.pushedCount() == 1 }, time.Second, 5*time.Millisecond) +} + +func TestPush_allSubscribersOfKey(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + chA, cleanupA, _ := r.AddReceiver("c1") + defer cleanupA() + chB, cleanupB, _ := r.AddReceiver("c1") + defer cleanupB() + + r.Submit("c1", ready(&ping{ID: "eb1"})) + + assert.Equal(t, "eb1", recv(t, chA).ID) + assert.Equal(t, "eb1", recv(t, chB).ID) +} + +func TestSubmit_keysIsolated(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + chA, cleanupA, _ := r.AddReceiver("c1") + defer cleanupA() + chB, cleanupB, _ := r.AddReceiver("c2") + defer cleanupB() + + r.Submit("c1", ready(&ping{ID: "eb1"})) + + assert.Equal(t, "eb1", recv(t, chA).ID) + assertNoPush(t, chB) +} + +// TestPerKeyOrdering asserts that events for one key are delivered FIFO even +// across the worker pool (same key β†’ same shard β†’ one worker). This guards the +// ordering guarantee a payload the client consumes directly relies on. 
+func TestPerKeyOrdering(t *testing.T) { + r := New[ping](quiet(), WithWorkers(4)) + defer r.Close() + + ch, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + const n = 10 + for i := range n { + r.Submit("c1", ready(&ping{ID: fmt.Sprintf("e%02d", i)})) + } + for i := range n { + assert.Equal(t, fmt.Sprintf("e%02d", i), recv(t, ch).ID) + } +} + +func TestAddReceiver_emptyKeyErrors(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + ch, cleanup, err := r.AddReceiver("") + assert.ErrorIs(t, err, ErrEmptyKey) + assert.Nil(t, ch) + assert.Nil(t, cleanup) +} + +func TestRemoveReceiver_closesChannel(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + ch, cleanup, _ := r.AddReceiver("c1") + cleanup() + + // Draining a closed channel terminates the range/returns ok=false. + _, ok := <-ch + assert.False(t, ok) +} + +func TestRemoveReceiver_idempotent(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + _, cleanup, _ := r.AddReceiver("c1") + cleanup() + assert.NotPanics(t, cleanup) // second call is a no-op +} + +func TestChannelFull_dropsAndReports(t *testing.T) { + obs := &recordObserver{} + // One worker so the two pushes are serialized; buffer 1 so the second drops. 
+ r := New[ping](quiet(), WithObserver(obs), WithBufferSize(1), WithWorkers(1)) + defer r.Close() + + _, cleanup, _ := r.AddReceiver("c1") // never read from the channel + defer cleanup() + + r.Submit("c1", ready(&ping{ID: "first"})) + r.Submit("c1", ready(&ping{ID: "second"})) + + assert.Eventually(t, func() bool { return obs.channelFullCount() == 1 }, time.Second, 5*time.Millisecond) +} + +func TestSubmit_queueFull_dropsAndReports(t *testing.T) { + obs := &recordObserver{} + started := make(chan struct{}) + release := make(chan struct{}) + + r := New[ping](quiet(), WithObserver(obs), WithWorkers(1), WithQueueSize(1), WithReadRetry(0, time.Millisecond)) + defer r.Close() // runs last + defer close(release) // runs before Close, unblocking the worker + + _, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + var once sync.Once + r.Submit("c1", func(context.Context) (*ping, bool) { + once.Do(func() { close(started) }) + <-release + return &ping{ID: "blocking"}, true + }) + <-started // the single worker is now blocked in produce; its shard queue is empty + + r.Submit("c1", ready(&ping{ID: "fills-queue"})) // occupies the cap-1 shard queue + r.Submit("c1", ready(&ping{ID: "dropped"})) // queue full β†’ dropped + + assert.Eventually(t, func() bool { return obs.droppedCount() >= 1 }, time.Second, 5*time.Millisecond) +} + +func TestClose_isIdempotentAndSubmitAfterCloseIsNoop(t *testing.T) { + r := New[ping](quiet()) + r.Close() + r.Close() // idempotent + + assert.NotPanics(t, func() { + r.Submit("c1", ready(&ping{ID: "x"})) + }) +} + +// TestClose_cancelsInFlightGate asserts Close returns promptly even while a +// worker is mid-gate, rather than blocking for the full retry budget (~5s by +// default). Guards the registry-level context cancellation. 
+func TestClose_cancelsInFlightGate(t *testing.T) { + started := make(chan struct{}) + r := New[ping](quiet()) // default retry budget β‰ˆ 5s + + _, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + var once sync.Once + r.Submit("c1", func(context.Context) (*ping, bool) { + once.Do(func() { close(started) }) + return nil, false // never ready β†’ worker stays in the gate loop + }) + <-started // worker is now in the gate loop + + done := make(chan struct{}) + go func() { r.Close(); close(done) }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Close did not return promptly while a worker was gating") + } +} + +// TestConcurrentChurn stresses the race between removeReceiver (which closes a +// subscriber channel under the write lock) and a worker push (which sends under +// the read lock). Before sends were moved under the read lock this panicked +// with "send on closed channel". Run with -race. +func TestConcurrentChurn(t *testing.T) { + r := New[ping](quiet(), WithReadRetry(0, time.Millisecond), WithWorkers(8)) + defer r.Close() + + // A long-lived reader so there is always at least one subscriber being + // pushed to while others churn. + _, steadyCleanup, _ := r.AddReceiver("c1") + defer steadyCleanup() + go func() { + ch, _, _ := r.AddReceiver("c1") + for range ch { //nolint:revive // drain + } + }() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + for range 2000 { + _, cleanup, err := r.AddReceiver("c1") + if err != nil { + t.Errorf("AddReceiver: %v", err) + return + } + cleanup() + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for range 2000 { + r.Submit("c1", ready(&ping{ID: "x"})) + } + }() + + wg.Wait() +}