From 2dbf536faaa352b33e43fb46d77e0435587aea9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toma=C5=BE=20Jerman?= Date: Mon, 24 May 2021 15:55:32 +0200 Subject: [PATCH] Improve workflow error handling - prevent the try-catch step from ending the exec session - prevent pre-run validation checks from cycling the exec --- automation/service/session.go | 6 ++- automation/service/session_test.go | 40 ++++++++++++++ automation/types/session.go | 7 ++- pkg/wfexec/session.go | 38 ++++++++----- pkg/wfexec/session_test.go | 87 +++++++++++++++++++++++++++++- pkg/wfexec/state.go | 6 ++- 6 files changed, 166 insertions(+), 18 deletions(-) create mode 100644 automation/service/session_test.go diff --git a/automation/service/session.go b/automation/service/session.go index 0896e666f..22896a219 100644 --- a/automation/service/session.go +++ b/automation/service/session.go @@ -175,7 +175,9 @@ func (svc *session) Start(g *wfexec.Graph, i auth.Identifiable, ssp types.Sessio return nil, errors.InvalidData("cannot start workflow session multiple starting steps found") } } else if start = g.StepByID(ssp.StepID); start == nil { - return nil, errors.InvalidData("trigger staring step references nonexisting step") + return nil, errors.InvalidData("trigger staring step references non-existing step") + } else if len(g.Parents(g.StepByID(ssp.StepID))) > 0 { + return nil, errors.InvalidData("cannot start workflow on a step with parents") } var ( @@ -311,7 +313,7 @@ func (svc *session) gc() { pending1d++ } - if s.CompletedAt == nil { + if !s.GC() { continue } diff --git a/automation/service/session_test.go b/automation/service/session_test.go new file mode 100644 index 000000000..712ffed68 --- /dev/null +++ b/automation/service/session_test.go @@ -0,0 +1,40 @@ +package service + +import ( + "testing" + + "github.com/cortezaproject/corteza-server/automation/types" + "github.com/cortezaproject/corteza-server/pkg/wfexec" + "github.com/stretchr/testify/require" +) + +func TestSession_Start(t *testing.T) { + 
var ( + req = require.New(t) + ses = &session{} + g = wfexec.NewGraph() + err error + ) + + _, err = ses.Start(g, nil, types.SessionStartParams{}) + req.EqualError(err, "could not find starting step") + + g.AddStep(wfexec.NewGenericStep(nil)) + _, err = ses.Start(g, nil, types.SessionStartParams{StepID: 4321}) + req.EqualError(err, "trigger staring step references non-existing step") + + // Adding another orphaned step and starting session w/o explicitly specifying the starting step + g.AddStep(wfexec.NewGenericStep(nil)) + _, err = ses.Start(g, nil, types.SessionStartParams{}) + req.EqualError(err, "cannot start workflow session multiple starting steps found") + + // add a generic step with a known ID so we can use it as a starting point + s := wfexec.NewGenericStep(nil) + s.SetID(42) + g.AddStep(s) + // add parents to the 42 step + g.AddStep(wfexec.NewGenericStep(nil), s) + _, err = ses.Start(g, nil, types.SessionStartParams{StepID: 42}) + req.EqualError(err, "cannot start workflow on a step with parents") + +} diff --git a/automation/types/session.go b/automation/types/session.go index d564185fc..93b4ce754 100644 --- a/automation/types/session.go +++ b/automation/types/session.go @@ -5,10 +5,11 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "time" + "github.com/cortezaproject/corteza-server/pkg/expr" "github.com/cortezaproject/corteza-server/pkg/filter" "github.com/cortezaproject/corteza-server/pkg/wfexec" - "time" ) type ( @@ -111,6 +112,10 @@ func (s Session) PendingPrompts(ownerId uint64) []*wfexec.PendingPrompt { return s.session.UserPendingPrompts(ownerId) } +func (s Session) GC() bool { + return s.CompletedAt != nil || s.session.Error() != nil +} + // WaitResults wait blocks until workflow session is completed or fails (or context is canceled) and returns resuts func (s *Session) WaitResults(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, Stacktrace, error) { if err := s.session.WaitUntil(ctx, wfexec.SessionFailed, 
wfexec.SessionCompleted); err != nil { diff --git a/pkg/wfexec/session.go b/pkg/wfexec/session.go index 30053d711..433c02cc2 100644 --- a/pkg/wfexec/session.go +++ b/pkg/wfexec/session.go @@ -233,12 +233,22 @@ func (s *Session) Result() *expr.Vars { } func (s *Session) Exec(ctx context.Context, step Step, scope *expr.Vars) error { - if s.g.Len() == 0 { - return fmt.Errorf("refusing to execute without steps") - } + err := func() error { + if s.g.Len() == 0 { + return fmt.Errorf("refusing to execute without steps") + } - if len(s.g.Parents(step)) > 0 { - return fmt.Errorf("cannot execute step with parents") + if len(s.g.Parents(step)) > 0 { + return fmt.Errorf("cannot execute step with parents") + } + return nil + }() + + if err != nil { + // send the validation error to the error queue to trigger worker shutdown + // session error must be set to update session status + s.qErr <- err + return err } if scope == nil { @@ -452,15 +462,18 @@ func (s *Session) worker(ctx context.Context) { ) s.mux.Lock() - s.mux.Unlock() + defer s.mux.Unlock() - // We need to force failed session status - // because it's not set early enough to pick it up with s.Status() - status = SessionFailed + // when the err handler is defined, the error was handled and should not kill the workflow + if !st.errHandled { + // We need to force failed session status + // because it's not set early enough to pick it up with s.Status() + status = SessionFailed - // pushing step execution error into error queue - // to break worker loop - s.qErr <- st.err + // pushing step execution error into error queue + // to break worker loop + s.qErr <- st.err + } } s.log.Debug( @@ -603,6 +616,7 @@ func (s *Session) exec(ctx context.Context, st *State) (err error) { // in case of another error in the error-handling branch eh := st.errHandler st.errHandler = nil + st.errHandled = true if err = s.enqueue(ctx, st.Next(eh, scope)); err != nil { log.Warn("unable to queue", zap.Error(err)) } diff --git a/pkg/wfexec/session_test.go 
b/pkg/wfexec/session_test.go index 57f320e7e..7f2309cd1 100644 --- a/pkg/wfexec/session_test.go +++ b/pkg/wfexec/session_test.go @@ -2,11 +2,13 @@ package wfexec import ( "context" + "fmt" + "testing" + "time" + "github.com/cortezaproject/corteza-server/pkg/expr" "github.com/stretchr/testify/require" "go.uber.org/atomic" - "testing" - "time" ) type ( @@ -189,7 +191,88 @@ func TestSession_Delays(t *testing.T) { req.Contains(ses.Result().Dict(), "waitForMoment") req.Contains(ses.Result().Dict(), "waitForInput") req.Equal("foo", expr.Must(expr.Select(ses.Result(), "input")).Get()) +} +func TestSession_ErrHandler(t *testing.T) { + var ( + ctx = context.Background() + req = require.New(t) + wf = NewGraph() + ses = NewSession( + ctx, + wf, + + // enable if you need to see what is going on + //SetLogger(logger.MakeDebugLogger()), + + // enable if you need to see what is going on + //SetHandler(func(status SessionStatus, state *State, session *Session) { + // if state.step != nil { + // println(state.step.(*sesTestStep).name) + // } + //}), + ) + + cb_1_1 = &sesTestStep{name: "catch-branch-1-1"} + cb_1_2 = &sesTestStep{name: "catch-branch-1-2"} + tb_1_1 = &sesTestStep{name: "try-branch-1-1"} + + eh_1 = &sesTestStep{name: "err-handler", exec: func(ctx context.Context, request *ExecRequest) (ExecResponse, error) { + return ErrorHandler(cb_1_1), nil + }} + er_1 = &sesTestStep{name: "err-raiser", exec: func(ctx context.Context, request *ExecRequest) (ExecResponse, error) { + return nil, fmt.Errorf("would-be-handled-error") + }} + + cb_2_1 = &sesTestStep{name: "catch-branch-2-1"} + cb_2_2 = &sesTestStep{name: "catch-branch-2-2"} + tb_2_1 = &sesTestStep{name: "try-branch-2-1"} + + eh_2 = &sesTestStep{name: "err-handler", exec: func(ctx context.Context, request *ExecRequest) (ExecResponse, error) { + return ErrorHandler(cb_2_1), nil + }} + er_2 = &sesTestStep{name: "err-raiser", exec: func(ctx context.Context, request *ExecRequest) (ExecResponse, error) { + return nil, 
fmt.Errorf("would-be-handled-error") + }} + ) + + wf.AddStep(eh_1, tb_1_1) // error handling step (entrypoint!) + wf.AddStep(tb_1_1) // add try step + wf.AddStep(tb_1_1, er_1) // add error raising step right after 1st step in try branch + wf.AddStep(cb_1_1, cb_1_2) // catch branch step 1 & 2 + + wf.AddStep(cb_1_2, eh_2) // 2nd error handling step right after 1st catch branch + wf.AddStep(eh_2, tb_2_1) // step in try branch + wf.AddStep(tb_2_1, er_2) // 2nd error raising step on 2nd try branch + wf.AddStep(cb_2_1, cb_2_2) // 2nd catch branch step 1 & 2 + + req.NoError(ses.Exec(ctx, eh_1, nil)) + + req.NoError(ses.Wait(ctx)) + + req.Equal( + "/try-branch-1-1/catch-branch-1-1/catch-branch-1-2/try-branch-2-1/catch-branch-2-1/catch-branch-2-2", + ses.Result().Dict()["path"], + ) +} + +func TestSession_ExecStepWithParents(t *testing.T) { + var ( + ctx = context.Background() + req = require.New(t) + wf = NewGraph() + ses = NewSession(ctx, wf) + + p = &sesTestStep{name: "p"} + c = &sesTestStep{name: "c"} + ) + + wf.AddStep(p, c) + + req.Equal(SessionActive, ses.Status()) + req.Error(ses.Exec(ctx, c, nil)) + req.Error(ses.Wait(ctx)) + req.Equal(SessionFailed, ses.Status()) } func bmSessionSimpleStepSequence(c uint64, b *testing.B) { diff --git a/pkg/wfexec/state.go b/pkg/wfexec/state.go index b24de6602..7f8819774 100644 --- a/pkg/wfexec/state.go +++ b/pkg/wfexec/state.go @@ -2,9 +2,10 @@ package wfexec import ( "encoding/json" + "time" + "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/pkg/expr" - "time" ) type ( @@ -47,6 +48,9 @@ type ( // error handling step errHandler Step + // error handled flag, this gets reset on every new state! + errHandled bool + loops []Iterator action string