diff --git a/automation/rest.yaml b/automation/rest.yaml index caf339b1a..dd642c1ef 100644 --- a/automation/rest.yaml +++ b/automation/rest.yaml @@ -88,6 +88,17 @@ endpoints: post: - { name: scope, type: "*expr.Vars", title: "Workflow meta data", parser: "types.ParseWorkflowVariables" } - { name: runAs, type: bool, required: true, title: "Is workflow enabled" } + - name: exec + method: POST + title: Executes workflow on a specific step (must be orphan step and connected to 'onManual' trigger) + path: "/{workflowID}/exec" + parameters: + path: [ { name: workflowID, type: uint64, required: true, title: "Workflow ID" } ] + post: + - { name: stepID, type: uint64, required: true, title: "Step ID" } + - { name: input, type: "*expr.Vars", title: "Input", parser: "types.ParseWorkflowVariables" } + - { name: trace, type: bool, title: "Trace workflow execution" } + - title: Triggers path: "/triggers" diff --git a/automation/rest/handlers/workflow.go b/automation/rest/handlers/workflow.go index fcd581873..81890c62e 100644 --- a/automation/rest/handlers/workflow.go +++ b/automation/rest/handlers/workflow.go @@ -26,6 +26,7 @@ type ( Delete(context.Context, *request.WorkflowDelete) (interface{}, error) Undelete(context.Context, *request.WorkflowUndelete) (interface{}, error) Test(context.Context, *request.WorkflowTest) (interface{}, error) + Exec(context.Context, *request.WorkflowExec) (interface{}, error) } // HTTP API interface @@ -37,6 +38,7 @@ type ( Delete func(http.ResponseWriter, *http.Request) Undelete func(http.ResponseWriter, *http.Request) Test func(http.ResponseWriter, *http.Request) + Exec func(http.ResponseWriter, *http.Request) } ) @@ -152,6 +154,22 @@ func NewWorkflow(h WorkflowAPI) *Workflow { return } + api.Send(w, r, value) + }, + Exec: func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + params := request.NewWorkflowExec() + if err := params.Fill(r); err != nil { + api.Send(w, r, err) + return + } + + value, err := h.Exec(r.Context(), params) + if err != nil { + api.Send(w, r, err) + return + } + api.Send(w, r, value) }, } @@ -167,5 +185,6 @@ func (h Workflow) MountRoutes(r chi.Router, middlewares ...func(http.Handler) ht r.Delete("/workflows/{workflowID}", h.Delete) r.Post("/workflows/{workflowID}/undelete", h.Undelete) r.Post("/workflows/{workflowID}/test", h.Test) + r.Post("/workflows/{workflowID}/exec", h.Exec) }) } diff --git a/automation/rest/request/workflow.go b/automation/rest/request/workflow.go index 8d580e3a2..c98d2ff63 100644 --- a/automation/rest/request/workflow.go +++ b/automation/rest/request/workflow.go @@ -234,6 +234,28 @@ type ( // Is workflow enabled RunAs bool } + + WorkflowExec struct { + // WorkflowID PATH parameter + // + // Workflow ID + WorkflowID uint64 `json:",string"` + + // StepID POST parameter + // + // Step ID + StepID uint64 `json:",string"` + + // Input POST parameter + // + // Input + Input *expr.Vars + + // Trace POST parameter + // + // Trace workflow execution + Trace bool + } ) // NewWorkflowList request @@ -980,3 +1002,101 @@ func (r *WorkflowTest) Fill(req *http.Request) (err error) { return err } + +// NewWorkflowExec request +func NewWorkflowExec() *WorkflowExec { + return &WorkflowExec{} +} + +// Auditable returns all auditable/loggable parameters +func (r WorkflowExec) Auditable() map[string]interface{} { + return map[string]interface{}{ + "workflowID": r.WorkflowID, + "stepID": r.StepID, + "input": r.Input, + "trace": r.Trace, + } +} + +// Auditable returns all auditable/loggable parameters +func (r WorkflowExec) GetWorkflowID() uint64 { + return r.WorkflowID +} + +// Auditable returns all auditable/loggable parameters +func (r WorkflowExec) GetStepID() uint64 { + return r.StepID +} + +// Auditable returns all auditable/loggable parameters +func (r WorkflowExec) GetInput() *expr.Vars { + return r.Input +} + +// Auditable returns all auditable/loggable parameters +func (r WorkflowExec) GetTrace() bool { + return r.Trace +} + +// Fill processes request and fills internal variables +func (r *WorkflowExec) Fill(req *http.Request) (err error) { + + if strings.ToLower(req.Header.Get("content-type")) == "application/json" { + err = json.NewDecoder(req.Body).Decode(r) + + switch { + case err == io.EOF: + err = nil + case err != nil: + return fmt.Errorf("error parsing http request body: %w", err) + } + } + + { + if err = req.ParseForm(); err != nil { + return err + } + + // POST params + + if val, ok := req.Form["stepID"]; ok && len(val) > 0 { + r.StepID, err = payload.ParseUint64(val[0]), nil + if err != nil { + return err + } + } + + if val, ok := req.Form["input[]"]; ok { + r.Input, err = types.ParseWorkflowVariables(val) + if err != nil { + return err + } + } else if val, ok := req.Form["input"]; ok { + r.Input, err = types.ParseWorkflowVariables(val) + if err != nil { + return err + } + } + + if val, ok := req.Form["trace"]; ok && len(val) > 0 { + r.Trace, err = payload.ParseBool(val[0]), nil + if err != nil { + return err + } + } + } + + { + var val string + // path params + + val = chi.URLParam(req, "workflowID") + r.WorkflowID, err = payload.ParseUint64(val), nil + if err != nil { + return err + } + + } + + return err +} diff --git a/automation/rest/workflow.go b/automation/rest/workflow.go index 594223d64..6d734d9bf 100644 --- a/automation/rest/workflow.go +++ b/automation/rest/workflow.go @@ -7,6 +7,7 @@ import ( "github.com/cortezaproject/corteza-server/automation/service" "github.com/cortezaproject/corteza-server/automation/types" "github.com/cortezaproject/corteza-server/pkg/api" + "github.com/cortezaproject/corteza-server/pkg/expr" "github.com/cortezaproject/corteza-server/pkg/filter" "github.com/cortezaproject/corteza-server/pkg/payload" ) @@ -20,6 +21,7 @@ type ( Update(ctx context.Context, upd *types.Workflow) (*types.Workflow, error) DeleteByID(ctx context.Context, workflowID uint64) error UndeleteByID(ctx context.Context, workflowID uint64) error + Exec(ctx context.Context, workflowID uint64, p types.WorkflowExecParams) (*expr.Vars, types.Stacktrace, error) } } @@ -27,6 +29,11 @@ type ( Filter types.WorkflowFilter `json:"filter"` Set types.WorkflowSet `json:"set"` } + + workflowExecPayload struct { + Results *expr.Vars `json:"results"` + Trace types.Stacktrace `json:"trace,omitempty"` + } ) func (Workflow) New() *Workflow { @@ -112,6 +119,21 @@ func (ctrl Workflow) Undelete(ctx context.Context, r *request.WorkflowUndelete) return api.OK(), ctrl.svc.UndeleteByID(ctx, r.WorkflowID) } +func (ctrl Workflow) Exec(ctx context.Context, r *request.WorkflowExec) (interface{}, error) { + var ( + wep = &workflowExecPayload{} + err error + ) + + wep.Results, wep.Trace, err = ctrl.svc.Exec(ctx, r.WorkflowID, types.WorkflowExecParams{ + StepID: r.StepID, + Trace: r.Trace, + Input: r.Input, + }) + + return wep, err +} + func (ctrl Workflow) makeFilterPayload(ctx context.Context, uu types.WorkflowSet, f types.WorkflowFilter, err error) (*workflowSetPayload, error) { if err != nil { return nil, err diff --git a/automation/service/service.go b/automation/service/service.go index 12682f2c4..5fc5a0080 100644 --- a/automation/service/service.go +++ b/automation/service/service.go @@ -92,8 +92,8 @@ func Initialize(ctx context.Context, log *zap.Logger, s store.Storer, c Config) DefaultAccessControl = AccessControl(rbac.Global()) - DefaultWorkflow = Workflow(DefaultLogger.Named("workflow")) DefaultSession = Session(DefaultLogger.Named("session")) + DefaultWorkflow = Workflow(DefaultLogger.Named("workflow")) DefaultTrigger = Trigger(DefaultLogger.Named("trigger"), c.Workflow) DefaultWorkflow.triggers = DefaultTrigger diff --git a/automation/service/trigger.go b/automation/service/trigger.go index 01ee599a8..bd2cc0471 100644 --- a/automation/service/trigger.go +++ b/automation/service/trigger.go @@ -131,6 +131,32 @@ func (svc *trigger) Search(ctx context.Context, filter types.TriggerFilter) (rr return rr, filter, svc.recordAction(ctx, wap, TriggerActionSearch, err) } +// SearchOnManual finds first matching onManual trigger and returns it +// +// In case stepID is 0, first trigger is returned +func (svc *trigger) SearchOnManual(ctx context.Context, workflowID, stepID uint64) (*types.Trigger, error) { + tt, _, err := svc.Search(ctx, types.TriggerFilter{ + WorkflowID: []uint64{workflowID}, + EventType: "onManual", + }) + + if err != nil { + return nil, err + } + + if stepID == 0 && len(tt) > 0 { + return tt[0], nil + } + + for _, t := range tt { + if t.StepID == stepID { + return t, nil + } + } + + return nil, nil +} + func (svc *trigger) LookupByID(ctx context.Context, triggerID uint64) (res *types.Trigger, err error) { var ( wap = &triggerActionProps{trigger: &types.Trigger{ID: triggerID}} @@ -495,6 +521,11 @@ func (svc *trigger) registerTriggers(wf *types.Workflow, runAs auth.Identifiable continue } + if t.EventType == "onManual" { + // skip onManual trigger registration, + // we'll handle them directly + } + var ( cnstr eventbus.ConstraintMatcher ops = make([]eventbus.HandlerRegOp, 0, len(t.Constraints)+2) diff --git a/automation/service/workflow.go b/automation/service/workflow.go index e74f47b13..93bab9e0b 100644 --- a/automation/service/workflow.go +++ b/automation/service/workflow.go @@ -26,6 +26,7 @@ type ( actionlog actionlog.Recorder ac workflowAccessController triggers *trigger + session *session log *zap.Logger @@ -48,7 +49,11 @@ type ( CanDeleteWorkflow(context.Context, *types.Workflow) bool CanUndeleteWorkflow(context.Context, *types.Workflow) bool + CanManageWorkflowSessions(context.Context, *types.Workflow) bool + Grant(ctx context.Context, rr ...*rbac.Rule) error + + workflowExecController } workflowExecController interface { @@ -78,6 +83,7 @@ func Workflow(log *zap.Logger) *workflow { store: DefaultStore, ac: DefaultAccessControl, triggers: DefaultTrigger, + session: DefaultSession, eventbus: eventbus.Service(), wfgs: make(map[uint64]*wfexec.Graph), mux: &sync.RWMutex{}, @@ -277,15 +283,6 @@ func (svc *workflow) UndeleteByID(ctx context.Context, workflowID uint64) error })) } -// Start runs a new workflow -// -// Workflow execution is asynchronous operation. -func (svc *workflow) Start(ctx context.Context, workflowID uint64, scope *expr.Vars) error { - defer svc.mux.Unlock() - svc.mux.Lock() - return errors.Internal("pending implementation") -} - func (svc workflow) uniqueCheck(ctx context.Context, res *types.Workflow) (err error) { if res.Handle != "" { if e, _ := store.LookupAutomationWorkflowByHandle(ctx, svc.store, res.Handle); e != nil && e.ID != res.ID { @@ -479,9 +476,133 @@ func (svc *workflow) Load(ctx context.Context) error { return svc.triggers.registerWorkflows(ctx, wwf...) } +func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.WorkflowExecParams) (*expr.Vars, types.Stacktrace, error) { + var ( + runAs intAuth.Identifiable + wap = &workflowActionProps{} + wf *types.Workflow + t *types.Trigger + results *expr.Vars + wait WaitFn + ) + + err := func() (err error) { + wf, err = loadWorkflow(ctx, svc.store, workflowID) + if err != nil { + return + } + + wap.setWorkflow(wf) + if !svc.ac.CanExecuteWorkflow(ctx, wf) { + return WorkflowErrNotAllowedToExecute() + } + + // User wants to trace workflow execution + // This means we'll allow him to specify any (orphaned) step + // even if it's not linked to onManual trigger + if p.Trace && !svc.ac.CanManageWorkflowSessions(ctx, wf) { + return WorkflowErrNotAllowedToExecute() + } + + g, convErr := Convert(svc, wf) + if len(convErr) > 0 { + return convErr + } + + // Find the trigger. + t, err = func() (*types.Trigger, error) { + var tt types.TriggerSet + tt, _, err = svc.triggers.Search(ctx, types.TriggerFilter{WorkflowID: []uint64{workflowID}}) + if err != nil { + return nil, err + } + + if p.StepID == 0 && len(tt) > 0 { + return tt[0], nil + } else { + for _, tMatch := range tt { + if tMatch.StepID == p.StepID { + return tMatch, nil + } + } + } + + return nil, nil + }() + + // Start with workflow scope + scope := wf.Scope.Merge() + + ssp := types.SessionStartParams{ + WorkflowID: wf.ID, + KeepFor: wf.KeepSessions, + Trace: wf.Trace, + StepID: p.StepID, + } + + if !p.Trace { + if t == nil { + return WorkflowErrUnknownWorkflowStep() + } + } + + if t != nil { + wap.setTrigger(t) + + // Add trigger's input to scope + scope = scope.Merge(t.Input) + _ = scope.AssignFieldValue("eventType", expr.Must(expr.NewString(t.EventType))) + _ = scope.AssignFieldValue("resourceType", expr.Must(expr.NewString(t.ResourceType))) + + ssp.StepID = t.StepID + ssp.EventType = t.EventType + ssp.ResourceType = t.ResourceType + } else { + ssp.EventType = "onTrace" + ssp.ResourceType = "" + } + + // Finally, assign input values + ssp.Input = scope.Merge(p.Input) + + if wf.RunAs > 0 { + if runAs, err = DefaultUser.FindByID(ctx, wf.RunAs); err != nil { + return + } + } + + if runAs == nil { + // Default to current user + runAs = intAuth.GetIdentityFromContext(ctx) + } + + wait, err = svc.session.Start(g, runAs, ssp) + + if err != nil { + return err + } + + if !p.Async { + if p.Wait || wf.CheckDeferred() { + // deferred workflow, return right away and keep the workflow session + // running without waiting for the execution + return nil + } + } + + // wait for the workflow to complete + // reuse scope for results + // this will be decoded back to event properties + results, _, err = wait(ctx) + return + }() + + return results, nil, svc.recordAction(ctx, wap, WorkflowActionExecute, err) +} + func makeWorkflowHandler(ac workflowExecController, s *session, t *types.Trigger, wf *types.Workflow, g *wfexec.Graph, runAs intAuth.Identifiable) eventbus.HandlerFn { return func(ctx context.Context, ev eventbus.Event) (err error) { - if !ac.(*accessControl).CanExecuteWorkflow(ctx, wf) { + if !ac.CanExecuteWorkflow(ctx, wf) { return WorkflowErrNotAllowedToExecute() } diff --git a/automation/service/workflow_actions.gen.go b/automation/service/workflow_actions.gen.go index bc709eb89..18ab0cbab 100644 --- a/automation/service/workflow_actions.gen.go +++ b/automation/service/workflow_actions.gen.go @@ -23,6 +23,7 @@ type ( workflow *types.Workflow new *types.Workflow update *types.Workflow + trigger *types.Trigger filter *types.WorkflowFilter } @@ -84,6 +85,17 @@ func (p *workflowActionProps) setUpdate(update *types.Workflow) *workflowActionP return p } +// setTrigger updates workflowActionProps's trigger +// +// Allows method chaining +// +// This function is auto-generated. +// +func (p *workflowActionProps) setTrigger(trigger *types.Trigger) *workflowActionProps { + p.trigger = trigger + return p +} + // setFilter updates workflowActionProps's filter // // Allows method chaining @@ -116,6 +128,12 @@ func (p workflowActionProps) Serialize() actionlog.Meta { m.Set("update.handle", p.update.Handle, true) m.Set("update.ID", p.update.ID, true) } + if p.trigger != nil { + m.Set("trigger.eventType", p.trigger.EventType, true) + m.Set("trigger.resourceType", p.trigger.ResourceType, true) + m.Set("trigger.ID", p.trigger.ID, true) + m.Set("trigger.stepID", p.trigger.StepID, true) + } if p.filter != nil { } @@ -189,6 +207,24 @@ func (p workflowActionProps) Format(in string, err error) string { pairs = append(pairs, "{update.ID}", fns(p.update.ID)) } + if p.trigger != nil { + // replacement for "{trigger}" (in order how fields are defined) + pairs = append( + pairs, + "{trigger}", + fns( + p.trigger.EventType, + p.trigger.ResourceType, + p.trigger.ID, + p.trigger.StepID, + ), + ) + pairs = append(pairs, "{trigger.eventType}", fns(p.trigger.EventType)) + pairs = append(pairs, "{trigger.resourceType}", fns(p.trigger.ResourceType)) + pairs = append(pairs, "{trigger.ID}", fns(p.trigger.ID)) + pairs = append(pairs, "{trigger.stepID}", fns(p.trigger.StepID)) + } + if p.filter != nil { // replacement for "{filter}" (in order how fields are defined) pairs = append( @@ -352,6 +388,26 @@ func WorkflowActionUndelete(props ...*workflowActionProps) *workflowAction { return a } +// WorkflowActionExecute returns "automation:workflow.execute" action +// +// This function is auto-generated. +// +func WorkflowActionExecute(props ...*workflowActionProps) *workflowAction { + a := &workflowAction{ + timestamp: time.Now(), + resource: "automation:workflow", + action: "execute", + log: "{workflow} executed", + severity: actionlog.Info, + } + + if len(props) > 0 { + a.props = props[0] + } + + return a +} + // ********************************************************************************************************************* // ********************************************************************************************************************* // Error constructors @@ -732,6 +788,38 @@ func WorkflowErrNotAllowedToExecute(mm ...*workflowActionProps) *errors.Error { return e } +// WorkflowErrUnknownWorkflowStep returns "automation:workflow.unknownWorkflowStep" as *errors.Error +// +// +// This function is auto-generated. +// +func WorkflowErrUnknownWorkflowStep(mm ...*workflowActionProps) *errors.Error { + var p = &workflowActionProps{} + if len(mm) > 0 { + p = mm[0] + } + + var e = errors.New( + errors.KindInternal, + + p.Format("unknown workflow step", nil), + + errors.Meta("type", "unknownWorkflowStep"), + errors.Meta("resource", "automation:workflow"), + + // action log entry; no formatting, it will be applied inside recordAction fn. + errors.Meta(workflowLogMetaKey{}, "failed to execute {workflow}; unknown workflow step"), + errors.Meta(workflowPropsMetaKey{}, p), + + errors.StackSkip(1), + ) + + if len(mm) > 0 { + } + + return e +} + // WorkflowErrHandleNotUnique returns "automation:workflow.handleNotUnique" as *errors.Error // // diff --git a/automation/service/workflow_actions.yaml b/automation/service/workflow_actions.yaml index db8038924..bc0bca569 100644 --- a/automation/service/workflow_actions.yaml +++ b/automation/service/workflow_actions.yaml @@ -23,6 +23,9 @@ props: - name: update type: "*types.Workflow" fields: [ handle, ID ] + - name: trigger + type: "*types.Trigger" + fields: [ eventType, resourceType, ID, stepID, ] - name: filter type: "*types.WorkflowFilter" @@ -48,6 +51,10 @@ actions: - action: undelete log: "undeleted {workflow}" + - action: execute + # NOTE: only explicitly triggered workflow execution is logged + log: "{workflow} executed" + errors: - error: notFound message: "workflow not found" @@ -90,6 +97,10 @@ errors: message: "not allowed to execute this workflow" log: "failed to execute {workflow}; insufficient permissions" + - error: unknownWorkflowStep + message: "unknown workflow step" + log: "failed to execute {workflow}; unknown workflow step" + - error: handleNotUnique message: "workflow handle not unique" log: "duplicate handle used for workflow ({workflow})" diff --git a/automation/types/workflow.go b/automation/types/workflow.go index c62f4c63f..1152ac28a 100644 --- a/automation/types/workflow.go +++ b/automation/types/workflow.go @@ -77,6 +77,22 @@ type ( Culprit map[string]int `json:"culprit"` Description string `json:"description"` } + + WorkflowExecParams struct { + // Start with this specific step + StepID uint64 + + // Enable execution tracing + Trace bool + + // Do not wait for workflow to be finished + Async bool + + // Wait for workflow to be executed even if it's deferred + Wait bool + + Input *expr.Vars + } ) // Resource returns a resource ID for this type