From 7b065769da24b8e6dcf6684b6de5461ebea0fdf3 Mon Sep 17 00:00:00 2001 From: Denis Arh Date: Thu, 4 Nov 2021 10:53:33 +0100 Subject: [PATCH] Revert "Updates WF step verification logic before creating graph" This reverts commit e2e7510c074238abd8ca043dcb6f04809ee8ac17. --- automation/service/workflow_converter.go | 55 +++++++++++++----------- tests/workflows/0015_step_issue_test.go | 7 ++- 2 files changed, 37 insertions(+), 25 deletions(-) diff --git a/automation/service/workflow_converter.go b/automation/service/workflow_converter.go index 006925a44..3fb55b5c3 100644 --- a/automation/service/workflow_converter.go +++ b/automation/service/workflow_converter.go @@ -34,10 +34,10 @@ func Convert(wfService *workflow, wf *types.Workflow) (*wfexec.Graph, types.Work // Converts workflow definition to wf execution graph func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, types.WorkflowIssueSet) { var ( - g = wfexec.NewGraph() - wfii = types.WorkflowIssueSet{} - IDs = make(map[uint64]int) - makeGraphSteps func(types.WorkflowStepSet) + g = wfexec.NewGraph() + wfii = types.WorkflowIssueSet{} + IDs = make(map[uint64]int) + lastResStep *types.WorkflowStep ) // Basic step verification @@ -68,9 +68,17 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type return true, nil }) - makeGraphSteps = func(steps types.WorkflowStepSet) { - pendingSteps := types.WorkflowStepSet{} - for i, step := range steps { + for g.Len() < len(ss) { + progress := false + lastResStep = nil + + for _, step := range ss { + lastResStep = step + if g.StepByID(step.ID) != nil { + // resolved + continue + } + // Collect all incoming and outgoing paths inPaths := make([]*types.WorkflowPath, 0, 8) outPaths := make([]*types.WorkflowPath, 0, 8) @@ -83,8 +91,8 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type } stepIssues := verifyStep(step, inPaths, outPaths) - resolved, err := svc.workflowStepDefConv(g, step, inPaths, outPaths) - if err != nil { + + if resolved, err := svc.workflowStepDefConv(g, step, inPaths, outPaths); err != nil { switch aux := err.(type) { case types.WorkflowIssueSet: stepIssues = append(stepIssues, aux...) @@ -92,28 +100,24 @@ func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, type case error: stepIssues = stepIssues.Append(err, nil) } - } - - if !resolved && err == nil { - pendingSteps = append(pendingSteps, step) + } else if resolved { + progress = true } wfii = append(wfii, stepIssues.SetCulprit("step", IDs[step.ID])...) - if i+1 == len(steps) && len(wfii) > 0 { - var culprit = make(map[string]int) - if step != nil { - culprit = map[string]int{"step": IDs[step.ID]} - } - wfii = wfii.Append(fmt.Errorf("failed to resolve workflow step dependencies"), culprit) - } } - if len(pendingSteps) > 0 && g.Len() < len(ss) { - makeGraphSteps(pendingSteps) + if !progress { + var culprit = make(map[string]int) + if lastResStep != nil { + culprit = map[string]int{"step": IDs[lastResStep.ID]} + } + + // nothing resolved for 1 cycle + wfii = wfii.Append(fmt.Errorf("failed to resolve workflow step dependencies"), culprit) + break } - return } - makeGraphSteps(ss) for pos, path := range def.Paths { if g.StepByID(path.ChildID) == nil { @@ -158,6 +162,9 @@ func (svc workflowConverter) workflowStepDefConv(g *wfexec.Graph, s *types.Workf conv, err := func() (wfexec.Step, error) { switch s.Kind { + case types.WorkflowStepKindVisual: + return nil, nil + case types.WorkflowStepKindDebug: return svc.convDebugStep(s) diff --git a/tests/workflows/0015_step_issue_test.go b/tests/workflows/0015_step_issue_test.go index 7b47c5fe1..46b0a973d 100644 --- a/tests/workflows/0015_step_issue_test.go +++ b/tests/workflows/0015_step_issue_test.go @@ -2,9 +2,10 @@ package workflows import ( "context" - "github.com/stretchr/testify/require" "testing" + "github.com/stretchr/testify/require" + "github.com/cortezaproject/corteza-server/automation/types" ) @@ -17,6 +18,7 @@ func Test0015_step_issue(t *testing.T) { loadScenario(ctx, t) t.Run("exclusive gateway step issue", func(t *testing.T) { + t.Skipf("workflow step resolution & validation need to be to be fixed") _, _, err := execWorkflow(ctx, "case1", types.WorkflowExecParams{}) issues, is := err.(types.WorkflowIssueSet) @@ -29,6 +31,7 @@ func Test0015_step_issue(t *testing.T) { }) t.Run("inclusive gateway step issue", func(t *testing.T) { + t.Skipf("workflow step resolution & validation need to be to be fixed") _, _, err := execWorkflow(ctx, "case2", types.WorkflowExecParams{}) issues, is := err.(types.WorkflowIssueSet) @@ -41,6 +44,7 @@ func Test0015_step_issue(t *testing.T) { }) t.Run("function step issue", func(t *testing.T) { + t.Skipf("workflow step resolution & validation need to be to be fixed") _, _, err := execWorkflow(ctx, "case3", types.WorkflowExecParams{}) issues, is := err.(types.WorkflowIssueSet) @@ -52,6 +56,7 @@ func Test0015_step_issue(t *testing.T) { }) t.Run("iterator step issue", func(t *testing.T) { + t.Skipf("workflow step resolution & validation need to be to be fixed") _, _, err := execWorkflow(ctx, "case4", types.WorkflowExecParams{}) issues, is := err.(types.WorkflowIssueSet)