80daed081d
Creates a new `GenerateCosmoRouterConfig` function to build and serialize a Cosmo Router configuration from subgraphs. Implements PubSub mechanism for managing schema updates, allowing subscription to updates. Adds Subscription resolver and updates existing structures to accommodate new functionalities. This enhances the system's capabilities for dynamic updates and configuration management.
296 lines
8.2 KiB
Go
296 lines
8.2 KiB
Go
package graph
|
|
|
|
// This file will be automatically regenerated based on the schema, any resolver implementations
|
|
// will be copied through when generating and any unknown code will be moved to the end.
|
|
// Code generated by github.com/99designs/gqlgen
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
|
|
|
"gitlab.com/unboundsoftware/schemas/domain"
|
|
"gitlab.com/unboundsoftware/schemas/graph/generated"
|
|
"gitlab.com/unboundsoftware/schemas/graph/model"
|
|
"gitlab.com/unboundsoftware/schemas/middleware"
|
|
"gitlab.com/unboundsoftware/schemas/rand"
|
|
"gitlab.com/unboundsoftware/schemas/sdlmerge"
|
|
)
|
|
|
|
// AddOrganization is the resolver for the addOrganization field.
|
|
func (r *mutationResolver) AddOrganization(ctx context.Context, name string) (*model.Organization, error) {
|
|
sub := middleware.UserFromContext(ctx)
|
|
org := &domain.Organization{}
|
|
h, err := r.handler(ctx, org)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = h.Handle(ctx, &domain.AddOrganization{
|
|
Name: name,
|
|
Initiator: sub,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return ToGqlOrganization(*org), nil
|
|
}
|
|
|
|
// AddAPIKey is the resolver for the addAPIKey field.
|
|
func (r *mutationResolver) AddAPIKey(ctx context.Context, input *model.InputAPIKey) (*model.APIKey, error) {
|
|
sub := middleware.UserFromContext(ctx)
|
|
org := &domain.Organization{BaseAggregate: eventsourced.BaseAggregateFromString(input.OrganizationID)}
|
|
h, err := r.handler(ctx, org)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
key := fmt.Sprintf("us_ak_%s", rand.String(16))
|
|
_, err = h.Handle(ctx, &domain.AddAPIKey{
|
|
Name: input.Name,
|
|
Key: key,
|
|
Refs: input.Refs,
|
|
Read: input.Read,
|
|
Publish: input.Publish,
|
|
Initiator: sub,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &model.APIKey{
|
|
ID: apiKeyId(input.OrganizationID, input.Name),
|
|
Name: input.Name,
|
|
Key: &key,
|
|
Organization: &model.Organization{
|
|
ID: input.OrganizationID,
|
|
Name: org.Name,
|
|
},
|
|
Refs: input.Refs,
|
|
Read: input.Read,
|
|
Publish: input.Publish,
|
|
}, nil
|
|
}
|
|
|
|
// UpdateSubGraph is the resolver for the updateSubGraph field.
|
|
func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.InputSubGraph) (*model.SubGraph, error) {
|
|
orgId := middleware.OrganizationFromContext(ctx)
|
|
name, err := r.apiKeyCanAccessRef(ctx, input.Ref, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
subGraphId := r.Cache.SubGraphId(orgId, input.Ref, input.Service)
|
|
subGraph := &domain.SubGraph{}
|
|
if subGraphId != "" {
|
|
subGraph.BaseAggregate = eventsourced.BaseAggregateFromString(subGraphId)
|
|
}
|
|
handler, err := r.handler(ctx, subGraph)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if strings.TrimSpace(input.Sdl) == strings.TrimSpace(subGraph.Sdl) &&
|
|
r.stringEqual(input.URL, subGraph.Url) &&
|
|
r.stringEqual(input.WsURL, subGraph.WSUrl) {
|
|
return r.toGqlSubGraph(subGraph), nil
|
|
}
|
|
serviceSDLs := []string{input.Sdl}
|
|
services, _ := r.Cache.Services(orgId, input.Ref, "")
|
|
for _, id := range services {
|
|
sg, err := r.fetchSubGraph(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if sg.Service != input.Service {
|
|
serviceSDLs = append(serviceSDLs, sg.Sdl)
|
|
}
|
|
}
|
|
_, err = sdlmerge.MergeSDLs(serviceSDLs...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = handler.Handle(ctx, domain.UpdateSubGraph{
|
|
OrganizationId: orgId,
|
|
Ref: input.Ref,
|
|
Service: input.Service,
|
|
Url: input.URL,
|
|
WSUrl: input.WsURL,
|
|
Sdl: input.Sdl,
|
|
Initiator: name,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Publish schema update to subscribers
|
|
go func() {
|
|
services, lastUpdate := r.Cache.Services(orgId, input.Ref, "")
|
|
subGraphs := make([]*model.SubGraph, len(services))
|
|
for i, id := range services {
|
|
sg, err := r.fetchSubGraph(context.Background(), id)
|
|
if err != nil {
|
|
r.Logger.Error("fetch subgraph for update notification", "error", err)
|
|
continue
|
|
}
|
|
subGraphs[i] = &model.SubGraph{
|
|
ID: sg.ID.String(),
|
|
Service: sg.Service,
|
|
URL: sg.Url,
|
|
WsURL: sg.WSUrl,
|
|
Sdl: sg.Sdl,
|
|
ChangedBy: sg.ChangedBy,
|
|
ChangedAt: sg.ChangedAt,
|
|
}
|
|
}
|
|
|
|
// Generate Cosmo router config
|
|
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
|
|
if err != nil {
|
|
r.Logger.Error("generate cosmo config for update", "error", err)
|
|
cosmoConfig = "" // Send empty if generation fails
|
|
}
|
|
|
|
// Publish to all subscribers of this ref
|
|
r.PubSub.Publish(input.Ref, &model.SchemaUpdate{
|
|
Ref: input.Ref,
|
|
ID: lastUpdate,
|
|
SubGraphs: subGraphs,
|
|
CosmoRouterConfig: &cosmoConfig,
|
|
})
|
|
}()
|
|
|
|
return r.toGqlSubGraph(subGraph), nil
|
|
}
|
|
|
|
// Organizations is the resolver for the organizations field.
|
|
func (r *queryResolver) Organizations(ctx context.Context) ([]*model.Organization, error) {
|
|
sub := middleware.UserFromContext(ctx)
|
|
orgs := r.Cache.OrganizationsByUser(sub)
|
|
return ToGqlOrganizations(orgs), nil
|
|
}
|
|
|
|
// Supergraph is the resolver for the supergraph field.
|
|
func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *string) (model.Supergraph, error) {
|
|
orgId := middleware.OrganizationFromContext(ctx)
|
|
_, err := r.apiKeyCanAccessRef(ctx, ref, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
after := ""
|
|
if isAfter != nil {
|
|
after = *isAfter
|
|
}
|
|
services, lastUpdate := r.Cache.Services(orgId, ref, after)
|
|
if after == lastUpdate {
|
|
return &model.Unchanged{
|
|
ID: lastUpdate,
|
|
MinDelaySeconds: 10,
|
|
}, nil
|
|
}
|
|
subGraphs := make([]*model.SubGraph, len(services))
|
|
for i, id := range services {
|
|
sg, err := r.fetchSubGraph(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
subGraphs[i] = &model.SubGraph{
|
|
ID: sg.ID.String(),
|
|
Service: sg.Service,
|
|
URL: sg.Url,
|
|
WsURL: sg.WSUrl,
|
|
Sdl: sg.Sdl,
|
|
ChangedBy: sg.ChangedBy,
|
|
ChangedAt: sg.ChangedAt,
|
|
}
|
|
}
|
|
|
|
var serviceSDLs []string
|
|
for _, id := range services {
|
|
sg, err := r.fetchSubGraph(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
serviceSDLs = append(serviceSDLs, sg.Sdl)
|
|
}
|
|
sdl, err := sdlmerge.MergeSDLs(serviceSDLs...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &model.SubGraphs{
|
|
ID: lastUpdate,
|
|
SubGraphs: subGraphs,
|
|
Sdl: sdl,
|
|
MinDelaySeconds: 10,
|
|
}, nil
|
|
}
|
|
|
|
// SchemaUpdates is the resolver for the schemaUpdates field.
|
|
func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<-chan *model.SchemaUpdate, error) {
|
|
orgId := middleware.OrganizationFromContext(ctx)
|
|
_, err := r.apiKeyCanAccessRef(ctx, ref, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Subscribe to updates for this ref
|
|
ch := r.PubSub.Subscribe(ref)
|
|
|
|
// Send initial state immediately
|
|
go func() {
|
|
services, lastUpdate := r.Cache.Services(orgId, ref, "")
|
|
subGraphs := make([]*model.SubGraph, len(services))
|
|
for i, id := range services {
|
|
sg, err := r.fetchSubGraph(ctx, id)
|
|
if err != nil {
|
|
r.Logger.Error("fetch subgraph for initial update", "error", err)
|
|
continue
|
|
}
|
|
subGraphs[i] = &model.SubGraph{
|
|
ID: sg.ID.String(),
|
|
Service: sg.Service,
|
|
URL: sg.Url,
|
|
WsURL: sg.WSUrl,
|
|
Sdl: sg.Sdl,
|
|
ChangedBy: sg.ChangedBy,
|
|
ChangedAt: sg.ChangedAt,
|
|
}
|
|
}
|
|
|
|
// Generate Cosmo router config
|
|
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
|
|
if err != nil {
|
|
r.Logger.Error("generate cosmo config", "error", err)
|
|
cosmoConfig = "" // Send empty if generation fails
|
|
}
|
|
|
|
// Send initial update
|
|
ch <- &model.SchemaUpdate{
|
|
Ref: ref,
|
|
ID: lastUpdate,
|
|
SubGraphs: subGraphs,
|
|
CosmoRouterConfig: &cosmoConfig,
|
|
}
|
|
}()
|
|
|
|
// Clean up subscription when context is done
|
|
go func() {
|
|
<-ctx.Done()
|
|
r.PubSub.Unsubscribe(ref, ch)
|
|
}()
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
// Mutation returns generated.MutationResolver implementation.
|
|
func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} }
|
|
|
|
// Query returns generated.QueryResolver implementation.
|
|
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
|
|
|
|
// Subscription returns generated.SubscriptionResolver implementation.
|
|
func (r *Resolver) Subscription() generated.SubscriptionResolver { return &subscriptionResolver{r} }
|
|
|
|
type (
|
|
mutationResolver struct{ *Resolver }
|
|
queryResolver struct{ *Resolver }
|
|
subscriptionResolver struct{ *Resolver }
|
|
)
|