Add option to controll reindex interval
This commit is contained in:
parent
a99bf12d87
commit
bc0e5a30ad
@ -265,6 +265,12 @@
|
||||
# Default: <no value>
|
||||
# RBAC_CLEANUP_INTERVAL=<no value>
|
||||
|
||||
###############################################################################
|
||||
# Reindex interval controls when the index should be re-calculated.
|
||||
# Type: time.Duration
|
||||
# Default: <no value>
|
||||
# RBAC_REINDEX_INTERVAL=<no value>
|
||||
|
||||
###############################################################################
|
||||
# [IMPORTANT]
|
||||
# ====
|
||||
|
||||
@ -357,6 +357,7 @@ func (app *CortezaApp) InitServices(ctx context.Context) (err error) {
|
||||
DecayInterval: app.Opt.RBAC.DecayInterval,
|
||||
CleanupInterval: app.Opt.RBAC.CleanupInterval,
|
||||
IndexFlushInterval: app.Opt.RBAC.IndexFlushInterval,
|
||||
ReindexInterval: app.Opt.RBAC.ReindexInterval,
|
||||
RuleStorage: app.Store,
|
||||
RoleStorage: app.Store,
|
||||
|
||||
|
||||
@ -71,6 +71,14 @@ RBAC: schema.#optionsGroup & {
|
||||
"""
|
||||
}
|
||||
|
||||
reindex_interval: {
|
||||
type: "time.Duration"
|
||||
defaultGoExpr: "time.Minute * 10"
|
||||
description: """
|
||||
Reindex interval controls when the index should be re-calculated.
|
||||
"""
|
||||
}
|
||||
|
||||
index_flush_interval: {
|
||||
type: "time.Duration"
|
||||
defaultGoExpr: "time.Minute * 35"
|
||||
|
||||
2
server/pkg/options/options.gen.go
generated
2
server/pkg/options/options.gen.go
generated
@ -59,6 +59,7 @@ type (
|
||||
DecayFactor float64 `env:"RBAC_DECAY_FACTOR"`
|
||||
DecayInterval time.Duration `env:"RBAC_DECAY_INTERVAL"`
|
||||
CleanupInterval time.Duration `env:"RBAC_CLEANUP_INTERVAL"`
|
||||
ReindexInterval time.Duration `env:"RBAC_REINDEX_INTERVAL"`
|
||||
IndexFlushInterval time.Duration `env:"RBAC_INDEX_FLUSH_INTERVAL"`
|
||||
ServiceUser string `env:"RBAC_SERVICE_USER"`
|
||||
BypassRoles string `env:"RBAC_BYPASS_ROLES"`
|
||||
@ -391,6 +392,7 @@ func Rbac() (o *RbacOpt) {
|
||||
DecayFactor: 0.9,
|
||||
DecayInterval: time.Minute * 30,
|
||||
CleanupInterval: time.Minute * 31,
|
||||
ReindexInterval: time.Minute * 10,
|
||||
IndexFlushInterval: time.Minute * 35,
|
||||
BypassRoles: "super-admin",
|
||||
AuthenticatedRoles: "authenticated",
|
||||
|
||||
@ -56,6 +56,8 @@ type (
|
||||
DecayInterval time.Duration
|
||||
// CleanupInterval states how often stale or poor performers should be thrown out
|
||||
CleanupInterval time.Duration
|
||||
// ReindexInterval states how often we should reindex based on updated scores
|
||||
ReindexInterval time.Duration
|
||||
// IndexFlushInterval states how often the index state should be flushed to the database
|
||||
IndexFlushInterval time.Duration
|
||||
|
||||
@ -1157,34 +1159,45 @@ func (svc *Service) DebuggerAddIndex(role uint64, resource string, rules ...*Rul
|
||||
// Processing n stuff
|
||||
|
||||
func (svc *Service) watch(ctx context.Context) {
|
||||
tck := time.NewTicker(time.Minute * 5)
|
||||
|
||||
tInt := svc.cfg.IndexFlushInterval
|
||||
if tInt == 0 {
|
||||
tInt = time.Minute * 5
|
||||
}
|
||||
tTck := time.NewTicker(tInt)
|
||||
_ = tTck
|
||||
|
||||
t := time.NewTicker(time.Minute * 5)
|
||||
rexInt := svc.cfg.IndexFlushInterval
|
||||
flushInt := svc.cfg.IndexFlushInterval
|
||||
if flushInt == 0 {
|
||||
flushInt = time.Minute * 30
|
||||
}
|
||||
flushTck := time.NewTicker(flushInt)
|
||||
_ = flushTck
|
||||
|
||||
rexInt := svc.cfg.ReindexInterval
|
||||
if rexInt == 0 {
|
||||
rexInt = time.Minute * 30
|
||||
}
|
||||
rexTck := time.NewTicker(rexInt)
|
||||
_ = rexTck
|
||||
|
||||
rex := time.NewTicker(time.Minute * 30)
|
||||
|
||||
flshInt := svc.cfg.IndexFlushInterval
|
||||
if flshInt == 0 {
|
||||
flshInt = time.Minute * 5
|
||||
}
|
||||
tFlush := time.NewTicker(flshInt)
|
||||
defer func() {
|
||||
tck.Stop()
|
||||
tTck.Stop()
|
||||
flushTck.Stop()
|
||||
rexTck.Stop()
|
||||
}()
|
||||
|
||||
lg := svc.logger.Named("rbac service wrapper")
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
case <-tck.C:
|
||||
lg.Info("tick")
|
||||
|
||||
case <-rex.C:
|
||||
case <-rexTck.C:
|
||||
lg.Info("reindex")
|
||||
|
||||
err := svc.updateWrapperIndex(ctx)
|
||||
@ -1192,7 +1205,7 @@ func (svc *Service) watch(ctx context.Context) {
|
||||
lg.Error("reindex failed", zap.Error(err))
|
||||
}
|
||||
|
||||
case <-tFlush.C:
|
||||
case <-flushTck.C:
|
||||
err := svc.cfg.FlushIndexState(ctx, svc.index.getIndexed())
|
||||
if err != nil {
|
||||
lg.Error("failed to flush the index state", zap.Error(err))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user