From d652c1e446c208c170b2ee44441b05858951fd38 Mon Sep 17 00:00:00 2001 From: Joakim Olsson Date: Tue, 19 May 2026 09:37:43 +0200 Subject: [PATCH] perf(graph): cache merged SDL and SchemaUpdate per ref Both Supergraph and LatestSchema resolvers recomputed their result on every request. The work is non-trivial: - Supergraph: sdlmerge.MergeSDLs() runs AST validation + normalization + custom merge walkers over all subgraph SDLs. - LatestSchema: CosmoGenerator.Generate() shells out to wgc router compose (Node via npx), spending 100-300m CPU per call. Because the output is fully determined by the set of subgraph SDLs and their lastUpdate timestamp, the result can be cached and reused across requests until a SubGraphUpdated event bumps the lastUpdate for the (orgId, ref) key. Add two precomputation caches to cache.Cache, both versioned by the existing lastUpdate map so a single timestamp comparison invalidates stale entries implicitly: - mergedSDLs: cached MergeSDLs output for Supergraph - schemaUpdates: cached SchemaUpdate (subgraphs + cosmo config) for LatestSchema The UpdateSubGraph debounce already computes the cosmo config to publish through PubSub; it now also stores the SchemaUpdate so the next LatestSchema query is warm. OrganizationRemoved evicts both caches alongside lastUpdate. This eliminates the per-request CPU bursts that were tripping the HPA into TooManyReplicas territory. --- cache/cache.go | 76 +++++++++++++++++++++++++++++++++++++-- graph/schema.resolvers.go | 27 +++++++++++++- 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index 5e0f761..d35f65b 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -10,6 +10,7 @@ import ( "gitlab.com/unboundsoftware/eventsourced/eventsourced" "gitea.unbound.se/unboundsoftware/schemas/domain" + "gitea.unbound.se/unboundsoftware/schemas/graph/model" "gitea.unbound.se/unboundsoftware/schemas/hash" ) @@ -21,9 +22,19 @@ type Cache struct { services map[string]map[string]map[string]struct{} subGraphs map[string]string lastUpdate map[string]string + mergedSDLs map[string]*mergedSDLEntry + schemaUpdates map[string]*model.SchemaUpdate logger *slog.Logger } +// mergedSDLEntry holds a precomputed merged SDL together with the lastUpdate +// id it was computed against, so stale entries can be detected on read. +type mergedSDLEntry struct { + ID string + SDL string + SubGraphs []*model.SubGraph +} + func (c *Cache) OrganizationByAPIKey(apiKey string) *domain.Organization { c.mu.RLock() defer c.mu.RUnlock() @@ -184,8 +195,11 @@ func (c *Cache) Update(msg any, _ goamqp.Headers) (any, error) { for service := range refs[ref] { delete(c.subGraphs, subGraphKey(orgId, ref, service)) } - // Remove lastUpdate for this org/ref - delete(c.lastUpdate, refKey(orgId, ref)) + // Remove cached results for this org/ref + rk := refKey(orgId, ref) + delete(c.lastUpdate, rk) + delete(c.mergedSDLs, rk) + delete(c.schemaUpdates, rk) } delete(c.services, orgId) } @@ -253,10 +267,68 @@ func New(logger *slog.Logger) *Cache { services: make(map[string]map[string]map[string]struct{}), subGraphs: make(map[string]string), lastUpdate: make(map[string]string), + mergedSDLs: make(map[string]*mergedSDLEntry), + schemaUpdates: make(map[string]*model.SchemaUpdate), logger: logger, } } +// GetMergedSDL returns the cached merged SDL for (orgId, ref) if it was +// computed against the current lastUpdate. Returns nil when missing or stale. +func (c *Cache) GetMergedSDL(orgId, ref string) *mergedSDLEntry { + c.mu.RLock() + defer c.mu.RUnlock() + + key := refKey(orgId, ref) + entry := c.mergedSDLs[key] + if entry == nil || entry.ID != c.lastUpdate[key] { + return nil + } + return entry +} + +// MergedSDLEntry exposes the cached merged SDL fields to callers. +func (e *mergedSDLEntry) Unpack() (id, sdl string, subGraphs []*model.SubGraph) { + return e.ID, e.SDL, e.SubGraphs +} + +// SetMergedSDL stores a precomputed merged SDL for (orgId, ref). The entry +// is only retained while c.lastUpdate[key] matches id; subsequent updates +// invalidate it implicitly via the version mismatch in GetMergedSDL. +func (c *Cache) SetMergedSDL(orgId, ref, id, sdl string, subGraphs []*model.SubGraph) { + c.mu.Lock() + defer c.mu.Unlock() + + c.mergedSDLs[refKey(orgId, ref)] = &mergedSDLEntry{ + ID: id, + SDL: sdl, + SubGraphs: subGraphs, + } +} + +// GetSchemaUpdate returns the cached SchemaUpdate (subgraphs + cosmo router +// config) for (orgId, ref) when its id matches the current lastUpdate. +// Returns nil when missing or stale. +func (c *Cache) GetSchemaUpdate(orgId, ref string) *model.SchemaUpdate { + c.mu.RLock() + defer c.mu.RUnlock() + + key := refKey(orgId, ref) + upd := c.schemaUpdates[key] + if upd == nil || upd.ID != c.lastUpdate[key] { + return nil + } + return upd +} + +// SetSchemaUpdate stores a precomputed SchemaUpdate for (orgId, ref). +func (c *Cache) SetSchemaUpdate(orgId, ref string, update *model.SchemaUpdate) { + c.mu.Lock() + defer c.mu.Unlock() + + c.schemaUpdates[refKey(orgId, ref)] = update +} + func refKey(orgId string, ref string) string { return fmt.Sprintf("%s<->%s", orgId, ref) } diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 9b3a49b..975c24f 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -210,6 +210,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input SubGraphs: subGraphs, CosmoRouterConfig: &cosmoConfig, } + r.Cache.SetSchemaUpdate(orgId, input.Ref, update) r.Logger.Info( "Publishing schema update to subscribers", @@ -280,13 +281,25 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str if isAfter != nil { after = *isAfter } - services, lastUpdate := r.Cache.Services(orgId, ref, after) + _, lastUpdate := r.Cache.Services(orgId, ref, after) if after == lastUpdate { return &model.Unchanged{ ID: lastUpdate, MinDelaySeconds: 10, }, nil } + + if cached := r.Cache.GetMergedSDL(orgId, ref); cached != nil { + id, sdl, subGraphs := cached.Unpack() + return &model.SubGraphs{ + ID: id, + SubGraphs: subGraphs, + Sdl: sdl, + MinDelaySeconds: 10, + }, nil + } + + services, _ := r.Cache.Services(orgId, ref, "") subGraphs := make([]*model.SubGraph, len(services)) serviceSDLs := make([]string, len(services)) for i, id := range services { @@ -302,6 +315,7 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str if err != nil { return nil, err } + r.Cache.SetMergedSDL(orgId, ref, lastUpdate, sdl, subGraphs) return &model.SubGraphs{ ID: lastUpdate, SubGraphs: subGraphs, @@ -344,6 +358,16 @@ func (r *queryResolver) LatestSchema(ctx context.Context, ref string) (*model.Sc return nil, fmt.Errorf("no authentication provided") } + if cached := r.Cache.GetSchemaUpdate(orgId, ref); cached != nil { + r.Logger.Info( + "Latest schema served from cache", + "ref", ref, + "orgId", orgId, + "id", cached.ID, + ) + return cached, nil + } + // Get current services and schema services, lastUpdate := r.Cache.Services(orgId, ref, "") r.Logger.Info( @@ -385,6 +409,7 @@ func (r *queryResolver) LatestSchema(ctx context.Context, ref string) (*model.Sc SubGraphs: subGraphs, CosmoRouterConfig: &cosmoConfig, } + r.Cache.SetSchemaUpdate(orgId, ref, update) r.Logger.Info( "Latest schema fetched",