feat(cache): add concurrency safety and logging improvements
Implement read-write mutex locks for cache functions to ensure concurrency safety. Add debug logging for cache updates to enhance traceability of operations. Optimize user addition logic to prevent duplicates. Introduce a new test file for comprehensive cache functionality testing, ensuring reliable behavior.
This commit is contained in:
Vendored
+41
-4
@@ -3,15 +3,18 @@ package cache
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sparetimecoders/goamqp"
|
||||
"gitlab.com/unboundsoftware/eventsourced/eventsourced"
|
||||
|
||||
"gitlab.com/unboundsoftware/schemas/domain"
|
||||
"gitlab.com/unboundsoftware/schemas/hash"
|
||||
)
|
||||
|
||||
type Cache struct {
|
||||
mu sync.RWMutex
|
||||
organizations map[string]domain.Organization
|
||||
users map[string][]string
|
||||
apiKeys map[string]domain.APIKey // keyed by organizationId-name
|
||||
@@ -22,6 +25,9 @@ type Cache struct {
|
||||
}
|
||||
|
||||
func (c *Cache) OrganizationByAPIKey(apiKey string) *domain.Organization {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
// Find the API key by comparing hashes
|
||||
for _, key := range c.apiKeys {
|
||||
if hash.CompareAPIKey(key.Key, apiKey) {
|
||||
@@ -36,6 +42,9 @@ func (c *Cache) OrganizationByAPIKey(apiKey string) *domain.Organization {
|
||||
}
|
||||
|
||||
func (c *Cache) OrganizationsByUser(sub string) []domain.Organization {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
orgIds := c.users[sub]
|
||||
orgs := make([]domain.Organization, len(orgIds))
|
||||
for i, id := range orgIds {
|
||||
@@ -45,6 +54,9 @@ func (c *Cache) OrganizationsByUser(sub string) []domain.Organization {
|
||||
}
|
||||
|
||||
func (c *Cache) ApiKeyByKey(key string) *domain.APIKey {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
// Find the API key by comparing hashes
|
||||
for _, apiKey := range c.apiKeys {
|
||||
if hash.CompareAPIKey(apiKey.Key, key) {
|
||||
@@ -55,6 +67,9 @@ func (c *Cache) ApiKeyByKey(key string) *domain.APIKey {
|
||||
}
|
||||
|
||||
func (c *Cache) Services(orgId, ref, lastUpdate string) ([]string, string) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
key := refKey(orgId, ref)
|
||||
var services []string
|
||||
if lastUpdate == "" || c.lastUpdate[key] > lastUpdate {
|
||||
@@ -66,16 +81,25 @@ func (c *Cache) Services(orgId, ref, lastUpdate string) ([]string, string) {
|
||||
}
|
||||
|
||||
func (c *Cache) SubGraphId(orgId, ref, service string) string {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
return c.subGraphs[subGraphKey(orgId, ref, service)]
|
||||
}
|
||||
|
||||
func (c *Cache) Update(msg any, _ goamqp.Headers) (any, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
switch m := msg.(type) {
|
||||
case *domain.OrganizationAdded:
|
||||
o := domain.Organization{}
|
||||
o := domain.Organization{
|
||||
BaseAggregate: eventsourced.BaseAggregateFromString(m.ID.String()),
|
||||
}
|
||||
m.UpdateOrganization(&o)
|
||||
c.organizations[m.ID.String()] = o
|
||||
c.addUser(m.Initiator, o)
|
||||
c.logger.With("org_id", m.ID.String(), "event", "OrganizationAdded").Debug("cache updated")
|
||||
case *domain.APIKeyAdded:
|
||||
key := domain.APIKey{
|
||||
Name: m.Name,
|
||||
@@ -92,8 +116,10 @@ func (c *Cache) Update(msg any, _ goamqp.Headers) (any, error) {
|
||||
org := c.organizations[m.OrganizationId]
|
||||
org.APIKeys = append(org.APIKeys, key)
|
||||
c.organizations[m.OrganizationId] = org
|
||||
c.logger.With("org_id", m.OrganizationId, "key_name", m.Name, "event", "APIKeyAdded").Debug("cache updated")
|
||||
case *domain.SubGraphUpdated:
|
||||
c.updateSubGraph(m.OrganizationId, m.Ref, m.ID.String(), m.Service, m.Time)
|
||||
c.logger.With("org_id", m.OrganizationId, "ref", m.Ref, "service", m.Service, "event", "SubGraphUpdated").Debug("cache updated")
|
||||
case *domain.Organization:
|
||||
c.organizations[m.ID.String()] = *m
|
||||
c.addUser(m.CreatedBy, *m)
|
||||
@@ -101,8 +127,10 @@ func (c *Cache) Update(msg any, _ goamqp.Headers) (any, error) {
|
||||
// Use composite key: organizationId-name
|
||||
c.apiKeys[apiKeyId(k.OrganizationId, k.Name)] = k
|
||||
}
|
||||
c.logger.With("org_id", m.ID.String(), "event", "Organization aggregate loaded").Debug("cache updated")
|
||||
case *domain.SubGraph:
|
||||
c.updateSubGraph(m.OrganizationId, m.Ref, m.ID.String(), m.Service, m.ChangedAt)
|
||||
c.logger.With("org_id", m.OrganizationId, "ref", m.Ref, "service", m.Service, "event", "SubGraph aggregate loaded").Debug("cache updated")
|
||||
default:
|
||||
c.logger.With("msg", msg).Warn("unexpected message received")
|
||||
}
|
||||
@@ -123,11 +151,20 @@ func (c *Cache) updateSubGraph(orgId string, ref string, subGraphId string, serv
|
||||
|
||||
func (c *Cache) addUser(sub string, organization domain.Organization) {
|
||||
user, exists := c.users[sub]
|
||||
orgId := organization.ID.String()
|
||||
if !exists {
|
||||
c.users[sub] = []string{organization.ID.String()}
|
||||
} else {
|
||||
c.users[sub] = append(user, organization.ID.String())
|
||||
c.users[sub] = []string{orgId}
|
||||
return
|
||||
}
|
||||
|
||||
// Check if organization already exists for this user
|
||||
for _, id := range user {
|
||||
if id == orgId {
|
||||
return // Already exists, no need to add
|
||||
}
|
||||
}
|
||||
|
||||
c.users[sub] = append(user, orgId)
|
||||
}
|
||||
|
||||
func New(logger *slog.Logger) *Cache {
|
||||
|
||||
Reference in New Issue
Block a user