Files
subscriptions/subscriptions_test.go
argoyle da4e7df6ce
subscriptions / test (push) Has been skipped
subscriptions / vulnerabilities (push) Has been skipped
Release / release (push) Successful in 38s
subscriptions / coverage-baseline (push) Successful in 2m58s
pre-commit / pre-commit (push) Successful in 4m58s
feat: type-generic registry for cross-service read-your-writes subscriptions
The shared core of Shiny's cross-service read-your-writes GraphQL subscriptions
(ADR-0012), extracted and hardened from the near-identical hand-rolled handlers
in authz-service (availableCompanies) and accounting-service (entryBasesChanged)
before a third copy is written.

Registry[T] owns the keyed subscriber map, non-blocking buffered fan-out (sends
under the read lock so a close can't race a send), a key-sharded worker pool that
runs the read-view gate OFF the AMQP delivery goroutine (preserving per-key FIFO
order while distinct keys run in parallel), the bounded retry/timeout budget, and
Observer metric hooks. Services supply only the event->key+payload mapping, the
read-view Producer closure, and the per-replica transient-consumer wiring.

Reviewed pre-publish (Go + Event Sourcing + Architecture). 99% coverage, race-clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:22:34 +02:00

384 lines
9.5 KiB
Go

package subscriptions
import (
"context"
"fmt"
"io"
"log/slog"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// ping is the minimal payload type these tests instantiate the registry with;
// ID is the only field the assertions inspect.
type ping struct{ ID string }
// quiet returns an Option that installs a logger whose output goes to
// io.Discard, so registry logging does not clutter test output.
func quiet() Option {
	discard := slog.NewTextHandler(io.Discard, nil)
	return WithLogger(slog.New(discard))
}
// recordObserver is a test double that logs every metric callback it receives,
// one slice per callback kind. All access goes through mu, so it is safe to
// call from several goroutines at once.
type recordObserver struct {
	mu          sync.Mutex
	pushed      []string
	skipped     []string
	dropped     []string
	channelFull []string
}

// Pushed appends key to the pushed log.
func (o *recordObserver) Pushed(key string) { o.add(&o.pushed, key) }

// PushSkipped appends key to the skipped log.
func (o *recordObserver) PushSkipped(key string) { o.add(&o.skipped, key) }

// Dropped appends key to the dropped log.
func (o *recordObserver) Dropped(key string) { o.add(&o.dropped, key) }

// ChannelFull appends key to the channelFull log.
func (o *recordObserver) ChannelFull(key string) { o.add(&o.channelFull, key) }

// add appends k to the slice dst points at while holding the mutex.
func (o *recordObserver) add(dst *[]string, k string) {
	o.mu.Lock()
	*dst = append(*dst, k)
	o.mu.Unlock()
}

// count returns the length of the slice selected by get. The selector runs
// with the mutex held, so it observes a consistent snapshot.
func (o *recordObserver) count(get func(*recordObserver) []string) int {
	o.mu.Lock()
	n := len(get(o))
	o.mu.Unlock()
	return n
}

// pushedCount reports how many Pushed callbacks were recorded.
func (o *recordObserver) pushedCount() int {
	pick := func(r *recordObserver) []string { return r.pushed }
	return o.count(pick)
}

// skippedCount reports how many PushSkipped callbacks were recorded.
func (o *recordObserver) skippedCount() int {
	pick := func(r *recordObserver) []string { return r.skipped }
	return o.count(pick)
}

// droppedCount reports how many Dropped callbacks were recorded.
func (o *recordObserver) droppedCount() int {
	pick := func(r *recordObserver) []string { return r.dropped }
	return o.count(pick)
}

// channelFullCount reports how many ChannelFull callbacks were recorded.
func (o *recordObserver) channelFullCount() int {
	pick := func(r *recordObserver) []string { return r.channelFull }
	return o.count(pick)
}
// ready builds a Producer that ignores its context and always returns p
// together with true.
func ready(p *ping) Producer[ping] {
	produce := func(context.Context) (*ping, bool) {
		return p, true
	}
	return produce
}
// never builds a Producer that ignores its context and always returns a nil
// payload together with false.
func never() Producer[ping] {
	produce := func(context.Context) (*ping, bool) {
		return nil, false
	}
	return produce
}
// recv returns the next value received from ch, failing the test via t.Fatal
// if nothing arrives within two seconds.
func recv(t *testing.T, ch <-chan *ping) *ping {
	t.Helper()

	deadline := time.NewTimer(2 * time.Second)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		t.Fatal("timed out waiting for a push")
	case got := <-ch:
		return got
	}
	// Only reached on timeout; t.Fatal has already stopped the test.
	return nil
}
// assertNoPush watches ch for 100ms and fails the test via t.Fatalf if
// anything is received during that window.
func assertNoPush(t *testing.T, ch <-chan *ping) {
	t.Helper()

	window := time.NewTimer(100 * time.Millisecond)
	defer window.Stop()

	select {
	case <-window.C:
		// Quiet for the whole window: nothing to report.
	case got := <-ch:
		t.Fatalf("unexpected push: %+v", got)
	}
}
// TestSubmit_pushesWhenReady covers the happy path: a payload submitted with an
// immediately-ready producer reaches the subscriber registered for that key.
func TestSubmit_pushesWhenReady(t *testing.T) {
	reg := New[ping](quiet())
	defer reg.Close()

	updates, unsubscribe, err := reg.AddReceiver("c1")
	require.NoError(t, err)
	defer unsubscribe()

	want := &ping{ID: "eb1"}
	reg.Submit("c1", ready(&ping{ID: "eb1"}))

	got := recv(t, updates)
	assert.Equal(t, want, got)
}
// TestSubmit_noSubscribers_doesNotEnqueue asserts that submitting for a key
// nobody subscribes to never invokes the producer.
func TestSubmit_noSubscribers_doesNotEnqueue(t *testing.T) {
	r := New[ping](quiet())
	defer r.Close()

	// The producer signals on a buffered channel rather than bumping a bare
	// counter: a counter read right after Submit returns cannot observe an
	// invocation that happens later on another goroutine (and would race
	// with it). The non-blocking send tolerates repeated invocations.
	invoked := make(chan struct{}, 1)
	r.Submit("c1", func(context.Context) (*ping, bool) {
		select {
		case invoked <- struct{}{}:
		default:
		}
		return &ping{ID: "x"}, true
	})

	// No subscribers → Submit is expected to return on the fast path without
	// enqueuing, so the producer must stay silent for the whole window.
	select {
	case <-invoked:
		t.Fatal("producer was invoked although the key has no subscribers")
	case <-time.After(100 * time.Millisecond):
	}
}
// TestSubmit_emptyKey_isNoop asserts that submitting with an empty key never
// invokes the producer.
func TestSubmit_emptyKey_isNoop(t *testing.T) {
	r := New[ping](quiet())
	defer r.Close()

	// Signal through a buffered channel instead of an unsynchronised counter,
	// so an invocation that happens after Submit returns (on any goroutine)
	// is still detected, and detected without a data race.
	invoked := make(chan struct{}, 1)
	r.Submit("", func(context.Context) (*ping, bool) {
		select {
		case invoked <- struct{}{}:
		default:
		}
		return nil, true
	})

	select {
	case <-invoked:
		t.Fatal("producer was invoked for an empty key")
	case <-time.After(100 * time.Millisecond):
	}
}
// TestSubmit_lagThenReady covers a producer that reports not-ready on its first
// call and ready on every later call: the payload must still reach the
// subscriber under the retry settings passed to WithReadRetry.
func TestSubmit_lagThenReady(t *testing.T) {
	r := New[ping](quiet(), WithReadRetry(50, time.Millisecond))
	defer r.Close()

	ch, cleanup, err := r.AddReceiver("c1")
	// Fail here rather than later: with the error ignored, a failed
	// subscription would surface as a nil-func panic in the deferred cleanup.
	require.NoError(t, err)
	defer cleanup()

	var mu sync.Mutex // guards calls; the producer may run off the test goroutine
	calls := 0
	r.Submit("c1", func(context.Context) (*ping, bool) {
		mu.Lock()
		defer mu.Unlock()
		calls++
		// Not visible on the first read; visible thereafter.
		return &ping{ID: "eb1"}, calls > 1
	})

	assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch))
}
// TestSubmit_neverReady_skipsAndReports asserts that a producer which never
// reports ready ends in exactly one PushSkipped callback, no Pushed callback,
// and nothing delivered on the subscriber channel.
func TestSubmit_neverReady_skipsAndReports(t *testing.T) {
	obs := &recordObserver{}
	r := New[ping](quiet(), WithObserver(obs), WithReadRetry(2, time.Millisecond))
	defer r.Close()

	ch, cleanup, err := r.AddReceiver("c1")
	// Without this check a failed subscription leaves cleanup nil and the
	// deferred call panics, hiding the real cause.
	require.NoError(t, err)
	defer cleanup()

	r.Submit("c1", never())

	// The skip is reported asynchronously, so poll for it.
	assert.Eventually(t, func() bool { return obs.skippedCount() == 1 }, time.Second, 5*time.Millisecond)
	assert.Equal(t, 0, obs.pushedCount())
	assertNoPush(t, ch)
}
// TestObserver_pushedOnDelivery asserts that delivering one payload results in
// exactly one Pushed callback on the configured observer.
func TestObserver_pushedOnDelivery(t *testing.T) {
	obs := &recordObserver{}
	r := New[ping](quiet(), WithObserver(obs))
	defer r.Close()

	ch, cleanup, err := r.AddReceiver("c1")
	// Checked so a failed subscription is reported directly instead of as a
	// nil-func panic in the deferred cleanup.
	require.NoError(t, err)
	defer cleanup()

	r.Submit("c1", ready(&ping{ID: "eb1"}))
	recv(t, ch)

	// Poll: the observer callback is not guaranteed to precede the receive.
	assert.Eventually(t, func() bool { return obs.pushedCount() == 1 }, time.Second, 5*time.Millisecond)
}
// TestPush_allSubscribersOfKey asserts that one submission for a key is
// delivered to every receiver registered under that key.
func TestPush_allSubscribersOfKey(t *testing.T) {
	r := New[ping](quiet())
	defer r.Close()

	// Both subscription errors are checked so a failure is reported at its
	// source rather than as a nil-func panic when the cleanups run.
	chA, cleanupA, err := r.AddReceiver("c1")
	require.NoError(t, err)
	defer cleanupA()

	chB, cleanupB, err := r.AddReceiver("c1")
	require.NoError(t, err)
	defer cleanupB()

	r.Submit("c1", ready(&ping{ID: "eb1"}))

	assert.Equal(t, "eb1", recv(t, chA).ID)
	assert.Equal(t, "eb1", recv(t, chB).ID)
}
// TestSubmit_keysIsolated asserts that a submission for one key reaches that
// key's subscriber and is not delivered to a subscriber of a different key.
func TestSubmit_keysIsolated(t *testing.T) {
	r := New[ping](quiet())
	defer r.Close()

	// Both subscription errors are checked so a failure is reported at its
	// source rather than as a nil-func panic when the cleanups run.
	chA, cleanupA, err := r.AddReceiver("c1")
	require.NoError(t, err)
	defer cleanupA()

	chB, cleanupB, err := r.AddReceiver("c2")
	require.NoError(t, err)
	defer cleanupB()

	r.Submit("c1", ready(&ping{ID: "eb1"}))

	assert.Equal(t, "eb1", recv(t, chA).ID)
	assertNoPush(t, chB)
}
// TestPerKeyOrdering asserts that events for one key are delivered FIFO even
// across the worker pool (same key → same shard → one worker). This guards the
// ordering guarantee a payload the client consumes directly relies on.
func TestPerKeyOrdering(t *testing.T) {
	const n = 10

	// All n events are submitted before the first receive, so both the queue
	// and the subscriber buffer are sized to hold n. That keeps the test about
	// ordering instead of silently depending on the package defaults.
	r := New[ping](quiet(), WithWorkers(4), WithQueueSize(n), WithBufferSize(n))
	defer r.Close()

	ch, cleanup, err := r.AddReceiver("c1")
	require.NoError(t, err) // avoid a nil-func panic in the deferred cleanup
	defer cleanup()

	for i := range n {
		r.Submit("c1", ready(&ping{ID: fmt.Sprintf("e%02d", i)}))
	}
	for i := range n {
		assert.Equal(t, fmt.Sprintf("e%02d", i), recv(t, ch).ID)
	}
}
// TestAddReceiver_emptyKeyErrors asserts that subscribing with an empty key
// yields ErrEmptyKey and neither a channel nor a cleanup function.
func TestAddReceiver_emptyKeyErrors(t *testing.T) {
	reg := New[ping](quiet())
	defer reg.Close()

	updates, unsubscribe, err := reg.AddReceiver("")

	assert.Nil(t, updates)
	assert.Nil(t, unsubscribe)
	assert.ErrorIs(t, err, ErrEmptyKey)
}
// TestRemoveReceiver_closesChannel asserts that calling the cleanup function
// returned by AddReceiver closes the subscriber channel.
func TestRemoveReceiver_closesChannel(t *testing.T) {
	r := New[ping](quiet())
	defer r.Close()

	ch, cleanup, err := r.AddReceiver("c1")
	require.NoError(t, err)
	cleanup()

	// A receive on a closed channel returns immediately with ok=false. The
	// timeout turns "channel left open" into a test failure instead of a
	// receive that blocks until the whole test binary times out.
	select {
	case _, ok := <-ch:
		assert.False(t, ok)
	case <-time.After(2 * time.Second):
		t.Fatal("channel was not closed by cleanup")
	}
}
// TestRemoveReceiver_idempotent asserts that calling a receiver's cleanup
// function a second time does not panic.
func TestRemoveReceiver_idempotent(t *testing.T) {
	r := New[ping](quiet())
	defer r.Close()

	_, cleanup, err := r.AddReceiver("c1")
	// A failed subscription would hand back a nil cleanup, and the first call
	// below would panic for a reason unrelated to idempotence.
	require.NoError(t, err)

	cleanup()
	assert.NotPanics(t, cleanup) // second call is a no-op
}
// TestChannelFull_dropsAndReports asserts that a push to a subscriber whose
// buffer is already full is reported through exactly one ChannelFull callback.
func TestChannelFull_dropsAndReports(t *testing.T) {
	obs := &recordObserver{}
	// One worker so the two pushes are serialized; buffer 1 so the second drops.
	r := New[ping](quiet(), WithObserver(obs), WithBufferSize(1), WithWorkers(1))
	defer r.Close()

	_, cleanup, err := r.AddReceiver("c1") // never read from the channel
	require.NoError(t, err)                // avoid a nil-func panic in the deferred cleanup
	defer cleanup()

	r.Submit("c1", ready(&ping{ID: "first"}))
	r.Submit("c1", ready(&ping{ID: "second"}))

	assert.Eventually(t, func() bool { return obs.channelFullCount() == 1 }, time.Second, 5*time.Millisecond)
}
// TestSubmit_queueFull_dropsAndReports asserts that a submission arriving while
// the worker is busy and its size-1 queue is already occupied is reported
// through the Dropped callback.
func TestSubmit_queueFull_dropsAndReports(t *testing.T) {
	obs := &recordObserver{}
	started := make(chan struct{})
	release := make(chan struct{})

	r := New[ping](quiet(), WithObserver(obs), WithWorkers(1), WithQueueSize(1), WithReadRetry(0, time.Millisecond))
	defer r.Close()      // runs last
	defer close(release) // runs before Close, unblocking the worker

	_, cleanup, err := r.AddReceiver("c1")
	require.NoError(t, err) // avoid a nil-func panic in the deferred cleanup
	defer cleanup()

	var once sync.Once
	r.Submit("c1", func(context.Context) (*ping, bool) {
		once.Do(func() { close(started) })
		<-release
		return &ping{ID: "blocking"}, true
	})

	// Wait until the single worker is blocked in produce; its shard queue is
	// then empty. Bounded so a worker that never starts fails the test
	// instead of hanging it.
	select {
	case <-started:
	case <-time.After(2 * time.Second):
		t.Fatal("worker never invoked the blocking producer")
	}

	r.Submit("c1", ready(&ping{ID: "fills-queue"})) // occupies the cap-1 shard queue
	r.Submit("c1", ready(&ping{ID: "dropped"}))     // queue full → dropped

	assert.Eventually(t, func() bool { return obs.droppedCount() >= 1 }, time.Second, 5*time.Millisecond)
}
// TestClose_isIdempotentAndSubmitAfterCloseIsNoop closes the registry twice and
// then checks that a Submit on the closed registry does not panic.
func TestClose_isIdempotentAndSubmitAfterCloseIsNoop(t *testing.T) {
	reg := New[ping](quiet())

	reg.Close()
	reg.Close() // idempotent

	submitAfterClose := func() {
		reg.Submit("c1", ready(&ping{ID: "x"}))
	}
	assert.NotPanics(t, submitAfterClose)
}
// TestClose_cancelsInFlightGate asserts Close returns promptly even while a
// worker is mid-gate, rather than blocking for the full retry budget (~5s by
// default). Guards the registry-level context cancellation.
func TestClose_cancelsInFlightGate(t *testing.T) {
	started := make(chan struct{})
	r := New[ping](quiet()) // default retry budget ≈ 5s

	_, cleanup, err := r.AddReceiver("c1")
	require.NoError(t, err) // avoid a nil-func panic in the deferred cleanup
	defer cleanup()

	var once sync.Once
	r.Submit("c1", func(context.Context) (*ping, bool) {
		once.Do(func() { close(started) })
		return nil, false // never ready → worker stays in the gate loop
	})

	// Wait until the worker is in the gate loop. Bounded so a producer that is
	// never invoked fails the test instead of hanging it.
	select {
	case <-started:
	case <-time.After(2 * time.Second):
		t.Fatal("worker never invoked the producer")
	}

	done := make(chan struct{})
	go func() { r.Close(); close(done) }()

	select {
	case <-done:
	case <-time.After(2 * time.Second):
		t.Fatal("Close did not return promptly while a worker was gating")
	}
}
// TestConcurrentChurn stresses the race between removeReceiver (which closes a
// subscriber channel under the write lock) and a worker push (which sends under
// the read lock). Before sends were moved under the read lock this panicked
// with "send on closed channel". Run with -race.
func TestConcurrentChurn(t *testing.T) {
	r := New[ping](quiet(), WithReadRetry(0, time.Millisecond), WithWorkers(8))
	defer r.Close()

	// A long-lived subscriber that is never read, so there is always at least
	// one subscriber being pushed to while others churn.
	_, steadyCleanup, err := r.AddReceiver("c1")
	require.NoError(t, err)
	defer steadyCleanup()

	// A second long-lived subscriber that is actively drained. It is
	// registered on the test goroutine so it exists before the churn starts,
	// and its goroutine is joined before the registry is closed, so nothing
	// outlives the test or touches the registry after Close.
	drainCh, drainCleanup, err := r.AddReceiver("c1")
	require.NoError(t, err)
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		for range drainCh { //nolint:revive // drain
		}
	}()
	defer func() {
		drainCleanup() // closes drainCh, which ends the drain loop
		<-drained
	}()

	var wg sync.WaitGroup

	// Subscriber churn: add and immediately remove receivers.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range 2000 {
			_, cleanup, err := r.AddReceiver("c1")
			if err != nil {
				// Errorf, not Fatalf: FailNow must not be called off the
				// test goroutine.
				t.Errorf("AddReceiver: %v", err)
				return
			}
			cleanup()
		}
	}()

	// Concurrent pushes to the same key.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range 2000 {
			r.Submit("c1", ready(&ping{ID: "x"}))
		}
	}()

	wg.Wait()
}