From 0b1c61ba86742281db7f7fcf111313fecd91a49d Mon Sep 17 00:00:00 2001 From: Denis Arh Date: Thu, 18 Mar 2021 20:17:36 +0100 Subject: [PATCH] Support for wf exec tracing --- automation/rest.yaml | 2 ++ automation/rest/request/workflow.go | 36 +++++++++++++++++++++++++++++ automation/rest/workflow.go | 2 ++ automation/service/session.go | 23 +++++++++--------- automation/service/workflow.go | 12 ++++++---- automation/types/session.go | 6 ++--- pkg/wfexec/session.go | 25 +++++++++++--------- pkg/wfexec/state.go | 2 +- tests/automation/workflow_test.go | 2 +- 9 files changed, 77 insertions(+), 33 deletions(-) diff --git a/automation/rest.yaml b/automation/rest.yaml index dd642c1ef..17a697561 100644 --- a/automation/rest.yaml +++ b/automation/rest.yaml @@ -98,6 +98,8 @@ endpoints: - { name: stepID, type: uint64, required: true, title: "Step ID" } - { name: input, type: "*expr.Vars", title: "Input", parser: "types.ParseWorkflowVariables" } - { name: trace, type: bool, title: "Trace workflow execution" } + - { name: wait, type: bool, title: "Wait for workflow to complete" } + - { name: async, type: bool, title: "Execute step and return immediately" } - title: Triggers diff --git a/automation/rest/request/workflow.go b/automation/rest/request/workflow.go index c98d2ff63..e438a657d 100644 --- a/automation/rest/request/workflow.go +++ b/automation/rest/request/workflow.go @@ -255,6 +255,16 @@ type ( // // Trace workflow execution Trace bool + + // Wait POST parameter + // + // Wait for workflow to complete + Wait bool + + // Async POST parameter + // + // Execute step and return immediately + Async bool } ) @@ -1015,6 +1025,8 @@ func (r WorkflowExec) Auditable() map[string]interface{} { "stepID": r.StepID, "input": r.Input, "trace": r.Trace, + "wait": r.Wait, + "async": r.Async, } } @@ -1038,6 +1050,16 @@ func (r WorkflowExec) GetTrace() bool { return r.Trace } +// Auditable returns all auditable/loggable parameters +func (r WorkflowExec) GetWait() bool { + return r.Wait +} + +// Auditable returns all auditable/loggable parameters +func (r WorkflowExec) GetAsync() bool { + return r.Async +} + // Fill processes request and fills internal variables func (r *WorkflowExec) Fill(req *http.Request) (err error) { @@ -1084,6 +1106,20 @@ func (r *WorkflowExec) Fill(req *http.Request) (err error) { return err } } + + if val, ok := req.Form["wait"]; ok && len(val) > 0 { + r.Wait, err = payload.ParseBool(val[0]), nil + if err != nil { + return err + } + } + + if val, ok := req.Form["async"]; ok && len(val) > 0 { + r.Async, err = payload.ParseBool(val[0]), nil + if err != nil { + return err + } + } } { diff --git a/automation/rest/workflow.go b/automation/rest/workflow.go index 6d734d9bf..d99dc5aed 100644 --- a/automation/rest/workflow.go +++ b/automation/rest/workflow.go @@ -129,6 +129,8 @@ func (ctrl Workflow) Exec(ctx context.Context, r *request.WorkflowExec) (interfa StepID: r.StepID, Trace: r.Trace, Input: r.Input, + Async: r.Async, + Wait: r.Wait, }) return wep, err diff --git a/automation/service/session.go b/automation/service/session.go index 9f35d9ba2..c5ab3879f 100644 --- a/automation/service/session.go +++ b/automation/service/session.go @@ -37,7 +37,7 @@ type ( CanManageWorkflowSessions(context.Context, *types.Workflow) bool } - WaitFn func(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, error) + WaitFn func(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, types.Stacktrace, error) ) func Session(log *zap.Logger) *session { @@ -111,15 +111,6 @@ func (svc *session) suspendAll(ctx context.Context) error { return nil } -type ( - foo struct { - SessionID uint64 - StateID uint64 - CreatedAt time.Time - Payload *expr.Vars - } -) - // PendingPrompts returns all prompts on all sessions owned by current user func (svc *session) PendingPrompts(ctx context.Context) (pp []*wfexec.PendingPrompt) { var ( @@ -182,7 +173,9 @@ func (svc *session) Start(g *wfexec.Graph, i auth.Identifiable, ssp types.Sessio return } - return func(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, error) { return ses.WaitResults(ctx) }, nil + return func(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, types.Stacktrace, error) { + return ses.WaitResults(ctx) + }, nil } // Resume resumes suspended session/state @@ -267,8 +260,14 @@ func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHa frame = state.MakeFrame() ) + // Stacktrace will be set to !nil if frame collection is needed if ses.Stacktrace != nil { - // Stacktrace will be set to !nil if frame collection is needed + + if len(ses.Stacktrace) > 0 { + // calculate how long it took to get to this step + frame.ElapsedTime = uint(frame.CreatedAt.Sub(ses.Stacktrace[0].CreatedAt) / time.Millisecond) + } + ses.Stacktrace = append(ses.Stacktrace, frame) } diff --git a/automation/service/workflow.go b/automation/service/workflow.go index 93bab9e0b..23c28e714 100644 --- a/automation/service/workflow.go +++ b/automation/service/workflow.go @@ -484,6 +484,8 @@ func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.Workfl t *types.Trigger results *expr.Vars wait WaitFn + + stacktrace types.Stacktrace ) err := func() (err error) { @@ -536,7 +538,7 @@ func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.Workfl ssp := types.SessionStartParams{ WorkflowID: wf.ID, KeepFor: wf.KeepSessions, - Trace: wf.Trace, + Trace: wf.Trace || p.Trace, StepID: p.StepID, } @@ -583,7 +585,7 @@ func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.Workfl } if !p.Async { - if p.Wait || wf.CheckDeferred() { + if !p.Wait && wf.CheckDeferred() { // deferred workflow, return right away and keep the workflow session // running without waiting for the execution return nil @@ -593,11 +595,11 @@ func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.Workfl // wait for the workflow to complete // reuse scope for results // this will be decoded back to event properties - results, _, err = wait(ctx) + results, _, stacktrace, err = wait(ctx) return }() - return results, nil, svc.recordAction(ctx, wap, WorkflowActionExecute, err) + return results, stacktrace, svc.recordAction(ctx, wap, WorkflowActionExecute, err) } func makeWorkflowHandler(ac workflowExecController, s *session, t *types.Trigger, wf *types.Workflow, g *wfexec.Graph, runAs intAuth.Identifiable) eventbus.HandlerFn { @@ -655,7 +657,7 @@ func makeWorkflowHandler(ac workflowExecController, s *session, t *types.Trigger // wait for the workflow to complete // reuse scope for results // this will be decoded back to event properties - scope, _, err = wait(ctx) + scope, _, _, err = wait(ctx) if err != nil { return } diff --git a/automation/types/session.go b/automation/types/session.go index 4ac0e230c..e094cdb5b 100644 --- a/automation/types/session.go +++ b/automation/types/session.go @@ -103,12 +103,12 @@ func (s Session) PendingPrompts(ownerId uint64) []*wfexec.PendingPrompt { } // Wait blocks until workflow session is completed or fails (or context is canceled) and returns resuts -func (s Session) WaitResults(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, error) { +func (s *Session) WaitResults(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, Stacktrace, error) { if err := s.session.WaitUntil(ctx, wfexec.SessionFailed, wfexec.SessionCompleted); err != nil { - return nil, -1, err + return nil, -1, nil, err } - return s.session.Result(), s.session.Status(), nil + return s.session.Result(), s.session.Status(), s.Stacktrace, nil } func (s *Session) Apply(ssp SessionStartParams) { diff --git a/pkg/wfexec/session.go b/pkg/wfexec/session.go index 6f0fab977..2610ee5de 100644 --- a/pkg/wfexec/session.go +++ b/pkg/wfexec/session.go @@ -65,14 +65,19 @@ type ( sessionOpt func(*Session) Frame struct { - CreatedAt time.Time `json:"createdAt"` - SessionID uint64 `json:"sessionID"` - StateID uint64 `json:"stateID"` - Input *expr.Vars `json:"input"` - Scope *expr.Vars `json:"scope"` - ParentID uint64 `json:"parentID"` - StepID uint64 `json:"stepID"` - LeadTime time.Duration `json:"leadTime"` + CreatedAt time.Time `json:"createdAt"` + SessionID uint64 `json:"sessionID"` + StateID uint64 `json:"stateID"` + Input *expr.Vars `json:"input"` + Scope *expr.Vars `json:"scope"` + ParentID uint64 `json:"parentID"` + StepID uint64 `json:"stepID"` + + // How much time from the 1st step to the start of this step in milliseconds + ElapsedTime uint `json:"elapsedTime"` + + // How much time it took to execute this step in milliseconds + StepTime uint `json:"stepTime"` } // ExecRequest is passed to Exec() functions and contains all information @@ -305,7 +310,7 @@ func (s *Session) enqueue(ctx context.Context, st *State) error { } } -// does not wait for the whole wf to be complete but until: +// Wait does not wait for the whole wf to be complete but until: // - context timeout // - idle state // - error in error queue @@ -485,8 +490,6 @@ func (s *Session) exec(ctx context.Context, st *State) { // } //}() - s.eventHandler(SessionActive, st, s) - { if currLoop != nil && currLoop.Is(st.step) { result = currLoop diff --git a/pkg/wfexec/state.go b/pkg/wfexec/state.go index 51c83080a..d543a2403 100644 --- a/pkg/wfexec/state.go +++ b/pkg/wfexec/state.go @@ -137,7 +137,7 @@ func (s State) MakeFrame() *Frame { } if s.completed != nil { - f.LeadTime = s.completed.Sub(s.created) + f.StepTime = uint(s.completed.Sub(s.created) / time.Millisecond) } return f diff --git a/tests/automation/workflow_test.go b/tests/automation/workflow_test.go index f37189b22..75cb2cebe 100644 --- a/tests/automation/workflow_test.go +++ b/tests/automation/workflow_test.go @@ -186,7 +186,7 @@ func TestWorkflowCreateFull(t *testing.T) { Scope: expr.RVars{"foo": expr.Must(expr.NewString("bar"))}.Vars(), Steps: types.WorkflowStepSet{}, Paths: types.WorkflowPathSet{}, - RunAs: 42, + RunAs: 0, OwnedBy: 42, } )