2025-11-19 11:29:30 +01:00
|
|
|
package graph
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"sync"
|
|
|
|
|
|
2026-01-17 22:53:46 +01:00
|
|
|
"gitea.unbound.se/unboundsoftware/schemas/graph/model"
|
2025-11-19 11:29:30 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// PubSub handles publishing schema updates to subscribers
|
|
|
|
|
type PubSub struct {
|
|
|
|
|
mu sync.RWMutex
|
|
|
|
|
subscribers map[string][]chan *model.SchemaUpdate
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewPubSub() *PubSub {
|
|
|
|
|
return &PubSub{
|
|
|
|
|
subscribers: make(map[string][]chan *model.SchemaUpdate),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Subscribe creates a new subscription channel for a given schema ref
|
|
|
|
|
func (ps *PubSub) Subscribe(ref string) chan *model.SchemaUpdate {
|
|
|
|
|
ps.mu.Lock()
|
|
|
|
|
defer ps.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
ch := make(chan *model.SchemaUpdate, 10)
|
|
|
|
|
ps.subscribers[ref] = append(ps.subscribers[ref], ch)
|
|
|
|
|
|
|
|
|
|
return ch
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Unsubscribe removes a subscription channel
|
|
|
|
|
func (ps *PubSub) Unsubscribe(ref string, ch chan *model.SchemaUpdate) {
|
|
|
|
|
ps.mu.Lock()
|
|
|
|
|
defer ps.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
subs := ps.subscribers[ref]
|
|
|
|
|
for i, sub := range subs {
|
|
|
|
|
if sub == ch {
|
|
|
|
|
// Remove this subscriber
|
|
|
|
|
ps.subscribers[ref] = append(subs[:i], subs[i+1:]...)
|
|
|
|
|
close(sub)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Clean up empty subscriber lists
|
|
|
|
|
if len(ps.subscribers[ref]) == 0 {
|
|
|
|
|
delete(ps.subscribers, ref)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Publish sends a schema update to all subscribers of a given ref
|
|
|
|
|
func (ps *PubSub) Publish(ref string, update *model.SchemaUpdate) {
|
|
|
|
|
ps.mu.RLock()
|
|
|
|
|
defer ps.mu.RUnlock()
|
|
|
|
|
|
|
|
|
|
for _, ch := range ps.subscribers[ref] {
|
|
|
|
|
// Non-blocking send - if subscriber is slow, skip
|
|
|
|
|
select {
|
|
|
|
|
case ch <- update:
|
|
|
|
|
default:
|
|
|
|
|
// Channel full, subscriber is too slow - skip this update
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|