package otelsetup import ( "context" "errors" "gitlab.com/unboundsoftware/eventsourced/eventsourced" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" ) // eventsourcedMeterName is the instrumentation scope for the event-sourcing // metrics emitted by the adapter returned from NewEventsourcedMetrics. const eventsourcedMeterName = "gitea.unbound.se/shiny/otelsetup/eventsourced" // durationBucketsSeconds are explicit histogram boundaries tuned for // sub-second event-store and command latencies. The SDK default boundaries are // scaled for milliseconds, which would bucket nearly every second-valued // observation into the first bucket and make percentiles useless. var durationBucketsSeconds = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} // eventsourcedMetrics implements eventsourced.MetricsRecorder by translating // the framework's Metric values into OpenTelemetry instruments registered on // the global MeterProvider configured by SetupOTelSDK. // // The OTel metric instruments are safe for concurrent use, and the struct is // immutable after construction, so Record may be called from multiple // goroutines as the framework requires. // // Operation counts are read off each duration histogram's generated _count // series rather than separate counters; the only standalone counters carry // information a histogram count cannot (events.loaded sums the number of events // per load, idempotency.checks counts lookups that have no duration). type eventsourcedMetrics struct { commandDuration metric.Float64Histogram eventStoreDur metric.Float64Histogram eventsLoaded metric.Int64Counter eventLoadDur metric.Float64Histogram snapshotStoreDur metric.Float64Histogram snapshotLoadDur metric.Float64Histogram idempotencyCheck metric.Int64Counter } // NewEventsourcedMetrics builds an eventsourced.MetricsRecorder that records to // the global OpenTelemetry MeterProvider. Pass the result to both // pg.WithMetrics (for event-store operations) and eventsourced.WithMetrics // (for command handling) so a single recorder covers store and handler // metrics. // // SetupOTelSDK must have run first so the global MeterProvider is configured; // when metrics are disabled the global provider is a no-op and recording is // effectively free. func NewEventsourcedMetrics() (eventsourced.MetricsRecorder, error) { m := otel.Meter(eventsourcedMeterName) var errs []error hist := func(name, desc string) metric.Float64Histogram { h, err := m.Float64Histogram( name, metric.WithDescription(desc), metric.WithUnit("s"), metric.WithExplicitBucketBoundaries(durationBucketsSeconds...), ) errs = append(errs, err) return h } counter := func(name, desc string) metric.Int64Counter { c, err := m.Int64Counter(name, metric.WithDescription(desc)) errs = append(errs, err) return c } r := &eventsourcedMetrics{ commandDuration: hist("eventsourced.command.duration", "Wall-clock time to process a command in Handle."), eventStoreDur: hist("eventsourced.event.store.duration", "Time taken to persist a single event."), eventsLoaded: counter("eventsourced.events.loaded", "Number of events loaded when rehydrating aggregates."), eventLoadDur: hist("eventsourced.event.load.duration", "Time taken to load events for an aggregate."), snapshotStoreDur: hist("eventsourced.snapshot.store.duration", "Time taken to persist a snapshot."), snapshotLoadDur: hist("eventsourced.snapshot.load.duration", "Time taken to load a snapshot."), idempotencyCheck: counter("eventsourced.idempotency.checks", "Number of command idempotency lookups."), } if err := errors.Join(errs...); err != nil { return nil, err } return r, nil } // Record implements eventsourced.MetricsRecorder. Metric types the adapter does // not recognise (for example pg outbox metrics when the outbox is not enabled) // are ignored. func (e *eventsourcedMetrics) Record(ctx context.Context, raw eventsourced.Metric) { switch m := raw.(type) { case eventsourced.CommandDuration: e.commandDuration.Record(ctx, m.Duration.Seconds(), metric.WithAttributes( attribute.String("command.type", m.CommandType), attribute.Bool("success", m.Success), )) case eventsourced.EventStored: e.eventStoreDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes( attribute.String("aggregate.type", m.AggregateType), attribute.String("event.type", m.EventType), )) case eventsourced.EventsLoaded: attrs := metric.WithAttributes(attribute.String("aggregate.type", m.AggregateType)) e.eventsLoaded.Add(ctx, int64(m.EventCount), attrs) e.eventLoadDur.Record(ctx, m.Duration.Seconds(), attrs) case eventsourced.SnapshotStored: e.snapshotStoreDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes( attribute.String("aggregate.type", m.AggregateType), attribute.Bool("success", m.Success), )) case eventsourced.SnapshotLoaded: e.snapshotLoadDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes( attribute.String("aggregate.type", m.AggregateType), attribute.Bool("found", m.Found), )) case eventsourced.IdempotencyCheck: e.idempotencyCheck.Add(ctx, 1, metric.WithAttributes( attribute.String("aggregate.type", m.AggregateType), attribute.Bool("hit", m.Hit), )) } }