384 lines
9.5 KiB
Go
384 lines
9.5 KiB
Go
|
|
package subscriptions
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"fmt"
|
||
|
|
"io"
|
||
|
|
"log/slog"
|
||
|
|
"sync"
|
||
|
|
"testing"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/stretchr/testify/assert"
|
||
|
|
"github.com/stretchr/testify/require"
|
||
|
|
)
|
||
|
|
|
||
|
|
// ping is the minimal payload type the tests push through the registry; ID
// distinguishes one event from another in assertions.
type ping struct{ ID string }
|
||
|
|
|
||
|
|
func quiet() Option {
|
||
|
|
return WithLogger(slog.New(slog.NewTextHandler(io.Discard, nil)))
|
||
|
|
}
|
||
|
|
|
||
|
|
// recordObserver is a test double that remembers the key passed to each metric
// callback. Every read and write goes through mu, so it is safe for concurrent
// use.
type recordObserver struct {
	mu sync.Mutex

	pushed      []string // keys passed to Pushed
	skipped     []string // keys passed to PushSkipped
	dropped     []string // keys passed to Dropped
	channelFull []string // keys passed to ChannelFull
}

// Pushed records k in the pushed list.
func (o *recordObserver) Pushed(k string) {
	o.add(&o.pushed, k)
}

// PushSkipped records k in the skipped list.
func (o *recordObserver) PushSkipped(k string) {
	o.add(&o.skipped, k)
}

// Dropped records k in the dropped list.
func (o *recordObserver) Dropped(k string) {
	o.add(&o.dropped, k)
}

// ChannelFull records k in the channelFull list.
func (o *recordObserver) ChannelFull(k string) {
	o.add(&o.channelFull, k)
}

// add appends k to the slice dst points at while holding mu.
func (o *recordObserver) add(dst *[]string, k string) {
	o.mu.Lock()
	defer o.mu.Unlock()

	list := *dst
	*dst = append(list, k)
}

// count returns the length of the slice selected by get, read while holding mu.
func (o *recordObserver) count(get func(*recordObserver) []string) int {
	o.mu.Lock()
	defer o.mu.Unlock()

	selected := get(o)
	return len(selected)
}

// pushedCount reports how many Pushed callbacks have been recorded.
func (o *recordObserver) pushedCount() int {
	pick := func(r *recordObserver) []string { return r.pushed }
	return o.count(pick)
}

// skippedCount reports how many PushSkipped callbacks have been recorded.
func (o *recordObserver) skippedCount() int {
	pick := func(r *recordObserver) []string { return r.skipped }
	return o.count(pick)
}

// droppedCount reports how many Dropped callbacks have been recorded.
func (o *recordObserver) droppedCount() int {
	pick := func(r *recordObserver) []string { return r.dropped }
	return o.count(pick)
}

// channelFullCount reports how many ChannelFull callbacks have been recorded.
func (o *recordObserver) channelFullCount() int {
	pick := func(r *recordObserver) []string { return r.channelFull }
	return o.count(pick)
}
|
||
|
|
|
||
|
|
func ready(p *ping) Producer[ping] {
|
||
|
|
return func(context.Context) (*ping, bool) { return p, true }
|
||
|
|
}
|
||
|
|
|
||
|
|
func never() Producer[ping] {
|
||
|
|
return func(context.Context) (*ping, bool) { return nil, false }
|
||
|
|
}
|
||
|
|
|
||
|
|
func recv(t *testing.T, ch <-chan *ping) *ping {
|
||
|
|
t.Helper()
|
||
|
|
select {
|
||
|
|
case v := <-ch:
|
||
|
|
return v
|
||
|
|
case <-time.After(2 * time.Second):
|
||
|
|
t.Fatal("timed out waiting for a push")
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func assertNoPush(t *testing.T, ch <-chan *ping) {
|
||
|
|
t.Helper()
|
||
|
|
select {
|
||
|
|
case v := <-ch:
|
||
|
|
t.Fatalf("unexpected push: %+v", v)
|
||
|
|
case <-time.After(100 * time.Millisecond):
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestSubmit_pushesWhenReady(t *testing.T) {
|
||
|
|
r := New[ping](quiet())
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
ch, cleanup, err := r.AddReceiver("c1")
|
||
|
|
require.NoError(t, err)
|
||
|
|
defer cleanup()
|
||
|
|
|
||
|
|
r.Submit("c1", ready(&ping{ID: "eb1"}))
|
||
|
|
|
||
|
|
assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch))
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestSubmit_noSubscribers_doesNotEnqueue(t *testing.T) {
|
||
|
|
r := New[ping](quiet())
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
var calls int
|
||
|
|
r.Submit("c1", func(context.Context) (*ping, bool) {
|
||
|
|
calls++
|
||
|
|
return &ping{ID: "x"}, true
|
||
|
|
})
|
||
|
|
|
||
|
|
// No subscribers → Submit returns on the fast path without enqueuing, so the
|
||
|
|
// producer is never invoked.
|
||
|
|
assert.Equal(t, 0, calls)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestSubmit_emptyKey_isNoop(t *testing.T) {
|
||
|
|
r := New[ping](quiet())
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
var calls int
|
||
|
|
r.Submit("", func(context.Context) (*ping, bool) {
|
||
|
|
calls++
|
||
|
|
return nil, true
|
||
|
|
})
|
||
|
|
assert.Equal(t, 0, calls)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestSubmit_lagThenReady(t *testing.T) {
|
||
|
|
r := New[ping](quiet(), WithReadRetry(50, time.Millisecond))
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
ch, cleanup, _ := r.AddReceiver("c1")
|
||
|
|
defer cleanup()
|
||
|
|
|
||
|
|
var mu sync.Mutex
|
||
|
|
calls := 0
|
||
|
|
r.Submit("c1", func(context.Context) (*ping, bool) {
|
||
|
|
mu.Lock()
|
||
|
|
defer mu.Unlock()
|
||
|
|
calls++
|
||
|
|
// Not visible on the first read; visible thereafter.
|
||
|
|
return &ping{ID: "eb1"}, calls > 1
|
||
|
|
})
|
||
|
|
|
||
|
|
assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch))
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestSubmit_neverReady_skipsAndReports(t *testing.T) {
|
||
|
|
obs := &recordObserver{}
|
||
|
|
r := New[ping](quiet(), WithObserver(obs), WithReadRetry(2, time.Millisecond))
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
ch, cleanup, _ := r.AddReceiver("c1")
|
||
|
|
defer cleanup()
|
||
|
|
|
||
|
|
r.Submit("c1", never())
|
||
|
|
|
||
|
|
assert.Eventually(t, func() bool { return obs.skippedCount() == 1 }, time.Second, 5*time.Millisecond)
|
||
|
|
assert.Equal(t, 0, obs.pushedCount())
|
||
|
|
assertNoPush(t, ch)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestObserver_pushedOnDelivery(t *testing.T) {
|
||
|
|
obs := &recordObserver{}
|
||
|
|
r := New[ping](quiet(), WithObserver(obs))
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
ch, cleanup, _ := r.AddReceiver("c1")
|
||
|
|
defer cleanup()
|
||
|
|
|
||
|
|
r.Submit("c1", ready(&ping{ID: "eb1"}))
|
||
|
|
recv(t, ch)
|
||
|
|
|
||
|
|
assert.Eventually(t, func() bool { return obs.pushedCount() == 1 }, time.Second, 5*time.Millisecond)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestPush_allSubscribersOfKey(t *testing.T) {
|
||
|
|
r := New[ping](quiet())
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
chA, cleanupA, _ := r.AddReceiver("c1")
|
||
|
|
defer cleanupA()
|
||
|
|
chB, cleanupB, _ := r.AddReceiver("c1")
|
||
|
|
defer cleanupB()
|
||
|
|
|
||
|
|
r.Submit("c1", ready(&ping{ID: "eb1"}))
|
||
|
|
|
||
|
|
assert.Equal(t, "eb1", recv(t, chA).ID)
|
||
|
|
assert.Equal(t, "eb1", recv(t, chB).ID)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestSubmit_keysIsolated(t *testing.T) {
|
||
|
|
r := New[ping](quiet())
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
chA, cleanupA, _ := r.AddReceiver("c1")
|
||
|
|
defer cleanupA()
|
||
|
|
chB, cleanupB, _ := r.AddReceiver("c2")
|
||
|
|
defer cleanupB()
|
||
|
|
|
||
|
|
r.Submit("c1", ready(&ping{ID: "eb1"}))
|
||
|
|
|
||
|
|
assert.Equal(t, "eb1", recv(t, chA).ID)
|
||
|
|
assertNoPush(t, chB)
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestPerKeyOrdering asserts that events for one key are delivered FIFO even
|
||
|
|
// across the worker pool (same key → same shard → one worker). This guards the
|
||
|
|
// ordering guarantee a payload the client consumes directly relies on.
|
||
|
|
func TestPerKeyOrdering(t *testing.T) {
|
||
|
|
r := New[ping](quiet(), WithWorkers(4))
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
ch, cleanup, _ := r.AddReceiver("c1")
|
||
|
|
defer cleanup()
|
||
|
|
|
||
|
|
const n = 10
|
||
|
|
for i := range n {
|
||
|
|
r.Submit("c1", ready(&ping{ID: fmt.Sprintf("e%02d", i)}))
|
||
|
|
}
|
||
|
|
for i := range n {
|
||
|
|
assert.Equal(t, fmt.Sprintf("e%02d", i), recv(t, ch).ID)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestAddReceiver_emptyKeyErrors(t *testing.T) {
|
||
|
|
r := New[ping](quiet())
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
ch, cleanup, err := r.AddReceiver("")
|
||
|
|
assert.ErrorIs(t, err, ErrEmptyKey)
|
||
|
|
assert.Nil(t, ch)
|
||
|
|
assert.Nil(t, cleanup)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRemoveReceiver_closesChannel(t *testing.T) {
|
||
|
|
r := New[ping](quiet())
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
ch, cleanup, _ := r.AddReceiver("c1")
|
||
|
|
cleanup()
|
||
|
|
|
||
|
|
// Draining a closed channel terminates the range/returns ok=false.
|
||
|
|
_, ok := <-ch
|
||
|
|
assert.False(t, ok)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRemoveReceiver_idempotent(t *testing.T) {
|
||
|
|
r := New[ping](quiet())
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
_, cleanup, _ := r.AddReceiver("c1")
|
||
|
|
cleanup()
|
||
|
|
assert.NotPanics(t, cleanup) // second call is a no-op
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestChannelFull_dropsAndReports(t *testing.T) {
|
||
|
|
obs := &recordObserver{}
|
||
|
|
// One worker so the two pushes are serialized; buffer 1 so the second drops.
|
||
|
|
r := New[ping](quiet(), WithObserver(obs), WithBufferSize(1), WithWorkers(1))
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
_, cleanup, _ := r.AddReceiver("c1") // never read from the channel
|
||
|
|
defer cleanup()
|
||
|
|
|
||
|
|
r.Submit("c1", ready(&ping{ID: "first"}))
|
||
|
|
r.Submit("c1", ready(&ping{ID: "second"}))
|
||
|
|
|
||
|
|
assert.Eventually(t, func() bool { return obs.channelFullCount() == 1 }, time.Second, 5*time.Millisecond)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestSubmit_queueFull_dropsAndReports(t *testing.T) {
|
||
|
|
obs := &recordObserver{}
|
||
|
|
started := make(chan struct{})
|
||
|
|
release := make(chan struct{})
|
||
|
|
|
||
|
|
r := New[ping](quiet(), WithObserver(obs), WithWorkers(1), WithQueueSize(1), WithReadRetry(0, time.Millisecond))
|
||
|
|
defer r.Close() // runs last
|
||
|
|
defer close(release) // runs before Close, unblocking the worker
|
||
|
|
|
||
|
|
_, cleanup, _ := r.AddReceiver("c1")
|
||
|
|
defer cleanup()
|
||
|
|
|
||
|
|
var once sync.Once
|
||
|
|
r.Submit("c1", func(context.Context) (*ping, bool) {
|
||
|
|
once.Do(func() { close(started) })
|
||
|
|
<-release
|
||
|
|
return &ping{ID: "blocking"}, true
|
||
|
|
})
|
||
|
|
<-started // the single worker is now blocked in produce; its shard queue is empty
|
||
|
|
|
||
|
|
r.Submit("c1", ready(&ping{ID: "fills-queue"})) // occupies the cap-1 shard queue
|
||
|
|
r.Submit("c1", ready(&ping{ID: "dropped"})) // queue full → dropped
|
||
|
|
|
||
|
|
assert.Eventually(t, func() bool { return obs.droppedCount() >= 1 }, time.Second, 5*time.Millisecond)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestClose_isIdempotentAndSubmitAfterCloseIsNoop(t *testing.T) {
|
||
|
|
r := New[ping](quiet())
|
||
|
|
r.Close()
|
||
|
|
r.Close() // idempotent
|
||
|
|
|
||
|
|
assert.NotPanics(t, func() {
|
||
|
|
r.Submit("c1", ready(&ping{ID: "x"}))
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestClose_cancelsInFlightGate asserts Close returns promptly even while a
|
||
|
|
// worker is mid-gate, rather than blocking for the full retry budget (~5s by
|
||
|
|
// default). Guards the registry-level context cancellation.
|
||
|
|
func TestClose_cancelsInFlightGate(t *testing.T) {
|
||
|
|
started := make(chan struct{})
|
||
|
|
r := New[ping](quiet()) // default retry budget ≈ 5s
|
||
|
|
|
||
|
|
_, cleanup, _ := r.AddReceiver("c1")
|
||
|
|
defer cleanup()
|
||
|
|
|
||
|
|
var once sync.Once
|
||
|
|
r.Submit("c1", func(context.Context) (*ping, bool) {
|
||
|
|
once.Do(func() { close(started) })
|
||
|
|
return nil, false // never ready → worker stays in the gate loop
|
||
|
|
})
|
||
|
|
<-started // worker is now in the gate loop
|
||
|
|
|
||
|
|
done := make(chan struct{})
|
||
|
|
go func() { r.Close(); close(done) }()
|
||
|
|
|
||
|
|
select {
|
||
|
|
case <-done:
|
||
|
|
case <-time.After(2 * time.Second):
|
||
|
|
t.Fatal("Close did not return promptly while a worker was gating")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// TestConcurrentChurn stresses the race between removeReceiver (which closes a
|
||
|
|
// subscriber channel under the write lock) and a worker push (which sends under
|
||
|
|
// the read lock). Before sends were moved under the read lock this panicked
|
||
|
|
// with "send on closed channel". Run with -race.
|
||
|
|
func TestConcurrentChurn(t *testing.T) {
|
||
|
|
r := New[ping](quiet(), WithReadRetry(0, time.Millisecond), WithWorkers(8))
|
||
|
|
defer r.Close()
|
||
|
|
|
||
|
|
// A long-lived reader so there is always at least one subscriber being
|
||
|
|
// pushed to while others churn.
|
||
|
|
_, steadyCleanup, _ := r.AddReceiver("c1")
|
||
|
|
defer steadyCleanup()
|
||
|
|
go func() {
|
||
|
|
ch, _, _ := r.AddReceiver("c1")
|
||
|
|
for range ch { //nolint:revive // drain
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
|
||
|
|
var wg sync.WaitGroup
|
||
|
|
|
||
|
|
wg.Add(1)
|
||
|
|
go func() {
|
||
|
|
defer wg.Done()
|
||
|
|
for range 2000 {
|
||
|
|
_, cleanup, err := r.AddReceiver("c1")
|
||
|
|
if err != nil {
|
||
|
|
t.Errorf("AddReceiver: %v", err)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
cleanup()
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
|
||
|
|
wg.Add(1)
|
||
|
|
go func() {
|
||
|
|
defer wg.Done()
|
||
|
|
for range 2000 {
|
||
|
|
r.Submit("c1", ready(&ping{ID: "x"}))
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
|
||
|
|
wg.Wait()
|
||
|
|
}
|