From a0ecf0537f345d8da46eaa9ee04843d2e70abfa4 Mon Sep 17 00:00:00 2001 From: Denis Arh Date: Fri, 12 Feb 2021 06:49:07 +0100 Subject: [PATCH] Refactored workflow converter, add issues --- automation/service/session.go | 18 +- automation/service/trigger.go | 196 +++++---- automation/service/workflow.go | 481 +------------------- automation/service/workflow_converter.go | 531 +++++++++++++++++++++++ automation/types/type_set.gen.go | 35 ++ automation/types/type_set.gen_test.go | 56 +++ automation/types/types.yaml | 2 + automation/types/workflow.go | 61 ++- store/automation_workflows.yaml | 1 + store/rdbms/automation_workflows.gen.go | 3 + store/rdbms/rdbms_schema.go | 1 + 11 files changed, 804 insertions(+), 581 deletions(-) create mode 100644 automation/service/workflow_converter.go diff --git a/automation/service/session.go b/automation/service/session.go index d8027807a..2b6337051 100644 --- a/automation/service/session.go +++ b/automation/service/session.go @@ -2,7 +2,6 @@ package service import ( "context" - "fmt" "github.com/cortezaproject/corteza-server/automation/types" "github.com/cortezaproject/corteza-server/pkg/actionlog" "github.com/cortezaproject/corteza-server/pkg/auth" @@ -118,29 +117,28 @@ func (svc *session) suspendAll(ctx context.Context) error { // It does not check user's permissions to execute workflow(s) so it should be used only when ! func (svc *session) Start(g *wfexec.Graph, i auth.Identifiable, ssp types.SessionStartParams) (wait WaitFn, err error) { var ( - ctx = auth.SetIdentityToContext(context.Background(), i) - ses = svc.spawn(g, ssp.Trace) start wfexec.Step ) - wait = func(ctx context.Context) (*expr.Vars, error) { - return &expr.Vars{}, nil // no-op... - } - if ssp.StepID == 0 { // starting step is not explicitly workflows on trigger, find orphan step switch oo := g.Orphans(); len(oo) { case 1: start = oo[0] case 0: - return nil, fmt.Errorf("could not find step without parents") + return nil, errors.InvalidData("could not find starting step") default: - return nil, fmt.Errorf("multiple steps without parents") + return nil, errors.InvalidData("can not start workflow session multiple starting steps found") } } else if start = g.StepByID(ssp.StepID); start == nil { - return nil, fmt.Errorf("trigger staring step references nonexisting step") + return nil, errors.InvalidData("trigger staring step references nonexisting step") } + var ( + ctx = auth.SetIdentityToContext(context.Background(), i) + ses = svc.spawn(g, ssp.Trace) + ) + ses.CreatedAt = *now() ses.CreatedBy = i.Identity() ses.Apply(ssp) diff --git a/automation/service/trigger.go b/automation/service/trigger.go index 283041774..5731fade1 100644 --- a/automation/service/trigger.go +++ b/automation/service/trigger.go @@ -437,26 +437,25 @@ func (svc *trigger) registerWorkflow(ctx context.Context, wf *types.Workflow, tt // It preloads run-as identity and finds a starting step for each trigger func (svc *trigger) registerTriggers(wf *types.Workflow, runAs auth.Identifiable, tt ...*types.Trigger) { var ( - err error - g *wfexec.Graph - wfLog = svc.log. - WithOptions(zap.AddStacktrace(zap.DPanicLevel)). - With(zap.Uint64("workflowID", wf.ID)) + handlerFn eventbus.HandlerFn + err error + g *wfexec.Graph + issues types.WorkflowIssueSet + wfLog = svc.log. + WithOptions(zap.AddStacktrace(zap.DPanicLevel)). + With(zap.Uint64("workflowID", wf.ID)) + + // register only enabled, undeleted workflows + registerWorkflow = wf.Enabled || wf.DeletedAt == nil ) - if !wf.Enabled { - wfLog.Debug("skipping disabled workflow") - return - } - - if wf.DeletedAt != nil { - wfLog.Debug("skipping deleted workflow") - return - } - - if g, err = svc.workflow.toGraph(wf); err != nil { - wfLog.Error("failed to convert workflow to graph", zap.Error(err)) - return + // convert only registerable and issuless workflwos + if registerWorkflow && len(wf.Issues) == 0 { + // Convert workflow only when valid (no issues, enable, not delete) + if g, issues = Convert(svc.workflow, wf); len(issues) > 0 { + wfLog.Error("failed to convert workflow to graph", zap.Error(issues)) + g = nil + } } defer svc.mux.Unlock() @@ -465,85 +464,38 @@ func (svc *trigger) registerTriggers(wf *types.Workflow, runAs auth.Identifiable for _, t := range tt { log := wfLog.With(zap.Uint64("triggerID", t.ID)) - if !t.Enabled { - log.Debug("skipping disabled trigger") - continue + // always unregister + if svc.reg[wf.ID] == nil { + svc.reg[wf.ID] = make(map[uint64]uintptr) + } else if ptr := svc.reg[wf.ID][t.ID]; ptr != 0 { + // unregister handlers for this trigger if they exist + svc.eventbus.Unregister(ptr) } - if t.DeletedAt != nil { - log.Debug("skipping deleted trigger") + // do not register disabled or deleted triggers + if !registerWorkflow || !t.Enabled || t.DeletedAt != nil { continue } var ( - handler = func(ctx context.Context, ev eventbus.Event) (err error) { - - var ( - // create session scope from predefined workflow scope and trigger input - scope = wf.Scope.Merge(t.Input) - evScope *expr.Vars - wait WaitFn - ) - - if enc, is := ev.(varsEncoder); is { - if evScope, err = enc.EncodeVars(); err != nil { - return - } - - scope = scope.Merge(evScope) - } - - _ = scope.AssignFieldValue("eventType", expr.Must(expr.NewString(ev.EventType()))) - _ = scope.AssignFieldValue("resourceType", expr.Must(expr.NewString(ev.ResourceType()))) - - if runAs == nil { - // @todo can/should we get alternative identity from Event? - // for example: - // - use http auth header and get username - // - use from/to/replyTo and use that as an identifier - runAs = auth.GetIdentityFromContext(ctx) - } - - log.Debug("handling triggered workflow", - zap.Any("event", ev), - zap.Uint64("runAs", runAs.Identity()), - ) - - wait, err = svc.session.Start(g, runAs, types.SessionStartParams{ - WorkflowID: wf.ID, - KeepFor: wf.KeepSessions, - Trace: wf.Trace, - Input: scope, - StepID: t.StepID, - EventType: t.EventType, - ResourceType: t.ResourceType, - }) - - if err != nil { - log.Error("workflow error", zap.Error(err)) - return err - } - - // wait for the workflow to complete - // reuse scope for results - // this will be decoded back to event properties - scope, err = wait(ctx) - if err != nil { - return - } - - if dec, is := ev.(varsDecoder); is { - return dec.DecodeVars(scope) - } - - return nil - } - - ops = make([]eventbus.HandlerRegOp, 0, len(t.Constraints)+2) cnstr eventbus.ConstraintMatcher - err error + ops = make([]eventbus.HandlerRegOp, 0, len(t.Constraints)+2) ) + if g == nil { + handlerFn = func(_ context.Context, ev eventbus.Event) error { + return errors.Internal( + "trigger %s on %s failed due to invalid workflow %d: %w", + ev.EventType(), + ev.ResourceType(), + wf.ID, + wf.Issues, + ) + } + } else { + handlerFn = makeWorkflowHandler(svc.session, t, wf, g, runAs) + } + ops = append( ops, eventbus.On(t.EventType), @@ -562,14 +514,7 @@ func (svc *trigger) registerTriggers(wf *types.Workflow, runAs auth.Identifiable } } - if svc.reg[wf.ID] == nil { - svc.reg[wf.ID] = make(map[uint64]uintptr) - } else if ptr := svc.reg[wf.ID][t.ID]; ptr != 0 { - // unregister handlers for this trigger if they exist - svc.eventbus.Unregister(ptr) - } - - svc.reg[wf.ID][t.ID] = svc.eventbus.Register(handler, ops...) + svc.reg[wf.ID][t.ID] = svc.eventbus.Register(handlerFn, ops...) log.Debug("trigger registered", zap.String("eventType", t.EventType), @@ -611,6 +556,65 @@ func (svc *trigger) unregisterTriggers(tt ...*types.Trigger) { } } +func makeWorkflowHandler(s *session, t *types.Trigger, wf *types.Workflow, g *wfexec.Graph, runAs auth.Identifiable) eventbus.HandlerFn { + return func(ctx context.Context, ev eventbus.Event) (err error) { + + var ( + // create session scope from predefined workflow scope and trigger input + scope = wf.Scope.Merge(t.Input) + evScope *expr.Vars + wait WaitFn + ) + + if enc, is := ev.(varsEncoder); is { + if evScope, err = enc.EncodeVars(); err != nil { + return + } + + scope = scope.Merge(evScope) + } + + _ = scope.AssignFieldValue("eventType", expr.Must(expr.NewString(ev.EventType()))) + _ = scope.AssignFieldValue("resourceType", expr.Must(expr.NewString(ev.ResourceType()))) + + if runAs == nil { + // @todo can/should we get alternative identity from Event? + // for example: + // - use http auth header and get username + // - use from/to/replyTo and use that as an identifier + runAs = auth.GetIdentityFromContext(ctx) + } + + wait, err = s.Start(g, runAs, types.SessionStartParams{ + WorkflowID: wf.ID, + KeepFor: wf.KeepSessions, + Trace: wf.Trace, + Input: scope, + StepID: t.StepID, + EventType: t.EventType, + ResourceType: t.ResourceType, + }) + + if err != nil { + return err + } + + // wait for the workflow to complete + // reuse scope for results + // this will be decoded back to event properties + scope, err = wait(ctx) + if err != nil { + return + } + + if dec, is := ev.(varsDecoder); is { + return dec.DecodeVars(scope) + } + + return nil + } +} + func loadTrigger(ctx context.Context, s store.Storer, workflowID uint64) (res *types.Trigger, err error) { if workflowID == 0 { return nil, TriggerErrInvalidID() diff --git a/automation/service/workflow.go b/automation/service/workflow.go index a29cf04b3..4be3dac43 100644 --- a/automation/service/workflow.go +++ b/automation/service/workflow.go @@ -2,7 +2,6 @@ package service import ( "context" - "fmt" "github.com/cortezaproject/corteza-server/automation/types" "github.com/cortezaproject/corteza-server/pkg/actionlog" intAuth "github.com/cortezaproject/corteza-server/pkg/auth" @@ -17,7 +16,6 @@ import ( "github.com/cortezaproject/corteza-server/store" "go.uber.org/zap" "reflect" - "strings" "sync" ) @@ -187,10 +185,6 @@ func (svc *workflow) Create(ctx context.Context, new *types.Workflow) (wf *types return err } - if err = validateSteps(new.Steps...); err != nil { - return - } - wf = &types.Workflow{ ID: nextID(), Handle: new.Handle, @@ -211,6 +205,8 @@ func (svc *workflow) Create(ctx context.Context, new *types.Workflow) (wf *types CreatedBy: cUser, } + _, wf.Issues = Convert(svc, wf) + if err = store.CreateAutomationWorkflow(ctx, s, wf); err != nil { return } @@ -264,7 +260,7 @@ func (svc workflow) uniqueCheck(ctx context.Context, res *types.Workflow) (err e return nil } -func (svc workflow) updater(ctx context.Context, workflowID uint64, action func(...*workflowActionProps) *workflowAction, fn workflowUpdateHandler) (*types.Workflow, error) { +func (svc *workflow) updater(ctx context.Context, workflowID uint64, action func(...*workflowActionProps) *workflowAction, fn workflowUpdateHandler) (*types.Workflow, error) { var ( changes workflowChanges res *types.Workflow @@ -297,9 +293,12 @@ func (svc workflow) updater(ctx context.Context, workflowID uint64, action func( } if changes&workflowDefChanged > 0 { - if err = svc.triggers.registerWorkflows(ctx, res); err != nil { - return err + if _, res.Issues = Convert(svc, res); len(res.Issues) == 0 { + if err = svc.triggers.registerWorkflows(ctx, res); err != nil { + return err + } } + } if changes&workflowLabelsChanged > 0 { @@ -375,10 +374,6 @@ func (svc workflow) handleUpdate(upd *types.Workflow) workflowUpdateHandler { if upd.Steps != nil { if !reflect.DeepEqual(upd.Steps, res.Steps) { - if err = validateSteps(upd.Steps...); err != nil { - return - } - changes |= workflowChanged | workflowDefChanged res.Steps = upd.Steps } @@ -452,466 +447,6 @@ func (svc *workflow) Load(ctx context.Context) error { return svc.triggers.registerWorkflows(ctx, wwf...) } -// Converts workflow definition to wf execution graph -func (svc *workflow) toGraph(def *types.Workflow) (*wfexec.Graph, error) { - var ( - g = wfexec.NewGraph() - ) - - for g.Len() < len(def.Steps) { - progress := false - for _, step := range def.Steps { - if g.StepByID(step.ID) != nil { - // resolved - continue - } - - if step.Kind == types.WorkflowStepKindVisual { - // make sure visual steps are skipped - continue - } - - // Collect all incoming and outgoing paths - inPaths := make([]*types.WorkflowPath, 0, 8) - outPaths := make([]*types.WorkflowPath, 0, 8) - for _, path := range def.Paths { - if path.ChildID == step.ID { - inPaths = append(inPaths, path) - } else if path.ParentID == step.ID { - outPaths = append(outPaths, path) - } - } - - if resolved, err := svc.workflowStepDefConv(g, step, inPaths, outPaths); err != nil { - return nil, err - } else if resolved { - progress = true - } - } - - if !progress { - // nothing resolved - return nil, errors.Internal("failed to resolve workflow step dependencies") - } - } - - for _, path := range def.Paths { - if g.StepByID(path.ChildID) == nil { - return nil, errors.Internal("failed to resolve step with ID %d", path.ChildID) - } - - if g.StepByID(path.ParentID) == nil { - return nil, errors.Internal("failed to resolve step with ID %d", path.ParentID) - } - - g.AddParent( - g.StepByID(path.ChildID), - g.StepByID(path.ParentID), - ) - } - - return g, nil -} - -// converts all step definitions into workflow.Step instances -// -// if this func returns nil for step and error, assume unresolved dependencies -func (svc *workflow) workflowStepDefConv(g *wfexec.Graph, s *types.WorkflowStep, in, out []*types.WorkflowPath) (bool, error) { - conv, err := func() (wfexec.Step, error) { - switch s.Kind { - case types.WorkflowStepKindVisual: - return nil, nil - case types.WorkflowStepKindDebug: - return svc.convDebugStep(s) - - case types.WorkflowStepKindExpressions: - return svc.convExpressionStep(s) - - case types.WorkflowStepKindGateway: - return svc.convGateway(g, s, in, out) - - case types.WorkflowStepKindFunction, types.WorkflowStepKindIterator: - return svc.convFunctionStep(g, s, out) - - case types.WorkflowStepKindError: - return svc.convErrorStep(s, out) - - case types.WorkflowStepKindTermination: - return svc.convTerminationStep(out) - - case types.WorkflowStepKindPrompt: - return svc.convPromptStep(s) - - case types.WorkflowStepKindErrHandler: - return svc.convErrorHandlerStep(g, out) - - default: - return nil, errors.Internal("unsupported step kind %q", s.Kind) - } - }() - - if err != nil { - return false, err - } else if conv != nil { - conv.SetID(s.ID) - g.AddStep(conv) - return true, err - } else { - // signal caller that we were unable to - // resolve definition at the moment - return false, nil - } -} - -func (svc *workflow) convGateway(g *wfexec.Graph, s *types.WorkflowStep, in, out []*types.WorkflowPath) (wfexec.Step, error) { - switch s.Ref { - case "fork": - return wfexec.ForkGateway(), nil - - case "join": - var ( - ss []wfexec.Step - ) - for _, p := range in { - if parent := g.StepByID(p.ParentID); parent != nil { - ss = append(ss, parent) - } else { - // unresolved parent, come back later. - return nil, nil - } - } - - return wfexec.JoinGateway(ss...), nil - - case "incl", "excl": - var ( - pp []*wfexec.GatewayPath - ) - - for _, c := range out { - child := g.StepByID(c.ChildID) - if child == nil { - return nil, nil - } - - if len(c.Expr) > 0 { - if err := svc.parser.ParseEvaluators(c); err != nil { - return nil, err - } - } - - // wrapping with fn to make sure that we're dealing with the right wf path inside gw-path tester fn - err := func(c types.WorkflowPath) error { - p, err := wfexec.NewGatewayPath(child, func(ctx context.Context, scope *expr.Vars) (bool, error) { - if len(c.Expr) == 0 { - return true, nil - } - - return c.Test(ctx, scope) - }) - - if err != nil { - return err - } - - pp = append(pp, p) - return nil - }(*c) - - if err != nil { - return nil, err - } - } - - if s.Ref == "excl" { - return wfexec.ExclGateway(pp...) - } else { - return wfexec.InclGateway(pp...) - } - } - - return nil, fmt.Errorf("unknown gateway type") -} - -func (svc *workflow) convErrorHandlerStep(g *wfexec.Graph, out []*types.WorkflowPath) (wfexec.Step, error) { - switch len(out) { - case 0: - return nil, fmt.Errorf("expecting at least one path out of error handling step") - case 1: - // remove error handler - return types.ErrorHandlerStep(nil), nil - case 2: - errorHandler := g.StepByID(out[1].ChildID) - if errorHandler == nil { - // wait for it to be resolved - return nil, nil - } - - return types.ErrorHandlerStep(errorHandler), nil - - default: - // this might be extended in the future to allow different paths using expression - // but then again, this can be solved by gateway path following the error handling step - return nil, fmt.Errorf("max 2 paths out of error handling step") - } -} - -func (svc *workflow) convExpressionStep(s *types.WorkflowStep) (wfexec.Step, error) { - if err := svc.parseExpressions(s.Arguments...); err != nil { - return nil, err - } - - return types.ExpressionsStep(s.Arguments...), nil -} - -// internal debug step that can log entire -func (svc *workflow) convDebugStep(s *types.WorkflowStep) (wfexec.Step, error) { - if err := svc.parseExpressions(s.Arguments...); err != nil { - return nil, err - } - - return types.DebugStep(svc.log), nil -} - -func (svc *workflow) convFunctionStep(g *wfexec.Graph, s *types.WorkflowStep, out []*types.WorkflowPath) (wfexec.Step, error) { - if s.Ref == "" { - return nil, errors.Internal("function reference missing") - } - - reg := Registry() - - if def := reg.Function(s.Ref); def == nil { - return nil, errors.Internal("unknown function %q", s.Ref) - } else { - if def.Kind != string(s.Kind) { - return nil, fmt.Errorf("unexpected %s on %s step", def.Kind, s.Kind) - } - - var ( - err error - isIterator = def.Kind == types.FunctionKindIterator - ) - - if isIterator { - if len(out) != 2 { - return nil, fmt.Errorf("expecting exactly two paths (next, exit) out of iterator function step") - } - - if def.Iterator == nil { - return nil, errors.Internal("iterator handler for %q not set", s.Ref) - } - } else { - if def.Handler == nil { - return nil, errors.Internal("function handler for %q not set", s.Ref) - } - } - - if err = svc.parseExpressions(s.Arguments...); err != nil { - return nil, errors.Internal("failed to parse argument expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err) - } else if err = def.Parameters.VerifyArguments(s.Arguments); err != nil { - return nil, errors.Internal("failed to verify argument expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err) - } - - if err = svc.parseExpressions(s.Results...); err != nil { - return nil, errors.Internal("failed to parse result expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err) - } else if err = def.Results.VerifyResults(s.Results); err != nil { - return nil, errors.Internal("failed to verify result expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err) - } - - if isIterator { - var ( - next = g.StepByID(out[0].ChildID) - exit = g.StepByID(out[1].ChildID) - ) - - if next == nil || exit == nil { - // wait for steps to be resolved - return nil, nil - } - - return types.IteratorStep(def, s.Arguments, s.Results, next, exit) - - } else { - return types.FunctionStep(def, s.Arguments, s.Results) - } - } -} - -// creates error step -// -// Expects ZERO outgoing paths and -func (svc *workflow) convErrorStep(s *types.WorkflowStep, out types.WorkflowPathSet) (wfexec.Step, error) { - const ( - argName = "message" - ) - - if len(out) > 0 { - return nil, errors.Internal("error step must be last step in branch") - } - - var ( - args = types.ExprSet(s.Arguments) - ) - - if msgArg := args.GetByTarget(argName); msgArg == nil { - return nil, errors.Internal("error step must have %s argument", argName) - } else if msgArg.Type != (expr.String{}).Type() { - return nil, errors.Internal("%s argument on error step must be string, got type '%s'", argName, msgArg.Type) - } else if len(args) > 1 { - return nil, errors.Internal("too many arguments on error step") - } - - if err := svc.parseExpressions(args...); err != nil { - return nil, err - } - - return wfexec.NewGenericStep(func(ctx context.Context, r *wfexec.ExecRequest) (wfexec.ExecResponse, error) { - var ( - msg string - result, err = args.Eval(ctx, r.Scope) - ) - if err != nil { - return nil, err - } - - if result.Has(argName) { - str, _ := expr.NewString(expr.Must(result.Select(argName))) - msg = str.GetValue() - } else { - if aux, is := args.GetByTarget(argName).Value.(string); is { - msg = aux - } else { - msg = "ERROR" - } - } - - return nil, errors.Automation(msg) - }), nil -} - -// converts prompt definition to wfexec.Step -func (svc *workflow) convTerminationStep(out types.WorkflowPathSet) (wfexec.Step, error) { - if len(out) > 0 { - return nil, errors.Internal("termination step must be last step in branch") - } - - return wfexec.NewGenericStep(func(ctx context.Context, r *wfexec.ExecRequest) (wfexec.ExecResponse, error) { - return wfexec.Termination(), nil - }), nil -} - -// converts prompt definition to wfexec.Step -func (svc *workflow) convPromptStep(s *types.WorkflowStep) (wfexec.Step, error) { - if err := svc.parseExpressions(s.Arguments...); err != nil { - return nil, err - } - - // Use expression step as base for prompt step - return types.PromptStep(s.Ref, types.ExpressionsStep(s.Arguments...)), nil -} - -func (svc *workflow) parseExpressions(ee ...*types.Expr) (err error) { - for _, e := range ee { - - if len(strings.TrimSpace(e.Expr)) > 0 { - if err = svc.parser.ParseEvaluators(e); err != nil { - return - } - } - - if err = e.SetType(exprTypeSetter(svc.reg, e)); err != nil { - return err - } - - for _, t := range e.Tests { - if err = svc.parser.ParseEvaluators(t); err != nil { - return - } - } - } - - return nil -} - -func validateSteps(ss ...*types.WorkflowStep) error { - var ( - IDs = make(map[uint64]int) - - noArgs = func(i int, s *types.WorkflowStep) error { - if len(s.Arguments) > 0 { - return errors.Internal("%s step (ID=%d, position=%d) does not accept arguments", s.Kind, s.ID, i) - } - - return nil - } - - reqArgs = func(i int, s *types.WorkflowStep) error { - if len(s.Arguments) == 0 { - return errors.Internal("%s step (ID=%d, position=%d) require defined arguments", s.Kind, s.ID, i) - } - - return nil - } - - noResults = func(i int, s *types.WorkflowStep) error { - if len(s.Results) > 0 { - return errors.Internal("%s step (ID=%d, position=%d) does not accept results", s.Kind, s.ID, i) - } - - return nil - } - - checks = make([]func(i int, s *types.WorkflowStep) error, 0) - ) - - for i, s := range ss { - if p, has := IDs[s.ID]; has { - return fmt.Errorf("duplicate step ID (%d) used for steps on positions %d and %d", s.ID, p, i) - } - - IDs[s.ID] = i - checks = nil - - switch s.Kind { - case types.WorkflowStepKindErrHandler: - - case types.WorkflowStepKindDebug: - checks = append(checks, noResults) - - case types.WorkflowStepKindVisual: - checks = append(checks, noArgs, noResults) - - case types.WorkflowStepKindExpressions: - checks = append(checks, reqArgs, noResults) - - case types.WorkflowStepKindGateway: - checks = append(checks, noArgs, noResults) - - case types.WorkflowStepKindError: - checks = append(checks, noResults) - - case types.WorkflowStepKindTermination: - checks = append(checks, noArgs, noResults) - - case types.WorkflowStepKindFunction, types.WorkflowStepKindIterator: - - case types.WorkflowStepKindPrompt: - checks = append(checks, noResults) - - default: - return errors.Internal("unknown step kind (ID=%d, position=%d)", s.ID, i) - } - - for _, check := range checks { - if err := check(i, s); err != nil { - return err - } - } - } - - return nil -} - func loadWorkflow(ctx context.Context, s store.Storer, workflowID uint64) (res *types.Workflow, err error) { if workflowID == 0 { return nil, WorkflowErrInvalidID() diff --git a/automation/service/workflow_converter.go b/automation/service/workflow_converter.go new file mode 100644 index 000000000..e9b37e72c --- /dev/null +++ b/automation/service/workflow_converter.go @@ -0,0 +1,531 @@ +package service + +import ( + "context" + "fmt" + "github.com/cortezaproject/corteza-server/automation/types" + "github.com/cortezaproject/corteza-server/pkg/errors" + "github.com/cortezaproject/corteza-server/pkg/expr" + "github.com/cortezaproject/corteza-server/pkg/wfexec" + "go.uber.org/zap" + "strings" +) + +type ( + workflowConverter struct { + // workflow function registry + reg *registry + parser expr.Parsable + log *zap.Logger + } +) + +func Convert(wfService *workflow, wf *types.Workflow) (*wfexec.Graph, types.WorkflowIssueSet) { + conv := &workflowConverter{ + reg: wfService.reg, + parser: wfService.parser, + log: wfService.log, + } + + return conv.makeGraph(wf) +} + +// Converts workflow definition to wf execution graph +func (svc workflowConverter) makeGraph(def *types.Workflow) (*wfexec.Graph, types.WorkflowIssueSet) { + var ( + g = wfexec.NewGraph() + wfii = types.WorkflowIssueSet{} + IDs = make(map[uint64]int) + lastResStep *types.WorkflowStep + ) + + // Basic step verification + for i, s := range def.Steps { + if _, has := IDs[s.ID]; has { + wfii = wfii.Append( + fmt.Errorf("step ID not unique"), + map[string]int{"step": i, "duplicate": IDs[s.ID]}, + ) + } else { + IDs[s.ID] = i + } + } + + // if we have one or more duplicated-id error we need to abort right away + // because all further step positions in issue culprit will be invalid + if len(wfii) > 0 { + return nil, wfii + } + + for g.Len() < len(def.Steps) { + progress := false + lastResStep = nil + for _, step := range def.Steps { + lastResStep = step + if g.StepByID(step.ID) != nil { + // resolved + continue + } + + stepIssues := verifyStep(step) + + if step.Kind == types.WorkflowStepKindVisual { + // make sure visual steps are skipped + continue + } + + // Collect all incoming and outgoing paths + inPaths := make([]*types.WorkflowPath, 0, 8) + outPaths := make([]*types.WorkflowPath, 0, 8) + for _, path := range def.Paths { + if path.ChildID == step.ID { + inPaths = append(inPaths, path) + } else if path.ParentID == step.ID { + outPaths = append(outPaths, path) + } + } + + if resolved, err := svc.workflowStepDefConv(g, step, inPaths, outPaths); err != nil { + switch aux := err.(type) { + case types.WorkflowIssueSet: + stepIssues = append(stepIssues, aux...) + continue + case error: + stepIssues = stepIssues.Append(err, nil) + } + } else if resolved { + progress = true + } + + wfii = append(wfii, stepIssues.SetCulprit("step", IDs[step.ID])...) + } + + if !progress { + var culprit = make(map[string]int) + if lastResStep != nil { + culprit = map[string]int{"step": IDs[lastResStep.ID]} + } + + // nothing resolved for 1 cycle + wfii = wfii.Append(fmt.Errorf("failed to resolve workflow step dependencies"), culprit) + break + } + } + + for pos, path := range def.Paths { + if g.StepByID(path.ChildID) == nil { + wfii = wfii.Append(fmt.Errorf("failed to resolve step with ID %d", path.ChildID), map[string]int{"path": pos}) + continue + } + + if g.StepByID(path.ParentID) == nil { + wfii = wfii.Append(fmt.Errorf("failed to resolve step with ID %d", path.ParentID), map[string]int{"path": pos}) + continue + } + + if len(wfii) > 0 { + // pointless to fill the if there are errors + continue + } + + g.AddParent( + g.StepByID(path.ChildID), + g.StepByID(path.ParentID), + ) + } + + if len(wfii) > 0 { + return nil, wfii + } + + return g, nil +} + +// converts all step definitions into workflow.Step instances +// +// if this func returns nil for step and error, assume unresolved dependencies +func (svc workflowConverter) workflowStepDefConv(g *wfexec.Graph, s *types.WorkflowStep, in, out []*types.WorkflowPath) (bool, error) { + conv, err := func() (wfexec.Step, error) { + switch s.Kind { + case types.WorkflowStepKindVisual: + return nil, nil + case types.WorkflowStepKindDebug: + return svc.convDebugStep(s) + + case types.WorkflowStepKindExpressions: + return svc.convExpressionStep(s) + + case types.WorkflowStepKindGateway: + return svc.convGateway(g, s, in, out) + + case types.WorkflowStepKindFunction, types.WorkflowStepKindIterator: + return svc.convFunctionStep(g, s, out) + + case types.WorkflowStepKindError: + return svc.convErrorStep(s, out) + + case types.WorkflowStepKindTermination: + return svc.convTerminationStep(out) + + case types.WorkflowStepKindPrompt: + return svc.convPromptStep(s) + + case types.WorkflowStepKindErrHandler: + return svc.convErrorHandlerStep(g, out) + + default: + return nil, errors.Internal("unsupported step kind %q", s.Kind) + } + }() + + if err != nil { + return false, err + } else if conv != nil { + conv.SetID(s.ID) + g.AddStep(conv) + return true, err + } else { + // signal caller that we were unable to + // resolve definition at the moment + return false, nil + } +} + +func (svc workflowConverter) convGateway(g *wfexec.Graph, s *types.WorkflowStep, in, out []*types.WorkflowPath) (wfexec.Step, error) { + switch s.Ref { + case "fork": + return wfexec.ForkGateway(), nil + + case "join": + var ( + ss []wfexec.Step + ) + for _, p := range in { + if parent := g.StepByID(p.ParentID); parent != nil { + ss = append(ss, parent) + } else { + // unresolved parent, come back later. + return nil, nil + } + } + + return wfexec.JoinGateway(ss...), nil + + case "incl", "excl": + var ( + pp []*wfexec.GatewayPath + ) + + for _, c := range out { + child := g.StepByID(c.ChildID) + if child == nil { + return nil, nil + } + + if len(c.Expr) > 0 { + if err := svc.parser.ParseEvaluators(c); err != nil { + return nil, err + } + } + + // wrapping with fn to make sure that we're dealing with the right wf path inside gw-path tester fn + err := func(c types.WorkflowPath) error { + p, err := wfexec.NewGatewayPath(child, func(ctx context.Context, scope *expr.Vars) (bool, error) { + if len(c.Expr) == 0 { + return true, nil + } + + return c.Test(ctx, scope) + }) + + if err != nil { + return err + } + + pp = append(pp, p) + return nil + }(*c) + + if err != nil { + return nil, err + } + } + + if s.Ref == "excl" { + return wfexec.ExclGateway(pp...) + } else { + return wfexec.InclGateway(pp...) + } + } + + return nil, fmt.Errorf("unknown gateway type") +} + +func (svc workflowConverter) convErrorHandlerStep(g *wfexec.Graph, out []*types.WorkflowPath) (wfexec.Step, error) { + switch len(out) { + case 0: + return nil, fmt.Errorf("expecting at least one path out of error handling step") + case 1: + // remove error handler + return types.ErrorHandlerStep(nil), nil + case 2: + errorHandler := g.StepByID(out[1].ChildID) + if errorHandler == nil { + // wait for it to be resolved + return nil, nil + } + + return types.ErrorHandlerStep(errorHandler), nil + + default: + // this might be extended in the future to allow different paths using expression + // but then again, this can be solved by gateway path following the error handling step + return nil, fmt.Errorf("max 2 paths out of error handling step") + } +} + +func (svc workflowConverter) convExpressionStep(s *types.WorkflowStep) (wfexec.Step, error) { + if err := svc.parseExpressions(s.Arguments...); err != nil { + return nil, err + } + + return types.ExpressionsStep(s.Arguments...), nil +} + +// internal debug step that can log entire +func (svc workflowConverter) convDebugStep(s *types.WorkflowStep) (wfexec.Step, error) { + if err := svc.parseExpressions(s.Arguments...); err != nil { + return nil, err + } + + return types.DebugStep(svc.log), nil +} + +func (svc workflowConverter) convFunctionStep(g *wfexec.Graph, s *types.WorkflowStep, out []*types.WorkflowPath) (wfexec.Step, error) { + if s.Ref == "" { + return nil, errors.Internal("function reference missing") + } + + reg := Registry() + + if def := reg.Function(s.Ref); def == nil { + return nil, errors.Internal("unknown function %q", s.Ref) + } else { + if def.Kind != string(s.Kind) { + return nil, fmt.Errorf("unexpected %s on %s step", def.Kind, s.Kind) + } + + var ( + err error + isIterator = def.Kind == types.FunctionKindIterator + ) + + if isIterator { + if len(out) != 2 { + return nil, fmt.Errorf("expecting exactly two paths (next, exit) out of iterator function step") + } + + if def.Iterator == nil { + return nil, errors.Internal("iterator handler for %q not set", s.Ref) + } + } else { + if def.Handler == nil { + return nil, errors.Internal("function handler for %q not set", s.Ref) + } + } + + if err = svc.parseExpressions(s.Arguments...); err != nil { + return nil, errors.Internal("failed to parse argument expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err) + } else if err = def.Parameters.VerifyArguments(s.Arguments); err != nil { + return nil, errors.Internal("failed to verify argument expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err) + } + + if err = svc.parseExpressions(s.Results...); err != nil { + return nil, errors.Internal("failed to parse result expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err) + } else if err = def.Results.VerifyResults(s.Results); err != nil { + return nil, errors.Internal("failed to verify result expressions for %s %s: %s", s.Kind, s.Ref, err).Wrap(err) + } + + if isIterator { + var ( + next = g.StepByID(out[0].ChildID) + exit = g.StepByID(out[1].ChildID) + ) + + if next == nil || exit == nil { + // wait for steps to be resolved + return nil, nil + } + + return types.IteratorStep(def, s.Arguments, s.Results, next, exit) + + } else { + return types.FunctionStep(def, s.Arguments, s.Results) + } + } +} + +// creates error step +// +// Expects ZERO outgoing paths and +func (svc workflowConverter) convErrorStep(s *types.WorkflowStep, out types.WorkflowPathSet) (wfexec.Step, error) { + const ( + argName = "message" + ) + + if len(out) > 0 { + return nil, errors.Internal("error step must be last step in branch") + } + + var ( + args = types.ExprSet(s.Arguments) + ) + + if msgArg := args.GetByTarget(argName); msgArg == nil { + return nil, errors.Internal("error step must have %s argument", argName) + } else if msgArg.Type != (expr.String{}).Type() { + return nil, errors.Internal("%s argument on error step must be string, got type '%s'", argName, msgArg.Type) + } else if len(args) > 1 { + return nil, errors.Internal("too many arguments on error step") + } + + if err := svc.parseExpressions(args...); err != nil { + return nil, err + } + + return wfexec.NewGenericStep(func(ctx context.Context, r *wfexec.ExecRequest) (wfexec.ExecResponse, error) { + var ( + msg string + result, err = args.Eval(ctx, r.Scope) + ) + if err != nil { + return nil, err + } + + if result.Has(argName) { + str, _ := expr.NewString(expr.Must(result.Select(argName))) + msg = str.GetValue() + } else { + if aux, is := args.GetByTarget(argName).Value.(string); is { + msg = aux + } else { + msg = "ERROR" + } + } + + return nil, errors.Automation(msg) + }), nil +} + +// converts prompt definition to wfexec.Step +func (svc workflowConverter) convTerminationStep(out types.WorkflowPathSet) (wfexec.Step, error) { + if len(out) > 0 { + return nil, errors.Internal("termination step must be last step in branch") + } + + return wfexec.NewGenericStep(func(ctx context.Context, r *wfexec.ExecRequest) (wfexec.ExecResponse, error) { + return wfexec.Termination(), nil + }), nil +} + +// converts prompt definition to wfexec.Step +func (svc workflowConverter) convPromptStep(s *types.WorkflowStep) (wfexec.Step, error) { + if err := svc.parseExpressions(s.Arguments...); err != nil { + return nil, err + } + + // Use expression step as base for prompt step + return types.PromptStep(s.Ref, types.ExpressionsStep(s.Arguments...)), nil +} + +func (svc workflowConverter) parseExpressions(ee ...*types.Expr) (err error) { + for _, e := range ee { + + if len(strings.TrimSpace(e.Expr)) > 0 { + if err = svc.parser.ParseEvaluators(e); err != nil { + return + } + } + + if err = e.SetType(exprTypeSetter(svc.reg, e)); err != nil { + return err + } + + for _, t := range e.Tests { + if err = svc.parser.ParseEvaluators(t); err != nil { + return + } + } + } + + return nil +} + +func verifyStep(step *types.WorkflowStep) types.WorkflowIssueSet { + var ( + ii = types.WorkflowIssueSet{} + + noArgs = func(s *types.WorkflowStep) error { + if len(s.Arguments) > 0 { + return errors.Internal("%s step does not accept arguments", s.Kind) + } + + return nil + } + + noResults = func(s *types.WorkflowStep) error { + if len(s.Results) > 0 { + return errors.Internal("%s step does not accept results", s.Kind) + } + + return nil + } + + checks = make([]func(s *types.WorkflowStep) error, 0) + ) + + switch step.Kind { + case types.WorkflowStepKindErrHandler: + checks = append(checks, noArgs, noResults) + + case types.WorkflowStepKindDebug: + checks = append(checks, noResults) + + case types.WorkflowStepKindVisual: + checks = append(checks, noArgs, noResults) + + case types.WorkflowStepKindExpressions: + checks = append(checks, noResults, func(s *types.WorkflowStep) error { + if len(s.Arguments) == 0 { + return errors.Internal("%s step require at least one argument", s.Kind) + } + + return nil + }) + + case types.WorkflowStepKindGateway: + checks = append(checks, noArgs, noResults) + + case types.WorkflowStepKindError: + checks = append(checks, noResults) + + case types.WorkflowStepKindTermination: + checks = append(checks, noArgs, noResults) + + case types.WorkflowStepKindFunction, types.WorkflowStepKindIterator: + + case types.WorkflowStepKindPrompt: + checks = append(checks, noResults) + + default: + return ii.Append(fmt.Errorf("unknown step kind"), nil) + } + + for _, check := range checks { + if err := check(step); err != nil { + ii = ii.Append(err, nil) + } + } + + return ii +} diff --git a/automation/types/type_set.gen.go b/automation/types/type_set.gen.go index 6d241bc1b..18c978bec 100644 --- a/automation/types/type_set.gen.go +++ b/automation/types/type_set.gen.go @@ -35,6 +35,11 @@ type ( // This type is auto-generated. WorkflowSet []*Workflow + // WorkflowIssueSet slice of WorkflowIssue + // + // This type is auto-generated. + WorkflowIssueSet []*WorkflowIssue + // WorkflowPathSet slice of WorkflowPath // // This type is auto-generated. @@ -300,6 +305,36 @@ func (set WorkflowSet) IDs() (IDs []uint64) { return } +// Walk iterates through every slice item and calls w(WorkflowIssue) err +// +// This function is auto-generated. +func (set WorkflowIssueSet) Walk(w func(*WorkflowIssue) error) (err error) { + for i := range set { + if err = w(set[i]); err != nil { + return + } + } + + return +} + +// Filter iterates through every slice item, calls f(WorkflowIssue) (bool, err) and return filtered slice +// +// This function is auto-generated. +func (set WorkflowIssueSet) Filter(f func(*WorkflowIssue) (bool, error)) (out WorkflowIssueSet, err error) { + var ok bool + out = WorkflowIssueSet{} + for i := range set { + if ok, err = f(set[i]); err != nil { + return + } else if ok { + out = append(out, set[i]) + } + } + + return +} + // Walk iterates through every slice item and calls w(WorkflowPath) err // // This function is auto-generated. diff --git a/automation/types/type_set.gen_test.go b/automation/types/type_set.gen_test.go index 592514027..4ececa3a5 100644 --- a/automation/types/type_set.gen_test.go +++ b/automation/types/type_set.gen_test.go @@ -430,6 +430,62 @@ func TestWorkflowSetIDs(t *testing.T) { } } +func TestWorkflowIssueSetWalk(t *testing.T) { + var ( + value = make(WorkflowIssueSet, 3) + req = require.New(t) + ) + + // check walk with no errors + { + err := value.Walk(func(*WorkflowIssue) error { + return nil + }) + req.NoError(err) + } + + // check walk with error + req.Error(value.Walk(func(*WorkflowIssue) error { return fmt.Errorf("walk error") })) +} + +func TestWorkflowIssueSetFilter(t *testing.T) { + var ( + value = make(WorkflowIssueSet, 3) + req = require.New(t) + ) + + // filter nothing + { + set, err := value.Filter(func(*WorkflowIssue) (bool, error) { + return true, nil + }) + req.NoError(err) + req.Equal(len(set), len(value)) + } + + // filter one item + { + found := false + set, err := value.Filter(func(*WorkflowIssue) (bool, error) { + if !found { + found = true + return found, nil + } + return false, nil + }) + req.NoError(err) + req.Len(set, 1) + } + + // filter error + { + _, err := value.Filter(func(*WorkflowIssue) (bool, error) { + return false, fmt.Errorf("filter error") + }) + req.Error(err) + } +} + func TestWorkflowPathSetWalk(t *testing.T) { var ( value = make(WorkflowPathSet, 3) diff --git a/automation/types/types.yaml b/automation/types/types.yaml index a6fdca969..4837d66e6 100644 --- a/automation/types/types.yaml +++ b/automation/types/types.yaml @@ -8,6 +8,8 @@ types: labelResourceType: workflow WorkflowPath: noIdField: true + WorkflowIssue: + noIdField: true WorkflowStep: {} Session: {} State: {} diff --git a/automation/types/workflow.go b/automation/types/workflow.go index c03fb3607..f177cc89b 100644 --- a/automation/types/workflow.go +++ b/automation/types/workflow.go @@ -31,6 +31,9 @@ type ( Steps WorkflowStepSet `json:"steps"` Paths WorkflowPathSet `json:"paths"` + // Collection of issues from the last parse + Issues WorkflowIssueSet `json:"issues,omitempty"` + RunAs uint64 `json:"runAs,string"` OwnedBy uint64 `json:"ownedBy,string"` @@ -70,6 +73,12 @@ type ( Visual map[string]interface{} `json:"visual"` } + WorkflowIssue struct { + // url encoded location of the error: + Culprit map[string]int `json:"culprit"` + Description string `json:"description"` + } + // WorkflowStep describes one workflow step WorkflowStep struct { ID uint64 `json:"stepID,string"` @@ -126,8 +135,6 @@ const ( WorkflowStepKindErrHandler WorkflowStepKind = "error-handler" // no ref WorkflowStepKindVisual WorkflowStepKind = "visual" // ref = <*> WorkflowStepKindDebug WorkflowStepKind = "debug" // ref = <*> - //WorkflowStepKindSubprocess WorkflowStepKind = "subprocess" - //WorkflowStepKindEvent WorkflowStepKind = "event" // ref = ?? ) // Resource returns a resource ID for this type @@ -159,6 +166,56 @@ func (vv *WorkflowMeta) Value() (driver.Value, error) { return json.Marshal(vv) } +// Scan on WorkflowStepSet gracefully handles conversion from NULL +func (set WorkflowIssueSet) Value() (driver.Value, error) { + return json.Marshal(set) +} + +func (set *WorkflowIssueSet) Scan(value interface{}) error { + //lint:ignore S1034 This typecast is intentional, we need to get []byte out of a []uint8 + switch value.(type) { + case nil: + *set = WorkflowIssueSet{} + case []uint8: + b := value.([]byte) + if err := json.Unmarshal(b, set); err != nil { + return fmt.Errorf("can not scan '%v' into WorkflowIssueSet: %w", string(b), err) + } + } + + return nil +} + +func (set WorkflowIssueSet) Error() string { + switch len(set) { + case 0: + return fmt.Sprintf("no workflow issue found") + case 1: + return fmt.Sprintf("1 workflow issue found") + default: + return fmt.Sprintf("%d workflow issues found", len(set)) + } +} + +func (set WorkflowIssueSet) Append(err error, culprit map[string]int) WorkflowIssueSet { + if culprit == nil { + culprit = make(map[string]int) + } + + return append(set, &WorkflowIssue{ + Culprit: culprit, + Description: err.Error(), + }) +} + +func (set WorkflowIssueSet) SetCulprit(name string, pos int) WorkflowIssueSet { + for i := range set { + set[i].Culprit[name] = pos + } + + return set +} + func (t WorkflowPath) GetExpr() string { return t.Expr } func (t *WorkflowPath) SetEval(eval expr.Evaluable) { t.eval = eval } func (t WorkflowPath) Eval(ctx context.Context, scope *expr.Vars) (interface{}, error) { diff --git a/store/automation_workflows.yaml b/store/automation_workflows.yaml index 10195395a..0965915f1 100644 --- a/store/automation_workflows.yaml +++ b/store/automation_workflows.yaml @@ -14,6 +14,7 @@ fields: - { field: Scope, type: "expr.Vars" } - { field: Steps, type: "expr.Vars" } - { field: Paths, type: "expr.Vars" } + - { field: Issues, type: "WorkflowIssueSet" } - { field: RunAs, type: "uint64" } - { field: OwnedBy } - { field: CreatedBy } diff --git a/store/rdbms/automation_workflows.gen.go b/store/rdbms/automation_workflows.gen.go index 05b85c9aa..d27c726d0 100644 --- a/store/rdbms/automation_workflows.gen.go +++ b/store/rdbms/automation_workflows.gen.go @@ -451,6 +451,7 @@ func (s Store) internalAutomationWorkflowRowScanner(row rowScanner) (res *types. &res.Scope, &res.Steps, &res.Paths, + &res.Issues, &res.RunAs, &res.OwnedBy, &res.CreatedBy, @@ -507,6 +508,7 @@ func (Store) automationWorkflowColumns(aa ...string) []string { alias + "scope", alias + "steps", alias + "paths", + alias + "issues", alias + "run_as", alias + "owned_by", alias + "created_by", @@ -544,6 +546,7 @@ func (s Store) internalAutomationWorkflowEncoder(res *types.Workflow) store.Payl "scope": res.Scope, "steps": res.Steps, "paths": res.Paths, + "issues": res.Issues, "run_as": res.RunAs, "owned_by": res.OwnedBy, "created_by": res.CreatedBy, diff --git a/store/rdbms/rdbms_schema.go b/store/rdbms/rdbms_schema.go index 04817387b..861eab9d8 100644 --- a/store/rdbms/rdbms_schema.go +++ b/store/rdbms/rdbms_schema.go @@ -581,6 +581,7 @@ func (Schema) AutomationWorkflows() *Table { ColumnDef("scope", ColumnTypeJson), ColumnDef("steps", ColumnTypeJson), ColumnDef("paths", ColumnTypeJson), + ColumnDef("issues", ColumnTypeJson), ColumnDef("run_as", ColumnTypeIdentifier), ColumnDef("owned_by", ColumnTypeIdentifier), CUDTimestamps,