Files
schemas/cache/cache.go
T
argoyle ffcf41b85a feat: add commands for managing organizations and users
Introduce `AddUserToOrganization`, `RemoveAPIKey`, and 
`RemoveOrganization` commands to enhance organization 
management. Implement validation for user addition and 
API key removal. Update GraphQL schema to support new 
mutations and add caching for the new events, ensuring 
that organizations and their relationships are accurately 
represented in the cache.
2025-11-22 18:37:07 +01:00

271 lines
8.1 KiB
Go

package cache
import (
"fmt"
"log/slog"
"sync"
"time"
"github.com/sparetimecoders/goamqp"
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
"gitlab.com/unboundsoftware/schemas/domain"
"gitlab.com/unboundsoftware/schemas/hash"
)
type Cache struct {
mu sync.RWMutex
organizations map[string]domain.Organization
users map[string][]string
apiKeys map[string]domain.APIKey // keyed by organizationId-name
services map[string]map[string]map[string]struct{}
subGraphs map[string]string
lastUpdate map[string]string
logger *slog.Logger
}
func (c *Cache) OrganizationByAPIKey(apiKey string) *domain.Organization {
c.mu.RLock()
defer c.mu.RUnlock()
// Find the API key by comparing hashes
for _, key := range c.apiKeys {
if hash.CompareAPIKey(key.Key, apiKey) {
org, exists := c.organizations[key.OrganizationId]
if !exists {
return nil
}
return &org
}
}
return nil
}
func (c *Cache) OrganizationsByUser(sub string) []domain.Organization {
c.mu.RLock()
defer c.mu.RUnlock()
orgIds := c.users[sub]
orgs := make([]domain.Organization, len(orgIds))
for i, id := range orgIds {
orgs[i] = c.organizations[id]
}
return orgs
}
func (c *Cache) AllOrganizations() []domain.Organization {
c.mu.RLock()
defer c.mu.RUnlock()
orgs := make([]domain.Organization, 0, len(c.organizations))
for _, org := range c.organizations {
orgs = append(orgs, org)
}
return orgs
}
func (c *Cache) ApiKeyByKey(key string) *domain.APIKey {
c.mu.RLock()
defer c.mu.RUnlock()
// Find the API key by comparing hashes
for _, apiKey := range c.apiKeys {
if hash.CompareAPIKey(apiKey.Key, key) {
return &apiKey
}
}
return nil
}
func (c *Cache) Services(orgId, ref, lastUpdate string) ([]string, string) {
c.mu.RLock()
defer c.mu.RUnlock()
key := refKey(orgId, ref)
var services []string
if lastUpdate == "" || c.lastUpdate[key] > lastUpdate {
for k := range c.services[orgId][ref] {
services = append(services, k)
}
}
return services, c.lastUpdate[key]
}
func (c *Cache) SubGraphId(orgId, ref, service string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.subGraphs[subGraphKey(orgId, ref, service)]
}
func (c *Cache) Update(msg any, _ goamqp.Headers) (any, error) {
c.mu.Lock()
defer c.mu.Unlock()
switch m := msg.(type) {
case *domain.OrganizationAdded:
o := domain.Organization{
BaseAggregate: eventsourced.BaseAggregateFromString(m.ID.String()),
}
m.UpdateOrganization(&o)
c.organizations[m.ID.String()] = o
c.addUser(m.Initiator, o)
c.logger.With("org_id", m.ID.String(), "event", "OrganizationAdded").Debug("cache updated")
case *domain.UserAddedToOrganization:
org, exists := c.organizations[m.ID.String()]
if exists {
m.UpdateOrganization(&org)
c.organizations[m.ID.String()] = org
c.addUser(m.UserId, org)
c.logger.With("org_id", m.ID.String(), "user_id", m.UserId, "event", "UserAddedToOrganization").Debug("cache updated")
} else {
c.logger.With("org_id", m.ID.String(), "event", "UserAddedToOrganization").Warn("organization not found in cache")
}
case *domain.APIKeyAdded:
key := domain.APIKey{
Name: m.Name,
OrganizationId: m.OrganizationId,
Key: m.Key, // This is now the hashed key
Refs: m.Refs,
Read: m.Read,
Publish: m.Publish,
CreatedBy: m.Initiator,
CreatedAt: m.When(),
}
// Use composite key: organizationId-name
c.apiKeys[apiKeyId(m.OrganizationId, m.Name)] = key
org := c.organizations[m.OrganizationId]
org.APIKeys = append(org.APIKeys, key)
c.organizations[m.OrganizationId] = org
c.logger.With("org_id", m.OrganizationId, "key_name", m.Name, "event", "APIKeyAdded").Debug("cache updated")
case *domain.APIKeyRemoved:
orgId := m.ID.String()
org, exists := c.organizations[orgId]
if exists {
// Remove from organization's API keys list
for i, key := range org.APIKeys {
if key.Name == m.KeyName {
org.APIKeys = append(org.APIKeys[:i], org.APIKeys[i+1:]...)
break
}
}
c.organizations[orgId] = org
// Remove from apiKeys map
delete(c.apiKeys, apiKeyId(orgId, m.KeyName))
c.logger.With("org_id", orgId, "key_name", m.KeyName, "event", "APIKeyRemoved").Debug("cache updated")
} else {
c.logger.With("org_id", orgId, "event", "APIKeyRemoved").Warn("organization not found in cache")
}
case *domain.OrganizationRemoved:
orgId := m.ID.String()
org, exists := c.organizations[orgId]
if exists {
// Remove all API keys for this organization
for _, key := range org.APIKeys {
delete(c.apiKeys, apiKeyId(orgId, key.Name))
}
// Remove organization from all users
for userId, userOrgs := range c.users {
for i, userOrgId := range userOrgs {
if userOrgId == orgId {
c.users[userId] = append(userOrgs[:i], userOrgs[i+1:]...)
break
}
}
// If user has no more organizations, remove from map
if len(c.users[userId]) == 0 {
delete(c.users, userId)
}
}
// Remove services for this organization
if refs, exists := c.services[orgId]; exists {
for ref := range refs {
// Remove all subgraphs for this org/ref combination
for service := range refs[ref] {
delete(c.subGraphs, subGraphKey(orgId, ref, service))
}
// Remove lastUpdate for this org/ref
delete(c.lastUpdate, refKey(orgId, ref))
}
delete(c.services, orgId)
}
// Remove organization
delete(c.organizations, orgId)
c.logger.With("org_id", orgId, "event", "OrganizationRemoved").Debug("cache updated")
} else {
c.logger.With("org_id", orgId, "event", "OrganizationRemoved").Warn("organization not found in cache")
}
case *domain.SubGraphUpdated:
c.updateSubGraph(m.OrganizationId, m.Ref, m.ID.String(), m.Service, m.Time)
c.logger.With("org_id", m.OrganizationId, "ref", m.Ref, "service", m.Service, "event", "SubGraphUpdated").Debug("cache updated")
case *domain.Organization:
c.organizations[m.ID.String()] = *m
c.addUser(m.CreatedBy, *m)
for _, k := range m.APIKeys {
// Use composite key: organizationId-name
c.apiKeys[apiKeyId(k.OrganizationId, k.Name)] = k
}
c.logger.With("org_id", m.ID.String(), "event", "Organization aggregate loaded").Debug("cache updated")
case *domain.SubGraph:
c.updateSubGraph(m.OrganizationId, m.Ref, m.ID.String(), m.Service, m.ChangedAt)
c.logger.With("org_id", m.OrganizationId, "ref", m.Ref, "service", m.Service, "event", "SubGraph aggregate loaded").Debug("cache updated")
default:
c.logger.With("msg", msg).Warn("unexpected message received")
}
return nil, nil
}
func (c *Cache) updateSubGraph(orgId string, ref string, subGraphId string, service string, updated time.Time) {
if _, exists := c.services[orgId]; !exists {
c.services[orgId] = make(map[string]map[string]struct{})
}
if _, exists := c.services[orgId][ref]; !exists {
c.services[orgId][ref] = make(map[string]struct{})
}
c.services[orgId][ref][subGraphId] = struct{}{}
c.subGraphs[subGraphKey(orgId, ref, service)] = subGraphId
c.lastUpdate[refKey(orgId, ref)] = updated.Format(time.RFC3339Nano)
}
func (c *Cache) addUser(sub string, organization domain.Organization) {
user, exists := c.users[sub]
orgId := organization.ID.String()
if !exists {
c.users[sub] = []string{orgId}
return
}
// Check if organization already exists for this user
for _, id := range user {
if id == orgId {
return // Already exists, no need to add
}
}
c.users[sub] = append(user, orgId)
}
func New(logger *slog.Logger) *Cache {
return &Cache{
organizations: make(map[string]domain.Organization),
users: make(map[string][]string),
apiKeys: make(map[string]domain.APIKey),
services: make(map[string]map[string]map[string]struct{}),
subGraphs: make(map[string]string),
lastUpdate: make(map[string]string),
logger: logger,
}
}
func refKey(orgId string, ref string) string {
return fmt.Sprintf("%s<->%s", orgId, ref)
}
func subGraphKey(orgId string, ref string, service string) string {
return fmt.Sprintf("%s<->%s<->%s", orgId, ref, service)
}
func apiKeyId(orgId string, name string) string {
return fmt.Sprintf("%s<->%s", orgId, name)
}