Files
schemas/cache/cache.go
T
argoyle aaa111dd20 feat(service): implement graceful shutdown for HTTP server
Add a context with timeout to handle graceful shutdown of the HTTP 
server. Update error handling during the server's close to include 
context-aware shutdown. Ensure that the server properly logs only 
non-closed errors when listening.
2025-04-12 13:41:02 +02:00

146 lines
3.9 KiB
Go

package cache
import (
"fmt"
"log/slog"
"time"
"github.com/sparetimecoders/goamqp"
"gitlab.com/unboundsoftware/schemas/domain"
"gitlab.com/unboundsoftware/schemas/hash"
)
type Cache struct {
organizations map[string]domain.Organization
users map[string][]string
apiKeys map[string]domain.APIKey
services map[string]map[string]map[string]struct{}
subGraphs map[string]string
lastUpdate map[string]string
logger *slog.Logger
}
func (c *Cache) OrganizationByAPIKey(apiKey string) *domain.Organization {
key, exists := c.apiKeys[apiKey]
if !exists {
return nil
}
org, exists := c.organizations[key.OrganizationId]
if !exists {
return nil
}
return &org
}
func (c *Cache) OrganizationsByUser(sub string) []domain.Organization {
orgIds := c.users[sub]
orgs := make([]domain.Organization, len(orgIds))
for i, id := range orgIds {
orgs[i] = c.organizations[id]
}
return orgs
}
func (c *Cache) ApiKeyByKey(key string) *domain.APIKey {
k, exists := c.apiKeys[hash.String(key)]
if !exists {
return nil
}
return &k
}
func (c *Cache) Services(orgId, ref, lastUpdate string) ([]string, string) {
key := refKey(orgId, ref)
var services []string
if lastUpdate == "" || c.lastUpdate[key] > lastUpdate {
for k := range c.services[orgId][ref] {
services = append(services, k)
}
}
return services, c.lastUpdate[key]
}
func (c *Cache) SubGraphId(orgId, ref, service string) string {
return c.subGraphs[subGraphKey(orgId, ref, service)]
}
func (c *Cache) Update(msg any, _ goamqp.Headers) (any, error) {
switch m := msg.(type) {
case *domain.OrganizationAdded:
o := domain.Organization{}
m.UpdateOrganization(&o)
c.organizations[m.ID.String()] = o
c.addUser(m.Initiator, o)
case *domain.APIKeyAdded:
key := domain.APIKey{
Name: m.Name,
OrganizationId: m.OrganizationId,
Key: m.Key,
Refs: m.Refs,
Read: m.Read,
Publish: m.Publish,
CreatedBy: m.Initiator,
CreatedAt: m.When(),
}
c.apiKeys[m.Key] = key
org := c.organizations[m.OrganizationId]
org.APIKeys = append(org.APIKeys, key)
c.organizations[m.OrganizationId] = org
case *domain.SubGraphUpdated:
c.updateSubGraph(m.OrganizationId, m.Ref, m.ID.String(), m.Service, m.Time)
case *domain.Organization:
c.organizations[m.ID.String()] = *m
c.addUser(m.CreatedBy, *m)
for _, k := range m.APIKeys {
c.apiKeys[k.Key] = k
}
case *domain.SubGraph:
c.updateSubGraph(m.OrganizationId, m.Ref, m.ID.String(), m.Service, m.ChangedAt)
default:
c.logger.With("msg", msg).Warn("unexpected message received")
}
return nil, nil
}
func (c *Cache) updateSubGraph(orgId string, ref string, subGraphId string, service string, updated time.Time) {
if _, exists := c.services[orgId]; !exists {
c.services[orgId] = make(map[string]map[string]struct{})
}
if _, exists := c.services[orgId][ref]; !exists {
c.services[orgId][ref] = make(map[string]struct{})
}
c.services[orgId][ref][subGraphId] = struct{}{}
c.subGraphs[subGraphKey(orgId, ref, service)] = subGraphId
c.lastUpdate[refKey(orgId, ref)] = updated.Format(time.RFC3339Nano)
}
func (c *Cache) addUser(sub string, organization domain.Organization) {
user, exists := c.users[sub]
if !exists {
c.users[sub] = []string{organization.ID.String()}
} else {
c.users[sub] = append(user, organization.ID.String())
}
}
func New(logger *slog.Logger) *Cache {
return &Cache{
organizations: make(map[string]domain.Organization),
users: make(map[string][]string),
apiKeys: make(map[string]domain.APIKey),
services: make(map[string]map[string]map[string]struct{}),
subGraphs: make(map[string]string),
lastUpdate: make(map[string]string),
logger: logger,
}
}
func refKey(orgId string, ref string) string {
return fmt.Sprintf("%s<->%s", orgId, ref)
}
func subGraphKey(orgId string, ref string, service string) string {
return fmt.Sprintf("%s<->%s<->%s", orgId, ref, service)
}