package main import ( "context" "fmt" "net/http" "os" "os/signal" "reflect" "sync" "syscall" "time" "github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/playground" "github.com/alecthomas/kong" "github.com/apex/log" "github.com/apex/log/handlers/json" "github.com/getsentry/sentry-go" sentryhttp "github.com/getsentry/sentry-go/http" "github.com/rs/cors" "github.com/sparetimecoders/goamqp" "gitlab.com/unboundsoftware/eventsourced/amqp" "gitlab.com/unboundsoftware/eventsourced/eventsourced" "gitlab.com/unboundsoftware/eventsourced/pg" "gitlab.com/unboundsoftware/schemas/cache" "gitlab.com/unboundsoftware/schemas/domain" "gitlab.com/unboundsoftware/schemas/graph" "gitlab.com/unboundsoftware/schemas/graph/generated" "gitlab.com/unboundsoftware/schemas/middleware" "gitlab.com/unboundsoftware/schemas/store" ) type CLI struct { AmqpURL string `name:"amqp-url" env:"AMQP_URL" help:"URL to use to connect to RabbitMQ" default:"amqp://user:password@localhost:5672/"` Port int `name:"port" env:"PORT" help:"Listen-port for GraphQL API" default:"8080"` APIKey string `name:"api-key" env:"API_KEY" help:"The API-key that is required"` LogLevel string `name:"log-level" env:"LOG_LEVEL" help:"The level of logging to use (debug, info, warn, error, fatal)" default:"info"` DatabaseURL string `name:"postgres-url" env:"POSTGRES_URL" help:"URL to use to connect to Postgres" default:"postgres://postgres:postgres@:5432/schemas?sslmode=disable"` DatabaseDriverName string `name:"db-driver" env:"DB_DRIVER" help:"Driver to use to connect to db" default:"postgres"` SentryConfig } type SentryConfig struct { DSN string `name:"sentry-dsn" env:"SENTRY_DSN" help:"Sentry dsn" default:""` Environment string `name:"sentry-environment" env:"SENTRY_ENVIRONMENT" help:"Sentry environment" default:"development"` } var buildVersion = "none" const serviceName = "schemas" func main() { var cli CLI _ = kong.Parse(&cli) log.SetHandler(json.New(os.Stdout)) log.SetLevelFromString(cli.LogLevel) logger := log.WithField("service", serviceName) closeEvents := make(chan error) if err := start( closeEvents, logger, ConnectAMQP, cli, ); err != nil { logger.WithError(err).Error("process error") } } func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url string) (Connection, error), cli CLI) error { if err := setupSentry(logger, cli.SentryConfig); err != nil { return err } defer sentry.Flush(2 * time.Second) rootCtx, rootCancel := context.WithCancel(context.Background()) defer rootCancel() db, err := store.SetupDB(cli.DatabaseDriverName, cli.DatabaseURL) if err != nil { return fmt.Errorf("failed to setup DB: %v", err) } eventStore, err := pg.New( rootCtx, db.DB, pg.WithEventTypes( &domain.SubGraphUpdated{}, ), ) if err != nil { return fmt.Errorf("failed to create eventstore: %v", err) } publisher, err := goamqp.NewPublisher( goamqp.Route{ Type: domain.SubGraphUpdated{}, Key: "SubGraph.Updated", }, ) if err != nil { return fmt.Errorf("failed to create publisher: %v", err) } eventPublisher, err := amqp.New(publisher) if err != nil { return fmt.Errorf("failed to create event publisher: %v", err) } conn, err := connectToAmqpFunc(cli.AmqpURL) if err != nil { return fmt.Errorf("failed to connect to AMQP: %v", err) } serviceCache := cache.New(logger) roots, err := eventStore.GetAggregateRoots(rootCtx, reflect.TypeOf(domain.SubGraph{})) if err != nil { return err } for _, root := range roots { subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())} if _, err := eventsourced.NewHandler(rootCtx, subGraph, eventStore); err != nil { return err } _, err := serviceCache.Update(subGraph, nil) if err != nil { return err } } setups := []goamqp.Setup{ goamqp.UseLogger(logger.Errorf), goamqp.CloseListener(closeEvents), goamqp.WithPrefetchLimit(20), goamqp.EventStreamPublisher(publisher), goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}), } if err := conn.Start(rootCtx, setups...); err != nil { return fmt.Errorf("failed to setup AMQP: %v", err) } defer func() { _ = conn.Close() }() logger.Info("Started") mux := http.NewServeMux() httpSrv := &http.Server{Addr: fmt.Sprintf(":%d", cli.Port), Handler: mux} wg := sync.WaitGroup{} sigint := make(chan os.Signal, 1) signal.Notify(sigint, os.Interrupt, syscall.SIGTERM) wg.Add(1) go func() { defer wg.Done() sig := <-sigint if sig != nil { // In case our shutdown logic is broken/incomplete we reset signal // handlers so next signal goes to go itself. Go is more aggressive when // shutting down goroutines signal.Reset(os.Interrupt, syscall.SIGTERM) logger.Info("Got shutdown signal..") rootCancel() } }() wg.Add(1) go func() { defer wg.Done() err := <-closeEvents if err != nil { logger.WithError(err).Error("received close from AMQP") rootCancel() } }() wg.Add(1) go func() { defer wg.Done() <-rootCtx.Done() if err := httpSrv.Close(); err != nil { logger.WithError(err).Error("close http server") } close(sigint) close(closeEvents) }() wg.Add(1) go func() { defer wg.Done() defer rootCancel() resolver := &graph.Resolver{ EventStore: eventStore, Publisher: eventPublisher, Logger: logger, Cache: serviceCache, } config := generated.Config{ Resolvers: resolver, Complexity: generated.ComplexityRoot{}, } apiKeyMiddleware := middleware.NewApiKey(cli.APIKey, logger) config.Directives.HasApiKey = apiKeyMiddleware.Directive srv := handler.NewDefaultServer(generated.NewExecutableSchema( config, )) sentryHandler := sentryhttp.New(sentryhttp.Options{Repanic: true}) mux.Handle("/", sentryHandler.HandleFunc(playground.Handler("GraphQL playground", "/query"))) mux.Handle("/health", http.HandlerFunc(healthFunc)) mux.Handle("/query", cors.AllowAll().Handler(sentryHandler.Handle(apiKeyMiddleware.Handler(srv)))) logger.Infof("connect to http://localhost:%d/ for GraphQL playground", cli.Port) if err := httpSrv.ListenAndServe(); err != nil { logger.WithError(err).Error("listen http") } }() wg.Wait() return nil } func healthFunc(w http.ResponseWriter, _ *http.Request) { _, _ = w.Write([]byte("OK")) } func setupSentry(logger log.Interface, args SentryConfig) error { if args.Environment == "" { return fmt.Errorf("no Sentry environment supplied, exiting") } cfg := sentry.ClientOptions{ Dsn: args.DSN, Environment: args.Environment, Release: fmt.Sprintf("%s-%s", serviceName, buildVersion), } switch args.Environment { case "development": cfg.Debug = true cfg.EnableTracing = false cfg.TracesSampleRate = 0.0 case "production": if args.DSN == "" { return fmt.Errorf("no DSN supplied for non-dev environment, exiting") } cfg.Debug = false cfg.EnableTracing = true cfg.TracesSampleRate = 1.0 default: return fmt.Errorf("illegal environment %s", args.Environment) } if err := sentry.Init(cfg); err != nil { return fmt.Errorf("sentry setup: %w", err) } logger.Infof("configured Sentry for env: %s", args.Environment) return nil } func ConnectAMQP(url string) (Connection, error) { return goamqp.NewFromURL(serviceName, url) } type Connection interface { Start(ctx context.Context, opts ...goamqp.Setup) error Close() error }