Files
schemas/cmd/service/service.go
T

332 lines
9.9 KiB
Go
Raw Normal View History

2022-10-09 15:23:52 +02:00
package main
import (
"context"
"errors"
2022-10-09 15:23:52 +02:00
"fmt"
"log/slog"
2022-10-09 15:23:52 +02:00
"net/http"
"os"
"os/signal"
"reflect"
"sync"
"syscall"
2022-12-16 16:02:16 +01:00
"time"
2022-10-09 15:23:52 +02:00
"github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/handler/extension"
"github.com/99designs/gqlgen/graphql/handler/lru"
"github.com/99designs/gqlgen/graphql/handler/transport"
2022-10-09 15:23:52 +02:00
"github.com/99designs/gqlgen/graphql/playground"
"github.com/alecthomas/kong"
2022-12-16 16:02:16 +01:00
"github.com/getsentry/sentry-go"
2022-10-09 15:23:52 +02:00
sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/rs/cors"
"github.com/sparetimecoders/goamqp"
"github.com/vektah/gqlparser/v2/ast"
2022-10-09 15:23:52 +02:00
"gitlab.com/unboundsoftware/eventsourced/amqp"
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
"gitlab.com/unboundsoftware/eventsourced/pg"
"gitlab.com/unboundsoftware/schemas/cache"
"gitlab.com/unboundsoftware/schemas/domain"
"gitlab.com/unboundsoftware/schemas/graph"
"gitlab.com/unboundsoftware/schemas/graph/generated"
"gitlab.com/unboundsoftware/schemas/logging"
2022-10-09 15:23:52 +02:00
"gitlab.com/unboundsoftware/schemas/middleware"
"gitlab.com/unboundsoftware/schemas/store"
)
2022-12-16 16:02:16 +01:00
type CLI struct {
2024-03-01 22:41:37 +01:00
AmqpURL string `name:"amqp-url" env:"AMQP_URL" help:"URL to use to connect to RabbitMQ" default:"amqp://user:password@unbound-control-plane.orb.local:5672/"`
2022-10-09 15:23:52 +02:00
Port int `name:"port" env:"PORT" help:"Listen-port for GraphQL API" default:"8080"`
LogLevel string `name:"log-level" env:"LOG_LEVEL" help:"The level of logging to use (debug, info, warn, error, fatal)" default:"info"`
2024-03-01 22:41:37 +01:00
DatabaseURL string `name:"postgres-url" env:"POSTGRES_URL" help:"URL to use to connect to Postgres" default:"postgres://postgres:postgres@unbound-control-plane.orb.local:5432/schemas?sslmode=disable"`
2022-10-09 15:23:52 +02:00
DatabaseDriverName string `name:"db-driver" env:"DB_DRIVER" help:"Driver to use to connect to db" default:"postgres"`
2023-04-27 07:09:10 +02:00
Issuer string `name:"issuer" env:"ISSUER" help:"The JWT token issuer to use" default:"unbound.eu.auth0.com"`
StrictSSL bool `name:"strict-ssl" env:"STRICT_SSL" help:"Should strict SSL handling be enabled" default:"true"`
2022-12-16 16:02:16 +01:00
SentryConfig
2022-10-09 15:23:52 +02:00
}
2022-12-16 16:02:16 +01:00
type SentryConfig struct {
DSN string `name:"sentry-dsn" env:"SENTRY_DSN" help:"Sentry dsn" default:""`
Environment string `name:"sentry-environment" env:"SENTRY_ENVIRONMENT" help:"Sentry environment" default:"development"`
}
var buildVersion = "none"
2022-10-09 15:23:52 +02:00
const serviceName = "schemas"
func main() {
2022-12-16 16:02:16 +01:00
var cli CLI
_ = kong.Parse(&cli)
logger := logging.SetupLogger(cli.LogLevel, serviceName, buildVersion)
2022-10-09 15:23:52 +02:00
closeEvents := make(chan error)
if err := start(
closeEvents,
logger,
ConnectAMQP,
2022-12-16 16:02:16 +01:00
cli,
2022-10-09 15:23:52 +02:00
); err != nil {
logger.With("error", err).Error("process error")
2022-10-09 15:23:52 +02:00
}
}
func start(closeEvents chan error, logger *slog.Logger, connectToAmqpFunc func(url string) (Connection, error), cli CLI) error {
2022-12-16 16:02:16 +01:00
if err := setupSentry(logger, cli.SentryConfig); err != nil {
return err
}
defer sentry.Flush(2 * time.Second)
2022-10-09 15:23:52 +02:00
rootCtx, rootCancel := context.WithCancel(context.Background())
defer rootCancel()
2022-12-16 16:02:16 +01:00
db, err := store.SetupDB(cli.DatabaseDriverName, cli.DatabaseURL)
2022-10-09 15:23:52 +02:00
if err != nil {
return fmt.Errorf("failed to setup DB: %v", err)
}
eventStore, err := pg.New(
2022-12-17 14:36:42 +01:00
rootCtx,
2022-10-09 15:23:52 +02:00
db.DB,
pg.WithEventTypes(
&domain.SubGraphUpdated{},
2023-04-27 07:09:10 +02:00
&domain.OrganizationAdded{},
&domain.APIKeyAdded{},
2022-10-09 15:23:52 +02:00
),
)
if err != nil {
return fmt.Errorf("failed to create eventstore: %v", err)
}
2023-04-27 07:09:10 +02:00
if err := store.RunEventStoreMigrations(db); err != nil {
return fmt.Errorf("event migrations: %w", err)
}
publisher := goamqp.NewPublisher()
2022-12-17 14:36:42 +01:00
eventPublisher, err := amqp.New(publisher)
2022-10-09 15:23:52 +02:00
if err != nil {
return fmt.Errorf("failed to create event publisher: %v", err)
}
2022-12-16 16:02:16 +01:00
conn, err := connectToAmqpFunc(cli.AmqpURL)
2022-10-09 15:23:52 +02:00
if err != nil {
return fmt.Errorf("failed to connect to AMQP: %v", err)
}
serviceCache := cache.New(logger)
2023-04-27 07:09:10 +02:00
if err := loadOrganizations(rootCtx, eventStore, serviceCache); err != nil {
return fmt.Errorf("caching organizations: %w", err)
2022-10-09 15:23:52 +02:00
}
2023-04-27 07:09:10 +02:00
if err := loadSubGraphs(rootCtx, eventStore, serviceCache); err != nil {
return fmt.Errorf("caching subgraphs: %w", err)
2022-10-09 15:23:52 +02:00
}
setups := []goamqp.Setup{
goamqp.UseLogger(func(s string) { logger.Error(s) }),
2022-10-09 15:23:52 +02:00
goamqp.CloseListener(closeEvents),
goamqp.WithPrefetchLimit(20),
2022-12-17 14:36:42 +01:00
goamqp.EventStreamPublisher(publisher),
2022-10-09 15:23:52 +02:00
goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}),
2023-04-27 07:09:10 +02:00
goamqp.TransientEventStreamConsumer("Organization.Added", serviceCache.Update, domain.OrganizationAdded{}),
goamqp.TransientEventStreamConsumer("Organization.APIKeyAdded", serviceCache.Update, domain.APIKeyAdded{}),
goamqp.WithTypeMapping("SubGraph.Updated", domain.SubGraphUpdated{}),
goamqp.WithTypeMapping("Organization.Added", domain.OrganizationAdded{}),
goamqp.WithTypeMapping("Organization.APIKeyAdded", domain.APIKeyAdded{}),
2022-10-09 15:23:52 +02:00
}
2022-12-16 16:02:16 +01:00
if err := conn.Start(rootCtx, setups...); err != nil {
2022-10-09 15:23:52 +02:00
return fmt.Errorf("failed to setup AMQP: %v", err)
}
defer func() { _ = conn.Close() }()
logger.Info("Started")
mux := http.NewServeMux()
2022-12-16 16:02:16 +01:00
httpSrv := &http.Server{Addr: fmt.Sprintf(":%d", cli.Port), Handler: mux}
2022-10-09 15:23:52 +02:00
wg := sync.WaitGroup{}
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt, syscall.SIGTERM)
wg.Add(1)
go func() {
defer wg.Done()
sig := <-sigint
if sig != nil {
// In case our shutdown logic is broken/incomplete we reset signal
// handlers so next signal goes to go itself. Go is more aggressive when
// shutting down goroutines
signal.Reset(os.Interrupt, syscall.SIGTERM)
logger.Info("Got shutdown signal..")
rootCancel()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
err := <-closeEvents
if err != nil {
logger.With("error", err).Error("received close from AMQP")
2022-10-09 15:23:52 +02:00
rootCancel()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
<-rootCtx.Done()
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownRelease()
if err := httpSrv.Shutdown(shutdownCtx); err != nil {
logger.With("error", err).Error("close http server")
2022-10-09 15:23:52 +02:00
}
close(sigint)
close(closeEvents)
}()
wg.Add(1)
go func() {
defer wg.Done()
defer rootCancel()
resolver := &graph.Resolver{
EventStore: eventStore,
2022-12-17 14:36:42 +01:00
Publisher: eventPublisher,
2022-10-09 15:23:52 +02:00
Logger: logger,
Cache: serviceCache,
}
config := generated.Config{
Resolvers: resolver,
Complexity: generated.ComplexityRoot{},
}
2023-04-27 07:09:10 +02:00
apiKeyMiddleware := middleware.NewApiKey()
mw := middleware.NewAuth0("https://schemas.unbound.se", cli.Issuer, cli.StrictSSL)
authMiddleware := middleware.NewAuth(serviceCache)
config.Directives.Auth = authMiddleware.Directive
srv := handler.New(generated.NewExecutableSchema(config))
srv.AddTransport(transport.Websocket{
KeepAlivePingInterval: 10 * time.Second,
})
srv.AddTransport(transport.Options{})
srv.AddTransport(transport.GET{})
srv.AddTransport(transport.POST{})
srv.AddTransport(transport.MultipartForm{})
srv.SetQueryCache(lru.New[*ast.QueryDocument](1000))
srv.Use(extension.Introspection{})
srv.Use(extension.AutomaticPersistedQuery{
Cache: lru.New[string](100),
})
2022-10-09 15:23:52 +02:00
sentryHandler := sentryhttp.New(sentryhttp.Options{Repanic: true})
mux.Handle("/", sentryHandler.HandleFunc(playground.Handler("GraphQL playground", "/query")))
2022-12-16 16:02:16 +01:00
mux.Handle("/health", http.HandlerFunc(healthFunc))
2023-04-27 07:09:10 +02:00
mux.Handle("/query", cors.AllowAll().Handler(
sentryHandler.Handle(
mw.Middleware().CheckJWT(
apiKeyMiddleware.Handler(
authMiddleware.Handler(srv),
),
),
),
))
2022-10-09 15:23:52 +02:00
logger.Info(fmt.Sprintf("connect to http://localhost:%d/ for GraphQL playground", cli.Port))
2022-10-09 15:23:52 +02:00
if err := httpSrv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
logger.With("error", err).Error("listen http")
2022-10-09 15:23:52 +02:00
}
}()
wg.Wait()
return nil
}
2023-04-27 07:09:10 +02:00
func loadOrganizations(ctx context.Context, eventStore eventsourced.EventStore, serviceCache *cache.Cache) error {
roots, err := eventStore.GetAggregateRoots(ctx, reflect.TypeOf(domain.Organization{}))
if err != nil {
return err
}
for _, root := range roots {
organization := &domain.Organization{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())}
if _, err := eventsourced.NewHandler(ctx, organization, eventStore); err != nil {
return err
}
_, err := serviceCache.Update(organization, nil)
if err != nil {
return err
}
}
return nil
}
func loadSubGraphs(ctx context.Context, eventStore eventsourced.EventStore, serviceCache *cache.Cache) error {
roots, err := eventStore.GetAggregateRoots(ctx, reflect.TypeOf(domain.SubGraph{}))
if err != nil {
return err
}
for _, root := range roots {
subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())}
if _, err := eventsourced.NewHandler(ctx, subGraph, eventStore); err != nil {
return err
}
_, err := serviceCache.Update(subGraph, nil)
if err != nil {
return err
}
}
return nil
}
2022-10-09 15:23:52 +02:00
func healthFunc(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte("OK"))
}
func setupSentry(logger *slog.Logger, args SentryConfig) error {
2022-12-16 16:02:16 +01:00
if args.Environment == "" {
return fmt.Errorf("no Sentry environment supplied, exiting")
}
cfg := sentry.ClientOptions{
Dsn: args.DSN,
Environment: args.Environment,
Release: fmt.Sprintf("%s-%s", serviceName, buildVersion),
}
switch args.Environment {
case "development":
cfg.Debug = true
cfg.EnableTracing = false
cfg.TracesSampleRate = 0.0
case "production":
if args.DSN == "" {
return fmt.Errorf("no DSN supplied for non-dev environment, exiting")
}
cfg.Debug = false
cfg.EnableTracing = true
2023-03-30 14:32:59 +02:00
cfg.TracesSampleRate = 0.01
2022-12-16 16:02:16 +01:00
default:
return fmt.Errorf("illegal environment %s", args.Environment)
}
if err := sentry.Init(cfg); err != nil {
return fmt.Errorf("sentry setup: %w", err)
}
logger.With("environment", args.Environment).Info("configured Sentry")
2022-12-16 16:02:16 +01:00
return nil
}
2022-10-09 15:23:52 +02:00
func ConnectAMQP(url string) (Connection, error) {
return goamqp.NewFromURL(serviceName, url)
}
type Connection interface {
2022-12-16 16:02:16 +01:00
Start(ctx context.Context, opts ...goamqp.Setup) error
2022-10-09 15:23:52 +02:00
Close() error
}