perf(graph): warm schema cache on startup to kill cold-start spikes (#843)
Release / release (push) Failing after 59s
schemas / vulnerabilities (push) Successful in 2m34s
schemas / check (push) Successful in 2m59s
schemas / check-release (push) Successful in 3m18s
pre-commit / pre-commit (push) Successful in 6m28s
schemas / build (push) Successful in 6m20s
schemas / deploy-prod (push) Successful in 1m58s
Release / release (push) Failing after 59s
schemas / vulnerabilities (push) Successful in 2m34s
schemas / check (push) Successful in 2m59s
schemas / check-release (push) Successful in 3m18s
pre-commit / pre-commit (push) Successful in 6m28s
schemas / build (push) Successful in 6m20s
schemas / deploy-prod (push) Successful in 1m58s
This commit was merged in pull request #843.
This commit is contained in:
Vendored
+24
@@ -102,6 +102,30 @@ func (c *Cache) Services(orgId, ref, lastUpdate string) ([]string, string) {
|
|||||||
return services, c.lastUpdate[key]
|
return services, c.lastUpdate[key]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OrgRef identifies a single (organizationId, ref) pair that the cache
|
||||||
|
// tracks subgraphs for.
|
||||||
|
type OrgRef struct {
|
||||||
|
OrgId string
|
||||||
|
Ref string
|
||||||
|
}
|
||||||
|
|
||||||
|
// AllOrgRefs returns every (orgId, ref) pair that currently has at least
|
||||||
|
// one subgraph in the cache. Used by startup warmup to pre-compute the
|
||||||
|
// merged SDL and SchemaUpdate for every known ref before the pod starts
|
||||||
|
// serving traffic.
|
||||||
|
func (c *Cache) AllOrgRefs() []OrgRef {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
|
||||||
|
var out []OrgRef
|
||||||
|
for orgId, refs := range c.services {
|
||||||
|
for ref := range refs {
|
||||||
|
out = append(out, OrgRef{OrgId: orgId, Ref: ref})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cache) SubGraphId(orgId, ref, service string) string {
|
func (c *Cache) SubGraphId(orgId, ref, service string) string {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
defer c.mu.RUnlock()
|
defer c.mu.RUnlock()
|
||||||
|
|||||||
@@ -210,6 +210,8 @@ func start(closeEvents chan error, logger *slog.Logger, connectToAmqpFunc func(u
|
|||||||
Debouncer: graph.NewDebouncer(500 * time.Millisecond),
|
Debouncer: graph.NewDebouncer(500 * time.Millisecond),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resolver.WarmCache(rootCtx)
|
||||||
|
|
||||||
config := generated.Config{
|
config := generated.Config{
|
||||||
Resolvers: resolver,
|
Resolvers: resolver,
|
||||||
Complexity: generated.ComplexityRoot{},
|
Complexity: generated.ComplexityRoot{},
|
||||||
|
|||||||
@@ -8,7 +8,9 @@ import (
|
|||||||
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
||||||
|
|
||||||
"gitea.unbound.se/unboundsoftware/schemas/cache"
|
"gitea.unbound.se/unboundsoftware/schemas/cache"
|
||||||
|
"gitea.unbound.se/unboundsoftware/schemas/graph/model"
|
||||||
"gitea.unbound.se/unboundsoftware/schemas/middleware"
|
"gitea.unbound.se/unboundsoftware/schemas/middleware"
|
||||||
|
"gitea.unbound.se/unboundsoftware/schemas/sdlmerge"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:generate go run github.com/99designs/gqlgen
|
//go:generate go run github.com/99designs/gqlgen
|
||||||
@@ -60,3 +62,60 @@ func (r *Resolver) handler(ctx context.Context, aggregate eventsourced.Aggregate
|
|||||||
func apiKeyId(orgId, name string) string {
|
func apiKeyId(orgId, name string) string {
|
||||||
return fmt.Sprintf("%s-%s", orgId, name)
|
return fmt.Sprintf("%s-%s", orgId, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WarmCache precomputes the merged SDL and SchemaUpdate (cosmo router
|
||||||
|
// config) for every (orgId, ref) tracked in the cache. Intended to run
|
||||||
|
// once at startup, after the event-sourced caches have been populated
|
||||||
|
// but before the pod accepts traffic, so the first request per ref does
|
||||||
|
// not pay the cold-start cost of running sdlmerge + wgc compose.
|
||||||
|
//
|
||||||
|
// Errors per ref are logged and skipped rather than aborting the whole
|
||||||
|
// warmup: a single bad ref must not block the pod from serving the
|
||||||
|
// remaining refs.
|
||||||
|
func (r *Resolver) WarmCache(ctx context.Context) {
|
||||||
|
refs := r.Cache.AllOrgRefs()
|
||||||
|
r.Logger.Info("Warming schema cache on startup", "refCount", len(refs))
|
||||||
|
|
||||||
|
for _, or := range refs {
|
||||||
|
services, lastUpdate := r.Cache.Services(or.OrgId, or.Ref, "")
|
||||||
|
if len(services) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
subGraphs := make([]*model.SubGraph, len(services))
|
||||||
|
serviceSDLs := make([]string, len(services))
|
||||||
|
for i, id := range services {
|
||||||
|
sg, err := r.fetchSubGraph(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
r.Logger.Error("warmup: fetch subgraph", "error", err, "orgId", or.OrgId, "ref", or.Ref, "id", id)
|
||||||
|
subGraphs = nil
|
||||||
|
break
|
||||||
|
}
|
||||||
|
subGraphs[i] = r.toGqlSubGraph(sg)
|
||||||
|
serviceSDLs[i] = sg.Sdl
|
||||||
|
}
|
||||||
|
if subGraphs == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if sdl, err := sdlmerge.MergeSDLs(serviceSDLs...); err != nil {
|
||||||
|
r.Logger.Error("warmup: merge SDLs", "error", err, "orgId", or.OrgId, "ref", or.Ref)
|
||||||
|
} else {
|
||||||
|
r.Cache.SetMergedSDL(or.OrgId, or.Ref, lastUpdate, sdl, subGraphs)
|
||||||
|
}
|
||||||
|
|
||||||
|
cosmoConfig, err := r.CosmoGenerator.Generate(ctx, subGraphs)
|
||||||
|
if err != nil {
|
||||||
|
r.Logger.Error("warmup: generate cosmo config", "error", err, "orgId", or.OrgId, "ref", or.Ref)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
r.Cache.SetSchemaUpdate(or.OrgId, or.Ref, &model.SchemaUpdate{
|
||||||
|
Ref: or.Ref,
|
||||||
|
ID: lastUpdate,
|
||||||
|
SubGraphs: subGraphs,
|
||||||
|
CosmoRouterConfig: &cosmoConfig,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Logger.Info("Schema cache warmup complete", "refCount", len(refs))
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user