3
0

Added workflow processer

This commit is contained in:
Peter Grlica 2021-07-08 17:24:24 +02:00
parent 834610b242
commit 9fe52398a9
10 changed files with 151 additions and 90 deletions

View File

@ -16,7 +16,6 @@ import (
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/api/server"
"github.com/cortezaproject/corteza-server/pkg/apigw"
"github.com/cortezaproject/corteza-server/pkg/eventbus"
"github.com/cortezaproject/corteza-server/pkg/logger"
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/cortezaproject/corteza-server/pkg/webapp"
@ -109,7 +108,7 @@ func (app *CortezaApp) mountHttpRoutes(r chi.Router) {
// temp api gateway support
{
apigw.Setup(struct{}{}, app.Log, eventbus.Service(), service.DefaultStore)
apigw.Setup(struct{}{}, app.Log, service.DefaultStore)
r.Route("/gateway", apigw.Service().Router(context.Background()))
}

View File

@ -15,12 +15,11 @@ type (
}
apigw struct {
log *zap.Logger
reg *registry
routes []*route
dispatcher dispatcher
storer storer
reload chan bool
log *zap.Logger
reg *registry
routes []*route
storer storer
reload chan bool
}
)
@ -38,24 +37,23 @@ func Set(a *apigw) {
}
// Setup handles the singleton service
func Setup(opts interface{}, log *zap.Logger, dispatcher dispatcher, storer storer) {
func Setup(opts interface{}, log *zap.Logger, storer storer) {
if apiGw != nil {
return
}
apiGw = New(opts, log, dispatcher, storer)
apiGw = New(opts, log, storer)
}
func New(opts interface{}, logger *zap.Logger, dispatcher dispatcher, storer storer) *apigw {
func New(opts interface{}, logger *zap.Logger, storer storer) *apigw {
reg := NewRegistry()
reg.Preload()
return &apigw{
log: logger,
dispatcher: dispatcher,
storer: storer,
reload: make(chan bool),
reg: reg,
log: logger,
storer: storer,
reload: make(chan bool),
reg: reg,
}
}
@ -92,7 +90,6 @@ func (s *apigw) loadFunctions(ctx context.Context, route uint64) (ff []*types.Fu
func (s *apigw) Router(ctx context.Context) func(r chi.Router) {
return func(r chi.Router) {
routes, err := s.loadRoutes(ctx)
if err != nil {

View File

@ -21,6 +21,10 @@ type (
}
)
func NewExpediterRedirection() expediterRedirection {
return expediterRedirection{}
}
func (h expediterRedirection) Meta(f *types.Function) functionMeta {
return functionMeta{
Step: 3,
@ -34,8 +38,8 @@ func (h expediterRedirection) Meta(f *types.Function) functionMeta {
func (h expediterRedirection) Handler() handlerFunc {
return func(ctx context.Context, scope *scp, params map[string]interface{}, ff functionHandler) error {
scope.writer.Header().Add(fmt.Sprintf("step_%d", ff.step), ff.name)
http.Redirect(scope.writer, scope.req, params["location"].(string), http.StatusFound)
scope.Writer().Header().Add(fmt.Sprintf("step_%d", ff.step), ff.name)
http.Redirect(scope.Writer(), scope.Request(), params["location"].(string), http.StatusFound)
return nil
}
@ -53,5 +57,5 @@ func (pp errorHandler) Exec(ctx context.Context, scope *scp, err error) {
}
spew.Dump("ERR in expediter", err, resp)
json.NewEncoder(scope.writer).Encode(resp)
json.NewEncoder(scope.Writer()).Encode(resp)
}

View File

@ -36,12 +36,29 @@ type (
err ErrorHandler
}
scp struct {
req *http.Request
writer http.ResponseWriter
}
scp map[string]interface{}
)
func (s scp) Request() *http.Request {
if _, ok := s["request"]; ok {
return s["request"].(*http.Request)
}
return nil
}
func (s scp) Writer() http.ResponseWriter {
if _, ok := s["writer"]; ok {
return s["writer"].(http.ResponseWriter)
}
return nil
}
func (s scp) Set(k string, v interface{}) {
s[k] = v
}
// Exec takes care of error handling and main
// functionality that takes place in worker
func (pp *pl) Exec(ctx context.Context, scope *scp) (err error) {

View File

@ -2,21 +2,29 @@ package apigw
import (
"context"
"fmt"
"github.com/cortezaproject/corteza-server/pkg/eventbus"
atypes "github.com/cortezaproject/corteza-server/automation/types"
"github.com/cortezaproject/corteza-server/pkg/expr"
"github.com/cortezaproject/corteza-server/system/types"
)
type (
dispatcher interface {
Dispatch(ctx context.Context, ev eventbus.Event)
WfExecer interface {
Exec(ctx context.Context, workflowID uint64, p atypes.WorkflowExecParams) (*expr.Vars, atypes.Stacktrace, error)
}
processerWorkflow struct {
d dispatcher
d WfExecer
}
)
func NewProcesserWorkflow(wf WfExecer) processerWorkflow {
return processerWorkflow{
d: wf,
}
}
func (h processerWorkflow) Meta(f *types.Function) functionMeta {
return functionMeta{
Step: 2,
@ -37,54 +45,63 @@ func (h processerWorkflow) Meta(f *types.Function) functionMeta {
func (h processerWorkflow) Handler() handlerFunc {
return func(ctx context.Context, scope *scp, params map[string]interface{}, ff functionHandler) error {
// h.d.Dispatch(c, event.ApiOnProcess(&envlp))
var (
wfID int64
ok bool
err error
)
return nil
// validate workflow param
if _, ok = params["workflow"]; !ok {
return fmt.Errorf("invalid param workflow")
}
wfID, err = expr.CastToInteger(params["workflow"])
if err != nil {
return err
}
// setup scope for workflow
vv := map[string]interface{}{
"request": scope.Request(),
}
// get the request data and put it into vars
in, err := expr.NewVars(vv)
if err != nil {
return err
}
wp := atypes.WorkflowExecParams{
Trace: false,
// todo depending on settings per-route
Async: false,
// todo depending on settings per-route
Wait: true,
Input: in,
}
out, _, err := h.d.Exec(ctx, uint64(wfID), wp)
if err != nil {
return err
}
// merge out with scope
merged, err := in.Merge(out)
if err != nil {
return err
}
mm, err := expr.CastToVars(merged)
for k, v := range mm {
scope.Set(k, v)
}
return err
}
}
// func formDataProcesserFn(c context.Context, er *wfexec.ExecRequest) (r wfexec.ExecResponse, err error) {
// type (
// formDataProcesserResponse struct {
// Name string `json:"name"`
// }
// )
// spew.Dump("step processer fn()")
// e := er.Scope.GetValue()["envelope"]
// ee := e.Get()
// // ee.(envelope).Writer.WriteHeader(int(id))
// ee.(envelope).Writer.Write([]byte(`{"test":"foobar"}`))
// e.Assign(ee)
// // req := values.Get()
// // ww := wr.Get()
// // writer := ww.(http.ResponseWriter)
// // formValue := req.(*http.Request).PostFormValue("name")
// // resp := formDataProcesserResponse{
// // // Name: fmt.Sprintf("AA %s AA", formValue),
// // Name: "formValue",
// // }
// // encoder := json.NewEncoder(writer)
// // encoder.Encode(resp)
// // writer.(*httptest.ResponseRecorder).Header()["Content-Type"] = []string{"application/json"}
// // writer.Header().Set("Content-Type", "application/json3")
// // spew.Dump(writer.(*httptest.ResponseRecorder).Header())
// // a, b := expr.NewKV(writer)
// // spew.Dump("Aaaaaaaaaaa", a)
// vv := &expr.Vars{}
// // vv.Set("writer", writer)
// r = vv
// return
// }

View File

@ -3,6 +3,9 @@ package apigw
import (
"fmt"
"github.com/cortezaproject/corteza-server/automation/service"
as "github.com/cortezaproject/corteza-server/automation/service"
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/cortezaproject/corteza-server/system/types"
)
@ -45,8 +48,12 @@ func (r *registry) All() (list functionMetaList) {
}
func (r *registry) Preload() {
r.Add("verifierQueryParam", verifierQueryParam{})
r.Add("verifierOrigin", verifierOrigin{})
r.Add("expediterRedirection", expediterRedirection{})
r.Add("processerWorkflow", processerWorkflow{})
r.Add("verifierQueryParam", NewVerifierQueryParam())
r.Add("verifierOrigin", NewVerifierOrigin())
r.Add("expediterRedirection", NewExpediterRedirection())
r.Add("processerWorkflow", NewProcesserWorkflow(NewWorkflow()))
}
func NewWorkflow() WfExecer {
return as.Workflow(service.DefaultLogger, options.CorredorOpt{})
}

View File

@ -18,12 +18,12 @@ type (
func (r route) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var (
ctx = context.Background()
scope = scp{
req: req,
writer: w,
}
scope = scp{}
)
scope["request"] = req
scope["writer"] = w
err := r.pipe.Exec(ctx, &scope)
if err != nil {

View File

@ -14,6 +14,14 @@ type (
verifierOrigin struct{}
)
func NewVerifierOrigin() verifierOrigin {
return verifierOrigin{}
}
func NewVerifierQueryParam() verifierQueryParam {
return verifierQueryParam{}
}
func (h verifierQueryParam) Meta(f *types.Function) functionMeta {
return functionMeta{
Step: 0,
@ -62,7 +70,7 @@ func (h verifierQueryParam) Handler() handlerFunc {
}
vv := map[string]interface{}{}
vals := scope.req.URL.Query()
vals := scope.Request().URL.Query()
for k, v := range vals {
vv[k] = v[0]
@ -99,7 +107,7 @@ func (h verifierQueryParam) Handler() handlerFunc {
}
// testing
scope.req.Header.Add(fmt.Sprintf("step_%d", ff.step), ff.name)
scope.Request().Header.Add(fmt.Sprintf("step_%d", ff.step), ff.name)
return nil
}
@ -116,7 +124,7 @@ func (h verifierOrigin) Handler() handlerFunc {
}
vv := map[string]interface{}{
"origin": scope.req.Header.Get("Origin"),
"origin": scope.Request().Header.Get("Origin"),
}
// get the request data and put it into vars
@ -150,7 +158,7 @@ func (h verifierOrigin) Handler() handlerFunc {
}
// testing
scope.req.Header.Add(fmt.Sprintf("step_%d", ff.step), ff.name)
scope.Request().Header.Add(fmt.Sprintf("step_%d", ff.step), ff.name)
return nil
}

View File

@ -238,6 +238,7 @@ func (s Store) QueryApigwFunctions(
check func(*types.Function) (bool, error),
) ([]*types.Function, error) {
var (
tmp = make([]*types.Function, 0, DefaultSliceCapacity)
set = make([]*types.Function, 0, DefaultSliceCapacity)
res *types.Function
@ -259,10 +260,15 @@ func (s Store) QueryApigwFunctions(
return nil, err
}
tmp = append(tmp, res)
}
for _, res = range tmp {
set = append(set, res)
}
return set, rows.Err()
return set, nil
}
// LookupApigwFunctionByID searches for function by ID

View File

@ -238,6 +238,7 @@ func (s Store) QueryApigwRoutes(
check func(*types.Route) (bool, error),
) ([]*types.Route, error) {
var (
tmp = make([]*types.Route, 0, DefaultSliceCapacity)
set = make([]*types.Route, 0, DefaultSliceCapacity)
res *types.Route
@ -259,10 +260,15 @@ func (s Store) QueryApigwRoutes(
return nil, err
}
tmp = append(tmp, res)
}
for _, res = range tmp {
set = append(set, res)
}
return set, rows.Err()
return set, nil
}
// LookupApigwRouteByID searches for route by ID