// Command schemas starts the schemas service: it wires up Sentry, the
// Postgres-backed event store, the AMQP connection and an in-memory cache, and
// then serves the GraphQL API over HTTP until a shutdown signal or an AMQP
// close event arrives.
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"reflect"
	"sync"
	"syscall"
	"time"

	"github.com/99designs/gqlgen/graphql/handler"
	"github.com/99designs/gqlgen/graphql/playground"
	"github.com/alecthomas/kong"
	"github.com/apex/log"
	"github.com/apex/log/handlers/json"
	"github.com/getsentry/sentry-go"
	sentryhttp "github.com/getsentry/sentry-go/http"
	"github.com/rs/cors"
	"github.com/sparetimecoders/goamqp"
	"gitlab.com/unboundsoftware/eventsourced/amqp"
	"gitlab.com/unboundsoftware/eventsourced/eventsourced"
	"gitlab.com/unboundsoftware/eventsourced/pg"
	"gitlab.com/unboundsoftware/schemas/cache"
	"gitlab.com/unboundsoftware/schemas/domain"
	"gitlab.com/unboundsoftware/schemas/graph"
	"gitlab.com/unboundsoftware/schemas/graph/generated"
	"gitlab.com/unboundsoftware/schemas/middleware"
	"gitlab.com/unboundsoftware/schemas/store"
)

// CLI holds the command-line / environment configuration of the service. The
// struct tags are consumed by kong (flag name, environment variable, help text
// and default value).
type CLI struct {
	AmqpURL            string `name:"amqp-url" env:"AMQP_URL" help:"URL to use to connect to RabbitMQ" default:"amqp://user:password@localhost:5672/"`
	Port               int    `name:"port" env:"PORT" help:"Listen-port for GraphQL API" default:"8080"`
	LogLevel           string `name:"log-level" env:"LOG_LEVEL" help:"The level of logging to use (debug, info, warn, error, fatal)" default:"info"`
	DatabaseURL        string `name:"postgres-url" env:"POSTGRES_URL" help:"URL to use to connect to Postgres" default:"postgres://postgres:postgres@:5432/schemas?sslmode=disable"`
	DatabaseDriverName string `name:"db-driver" env:"DB_DRIVER" help:"Driver to use to connect to db" default:"postgres"`
	Issuer             string `name:"issuer" env:"ISSUER" help:"The JWT token issuer to use" default:"unbound.eu.auth0.com"`
	StrictSSL          bool   `name:"strict-ssl" env:"STRICT_SSL" help:"Should strict SSL handling be enabled" default:"true"`
	SentryConfig
}

// SentryConfig holds the Sentry-related part of the configuration.
type SentryConfig struct {
	DSN         string `name:"sentry-dsn" env:"SENTRY_DSN" help:"Sentry dsn" default:""`
	Environment string `name:"sentry-environment" env:"SENTRY_ENVIRONMENT" help:"Sentry environment" default:"development"`
}

// buildVersion is used as the version part of the Sentry release name.
// NOTE(review): presumably overridden at build time (e.g. via -ldflags) — confirm.
var buildVersion = "none"

const (
	// serviceName identifies this service in logs, in the Sentry release name
	// and when connecting to AMQP.
	serviceName = "schemas"

	// readHeaderTimeout bounds how long the HTTP server waits for a client to
	// send its request headers, so idle connections cannot be held open
	// indefinitely.
	readHeaderTimeout = 10 * time.Second
)

// main parses the configuration, configures logging and runs the service.
// It exits with a non-zero status when start reports an error.
func main() {
	var cli CLI
	// The returned kong context is not needed here.
	_ = kong.Parse(&cli)

	log.SetHandler(json.New(os.Stdout))
	log.SetLevelFromString(cli.LogLevel)
	logger := log.WithField("service", serviceName)

	closeEvents := make(chan error)
	if err := start(closeEvents, logger, ConnectAMQP, cli); err != nil {
		logger.WithError(err).Error("process error")
		// Report the failure to whatever supervises the process. No deferred
		// calls are pending in main, and start has already flushed Sentry.
		os.Exit(1)
	}
}

// start brings up all dependencies and serves the GraphQL API until shutdown.
//
// closeEvents is registered as the AMQP close listener; a non-nil error
// received on it triggers shutdown. connectToAmqpFunc creates the AMQP
// connection and is injectable so that it can be replaced (e.g. in tests).
//
// start returns an error if any step of the setup fails. Once the HTTP server
// goroutine has been launched it blocks until every goroutine has finished and
// then returns nil; a failure to listen is logged, not returned.
func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url string) (Connection, error), cli CLI) error {
	if err := setupSentry(logger, cli.SentryConfig); err != nil {
		return err
	}
	defer sentry.Flush(2 * time.Second)

	rootCtx, rootCancel := context.WithCancel(context.Background())
	defer rootCancel()

	db, err := store.SetupDB(cli.DatabaseDriverName, cli.DatabaseURL)
	if err != nil {
		return fmt.Errorf("failed to setup DB: %w", err)
	}

	eventStore, err := pg.New(
		rootCtx,
		db.DB,
		pg.WithEventTypes(
			&domain.SubGraphUpdated{},
			&domain.OrganizationAdded{},
			&domain.APIKeyAdded{},
		),
	)
	if err != nil {
		return fmt.Errorf("failed to create eventstore: %w", err)
	}

	if err := store.RunEventStoreMigrations(db); err != nil {
		return fmt.Errorf("event migrations: %w", err)
	}

	publisher := goamqp.NewPublisher()
	eventPublisher, err := amqp.New(publisher)
	if err != nil {
		return fmt.Errorf("failed to create event publisher: %w", err)
	}

	conn, err := connectToAmqpFunc(cli.AmqpURL)
	if err != nil {
		return fmt.Errorf("failed to connect to AMQP: %w", err)
	}

	// Populate the cache from the event store before any AMQP consumers are
	// started.
	serviceCache := cache.New(logger)
	if err := loadOrganizations(rootCtx, eventStore, serviceCache); err != nil {
		return fmt.Errorf("caching organizations: %w", err)
	}
	if err := loadSubGraphs(rootCtx, eventStore, serviceCache); err != nil {
		return fmt.Errorf("caching subgraphs: %w", err)
	}

	setups := []goamqp.Setup{
		goamqp.UseLogger(logger.Error),
		goamqp.CloseListener(closeEvents),
		goamqp.WithPrefetchLimit(20),
		goamqp.EventStreamPublisher(publisher),
		goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}),
		goamqp.TransientEventStreamConsumer("Organization.Added", serviceCache.Update, domain.OrganizationAdded{}),
		goamqp.TransientEventStreamConsumer("Organization.APIKeyAdded", serviceCache.Update, domain.APIKeyAdded{}),
		goamqp.WithTypeMapping("SubGraph.Updated", domain.SubGraphUpdated{}),
		goamqp.WithTypeMapping("Organization.Added", domain.OrganizationAdded{}),
		goamqp.WithTypeMapping("Organization.APIKeyAdded", domain.APIKeyAdded{}),
	}
	if err := conn.Start(rootCtx, setups...); err != nil {
		return fmt.Errorf("failed to setup AMQP: %w", err)
	}
	defer func() { _ = conn.Close() }()

	logger.Info("Started")

	mux := http.NewServeMux()
	httpSrv := &http.Server{
		Addr:              fmt.Sprintf(":%d", cli.Port),
		Handler:           mux,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	var wg sync.WaitGroup

	sigint := make(chan os.Signal, 1)
	signal.Notify(sigint, os.Interrupt, syscall.SIGTERM)

	// Signal watcher: cancels the root context on SIGINT/SIGTERM. A nil value
	// means the channel was closed by the shutdown goroutine below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		sig := <-sigint
		if sig != nil {
			// In case our shutdown logic is broken/incomplete we reset signal
			// handlers so next signal goes to go itself. Go is more aggressive when
			// shutting down goroutines
			signal.Reset(os.Interrupt, syscall.SIGTERM)
			logger.Info("Got shutdown signal..")
			rootCancel()
		}
	}()

	// AMQP watcher: cancels the root context when a non-nil close error is
	// received. A nil value means the channel was closed by the shutdown
	// goroutine below.
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := <-closeEvents
		if err != nil {
			logger.WithError(err).Error("received close from AMQP")
			rootCancel()
		}
	}()

	// Shutdown: once the root context is cancelled, stop the HTTP server and
	// release the two watcher goroutines above by closing their channels.
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-rootCtx.Done()
		if err := httpSrv.Close(); err != nil {
			logger.WithError(err).Error("close http server")
		}
		// Unregister the channel before closing it; otherwise a signal that
		// arrives after the close would be delivered to a closed channel and
		// panic.
		signal.Stop(sigint)
		close(sigint)
		// NOTE(review): closeEvents is handed to goamqp.CloseListener above; if
		// that library can still send on it after this point, closing here
		// would panic — confirm against goamqp.
		close(closeEvents)
	}()

	// HTTP server: builds the GraphQL handler chain and serves until the
	// server is closed. Cancelling the root context on exit makes sure a
	// failed listen also shuts everything else down.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer rootCancel()

		resolver := &graph.Resolver{
			EventStore: eventStore,
			Publisher:  eventPublisher,
			Logger:     logger,
			Cache:      serviceCache,
		}
		config := generated.Config{
			Resolvers:  resolver,
			Complexity: generated.ComplexityRoot{},
		}
		apiKeyMiddleware := middleware.NewApiKey()
		mw := middleware.NewAuth0("https://schemas.unbound.se", cli.Issuer, cli.StrictSSL)
		authMiddleware := middleware.NewAuth(serviceCache)
		config.Directives.Auth = authMiddleware.Directive
		srv := handler.NewDefaultServer(generated.NewExecutableSchema(
			config,
		))

		sentryHandler := sentryhttp.New(sentryhttp.Options{Repanic: true})
		mux.Handle("/", sentryHandler.HandleFunc(playground.Handler("GraphQL playground", "/query")))
		mux.Handle("/health", http.HandlerFunc(healthFunc))
		mux.Handle("/query", cors.AllowAll().Handler(
			sentryHandler.Handle(
				mw.Middleware().CheckJWT(
					apiKeyMiddleware.Handler(
						authMiddleware.Handler(srv),
					),
				),
			),
		))

		logger.Infof("connect to http://localhost:%d/ for GraphQL playground", cli.Port)

		// http.ErrServerClosed is what ListenAndServe returns after Close has
		// been called, i.e. on every normal shutdown; it is not a failure.
		if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			logger.WithError(err).Error("listen http")
		}
	}()

	wg.Wait()
	return nil
}

// loadOrganizations fetches the aggregate roots of type domain.Organization
// from the event store and, for each of them, builds the aggregate via
// eventsourced.NewHandler and passes it to serviceCache.Update. It stops at,
// and returns, the first error.
func loadOrganizations(ctx context.Context, eventStore eventsourced.EventStore, serviceCache *cache.Cache) error {
	roots, err := eventStore.GetAggregateRoots(ctx, reflect.TypeOf(domain.Organization{}))
	if err != nil {
		return err
	}
	for _, root := range roots {
		organization := &domain.Organization{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())}
		// NOTE(review): NewHandler presumably replays stored events into the
		// aggregate; only its error is of interest here — confirm.
		if _, err := eventsourced.NewHandler(ctx, organization, eventStore); err != nil {
			return err
		}
		if _, err := serviceCache.Update(organization, nil); err != nil {
			return err
		}
	}
	return nil
}

// loadSubGraphs fetches the aggregate roots of type domain.SubGraph from the
// event store and, for each of them, builds the aggregate via
// eventsourced.NewHandler and passes it to serviceCache.Update. It stops at,
// and returns, the first error.
func loadSubGraphs(ctx context.Context, eventStore eventsourced.EventStore, serviceCache *cache.Cache) error {
	roots, err := eventStore.GetAggregateRoots(ctx, reflect.TypeOf(domain.SubGraph{}))
	if err != nil {
		return err
	}
	for _, root := range roots {
		subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())}
		// NOTE(review): NewHandler presumably replays stored events into the
		// aggregate; only its error is of interest here — confirm.
		if _, err := eventsourced.NewHandler(ctx, subGraph, eventStore); err != nil {
			return err
		}
		if _, err := serviceCache.Update(subGraph, nil); err != nil {
			return err
		}
	}
	return nil
}

// healthFunc is the handler for /health; it always answers with the body "OK".
func healthFunc(w http.ResponseWriter, _ *http.Request) {
	// Nothing useful can be done if writing the health response fails.
	_, _ = w.Write([]byte("OK"))
}

// setupSentry initialises the Sentry SDK from args.
//
// Only the environments "development" and "production" are accepted:
// development enables debug output and disables tracing, production requires
// a DSN and enables tracing with a sample rate of 0.01. An empty or unknown
// environment, a missing production DSN, or a failing sentry.Init results in
// an error.
func setupSentry(logger log.Interface, args SentryConfig) error {
	if args.Environment == "" {
		return fmt.Errorf("no Sentry environment supplied, exiting")
	}
	cfg := sentry.ClientOptions{
		Dsn:         args.DSN,
		Environment: args.Environment,
		Release:     fmt.Sprintf("%s-%s", serviceName, buildVersion),
	}

	switch args.Environment {
	case "development":
		cfg.Debug = true
		cfg.EnableTracing = false
		cfg.TracesSampleRate = 0.0
	case "production":
		if args.DSN == "" {
			return fmt.Errorf("no DSN supplied for non-dev environment, exiting")
		}
		cfg.Debug = false
		cfg.EnableTracing = true
		cfg.TracesSampleRate = 0.01
	default:
		return fmt.Errorf("illegal environment %s", args.Environment)
	}

	if err := sentry.Init(cfg); err != nil {
		return fmt.Errorf("sentry setup: %w", err)
	}
	logger.Infof("configured Sentry for env: %s", args.Environment)
	return nil
}

// ConnectAMQP creates the AMQP connection for this service from url. It is the
// production implementation of the connect function passed to start.
func ConnectAMQP(url string) (Connection, error) {
	return goamqp.NewFromURL(serviceName, url)
}

// Connection is the subset of the AMQP connection that start depends on.
type Connection interface {
	// Start applies the given setup options to the connection.
	Start(ctx context.Context, opts ...goamqp.Setup) error
	// Close closes the connection.
	Close() error
}