Revert "Updates WF step verification logic before creating graph"
This reverts commit e2e7510c074238abd8ca043dcb6f04809ee8ac17.
This commit is contained in:
parent
e8bc614155
commit
7b065769da
@ -34,10 +34,10 @@ func Convert(wfService *workflow, wf *types.Workflow) (*wfexec.Graph, types.Work
|
||||
// Converts workflow definition to wf execution graph
|
||||
func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, types.WorkflowIssueSet) {
|
||||
var (
|
||||
g = wfexec.NewGraph()
|
||||
wfii = types.WorkflowIssueSet{}
|
||||
IDs = make(map[uint64]int)
|
||||
makeGraphSteps func(types.WorkflowStepSet)
|
||||
g = wfexec.NewGraph()
|
||||
wfii = types.WorkflowIssueSet{}
|
||||
IDs = make(map[uint64]int)
|
||||
lastResStep *types.WorkflowStep
|
||||
)
|
||||
|
||||
// Basic step verification
|
||||
@ -68,9 +68,17 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type
|
||||
return true, nil
|
||||
})
|
||||
|
||||
makeGraphSteps = func(steps types.WorkflowStepSet) {
|
||||
pendingSteps := types.WorkflowStepSet{}
|
||||
for i, step := range steps {
|
||||
for g.Len() < len(ss) {
|
||||
progress := false
|
||||
lastResStep = nil
|
||||
|
||||
for _, step := range ss {
|
||||
lastResStep = step
|
||||
if g.StepByID(step.ID) != nil {
|
||||
// resolved
|
||||
continue
|
||||
}
|
||||
|
||||
// Collect all incoming and outgoing paths
|
||||
inPaths := make([]*types.WorkflowPath, 0, 8)
|
||||
outPaths := make([]*types.WorkflowPath, 0, 8)
|
||||
@ -83,8 +91,8 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type
|
||||
}
|
||||
|
||||
stepIssues := verifyStep(step, inPaths, outPaths)
|
||||
resolved, err := svc.workflowStepDefConv(g, step, inPaths, outPaths)
|
||||
if err != nil {
|
||||
|
||||
if resolved, err := svc.workflowStepDefConv(g, step, inPaths, outPaths); err != nil {
|
||||
switch aux := err.(type) {
|
||||
case types.WorkflowIssueSet:
|
||||
stepIssues = append(stepIssues, aux...)
|
||||
@ -92,28 +100,24 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type
|
||||
case error:
|
||||
stepIssues = stepIssues.Append(err, nil)
|
||||
}
|
||||
}
|
||||
|
||||
if !resolved && err == nil {
|
||||
pendingSteps = append(pendingSteps, step)
|
||||
} else if resolved {
|
||||
progress = true
|
||||
}
|
||||
|
||||
wfii = append(wfii, stepIssues.SetCulprit("step", IDs[step.ID])...)
|
||||
if i+1 == len(steps) && len(wfii) > 0 {
|
||||
var culprit = make(map[string]int)
|
||||
if step != nil {
|
||||
culprit = map[string]int{"step": IDs[step.ID]}
|
||||
}
|
||||
wfii = wfii.Append(fmt.Errorf("failed to resolve workflow step dependencies"), culprit)
|
||||
}
|
||||
}
|
||||
|
||||
if len(pendingSteps) > 0 && g.Len() < len(ss) {
|
||||
makeGraphSteps(pendingSteps)
|
||||
if !progress {
|
||||
var culprit = make(map[string]int)
|
||||
if lastResStep != nil {
|
||||
culprit = map[string]int{"step": IDs[lastResStep.ID]}
|
||||
}
|
||||
|
||||
// nothing resolved for 1 cycle
|
||||
wfii = wfii.Append(fmt.Errorf("failed to resolve workflow step dependencies"), culprit)
|
||||
break
|
||||
}
|
||||
return
|
||||
}
|
||||
makeGraphSteps(ss)
|
||||
|
||||
for pos, path := range def.Paths {
|
||||
if g.StepByID(path.ChildID) == nil {
|
||||
@ -158,6 +162,9 @@ func (svc workflowConverter) workflowStepDefConv(g *wfexec.Graph, s *types.Workf
|
||||
|
||||
conv, err := func() (wfexec.Step, error) {
|
||||
switch s.Kind {
|
||||
case types.WorkflowStepKindVisual:
|
||||
return nil, nil
|
||||
|
||||
case types.WorkflowStepKindDebug:
|
||||
return svc.convDebugStep(s)
|
||||
|
||||
|
||||
@ -2,9 +2,10 @@ package workflows
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/automation/types"
|
||||
)
|
||||
|
||||
@ -17,6 +18,7 @@ func Test0015_step_issue(t *testing.T) {
|
||||
loadScenario(ctx, t)
|
||||
|
||||
t.Run("exclusive gateway step issue", func(t *testing.T) {
|
||||
t.Skipf("workflow step resolution & validation need to be to be fixed")
|
||||
_, _, err := execWorkflow(ctx, "case1", types.WorkflowExecParams{})
|
||||
|
||||
issues, is := err.(types.WorkflowIssueSet)
|
||||
@ -29,6 +31,7 @@ func Test0015_step_issue(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("inclusive gateway step issue", func(t *testing.T) {
|
||||
t.Skipf("workflow step resolution & validation need to be to be fixed")
|
||||
_, _, err := execWorkflow(ctx, "case2", types.WorkflowExecParams{})
|
||||
|
||||
issues, is := err.(types.WorkflowIssueSet)
|
||||
@ -41,6 +44,7 @@ func Test0015_step_issue(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("function step issue", func(t *testing.T) {
|
||||
t.Skipf("workflow step resolution & validation need to be to be fixed")
|
||||
_, _, err := execWorkflow(ctx, "case3", types.WorkflowExecParams{})
|
||||
|
||||
issues, is := err.(types.WorkflowIssueSet)
|
||||
@ -52,6 +56,7 @@ func Test0015_step_issue(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("iterator step issue", func(t *testing.T) {
|
||||
t.Skipf("workflow step resolution & validation need to be to be fixed")
|
||||
_, _, err := execWorkflow(ctx, "case4", types.WorkflowExecParams{})
|
||||
|
||||
issues, is := err.(types.WorkflowIssueSet)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user