3
0

Support for wf exec tracing

This commit is contained in:
Denis Arh 2021-03-18 20:17:36 +01:00
parent 5faeff1ded
commit 0b1c61ba86
9 changed files with 77 additions and 33 deletions

View File

@ -98,6 +98,8 @@ endpoints:
- { name: stepID, type: uint64, required: true, title: "Step ID" }
- { name: input, type: "*expr.Vars", title: "Input", parser: "types.ParseWorkflowVariables" }
- { name: trace, type: bool, title: "Trace workflow execution" }
- { name: wait, type: bool, title: "Wait for workflow to complete" }
- { name: async, type: bool, title: "Execute step and return immediately" }
- title: Triggers

View File

@ -255,6 +255,16 @@ type (
//
// Trace workflow execution
Trace bool
// Wait POST parameter
//
// Wait for workflow to complete
Wait bool
// Async POST parameter
//
// Execute step and return immediately
Async bool
}
)
@ -1015,6 +1025,8 @@ func (r WorkflowExec) Auditable() map[string]interface{} {
"stepID": r.StepID,
"input": r.Input,
"trace": r.Trace,
"wait": r.Wait,
"async": r.Async,
}
}
@ -1038,6 +1050,16 @@ func (r WorkflowExec) GetTrace() bool {
return r.Trace
}
// Auditable returns all auditable/loggable parameters
func (r WorkflowExec) GetWait() bool {
return r.Wait
}
// Auditable returns all auditable/loggable parameters
func (r WorkflowExec) GetAsync() bool {
return r.Async
}
// Fill processes request and fills internal variables
func (r *WorkflowExec) Fill(req *http.Request) (err error) {
@ -1084,6 +1106,20 @@ func (r *WorkflowExec) Fill(req *http.Request) (err error) {
return err
}
}
if val, ok := req.Form["wait"]; ok && len(val) > 0 {
r.Wait, err = payload.ParseBool(val[0]), nil
if err != nil {
return err
}
}
if val, ok := req.Form["async"]; ok && len(val) > 0 {
r.Async, err = payload.ParseBool(val[0]), nil
if err != nil {
return err
}
}
}
{

View File

@ -129,6 +129,8 @@ func (ctrl Workflow) Exec(ctx context.Context, r *request.WorkflowExec) (interfa
StepID: r.StepID,
Trace: r.Trace,
Input: r.Input,
Async: r.Async,
Wait: r.Wait,
})
return wep, err

View File

@ -37,7 +37,7 @@ type (
CanManageWorkflowSessions(context.Context, *types.Workflow) bool
}
WaitFn func(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, error)
WaitFn func(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, types.Stacktrace, error)
)
func Session(log *zap.Logger) *session {
@ -111,15 +111,6 @@ func (svc *session) suspendAll(ctx context.Context) error {
return nil
}
type (
foo struct {
SessionID uint64
StateID uint64
CreatedAt time.Time
Payload *expr.Vars
}
)
// PendingPrompts returns all prompts on all sessions owned by current user
func (svc *session) PendingPrompts(ctx context.Context) (pp []*wfexec.PendingPrompt) {
var (
@ -182,7 +173,9 @@ func (svc *session) Start(g *wfexec.Graph, i auth.Identifiable, ssp types.Sessio
return
}
return func(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, error) { return ses.WaitResults(ctx) }, nil
return func(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, types.Stacktrace, error) {
return ses.WaitResults(ctx)
}, nil
}
// Resume resumes suspended session/state
@ -267,8 +260,14 @@ func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHa
frame = state.MakeFrame()
)
// Stacktrace will be set to !nil if frame collection is needed
if ses.Stacktrace != nil {
// Stacktrace will be set to !nil if frame collection is needed
if len(ses.Stacktrace) > 0 {
// calculate how long it took to get to this step
frame.ElapsedTime = uint(frame.CreatedAt.Sub(ses.Stacktrace[0].CreatedAt) / time.Millisecond)
}
ses.Stacktrace = append(ses.Stacktrace, frame)
}

View File

@ -484,6 +484,8 @@ func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.Workfl
t *types.Trigger
results *expr.Vars
wait WaitFn
stacktrace types.Stacktrace
)
err := func() (err error) {
@ -536,7 +538,7 @@ func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.Workfl
ssp := types.SessionStartParams{
WorkflowID: wf.ID,
KeepFor: wf.KeepSessions,
Trace: wf.Trace,
Trace: wf.Trace || p.Trace,
StepID: p.StepID,
}
@ -583,7 +585,7 @@ func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.Workfl
}
if !p.Async {
if p.Wait || wf.CheckDeferred() {
if !p.Wait && wf.CheckDeferred() {
// deferred workflow, return right away and keep the workflow session
// running without waiting for the execution
return nil
@ -593,11 +595,11 @@ func (svc *workflow) Exec(ctx context.Context, workflowID uint64, p types.Workfl
// wait for the workflow to complete
// reuse scope for results
// this will be decoded back to event properties
results, _, err = wait(ctx)
results, _, stacktrace, err = wait(ctx)
return
}()
return results, nil, svc.recordAction(ctx, wap, WorkflowActionExecute, err)
return results, stacktrace, svc.recordAction(ctx, wap, WorkflowActionExecute, err)
}
func makeWorkflowHandler(ac workflowExecController, s *session, t *types.Trigger, wf *types.Workflow, g *wfexec.Graph, runAs intAuth.Identifiable) eventbus.HandlerFn {
@ -655,7 +657,7 @@ func makeWorkflowHandler(ac workflowExecController, s *session, t *types.Trigger
// wait for the workflow to complete
// reuse scope for results
// this will be decoded back to event properties
scope, _, err = wait(ctx)
scope, _, _, err = wait(ctx)
if err != nil {
return
}

View File

@ -103,12 +103,12 @@ func (s Session) PendingPrompts(ownerId uint64) []*wfexec.PendingPrompt {
}
// Wait blocks until workflow session is completed or fails (or context is canceled) and returns resuts
func (s Session) WaitResults(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, error) {
func (s *Session) WaitResults(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, Stacktrace, error) {
if err := s.session.WaitUntil(ctx, wfexec.SessionFailed, wfexec.SessionCompleted); err != nil {
return nil, -1, err
return nil, -1, nil, err
}
return s.session.Result(), s.session.Status(), nil
return s.session.Result(), s.session.Status(), s.Stacktrace, nil
}
func (s *Session) Apply(ssp SessionStartParams) {

View File

@ -65,14 +65,19 @@ type (
sessionOpt func(*Session)
Frame struct {
CreatedAt time.Time `json:"createdAt"`
SessionID uint64 `json:"sessionID"`
StateID uint64 `json:"stateID"`
Input *expr.Vars `json:"input"`
Scope *expr.Vars `json:"scope"`
ParentID uint64 `json:"parentID"`
StepID uint64 `json:"stepID"`
LeadTime time.Duration `json:"leadTime"`
CreatedAt time.Time `json:"createdAt"`
SessionID uint64 `json:"sessionID"`
StateID uint64 `json:"stateID"`
Input *expr.Vars `json:"input"`
Scope *expr.Vars `json:"scope"`
ParentID uint64 `json:"parentID"`
StepID uint64 `json:"stepID"`
// How much time from the 1st step to the start of this step in milliseconds
ElapsedTime uint `json:"elapsedTime"`
// How much time it took to execute this step in milliseconds
StepTime uint `json:"stepTime"`
}
// ExecRequest is passed to Exec() functions and contains all information
@ -305,7 +310,7 @@ func (s *Session) enqueue(ctx context.Context, st *State) error {
}
}
// does not wait for the whole wf to be complete but until:
// Wait does not wait for the whole wf to be complete but until:
// - context timeout
// - idle state
// - error in error queue
@ -485,8 +490,6 @@ func (s *Session) exec(ctx context.Context, st *State) {
// }
//}()
s.eventHandler(SessionActive, st, s)
{
if currLoop != nil && currLoop.Is(st.step) {
result = currLoop

View File

@ -137,7 +137,7 @@ func (s State) MakeFrame() *Frame {
}
if s.completed != nil {
f.LeadTime = s.completed.Sub(s.created)
f.StepTime = uint(s.completed.Sub(s.created) / time.Millisecond)
}
return f

View File

@ -186,7 +186,7 @@ func TestWorkflowCreateFull(t *testing.T) {
Scope: expr.RVars{"foo": expr.Must(expr.NewString("bar"))}.Vars(),
Steps: types.WorkflowStepSet{},
Paths: types.WorkflowPathSet{},
RunAs: 42,
RunAs: 0,
OwnedBy: 42,
}
)