feat: add latestSchema query for retrieving schema updates

Implements the `latestSchema` query to fetch the latest schema 
updates for an organization. This change enhances the GraphQL API by
allowing clients to retrieve the most recent schema version and its 
associated subgraphs. The resolver performs necessary access checks, 
logs relevant information, and generates the Cosmo router configuration 
from fetched subgraph SDLs, returning structured schema update details.
This commit is contained in:
2025-11-20 17:02:19 +01:00
parent 4d18cf4175
commit 9368d77bc8
8 changed files with 270 additions and 73 deletions
+9 -1
View File
@@ -24,9 +24,17 @@ RUN GOOS=linux GOARCH=amd64 go build \
FROM scratch as export
COPY --from=build /build/coverage.txt /
FROM scratch
FROM node:22-alpine
ENV TZ Europe/Stockholm
# Install wgc CLI globally for Cosmo Router composition
RUN npm install -g wgc@latest
# Copy timezone data and certificates
COPY --from=build /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Copy the service binary
COPY --from=build /release/service /
CMD ["/service"]
+3 -2
View File
@@ -9,6 +9,7 @@ require (
github.com/apex/log v1.9.0
github.com/auth0/go-jwt-middleware/v2 v2.3.0
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/google/uuid v1.6.0
github.com/jmoiron/sqlx v1.4.0
github.com/pkg/errors v0.9.1
github.com/pressly/goose/v3 v3.26.0
@@ -30,6 +31,7 @@ require (
go.opentelemetry.io/otel/sdk/log v0.14.0
go.opentelemetry.io/otel/sdk/metric v1.38.0
go.opentelemetry.io/otel/trace v1.38.0
gopkg.in/yaml.v3 v3.0.1
)
require (
@@ -41,7 +43,6 @@ require (
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
@@ -51,6 +52,7 @@ require (
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
github.com/sethvargo/go-retry v0.3.0 // indirect
github.com/sosodev/duration v1.3.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tidwall/gjson v1.17.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
@@ -72,5 +74,4 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect
google.golang.org/grpc v1.75.0 // indirect
google.golang.org/protobuf v1.36.10 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
+2
View File
@@ -141,6 +141,8 @@ github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERA
github.com/sparetimecoders/goamqp v0.3.3 h1:z/nfTPmrjeU/rIVuNOgsVLCimp3WFoNFvS3ZzXRJ6HE=
github.com/sparetimecoders/goamqp v0.3.3/go.mod h1:W9NRCpWLE+Vruv2dcRSbszNil2O826d2Nv6kAkETW5o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+72 -20
View File
@@ -1,54 +1,106 @@
package graph
import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"gopkg.in/yaml.v3"
"gitlab.com/unboundsoftware/schemas/graph/model"
)
// GenerateCosmoRouterConfig generates a Cosmo Router execution config from subgraphs
// using the official wgc CLI tool via npx
func GenerateCosmoRouterConfig(subGraphs []*model.SubGraph) (string, error) {
// Build the Cosmo router config structure
// This is a simplified version - you may need to adjust based on actual Cosmo requirements
config := map[string]interface{}{
"version": "1",
"subgraphs": convertSubGraphsToCosmo(subGraphs),
// Add other Cosmo-specific configuration as needed
if len(subGraphs) == 0 {
return "", fmt.Errorf("no subgraphs provided")
}
// Marshal to JSON
configJSON, err := json.MarshalIndent(config, "", " ")
// Create a temporary directory for composition
tmpDir, err := os.MkdirTemp("", "cosmo-compose-*")
if err != nil {
return "", fmt.Errorf("marshal cosmo config: %w", err)
return "", fmt.Errorf("create temp dir: %w", err)
}
defer os.RemoveAll(tmpDir)
// Write each subgraph SDL to a file
type SubgraphConfig struct {
Name string `yaml:"name"`
RoutingURL string `yaml:"routing_url,omitempty"`
Schema map[string]string `yaml:"schema"`
Subscription map[string]interface{} `yaml:"subscription,omitempty"`
}
return string(configJSON), nil
type InputConfig struct {
Version int `yaml:"version"`
Subgraphs []SubgraphConfig `yaml:"subgraphs"`
}
func convertSubGraphsToCosmo(subGraphs []*model.SubGraph) []map[string]interface{} {
cosmoSubgraphs := make([]map[string]interface{}, 0, len(subGraphs))
inputConfig := InputConfig{
Version: 1,
Subgraphs: make([]SubgraphConfig, 0, len(subGraphs)),
}
for _, sg := range subGraphs {
cosmoSg := map[string]interface{}{
"name": sg.Service,
"sdl": sg.Sdl,
// Write SDL to a temp file
schemaFile := filepath.Join(tmpDir, fmt.Sprintf("%s.graphql", sg.Service))
if err := os.WriteFile(schemaFile, []byte(sg.Sdl), 0o644); err != nil {
return "", fmt.Errorf("write schema file for %s: %w", sg.Service, err)
}
subgraphCfg := SubgraphConfig{
Name: sg.Service,
Schema: map[string]string{
"file": schemaFile,
},
}
if sg.URL != nil {
cosmoSg["routing_url"] = *sg.URL
subgraphCfg.RoutingURL = *sg.URL
}
if sg.WsURL != nil {
cosmoSg["subscription"] = map[string]interface{}{
subgraphCfg.Subscription = map[string]interface{}{
"url": *sg.WsURL,
"protocol": "ws",
"websocket_subprotocol": "graphql-ws",
}
}
cosmoSubgraphs = append(cosmoSubgraphs, cosmoSg)
inputConfig.Subgraphs = append(inputConfig.Subgraphs, subgraphCfg)
}
return cosmoSubgraphs
// Write input config YAML
inputFile := filepath.Join(tmpDir, "input.yaml")
inputYAML, err := yaml.Marshal(inputConfig)
if err != nil {
return "", fmt.Errorf("marshal input config: %w", err)
}
if err := os.WriteFile(inputFile, inputYAML, 0o644); err != nil {
return "", fmt.Errorf("write input config: %w", err)
}
// Execute wgc router compose
// wgc is installed globally in the Docker image
outputFile := filepath.Join(tmpDir, "config.json")
cmd := exec.Command("wgc", "router", "compose",
"--input", inputFile,
"--out", outputFile,
"--suppress-warnings",
)
output, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("wgc router compose failed: %w\nOutput: %s", err, string(output))
}
// Read the generated config
configJSON, err := os.ReadFile(outputFile)
if err != nil {
return "", fmt.Errorf("read output config: %w", err)
}
return string(configJSON), nil
}
-49
View File
@@ -203,55 +203,6 @@ func TestGenerateCosmoRouterConfig(t *testing.T) {
}
}
func TestConvertSubGraphsToCosmo(t *testing.T) {
tests := []struct {
name string
subGraphs []*model.SubGraph
wantLen int
validate func(t *testing.T, result []map[string]interface{})
}{
{
name: "preserves subgraph order",
subGraphs: []*model.SubGraph{
{Service: "alpha", URL: stringPtr("http://a"), Sdl: "a"},
{Service: "beta", URL: stringPtr("http://b"), Sdl: "b"},
{Service: "gamma", URL: stringPtr("http://c"), Sdl: "c"},
},
wantLen: 3,
validate: func(t *testing.T, result []map[string]interface{}) {
assert.Equal(t, "alpha", result[0]["name"])
assert.Equal(t, "beta", result[1]["name"])
assert.Equal(t, "gamma", result[2]["name"])
},
},
{
name: "includes SDL exactly as provided",
subGraphs: []*model.SubGraph{
{
Service: "test",
URL: stringPtr("http://test"),
Sdl: "type Query { special: String! }",
},
},
wantLen: 1,
validate: func(t *testing.T, result []map[string]interface{}) {
assert.Equal(t, "type Query { special: String! }", result[0]["sdl"])
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := convertSubGraphsToCosmo(tt.subGraphs)
assert.Len(t, result, tt.wantLen)
if tt.validate != nil {
tt.validate(t, result)
}
})
}
}
// Helper function for tests
func stringPtr(s string) *string {
return &s
+116
View File
@@ -74,6 +74,7 @@ type ComplexityRoot struct {
}
Query struct {
LatestSchema func(childComplexity int, ref string) int
Organizations func(childComplexity int) int
Supergraph func(childComplexity int, ref string, isAfter *string) int
}
@@ -124,6 +125,7 @@ type MutationResolver interface {
type QueryResolver interface {
Organizations(ctx context.Context) ([]*model.Organization, error)
Supergraph(ctx context.Context, ref string, isAfter *string) (model.Supergraph, error)
LatestSchema(ctx context.Context, ref string) (*model.SchemaUpdate, error)
}
type SubscriptionResolver interface {
SchemaUpdates(ctx context.Context, ref string) (<-chan *model.SchemaUpdate, error)
@@ -250,6 +252,17 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
return e.complexity.Organization.Users(childComplexity), true
case "Query.latestSchema":
if e.complexity.Query.LatestSchema == nil {
break
}
args, err := ec.field_Query_latestSchema_args(ctx, rawArgs)
if err != nil {
return 0, false
}
return e.complexity.Query.LatestSchema(childComplexity, args["ref"].(string)), true
case "Query.organizations":
if e.complexity.Query.Organizations == nil {
break
@@ -520,6 +533,7 @@ var sources = []*ast.Source{
{Name: "../schema.graphqls", Input: `type Query {
organizations: [Organization!]! @auth(user: true)
supergraph(ref: String!, isAfter: String): Supergraph! @auth(organization: true)
latestSchema(ref: String!): SchemaUpdate! @auth(organization: true)
}
type Mutation {
@@ -671,6 +685,17 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs
return args, nil
}
func (ec *executionContext) field_Query_latestSchema_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) {
var err error
args := map[string]any{}
arg0, err := graphql.ProcessArgField(ctx, rawArgs, "ref", ec.unmarshalNString2string)
if err != nil {
return nil, err
}
args["ref"] = arg0
return args, nil
}
func (ec *executionContext) field_Query_supergraph_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) {
var err error
args := map[string]any{}
@@ -1434,6 +1459,75 @@ func (ec *executionContext) fieldContext_Query_supergraph(ctx context.Context, f
return fc, nil
}
func (ec *executionContext) _Query_latestSchema(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
return graphql.ResolveField(
ctx,
ec.OperationContext,
field,
ec.fieldContext_Query_latestSchema,
func(ctx context.Context) (any, error) {
fc := graphql.GetFieldContext(ctx)
return ec.resolvers.Query().LatestSchema(ctx, fc.Args["ref"].(string))
},
func(ctx context.Context, next graphql.Resolver) graphql.Resolver {
directive0 := next
directive1 := func(ctx context.Context) (any, error) {
organization, err := ec.unmarshalOBoolean2ᚖbool(ctx, true)
if err != nil {
var zeroVal *model.SchemaUpdate
return zeroVal, err
}
if ec.directives.Auth == nil {
var zeroVal *model.SchemaUpdate
return zeroVal, errors.New("directive auth is not implemented")
}
return ec.directives.Auth(ctx, nil, directive0, nil, organization)
}
next = directive1
return next
},
ec.marshalNSchemaUpdate2ᚖgitlabᚗcomᚋunboundsoftwareᚋschemasᚋgraphᚋmodelᚐSchemaUpdate,
true,
true,
)
}
func (ec *executionContext) fieldContext_Query_latestSchema(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Query",
Field: field,
IsMethod: true,
IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
switch field.Name {
case "ref":
return ec.fieldContext_SchemaUpdate_ref(ctx, field)
case "id":
return ec.fieldContext_SchemaUpdate_id(ctx, field)
case "subGraphs":
return ec.fieldContext_SchemaUpdate_subGraphs(ctx, field)
case "cosmoRouterConfig":
return ec.fieldContext_SchemaUpdate_cosmoRouterConfig(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type SchemaUpdate", field.Name)
},
}
defer func() {
if r := recover(); r != nil {
err = ec.Recover(ctx, r)
ec.Error(ctx, err)
}
}()
ctx = graphql.WithFieldContext(ctx, fc)
if fc.Args, err = ec.field_Query_latestSchema_args(ctx, field.ArgumentMap(ec.Variables)); err != nil {
ec.Error(ctx, err)
return fc, err
}
return fc, nil
}
func (ec *executionContext) _Query___type(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
return graphql.ResolveField(
ctx,
@@ -3997,6 +4091,28 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr
func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
}
out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) })
case "latestSchema":
field := field
innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
}
}()
res = ec._Query_latestSchema(ctx, field)
if res == graphql.Null {
atomic.AddUint32(&fs.Invalids, 1)
}
return res
}
rrm := func(ctx context.Context) graphql.Marshaler {
return ec.OperationContext.RootResolverMiddleware(ctx,
func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) })
}
out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) })
case "__type":
out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) {
+1
View File
@@ -1,6 +1,7 @@
type Query {
organizations: [Organization!]! @auth(user: true)
supergraph(ref: String!, isAfter: String): Supergraph! @auth(organization: true)
latestSchema(ref: String!): SchemaUpdate! @auth(organization: true)
}
type Mutation {
+66
View File
@@ -238,6 +238,72 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str
}, nil
}
// LatestSchema is the resolver for the latestSchema field.
func (r *queryResolver) LatestSchema(ctx context.Context, ref string) (*model.SchemaUpdate, error) {
orgId := middleware.OrganizationFromContext(ctx)
r.Logger.Info("LatestSchema query",
"ref", ref,
"orgId", orgId,
)
_, err := r.apiKeyCanAccessRef(ctx, ref, false)
if err != nil {
r.Logger.Error("API key cannot access ref", "error", err, "ref", ref)
return nil, err
}
// Get current services and schema
services, lastUpdate := r.Cache.Services(orgId, ref, "")
r.Logger.Info("Fetching latest schema",
"ref", ref,
"orgId", orgId,
"lastUpdate", lastUpdate,
"servicesCount", len(services),
)
subGraphs := make([]*model.SubGraph, len(services))
for i, id := range services {
sg, err := r.fetchSubGraph(ctx, id)
if err != nil {
r.Logger.Error("fetch subgraph", "error", err, "id", id)
return nil, err
}
subGraphs[i] = &model.SubGraph{
ID: sg.ID.String(),
Service: sg.Service,
URL: sg.Url,
WsURL: sg.WSUrl,
Sdl: sg.Sdl,
ChangedBy: sg.ChangedBy,
ChangedAt: sg.ChangedAt,
}
}
// Generate Cosmo router config
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
if err != nil {
r.Logger.Error("generate cosmo config", "error", err)
cosmoConfig = "" // Return empty if generation fails
}
update := &model.SchemaUpdate{
Ref: ref,
ID: lastUpdate,
SubGraphs: subGraphs,
CosmoRouterConfig: &cosmoConfig,
}
r.Logger.Info("Latest schema fetched",
"ref", update.Ref,
"id", update.ID,
"subGraphsCount", len(update.SubGraphs),
"cosmoConfigLength", len(cosmoConfig),
)
return update, nil
}
// SchemaUpdates is the resolver for the schemaUpdates field.
func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<-chan *model.SchemaUpdate, error) {
orgId := middleware.OrganizationFromContext(ctx)