diff --git a/cmd/service/service.go b/cmd/service/service.go
index d190b69..8d99d70 100644
--- a/cmd/service/service.go
+++ b/cmd/service/service.go
@@ -201,11 +201,13 @@ func start(closeEvents chan error, logger *slog.Logger, connectToAmqpFunc func(u
 	defer rootCancel()
 
 	resolver := &graph.Resolver{
-		EventStore: eventStore,
-		Publisher:  eventPublisher,
-		Logger:     logger,
-		Cache:      serviceCache,
-		PubSub:     graph.NewPubSub(),
+		EventStore:     eventStore,
+		Publisher:      eventPublisher,
+		Logger:         logger,
+		Cache:          serviceCache,
+		PubSub:         graph.NewPubSub(),
+		CosmoGenerator: graph.NewCosmoGenerator(&graph.DefaultCommandExecutor{}, 60*time.Second),
+		Debouncer:      graph.NewDebouncer(500 * time.Millisecond),
 	}
 
 	config := generated.Config{
diff --git a/go.mod b/go.mod
index 717d10b..27d4530 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@ require (
 	go.opentelemetry.io/otel/sdk/metric v1.40.0
 	go.opentelemetry.io/otel/trace v1.40.0
 	golang.org/x/crypto v0.48.0
+	golang.org/x/sync v0.19.0
 	gopkg.in/yaml.v3 v3.0.1
 )
 
@@ -80,7 +81,6 @@ require (
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/mod v0.33.0 // indirect
 	golang.org/x/net v0.50.0 // indirect
-	golang.org/x/sync v0.19.0 // indirect
 	golang.org/x/sys v0.41.0 // indirect
 	golang.org/x/text v0.34.0 // indirect
 	golang.org/x/tools v0.42.0 // indirect
diff --git a/graph/cosmo.go b/graph/cosmo.go
index d2e435b..7f04893 100644
--- a/graph/cosmo.go
+++ b/graph/cosmo.go
@@ -1,11 +1,14 @@
 package graph
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"os/exec"
 	"path/filepath"
+	"time"
 
+	"golang.org/x/sync/semaphore"
 	"gopkg.in/yaml.v3"
 
 	"gitea.unbound.se/unboundsoftware/schemas/graph/model"
@@ -123,3 +126,36 @@ func GenerateCosmoRouterConfigWithExecutor(subGraphs []*model.SubGraph, executor
 
 	return string(configJSON), nil
 }
+
+// CosmoGenerator wraps config generation with a concurrency limit and timeout
+// to prevent unbounded wgc process spawning under rapid schema updates.
+type CosmoGenerator struct {
+	sem      *semaphore.Weighted // weight 1 => at most one generation at a time
+	executor CommandExecutor
+	timeout  time.Duration // bounds how long a caller waits for the slot
+}
+
+// NewCosmoGenerator creates a CosmoGenerator that allows at most one concurrent
+// wgc process; timeout bounds how long each Generate call waits for a free slot.
+func NewCosmoGenerator(executor CommandExecutor, timeout time.Duration) *CosmoGenerator {
+	return &CosmoGenerator{
+		sem:      semaphore.NewWeighted(1),
+		executor: executor,
+		timeout:  timeout,
+	}
+}
+
+// Generate produces a Cosmo Router config, blocking if another generation is
+// already in progress. ctx plus the configured timeout cancel only the wait for
+// the slot — NOTE(review): the wgc run itself gets no ctx and is not cancelled.
+func (g *CosmoGenerator) Generate(ctx context.Context, subGraphs []*model.SubGraph) (string, error) {
+	ctx, cancel := context.WithTimeout(ctx, g.timeout)
+	defer cancel()
+
+	if err := g.sem.Acquire(ctx, 1); err != nil {
+		return "", fmt.Errorf("acquire cosmo generator: %w", err)
+	}
+	defer g.sem.Release(1)
+
+	return GenerateCosmoRouterConfigWithExecutor(subGraphs, g.executor)
+}
diff --git a/graph/cosmo_test.go b/graph/cosmo_test.go
index bb24ab1..aa6d8e6 100644
--- a/graph/cosmo_test.go
+++ b/graph/cosmo_test.go
@@ -1,11 +1,15 @@
 package graph
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"os"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -459,6 +463,114 @@ func TestGenerateCosmoRouterConfig_MockError(t *testing.T) {
 	assert.Equal(t, 1, mockExecutor.CallCount, "Should have attempted to call executor")
 }
 
+// SlowMockExecutor simulates a slow wgc command for concurrency testing.
+type SlowMockExecutor struct {
+	MockCommandExecutor
+	delay      time.Duration
+	mu         sync.Mutex
+	concurrent atomic.Int32 // currently running Execute calls
+	maxSeen    atomic.Int32 // high-water mark of concurrent calls
+}
+
+func (m *SlowMockExecutor) Execute(name string, args ...string) ([]byte, error) {
+	cur := m.concurrent.Add(1)
+	// Track the maximum concurrent executions observed (lock-free CAS retry loop).
+	for {
+		old := m.maxSeen.Load()
+		if cur <= old || m.maxSeen.CompareAndSwap(old, cur) {
+			break
+		}
+	}
+	defer m.concurrent.Add(-1)
+
+	time.Sleep(m.delay)
+
+	m.mu.Lock() // embedded MockCommandExecutor is not concurrency-safe
+	defer m.mu.Unlock()
+	return m.MockCommandExecutor.Execute(name, args...)
+}
+
+func TestCosmoGenerator_ConcurrencyLimit(t *testing.T) {
+	executor := &SlowMockExecutor{delay: 100 * time.Millisecond}
+	gen := NewCosmoGenerator(executor, 5*time.Second)
+
+	subGraphs := []*model.SubGraph{
+		{
+			Service: "svc",
+			URL:     stringPtr("http://localhost:4001/query"),
+			Sdl:     "type Query { hello: String }",
+		},
+	}
+
+	var wg sync.WaitGroup
+	for range 5 {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, _ = gen.Generate(context.Background(), subGraphs)
+		}()
+	}
+	wg.Wait()
+
+	assert.Equal(t, int32(1), executor.maxSeen.Load(),
+		"at most 1 wgc process should run concurrently")
+}
+
+func TestCosmoGenerator_Timeout(t *testing.T) {
+	// Executor that takes longer than the timeout.
+	executor := &SlowMockExecutor{delay: 500 * time.Millisecond}
+	gen := NewCosmoGenerator(executor, 50*time.Millisecond)
+
+	subGraphs := []*model.SubGraph{
+		{
+			Service: "svc",
+			URL:     stringPtr("http://localhost:4001/query"),
+			Sdl:     "type Query { hello: String }",
+		},
+	}
+
+	// First call: occupies the semaphore for 500ms.
+	go func() {
+		_, _ = gen.Generate(context.Background(), subGraphs)
+	}()
+
+	// Give the first goroutine time to acquire the semaphore.
+	time.Sleep(20 * time.Millisecond)
+
+	// Second call: should time out waiting for the semaphore.
+	_, err := gen.Generate(context.Background(), subGraphs)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "acquire cosmo generator")
+}
+
+func TestCosmoGenerator_ContextCancellation(t *testing.T) {
+	executor := &SlowMockExecutor{delay: 500 * time.Millisecond}
+	gen := NewCosmoGenerator(executor, 5*time.Second)
+
+	subGraphs := []*model.SubGraph{
+		{
+			Service: "svc",
+			URL:     stringPtr("http://localhost:4001/query"),
+			Sdl:     "type Query { hello: String }",
+		},
+	}
+
+	// First call: occupies the semaphore.
+	go func() {
+		_, _ = gen.Generate(context.Background(), subGraphs)
+	}()
+
+	time.Sleep(20 * time.Millisecond)
+
+	// Second call with an already-cancelled context.
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	_, err := gen.Generate(ctx, subGraphs)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "acquire cosmo generator")
+}
+
 // Helper function for tests
 func stringPtr(s string) *string {
 	return &s
diff --git a/graph/debounce.go b/graph/debounce.go
new file mode 100644
index 0000000..0de0606
--- /dev/null
+++ b/graph/debounce.go
@@ -0,0 +1,42 @@
+package graph
+
+import (
+	"sync"
+	"time"
+)
+
+// Debouncer coalesces rapid calls with the same key, executing only the last
+// one after a configurable delay. This prevents redundant work when multiple
+// updates arrive in quick succession (e.g., rapid schema publishing).
+type Debouncer struct {
+	mu     sync.Mutex
+	delay  time.Duration
+	timers map[string]*time.Timer
+}
+
+// NewDebouncer creates a Debouncer with the given delay window.
+func NewDebouncer(delay time.Duration) *Debouncer {
+	return &Debouncer{
+		delay:  delay,
+		timers: make(map[string]*time.Timer),
+	}
+}
+
+// Debounce resets the timer for key; when it fires (after delay with no newer call for the key), fn runs in a new goroutine. NOTE(review): Timer.Stop cannot halt a
+// callback that has already started, so an fn racing a concurrent Debounce for the same key may still run (and its cleanup deletes the newer timer from the map) — confirm this best-effort coalescing is acceptable.
+func (d *Debouncer) Debounce(key string, fn func()) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	if t, ok := d.timers[key]; ok {
+		t.Stop() // best-effort: no-op if the timer already fired
+	}
+
+	d.timers[key] = time.AfterFunc(d.delay, func() {
+		d.mu.Lock()
+		delete(d.timers, key)
+		d.mu.Unlock()
+
+		fn()
+	})
+}
diff --git a/graph/debounce_test.go b/graph/debounce_test.go
new file mode 100644
index 0000000..89b8187
--- /dev/null
+++ b/graph/debounce_test.go
@@ -0,0 +1,57 @@
+package graph
+
+import (
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDebouncer_Coalesces(t *testing.T) {
+	d := NewDebouncer(50 * time.Millisecond)
+	var calls atomic.Int32
+
+	// Fire 10 rapid calls for the same key — only the last should execute.
+	for range 10 {
+		d.Debounce("key1", func() {
+			calls.Add(1)
+		})
+	}
+
+	// Wait for the debounce delay plus some margin.
+	time.Sleep(150 * time.Millisecond)
+
+	assert.Equal(t, int32(1), calls.Load(), "rapid calls should coalesce into a single execution")
+}
+
+func TestDebouncer_DifferentKeys(t *testing.T) {
+	d := NewDebouncer(50 * time.Millisecond)
+	var calls atomic.Int32
+
+	d.Debounce("key-a", func() { calls.Add(1) })
+	d.Debounce("key-b", func() { calls.Add(1) })
+	d.Debounce("key-c", func() { calls.Add(1) })
+
+	time.Sleep(150 * time.Millisecond)
+
+	assert.Equal(t, int32(3), calls.Load(), "different keys should fire independently")
+}
+
+func TestDebouncer_TimerReset(t *testing.T) {
+	d := NewDebouncer(100 * time.Millisecond)
+	var value atomic.Int32
+
+	// First call sets value to 1.
+	d.Debounce("key", func() { value.Store(1) })
+
+	// Wait 60ms (less than the 100ms delay), then replace with value 2.
+	time.Sleep(60 * time.Millisecond)
+	d.Debounce("key", func() { value.Store(2) })
+
+	// At 60ms the first timer hasn't fired yet. Wait for the second timer.
+	time.Sleep(150 * time.Millisecond)
+
+	require.Equal(t, int32(2), value.Load(), "later call should replace the earlier one")
+}
diff --git a/graph/resolver.go b/graph/resolver.go
index b8f74e1..706be97 100644
--- a/graph/resolver.go
+++ b/graph/resolver.go
@@ -24,11 +24,13 @@ type Publisher interface {
 }
 
 type Resolver struct {
-	EventStore eventsourced.EventStore
-	Publisher  Publisher
-	Logger     *slog.Logger
-	Cache      *cache.Cache
-	PubSub     *PubSub
+	EventStore     eventsourced.EventStore
+	Publisher      Publisher
+	Logger         *slog.Logger
+	Cache          *cache.Cache
+	PubSub         *PubSub
+	CosmoGenerator *CosmoGenerator // serializes wgc config generation
+	Debouncer      *Debouncer      // coalesces rapid schema-update publishes
 }
 
 func (r *Resolver) apiKeyCanAccessRef(ctx context.Context, ref string, publish bool) (string, error) {
diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go
index 84bfb76..e9c302a 100644
--- a/graph/schema.resolvers.go
+++ b/graph/schema.resolvers.go
@@ -174,8 +174,9 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
 		return nil, err
 	}
 
-	// Publish schema update to subscribers
-	go func() {
+	// Debounce schema update publishing so rapid successive updates for the
+	// same org+ref only trigger one config generation (runs async via AfterFunc).
+	r.Debouncer.Debounce(orgId+":"+input.Ref, func() {
 		services, lastUpdate := r.Cache.Services(orgId, input.Ref, "")
 		r.Logger.Info("Publishing schema update after subgraph change",
 			"ref", input.Ref,
@@ -191,19 +192,11 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
 				r.Logger.Error("fetch subgraph for update notification", "error", err)
 				continue
 			}
-			subGraphs[i] = &model.SubGraph{
-				ID:        sg.ID.String(),
-				Service:   sg.Service,
-				URL:       sg.Url,
-				WsURL:     sg.WSUrl,
-				Sdl:       sg.Sdl,
-				ChangedBy: sg.ChangedBy,
-				ChangedAt: sg.ChangedAt,
-			}
+			subGraphs[i] = r.toGqlSubGraph(sg)
 		}
 
-		// Generate Cosmo router config
-		cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
+		// Generate Cosmo router config (concurrency-limited). Background ctx: the request ctx is gone when the timer fires.
+		cosmoConfig, err := r.CosmoGenerator.Generate(context.Background(), subGraphs)
 		if err != nil {
 			r.Logger.Error("generate cosmo config for update", "error", err)
 			cosmoConfig = "" // Send empty if generation fails
@@ -225,7 +218,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
 		)
 
 		r.PubSub.Publish(input.Ref, update)
-	}()
+	})
 
 	return r.toGqlSubGraph(subGraph), nil
 }
@@ -292,30 +285,16 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str
 		}, nil
 	}
 
 	subGraphs := make([]*model.SubGraph, len(services))
+	serviceSDLs := make([]string, len(services)) // collected in the same pass; avoids a second fetch loop
 	for i, id := range services {
 		sg, err := r.fetchSubGraph(ctx, id)
 		if err != nil {
 			return nil, err
 		}
-		subGraphs[i] = &model.SubGraph{
-			ID:        sg.ID.String(),
-			Service:   sg.Service,
-			URL:       sg.Url,
-			WsURL:     sg.WSUrl,
-			Sdl:       sg.Sdl,
-			ChangedBy: sg.ChangedBy,
-			ChangedAt: sg.ChangedAt,
-		}
+		subGraphs[i] = r.toGqlSubGraph(sg)
+		serviceSDLs[i] = sg.Sdl
 	}
-
-	var serviceSDLs []string
-	for _, id := range services {
-		sg, err := r.fetchSubGraph(ctx, id)
-		if err != nil {
-			return nil, err
-		}
-		serviceSDLs = append(serviceSDLs, sg.Sdl)
-	}
 
 	sdl, err := sdlmerge.MergeSDLs(serviceSDLs...)
 	if err != nil {
 		return nil, err
@@ -388,8 +367,8 @@ func (r *queryResolver) LatestSchema(ctx context.Context, ref string) (*model.Sc
 		}
 	}
 
-	// Generate Cosmo router config
-	cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
+	// Generate Cosmo router config (concurrency-limited)
+	cosmoConfig, err := r.CosmoGenerator.Generate(ctx, subGraphs)
 	if err != nil {
 		r.Logger.Error("generate cosmo config", "error", err)
 		cosmoConfig = "" // Return empty if generation fails
@@ -432,9 +411,6 @@ func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<
 
 	// Send initial state immediately
 	go func() {
-		// Use background context for async operation
-		bgCtx := context.Background()
-
 		services, lastUpdate := r.Cache.Services(orgId, ref, "")
 		r.Logger.Info("Preparing initial schema update",
 			"ref", ref,
@@ -445,24 +421,16 @@ func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<
 
 		subGraphs := make([]*model.SubGraph, len(services))
 		for i, id := range services {
-			sg, err := r.fetchSubGraph(bgCtx, id)
+			sg, err := r.fetchSubGraph(ctx, id)
 			if err != nil {
 				r.Logger.Error("fetch subgraph for initial update", "error", err, "id", id)
 				continue
 			}
-			subGraphs[i] = &model.SubGraph{
-				ID:        sg.ID.String(),
-				Service:   sg.Service,
-				URL:       sg.Url,
-				WsURL:     sg.WSUrl,
-				Sdl:       sg.Sdl,
-				ChangedBy: sg.ChangedBy,
-				ChangedAt: sg.ChangedAt,
-			}
+			subGraphs[i] = r.toGqlSubGraph(sg)
 		}
 
-		// Generate Cosmo router config
-		cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
+		// Generate Cosmo router config (concurrency-limited; subscription ctx bounds the wait)
+		cosmoConfig, err := r.CosmoGenerator.Generate(ctx, subGraphs)
 		if err != nil {
 			r.Logger.Error("generate cosmo config", "error", err)
 			cosmoConfig = "" // Send empty if generation fails
@@ -483,7 +451,11 @@ func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<
 			"cosmoConfigLength", len(cosmoConfig),
 		)
 
-		ch <- update
+		select {
+		case ch <- update: // deliver unless the subscriber has gone away
+		case <-ctx.Done():
+			return
+		}
 	}()
 
 	// Clean up subscription when context is done