From 9368d77bc81bfcb9b8fe33965b7652e3ae970131 Mon Sep 17 00:00:00 2001 From: Joakim Olsson Date: Thu, 20 Nov 2025 17:02:19 +0100 Subject: [PATCH] feat: add latestSchema query for retrieving schema updates Implements the `latestSchema` query to fetch the latest schema updates for an organization. This change enhances the GraphQL API by allowing clients to retrieve the most recent schema version and its associated subgraphs. The resolver performs necessary access checks, logs relevant information, and generates the Cosmo router configuration from fetched subgraph SDLs, returning structured schema update details. --- Dockerfile | 10 ++- go.mod | 5 +- go.sum | 2 + graph/cosmo.go | 94 +++++++++++++++++++++------- graph/cosmo_test.go | 49 --------------- graph/generated/generated.go | 116 +++++++++++++++++++++++++++++++++++ graph/schema.graphqls | 1 + graph/schema.resolvers.go | 66 ++++++++++++++++++++ 8 files changed, 270 insertions(+), 73 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3512d54..67e36fd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,9 +24,17 @@ RUN GOOS=linux GOARCH=amd64 go build \ FROM scratch as export COPY --from=build /build/coverage.txt / -FROM scratch +FROM node:22-alpine ENV TZ Europe/Stockholm + +# Install wgc CLI globally for Cosmo Router composition +RUN npm install -g wgc@latest + +# Copy timezone data and certificates COPY --from=build /usr/share/zoneinfo /usr/share/zoneinfo COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ + +# Copy the service binary COPY --from=build /release/service / + CMD ["/service"] diff --git a/go.mod b/go.mod index 127627f..6a4d2f0 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/apex/log v1.9.0 github.com/auth0/go-jwt-middleware/v2 v2.3.0 github.com/golang-jwt/jwt/v5 v5.3.0 + github.com/google/uuid v1.6.0 github.com/jmoiron/sqlx v1.4.0 github.com/pkg/errors v0.9.1 github.com/pressly/goose/v3 v3.26.0 @@ -30,6 +31,7 @@ require ( go.opentelemetry.io/otel/sdk/log v0.14.0 go.opentelemetry.io/otel/sdk/metric v1.38.0 go.opentelemetry.io/otel/trace v1.38.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -41,7 +43,6 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect @@ -51,6 +52,7 @@ require ( github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/sethvargo/go-retry v0.3.0 // indirect github.com/sosodev/duration v1.3.1 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/tidwall/gjson v1.17.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect @@ -72,5 +74,4 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect google.golang.org/grpc v1.75.0 // indirect google.golang.org/protobuf v1.36.10 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index fd5c8e7..3abb5fe 100644 --- a/go.sum +++ b/go.sum @@ -141,6 +141,8 @@ github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERA github.com/sparetimecoders/goamqp v0.3.3 h1:z/nfTPmrjeU/rIVuNOgsVLCimp3WFoNFvS3ZzXRJ6HE= github.com/sparetimecoders/goamqp v0.3.3/go.mod h1:W9NRCpWLE+Vruv2dcRSbszNil2O826d2Nv6kAkETW5o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= diff --git a/graph/cosmo.go b/graph/cosmo.go index 150e162..69deb88 100644 --- a/graph/cosmo.go +++ b/graph/cosmo.go @@ -1,54 +1,106 @@ package graph import ( - "encoding/json" "fmt" + "os" + "os/exec" + "path/filepath" + + "gopkg.in/yaml.v3" "gitlab.com/unboundsoftware/schemas/graph/model" ) // GenerateCosmoRouterConfig generates a Cosmo Router execution config from subgraphs +// using the official wgc CLI tool via npx func GenerateCosmoRouterConfig(subGraphs []*model.SubGraph) (string, error) { - // Build the Cosmo router config structure - // This is a simplified version - you may need to adjust based on actual Cosmo requirements - config := map[string]interface{}{ - "version": "1", - "subgraphs": convertSubGraphsToCosmo(subGraphs), - // Add other Cosmo-specific configuration as needed + if len(subGraphs) == 0 { + return "", fmt.Errorf("no subgraphs provided") } - // Marshal to JSON - configJSON, err := json.MarshalIndent(config, "", " ") + // Create a temporary directory for composition + tmpDir, err := os.MkdirTemp("", "cosmo-compose-*") if err != nil { - return "", fmt.Errorf("marshal cosmo config: %w", err) + return "", fmt.Errorf("create temp dir: %w", err) + } + defer os.RemoveAll(tmpDir) + + // Write each subgraph SDL to a file + type SubgraphConfig struct { + Name string `yaml:"name"` + RoutingURL string `yaml:"routing_url,omitempty"` + Schema map[string]string `yaml:"schema"` + Subscription map[string]interface{} `yaml:"subscription,omitempty"` } - return string(configJSON), nil -} + type InputConfig struct { + Version int `yaml:"version"` + Subgraphs []SubgraphConfig `yaml:"subgraphs"` + } -func convertSubGraphsToCosmo(subGraphs []*model.SubGraph) []map[string]interface{} { - cosmoSubgraphs := make([]map[string]interface{}, 0, len(subGraphs)) + inputConfig := InputConfig{ + Version: 1, + Subgraphs: make([]SubgraphConfig, 0, len(subGraphs)), + } for _, sg := range subGraphs { - cosmoSg := map[string]interface{}{ - "name": sg.Service, - "sdl": sg.Sdl, + // Write SDL to a temp file + schemaFile := filepath.Join(tmpDir, fmt.Sprintf("%s.graphql", sg.Service)) + if err := os.WriteFile(schemaFile, []byte(sg.Sdl), 0o644); err != nil { + return "", fmt.Errorf("write schema file for %s: %w", sg.Service, err) + } + + subgraphCfg := SubgraphConfig{ + Name: sg.Service, + Schema: map[string]string{ + "file": schemaFile, + }, } if sg.URL != nil { - cosmoSg["routing_url"] = *sg.URL + subgraphCfg.RoutingURL = *sg.URL } if sg.WsURL != nil { - cosmoSg["subscription"] = map[string]interface{}{ + subgraphCfg.Subscription = map[string]interface{}{ "url": *sg.WsURL, "protocol": "ws", "websocket_subprotocol": "graphql-ws", } } - cosmoSubgraphs = append(cosmoSubgraphs, cosmoSg) + inputConfig.Subgraphs = append(inputConfig.Subgraphs, subgraphCfg) } - return cosmoSubgraphs + // Write input config YAML + inputFile := filepath.Join(tmpDir, "input.yaml") + inputYAML, err := yaml.Marshal(inputConfig) + if err != nil { + return "", fmt.Errorf("marshal input config: %w", err) + } + if err := os.WriteFile(inputFile, inputYAML, 0o644); err != nil { + return "", fmt.Errorf("write input config: %w", err) + } + + // Execute wgc router compose + // wgc is installed globally in the Docker image + outputFile := filepath.Join(tmpDir, "config.json") + cmd := exec.Command("wgc", "router", "compose", + "--input", inputFile, + "--out", outputFile, + "--suppress-warnings", + ) + + output, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("wgc router compose failed: %w\nOutput: %s", err, string(output)) + } + + // Read the generated config + configJSON, err := os.ReadFile(outputFile) + if err != nil { + return "", fmt.Errorf("read output config: %w", err) + } + + return string(configJSON), nil } diff --git a/graph/cosmo_test.go b/graph/cosmo_test.go index 0f6de36..60cd3a0 100644 --- a/graph/cosmo_test.go +++ b/graph/cosmo_test.go @@ -203,55 +203,6 @@ func TestGenerateCosmoRouterConfig(t *testing.T) { } } -func TestConvertSubGraphsToCosmo(t *testing.T) { - tests := []struct { - name string - subGraphs []*model.SubGraph - wantLen int - validate func(t *testing.T, result []map[string]interface{}) - }{ - { - name: "preserves subgraph order", - subGraphs: []*model.SubGraph{ - {Service: "alpha", URL: stringPtr("http://a"), Sdl: "a"}, - {Service: "beta", URL: stringPtr("http://b"), Sdl: "b"}, - {Service: "gamma", URL: stringPtr("http://c"), Sdl: "c"}, - }, - wantLen: 3, - validate: func(t *testing.T, result []map[string]interface{}) { - assert.Equal(t, "alpha", result[0]["name"]) - assert.Equal(t, "beta", result[1]["name"]) - assert.Equal(t, "gamma", result[2]["name"]) - }, - }, - { - name: "includes SDL exactly as provided", - subGraphs: []*model.SubGraph{ - { - Service: "test", - URL: stringPtr("http://test"), - Sdl: "type Query { special: String! }", - }, - }, - wantLen: 1, - validate: func(t *testing.T, result []map[string]interface{}) { - assert.Equal(t, "type Query { special: String! }", result[0]["sdl"]) - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := convertSubGraphsToCosmo(tt.subGraphs) - assert.Len(t, result, tt.wantLen) - - if tt.validate != nil { - tt.validate(t, result) - } - }) - } -} - // Helper function for tests func stringPtr(s string) *string { return &s diff --git a/graph/generated/generated.go b/graph/generated/generated.go index b4e5fec..7ba0598 100644 --- a/graph/generated/generated.go +++ b/graph/generated/generated.go @@ -74,6 +74,7 @@ type ComplexityRoot struct { } Query struct { + LatestSchema func(childComplexity int, ref string) int Organizations func(childComplexity int) int Supergraph func(childComplexity int, ref string, isAfter *string) int } @@ -124,6 +125,7 @@ type MutationResolver interface { type QueryResolver interface { Organizations(ctx context.Context) ([]*model.Organization, error) Supergraph(ctx context.Context, ref string, isAfter *string) (model.Supergraph, error) + LatestSchema(ctx context.Context, ref string) (*model.SchemaUpdate, error) } type SubscriptionResolver interface { SchemaUpdates(ctx context.Context, ref string) (<-chan *model.SchemaUpdate, error) @@ -250,6 +252,17 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin return e.complexity.Organization.Users(childComplexity), true + case "Query.latestSchema": + if e.complexity.Query.LatestSchema == nil { + break + } + + args, err := ec.field_Query_latestSchema_args(ctx, rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.LatestSchema(childComplexity, args["ref"].(string)), true case "Query.organizations": if e.complexity.Query.Organizations == nil { break @@ -520,6 +533,7 @@ var sources = []*ast.Source{ {Name: "../schema.graphqls", Input: `type Query { organizations: [Organization!]! @auth(user: true) supergraph(ref: String!, isAfter: String): Supergraph! @auth(organization: true) + latestSchema(ref: String!): SchemaUpdate! @auth(organization: true) } type Mutation { @@ -671,6 +685,17 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Query_latestSchema_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { + var err error + args := map[string]any{} + arg0, err := graphql.ProcessArgField(ctx, rawArgs, "ref", ec.unmarshalNString2string) + if err != nil { + return nil, err + } + args["ref"] = arg0 + return args, nil +} + func (ec *executionContext) field_Query_supergraph_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { var err error args := map[string]any{} @@ -1434,6 +1459,75 @@ func (ec *executionContext) fieldContext_Query_supergraph(ctx context.Context, f return fc, nil } +func (ec *executionContext) _Query_latestSchema(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_Query_latestSchema, + func(ctx context.Context) (any, error) { + fc := graphql.GetFieldContext(ctx) + return ec.resolvers.Query().LatestSchema(ctx, fc.Args["ref"].(string)) + }, + func(ctx context.Context, next graphql.Resolver) graphql.Resolver { + directive0 := next + + directive1 := func(ctx context.Context) (any, error) { + organization, err := ec.unmarshalOBoolean2ᚖbool(ctx, true) + if err != nil { + var zeroVal *model.SchemaUpdate + return zeroVal, err + } + if ec.directives.Auth == nil { + var zeroVal *model.SchemaUpdate + return zeroVal, errors.New("directive auth is not implemented") + } + return ec.directives.Auth(ctx, nil, directive0, nil, organization) + } + + next = directive1 + return next + }, + ec.marshalNSchemaUpdate2ᚖgitlabᚗcomᚋunboundsoftwareᚋschemasᚋgraphᚋmodelᚐSchemaUpdate, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_Query_latestSchema(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "ref": + return ec.fieldContext_SchemaUpdate_ref(ctx, field) + case "id": + return ec.fieldContext_SchemaUpdate_id(ctx, field) + case "subGraphs": + return ec.fieldContext_SchemaUpdate_subGraphs(ctx, field) + case "cosmoRouterConfig": + return ec.fieldContext_SchemaUpdate_cosmoRouterConfig(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type SchemaUpdate", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Query_latestSchema_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return fc, err + } + return fc, nil +} + func (ec *executionContext) _Query___type(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, @@ -3997,6 +4091,28 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) + case "latestSchema": + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_latestSchema(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&fs.Invalids, 1) + } + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, + func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) case "__type": out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) { diff --git a/graph/schema.graphqls b/graph/schema.graphqls index 97d82cb..ad1df55 100644 --- a/graph/schema.graphqls +++ b/graph/schema.graphqls @@ -1,6 +1,7 @@ type Query { organizations: [Organization!]! @auth(user: true) supergraph(ref: String!, isAfter: String): Supergraph! @auth(organization: true) + latestSchema(ref: String!): SchemaUpdate! @auth(organization: true) } type Mutation { diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 00f6202..b426df5 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -238,6 +238,72 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str }, nil } +// LatestSchema is the resolver for the latestSchema field. +func (r *queryResolver) LatestSchema(ctx context.Context, ref string) (*model.SchemaUpdate, error) { + orgId := middleware.OrganizationFromContext(ctx) + + r.Logger.Info("LatestSchema query", + "ref", ref, + "orgId", orgId, + ) + + _, err := r.apiKeyCanAccessRef(ctx, ref, false) + if err != nil { + r.Logger.Error("API key cannot access ref", "error", err, "ref", ref) + return nil, err + } + + // Get current services and schema + services, lastUpdate := r.Cache.Services(orgId, ref, "") + r.Logger.Info("Fetching latest schema", + "ref", ref, + "orgId", orgId, + "lastUpdate", lastUpdate, + "servicesCount", len(services), + ) + + subGraphs := make([]*model.SubGraph, len(services)) + for i, id := range services { + sg, err := r.fetchSubGraph(ctx, id) + if err != nil { + r.Logger.Error("fetch subgraph", "error", err, "id", id) + return nil, err + } + subGraphs[i] = &model.SubGraph{ + ID: sg.ID.String(), + Service: sg.Service, + URL: sg.Url, + WsURL: sg.WSUrl, + Sdl: sg.Sdl, + ChangedBy: sg.ChangedBy, + ChangedAt: sg.ChangedAt, + } + } + + // Generate Cosmo router config + cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs) + if err != nil { + r.Logger.Error("generate cosmo config", "error", err) + cosmoConfig = "" // Return empty if generation fails + } + + update := &model.SchemaUpdate{ + Ref: ref, + ID: lastUpdate, + SubGraphs: subGraphs, + CosmoRouterConfig: &cosmoConfig, + } + + r.Logger.Info("Latest schema fetched", + "ref", update.Ref, + "id", update.ID, + "subGraphsCount", len(update.SubGraphs), + "cosmoConfigLength", len(cosmoConfig), + ) + + return update, nil +} + // SchemaUpdates is the resolver for the schemaUpdates field. func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<-chan *model.SchemaUpdate, error) { orgId := middleware.OrganizationFromContext(ctx)