feat: add eventsourced MetricsRecorder adapter for OpenTelemetry
NewEventsourcedMetrics returns an eventsourced.MetricsRecorder that maps the framework's Metric values (command duration, event store/load, snapshots, idempotency checks) onto OTel instruments on the global MeterProvider set by SetupOTelSDK. Intended for pg.WithMetrics and eventsourced.WithMetrics.
This commit is contained in:
@@ -4,11 +4,13 @@ go 1.25.0
|
||||
|
||||
require (
|
||||
github.com/99designs/gqlgen v0.17.90
|
||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.23.0
|
||||
go.opentelemetry.io/otel v1.43.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.19.0
|
||||
go.opentelemetry.io/otel/log v0.19.0
|
||||
go.opentelemetry.io/otel/metric v1.43.0
|
||||
go.opentelemetry.io/otel/sdk v1.43.0
|
||||
go.opentelemetry.io/otel/sdk/log v0.19.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.43.0
|
||||
@@ -26,7 +28,6 @@ require (
|
||||
github.com/vektah/gqlparser/v2 v2.5.33 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.43.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
|
||||
golang.org/x/net v0.52.0 // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
|
||||
@@ -33,6 +33,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/vektah/gqlparser/v2 v2.5.33 h1:lRp8aIeNUNbimf/axZd7ETg24q06hBtPaas+TcvI/7E=
|
||||
github.com/vektah/gqlparser/v2 v2.5.33/go.mod h1:c1I28gSOVNzlfc4WuDlqU7voQnsqI6OG2amkBAFmgts=
|
||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.23.0 h1:qcteJH9D7kHaOgLQ0fzlW9dv42hSa0Vluqt7p4kooWA=
|
||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.23.0/go.mod h1:LrA7I7etRmhIC1PjO8c26BHm+gWsy2rC3eSMe5+XUWE=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||
go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I=
|
||||
|
||||
+123
@@ -0,0 +1,123 @@
|
||||
package otelsetup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
// eventsourcedMeterName is the instrumentation scope for the event-sourcing
|
||||
// metrics emitted by the adapter returned from NewEventsourcedMetrics.
|
||||
// Passed to otel.Meter in NewEventsourcedMetrics to obtain the adapter's Meter.
const eventsourcedMeterName = "gitea.unbound.se/shiny/otelsetup/eventsourced"
|
||||
|
||||
// durationBucketsSeconds are explicit histogram boundaries tuned for
|
||||
// sub-second event-store and command latencies. The SDK default boundaries are
|
||||
// scaled for milliseconds, which would bucket nearly every second-valued
|
||||
// observation into the first bucket and make percentiles useless.
|
||||
// Boundaries are expressed in seconds and run from 1 ms up to 10 s; they are
// applied to every histogram created by NewEventsourcedMetrics.
var durationBucketsSeconds = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
|
||||
|
||||
// eventsourcedMetrics implements eventsourced.MetricsRecorder by translating
|
||||
// the framework's Metric values into OpenTelemetry instruments registered on
|
||||
// the global MeterProvider configured by SetupOTelSDK.
|
||||
//
|
||||
// The OTel metric instruments are safe for concurrent use, and the struct is
|
||||
// immutable after construction, so Record may be called from multiple
|
||||
// goroutines as the framework requires.
|
||||
//
|
||||
// Operation counts are read off each duration histogram's generated _count
|
||||
// series rather than separate counters; the only standalone counters carry
|
||||
// information a histogram count cannot (events.loaded sums the number of events
|
||||
// per load, idempotency.checks counts lookups that have no duration).
|
||||
type eventsourcedMetrics struct {
|
||||
commandDuration metric.Float64Histogram
|
||||
eventStoreDur metric.Float64Histogram
|
||||
eventsLoaded metric.Int64Counter
|
||||
eventLoadDur metric.Float64Histogram
|
||||
snapshotStoreDur metric.Float64Histogram
|
||||
snapshotLoadDur metric.Float64Histogram
|
||||
idempotencyCheck metric.Int64Counter
|
||||
}
|
||||
|
||||
// NewEventsourcedMetrics builds an eventsourced.MetricsRecorder that records to
|
||||
// the global OpenTelemetry MeterProvider. Pass the result to both
|
||||
// pg.WithMetrics (for event-store operations) and eventsourced.WithMetrics
|
||||
// (for command handling) so a single recorder covers store and handler
|
||||
// metrics.
|
||||
//
|
||||
// SetupOTelSDK must have run first so the global MeterProvider is configured;
|
||||
// when metrics are disabled the global provider is a no-op and recording is
|
||||
// effectively free.
|
||||
func NewEventsourcedMetrics() (eventsourced.MetricsRecorder, error) {
|
||||
m := otel.Meter(eventsourcedMeterName)
|
||||
var errs []error
|
||||
hist := func(name, desc string) metric.Float64Histogram {
|
||||
h, err := m.Float64Histogram(
|
||||
name,
|
||||
metric.WithDescription(desc),
|
||||
metric.WithUnit("s"),
|
||||
metric.WithExplicitBucketBoundaries(durationBucketsSeconds...),
|
||||
)
|
||||
errs = append(errs, err)
|
||||
return h
|
||||
}
|
||||
counter := func(name, desc string) metric.Int64Counter {
|
||||
c, err := m.Int64Counter(name, metric.WithDescription(desc))
|
||||
errs = append(errs, err)
|
||||
return c
|
||||
}
|
||||
|
||||
r := &eventsourcedMetrics{
|
||||
commandDuration: hist("eventsourced.command.duration", "Wall-clock time to process a command in Handle."),
|
||||
eventStoreDur: hist("eventsourced.event.store.duration", "Time taken to persist a single event."),
|
||||
eventsLoaded: counter("eventsourced.events.loaded", "Number of events loaded when rehydrating aggregates."),
|
||||
eventLoadDur: hist("eventsourced.event.load.duration", "Time taken to load events for an aggregate."),
|
||||
snapshotStoreDur: hist("eventsourced.snapshot.store.duration", "Time taken to persist a snapshot."),
|
||||
snapshotLoadDur: hist("eventsourced.snapshot.load.duration", "Time taken to load a snapshot."),
|
||||
idempotencyCheck: counter("eventsourced.idempotency.checks", "Number of command idempotency lookups."),
|
||||
}
|
||||
if err := errors.Join(errs...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// Record implements eventsourced.MetricsRecorder. Metric types the adapter does
|
||||
// not recognise (for example pg outbox metrics when the outbox is not enabled)
|
||||
// are ignored.
|
||||
func (e *eventsourcedMetrics) Record(ctx context.Context, raw eventsourced.Metric) {
|
||||
switch m := raw.(type) {
|
||||
case eventsourced.CommandDuration:
|
||||
e.commandDuration.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||||
attribute.String("command.type", m.CommandType),
|
||||
attribute.Bool("success", m.Success),
|
||||
))
|
||||
case eventsourced.EventStored:
|
||||
e.eventStoreDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||||
attribute.String("aggregate.type", m.AggregateType),
|
||||
attribute.String("event.type", m.EventType),
|
||||
))
|
||||
case eventsourced.EventsLoaded:
|
||||
attrs := metric.WithAttributes(attribute.String("aggregate.type", m.AggregateType))
|
||||
e.eventsLoaded.Add(ctx, int64(m.EventCount), attrs)
|
||||
e.eventLoadDur.Record(ctx, m.Duration.Seconds(), attrs)
|
||||
case eventsourced.SnapshotStored:
|
||||
e.snapshotStoreDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||||
attribute.String("aggregate.type", m.AggregateType),
|
||||
attribute.Bool("success", m.Success),
|
||||
))
|
||||
case eventsourced.SnapshotLoaded:
|
||||
e.snapshotLoadDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||||
attribute.String("aggregate.type", m.AggregateType),
|
||||
attribute.Bool("found", m.Found),
|
||||
))
|
||||
case eventsourced.IdempotencyCheck:
|
||||
e.idempotencyCheck.Add(ctx, 1, metric.WithAttributes(
|
||||
attribute.String("aggregate.type", m.AggregateType),
|
||||
attribute.Bool("hit", m.Hit),
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
package otelsetup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
||||
"go.opentelemetry.io/otel"
|
||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
func TestNewEventsourcedMetrics_RecordsContract(t *testing.T) {
|
||||
reader := sdkmetric.NewManualReader()
|
||||
otel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)))
|
||||
|
||||
r, err := NewEventsourcedMetrics()
|
||||
if err != nil {
|
||||
t.Fatalf("NewEventsourcedMetrics returned error: %v", err)
|
||||
}
|
||||
if r == nil {
|
||||
t.Fatal("NewEventsourcedMetrics returned nil recorder")
|
||||
}
|
||||
|
||||
// Recording every known metric type (and an unknown one) must not panic
|
||||
// and must emit the expected instruments.
|
||||
for _, m := range []eventsourced.Metric{
|
||||
eventsourced.CommandDuration{CommandType: "AddEntry", Duration: time.Millisecond, Success: true},
|
||||
eventsourced.EventStored{AggregateType: "Entry", EventType: "EntryAdded", Duration: time.Millisecond},
|
||||
eventsourced.EventsLoaded{AggregateType: "Entry", EventCount: 3, Duration: time.Millisecond},
|
||||
eventsourced.SnapshotStored{AggregateType: "Entry", Duration: time.Millisecond, Success: true},
|
||||
eventsourced.SnapshotLoaded{AggregateType: "Entry", Found: false, Duration: time.Millisecond},
|
||||
eventsourced.IdempotencyCheck{AggregateType: "Entry", Hit: true},
|
||||
unknownMetric{},
|
||||
} {
|
||||
r.Record(context.Background(), m)
|
||||
}
|
||||
|
||||
var rm metricdata.ResourceMetrics
|
||||
if err := reader.Collect(context.Background(), &rm); err != nil {
|
||||
t.Fatalf("collect: %v", err)
|
||||
}
|
||||
|
||||
got := map[string]bool{}
|
||||
for _, sm := range rm.ScopeMetrics {
|
||||
for _, md := range sm.Metrics {
|
||||
got[md.Name] = true
|
||||
}
|
||||
}
|
||||
want := []string{
|
||||
"eventsourced.command.duration",
|
||||
"eventsourced.event.store.duration",
|
||||
"eventsourced.events.loaded",
|
||||
"eventsourced.event.load.duration",
|
||||
"eventsourced.snapshot.store.duration",
|
||||
"eventsourced.snapshot.load.duration",
|
||||
"eventsourced.idempotency.checks",
|
||||
}
|
||||
var missing []string
|
||||
for _, w := range want {
|
||||
if !got[w] {
|
||||
missing = append(missing, w)
|
||||
}
|
||||
}
|
||||
if len(missing) > 0 {
|
||||
sort.Strings(missing)
|
||||
t.Errorf("missing expected metrics: %v", missing)
|
||||
}
|
||||
}
|
||||
|
||||
// unknownMetric is a metric type the adapter has no case for; the test records
// one to check that Record tolerates unrecognised metrics.
type unknownMetric struct{}
|
||||
|
||||
// IsMetric is a no-op; it is unknownMetric's only method and is what allows the
// type to be used as an eventsourced.Metric in the test's sample slice.
func (unknownMetric) IsMetric() {}
|
||||
Reference in New Issue
Block a user