feat: add eventsourced MetricsRecorder adapter for OpenTelemetry (#142)
This commit was merged in pull request #142.
This commit is contained in:
+123
@@ -0,0 +1,123 @@
|
||||
package otelsetup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
// eventsourcedMeterName names the instrumentation scope under which the
// adapter built by NewEventsourcedMetrics registers its event-sourcing
// instruments.
const eventsourcedMeterName = "gitea.unbound.se/shiny/otelsetup/eventsourced"
|
||||
|
||||
// durationBucketsSeconds lists the explicit histogram bucket boundaries, in
// seconds, applied to every duration histogram in this file. They are chosen
// for sub-second event-store and command latencies: the SDK's default
// boundaries are scaled for milliseconds, so second-valued observations would
// almost all fall into the first bucket and percentiles would be meaningless.
var durationBucketsSeconds = []float64{
	0.001, 0.005, 0.01, 0.025, 0.05,
	0.1, 0.25, 0.5,
	1, 2.5, 5, 10,
}
|
||||
|
||||
// eventsourcedMetrics implements eventsourced.MetricsRecorder by translating
|
||||
// the framework's Metric values into OpenTelemetry instruments registered on
|
||||
// the global MeterProvider configured by SetupOTelSDK.
|
||||
//
|
||||
// The OTel metric instruments are safe for concurrent use, and the struct is
|
||||
// immutable after construction, so Record may be called from multiple
|
||||
// goroutines as the framework requires.
|
||||
//
|
||||
// Operation counts are read off each duration histogram's generated _count
|
||||
// series rather than separate counters; the only standalone counters carry
|
||||
// information a histogram count cannot (events.loaded sums the number of events
|
||||
// per load, idempotency.checks counts lookups that have no duration).
|
||||
type eventsourcedMetrics struct {
|
||||
commandDuration metric.Float64Histogram
|
||||
eventStoreDur metric.Float64Histogram
|
||||
eventsLoaded metric.Int64Counter
|
||||
eventLoadDur metric.Float64Histogram
|
||||
snapshotStoreDur metric.Float64Histogram
|
||||
snapshotLoadDur metric.Float64Histogram
|
||||
idempotencyCheck metric.Int64Counter
|
||||
}
|
||||
|
||||
// NewEventsourcedMetrics builds an eventsourced.MetricsRecorder that records to
|
||||
// the global OpenTelemetry MeterProvider. Pass the result to both
|
||||
// pg.WithMetrics (for event-store operations) and eventsourced.WithMetrics
|
||||
// (for command handling) so a single recorder covers store and handler
|
||||
// metrics.
|
||||
//
|
||||
// SetupOTelSDK must have run first so the global MeterProvider is configured;
|
||||
// when metrics are disabled the global provider is a no-op and recording is
|
||||
// effectively free.
|
||||
func NewEventsourcedMetrics() (eventsourced.MetricsRecorder, error) {
|
||||
m := otel.Meter(eventsourcedMeterName)
|
||||
var errs []error
|
||||
hist := func(name, desc string) metric.Float64Histogram {
|
||||
h, err := m.Float64Histogram(
|
||||
name,
|
||||
metric.WithDescription(desc),
|
||||
metric.WithUnit("s"),
|
||||
metric.WithExplicitBucketBoundaries(durationBucketsSeconds...),
|
||||
)
|
||||
errs = append(errs, err)
|
||||
return h
|
||||
}
|
||||
counter := func(name, desc string) metric.Int64Counter {
|
||||
c, err := m.Int64Counter(name, metric.WithDescription(desc))
|
||||
errs = append(errs, err)
|
||||
return c
|
||||
}
|
||||
|
||||
r := &eventsourcedMetrics{
|
||||
commandDuration: hist("eventsourced.command.duration", "Wall-clock time to process a command in Handle."),
|
||||
eventStoreDur: hist("eventsourced.event.store.duration", "Time taken to persist a single event."),
|
||||
eventsLoaded: counter("eventsourced.events.loaded", "Number of events loaded when rehydrating aggregates."),
|
||||
eventLoadDur: hist("eventsourced.event.load.duration", "Time taken to load events for an aggregate."),
|
||||
snapshotStoreDur: hist("eventsourced.snapshot.store.duration", "Time taken to persist a snapshot."),
|
||||
snapshotLoadDur: hist("eventsourced.snapshot.load.duration", "Time taken to load a snapshot."),
|
||||
idempotencyCheck: counter("eventsourced.idempotency.checks", "Number of command idempotency lookups."),
|
||||
}
|
||||
if err := errors.Join(errs...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// Record implements eventsourced.MetricsRecorder. Metric types the adapter does
|
||||
// not recognise (for example pg outbox metrics when the outbox is not enabled)
|
||||
// are ignored.
|
||||
func (e *eventsourcedMetrics) Record(ctx context.Context, raw eventsourced.Metric) {
|
||||
switch m := raw.(type) {
|
||||
case eventsourced.CommandDuration:
|
||||
e.commandDuration.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||||
attribute.String("command.type", m.CommandType),
|
||||
attribute.Bool("success", m.Success),
|
||||
))
|
||||
case eventsourced.EventStored:
|
||||
e.eventStoreDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||||
attribute.String("aggregate.type", m.AggregateType),
|
||||
attribute.String("event.type", m.EventType),
|
||||
))
|
||||
case eventsourced.EventsLoaded:
|
||||
attrs := metric.WithAttributes(attribute.String("aggregate.type", m.AggregateType))
|
||||
e.eventsLoaded.Add(ctx, int64(m.EventCount), attrs)
|
||||
e.eventLoadDur.Record(ctx, m.Duration.Seconds(), attrs)
|
||||
case eventsourced.SnapshotStored:
|
||||
e.snapshotStoreDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||||
attribute.String("aggregate.type", m.AggregateType),
|
||||
attribute.Bool("success", m.Success),
|
||||
))
|
||||
case eventsourced.SnapshotLoaded:
|
||||
e.snapshotLoadDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||||
attribute.String("aggregate.type", m.AggregateType),
|
||||
attribute.Bool("found", m.Found),
|
||||
))
|
||||
case eventsourced.IdempotencyCheck:
|
||||
e.idempotencyCheck.Add(ctx, 1, metric.WithAttributes(
|
||||
attribute.String("aggregate.type", m.AggregateType),
|
||||
attribute.Bool("hit", m.Hit),
|
||||
))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user