2022-10-09 15:23:52 +02:00
package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"reflect"
"sync"
"syscall"
2022-12-16 16:02:16 +01:00
"time"
2022-10-09 15:23:52 +02:00
"github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/playground"
"github.com/alecthomas/kong"
"github.com/apex/log"
"github.com/apex/log/handlers/json"
2022-12-16 16:02:16 +01:00
"github.com/getsentry/sentry-go"
2022-10-09 15:23:52 +02:00
sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/rs/cors"
"github.com/sparetimecoders/goamqp"
"gitlab.com/unboundsoftware/eventsourced/amqp"
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
"gitlab.com/unboundsoftware/eventsourced/pg"
"gitlab.com/unboundsoftware/schemas/cache"
"gitlab.com/unboundsoftware/schemas/domain"
"gitlab.com/unboundsoftware/schemas/graph"
"gitlab.com/unboundsoftware/schemas/graph/generated"
"gitlab.com/unboundsoftware/schemas/middleware"
"gitlab.com/unboundsoftware/schemas/store"
)
2022-12-16 16:02:16 +01:00
type CLI struct {
2022-10-09 15:23:52 +02:00
AmqpURL string ` name:"amqp-url" env:"AMQP_URL" help:"URL to use to connect to RabbitMQ" default:"amqp://user:password@localhost:5672/" `
Port int ` name:"port" env:"PORT" help:"Listen-port for GraphQL API" default:"8080" `
LogLevel string ` name:"log-level" env:"LOG_LEVEL" help:"The level of logging to use (debug, info, warn, error, fatal)" default:"info" `
DatabaseURL string ` name:"postgres-url" env:"POSTGRES_URL" help:"URL to use to connect to Postgres" default:"postgres://postgres:postgres@:5432/schemas?sslmode=disable" `
DatabaseDriverName string ` name:"db-driver" env:"DB_DRIVER" help:"Driver to use to connect to db" default:"postgres" `
2023-04-27 07:09:10 +02:00
Issuer string ` name:"issuer" env:"ISSUER" help:"The JWT token issuer to use" default:"unbound.eu.auth0.com" `
StrictSSL bool ` name:"strict-ssl" env:"STRICT_SSL" help:"Should strict SSL handling be enabled" default:"true" `
2022-12-16 16:02:16 +01:00
SentryConfig
2022-10-09 15:23:52 +02:00
}
2022-12-16 16:02:16 +01:00
type SentryConfig struct {
DSN string ` name:"sentry-dsn" env:"SENTRY_DSN" help:"Sentry dsn" default:"" `
Environment string ` name:"sentry-environment" env:"SENTRY_ENVIRONMENT" help:"Sentry environment" default:"development" `
}
var buildVersion = "none"
2022-10-09 15:23:52 +02:00
const serviceName = "schemas"
func main ( ) {
2022-12-16 16:02:16 +01:00
var cli CLI
_ = kong . Parse ( & cli )
2022-10-09 15:23:52 +02:00
log . SetHandler ( json . New ( os . Stdout ) )
2022-12-16 16:02:16 +01:00
log . SetLevelFromString ( cli . LogLevel )
2022-10-09 15:23:52 +02:00
logger := log . WithField ( "service" , serviceName )
closeEvents := make ( chan error )
if err := start (
closeEvents ,
logger ,
ConnectAMQP ,
2022-12-16 16:02:16 +01:00
cli ,
2022-10-09 15:23:52 +02:00
) ; err != nil {
logger . WithError ( err ) . Error ( "process error" )
}
}
2022-12-16 16:02:16 +01:00
func start ( closeEvents chan error , logger * log . Entry , connectToAmqpFunc func ( url string ) ( Connection , error ) , cli CLI ) error {
if err := setupSentry ( logger , cli . SentryConfig ) ; err != nil {
return err
}
defer sentry . Flush ( 2 * time . Second )
2022-10-09 15:23:52 +02:00
rootCtx , rootCancel := context . WithCancel ( context . Background ( ) )
defer rootCancel ( )
2022-12-16 16:02:16 +01:00
db , err := store . SetupDB ( cli . DatabaseDriverName , cli . DatabaseURL )
2022-10-09 15:23:52 +02:00
if err != nil {
return fmt . Errorf ( "failed to setup DB: %v" , err )
}
eventStore , err := pg . New (
2022-12-17 14:36:42 +01:00
rootCtx ,
2022-10-09 15:23:52 +02:00
db . DB ,
pg . WithEventTypes (
& domain . SubGraphUpdated { } ,
2023-04-27 07:09:10 +02:00
& domain . OrganizationAdded { } ,
& domain . APIKeyAdded { } ,
2022-10-09 15:23:52 +02:00
) ,
)
if err != nil {
return fmt . Errorf ( "failed to create eventstore: %v" , err )
}
2023-04-27 07:09:10 +02:00
if err := store . RunEventStoreMigrations ( db ) ; err != nil {
return fmt . Errorf ( "event migrations: %w" , err )
}
2022-12-17 14:36:42 +01:00
publisher , err := goamqp . NewPublisher (
2022-10-09 15:23:52 +02:00
goamqp . Route {
Type : domain . SubGraphUpdated { } ,
Key : "SubGraph.Updated" ,
} ,
2023-04-27 07:09:10 +02:00
goamqp . Route {
Type : domain . OrganizationAdded { } ,
Key : "Organization.Added" ,
} ,
goamqp . Route {
Type : domain . APIKeyAdded { } ,
Key : "Organization.APIKeyAdded" ,
} ,
2022-10-09 15:23:52 +02:00
)
2022-12-17 14:36:42 +01:00
if err != nil {
return fmt . Errorf ( "failed to create publisher: %v" , err )
}
eventPublisher , err := amqp . New ( publisher )
2022-10-09 15:23:52 +02:00
if err != nil {
return fmt . Errorf ( "failed to create event publisher: %v" , err )
}
2022-12-16 16:02:16 +01:00
conn , err := connectToAmqpFunc ( cli . AmqpURL )
2022-10-09 15:23:52 +02:00
if err != nil {
return fmt . Errorf ( "failed to connect to AMQP: %v" , err )
}
serviceCache := cache . New ( logger )
2023-04-27 07:09:10 +02:00
if err := loadOrganizations ( rootCtx , eventStore , serviceCache ) ; err != nil {
return fmt . Errorf ( "caching organizations: %w" , err )
2022-10-09 15:23:52 +02:00
}
2023-04-27 07:09:10 +02:00
if err := loadSubGraphs ( rootCtx , eventStore , serviceCache ) ; err != nil {
return fmt . Errorf ( "caching subgraphs: %w" , err )
2022-10-09 15:23:52 +02:00
}
setups := [ ] goamqp . Setup {
goamqp . UseLogger ( logger . Errorf ) ,
goamqp . CloseListener ( closeEvents ) ,
goamqp . WithPrefetchLimit ( 20 ) ,
2022-12-17 14:36:42 +01:00
goamqp . EventStreamPublisher ( publisher ) ,
2022-10-09 15:23:52 +02:00
goamqp . TransientEventStreamConsumer ( "SubGraph.Updated" , serviceCache . Update , domain . SubGraphUpdated { } ) ,
2023-04-27 07:09:10 +02:00
goamqp . TransientEventStreamConsumer ( "Organization.Added" , serviceCache . Update , domain . OrganizationAdded { } ) ,
goamqp . TransientEventStreamConsumer ( "Organization.APIKeyAdded" , serviceCache . Update , domain . APIKeyAdded { } ) ,
2022-10-09 15:23:52 +02:00
}
2022-12-16 16:02:16 +01:00
if err := conn . Start ( rootCtx , setups ... ) ; err != nil {
2022-10-09 15:23:52 +02:00
return fmt . Errorf ( "failed to setup AMQP: %v" , err )
}
defer func ( ) { _ = conn . Close ( ) } ( )
logger . Info ( "Started" )
mux := http . NewServeMux ( )
2022-12-16 16:02:16 +01:00
httpSrv := & http . Server { Addr : fmt . Sprintf ( ":%d" , cli . Port ) , Handler : mux }
2022-10-09 15:23:52 +02:00
wg := sync . WaitGroup { }
sigint := make ( chan os . Signal , 1 )
signal . Notify ( sigint , os . Interrupt , syscall . SIGTERM )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
sig := <- sigint
if sig != nil {
// In case our shutdown logic is broken/incomplete we reset signal
// handlers so next signal goes to go itself. Go is more aggressive when
// shutting down goroutines
signal . Reset ( os . Interrupt , syscall . SIGTERM )
logger . Info ( "Got shutdown signal.." )
rootCancel ( )
}
} ( )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
err := <- closeEvents
if err != nil {
logger . WithError ( err ) . Error ( "received close from AMQP" )
rootCancel ( )
}
} ( )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
<- rootCtx . Done ( )
if err := httpSrv . Close ( ) ; err != nil {
logger . WithError ( err ) . Error ( "close http server" )
}
close ( sigint )
close ( closeEvents )
} ( )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
defer rootCancel ( )
resolver := & graph . Resolver {
EventStore : eventStore ,
2022-12-17 14:36:42 +01:00
Publisher : eventPublisher ,
2022-10-09 15:23:52 +02:00
Logger : logger ,
Cache : serviceCache ,
}
config := generated . Config {
Resolvers : resolver ,
Complexity : generated . ComplexityRoot { } ,
}
2023-04-27 07:09:10 +02:00
apiKeyMiddleware := middleware . NewApiKey ( )
mw := middleware . NewAuth0 ( "https://schemas.unbound.se" , cli . Issuer , cli . StrictSSL )
authMiddleware := middleware . NewAuth ( serviceCache )
config . Directives . Auth = authMiddleware . Directive
2022-10-09 15:23:52 +02:00
srv := handler . NewDefaultServer ( generated . NewExecutableSchema (
config ,
) )
sentryHandler := sentryhttp . New ( sentryhttp . Options { Repanic : true } )
mux . Handle ( "/" , sentryHandler . HandleFunc ( playground . Handler ( "GraphQL playground" , "/query" ) ) )
2022-12-16 16:02:16 +01:00
mux . Handle ( "/health" , http . HandlerFunc ( healthFunc ) )
2023-04-27 07:09:10 +02:00
mux . Handle ( "/query" , cors . AllowAll ( ) . Handler (
sentryHandler . Handle (
mw . Middleware ( ) . CheckJWT (
apiKeyMiddleware . Handler (
authMiddleware . Handler ( srv ) ,
) ,
) ,
) ,
) )
2022-10-09 15:23:52 +02:00
2022-12-16 16:02:16 +01:00
logger . Infof ( "connect to http://localhost:%d/ for GraphQL playground" , cli . Port )
2022-10-09 15:23:52 +02:00
if err := httpSrv . ListenAndServe ( ) ; err != nil {
logger . WithError ( err ) . Error ( "listen http" )
}
} ( )
wg . Wait ( )
return nil
}
2023-04-27 07:09:10 +02:00
func loadOrganizations ( ctx context . Context , eventStore eventsourced . EventStore , serviceCache * cache . Cache ) error {
roots , err := eventStore . GetAggregateRoots ( ctx , reflect . TypeOf ( domain . Organization { } ) )
if err != nil {
return err
}
for _ , root := range roots {
organization := & domain . Organization { BaseAggregate : eventsourced . BaseAggregateFromString ( root . String ( ) ) }
if _ , err := eventsourced . NewHandler ( ctx , organization , eventStore ) ; err != nil {
return err
}
_ , err := serviceCache . Update ( organization , nil )
if err != nil {
return err
}
}
return nil
}
func loadSubGraphs ( ctx context . Context , eventStore eventsourced . EventStore , serviceCache * cache . Cache ) error {
roots , err := eventStore . GetAggregateRoots ( ctx , reflect . TypeOf ( domain . SubGraph { } ) )
if err != nil {
return err
}
for _ , root := range roots {
subGraph := & domain . SubGraph { BaseAggregate : eventsourced . BaseAggregateFromString ( root . String ( ) ) }
if _ , err := eventsourced . NewHandler ( ctx , subGraph , eventStore ) ; err != nil {
return err
}
_ , err := serviceCache . Update ( subGraph , nil )
if err != nil {
return err
}
}
return nil
}
2022-10-09 15:23:52 +02:00
func healthFunc ( w http . ResponseWriter , _ * http . Request ) {
_ , _ = w . Write ( [ ] byte ( "OK" ) )
}
2022-12-16 16:02:16 +01:00
func setupSentry ( logger log . Interface , args SentryConfig ) error {
if args . Environment == "" {
return fmt . Errorf ( "no Sentry environment supplied, exiting" )
}
cfg := sentry . ClientOptions {
Dsn : args . DSN ,
Environment : args . Environment ,
Release : fmt . Sprintf ( "%s-%s" , serviceName , buildVersion ) ,
}
switch args . Environment {
case "development" :
cfg . Debug = true
cfg . EnableTracing = false
cfg . TracesSampleRate = 0.0
case "production" :
if args . DSN == "" {
return fmt . Errorf ( "no DSN supplied for non-dev environment, exiting" )
}
cfg . Debug = false
cfg . EnableTracing = true
2023-03-30 14:32:59 +02:00
cfg . TracesSampleRate = 0.01
2022-12-16 16:02:16 +01:00
default :
return fmt . Errorf ( "illegal environment %s" , args . Environment )
}
if err := sentry . Init ( cfg ) ; err != nil {
return fmt . Errorf ( "sentry setup: %w" , err )
}
logger . Infof ( "configured Sentry for env: %s" , args . Environment )
return nil
}
2022-10-09 15:23:52 +02:00
func ConnectAMQP ( url string ) ( Connection , error ) {
return goamqp . NewFromURL ( serviceName , url )
}
type Connection interface {
2022-12-16 16:02:16 +01:00
Start ( ctx context . Context , opts ... goamqp . Setup ) error
2022-10-09 15:23:52 +02:00
Close ( ) error
}