feat(metrics): add SubscriptionMetrics OTel observer for subscriptions (#150)
This commit was merged in pull request #150.
This commit is contained in:
@@ -0,0 +1,85 @@
|
||||
package otelsetup
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
// subscriptionMeterName is the instrumentation scope for the cross-service
|
||||
// subscription push metrics emitted by SubscriptionMetrics.
|
||||
const subscriptionMeterName = "gitea.unbound.se/shiny/otelsetup/subscriptions"
|
||||
|
||||
// SubscriptionMetrics records cross-service subscription push outcomes (the
|
||||
// read-your-writes subscriptions described in ADR-0012) as OpenTelemetry
|
||||
// counters.
|
||||
//
|
||||
// Its method set (Pushed/PushSkipped/Dropped/ChannelFull) satisfies the
|
||||
// Observer interface of gitea.unbound.se/shiny/subscriptions *structurally* —
|
||||
// otelsetup does not import that library. A service constructs it and passes it
|
||||
// to the registry:
|
||||
//
|
||||
// metrics, err := otelsetup.NewSubscriptionMetrics("availableCompanies")
|
||||
// if err != nil { return err }
|
||||
// reg := subscriptions.New[T](subscriptions.WithObserver(metrics))
|
||||
//
|
||||
// The WithObserver call type-checks the structural match, so a drift in the
|
||||
// Observer interface fails the service build. Add a
|
||||
// `var _ subscriptions.Observer = (*otelsetup.SubscriptionMetrics)(nil)` next to
|
||||
// it for an explicit guard.
|
||||
//
|
||||
// Outcomes are recorded against a low-cardinality (subscription, outcome) pair.
|
||||
// The subscriber key (company id / user email) the Observer methods receive is
|
||||
// deliberately NOT used as a metric attribute — that would be unbounded
|
||||
// cardinality; the key stays in the subscriptions library's logs for
|
||||
// correlation.
|
||||
type SubscriptionMetrics struct {
|
||||
notifications metric.Int64Counter
|
||||
subscription attribute.KeyValue
|
||||
}
|
||||
|
||||
// NewSubscriptionMetrics builds a SubscriptionMetrics that records to the global
|
||||
// OpenTelemetry MeterProvider. subscription is the low-cardinality name of the
|
||||
// subscription field (e.g. "availableCompanies", "entryBasesChanged"), recorded
|
||||
// as an attribute so one counter covers every subscription.
|
||||
//
|
||||
// SetupOTelSDK must have run first so the global MeterProvider is configured;
|
||||
// when metrics are disabled the global provider is a no-op and recording is
|
||||
// effectively free.
|
||||
func NewSubscriptionMetrics(subscription string) (*SubscriptionMetrics, error) {
|
||||
c, err := otel.Meter(subscriptionMeterName).Int64Counter(
|
||||
"subscription.notifications",
|
||||
metric.WithDescription("Cross-service subscription push outcomes, by outcome (pushed/skipped/dropped/channel_full)."),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &SubscriptionMetrics{
|
||||
notifications: c,
|
||||
subscription: attribute.String("subscription", subscription),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *SubscriptionMetrics) record(outcome string) {
|
||||
s.notifications.Add(context.Background(), 1, metric.WithAttributes(
|
||||
s.subscription,
|
||||
attribute.String("outcome", outcome),
|
||||
))
|
||||
}
|
||||
|
||||
// Pushed records a change that was gated and delivered to the key's subscribers
|
||||
// — the denominator for a skip/drop rate.
|
||||
func (s *SubscriptionMetrics) Pushed(string) { s.record("pushed") }
|
||||
|
||||
// PushSkipped records a push skipped because the read view never reflected the
|
||||
// change within the retry budget.
|
||||
func (s *SubscriptionMetrics) PushSkipped(string) { s.record("skipped") }
|
||||
|
||||
// Dropped records a notification dropped because the worker queue was full.
|
||||
func (s *SubscriptionMetrics) Dropped(string) { s.record("dropped") }
|
||||
|
||||
// ChannelFull records a notification dropped because a subscriber's buffer was
|
||||
// full.
|
||||
func (s *SubscriptionMetrics) ChannelFull(string) { s.record("channel_full") }
|
||||
@@ -0,0 +1,108 @@
|
||||
package otelsetup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
// observerShape mirrors gitea.unbound.se/shiny/subscriptions.Observer. This
|
||||
// compile-time assertion guards that SubscriptionMetrics still satisfies that
|
||||
// interface structurally, without otelsetup importing the library.
|
||||
//
|
||||
// KEEP IN SYNC with subscriptions.Observer: this only proves SubscriptionMetrics
|
||||
// matches this local copy. The authoritative check that the local copy still
|
||||
// matches the real interface is the `subscriptions.WithObserver(...)` call site
|
||||
// in each consuming service — keep a `var _ subscriptions.Observer` guard there.
|
||||
type observerShape interface {
|
||||
Pushed(string)
|
||||
PushSkipped(string)
|
||||
Dropped(string)
|
||||
ChannelFull(string)
|
||||
}
|
||||
|
||||
var _ observerShape = (*SubscriptionMetrics)(nil)
|
||||
|
||||
// TestSubscriptionMetrics_DisabledProviderIsSafe proves the "recording is free
|
||||
// when metrics are disabled" claim: with the default global no-op provider, the
|
||||
// methods neither panic nor emit instruments.
|
||||
func TestSubscriptionMetrics_DisabledProviderIsSafe(t *testing.T) {
|
||||
otel.SetMeterProvider(noop.NewMeterProvider())
|
||||
|
||||
m, err := NewSubscriptionMetrics("entryBasesChanged")
|
||||
if err != nil {
|
||||
t.Fatalf("NewSubscriptionMetrics returned error: %v", err)
|
||||
}
|
||||
// Must not panic on the no-op provider.
|
||||
m.Pushed("c1")
|
||||
m.PushSkipped("c1")
|
||||
m.Dropped("c1")
|
||||
m.ChannelFull("c1")
|
||||
}
|
||||
|
||||
func TestNewSubscriptionMetrics_RecordsOutcomes(t *testing.T) {
|
||||
reader := sdkmetric.NewManualReader()
|
||||
otel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)))
|
||||
|
||||
m, err := NewSubscriptionMetrics("availableCompanies")
|
||||
if err != nil {
|
||||
t.Fatalf("NewSubscriptionMetrics returned error: %v", err)
|
||||
}
|
||||
|
||||
// The subscriber key is ignored for metrics; different keys must not create
|
||||
// new series (cardinality guard is implicit — we only label by outcome).
|
||||
m.Pushed("c1")
|
||||
m.Pushed("c2")
|
||||
m.PushSkipped("c1")
|
||||
m.Dropped("c1")
|
||||
m.ChannelFull("c1")
|
||||
|
||||
var rm metricdata.ResourceMetrics
|
||||
if err := reader.Collect(context.Background(), &rm); err != nil {
|
||||
t.Fatalf("collect: %v", err)
|
||||
}
|
||||
|
||||
counts := map[string]int64{}
|
||||
dataPoints := 0
|
||||
found := false
|
||||
for _, sm := range rm.ScopeMetrics {
|
||||
for _, md := range sm.Metrics {
|
||||
if md.Name != "subscription.notifications" {
|
||||
continue
|
||||
}
|
||||
found = true
|
||||
sum, ok := md.Data.(metricdata.Sum[int64])
|
||||
if !ok {
|
||||
t.Fatalf("expected Sum[int64], got %T", md.Data)
|
||||
}
|
||||
for _, dp := range sum.DataPoints {
|
||||
dataPoints++
|
||||
sub, _ := dp.Attributes.Value(attribute.Key("subscription"))
|
||||
if sub.AsString() != "availableCompanies" {
|
||||
t.Errorf("unexpected subscription attribute: %q", sub.AsString())
|
||||
}
|
||||
outcome, _ := dp.Attributes.Value(attribute.Key("outcome"))
|
||||
counts[outcome.AsString()] += dp.Value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
t.Fatal("subscription.notifications counter not emitted")
|
||||
}
|
||||
// One series per outcome, keyed by outcome only (not by subscriber key).
|
||||
if dataPoints != 4 {
|
||||
t.Errorf("expected 4 data points (one per outcome), got %d", dataPoints)
|
||||
}
|
||||
want := map[string]int64{"pushed": 2, "skipped": 1, "dropped": 1, "channel_full": 1}
|
||||
for k, v := range want {
|
||||
if counts[k] != v {
|
||||
t.Errorf("outcome %q: got %d, want %d", k, counts[k], v)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user