Files
schemas/graph/pubsub_test.go
T
argoyle 80daed081d feat: add Cosmo Router config generation and PubSub support
Creates a new `GenerateCosmoRouterConfig` function to build and 
serialize a Cosmo Router configuration from subgraphs. Implements 
PubSub mechanism for managing schema updates, allowing 
subscription to updates. Adds Subscription resolver and updates 
existing structures to accommodate new functionalities. This 
enhances the system's capabilities for dynamic updates and 
configuration management.
2025-11-19 11:29:30 +01:00

257 lines
5.4 KiB
Go

package graph
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/unboundsoftware/schemas/graph/model"
)
// TestPubSub_SubscribeAndPublish verifies that an update published for a ref
// is delivered to a channel that was previously subscribed to the same ref.
func TestPubSub_SubscribeAndPublish(t *testing.T) {
	ps := NewPubSub()
	ref := "Test@dev"

	// Subscribe
	ch := ps.Subscribe(ref)
	require.NotNil(t, ch, "Subscribe should return a channel")

	update := &model.SchemaUpdate{
		Ref: ref,
		ID:  "test-id-1",
		SubGraphs: []*model.SubGraph{
			{
				ID:      "sg1",
				Service: "test-service",
				Sdl:     "type Query { test: String }",
			},
		},
	}

	// Publish synchronously, as the other tests in this file do. Running it in
	// a goroutine would leave a goroutine whose lifetime the test never
	// observes.
	ps.Publish(ref, update)

	// Receive
	select {
	case received := <-ch:
		// A nil value (e.g. from a closed channel) must fail the test instead
		// of panicking on the field accesses below.
		require.NotNil(t, received, "Received update should not be nil")
		assert.Equal(t, update.Ref, received.Ref, "Ref should match")
		assert.Equal(t, update.ID, received.ID, "ID should match")
		assert.Len(t, received.SubGraphs, len(update.SubGraphs), "SubGraphs count should match")
	case <-time.After(1 * time.Second):
		t.Fatal("Timeout waiting for published update")
	}
}
// TestPubSub_MultipleSubscribers checks that a single Publish call reaches
// every channel subscribed to the same ref.
func TestPubSub_MultipleSubscribers(t *testing.T) {
	bus := NewPubSub()
	ref := "Test@dev"

	// Three independent subscriptions on the same ref, created in order.
	subscribers := []struct {
		label   string
		updates <-chan *model.SchemaUpdate
	}{
		{label: "Subscriber 1", updates: bus.Subscribe(ref)},
		{label: "Subscriber 2", updates: bus.Subscribe(ref)},
		{label: "Subscriber 3", updates: bus.Subscribe(ref)},
	}

	update := &model.SchemaUpdate{
		Ref: ref,
		ID:  "test-id-2",
	}

	// One publish; each subscriber is expected to observe it.
	bus.Publish(ref, update)

	var pending sync.WaitGroup
	for _, sub := range subscribers {
		pending.Add(1)
		// Pass the values explicitly so each goroutine works on its own copy.
		go func(updates <-chan *model.SchemaUpdate, name string) {
			defer pending.Done()
			select {
			case received := <-updates:
				assert.Equal(t, update.ID, received.ID, "%s should receive correct update", name)
			case <-time.After(time.Second):
				// Errorf (not Fatal): this runs outside the test goroutine.
				t.Errorf("%s: Timeout waiting for update", name)
			}
		}(sub.updates, sub.label)
	}
	pending.Wait()
}
// TestPubSub_DifferentRefs checks that updates are routed per ref: a publish
// on one ref is delivered to that ref's subscriber and not to a subscriber of
// another ref.
func TestPubSub_DifferentRefs(t *testing.T) {
	bus := NewPubSub()
	firstRef, secondRef := "Test1@dev", "Test2@dev"

	first := bus.Subscribe(firstRef)
	second := bus.Subscribe(secondRef)

	firstUpdate := &model.SchemaUpdate{Ref: firstRef, ID: "id1"}
	secondUpdate := &model.SchemaUpdate{Ref: secondRef, ID: "id2"}

	const wait = 100 * time.Millisecond

	// expectUpdate fails the test unless an update with wantID arrives in time.
	expectUpdate := func(updates <-chan *model.SchemaUpdate, wantID, failure string) {
		t.Helper()
		select {
		case received := <-updates:
			assert.Equal(t, wantID, received.ID)
		case <-time.After(wait):
			t.Fatal(failure)
		}
	}

	// expectSilence fails the test if anything arrives within the wait window.
	expectSilence := func(updates <-chan *model.SchemaUpdate, failure string) {
		t.Helper()
		select {
		case <-updates:
			t.Fatal(failure)
		case <-time.After(wait):
			// Nothing arrived, which is the expected outcome.
		}
	}

	// A publish on the first ref reaches only the first subscriber.
	bus.Publish(firstRef, firstUpdate)
	expectUpdate(first, "id1", "ch1 should have received update")
	expectSilence(second, "ch2 should not receive ref1's update")

	// A publish on the second ref now reaches the second subscriber.
	bus.Publish(secondRef, secondUpdate)
	expectUpdate(second, "id2", "ch2 should have received update")
}
// TestPubSub_Unsubscribe verifies that Unsubscribe closes the subscriber's
// channel and that publishing to the ref afterwards does not panic.
func TestPubSub_Unsubscribe(t *testing.T) {
	ps := NewPubSub()
	ref := "Test@dev"

	ch := ps.Subscribe(ref)

	// Unsubscribe
	ps.Unsubscribe(ref, ch)

	// Channel should be closed. A bare receive would block forever if the
	// channel were left open, so bound the wait and fail fast instead of
	// hanging until the global test timeout.
	select {
	case _, ok := <-ch:
		assert.False(t, ok, "Channel should be closed after unsubscribe")
	case <-time.After(1 * time.Second):
		t.Fatal("Timeout waiting for channel to be closed after unsubscribe")
	}

	// Publishing after unsubscribe should not panic
	assert.NotPanics(t, func() {
		ps.Publish(ref, &model.SchemaUpdate{Ref: ref})
	})
}
// TestPubSub_BufferedChannel publishes ten updates back-to-back before reading
// any of them and expects all ten to be receivable afterwards. The test
// assumes the subscriber channel buffers at least ten items.
func TestPubSub_BufferedChannel(t *testing.T) {
	const burst = 10

	bus := NewPubSub()
	ref := "Test@dev"
	updates := bus.Subscribe(ref)

	// Publish the whole burst without draining; IDs run "a", "b", "c", ...
	for n := 0; n < burst; n++ {
		bus.Publish(ref, &model.SchemaUpdate{
			Ref: ref,
			ID:  string(rune('a' + n)),
		})
	}

	// One shared deadline covers the entire drain, not each receive.
	deadline := time.After(time.Second)
	got := 0
	for ; got < burst; got++ {
		select {
		case <-updates:
			// Counted by the loop's post statement.
		case <-deadline:
			t.Fatalf("Only received %d out of 10 updates", got)
		}
	}

	assert.Equal(t, 10, got, "Should receive all buffered updates")
}
// TestPubSub_SlowSubscriber publishes eleven updates to a subscriber that is
// not reading and expects only the first ten to be delivered. The test assumes
// a buffer of ten and that a publish to a full channel is dropped, not blocked.
func TestPubSub_SlowSubscriber(t *testing.T) {
	bus := NewPubSub()
	ref := "Test@dev"
	updates := bus.Subscribe(ref)

	// Ten anonymous updates to occupy the buffer.
	for filled := 0; filled < 10; filled++ {
		bus.Publish(ref, &model.SchemaUpdate{Ref: ref})
	}

	// The eleventh carries a marker ID so it can be recognised if it slips through.
	bus.Publish(ref, &model.SchemaUpdate{Ref: ref, ID: "should-be-dropped"})

	// Drain until the deadline fires; the loop always runs for the full window.
	delivered := 0
	deadline := time.After(500 * time.Millisecond)
	for draining := true; draining; {
		select {
		case got := <-updates:
			delivered++
			assert.NotEqual(t, "should-be-dropped", got.ID, "Should not receive dropped update")
		case <-deadline:
			draining = false
		}
	}

	// Exactly the ten buffered updates, not eleven.
	assert.Equal(t, 10, delivered, "Should only receive buffered updates, not the dropped one")
}
// TestPubSub_ConcurrentPublish has several goroutines publish to the same ref
// at once and then checks that the subscriber received at least one update.
// The exact number delivered is deliberately not asserted.
func TestPubSub_ConcurrentPublish(t *testing.T) {
	const (
		publishers   = 10
		perPublisher = 10
	)

	bus := NewPubSub()
	ref := "Test@dev"
	updates := bus.Subscribe(ref)

	// Start all publishers and wait for every one of them to finish.
	var running sync.WaitGroup
	for p := 0; p < publishers; p++ {
		running.Add(1)
		go func(publisherID int) {
			defer running.Done()
			// Each publisher tags its updates with its own letter.
			id := string(rune('a' + publisherID))
			for sent := 0; sent < perPublisher; sent++ {
				bus.Publish(ref, &model.SchemaUpdate{
					Ref: ref,
					ID:  id,
				})
			}
		}(p)
	}
	running.Wait()

	// Count everything that arrives before the one-second deadline.
	received := func() int {
		count := 0
		deadline := time.After(time.Second)
		for {
			select {
			case <-updates:
				count++
			case <-deadline:
				return count
			}
		}
	}()

	assert.Greater(t, received, 0, "Should have received some updates")
}