From 9fe52398a9cbbd5da2002b94c20050964461b026 Mon Sep 17 00:00:00 2001 From: Peter Grlica Date: Thu, 8 Jul 2021 17:24:24 +0200 Subject: [PATCH] Added workflow processer --- app/servers.go | 3 +- pkg/apigw/apigw.go | 27 +++---- pkg/apigw/expediter.go | 10 ++- pkg/apigw/pipeline.go | 25 +++++- pkg/apigw/processer.go | 121 +++++++++++++++++------------- pkg/apigw/registry.go | 15 +++- pkg/apigw/route.go | 8 +- pkg/apigw/verifier.go | 16 +++- store/rdbms/apigw_function.gen.go | 8 +- store/rdbms/apigw_route.gen.go | 8 +- 10 files changed, 151 insertions(+), 90 deletions(-) diff --git a/app/servers.go b/app/servers.go index 9eb44739c..c482029a7 100644 --- a/app/servers.go +++ b/app/servers.go @@ -16,7 +16,6 @@ import ( "github.com/cortezaproject/corteza-server/pkg/actionlog" "github.com/cortezaproject/corteza-server/pkg/api/server" "github.com/cortezaproject/corteza-server/pkg/apigw" - "github.com/cortezaproject/corteza-server/pkg/eventbus" "github.com/cortezaproject/corteza-server/pkg/logger" "github.com/cortezaproject/corteza-server/pkg/options" "github.com/cortezaproject/corteza-server/pkg/webapp" @@ -109,7 +108,7 @@ func (app *CortezaApp) mountHttpRoutes(r chi.Router) { // temp api gateway support { - apigw.Setup(struct{}{}, app.Log, eventbus.Service(), service.DefaultStore) + apigw.Setup(struct{}{}, app.Log, service.DefaultStore) r.Route("/gateway", apigw.Service().Router(context.Background())) } diff --git a/pkg/apigw/apigw.go b/pkg/apigw/apigw.go index a1c2dc6ce..d4095162e 100644 --- a/pkg/apigw/apigw.go +++ b/pkg/apigw/apigw.go @@ -15,12 +15,11 @@ type ( } apigw struct { - log *zap.Logger - reg *registry - routes []*route - dispatcher dispatcher - storer storer - reload chan bool + log *zap.Logger + reg *registry + routes []*route + storer storer + reload chan bool } ) @@ -38,24 +37,23 @@ func Set(a *apigw) { } // Setup handles the singleton service -func Setup(opts interface{}, log *zap.Logger, dispatcher dispatcher, storer storer) { +func Setup(opts interface{}, log *zap.Logger, storer storer) { if apiGw != nil { return } - apiGw = New(opts, log, dispatcher, storer) + apiGw = New(opts, log, storer) } -func New(opts interface{}, logger *zap.Logger, dispatcher dispatcher, storer storer) *apigw { +func New(opts interface{}, logger *zap.Logger, storer storer) *apigw { reg := NewRegistry() reg.Preload() return &apigw{ - log: logger, - dispatcher: dispatcher, - storer: storer, - reload: make(chan bool), - reg: reg, + log: logger, + storer: storer, + reload: make(chan bool), + reg: reg, } } @@ -92,7 +90,6 @@ func (s *apigw) loadFunctions(ctx context.Context, route uint64) (ff []*types.Fu func (s *apigw) Router(ctx context.Context) func(r chi.Router) { return func(r chi.Router) { - routes, err := s.loadRoutes(ctx) if err != nil { diff --git a/pkg/apigw/expediter.go b/pkg/apigw/expediter.go index 5bdd90ae4..9b9dfeb25 100644 --- a/pkg/apigw/expediter.go +++ b/pkg/apigw/expediter.go @@ -21,6 +21,10 @@ type ( } ) +func NewExpediterRedirection() expediterRedirection { + return expediterRedirection{} +} + func (h expediterRedirection) Meta(f *types.Function) functionMeta { return functionMeta{ Step: 3, @@ -34,8 +38,8 @@ func (h expediterRedirection) Meta(f *types.Function) functionMeta { func (h expediterRedirection) Handler() handlerFunc { return func(ctx context.Context, scope *scp, params map[string]interface{}, ff functionHandler) error { - scope.writer.Header().Add(fmt.Sprintf("step_%d", ff.step), ff.name) - http.Redirect(scope.writer, scope.req, params["location"].(string), http.StatusFound) + scope.Writer().Header().Add(fmt.Sprintf("step_%d", ff.step), ff.name) + http.Redirect(scope.Writer(), scope.Request(), params["location"].(string), http.StatusFound) return nil } @@ -53,5 +57,5 @@ func (pp errorHandler) Exec(ctx context.Context, scope *scp, err error) { } spew.Dump("ERR in expediter", err, resp) - json.NewEncoder(scope.writer).Encode(resp) + json.NewEncoder(scope.Writer()).Encode(resp) } diff --git a/pkg/apigw/pipeline.go b/pkg/apigw/pipeline.go index 0f28ed2af..c2f347d31 100644 --- a/pkg/apigw/pipeline.go +++ b/pkg/apigw/pipeline.go @@ -36,12 +36,29 @@ type ( err ErrorHandler } - scp struct { - req *http.Request - writer http.ResponseWriter - } + scp map[string]interface{} ) +func (s scp) Request() *http.Request { + if _, ok := s["request"]; ok { + return s["request"].(*http.Request) + } + + return nil +} + +func (s scp) Writer() http.ResponseWriter { + if _, ok := s["writer"]; ok { + return s["writer"].(http.ResponseWriter) + } + + return nil +} + +func (s scp) Set(k string, v interface{}) { + s[k] = v +} + // Exec takes care of error handling and main // functionality that takes place in worker func (pp *pl) Exec(ctx context.Context, scope *scp) (err error) { diff --git a/pkg/apigw/processer.go b/pkg/apigw/processer.go index 9ea0f47fa..56fd11c11 100644 --- a/pkg/apigw/processer.go +++ b/pkg/apigw/processer.go @@ -2,21 +2,29 @@ package apigw import ( "context" + "fmt" - "github.com/cortezaproject/corteza-server/pkg/eventbus" + atypes "github.com/cortezaproject/corteza-server/automation/types" + "github.com/cortezaproject/corteza-server/pkg/expr" "github.com/cortezaproject/corteza-server/system/types" ) type ( - dispatcher interface { - Dispatch(ctx context.Context, ev eventbus.Event) + WfExecer interface { + Exec(ctx context.Context, workflowID uint64, p atypes.WorkflowExecParams) (*expr.Vars, atypes.Stacktrace, error) } processerWorkflow struct { - d dispatcher + d WfExecer } ) +func NewProcesserWorkflow(wf WfExecer) processerWorkflow { + return processerWorkflow{ + d: wf, + } +} + func (h processerWorkflow) Meta(f *types.Function) functionMeta { return functionMeta{ Step: 2, @@ -37,54 +45,63 @@ func (h processerWorkflow) Meta(f *types.Function) functionMeta { func (h processerWorkflow) Handler() handlerFunc { return func(ctx context.Context, scope *scp, params map[string]interface{}, ff functionHandler) error { - // h.d.Dispatch(c, event.ApiOnProcess(&envlp)) + var ( + wfID int64 + ok bool + err error + ) - return nil + // validate workflow param + if _, ok = params["workflow"]; !ok { + return fmt.Errorf("invalid param workflow") + } + + wfID, err = expr.CastToInteger(params["workflow"]) + + if err != nil { + return err + } + + // setup scope for workflow + vv := map[string]interface{}{ + "request": scope.Request(), + } + + // get the request data and put it into vars + in, err := expr.NewVars(vv) + + if err != nil { + return err + } + + wp := atypes.WorkflowExecParams{ + Trace: false, + // todo depending on settings per-route + Async: false, + // todo depending on settings per-route + Wait: true, + Input: in, + } + + out, _, err := h.d.Exec(ctx, uint64(wfID), wp) + + if err != nil { + return err + } + + // merge out with scope + merged, err := in.Merge(out) + + if err != nil { + return err + } + + mm, err := expr.CastToVars(merged) + + for k, v := range mm { + scope.Set(k, v) + } + + return err } } - -// func formDataProcesserFn(c context.Context, er *wfexec.ExecRequest) (r wfexec.ExecResponse, err error) { -// type ( -// formDataProcesserResponse struct { -// Name string `json:"name"` -// } -// ) - -// spew.Dump("step processer fn()") - -// e := er.Scope.GetValue()["envelope"] -// ee := e.Get() - -// // ee.(envelope).Writer.WriteHeader(int(id)) -// ee.(envelope).Writer.Write([]byte(`{"test":"foobar"}`)) - -// e.Assign(ee) - -// // req := values.Get() -// // ww := wr.Get() -// // writer := ww.(http.ResponseWriter) - -// // formValue := req.(*http.Request).PostFormValue("name") - -// // resp := formDataProcesserResponse{ -// // // Name: fmt.Sprintf("AA %s AA", formValue), -// // Name: "formValue", -// // } - -// // encoder := json.NewEncoder(writer) -// // encoder.Encode(resp) - -// // writer.(*httptest.ResponseRecorder).Header()["Content-Type"] = []string{"application/json"} -// // writer.Header().Set("Content-Type", "application/json3") - -// // spew.Dump(writer.(*httptest.ResponseRecorder).Header()) -// // a, b := expr.NewKV(writer) -// // spew.Dump("Aaaaaaaaaaa", a) - -// vv := &expr.Vars{} -// // vv.Set("writer", writer) - -// r = vv - -// return -// } diff --git a/pkg/apigw/registry.go b/pkg/apigw/registry.go index 5a14bfc32..0f7e6362b 100644 --- a/pkg/apigw/registry.go +++ b/pkg/apigw/registry.go @@ -3,6 +3,9 @@ package apigw import ( "fmt" + "github.com/cortezaproject/corteza-server/automation/service" + as "github.com/cortezaproject/corteza-server/automation/service" + "github.com/cortezaproject/corteza-server/pkg/options" "github.com/cortezaproject/corteza-server/system/types" ) @@ -45,8 +48,12 @@ func (r *registry) All() (list functionMetaList) { } func (r *registry) Preload() { - r.Add("verifierQueryParam", verifierQueryParam{}) - r.Add("verifierOrigin", verifierOrigin{}) - r.Add("expediterRedirection", expediterRedirection{}) - r.Add("processerWorkflow", processerWorkflow{}) + r.Add("verifierQueryParam", NewVerifierQueryParam()) + r.Add("verifierOrigin", NewVerifierOrigin()) + r.Add("expediterRedirection", NewExpediterRedirection()) + r.Add("processerWorkflow", NewProcesserWorkflow(NewWorkflow())) +} + +func NewWorkflow() WfExecer { + return as.Workflow(service.DefaultLogger, options.CorredorOpt{}) } diff --git a/pkg/apigw/route.go b/pkg/apigw/route.go index 0749f1fe6..0ee70493a 100644 --- a/pkg/apigw/route.go +++ b/pkg/apigw/route.go @@ -18,12 +18,12 @@ type ( func (r route) ServeHTTP(w http.ResponseWriter, req *http.Request) { var ( ctx = context.Background() - scope = scp{ - req: req, - writer: w, - } + scope = scp{} ) + scope["request"] = req + scope["writer"] = w + err := r.pipe.Exec(ctx, &scope) if err != nil { diff --git a/pkg/apigw/verifier.go b/pkg/apigw/verifier.go index dd68b3649..c20a65bcb 100644 --- a/pkg/apigw/verifier.go +++ b/pkg/apigw/verifier.go @@ -14,6 +14,14 @@ type ( verifierOrigin struct{} ) +func NewVerifierOrigin() verifierOrigin { + return verifierOrigin{} +} + +func NewVerifierQueryParam() verifierQueryParam { + return verifierQueryParam{} +} + func (h verifierQueryParam) Meta(f *types.Function) functionMeta { return functionMeta{ Step: 0, @@ -62,7 +70,7 @@ func (h verifierQueryParam) Handler() handlerFunc { } vv := map[string]interface{}{} - vals := scope.req.URL.Query() + vals := scope.Request().URL.Query() for k, v := range vals { vv[k] = v[0] @@ -99,7 +107,7 @@ func (h verifierQueryParam) Handler() handlerFunc { } // testing - scope.req.Header.Add(fmt.Sprintf("step_%d", ff.step), ff.name) + scope.Request().Header.Add(fmt.Sprintf("step_%d", ff.step), ff.name) return nil } @@ -116,7 +124,7 @@ func (h verifierOrigin) Handler() handlerFunc { } vv := map[string]interface{}{ - "origin": scope.req.Header.Get("Origin"), + "origin": scope.Request().Header.Get("Origin"), } // get the request data and put it into vars @@ -150,7 +158,7 @@ func (h verifierOrigin) Handler() handlerFunc { } // testing - scope.req.Header.Add(fmt.Sprintf("step_%d", ff.step), ff.name) + scope.Request().Header.Add(fmt.Sprintf("step_%d", ff.step), ff.name) return nil } diff --git a/store/rdbms/apigw_function.gen.go b/store/rdbms/apigw_function.gen.go index aa56bff14..298ea4504 100644 --- a/store/rdbms/apigw_function.gen.go +++ b/store/rdbms/apigw_function.gen.go @@ -238,6 +238,7 @@ func (s Store) QueryApigwFunctions( check func(*types.Function) (bool, error), ) ([]*types.Function, error) { var ( + tmp = make([]*types.Function, 0, DefaultSliceCapacity) set = make([]*types.Function, 0, DefaultSliceCapacity) res *types.Function @@ -259,10 +260,15 @@ func (s Store) QueryApigwFunctions( return nil, err } + tmp = append(tmp, res) + } + + for _, res = range tmp { + set = append(set, res) } - return set, rows.Err() + return set, nil } // LookupApigwFunctionByID searches for function by ID diff --git a/store/rdbms/apigw_route.gen.go b/store/rdbms/apigw_route.gen.go index 08d6f3ab4..3fc10724e 100644 --- a/store/rdbms/apigw_route.gen.go +++ b/store/rdbms/apigw_route.gen.go @@ -238,6 +238,7 @@ func (s Store) QueryApigwRoutes( check func(*types.Route) (bool, error), ) ([]*types.Route, error) { var ( + tmp = make([]*types.Route, 0, DefaultSliceCapacity) set = make([]*types.Route, 0, DefaultSliceCapacity) res *types.Route @@ -259,10 +260,15 @@ func (s Store) QueryApigwRoutes( return nil, err } + tmp = append(tmp, res) + } + + for _, res = range tmp { + set = append(set, res) } - return set, rows.Err() + return set, nil } // LookupApigwRouteByID searches for route by ID