Compare commits
16 Commits
v0.9.7
...
d2d053e559
| Author | SHA1 | Date | |
|---|---|---|---|
| d2d053e559 | |||
| 7ea5624734 | |||
| f8a1a5ee58 | |||
| 71922e8004 | |||
| b2caaa6b60 | |||
| 3f07e99fa1 | |||
| 111c2e4b19 | |||
| 4e50a051d0 | |||
| 9d70c0462a | |||
| 39cf6fbb8c | |||
| 9a4b05d897 | |||
| c4829cf280 | |||
| 43765101f5 | |||
| 7dddc70eda | |||
| 6be8e90e26 | |||
| ec4c1895c8 |
+6
-4
@@ -1,4 +1,4 @@
|
||||
FROM amd64/golang:1.26.3@sha256:f9a8afb8a11d7e9220a5e61cedf2bb34babc7618ee51838e16c120fc74e0d92f as modules
|
||||
FROM amd64/golang:1.26.3@sha256:54d32460e205919fa90b984134a9cfc26152443306d685f0d92722c108d31771 as modules
|
||||
WORKDIR /build
|
||||
ADD go.* /build
|
||||
RUN go mod download
|
||||
@@ -24,14 +24,16 @@ RUN GOOS=linux GOARCH=amd64 go build \
|
||||
FROM scratch as export
|
||||
COPY --from=build /build/coverage.txt /
|
||||
|
||||
FROM node:24.15.0-alpine@sha256:d1b3b4da11eefd5941e7f0b9cf17783fc99d9c6fc34884a665f40a06dbdfc94f
|
||||
FROM node:24.16.0-alpine@sha256:2bdb65ed1dab192432bc31c95f94155ca5ad7fc1392fb7eb7526ab682fa5bf14
|
||||
ENV TZ Europe/Stockholm
|
||||
|
||||
# Install wgc CLI globally for Cosmo Router composition
|
||||
RUN npm install -g wgc@latest
|
||||
|
||||
# Cap Node.js heap for runtime wgc invocations to prevent OOM
|
||||
ENV NODE_OPTIONS="--max-old-space-size=64"
|
||||
# Cap Node.js heap for runtime wgc invocations. 512MB leaves room for
|
||||
# composing supergraphs with many subgraphs; lower caps (e.g. 64MB) OOM
|
||||
# wgc as the subgraph count grows.
|
||||
ENV NODE_OPTIONS="--max-old-space-size=512"
|
||||
|
||||
# Copy timezone data and certificates
|
||||
COPY --from=build /usr/share/zoneinfo /usr/share/zoneinfo
|
||||
|
||||
Vendored
+98
-2
@@ -10,6 +10,7 @@ import (
|
||||
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
||||
|
||||
"gitea.unbound.se/unboundsoftware/schemas/domain"
|
||||
"gitea.unbound.se/unboundsoftware/schemas/graph/model"
|
||||
"gitea.unbound.se/unboundsoftware/schemas/hash"
|
||||
)
|
||||
|
||||
@@ -21,9 +22,19 @@ type Cache struct {
|
||||
services map[string]map[string]map[string]struct{}
|
||||
subGraphs map[string]string
|
||||
lastUpdate map[string]string
|
||||
mergedSDLs map[string]*mergedSDLEntry
|
||||
schemaUpdates map[string]*model.SchemaUpdate
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// mergedSDLEntry holds a precomputed merged SDL together with the lastUpdate
|
||||
// id it was computed against, so stale entries can be detected on read.
|
||||
type mergedSDLEntry struct {
|
||||
ID string
|
||||
SDL string
|
||||
SubGraphs []*model.SubGraph
|
||||
}
|
||||
|
||||
func (c *Cache) OrganizationByAPIKey(apiKey string) *domain.Organization {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
@@ -91,6 +102,30 @@ func (c *Cache) Services(orgId, ref, lastUpdate string) ([]string, string) {
|
||||
return services, c.lastUpdate[key]
|
||||
}
|
||||
|
||||
// OrgRef identifies a single (organizationId, ref) pair that the cache
|
||||
// tracks subgraphs for.
|
||||
type OrgRef struct {
|
||||
OrgId string
|
||||
Ref string
|
||||
}
|
||||
|
||||
// AllOrgRefs returns every (orgId, ref) pair that currently has at least
|
||||
// one subgraph in the cache. Used by startup warmup to pre-compute the
|
||||
// merged SDL and SchemaUpdate for every known ref before the pod starts
|
||||
// serving traffic.
|
||||
func (c *Cache) AllOrgRefs() []OrgRef {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
var out []OrgRef
|
||||
for orgId, refs := range c.services {
|
||||
for ref := range refs {
|
||||
out = append(out, OrgRef{OrgId: orgId, Ref: ref})
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (c *Cache) SubGraphId(orgId, ref, service string) string {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
@@ -184,8 +219,11 @@ func (c *Cache) Update(msg any, _ goamqp.Headers) (any, error) {
|
||||
for service := range refs[ref] {
|
||||
delete(c.subGraphs, subGraphKey(orgId, ref, service))
|
||||
}
|
||||
// Remove lastUpdate for this org/ref
|
||||
delete(c.lastUpdate, refKey(orgId, ref))
|
||||
// Remove cached results for this org/ref
|
||||
rk := refKey(orgId, ref)
|
||||
delete(c.lastUpdate, rk)
|
||||
delete(c.mergedSDLs, rk)
|
||||
delete(c.schemaUpdates, rk)
|
||||
}
|
||||
delete(c.services, orgId)
|
||||
}
|
||||
@@ -253,10 +291,68 @@ func New(logger *slog.Logger) *Cache {
|
||||
services: make(map[string]map[string]map[string]struct{}),
|
||||
subGraphs: make(map[string]string),
|
||||
lastUpdate: make(map[string]string),
|
||||
mergedSDLs: make(map[string]*mergedSDLEntry),
|
||||
schemaUpdates: make(map[string]*model.SchemaUpdate),
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// GetMergedSDL returns the cached merged SDL for (orgId, ref) if it was
|
||||
// computed against the current lastUpdate. Returns nil when missing or stale.
|
||||
func (c *Cache) GetMergedSDL(orgId, ref string) *mergedSDLEntry {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
key := refKey(orgId, ref)
|
||||
entry := c.mergedSDLs[key]
|
||||
if entry == nil || entry.ID != c.lastUpdate[key] {
|
||||
return nil
|
||||
}
|
||||
return entry
|
||||
}
|
||||
|
||||
// MergedSDLEntry exposes the cached merged SDL fields to callers.
|
||||
func (e *mergedSDLEntry) Unpack() (id, sdl string, subGraphs []*model.SubGraph) {
|
||||
return e.ID, e.SDL, e.SubGraphs
|
||||
}
|
||||
|
||||
// SetMergedSDL stores a precomputed merged SDL for (orgId, ref). The entry
|
||||
// is only retained while c.lastUpdate[key] matches id; subsequent updates
|
||||
// invalidate it implicitly via the version mismatch in GetMergedSDL.
|
||||
func (c *Cache) SetMergedSDL(orgId, ref, id, sdl string, subGraphs []*model.SubGraph) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.mergedSDLs[refKey(orgId, ref)] = &mergedSDLEntry{
|
||||
ID: id,
|
||||
SDL: sdl,
|
||||
SubGraphs: subGraphs,
|
||||
}
|
||||
}
|
||||
|
||||
// GetSchemaUpdate returns the cached SchemaUpdate (subgraphs + cosmo router
|
||||
// config) for (orgId, ref) when its id matches the current lastUpdate.
|
||||
// Returns nil when missing or stale.
|
||||
func (c *Cache) GetSchemaUpdate(orgId, ref string) *model.SchemaUpdate {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
key := refKey(orgId, ref)
|
||||
upd := c.schemaUpdates[key]
|
||||
if upd == nil || upd.ID != c.lastUpdate[key] {
|
||||
return nil
|
||||
}
|
||||
return upd
|
||||
}
|
||||
|
||||
// SetSchemaUpdate stores a precomputed SchemaUpdate for (orgId, ref).
|
||||
func (c *Cache) SetSchemaUpdate(orgId, ref string, update *model.SchemaUpdate) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.schemaUpdates[refKey(orgId, ref)] = update
|
||||
}
|
||||
|
||||
func refKey(orgId string, ref string) string {
|
||||
return fmt.Sprintf("%s<->%s", orgId, ref)
|
||||
}
|
||||
|
||||
@@ -210,6 +210,8 @@ func start(closeEvents chan error, logger *slog.Logger, connectToAmqpFunc func(u
|
||||
Debouncer: graph.NewDebouncer(500 * time.Millisecond),
|
||||
}
|
||||
|
||||
resolver.WarmCache(rootCtx)
|
||||
|
||||
config := generated.Config{
|
||||
Resolvers: resolver,
|
||||
Complexity: generated.ComplexityRoot{},
|
||||
|
||||
@@ -8,7 +8,7 @@ require (
|
||||
github.com/Khan/genqlient v0.8.1
|
||||
github.com/alecthomas/kong v1.15.0
|
||||
github.com/apex/log v1.9.0
|
||||
github.com/auth0/go-jwt-middleware/v3 v3.1.0
|
||||
github.com/auth0/go-jwt-middleware/v3 v3.2.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/jmoiron/sqlx v1.4.0
|
||||
github.com/pressly/goose/v3 v3.27.1
|
||||
@@ -16,10 +16,10 @@ require (
|
||||
github.com/sparetimecoders/goamqp v0.3.3
|
||||
github.com/stretchr/testify v1.11.1
|
||||
github.com/vektah/gqlparser/v2 v2.5.33
|
||||
github.com/wundergraph/graphql-go-tools/v2 v2.1.0
|
||||
github.com/wundergraph/graphql-go-tools/v2 v2.4.0
|
||||
gitlab.com/unboundsoftware/eventsourced/amqp v1.9.1
|
||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.19.4
|
||||
gitlab.com/unboundsoftware/eventsourced/pg v1.18.8
|
||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.21.0
|
||||
gitlab.com/unboundsoftware/eventsourced/pg v1.20.0
|
||||
go.opentelemetry.io/contrib/bridges/otelslog v0.18.0
|
||||
go.opentelemetry.io/otel v1.43.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0
|
||||
@@ -30,7 +30,7 @@ require (
|
||||
go.opentelemetry.io/otel/sdk/log v0.19.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.43.0
|
||||
go.opentelemetry.io/otel/trace v1.43.0
|
||||
golang.org/x/crypto v0.51.0
|
||||
golang.org/x/crypto v0.52.0
|
||||
golang.org/x/sync v0.20.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
@@ -80,12 +80,12 @@ require (
|
||||
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/mod v0.35.0 // indirect
|
||||
golang.org/x/net v0.53.0 // indirect
|
||||
golang.org/x/sys v0.44.0 // indirect
|
||||
golang.org/x/net v0.54.0 // indirect
|
||||
golang.org/x/sys v0.45.0 // indirect
|
||||
golang.org/x/text v0.37.0 // indirect
|
||||
golang.org/x/tools v0.44.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260420184626-e10c466a9529 // indirect
|
||||
google.golang.org/grpc v1.80.0 // indirect
|
||||
google.golang.org/grpc v1.81.0 // indirect
|
||||
google.golang.org/protobuf v1.36.11 // indirect
|
||||
)
|
||||
|
||||
@@ -28,8 +28,8 @@ github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy
|
||||
github.com/aphistic/sweet v0.2.0/go.mod h1:fWDlIh/isSE9n6EPsRmC0det+whmX6dJid3stzu0Xys=
|
||||
github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q=
|
||||
github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE=
|
||||
github.com/auth0/go-jwt-middleware/v3 v3.1.0 h1:1aqVJA9K0+B6hP6qqMjTsJUk/L14sJSUjiTGW2/mY64=
|
||||
github.com/auth0/go-jwt-middleware/v3 v3.1.0/go.mod h1:BBZCQAXmqC/QfwzWyHOqF/kwN4C66eMeayy9QS6TgT4=
|
||||
github.com/auth0/go-jwt-middleware/v3 v3.2.0 h1:OP0/YH89A+w03zOjuRPPgKh5S+1+uAmY/vtllYUSWCM=
|
||||
github.com/auth0/go-jwt-middleware/v3 v3.2.0/go.mod h1:/f0hy3exUWxL7/4XJ1oSHBDSBf2Os2C1VT2RkQ9frs0=
|
||||
github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
|
||||
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I=
|
||||
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
|
||||
@@ -198,14 +198,14 @@ github.com/wundergraph/astjson v1.1.0 h1:xORDosrZ87zQFJwNGe/HIHXqzpdHOFmqWgykCLV
|
||||
github.com/wundergraph/astjson v1.1.0/go.mod h1:h12D/dxxnedtLzsKyBLK7/Oe4TAoGpRVC9nDpDrZSWw=
|
||||
github.com/wundergraph/go-arena v1.1.0 h1:9+wSRkJAkA2vbYHp6s8tEGhPViRGQNGXqPHT0QzhdIc=
|
||||
github.com/wundergraph/go-arena v1.1.0/go.mod h1:ROOysEHWJjLQ8FSfNxZCziagb7Qw2nXY3/vgKRh7eWw=
|
||||
github.com/wundergraph/graphql-go-tools/v2 v2.1.0 h1:V1MU/uo+oc5b+aIh3SpCr0rJgLHuhonWg2fhN1sfMdY=
|
||||
github.com/wundergraph/graphql-go-tools/v2 v2.1.0/go.mod h1:UG/grnPEHumtD82H8FC+3dokiCGK8GF0b5IJc00lSbM=
|
||||
github.com/wundergraph/graphql-go-tools/v2 v2.4.0 h1:Vdv6GmApSE5I0YxDDOOxev26tefEZXerP2HpzKkLU9c=
|
||||
github.com/wundergraph/graphql-go-tools/v2 v2.4.0/go.mod h1:xH7XBGtKJkNTi6w6TnCDLRa7Jo2gyBBRUipIYwC5vLI=
|
||||
gitlab.com/unboundsoftware/eventsourced/amqp v1.9.1 h1:X6269JoAzHIKCVmtgMHZH3m7xOpACSp37ca3eODe9iU=
|
||||
gitlab.com/unboundsoftware/eventsourced/amqp v1.9.1/go.mod h1:EAs0d6Eh0aDiQkUJlSWErHqgHFQdxx0e8I7aG/2FarY=
|
||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.19.4 h1:+yZkhi9/sTyBEN5vJTfvycyXgGrm07QKGSh3jiWiQdM=
|
||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.19.4/go.mod h1:LrA7I7etRmhIC1PjO8c26BHm+gWsy2rC3eSMe5+XUWE=
|
||||
gitlab.com/unboundsoftware/eventsourced/pg v1.18.8 h1:vEFD8pe8flZtigHnWQwUFVkeKusQKgp0P3zBNf/bx5I=
|
||||
gitlab.com/unboundsoftware/eventsourced/pg v1.18.8/go.mod h1:ozKUdZWa72YJsI1R2PlQkZyDXTjV0d/1qSM38bM31tw=
|
||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.21.0 h1:iJjDO1ivOwLFx4ttcGvTCTBl2Of2lNUFC3ZOxbu46gI=
|
||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.21.0/go.mod h1:LrA7I7etRmhIC1PjO8c26BHm+gWsy2rC3eSMe5+XUWE=
|
||||
gitlab.com/unboundsoftware/eventsourced/pg v1.20.0 h1:RckhEDuWeqac7V7sQKQgkB+3G2ap1PGJmfsCKPDuwyU=
|
||||
gitlab.com/unboundsoftware/eventsourced/pg v1.20.0/go.mod h1:XnRbdiIFxRAA1ZoQypSAViBA9yn4jVLlJDVGRrPpusg=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||
go.opentelemetry.io/contrib/bridges/otelslog v0.18.0 h1:hhPGP3zvvy1xWT9RTy970wlniSxFttBIsAK1gvMguJM=
|
||||
@@ -242,15 +242,15 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
|
||||
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
|
||||
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
|
||||
golang.org/x/crypto v0.52.0 h1:RMs7fP2rXdep0CftQlK8Uf+kibLm7qkCcradZWYz988=
|
||||
golang.org/x/crypto v0.52.0/go.mod h1:1QgfPxDqh0T2M/elOJtp9RvuR95kVjir0e6/BvEmGbc=
|
||||
golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM=
|
||||
golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU=
|
||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
|
||||
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
|
||||
golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w=
|
||||
golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||
@@ -258,8 +258,8 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
|
||||
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY=
|
||||
golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
|
||||
@@ -273,8 +273,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260420184626-e10c466a9529 h1:XF8+t6QQiS0o9ArVan/HW8Q7cycNPGsJf6GA2nXxYAg=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260420184626-e10c466a9529/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
|
||||
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
|
||||
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
|
||||
google.golang.org/grpc v1.81.0 h1:W3G9N3KQf3BU+YuCtGKJk0CmxQNbAISICD/9AORxLIw=
|
||||
google.golang.org/grpc v1.81.0/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I=
|
||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
||||
@@ -8,7 +8,9 @@ import (
|
||||
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
||||
|
||||
"gitea.unbound.se/unboundsoftware/schemas/cache"
|
||||
"gitea.unbound.se/unboundsoftware/schemas/graph/model"
|
||||
"gitea.unbound.se/unboundsoftware/schemas/middleware"
|
||||
"gitea.unbound.se/unboundsoftware/schemas/sdlmerge"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/99designs/gqlgen
|
||||
@@ -60,3 +62,60 @@ func (r *Resolver) handler(ctx context.Context, aggregate eventsourced.Aggregate
|
||||
func apiKeyId(orgId, name string) string {
|
||||
return fmt.Sprintf("%s-%s", orgId, name)
|
||||
}
|
||||
|
||||
// WarmCache precomputes the merged SDL and SchemaUpdate (cosmo router
|
||||
// config) for every (orgId, ref) tracked in the cache. Intended to run
|
||||
// once at startup, after the event-sourced caches have been populated
|
||||
// but before the pod accepts traffic, so the first request per ref does
|
||||
// not pay the cold-start cost of running sdlmerge + wgc compose.
|
||||
//
|
||||
// Errors per ref are logged and skipped rather than aborting the whole
|
||||
// warmup: a single bad ref must not block the pod from serving the
|
||||
// remaining refs.
|
||||
func (r *Resolver) WarmCache(ctx context.Context) {
|
||||
refs := r.Cache.AllOrgRefs()
|
||||
r.Logger.Info("Warming schema cache on startup", "refCount", len(refs))
|
||||
|
||||
for _, or := range refs {
|
||||
services, lastUpdate := r.Cache.Services(or.OrgId, or.Ref, "")
|
||||
if len(services) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
subGraphs := make([]*model.SubGraph, len(services))
|
||||
serviceSDLs := make([]string, len(services))
|
||||
for i, id := range services {
|
||||
sg, err := r.fetchSubGraph(ctx, id)
|
||||
if err != nil {
|
||||
r.Logger.Error("warmup: fetch subgraph", "error", err, "orgId", or.OrgId, "ref", or.Ref, "id", id)
|
||||
subGraphs = nil
|
||||
break
|
||||
}
|
||||
subGraphs[i] = r.toGqlSubGraph(sg)
|
||||
serviceSDLs[i] = sg.Sdl
|
||||
}
|
||||
if subGraphs == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if sdl, err := sdlmerge.MergeSDLs(serviceSDLs...); err != nil {
|
||||
r.Logger.Error("warmup: merge SDLs", "error", err, "orgId", or.OrgId, "ref", or.Ref)
|
||||
} else {
|
||||
r.Cache.SetMergedSDL(or.OrgId, or.Ref, lastUpdate, sdl, subGraphs)
|
||||
}
|
||||
|
||||
cosmoConfig, err := r.CosmoGenerator.Generate(ctx, subGraphs)
|
||||
if err != nil {
|
||||
r.Logger.Error("warmup: generate cosmo config", "error", err, "orgId", or.OrgId, "ref", or.Ref)
|
||||
continue
|
||||
}
|
||||
r.Cache.SetSchemaUpdate(or.OrgId, or.Ref, &model.SchemaUpdate{
|
||||
Ref: or.Ref,
|
||||
ID: lastUpdate,
|
||||
SubGraphs: subGraphs,
|
||||
CosmoRouterConfig: &cosmoConfig,
|
||||
})
|
||||
}
|
||||
|
||||
r.Logger.Info("Schema cache warmup complete", "refCount", len(refs))
|
||||
}
|
||||
|
||||
@@ -210,6 +210,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
|
||||
SubGraphs: subGraphs,
|
||||
CosmoRouterConfig: &cosmoConfig,
|
||||
}
|
||||
r.Cache.SetSchemaUpdate(orgId, input.Ref, update)
|
||||
|
||||
r.Logger.Info(
|
||||
"Publishing schema update to subscribers",
|
||||
@@ -280,13 +281,25 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str
|
||||
if isAfter != nil {
|
||||
after = *isAfter
|
||||
}
|
||||
services, lastUpdate := r.Cache.Services(orgId, ref, after)
|
||||
_, lastUpdate := r.Cache.Services(orgId, ref, after)
|
||||
if after == lastUpdate {
|
||||
return &model.Unchanged{
|
||||
ID: lastUpdate,
|
||||
MinDelaySeconds: 10,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if cached := r.Cache.GetMergedSDL(orgId, ref); cached != nil {
|
||||
id, sdl, subGraphs := cached.Unpack()
|
||||
return &model.SubGraphs{
|
||||
ID: id,
|
||||
SubGraphs: subGraphs,
|
||||
Sdl: sdl,
|
||||
MinDelaySeconds: 10,
|
||||
}, nil
|
||||
}
|
||||
|
||||
services, _ := r.Cache.Services(orgId, ref, "")
|
||||
subGraphs := make([]*model.SubGraph, len(services))
|
||||
serviceSDLs := make([]string, len(services))
|
||||
for i, id := range services {
|
||||
@@ -302,6 +315,7 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.Cache.SetMergedSDL(orgId, ref, lastUpdate, sdl, subGraphs)
|
||||
return &model.SubGraphs{
|
||||
ID: lastUpdate,
|
||||
SubGraphs: subGraphs,
|
||||
@@ -344,6 +358,16 @@ func (r *queryResolver) LatestSchema(ctx context.Context, ref string) (*model.Sc
|
||||
return nil, fmt.Errorf("no authentication provided")
|
||||
}
|
||||
|
||||
if cached := r.Cache.GetSchemaUpdate(orgId, ref); cached != nil {
|
||||
r.Logger.Info(
|
||||
"Latest schema served from cache",
|
||||
"ref", ref,
|
||||
"orgId", orgId,
|
||||
"id", cached.ID,
|
||||
)
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
// Get current services and schema
|
||||
services, lastUpdate := r.Cache.Services(orgId, ref, "")
|
||||
r.Logger.Info(
|
||||
@@ -385,6 +409,7 @@ func (r *queryResolver) LatestSchema(ctx context.Context, ref string) (*model.Sc
|
||||
SubGraphs: subGraphs,
|
||||
CosmoRouterConfig: &cosmoConfig,
|
||||
}
|
||||
r.Cache.SetSchemaUpdate(orgId, ref, update)
|
||||
|
||||
r.Logger.Info(
|
||||
"Latest schema fetched",
|
||||
|
||||
@@ -18,3 +18,23 @@ spec:
|
||||
target:
|
||||
type: Utilization
|
||||
averageUtilization: 60
|
||||
behavior:
|
||||
scaleUp:
|
||||
# Wait 2min of sustained high CPU before scaling up. Schemas is
|
||||
# event-driven and the per-request work is bursty even with the
|
||||
# cache + warmup, so single spikes shouldn't pull replicas up.
|
||||
stabilizationWindowSeconds: 120
|
||||
policies:
|
||||
- type: Pods
|
||||
value: 1
|
||||
periodSeconds: 60
|
||||
scaleDown:
|
||||
# Default 300s window kept pods pinned at maxReplicas long after
|
||||
# the triggering spike had subsided. 120s is long enough to avoid
|
||||
# flapping but lets the deployment return to minReplicas quickly
|
||||
# once the workload calms.
|
||||
stabilizationWindowSeconds: 120
|
||||
policies:
|
||||
- type: Pods
|
||||
value: 1
|
||||
periodSeconds: 60
|
||||
|
||||
+5
-3
@@ -42,8 +42,10 @@ spec:
|
||||
- name: schemas
|
||||
resources:
|
||||
requests:
|
||||
cpu: "20m"
|
||||
memory: "20Mi"
|
||||
cpu: "100m"
|
||||
memory: "128Mi"
|
||||
limits:
|
||||
memory: "768Mi"
|
||||
livenessProbe:
|
||||
httpGet:
|
||||
path: /health/live
|
||||
@@ -67,7 +69,7 @@ spec:
|
||||
containerPort: 8080
|
||||
env:
|
||||
- name: OTEL_EXPORTER_OTLP_ENDPOINT
|
||||
value: http://k8s-monitoring-alloy-receiver.monitoring.svc.cluster.local:4318
|
||||
value: http://k8s-monitoring-alloy.monitoring.svc.cluster.local:4318
|
||||
envFrom:
|
||||
- configMapRef:
|
||||
name: schemas
|
||||
|
||||
Reference in New Issue
Block a user