3
0

Add (and disable) support for executing deferred scripts as scheduled

This commit is contained in:
Denis Arh 2019-08-23 08:06:02 +02:00
parent e4f0c5a3b6
commit a94185a481
3 changed files with 209 additions and 1 deletions

View File

@ -0,0 +1,89 @@
package automation
import (
"reflect"
"testing"
"time"
)
func ss2tt(ss ...string) []time.Time {
tt := make([]time.Time, len(ss))
for i, s := range ss {
tt[i], _ = time.Parse(time.RFC3339, s)
}
return tt
}
func TestScheduleBuilder(t *testing.T) {
tests := []struct {
name string
ss ScriptSet
sch scheduledSet
}{
{name: "basics",
ss: ScriptSet{
&Script{ID: 1, Enabled: true, triggers: TriggerSet{
&Trigger{Enabled: true, Event: EVENT_TYPE_DEFERRED, Condition: "2000-01-01T00:02:00+02:00"},
&Trigger{Enabled: true, Event: EVENT_TYPE_DEFERRED, Condition: "2000-01-01T00:03:00+02:00"},
}},
&Script{ID: 2, Enabled: true, triggers: TriggerSet{
&Trigger{Enabled: true, Event: EVENT_TYPE_DEFERRED, Condition: "2000-01-01T00:02:00+02:00"},
&Trigger{Enabled: true, Event: EVENT_TYPE_DEFERRED, Condition: "2000-01-01T00:03:00+02:00"},
}},
},
sch: scheduledSet{
schedule{scriptID: 1, timestamps: ss2tt("2000-01-01T00:02:00+02:00", "2000-01-01T00:03:00+02:00")},
schedule{scriptID: 2, timestamps: ss2tt("2000-01-01T00:02:00+02:00", "2000-01-01T00:03:00+02:00")},
},
},
}
n, _ := time.Parse(time.RFC3339, "2000-01-01T00:00:00+02:00")
now = func() time.Time { return n }
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out := buildScheduleList(tt.ss)
if !reflect.DeepEqual(out, tt.sch) {
t.Errorf("Result do not match %v %v", out, tt.sch)
}
})
}
}
func TestSchedulePicker(t *testing.T) {
tests := []struct {
name string
sch scheduledSet
ids []uint64
}{
{name: "one",
ids: []uint64{1},
sch: scheduledSet{
schedule{scriptID: 1, timestamps: ss2tt("2000-01-01T00:01:00+02:00", "2000-01-01T00:03:00+02:00")},
schedule{scriptID: 2, timestamps: ss2tt("2000-01-01T00:04:00+02:00", "2000-01-01T00:05:00+02:00")},
},
},
{name: "two",
ids: []uint64{1, 2},
sch: scheduledSet{
schedule{scriptID: 1, timestamps: ss2tt("2000-01-01T00:01:00+02:00")},
schedule{scriptID: 2, timestamps: ss2tt("2000-01-01T00:01:00+02:00", "2000-01-01T00:05:00+02:00")},
},
},
}
n, _ := time.Parse(time.RFC3339, "2000-01-01T00:01:50+02:00")
now = func() time.Time { return n }
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out := tt.sch.pick()
if !reflect.DeepEqual(out, tt.ids) {
t.Errorf("Result do not match %v %v", out, tt.ids)
}
})
}
}

View File

@ -0,0 +1,85 @@
package automation
import (
"time"
)
type (
// List of scheduled scripts with schedule time
scheduledSet []schedule
// ref to script & list of timestamps when we should run this
schedule struct {
scriptID uint64
timestamps []time.Time
// intervals []interface{}
}
)
var (
now = func() time.Time { return time.Now() }
)
// scans runnables and builds a set of scheduled scripts
func buildScheduleList(runables ScriptSet) scheduledSet {
set := scheduledSet{}
_ = runables.Walk(func(s *Script) error {
sch := schedule{scriptID: s.ID}
for _, t := range s.triggers {
if !t.IsDeferred() {
// only interested in deferred scripts
continue
}
if t.Condition == "" {
continue
}
if ts, err := time.Parse(time.RFC3339, t.Condition); err == nil {
ts = ts.Truncate(time.Minute)
if ts.Before(now()) {
// in the past...
continue
}
sch.timestamps = append(sch.timestamps, ts)
continue
}
// @todo parse cron format and fill intervals
}
// If there is anything useful in the schedule,
// add it to the list
if len(sch.timestamps) > 0 {
set = append(set, sch)
}
return nil
})
return set
}
// scans scheduled set and picks out all candidates that
// are scheduled this minute
//
// Script is executed only once, even if
// is scheduled multiple times,
func (set scheduledSet) pick() []uint64 {
uu := []uint64{}
thisMinute := now().Truncate(time.Minute)
for _, s := range set {
for _, t := range s.timestamps {
if thisMinute.Equal(t) {
uu = append(uu, s.scriptID)
break
}
}
// @todo pick from intervals
}
return uu
}

View File

@ -26,6 +26,9 @@ type (
// internal list of runnable scripts (and their accompanying triggers)
runnables ScriptSet
// internal list of scheduled scripts
scheduled scheduledSet
// turns user-id (rel_runner / runAs) into valid credentials (JWT)
makeToken TokenMaker
@ -107,9 +110,33 @@ func (svc *service) Watch(ctx context.Context) {
}
}()
// @todo enable when deferred scripts can be execured
// go svc.runScheduled(ctx)
svc.logger.Debug("watcher initialized")
}
// Runs scheduled scripts every minute
func (svc *service) runScheduled(ctx context.Context) {
var ticker = time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for _, scriptID := range svc.scheduled.pick() {
// @todo finish implementation, we need to inform someone
// about the scheduled script and that it should be executed
svc.logger.Debug(
"Running scheduled script",
zap.Uint64("scriptID", scriptID),
)
}
}
}
}
func (svc *service) Reload() {
go func() {
select {
@ -167,7 +194,7 @@ func (svc *service) reload(ctx context.Context) {
return nil
})
return tt.Walk(func(t *Trigger) error {
_ = tt.Walk(func(t *Trigger) error {
s := svc.runnables.FindByID(t.ScriptID)
if s != nil && t.IsValid() && s.CheckCompatibility(t) == nil {
// Add only compatible triggers
@ -176,6 +203,13 @@ func (svc *service) reload(ctx context.Context) {
return nil
})
// update scheduled list
// @todo enable when deferred scripts can be execured
// svc.scheduled = buildScheduleList(svc.runnables)
// svc.logger.Info("deferred scripts scheduled", zap.Int("count", len(svc.scheduled)))
return
})
}