fix: prevent OOM on rapid schema publishing
schemas / vulnerabilities (pull_request) Successful in 6m43s
schemas / check-release (pull_request) Successful in 11m27s
schemas / check (pull_request) Successful in 14m51s
pre-commit / pre-commit (pull_request) Successful in 19m39s
schemas / build (pull_request) Successful in 8m26s
schemas / deploy-prod (pull_request) Has been skipped

Add concurrency-limited CosmoGenerator (semaphore limit=1, 60s timeout)
to prevent unbounded concurrent wgc process spawning. Add debouncer
(500ms) to coalesce rapid schema updates per org+ref. Fix double
subgraph fetch in Supergraph resolver and goroutine leak in
SchemaUpdates subscription.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-23 08:05:47 +01:00
parent a9885f8b65
commit 28aa32ad8c
8 changed files with 283 additions and 60 deletions
+2
View File
@@ -206,6 +206,8 @@ func start(closeEvents chan error, logger *slog.Logger, connectToAmqpFunc func(u
Logger: logger,
Cache: serviceCache,
PubSub: graph.NewPubSub(),
CosmoGenerator: graph.NewCosmoGenerator(&graph.DefaultCommandExecutor{}, 60*time.Second),
Debouncer: graph.NewDebouncer(500 * time.Millisecond),
}
config := generated.Config{
+1 -1
View File
@@ -31,6 +31,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.40.0
go.opentelemetry.io/otel/trace v1.40.0
golang.org/x/crypto v0.48.0
golang.org/x/sync v0.19.0
gopkg.in/yaml.v3 v3.0.1
)
@@ -80,7 +81,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.33.0 // indirect
golang.org/x/net v0.50.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.34.0 // indirect
golang.org/x/tools v0.42.0 // indirect
+36
View File
@@ -1,11 +1,14 @@
package graph
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"time"
"golang.org/x/sync/semaphore"
"gopkg.in/yaml.v3"
"gitea.unbound.se/unboundsoftware/schemas/graph/model"
@@ -123,3 +126,36 @@ func GenerateCosmoRouterConfigWithExecutor(subGraphs []*model.SubGraph, executor
return string(configJSON), nil
}
// CosmoGenerator serializes Cosmo Router config generation behind a weighted
// semaphore so that rapid schema updates cannot spawn an unbounded number of
// concurrent wgc processes.
type CosmoGenerator struct {
	// sem admits one generation at a time (see NewCosmoGenerator).
	sem *semaphore.Weighted
	// executor runs the external command on behalf of the generator.
	executor CommandExecutor
	// timeout caps how long a caller may wait for its turn.
	timeout time.Duration
}

// NewCosmoGenerator creates a CosmoGenerator that allows at most one concurrent
// generation and uses timeout as the upper bound on how long Generate waits
// for the generation slot.
func NewCosmoGenerator(executor CommandExecutor, timeout time.Duration) *CosmoGenerator {
	return &CosmoGenerator{
		sem:      semaphore.NewWeighted(1),
		executor: executor,
		timeout:  timeout,
	}
}

// Generate produces a Cosmo Router config, blocking while another generation
// is in progress.
//
// The provided context, further limited by the configured timeout, bounds the
// wait for the generation slot. The generation call itself is not handed the
// context here, so once it has started it runs to completion.
//
// It returns an error wrapping the context error when the slot could not be
// obtained or the context was already done, and otherwise whatever
// GenerateCosmoRouterConfigWithExecutor returns.
func (g *CosmoGenerator) Generate(ctx context.Context, subGraphs []*model.SubGraph) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, g.timeout)
	defer cancel()

	if err := g.sem.Acquire(ctx, 1); err != nil {
		return "", fmt.Errorf("acquire cosmo generator: %w", err)
	}
	defer g.sem.Release(1)

	// Acquire may succeed without blocking even when ctx is already done.
	// Bail out here so a cancelled or expired request never starts a new
	// generation.
	if err := ctx.Err(); err != nil {
		return "", fmt.Errorf("acquire cosmo generator: %w", err)
	}

	return GenerateCosmoRouterConfigWithExecutor(subGraphs, g.executor)
}
+112
View File
@@ -1,11 +1,15 @@
package graph
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -459,6 +463,114 @@ func TestGenerateCosmoRouterConfig_MockError(t *testing.T) {
assert.Equal(t, 1, mockExecutor.CallCount, "Should have attempted to call executor")
}
// SlowMockExecutor is a test double that delays every command by a fixed
// duration and records how many Execute calls overlapped, so tests can check
// concurrency limits.
type SlowMockExecutor struct {
	MockCommandExecutor
	// delay is how long each Execute call sleeps before delegating.
	delay time.Duration
	// mu serializes calls into the embedded MockCommandExecutor.
	mu sync.Mutex
	// concurrent is the number of Execute calls currently in flight.
	concurrent atomic.Int32
	// maxSeen is the highest value concurrent has reached.
	maxSeen atomic.Int32
}

// Execute counts itself as in flight, raises the recorded peak if needed,
// sleeps for the configured delay and then delegates to the embedded mock.
func (m *SlowMockExecutor) Execute(name string, args ...string) ([]byte, error) {
	inFlight := m.concurrent.Add(1)
	defer m.concurrent.Add(-1)

	// Raise maxSeen to inFlight unless another call already recorded an
	// equal or higher peak; retry when the compare-and-swap loses a race.
	for peak := m.maxSeen.Load(); inFlight > peak; peak = m.maxSeen.Load() {
		if m.maxSeen.CompareAndSwap(peak, inFlight) {
			break
		}
	}

	time.Sleep(m.delay)

	m.mu.Lock()
	defer m.mu.Unlock()
	return m.MockCommandExecutor.Execute(name, args...)
}
// TestCosmoGenerator_ConcurrencyLimit launches several Generate calls at once
// and verifies the executor never observed more than one of them in flight.
func TestCosmoGenerator_ConcurrencyLimit(t *testing.T) {
	const callers = 5

	slow := &SlowMockExecutor{delay: 100 * time.Millisecond}
	generator := NewCosmoGenerator(slow, 5*time.Second)

	input := []*model.SubGraph{{
		Service: "svc",
		URL:     stringPtr("http://localhost:4001/query"),
		Sdl:     "type Query { hello: String }",
	}}

	var pending sync.WaitGroup
	pending.Add(callers)
	for i := 0; i < callers; i++ {
		go func() {
			defer pending.Done()
			// Only the overlap count matters here, not the result.
			_, _ = generator.Generate(context.Background(), input)
		}()
	}
	pending.Wait()

	peak := slow.maxSeen.Load()
	assert.Equal(t, int32(1), peak,
		"at most 1 wgc process should run concurrently")
}
// TestCosmoGenerator_Timeout verifies that a Generate call gives up with an
// "acquire cosmo generator" error when the generation slot stays occupied for
// longer than the generator's timeout.
func TestCosmoGenerator_Timeout(t *testing.T) {
	// Executor that takes much longer than the generator's timeout.
	executor := &SlowMockExecutor{delay: 500 * time.Millisecond}
	gen := NewCosmoGenerator(executor, 50*time.Millisecond)

	subGraphs := []*model.SubGraph{
		{
			Service: "svc",
			URL:     stringPtr("http://localhost:4001/query"),
			Sdl:     "type Query { hello: String }",
		},
	}

	// First call: occupies the semaphore for the executor's full delay.
	holderDone := make(chan struct{})
	go func() {
		defer close(holderDone)
		_, _ = gen.Generate(context.Background(), subGraphs)
	}()
	// Join the holder before the test ends so it cannot outlive the test.
	t.Cleanup(func() { <-holderDone })

	// Wait until the first call is inside the executor, which implies it
	// holds the semaphore, instead of guessing with a fixed sleep.
	require.Eventually(t, func() bool {
		return executor.concurrent.Load() == 1
	}, 2*time.Second, time.Millisecond, "first Generate call never reached the executor")

	// Second call: should time out waiting for the semaphore.
	_, err := gen.Generate(context.Background(), subGraphs)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "acquire cosmo generator")
}
// TestCosmoGenerator_ContextCancellation verifies that Generate fails with an
// "acquire cosmo generator" error when called with an already-cancelled
// context while another generation holds the slot.
func TestCosmoGenerator_ContextCancellation(t *testing.T) {
	executor := &SlowMockExecutor{delay: 500 * time.Millisecond}
	gen := NewCosmoGenerator(executor, 5*time.Second)

	subGraphs := []*model.SubGraph{
		{
			Service: "svc",
			URL:     stringPtr("http://localhost:4001/query"),
			Sdl:     "type Query { hello: String }",
		},
	}

	// First call: occupies the semaphore.
	holderDone := make(chan struct{})
	go func() {
		defer close(holderDone)
		_, _ = gen.Generate(context.Background(), subGraphs)
	}()
	// Join the holder before the test ends so it cannot outlive the test.
	t.Cleanup(func() { <-holderDone })

	// The slot must really be taken before the second call: wait until the
	// first call is inside the executor rather than relying on a fixed sleep.
	require.Eventually(t, func() bool {
		return executor.concurrent.Load() == 1
	}, 2*time.Second, time.Millisecond, "first Generate call never reached the executor")

	// Second call with an already-cancelled context.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	_, err := gen.Generate(ctx, subGraphs)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "acquire cosmo generator")
}
// Helper function for tests
func stringPtr(s string) *string {
return &s
+42
View File
@@ -0,0 +1,42 @@
package graph
import (
"sync"
"time"
)
// Debouncer coalesces rapid calls with the same key, executing only the last
// one after a configurable delay. This prevents redundant work when multiple
// updates arrive in quick succession (e.g., rapid schema publishing).
//
// A Debouncer must not be copied after first use because it contains a mutex.
type Debouncer struct {
	// mu guards timers.
	mu sync.Mutex
	// delay is the quiet period that must pass before a callback runs.
	delay time.Duration
	// timers holds the pending timer for each key.
	timers map[string]*time.Timer
}

// NewDebouncer creates a Debouncer with the given delay window.
func NewDebouncer(delay time.Duration) *Debouncer {
	return &Debouncer{
		delay:  delay,
		timers: make(map[string]*time.Timer),
	}
}

// Debounce schedules fn to run after the delay and cancels any call still
// pending for the same key. fn runs on the timer's own goroutine, outside the
// Debouncer's lock, once the delay has passed with no newer call for key.
func (d *Debouncer) Debounce(key string, fn func()) {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Allow a zero-value Debouncer to be used without panicking on the map.
	if d.timers == nil {
		d.timers = make(map[string]*time.Timer)
	}

	if prev, ok := d.timers[key]; ok {
		prev.Stop()
	}

	// The callback must know which timer it belongs to. Stop cannot recall a
	// timer that has already fired, so a callback may be waiting on d.mu while
	// a newer Debounce call replaces its map entry. Such a stale callback must
	// neither remove the newer entry nor run its outdated fn.
	var timer *time.Timer
	timer = time.AfterFunc(d.delay, func() {
		d.mu.Lock()
		if d.timers[key] != timer {
			// Superseded while waiting for the lock; the newer timer owns key.
			d.mu.Unlock()
			return
		}
		delete(d.timers, key)
		d.mu.Unlock()

		fn()
	})
	// The callback locks d.mu before reading timer, and d.mu is still held
	// here, so it cannot observe timer before this assignment.
	d.timers[key] = timer
}
+57
View File
@@ -0,0 +1,57 @@
package graph
import (
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestDebouncer_Coalesces checks that a burst of calls for one key results in
// exactly one execution.
func TestDebouncer_Coalesces(t *testing.T) {
	const burst = 10

	debouncer := NewDebouncer(50 * time.Millisecond)
	var fired atomic.Int32
	bump := func() { fired.Add(1) }

	// Every call but the last one is superseded before its timer fires.
	for i := 0; i < burst; i++ {
		debouncer.Debounce("key1", bump)
	}

	// Sleep past the debounce window with some margin.
	time.Sleep(150 * time.Millisecond)

	got := fired.Load()
	assert.Equal(t, int32(1), got, "rapid calls should coalesce into a single execution")
}
// TestDebouncer_DifferentKeys checks that calls under distinct keys do not
// cancel each other: each key gets its own execution.
func TestDebouncer_DifferentKeys(t *testing.T) {
	debouncer := NewDebouncer(50 * time.Millisecond)
	var fired atomic.Int32

	for _, key := range []string{"key-a", "key-b", "key-c"} {
		debouncer.Debounce(key, func() { fired.Add(1) })
	}

	// Sleep past the debounce window with some margin.
	time.Sleep(150 * time.Millisecond)

	got := fired.Load()
	assert.Equal(t, int32(3), got, "different keys should fire independently")
}
// TestDebouncer_TimerReset checks that a second call arriving inside the delay
// window replaces the first one, so only the later callback takes effect.
func TestDebouncer_TimerReset(t *testing.T) {
	debouncer := NewDebouncer(100 * time.Millisecond)
	var observed atomic.Int32

	storeOne := func() { observed.Store(1) }
	storeTwo := func() { observed.Store(2) }

	// Schedule the first callback, which would store 1.
	debouncer.Debounce("key", storeOne)

	// 60ms is inside the 100ms window, so the first timer is still pending
	// when the second call supersedes it.
	time.Sleep(60 * time.Millisecond)
	debouncer.Debounce("key", storeTwo)

	// Give the second timer enough time to fire.
	time.Sleep(150 * time.Millisecond)

	got := observed.Load()
	require.Equal(t, int32(2), got, "later call should replace the earlier one")
}
+2
View File
@@ -29,6 +29,8 @@ type Resolver struct {
Logger *slog.Logger
Cache *cache.Cache
PubSub *PubSub
CosmoGenerator *CosmoGenerator
Debouncer *Debouncer
}
func (r *Resolver) apiKeyCanAccessRef(ctx context.Context, ref string, publish bool) (string, error) {
+21 -49
View File
@@ -174,8 +174,9 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
return nil, err
}
// Publish schema update to subscribers
go func() {
// Debounce schema update publishing so rapid successive updates for the
// same org+ref only trigger one config generation.
r.Debouncer.Debounce(orgId+":"+input.Ref, func() {
services, lastUpdate := r.Cache.Services(orgId, input.Ref, "")
r.Logger.Info("Publishing schema update after subgraph change",
"ref", input.Ref,
@@ -191,19 +192,11 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
r.Logger.Error("fetch subgraph for update notification", "error", err)
continue
}
subGraphs[i] = &model.SubGraph{
ID: sg.ID.String(),
Service: sg.Service,
URL: sg.Url,
WsURL: sg.WSUrl,
Sdl: sg.Sdl,
ChangedBy: sg.ChangedBy,
ChangedAt: sg.ChangedAt,
}
subGraphs[i] = r.toGqlSubGraph(sg)
}
// Generate Cosmo router config
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
// Generate Cosmo router config (concurrency-limited)
cosmoConfig, err := r.CosmoGenerator.Generate(context.Background(), subGraphs)
if err != nil {
r.Logger.Error("generate cosmo config for update", "error", err)
cosmoConfig = "" // Send empty if generation fails
@@ -225,7 +218,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
)
r.PubSub.Publish(input.Ref, update)
}()
})
return r.toGqlSubGraph(subGraph), nil
}
@@ -292,30 +285,16 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str
}, nil
}
subGraphs := make([]*model.SubGraph, len(services))
serviceSDLs := make([]string, len(services))
for i, id := range services {
sg, err := r.fetchSubGraph(ctx, id)
if err != nil {
return nil, err
}
subGraphs[i] = &model.SubGraph{
ID: sg.ID.String(),
Service: sg.Service,
URL: sg.Url,
WsURL: sg.WSUrl,
Sdl: sg.Sdl,
ChangedBy: sg.ChangedBy,
ChangedAt: sg.ChangedAt,
}
subGraphs[i] = r.toGqlSubGraph(sg)
serviceSDLs[i] = sg.Sdl
}
var serviceSDLs []string
for _, id := range services {
sg, err := r.fetchSubGraph(ctx, id)
if err != nil {
return nil, err
}
serviceSDLs = append(serviceSDLs, sg.Sdl)
}
sdl, err := sdlmerge.MergeSDLs(serviceSDLs...)
if err != nil {
return nil, err
@@ -388,8 +367,8 @@ func (r *queryResolver) LatestSchema(ctx context.Context, ref string) (*model.Sc
}
}
// Generate Cosmo router config
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
// Generate Cosmo router config (concurrency-limited)
cosmoConfig, err := r.CosmoGenerator.Generate(ctx, subGraphs)
if err != nil {
r.Logger.Error("generate cosmo config", "error", err)
cosmoConfig = "" // Return empty if generation fails
@@ -432,9 +411,6 @@ func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<
// Send initial state immediately
go func() {
// Use background context for async operation
bgCtx := context.Background()
services, lastUpdate := r.Cache.Services(orgId, ref, "")
r.Logger.Info("Preparing initial schema update",
"ref", ref,
@@ -445,24 +421,16 @@ func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<
subGraphs := make([]*model.SubGraph, len(services))
for i, id := range services {
sg, err := r.fetchSubGraph(bgCtx, id)
sg, err := r.fetchSubGraph(ctx, id)
if err != nil {
r.Logger.Error("fetch subgraph for initial update", "error", err, "id", id)
continue
}
subGraphs[i] = &model.SubGraph{
ID: sg.ID.String(),
Service: sg.Service,
URL: sg.Url,
WsURL: sg.WSUrl,
Sdl: sg.Sdl,
ChangedBy: sg.ChangedBy,
ChangedAt: sg.ChangedAt,
}
subGraphs[i] = r.toGqlSubGraph(sg)
}
// Generate Cosmo router config
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
// Generate Cosmo router config (concurrency-limited)
cosmoConfig, err := r.CosmoGenerator.Generate(ctx, subGraphs)
if err != nil {
r.Logger.Error("generate cosmo config", "error", err)
cosmoConfig = "" // Send empty if generation fails
@@ -483,7 +451,11 @@ func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<
"cosmoConfigLength", len(cosmoConfig),
)
ch <- update
select {
case ch <- update:
case <-ctx.Done():
return
}
}()
// Clean up subscription when context is done