feat: initial commit
This commit is contained in:
@@ -0,0 +1,216 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"reflect"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/99designs/gqlgen/graphql/handler"
|
||||
"github.com/99designs/gqlgen/graphql/playground"
|
||||
"github.com/alecthomas/kong"
|
||||
"github.com/apex/log"
|
||||
"github.com/apex/log/handlers/json"
|
||||
sentryhttp "github.com/getsentry/sentry-go/http"
|
||||
"github.com/rs/cors"
|
||||
"github.com/sparetimecoders/goamqp"
|
||||
"gitlab.com/unboundsoftware/eventsourced/amqp"
|
||||
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
||||
"gitlab.com/unboundsoftware/eventsourced/pg"
|
||||
|
||||
"gitlab.com/unboundsoftware/schemas/cache"
|
||||
"gitlab.com/unboundsoftware/schemas/domain"
|
||||
"gitlab.com/unboundsoftware/schemas/graph"
|
||||
"gitlab.com/unboundsoftware/schemas/graph/generated"
|
||||
"gitlab.com/unboundsoftware/schemas/middleware"
|
||||
"gitlab.com/unboundsoftware/schemas/store"
|
||||
)
|
||||
|
||||
var CLI struct {
|
||||
AmqpURL string `name:"amqp-url" env:"AMQP_URL" help:"URL to use to connect to RabbitMQ" default:"amqp://user:password@localhost:5672/"`
|
||||
Port int `name:"port" env:"PORT" help:"Listen-port for GraphQL API" default:"8080"`
|
||||
APIKey string `name:"api-key" env:"API_KEY" help:"The API-key that is required"`
|
||||
LogLevel string `name:"log-level" env:"LOG_LEVEL" help:"The level of logging to use (debug, info, warn, error, fatal)" default:"info"`
|
||||
DatabaseURL string `name:"postgres-url" env:"POSTGRES_URL" help:"URL to use to connect to Postgres" default:"postgres://postgres:postgres@:5432/schemas?sslmode=disable"`
|
||||
DatabaseDriverName string `name:"db-driver" env:"DB_DRIVER" help:"Driver to use to connect to db" default:"postgres"`
|
||||
}
|
||||
|
||||
const serviceName = "schemas"
|
||||
|
||||
func main() {
|
||||
_ = kong.Parse(&CLI)
|
||||
log.SetHandler(json.New(os.Stdout))
|
||||
log.SetLevelFromString(CLI.LogLevel)
|
||||
logger := log.WithField("service", serviceName)
|
||||
closeEvents := make(chan error)
|
||||
|
||||
if err := start(
|
||||
closeEvents,
|
||||
logger,
|
||||
ConnectAMQP,
|
||||
); err != nil {
|
||||
logger.WithError(err).Error("process error")
|
||||
}
|
||||
}
|
||||
|
||||
func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url string) (Connection, error)) error {
|
||||
rootCtx, rootCancel := context.WithCancel(context.Background())
|
||||
defer rootCancel()
|
||||
|
||||
db, err := store.SetupDB(CLI.DatabaseDriverName, CLI.DatabaseURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to setup DB: %v", err)
|
||||
}
|
||||
|
||||
eventStore, err := pg.New(
|
||||
db.DB,
|
||||
pg.WithEventTypes(
|
||||
&domain.SubGraphUpdated{},
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create eventstore: %v", err)
|
||||
}
|
||||
eventPublisher, err := goamqp.NewPublisher(
|
||||
goamqp.Route{
|
||||
Type: domain.SubGraphUpdated{},
|
||||
Key: "SubGraph.Updated",
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create event publisher: %v", err)
|
||||
}
|
||||
amqp.New(eventPublisher)
|
||||
conn, err := connectToAmqpFunc(CLI.AmqpURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to AMQP: %v", err)
|
||||
}
|
||||
|
||||
serviceCache := cache.New(logger)
|
||||
roots, err := eventStore.GetAggregateRoots(reflect.TypeOf(domain.SubGraph{}))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, root := range roots {
|
||||
subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())}
|
||||
if _, err := eventsourced.NewHandler(subGraph, eventStore); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := serviceCache.Update(subGraph, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
setups := []goamqp.Setup{
|
||||
goamqp.UseLogger(logger.Errorf),
|
||||
goamqp.CloseListener(closeEvents),
|
||||
goamqp.WithPrefetchLimit(20),
|
||||
goamqp.EventStreamPublisher(eventPublisher),
|
||||
goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}),
|
||||
}
|
||||
if err := conn.Start(setups...); err != nil {
|
||||
return fmt.Errorf("failed to setup AMQP: %v", err)
|
||||
}
|
||||
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
logger.Info("Started")
|
||||
|
||||
mux := http.NewServeMux()
|
||||
httpSrv := &http.Server{Addr: fmt.Sprintf(":%d", CLI.Port), Handler: mux}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
sigint := make(chan os.Signal, 1)
|
||||
signal.Notify(sigint, os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
sig := <-sigint
|
||||
if sig != nil {
|
||||
// In case our shutdown logic is broken/incomplete we reset signal
|
||||
// handlers so next signal goes to go itself. Go is more aggressive when
|
||||
// shutting down goroutines
|
||||
signal.Reset(os.Interrupt, syscall.SIGTERM)
|
||||
logger.Info("Got shutdown signal..")
|
||||
rootCancel()
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err := <-closeEvents
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("received close from AMQP")
|
||||
rootCancel()
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-rootCtx.Done()
|
||||
|
||||
if err := httpSrv.Close(); err != nil {
|
||||
logger.WithError(err).Error("close http server")
|
||||
}
|
||||
close(sigint)
|
||||
close(closeEvents)
|
||||
}()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer rootCancel()
|
||||
|
||||
resolver := &graph.Resolver{
|
||||
EventStore: eventStore,
|
||||
Publisher: amqp.New(eventPublisher),
|
||||
Logger: logger,
|
||||
Cache: serviceCache,
|
||||
}
|
||||
|
||||
config := generated.Config{
|
||||
Resolvers: resolver,
|
||||
Complexity: generated.ComplexityRoot{},
|
||||
}
|
||||
apiKeyMiddleware := middleware.NewApiKey(CLI.APIKey, logger)
|
||||
config.Directives.HasApiKey = apiKeyMiddleware.Directive
|
||||
srv := handler.NewDefaultServer(generated.NewExecutableSchema(
|
||||
config,
|
||||
))
|
||||
|
||||
sentryHandler := sentryhttp.New(sentryhttp.Options{Repanic: true})
|
||||
mux.Handle("/", sentryHandler.HandleFunc(playground.Handler("GraphQL playground", "/query")))
|
||||
mux.Handle("/health", sentryHandler.HandleFunc(healthFunc))
|
||||
mux.Handle("/query", cors.AllowAll().Handler(sentryHandler.Handle(apiKeyMiddleware.Handler(srv))))
|
||||
|
||||
logger.Infof("connect to http://localhost:%d/ for GraphQL playground", CLI.Port)
|
||||
|
||||
if err := httpSrv.ListenAndServe(); err != nil {
|
||||
logger.WithError(err).Error("listen http")
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func healthFunc(w http.ResponseWriter, _ *http.Request) {
|
||||
_, _ = w.Write([]byte("OK"))
|
||||
}
|
||||
|
||||
func ConnectAMQP(url string) (Connection, error) {
|
||||
return goamqp.NewFromURL(serviceName, url)
|
||||
}
|
||||
|
||||
type Connection interface {
|
||||
Start(opts ...goamqp.Setup) error
|
||||
Close() error
|
||||
}
|
||||
Reference in New Issue
Block a user