2022-10-09 15:23:52 +02:00
package main
import (
"context"
2025-04-12 10:43:40 +02:00
"errors"
2022-10-09 15:23:52 +02:00
"fmt"
2025-04-12 10:43:40 +02:00
"log/slog"
2022-10-09 15:23:52 +02:00
"net/http"
"os"
"os/signal"
"reflect"
"sync"
"syscall"
2022-12-16 16:02:16 +01:00
"time"
2022-10-09 15:23:52 +02:00
"github.com/99designs/gqlgen/graphql/handler"
2024-12-09 14:57:00 +00:00
"github.com/99designs/gqlgen/graphql/handler/extension"
"github.com/99designs/gqlgen/graphql/handler/lru"
"github.com/99designs/gqlgen/graphql/handler/transport"
2022-10-09 15:23:52 +02:00
"github.com/99designs/gqlgen/graphql/playground"
"github.com/alecthomas/kong"
"github.com/rs/cors"
"github.com/sparetimecoders/goamqp"
2024-12-09 14:57:00 +00:00
"github.com/vektah/gqlparser/v2/ast"
2022-10-09 15:23:52 +02:00
"gitlab.com/unboundsoftware/eventsourced/amqp"
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
"gitlab.com/unboundsoftware/eventsourced/pg"
2026-01-17 22:53:46 +01:00
"gitea.unbound.se/unboundsoftware/schemas/cache"
"gitea.unbound.se/unboundsoftware/schemas/domain"
"gitea.unbound.se/unboundsoftware/schemas/graph"
"gitea.unbound.se/unboundsoftware/schemas/graph/generated"
"gitea.unbound.se/unboundsoftware/schemas/health"
"gitea.unbound.se/unboundsoftware/schemas/logging"
"gitea.unbound.se/unboundsoftware/schemas/middleware"
"gitea.unbound.se/unboundsoftware/schemas/monitoring"
"gitea.unbound.se/unboundsoftware/schemas/store"
2022-10-09 15:23:52 +02:00
)
2022-12-16 16:02:16 +01:00
// CLI holds the runtime configuration of the service. Every field can be set
// through a command line flag (`name` tag) or an environment variable (`env`
// tag); the `default` tag supplies the value used when neither is given.
type CLI struct {
	// AmqpURL is the URL used to connect to RabbitMQ.
	AmqpURL string ` name:"amqp-url" env:"AMQP_URL" help:"URL to use to connect to RabbitMQ" default:"amqp://user:password@unbound-control-plane.orb.local:5672/" `
	// Port is the TCP port the GraphQL API listens on.
	Port int ` name:"port" env:"PORT" help:"Listen-port for GraphQL API" default:"8080" `
	// LogLevel selects the logging verbosity.
	LogLevel string ` name:"log-level" env:"LOG_LEVEL" help:"The level of logging to use (debug, info, warn, error, fatal)" default:"info" `
	// LogFormat selects the log output format; the value "otel" also enables
	// the OpenTelemetry flag passed to monitoring.SetupOTelSDK in start.
	LogFormat string ` name:"log-format" env:"LOG_FORMAT" help:"The format of logs" default:"text" enum:"otel,json,text" `
	// DatabaseURL is the URL used to connect to Postgres.
	DatabaseURL string ` name:"postgres-url" env:"POSTGRES_URL" help:"URL to use to connect to Postgres" default:"postgres://postgres:postgres@unbound-control-plane.orb.local:5432/schemas?sslmode=disable" `
	// DatabaseDriverName is the database driver handed to store.SetupDB.
	DatabaseDriverName string ` name:"db-driver" env:"DB_DRIVER" help:"Driver to use to connect to db" default:"postgres" `
	// Issuer is the JWT token issuer handed to the Auth0 middleware.
	Issuer string ` name:"issuer" env:"ISSUER" help:"The JWT token issuer to use" default:"unbound.eu.auth0.com" `
	// StrictSSL toggles strict SSL handling in the Auth0 middleware.
	StrictSSL bool ` name:"strict-ssl" env:"STRICT_SSL" help:"Should strict SSL handling be enabled" default:"true" `
	// Environment names the deployment environment reported to telemetry.
	Environment string ` name:"environment" env:"ENVIRONMENT" help:"The environment we are running in" default:"development" enum:"development,staging,production" `
}
// buildVersion identifies the running build. It is passed to the logger and
// to the OpenTelemetry setup. Declared as a var rather than a const,
// presumably so it can be overridden at link time (e.g. -ldflags "-X") —
// TODO confirm against the build scripts.
var buildVersion = "none"
2022-10-09 15:23:52 +02:00
// serviceName is the name this service reports to the logger, to the
// OpenTelemetry setup and to goamqp when creating the AMQP connection.
const serviceName = "schemas"
func main ( ) {
2022-12-16 16:02:16 +01:00
var cli CLI
_ = kong . Parse ( & cli )
2025-06-13 11:00:52 +02:00
logger := logging . SetupLogger ( cli . LogLevel , cli . LogFormat , serviceName , buildVersion )
2022-10-09 15:23:52 +02:00
closeEvents := make ( chan error )
if err := start (
closeEvents ,
logger ,
ConnectAMQP ,
2022-12-16 16:02:16 +01:00
cli ,
2022-10-09 15:23:52 +02:00
) ; err != nil {
2025-04-12 10:43:40 +02:00
logger . With ( "error" , err ) . Error ( "process error" )
2022-10-09 15:23:52 +02:00
}
}
2025-04-12 10:43:40 +02:00
func start ( closeEvents chan error , logger * slog . Logger , connectToAmqpFunc func ( url string ) ( Connection , error ) , cli CLI ) error {
2022-10-09 15:23:52 +02:00
rootCtx , rootCancel := context . WithCancel ( context . Background ( ) )
defer rootCancel ( )
2025-06-13 11:00:52 +02:00
shutdownFn , err := monitoring . SetupOTelSDK ( rootCtx , cli . LogFormat == "otel" , serviceName , buildVersion , cli . Environment )
if err != nil {
return err
}
defer func ( ) {
_ = errors . Join ( shutdownFn ( context . Background ( ) ) )
} ( )
2022-12-16 16:02:16 +01:00
db , err := store . SetupDB ( cli . DatabaseDriverName , cli . DatabaseURL )
2022-10-09 15:23:52 +02:00
if err != nil {
return fmt . Errorf ( "failed to setup DB: %v" , err )
}
eventStore , err := pg . New (
2022-12-17 14:36:42 +01:00
rootCtx ,
2022-10-09 15:23:52 +02:00
db . DB ,
pg . WithEventTypes (
& domain . SubGraphUpdated { } ,
2023-04-27 07:09:10 +02:00
& domain . OrganizationAdded { } ,
2025-11-22 18:37:07 +01:00
& domain . UserAddedToOrganization { } ,
2023-04-27 07:09:10 +02:00
& domain . APIKeyAdded { } ,
2025-11-22 18:37:07 +01:00
& domain . APIKeyRemoved { } ,
& domain . OrganizationRemoved { } ,
2022-10-09 15:23:52 +02:00
) ,
)
if err != nil {
return fmt . Errorf ( "failed to create eventstore: %v" , err )
}
2023-04-27 07:09:10 +02:00
if err := store . RunEventStoreMigrations ( db ) ; err != nil {
return fmt . Errorf ( "event migrations: %w" , err )
}
2023-10-05 06:25:33 +00:00
publisher := goamqp . NewPublisher ( )
2022-12-17 14:36:42 +01:00
eventPublisher , err := amqp . New ( publisher )
2022-10-09 15:23:52 +02:00
if err != nil {
return fmt . Errorf ( "failed to create event publisher: %v" , err )
}
2022-12-16 16:02:16 +01:00
conn , err := connectToAmqpFunc ( cli . AmqpURL )
2022-10-09 15:23:52 +02:00
if err != nil {
return fmt . Errorf ( "failed to connect to AMQP: %v" , err )
}
serviceCache := cache . New ( logger )
2023-04-27 07:09:10 +02:00
if err := loadOrganizations ( rootCtx , eventStore , serviceCache ) ; err != nil {
return fmt . Errorf ( "caching organizations: %w" , err )
2022-10-09 15:23:52 +02:00
}
2023-04-27 07:09:10 +02:00
if err := loadSubGraphs ( rootCtx , eventStore , serviceCache ) ; err != nil {
return fmt . Errorf ( "caching subgraphs: %w" , err )
2022-10-09 15:23:52 +02:00
}
setups := [ ] goamqp . Setup {
2025-04-12 10:43:40 +02:00
goamqp . UseLogger ( func ( s string ) { logger . Error ( s ) } ) ,
2022-10-09 15:23:52 +02:00
goamqp . CloseListener ( closeEvents ) ,
goamqp . WithPrefetchLimit ( 20 ) ,
2022-12-17 14:36:42 +01:00
goamqp . EventStreamPublisher ( publisher ) ,
2022-10-09 15:23:52 +02:00
goamqp . TransientEventStreamConsumer ( "SubGraph.Updated" , serviceCache . Update , domain . SubGraphUpdated { } ) ,
2023-04-27 07:09:10 +02:00
goamqp . TransientEventStreamConsumer ( "Organization.Added" , serviceCache . Update , domain . OrganizationAdded { } ) ,
2025-11-22 18:37:07 +01:00
goamqp . TransientEventStreamConsumer ( "Organization.UserAdded" , serviceCache . Update , domain . UserAddedToOrganization { } ) ,
2023-04-27 07:09:10 +02:00
goamqp . TransientEventStreamConsumer ( "Organization.APIKeyAdded" , serviceCache . Update , domain . APIKeyAdded { } ) ,
2025-11-22 18:37:07 +01:00
goamqp . TransientEventStreamConsumer ( "Organization.APIKeyRemoved" , serviceCache . Update , domain . APIKeyRemoved { } ) ,
goamqp . TransientEventStreamConsumer ( "Organization.Removed" , serviceCache . Update , domain . OrganizationRemoved { } ) ,
2023-10-05 06:25:33 +00:00
goamqp . WithTypeMapping ( "SubGraph.Updated" , domain . SubGraphUpdated { } ) ,
goamqp . WithTypeMapping ( "Organization.Added" , domain . OrganizationAdded { } ) ,
2025-11-22 18:37:07 +01:00
goamqp . WithTypeMapping ( "Organization.UserAdded" , domain . UserAddedToOrganization { } ) ,
2023-10-05 06:25:33 +00:00
goamqp . WithTypeMapping ( "Organization.APIKeyAdded" , domain . APIKeyAdded { } ) ,
2025-11-22 18:37:07 +01:00
goamqp . WithTypeMapping ( "Organization.APIKeyRemoved" , domain . APIKeyRemoved { } ) ,
goamqp . WithTypeMapping ( "Organization.Removed" , domain . OrganizationRemoved { } ) ,
2022-10-09 15:23:52 +02:00
}
2022-12-16 16:02:16 +01:00
if err := conn . Start ( rootCtx , setups ... ) ; err != nil {
2022-10-09 15:23:52 +02:00
return fmt . Errorf ( "failed to setup AMQP: %v" , err )
}
defer func ( ) { _ = conn . Close ( ) } ( )
logger . Info ( "Started" )
mux := http . NewServeMux ( )
2022-12-16 16:02:16 +01:00
httpSrv := & http . Server { Addr : fmt . Sprintf ( ":%d" , cli . Port ) , Handler : mux }
2022-10-09 15:23:52 +02:00
wg := sync . WaitGroup { }
sigint := make ( chan os . Signal , 1 )
signal . Notify ( sigint , os . Interrupt , syscall . SIGTERM )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
sig := <- sigint
if sig != nil {
// In case our shutdown logic is broken/incomplete we reset signal
// handlers so next signal goes to go itself. Go is more aggressive when
// shutting down goroutines
signal . Reset ( os . Interrupt , syscall . SIGTERM )
logger . Info ( "Got shutdown signal.." )
rootCancel ( )
}
} ( )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
err := <- closeEvents
if err != nil {
2025-04-12 10:43:40 +02:00
logger . With ( "error" , err ) . Error ( "received close from AMQP" )
2022-10-09 15:23:52 +02:00
rootCancel ( )
}
} ( )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
<- rootCtx . Done ( )
2025-04-12 10:43:40 +02:00
shutdownCtx , shutdownRelease := context . WithTimeout ( context . Background ( ) , 10 * time . Second )
defer shutdownRelease ( )
if err := httpSrv . Shutdown ( shutdownCtx ) ; err != nil {
logger . With ( "error" , err ) . Error ( "close http server" )
2022-10-09 15:23:52 +02:00
}
close ( sigint )
close ( closeEvents )
} ( )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
defer rootCancel ( )
resolver := & graph . Resolver {
2026-02-23 08:05:47 +01:00
EventStore : eventStore ,
Publisher : eventPublisher ,
Logger : logger ,
Cache : serviceCache ,
PubSub : graph . NewPubSub ( ) ,
CosmoGenerator : graph . NewCosmoGenerator ( & graph . DefaultCommandExecutor { } , 60 * time . Second ) ,
Debouncer : graph . NewDebouncer ( 500 * time . Millisecond ) ,
2022-10-09 15:23:52 +02:00
}
config := generated . Config {
Resolvers : resolver ,
Complexity : generated . ComplexityRoot { } ,
}
2023-04-27 07:09:10 +02:00
apiKeyMiddleware := middleware . NewApiKey ( )
mw := middleware . NewAuth0 ( "https://schemas.unbound.se" , cli . Issuer , cli . StrictSSL )
authMiddleware := middleware . NewAuth ( serviceCache )
config . Directives . Auth = authMiddleware . Directive
2024-12-09 14:57:00 +00:00
srv := handler . New ( generated . NewExecutableSchema ( config ) )
srv . AddTransport ( transport . Websocket {
KeepAlivePingInterval : 10 * time . Second ,
2025-11-20 08:09:00 +01:00
InitFunc : func ( ctx context . Context , initPayload transport . InitPayload ) ( context . Context , * transport . InitPayload , error ) {
// Extract API key from WebSocket connection_init payload
if apiKey , ok := initPayload [ "X-Api-Key" ] . ( string ) ; ok && apiKey != "" {
logger . Info ( "WebSocket connection with API key" , "has_key" , true )
ctx = context . WithValue ( ctx , middleware . ApiKey , apiKey )
2025-11-20 22:11:17 +01:00
// Look up organization by API key (cache handles hash comparison)
if organization := serviceCache . OrganizationByAPIKey ( apiKey ) ; organization != nil {
2025-11-20 08:09:00 +01:00
logger . Info ( "WebSocket: Organization found for API key" , "org_id" , organization . ID . String ( ) )
ctx = context . WithValue ( ctx , middleware . OrganizationKey , * organization )
} else {
logger . Warn ( "WebSocket: No organization found for API key" )
}
} else {
logger . Info ( "WebSocket connection without API key" )
}
return ctx , & initPayload , nil
} ,
2024-12-09 14:57:00 +00:00
} )
srv . AddTransport ( transport . Options { } )
srv . AddTransport ( transport . GET { } )
srv . AddTransport ( transport . POST { } )
srv . AddTransport ( transport . MultipartForm { } )
srv . SetQueryCache ( lru . New [ * ast . QueryDocument ] ( 1000 ) )
srv . Use ( extension . Introspection { } )
srv . Use ( extension . AutomaticPersistedQuery {
Cache : lru . New [ string ] ( 100 ) ,
} )
2022-10-09 15:23:52 +02:00
2025-11-21 10:24:34 +01:00
healthChecker := health . New ( db . DB , logger )
2025-06-13 11:00:52 +02:00
mux . Handle ( "/" , monitoring . Handler ( playground . Handler ( "GraphQL playground" , "/query" ) ) )
2025-11-21 10:24:34 +01:00
mux . Handle ( "/health" , http . HandlerFunc ( healthChecker . LivenessHandler ) )
mux . Handle ( "/health/live" , http . HandlerFunc ( healthChecker . LivenessHandler ) )
mux . Handle ( "/health/ready" , http . HandlerFunc ( healthChecker . ReadinessHandler ) )
2023-04-27 07:09:10 +02:00
mux . Handle ( "/query" , cors . AllowAll ( ) . Handler (
2025-06-13 11:00:52 +02:00
monitoring . Handler (
2023-04-27 07:09:10 +02:00
mw . Middleware ( ) . CheckJWT (
apiKeyMiddleware . Handler (
authMiddleware . Handler ( srv ) ,
) ,
) ,
) ,
) )
2022-10-09 15:23:52 +02:00
2025-04-12 10:43:40 +02:00
logger . Info ( fmt . Sprintf ( "connect to http://localhost:%d/ for GraphQL playground" , cli . Port ) )
2022-10-09 15:23:52 +02:00
2025-04-12 10:43:40 +02:00
if err := httpSrv . ListenAndServe ( ) ; ! errors . Is ( err , http . ErrServerClosed ) {
logger . With ( "error" , err ) . Error ( "listen http" )
2022-10-09 15:23:52 +02:00
}
} ( )
wg . Wait ( )
return nil
}
2023-04-27 07:09:10 +02:00
// loadOrganizations populates serviceCache with every Organization aggregate
// known to eventStore.
//
// It asks the event store for all aggregate roots of type domain.Organization,
// builds an Organization for each root id, passes it through
// eventsourced.NewHandler (presumably to rebuild its state from stored events
// — confirm against the eventsourced package) and hands the result to the
// cache. The first error encountered is returned unchanged and stops the load.
func loadOrganizations(ctx context.Context, eventStore eventsourced.EventStore, serviceCache *cache.Cache) error {
	aggregateType := reflect.TypeOf(domain.Organization{})
	ids, err := eventStore.GetAggregateRoots(ctx, aggregateType)
	if err != nil {
		return err
	}

	for _, id := range ids {
		org := &domain.Organization{
			BaseAggregate: eventsourced.BaseAggregateFromString(id.String()),
		}
		_, err = eventsourced.NewHandler(ctx, org, eventStore)
		if err != nil {
			return err
		}
		// No message headers exist for a cache fill, hence the nil.
		if _, err = serviceCache.Update(org, nil); err != nil {
			return err
		}
	}

	return nil
}
// loadSubGraphs populates serviceCache with every SubGraph aggregate known to
// eventStore.
//
// It asks the event store for all aggregate roots of type domain.SubGraph,
// builds a SubGraph for each root id, passes it through
// eventsourced.NewHandler (presumably to rebuild its state from stored events
// — confirm against the eventsourced package) and hands the result to the
// cache. The first error encountered is returned unchanged and stops the load.
func loadSubGraphs(ctx context.Context, eventStore eventsourced.EventStore, serviceCache *cache.Cache) error {
	aggregateType := reflect.TypeOf(domain.SubGraph{})
	ids, err := eventStore.GetAggregateRoots(ctx, aggregateType)
	if err != nil {
		return err
	}

	for _, id := range ids {
		sg := &domain.SubGraph{
			BaseAggregate: eventsourced.BaseAggregateFromString(id.String()),
		}
		_, err = eventsourced.NewHandler(ctx, sg, eventStore)
		if err != nil {
			return err
		}
		// No message headers exist for a cache fill, hence the nil.
		if _, err = serviceCache.Update(sg, nil); err != nil {
			return err
		}
	}

	return nil
}
2022-10-09 15:23:52 +02:00
// ConnectAMQP creates the AMQP connection for this service from url, using
// serviceName as the name handed to goamqp.
//
// On failure it returns an explicit nil Connection together with the error,
// so callers never receive an interface value wrapping a nil pointer.
func ConnectAMQP(url string) (Connection, error) {
	conn, err := goamqp.NewFromURL(serviceName, url)
	if err != nil {
		return nil, err
	}
	return conn, nil
}
type Connection interface {
2022-12-16 16:02:16 +01:00
Start ( ctx context . Context , opts ... goamqp . Setup ) error
2022-10-09 15:23:52 +02:00
Close ( ) error
}