cb99bbc1a6
NewSubscriptionMetrics(subscription) returns a *SubscriptionMetrics whose method set (Pushed/PushSkipped/Dropped/ChannelFull) satisfies the Observer interface of gitea.unbound.se/shiny/subscriptions structurally — otelsetup does not import that library. Services wire it via subscriptions.WithObserver so the cross-service subscription push path (ADR-0012) reports outcomes as metrics instead of only the library's skip/drop warn logs. One Int64Counter `subscription.notifications` labelled by (subscription, outcome) only — the subscriber key (company id / user email) is deliberately NOT a label to avoid unbounded cardinality. skip-rate = skipped/(pushed+skipped). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
86 lines
3.4 KiB
Go
86 lines
3.4 KiB
Go
package otelsetup
|
|
|
|
import (
|
|
"context"
|
|
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/metric"
|
|
)
|
|
|
|
const (
	// subscriptionMeterName names the instrumentation scope under which
	// SubscriptionMetrics registers its cross-service subscription push
	// counter.
	subscriptionMeterName = "gitea.unbound.se/shiny/otelsetup/subscriptions"
)
|
|
|
|
// SubscriptionMetrics records cross-service subscription push outcomes (the
|
|
// read-your-writes subscriptions described in ADR-0012) as OpenTelemetry
|
|
// counters.
|
|
//
|
|
// Its method set (Pushed/PushSkipped/Dropped/ChannelFull) satisfies the
|
|
// Observer interface of gitea.unbound.se/shiny/subscriptions *structurally* —
|
|
// otelsetup does not import that library. A service constructs it and passes it
|
|
// to the registry:
|
|
//
|
|
// metrics, err := otelsetup.NewSubscriptionMetrics("availableCompanies")
|
|
// if err != nil { return err }
|
|
// reg := subscriptions.New[T](subscriptions.WithObserver(metrics))
|
|
//
|
|
// The WithObserver call type-checks the structural match, so a drift in the
|
|
// Observer interface fails the service build. Add a
|
|
// `var _ subscriptions.Observer = (*otelsetup.SubscriptionMetrics)(nil)` next to
|
|
// it for an explicit guard.
|
|
//
|
|
// Outcomes are recorded against a low-cardinality (subscription, outcome) pair.
|
|
// The subscriber key (company id / user email) the Observer methods receive is
|
|
// deliberately NOT used as a metric attribute — that would be unbounded
|
|
// cardinality; the key stays in the subscriptions library's logs for
|
|
// correlation.
|
|
type SubscriptionMetrics struct {
|
|
notifications metric.Int64Counter
|
|
subscription attribute.KeyValue
|
|
}
|
|
|
|
// NewSubscriptionMetrics builds a SubscriptionMetrics that records to the global
|
|
// OpenTelemetry MeterProvider. subscription is the low-cardinality name of the
|
|
// subscription field (e.g. "availableCompanies", "entryBasesChanged"), recorded
|
|
// as an attribute so one counter covers every subscription.
|
|
//
|
|
// SetupOTelSDK must have run first so the global MeterProvider is configured;
|
|
// when metrics are disabled the global provider is a no-op and recording is
|
|
// effectively free.
|
|
func NewSubscriptionMetrics(subscription string) (*SubscriptionMetrics, error) {
|
|
c, err := otel.Meter(subscriptionMeterName).Int64Counter(
|
|
"subscription.notifications",
|
|
metric.WithDescription("Cross-service subscription push outcomes, by outcome (pushed/skipped/dropped/channel_full)."),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &SubscriptionMetrics{
|
|
notifications: c,
|
|
subscription: attribute.String("subscription", subscription),
|
|
}, nil
|
|
}
|
|
|
|
func (s *SubscriptionMetrics) record(outcome string) {
|
|
s.notifications.Add(context.Background(), 1, metric.WithAttributes(
|
|
s.subscription,
|
|
attribute.String("outcome", outcome),
|
|
))
|
|
}
|
|
|
|
// Pushed records a change that was gated and delivered to the key's subscribers
|
|
// — the denominator for a skip/drop rate.
|
|
func (s *SubscriptionMetrics) Pushed(string) { s.record("pushed") }
|
|
|
|
// PushSkipped records a push skipped because the read view never reflected the
|
|
// change within the retry budget.
|
|
func (s *SubscriptionMetrics) PushSkipped(string) { s.record("skipped") }
|
|
|
|
// Dropped records a notification dropped because the worker queue was full.
|
|
func (s *SubscriptionMetrics) Dropped(string) { s.record("dropped") }
|
|
|
|
// ChannelFull records a notification dropped because a subscriber's buffer was
|
|
// full.
|
|
func (s *SubscriptionMetrics) ChannelFull(string) { s.record("channel_full") }
|