124 lines
5.2 KiB
Go
124 lines
5.2 KiB
Go
|
|
package otelsetup
|
||
|
|
|
||
|
|
import (
	"context"
	"errors"
	"fmt"

	"gitlab.com/unboundsoftware/eventsourced/eventsourced"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)
|
||
|
|
|
||
|
|
const (
	// eventsourcedMeterName names the OpenTelemetry instrumentation scope under
	// which the adapter built by NewEventsourcedMetrics registers all of its
	// event-sourcing instruments.
	eventsourcedMeterName = "gitea.unbound.se/shiny/otelsetup/eventsourced"
)
|
||
|
|
|
||
|
|
// durationBucketsSeconds holds the explicit histogram bucket boundaries, in
// seconds, applied to the duration histograms. They run from 1ms to 10s, with
// the finest resolution in the sub-second range where event-store and command
// latencies are expected to fall. The OpenTelemetry SDK's default boundaries
// are scaled for millisecond values; with second-valued observations almost
// every sample would land in the first bucket and percentiles would be useless.
var durationBucketsSeconds = []float64{
	// 1ms to 50ms
	0.001, 0.005, 0.01, 0.025, 0.05,
	// 100ms to 500ms
	0.1, 0.25, 0.5,
	// 1s to 10s
	1, 2.5, 5, 10,
}
|
||
|
|
|
||
|
|
// eventsourcedMetrics implements eventsourced.MetricsRecorder by translating
|
||
|
|
// the framework's Metric values into OpenTelemetry instruments registered on
|
||
|
|
// the global MeterProvider configured by SetupOTelSDK.
|
||
|
|
//
|
||
|
|
// The OTel metric instruments are safe for concurrent use, and the struct is
|
||
|
|
// immutable after construction, so Record may be called from multiple
|
||
|
|
// goroutines as the framework requires.
|
||
|
|
//
|
||
|
|
// Operation counts are read off each duration histogram's generated _count
|
||
|
|
// series rather than separate counters; the only standalone counters carry
|
||
|
|
// information a histogram count cannot (events.loaded sums the number of events
|
||
|
|
// per load, idempotency.checks counts lookups that have no duration).
|
||
|
|
type eventsourcedMetrics struct {
|
||
|
|
commandDuration metric.Float64Histogram
|
||
|
|
eventStoreDur metric.Float64Histogram
|
||
|
|
eventsLoaded metric.Int64Counter
|
||
|
|
eventLoadDur metric.Float64Histogram
|
||
|
|
snapshotStoreDur metric.Float64Histogram
|
||
|
|
snapshotLoadDur metric.Float64Histogram
|
||
|
|
idempotencyCheck metric.Int64Counter
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewEventsourcedMetrics builds an eventsourced.MetricsRecorder that records to
|
||
|
|
// the global OpenTelemetry MeterProvider. Pass the result to both
|
||
|
|
// pg.WithMetrics (for event-store operations) and eventsourced.WithMetrics
|
||
|
|
// (for command handling) so a single recorder covers store and handler
|
||
|
|
// metrics.
|
||
|
|
//
|
||
|
|
// SetupOTelSDK must have run first so the global MeterProvider is configured;
|
||
|
|
// when metrics are disabled the global provider is a no-op and recording is
|
||
|
|
// effectively free.
|
||
|
|
func NewEventsourcedMetrics() (eventsourced.MetricsRecorder, error) {
|
||
|
|
m := otel.Meter(eventsourcedMeterName)
|
||
|
|
var errs []error
|
||
|
|
hist := func(name, desc string) metric.Float64Histogram {
|
||
|
|
h, err := m.Float64Histogram(
|
||
|
|
name,
|
||
|
|
metric.WithDescription(desc),
|
||
|
|
metric.WithUnit("s"),
|
||
|
|
metric.WithExplicitBucketBoundaries(durationBucketsSeconds...),
|
||
|
|
)
|
||
|
|
errs = append(errs, err)
|
||
|
|
return h
|
||
|
|
}
|
||
|
|
counter := func(name, desc string) metric.Int64Counter {
|
||
|
|
c, err := m.Int64Counter(name, metric.WithDescription(desc))
|
||
|
|
errs = append(errs, err)
|
||
|
|
return c
|
||
|
|
}
|
||
|
|
|
||
|
|
r := &eventsourcedMetrics{
|
||
|
|
commandDuration: hist("eventsourced.command.duration", "Wall-clock time to process a command in Handle."),
|
||
|
|
eventStoreDur: hist("eventsourced.event.store.duration", "Time taken to persist a single event."),
|
||
|
|
eventsLoaded: counter("eventsourced.events.loaded", "Number of events loaded when rehydrating aggregates."),
|
||
|
|
eventLoadDur: hist("eventsourced.event.load.duration", "Time taken to load events for an aggregate."),
|
||
|
|
snapshotStoreDur: hist("eventsourced.snapshot.store.duration", "Time taken to persist a snapshot."),
|
||
|
|
snapshotLoadDur: hist("eventsourced.snapshot.load.duration", "Time taken to load a snapshot."),
|
||
|
|
idempotencyCheck: counter("eventsourced.idempotency.checks", "Number of command idempotency lookups."),
|
||
|
|
}
|
||
|
|
if err := errors.Join(errs...); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return r, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// Record implements eventsourced.MetricsRecorder. Metric types the adapter does
|
||
|
|
// not recognise (for example pg outbox metrics when the outbox is not enabled)
|
||
|
|
// are ignored.
|
||
|
|
func (e *eventsourcedMetrics) Record(ctx context.Context, raw eventsourced.Metric) {
|
||
|
|
switch m := raw.(type) {
|
||
|
|
case eventsourced.CommandDuration:
|
||
|
|
e.commandDuration.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||
|
|
attribute.String("command.type", m.CommandType),
|
||
|
|
attribute.Bool("success", m.Success),
|
||
|
|
))
|
||
|
|
case eventsourced.EventStored:
|
||
|
|
e.eventStoreDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||
|
|
attribute.String("aggregate.type", m.AggregateType),
|
||
|
|
attribute.String("event.type", m.EventType),
|
||
|
|
))
|
||
|
|
case eventsourced.EventsLoaded:
|
||
|
|
attrs := metric.WithAttributes(attribute.String("aggregate.type", m.AggregateType))
|
||
|
|
e.eventsLoaded.Add(ctx, int64(m.EventCount), attrs)
|
||
|
|
e.eventLoadDur.Record(ctx, m.Duration.Seconds(), attrs)
|
||
|
|
case eventsourced.SnapshotStored:
|
||
|
|
e.snapshotStoreDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||
|
|
attribute.String("aggregate.type", m.AggregateType),
|
||
|
|
attribute.Bool("success", m.Success),
|
||
|
|
))
|
||
|
|
case eventsourced.SnapshotLoaded:
|
||
|
|
e.snapshotLoadDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes(
|
||
|
|
attribute.String("aggregate.type", m.AggregateType),
|
||
|
|
attribute.Bool("found", m.Found),
|
||
|
|
))
|
||
|
|
case eventsourced.IdempotencyCheck:
|
||
|
|
e.idempotencyCheck.Add(ctx, 1, metric.WithAttributes(
|
||
|
|
attribute.String("aggregate.type", m.AggregateType),
|
||
|
|
attribute.Bool("hit", m.Hit),
|
||
|
|
))
|
||
|
|
}
|
||
|
|
}
|