3
0

Improve workflow error handling

- prevent the try-catch step from ending the exec session
- prevent pre-run validation checks to cycle the exec
This commit is contained in:
Tomaž Jerman 2021-05-24 15:55:32 +02:00
parent 7bde43856e
commit 2dbf536faa
6 changed files with 166 additions and 18 deletions

View File

@ -175,7 +175,9 @@ func (svc *session) Start(g *wfexec.Graph, i auth.Identifiable, ssp types.Sessio
return nil, errors.InvalidData("cannot start workflow session multiple starting steps found")
}
} else if start = g.StepByID(ssp.StepID); start == nil {
return nil, errors.InvalidData("trigger staring step references nonexisting step")
return nil, errors.InvalidData("trigger staring step references non-existing step")
} else if len(g.Parents(g.StepByID(ssp.StepID))) > 0 {
return nil, errors.InvalidData("cannot start workflow on a step with parents")
}
var (
@ -311,7 +313,7 @@ func (svc *session) gc() {
pending1d++
}
if s.CompletedAt == nil {
if !s.GC() {
continue
}

View File

@ -0,0 +1,40 @@
package service
import (
"testing"
"github.com/cortezaproject/corteza-server/automation/types"
"github.com/cortezaproject/corteza-server/pkg/wfexec"
"github.com/stretchr/testify/require"
)
func TestSession_Start(t *testing.T) {
var (
req = require.New(t)
ses = &session{}
g = wfexec.NewGraph()
err error
)
_, err = ses.Start(g, nil, types.SessionStartParams{})
req.EqualError(err, "could not find starting step")
g.AddStep(wfexec.NewGenericStep(nil))
_, err = ses.Start(g, nil, types.SessionStartParams{StepID: 4321})
req.EqualError(err, "trigger staring step references non-existing step")
// Adding another orphaned step and starting session w/o explicitly specifying the starting step
g.AddStep(wfexec.NewGenericStep(nil))
_, err = ses.Start(g, nil, types.SessionStartParams{})
req.EqualError(err, "cannot start workflow session multiple starting steps found")
// add a generic step with a known ID so we can use it as a starting point
s := wfexec.NewGenericStep(nil)
s.SetID(42)
g.AddStep(s)
// add parents to the 42 step
g.AddStep(wfexec.NewGenericStep(nil), s)
_, err = ses.Start(g, nil, types.SessionStartParams{StepID: 42})
req.EqualError(err, "cannot start workflow on a step with parents")
}

View File

@ -5,10 +5,11 @@ import (
"database/sql/driver"
"encoding/json"
"fmt"
"time"
"github.com/cortezaproject/corteza-server/pkg/expr"
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/cortezaproject/corteza-server/pkg/wfexec"
"time"
)
type (
@ -111,6 +112,10 @@ func (s Session) PendingPrompts(ownerId uint64) []*wfexec.PendingPrompt {
return s.session.UserPendingPrompts(ownerId)
}
func (s Session) GC() bool {
return s.CompletedAt != nil || s.session.Error() != nil
}
// WaitResults wait blocks until workflow session is completed or fails (or context is canceled) and returns resuts
func (s *Session) WaitResults(ctx context.Context) (*expr.Vars, wfexec.SessionStatus, Stacktrace, error) {
if err := s.session.WaitUntil(ctx, wfexec.SessionFailed, wfexec.SessionCompleted); err != nil {

View File

@ -233,12 +233,22 @@ func (s *Session) Result() *expr.Vars {
}
func (s *Session) Exec(ctx context.Context, step Step, scope *expr.Vars) error {
if s.g.Len() == 0 {
return fmt.Errorf("refusing to execute without steps")
}
err := func() error {
if s.g.Len() == 0 {
return fmt.Errorf("refusing to execute without steps")
}
if len(s.g.Parents(step)) > 0 {
return fmt.Errorf("cannot execute step with parents")
if len(s.g.Parents(step)) > 0 {
return fmt.Errorf("cannot execute step with parents")
}
return nil
}()
if err != nil {
// send nil to error queue to trigger worker shutdown
// session error must be set to update session status
s.qErr <- err
return err
}
if scope == nil {
@ -452,15 +462,18 @@ func (s *Session) worker(ctx context.Context) {
)
s.mux.Lock()
s.mux.Unlock()
defer s.mux.Unlock()
// We need to force failed session status
// because it's not set early enough to pick it up with s.Status()
status = SessionFailed
// when the err handler is defined, the error was handled and should not kill the workflow
if !st.errHandled {
// We need to force failed session status
// because it's not set early enough to pick it up with s.Status()
status = SessionFailed
// pushing step execution error into error queue
// to break worker loop
s.qErr <- st.err
// pushing step execution error into error queue
// to break worker loop
s.qErr <- st.err
}
}
s.log.Debug(
@ -603,6 +616,7 @@ func (s *Session) exec(ctx context.Context, st *State) (err error) {
// in case of another error in the error-handling branch
eh := st.errHandler
st.errHandler = nil
st.errHandled = true
if err = s.enqueue(ctx, st.Next(eh, scope)); err != nil {
log.Warn("unable to queue", zap.Error(err))
}

View File

@ -2,11 +2,13 @@ package wfexec
import (
"context"
"fmt"
"testing"
"time"
"github.com/cortezaproject/corteza-server/pkg/expr"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"testing"
"time"
)
type (
@ -189,7 +191,88 @@ func TestSession_Delays(t *testing.T) {
req.Contains(ses.Result().Dict(), "waitForMoment")
req.Contains(ses.Result().Dict(), "waitForInput")
req.Equal("foo", expr.Must(expr.Select(ses.Result(), "input")).Get())
}
func TestSession_ErrHandler(t *testing.T) {
var (
ctx = context.Background()
req = require.New(t)
wf = NewGraph()
ses = NewSession(
ctx,
wf,
// enable if you need to see what is going on
//SetLogger(logger.MakeDebugLogger()),
// enable if you need to see what is going on
//SetHandler(func(status SessionStatus, state *State, session *Session) {
// if state.step != nil {
// println(state.step.(*sesTestStep).name)
// }
//}),
)
cb_1_1 = &sesTestStep{name: "catch-branch-1-1"}
cb_1_2 = &sesTestStep{name: "catch-branch-1-2"}
tb_1_1 = &sesTestStep{name: "try-branch-1-1"}
eh_1 = &sesTestStep{name: "err-handler", exec: func(ctx context.Context, request *ExecRequest) (ExecResponse, error) {
return ErrorHandler(cb_1_1), nil
}}
er_1 = &sesTestStep{name: "err-raiser", exec: func(ctx context.Context, request *ExecRequest) (ExecResponse, error) {
return nil, fmt.Errorf("would-be-handled-error")
}}
cb_2_1 = &sesTestStep{name: "catch-branch-2-1"}
cb_2_2 = &sesTestStep{name: "catch-branch-2-2"}
tb_2_1 = &sesTestStep{name: "try-branch-2-1"}
eh_2 = &sesTestStep{name: "err-handler", exec: func(ctx context.Context, request *ExecRequest) (ExecResponse, error) {
return ErrorHandler(cb_2_1), nil
}}
er_2 = &sesTestStep{name: "err-raiser", exec: func(ctx context.Context, request *ExecRequest) (ExecResponse, error) {
return nil, fmt.Errorf("would-be-handled-error")
}}
)
wf.AddStep(eh_1, tb_1_1) // error handling step (entrypoint!)
wf.AddStep(tb_1_1) // add try step
wf.AddStep(tb_1_1, er_1) // add error raising step right after 1st step in try branch
wf.AddStep(cb_1_1, cb_1_2) // catch branch step 1 & 2
wf.AddStep(cb_1_2, eh_2) // 2nd error handling step right after 1st catch branch
wf.AddStep(eh_2, tb_2_1) // step in try branch
wf.AddStep(tb_2_1, er_2) // 2nd error raising step on 2nd try branch
wf.AddStep(cb_2_1, cb_2_2) // 2nd catch branch step 1 & 2
req.NoError(ses.Exec(ctx, eh_1, nil))
req.NoError(ses.Wait(ctx))
req.Equal(
"/try-branch-1-1/catch-branch-1-1/catch-branch-1-2/try-branch-2-1/catch-branch-2-1/catch-branch-2-2",
ses.Result().Dict()["path"],
)
}
func TestSession_ExecStepWithParents(t *testing.T) {
var (
ctx = context.Background()
req = require.New(t)
wf = NewGraph()
ses = NewSession(ctx, wf)
p = &sesTestStep{name: "p"}
c = &sesTestStep{name: "c"}
)
wf.AddStep(p, c)
req.Equal(SessionActive, ses.Status())
req.Error(ses.Exec(ctx, c, nil))
req.Error(ses.Wait(ctx))
req.Equal(SessionFailed, ses.Status())
}
func bmSessionSimpleStepSequence(c uint64, b *testing.B) {

View File

@ -2,9 +2,10 @@ package wfexec
import (
"encoding/json"
"time"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/pkg/expr"
"time"
)
type (
@ -47,6 +48,9 @@ type (
// error handling step
errHandler Step
// error handled flag, this gets restarted on every new state!
errHandled bool
loops []Iterator
action string