3
0

Implement explcit workflow exec (onManual, testing)

This commit is contained in:
Denis Arh 2021-03-18 18:59:09 +01:00
parent 9992c07073
commit 5faeff1ded
10 changed files with 450 additions and 11 deletions

View File

@ -88,6 +88,17 @@ endpoints:
post:
- { name: scope, type: "*expr.Vars", title: "Workflow meta data", parser: "types.ParseWorkflowVariables" }
- { name: runAs, type: bool, required: true, title: "Is workflow enabled" }
- name: exec
method: POST
title: Executes workflow on a specific step (must be orphan step and connected to 'onManual' trigger)
path: "/{workflowID}/exec"
parameters:
path: [ { name: workflowID, type: uint64, required: true, title: "Workflow ID" } ]
post:
- { name: stepID, type: uint64, required: true, title: "Step ID" }
- { name: input, type: "*expr.Vars", title: "Input", parser: "types.ParseWorkflowVariables" }
- { name: trace, type: bool, title: "Trace workflow execution" }
- title: Triggers
path: "/triggers"

View File

@ -26,6 +26,7 @@ type (
Delete(context.Context, *request.WorkflowDelete) (interface{}, error)
Undelete(context.Context, *request.WorkflowUndelete) (interface{}, error)
Test(context.Context, *request.WorkflowTest) (interface{}, error)
Exec(context.Context, *request.WorkflowExec) (interface{}, error)
}
// HTTP API interface
@ -37,6 +38,7 @@ type (
Delete func(http.ResponseWriter, *http.Request)
Undelete func(http.ResponseWriter, *http.Request)
Test func(http.ResponseWriter, *http.Request)
Exec func(http.ResponseWriter, *http.Request)
}
)
@ -152,6 +154,22 @@ func NewWorkflow(h WorkflowAPI) *Workflow {
return
}
api.Send(w, r, value)
},
Exec: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
params := request.NewWorkflowExec()
if err := params.Fill(r); err != nil {
api.Send(w, r, err)
return
}
value, err := h.Exec(r.Context(), params)
if err != nil {
api.Send(w, r, err)
return
}
api.Send(w, r, value)
},
}
@ -167,5 +185,6 @@ func (h Workflow) MountRoutes(r chi.Router, middlewares ...func(http.Handler) ht
r.Delete("/workflows/{workflowID}", h.Delete)
r.Post("/workflows/{workflowID}/undelete", h.Undelete)
r.Post("/workflows/{workflowID}/test", h.Test)
r.Post("/workflows/{workflowID}/exec", h.Exec)
})
}

View File

@ -234,6 +234,28 @@ type (
// Is workflow enabled
RunAs bool
}
WorkflowExec struct {
// WorkflowID PATH parameter
//
// Workflow ID
WorkflowID uint64 `json:",string"`
// StepID POST parameter
//
// Step ID
StepID uint64 `json:",string"`
// Input POST parameter
//
// Input
Input *expr.Vars
// Trace POST parameter
//
// Trace workflow execution
Trace bool
}
)
// NewWorkflowList request
@ -980,3 +1002,101 @@ func (r *WorkflowTest) Fill(req *http.Request) (err error) {
return err
}
// NewWorkflowExec request
func NewWorkflowExec() *WorkflowExec {
return &WorkflowExec{}
}
// Auditable returns all auditable/loggable parameters
func (r WorkflowExec) Auditable() map[string]interface{} {
return map[string]interface{}{
"workflowID": r.WorkflowID,
"stepID": r.StepID,
"input": r.Input,
"trace": r.Trace,
}
}
// Auditable returns all auditable/loggable parameters
func (r WorkflowExec) GetWorkflowID() uint64 {
return r.WorkflowID
}
// Auditable returns all auditable/loggable parameters
func (r WorkflowExec) GetStepID() uint64 {
return r.StepID
}
// Auditable returns all auditable/loggable parameters
func (r WorkflowExec) GetInput() *expr.Vars {
return r.Input
}
// Auditable returns all auditable/loggable parameters
func (r WorkflowExec) GetTrace() bool {
return r.Trace
}
// Fill processes request and fills internal variables
func (r *WorkflowExec) Fill(req *http.Request) (err error) {
if strings.ToLower(req.Header.Get("content-type")) == "application/json" {
err = json.NewDecoder(req.Body).Decode(r)
switch {
case err == io.EOF:
err = nil
case err != nil:
return fmt.Errorf("error parsing http request body: %w", err)
}
}
{
if err = req.ParseForm(); err != nil {
return err
}
// POST params
if val, ok := req.Form["stepID"]; ok && len(val) > 0 {
r.StepID, err = payload.ParseUint64(val[0]), nil
if err != nil {
return err
}
}
if val, ok := req.Form["input[]"]; ok {
r.Input, err = types.ParseWorkflowVariables(val)
if err != nil {
return err
}
} else if val, ok := req.Form["input"]; ok {
r.Input, err = types.ParseWorkflowVariables(val)
if err != nil {
return err
}
}
if val, ok := req.Form["trace"]; ok && len(val) > 0 {
r.Trace, err = payload.ParseBool(val[0]), nil
if err != nil {
return err
}
}
}
{
var val string
// path params
val = chi.URLParam(req, "workflowID")
r.WorkflowID, err = payload.ParseUint64(val), nil
if err != nil {
return err
}
}
return err
}

View File

@ -7,6 +7,7 @@ import (
"github.com/cortezaproject/corteza-server/automation/service"
"github.com/cortezaproject/corteza-server/automation/types"
"github.com/cortezaproject/corteza-server/pkg/api"
"github.com/cortezaproject/corteza-server/pkg/expr"
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/cortezaproject/corteza-server/pkg/payload"
)
@ -20,6 +21,7 @@ type (
Update(ctx context.Context, upd *types.Workflow) (*types.Workflow, error)
DeleteByID(ctx context.Context, workflowID uint64) error
UndeleteByID(ctx context.Context, workflowID uint64) error
Exec(ctx context.Context, workflowID uint64, p types.WorkflowExecParams) (*expr.Vars, types.Stacktrace, error)
}
}
@ -27,6 +29,11 @@ type (
Filter types.WorkflowFilter `json:"filter"`
Set types.WorkflowSet `json:"set"`
}
workflowExecPayload struct {
Results *expr.Vars `json:"results"`
Trace types.Stacktrace `json:"trace,omitempty"`
}
)
func (Workflow) New() *Workflow {
@ -112,6 +119,21 @@ func (ctrl Workflow) Undelete(ctx context.Context, r *request.WorkflowUndelete)
return api.OK(), ctrl.svc.UndeleteByID(ctx, r.WorkflowID)
}
func (ctrl Workflow) Exec(ctx context.Context, r *request.WorkflowExec) (interface{}, error) {
var (
wep = &workflowExecPayload{}
err error
)
wep.Results, wep.Trace, err = ctrl.svc.Exec(ctx, r.WorkflowID, types.WorkflowExecParams{
StepID: r.StepID,
Trace: r.Trace,
Input: r.Input,
})
return wep, err
}
func (ctrl Workflow) makeFilterPayload(ctx context.Context, uu types.WorkflowSet, f types.WorkflowFilter, err error) (*workflowSetPayload, error) {
if err != nil {
return nil, err

View File

@ -92,8 +92,8 @@ func Initialize(ctx context.Context, log *zap.Logger, s store.Storer, c Config)
DefaultAccessControl = AccessControl(rbac.Global())
DefaultWorkflow = Workflow(DefaultLogger.Named("workflow"))
DefaultSession = Session(DefaultLogger.Named("session"))
DefaultWorkflow = Workflow(DefaultLogger.Named("workflow"))
DefaultTrigger = Trigger(DefaultLogger.Named("trigger"), c.Workflow)
DefaultWorkflow.triggers = DefaultTrigger

View File

@ -131,6 +131,32 @@ func (svc *trigger) Search(ctx context.Context, filter types.TriggerFilter) (rr
return rr, filter, svc.recordAction(ctx, wap, TriggerActionSearch, err)
}
// SearchOnManual finds first matching onManual trigger and returns it
//
// In case stepID is 0, first trigger is returned
func (svc *trigger) SearchOnManual(ctx context.Context, workflowID, stepID uint64) (*types.Trigger, error) {
tt, _, err := svc.Search(ctx, types.TriggerFilter{
WorkflowID: []uint64{workflowID},
EventType: "onManual",
})
if err != nil {
return nil, err
}
if stepID == 0 && len(tt) > 0 {
return tt[0], nil
}
for _, t := range tt {
if t.StepID == stepID {
return t, nil
}
}
return nil, nil
}
func (svc *trigger) LookupByID(ctx context.Context, triggerID uint64) (res *types.Trigger, err error) {
var (
wap = &triggerActionProps{trigger: &types.Trigger{ID: triggerID}}
@ -495,6 +521,11 @@ func (svc *trigger) registerTriggers(wf *types.Workflow, runAs auth.Identifiable
continue
}
if t.EventType == "onManual" {
// skip onManual trigger registration,
// we'll handle them directly
}
var (
cnstr eventbus.ConstraintMatcher
ops = make([]eventbus.HandlerRegOp, 0, len(t.Constraints)+2)

View File

@ -26,6 +26,7 @@ type (
actionlog actionlog.Recorder
ac workflowAccessController
triggers *trigger
session *session
log *zap.Logger
@ -48,7 +49,11 @@ type (
CanDeleteWorkflow(context.Context, *types.Workflow) bool
CanUndeleteWorkflow(context.Context, *types.Workflow) bool
CanManageWorkflowSessions(context.Context, *types.Workflow) bool
Grant(ctx context.Context, rr ...*rbac.Rule) error
workflowExecController
}
workflowExecController interface {
@ -78,6 +83,7 @@ func Workflow(log *zap.Logger) *workflow {
store: DefaultStore,
ac: DefaultAccessControl,
triggers: DefaultTrigger,
session: DefaultSession,
eventbus: eventbus.Service(),
wfgs: make(map[uint64]*wfexec.Graph),
mux: &sync.RWMutex{},
@ -277,15 +283,6 @@ func (svc *workflow) UndeleteByID(ctx context.Context, workflowID uint64) error
}))
}
// Start runs a new workflow
//
// Workflow execution is asynchronous operation.
func (svc *workflow) Start(ctx context.Context, workflowID uint64, scope *expr.Vars) error {
defer svc.mux.Unlock()
svc.mux.Lock()
return errors.Internal("pending implementation")
}
func (svc workflow) uniqueCheck(ctx context.Context, res *types.Workflow) (err error) {
if res.Handle != "" {
if e, _ := store.LookupAutomationWorkflowByHandle(ctx, svc.store, res.Handle); e != nil && e.ID != res.ID {
@ -479,9 +476,133 @@ func (svc *workflow) Load(ctx context.Context) error {
return svc.triggers.registerWorkflows(ctx, wwf...)
}
func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.WorkflowExecParams) (*expr.Vars, types.Stacktrace, error) {
var (
runAs intAuth.Identifiable
wap = &workflowActionProps{}
wf *types.Workflow
t *types.Trigger
results *expr.Vars
wait WaitFn
)
err := func() (err error) {
wf, err = loadWorkflow(ctx, svc.store, workflowID)
if err != nil {
return
}
wap.setWorkflow(wf)
if !svc.ac.CanExecuteWorkflow(ctx, wf) {
return WorkflowErrNotAllowedToExecute()
}
// User wants to trace workflow execution
// This means we'll allow him to specify any (orphaned) step
// even if it's not linked to onManual trigger
if p.Trace && !svc.ac.CanManageWorkflowSessions(ctx, wf) {
return WorkflowErrNotAllowedToExecute()
}
g, convErr := Convert(svc, wf)
if len(convErr) > 0 {
return convErr
}
// Find the trigger.
t, err = func() (*types.Trigger, error) {
var tt types.TriggerSet
tt, _, err = svc.triggers.Search(ctx, types.TriggerFilter{WorkflowID: []uint64{workflowID}})
if err != nil {
return nil, err
}
if p.StepID == 0 && len(tt) > 0 {
return tt[0], nil
} else {
for _, tMatch := range tt {
if tMatch.StepID == p.StepID {
return tMatch, nil
}
}
}
return nil, nil
}()
// Start with workflow scope
scope := wf.Scope.Merge()
ssp := types.SessionStartParams{
WorkflowID: wf.ID,
KeepFor: wf.KeepSessions,
Trace: wf.Trace,
StepID: p.StepID,
}
if !p.Trace {
if t == nil {
return WorkflowErrUnknownWorkflowStep()
}
}
if t != nil {
wap.setTrigger(t)
// Add trigger's input to scope
scope = scope.Merge(t.Input)
_ = scope.AssignFieldValue("eventType", expr.Must(expr.NewString(t.EventType)))
_ = scope.AssignFieldValue("resourceType", expr.Must(expr.NewString(t.ResourceType)))
ssp.StepID = t.StepID
ssp.EventType = t.EventType
ssp.ResourceType = t.ResourceType
} else {
ssp.EventType = "onTrace"
ssp.ResourceType = ""
}
// Finally, assign input values
ssp.Input = scope.Merge(p.Input)
if wf.RunAs > 0 {
if runAs, err = DefaultUser.FindByID(ctx, wf.RunAs); err != nil {
return
}
}
if runAs == nil {
// Default to current user
runAs = intAuth.GetIdentityFromContext(ctx)
}
wait, err = svc.session.Start(g, runAs, ssp)
if err != nil {
return err
}
if !p.Async {
if p.Wait || wf.CheckDeferred() {
// deferred workflow, return right away and keep the workflow session
// running without waiting for the execution
return nil
}
}
// wait for the workflow to complete
// reuse scope for results
// this will be decoded back to event properties
results, _, err = wait(ctx)
return
}()
return results, nil, svc.recordAction(ctx, wap, WorkflowActionExecute, err)
}
func makeWorkflowHandler(ac workflowExecController, s *session, t *types.Trigger, wf *types.Workflow, g *wfexec.Graph, runAs intAuth.Identifiable) eventbus.HandlerFn {
return func(ctx context.Context, ev eventbus.Event) (err error) {
if !ac.(*accessControl).CanExecuteWorkflow(ctx, wf) {
if !ac.CanExecuteWorkflow(ctx, wf) {
return WorkflowErrNotAllowedToExecute()
}

View File

@ -23,6 +23,7 @@ type (
workflow *types.Workflow
new *types.Workflow
update *types.Workflow
trigger *types.Trigger
filter *types.WorkflowFilter
}
@ -84,6 +85,17 @@ func (p *workflowActionProps) setUpdate(update *types.Workflow) *workflowActionP
return p
}
// setTrigger updates workflowActionProps's trigger
//
// Allows method chaining
//
// This function is auto-generated.
//
func (p *workflowActionProps) setTrigger(trigger *types.Trigger) *workflowActionProps {
p.trigger = trigger
return p
}
// setFilter updates workflowActionProps's filter
//
// Allows method chaining
@ -116,6 +128,12 @@ func (p workflowActionProps) Serialize() actionlog.Meta {
m.Set("update.handle", p.update.Handle, true)
m.Set("update.ID", p.update.ID, true)
}
if p.trigger != nil {
m.Set("trigger.eventType", p.trigger.EventType, true)
m.Set("trigger.resourceType", p.trigger.ResourceType, true)
m.Set("trigger.ID", p.trigger.ID, true)
m.Set("trigger.stepID", p.trigger.StepID, true)
}
if p.filter != nil {
}
@ -189,6 +207,24 @@ func (p workflowActionProps) Format(in string, err error) string {
pairs = append(pairs, "{update.ID}", fns(p.update.ID))
}
if p.trigger != nil {
// replacement for "{trigger}" (in order how fields are defined)
pairs = append(
pairs,
"{trigger}",
fns(
p.trigger.EventType,
p.trigger.ResourceType,
p.trigger.ID,
p.trigger.StepID,
),
)
pairs = append(pairs, "{trigger.eventType}", fns(p.trigger.EventType))
pairs = append(pairs, "{trigger.resourceType}", fns(p.trigger.ResourceType))
pairs = append(pairs, "{trigger.ID}", fns(p.trigger.ID))
pairs = append(pairs, "{trigger.stepID}", fns(p.trigger.StepID))
}
if p.filter != nil {
// replacement for "{filter}" (in order how fields are defined)
pairs = append(
@ -352,6 +388,26 @@ func WorkflowActionUndelete(props ...*workflowActionProps) *workflowAction {
return a
}
// WorkflowActionExecute returns "automation:workflow.execute" action
//
// This function is auto-generated.
//
func WorkflowActionExecute(props ...*workflowActionProps) *workflowAction {
a := &workflowAction{
timestamp: time.Now(),
resource: "automation:workflow",
action: "execute",
log: "{workflow} executed",
severity: actionlog.Info,
}
if len(props) > 0 {
a.props = props[0]
}
return a
}
// *********************************************************************************************************************
// *********************************************************************************************************************
// Error constructors
@ -732,6 +788,38 @@ func WorkflowErrNotAllowedToExecute(mm ...*workflowActionProps) *errors.Error {
return e
}
// WorkflowErrUnknownWorkflowStep returns "automation:workflow.unknownWorkflowStep" as *errors.Error
//
//
// This function is auto-generated.
//
func WorkflowErrUnknownWorkflowStep(mm ...*workflowActionProps) *errors.Error {
var p = &workflowActionProps{}
if len(mm) > 0 {
p = mm[0]
}
var e = errors.New(
errors.KindInternal,
p.Format("unknown workflow step", nil),
errors.Meta("type", "unknownWorkflowStep"),
errors.Meta("resource", "automation:workflow"),
// action log entry; no formatting, it will be applied inside recordAction fn.
errors.Meta(workflowLogMetaKey{}, "failed to execute {workflow}; unknown workflow step"),
errors.Meta(workflowPropsMetaKey{}, p),
errors.StackSkip(1),
)
if len(mm) > 0 {
}
return e
}
// WorkflowErrHandleNotUnique returns "automation:workflow.handleNotUnique" as *errors.Error
//
//

View File

@ -23,6 +23,9 @@ props:
- name: update
type: "*types.Workflow"
fields: [ handle, ID ]
- name: trigger
type: "*types.Trigger"
fields: [ eventType, resourceType, ID, stepID, ]
- name: filter
type: "*types.WorkflowFilter"
@ -48,6 +51,10 @@ actions:
- action: undelete
log: "undeleted {workflow}"
- action: execute
# NOTE: only explicitly triggered workflow execution is logged
log: "{workflow} executed"
errors:
- error: notFound
message: "workflow not found"
@ -90,6 +97,10 @@ errors:
message: "not allowed to execute this workflow"
log: "failed to execute {workflow}; insufficient permissions"
- error: unknownWorkflowStep
message: "unknown workflow step"
log: "failed to execute {workflow}; unknown workflow step"
- error: handleNotUnique
message: "workflow handle not unique"
log: "duplicate handle used for workflow ({workflow})"

View File

@ -77,6 +77,22 @@ type (
Culprit map[string]int `json:"culprit"`
Description string `json:"description"`
}
WorkflowExecParams struct {
// Start with this specific step
StepID uint64
// Enable execution tracing
Trace bool
// Do not wait for workflow to be finished
Async bool
// Wait for workflow to be executed even if it's deferred
Wait bool
Input *expr.Vars
}
)
// Resource returns a resource ID for this type