2022-10-09 15:23:52 +02:00
|
|
|
package graph
|
|
|
|
|
|
|
|
|
|
// This file will be automatically regenerated based on the schema, any resolver implementations
|
|
|
|
|
// will be copied through when generating and any unknown code will be moved to the end.
|
2023-04-21 06:24:38 +00:00
|
|
|
// Code generated by github.com/99designs/gqlgen
|
2022-10-09 15:23:52 +02:00
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
2022-10-14 22:41:56 +02:00
|
|
|
"fmt"
|
|
|
|
|
"strings"
|
2022-10-09 15:23:52 +02:00
|
|
|
|
|
|
|
|
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
|
|
|
|
|
|
|
|
|
"gitlab.com/unboundsoftware/schemas/domain"
|
|
|
|
|
"gitlab.com/unboundsoftware/schemas/graph/generated"
|
|
|
|
|
"gitlab.com/unboundsoftware/schemas/graph/model"
|
2023-04-27 07:09:10 +02:00
|
|
|
"gitlab.com/unboundsoftware/schemas/middleware"
|
|
|
|
|
"gitlab.com/unboundsoftware/schemas/rand"
|
2025-02-28 13:10:07 +01:00
|
|
|
"gitlab.com/unboundsoftware/schemas/sdlmerge"
|
2022-10-09 15:23:52 +02:00
|
|
|
)
|
|
|
|
|
|
2023-04-27 07:09:10 +02:00
|
|
|
// AddOrganization is the resolver for the addOrganization field.
|
|
|
|
|
func (r *mutationResolver) AddOrganization(ctx context.Context, name string) (*model.Organization, error) {
|
|
|
|
|
sub := middleware.UserFromContext(ctx)
|
|
|
|
|
org := &domain.Organization{}
|
|
|
|
|
h, err := r.handler(ctx, org)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
_, err = h.Handle(ctx, &domain.AddOrganization{
|
|
|
|
|
Name: name,
|
|
|
|
|
Initiator: sub,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return ToGqlOrganization(*org), nil
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-22 18:37:07 +01:00
|
|
|
// AddUserToOrganization is the resolver for the addUserToOrganization field.
|
|
|
|
|
func (r *mutationResolver) AddUserToOrganization(ctx context.Context, organizationID string, userID string) (*model.Organization, error) {
|
|
|
|
|
sub := middleware.UserFromContext(ctx)
|
|
|
|
|
org := &domain.Organization{BaseAggregate: eventsourced.BaseAggregateFromString(organizationID)}
|
|
|
|
|
h, err := r.handler(ctx, org)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
_, err = h.Handle(ctx, &domain.AddUserToOrganization{
|
|
|
|
|
UserId: userID,
|
|
|
|
|
Initiator: sub,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return ToGqlOrganization(*org), nil
|
|
|
|
|
}
|
|
|
|
|
|
2023-04-27 07:09:10 +02:00
|
|
|
// AddAPIKey is the resolver for the addAPIKey field.
|
|
|
|
|
func (r *mutationResolver) AddAPIKey(ctx context.Context, input *model.InputAPIKey) (*model.APIKey, error) {
|
|
|
|
|
sub := middleware.UserFromContext(ctx)
|
|
|
|
|
org := &domain.Organization{BaseAggregate: eventsourced.BaseAggregateFromString(input.OrganizationID)}
|
|
|
|
|
h, err := r.handler(ctx, org)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
key := fmt.Sprintf("us_ak_%s", rand.String(16))
|
|
|
|
|
_, err = h.Handle(ctx, &domain.AddAPIKey{
|
|
|
|
|
Name: input.Name,
|
|
|
|
|
Key: key,
|
|
|
|
|
Refs: input.Refs,
|
|
|
|
|
Read: input.Read,
|
|
|
|
|
Publish: input.Publish,
|
|
|
|
|
Initiator: sub,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return &model.APIKey{
|
|
|
|
|
ID: apiKeyId(input.OrganizationID, input.Name),
|
|
|
|
|
Name: input.Name,
|
|
|
|
|
Key: &key,
|
|
|
|
|
Organization: &model.Organization{
|
|
|
|
|
ID: input.OrganizationID,
|
|
|
|
|
Name: org.Name,
|
|
|
|
|
},
|
|
|
|
|
Refs: input.Refs,
|
|
|
|
|
Read: input.Read,
|
|
|
|
|
Publish: input.Publish,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-22 18:37:07 +01:00
|
|
|
// RemoveAPIKey is the resolver for the removeAPIKey field.
|
|
|
|
|
func (r *mutationResolver) RemoveAPIKey(ctx context.Context, organizationID string, keyName string) (*model.Organization, error) {
|
|
|
|
|
sub := middleware.UserFromContext(ctx)
|
|
|
|
|
org := &domain.Organization{BaseAggregate: eventsourced.BaseAggregateFromString(organizationID)}
|
|
|
|
|
h, err := r.handler(ctx, org)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
_, err = h.Handle(ctx, &domain.RemoveAPIKey{
|
|
|
|
|
KeyName: keyName,
|
|
|
|
|
Initiator: sub,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return ToGqlOrganization(*org), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// RemoveOrganization is the resolver for the removeOrganization field.
|
|
|
|
|
func (r *mutationResolver) RemoveOrganization(ctx context.Context, organizationID string) (bool, error) {
|
|
|
|
|
sub := middleware.UserFromContext(ctx)
|
|
|
|
|
org := &domain.Organization{BaseAggregate: eventsourced.BaseAggregateFromString(organizationID)}
|
|
|
|
|
h, err := r.handler(ctx, org)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return false, err
|
|
|
|
|
}
|
|
|
|
|
_, err = h.Handle(ctx, &domain.RemoveOrganization{
|
|
|
|
|
Initiator: sub,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return false, err
|
|
|
|
|
}
|
|
|
|
|
return true, nil
|
|
|
|
|
}
|
|
|
|
|
|
2022-10-09 15:23:52 +02:00
|
|
|
// UpdateSubGraph is the resolver for the updateSubGraph field.
|
|
|
|
|
func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.InputSubGraph) (*model.SubGraph, error) {
|
2023-04-27 07:09:10 +02:00
|
|
|
orgId := middleware.OrganizationFromContext(ctx)
|
2023-05-29 22:13:25 +02:00
|
|
|
name, err := r.apiKeyCanAccessRef(ctx, input.Ref, true)
|
2023-04-27 07:09:10 +02:00
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
subGraphId := r.Cache.SubGraphId(orgId, input.Ref, input.Service)
|
2022-10-09 15:23:52 +02:00
|
|
|
subGraph := &domain.SubGraph{}
|
|
|
|
|
if subGraphId != "" {
|
|
|
|
|
subGraph.BaseAggregate = eventsourced.BaseAggregateFromString(subGraphId)
|
|
|
|
|
}
|
2022-12-17 14:36:42 +01:00
|
|
|
handler, err := r.handler(ctx, subGraph)
|
2022-10-09 15:23:52 +02:00
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2022-10-17 14:30:48 +02:00
|
|
|
if strings.TrimSpace(input.Sdl) == strings.TrimSpace(subGraph.Sdl) &&
|
|
|
|
|
r.stringEqual(input.URL, subGraph.Url) &&
|
|
|
|
|
r.stringEqual(input.WsURL, subGraph.WSUrl) {
|
2022-10-14 22:41:56 +02:00
|
|
|
return r.toGqlSubGraph(subGraph), nil
|
|
|
|
|
}
|
2022-10-09 15:23:52 +02:00
|
|
|
serviceSDLs := []string{input.Sdl}
|
2023-04-27 07:09:10 +02:00
|
|
|
services, _ := r.Cache.Services(orgId, input.Ref, "")
|
2022-10-14 22:41:56 +02:00
|
|
|
for _, id := range services {
|
2022-12-17 14:36:42 +01:00
|
|
|
sg, err := r.fetchSubGraph(ctx, id)
|
2022-10-09 15:23:52 +02:00
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if sg.Service != input.Service {
|
|
|
|
|
serviceSDLs = append(serviceSDLs, sg.Sdl)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_, err = sdlmerge.MergeSDLs(serviceSDLs...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2022-12-17 14:36:42 +01:00
|
|
|
_, err = handler.Handle(ctx, domain.UpdateSubGraph{
|
2023-04-27 07:09:10 +02:00
|
|
|
OrganizationId: orgId,
|
|
|
|
|
Ref: input.Ref,
|
|
|
|
|
Service: input.Service,
|
|
|
|
|
Url: input.URL,
|
|
|
|
|
WSUrl: input.WsURL,
|
|
|
|
|
Sdl: input.Sdl,
|
2023-05-29 22:13:25 +02:00
|
|
|
Initiator: name,
|
2022-10-09 15:23:52 +02:00
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2025-11-19 11:29:30 +01:00
|
|
|
|
|
|
|
|
// Publish schema update to subscribers
|
|
|
|
|
go func() {
|
|
|
|
|
services, lastUpdate := r.Cache.Services(orgId, input.Ref, "")
|
2025-11-20 08:09:00 +01:00
|
|
|
r.Logger.Info("Publishing schema update after subgraph change",
|
|
|
|
|
"ref", input.Ref,
|
|
|
|
|
"orgId", orgId,
|
|
|
|
|
"lastUpdate", lastUpdate,
|
|
|
|
|
"servicesCount", len(services),
|
|
|
|
|
)
|
|
|
|
|
|
2025-11-19 11:29:30 +01:00
|
|
|
subGraphs := make([]*model.SubGraph, len(services))
|
|
|
|
|
for i, id := range services {
|
|
|
|
|
sg, err := r.fetchSubGraph(context.Background(), id)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.Logger.Error("fetch subgraph for update notification", "error", err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
subGraphs[i] = &model.SubGraph{
|
|
|
|
|
ID: sg.ID.String(),
|
|
|
|
|
Service: sg.Service,
|
|
|
|
|
URL: sg.Url,
|
|
|
|
|
WsURL: sg.WSUrl,
|
|
|
|
|
Sdl: sg.Sdl,
|
|
|
|
|
ChangedBy: sg.ChangedBy,
|
|
|
|
|
ChangedAt: sg.ChangedAt,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Generate Cosmo router config
|
|
|
|
|
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.Logger.Error("generate cosmo config for update", "error", err)
|
|
|
|
|
cosmoConfig = "" // Send empty if generation fails
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Publish to all subscribers of this ref
|
2025-11-20 08:09:00 +01:00
|
|
|
update := &model.SchemaUpdate{
|
2025-11-19 11:29:30 +01:00
|
|
|
Ref: input.Ref,
|
|
|
|
|
ID: lastUpdate,
|
|
|
|
|
SubGraphs: subGraphs,
|
|
|
|
|
CosmoRouterConfig: &cosmoConfig,
|
2025-11-20 08:09:00 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r.Logger.Info("Publishing schema update to subscribers",
|
|
|
|
|
"ref", update.Ref,
|
|
|
|
|
"id", update.ID,
|
|
|
|
|
"subGraphsCount", len(update.SubGraphs),
|
|
|
|
|
"cosmoConfigLength", len(cosmoConfig),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
r.PubSub.Publish(input.Ref, update)
|
2025-11-19 11:29:30 +01:00
|
|
|
}()
|
|
|
|
|
|
2022-10-14 22:41:56 +02:00
|
|
|
return r.toGqlSubGraph(subGraph), nil
|
2022-10-09 15:23:52 +02:00
|
|
|
}
|
|
|
|
|
|
2023-04-27 07:09:10 +02:00
|
|
|
// Organizations is the resolver for the organizations field.
|
|
|
|
|
func (r *queryResolver) Organizations(ctx context.Context) ([]*model.Organization, error) {
|
|
|
|
|
sub := middleware.UserFromContext(ctx)
|
|
|
|
|
orgs := r.Cache.OrganizationsByUser(sub)
|
|
|
|
|
return ToGqlOrganizations(orgs), nil
|
2022-10-14 22:41:56 +02:00
|
|
|
}
|
|
|
|
|
|
2025-11-22 18:37:07 +01:00
|
|
|
// AllOrganizations is the resolver for the allOrganizations field.
|
|
|
|
|
func (r *queryResolver) AllOrganizations(ctx context.Context) ([]*model.Organization, error) {
|
|
|
|
|
// Check if user has admin role
|
|
|
|
|
if !middleware.UserHasRole(ctx, "admin") {
|
|
|
|
|
return nil, fmt.Errorf("unauthorized: admin role required")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
orgs := r.Cache.AllOrganizations()
|
|
|
|
|
return ToGqlOrganizations(orgs), nil
|
|
|
|
|
}
|
|
|
|
|
|
2022-10-14 22:41:56 +02:00
|
|
|
// Supergraph is the resolver for the supergraph field.
|
|
|
|
|
func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *string) (model.Supergraph, error) {
|
2023-04-27 07:09:10 +02:00
|
|
|
orgId := middleware.OrganizationFromContext(ctx)
|
2025-11-22 18:37:07 +01:00
|
|
|
userId := middleware.UserFromContext(ctx)
|
|
|
|
|
|
|
|
|
|
r.Logger.Info("Supergraph query",
|
|
|
|
|
"ref", ref,
|
|
|
|
|
"orgId", orgId,
|
|
|
|
|
"userId", userId,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// If authenticated with API key (organization), check access
|
|
|
|
|
if orgId != "" {
|
|
|
|
|
_, err := r.apiKeyCanAccessRef(ctx, ref, false)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.Logger.Error("API key cannot access ref", "error", err, "ref", ref)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
} else if userId != "" {
|
|
|
|
|
// For user authentication, check if user has access to ref through their organizations
|
|
|
|
|
userOrgs := r.Cache.OrganizationsByUser(userId)
|
|
|
|
|
if len(userOrgs) == 0 {
|
|
|
|
|
r.Logger.Error("User has no organizations", "userId", userId)
|
|
|
|
|
return nil, fmt.Errorf("user has no access to any organizations")
|
|
|
|
|
}
|
|
|
|
|
// Use the first organization's ID for querying
|
|
|
|
|
orgId = userOrgs[0].ID.String()
|
|
|
|
|
r.Logger.Info("Using organization from user context", "orgId", orgId)
|
|
|
|
|
} else {
|
|
|
|
|
return nil, fmt.Errorf("no authentication provided")
|
2023-05-29 22:13:25 +02:00
|
|
|
}
|
2025-11-22 18:37:07 +01:00
|
|
|
|
2022-10-14 22:41:56 +02:00
|
|
|
after := ""
|
|
|
|
|
if isAfter != nil {
|
|
|
|
|
after = *isAfter
|
|
|
|
|
}
|
2023-04-27 07:09:10 +02:00
|
|
|
services, lastUpdate := r.Cache.Services(orgId, ref, after)
|
2022-10-14 22:41:56 +02:00
|
|
|
if after == lastUpdate {
|
|
|
|
|
return &model.Unchanged{
|
|
|
|
|
ID: lastUpdate,
|
|
|
|
|
MinDelaySeconds: 10,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
2022-10-09 15:23:52 +02:00
|
|
|
subGraphs := make([]*model.SubGraph, len(services))
|
|
|
|
|
for i, id := range services {
|
2022-12-17 14:36:42 +01:00
|
|
|
sg, err := r.fetchSubGraph(ctx, id)
|
2022-10-09 15:23:52 +02:00
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
subGraphs[i] = &model.SubGraph{
|
|
|
|
|
ID: sg.ID.String(),
|
|
|
|
|
Service: sg.Service,
|
|
|
|
|
URL: sg.Url,
|
|
|
|
|
WsURL: sg.WSUrl,
|
|
|
|
|
Sdl: sg.Sdl,
|
|
|
|
|
ChangedBy: sg.ChangedBy,
|
|
|
|
|
ChangedAt: sg.ChangedAt,
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-03-25 21:46:15 +01:00
|
|
|
|
|
|
|
|
var serviceSDLs []string
|
|
|
|
|
for _, id := range services {
|
|
|
|
|
sg, err := r.fetchSubGraph(ctx, id)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
serviceSDLs = append(serviceSDLs, sg.Sdl)
|
|
|
|
|
}
|
|
|
|
|
sdl, err := sdlmerge.MergeSDLs(serviceSDLs...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2022-10-14 22:41:56 +02:00
|
|
|
return &model.SubGraphs{
|
|
|
|
|
ID: lastUpdate,
|
|
|
|
|
SubGraphs: subGraphs,
|
2024-03-25 21:46:15 +01:00
|
|
|
Sdl: sdl,
|
2022-10-14 22:41:56 +02:00
|
|
|
MinDelaySeconds: 10,
|
|
|
|
|
}, nil
|
2022-10-09 15:23:52 +02:00
|
|
|
}
|
|
|
|
|
|
2025-11-20 17:02:19 +01:00
|
|
|
// LatestSchema is the resolver for the latestSchema field.
|
|
|
|
|
func (r *queryResolver) LatestSchema(ctx context.Context, ref string) (*model.SchemaUpdate, error) {
|
|
|
|
|
orgId := middleware.OrganizationFromContext(ctx)
|
2025-11-22 18:37:07 +01:00
|
|
|
userId := middleware.UserFromContext(ctx)
|
2025-11-20 17:02:19 +01:00
|
|
|
|
|
|
|
|
r.Logger.Info("LatestSchema query",
|
|
|
|
|
"ref", ref,
|
|
|
|
|
"orgId", orgId,
|
2025-11-22 18:37:07 +01:00
|
|
|
"userId", userId,
|
2025-11-20 17:02:19 +01:00
|
|
|
)
|
|
|
|
|
|
2025-11-22 18:37:07 +01:00
|
|
|
// If authenticated with API key (organization), check access
|
|
|
|
|
if orgId != "" {
|
|
|
|
|
_, err := r.apiKeyCanAccessRef(ctx, ref, false)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.Logger.Error("API key cannot access ref", "error", err, "ref", ref)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
} else if userId != "" {
|
|
|
|
|
// For user authentication, check if user has access to ref through their organizations
|
|
|
|
|
userOrgs := r.Cache.OrganizationsByUser(userId)
|
|
|
|
|
if len(userOrgs) == 0 {
|
|
|
|
|
r.Logger.Error("User has no organizations", "userId", userId)
|
|
|
|
|
return nil, fmt.Errorf("user has no access to any organizations")
|
|
|
|
|
}
|
|
|
|
|
// Use the first organization's ID for querying
|
|
|
|
|
// In a real-world scenario, you might want to check which org has access to this ref
|
|
|
|
|
orgId = userOrgs[0].ID.String()
|
|
|
|
|
r.Logger.Info("Using organization from user context", "orgId", orgId)
|
|
|
|
|
} else {
|
|
|
|
|
return nil, fmt.Errorf("no authentication provided")
|
2025-11-20 17:02:19 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get current services and schema
|
|
|
|
|
services, lastUpdate := r.Cache.Services(orgId, ref, "")
|
|
|
|
|
r.Logger.Info("Fetching latest schema",
|
|
|
|
|
"ref", ref,
|
|
|
|
|
"orgId", orgId,
|
|
|
|
|
"lastUpdate", lastUpdate,
|
|
|
|
|
"servicesCount", len(services),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
subGraphs := make([]*model.SubGraph, len(services))
|
|
|
|
|
for i, id := range services {
|
|
|
|
|
sg, err := r.fetchSubGraph(ctx, id)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.Logger.Error("fetch subgraph", "error", err, "id", id)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
subGraphs[i] = &model.SubGraph{
|
|
|
|
|
ID: sg.ID.String(),
|
|
|
|
|
Service: sg.Service,
|
|
|
|
|
URL: sg.Url,
|
|
|
|
|
WsURL: sg.WSUrl,
|
|
|
|
|
Sdl: sg.Sdl,
|
|
|
|
|
ChangedBy: sg.ChangedBy,
|
|
|
|
|
ChangedAt: sg.ChangedAt,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Generate Cosmo router config
|
|
|
|
|
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.Logger.Error("generate cosmo config", "error", err)
|
|
|
|
|
cosmoConfig = "" // Return empty if generation fails
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
update := &model.SchemaUpdate{
|
|
|
|
|
Ref: ref,
|
|
|
|
|
ID: lastUpdate,
|
|
|
|
|
SubGraphs: subGraphs,
|
|
|
|
|
CosmoRouterConfig: &cosmoConfig,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r.Logger.Info("Latest schema fetched",
|
|
|
|
|
"ref", update.Ref,
|
|
|
|
|
"id", update.ID,
|
|
|
|
|
"subGraphsCount", len(update.SubGraphs),
|
|
|
|
|
"cosmoConfigLength", len(cosmoConfig),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return update, nil
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-19 11:29:30 +01:00
|
|
|
// SchemaUpdates is the resolver for the schemaUpdates field.
|
|
|
|
|
func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<-chan *model.SchemaUpdate, error) {
|
|
|
|
|
orgId := middleware.OrganizationFromContext(ctx)
|
2025-11-20 08:09:00 +01:00
|
|
|
|
|
|
|
|
r.Logger.Info("SchemaUpdates subscription started",
|
|
|
|
|
"ref", ref,
|
|
|
|
|
"orgId", orgId,
|
|
|
|
|
)
|
|
|
|
|
|
2025-11-19 11:29:30 +01:00
|
|
|
_, err := r.apiKeyCanAccessRef(ctx, ref, false)
|
|
|
|
|
if err != nil {
|
2025-11-20 08:09:00 +01:00
|
|
|
r.Logger.Error("API key cannot access ref", "error", err, "ref", ref)
|
2025-11-19 11:29:30 +01:00
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Subscribe to updates for this ref
|
|
|
|
|
ch := r.PubSub.Subscribe(ref)
|
|
|
|
|
|
|
|
|
|
// Send initial state immediately
|
|
|
|
|
go func() {
|
2025-11-20 08:09:00 +01:00
|
|
|
// Use background context for async operation
|
|
|
|
|
bgCtx := context.Background()
|
|
|
|
|
|
2025-11-19 11:29:30 +01:00
|
|
|
services, lastUpdate := r.Cache.Services(orgId, ref, "")
|
2025-11-20 08:09:00 +01:00
|
|
|
r.Logger.Info("Preparing initial schema update",
|
|
|
|
|
"ref", ref,
|
|
|
|
|
"orgId", orgId,
|
|
|
|
|
"lastUpdate", lastUpdate,
|
|
|
|
|
"servicesCount", len(services),
|
|
|
|
|
)
|
|
|
|
|
|
2025-11-19 11:29:30 +01:00
|
|
|
subGraphs := make([]*model.SubGraph, len(services))
|
|
|
|
|
for i, id := range services {
|
2025-11-20 08:09:00 +01:00
|
|
|
sg, err := r.fetchSubGraph(bgCtx, id)
|
2025-11-19 11:29:30 +01:00
|
|
|
if err != nil {
|
2025-11-20 08:09:00 +01:00
|
|
|
r.Logger.Error("fetch subgraph for initial update", "error", err, "id", id)
|
2025-11-19 11:29:30 +01:00
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
subGraphs[i] = &model.SubGraph{
|
|
|
|
|
ID: sg.ID.String(),
|
|
|
|
|
Service: sg.Service,
|
|
|
|
|
URL: sg.Url,
|
|
|
|
|
WsURL: sg.WSUrl,
|
|
|
|
|
Sdl: sg.Sdl,
|
|
|
|
|
ChangedBy: sg.ChangedBy,
|
|
|
|
|
ChangedAt: sg.ChangedAt,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Generate Cosmo router config
|
|
|
|
|
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.Logger.Error("generate cosmo config", "error", err)
|
|
|
|
|
cosmoConfig = "" // Send empty if generation fails
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Send initial update
|
2025-11-20 08:09:00 +01:00
|
|
|
update := &model.SchemaUpdate{
|
2025-11-19 11:29:30 +01:00
|
|
|
Ref: ref,
|
|
|
|
|
ID: lastUpdate,
|
|
|
|
|
SubGraphs: subGraphs,
|
|
|
|
|
CosmoRouterConfig: &cosmoConfig,
|
|
|
|
|
}
|
2025-11-20 08:09:00 +01:00
|
|
|
|
|
|
|
|
r.Logger.Info("Sending initial schema update",
|
|
|
|
|
"ref", update.Ref,
|
|
|
|
|
"id", update.ID,
|
|
|
|
|
"subGraphsCount", len(update.SubGraphs),
|
|
|
|
|
"cosmoConfigLength", len(cosmoConfig),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
ch <- update
|
2025-11-19 11:29:30 +01:00
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Clean up subscription when context is done
|
|
|
|
|
go func() {
|
|
|
|
|
<-ctx.Done()
|
|
|
|
|
r.PubSub.Unsubscribe(ref, ch)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
return ch, nil
|
|
|
|
|
}
|
|
|
|
|
|
2022-10-09 15:23:52 +02:00
|
|
|
// Mutation returns generated.MutationResolver implementation.
|
|
|
|
|
func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} }
|
|
|
|
|
|
|
|
|
|
// Query returns generated.QueryResolver implementation.
|
|
|
|
|
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
|
|
|
|
|
|
2025-11-19 11:29:30 +01:00
|
|
|
// Subscription returns generated.SubscriptionResolver implementation.
|
|
|
|
|
func (r *Resolver) Subscription() generated.SubscriptionResolver { return &subscriptionResolver{r} }
|
|
|
|
|
|
2022-10-09 15:23:52 +02:00
|
|
|
type (
|
2025-11-19 11:29:30 +01:00
|
|
|
mutationResolver struct{ *Resolver }
|
|
|
|
|
queryResolver struct{ *Resolver }
|
|
|
|
|
subscriptionResolver struct{ *Resolver }
|
2022-10-09 15:23:52 +02:00
|
|
|
)
|