feat: type-generic registry for cross-service read-your-writes subscriptions

The shared core of Shiny's cross-service read-your-writes GraphQL subscriptions
(ADR-0012), extracted and hardened from the near-identical hand-rolled handlers
in authz-service (availableCompanies) and accounting-service (entryBasesChanged)
before a third copy is written.

Registry[T] owns the keyed subscriber map, non-blocking buffered fan-out (sends
under the read lock so a close can't race a send), a key-sharded worker pool that
runs the read-view gate OFF the AMQP delivery goroutine (preserving per-key FIFO
order while distinct keys run in parallel), the bounded retry/timeout budget, and
Observer metric hooks. Services supply only the event->key+payload mapping, the
read-view Producer closure, and the per-replica transient-consumer wiring.

Reviewed pre-publish (Go + Event Sourcing + Architecture). 99% coverage, race-clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-16 14:22:34 +02:00
commit f52f7276ff
18 changed files with 1286 additions and 0 deletions
+11
View File
@@ -0,0 +1,11 @@
root = true
[*]
end_of_line = lf
insert_final_newline = true
charset = utf-8
trim_trailing_whitespace = true
[*.go]
indent_style = tab
indent_size = 2
+107
View File
@@ -0,0 +1,107 @@
name: subscriptions
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
if: gitea.event_name == 'pull_request'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions/setup-go@v6
with:
go-version: 'stable'
- name: Format check
run: |
go install mvdan.cc/gofumpt@latest
test -z "$(gofumpt -l .)"
- name: Run tests
run: go test -race -coverprofile=coverage.txt ./...
- name: Filter test files from coverage
run: |
grep -v -E '_test\.go:' coverage.txt > coverage.filtered.txt || true
mv coverage.filtered.txt coverage.txt
- name: Check coverage
id: coverage
run: |
go install github.com/vladopajic/go-test-coverage/v2@latest
go-test-coverage --config ./.testcoverage.yml --github-action-output
- name: Restore baseline coverage
uses: actions/cache/restore@v5
with:
path: coverage-baseline.txt
key: coverage-baseline-${{ gitea.run_id }}
restore-keys: |
coverage-baseline-
- name: Compare coverage
run: |
CURRENT="${{ steps.coverage.outputs.total-coverage }}"
if [ -f coverage-baseline.txt ]; then
BASE=$(cat coverage-baseline.txt)
echo "Base coverage: ${BASE}%"
echo "Current coverage: ${CURRENT}%"
if [ "$(echo "$CURRENT < $BASE" | bc -l)" -eq 1 ]; then
echo "::error::Coverage decreased from ${BASE}% to ${CURRENT}%"
exit 1
fi
echo "Coverage maintained or improved: ${BASE}% -> ${CURRENT}%"
else
echo "No baseline coverage found yet, skipping comparison"
echo "Current coverage: ${CURRENT}%"
fi
- name: Post coverage comment
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
GITEA_URL: ${{ gitea.server_url }}
run: |
COVERAGE="${{ steps.coverage.outputs.total-coverage }}"
curl -X POST "${GITEA_URL}/api/v1/repos/${{ gitea.repository }}/issues/${{ gitea.event.pull_request.number }}/comments" \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Content-Type: application/json" \
-d "{\"body\": \"## Coverage Report\n\nTotal coverage: **${COVERAGE}%**\"}"
coverage-baseline:
# Records main's coverage into the Actions cache for the next PR's
# regression gate to read. Post-merge only, not a required check, blocks
# nothing (cf. ADR-0010).
if: gitea.event_name == 'push'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions/setup-go@v6
with:
go-version: 'stable'
- name: Compute coverage
id: coverage
run: |
go install github.com/vladopajic/go-test-coverage/v2@latest
go test -coverprofile=coverage.txt ./...
grep -v -E '_test\.go:' coverage.txt > coverage.filtered.txt || true
mv coverage.filtered.txt coverage.txt
go-test-coverage --config ./.testcoverage.yml --github-action-output
- name: Write baseline file
run: echo "${{ steps.coverage.outputs.total-coverage }}" > coverage-baseline.txt
- name: Save baseline to cache
uses: actions/cache/save@v5
with:
path: coverage-baseline.txt
key: coverage-baseline-${{ gitea.run_id }}
vulnerabilities:
if: gitea.event_name == 'pull_request'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions/setup-go@v6
with:
go-version: 'stable'
- name: Check vulnerabilities
run: |
go install golang.org/x/vuln/cmd/govulncheck@latest
govulncheck ./...
+25
View File
@@ -0,0 +1,25 @@
name: pre-commit
permissions: read-all
on:
pull_request:
push:
branches:
- main
jobs:
pre-commit:
runs-on: ubuntu-latest
env:
SKIP: no-commit-to-branch
steps:
- uses: actions/checkout@v6
- uses: actions/setup-go@v6
with:
go-version: stable
- uses: actions/setup-python@v6
with:
python-version: '3.14'
- name: Install goimports
run: go install golang.org/x/tools/cmd/goimports@latest
- uses: pre-commit/action@v3.0.1
+9
View File
@@ -0,0 +1,9 @@
name: Release
on:
push:
branches: [main]
jobs:
release:
uses: unboundsoftware/shared-workflows/.gitea/workflows/Release.yml@main
+5
View File
@@ -0,0 +1,5 @@
.idea
.claude
/release
coverage.txt
coverage-baseline.txt
+22
View File
@@ -0,0 +1,22 @@
version: "2"
run:
allow-parallel-runners: true
linters:
exclusions:
generated: lax
presets:
- comments
- common-false-positives
- legacy
- std-error-handling
paths:
- third_party$
- builtin$
- examples$
formatters:
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
+39
View File
@@ -0,0 +1,39 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v6.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
args:
- --allow-multiple-documents
- id: check-added-large-files
- repo: https://github.com/alessandrojcm/commitlint-pre-commit-hook
rev: v9.25.0
hooks:
- id: commitlint
stages: [ commit-msg ]
additional_dependencies: [ '@commitlint/config-conventional' ]
- repo: https://github.com/dnephin/pre-commit-golang
rev: v0.5.1
hooks:
- id: go-mod-tidy
- id: go-imports
args:
- -local
- gitea.unbound.se/shiny/subscriptions
- repo: https://github.com/lietu/go-pre-commit
rev: v1.0.0
hooks:
- id: go-test
- id: gofumpt
- repo: https://github.com/golangci/golangci-lint
rev: v2.12.2
hooks:
- id: golangci-lint-full
- repo: https://github.com/gitleaks/gitleaks
rev: v8.30.1
hooks:
- id: gitleaks
+13
View File
@@ -0,0 +1,13 @@
# Coverage configuration for go-test-coverage
# https://github.com/vladopajic/go-test-coverage
profile: coverage.txt
threshold:
file: 0
package: 0
total: 0
exclude:
paths:
- _test\.go$
+3
View File
@@ -0,0 +1,3 @@
{
"version": "v0.1.0"
}
+11
View File
@@ -0,0 +1,11 @@
# Changelog
All notable changes to this project will be documented in this file.
## [0.1.0] - 2026-06-16
### 🚀 Features
- Initial version: type-generic `Registry[T]` for cross-service read-your-writes GraphQL subscriptions (ADR-0012) — keyed subscriber map, non-blocking fan-out (sends under the read lock), a bounded worker pool that runs the read-view gate off the AMQP delivery goroutine, and `Observer` metric hooks. Extracted and hardened from the hand-rolled `authz-service` and `accounting-service` copies.
<!-- generated by git-cliff -->
+82
View File
@@ -0,0 +1,82 @@
# subscriptions
Shared Go library: the reusable core of Shiny's cross-service read-your-writes
GraphQL subscriptions.
## Shared Documentation
@../docs/claude/architecture.md
@../docs/claude/go-services.md
@../docs/claude/event-sourcing.md
@../docs/claude/conventions.md
## Library Information
### Purpose
Single home for the subscription subscriber-registry + read-view gate + fan-out
that was hand-rolled (near-identically) in `authz-service/subscription` and
`accounting-service/subscription`. Implements the mechanism mandated by
**ADR-0012** (cross-service read-your-writes via owning-service subscriptions),
which is the concrete form of **ADR-0009** tier-3. ADR-0012 requires new
instances of the pattern to use this library rather than copy it.
### Usage
```go
import "gitea.unbound.se/shiny/subscriptions"
// One registry per subscription field, parameterised by the GraphQL payload.
reg := subscriptions.New[model.EntryBasisChange](
subscriptions.WithLogger(logger),
subscriptions.WithObserver(otelObserver), // optional metrics
)
// Resolver — register the websocket consumer; cleanup on ctx.Done.
ch, cleanup, err := reg.AddReceiver(companyID)
// AMQP Process — gate on the read view, push off the delivery goroutine.
reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) {
basis, err := readView.FindEntryBasisById(ctx, id)
if err != nil { return nil, false }
return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil)
})
```
### Exported API
- `New[T](opts...) *Registry[T]` — starts the worker pool; `Close()` stops it.
- `(*Registry[T]).AddReceiver(key) (<-chan *T, cleanup func(), error)` — register
a subscriber; the resolver returns the channel and calls cleanup on ctx.Done.
- `(*Registry[T]).Submit(key, Producer[T])` — from the AMQP handler; non-blocking.
- `Producer[T] func(ctx) (*T, ready bool)` — reads current read-view state,
returns the payload + whether the change is visible; retried until ready.
- Options: `WithLogger`, `WithObserver`, `WithReadRetry(attempts, wait)`,
`WithBufferSize`, `WithWorkers`, `WithQueueSize`.
- `Observer``PushSkipped`/`Dropped`/`ChannelFull` hooks for metrics.
### Design notes (the load-bearing bits, per ADR-0012)
- **Per-replica.** Feed `Submit` from a `goamqp.TransientEventStreamConsumer` on
the owning service's *own* events, so every replica sees every event and can
push to the websockets it holds — distinct from the shared durable read-view
consumer.
- **Read-view gate.** The `Producer` must read *current* read-view state on each
call (so out-of-order delivery across workers is still consistent) and report
not-ready on a transient read error. The registry retries until ready or the
budget elapses, so the client's refetch can't race the projection.
- **Off the delivery goroutine.** `Submit` enqueues to a bounded worker pool and
returns; the AMQP message is acked immediately. The poke is idempotent and
drop-tolerant, so losing at-least-once on the poke is fine — the client
refetches on any poke.
- **No send on closed channel.** Pushes happen under the read lock; cleanup
closes under the write lock.
### Conventions
Standard Shiny library scaffolding: `gofumpt`/`goimports -local`, golangci-lint,
gitleaks and conventional-commit checks via pre-commit; coverage-regression gate
in CI (`.testcoverage.yml`); releases auto-tagged from conventional commits by
the shared Release workflow. Bump consuming services' `go.mod` after a release.
This library is concurrency-critical — always run `go test -race` and keep the
concurrent-churn test green before changing the locking or worker model.
+43
View File
@@ -0,0 +1,43 @@
# subscriptions
Shared core for Shiny's cross-service read-your-writes GraphQL subscriptions
(ADR-0009 tier-3, ADR-0012).
An entity shown in the UI is frequently projected from *another* service's
event, so the owning service exposes a GraphQL subscription, drives it from a
per-replica transient AMQP consumer, and pushes a lightweight poke once the
change is visible in its own read view — the client then refetches the
authoritative query. This package is the reusable, type-generic, hardened core
of that pattern, extracted from the hand-rolled copies in `authz-service`
(`availableCompanies`) and `accounting-service` (`entryBasesChanged`).
```go
import "gitea.unbound.se/shiny/subscriptions"
// One registry per subscription, parameterised by the GraphQL payload type.
reg := subscriptions.New[model.EntryBasisChange](subscriptions.WithLogger(logger))
// Resolver: register a websocket consumer (key by company, user, …).
ch, cleanup, _ := reg.AddReceiver(companyID)
go func() { <-ctx.Done(); cleanup() }()
return ch, nil
// AMQP handler: gate the push on the read view, off the delivery goroutine.
reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) {
basis, err := readView.FindEntryBasisById(ctx, id)
if err != nil {
return nil, false // transient read error — keep waiting
}
return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil)
})
```
What the registry owns (so services don't re-roll it): the keyed subscriber map,
non-blocking buffered fan-out (sends under the read lock so a close can't race a
send), a bounded worker pool that runs the read-view gate **off** the AMQP
delivery goroutine, and the retry/timeout budget. What stays in the service: the
event→(key, payload) mapping and the `Producer` read-view closure.
The poke is idempotent and drop-tolerant — the client refetches on any poke — so
the worker acks immediately and a dropped/duplicated poke self-heals. Wire an
`Observer` to surface dropped/skipped pushes as metrics.
+80
View File
@@ -0,0 +1,80 @@
# git-cliff ~ default configuration file
# https://git-cliff.org/docs/configuration
#
# Lines starting with "#" are comments.
# Configuration options are organized into tables and keys.
# See documentation for more information on available options.
[changelog]
# template for the changelog header
header = """
# Changelog\n
All notable changes to this project will be documented in this file.\n
"""
# template for the changelog body
# https://keats.github.io/tera/docs/#introduction
body = """
{% if version %}\
## [{{ version | trim_start_matches(pat="v") }}] - {{ timestamp | date(format="%Y-%m-%d") }}
{% else %}\
## [unreleased]
{% endif %}\
{% for group, commits in commits | group_by(attribute="group") %}
### {{ group | striptags | trim | upper_first }}
{% for commit in commits %}
- {% if commit.scope %}*({{ commit.scope }})* {% endif %}\
{% if commit.breaking %}[**breaking**] {% endif %}\
{{ commit.message | upper_first }}\
{% endfor %}
{% endfor %}\n
"""
# template for the changelog footer
footer = """
<!-- generated by git-cliff -->
"""
# remove the leading and trailing s
trim = true
# postprocessors
postprocessors = [
# { pattern = '<REPO>', replace = "https://github.com/orhun/git-cliff" }, # replace repository URL
]
# render body even when there are no releases to process
# render_always = true
# output file path
# output = "test.md"
[git]
# parse the commits based on https://www.conventionalcommits.org
conventional_commits = true
# filter out the commits that are not conventional
filter_unconventional = true
# process each line of a commit as an individual commit
split_commits = false
# regex for preprocessing the commit messages
commit_preprocessors = [
# Replace issue numbers
#{ pattern = '\((\w+\s)?#([0-9]+)\)', replace = "([#${2}](<REPO>/issues/${2}))"},
# Check spelling of the commit with https://github.com/crate-ci/typos
# If the spelling is incorrect, it will be automatically fixed.
#{ pattern = '.*', replace_command = 'typos --write-changes -' },
]
# regex for parsing and grouping commits
commit_parsers = [
{ message = "^feat", group = "<!-- 0 -->🚀 Features" },
{ message = "^fix", group = "<!-- 1 -->🐛 Bug Fixes" },
{ message = "^doc", group = "<!-- 3 -->📚 Documentation" },
{ message = "^perf", group = "<!-- 4 -->⚡ Performance" },
{ message = "^refactor", group = "<!-- 2 -->🚜 Refactor" },
{ message = "^style", group = "<!-- 5 -->🎨 Styling" },
{ message = "^test", group = "<!-- 6 -->🧪 Testing" },
{ message = "^chore\\(release\\): prepare for", skip = true },
{ message = "^chore|^ci", group = "<!-- 7 -->⚙️ Miscellaneous Tasks" },
{ body = ".*security", group = "<!-- 8 -->🛡️ Security" },
{ message = "^revert", group = "<!-- 9 -->◀️ Revert" },
]
# filter out the commits that are not matched by commit parsers
filter_commits = false
# sort the tags topologically
topo_order = false
# sort the commits inside sections by oldest/newest order
sort_commits = "oldest"
+14
View File
@@ -0,0 +1,14 @@
module gitea.unbound.se/shiny/subscriptions
go 1.25
require (
github.com/google/uuid v1.6.0
github.com/stretchr/testify v1.11.1
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
+12
View File
@@ -0,0 +1,12 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+6
View File
@@ -0,0 +1,6 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": [
"config:recommended"
]
}
+421
View File
@@ -0,0 +1,421 @@
// Package subscriptions provides a reusable, type-generic registry for fanning
// out change notifications to in-process GraphQL subscription consumers.
//
// It is the shared core of Shiny's cross-service read-your-writes pattern
// (ADR-0012): an entity shown in the UI is frequently projected from another
// service's event, so the owning service exposes a GraphQL subscription, drives
// it from a per-replica transient AMQP consumer, and pushes a notification once
// the change is visible in its own read view — the client then refetches the
// authoritative query. Two services hand-rolled this (authz-service's
// availableCompanies, accounting-service's entryBasesChanged); this package is
// the extracted, hardened core so further cases reuse it instead of copying it.
//
// # Wiring requirement (do not get this wrong)
//
// Submit MUST be fed from a per-replica goamqp.TransientEventStreamConsumer (an
// exclusive, randomly-named queue) bound to the owning service's OWN events, so
// every replica receives every event and can push to the websockets it holds.
// This is necessarily a DIFFERENT consumer from the shared, durable read-view
// projection consumer (a work-queue, where exactly one replica handles each
// event). Wiring Submit to a shared/durable consumer silently breaks delivery in
// a multi-replica deployment: the one replica that handles an event usually does
// not hold the subscriber's websocket, so the poke is lost with no error. The
// library cannot enforce this (it is transport-agnostic) — the caller must.
//
// # Concurrency model
//
// - AddReceiver registers a subscriber (one per websocket) and returns a
// buffered channel plus a cleanup func that must be called when the
// subscription ends.
// - Submit is called from the AMQP event handler. It does NOT block that
// handler on the read view: it hands the work to a per-key worker that waits
// (with a budget) for the read view to reflect the change — via the caller's
// [Producer] — and only then pushes. Acking the AMQP message immediately is
// safe because notifications are idempotent and drop-tolerant (see below).
// - Work is sharded by key, so all events for one key are processed FIFO by a
// single worker (preserving per-key order even for payloads the client
// consumes directly), while distinct keys run in parallel (so one lagging
// read view only delays its own key's shard, not everything).
//
// Pushes happen while holding the read lock, and cleanup closes a subscriber's
// channel under the write lock, so a send can never race a close ("send on
// closed channel"). A full subscriber buffer drops the notification rather than
// blocking a slow consumer.
//
// # Payload contract
//
// T should be a lightweight notification the client reacts to by refetching the
// authoritative query (a poke such as {id, removed}), not authoritative state
// the client consumes as the source of truth. Notifications may be dropped (full
// queue or buffer) and the push is best-effort (no AMQP requeue on a persistent
// read failure), so reliability comes from the client's idempotent refetch.
// Per-key FIFO ordering is preserved, so a payload the client does consume
// directly is at least delivered in event order for a given key — but it can
// still be dropped, so a refetch-on-receipt design is strongly preferred.
package subscriptions
import (
"context"
"errors"
"hash/fnv"
"log/slog"
"sync"
"time"
"github.com/google/uuid"
)
// ErrEmptyKey is returned by AddReceiver when the subscription key is empty,
// which almost always indicates an unpopulated id at the call site.
var ErrEmptyKey = errors.New("subscriptions: empty subscription key")
// Producer reads the owning service's read view and returns the payload to push
// together with whether the change is yet visible there.
//
// The registry calls a Producer repeatedly until it reports ready (or a bounded
// budget elapses), so the client's refetch can never race ahead of the
// projection. A Producer MUST read current read-view state on each call (rather
// than capturing the event's historical state). On a transient read error it
// should return (nil, false) to keep waiting — the event is already durable, so
// only the projection is lagging.
//
// Convergence precondition: "retry until ready" only terminates as ready (rather
// than burning the whole budget then skipping) if the read view the Producer
// gates on is made consistent by the SAME ordered aggregate stream as the
// triggering event. Gating on a row populated by a different aggregate's events
// can fail to converge.
type Producer[T any] func(ctx context.Context) (payload *T, ready bool)
// Observer receives best-effort notifications about pushes, for
// metrics/observability. Its methods may be called concurrently. The default is
// a no-op; wire an implementation (e.g. OTel counters) via [WithObserver].
type Observer interface {
// Pushed reports that a change was gated and delivered to the key's
// subscribers — the denominator for a skip/drop rate.
Pushed(key string)
// PushSkipped reports that the read view never reflected the change within
// the retry budget, so the push was skipped.
PushSkipped(key string)
// Dropped reports that the worker queue was full, so the notification was
// dropped before it could be gated.
Dropped(key string)
// ChannelFull reports that a subscriber's buffer was full, so its
// notification was dropped.
ChannelFull(key string)
}
type noopObserver struct{}
func (noopObserver) Pushed(string) {}
func (noopObserver) PushSkipped(string) {}
func (noopObserver) Dropped(string) {}
func (noopObserver) ChannelFull(string) {}
type subscriber[T any] struct {
id string
channel chan *T
}
type job[T any] struct {
key string
produce Producer[T]
}
// Registry fans out notifications of type T to in-process subscribers keyed by
// an arbitrary string (e.g. a company id or a user email). The zero value is
// not usable; construct one with [New].
type Registry[T any] struct {
logger *slog.Logger
obs Observer
bufferSize int
retries int
retryWait time.Duration
mu sync.RWMutex
subscribers map[string]map[string]*subscriber[T]
shards []chan job[T]
baseCtx context.Context
cancel context.CancelFunc
done chan struct{}
wg sync.WaitGroup
closeOnce sync.Once
}
type config struct {
logger *slog.Logger
obs Observer
bufferSize int
retries int
retryWait time.Duration
workers int
queueSize int
}
// Option configures a [Registry].
type Option func(*config)
// WithLogger sets the structured logger. Defaults to [slog.Default].
func WithLogger(l *slog.Logger) Option {
return func(c *config) {
if l != nil {
c.logger = l
}
}
}
// WithObserver wires a metrics observer. Defaults to a no-op.
func WithObserver(o Observer) Option {
return func(c *config) {
if o != nil {
c.obs = o
}
}
}
// WithReadRetry tunes how long a worker waits for the read view to reflect a
// change before giving up on the push: up to attempts re-reads spaced by wait
// (so total reads = attempts+1; default 25 × 200ms ≈ 5s). The read-view consumer
// and this subscription consumer are independent consumers of the same event, so
// a freshly-projected change may not be visible on the first read; retrying
// avoids pushing a notification the client would refetch ahead of. Non-positive
// values are ignored (attempts clamps to ≥0).
func WithReadRetry(attempts int, wait time.Duration) Option {
return func(c *config) {
if attempts >= 0 {
c.retries = attempts
}
if wait > 0 {
c.retryWait = wait
}
}
}
// WithBufferSize sets each subscriber channel's buffer (default 20). A full
// buffer drops the notification rather than blocking.
func WithBufferSize(n int) Option {
return func(c *config) {
if n > 0 {
c.bufferSize = n
}
}
}
// WithWorkers sets the number of key-shard workers (default 4). Each key is
// handled FIFO by exactly one worker; more workers spread distinct keys over
// more goroutines so a lagging read view delays only its own shard.
func WithWorkers(n int) Option {
return func(c *config) {
if n > 0 {
c.workers = n
}
}
}
// WithQueueSize sets each shard's job-queue depth (default 64). A full queue
// drops the notification (reported via [Observer.Dropped]) rather than blocking
// the AMQP delivery goroutine.
func WithQueueSize(n int) Option {
return func(c *config) {
if n > 0 {
c.queueSize = n
}
}
}
// New builds a Registry and starts its key-shard workers. Call [Registry.Close]
// to stop them (optional; they exit with the process otherwise).
func New[T any](opts ...Option) *Registry[T] {
c := &config{
logger: slog.Default(),
obs: noopObserver{},
bufferSize: 20,
retries: 25,
retryWait: 200 * time.Millisecond,
workers: 4,
queueSize: 64,
}
for _, o := range opts {
o(c)
}
ctx, cancel := context.WithCancel(context.Background())
r := &Registry[T]{
logger: c.logger,
obs: c.obs,
bufferSize: c.bufferSize,
retries: c.retries,
retryWait: c.retryWait,
subscribers: make(map[string]map[string]*subscriber[T]),
shards: make([]chan job[T], c.workers),
baseCtx: ctx,
cancel: cancel,
done: make(chan struct{}),
}
r.wg.Add(c.workers)
for i := range r.shards {
r.shards[i] = make(chan job[T], c.queueSize)
go r.worker(r.shards[i])
}
return r
}
// AddReceiver registers a subscriber for the given key. It returns the channel
// to stream and a cleanup func that MUST be called when the subscription ends
// (e.g. from the resolver's ctx.Done) to close the channel and release the
// registration. cleanup is idempotent. Returns [ErrEmptyKey] for an empty key.
func (r *Registry[T]) AddReceiver(key string) (<-chan *T, func(), error) {
if key == "" {
return nil, nil, ErrEmptyKey
}
s := &subscriber[T]{
id: uuid.NewString(),
channel: make(chan *T, r.bufferSize),
}
r.mu.Lock()
if r.subscribers[key] == nil {
r.subscribers[key] = make(map[string]*subscriber[T])
}
r.subscribers[key][s.id] = s
total := len(r.subscribers[key])
r.mu.Unlock()
r.logger.Info("subscription registered",
"key", key, "subscription_id", s.id, "total_subscriptions", total)
cleanup := func() { r.removeReceiver(key, s.id) }
return s.channel, cleanup, nil
}
func (r *Registry[T]) removeReceiver(key, id string) {
r.mu.Lock()
defer r.mu.Unlock()
subs := r.subscribers[key]
if subs == nil {
return
}
s, ok := subs[id]
if !ok {
return
}
close(s.channel)
delete(subs, id)
remaining := len(subs)
if remaining == 0 {
delete(r.subscribers, key)
}
r.logger.Info("subscription removed",
"key", key, "subscription_id", id, "remaining_subscriptions", remaining)
}
// Submit schedules a gated push for the given key. It returns immediately: when
// the key is empty or has no subscribers it does nothing, otherwise it enqueues
// work for that key's shard worker (dropping, with an [Observer.Dropped], only if
// the shard queue is full). produce is invoked on the worker, not on the calling
// goroutine.
func (r *Registry[T]) Submit(key string, produce Producer[T]) {
if key == "" || !r.hasSubscribers(key) {
return
}
shard := r.shards[shardIndex(key, len(r.shards))]
select {
case shard <- job[T]{key: key, produce: produce}:
case <-r.done:
// shutting down
default:
r.logger.Warn("subscription job queue full; dropping notification", "key", key)
r.obs.Dropped(key)
}
}
// Close stops the worker pool and waits for in-flight gating to finish. It
// cancels any in-flight read-view wait so workers return promptly rather than
// blocking for the full retry budget. Queued-but-unstarted notifications are
// dropped (safe — they are drop-tolerant). Idempotent; Submit after Close is a
// no-op.
func (r *Registry[T]) Close() {
r.closeOnce.Do(func() {
r.cancel()
close(r.done)
r.wg.Wait()
})
}
func (r *Registry[T]) hasSubscribers(key string) bool {
r.mu.RLock()
defer r.mu.RUnlock()
return len(r.subscribers[key]) > 0
}
func (r *Registry[T]) worker(shard <-chan job[T]) {
defer r.wg.Done()
for {
select {
case j := <-shard:
r.handle(j)
case <-r.done:
return
}
}
}
func (r *Registry[T]) handle(j job[T]) {
// Bound the read-view wait so a hung read or a never-reflected event (e.g.
// redelivery of an event for a since-deleted entity) can't pin a worker.
// Derived from baseCtx so Close cancels in-flight waits promptly.
budget := time.Duration(r.retries+1) * r.retryWait
ctx, cancel := context.WithTimeout(r.baseCtx, budget)
defer cancel()
payload, ok := r.await(ctx, j.produce)
if !ok {
r.logger.Warn("change not visible in read view after retries; subscription push skipped",
"key", j.key)
r.obs.PushSkipped(j.key)
return
}
r.obs.Pushed(j.key)
r.push(j.key, payload)
}
func (r *Registry[T]) await(ctx context.Context, produce Producer[T]) (*T, bool) {
for attempt := 0; ; attempt++ {
if payload, ready := produce(ctx); ready {
return payload, true
}
if attempt >= r.retries {
return nil, false
}
select {
case <-ctx.Done():
return nil, false
case <-time.After(r.retryWait):
}
}
}
// push delivers payload to every subscriber of key. The sends happen under the
// read lock: sendNonBlocking never blocks (it drops on a full buffer), so
// holding the lock is cheap, and it prevents removeReceiver — which takes the
// write lock to close a channel — from closing a channel out from under an
// in-flight send.
func (r *Registry[T]) push(key string, payload *T) {
r.mu.RLock()
defer r.mu.RUnlock()
for _, s := range r.subscribers[key] {
select {
case s.channel <- payload:
default:
r.logger.Warn("subscription channel full; dropping notification", "key", key)
r.obs.ChannelFull(key)
}
}
}
func shardIndex(key string, n int) int {
h := fnv.New32a()
_, _ = h.Write([]byte(key))
return int(h.Sum32() % uint32(n)) //nolint:gosec // modulo keeps the result in [0,n)
}
+383
View File
@@ -0,0 +1,383 @@
package subscriptions
import (
"context"
"fmt"
"io"
"log/slog"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type ping struct {
ID string
}
func quiet() Option {
return WithLogger(slog.New(slog.NewTextHandler(io.Discard, nil)))
}
// recordObserver records the metric callbacks; safe for concurrent use.
type recordObserver struct {
mu sync.Mutex
pushed []string
skipped []string
dropped []string
channelFull []string
}
func (o *recordObserver) Pushed(k string) { o.add(&o.pushed, k) }
func (o *recordObserver) PushSkipped(k string) { o.add(&o.skipped, k) }
func (o *recordObserver) Dropped(k string) { o.add(&o.dropped, k) }
func (o *recordObserver) ChannelFull(k string) { o.add(&o.channelFull, k) }
func (o *recordObserver) add(dst *[]string, k string) {
o.mu.Lock()
defer o.mu.Unlock()
*dst = append(*dst, k)
}
func (o *recordObserver) count(get func(*recordObserver) []string) int {
o.mu.Lock()
defer o.mu.Unlock()
return len(get(o))
}
func (o *recordObserver) pushedCount() int {
return o.count(func(r *recordObserver) []string { return r.pushed })
}
func (o *recordObserver) skippedCount() int {
return o.count(func(r *recordObserver) []string { return r.skipped })
}
func (o *recordObserver) droppedCount() int {
return o.count(func(r *recordObserver) []string { return r.dropped })
}
func (o *recordObserver) channelFullCount() int {
return o.count(func(r *recordObserver) []string { return r.channelFull })
}
func ready(p *ping) Producer[ping] {
return func(context.Context) (*ping, bool) { return p, true }
}
func never() Producer[ping] {
return func(context.Context) (*ping, bool) { return nil, false }
}
func recv(t *testing.T, ch <-chan *ping) *ping {
t.Helper()
select {
case v := <-ch:
return v
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for a push")
return nil
}
}
func assertNoPush(t *testing.T, ch <-chan *ping) {
t.Helper()
select {
case v := <-ch:
t.Fatalf("unexpected push: %+v", v)
case <-time.After(100 * time.Millisecond):
}
}
func TestSubmit_pushesWhenReady(t *testing.T) {
r := New[ping](quiet())
defer r.Close()
ch, cleanup, err := r.AddReceiver("c1")
require.NoError(t, err)
defer cleanup()
r.Submit("c1", ready(&ping{ID: "eb1"}))
assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch))
}
func TestSubmit_noSubscribers_doesNotEnqueue(t *testing.T) {
r := New[ping](quiet())
defer r.Close()
var calls int
r.Submit("c1", func(context.Context) (*ping, bool) {
calls++
return &ping{ID: "x"}, true
})
// No subscribers → Submit returns on the fast path without enqueuing, so the
// producer is never invoked.
assert.Equal(t, 0, calls)
}
func TestSubmit_emptyKey_isNoop(t *testing.T) {
r := New[ping](quiet())
defer r.Close()
var calls int
r.Submit("", func(context.Context) (*ping, bool) {
calls++
return nil, true
})
assert.Equal(t, 0, calls)
}
func TestSubmit_lagThenReady(t *testing.T) {
r := New[ping](quiet(), WithReadRetry(50, time.Millisecond))
defer r.Close()
ch, cleanup, _ := r.AddReceiver("c1")
defer cleanup()
var mu sync.Mutex
calls := 0
r.Submit("c1", func(context.Context) (*ping, bool) {
mu.Lock()
defer mu.Unlock()
calls++
// Not visible on the first read; visible thereafter.
return &ping{ID: "eb1"}, calls > 1
})
assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch))
}
func TestSubmit_neverReady_skipsAndReports(t *testing.T) {
obs := &recordObserver{}
r := New[ping](quiet(), WithObserver(obs), WithReadRetry(2, time.Millisecond))
defer r.Close()
ch, cleanup, _ := r.AddReceiver("c1")
defer cleanup()
r.Submit("c1", never())
assert.Eventually(t, func() bool { return obs.skippedCount() == 1 }, time.Second, 5*time.Millisecond)
assert.Equal(t, 0, obs.pushedCount())
assertNoPush(t, ch)
}
func TestObserver_pushedOnDelivery(t *testing.T) {
obs := &recordObserver{}
r := New[ping](quiet(), WithObserver(obs))
defer r.Close()
ch, cleanup, _ := r.AddReceiver("c1")
defer cleanup()
r.Submit("c1", ready(&ping{ID: "eb1"}))
recv(t, ch)
assert.Eventually(t, func() bool { return obs.pushedCount() == 1 }, time.Second, 5*time.Millisecond)
}
func TestPush_allSubscribersOfKey(t *testing.T) {
r := New[ping](quiet())
defer r.Close()
chA, cleanupA, _ := r.AddReceiver("c1")
defer cleanupA()
chB, cleanupB, _ := r.AddReceiver("c1")
defer cleanupB()
r.Submit("c1", ready(&ping{ID: "eb1"}))
assert.Equal(t, "eb1", recv(t, chA).ID)
assert.Equal(t, "eb1", recv(t, chB).ID)
}
func TestSubmit_keysIsolated(t *testing.T) {
r := New[ping](quiet())
defer r.Close()
chA, cleanupA, _ := r.AddReceiver("c1")
defer cleanupA()
chB, cleanupB, _ := r.AddReceiver("c2")
defer cleanupB()
r.Submit("c1", ready(&ping{ID: "eb1"}))
assert.Equal(t, "eb1", recv(t, chA).ID)
assertNoPush(t, chB)
}
// TestPerKeyOrdering asserts that events for one key are delivered FIFO even
// across the worker pool (same key → same shard → one worker). This guards the
// ordering guarantee a payload the client consumes directly relies on.
func TestPerKeyOrdering(t *testing.T) {
r := New[ping](quiet(), WithWorkers(4))
defer r.Close()
ch, cleanup, _ := r.AddReceiver("c1")
defer cleanup()
const n = 10
for i := range n {
r.Submit("c1", ready(&ping{ID: fmt.Sprintf("e%02d", i)}))
}
for i := range n {
assert.Equal(t, fmt.Sprintf("e%02d", i), recv(t, ch).ID)
}
}
func TestAddReceiver_emptyKeyErrors(t *testing.T) {
r := New[ping](quiet())
defer r.Close()
ch, cleanup, err := r.AddReceiver("")
assert.ErrorIs(t, err, ErrEmptyKey)
assert.Nil(t, ch)
assert.Nil(t, cleanup)
}
func TestRemoveReceiver_closesChannel(t *testing.T) {
r := New[ping](quiet())
defer r.Close()
ch, cleanup, _ := r.AddReceiver("c1")
cleanup()
// Draining a closed channel terminates the range/returns ok=false.
_, ok := <-ch
assert.False(t, ok)
}
func TestRemoveReceiver_idempotent(t *testing.T) {
r := New[ping](quiet())
defer r.Close()
_, cleanup, _ := r.AddReceiver("c1")
cleanup()
assert.NotPanics(t, cleanup) // second call is a no-op
}
func TestChannelFull_dropsAndReports(t *testing.T) {
obs := &recordObserver{}
// One worker so the two pushes are serialized; buffer 1 so the second drops.
r := New[ping](quiet(), WithObserver(obs), WithBufferSize(1), WithWorkers(1))
defer r.Close()
_, cleanup, _ := r.AddReceiver("c1") // never read from the channel
defer cleanup()
r.Submit("c1", ready(&ping{ID: "first"}))
r.Submit("c1", ready(&ping{ID: "second"}))
assert.Eventually(t, func() bool { return obs.channelFullCount() == 1 }, time.Second, 5*time.Millisecond)
}
func TestSubmit_queueFull_dropsAndReports(t *testing.T) {
obs := &recordObserver{}
started := make(chan struct{})
release := make(chan struct{})
r := New[ping](quiet(), WithObserver(obs), WithWorkers(1), WithQueueSize(1), WithReadRetry(0, time.Millisecond))
defer r.Close() // runs last
defer close(release) // runs before Close, unblocking the worker
_, cleanup, _ := r.AddReceiver("c1")
defer cleanup()
var once sync.Once
r.Submit("c1", func(context.Context) (*ping, bool) {
once.Do(func() { close(started) })
<-release
return &ping{ID: "blocking"}, true
})
<-started // the single worker is now blocked in produce; its shard queue is empty
r.Submit("c1", ready(&ping{ID: "fills-queue"})) // occupies the cap-1 shard queue
r.Submit("c1", ready(&ping{ID: "dropped"})) // queue full → dropped
assert.Eventually(t, func() bool { return obs.droppedCount() >= 1 }, time.Second, 5*time.Millisecond)
}
func TestClose_isIdempotentAndSubmitAfterCloseIsNoop(t *testing.T) {
r := New[ping](quiet())
r.Close()
r.Close() // idempotent
assert.NotPanics(t, func() {
r.Submit("c1", ready(&ping{ID: "x"}))
})
}
// TestClose_cancelsInFlightGate asserts Close returns promptly even while a
// worker is mid-gate, rather than blocking for the full retry budget (~5s by
// default). Guards the registry-level context cancellation.
func TestClose_cancelsInFlightGate(t *testing.T) {
started := make(chan struct{})
r := New[ping](quiet()) // default retry budget ≈ 5s
_, cleanup, _ := r.AddReceiver("c1")
defer cleanup()
var once sync.Once
r.Submit("c1", func(context.Context) (*ping, bool) {
once.Do(func() { close(started) })
return nil, false // never ready → worker stays in the gate loop
})
<-started // worker is now in the gate loop
done := make(chan struct{})
go func() { r.Close(); close(done) }()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("Close did not return promptly while a worker was gating")
}
}
// TestConcurrentChurn stresses the race between removeReceiver (which closes a
// subscriber channel under the write lock) and a worker push (which sends under
// the read lock). Before sends were moved under the read lock this panicked
// with "send on closed channel". Run with -race.
func TestConcurrentChurn(t *testing.T) {
r := New[ping](quiet(), WithReadRetry(0, time.Millisecond), WithWorkers(8))
defer r.Close()
// A long-lived reader so there is always at least one subscriber being
// pushed to while others churn.
_, steadyCleanup, _ := r.AddReceiver("c1")
defer steadyCleanup()
go func() {
ch, _, _ := r.AddReceiver("c1")
for range ch { //nolint:revive // drain
}
}()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for range 2000 {
_, cleanup, err := r.AddReceiver("c1")
if err != nil {
t.Errorf("AddReceiver: %v", err)
return
}
cleanup()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for range 2000 {
r.Submit("c1", ready(&ping{ID: "x"}))
}
}()
wg.Wait()
}