chore: add context and error handling
This commit is contained in:
+10
-6
@@ -84,6 +84,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
|
|||||||
}
|
}
|
||||||
|
|
||||||
eventStore, err := pg.New(
|
eventStore, err := pg.New(
|
||||||
|
rootCtx,
|
||||||
db.DB,
|
db.DB,
|
||||||
pg.WithEventTypes(
|
pg.WithEventTypes(
|
||||||
&domain.SubGraphUpdated{},
|
&domain.SubGraphUpdated{},
|
||||||
@@ -92,29 +93,32 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create eventstore: %v", err)
|
return fmt.Errorf("failed to create eventstore: %v", err)
|
||||||
}
|
}
|
||||||
eventPublisher, err := goamqp.NewPublisher(
|
publisher, err := goamqp.NewPublisher(
|
||||||
goamqp.Route{
|
goamqp.Route{
|
||||||
Type: domain.SubGraphUpdated{},
|
Type: domain.SubGraphUpdated{},
|
||||||
Key: "SubGraph.Updated",
|
Key: "SubGraph.Updated",
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create publisher: %v", err)
|
||||||
|
}
|
||||||
|
eventPublisher, err := amqp.New(publisher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create event publisher: %v", err)
|
return fmt.Errorf("failed to create event publisher: %v", err)
|
||||||
}
|
}
|
||||||
amqp.New(eventPublisher)
|
|
||||||
conn, err := connectToAmqpFunc(cli.AmqpURL)
|
conn, err := connectToAmqpFunc(cli.AmqpURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to connect to AMQP: %v", err)
|
return fmt.Errorf("failed to connect to AMQP: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceCache := cache.New(logger)
|
serviceCache := cache.New(logger)
|
||||||
roots, err := eventStore.GetAggregateRoots(reflect.TypeOf(domain.SubGraph{}))
|
roots, err := eventStore.GetAggregateRoots(rootCtx, reflect.TypeOf(domain.SubGraph{}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, root := range roots {
|
for _, root := range roots {
|
||||||
subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())}
|
subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())}
|
||||||
if _, err := eventsourced.NewHandler(subGraph, eventStore); err != nil {
|
if _, err := eventsourced.NewHandler(rootCtx, subGraph, eventStore); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err := serviceCache.Update(subGraph, nil)
|
_, err := serviceCache.Update(subGraph, nil)
|
||||||
@@ -126,7 +130,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
|
|||||||
goamqp.UseLogger(logger.Errorf),
|
goamqp.UseLogger(logger.Errorf),
|
||||||
goamqp.CloseListener(closeEvents),
|
goamqp.CloseListener(closeEvents),
|
||||||
goamqp.WithPrefetchLimit(20),
|
goamqp.WithPrefetchLimit(20),
|
||||||
goamqp.EventStreamPublisher(eventPublisher),
|
goamqp.EventStreamPublisher(publisher),
|
||||||
goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}),
|
goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}),
|
||||||
}
|
}
|
||||||
if err := conn.Start(rootCtx, setups...); err != nil {
|
if err := conn.Start(rootCtx, setups...); err != nil {
|
||||||
@@ -187,7 +191,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
|
|||||||
|
|
||||||
resolver := &graph.Resolver{
|
resolver := &graph.Resolver{
|
||||||
EventStore: eventStore,
|
EventStore: eventStore,
|
||||||
Publisher: amqp.New(eventPublisher),
|
Publisher: eventPublisher,
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
Cache: serviceCache,
|
Cache: serviceCache,
|
||||||
}
|
}
|
||||||
|
|||||||
+5
-4
@@ -1,6 +1,8 @@
|
|||||||
package graph
|
package graph
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"github.com/apex/log"
|
"github.com/apex/log"
|
||||||
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
||||||
|
|
||||||
@@ -14,8 +16,7 @@ import (
|
|||||||
// It serves as dependency injection for your app, add any dependencies you require here.
|
// It serves as dependency injection for your app, add any dependencies you require here.
|
||||||
|
|
||||||
type Publisher interface {
|
type Publisher interface {
|
||||||
Publish(event eventsourced.Event) error
|
Publish(ctx context.Context, event eventsourced.Event) error
|
||||||
Stop() error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Resolver struct {
|
type Resolver struct {
|
||||||
@@ -25,6 +26,6 @@ type Resolver struct {
|
|||||||
Cache *cache.Cache
|
Cache *cache.Cache
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Resolver) handler(aggregate eventsourced.Aggregate) (eventsourced.CommandHandler, error) {
|
func (r *Resolver) handler(ctx context.Context, aggregate eventsourced.Aggregate) (eventsourced.CommandHandler, error) {
|
||||||
return eventsourced.NewHandler(aggregate, r.EventStore, eventsourced.WithEventPublisher(r.Publisher))
|
return eventsourced.NewHandler(ctx, aggregate, r.EventStore, eventsourced.WithEventPublisher(r.Publisher))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,15 +1,17 @@
|
|||||||
package graph
|
package graph
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
||||||
|
|
||||||
"gitlab.com/unboundsoftware/schemas/domain"
|
"gitlab.com/unboundsoftware/schemas/domain"
|
||||||
"gitlab.com/unboundsoftware/schemas/graph/model"
|
"gitlab.com/unboundsoftware/schemas/graph/model"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (r *Resolver) fetchSubGraph(subGraphId string) (*domain.SubGraph, error) {
|
func (r *Resolver) fetchSubGraph(ctx context.Context, subGraphId string) (*domain.SubGraph, error) {
|
||||||
subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(subGraphId)}
|
subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(subGraphId)}
|
||||||
_, err := r.handler(subGraph)
|
_, err := r.handler(ctx, subGraph)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
|
|||||||
if subGraphId != "" {
|
if subGraphId != "" {
|
||||||
subGraph.BaseAggregate = eventsourced.BaseAggregateFromString(subGraphId)
|
subGraph.BaseAggregate = eventsourced.BaseAggregateFromString(subGraphId)
|
||||||
}
|
}
|
||||||
handler, err := r.handler(subGraph)
|
handler, err := r.handler(ctx, subGraph)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -36,7 +36,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
|
|||||||
serviceSDLs := []string{input.Sdl}
|
serviceSDLs := []string{input.Sdl}
|
||||||
services, _ := r.Cache.Services(input.Ref, "")
|
services, _ := r.Cache.Services(input.Ref, "")
|
||||||
for _, id := range services {
|
for _, id := range services {
|
||||||
sg, err := r.fetchSubGraph(id)
|
sg, err := r.fetchSubGraph(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -48,7 +48,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
_, err = handler.Handle(domain.UpdateSubGraph{
|
_, err = handler.Handle(ctx, domain.UpdateSubGraph{
|
||||||
Ref: input.Ref,
|
Ref: input.Ref,
|
||||||
Service: input.Service,
|
Service: input.Service,
|
||||||
Url: input.URL,
|
Url: input.URL,
|
||||||
@@ -89,7 +89,7 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str
|
|||||||
}
|
}
|
||||||
subGraphs := make([]*model.SubGraph, len(services))
|
subGraphs := make([]*model.SubGraph, len(services))
|
||||||
for i, id := range services {
|
for i, id := range services {
|
||||||
sg, err := r.fetchSubGraph(id)
|
sg, err := r.fetchSubGraph(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user