Updates WF step verification logic before creating graph
- Fixes duplicate issues for function, iterator and gateway steps - Extends workflow tests for above scenario
This commit is contained in:
@@ -34,10 +34,10 @@ func Convert(wfService *workflow, wf *types.Workflow) (*wfexec.Graph, types.Work
|
||||
// Converts workflow definition to wf execution graph
|
||||
func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, types.WorkflowIssueSet) {
|
||||
var (
|
||||
g = wfexec.NewGraph()
|
||||
wfii = types.WorkflowIssueSet{}
|
||||
IDs = make(map[uint64]int)
|
||||
lastResStep *types.WorkflowStep
|
||||
g = wfexec.NewGraph()
|
||||
wfii = types.WorkflowIssueSet{}
|
||||
IDs = make(map[uint64]int)
|
||||
makeGraphSteps func(types.WorkflowStepSet)
|
||||
)
|
||||
|
||||
// Basic step verification
|
||||
@@ -68,17 +68,9 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type
|
||||
return true, nil
|
||||
})
|
||||
|
||||
for g.Len() < len(ss) {
|
||||
progress := false
|
||||
lastResStep = nil
|
||||
|
||||
for _, step := range ss {
|
||||
lastResStep = step
|
||||
if g.StepByID(step.ID) != nil {
|
||||
// resolved
|
||||
continue
|
||||
}
|
||||
|
||||
makeGraphSteps = func(steps types.WorkflowStepSet) {
|
||||
pendingSteps := types.WorkflowStepSet{}
|
||||
for i, step := range steps {
|
||||
// Collect all incoming and outgoing paths
|
||||
inPaths := make([]*types.WorkflowPath, 0, 8)
|
||||
outPaths := make([]*types.WorkflowPath, 0, 8)
|
||||
@@ -91,8 +83,8 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type
|
||||
}
|
||||
|
||||
stepIssues := verifyStep(step, inPaths, outPaths)
|
||||
|
||||
if resolved, err := svc.workflowStepDefConv(g, step, inPaths, outPaths); err != nil {
|
||||
resolved, err := svc.workflowStepDefConv(g, step, inPaths, outPaths)
|
||||
if err != nil {
|
||||
switch aux := err.(type) {
|
||||
case types.WorkflowIssueSet:
|
||||
stepIssues = append(stepIssues, aux...)
|
||||
@@ -100,24 +92,28 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type
|
||||
case error:
|
||||
stepIssues = stepIssues.Append(err, nil)
|
||||
}
|
||||
} else if resolved {
|
||||
progress = true
|
||||
}
|
||||
|
||||
if !resolved && err == nil {
|
||||
pendingSteps = append(pendingSteps, step)
|
||||
}
|
||||
|
||||
wfii = append(wfii, stepIssues.SetCulprit("step", IDs[step.ID])...)
|
||||
}
|
||||
|
||||
if !progress {
|
||||
var culprit = make(map[string]int)
|
||||
if lastResStep != nil {
|
||||
culprit = map[string]int{"step": IDs[lastResStep.ID]}
|
||||
if i+1 == len(steps) && len(wfii) > 0 {
|
||||
var culprit = make(map[string]int)
|
||||
if step != nil {
|
||||
culprit = map[string]int{"step": IDs[step.ID]}
|
||||
}
|
||||
wfii = wfii.Append(fmt.Errorf("failed to resolve workflow step dependencies"), culprit)
|
||||
}
|
||||
|
||||
// nothing resolved for 1 cycle
|
||||
wfii = wfii.Append(fmt.Errorf("failed to resolve workflow step dependencies"), culprit)
|
||||
break
|
||||
}
|
||||
|
||||
if len(pendingSteps) > 0 && g.Len() < len(ss) {
|
||||
makeGraphSteps(pendingSteps)
|
||||
}
|
||||
return
|
||||
}
|
||||
makeGraphSteps(ss)
|
||||
|
||||
for pos, path := range def.Paths {
|
||||
if g.StepByID(path.ChildID) == nil {
|
||||
@@ -162,9 +158,6 @@ func (svc workflowConverter) workflowStepDefConv(g *wfexec.Graph, s *types.Workf
|
||||
|
||||
conv, err := func() (wfexec.Step, error) {
|
||||
switch s.Kind {
|
||||
case types.WorkflowStepKindVisual:
|
||||
return nil, nil
|
||||
|
||||
case types.WorkflowStepKindDebug:
|
||||
return svc.convDebugStep(s)
|
||||
|
||||
|
||||
67
tests/workflows/0015_step_issue_test.go
Normal file
67
tests/workflows/0015_step_issue_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package workflows
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/automation/types"
|
||||
)
|
||||
|
||||
func Test0015_step_issue(t *testing.T) {
|
||||
var (
|
||||
ctx = bypassRBAC(context.Background())
|
||||
req = require.New(t)
|
||||
)
|
||||
|
||||
loadScenario(ctx, t)
|
||||
|
||||
t.Run("exclusive gateway step issue", func(t *testing.T) {
|
||||
_, _, err := execWorkflow(ctx, "case1", types.WorkflowExecParams{})
|
||||
|
||||
issues, is := err.(types.WorkflowIssueSet)
|
||||
req.True(is)
|
||||
// It should return only 3 issues
|
||||
// 1. gateway step expects at least 1 outbound path(s)
|
||||
// 2. expecting at least two paths for exclusive gateway
|
||||
// 3. failed to resolve workflow step dependencies
|
||||
req.Len(issues, 3)
|
||||
})
|
||||
|
||||
t.Run("inclusive gateway step issue", func(t *testing.T) {
|
||||
_, _, err := execWorkflow(ctx, "case2", types.WorkflowExecParams{})
|
||||
|
||||
issues, is := err.(types.WorkflowIssueSet)
|
||||
req.True(is)
|
||||
// It should return only 3 issues
|
||||
// 1. gateway step expects at least 1 outbound path(s)
|
||||
// 2. expecting at least two paths for inclusive gateway
|
||||
// 3. failed to resolve workflow step dependencies
|
||||
req.Len(issues, 3)
|
||||
})
|
||||
|
||||
t.Run("function step issue", func(t *testing.T) {
|
||||
_, _, err := execWorkflow(ctx, "case3", types.WorkflowExecParams{})
|
||||
|
||||
issues, is := err.(types.WorkflowIssueSet)
|
||||
req.True(is)
|
||||
// It should return only 2 issues
|
||||
// 1. failed to verify argument expressions for function logInfo: parameter message is required
|
||||
// 2. failed to resolve workflow step dependencies
|
||||
req.Len(issues, 2)
|
||||
})
|
||||
|
||||
t.Run("iterator step issue", func(t *testing.T) {
|
||||
_, _, err := execWorkflow(ctx, "case4", types.WorkflowExecParams{})
|
||||
|
||||
issues, is := err.(types.WorkflowIssueSet)
|
||||
req.True(is)
|
||||
// It should return only 4 issues
|
||||
// 1. iterator step expects reference
|
||||
// 2. iterator step expects exactly 2 outbound path(s)
|
||||
// 3. unknown function ""
|
||||
// 4. failed to resolve workflow step dependencies
|
||||
req.Len(issues, 4)
|
||||
})
|
||||
|
||||
}
|
||||
83
tests/workflows/testdata/S0015_step_issue/workflow.yaml
vendored
Normal file
83
tests/workflows/testdata/S0015_step_issue/workflow.yaml
vendored
Normal file
@@ -0,0 +1,83 @@
|
||||
workflows:
|
||||
case1:
|
||||
enabled: true
|
||||
trace: true
|
||||
triggers:
|
||||
- enabled: true
|
||||
stepID: 1
|
||||
|
||||
steps:
|
||||
- stepID: 1
|
||||
kind: expressions
|
||||
arguments: [ { target: foo, type: Integer, expr: "40" } ]
|
||||
- stepID: 2
|
||||
kind: expressions
|
||||
arguments: [ { target: bar, type: Integer, expr: "50" } ]
|
||||
- stepID: 3
|
||||
kind: gateway
|
||||
ref: excl
|
||||
|
||||
paths:
|
||||
- { parentID: 1, childID: 2 }
|
||||
|
||||
case2:
|
||||
enabled: true
|
||||
trace: true
|
||||
triggers:
|
||||
- enabled: true
|
||||
stepID: 1
|
||||
|
||||
steps:
|
||||
- stepID: 1
|
||||
kind: expressions
|
||||
arguments: [ { target: foo, type: Integer, expr: "40" } ]
|
||||
- stepID: 2
|
||||
kind: expressions
|
||||
arguments: [ { target: bar, type: Integer, expr: "50" } ]
|
||||
- stepID: 3
|
||||
kind: gateway
|
||||
ref: incl
|
||||
|
||||
paths:
|
||||
- { parentID: 1, childID: 2 }
|
||||
|
||||
case3:
|
||||
enabled: true
|
||||
trace: true
|
||||
triggers:
|
||||
- enabled: true
|
||||
stepID: 1
|
||||
|
||||
steps:
|
||||
- stepID: 1
|
||||
kind: expressions
|
||||
arguments: [ { target: foo, type: Integer, expr: "40" } ]
|
||||
- stepID: 2
|
||||
kind: expressions
|
||||
arguments: [ { target: bar, type: Integer, expr: "50" } ]
|
||||
- stepID: 3
|
||||
kind: function
|
||||
ref: logInfo
|
||||
|
||||
paths:
|
||||
- { parentID: 1, childID: 2 }
|
||||
|
||||
case4:
|
||||
enabled: true
|
||||
trace: true
|
||||
triggers:
|
||||
- enabled: true
|
||||
stepID: 1
|
||||
|
||||
steps:
|
||||
- stepID: 1
|
||||
kind: expressions
|
||||
arguments: [ { target: foo, type: Integer, expr: "40" } ]
|
||||
- stepID: 2
|
||||
kind: expressions
|
||||
arguments: [ { target: bar, type: Integer, expr: "50" } ]
|
||||
- stepID: 3
|
||||
kind: iterator
|
||||
|
||||
paths:
|
||||
- { parentID: 1, childID: 2 }
|
||||
Reference in New Issue
Block a user