From a94185a4817ebecafd1f0345b5cbc98e28cf3589 Mon Sep 17 00:00:00 2001 From: Denis Arh Date: Fri, 23 Aug 2019 08:06:02 +0200 Subject: [PATCH] Add (and disable) support for executing deferred scripts as scheduled --- pkg/automation/schedled_test.go | 89 +++++++++++++++++++++++++++++++++ pkg/automation/scheduled.go | 85 +++++++++++++++++++++++++++++++ pkg/automation/service.go | 36 ++++++++++++- 3 files changed, 209 insertions(+), 1 deletion(-) create mode 100644 pkg/automation/schedled_test.go create mode 100644 pkg/automation/scheduled.go diff --git a/pkg/automation/schedled_test.go b/pkg/automation/schedled_test.go new file mode 100644 index 000000000..a30ef0856 --- /dev/null +++ b/pkg/automation/schedled_test.go @@ -0,0 +1,89 @@ +package automation + +import ( + "reflect" + "testing" + "time" +) + +func ss2tt(ss ...string) []time.Time { + tt := make([]time.Time, len(ss)) + for i, s := range ss { + tt[i], _ = time.Parse(time.RFC3339, s) + } + return tt +} + +func TestScheduleBuilder(t *testing.T) { + tests := []struct { + name string + ss ScriptSet + sch scheduledSet + }{ + {name: "basics", + ss: ScriptSet{ + &Script{ID: 1, Enabled: true, triggers: TriggerSet{ + &Trigger{Enabled: true, Event: EVENT_TYPE_DEFERRED, Condition: "2000-01-01T00:02:00+02:00"}, + &Trigger{Enabled: true, Event: EVENT_TYPE_DEFERRED, Condition: "2000-01-01T00:03:00+02:00"}, + }}, + &Script{ID: 2, Enabled: true, triggers: TriggerSet{ + &Trigger{Enabled: true, Event: EVENT_TYPE_DEFERRED, Condition: "2000-01-01T00:02:00+02:00"}, + &Trigger{Enabled: true, Event: EVENT_TYPE_DEFERRED, Condition: "2000-01-01T00:03:00+02:00"}, + }}, + }, + + sch: scheduledSet{ + schedule{scriptID: 1, timestamps: ss2tt("2000-01-01T00:02:00+02:00", "2000-01-01T00:03:00+02:00")}, + schedule{scriptID: 2, timestamps: ss2tt("2000-01-01T00:02:00+02:00", "2000-01-01T00:03:00+02:00")}, + }, + }, + } + + n, _ := time.Parse(time.RFC3339, "2000-01-01T00:00:00+02:00") + now = func() time.Time { return n } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out := buildScheduleList(tt.ss) + if !reflect.DeepEqual(out, tt.sch) { + t.Errorf("Result do not match %v %v", out, tt.sch) + } + }) + } +} + +func TestSchedulePicker(t *testing.T) { + tests := []struct { + name string + sch scheduledSet + ids []uint64 + }{ + {name: "one", + ids: []uint64{1}, + sch: scheduledSet{ + schedule{scriptID: 1, timestamps: ss2tt("2000-01-01T00:01:00+02:00", "2000-01-01T00:03:00+02:00")}, + schedule{scriptID: 2, timestamps: ss2tt("2000-01-01T00:04:00+02:00", "2000-01-01T00:05:00+02:00")}, + }, + }, + {name: "two", + ids: []uint64{1, 2}, + sch: scheduledSet{ + schedule{scriptID: 1, timestamps: ss2tt("2000-01-01T00:01:00+02:00")}, + schedule{scriptID: 2, timestamps: ss2tt("2000-01-01T00:01:00+02:00", "2000-01-01T00:05:00+02:00")}, + }, + }, + } + + n, _ := time.Parse(time.RFC3339, "2000-01-01T00:01:50+02:00") + now = func() time.Time { return n } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out := tt.sch.pick() + + if !reflect.DeepEqual(out, tt.ids) { + t.Errorf("Result do not match %v %v", out, tt.ids) + } + }) + } +} diff --git a/pkg/automation/scheduled.go b/pkg/automation/scheduled.go new file mode 100644 index 000000000..62325ff34 --- /dev/null +++ b/pkg/automation/scheduled.go @@ -0,0 +1,85 @@ +package automation + +import ( + "time" +) + +type ( + // List of scheduled scripts with schedule time + scheduledSet []schedule + + // ref to script & list of timestamps when we should run this + schedule struct { + scriptID uint64 + timestamps []time.Time + // intervals []interface{} + } +) + +var ( + now = func() time.Time { return time.Now() } +) + +// scans runnables and builds a set of scheduled scripts +func buildScheduleList(runables ScriptSet) scheduledSet { + set := scheduledSet{} + + _ = runables.Walk(func(s *Script) error { + sch := schedule{scriptID: s.ID} + for _, t := range s.triggers { + if !t.IsDeferred() { + // only interested in deferred scripts + continue + } + + if t.Condition == "" { + continue + } + + if ts, err := time.Parse(time.RFC3339, t.Condition); err == nil { + ts = ts.Truncate(time.Minute) + if ts.Before(now()) { + // in the past... + continue + } + + sch.timestamps = append(sch.timestamps, ts) + continue + } + + // @todo parse cron format and fill intervals + } + + // If there is anything useful in the schedule, + // add it to the list + if len(sch.timestamps) > 0 { + set = append(set, sch) + } + + return nil + }) + + return set +} + +// scans scheduled set and picks out all candidates that +// are scheduled this minute +// +// Script is executed only once, even if +// is scheduled multiple times, +func (set scheduledSet) pick() []uint64 { + uu := []uint64{} + thisMinute := now().Truncate(time.Minute) + for _, s := range set { + for _, t := range s.timestamps { + if thisMinute.Equal(t) { + uu = append(uu, s.scriptID) + break + } + } + + // @todo pick from intervals + } + + return uu +} diff --git a/pkg/automation/service.go b/pkg/automation/service.go index 1625ff989..4b64aab60 100644 --- a/pkg/automation/service.go +++ b/pkg/automation/service.go @@ -26,6 +26,9 @@ type ( // internal list of runnable scripts (and their accompanying triggers) runnables ScriptSet + // internal list of scheduled scripts + scheduled scheduledSet + // turns user-id (rel_runner / runAs) into valid credentials (JWT) makeToken TokenMaker @@ -107,9 +110,33 @@ func (svc *service) Watch(ctx context.Context) { } }() + // @todo enable when deferred scripts can be execured + // go svc.runScheduled(ctx) + svc.logger.Debug("watcher initialized") } +// Runs scheduled scripts every minute +func (svc *service) runScheduled(ctx context.Context) { + var ticker = time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + for _, scriptID := range svc.scheduled.pick() { + // @todo finish implementation, we need to inform someone + // about the scheduled script and that it should be executed + svc.logger.Debug( + "Running scheduled script", + zap.Uint64("scriptID", scriptID), + ) + } + } + } +} + func (svc *service) Reload() { go func() { select { @@ -167,7 +194,7 @@ func (svc *service) reload(ctx context.Context) { return nil }) - return tt.Walk(func(t *Trigger) error { + _ = tt.Walk(func(t *Trigger) error { s := svc.runnables.FindByID(t.ScriptID) if s != nil && t.IsValid() && s.CheckCompatibility(t) == nil { // Add only compatible triggers @@ -176,6 +203,13 @@ func (svc *service) reload(ctx context.Context) { return nil }) + + // update scheduled list + // @todo enable when deferred scripts can be execured + // svc.scheduled = buildScheduleList(svc.runnables) + // svc.logger.Info("deferred scripts scheduled", zap.Int("count", len(svc.scheduled))) + + return }) }