Run deferred scripts
This commit is contained in:
parent
b1d9fff457
commit
615c2337ab
@ -30,6 +30,7 @@ type (
|
||||
|
||||
automationScriptsFinder interface {
|
||||
Watch(ctx context.Context)
|
||||
WatchScheduled(ctx context.Context, runner automation.DeferredAutomationRunner)
|
||||
FindRunnableScripts(resource, event string, cc ...automation.TriggerConditionChecker) automation.ScriptSet
|
||||
}
|
||||
|
||||
@ -66,6 +67,7 @@ func AutomationRunner(opt AutomationRunnerOpt, f automationScriptsFinder, r corr
|
||||
|
||||
func (svc automationRunner) Watch(ctx context.Context) {
|
||||
svc.scriptFinder.Watch(ctx)
|
||||
svc.scriptFinder.WatchScheduled(ctx, svc)
|
||||
}
|
||||
|
||||
// BeforeRecordCreate - run scripts before record is created
|
||||
@ -195,6 +197,31 @@ func (svc automationRunner) RecordManual(ctx context.Context, scriptID uint64, n
|
||||
return runner(script)
|
||||
}
|
||||
|
||||
// RecordDeferred - Deferred trigger run
|
||||
func (svc automationRunner) RecordDeferred(ctx context.Context, script *automation.Script, ns *types.Namespace, m *types.Module, r *types.Record) (err error) {
|
||||
if script == nil {
|
||||
return errors.New("can not find compatible script")
|
||||
}
|
||||
|
||||
// Do not execute scripts without a user
|
||||
if script.RunAs <= 0 {
|
||||
return errors.New("can not execute deferred scripts without a bound user")
|
||||
}
|
||||
|
||||
// Do not execute UA scripts
|
||||
if script.RunInUA {
|
||||
return errors.New("can not execute user-agent scripts")
|
||||
}
|
||||
|
||||
// Make record script runner and
|
||||
runner := svc.makeRecordScriptRunner(ctx, ns, m, r, false)
|
||||
|
||||
// Run it with a script
|
||||
//
|
||||
// Successfully executed record scripts can have an effect on given record value (r)
|
||||
return runner(script)
|
||||
}
|
||||
|
||||
func (svc automationRunner) RecordScriptTester(ctx context.Context, source string, ns *types.Namespace, m *types.Module, r *types.Record) (err error) {
|
||||
// Make record script runner and
|
||||
runner := svc.makeRecordScriptRunner(ctx, ns, m, r, false)
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
"github.com/titpetric/factory"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/compose/types"
|
||||
"github.com/cortezaproject/corteza-server/internal/auth"
|
||||
"github.com/cortezaproject/corteza-server/pkg/sentry"
|
||||
)
|
||||
@ -42,6 +43,10 @@ type (
|
||||
FilterByTrigger(event, resource string, cc ...TriggerConditionChecker) ScriptSet
|
||||
}
|
||||
|
||||
DeferredAutomationRunner interface {
|
||||
RecordDeferred(ctx context.Context, script *Script, ns *types.Namespace, m *types.Module, r *types.Record) (err error)
|
||||
}
|
||||
|
||||
TokenMaker func(context.Context, uint64) (string, error)
|
||||
|
||||
WatcherService interface {
|
||||
@ -111,31 +116,44 @@ func (svc *service) Watch(ctx context.Context) {
|
||||
}
|
||||
}()
|
||||
|
||||
go svc.runScheduled(ctx)
|
||||
|
||||
svc.logger.Debug("watcher initialized")
|
||||
}
|
||||
|
||||
// Runs scheduled scripts every minute
|
||||
func (svc *service) runScheduled(ctx context.Context) {
|
||||
// @todo make time.Minute
|
||||
var ticker = time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
for _, scriptID := range svc.scheduled.pick() {
|
||||
// @todo finish implementation, we need to inform someone
|
||||
// about the scheduled script and that it should be executed
|
||||
svc.logger.Debug(
|
||||
"Running scheduled script",
|
||||
zap.Uint64("scriptID", scriptID),
|
||||
)
|
||||
// RunScheduled runs scheduled scripts periodically
|
||||
func (svc *service) WatchScheduled(ctx context.Context, r DeferredAutomationRunner) {
|
||||
go func() {
|
||||
var ticker = time.NewTicker(time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
for _, scriptID := range svc.scheduled.pick() {
|
||||
// @todo parallelize this
|
||||
svc.logger.Debug(
|
||||
"Running scheduled script",
|
||||
zap.Uint64("scriptID", scriptID),
|
||||
)
|
||||
err := r.RecordDeferred(ctx, svc.runnables.FindByID(scriptID), nil, nil, nil)
|
||||
if err != nil {
|
||||
svc.logger.Error(
|
||||
"Script failed to run",
|
||||
zap.Uint64("scriptID", scriptID),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
svc.logger.Debug(
|
||||
"Ran scheduled script",
|
||||
zap.Uint64("scriptID", scriptID),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
svc.logger.Debug("scheduled runner initialized")
|
||||
}
|
||||
|
||||
func (svc *service) Reload() {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user