3
0

Refactored workflow converter, add issues

This commit is contained in:
Denis Arh 2021-02-12 06:49:07 +01:00
parent 5438097648
commit a0ecf0537f
11 changed files with 804 additions and 581 deletions

View File

@ -2,7 +2,6 @@ package service
import (
"context"
"fmt"
"github.com/cortezaproject/corteza-server/automation/types"
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/auth"
@ -118,29 +117,28 @@ func (svc *session) suspendAll(ctx context.Context) error {
// It does not check user's permissions to execute workflow(s) so it should be used only when !
func (svc *session) Start(g *wfexec.Graph, i auth.Identifiable, ssp types.SessionStartParams) (wait WaitFn, err error) {
var (
ctx = auth.SetIdentityToContext(context.Background(), i)
ses = svc.spawn(g, ssp.Trace)
start wfexec.Step
)
wait = func(ctx context.Context) (*expr.Vars, error) {
return &expr.Vars{}, nil // no-op...
}
if ssp.StepID == 0 {
// starting step is not explicitly workflows on trigger, find orphan step
switch oo := g.Orphans(); len(oo) {
case 1:
start = oo[0]
case 0:
return nil, fmt.Errorf("could not find step without parents")
return nil, errors.InvalidData("could not find starting step")
default:
return nil, fmt.Errorf("multiple steps without parents")
return nil, errors.InvalidData("can not start workflow session multiple starting steps found")
}
} else if start = g.StepByID(ssp.StepID); start == nil {
return nil, fmt.Errorf("trigger staring step references nonexisting step")
return nil, errors.InvalidData("trigger staring step references nonexisting step")
}
var (
ctx = auth.SetIdentityToContext(context.Background(), i)
ses = svc.spawn(g, ssp.Trace)
)
ses.CreatedAt = *now()
ses.CreatedBy = i.Identity()
ses.Apply(ssp)

View File

@ -437,26 +437,25 @@ func (svc *trigger) registerWorkflow(ctx context.Context, wf *types.Workflow, tt
// It preloads run-as identity and finds a starting step for each trigger
func (svc *trigger) registerTriggers(wf *types.Workflow, runAs auth.Identifiable, tt ...*types.Trigger) {
var (
err error
g *wfexec.Graph
wfLog = svc.log.
WithOptions(zap.AddStacktrace(zap.DPanicLevel)).
With(zap.Uint64("workflowID", wf.ID))
handlerFn eventbus.HandlerFn
err error
g *wfexec.Graph
issues types.WorkflowIssueSet
wfLog = svc.log.
WithOptions(zap.AddStacktrace(zap.DPanicLevel)).
With(zap.Uint64("workflowID", wf.ID))
// register only enabled, undeleted workflows
registerWorkflow = wf.Enabled || wf.DeletedAt == nil
)
if !wf.Enabled {
wfLog.Debug("skipping disabled workflow")
return
}
if wf.DeletedAt != nil {
wfLog.Debug("skipping deleted workflow")
return
}
if g, err = svc.workflow.toGraph(wf); err != nil {
wfLog.Error("failed to convert workflow to graph", zap.Error(err))
return
// convert only registerable and issuless workflwos
if registerWorkflow && len(wf.Issues) == 0 {
// Convert workflow only when valid (no issues, enable, not delete)
if g, issues = Convert(svc.workflow, wf); len(issues) > 0 {
wfLog.Error("failed to convert workflow to graph", zap.Error(issues))
g = nil
}
}
defer svc.mux.Unlock()
@ -465,85 +464,38 @@ func (svc *trigger) registerTriggers(wf *types.Workflow, runAs auth.Identifiable
for _, t := range tt {
log := wfLog.With(zap.Uint64("triggerID", t.ID))
if !t.Enabled {
log.Debug("skipping disabled trigger")
continue
// always unregister
if svc.reg[wf.ID] == nil {
svc.reg[wf.ID] = make(map[uint64]uintptr)
} else if ptr := svc.reg[wf.ID][t.ID]; ptr != 0 {
// unregister handlers for this trigger if they exist
svc.eventbus.Unregister(ptr)
}
if t.DeletedAt != nil {
log.Debug("skipping deleted trigger")
// do not register disabled or deleted triggers
if !registerWorkflow || !t.Enabled || t.DeletedAt != nil {
continue
}
var (
handler = func(ctx context.Context, ev eventbus.Event) (err error) {
var (
// create session scope from predefined workflow scope and trigger input
scope = wf.Scope.Merge(t.Input)
evScope *expr.Vars
wait WaitFn
)
if enc, is := ev.(varsEncoder); is {
if evScope, err = enc.EncodeVars(); err != nil {
return
}
scope = scope.Merge(evScope)
}
_ = scope.AssignFieldValue("eventType", expr.Must(expr.NewString(ev.EventType())))
_ = scope.AssignFieldValue("resourceType", expr.Must(expr.NewString(ev.ResourceType())))
if runAs == nil {
// @todo can/should we get alternative identity from Event?
// for example:
// - use http auth header and get username
// - use from/to/replyTo and use that as an identifier
runAs = auth.GetIdentityFromContext(ctx)
}
log.Debug("handling triggered workflow",
zap.Any("event", ev),
zap.Uint64("runAs", runAs.Identity()),
)
wait, err = svc.session.Start(g, runAs, types.SessionStartParams{
WorkflowID: wf.ID,
KeepFor: wf.KeepSessions,
Trace: wf.Trace,
Input: scope,
StepID: t.StepID,
EventType: t.EventType,
ResourceType: t.ResourceType,
})
if err != nil {
log.Error("workflow error", zap.Error(err))
return err
}
// wait for the workflow to complete
// reuse scope for results
// this will be decoded back to event properties
scope, err = wait(ctx)
if err != nil {
return
}
if dec, is := ev.(varsDecoder); is {
return dec.DecodeVars(scope)
}
return nil
}
ops = make([]eventbus.HandlerRegOp, 0, len(t.Constraints)+2)
cnstr eventbus.ConstraintMatcher
err error
ops = make([]eventbus.HandlerRegOp, 0, len(t.Constraints)+2)
)
if g == nil {
handlerFn = func(_ context.Context, ev eventbus.Event) error {
return errors.Internal(
"trigger %s on %s failed due to invalid workflow %d: %w",
ev.EventType(),
ev.ResourceType(),
wf.ID,
wf.Issues,
)
}
} else {
handlerFn = makeWorkflowHandler(svc.session, t, wf, g, runAs)
}
ops = append(
ops,
eventbus.On(t.EventType),
@ -562,14 +514,7 @@ func (svc *trigger) registerTriggers(wf *types.Workflow, runAs auth.Identifiable
}
}
if svc.reg[wf.ID] == nil {
svc.reg[wf.ID] = make(map[uint64]uintptr)
} else if ptr := svc.reg[wf.ID][t.ID]; ptr != 0 {
// unregister handlers for this trigger if they exist
svc.eventbus.Unregister(ptr)
}
svc.reg[wf.ID][t.ID] = svc.eventbus.Register(handler, ops...)
svc.reg[wf.ID][t.ID] = svc.eventbus.Register(handlerFn, ops...)
log.Debug("trigger registered",
zap.String("eventType", t.EventType),
@ -611,6 +556,65 @@ func (svc *trigger) unregisterTriggers(tt ...*types.Trigger) {
}
}
func makeWorkflowHandler(s *session, t *types.Trigger, wf *types.Workflow, g *wfexec.Graph, runAs auth.Identifiable) eventbus.HandlerFn {
return func(ctx context.Context, ev eventbus.Event) (err error) {
var (
// create session scope from predefined workflow scope and trigger input
scope = wf.Scope.Merge(t.Input)
evScope *expr.Vars
wait WaitFn
)
if enc, is := ev.(varsEncoder); is {
if evScope, err = enc.EncodeVars(); err != nil {
return
}
scope = scope.Merge(evScope)
}
_ = scope.AssignFieldValue("eventType", expr.Must(expr.NewString(ev.EventType())))
_ = scope.AssignFieldValue("resourceType", expr.Must(expr.NewString(ev.ResourceType())))
if runAs == nil {
// @todo can/should we get alternative identity from Event?
// for example:
// - use http auth header and get username
// - use from/to/replyTo and use that as an identifier
runAs = auth.GetIdentityFromContext(ctx)
}
wait, err = s.Start(g, runAs, types.SessionStartParams{
WorkflowID: wf.ID,
KeepFor: wf.KeepSessions,
Trace: wf.Trace,
Input: scope,
StepID: t.StepID,
EventType: t.EventType,
ResourceType: t.ResourceType,
})
if err != nil {
return err
}
// wait for the workflow to complete
// reuse scope for results
// this will be decoded back to event properties
scope, err = wait(ctx)
if err != nil {
return
}
if dec, is := ev.(varsDecoder); is {
return dec.DecodeVars(scope)
}
return nil
}
}
func loadTrigger(ctx context.Context, s store.Storer, workflowID uint64) (res *types.Trigger, err error) {
if workflowID == 0 {
return nil, TriggerErrInvalidID()

View File

@ -2,7 +2,6 @@ package service
import (
"context"
"fmt"
"github.com/cortezaproject/corteza-server/automation/types"
"github.com/cortezaproject/corteza-server/pkg/actionlog"
intAuth "github.com/cortezaproject/corteza-server/pkg/auth"
@ -17,7 +16,6 @@ import (
"github.com/cortezaproject/corteza-server/store"
"go.uber.org/zap"
"reflect"
"strings"
"sync"
)
@ -187,10 +185,6 @@ func (svc *workflow) Create(ctx context.Context, new *types.Workflow) (wf *types
return err
}
if err = validateSteps(new.Steps...); err != nil {
return
}
wf = &types.Workflow{
ID: nextID(),
Handle: new.Handle,
@ -211,6 +205,8 @@ func (svc *workflow) Create(ctx context.Context, new *types.Workflow) (wf *types
CreatedBy: cUser,
}
_, wf.Issues = Convert(svc, wf)
if err = store.CreateAutomationWorkflow(ctx, s, wf); err != nil {
return
}
@ -264,7 +260,7 @@ func (svc workflow) uniqueCheck(ctx context.Context, res *types.Workflow) (err e
return nil
}
func (svc workflow) updater(ctx context.Context, workflowID uint64, action func(...*workflowActionProps) *workflowAction, fn workflowUpdateHandler) (*types.Workflow, error) {
func (svc *workflow) updater(ctx context.Context, workflowID uint64, action func(...*workflowActionProps) *workflowAction, fn workflowUpdateHandler) (*types.Workflow, error) {
var (
changes workflowChanges
res *types.Workflow
@ -297,9 +293,12 @@ func (svc workflow) updater(ctx context.Context, workflowID uint64, action func(
}
if changes&workflowDefChanged > 0 {
if err = svc.triggers.registerWorkflows(ctx, res); err != nil {
return err
if _, res.Issues = Convert(svc, res); len(res.Issues) == 0 {
if err = svc.triggers.registerWorkflows(ctx, res); err != nil {
return err
}
}
}
if changes&workflowLabelsChanged > 0 {
@ -375,10 +374,6 @@ func (svc workflow) handleUpdate(upd *types.Workflow) workflowUpdateHandler {
if upd.Steps != nil {
if !reflect.DeepEqual(upd.Steps, res.Steps) {
if err = validateSteps(upd.Steps...); err != nil {
return
}
changes |= workflowChanged | workflowDefChanged
res.Steps = upd.Steps
}
@ -452,466 +447,6 @@ func (svc *workflow) Load(ctx context.Context) error {
return svc.triggers.registerWorkflows(ctx, wwf...)
}
// Converts workflow definition to wf execution graph
func (svc *workflow) toGraph(def *types.Workflow) (*wfexec.Graph, error) {
var (
g = wfexec.NewGraph()
)
for g.Len() < len(def.Steps) {
progress := false
for _, step := range def.Steps {
if g.StepByID(step.ID) != nil {
// resolved
continue
}
if step.Kind == types.WorkflowStepKindVisual {
// make sure visual steps are skipped
continue
}
// Collect all incoming and outgoing paths
inPaths := make([]*types.WorkflowPath, 0, 8)
outPaths := make([]*types.WorkflowPath, 0, 8)
for _, path := range def.Paths {
if path.ChildID == step.ID {
inPaths = append(inPaths, path)
} else if path.ParentID == step.ID {
outPaths = append(outPaths, path)
}
}
if resolved, err := svc.workflowStepDefConv(g, step, inPaths, outPaths); err != nil {
return nil, err
} else if resolved {
progress = true
}
}
if !progress {
// nothing resolved
return nil, errors.Internal("failed to resolve workflow step dependencies")
}
}
for _, path := range def.Paths {
if g.StepByID(path.ChildID) == nil {
return nil, errors.Internal("failed to resolve step with ID %d", path.ChildID)
}
if g.StepByID(path.ParentID) == nil {
return nil, errors.Internal("failed to resolve step with ID %d", path.ParentID)
}
g.AddParent(
g.StepByID(path.ChildID),
g.StepByID(path.ParentID),
)
}
return g, nil
}
// converts all step definitions into workflow.Step instances
//
// if this func returns nil for step and error, assume unresolved dependencies
func (svc *workflow) workflowStepDefConv(g *wfexec.Graph, s *types.WorkflowStep, in, out []*types.WorkflowPath) (bool, error) {
conv, err := func() (wfexec.Step, error) {
switch s.Kind {
case types.WorkflowStepKindVisual:
return nil, nil
case types.WorkflowStepKindDebug:
return svc.convDebugStep(s)
case types.WorkflowStepKindExpressions:
return svc.convExpressionStep(s)
case types.WorkflowStepKindGateway:
return svc.convGateway(g, s, in, out)
case types.WorkflowStepKindFunction, types.WorkflowStepKindIterator:
return svc.convFunctionStep(g, s, out)
case types.WorkflowStepKindError:
return svc.convErrorStep(s, out)
case types.WorkflowStepKindTermination:
return svc.convTerminationStep(out)
case types.WorkflowStepKindPrompt:
return svc.convPromptStep(s)
case types.WorkflowStepKindErrHandler:
return svc.convErrorHandlerStep(g, out)
default:
return nil, errors.Internal("unsupported step kind %q", s.Kind)
}
}()
if err != nil {
return false, err
} else if conv != nil {
conv.SetID(s.ID)
g.AddStep(conv)
return true, err
} else {
// signal caller that we were unable to
// resolve definition at the moment
return false, nil
}
}
func (svc *workflow) convGateway(g *wfexec.Graph, s *types.WorkflowStep, in, out []*types.WorkflowPath) (wfexec.Step, error) {
switch s.Ref {
case "fork":
return wfexec.ForkGateway(), nil
case "join":
var (
ss []wfexec.Step
)
for _, p := range in {
if parent := g.StepByID(p.ParentID); parent != nil {
ss = append(ss, parent)
} else {
// unresolved parent, come back later.
return nil, nil
}
}
return wfexec.JoinGateway(ss...), nil
case "incl", "excl":
var (
pp []*wfexec.GatewayPath
)
for _, c := range out {
child := g.StepByID(c.ChildID)
if child == nil {
return nil, nil
}
if len(c.Expr) > 0 {
if err := svc.parser.ParseEvaluators(c); err != nil {
return nil, err
}
}
// wrapping with fn to make sure that we're dealing with the right wf path inside gw-path tester fn
err := func(c types.WorkflowPath) error {
p, err := wfexec.NewGatewayPath(child, func(ctx context.Context, scope *expr.Vars) (bool, error) {
if len(c.Expr) == 0 {
return true, nil
}
return c.Test(ctx, scope)
})
if err != nil {
return err
}
pp = append(pp, p)
return nil
}(*c)
if err != nil {
return nil, err
}
}
if s.Ref == "excl" {
return wfexec.ExclGateway(pp...)
} else {
return wfexec.InclGateway(pp...)
}
}
return nil, fmt.Errorf("unknown gateway type")
}
func (svc *workflow) convErrorHandlerStep(g *wfexec.Graph, out []*types.WorkflowPath) (wfexec.Step, error) {
switch len(out) {
case 0:
return nil, fmt.Errorf("expecting at least one path out of error handling step")
case 1:
// remove error handler
return types.ErrorHandlerStep(nil), nil
case 2:
errorHandler := g.StepByID(out[1].ChildID)
if errorHandler == nil {
// wait for it to be resolved
return nil, nil
}
return types.ErrorHandlerStep(errorHandler), nil
default:
// this might be extended in the future to allow different paths using expression
// but then again, this can be solved by gateway path following the error handling step
return nil, fmt.Errorf("max 2 paths out of error handling step")
}
}
func (svc *workflow) convExpressionStep(s *types.WorkflowStep) (wfexec.Step, error) {
if err := svc.parseExpressions(s.Arguments...); err != nil {
return nil, err
}
return types.ExpressionsStep(s.Arguments...), nil
}
// internal debug step that can log entire
func (svc *workflow) convDebugStep(s *types.WorkflowStep) (wfexec.Step, error) {
if err := svc.parseExpressions(s.Arguments...); err != nil {
return nil, err
}
return types.DebugStep(svc.log), nil
}
func (svc *workflow) convFunctionStep(g *wfexec.Graph, s *types.WorkflowStep, out []*types.WorkflowPath) (wfexec.Step, error) {
if s.Ref == "" {
return nil, errors.Internal("function reference missing")
}
reg := Registry()
if def := reg.Function(s.Ref); def == nil {
return nil, errors.Internal("unknown function %q", s.Ref)
} else {
if def.Kind != string(s.Kind) {
return nil, fmt.Errorf("unexpected %s on %s step", def.Kind, s.Kind)
}
var (
err error
isIterator = def.Kind == types.FunctionKindIterator
)
if isIterator {
if len(out) != 2 {
return nil, fmt.Errorf("expecting exactly two paths (next, exit) out of iterator function step")
}
if def.Iterator == nil {
return nil, errors.Internal("iterator handler for %q not set", s.Ref)
}
} else {
if def.Handler == nil {
return nil, errors.Internal("function handler for %q not set", s.Ref)
}
}
if err = svc.parseExpressions(s.Arguments...); err != nil {
return nil, errors.Internal("failed to parse argument expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err)
} else if err = def.Parameters.VerifyArguments(s.Arguments); err != nil {
return nil, errors.Internal("failed to verify argument expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err)
}
if err = svc.parseExpressions(s.Results...); err != nil {
return nil, errors.Internal("failed to parse result expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err)
} else if err = def.Results.VerifyResults(s.Results); err != nil {
return nil, errors.Internal("failed to verify result expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err)
}
if isIterator {
var (
next = g.StepByID(out[0].ChildID)
exit = g.StepByID(out[1].ChildID)
)
if next == nil || exit == nil {
// wait for steps to be resolved
return nil, nil
}
return types.IteratorStep(def, s.Arguments, s.Results, next, exit)
} else {
return types.FunctionStep(def, s.Arguments, s.Results)
}
}
}
// creates error step
//
// Expects ZERO outgoing paths and
func (svc *workflow) convErrorStep(s *types.WorkflowStep, out types.WorkflowPathSet) (wfexec.Step, error) {
const (
argName = "message"
)
if len(out) > 0 {
return nil, errors.Internal("error step must be last step in branch")
}
var (
args = types.ExprSet(s.Arguments)
)
if msgArg := args.GetByTarget(argName); msgArg == nil {
return nil, errors.Internal("error step must have %s argument", argName)
} else if msgArg.Type != (expr.String{}).Type() {
return nil, errors.Internal("%s argument on error step must be string, got type '%s'", argName, msgArg.Type)
} else if len(args) > 1 {
return nil, errors.Internal("too many arguments on error step")
}
if err := svc.parseExpressions(args...); err != nil {
return nil, err
}
return wfexec.NewGenericStep(func(ctx context.Context, r *wfexec.ExecRequest) (wfexec.ExecResponse, error) {
var (
msg string
result, err = args.Eval(ctx, r.Scope)
)
if err != nil {
return nil, err
}
if result.Has(argName) {
str, _ := expr.NewString(expr.Must(result.Select(argName)))
msg = str.GetValue()
} else {
if aux, is := args.GetByTarget(argName).Value.(string); is {
msg = aux
} else {
msg = "ERROR"
}
}
return nil, errors.Automation(msg)
}), nil
}
// converts prompt definition to wfexec.Step
func (svc *workflow) convTerminationStep(out types.WorkflowPathSet) (wfexec.Step, error) {
if len(out) > 0 {
return nil, errors.Internal("termination step must be last step in branch")
}
return wfexec.NewGenericStep(func(ctx context.Context, r *wfexec.ExecRequest) (wfexec.ExecResponse, error) {
return wfexec.Termination(), nil
}), nil
}
// converts prompt definition to wfexec.Step
func (svc *workflow) convPromptStep(s *types.WorkflowStep) (wfexec.Step, error) {
if err := svc.parseExpressions(s.Arguments...); err != nil {
return nil, err
}
// Use expression step as base for prompt step
return types.PromptStep(s.Ref, types.ExpressionsStep(s.Arguments...)), nil
}
func (svc *workflow) parseExpressions(ee ...*types.Expr) (err error) {
for _, e := range ee {
if len(strings.TrimSpace(e.Expr)) > 0 {
if err = svc.parser.ParseEvaluators(e); err != nil {
return
}
}
if err = e.SetType(exprTypeSetter(svc.reg, e)); err != nil {
return err
}
for _, t := range e.Tests {
if err = svc.parser.ParseEvaluators(t); err != nil {
return
}
}
}
return nil
}
func validateSteps(ss ...*types.WorkflowStep) error {
var (
IDs = make(map[uint64]int)
noArgs = func(i int, s *types.WorkflowStep) error {
if len(s.Arguments) > 0 {
return errors.Internal("%s step (ID=%d, position=%d) does not accept arguments", s.Kind, s.ID, i)
}
return nil
}
reqArgs = func(i int, s *types.WorkflowStep) error {
if len(s.Arguments) == 0 {
return errors.Internal("%s step (ID=%d, position=%d) require defined arguments", s.Kind, s.ID, i)
}
return nil
}
noResults = func(i int, s *types.WorkflowStep) error {
if len(s.Results) > 0 {
return errors.Internal("%s step (ID=%d, position=%d) does not accept results", s.Kind, s.ID, i)
}
return nil
}
checks = make([]func(i int, s *types.WorkflowStep) error, 0)
)
for i, s := range ss {
if p, has := IDs[s.ID]; has {
return fmt.Errorf("duplicate step ID (%d) used for steps on positions %d and %d", s.ID, p, i)
}
IDs[s.ID] = i
checks = nil
switch s.Kind {
case types.WorkflowStepKindErrHandler:
case types.WorkflowStepKindDebug:
checks = append(checks, noResults)
case types.WorkflowStepKindVisual:
checks = append(checks, noArgs, noResults)
case types.WorkflowStepKindExpressions:
checks = append(checks, reqArgs, noResults)
case types.WorkflowStepKindGateway:
checks = append(checks, noArgs, noResults)
case types.WorkflowStepKindError:
checks = append(checks, noResults)
case types.WorkflowStepKindTermination:
checks = append(checks, noArgs, noResults)
case types.WorkflowStepKindFunction, types.WorkflowStepKindIterator:
case types.WorkflowStepKindPrompt:
checks = append(checks, noResults)
default:
return errors.Internal("unknown step kind (ID=%d, position=%d)", s.ID, i)
}
for _, check := range checks {
if err := check(i, s); err != nil {
return err
}
}
}
return nil
}
func loadWorkflow(ctx context.Context, s store.Storer, workflowID uint64) (res *types.Workflow, err error) {
if workflowID == 0 {
return nil, WorkflowErrInvalidID()

View File

@ -0,0 +1,531 @@
package service
import (
"context"
"fmt"
"github.com/cortezaproject/corteza-server/automation/types"
"github.com/cortezaproject/corteza-server/pkg/errors"
"github.com/cortezaproject/corteza-server/pkg/expr"
"github.com/cortezaproject/corteza-server/pkg/wfexec"
"go.uber.org/zap"
"strings"
)
type (
workflowConverter struct {
// workflow function registry
reg *registry
parser expr.Parsable
log *zap.Logger
}
)
func Convert(wfService *workflow, wf *types.Workflow) (*wfexec.Graph, types.WorkflowIssueSet) {
conv := &workflowConverter{
reg: wfService.reg,
parser: wfService.parser,
log: wfService.log,
}
return conv.makeGraph(wf)
}
// Converts workflow definition to wf execution graph
func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, types.WorkflowIssueSet) {
var (
g = wfexec.NewGraph()
wfii = types.WorkflowIssueSet{}
IDs = make(map[uint64]int)
lastResStep *types.WorkflowStep
)
// Basic step verification
for i, s := range def.Steps {
if _, has := IDs[s.ID]; has {
wfii = wfii.Append(
fmt.Errorf("step ID not unique"),
map[string]int{"step": i, "duplicate": IDs[s.ID]},
)
} else {
IDs[s.ID] = i
}
}
// if we have one or more duplicated-id error we need to abort right away
// because all further step positions in issue culprit will be invalid
if len(wfii) > 0 {
return nil, wfii
}
for g.Len() < len(def.Steps) {
progress := false
lastResStep = nil
for _, step := range def.Steps {
lastResStep = step
if g.StepByID(step.ID) != nil {
// resolved
continue
}
stepIssues := verifyStep(step)
if step.Kind == types.WorkflowStepKindVisual {
// make sure visual steps are skipped
continue
}
// Collect all incoming and outgoing paths
inPaths := make([]*types.WorkflowPath, 0, 8)
outPaths := make([]*types.WorkflowPath, 0, 8)
for _, path := range def.Paths {
if path.ChildID == step.ID {
inPaths = append(inPaths, path)
} else if path.ParentID == step.ID {
outPaths = append(outPaths, path)
}
}
if resolved, err := svc.workflowStepDefConv(g, step, inPaths, outPaths); err != nil {
switch aux := err.(type) {
case types.WorkflowIssueSet:
stepIssues = append(stepIssues, aux...)
continue
case error:
stepIssues = stepIssues.Append(err, nil)
}
} else if resolved {
progress = true
}
wfii = append(wfii, stepIssues.SetCulprit("step", IDs[step.ID])...)
}
if !progress {
var culprit = make(map[string]int)
if lastResStep != nil {
culprit = map[string]int{"step": IDs[lastResStep.ID]}
}
// nothing resolved for 1 cycle
wfii = wfii.Append(fmt.Errorf("failed to resolve workflow step dependencies"), culprit)
break
}
}
for pos, path := range def.Paths {
if g.StepByID(path.ChildID) == nil {
wfii = wfii.Append(fmt.Errorf("failed to resolve step with ID %d", path.ChildID), map[string]int{"path": pos})
continue
}
if g.StepByID(path.ParentID) == nil {
wfii = wfii.Append(fmt.Errorf("failed to resolve step with ID %d", path.ParentID), map[string]int{"path": pos})
continue
}
if len(wfii) > 0 {
// pointless to fill the if there are errors
continue
}
g.AddParent(
g.StepByID(path.ChildID),
g.StepByID(path.ParentID),
)
}
if len(wfii) > 0 {
return nil, wfii
}
return g, nil
}
// converts all step definitions into workflow.Step instances
//
// if this func returns nil for step and error, assume unresolved dependencies
func (svc workflowConverter) workflowStepDefConv(g *wfexec.Graph, s *types.WorkflowStep, in, out []*types.WorkflowPath) (bool, error) {
conv, err := func() (wfexec.Step, error) {
switch s.Kind {
case types.WorkflowStepKindVisual:
return nil, nil
case types.WorkflowStepKindDebug:
return svc.convDebugStep(s)
case types.WorkflowStepKindExpressions:
return svc.convExpressionStep(s)
case types.WorkflowStepKindGateway:
return svc.convGateway(g, s, in, out)
case types.WorkflowStepKindFunction, types.WorkflowStepKindIterator:
return svc.convFunctionStep(g, s, out)
case types.WorkflowStepKindError:
return svc.convErrorStep(s, out)
case types.WorkflowStepKindTermination:
return svc.convTerminationStep(out)
case types.WorkflowStepKindPrompt:
return svc.convPromptStep(s)
case types.WorkflowStepKindErrHandler:
return svc.convErrorHandlerStep(g, out)
default:
return nil, errors.Internal("unsupported step kind %q", s.Kind)
}
}()
if err != nil {
return false, err
} else if conv != nil {
conv.SetID(s.ID)
g.AddStep(conv)
return true, err
} else {
// signal caller that we were unable to
// resolve definition at the moment
return false, nil
}
}
func (svc workflowConverter) convGateway(g *wfexec.Graph, s *types.WorkflowStep, in, out []*types.WorkflowPath) (wfexec.Step, error) {
switch s.Ref {
case "fork":
return wfexec.ForkGateway(), nil
case "join":
var (
ss []wfexec.Step
)
for _, p := range in {
if parent := g.StepByID(p.ParentID); parent != nil {
ss = append(ss, parent)
} else {
// unresolved parent, come back later.
return nil, nil
}
}
return wfexec.JoinGateway(ss...), nil
case "incl", "excl":
var (
pp []*wfexec.GatewayPath
)
for _, c := range out {
child := g.StepByID(c.ChildID)
if child == nil {
return nil, nil
}
if len(c.Expr) > 0 {
if err := svc.parser.ParseEvaluators(c); err != nil {
return nil, err
}
}
// wrapping with fn to make sure that we're dealing with the right wf path inside gw-path tester fn
err := func(c types.WorkflowPath) error {
p, err := wfexec.NewGatewayPath(child, func(ctx context.Context, scope *expr.Vars) (bool, error) {
if len(c.Expr) == 0 {
return true, nil
}
return c.Test(ctx, scope)
})
if err != nil {
return err
}
pp = append(pp, p)
return nil
}(*c)
if err != nil {
return nil, err
}
}
if s.Ref == "excl" {
return wfexec.ExclGateway(pp...)
} else {
return wfexec.InclGateway(pp...)
}
}
return nil, fmt.Errorf("unknown gateway type")
}
func (svc workflowConverter) convErrorHandlerStep(g *wfexec.Graph, out []*types.WorkflowPath) (wfexec.Step, error) {
switch len(out) {
case 0:
return nil, fmt.Errorf("expecting at least one path out of error handling step")
case 1:
// remove error handler
return types.ErrorHandlerStep(nil), nil
case 2:
errorHandler := g.StepByID(out[1].ChildID)
if errorHandler == nil {
// wait for it to be resolved
return nil, nil
}
return types.ErrorHandlerStep(errorHandler), nil
default:
// this might be extended in the future to allow different paths using expression
// but then again, this can be solved by gateway path following the error handling step
return nil, fmt.Errorf("max 2 paths out of error handling step")
}
}
func (svc workflowConverter) convExpressionStep(s *types.WorkflowStep) (wfexec.Step, error) {
if err := svc.parseExpressions(s.Arguments...); err != nil {
return nil, err
}
return types.ExpressionsStep(s.Arguments...), nil
}
// internal debug step that can log entire
func (svc workflowConverter) convDebugStep(s *types.WorkflowStep) (wfexec.Step, error) {
if err := svc.parseExpressions(s.Arguments...); err != nil {
return nil, err
}
return types.DebugStep(svc.log), nil
}
func (svc workflowConverter) convFunctionStep(g *wfexec.Graph, s *types.WorkflowStep, out []*types.WorkflowPath) (wfexec.Step, error) {
if s.Ref == "" {
return nil, errors.Internal("function reference missing")
}
reg := Registry()
if def := reg.Function(s.Ref); def == nil {
return nil, errors.Internal("unknown function %q", s.Ref)
} else {
if def.Kind != string(s.Kind) {
return nil, fmt.Errorf("unexpected %s on %s step", def.Kind, s.Kind)
}
var (
err error
isIterator = def.Kind == types.FunctionKindIterator
)
if isIterator {
if len(out) != 2 {
return nil, fmt.Errorf("expecting exactly two paths (next, exit) out of iterator function step")
}
if def.Iterator == nil {
return nil, errors.Internal("iterator handler for %q not set", s.Ref)
}
} else {
if def.Handler == nil {
return nil, errors.Internal("function handler for %q not set", s.Ref)
}
}
if err = svc.parseExpressions(s.Arguments...); err != nil {
return nil, errors.Internal("failed to parse argument expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err)
} else if err = def.Parameters.VerifyArguments(s.Arguments); err != nil {
return nil, errors.Internal("failed to verify argument expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err)
}
if err = svc.parseExpressions(s.Results...); err != nil {
return nil, errors.Internal("failed to parse result expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err)
} else if err = def.Results.VerifyResults(s.Results); err != nil {
return nil, errors.Internal("failed to verify result expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err)
}
if isIterator {
var (
next = g.StepByID(out[0].ChildID)
exit = g.StepByID(out[1].ChildID)
)
if next == nil || exit == nil {
// wait for steps to be resolved
return nil, nil
}
return types.IteratorStep(def, s.Arguments, s.Results, next, exit)
} else {
return types.FunctionStep(def, s.Arguments, s.Results)
}
}
}
// creates error step
//
// Expects ZERO outgoing paths and
func (svc workflowConverter) convErrorStep(s *types.WorkflowStep, out types.WorkflowPathSet) (wfexec.Step, error) {
const (
argName = "message"
)
if len(out) > 0 {
return nil, errors.Internal("error step must be last step in branch")
}
var (
args = types.ExprSet(s.Arguments)
)
if msgArg := args.GetByTarget(argName); msgArg == nil {
return nil, errors.Internal("error step must have %s argument", argName)
} else if msgArg.Type != (expr.String{}).Type() {
return nil, errors.Internal("%s argument on error step must be string, got type '%s'", argName, msgArg.Type)
} else if len(args) > 1 {
return nil, errors.Internal("too many arguments on error step")
}
if err := svc.parseExpressions(args...); err != nil {
return nil, err
}
return wfexec.NewGenericStep(func(ctx context.Context, r *wfexec.ExecRequest) (wfexec.ExecResponse, error) {
var (
msg string
result, err = args.Eval(ctx, r.Scope)
)
if err != nil {
return nil, err
}
if result.Has(argName) {
str, _ := expr.NewString(expr.Must(result.Select(argName)))
msg = str.GetValue()
} else {
if aux, is := args.GetByTarget(argName).Value.(string); is {
msg = aux
} else {
msg = "ERROR"
}
}
return nil, errors.Automation(msg)
}), nil
}
// converts prompt definition to wfexec.Step
func (svc workflowConverter) convTerminationStep(out types.WorkflowPathSet) (wfexec.Step, error) {
if len(out) > 0 {
return nil, errors.Internal("termination step must be last step in branch")
}
return wfexec.NewGenericStep(func(ctx context.Context, r *wfexec.ExecRequest) (wfexec.ExecResponse, error) {
return wfexec.Termination(), nil
}), nil
}
// converts prompt definition to wfexec.Step
func (svc workflowConverter) convPromptStep(s *types.WorkflowStep) (wfexec.Step, error) {
if err := svc.parseExpressions(s.Arguments...); err != nil {
return nil, err
}
// Use expression step as base for prompt step
return types.PromptStep(s.Ref, types.ExpressionsStep(s.Arguments...)), nil
}
func (svc workflowConverter) parseExpressions(ee ...*types.Expr) (err error) {
for _, e := range ee {
if len(strings.TrimSpace(e.Expr)) > 0 {
if err = svc.parser.ParseEvaluators(e); err != nil {
return
}
}
if err = e.SetType(exprTypeSetter(svc.reg, e)); err != nil {
return err
}
for _, t := range e.Tests {
if err = svc.parser.ParseEvaluators(t); err != nil {
return
}
}
}
return nil
}
func verifyStep(step *types.WorkflowStep) types.WorkflowIssueSet {
var (
ii = types.WorkflowIssueSet{}
noArgs = func(s *types.WorkflowStep) error {
if len(s.Arguments) > 0 {
return errors.Internal("%s step does not accept arguments", s.Kind)
}
return nil
}
noResults = func(s *types.WorkflowStep) error {
if len(s.Results) > 0 {
return errors.Internal("%s step does not accept results", s.Kind)
}
return nil
}
checks = make([]func(s *types.WorkflowStep) error, 0)
)
switch step.Kind {
case types.WorkflowStepKindErrHandler:
checks = append(checks, noArgs, noResults)
case types.WorkflowStepKindDebug:
checks = append(checks, noResults)
case types.WorkflowStepKindVisual:
checks = append(checks, noArgs, noResults)
case types.WorkflowStepKindExpressions:
checks = append(checks, noResults, func(s *types.WorkflowStep) error {
if len(s.Arguments) == 0 {
return errors.Internal("%s step require at least one argument", s.Kind)
}
return nil
})
case types.WorkflowStepKindGateway:
checks = append(checks, noArgs, noResults)
case types.WorkflowStepKindError:
checks = append(checks, noResults)
case types.WorkflowStepKindTermination:
checks = append(checks, noArgs, noResults)
case types.WorkflowStepKindFunction, types.WorkflowStepKindIterator:
case types.WorkflowStepKindPrompt:
checks = append(checks, noResults)
default:
return ii.Append(fmt.Errorf("unknown step kind"), nil)
}
for _, check := range checks {
if err := check(step); err != nil {
ii = ii.Append(err, nil)
}
}
return ii
}

View File

@ -35,6 +35,11 @@ type (
// This type is auto-generated.
WorkflowSet []*Workflow
// WorkflowIssueSet slice of WorkflowIssue
//
// This type is auto-generated.
WorkflowIssueSet []*WorkflowIssue
// WorkflowPathSet slice of WorkflowPath
//
// This type is auto-generated.
@ -300,6 +305,36 @@ func (set WorkflowSet) IDs() (IDs []uint64) {
return
}
// Walk iterates through every slice item and calls w(WorkflowIssue) err
//
// This function is auto-generated.
func (set WorkflowIssueSet) Walk(w func(*WorkflowIssue) error) (err error) {
for i := range set {
if err = w(set[i]); err != nil {
return
}
}
return
}
// Filter iterates through every slice item, calls f(WorkflowIssue) (bool, err) and return filtered slice
//
// This function is auto-generated.
func (set WorkflowIssueSet) Filter(f func(*WorkflowIssue) (bool, error)) (out WorkflowIssueSet, err error) {
var ok bool
out = WorkflowIssueSet{}
for i := range set {
if ok, err = f(set[i]); err != nil {
return
} else if ok {
out = append(out, set[i])
}
}
return
}
// Walk iterates through every slice item and calls w(WorkflowPath) err
//
// This function is auto-generated.

View File

@ -430,6 +430,62 @@ func TestWorkflowSetIDs(t *testing.T) {
}
}
func TestWorkflowIssueSetWalk(t *testing.T) {
var (
value = make(WorkflowIssueSet, 3)
req = require.New(t)
)
// check walk with no errors
{
err := value.Walk(func(*WorkflowIssue) error {
return nil
})
req.NoError(err)
}
// check walk with error
req.Error(value.Walk(func(*WorkflowIssue) error { return fmt.Errorf("walk error") }))
}
func TestWorkflowIssueSetFilter(t *testing.T) {
var (
value = make(WorkflowIssueSet, 3)
req = require.New(t)
)
// filter nothing
{
set, err := value.Filter(func(*WorkflowIssue) (bool, error) {
return true, nil
})
req.NoError(err)
req.Equal(len(set), len(value))
}
// filter one item
{
found := false
set, err := value.Filter(func(*WorkflowIssue) (bool, error) {
if !found {
found = true
return found, nil
}
return false, nil
})
req.NoError(err)
req.Len(set, 1)
}
// filter error
{
_, err := value.Filter(func(*WorkflowIssue) (bool, error) {
return false, fmt.Errorf("filter error")
})
req.Error(err)
}
}
func TestWorkflowPathSetWalk(t *testing.T) {
var (
value = make(WorkflowPathSet, 3)

View File

@ -8,6 +8,8 @@ types:
labelResourceType: workflow
WorkflowPath:
noIdField: true
WorkflowIssue:
noIdField: true
WorkflowStep: {}
Session: {}
State: {}

View File

@ -31,6 +31,9 @@ type (
Steps WorkflowStepSet `json:"steps"`
Paths WorkflowPathSet `json:"paths"`
// Collection of issues from the last parse
Issues WorkflowIssueSet `json:"issues,omitempty"`
RunAs uint64 `json:"runAs,string"`
OwnedBy uint64 `json:"ownedBy,string"`
@ -70,6 +73,12 @@ type (
Visual map[string]interface{} `json:"visual"`
}
WorkflowIssue struct {
// url encoded location of the error:
Culprit map[string]int `json:"culprit"`
Description string `json:"description"`
}
// WorkflowStep describes one workflow step
WorkflowStep struct {
ID uint64 `json:"stepID,string"`
@ -126,8 +135,6 @@ const (
WorkflowStepKindErrHandler WorkflowStepKind = "error-handler" // no ref
WorkflowStepKindVisual WorkflowStepKind = "visual" // ref = <*>
WorkflowStepKindDebug WorkflowStepKind = "debug" // ref = <*>
//WorkflowStepKindSubprocess WorkflowStepKind = "subprocess"
//WorkflowStepKindEvent WorkflowStepKind = "event" // ref = ??
)
// Resource returns a resource ID for this type
@ -159,6 +166,56 @@ func (vv *WorkflowMeta) Value() (driver.Value, error) {
return json.Marshal(vv)
}
// Scan on WorkflowStepSet gracefully handles conversion from NULL
func (set WorkflowIssueSet) Value() (driver.Value, error) {
return json.Marshal(set)
}
func (set *WorkflowIssueSet) Scan(value interface{}) error {
//lint:ignore S1034 This typecast is intentional, we need to get []byte out of a []uint8
switch value.(type) {
case nil:
*set = WorkflowIssueSet{}
case []uint8:
b := value.([]byte)
if err := json.Unmarshal(b, set); err != nil {
return fmt.Errorf("can not scan '%v' into WorkflowIssueSet: %w", string(b), err)
}
}
return nil
}
func (set WorkflowIssueSet) Error() string {
switch len(set) {
case 0:
return fmt.Sprintf("no workflow issue found")
case 1:
return fmt.Sprintf("1 workflow issue found")
default:
return fmt.Sprintf("%d workflow issues found", len(set))
}
}
func (set WorkflowIssueSet) Append(err error, culprit map[string]int) WorkflowIssueSet {
if culprit == nil {
culprit = make(map[string]int)
}
return append(set, &WorkflowIssue{
Culprit: culprit,
Description: err.Error(),
})
}
func (set WorkflowIssueSet) SetCulprit(name string, pos int) WorkflowIssueSet {
for i := range set {
set[i].Culprit[name] = pos
}
return set
}
func (t WorkflowPath) GetExpr() string { return t.Expr }
func (t *WorkflowPath) SetEval(eval expr.Evaluable) { t.eval = eval }
func (t WorkflowPath) Eval(ctx context.Context, scope *expr.Vars) (interface{}, error) {

View File

@ -14,6 +14,7 @@ fields:
- { field: Scope, type: "expr.Vars" }
- { field: Steps, type: "expr.Vars" }
- { field: Paths, type: "expr.Vars" }
- { field: Issues, type: "WorkflowIssueSet" }
- { field: RunAs, type: "uint64" }
- { field: OwnedBy }
- { field: CreatedBy }

View File

@ -451,6 +451,7 @@ func (s Store) internalAutomationWorkflowRowScanner(row rowScanner) (res *types.
&res.Scope,
&res.Steps,
&res.Paths,
&res.Issues,
&res.RunAs,
&res.OwnedBy,
&res.CreatedBy,
@ -507,6 +508,7 @@ func (Store) automationWorkflowColumns(aa ...string) []string {
alias + "scope",
alias + "steps",
alias + "paths",
alias + "issues",
alias + "run_as",
alias + "owned_by",
alias + "created_by",
@ -544,6 +546,7 @@ func (s Store) internalAutomationWorkflowEncoder(res *types.Workflow) store.Payl
"scope": res.Scope,
"steps": res.Steps,
"paths": res.Paths,
"issues": res.Issues,
"run_as": res.RunAs,
"owned_by": res.OwnedBy,
"created_by": res.CreatedBy,

View File

@ -581,6 +581,7 @@ func (Schema) AutomationWorkflows() *Table {
ColumnDef("scope", ColumnTypeJson),
ColumnDef("steps", ColumnTypeJson),
ColumnDef("paths", ColumnTypeJson),
ColumnDef("issues", ColumnTypeJson),
ColumnDef("run_as", ColumnTypeIdentifier),
ColumnDef("owned_by", ColumnTypeIdentifier),
CUDTimestamps,