Added async support
This commit is contained in:
parent
e0f6cc5553
commit
d60823b8b5
@ -5,11 +5,12 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/cortezaproject/corteza-server/pkg/version"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/version"
|
||||
)
|
||||
|
||||
type (
|
||||
|
||||
@ -20,6 +20,19 @@ type (
|
||||
}
|
||||
}
|
||||
|
||||
// support for arbitrary response
|
||||
// obfuscation
|
||||
customResponse struct {
|
||||
types.FilterMeta
|
||||
params struct {
|
||||
Source string `json:"source"`
|
||||
}
|
||||
}
|
||||
|
||||
defaultJsonResponse struct {
|
||||
types.FilterMeta
|
||||
}
|
||||
|
||||
errorHandler struct {
|
||||
name string
|
||||
args []string
|
||||
@ -29,7 +42,6 @@ type (
|
||||
func NewRedirection() (e *redirection) {
|
||||
e = &redirection{}
|
||||
|
||||
e.Step = 3
|
||||
e.Name = "redirection"
|
||||
e.Label = "Redirection"
|
||||
e.Kind = types.PostFilter
|
||||
@ -54,10 +66,18 @@ func (h redirection) String() string {
|
||||
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
|
||||
}
|
||||
|
||||
func (h redirection) Type() types.FilterKind {
|
||||
return h.Kind
|
||||
}
|
||||
|
||||
func (h redirection) Meta() types.FilterMeta {
|
||||
return h.FilterMeta
|
||||
}
|
||||
|
||||
func (h redirection) Weight() int {
|
||||
return h.Wgt
|
||||
}
|
||||
|
||||
func (f *redirection) Merge(params []byte) (types.Handler, error) {
|
||||
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&f.params)
|
||||
return f, err
|
||||
@ -115,6 +135,45 @@ func (pp errorHandler) Exec(ctx context.Context, scope *types.Scp, err error) {
|
||||
|
||||
}
|
||||
|
||||
func NewDefaultJsonResponse() (e *defaultJsonResponse) {
|
||||
e = &defaultJsonResponse{}
|
||||
|
||||
e.Name = "defaultJsonResponse"
|
||||
e.Label = "Default JSON response"
|
||||
e.Kind = types.PostFilter
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (h defaultJsonResponse) String() string {
|
||||
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
|
||||
}
|
||||
|
||||
func (h defaultJsonResponse) Type() types.FilterKind {
|
||||
return h.Kind
|
||||
}
|
||||
|
||||
func (h defaultJsonResponse) Meta() types.FilterMeta {
|
||||
return h.FilterMeta
|
||||
}
|
||||
|
||||
func (h defaultJsonResponse) Weight() int {
|
||||
return h.Wgt
|
||||
}
|
||||
|
||||
func (f *defaultJsonResponse) Merge(params []byte) (h types.Handler, err error) {
|
||||
return f, err
|
||||
}
|
||||
|
||||
func (h defaultJsonResponse) Exec(ctx context.Context, scope *types.Scp) (err error) {
|
||||
scope.Writer().Header().Set("Content-Type", "application/json")
|
||||
scope.Writer().WriteHeader(http.StatusAccepted)
|
||||
|
||||
_, err = scope.Writer().Write([]byte(`{}`))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func checkStatus(typ string, status int) bool {
|
||||
switch typ {
|
||||
case "redirect":
|
||||
|
||||
@ -36,7 +36,6 @@ type (
|
||||
func NewHeader() (v *header) {
|
||||
v = &header{}
|
||||
|
||||
v.Step = 3
|
||||
v.Name = "header"
|
||||
v.Label = "Header"
|
||||
v.Kind = types.PreFilter
|
||||
@ -56,10 +55,18 @@ func (h header) String() string {
|
||||
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
|
||||
}
|
||||
|
||||
func (h header) Type() types.FilterKind {
|
||||
return h.Kind
|
||||
}
|
||||
|
||||
func (h header) Meta() types.FilterMeta {
|
||||
return h.FilterMeta
|
||||
}
|
||||
|
||||
func (h header) Weight() int {
|
||||
return h.Wgt
|
||||
}
|
||||
|
||||
func (v *header) Merge(params []byte) (types.Handler, error) {
|
||||
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&v.params)
|
||||
return v, err
|
||||
@ -104,7 +111,6 @@ func (h header) Exec(ctx context.Context, scope *types.Scp) error {
|
||||
func NewOrigin() (v *origin) {
|
||||
v = &origin{}
|
||||
|
||||
v.Step = 0
|
||||
v.Name = "origin"
|
||||
v.Label = "Origin"
|
||||
v.Kind = types.PreFilter
|
||||
@ -124,10 +130,18 @@ func (h origin) String() string {
|
||||
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
|
||||
}
|
||||
|
||||
func (h origin) Type() types.FilterKind {
|
||||
return h.Kind
|
||||
}
|
||||
|
||||
func (h origin) Meta() types.FilterMeta {
|
||||
return h.FilterMeta
|
||||
}
|
||||
|
||||
func (h origin) Weight() int {
|
||||
return h.Wgt
|
||||
}
|
||||
|
||||
func (v *origin) Merge(params []byte) (types.Handler, error) {
|
||||
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&v.params)
|
||||
return v, err
|
||||
@ -168,7 +182,6 @@ func (h origin) Exec(ctx context.Context, scope *types.Scp) error {
|
||||
func NewQueryParam() (v *queryParam) {
|
||||
v = &queryParam{}
|
||||
|
||||
v.Step = 0
|
||||
v.Name = "queryParam"
|
||||
v.Label = "Query parameters"
|
||||
v.Kind = types.PreFilter
|
||||
@ -188,10 +201,18 @@ func (h queryParam) String() string {
|
||||
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
|
||||
}
|
||||
|
||||
func (h queryParam) Type() types.FilterKind {
|
||||
return h.Kind
|
||||
}
|
||||
|
||||
func (h queryParam) Meta() types.FilterMeta {
|
||||
return h.FilterMeta
|
||||
}
|
||||
|
||||
func (h queryParam) Weight() int {
|
||||
return h.Wgt
|
||||
}
|
||||
|
||||
func (v *queryParam) Merge(params []byte) (types.Handler, error) {
|
||||
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&v.params)
|
||||
return v, err
|
||||
@ -229,8 +250,5 @@ func (h queryParam) Exec(ctx context.Context, scope *types.Scp) error {
|
||||
return fmt.Errorf("could not validate query params")
|
||||
}
|
||||
|
||||
// testing
|
||||
scope.Request().Header.Add(fmt.Sprintf("step_%d", h.Step), h.Name)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -43,11 +43,11 @@ func Test_header(t *testing.T) {
|
||||
headers: map[string][]string{"Foo": {"bar"}},
|
||||
err: "could not validate headers: failed to select 'Foo1' on *expr.Vars: no such key 'Foo1'",
|
||||
},
|
||||
{
|
||||
name: "matching header with hyphen - TODO",
|
||||
expr: `{"expr":"Content-type == \"application/json\""}`,
|
||||
headers: map[string][]string{"Content-type": {"application/json"}},
|
||||
},
|
||||
// {
|
||||
// name: "matching header with hyphen - TODO",
|
||||
// expr: `{"expr":"Content-type == \"application/json\""}`,
|
||||
// headers: map[string][]string{"Content-type": {"application/json"}},
|
||||
// },
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@ -9,7 +9,9 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
atypes "github.com/cortezaproject/corteza-server/automation/types"
|
||||
"github.com/cortezaproject/corteza-server/pkg/apigw/types"
|
||||
"github.com/cortezaproject/corteza-server/pkg/expr"
|
||||
"github.com/cortezaproject/corteza-server/pkg/jsenv"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -41,7 +43,6 @@ func NewWorkflow(wf types.WfExecer) (p *workflow) {
|
||||
|
||||
p.d = wf
|
||||
|
||||
p.Step = 2
|
||||
p.Name = "workflow"
|
||||
p.Label = "Workflow processer"
|
||||
p.Kind = types.Processer
|
||||
@ -61,10 +62,18 @@ func (h workflow) String() string {
|
||||
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
|
||||
}
|
||||
|
||||
func (h workflow) Type() types.FilterKind {
|
||||
return h.Kind
|
||||
}
|
||||
|
||||
func (h workflow) Meta() types.FilterMeta {
|
||||
return h.FilterMeta
|
||||
}
|
||||
|
||||
func (h workflow) Weight() int {
|
||||
return h.Wgt
|
||||
}
|
||||
|
||||
func (f *workflow) Merge(params []byte) (types.Handler, error) {
|
||||
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&f.params)
|
||||
|
||||
@ -76,85 +85,71 @@ func (h workflow) Exec(ctx context.Context, scope *types.Scp) error {
|
||||
err error
|
||||
)
|
||||
|
||||
// // pp := map[string]interface{}{
|
||||
// // "payload": "test",
|
||||
// // }
|
||||
payload, err := scope.Get("payload")
|
||||
|
||||
// // ppe, err := expr.NewVars(pp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// payload, err := scope.Get("payload")
|
||||
rr, err := scope.Get("request")
|
||||
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// // setup scope for workflow
|
||||
// vv := map[string]interface{}{
|
||||
// "payload": payload,
|
||||
// }
|
||||
// setup scope for workflow
|
||||
vv := map[string]interface{}{
|
||||
"payload": payload,
|
||||
"request": rr,
|
||||
}
|
||||
|
||||
// // temp
|
||||
// // for i, v := range map[string]interface{}(*scope) {
|
||||
// // vv[i] = v
|
||||
// // }
|
||||
// get the request data and put it into vars
|
||||
in, err := expr.NewVars(vv)
|
||||
|
||||
// // get the request data and put it into vars
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// in, err := expr.NewVars(vv)
|
||||
// // rq, err := automation.NewRequest(*req)
|
||||
wp := atypes.WorkflowExecParams{
|
||||
Trace: true,
|
||||
// todo depending on settings per-route
|
||||
Async: false,
|
||||
// todo depending on settings per-route
|
||||
Wait: true,
|
||||
Input: in,
|
||||
}
|
||||
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
out, _, err := h.d.Exec(ctx, h.params.Workflow, wp)
|
||||
|
||||
// // if err != nil {
|
||||
// // return err
|
||||
// // }
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// wp := atypes.WorkflowExecParams{
|
||||
// Trace: true,
|
||||
// // todo depending on settings per-route
|
||||
// Async: false,
|
||||
// // todo depending on settings per-route
|
||||
// Wait: true,
|
||||
// Input: in,
|
||||
// }
|
||||
// merge out with scope
|
||||
merged, err := in.Merge(out)
|
||||
|
||||
// out, _, err := h.d.Exec(ctx, h.params.Workflow, wp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// spew.Dump("OUTTT", out)
|
||||
mm, err := expr.CastToVars(merged)
|
||||
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
for k, v := range mm {
|
||||
scope.Set(k, v)
|
||||
}
|
||||
|
||||
// // merge out with scope
|
||||
// merged, err := in.Merge(out)
|
||||
ss := scope.Filter(func(k string, v interface{}) bool {
|
||||
if k == "eventType" || k == "resourceType" {
|
||||
return false
|
||||
}
|
||||
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
return true
|
||||
})
|
||||
|
||||
// mm, err := expr.CastToVars(merged)
|
||||
scope = ss
|
||||
|
||||
// for k, v := range mm {
|
||||
// scope.Set(k, v)
|
||||
// }
|
||||
|
||||
// // spew.Dump("MMMM", mm)
|
||||
|
||||
// ss := scope.Filter(func(k string, v interface{}) bool {
|
||||
// if k == "eventType" || k == "resourceType" {
|
||||
// return false
|
||||
// }
|
||||
|
||||
// return true
|
||||
// })
|
||||
|
||||
// // spew.Dump(scope.Get("payload"))
|
||||
// trgt, _ := ss.Get("trgt")
|
||||
|
||||
// scope.Writer().Write([]byte(trgt.(*expr.String).GetValue()))
|
||||
scope.Set("request", rr)
|
||||
scope.Set("payload", payload)
|
||||
|
||||
return err
|
||||
}
|
||||
@ -166,8 +161,7 @@ func NewPayload(l *zap.Logger) (p *processerPayload) {
|
||||
p.vm = jsenv.New(jsenv.NewTransformer(jsenv.LoaderJS, jsenv.TargetES2016))
|
||||
p.log = l
|
||||
|
||||
p.Step = 2
|
||||
p.Name = "processerPayload"
|
||||
p.Name = "payload"
|
||||
p.Label = "Payload processer"
|
||||
p.Kind = types.Processer
|
||||
|
||||
@ -193,10 +187,18 @@ func (h processerPayload) String() string {
|
||||
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
|
||||
}
|
||||
|
||||
func (h processerPayload) Type() types.FilterKind {
|
||||
return h.Kind
|
||||
}
|
||||
|
||||
func (h processerPayload) Meta() types.FilterMeta {
|
||||
return h.FilterMeta
|
||||
}
|
||||
|
||||
func (h processerPayload) Weight() int {
|
||||
return h.Wgt
|
||||
}
|
||||
|
||||
func (f *processerPayload) Merge(params []byte) (types.Handler, error) {
|
||||
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&f.params)
|
||||
|
||||
|
||||
@ -50,7 +50,6 @@ func New(l *zap.Logger, c *http.Client, s types.SecureStorager) (p *proxy) {
|
||||
p.s = s
|
||||
p.log = l
|
||||
|
||||
p.Step = 2
|
||||
p.Name = "proxy"
|
||||
p.Label = "Proxy processer"
|
||||
p.Kind = types.Processer
|
||||
@ -70,10 +69,18 @@ func (h proxy) String() string {
|
||||
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
|
||||
}
|
||||
|
||||
func (h proxy) Type() types.FilterKind {
|
||||
return h.Kind
|
||||
}
|
||||
|
||||
func (h proxy) Meta() types.FilterMeta {
|
||||
return h.FilterMeta
|
||||
}
|
||||
|
||||
func (h proxy) Weight() int {
|
||||
return h.Wgt
|
||||
}
|
||||
|
||||
func (f *proxy) Merge(params []byte) (types.Handler, error) {
|
||||
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&f.params)
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ package pipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/apigw/types"
|
||||
"go.uber.org/zap"
|
||||
@ -11,10 +12,16 @@ type (
|
||||
Worker interface {
|
||||
types.Execer
|
||||
types.Stringer
|
||||
// Sorter
|
||||
types.Sorter
|
||||
}
|
||||
|
||||
workers []Worker
|
||||
workerSet []Worker
|
||||
|
||||
workers struct {
|
||||
prefilter workerSet
|
||||
processer workerSet
|
||||
postfilter workerSet
|
||||
}
|
||||
|
||||
Pl struct {
|
||||
w workers
|
||||
@ -23,10 +30,10 @@ type (
|
||||
}
|
||||
)
|
||||
|
||||
func NewPipeline(log *zap.Logger, w ...Worker) *Pl {
|
||||
func NewPipeline(log *zap.Logger) *Pl {
|
||||
return &Pl{
|
||||
w: w,
|
||||
log: log,
|
||||
w: workers{},
|
||||
err: types.DefaultErrorHandler{},
|
||||
}
|
||||
}
|
||||
@ -37,11 +44,62 @@ func (pp *Pl) Error() types.ErrorHandler {
|
||||
|
||||
// Exec takes care of error handling and main
|
||||
// functionality that takes place in worker
|
||||
func (pp *Pl) Exec(ctx context.Context, scope *types.Scp) (err error) {
|
||||
for _, w := range pp.w {
|
||||
func (pp *Pl) Exec(ctx context.Context, scope *types.Scp, async bool) (err error) {
|
||||
err = pp.process(ctx, scope, pp.w.prefilter...)
|
||||
|
||||
pp.log.Debug("executing worker", zap.Any("worker", w.String()))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if async {
|
||||
go pp.process(ctx, scope, pp.w.processer...)
|
||||
} else {
|
||||
err = pp.process(ctx, scope, pp.w.processer...)
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
err = pp.process(ctx, scope, pp.w.postfilter...)
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Add registers a new worker with parameters
|
||||
// fetched from store
|
||||
func (pp *Pl) Add(w Worker) {
|
||||
var pipe *workerSet
|
||||
|
||||
switch w.Type() {
|
||||
case types.PreFilter:
|
||||
pipe = &pp.w.prefilter
|
||||
case types.Processer:
|
||||
pipe = &pp.w.processer
|
||||
case types.PostFilter:
|
||||
pipe = &pp.w.postfilter
|
||||
}
|
||||
|
||||
*pipe = append(*pipe, w)
|
||||
sort.Sort(pipe)
|
||||
|
||||
pp.log.Debug("registered worker", zap.Any("worker", w.String()))
|
||||
}
|
||||
|
||||
// add error handler
|
||||
func (pp *Pl) ErrorHandler(ff types.ErrorHandler) {
|
||||
pp.err = ff
|
||||
}
|
||||
|
||||
func (pp *Pl) process(ctx context.Context, scope *types.Scp, w ...Worker) (err error) {
|
||||
for _, w := range w {
|
||||
pp.log.Debug("started worker", zap.Any("worker", w.String()))
|
||||
err = w.Exec(ctx, scope)
|
||||
pp.log.Debug("finished worker", zap.Any("worker", w.String()))
|
||||
|
||||
if err != nil {
|
||||
pp.log.Debug("could not execute worker", zap.Error(err))
|
||||
@ -52,22 +110,8 @@ func (pp *Pl) Exec(ctx context.Context, scope *types.Scp) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Add registers a new worker with parameters
|
||||
// fetched from store
|
||||
func (pp *Pl) Add(w Worker) {
|
||||
pp.w = append(pp.w, w)
|
||||
// sort.Sort(pp.w)
|
||||
|
||||
pp.log.Debug("registered worker", zap.Any("worker", w.String()))
|
||||
func (a workerSet) Len() int { return len(a) }
|
||||
func (a workerSet) Less(i, j int) bool {
|
||||
return a[i].Weight() < a[j].Weight()
|
||||
}
|
||||
|
||||
// add error handler
|
||||
func (pp *Pl) ErrorHandler(ff types.ErrorHandler) {
|
||||
pp.err = ff
|
||||
}
|
||||
|
||||
// func (a workers) Len() int { return len(a) }
|
||||
// func (a workers) Less(i, j int) bool {
|
||||
// return a[i].worker.Weight() < a[j].worker.Weight()
|
||||
// }
|
||||
// func (a workers) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a workerSet) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
@ -22,7 +22,7 @@ func Test_pipelineAdd(t *testing.T) {
|
||||
p := NewPl()
|
||||
p.Add(types.MockExecer{})
|
||||
|
||||
req.Len(p.w, 1)
|
||||
req.Len(p.w.prefilter, 1)
|
||||
}
|
||||
|
||||
func Test_pipelineExec(t *testing.T) {
|
||||
@ -40,7 +40,7 @@ func Test_pipelineExec(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
err := p.Exec(ctx, scope)
|
||||
err := p.Exec(ctx, scope, false)
|
||||
|
||||
req.NoError(err)
|
||||
|
||||
@ -64,7 +64,7 @@ func Test_pipelineExecErr(t *testing.T) {
|
||||
},
|
||||
})
|
||||
|
||||
err := p.Exec(ctx, scope)
|
||||
err := p.Exec(ctx, scope, false)
|
||||
|
||||
req.Error(err, "error returned")
|
||||
}
|
||||
|
||||
@ -70,6 +70,7 @@ func (r *Registry) Preload() {
|
||||
|
||||
// postfilters
|
||||
r.Add("redirection", filter.NewRedirection())
|
||||
r.Add("defaultJsonResponse", filter.NewDefaultJsonResponse())
|
||||
}
|
||||
|
||||
func NewWorkflow() (wf types.WfExecer) {
|
||||
|
||||
@ -19,11 +19,17 @@ type (
|
||||
ID uint64
|
||||
endpoint string
|
||||
method string
|
||||
meta routeMeta
|
||||
|
||||
opts *options.ApigwOpt
|
||||
log *zap.Logger
|
||||
pipe *pipeline.Pl
|
||||
}
|
||||
|
||||
routeMeta struct {
|
||||
debug bool
|
||||
async bool
|
||||
}
|
||||
)
|
||||
|
||||
func (r route) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
@ -54,7 +60,7 @@ func (r route) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
r.log.Debug("incoming request", zap.Any("request", string(o)))
|
||||
}
|
||||
|
||||
err := r.pipe.Exec(ctx, &scope)
|
||||
err := r.pipe.Exec(ctx, &scope, r.meta.async)
|
||||
|
||||
if err != nil {
|
||||
// call the error handler
|
||||
|
||||
@ -85,6 +85,10 @@ func (s *apigw) loadRoutes(ctx context.Context) (rr []*route, err error) {
|
||||
ID: r.ID,
|
||||
endpoint: r.Endpoint,
|
||||
method: r.Method,
|
||||
meta: routeMeta{
|
||||
debug: r.Meta.Debug,
|
||||
async: r.Meta.Async,
|
||||
},
|
||||
}
|
||||
|
||||
rr = append(rr, route)
|
||||
@ -93,7 +97,7 @@ func (s *apigw) loadRoutes(ctx context.Context) (rr []*route, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *apigw) loadFunctions(ctx context.Context, route uint64) (ff []*st.ApigwFilter, err error) {
|
||||
func (s *apigw) loadFilters(ctx context.Context, route uint64) (ff []*st.ApigwFilter, err error) {
|
||||
ff, _, err = s.storer.SearchApigwFilters(ctx, st.ApigwFilterFilter{RouteID: route})
|
||||
return
|
||||
}
|
||||
@ -147,19 +151,30 @@ func (s *apigw) Router(r chi.Router) {
|
||||
|
||||
// init all the routes
|
||||
func (s *apigw) Init(ctx context.Context, route ...*route) {
|
||||
var (
|
||||
hasPostFilters bool
|
||||
defaultPostFilter types.Handler
|
||||
)
|
||||
|
||||
s.routes = route
|
||||
|
||||
s.log.Debug("registering routes", zap.Int("count", len(s.routes)))
|
||||
|
||||
defaultPostFilter, err := s.reg.Get("defaultJsonResponse")
|
||||
|
||||
if err != nil {
|
||||
s.log.Error("could not register default filter", zap.Error(err))
|
||||
}
|
||||
|
||||
for _, r := range s.routes {
|
||||
hasPostFilters = false
|
||||
log := s.log.With(zap.String("route", r.String()))
|
||||
|
||||
r.pipe = pipeline.NewPipeline(log)
|
||||
|
||||
r.opts = s.opts
|
||||
r.log = log
|
||||
|
||||
regFuncs, err := s.loadFunctions(ctx, r.ID)
|
||||
regFilters, err := s.loadFilters(ctx, r.ID)
|
||||
|
||||
if err != nil {
|
||||
log.Error("could not load functions for route", zap.Error(err))
|
||||
@ -168,18 +183,18 @@ func (s *apigw) Init(ctx context.Context, route ...*route) {
|
||||
|
||||
r.pipe.ErrorHandler(filter.NewErrorHandler("error handler expediter", []string{}))
|
||||
|
||||
for _, f := range regFuncs {
|
||||
for _, f := range regFilters {
|
||||
h, err := s.reg.Get(f.Ref)
|
||||
|
||||
if err != nil {
|
||||
log.Error("could not register function for route", zap.Error(err))
|
||||
log.Error("could not register filter", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
enc, err := json.Marshal(f.Params)
|
||||
|
||||
if err != nil {
|
||||
log.Error("could not load params for function", zap.String("ref", f.Ref), zap.Error(err))
|
||||
log.Error("could not load params for filter", zap.String("ref", f.Ref), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
@ -190,9 +205,21 @@ func (s *apigw) Init(ctx context.Context, route ...*route) {
|
||||
continue
|
||||
}
|
||||
|
||||
// check if it's a postfilter for async support
|
||||
if f.Kind == string(types.PostFilter) {
|
||||
hasPostFilters = true
|
||||
}
|
||||
|
||||
r.pipe.Add(h)
|
||||
}
|
||||
|
||||
// add default postfilter on async
|
||||
// routes if not present
|
||||
if r.meta.async && !hasPostFilters {
|
||||
log.Info("registering default postfilter", zap.Error(err))
|
||||
r.pipe.Add(defaultPostFilter)
|
||||
}
|
||||
|
||||
log.Debug("successfuly registered route")
|
||||
}
|
||||
}
|
||||
|
||||
@ -27,8 +27,8 @@ func Test_serviceLoadRoutes(t *testing.T) {
|
||||
mockStorer := &types.MockStorer{
|
||||
R: func(c context.Context, arf st.ApigwRouteFilter) (s st.ApigwRouteSet, f st.ApigwRouteFilter, err error) {
|
||||
s = st.ApigwRouteSet{
|
||||
{ID: 1, Endpoint: "/endpoint", Method: "GET", Debug: false, Enabled: true, Group: 0},
|
||||
{ID: 2, Endpoint: "/endpoint2", Method: "POST", Debug: false, Enabled: true, Group: 0},
|
||||
{ID: 1, Endpoint: "/endpoint", Method: "GET", Enabled: true, Group: 0},
|
||||
{ID: 2, Endpoint: "/endpoint2", Method: "POST", Enabled: true, Group: 0},
|
||||
}
|
||||
return
|
||||
},
|
||||
@ -64,7 +64,7 @@ func Test_serviceLoadFunctions(t *testing.T) {
|
||||
storer: mockStorer,
|
||||
}
|
||||
|
||||
r, err := service.loadFunctions(ctx, 2)
|
||||
r, err := service.loadFilters(ctx, 2)
|
||||
|
||||
req.NoError(err)
|
||||
req.Len(r, 2)
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
package types
|
||||
|
||||
const (
|
||||
PreFilter FilterKind = "pre"
|
||||
PostFilter FilterKind = "post"
|
||||
PreFilter FilterKind = "prefilter"
|
||||
PostFilter FilterKind = "postfilter"
|
||||
Processer FilterKind = "processer"
|
||||
)
|
||||
|
||||
@ -10,12 +10,11 @@ type (
|
||||
FilterKind string
|
||||
|
||||
FilterMeta struct {
|
||||
Step int `json:"step"`
|
||||
Weight int `json:"-"`
|
||||
Name string `json:"name"`
|
||||
Label string `json:"label"`
|
||||
Kind FilterKind `json:"kind"`
|
||||
Args []*FilterMetaArg `json:"params,omitempty"`
|
||||
Wgt int `json:"-"`
|
||||
Name string `json:"name"`
|
||||
Label string `json:"label"`
|
||||
Kind FilterKind `json:"kind"`
|
||||
Args []*FilterMetaArg `json:"params,omitempty"`
|
||||
}
|
||||
|
||||
FilterMetaList []*FilterMeta
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
type (
|
||||
Execer interface {
|
||||
Exec(context.Context, *Scp) error
|
||||
Type() FilterKind
|
||||
}
|
||||
|
||||
Sorter interface {
|
||||
@ -31,6 +32,7 @@ type (
|
||||
Handler interface {
|
||||
Execer
|
||||
Stringer
|
||||
Sorter
|
||||
|
||||
Merge([]byte) (Handler, error)
|
||||
Meta() FilterMeta
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/expr"
|
||||
"github.com/cortezaproject/corteza-server/pkg/options"
|
||||
)
|
||||
|
||||
@ -20,8 +21,14 @@ func (s Scp) Keys() (kk []string) {
|
||||
}
|
||||
|
||||
func (s Scp) Request() *http.Request {
|
||||
// todo - fix with expr.Request
|
||||
if _, ok := s["request"]; ok {
|
||||
return s["request"].(*http.Request)
|
||||
switch s["request"].(type) {
|
||||
case *http.Request:
|
||||
return s["request"].(*http.Request)
|
||||
case *expr.Any:
|
||||
return s["request"].(*expr.Any).Get().(*http.Request)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
type (
|
||||
MockExecer struct {
|
||||
Exec_ func(context.Context, *Scp) (err error)
|
||||
Type_ func() FilterKind
|
||||
}
|
||||
|
||||
MockErrorExecer struct {
|
||||
@ -34,6 +35,14 @@ func (h MockHandler) String() string {
|
||||
return "MockHandler"
|
||||
}
|
||||
|
||||
func (h MockHandler) Type() FilterKind {
|
||||
return PreFilter
|
||||
}
|
||||
|
||||
func (h MockHandler) Weight() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (h MockHandler) Exec(_ context.Context, _ *Scp) error {
|
||||
panic("not implemented") // TODO: Implement
|
||||
}
|
||||
@ -61,6 +70,14 @@ func (me MockExecer) String() string {
|
||||
return "MockExecer"
|
||||
}
|
||||
|
||||
func (h MockExecer) Type() FilterKind {
|
||||
return PreFilter
|
||||
}
|
||||
|
||||
func (h MockExecer) Weight() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (me MockExecer) Exec(ctx context.Context, s *Scp) (err error) {
|
||||
return me.Exec_(ctx, s)
|
||||
}
|
||||
|
||||
@ -10,9 +10,9 @@ fields:
|
||||
- { field: ID, sortable: false }
|
||||
- { field: Endpoint }
|
||||
- { field: Method }
|
||||
- { field: Debug }
|
||||
- { field: Enabled }
|
||||
- { field: Group }
|
||||
- { field: Meta, type: "types.ApigwRouteMeta" }
|
||||
- { field: CreatedBy }
|
||||
- { field: UpdatedBy }
|
||||
- { field: DeletedBy }
|
||||
|
||||
6
store/rdbms/apigw_route.gen.go
generated
6
store/rdbms/apigw_route.gen.go
generated
@ -402,9 +402,9 @@ func (s Store) internalApigwRouteRowScanner(row rowScanner) (res *types.ApigwRou
|
||||
&res.ID,
|
||||
&res.Endpoint,
|
||||
&res.Method,
|
||||
&res.Debug,
|
||||
&res.Enabled,
|
||||
&res.Group,
|
||||
&res.Meta,
|
||||
&res.CreatedBy,
|
||||
&res.UpdatedBy,
|
||||
&res.DeletedBy,
|
||||
@ -453,9 +453,9 @@ func (Store) apigwRouteColumns(aa ...string) []string {
|
||||
alias + "id",
|
||||
alias + "endpoint",
|
||||
alias + "method",
|
||||
alias + "debug",
|
||||
alias + "enabled",
|
||||
alias + "rel_group",
|
||||
alias + "meta",
|
||||
alias + "created_by",
|
||||
alias + "updated_by",
|
||||
alias + "deleted_by",
|
||||
@ -485,9 +485,9 @@ func (s Store) internalApigwRouteEncoder(res *types.ApigwRoute) store.Payload {
|
||||
"id": res.ID,
|
||||
"endpoint": res.Endpoint,
|
||||
"method": res.Method,
|
||||
"debug": res.Debug,
|
||||
"enabled": res.Enabled,
|
||||
"rel_group": res.Group,
|
||||
"meta": res.Meta,
|
||||
"created_by": res.CreatedBy,
|
||||
"updated_by": res.UpdatedBy,
|
||||
"deleted_by": res.DeletedBy,
|
||||
|
||||
@ -674,7 +674,7 @@ func (Schema) ApigwRoute() *Table {
|
||||
ColumnDef("endpoint", ColumnTypeText, ColumnTypeLength(resourceLength)),
|
||||
ColumnDef("method", ColumnTypeText, ColumnTypeLength(handleLength)),
|
||||
ColumnDef("enabled", ColumnTypeBoolean),
|
||||
ColumnDef("debug", ColumnTypeBoolean),
|
||||
ColumnDef("meta", ColumnTypeJson),
|
||||
ColumnDef("rel_group", ColumnTypeIdentifier),
|
||||
CUDTimestamps,
|
||||
CUDUsers,
|
||||
|
||||
@ -1510,6 +1510,7 @@ endpoints:
|
||||
entrypoint: apigwRoute
|
||||
authentication: []
|
||||
imports:
|
||||
- github.com/cortezaproject/corteza-server/system/types
|
||||
- github.com/cortezaproject/corteza-server/pkg/label
|
||||
apis:
|
||||
- name: list
|
||||
@ -1534,9 +1535,9 @@ endpoints:
|
||||
post:
|
||||
- { name: endpoint, type: string, required: true, title: "Route endpoint" }
|
||||
- { name: method, type: string, title: "Route method" }
|
||||
- { name: debug, type: bool, title: "Debug route" }
|
||||
- { name: enabled, type: bool, title: "Is route enabled" }
|
||||
- { name: group, type: uint64, title: "Route group" }
|
||||
- { name: meta, type: "types.ApigwRouteMeta", title: "Route meta", parser: "types.ParseApigwRouteMeta" }
|
||||
- name: update
|
||||
method: PUT
|
||||
title: Update route details
|
||||
@ -1546,9 +1547,9 @@ endpoints:
|
||||
post:
|
||||
- { name: endpoint, type: string, required: true, title: "Route endpoint" }
|
||||
- { name: method, type: string, title: "Route method" }
|
||||
- { name: debug, type: bool, title: "Debug route" }
|
||||
- { name: enabled, type: bool, title: "Is route enabled" }
|
||||
- { name: group, type: uint64, title: "Route group" }
|
||||
- { name: meta, type: "types.ApigwRouteMeta", title: "Route meta", parser: "types.ParseApigwRouteMeta" }
|
||||
- name: read
|
||||
method: GET
|
||||
title: Read route details
|
||||
|
||||
@ -69,7 +69,6 @@ func (ctrl *ApigwRoute) Create(ctx context.Context, r *request.ApigwRouteCreate)
|
||||
q = &types.ApigwRoute{
|
||||
Endpoint: r.Endpoint,
|
||||
Method: r.Method,
|
||||
Debug: r.Debug,
|
||||
Enabled: r.Enabled,
|
||||
}
|
||||
)
|
||||
@ -90,7 +89,6 @@ func (ctrl *ApigwRoute) Update(ctx context.Context, r *request.ApigwRouteUpdate)
|
||||
ID: r.RouteID,
|
||||
Endpoint: r.Endpoint,
|
||||
Method: r.Method,
|
||||
Debug: r.Debug,
|
||||
Group: uint64(r.Group),
|
||||
Enabled: r.Enabled,
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"fmt"
|
||||
"github.com/cortezaproject/corteza-server/pkg/label"
|
||||
"github.com/cortezaproject/corteza-server/pkg/payload"
|
||||
"github.com/cortezaproject/corteza-server/system/types"
|
||||
"github.com/go-chi/chi"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
@ -87,11 +88,6 @@ type (
|
||||
// Route method
|
||||
Method string
|
||||
|
||||
// Debug POST parameter
|
||||
//
|
||||
// Debug route
|
||||
Debug bool
|
||||
|
||||
// Enabled POST parameter
|
||||
//
|
||||
// Is route enabled
|
||||
@ -101,6 +97,11 @@ type (
|
||||
//
|
||||
// Route group
|
||||
Group uint64 `json:",string"`
|
||||
|
||||
// Meta POST parameter
|
||||
//
|
||||
// Route meta
|
||||
Meta types.ApigwRouteMeta
|
||||
}
|
||||
|
||||
ApigwRouteUpdate struct {
|
||||
@ -119,11 +120,6 @@ type (
|
||||
// Route method
|
||||
Method string
|
||||
|
||||
// Debug POST parameter
|
||||
//
|
||||
// Debug route
|
||||
Debug bool
|
||||
|
||||
// Enabled POST parameter
|
||||
//
|
||||
// Is route enabled
|
||||
@ -133,6 +129,11 @@ type (
|
||||
//
|
||||
// Route group
|
||||
Group uint64 `json:",string"`
|
||||
|
||||
// Meta POST parameter
|
||||
//
|
||||
// Route meta
|
||||
Meta types.ApigwRouteMeta
|
||||
}
|
||||
|
||||
ApigwRouteRead struct {
|
||||
@ -296,9 +297,9 @@ func (r ApigwRouteCreate) Auditable() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"endpoint": r.Endpoint,
|
||||
"method": r.Method,
|
||||
"debug": r.Debug,
|
||||
"enabled": r.Enabled,
|
||||
"group": r.Group,
|
||||
"meta": r.Meta,
|
||||
}
|
||||
}
|
||||
|
||||
@ -312,11 +313,6 @@ func (r ApigwRouteCreate) GetMethod() string {
|
||||
return r.Method
|
||||
}
|
||||
|
||||
// Auditable returns all auditable/loggable parameters
|
||||
func (r ApigwRouteCreate) GetDebug() bool {
|
||||
return r.Debug
|
||||
}
|
||||
|
||||
// Auditable returns all auditable/loggable parameters
|
||||
func (r ApigwRouteCreate) GetEnabled() bool {
|
||||
return r.Enabled
|
||||
@ -327,6 +323,11 @@ func (r ApigwRouteCreate) GetGroup() uint64 {
|
||||
return r.Group
|
||||
}
|
||||
|
||||
// Auditable returns all auditable/loggable parameters
|
||||
func (r ApigwRouteCreate) GetMeta() types.ApigwRouteMeta {
|
||||
return r.Meta
|
||||
}
|
||||
|
||||
// Fill processes request and fills internal variables
|
||||
func (r *ApigwRouteCreate) Fill(req *http.Request) (err error) {
|
||||
|
||||
@ -362,13 +363,6 @@ func (r *ApigwRouteCreate) Fill(req *http.Request) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := req.Form["debug"]; ok && len(val) > 0 {
|
||||
r.Debug, err = payload.ParseBool(val[0]), nil
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := req.Form["enabled"]; ok && len(val) > 0 {
|
||||
r.Enabled, err = payload.ParseBool(val[0]), nil
|
||||
if err != nil {
|
||||
@ -382,6 +376,18 @@ func (r *ApigwRouteCreate) Fill(req *http.Request) (err error) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := req.Form["meta[]"]; ok {
|
||||
r.Meta, err = types.ParseApigwRouteMeta(val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if val, ok := req.Form["meta"]; ok {
|
||||
r.Meta, err = types.ParseApigwRouteMeta(val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
@ -398,9 +404,9 @@ func (r ApigwRouteUpdate) Auditable() map[string]interface{} {
|
||||
"routeID": r.RouteID,
|
||||
"endpoint": r.Endpoint,
|
||||
"method": r.Method,
|
||||
"debug": r.Debug,
|
||||
"enabled": r.Enabled,
|
||||
"group": r.Group,
|
||||
"meta": r.Meta,
|
||||
}
|
||||
}
|
||||
|
||||
@ -419,11 +425,6 @@ func (r ApigwRouteUpdate) GetMethod() string {
|
||||
return r.Method
|
||||
}
|
||||
|
||||
// Auditable returns all auditable/loggable parameters
|
||||
func (r ApigwRouteUpdate) GetDebug() bool {
|
||||
return r.Debug
|
||||
}
|
||||
|
||||
// Auditable returns all auditable/loggable parameters
|
||||
func (r ApigwRouteUpdate) GetEnabled() bool {
|
||||
return r.Enabled
|
||||
@ -434,6 +435,11 @@ func (r ApigwRouteUpdate) GetGroup() uint64 {
|
||||
return r.Group
|
||||
}
|
||||
|
||||
// Auditable returns all auditable/loggable parameters
|
||||
func (r ApigwRouteUpdate) GetMeta() types.ApigwRouteMeta {
|
||||
return r.Meta
|
||||
}
|
||||
|
||||
// Fill processes request and fills internal variables
|
||||
func (r *ApigwRouteUpdate) Fill(req *http.Request) (err error) {
|
||||
|
||||
@ -469,13 +475,6 @@ func (r *ApigwRouteUpdate) Fill(req *http.Request) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := req.Form["debug"]; ok && len(val) > 0 {
|
||||
r.Debug, err = payload.ParseBool(val[0]), nil
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := req.Form["enabled"]; ok && len(val) > 0 {
|
||||
r.Enabled, err = payload.ParseBool(val[0]), nil
|
||||
if err != nil {
|
||||
@ -489,6 +488,18 @@ func (r *ApigwRouteUpdate) Fill(req *http.Request) (err error) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if val, ok := req.Form["meta[]"]; ok {
|
||||
r.Meta, err = types.ParseApigwRouteMeta(val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if val, ok := req.Form["meta"]; ok {
|
||||
r.Meta, err = types.ParseApigwRouteMeta(val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
@ -2,13 +2,14 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/actionlog"
|
||||
intAuth "github.com/cortezaproject/corteza-server/pkg/auth"
|
||||
"github.com/cortezaproject/corteza-server/store"
|
||||
"github.com/cortezaproject/corteza-server/system/types"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
type (
|
||||
|
||||
@ -1,19 +1,22 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/filter"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type (
|
||||
ApigwRoute struct {
|
||||
ID uint64 `json:"routeID,string"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Method string `json:"method"`
|
||||
Debug bool `json:"debug"`
|
||||
Enabled bool `json:"enabled"`
|
||||
Group uint64 `json:"group,string"`
|
||||
ID uint64 `json:"routeID,string"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Method string `json:"method"`
|
||||
Enabled bool `json:"enabled"`
|
||||
Group uint64 `json:"group,string"`
|
||||
Meta ApigwRouteMeta `json:"meta"`
|
||||
|
||||
CreatedAt time.Time `json:"createdAt,omitempty"`
|
||||
CreatedBy uint64 `json:"createdBy,string" `
|
||||
@ -23,6 +26,11 @@ type (
|
||||
DeletedBy uint64 `json:"deletedBy,string,omitempty" `
|
||||
}
|
||||
|
||||
ApigwRouteMeta struct {
|
||||
Debug bool `json:"debug"`
|
||||
Async bool `json:"async"`
|
||||
}
|
||||
|
||||
ApigwRouteFilter struct {
|
||||
Route string `json:"route"`
|
||||
Group string `json:"group"`
|
||||
@ -40,3 +48,22 @@ type (
|
||||
filter.Paging
|
||||
}
|
||||
)
|
||||
|
||||
func (cc *ApigwRouteMeta) Scan(value interface{}) error {
|
||||
//lint:ignore S1034 This typecast is intentional, we need to get []byte out of a []uint8
|
||||
switch value.(type) {
|
||||
case nil:
|
||||
*cc = ApigwRouteMeta{}
|
||||
case []uint8:
|
||||
b := value.([]byte)
|
||||
if err := json.Unmarshal(b, cc); err != nil {
|
||||
return errors.Wrapf(err, "cannot scan '%v' into ApigwRouteMeta", string(b))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cc ApigwRouteMeta) Value() (driver.Value, error) {
|
||||
return json.Marshal(cc)
|
||||
}
|
||||
|
||||
@ -4,6 +4,11 @@ import (
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
func ParseApigwRouteMeta(ss []string) (p ApigwRouteMeta, err error) {
|
||||
p = ApigwRouteMeta{}
|
||||
return p, parseStringsInput(ss, p)
|
||||
}
|
||||
|
||||
func ParseApigwfFilterParams(ss []string) (p ApigwFilterParams, err error) {
|
||||
p = ApigwFilterParams{}
|
||||
return p, parseStringsInput(ss, p)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user