ffcf41b85a
Introduce `AddUserToOrganization`, `RemoveAPIKey`, and `RemoveOrganization` commands to enhance organization management. Implement validation for user addition and API key removal. Update GraphQL schema to support new mutations and add caching for the new events, ensuring that organizations and their relationships are accurately represented in the cache.
271 lines
8.1 KiB
Go
271 lines
8.1 KiB
Go
package cache
|
|
|
|
import (
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sparetimecoders/goamqp"
|
|
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
|
|
|
"gitlab.com/unboundsoftware/schemas/domain"
|
|
"gitlab.com/unboundsoftware/schemas/hash"
|
|
)
|
|
|
|
type Cache struct {
|
|
mu sync.RWMutex
|
|
organizations map[string]domain.Organization
|
|
users map[string][]string
|
|
apiKeys map[string]domain.APIKey // keyed by organizationId-name
|
|
services map[string]map[string]map[string]struct{}
|
|
subGraphs map[string]string
|
|
lastUpdate map[string]string
|
|
logger *slog.Logger
|
|
}
|
|
|
|
func (c *Cache) OrganizationByAPIKey(apiKey string) *domain.Organization {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
// Find the API key by comparing hashes
|
|
for _, key := range c.apiKeys {
|
|
if hash.CompareAPIKey(key.Key, apiKey) {
|
|
org, exists := c.organizations[key.OrganizationId]
|
|
if !exists {
|
|
return nil
|
|
}
|
|
return &org
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Cache) OrganizationsByUser(sub string) []domain.Organization {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
orgIds := c.users[sub]
|
|
orgs := make([]domain.Organization, len(orgIds))
|
|
for i, id := range orgIds {
|
|
orgs[i] = c.organizations[id]
|
|
}
|
|
return orgs
|
|
}
|
|
|
|
func (c *Cache) AllOrganizations() []domain.Organization {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
orgs := make([]domain.Organization, 0, len(c.organizations))
|
|
for _, org := range c.organizations {
|
|
orgs = append(orgs, org)
|
|
}
|
|
return orgs
|
|
}
|
|
|
|
func (c *Cache) ApiKeyByKey(key string) *domain.APIKey {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
// Find the API key by comparing hashes
|
|
for _, apiKey := range c.apiKeys {
|
|
if hash.CompareAPIKey(apiKey.Key, key) {
|
|
return &apiKey
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Cache) Services(orgId, ref, lastUpdate string) ([]string, string) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
key := refKey(orgId, ref)
|
|
var services []string
|
|
if lastUpdate == "" || c.lastUpdate[key] > lastUpdate {
|
|
for k := range c.services[orgId][ref] {
|
|
services = append(services, k)
|
|
}
|
|
}
|
|
return services, c.lastUpdate[key]
|
|
}
|
|
|
|
func (c *Cache) SubGraphId(orgId, ref, service string) string {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
return c.subGraphs[subGraphKey(orgId, ref, service)]
|
|
}
|
|
|
|
func (c *Cache) Update(msg any, _ goamqp.Headers) (any, error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
switch m := msg.(type) {
|
|
case *domain.OrganizationAdded:
|
|
o := domain.Organization{
|
|
BaseAggregate: eventsourced.BaseAggregateFromString(m.ID.String()),
|
|
}
|
|
m.UpdateOrganization(&o)
|
|
c.organizations[m.ID.String()] = o
|
|
c.addUser(m.Initiator, o)
|
|
c.logger.With("org_id", m.ID.String(), "event", "OrganizationAdded").Debug("cache updated")
|
|
case *domain.UserAddedToOrganization:
|
|
org, exists := c.organizations[m.ID.String()]
|
|
if exists {
|
|
m.UpdateOrganization(&org)
|
|
c.organizations[m.ID.String()] = org
|
|
c.addUser(m.UserId, org)
|
|
c.logger.With("org_id", m.ID.String(), "user_id", m.UserId, "event", "UserAddedToOrganization").Debug("cache updated")
|
|
} else {
|
|
c.logger.With("org_id", m.ID.String(), "event", "UserAddedToOrganization").Warn("organization not found in cache")
|
|
}
|
|
case *domain.APIKeyAdded:
|
|
key := domain.APIKey{
|
|
Name: m.Name,
|
|
OrganizationId: m.OrganizationId,
|
|
Key: m.Key, // This is now the hashed key
|
|
Refs: m.Refs,
|
|
Read: m.Read,
|
|
Publish: m.Publish,
|
|
CreatedBy: m.Initiator,
|
|
CreatedAt: m.When(),
|
|
}
|
|
// Use composite key: organizationId-name
|
|
c.apiKeys[apiKeyId(m.OrganizationId, m.Name)] = key
|
|
org := c.organizations[m.OrganizationId]
|
|
org.APIKeys = append(org.APIKeys, key)
|
|
c.organizations[m.OrganizationId] = org
|
|
c.logger.With("org_id", m.OrganizationId, "key_name", m.Name, "event", "APIKeyAdded").Debug("cache updated")
|
|
case *domain.APIKeyRemoved:
|
|
orgId := m.ID.String()
|
|
org, exists := c.organizations[orgId]
|
|
if exists {
|
|
// Remove from organization's API keys list
|
|
for i, key := range org.APIKeys {
|
|
if key.Name == m.KeyName {
|
|
org.APIKeys = append(org.APIKeys[:i], org.APIKeys[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
c.organizations[orgId] = org
|
|
// Remove from apiKeys map
|
|
delete(c.apiKeys, apiKeyId(orgId, m.KeyName))
|
|
c.logger.With("org_id", orgId, "key_name", m.KeyName, "event", "APIKeyRemoved").Debug("cache updated")
|
|
} else {
|
|
c.logger.With("org_id", orgId, "event", "APIKeyRemoved").Warn("organization not found in cache")
|
|
}
|
|
case *domain.OrganizationRemoved:
|
|
orgId := m.ID.String()
|
|
org, exists := c.organizations[orgId]
|
|
if exists {
|
|
// Remove all API keys for this organization
|
|
for _, key := range org.APIKeys {
|
|
delete(c.apiKeys, apiKeyId(orgId, key.Name))
|
|
}
|
|
// Remove organization from all users
|
|
for userId, userOrgs := range c.users {
|
|
for i, userOrgId := range userOrgs {
|
|
if userOrgId == orgId {
|
|
c.users[userId] = append(userOrgs[:i], userOrgs[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
// If user has no more organizations, remove from map
|
|
if len(c.users[userId]) == 0 {
|
|
delete(c.users, userId)
|
|
}
|
|
}
|
|
// Remove services for this organization
|
|
if refs, exists := c.services[orgId]; exists {
|
|
for ref := range refs {
|
|
// Remove all subgraphs for this org/ref combination
|
|
for service := range refs[ref] {
|
|
delete(c.subGraphs, subGraphKey(orgId, ref, service))
|
|
}
|
|
// Remove lastUpdate for this org/ref
|
|
delete(c.lastUpdate, refKey(orgId, ref))
|
|
}
|
|
delete(c.services, orgId)
|
|
}
|
|
// Remove organization
|
|
delete(c.organizations, orgId)
|
|
c.logger.With("org_id", orgId, "event", "OrganizationRemoved").Debug("cache updated")
|
|
} else {
|
|
c.logger.With("org_id", orgId, "event", "OrganizationRemoved").Warn("organization not found in cache")
|
|
}
|
|
case *domain.SubGraphUpdated:
|
|
c.updateSubGraph(m.OrganizationId, m.Ref, m.ID.String(), m.Service, m.Time)
|
|
c.logger.With("org_id", m.OrganizationId, "ref", m.Ref, "service", m.Service, "event", "SubGraphUpdated").Debug("cache updated")
|
|
case *domain.Organization:
|
|
c.organizations[m.ID.String()] = *m
|
|
c.addUser(m.CreatedBy, *m)
|
|
for _, k := range m.APIKeys {
|
|
// Use composite key: organizationId-name
|
|
c.apiKeys[apiKeyId(k.OrganizationId, k.Name)] = k
|
|
}
|
|
c.logger.With("org_id", m.ID.String(), "event", "Organization aggregate loaded").Debug("cache updated")
|
|
case *domain.SubGraph:
|
|
c.updateSubGraph(m.OrganizationId, m.Ref, m.ID.String(), m.Service, m.ChangedAt)
|
|
c.logger.With("org_id", m.OrganizationId, "ref", m.Ref, "service", m.Service, "event", "SubGraph aggregate loaded").Debug("cache updated")
|
|
default:
|
|
c.logger.With("msg", msg).Warn("unexpected message received")
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (c *Cache) updateSubGraph(orgId string, ref string, subGraphId string, service string, updated time.Time) {
|
|
if _, exists := c.services[orgId]; !exists {
|
|
c.services[orgId] = make(map[string]map[string]struct{})
|
|
}
|
|
if _, exists := c.services[orgId][ref]; !exists {
|
|
c.services[orgId][ref] = make(map[string]struct{})
|
|
}
|
|
c.services[orgId][ref][subGraphId] = struct{}{}
|
|
c.subGraphs[subGraphKey(orgId, ref, service)] = subGraphId
|
|
c.lastUpdate[refKey(orgId, ref)] = updated.Format(time.RFC3339Nano)
|
|
}
|
|
|
|
func (c *Cache) addUser(sub string, organization domain.Organization) {
|
|
user, exists := c.users[sub]
|
|
orgId := organization.ID.String()
|
|
if !exists {
|
|
c.users[sub] = []string{orgId}
|
|
return
|
|
}
|
|
|
|
// Check if organization already exists for this user
|
|
for _, id := range user {
|
|
if id == orgId {
|
|
return // Already exists, no need to add
|
|
}
|
|
}
|
|
|
|
c.users[sub] = append(user, orgId)
|
|
}
|
|
|
|
func New(logger *slog.Logger) *Cache {
|
|
return &Cache{
|
|
organizations: make(map[string]domain.Organization),
|
|
users: make(map[string][]string),
|
|
apiKeys: make(map[string]domain.APIKey),
|
|
services: make(map[string]map[string]map[string]struct{}),
|
|
subGraphs: make(map[string]string),
|
|
lastUpdate: make(map[string]string),
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// refKey builds the map key for an organization/ref pair by joining the two
// parts with "<->".
func refKey(orgId string, ref string) string {
	// fmt.Sprint inserts no spaces between string operands.
	return fmt.Sprint(orgId, "<->", ref)
}
|
|
|
|
// subGraphKey builds the map key for an organization/ref/service triple by
// joining the three parts with "<->".
func subGraphKey(orgId string, ref string, service string) string {
	// fmt.Sprint inserts no spaces between string operands.
	return fmt.Sprint(orgId, "<->", ref, "<->", service)
}
|
|
|
|
// apiKeyId builds the composite map key for an API key from the ID of its
// organization and the key's name, joined with "<->".
func apiKeyId(orgId string, name string) string {
	const separator = "<->"
	return fmt.Sprintf("%s%s%s", orgId, separator, name)
}
|