From 27bdbf2ac3dfe2df1f6500990ccc7fa5291bd3e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toma=C5=BE=20Jerman?= Date: Wed, 6 Nov 2024 15:22:25 +0100 Subject: [PATCH] Draft a counter for RBAC index --- server/pkg/rbac/wrapper_counter.go | 188 +++++++++++++++++++----- server/pkg/rbac/wrapper_counter_test.go | 57 +++++++ 2 files changed, 207 insertions(+), 38 deletions(-) create mode 100644 server/pkg/rbac/wrapper_counter_test.go diff --git a/server/pkg/rbac/wrapper_counter.go b/server/pkg/rbac/wrapper_counter.go index fe0fb340e..0e137aa2c 100644 --- a/server/pkg/rbac/wrapper_counter.go +++ b/server/pkg/rbac/wrapper_counter.go @@ -8,34 +8,132 @@ import ( ) type ( - usageCounter struct { - index map[uint64]uint - + usageCounter[K comparable] struct { lock sync.RWMutex - sigThreshold uint + // index keeps track of all the things we're counting + index map[K]counterItem[K] - incChan chan uint64 - sigChan chan counterEntry + // sigEvictThreshold denotes when the usage counter should evict an item + sigEvictThreshold float64 + // decayFactor denotes how fast the score decays + // when 1 - it won't decay + // when 0 - it's barely preserved + decayFactor float64 + + // incChan sends instructions to the counter re. key K increment + incChan chan K + // sigEvict lets the counter notify the manager what key K should be evicted + sigEvict chan K + // @todo remove + sigChan chan K + + // decayInterval denotes in what interval the decay factor should apply + decayInterval time.Duration + // cleanupInterval denotes in what interval counter evicts stuff + cleanupInterval time.Duration } - counterEntry struct { - key uint64 - count uint + // counterItem wraps some metadata around each index + counterItem[K comparable] struct { + key K + score float64 + + // added denotes when the item was added to the counter + added time.Time + // lastScored denotes when the item was last scored (either via decay or access) + lastScored time.Time + // lastAccess denotes when the item was last accessed, needed + lastAccess time.Time } - MinHeap []counterEntry + MinHeap[K comparable] []counterItem[K] ) -func (svc *usageCounter) worstPerformers(n int) (out []uint64) { +// inc updates key K +func (svc *usageCounter[K]) inc(key K) { + svc.lock.Lock() + defer svc.lock.Unlock() + + _, ok := svc.index[key] + if !ok { + svc.procNew(key) + } else { + svc.procExisting(key) + } +} + +// evict evicts the items below the specified threshold +func (svc *usageCounter[K]) evict() (out []K) { + svc.lock.Lock() + defer svc.lock.Unlock() + + // Firstly score them up + out = make([]K, 0, 4) + for k, v := range svc.index { + if v.score > float64(svc.sigEvictThreshold) { + continue + } + + out = append(out, k) + } + + // Then delete them + for _, r := range out { + delete(svc.index, r) + } + + return out +} + +// decay applies the specified decay factor to the cache items +func (svc *usageCounter[K]) decay() { + svc.lock.Lock() + defer svc.lock.Unlock() + + n := time.Now() + for k, v := range svc.index { + if n.Before(v.lastAccess.Add(svc.decayInterval)) { + continue + } + + v.score *= svc.decayFactor + svc.index[k] = v + } +} + +// bestPerformers returns the top n items based on their score +func (svc *usageCounter[K]) bestPerformers(n int) (out []K) { + svc.lock.RLock() + defer svc.lock.RUnlock() + + hh := make(MinHeap[K], 0, len(svc.index)) + for k, v := range svc.index { + hh = append(hh, counterItem[K]{key: k, score: v.score}) + } + + sort.Sort(hh) + + for i := len(hh) - 1; i >= 0; i-- { + out = append(out, hh[i].key) + + if len(out) >= n { + return + } + } + return +} + +// worstPerformers returns the bottom n items based on their score +func (svc *usageCounter[K]) worstPerformers(n int) (out []K) { svc.lock.RLock() defer svc.lock.RUnlock() // Code to get n elements with the smallest count - hh := make(MinHeap, 0, len(svc.index)) + hh := make(MinHeap[K], 0, len(svc.index)) for k, v := range svc.index { - hh = append(hh, counterEntry{key: k, count: v}) + hh = append(hh, counterItem[K]{key: k, score: v.score}) } sort.Sort(hh) @@ -51,32 +149,40 @@ func (svc *usageCounter) worstPerformers(n int) (out []uint64) { return } -func (svc *usageCounter) inc(key uint64) { - svc.lock.Lock() - defer svc.lock.Unlock() - - count := svc.index[key] + 1 - svc.index[key] = count - - if count >= svc.sigThreshold { - delete(svc.index, key) - svc.sigChan <- counterEntry{key: key, count: count} +// procNew notes a new key in the thing, defaults and stuff +func (svc *usageCounter[K]) procNew(key K) { + n := time.Now() + svc.index[key] = counterItem[K]{ + score: 1, + added: n, + lastScored: n, + lastAccess: n, } } -func (svc *usageCounter) clean() { - svc.lock.Lock() - defer svc.lock.Unlock() +// procExisting notes an access to an existing index element +func (svc *usageCounter[K]) procExisting(key K) { + n := time.Now() - for k, v := range svc.index { - if v < uint(float64(svc.sigThreshold)*0.05) { - delete(svc.index, k) - } - } + aux := svc.index[key] + aux.lastAccess = n + aux.lastScored = n + aux.score++ + + svc.index[key] = aux } -func (svc *usageCounter) watch(ctx context.Context) { - cleanT := time.NewTicker(time.Minute * 10) +func (svc *usageCounter[K]) watch(ctx context.Context) { + if svc.decayInterval == 0 { + panic("svc.decayInterval can not be 0") + } + + if svc.cleanupInterval == 0 { + panic("svc.cleanupInterval can not be 0") + } + + decayT := time.NewTicker(svc.decayInterval) + evictT := time.NewTicker(svc.cleanupInterval) go func() { for { @@ -84,8 +190,14 @@ func (svc *usageCounter) watch(ctx context.Context) { case <-ctx.Done(): return - case <-cleanT.C: - svc.clean() + case <-evictT.C: + evicted := svc.evict() + for _, e := range evicted { + svc.sigEvict <- e + } + + case <-decayT.C: + svc.decay() case key := <-svc.incChan: svc.inc(key) @@ -94,6 +206,6 @@ func (svc *usageCounter) watch(ctx context.Context) { }() } -func (h MinHeap) Len() int { return len(h) } -func (h MinHeap) Less(i, j int) bool { return h[i].count < h[j].count } -func (h MinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h MinHeap[K]) Len() int { return len(h) } +func (h MinHeap[K]) Less(i, j int) bool { return h[i].score < h[j].score } +func (h MinHeap[K]) Swap(i, j int) { h[i], h[j] = h[j], h[i] } diff --git a/server/pkg/rbac/wrapper_counter_test.go b/server/pkg/rbac/wrapper_counter_test.go new file mode 100644 index 000000000..2a9f3bb5d --- /dev/null +++ b/server/pkg/rbac/wrapper_counter_test.go @@ -0,0 +1,57 @@ +package rbac + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestWrapperCounter(t *testing.T) { + // @note since I'm leaving the decayInterval empty we don't need to fiddle + // with lastAccess timestamps + svc := &usageCounter[string]{ + index: map[string]counterItem[string]{}, + + sigEvictThreshold: 0.5, + decayFactor: 0.5, + } + + svc.inc("k1") + aux := svc.index["k1"] + require.Equal(t, 1.0, aux.score) + + svc.inc("k2") + aux = svc.index["k1"] + require.Equal(t, 1.0, aux.score) + aux = svc.index["k2"] + require.Equal(t, 1.0, aux.score) + + svc.inc("k1") + aux = svc.index["k1"] + require.Equal(t, 2.0, aux.score) + aux = svc.index["k2"] + require.Equal(t, 1.0, aux.score) + + svc.decay() + aux = svc.index["k1"] + require.Equal(t, 1.0, aux.score) + aux = svc.index["k2"] + require.Equal(t, 0.5, aux.score) + + cleaned := svc.evict() + require.Len(t, cleaned, 1) + aux, ok := svc.index["k1"] + require.True(t, ok) + + aux, ok = svc.index["k2"] + require.False(t, ok) + + svc.decay() + aux = svc.index["k1"] + require.Equal(t, 0.5, aux.score) + + cleaned = svc.evict() + require.Len(t, cleaned, 1) + aux, ok = svc.index["k1"] + require.False(t, ok) +}