3
0

Workflow sessions can be canceled via API

This commit is contained in:
Denis Arh
2022-07-22 15:56:07 +02:00
parent 8e42876010
commit 14026ec47e
7 changed files with 119 additions and 192 deletions

View File

@@ -206,17 +206,21 @@ endpoints:
title: Read session details
path: "/{sessionID}"
parameters: { path: [ { name: sessionID, type: uint64, required: true, title: "Session ID" } ] }
- name: trace
method: GET
title: Read session trace info
path: "/{sessionID}/trace"
parameters: { path: [ { name: sessionID, type: uint64, required: true, title: "Session ID" } ] }
- name: delete
method: DELETE
title: Remove session
path: "/{sessionID}"
- name: cancel
method: POST
title: Cancel session
path: "/{sessionID}/cancel"
parameters: { path: [ { name: sessionID, type: uint64, required: true, title: "Session ID" } ] }
# not yet implemented
#
# - name: delete
# method: DELETE
# title: Remove session
# path: "/{sessionID}"
# parameters: { path: [ { name: sessionID, type: uint64, required: true, title: "Session ID" } ] }
- name: listPrompts
method: GET
title: Returns pending prompts from all sessions
@@ -233,14 +237,16 @@ endpoints:
post:
- { name: input, type: "*expr.Vars", title: "Prompt variables", parser: "types.ParseWorkflowVariables" }
- name: deleteState
method: DELETE
title: Cancel session's state
path: "/{sessionID}/state/{stateID}"
parameters:
path:
- { name: sessionID, type: uint64, required: true, title: "Session ID" }
- { name: stateID, type: uint64, required: true, title: "State ID" }
# not yet implemented
#
# - name: deleteState
# method: DELETE
# title: Cancel session's state
# path: "/{sessionID}/state/{stateID}"
# parameters:
# path:
# - { name: sessionID, type: uint64, required: true, title: "Session ID" }
# - { name: stateID, type: uint64, required: true, title: "State ID" }
- title: Functions
path: "/functions"

View File

@@ -21,22 +21,18 @@ type (
SessionAPI interface {
List(context.Context, *request.SessionList) (interface{}, error)
Read(context.Context, *request.SessionRead) (interface{}, error)
Trace(context.Context, *request.SessionTrace) (interface{}, error)
Delete(context.Context, *request.SessionDelete) (interface{}, error)
Cancel(context.Context, *request.SessionCancel) (interface{}, error)
ListPrompts(context.Context, *request.SessionListPrompts) (interface{}, error)
ResumeState(context.Context, *request.SessionResumeState) (interface{}, error)
DeleteState(context.Context, *request.SessionDeleteState) (interface{}, error)
}
// HTTP API interface
Session struct {
List func(http.ResponseWriter, *http.Request)
Read func(http.ResponseWriter, *http.Request)
Trace func(http.ResponseWriter, *http.Request)
Delete func(http.ResponseWriter, *http.Request)
Cancel func(http.ResponseWriter, *http.Request)
ListPrompts func(http.ResponseWriter, *http.Request)
ResumeState func(http.ResponseWriter, *http.Request)
DeleteState func(http.ResponseWriter, *http.Request)
}
)
@@ -74,31 +70,15 @@ func NewSession(h SessionAPI) *Session {
api.Send(w, r, value)
},
Trace: func(w http.ResponseWriter, r *http.Request) {
Cancel: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
params := request.NewSessionTrace()
params := request.NewSessionCancel()
if err := params.Fill(r); err != nil {
api.Send(w, r, err)
return
}
value, err := h.Trace(r.Context(), params)
if err != nil {
api.Send(w, r, err)
return
}
api.Send(w, r, value)
},
Delete: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
params := request.NewSessionDelete()
if err := params.Fill(r); err != nil {
api.Send(w, r, err)
return
}
value, err := h.Delete(r.Context(), params)
value, err := h.Cancel(r.Context(), params)
if err != nil {
api.Send(w, r, err)
return
@@ -136,22 +116,6 @@ func NewSession(h SessionAPI) *Session {
return
}
api.Send(w, r, value)
},
DeleteState: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
params := request.NewSessionDeleteState()
if err := params.Fill(r); err != nil {
api.Send(w, r, err)
return
}
value, err := h.DeleteState(r.Context(), params)
if err != nil {
api.Send(w, r, err)
return
}
api.Send(w, r, value)
},
}
@@ -162,10 +126,8 @@ func (h Session) MountRoutes(r chi.Router, middlewares ...func(http.Handler) htt
r.Use(middlewares...)
r.Get("/sessions/", h.List)
r.Get("/sessions/{sessionID}", h.Read)
r.Get("/sessions/{sessionID}/trace", h.Trace)
r.Delete("/sessions/{sessionID}", h.Delete)
r.Post("/sessions/{sessionID}/cancel", h.Cancel)
r.Get("/sessions/prompts", h.ListPrompts)
r.Post("/sessions/{sessionID}/state/{stateID}", h.ResumeState)
r.Delete("/sessions/{sessionID}/state/{stateID}", h.DeleteState)
})
}

View File

@@ -94,14 +94,7 @@ type (
SessionID uint64 `json:",string"`
}
SessionTrace struct {
// SessionID PATH parameter
//
// Session ID
SessionID uint64 `json:",string"`
}
SessionDelete struct {
SessionCancel struct {
// SessionID PATH parameter
//
// Session ID
@@ -127,18 +120,6 @@ type (
// Prompt variables
Input *expr.Vars
}
SessionDeleteState struct {
// SessionID PATH parameter
//
// Session ID
SessionID uint64 `json:",string"`
// StateID PATH parameter
//
// State ID
StateID uint64 `json:",string"`
}
)
// NewSessionList request
@@ -339,60 +320,25 @@ func (r *SessionRead) Fill(req *http.Request) (err error) {
return err
}
// NewSessionTrace request
func NewSessionTrace() *SessionTrace {
return &SessionTrace{}
// NewSessionCancel request
func NewSessionCancel() *SessionCancel {
return &SessionCancel{}
}
// Auditable returns all auditable/loggable parameters
func (r SessionTrace) Auditable() map[string]interface{} {
func (r SessionCancel) Auditable() map[string]interface{} {
return map[string]interface{}{
"sessionID": r.SessionID,
}
}
// Auditable returns all auditable/loggable parameters
func (r SessionTrace) GetSessionID() uint64 {
func (r SessionCancel) GetSessionID() uint64 {
return r.SessionID
}
// Fill processes request and fills internal variables
func (r *SessionTrace) Fill(req *http.Request) (err error) {
{
var val string
// path params
val = chi.URLParam(req, "sessionID")
r.SessionID, err = payload.ParseUint64(val), nil
if err != nil {
return err
}
}
return err
}
// NewSessionDelete request
func NewSessionDelete() *SessionDelete {
return &SessionDelete{}
}
// Auditable returns all auditable/loggable parameters
func (r SessionDelete) Auditable() map[string]interface{} {
return map[string]interface{}{
"sessionID": r.SessionID,
}
}
// Auditable returns all auditable/loggable parameters
func (r SessionDelete) GetSessionID() uint64 {
return r.SessionID
}
// Fill processes request and fills internal variables
func (r *SessionDelete) Fill(req *http.Request) (err error) {
func (r *SessionCancel) Fill(req *http.Request) (err error) {
{
var val string
@@ -529,50 +475,3 @@ func (r *SessionResumeState) Fill(req *http.Request) (err error) {
return err
}
// NewSessionDeleteState request
func NewSessionDeleteState() *SessionDeleteState {
return &SessionDeleteState{}
}
// Auditable returns all auditable/loggable parameters
func (r SessionDeleteState) Auditable() map[string]interface{} {
return map[string]interface{}{
"sessionID": r.SessionID,
"stateID": r.StateID,
}
}
// Auditable returns all auditable/loggable parameters
func (r SessionDeleteState) GetSessionID() uint64 {
return r.SessionID
}
// Auditable returns all auditable/loggable parameters
func (r SessionDeleteState) GetStateID() uint64 {
return r.StateID
}
// Fill processes request and fills internal variables
func (r *SessionDeleteState) Fill(req *http.Request) (err error) {
{
var val string
// path params
val = chi.URLParam(req, "sessionID")
r.SessionID, err = payload.ParseUint64(val), nil
if err != nil {
return err
}
val = chi.URLParam(req, "stateID")
r.StateID, err = payload.ParseUint64(val), nil
if err != nil {
return err
}
}
return err
}

View File

@@ -2,7 +2,6 @@ package rest
import (
"context"
"fmt"
"github.com/cortezaproject/corteza-server/automation/rest/request"
"github.com/cortezaproject/corteza-server/automation/service"
"github.com/cortezaproject/corteza-server/automation/types"
@@ -24,6 +23,7 @@ type (
LookupByID(ctx context.Context, sessionID uint64) (*types.Session, error)
Resume(sessionID, stateID uint64, i auth.Identifiable, input *expr.Vars) error
PendingPrompts(context.Context) []*wfexec.PendingPrompt
Cancel(context.Context, uint64) error
}
sessionSetPayload struct {
@@ -79,12 +79,8 @@ func (ctrl Session) Read(ctx context.Context, r *request.SessionRead) (interface
return ctrl.svc.LookupByID(ctx, r.SessionID)
}
func (ctrl Session) Delete(ctx context.Context, r *request.SessionDelete) (interface{}, error) {
return nil, fmt.Errorf("not implemented")
}
func (ctrl Session) Trace(ctx context.Context, trace *request.SessionTrace) (interface{}, error) {
return nil, fmt.Errorf("not implemented")
func (ctrl Session) Cancel(ctx context.Context, r *request.SessionCancel) (interface{}, error) {
return true, ctrl.svc.Cancel(ctx, r.SessionID)
}
func (ctrl Session) ListPrompts(ctx context.Context, r *request.SessionListPrompts) (interface{}, error) {
@@ -99,10 +95,6 @@ func (ctrl Session) ResumeState(ctx context.Context, r *request.SessionResumeSta
return api.OK(), ctrl.svc.Resume(r.SessionID, r.StateID, auth.GetIdentityFromContext(ctx), r.Input)
}
func (ctrl Session) DeleteState(ctx context.Context, r *request.SessionDeleteState) (interface{}, error) {
return nil, fmt.Errorf("not implemented")
}
func (ctrl Session) makeFilterPayload(ctx context.Context, ss types.SessionSet, f types.SessionFilter, err error) (*sessionSetPayload, error) {
if err != nil {
return nil, err

View File

@@ -243,6 +243,9 @@ func (svc *session) Start(ctx context.Context, g *wfexec.Graph, ssp types.Sessio
// Resume resumes suspended session/state
//
// Session can only be resumed by knowing session and state ID. Resume is an asynchronous operation
//
// There is minimum access-control deep inside wfexec.Session.Resume function
// that compares identity with state owner
func (svc *session) Resume(sessionID, stateID uint64, i auth.Identifiable, input *expr.Vars) error {
var (
ctx = auth.SetIdentityToContext(context.Background(), i)
@@ -267,6 +270,36 @@ func (svc *session) Resume(sessionID, stateID uint64, i auth.Identifiable, input
return nil
}
// Terminates session ID
func (svc *session) Cancel(ctx context.Context, sessionID uint64) (err error) {
svc.mux.RLock()
var (
wf *types.Workflow
ses = svc.pool[sessionID]
)
// unlock right away!
// when session is canceled, handler pick it up and
// locks it again
svc.mux.RUnlock()
if ses == nil {
return errors.NotFound("session not found or already canceled")
}
if wf, err = loadWorkflow(ctx, svc.store, ses.WorkflowID); err != nil {
return
}
if !svc.ac.CanManageSessionsOnWorkflow(ctx, wf) {
return SessionErrNotAllowedToManage()
}
ses.Cancel()
return nil
}
// spawns a new session
//
// We need initial context for the session because we want to catch all cancellations or timeouts from there
@@ -414,7 +447,10 @@ func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHa
svc.mux.Lock()
defer svc.mux.Unlock()
log := svc.log.With(zap.Uint64("sessionID", s.ID()))
log := svc.log.With(
zap.Uint64("sessionID", s.ID()),
zap.Stringer("status", i),
)
ses := svc.pool[s.ID()]
if ses == nil {
@@ -424,18 +460,23 @@ func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHa
log = log.With(zap.Uint64("workflowID", ses.WorkflowID))
log.Debug("state change handler")
var (
// By default, we want to update session when new status is prompted, delayed, completed or failed
// By default, we want to update session when new status is prompted, delayed, completed, canceled or failed
// But if status is active, we'll flush it every X frames (sessionStateFlushFrequency)
update = true
frame = state.MakeFrame()
frame *wfexec.Frame
)
// Stacktrace will be set to !nil if frame collection is needed
if len(ses.RuntimeStacktrace) > 0 {
// calculate how long it took to get to this step
frame.ElapsedTime = uint(frame.CreatedAt.Sub(ses.RuntimeStacktrace[0].CreatedAt) / time.Millisecond)
if state != nil {
frame = state.MakeFrame()
// Stacktrace will be set to !nil if frame collection is needed
if len(ses.RuntimeStacktrace) > 0 {
// calculate how long it took to get to this step
frame.ElapsedTime = uint(frame.CreatedAt.Sub(ses.RuntimeStacktrace[0].CreatedAt) / time.Millisecond)
}
}
ses.AppendRuntimeStacktrace(frame)
@@ -476,9 +517,16 @@ func (svc *session) stateChangeHandler(ctx context.Context) wfexec.StateChangeHa
case wfexec.SessionFailed:
ses.SuspendedAt = nil
ses.CompletedAt = now()
ses.Error = state.Error()
if state != nil {
ses.Error = state.Error()
}
ses.Status = types.SessionFailed
case wfexec.SessionCanceled:
ses.SuspendedAt = nil
ses.CompletedAt = now()
ses.Status = types.SessionCanceled
default:
// force update on every F new frames (F=sessionStateFlushFrequency) but only when stacktrace is not nil
update = ses.RuntimeStacktrace != nil && len(ses.RuntimeStacktrace)%sessionStateFlushFrequency == 0

View File

@@ -110,6 +110,7 @@ const (
SessionSuspended
SessionFailed
SessionCompleted
SessionCanceled
)
func NewSession(s *wfexec.Session) *Session {
@@ -131,6 +132,11 @@ func (s *Session) Resume(ctx context.Context, stateID uint64, input *expr.Vars)
return s.session.Resume(ctx, stateID, input)
}
func (s *Session) Cancel() {
s.session.Cancel()
s.Status = SessionCanceled
}
func (s *Session) PendingPrompts(ownerId uint64) []*wfexec.PendingPrompt {
return s.session.UserPendingPrompts(ownerId)
}
@@ -139,7 +145,9 @@ func (s *Session) GC() bool {
s.l.RLock()
defer s.l.RUnlock()
return s.CompletedAt != nil || s.session.Error() != nil
return s.CompletedAt != nil ||
s.Status == SessionCanceled ||
s.session.Error() != nil
}
// WaitResults wait blocks until workflow session is completed or fails (or context is canceled) and returns resuts
@@ -245,6 +253,8 @@ func (s SessionStatus) String() string {
return "failed"
case SessionCompleted:
return "completed"
case SessionCanceled:
return "canceled"
}
return "unknown"

View File

@@ -123,6 +123,7 @@ const (
SessionDelayed
SessionFailed
SessionCompleted
SessionCanceled
)
var (
@@ -136,6 +137,8 @@ var (
c := time.Now()
return &c
}
errCanceled = fmt.Errorf("canceled")
)
func (s SessionStatus) String() string {
@@ -150,6 +153,8 @@ func (s SessionStatus) String() string {
return "failed"
case SessionCompleted:
return "completed"
case SessionCanceled:
return "aborted"
}
return "UNKNOWN-SESSION-STATUS"
@@ -197,6 +202,10 @@ func (s *Session) Status() SessionStatus {
switch {
case s.err != nil:
if s.err == errCanceled {
return SessionCanceled
}
return SessionFailed
case len(s.prompted) > 0:
@@ -425,7 +434,7 @@ func (s *Session) WaitUntil(ctx context.Context, expected ...SessionStatus) erro
}
case <-ctx.Done():
s.log.Debug("wait context done", zap.Error(ctx.Err()))
s.log.Debug("wait context canceled", zap.Error(ctx.Err()))
s.Cancel()
return s.err
}
@@ -573,8 +582,9 @@ func (s *Session) worker(ctx context.Context) {
}
func (s *Session) Cancel() {
s.log.Debug("canceling")
s.qErr <- fmt.Errorf("canceled")
s.log.Warn("canceling")
s.qErr <- errCanceled
s.eventHandler(SessionCanceled, nil, s)
}
func (s *Session) Stop() {