perf(graph): cache merged SDL and SchemaUpdate per ref (#841)
Release / release (push) Failing after 54s
schemas / vulnerabilities (push) Successful in 2m11s
schemas / check (push) Successful in 2m36s
schemas / check-release (push) Successful in 2m54s
pre-commit / pre-commit (push) Successful in 8m10s
schemas / build (push) Successful in 5m43s
schemas / deploy-prod (push) Successful in 1m9s

This commit was merged in pull request #841.
This commit is contained in:
2026-05-19 07:51:49 +00:00
parent 9a4b05d897
commit 39cf6fbb8c
2 changed files with 100 additions and 3 deletions
+74 -2
View File
@@ -10,6 +10,7 @@ import (
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
"gitea.unbound.se/unboundsoftware/schemas/domain"
"gitea.unbound.se/unboundsoftware/schemas/graph/model"
"gitea.unbound.se/unboundsoftware/schemas/hash"
)
@@ -21,9 +22,19 @@ type Cache struct {
services map[string]map[string]map[string]struct{}
subGraphs map[string]string
lastUpdate map[string]string
mergedSDLs map[string]*mergedSDLEntry
schemaUpdates map[string]*model.SchemaUpdate
logger *slog.Logger
}
// mergedSDLEntry holds a precomputed merged SDL together with the lastUpdate
// id it was computed against, so stale entries can be detected on read.
type mergedSDLEntry struct {
ID string
SDL string
SubGraphs []*model.SubGraph
}
func (c *Cache) OrganizationByAPIKey(apiKey string) *domain.Organization {
c.mu.RLock()
defer c.mu.RUnlock()
@@ -184,8 +195,11 @@ func (c *Cache) Update(msg any, _ goamqp.Headers) (any, error) {
for service := range refs[ref] {
delete(c.subGraphs, subGraphKey(orgId, ref, service))
}
// Remove lastUpdate for this org/ref
delete(c.lastUpdate, refKey(orgId, ref))
// Remove cached results for this org/ref
rk := refKey(orgId, ref)
delete(c.lastUpdate, rk)
delete(c.mergedSDLs, rk)
delete(c.schemaUpdates, rk)
}
delete(c.services, orgId)
}
@@ -253,10 +267,68 @@ func New(logger *slog.Logger) *Cache {
services: make(map[string]map[string]map[string]struct{}),
subGraphs: make(map[string]string),
lastUpdate: make(map[string]string),
mergedSDLs: make(map[string]*mergedSDLEntry),
schemaUpdates: make(map[string]*model.SchemaUpdate),
logger: logger,
}
}
// GetMergedSDL returns the cached merged SDL for (orgId, ref) if it was
// computed against the current lastUpdate. Returns nil when missing or stale.
func (c *Cache) GetMergedSDL(orgId, ref string) *mergedSDLEntry {
c.mu.RLock()
defer c.mu.RUnlock()
key := refKey(orgId, ref)
entry := c.mergedSDLs[key]
if entry == nil || entry.ID != c.lastUpdate[key] {
return nil
}
return entry
}
// MergedSDLEntry exposes the cached merged SDL fields to callers.
func (e *mergedSDLEntry) Unpack() (id, sdl string, subGraphs []*model.SubGraph) {
return e.ID, e.SDL, e.SubGraphs
}
// SetMergedSDL stores a precomputed merged SDL for (orgId, ref). The entry
// is only retained while c.lastUpdate[key] matches id; subsequent updates
// invalidate it implicitly via the version mismatch in GetMergedSDL.
func (c *Cache) SetMergedSDL(orgId, ref, id, sdl string, subGraphs []*model.SubGraph) {
c.mu.Lock()
defer c.mu.Unlock()
c.mergedSDLs[refKey(orgId, ref)] = &mergedSDLEntry{
ID: id,
SDL: sdl,
SubGraphs: subGraphs,
}
}
// GetSchemaUpdate returns the cached SchemaUpdate (subgraphs + cosmo router
// config) for (orgId, ref) when its id matches the current lastUpdate.
// Returns nil when missing or stale.
func (c *Cache) GetSchemaUpdate(orgId, ref string) *model.SchemaUpdate {
c.mu.RLock()
defer c.mu.RUnlock()
key := refKey(orgId, ref)
upd := c.schemaUpdates[key]
if upd == nil || upd.ID != c.lastUpdate[key] {
return nil
}
return upd
}
// SetSchemaUpdate stores a precomputed SchemaUpdate for (orgId, ref).
func (c *Cache) SetSchemaUpdate(orgId, ref string, update *model.SchemaUpdate) {
c.mu.Lock()
defer c.mu.Unlock()
c.schemaUpdates[refKey(orgId, ref)] = update
}
func refKey(orgId string, ref string) string {
return fmt.Sprintf("%s<->%s", orgId, ref)
}