3
0

Add panic recovery to wf exec

This commit is contained in:
Tomaž Jerman 2024-11-05 15:01:57 +01:00
parent 6c671983f8
commit 2287894203
2 changed files with 43 additions and 5 deletions

View File

@ -2,6 +2,7 @@ package service
import (
"context"
"fmt"
"sync"
"time"
@ -15,6 +16,7 @@ import (
"github.com/cortezaproject/corteza/server/pkg/sentry"
"github.com/cortezaproject/corteza/server/pkg/wfexec"
"github.com/cortezaproject/corteza/server/store"
"github.com/modern-go/reflect2"
"go.uber.org/zap"
)
@ -448,10 +450,20 @@ func (svc *session) logPending() {
// stateChangeHandler keeps track of session status changes and frequently stores session into db
func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHandler {
return func(status wfexec.SessionStatus, state *wfexec.State, s *wfexec.Session) {
return func(status wfexec.SessionStatus, state *wfexec.State, s *wfexec.Session) (err error) {
svc.mux.Lock()
defer svc.mux.Unlock()
// Handle potential state change handler panics and gracefully error out
defer func() {
reason := recover()
if reflect2.IsNil(reason) {
return
}
err = fmt.Errorf("automation/service/session.go#stateChangeHandler panic: %v", reason)
}()
ses := svc.pool[s.ID()]
if ses == nil {
log := svc.log.With(
@ -549,6 +561,8 @@ func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHa
log.Error("failed to update session", zap.Error(err))
}
return
}
}

View File

@ -65,7 +65,7 @@ type (
callStack []uint64
}
StateChangeHandler func(SessionStatus, *State, *Session)
StateChangeHandler func(SessionStatus, *State, *Session) error
SessionOpt func(*Session)
@ -180,8 +180,10 @@ func NewSession(ctx context.Context, g *Graph, oo ...SessionOpt) *Session {
log: zap.NewNop(),
eventHandler: func(SessionStatus, *State, *Session) {
eventHandler: func(SessionStatus, *State, *Session) error {
// noop
return nil
},
}
@ -484,7 +486,18 @@ func (s *Session) worker(ctx context.Context) {
s.mux.Unlock()
// Call event handler with completed status
s.eventHandler(SessionCompleted, st, s)
err := s.eventHandler(SessionCompleted, st, s)
if err != nil {
err = fmt.Errorf(
"workflow %d state change handler failed: %w",
s.workflowID,
err,
)
s.log.Error(err.Error())
s.err = err
return
}
return
}
@ -544,7 +557,18 @@ func (s *Session) worker(ctx context.Context) {
zap.Error(st.err),
)
s.eventHandler(status, st, s)
err = s.eventHandler(status, st, s)
if err != nil {
err = fmt.Errorf(
"workflow %d state change handler failed: %w",
s.workflowID,
err,
)
s.log.Error(err.Error())
s.err = err
return
}
for _, n := range nxt {
if n.step != nil {