chore: add context and error handling

This commit is contained in:
2022-12-17 14:36:42 +01:00
parent 80d3c44cb0
commit 98a679d2d3
4 changed files with 23 additions and 16 deletions
+10 -6
View File
@@ -84,6 +84,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
} }
eventStore, err := pg.New( eventStore, err := pg.New(
rootCtx,
db.DB, db.DB,
pg.WithEventTypes( pg.WithEventTypes(
&domain.SubGraphUpdated{}, &domain.SubGraphUpdated{},
@@ -92,29 +93,32 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
if err != nil { if err != nil {
return fmt.Errorf("failed to create eventstore: %v", err) return fmt.Errorf("failed to create eventstore: %v", err)
} }
eventPublisher, err := goamqp.NewPublisher( publisher, err := goamqp.NewPublisher(
goamqp.Route{ goamqp.Route{
Type: domain.SubGraphUpdated{}, Type: domain.SubGraphUpdated{},
Key: "SubGraph.Updated", Key: "SubGraph.Updated",
}, },
) )
if err != nil {
return fmt.Errorf("failed to create publisher: %v", err)
}
eventPublisher, err := amqp.New(publisher)
if err != nil { if err != nil {
return fmt.Errorf("failed to create event publisher: %v", err) return fmt.Errorf("failed to create event publisher: %v", err)
} }
amqp.New(eventPublisher)
conn, err := connectToAmqpFunc(cli.AmqpURL) conn, err := connectToAmqpFunc(cli.AmqpURL)
if err != nil { if err != nil {
return fmt.Errorf("failed to connect to AMQP: %v", err) return fmt.Errorf("failed to connect to AMQP: %v", err)
} }
serviceCache := cache.New(logger) serviceCache := cache.New(logger)
roots, err := eventStore.GetAggregateRoots(reflect.TypeOf(domain.SubGraph{})) roots, err := eventStore.GetAggregateRoots(rootCtx, reflect.TypeOf(domain.SubGraph{}))
if err != nil { if err != nil {
return err return err
} }
for _, root := range roots { for _, root := range roots {
subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())} subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())}
if _, err := eventsourced.NewHandler(subGraph, eventStore); err != nil { if _, err := eventsourced.NewHandler(rootCtx, subGraph, eventStore); err != nil {
return err return err
} }
_, err := serviceCache.Update(subGraph, nil) _, err := serviceCache.Update(subGraph, nil)
@@ -126,7 +130,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
goamqp.UseLogger(logger.Errorf), goamqp.UseLogger(logger.Errorf),
goamqp.CloseListener(closeEvents), goamqp.CloseListener(closeEvents),
goamqp.WithPrefetchLimit(20), goamqp.WithPrefetchLimit(20),
goamqp.EventStreamPublisher(eventPublisher), goamqp.EventStreamPublisher(publisher),
goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}), goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}),
} }
if err := conn.Start(rootCtx, setups...); err != nil { if err := conn.Start(rootCtx, setups...); err != nil {
@@ -187,7 +191,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
resolver := &graph.Resolver{ resolver := &graph.Resolver{
EventStore: eventStore, EventStore: eventStore,
Publisher: amqp.New(eventPublisher), Publisher: eventPublisher,
Logger: logger, Logger: logger,
Cache: serviceCache, Cache: serviceCache,
} }
+5 -4
View File
@@ -1,6 +1,8 @@
package graph package graph
import ( import (
"context"
"github.com/apex/log" "github.com/apex/log"
"gitlab.com/unboundsoftware/eventsourced/eventsourced" "gitlab.com/unboundsoftware/eventsourced/eventsourced"
@@ -14,8 +16,7 @@ import (
// It serves as dependency injection for your app, add any dependencies you require here. // It serves as dependency injection for your app, add any dependencies you require here.
type Publisher interface { type Publisher interface {
Publish(event eventsourced.Event) error Publish(ctx context.Context, event eventsourced.Event) error
Stop() error
} }
type Resolver struct { type Resolver struct {
@@ -25,6 +26,6 @@ type Resolver struct {
Cache *cache.Cache Cache *cache.Cache
} }
func (r *Resolver) handler(aggregate eventsourced.Aggregate) (eventsourced.CommandHandler, error) { func (r *Resolver) handler(ctx context.Context, aggregate eventsourced.Aggregate) (eventsourced.CommandHandler, error) {
return eventsourced.NewHandler(aggregate, r.EventStore, eventsourced.WithEventPublisher(r.Publisher)) return eventsourced.NewHandler(ctx, aggregate, r.EventStore, eventsourced.WithEventPublisher(r.Publisher))
} }
+4 -2
View File
@@ -1,15 +1,17 @@
package graph package graph
import ( import (
"context"
"gitlab.com/unboundsoftware/eventsourced/eventsourced" "gitlab.com/unboundsoftware/eventsourced/eventsourced"
"gitlab.com/unboundsoftware/schemas/domain" "gitlab.com/unboundsoftware/schemas/domain"
"gitlab.com/unboundsoftware/schemas/graph/model" "gitlab.com/unboundsoftware/schemas/graph/model"
) )
func (r *Resolver) fetchSubGraph(subGraphId string) (*domain.SubGraph, error) { func (r *Resolver) fetchSubGraph(ctx context.Context, subGraphId string) (*domain.SubGraph, error) {
subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(subGraphId)} subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(subGraphId)}
_, err := r.handler(subGraph) _, err := r.handler(ctx, subGraph)
if err != nil { if err != nil {
return nil, err return nil, err
} }
+4 -4
View File
@@ -24,7 +24,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
if subGraphId != "" { if subGraphId != "" {
subGraph.BaseAggregate = eventsourced.BaseAggregateFromString(subGraphId) subGraph.BaseAggregate = eventsourced.BaseAggregateFromString(subGraphId)
} }
handler, err := r.handler(subGraph) handler, err := r.handler(ctx, subGraph)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -36,7 +36,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
serviceSDLs := []string{input.Sdl} serviceSDLs := []string{input.Sdl}
services, _ := r.Cache.Services(input.Ref, "") services, _ := r.Cache.Services(input.Ref, "")
for _, id := range services { for _, id := range services {
sg, err := r.fetchSubGraph(id) sg, err := r.fetchSubGraph(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -48,7 +48,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
if err != nil { if err != nil {
return nil, err return nil, err
} }
_, err = handler.Handle(domain.UpdateSubGraph{ _, err = handler.Handle(ctx, domain.UpdateSubGraph{
Ref: input.Ref, Ref: input.Ref,
Service: input.Service, Service: input.Service,
Url: input.URL, Url: input.URL,
@@ -89,7 +89,7 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str
} }
subGraphs := make([]*model.SubGraph, len(services)) subGraphs := make([]*model.SubGraph, len(services))
for i, id := range services { for i, id := range services {
sg, err := r.fetchSubGraph(id) sg, err := r.fetchSubGraph(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }