3
0

Draft a counter for RBAC index

This commit is contained in:
Tomaž Jerman 2024-11-06 15:22:25 +01:00
parent 447cee2f55
commit 27bdbf2ac3
2 changed files with 207 additions and 38 deletions

View File

@ -8,34 +8,132 @@ import (
)
type (
usageCounter struct {
index map[uint64]uint
usageCounter[K comparable] struct {
lock sync.RWMutex
sigThreshold uint
// index keeps track of all the things we're counting
index map[K]counterItem[K]
incChan chan uint64
sigChan chan counterEntry
// sigEvictThreshold denotes when the usage counter should evict an item
sigEvictThreshold float64
// decayFactor denotes how fast the score decays
// when 1 - it won't decay
// when 0 - it's barely preserved
decayFactor float64
// incChan sends instructions to the counter re. key K increment
incChan chan K
// sigEvict lets the counter notify the manager what key K should be evicted
sigEvict chan K
// @todo remove
sigChan chan K
// decayInterval denotes in what interval the decay factor should apply
decayInterval time.Duration
// cleanupInterval denotes in what interval counter evicts stuff
cleanupInterval time.Duration
}
counterEntry struct {
key uint64
count uint
// counterItem wraps some metadata around each index
counterItem[K comparable] struct {
key K
score float64
// added denotes when the item was added to the counter
added time.Time
// lastScored denotes when the item was last scored (either via decay or access)
lastScored time.Time
// lastAccess denotes when the item was last accessed, needed
lastAccess time.Time
}
MinHeap []counterEntry
MinHeap[K comparable] []counterItem[K]
)
func (svc *usageCounter) worstPerformers(n int) (out []uint64) {
// inc updates key K
func (svc *usageCounter[K]) inc(key K) {
svc.lock.Lock()
defer svc.lock.Unlock()
_, ok := svc.index[key]
if !ok {
svc.procNew(key)
} else {
svc.procExisting(key)
}
}
// evict evicts the items below the specified threshold
func (svc *usageCounter[K]) evict() (out []K) {
svc.lock.Lock()
defer svc.lock.Unlock()
// Firstly score them up
out = make([]K, 0, 4)
for k, v := range svc.index {
if v.score > float64(svc.sigEvictThreshold) {
continue
}
out = append(out, k)
}
// Then delete them
for _, r := range out {
delete(svc.index, r)
}
return out
}
// decay applies the specified decay factor to the cache items
func (svc *usageCounter[K]) decay() {
svc.lock.Lock()
defer svc.lock.Unlock()
n := time.Now()
for k, v := range svc.index {
if n.Before(v.lastAccess.Add(svc.decayInterval)) {
continue
}
v.score *= svc.decayFactor
svc.index[k] = v
}
}
// bestPerformers returns the top n items based on their score
func (svc *usageCounter[K]) bestPerformers(n int) (out []K) {
svc.lock.RLock()
defer svc.lock.RUnlock()
hh := make(MinHeap[K], 0, len(svc.index))
for k, v := range svc.index {
hh = append(hh, counterItem[K]{key: k, score: v.score})
}
sort.Sort(hh)
for i := len(hh) - 1; i >= 0; i-- {
out = append(out, hh[i].key)
if len(out) >= n {
return
}
}
return
}
// worstPerformers returns the bottom n items based on their score
func (svc *usageCounter[K]) worstPerformers(n int) (out []K) {
svc.lock.RLock()
defer svc.lock.RUnlock()
// Code to get n elements with the smallest count
hh := make(MinHeap, 0, len(svc.index))
hh := make(MinHeap[K], 0, len(svc.index))
for k, v := range svc.index {
hh = append(hh, counterEntry{key: k, count: v})
hh = append(hh, counterItem[K]{key: k, score: v.score})
}
sort.Sort(hh)
@ -51,32 +149,40 @@ func (svc *usageCounter) worstPerformers(n int) (out []uint64) {
return
}
func (svc *usageCounter) inc(key uint64) {
svc.lock.Lock()
defer svc.lock.Unlock()
count := svc.index[key] + 1
svc.index[key] = count
if count >= svc.sigThreshold {
delete(svc.index, key)
svc.sigChan <- counterEntry{key: key, count: count}
// procNew notes a new key in the thing, defaults and stuff
func (svc *usageCounter[K]) procNew(key K) {
n := time.Now()
svc.index[key] = counterItem[K]{
score: 1,
added: n,
lastScored: n,
lastAccess: n,
}
}
func (svc *usageCounter) clean() {
svc.lock.Lock()
defer svc.lock.Unlock()
// procExisting notes an access to an existing index element
func (svc *usageCounter[K]) procExisting(key K) {
n := time.Now()
for k, v := range svc.index {
if v < uint(float64(svc.sigThreshold)*0.05) {
delete(svc.index, k)
}
}
aux := svc.index[key]
aux.lastAccess = n
aux.lastScored = n
aux.score++
svc.index[key] = aux
}
func (svc *usageCounter) watch(ctx context.Context) {
cleanT := time.NewTicker(time.Minute * 10)
func (svc *usageCounter[K]) watch(ctx context.Context) {
if svc.decayInterval == 0 {
panic("svc.decayInterval can not be 0")
}
if svc.cleanupInterval == 0 {
panic("svc.cleanupInterval can not be 0")
}
decayT := time.NewTicker(svc.decayInterval)
evictT := time.NewTicker(svc.cleanupInterval)
go func() {
for {
@ -84,8 +190,14 @@ func (svc *usageCounter) watch(ctx context.Context) {
case <-ctx.Done():
return
case <-cleanT.C:
svc.clean()
case <-evictT.C:
evicted := svc.evict()
for _, e := range evicted {
svc.sigEvict <- e
}
case <-decayT.C:
svc.decay()
case key := <-svc.incChan:
svc.inc(key)
@ -94,6 +206,6 @@ func (svc *usageCounter) watch(ctx context.Context) {
}()
}
func (h MinHeap) Len() int { return len(h) }
func (h MinHeap) Less(i, j int) bool { return h[i].count < h[j].count }
func (h MinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h MinHeap[K]) Len() int { return len(h) }
func (h MinHeap[K]) Less(i, j int) bool { return h[i].score < h[j].score }
func (h MinHeap[K]) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

View File

@ -0,0 +1,57 @@
package rbac
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestWrapperCounter(t *testing.T) {
// @note since I'm leaving the decayInterval empty we don't need to fiddle
// with lastAccess timestamps
svc := &usageCounter[string]{
index: map[string]counterItem[string]{},
sigEvictThreshold: 0.5,
decayFactor: 0.5,
}
svc.inc("k1")
aux := svc.index["k1"]
require.Equal(t, 1.0, aux.score)
svc.inc("k2")
aux = svc.index["k1"]
require.Equal(t, 1.0, aux.score)
aux = svc.index["k2"]
require.Equal(t, 1.0, aux.score)
svc.inc("k1")
aux = svc.index["k1"]
require.Equal(t, 2.0, aux.score)
aux = svc.index["k2"]
require.Equal(t, 1.0, aux.score)
svc.decay()
aux = svc.index["k1"]
require.Equal(t, 1.0, aux.score)
aux = svc.index["k2"]
require.Equal(t, 0.5, aux.score)
cleaned := svc.evict()
require.Len(t, cleaned, 1)
aux, ok := svc.index["k1"]
require.True(t, ok)
aux, ok = svc.index["k2"]
require.False(t, ok)
svc.decay()
aux = svc.index["k1"]
require.Equal(t, 0.5, aux.score)
cleaned = svc.evict()
require.Len(t, cleaned, 1)
aux, ok = svc.index["k1"]
require.False(t, ok)
}