package graph

import (
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"gitea.unbound.se/unboundsoftware/schemas/graph/model"
)

// TestPubSub_SubscribeAndPublish checks the basic round trip: an update
// published for a ref arrives on a channel previously subscribed to that ref,
// with Ref, ID and the number of SubGraphs intact.
func TestPubSub_SubscribeAndPublish(t *testing.T) {
	ps := NewPubSub()
	ref := "Test@dev"

	// Subscribe
	ch := ps.Subscribe(ref)
	require.NotNil(t, ch, "Subscribe should return a channel")

	// Publish
	update := &model.SchemaUpdate{
		Ref: ref,
		ID:  "test-id-1",
		SubGraphs: []*model.SubGraph{
			{
				ID:      "sg1",
				Service: "test-service",
				Sdl:     "type Query { test: String }",
			},
		},
	}

	go ps.Publish(ref, update)

	// Receive
	select {
	case received := <-ch:
		assert.Equal(t, update.Ref, received.Ref, "Ref should match")
		assert.Equal(t, update.ID, received.ID, "ID should match")
		assert.Equal(t, len(update.SubGraphs), len(received.SubGraphs), "SubGraphs count should match")
	case <-time.After(1 * time.Second):
		t.Fatal("Timeout waiting for published update")
	}
}

// TestPubSub_MultipleSubscribers checks fan-out: a single Publish for a ref is
// delivered to each of three channels subscribed to that ref.
func TestPubSub_MultipleSubscribers(t *testing.T) {
	ps := NewPubSub()
	ref := "Test@dev"

	// Create multiple subscribers
	ch1 := ps.Subscribe(ref)
	ch2 := ps.Subscribe(ref)
	ch3 := ps.Subscribe(ref)

	update := &model.SchemaUpdate{
		Ref: ref,
		ID:  "test-id-2",
	}

	// Publish once
	ps.Publish(ref, update)

	// All subscribers should receive the update
	var wg sync.WaitGroup
	wg.Add(3)

	// checkReceived runs in its own goroutine, so it reports failures with
	// assert/t.Errorf only; FailNow-style helpers must not be called from a
	// goroutine other than the one running the test.
	checkReceived := func(ch <-chan *model.SchemaUpdate, name string) {
		defer wg.Done()
		select {
		case received := <-ch:
			assert.Equal(t, update.ID, received.ID, "%s should receive correct update", name)
		case <-time.After(1 * time.Second):
			t.Errorf("%s: Timeout waiting for update", name)
		}
	}

	go checkReceived(ch1, "Subscriber 1")
	go checkReceived(ch2, "Subscriber 2")
	go checkReceived(ch3, "Subscriber 3")

	wg.Wait()
}

// TestPubSub_DifferentRefs checks isolation between refs: an update published
// for one ref reaches only that ref's subscriber and not a subscriber of a
// different ref.
func TestPubSub_DifferentRefs(t *testing.T) {
	ps := NewPubSub()

	ref1 := "Test1@dev"
	ref2 := "Test2@dev"

	ch1 := ps.Subscribe(ref1)
	ch2 := ps.Subscribe(ref2)

	update1 := &model.SchemaUpdate{Ref: ref1, ID: "id1"}
	update2 := &model.SchemaUpdate{Ref: ref2, ID: "id2"}

	// Publish to ref1
	ps.Publish(ref1, update1)

	// Only ch1 should receive
	select {
	case received := <-ch1:
		assert.Equal(t, "id1", received.ID)
	case <-time.After(100 * time.Millisecond):
		t.Fatal("ch1 should have received update")
	}

	// ch2 should not receive ref1's update
	select {
	case <-ch2:
		t.Fatal("ch2 should not receive ref1's update")
	case <-time.After(100 * time.Millisecond):
		// Expected - no update
	}

	// Publish to ref2
	ps.Publish(ref2, update2)

	// Now ch2 should receive
	select {
	case received := <-ch2:
		assert.Equal(t, "id2", received.ID)
	case <-time.After(100 * time.Millisecond):
		t.Fatal("ch2 should have received update")
	}
}

// TestPubSub_Unsubscribe checks that Unsubscribe closes the subscriber's
// channel and that a later Publish for the same ref does not panic.
func TestPubSub_Unsubscribe(t *testing.T) {
	ps := NewPubSub()
	ref := "Test@dev"

	ch := ps.Subscribe(ref)

	// Unsubscribe
	ps.Unsubscribe(ref, ch)

	// Channel should be closed. The receive is guarded by a timeout so that a
	// channel left open fails the test promptly instead of blocking until the
	// global go test timeout fires.
	select {
	case _, ok := <-ch:
		assert.False(t, ok, "Channel should be closed after unsubscribe")
	case <-time.After(1 * time.Second):
		t.Fatal("Timeout waiting for channel to be closed after unsubscribe")
	}

	// Publishing after unsubscribe should not panic
	assert.NotPanics(t, func() {
		ps.Publish(ref, &model.SchemaUpdate{Ref: ref})
	})
}

// TestPubSub_BufferedChannel checks that ten updates published back to back,
// with no concurrent reader, can all be received afterwards. The test expects
// a subscriber buffer of at least 10.
func TestPubSub_BufferedChannel(t *testing.T) {
	ps := NewPubSub()
	ref := "Test@dev"

	ch := ps.Subscribe(ref)

	// Publish multiple updates quickly (up to buffer size of 10)
	for i := 0; i < 10; i++ {
		update := &model.SchemaUpdate{
			Ref: ref,
			ID:  string(rune('a' + i)),
		}
		ps.Publish(ref, update)
	}

	// All 10 should be buffered and receivable
	received := 0
	// A single deadline shared by the whole loop, not a fresh timer per receive.
	timeout := time.After(1 * time.Second)

	for received < 10 {
		select {
		case <-ch:
			received++
		case <-timeout:
			t.Fatalf("Only received %d out of 10 updates", received)
		}
	}

	assert.Equal(t, 10, received, "Should receive all buffered updates")
}

// TestPubSub_SlowSubscriber checks the overflow behaviour the test expects from
// Publish: once ten updates are pending for a subscriber that is not reading,
// an eleventh update is dropped and never delivered.
func TestPubSub_SlowSubscriber(t *testing.T) {
	ps := NewPubSub()
	ref := "Test@dev"

	ch := ps.Subscribe(ref)

	// Fill the buffer (10 items)
	for i := 0; i < 10; i++ {
		ps.Publish(ref, &model.SchemaUpdate{Ref: ref})
	}

	// Publish one more - this should be dropped (channel full, non-blocking send)
	ps.Publish(ref, &model.SchemaUpdate{Ref: ref, ID: "should-be-dropped"})

	// Drain the channel
	count := 0
	// The loop only ends when this deadline fires, so the test always takes
	// about 500ms.
	timeout := time.After(500 * time.Millisecond)

drainLoop:
	for {
		select {
		case update := <-ch:
			count++
			// Should not receive the dropped update
			assert.NotEqual(t, "should-be-dropped", update.ID, "Should not receive dropped update")
		case <-timeout:
			break drainLoop
		}
	}

	// Should have received exactly 10 (the buffer size), not 11
	assert.Equal(t, 10, count, "Should only receive buffered updates, not the dropped one")
}

// TestPubSub_ConcurrentPublish publishes from ten goroutines at once to a
// single subscriber and checks that at least one update is delivered. The
// exact number delivered is deliberately not asserted.
func TestPubSub_ConcurrentPublish(t *testing.T) {
	ps := NewPubSub()
	ref := "Test@dev"

	ch := ps.Subscribe(ref)

	numPublishers := 10
	updatesPerPublisher := 10

	var wg sync.WaitGroup
	wg.Add(numPublishers)

	// Multiple goroutines publishing concurrently
	for i := 0; i < numPublishers; i++ {
		// The index is passed as an argument so each goroutine gets its own
		// copy regardless of the Go version's loop-variable semantics.
		go func(publisherID int) {
			defer wg.Done()
			for j := 0; j < updatesPerPublisher; j++ {
				ps.Publish(ref, &model.SchemaUpdate{
					Ref: ref,
					ID:  string(rune('a' + publisherID)),
				})
			}
		}(i)
	}

	wg.Wait()

	// Should not panic and subscriber should receive updates
	// (exact count may vary due to buffer and timing)
	timeout := time.After(1 * time.Second)
	received := 0

receiveLoop:
	for {
		select {
		case <-ch:
			received++
		case <-timeout:
			break receiveLoop
		}
	}

	assert.Greater(t, received, 0, "Should have received some updates")
}