From 98a679d2d3ef6c1e4d41e2fd8cd73bfbf53b7ace Mon Sep 17 00:00:00 2001 From: Joakim Olsson Date: Sat, 17 Dec 2022 14:36:42 +0100 Subject: [PATCH] chore: add context and error handling --- cmd/service/service.go | 16 ++++++++++------ graph/resolver.go | 9 +++++---- graph/schema.helpers.go | 6 ++++-- graph/schema.resolvers.go | 8 ++++---- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/cmd/service/service.go b/cmd/service/service.go index 686f398..5b287c6 100644 --- a/cmd/service/service.go +++ b/cmd/service/service.go @@ -84,6 +84,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url } eventStore, err := pg.New( + rootCtx, db.DB, pg.WithEventTypes( &domain.SubGraphUpdated{}, @@ -92,29 +93,32 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url if err != nil { return fmt.Errorf("failed to create eventstore: %v", err) } - eventPublisher, err := goamqp.NewPublisher( + publisher, err := goamqp.NewPublisher( goamqp.Route{ Type: domain.SubGraphUpdated{}, Key: "SubGraph.Updated", }, ) + if err != nil { + return fmt.Errorf("failed to create publisher: %v", err) + } + eventPublisher, err := amqp.New(publisher) if err != nil { return fmt.Errorf("failed to create event publisher: %v", err) } - amqp.New(eventPublisher) conn, err := connectToAmqpFunc(cli.AmqpURL) if err != nil { return fmt.Errorf("failed to connect to AMQP: %v", err) } serviceCache := cache.New(logger) - roots, err := eventStore.GetAggregateRoots(reflect.TypeOf(domain.SubGraph{})) + roots, err := eventStore.GetAggregateRoots(rootCtx, reflect.TypeOf(domain.SubGraph{})) if err != nil { return err } for _, root := range roots { subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())} - if _, err := eventsourced.NewHandler(subGraph, eventStore); err != nil { + if _, err := eventsourced.NewHandler(rootCtx, subGraph, eventStore); err != nil { return err } _, err := serviceCache.Update(subGraph, nil) @@ -126,7 +130,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url goamqp.UseLogger(logger.Errorf), goamqp.CloseListener(closeEvents), goamqp.WithPrefetchLimit(20), - goamqp.EventStreamPublisher(eventPublisher), + goamqp.EventStreamPublisher(publisher), goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}), } if err := conn.Start(rootCtx, setups...); err != nil { @@ -187,7 +191,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url resolver := &graph.Resolver{ EventStore: eventStore, - Publisher: amqp.New(eventPublisher), + Publisher: eventPublisher, Logger: logger, Cache: serviceCache, } diff --git a/graph/resolver.go b/graph/resolver.go index 55a1df0..56de381 100644 --- a/graph/resolver.go +++ b/graph/resolver.go @@ -1,6 +1,8 @@ package graph import ( + "context" + "github.com/apex/log" "gitlab.com/unboundsoftware/eventsourced/eventsourced" @@ -14,8 +16,7 @@ import ( // It serves as dependency injection for your app, add any dependencies you require here. type Publisher interface { - Publish(event eventsourced.Event) error - Stop() error + Publish(ctx context.Context, event eventsourced.Event) error } type Resolver struct { @@ -25,6 +26,6 @@ type Resolver struct { Cache *cache.Cache } -func (r *Resolver) handler(aggregate eventsourced.Aggregate) (eventsourced.CommandHandler, error) { - return eventsourced.NewHandler(aggregate, r.EventStore, eventsourced.WithEventPublisher(r.Publisher)) +func (r *Resolver) handler(ctx context.Context, aggregate eventsourced.Aggregate) (eventsourced.CommandHandler, error) { + return eventsourced.NewHandler(ctx, aggregate, r.EventStore, eventsourced.WithEventPublisher(r.Publisher)) } diff --git a/graph/schema.helpers.go b/graph/schema.helpers.go index 6cb1b02..6dc24d2 100644 --- a/graph/schema.helpers.go +++ b/graph/schema.helpers.go @@ -1,15 +1,17 @@ package graph import ( + "context" + "gitlab.com/unboundsoftware/eventsourced/eventsourced" "gitlab.com/unboundsoftware/schemas/domain" "gitlab.com/unboundsoftware/schemas/graph/model" ) -func (r *Resolver) fetchSubGraph(subGraphId string) (*domain.SubGraph, error) { +func (r *Resolver) fetchSubGraph(ctx context.Context, subGraphId string) (*domain.SubGraph, error) { subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(subGraphId)} - _, err := r.handler(subGraph) + _, err := r.handler(ctx, subGraph) if err != nil { return nil, err } diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 58af27c..3086782 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -24,7 +24,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input if subGraphId != "" { subGraph.BaseAggregate = eventsourced.BaseAggregateFromString(subGraphId) } - handler, err := r.handler(subGraph) + handler, err := r.handler(ctx, subGraph) if err != nil { return nil, err } @@ -36,7 +36,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input serviceSDLs := []string{input.Sdl} services, _ := r.Cache.Services(input.Ref, "") for _, id := range services { - sg, err := r.fetchSubGraph(id) + sg, err := r.fetchSubGraph(ctx, id) if err != nil { return nil, err } @@ -48,7 +48,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input if err != nil { return nil, err } - _, err = handler.Handle(domain.UpdateSubGraph{ + _, err = handler.Handle(ctx, domain.UpdateSubGraph{ Ref: input.Ref, Service: input.Service, Url: input.URL, @@ -89,7 +89,7 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str } subGraphs := make([]*model.SubGraph, len(services)) for i, id := range services { - sg, err := r.fetchSubGraph(id) + sg, err := r.fetchSubGraph(ctx, id) if err != nil { return nil, err }