3
0

Added support for async routes

This commit is contained in:
Peter Grlica 2021-08-26 12:40:13 +02:00
parent 35912c61ec
commit f02ff7338b
12 changed files with 231 additions and 44 deletions

View File

@ -14,6 +14,12 @@ func ScopeToContext(ctx context.Context, s *types.Scp) context.Context {
return context.WithValue(ctx, ContextKeyScope, s)
}
func ScopeFromContext(ctx context.Context) *types.Scp {
return ctx.Value(ContextKeyScope).(*types.Scp)
func ScopeFromContext(ctx context.Context) (ss *types.Scp) {
s := ctx.Value(ContextKeyScope)
if s == nil {
return &types.Scp{}
}
return s.(*types.Scp)
}

View File

@ -40,7 +40,7 @@ type (
log *zap.Logger
params struct {
Func string `json:"func"`
Func string `json:"jsfunc"`
Encode bool `json:"encode"`
}
}

View File

@ -113,5 +113,5 @@ func Test_processerPayload(t *testing.T) {
}
func prepareFuncPayload(s string) string {
return fmt.Sprintf(`{"func": "%s"}`, base64.StdEncoding.EncodeToString([]byte(s)))
return fmt.Sprintf(`{"jsfunc": "%s"}`, base64.StdEncoding.EncodeToString([]byte(s)))
}

View File

@ -0,0 +1,19 @@
package pipeline
import (
"net/http"
"github.com/go-chi/chi"
)
type (
chiHandlerChain struct {
chain []func(http.Handler) http.Handler
}
)
func (cc chiHandlerChain) Handler() (h http.Handler) {
return chi.
Chain(cc.chain...).
HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {})
}

View File

@ -6,15 +6,16 @@ import (
"time"
"github.com/cortezaproject/corteza-server/pkg/apigw/types"
"github.com/go-chi/chi"
"go.uber.org/zap"
)
type (
Worker struct {
Handler func(rw http.ResponseWriter, r *http.Request) error
Weight int
Async bool
Name string
Type types.FilterKind
Handler func(rw http.ResponseWriter, r *http.Request) error
}
workerSet []*Worker
@ -23,6 +24,7 @@ type (
workers workerSet
err types.ErrorHandlerFunc
log *zap.Logger
async bool
}
)
@ -37,6 +39,10 @@ func NewPipeline(log *zap.Logger) *Pl {
}
}
func (pp *Pl) Async(a bool) {
pp.async = a
}
func (pp *Pl) Error() types.ErrorHandlerFunc {
return pp.err
}
@ -46,47 +52,63 @@ func (pp *Pl) ErrorHandler(ff types.ErrorHandlerFunc) {
pp.err = ff
}
// add filter
// Add filter
func (pp *Pl) Add(w *Worker) {
pp.workers = append(pp.workers, w)
sort.Sort(pp.workers)
}
func (pp *Pl) AddHandler(h http.Handler) {}
// Handler is the main operating entry point for requests
// that handles filter groups
func (pp *Pl) Handler() http.Handler {
var (
middleware []func(http.Handler) http.Handler
)
// use the chi implementation of chains
chiChain := chiHandlerChain{
chain: pp.makeMiddleware(pp.workers...),
}
for _, wrker := range pp.workers {
return chiChain.Handler()
}
// makeMiddleware creates a list of handlers from workers
// it is used in chaining
func (pp *Pl) makeMiddleware(hh ...*Worker) (middleware []func(http.Handler) http.Handler) {
for _, wrker := range hh {
middleware = append(middleware, pp.makeHandler(*wrker))
}
return chi.Chain(middleware...).Handler(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {}))
return middleware
}
// makeHandler creates a handler from worker, it also
// wraps the error handling
func (pp *Pl) makeHandler(hh Worker) func(next http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
var (
err error
start = time.Now()
log = pp.log.With(zap.String("filter", hh.Name))
)
pp.log.Debug("started processing", zap.String("filter", hh.Name))
log.Debug("started processing", zap.Bool("async", hh.Async))
// if w.async {
// ctx = context.Background()
// r.WithContext(context.Background())
// go w.handler(rw, r)
// next.ServeHTTP(rw, r)
// } else {
fn := func() (err error) {
err = hh.Handler(rw, r)
log.Debug("finished processing", zap.Duration("duration", time.Since(start)))
return
}
err := hh.Handler(rw, r)
pp.log.Debug("finished processing",
zap.String("filter", hh.Name),
zap.Duration("duration", time.Since(start)))
if hh.Async {
go func() {
// only log error, do not call error handler,
// since we do not reply back the response (it was already sent)
if err = fn(); err != nil {
log.Error(err.Error())
}
}()
} else {
err = fn()
}
if err != nil {
pp.err(rw, r, err)
@ -94,8 +116,6 @@ func (pp *Pl) makeHandler(hh Worker) func(next http.Handler) http.Handler {
} else {
next.ServeHTTP(rw, r)
}
// }
})
}
}
@ -105,3 +125,18 @@ func (a workerSet) Less(i, j int) bool {
return a[i].Weight < a[j].Weight
}
func (a workerSet) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a workerSet) Filter(f func(*Worker) (bool, error)) (out workerSet, err error) {
var ok bool
out = workerSet{}
for i := range a {
if ok, err = f(a[i]); err != nil {
return
} else if ok {
out = append(out, a[i])
}
}
return
}

View File

@ -154,7 +154,6 @@ func (s *apigw) Router(r chi.Router) {
// init all the routes
func (s *apigw) Init(ctx context.Context, route ...*route) {
var (
hasPostFilters bool
defaultPostFilter types.Handler
)
@ -174,7 +173,10 @@ func (s *apigw) Init(ctx context.Context, route ...*route) {
pipe = pipeline.NewPipeline(log)
)
hasPostFilters = false
// pipeline needs to know how to handle
// async processers
pipe.Async(r.meta.async)
r.opts = s.opts
r.log = log
@ -188,6 +190,13 @@ func (s *apigw) Init(ctx context.Context, route ...*route) {
for _, f := range regFilters {
flog := log.With(zap.String("ref", f.Ref))
// make sure there is only one postfilter
// on async routes
if r.meta.async && f.Kind == string(types.PostFilter) {
flog.Debug("not registering filter for async route")
continue
}
ff, err := s.registerFilter(f, r)
if err != nil {
@ -197,29 +206,25 @@ func (s *apigw) Init(ctx context.Context, route ...*route) {
pipe.Add(ff)
// check if it's a postfilter for async support
if f.Kind == string(types.PostFilter) {
hasPostFilters = true
}
flog.Debug("registered filter")
}
r.handler = pipe.Handler()
r.errHandler = pipe.Error()
// add default postfilter on async
// routes if not present
if r.meta.async && !hasPostFilters {
if r.meta.async {
log.Info("registering default postfilter", zap.Error(err))
pipe.Add(&pipeline.Worker{
Handler: defaultPostFilter.Handler(),
Name: defaultPostFilter.String(),
Type: types.PostFilter,
Weight: math.MaxInt8,
})
}
r.handler = pipe.Handler()
r.errHandler = pipe.Error()
log.Debug("successfuly registered route")
}
}
@ -246,8 +251,10 @@ func (s *apigw) registerFilter(f *st.ApigwFilter, r *route) (ff *pipeline.Worker
}
ff = &pipeline.Worker{
Async: r.meta.async && f.Kind == string(types.Processer),
Handler: handler.Handler(),
Name: handler.String(),
Type: types.FilterKind(f.Kind),
Weight: filter.FilterWeight(int(f.Weight), types.FilterKind(f.Kind)),
}

View File

@ -1,14 +1,11 @@
package types
import (
"fmt"
"net/http"
)
type (
Stringer interface {
String() string
}
HTTPHandler interface {
Handler() HandlerFunc
}
@ -19,7 +16,7 @@ type (
Handler interface {
HTTPHandler
Stringer
fmt.Stringer
Merge([]byte) (Handler, error)
Meta() FilterMeta

View File

@ -15,6 +15,7 @@ import (
type (
ApigwOpt struct {
Enabled bool `env:"APIGW_ENABLED"`
Debug bool `env:"APIGW_DEBUG"`
LogEnabled bool `env:"APIGW_LOG_ENABLED"`
LogRequestBody bool `env:"APIGW_LOG_REQUEST_BODY"`
ProxyEnableDebugLog bool `env:"APIGW_PROXY_ENABLE_DEBUG_LOG"`
@ -27,6 +28,7 @@ type (
func Apigw() (o *ApigwOpt) {
o = &ApigwOpt{
Enabled: true,
Debug: false,
LogEnabled: false,
LogRequestBody: false,
ProxyEnableDebugLog: false,

View File

@ -11,6 +11,12 @@ props:
description: |-
Enable API Gateway
- name: Debug
type: bool
default: false
description: |-
Enable API Gateway debugging info
- name: logEnabled
type: bool
default: false

View File

@ -5,6 +5,7 @@ import (
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/apigw"
agtypes "github.com/cortezaproject/corteza-server/pkg/apigw/types"
a "github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/store"
"github.com/cortezaproject/corteza-server/system/types"
@ -83,6 +84,13 @@ func (svc *apigwFilter) Create(ctx context.Context, new *types.ApigwFilter) (q *
return ApigwFilterErrNotFound(qProps)
}
// check for existing filters if route is async
if r.Meta.Async {
if err = svc.validateAsyncRoute(ctx, r, new, qProps); err != nil {
return
}
}
if err = store.CreateApigwFilter(ctx, svc.store, new); err != nil {
return err
}
@ -100,6 +108,33 @@ func (svc *apigwFilter) Create(ctx context.Context, new *types.ApigwFilter) (q *
return q, svc.recordAction(ctx, qProps, ApigwFilterActionCreate, err)
}
func (svc *apigwFilter) validateAsyncRoute(ctx context.Context, r *types.ApigwRoute, f *types.ApigwFilter, props *apigwFilterActionProps) (err error) {
filters, _, err := svc.Search(ctx, types.ApigwFilterFilter{
RouteID: r.ID,
Enabled: true,
})
if err != nil {
return err
}
if f.Kind == string(agtypes.Processer) {
processers, _ := filters.Filter(func(af *types.ApigwFilter) (bool, error) {
return af.Kind == string(agtypes.Processer), nil
})
if len(processers) == 1 {
return ApigwFilterErrAsyncRouteTooManyProcessers(props)
}
}
if f.Kind == string(agtypes.PostFilter) {
return ApigwFilterErrAsyncRouteTooManyAfterFilters(props)
}
return
}
func (svc *apigwFilter) Update(ctx context.Context, upd *types.ApigwFilter) (q *types.ApigwFilter, err error) {
var (
qProps = &apigwFilterActionProps{filter: upd}

View File

@ -615,6 +615,78 @@ func ApigwFilterErrNotAllowedToUndelete(mm ...*apigwFilterActionProps) *errors.E
return e
}
// ApigwFilterErrAsyncRouteTooManyProcessers returns "system:filter.asyncRouteTooManyProcessers" as *errors.Error
//
//
// This function is auto-generated.
//
func ApigwFilterErrAsyncRouteTooManyProcessers(mm ...*apigwFilterActionProps) *errors.Error {
var p = &apigwFilterActionProps{}
if len(mm) > 0 {
p = mm[0]
}
var e = errors.New(
errors.KindInternal,
p.Format("processer already exists for this async route", nil),
errors.Meta("type", "asyncRouteTooManyProcessers"),
errors.Meta("resource", "system:filter"),
// action log entry; no formatting, it will be applied inside recordAction fn.
errors.Meta(apigwFilterLogMetaKey{}, "failed to add {{filter}}; too many processers, async route"),
errors.Meta(apigwFilterPropsMetaKey{}, p),
// translation namespace & key
errors.Meta(locale.ErrorMetaNamespace{}, "system"),
errors.Meta(locale.ErrorMetaKey{}, "apigwFilter.errors.asyncRouteTooManyProcessers"),
errors.StackSkip(1),
)
if len(mm) > 0 {
}
return e
}
// ApigwFilterErrAsyncRouteTooManyAfterFilters returns "system:filter.asyncRouteTooManyAfterFilters" as *errors.Error
//
//
// This function is auto-generated.
//
func ApigwFilterErrAsyncRouteTooManyAfterFilters(mm ...*apigwFilterActionProps) *errors.Error {
var p = &apigwFilterActionProps{}
if len(mm) > 0 {
p = mm[0]
}
var e = errors.New(
errors.KindInternal,
p.Format("no after filters are allowd for this async route", nil),
errors.Meta("type", "asyncRouteTooManyAfterFilters"),
errors.Meta("resource", "system:filter"),
// action log entry; no formatting, it will be applied inside recordAction fn.
errors.Meta(apigwFilterLogMetaKey{}, "failed to add {{filter}}; too many afterfilters, async route"),
errors.Meta(apigwFilterPropsMetaKey{}, p),
// translation namespace & key
errors.Meta(locale.ErrorMetaNamespace{}, "system"),
errors.Meta(locale.ErrorMetaKey{}, "apigwFilter.errors.asyncRouteTooManyAfterFilters"),
errors.StackSkip(1),
)
if len(mm) > 0 {
}
return e
}
// *********************************************************************************************************************
// *********************************************************************************************************************

View File

@ -73,3 +73,11 @@ errors:
- error: notAllowedToUndelete
message: "not allowed to undelete this filter"
log: "failed to undelete {{filter}}; insufficient permissions"
- error: asyncRouteTooManyProcessers
message: "processer already exists for this async route"
log: "failed to add {{filter}}; too many processers, async route"
- error: asyncRouteTooManyAfterFilters
message: "no after filters are allowd for this async route"
log: "failed to add {{filter}}; too many afterfilters, async route"