package subscriptions

import (
	"context"
	"fmt"
	"io"
	"log/slog"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// ping is the minimal payload type pushed through the registry in these tests.
type ping struct {
	ID string
}

// quiet returns an Option that sends registry logging to io.Discard so test
// output stays clean.
func quiet() Option {
	return WithLogger(slog.New(slog.NewTextHandler(io.Discard, nil)))
}

// recordObserver records the metric callbacks; safe for concurrent use.
// Each callback appends the key it was called with to the matching slice.
type recordObserver struct {
	mu          sync.Mutex
	pushed      []string
	skipped     []string
	dropped     []string
	channelFull []string
}

func (o *recordObserver) Pushed(k string) { o.add(&o.pushed, k) }

func (o *recordObserver) PushSkipped(k string) { o.add(&o.skipped, k) }

func (o *recordObserver) Dropped(k string) { o.add(&o.dropped, k) }

func (o *recordObserver) ChannelFull(k string) { o.add(&o.channelFull, k) }

// add appends k to *dst while holding the observer's mutex.
func (o *recordObserver) add(dst *[]string, k string) {
	o.mu.Lock()
	defer o.mu.Unlock()
	*dst = append(*dst, k)
}

// count returns the length of the slice selected by get, read under the mutex.
func (o *recordObserver) count(get func(*recordObserver) []string) int {
	o.mu.Lock()
	defer o.mu.Unlock()
	return len(get(o))
}

func (o *recordObserver) pushedCount() int {
	return o.count(func(r *recordObserver) []string { return r.pushed })
}

func (o *recordObserver) skippedCount() int {
	return o.count(func(r *recordObserver) []string { return r.skipped })
}

func (o *recordObserver) droppedCount() int {
	return o.count(func(r *recordObserver) []string { return r.dropped })
}

func (o *recordObserver) channelFullCount() int {
	return o.count(func(r *recordObserver) []string { return r.channelFull })
}

// ready returns a producer that always yields p and reports it as ready.
func ready(p *ping) Producer[ping] {
	return func(context.Context) (*ping, bool) { return p, true }
}

// never returns a producer that always reports "not ready" with a nil payload.
func never() Producer[ping] {
	return func(context.Context) (*ping, bool) { return nil, false }
}

// recv returns the next value received from ch, failing the test if nothing
// arrives within two seconds.
func recv(t *testing.T, ch <-chan *ping) *ping {
	t.Helper()
	select {
	case v := <-ch:
		return v
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for a push")
		return nil
	}
}

// assertNoPush fails the test if anything is received from ch within 100ms.
func assertNoPush(t *testing.T, ch <-chan *ping) {
	t.Helper()
	select {
	case v := <-ch:
		t.Fatalf("unexpected push: %+v", v)
	case <-time.After(100 * time.Millisecond):
	}
}

// TestSubmit_pushesWhenReady checks that a ready producer's payload reaches a
// subscriber of the same key.
func TestSubmit_pushesWhenReady(t *testing.T) {
	r := New[ping](quiet())
	defer r.Close()
	ch, cleanup, err := r.AddReceiver("c1")
	require.NoError(t, err)
	defer cleanup()

	r.Submit("c1", ready(&ping{ID: "eb1"}))

	assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch))
}

// TestSubmit_noSubscribers_doesNotEnqueue checks that the producer is not
// invoked when the key has no subscribers.
func TestSubmit_noSubscribers_doesNotEnqueue(t *testing.T) {
	r := New[ping](quiet())
	defer r.Close()

	var calls int
	r.Submit("c1", func(context.Context) (*ping, bool) {
		calls++
		return &ping{ID: "x"}, true
	})

	// No subscribers → Submit returns on the fast path without enqueuing, so the
	// producer is never invoked.
	assert.Equal(t, 0, calls)
}

// TestSubmit_emptyKey_isNoop checks that submitting with an empty key never
// invokes the producer.
func TestSubmit_emptyKey_isNoop(t *testing.T) {
	r := New[ping](quiet())
	defer r.Close()

	var calls int
	r.Submit("", func(context.Context) (*ping, bool) {
		calls++
		return nil, true
	})

	assert.Equal(t, 0, calls)
}

// TestSubmit_lagThenReady checks that a producer which is not ready on its
// first invocation is retried and its payload delivered once it reports ready.
func TestSubmit_lagThenReady(t *testing.T) {
	r := New[ping](quiet(), WithReadRetry(50, time.Millisecond))
	defer r.Close()
	ch, cleanup, err := r.AddReceiver("c1")
	require.NoError(t, err)
	defer cleanup()

	var mu sync.Mutex
	calls := 0
	r.Submit("c1", func(context.Context) (*ping, bool) {
		mu.Lock()
		defer mu.Unlock()
		calls++
		// Not visible on the first read; visible thereafter.
		return &ping{ID: "eb1"}, calls > 1
	})

	assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch))
}

// TestSubmit_neverReady_skipsAndReports checks that a producer which never
// becomes ready results in one PushSkipped callback, no Pushed callback, and
// nothing on the subscriber channel.
func TestSubmit_neverReady_skipsAndReports(t *testing.T) {
	obs := &recordObserver{}
	r := New[ping](quiet(), WithObserver(obs), WithReadRetry(2, time.Millisecond))
	defer r.Close()
	ch, cleanup, err := r.AddReceiver("c1")
	require.NoError(t, err)
	defer cleanup()

	r.Submit("c1", never())

	assert.Eventually(t, func() bool { return obs.skippedCount() == 1 }, time.Second, 5*time.Millisecond)
	assert.Equal(t, 0, obs.pushedCount())
	assertNoPush(t, ch)
}

// TestObserver_pushedOnDelivery checks that a delivered payload is reported
// through the observer's Pushed callback.
func TestObserver_pushedOnDelivery(t *testing.T) {
	obs := &recordObserver{}
	r := New[ping](quiet(), WithObserver(obs))
	defer r.Close()
	ch, cleanup, err := r.AddReceiver("c1")
	require.NoError(t, err)
	defer cleanup()

	r.Submit("c1", ready(&ping{ID: "eb1"}))
	recv(t, ch)

	assert.Eventually(t, func() bool { return obs.pushedCount() == 1 }, time.Second, 5*time.Millisecond)
}

// TestPush_allSubscribersOfKey checks that every subscriber registered for a
// key receives the payload submitted for that key.
func TestPush_allSubscribersOfKey(t *testing.T) {
	r := New[ping](quiet())
	defer r.Close()
	chA, cleanupA, err := r.AddReceiver("c1")
	require.NoError(t, err)
	defer cleanupA()
	chB, cleanupB, err := r.AddReceiver("c1")
	require.NoError(t, err)
	defer cleanupB()

	r.Submit("c1", ready(&ping{ID: "eb1"}))

	assert.Equal(t, "eb1", recv(t, chA).ID)
	assert.Equal(t, "eb1", recv(t, chB).ID)
}

// TestSubmit_keysIsolated checks that a payload submitted for one key is not
// delivered to a subscriber of a different key.
func TestSubmit_keysIsolated(t *testing.T) {
	r := New[ping](quiet())
	defer r.Close()
	chA, cleanupA, err := r.AddReceiver("c1")
	require.NoError(t, err)
	defer cleanupA()
	chB, cleanupB, err := r.AddReceiver("c2")
	require.NoError(t, err)
	defer cleanupB()

	r.Submit("c1", ready(&ping{ID: "eb1"}))

	assert.Equal(t, "eb1", recv(t, chA).ID)
	assertNoPush(t, chB)
}

// TestPerKeyOrdering asserts that events for one key are delivered FIFO even
// across the worker pool (same key → same shard → one worker). This guards the
// ordering guarantee that clients consuming the payload directly rely on.
func TestPerKeyOrdering(t *testing.T) { r := New[ping](quiet(), WithWorkers(4)) defer r.Close() ch, cleanup, _ := r.AddReceiver("c1") defer cleanup() const n = 10 for i := range n { r.Submit("c1", ready(&ping{ID: fmt.Sprintf("e%02d", i)})) } for i := range n { assert.Equal(t, fmt.Sprintf("e%02d", i), recv(t, ch).ID) } } func TestAddReceiver_emptyKeyErrors(t *testing.T) { r := New[ping](quiet()) defer r.Close() ch, cleanup, err := r.AddReceiver("") assert.ErrorIs(t, err, ErrEmptyKey) assert.Nil(t, ch) assert.Nil(t, cleanup) } func TestRemoveReceiver_closesChannel(t *testing.T) { r := New[ping](quiet()) defer r.Close() ch, cleanup, _ := r.AddReceiver("c1") cleanup() // Draining a closed channel terminates the range/returns ok=false. _, ok := <-ch assert.False(t, ok) } func TestRemoveReceiver_idempotent(t *testing.T) { r := New[ping](quiet()) defer r.Close() _, cleanup, _ := r.AddReceiver("c1") cleanup() assert.NotPanics(t, cleanup) // second call is a no-op } func TestChannelFull_dropsAndReports(t *testing.T) { obs := &recordObserver{} // One worker so the two pushes are serialized; buffer 1 so the second drops. 
r := New[ping](quiet(), WithObserver(obs), WithBufferSize(1), WithWorkers(1)) defer r.Close() _, cleanup, _ := r.AddReceiver("c1") // never read from the channel defer cleanup() r.Submit("c1", ready(&ping{ID: "first"})) r.Submit("c1", ready(&ping{ID: "second"})) assert.Eventually(t, func() bool { return obs.channelFullCount() == 1 }, time.Second, 5*time.Millisecond) } func TestSubmit_queueFull_dropsAndReports(t *testing.T) { obs := &recordObserver{} started := make(chan struct{}) release := make(chan struct{}) r := New[ping](quiet(), WithObserver(obs), WithWorkers(1), WithQueueSize(1), WithReadRetry(0, time.Millisecond)) defer r.Close() // runs last defer close(release) // runs before Close, unblocking the worker _, cleanup, _ := r.AddReceiver("c1") defer cleanup() var once sync.Once r.Submit("c1", func(context.Context) (*ping, bool) { once.Do(func() { close(started) }) <-release return &ping{ID: "blocking"}, true }) <-started // the single worker is now blocked in produce; its shard queue is empty r.Submit("c1", ready(&ping{ID: "fills-queue"})) // occupies the cap-1 shard queue r.Submit("c1", ready(&ping{ID: "dropped"})) // queue full → dropped assert.Eventually(t, func() bool { return obs.droppedCount() >= 1 }, time.Second, 5*time.Millisecond) } func TestClose_isIdempotentAndSubmitAfterCloseIsNoop(t *testing.T) { r := New[ping](quiet()) r.Close() r.Close() // idempotent assert.NotPanics(t, func() { r.Submit("c1", ready(&ping{ID: "x"})) }) } // TestClose_cancelsInFlightGate asserts Close returns promptly even while a // worker is mid-gate, rather than blocking for the full retry budget (~5s by // default). Guards the registry-level context cancellation. 
func TestClose_cancelsInFlightGate(t *testing.T) { started := make(chan struct{}) r := New[ping](quiet()) // default retry budget ≈ 5s _, cleanup, _ := r.AddReceiver("c1") defer cleanup() var once sync.Once r.Submit("c1", func(context.Context) (*ping, bool) { once.Do(func() { close(started) }) return nil, false // never ready → worker stays in the gate loop }) <-started // worker is now in the gate loop done := make(chan struct{}) go func() { r.Close(); close(done) }() select { case <-done: case <-time.After(2 * time.Second): t.Fatal("Close did not return promptly while a worker was gating") } } // TestConcurrentChurn stresses the race between removeReceiver (which closes a // subscriber channel under the write lock) and a worker push (which sends under // the read lock). Before sends were moved under the read lock this panicked // with "send on closed channel". Run with -race. func TestConcurrentChurn(t *testing.T) { r := New[ping](quiet(), WithReadRetry(0, time.Millisecond), WithWorkers(8)) defer r.Close() // A long-lived reader so there is always at least one subscriber being // pushed to while others churn. _, steadyCleanup, _ := r.AddReceiver("c1") defer steadyCleanup() go func() { ch, _, _ := r.AddReceiver("c1") for range ch { //nolint:revive // drain } }() var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() for range 2000 { _, cleanup, err := r.AddReceiver("c1") if err != nil { t.Errorf("AddReceiver: %v", err) return } cleanup() } }() wg.Add(1) go func() { defer wg.Done() for range 2000 { r.Submit("c1", ready(&ping{ID: "x"})) } }() wg.Wait() }