diff --git a/compose/service/automation_runner.go b/compose/service/automation_runner.go index 8904bae25..0734fb640 100644 --- a/compose/service/automation_runner.go +++ b/compose/service/automation_runner.go @@ -30,6 +30,7 @@ type ( automationScriptsFinder interface { Watch(ctx context.Context) + WatchScheduled(ctx context.Context, runner automation.DeferredAutomationRunner) FindRunnableScripts(resource, event string, cc ...automation.TriggerConditionChecker) automation.ScriptSet } @@ -66,6 +67,7 @@ func AutomationRunner(opt AutomationRunnerOpt, f automationScriptsFinder, r corr func (svc automationRunner) Watch(ctx context.Context) { svc.scriptFinder.Watch(ctx) + svc.scriptFinder.WatchScheduled(ctx, svc) } // BeforeRecordCreate - run scripts before record is created @@ -195,6 +197,31 @@ func (svc automationRunner) RecordManual(ctx context.Context, scriptID uint64, n return runner(script) } +// RecordDeferred - Deferred trigger run +func (svc automationRunner) RecordDeferred(ctx context.Context, script *automation.Script, ns *types.Namespace, m *types.Module, r *types.Record) (err error) { + if script == nil { + return errors.New("can not find compatible script") + } + + // Do not execute scripts without a user + if script.RunAs <= 0 { + return errors.New("can not execute deferred scripts without a bound user") + } + + // Do not execute UA scripts + if script.RunInUA { + return errors.New("can not execute user-agent scripts") + } + + // Make record script runner and + runner := svc.makeRecordScriptRunner(ctx, ns, m, r, false) + + // Run it with a script + // + // Successfully executed record scripts can have an effect on given record value (r) + return runner(script) +} + func (svc automationRunner) RecordScriptTester(ctx context.Context, source string, ns *types.Namespace, m *types.Module, r *types.Record) (err error) { // Make record script runner and runner := svc.makeRecordScriptRunner(ctx, ns, m, r, false) diff --git a/pkg/automation/service.go b/pkg/automation/service.go index 0adb04724..eecd10b4e 100644 --- a/pkg/automation/service.go +++ b/pkg/automation/service.go @@ -9,6 +9,7 @@ import ( "github.com/titpetric/factory" "go.uber.org/zap" + "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/internal/auth" "github.com/cortezaproject/corteza-server/pkg/sentry" ) @@ -42,6 +43,10 @@ type ( FilterByTrigger(event, resource string, cc ...TriggerConditionChecker) ScriptSet } + DeferredAutomationRunner interface { + RecordDeferred(ctx context.Context, script *Script, ns *types.Namespace, m *types.Module, r *types.Record) (err error) + } + TokenMaker func(context.Context, uint64) (string, error) WatcherService interface { @@ -111,31 +116,44 @@ func (svc *service) Watch(ctx context.Context) { } }() - go svc.runScheduled(ctx) - svc.logger.Debug("watcher initialized") } -// Runs scheduled scripts every minute -func (svc *service) runScheduled(ctx context.Context) { - // @todo make time.Minute - var ticker = time.NewTicker(time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - for _, scriptID := range svc.scheduled.pick() { - // @todo finish implementation, we need to inform someone - // about the scheduled script and that it should be executed - svc.logger.Debug( - "Running scheduled script", - zap.Uint64("scriptID", scriptID), - ) +// RunScheduled runs scheduled scripts periodically +func (svc *service) WatchScheduled(ctx context.Context, r DeferredAutomationRunner) { + go func() { + var ticker = time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + for _, scriptID := range svc.scheduled.pick() { + // @todo parallelize this + svc.logger.Debug( + "Running scheduled script", + zap.Uint64("scriptID", scriptID), + ) + err := r.RecordDeferred(ctx, svc.runnables.FindByID(scriptID), nil, nil, nil) + if err != nil { + svc.logger.Error( + "Script failed to run", + zap.Uint64("scriptID", scriptID), + zap.Error(err), + ) + } else { + svc.logger.Debug( + "Ran scheduled script", + zap.Uint64("scriptID", scriptID), + ) + } + } } } - } + }() + + svc.logger.Debug("scheduled runner initialized") } func (svc *service) Reload() {