From cb99bbc1a6d5c3e79df9625e581f36f455a83e93 Mon Sep 17 00:00:00 2001 From: Joakim Olsson Date: Tue, 16 Jun 2026 15:07:35 +0200 Subject: [PATCH] feat(metrics): add SubscriptionMetrics OTel observer for read-your-writes subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NewSubscriptionMetrics(subscription) returns a *SubscriptionMetrics whose method set (Pushed/PushSkipped/Dropped/ChannelFull) satisfies the Observer interface of gitea.unbound.se/shiny/subscriptions structurally — otelsetup does not import that library. Services wire it via subscriptions.WithObserver so the cross-service subscription push path (ADR-0012) reports outcomes as metrics instead of only the library's skip/drop warn logs. One Int64Counter `subscription.notifications` labelled by (subscription, outcome) only — the subscriber key (company id / user email) is deliberately NOT a label to avoid unbounded cardinality. skip-rate = skipped/(pushed+skipped). Co-Authored-By: Claude Opus 4.8 (1M context) --- subscription_metrics.go | 85 +++++++++++++++++++++++++++ subscription_metrics_test.go | 108 +++++++++++++++++++++++++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 subscription_metrics.go create mode 100644 subscription_metrics_test.go diff --git a/subscription_metrics.go b/subscription_metrics.go new file mode 100644 index 0000000..6133964 --- /dev/null +++ b/subscription_metrics.go @@ -0,0 +1,85 @@ +package otelsetup + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// subscriptionMeterName is the instrumentation scope for the cross-service +// subscription push metrics emitted by SubscriptionMetrics. +const subscriptionMeterName = "gitea.unbound.se/shiny/otelsetup/subscriptions" + +// SubscriptionMetrics records cross-service subscription push outcomes (the +// read-your-writes subscriptions described in ADR-0012) as OpenTelemetry +// counters. +// +// Its method set (Pushed/PushSkipped/Dropped/ChannelFull) satisfies the +// Observer interface of gitea.unbound.se/shiny/subscriptions *structurally* — +// otelsetup does not import that library. A service constructs it and passes it +// to the registry: +// +// metrics, err := otelsetup.NewSubscriptionMetrics("availableCompanies") +// if err != nil { return err } +// reg := subscriptions.New[T](subscriptions.WithObserver(metrics)) +// +// The WithObserver call type-checks the structural match, so a drift in the +// Observer interface fails the service build. Add a +// `var _ subscriptions.Observer = (*otelsetup.SubscriptionMetrics)(nil)` next to +// it for an explicit guard. +// +// Outcomes are recorded against a low-cardinality (subscription, outcome) pair. +// The subscriber key (company id / user email) the Observer methods receive is +// deliberately NOT used as a metric attribute — that would be unbounded +// cardinality; the key stays in the subscriptions library's logs for +// correlation. +type SubscriptionMetrics struct { + notifications metric.Int64Counter + subscription attribute.KeyValue +} + +// NewSubscriptionMetrics builds a SubscriptionMetrics that records to the global +// OpenTelemetry MeterProvider. subscription is the low-cardinality name of the +// subscription field (e.g. "availableCompanies", "entryBasesChanged"), recorded +// as an attribute so one counter covers every subscription. +// +// SetupOTelSDK must have run first so the global MeterProvider is configured; +// when metrics are disabled the global provider is a no-op and recording is +// effectively free. +func NewSubscriptionMetrics(subscription string) (*SubscriptionMetrics, error) { + c, err := otel.Meter(subscriptionMeterName).Int64Counter( + "subscription.notifications", + metric.WithDescription("Cross-service subscription push outcomes, by outcome (pushed/skipped/dropped/channel_full)."), + ) + if err != nil { + return nil, err + } + return &SubscriptionMetrics{ + notifications: c, + subscription: attribute.String("subscription", subscription), + }, nil +} + +func (s *SubscriptionMetrics) record(outcome string) { + s.notifications.Add(context.Background(), 1, metric.WithAttributes( + s.subscription, + attribute.String("outcome", outcome), + )) +} + +// Pushed records a change that was gated and delivered to the key's subscribers +// — the denominator for a skip/drop rate. +func (s *SubscriptionMetrics) Pushed(string) { s.record("pushed") } + +// PushSkipped records a push skipped because the read view never reflected the +// change within the retry budget. +func (s *SubscriptionMetrics) PushSkipped(string) { s.record("skipped") } + +// Dropped records a notification dropped because the worker queue was full. +func (s *SubscriptionMetrics) Dropped(string) { s.record("dropped") } + +// ChannelFull records a notification dropped because a subscriber's buffer was +// full. +func (s *SubscriptionMetrics) ChannelFull(string) { s.record("channel_full") } diff --git a/subscription_metrics_test.go b/subscription_metrics_test.go new file mode 100644 index 0000000..0ed2129 --- /dev/null +++ b/subscription_metrics_test.go @@ -0,0 +1,108 @@ +package otelsetup + +import ( + "context" + "testing" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/noop" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// observerShape mirrors gitea.unbound.se/shiny/subscriptions.Observer. This +// compile-time assertion guards that SubscriptionMetrics still satisfies that +// interface structurally, without otelsetup importing the library. +// +// KEEP IN SYNC with subscriptions.Observer: this only proves SubscriptionMetrics +// matches this local copy. The authoritative check that the local copy still +// matches the real interface is the `subscriptions.WithObserver(...)` call site +// in each consuming service — keep a `var _ subscriptions.Observer` guard there. +type observerShape interface { + Pushed(string) + PushSkipped(string) + Dropped(string) + ChannelFull(string) +} + +var _ observerShape = (*SubscriptionMetrics)(nil) + +// TestSubscriptionMetrics_DisabledProviderIsSafe proves the "recording is free +// when metrics are disabled" claim: with the default global no-op provider, the +// methods neither panic nor emit instruments. +func TestSubscriptionMetrics_DisabledProviderIsSafe(t *testing.T) { + otel.SetMeterProvider(noop.NewMeterProvider()) + + m, err := NewSubscriptionMetrics("entryBasesChanged") + if err != nil { + t.Fatalf("NewSubscriptionMetrics returned error: %v", err) + } + // Must not panic on the no-op provider. + m.Pushed("c1") + m.PushSkipped("c1") + m.Dropped("c1") + m.ChannelFull("c1") +} + +func TestNewSubscriptionMetrics_RecordsOutcomes(t *testing.T) { + reader := sdkmetric.NewManualReader() + otel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))) + + m, err := NewSubscriptionMetrics("availableCompanies") + if err != nil { + t.Fatalf("NewSubscriptionMetrics returned error: %v", err) + } + + // The subscriber key is ignored for metrics; different keys must not create + // new series (cardinality guard is implicit — we only label by outcome). + m.Pushed("c1") + m.Pushed("c2") + m.PushSkipped("c1") + m.Dropped("c1") + m.ChannelFull("c1") + + var rm metricdata.ResourceMetrics + if err := reader.Collect(context.Background(), &rm); err != nil { + t.Fatalf("collect: %v", err) + } + + counts := map[string]int64{} + dataPoints := 0 + found := false + for _, sm := range rm.ScopeMetrics { + for _, md := range sm.Metrics { + if md.Name != "subscription.notifications" { + continue + } + found = true + sum, ok := md.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("expected Sum[int64], got %T", md.Data) + } + for _, dp := range sum.DataPoints { + dataPoints++ + sub, _ := dp.Attributes.Value(attribute.Key("subscription")) + if sub.AsString() != "availableCompanies" { + t.Errorf("unexpected subscription attribute: %q", sub.AsString()) + } + outcome, _ := dp.Attributes.Value(attribute.Key("outcome")) + counts[outcome.AsString()] += dp.Value + } + } + } + + if !found { + t.Fatal("subscription.notifications counter not emitted") + } + // One series per outcome, keyed by outcome only (not by subscriber key). + if dataPoints != 4 { + t.Errorf("expected 4 data points (one per outcome), got %d", dataPoints) + } + want := map[string]int64{"pushed": 2, "skipped": 1, "dropped": 1, "channel_full": 1} + for k, v := range want { + if counts[k] != v { + t.Errorf("outcome %q: got %d, want %d", k, counts[k], v) + } + } +} -- 2.52.0