Files

67 lines
1.5 KiB
Go
Raw Permalink Normal View History

package graph
import (
"sync"
"gitea.unbound.se/unboundsoftware/schemas/graph/model"
)
// PubSub handles publishing schema updates to subscribers
type PubSub struct {
mu sync.RWMutex
subscribers map[string][]chan *model.SchemaUpdate
}
func NewPubSub() *PubSub {
return &PubSub{
subscribers: make(map[string][]chan *model.SchemaUpdate),
}
}
// Subscribe creates a new subscription channel for a given schema ref
func (ps *PubSub) Subscribe(ref string) chan *model.SchemaUpdate {
ps.mu.Lock()
defer ps.mu.Unlock()
ch := make(chan *model.SchemaUpdate, 10)
ps.subscribers[ref] = append(ps.subscribers[ref], ch)
return ch
}
// Unsubscribe removes a subscription channel
func (ps *PubSub) Unsubscribe(ref string, ch chan *model.SchemaUpdate) {
ps.mu.Lock()
defer ps.mu.Unlock()
subs := ps.subscribers[ref]
for i, sub := range subs {
if sub == ch {
// Remove this subscriber
ps.subscribers[ref] = append(subs[:i], subs[i+1:]...)
close(sub)
break
}
}
// Clean up empty subscriber lists
if len(ps.subscribers[ref]) == 0 {
delete(ps.subscribers, ref)
}
}
// Publish sends a schema update to all subscribers of a given ref
func (ps *PubSub) Publish(ref string, update *model.SchemaUpdate) {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, ch := range ps.subscribers[ref] {
// Non-blocking send - if subscriber is slow, skip
select {
case ch <- update:
default:
// Channel full, subscriber is too slow - skip this update
}
}
}