diff --git a/pkg/apigw/ctx/ctx.go b/pkg/apigw/ctx/ctx.go index a2c226802..2556acc31 100644 --- a/pkg/apigw/ctx/ctx.go +++ b/pkg/apigw/ctx/ctx.go @@ -14,6 +14,12 @@ func ScopeToContext(ctx context.Context, s *types.Scp) context.Context { return context.WithValue(ctx, ContextKeyScope, s) } -func ScopeFromContext(ctx context.Context) *types.Scp { - return ctx.Value(ContextKeyScope).(*types.Scp) +func ScopeFromContext(ctx context.Context) (ss *types.Scp) { + s := ctx.Value(ContextKeyScope) + + if s == nil { + return &types.Scp{} + } + + return s.(*types.Scp) } diff --git a/pkg/apigw/filter/processer.go b/pkg/apigw/filter/processer.go index 6638deaf7..0d17ec5aa 100644 --- a/pkg/apigw/filter/processer.go +++ b/pkg/apigw/filter/processer.go @@ -40,7 +40,7 @@ type ( log *zap.Logger params struct { - Func string `json:"func"` + Func string `json:"jsfunc"` Encode bool `json:"encode"` } } diff --git a/pkg/apigw/filter/processer_test.go b/pkg/apigw/filter/processer_test.go index b093da438..ec86d77e9 100644 --- a/pkg/apigw/filter/processer_test.go +++ b/pkg/apigw/filter/processer_test.go @@ -113,5 +113,5 @@ func Test_processerPayload(t *testing.T) { } func prepareFuncPayload(s string) string { - return fmt.Sprintf(`{"func": "%s"}`, base64.StdEncoding.EncodeToString([]byte(s))) + return fmt.Sprintf(`{"jsfunc": "%s"}`, base64.StdEncoding.EncodeToString([]byte(s))) } diff --git a/pkg/apigw/pipeline/chain.go b/pkg/apigw/pipeline/chain.go new file mode 100644 index 000000000..9a3e0bbf3 --- /dev/null +++ b/pkg/apigw/pipeline/chain.go @@ -0,0 +1,19 @@ +package pipeline + +import ( + "net/http" + + "github.com/go-chi/chi" +) + +type ( + chiHandlerChain struct { + chain []func(http.Handler) http.Handler + } +) + +func (cc chiHandlerChain) Handler() (h http.Handler) { + return chi. + Chain(cc.chain...). + HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {}) +} diff --git a/pkg/apigw/pipeline/pipeline.go b/pkg/apigw/pipeline/pipeline.go index c89a50301..43bf08e17 100644 --- a/pkg/apigw/pipeline/pipeline.go +++ b/pkg/apigw/pipeline/pipeline.go @@ -6,15 +6,16 @@ import ( "time" "github.com/cortezaproject/corteza-server/pkg/apigw/types" - "github.com/go-chi/chi" "go.uber.org/zap" ) type ( Worker struct { - Handler func(rw http.ResponseWriter, r *http.Request) error Weight int + Async bool Name string + Type types.FilterKind + Handler func(rw http.ResponseWriter, r *http.Request) error } workerSet []*Worker @@ -23,6 +24,7 @@ type ( workers workerSet err types.ErrorHandlerFunc log *zap.Logger + async bool } ) @@ -37,6 +39,10 @@ func NewPipeline(log *zap.Logger) *Pl { } } +func (pp *Pl) Async(a bool) { + pp.async = a +} + func (pp *Pl) Error() types.ErrorHandlerFunc { return pp.err } @@ -46,47 +52,63 @@ func (pp *Pl) ErrorHandler(ff types.ErrorHandlerFunc) { pp.err = ff } -// add filter +// Add filter func (pp *Pl) Add(w *Worker) { pp.workers = append(pp.workers, w) sort.Sort(pp.workers) } -func (pp *Pl) AddHandler(h http.Handler) {} - +// Handler is the main operating entry point for requests +// that handles filter groups func (pp *Pl) Handler() http.Handler { - var ( - middleware []func(http.Handler) http.Handler - ) + // use the chi implementation of chains + chiChain := chiHandlerChain{ + chain: pp.makeMiddleware(pp.workers...), + } - for _, wrker := range pp.workers { + return chiChain.Handler() +} + +// makeMiddleware creates a list of handlers from workers +// it is used in chaining +func (pp *Pl) makeMiddleware(hh ...*Worker) (middleware []func(http.Handler) http.Handler) { + for _, wrker := range hh { middleware = append(middleware, pp.makeHandler(*wrker)) } - return chi.Chain(middleware...).Handler(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {})) + return middleware } +// makeHandler creates a handler from worker, it also +// wraps the error handling func (pp *Pl) makeHandler(hh Worker) func(next http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { var ( + err error start = time.Now() + log = pp.log.With(zap.String("filter", hh.Name)) ) - pp.log.Debug("started processing", zap.String("filter", hh.Name)) + log.Debug("started processing", zap.Bool("async", hh.Async)) - // if w.async { - // ctx = context.Background() - // r.WithContext(context.Background()) - // go w.handler(rw, r) - // next.ServeHTTP(rw, r) - // } else { + fn := func() (err error) { + err = hh.Handler(rw, r) + log.Debug("finished processing", zap.Duration("duration", time.Since(start))) + return + } - err := hh.Handler(rw, r) - - pp.log.Debug("finished processing", - zap.String("filter", hh.Name), - zap.Duration("duration", time.Since(start))) + if hh.Async { + go func() { + // only log error, do not call error handler, + // since we do not reply back the response (it was already sent) + if err = fn(); err != nil { + log.Error(err.Error()) + } + }() + } else { + err = fn() + } if err != nil { pp.err(rw, r, err) @@ -94,8 +116,6 @@ func (pp *Pl) makeHandler(hh Worker) func(next http.Handler) http.Handler { } else { next.ServeHTTP(rw, r) } - // } - }) } } @@ -105,3 +125,18 @@ func (a workerSet) Less(i, j int) bool { return a[i].Weight < a[j].Weight } func (a workerSet) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func (a workerSet) Filter(f func(*Worker) (bool, error)) (out workerSet, err error) { + var ok bool + out = workerSet{} + + for i := range a { + if ok, err = f(a[i]); err != nil { + return + } else if ok { + out = append(out, a[i]) + } + } + + return +} diff --git a/pkg/apigw/service.go b/pkg/apigw/service.go index 9f43bc8d2..cb4b325a1 100644 --- a/pkg/apigw/service.go +++ b/pkg/apigw/service.go @@ -154,7 +154,6 @@ func (s *apigw) Router(r chi.Router) { // init all the routes func (s *apigw) Init(ctx context.Context, route ...*route) { var ( - hasPostFilters bool defaultPostFilter types.Handler ) @@ -174,7 +173,10 @@ func (s *apigw) Init(ctx context.Context, route ...*route) { pipe = pipeline.NewPipeline(log) ) - hasPostFilters = false + // pipeline needs to know how to handle + // async processers + pipe.Async(r.meta.async) + r.opts = s.opts r.log = log @@ -188,6 +190,13 @@ func (s *apigw) Init(ctx context.Context, route ...*route) { for _, f := range regFilters { flog := log.With(zap.String("ref", f.Ref)) + // make sure there is only one postfilter + // on async routes + if r.meta.async && f.Kind == string(types.PostFilter) { + flog.Debug("not registering filter for async route") + continue + } + ff, err := s.registerFilter(f, r) if err != nil { @@ -197,29 +206,25 @@ func (s *apigw) Init(ctx context.Context, route ...*route) { pipe.Add(ff) - // check if it's a postfilter for async support - if f.Kind == string(types.PostFilter) { - hasPostFilters = true - } - flog.Debug("registered filter") } - r.handler = pipe.Handler() - r.errHandler = pipe.Error() - // add default postfilter on async // routes if not present - if r.meta.async && !hasPostFilters { + if r.meta.async { log.Info("registering default postfilter", zap.Error(err)) pipe.Add(&pipeline.Worker{ Handler: defaultPostFilter.Handler(), Name: defaultPostFilter.String(), + Type: types.PostFilter, Weight: math.MaxInt8, }) } + r.handler = pipe.Handler() + r.errHandler = pipe.Error() + log.Debug("successfuly registered route") } } @@ -246,8 +251,10 @@ func (s *apigw) registerFilter(f *st.ApigwFilter, r *route) (ff *pipeline.Worker } ff = &pipeline.Worker{ + Async: r.meta.async && f.Kind == string(types.Processer), Handler: handler.Handler(), Name: handler.String(), + Type: types.FilterKind(f.Kind), Weight: filter.FilterWeight(int(f.Weight), types.FilterKind(f.Kind)), } diff --git a/pkg/apigw/types/handler.go b/pkg/apigw/types/handler.go index b1a927639..6ab71497b 100644 --- a/pkg/apigw/types/handler.go +++ b/pkg/apigw/types/handler.go @@ -1,14 +1,11 @@ package types import ( + "fmt" "net/http" ) type ( - Stringer interface { - String() string - } - HTTPHandler interface { Handler() HandlerFunc } @@ -19,7 +16,7 @@ type ( Handler interface { HTTPHandler - Stringer + fmt.Stringer Merge([]byte) (Handler, error) Meta() FilterMeta diff --git a/pkg/options/apigw.gen.go b/pkg/options/apigw.gen.go index 3b9592318..3a68c82ed 100644 --- a/pkg/options/apigw.gen.go +++ b/pkg/options/apigw.gen.go @@ -15,6 +15,7 @@ import ( type ( ApigwOpt struct { Enabled bool `env:"APIGW_ENABLED"` + Debug bool `env:"APIGW_DEBUG"` LogEnabled bool `env:"APIGW_LOG_ENABLED"` LogRequestBody bool `env:"APIGW_LOG_REQUEST_BODY"` ProxyEnableDebugLog bool `env:"APIGW_PROXY_ENABLE_DEBUG_LOG"` @@ -27,6 +28,7 @@ type ( func Apigw() (o *ApigwOpt) { o = &ApigwOpt{ Enabled: true, + Debug: false, LogEnabled: false, LogRequestBody: false, ProxyEnableDebugLog: false, diff --git a/pkg/options/apigw.yaml b/pkg/options/apigw.yaml index 2e08937b7..e769283af 100644 --- a/pkg/options/apigw.yaml +++ b/pkg/options/apigw.yaml @@ -11,6 +11,12 @@ props: description: |- Enable API Gateway + - name: Debug + type: bool + default: false + description: |- + Enable API Gateway debugging info + - name: logEnabled type: bool default: false diff --git a/system/service/apigw_filter.go b/system/service/apigw_filter.go index e5b2ce6db..963eb3cfc 100644 --- a/system/service/apigw_filter.go +++ b/system/service/apigw_filter.go @@ -5,6 +5,7 @@ import ( "github.com/cortezaproject/corteza-server/pkg/actionlog" "github.com/cortezaproject/corteza-server/pkg/apigw" + agtypes "github.com/cortezaproject/corteza-server/pkg/apigw/types" a "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/store" "github.com/cortezaproject/corteza-server/system/types" @@ -83,6 +84,13 @@ func (svc *apigwFilter) Create(ctx context.Context, new *types.ApigwFilter) (q * return ApigwFilterErrNotFound(qProps) } + // check for existing filters if route is async + if r.Meta.Async { + if err = svc.validateAsyncRoute(ctx, r, new, qProps); err != nil { + return + } + } + if err = store.CreateApigwFilter(ctx, svc.store, new); err != nil { return err } @@ -100,6 +108,33 @@ func (svc *apigwFilter) Create(ctx context.Context, new *types.ApigwFilter) (q * return q, svc.recordAction(ctx, qProps, ApigwFilterActionCreate, err) } +func (svc *apigwFilter) validateAsyncRoute(ctx context.Context, r *types.ApigwRoute, f *types.ApigwFilter, props *apigwFilterActionProps) (err error) { + filters, _, err := svc.Search(ctx, types.ApigwFilterFilter{ + RouteID: r.ID, + Enabled: true, + }) + + if err != nil { + return err + } + + if f.Kind == string(agtypes.Processer) { + processers, _ := filters.Filter(func(af *types.ApigwFilter) (bool, error) { + return af.Kind == string(agtypes.Processer), nil + }) + + if len(processers) == 1 { + return ApigwFilterErrAsyncRouteTooManyProcessers(props) + } + } + + if f.Kind == string(agtypes.PostFilter) { + return ApigwFilterErrAsyncRouteTooManyAfterFilters(props) + } + + return +} + func (svc *apigwFilter) Update(ctx context.Context, upd *types.ApigwFilter) (q *types.ApigwFilter, err error) { var ( qProps = &apigwFilterActionProps{filter: upd} diff --git a/system/service/apigw_filter_actions.gen.go b/system/service/apigw_filter_actions.gen.go index 892a40830..48ec1d871 100644 --- a/system/service/apigw_filter_actions.gen.go +++ b/system/service/apigw_filter_actions.gen.go @@ -615,6 +615,78 @@ func ApigwFilterErrNotAllowedToUndelete(mm ...*apigwFilterActionProps) *errors.E return e } +// ApigwFilterErrAsyncRouteTooManyProcessers returns "system:filter.asyncRouteTooManyProcessers" as *errors.Error +// +// +// This function is auto-generated. +// +func ApigwFilterErrAsyncRouteTooManyProcessers(mm ...*apigwFilterActionProps) *errors.Error { + var p = &apigwFilterActionProps{} + if len(mm) > 0 { + p = mm[0] + } + + var e = errors.New( + errors.KindInternal, + + p.Format("processer already exists for this async route", nil), + + errors.Meta("type", "asyncRouteTooManyProcessers"), + errors.Meta("resource", "system:filter"), + + // action log entry; no formatting, it will be applied inside recordAction fn. + errors.Meta(apigwFilterLogMetaKey{}, "failed to add {{filter}}; too many processers, async route"), + errors.Meta(apigwFilterPropsMetaKey{}, p), + + // translation namespace & key + errors.Meta(locale.ErrorMetaNamespace{}, "system"), + errors.Meta(locale.ErrorMetaKey{}, "apigwFilter.errors.asyncRouteTooManyProcessers"), + + errors.StackSkip(1), + ) + + if len(mm) > 0 { + } + + return e +} + +// ApigwFilterErrAsyncRouteTooManyAfterFilters returns "system:filter.asyncRouteTooManyAfterFilters" as *errors.Error +// +// +// This function is auto-generated. +// +func ApigwFilterErrAsyncRouteTooManyAfterFilters(mm ...*apigwFilterActionProps) *errors.Error { + var p = &apigwFilterActionProps{} + if len(mm) > 0 { + p = mm[0] + } + + var e = errors.New( + errors.KindInternal, + + p.Format("no after filters are allowd for this async route", nil), + + errors.Meta("type", "asyncRouteTooManyAfterFilters"), + errors.Meta("resource", "system:filter"), + + // action log entry; no formatting, it will be applied inside recordAction fn. + errors.Meta(apigwFilterLogMetaKey{}, "failed to add {{filter}}; too many afterfilters, async route"), + errors.Meta(apigwFilterPropsMetaKey{}, p), + + // translation namespace & key + errors.Meta(locale.ErrorMetaNamespace{}, "system"), + errors.Meta(locale.ErrorMetaKey{}, "apigwFilter.errors.asyncRouteTooManyAfterFilters"), + + errors.StackSkip(1), + ) + + if len(mm) > 0 { + } + + return e +} + // ********************************************************************************************************************* // ********************************************************************************************************************* diff --git a/system/service/apigw_filter_actions.yaml b/system/service/apigw_filter_actions.yaml index 5a994939f..928af0002 100644 --- a/system/service/apigw_filter_actions.yaml +++ b/system/service/apigw_filter_actions.yaml @@ -73,3 +73,11 @@ errors: - error: notAllowedToUndelete message: "not allowed to undelete this filter" log: "failed to undelete {{filter}}; insufficient permissions" + + - error: asyncRouteTooManyProcessers + message: "processer already exists for this async route" + log: "failed to add {{filter}}; too many processers, async route" + + - error: asyncRouteTooManyAfterFilters + message: "no after filters are allowd for this async route" + log: "failed to add {{filter}}; too many afterfilters, async route"