From 2287894203d5345b0e5913ab1bbbe816209cb33b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toma=C5=BE=20Jerman?= Date: Tue, 5 Nov 2024 15:01:57 +0100 Subject: [PATCH] Add panic recovery to wf exec --- server/automation/service/session.go | 16 +++++++++++++- server/pkg/wfexec/session.go | 32 ++++++++++++++++++++++++---- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/server/automation/service/session.go b/server/automation/service/session.go index 4f3f25a7b..9e09d933f 100644 --- a/server/automation/service/session.go +++ b/server/automation/service/session.go @@ -2,6 +2,7 @@ package service import ( "context" + "fmt" "sync" "time" @@ -15,6 +16,7 @@ import ( "github.com/cortezaproject/corteza/server/pkg/sentry" "github.com/cortezaproject/corteza/server/pkg/wfexec" "github.com/cortezaproject/corteza/server/store" + "github.com/modern-go/reflect2" "go.uber.org/zap" ) @@ -448,10 +450,20 @@ func (svc *session) logPending() { // stateChangeHandler keeps track of session status changes and frequently stores session into db func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHandler { - return func(status wfexec.SessionStatus, state *wfexec.State, s *wfexec.Session) { + return func(status wfexec.SessionStatus, state *wfexec.State, s *wfexec.Session) (err error) { svc.mux.Lock() defer svc.mux.Unlock() + // Handle potential state change handler panics and gracefully error out + defer func() { + reason := recover() + if reflect2.IsNil(reason) { + return + } + + err = fmt.Errorf("automation/service/session.go#stateChangeHandler panic: %v", reason) + }() + ses := svc.pool[s.ID()] if ses == nil { log := svc.log.With( @@ -549,6 +561,8 @@ func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHa log.Error("failed to update session", zap.Error(err)) } + + return } } diff --git a/server/pkg/wfexec/session.go b/server/pkg/wfexec/session.go index 819c5a041..1cc64fedc 100644 --- a/server/pkg/wfexec/session.go +++ b/server/pkg/wfexec/session.go @@ -65,7 +65,7 @@ type ( callStack []uint64 } - StateChangeHandler func(SessionStatus, *State, *Session) + StateChangeHandler func(SessionStatus, *State, *Session) error SessionOpt func(*Session) @@ -180,8 +180,10 @@ func NewSession(ctx context.Context, g *Graph, oo ...SessionOpt) *Session { log: zap.NewNop(), - eventHandler: func(SessionStatus, *State, *Session) { + eventHandler: func(SessionStatus, *State, *Session) error { // noop + + return nil }, } @@ -484,7 +486,18 @@ func (s *Session) worker(ctx context.Context) { s.mux.Unlock() // Call event handler with completed status - s.eventHandler(SessionCompleted, st, s) + err := s.eventHandler(SessionCompleted, st, s) + if err != nil { + err = fmt.Errorf( + "workflow %d state change handler failed: %w", + s.workflowID, + err, + ) + s.log.Error(err.Error()) + + s.err = err + return + } return } @@ -544,7 +557,18 @@ func (s *Session) worker(ctx context.Context) { zap.Error(st.err), ) - s.eventHandler(status, st, s) + err = s.eventHandler(status, st, s) + if err != nil { + err = fmt.Errorf( + "workflow %d state change handler failed: %w", + s.workflowID, + err, + ) + s.log.Error(err.Error()) + + s.err = err + return + } for _, n := range nxt { if n.step != nil {