3
0

Refactored services, rest and rdbms

Added tests

Added ac to routes and functions

Proxy processer and auth servicer

Added options and extensive logging

Fixed calls to rbac in service, added default http handler on gateway root
This commit is contained in:
Peter Grlica
2021-07-12 08:32:07 +02:00
parent 9fe52398a9
commit da9100287c
64 changed files with 3960 additions and 1952 deletions

View File

@@ -2,7 +2,11 @@ package apigw
import (
"context"
"encoding/json"
as "github.com/cortezaproject/corteza-server/automation/service"
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/cortezaproject/corteza-server/system/types"
"github.com/go-chi/chi"
"go.uber.org/zap"
@@ -10,11 +14,12 @@ import (
type (
storer interface {
SearchApigwRoutes(ctx context.Context, f types.RouteFilter) (types.RouteSet, types.RouteFilter, error)
SearchApigwFunctions(ctx context.Context, f types.FunctionFilter) (types.FunctionSet, types.FunctionFilter, error)
SearchApigwRoutes(ctx context.Context, f types.ApigwRouteFilter) (types.ApigwRouteSet, types.ApigwRouteFilter, error)
SearchApigwFunctions(ctx context.Context, f types.ApigwFunctionFilter) (types.ApigwFunctionSet, types.ApigwFunctionFilter, error)
}
apigw struct {
opts *options.ApigwOpt
log *zap.Logger
reg *registry
routes []*route
@@ -37,7 +42,7 @@ func Set(a *apigw) {
}
// Setup handles the singleton service
func Setup(opts interface{}, log *zap.Logger, storer storer) {
func Setup(opts *options.ApigwOpt, log *zap.Logger, storer storer) {
if apiGw != nil {
return
}
@@ -45,11 +50,12 @@ func Setup(opts interface{}, log *zap.Logger, storer storer) {
apiGw = New(opts, log, storer)
}
func New(opts interface{}, logger *zap.Logger, storer storer) *apigw {
func New(opts *options.ApigwOpt, logger *zap.Logger, storer storer) *apigw {
reg := NewRegistry()
reg.Preload()
return &apigw{
opts: opts,
log: logger,
storer: storer,
reload: make(chan bool),
@@ -64,7 +70,7 @@ func (s *apigw) Reload(ctx context.Context) {
}
func (s *apigw) loadRoutes(ctx context.Context) (rr []*route, err error) {
routes, _, err := s.storer.SearchApigwRoutes(ctx, types.RouteFilter{Enabled: true})
routes, _, err := s.storer.SearchApigwRoutes(ctx, types.ApigwRouteFilter{Enabled: true, Deleted: filter.StateExcluded})
if err != nil {
return
@@ -83,66 +89,76 @@ func (s *apigw) loadRoutes(ctx context.Context) (rr []*route, err error) {
return
}
func (s *apigw) loadFunctions(ctx context.Context, route uint64) (ff []*types.Function, err error) {
ff, _, err = s.storer.SearchApigwFunctions(ctx, types.FunctionFilter{})
func (s *apigw) loadFunctions(ctx context.Context, route uint64) (ff []*types.ApigwFunction, err error) {
ff, _, err = s.storer.SearchApigwFunctions(ctx, types.ApigwFunctionFilter{RouteID: route})
return
}
func (s *apigw) Router(ctx context.Context) func(r chi.Router) {
return func(r chi.Router) {
routes, err := s.loadRoutes(ctx)
func (s *apigw) Router(r chi.Router) {
var (
ctx = context.Background()
)
if err != nil {
s.log.Error("could not load routes", zap.Error(err))
return
}
r.HandleFunc("/", helperDefaultResponse(s.opts))
s.Init(ctx, routes...)
routes, err := s.loadRoutes(ctx)
for _, route := range s.routes {
r.Handle(route.endpoint, route)
}
if err != nil {
s.log.Error("could not load routes", zap.Error(err))
return
}
go func() {
for {
select {
case <-s.reload:
s.log.Debug("got reload signal")
s.Init(ctx, routes...)
routes, err := s.loadRoutes(ctx)
for _, route := range s.routes {
r.Handle(route.endpoint, route)
}
if err != nil {
s.log.Error("could not reload routes", zap.Error(err))
return
}
go func() {
for {
select {
case <-s.reload:
routes, err := s.loadRoutes(ctx)
s.Init(ctx, routes...)
for _, route := range s.routes {
r.Handle(route.endpoint, route)
}
case <-ctx.Done():
s.log.Debug("done! getting out")
if err != nil {
s.log.Error("could not reload API Gateway routes", zap.Error(err))
return
}
s.log.Debug("reloading API Gateway routes and functions", zap.Int("count", len(routes)))
s.Init(ctx, routes...)
for _, route := range s.routes {
r.Handle(route.endpoint, route)
}
case <-ctx.Done():
s.log.Debug("shutting down API Gateway service")
return
}
}()
}
}
}()
}
// init all the routes
func (s *apigw) Init(ctx context.Context, route ...*route) {
s.routes = route
s.log.Debug("initializing routes\n", zap.Int("num", len(s.routes)))
s.log.Debug("registering routes", zap.Int("count", len(s.routes)))
for _, r := range s.routes {
r.pipe = &pl{}
log := s.log.With(zap.String("route", r.String()))
r.pipe = NewPipeline(log)
r.opts = s.opts
r.log = log
regFuncs, err := s.loadFunctions(ctx, r.ID)
if err != nil {
s.log.Error("could not load functions for route", zap.String("route", r.endpoint), zap.Error(err))
log.Error("could not load functions for route", zap.Error(err))
continue
}
@@ -154,20 +170,31 @@ func (s *apigw) Init(ctx context.Context, route ...*route) {
})
for _, f := range regFuncs {
fc := functionHandler{}
h, err := s.reg.Get(f.Ref)
if err != nil {
s.log.Error("could not register function for route", zap.String("route", r.endpoint), zap.Error(err))
log.Error("could not register function for route", zap.Error(err))
continue
}
fc.Merge(ctx, h.Meta(f))
fc.SetHandler(h.Handler())
enc, err := json.Marshal(f.Params)
r.pipe.Add(fc, f.Params)
if err != nil {
log.Error("could not load params for function", zap.String("ref", f.Ref), zap.Error(err))
continue
}
h, err = s.reg.Merge(h, enc)
if err != nil {
log.Error("could not merge params to handler", zap.String("ref", f.Ref), zap.Error(err))
continue
}
r.pipe.Add(h)
}
log.Debug("successfuly registered route")
}
}
@@ -176,10 +203,13 @@ func (s *apigw) Funcs(kind string) (list functionMetaList) {
if kind != "" {
list, _ = list.Filter(func(fm *functionMeta) (bool, error) {
// return fm.
return fm.Kind == kind, nil
return string(fm.Kind) == kind, nil
})
}
return
}
func NewWorkflow() (wf WfExecer) {
return as.Workflow(&zap.Logger{}, options.CorredorOpt{})
}

View File

@@ -2,39 +2,187 @@ package apigw
import (
"context"
"net/http"
"net/http/httptest"
"errors"
"testing"
"github.com/cortezaproject/corteza-server/pkg/expr"
"github.com/cortezaproject/corteza-server/pkg/logger"
"github.com/cortezaproject/corteza-server/pkg/wfexec"
"github.com/cortezaproject/corteza-server/system/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func execFn(t *testing.T, r *http.Request, fn wfHandler) error {
type (
// overriding mockHandler with only
// the merge function
mockExistingHandler struct {
*mockHandler
merge func(params []byte) (Handler, error)
}
)
func Test_serviceLoadRoutes(t *testing.T) {
var (
req = require.New(t)
ctx = context.Background()
scope = &expr.Vars{}
graph = wfexec.NewGraph()
recorder = httptest.NewRecorder()
ctx = context.Background()
req = require.New(t)
)
scope.Set("envelope", envelope{
Request: r,
Writer: recorder,
})
mockStorer := &mockStorer{
r: func(c context.Context, arf types.ApigwRouteFilter) (s types.ApigwRouteSet, f types.ApigwRouteFilter, err error) {
s = types.ApigwRouteSet{
{ID: 1, Endpoint: "/endpoint", Method: "GET", Debug: false, Enabled: true, Group: 0},
{ID: 2, Endpoint: "/endpoint2", Method: "POST", Debug: false, Enabled: true, Group: 0},
}
return
},
}
step := wfexec.NewGenericStep(fn.self())
service := &apigw{
storer: mockStorer,
}
graph.AddStep(step)
sess := wfexec.NewSession(ctx, graph, wfexec.SetLogger(logger.Default()))
err := sess.Exec(ctx, step, scope)
r, err := service.loadRoutes(ctx)
req.NoError(err)
return sess.Wait(ctx)
req.Len(r, 2)
}
func Test_serviceLoadFunctions(t *testing.T) {
var (
ctx = context.Background()
req = require.New(t)
)
mockStorer := &mockStorer{
f: func(c context.Context, aff types.ApigwFunctionFilter) (s types.ApigwFunctionSet, f types.ApigwFunctionFilter, err error) {
s = types.ApigwFunctionSet{
{ID: 1, Route: 1},
{ID: 2, Route: 2},
}
return
},
}
service := &apigw{
storer: mockStorer,
}
r, err := service.loadFunctions(ctx, 2)
req.NoError(err)
req.Len(r, 2)
}
func Test_serviceInit(t *testing.T) {
type (
tf struct {
name string
expLen int
st mockStorer
reg *registry
}
)
var (
tcc = []tf{
{
name: "could not register 1 function for route",
st: mockStorer{
r: func(c context.Context, arf types.ApigwRouteFilter) (s types.ApigwRouteSet, f types.ApigwRouteFilter, err error) {
s = types.ApigwRouteSet{
{ID: 1, Endpoint: "/endpoint", Method: "GET", Debug: false, Enabled: true, Group: 0},
}
return
},
f: func(c context.Context, aff types.ApigwFunctionFilter) (s types.ApigwFunctionSet, f types.ApigwFunctionFilter, err error) {
s = types.ApigwFunctionSet{
{ID: 1, Route: 1, Ref: "testExistingFunction"},
{ID: 2, Route: 1, Ref: "testNotExistingFunction"},
}
return
},
},
reg: &registry{
h: map[string]Handler{"testExistingFunction": &mockHandler{}},
},
expLen: 1,
},
{
name: "successful register of 2 functions for route",
st: mockStorer{
r: func(c context.Context, arf types.ApigwRouteFilter) (s types.ApigwRouteSet, f types.ApigwRouteFilter, err error) {
s = types.ApigwRouteSet{
{ID: 1, Endpoint: "/endpoint", Method: "GET", Debug: false, Enabled: true, Group: 0},
}
return
},
f: func(c context.Context, aff types.ApigwFunctionFilter) (s types.ApigwFunctionSet, f types.ApigwFunctionFilter, err error) {
s = types.ApigwFunctionSet{
{ID: 1, Route: 1, Ref: "testExistingFunction"},
{ID: 2, Route: 1, Ref: "testExistingFunction"},
}
return
},
},
reg: &registry{
h: map[string]Handler{"testExistingFunction": &mockHandler{}},
},
expLen: 2,
},
{
name: "could not merge params for function",
st: mockStorer{
r: func(c context.Context, arf types.ApigwRouteFilter) (s types.ApigwRouteSet, f types.ApigwRouteFilter, err error) {
s = types.ApigwRouteSet{
{ID: 1, Endpoint: "/endpoint", Method: "GET", Debug: false, Enabled: true, Group: 0},
}
return
},
f: func(c context.Context, aff types.ApigwFunctionFilter) (s types.ApigwFunctionSet, f types.ApigwFunctionFilter, err error) {
s = types.ApigwFunctionSet{
{ID: 1, Route: 1, Ref: "testExistingFunction", Params: types.ApigwFuncParams{}},
}
return
},
},
reg: &registry{
h: map[string]Handler{
"testExistingFunction": &mockExistingHandler{
merge: func(params []byte) (Handler, error) {
return nil, errors.New("testttt")
},
},
},
},
expLen: 0,
},
}
)
for _, tc := range tcc {
t.Run(tc.name, func(t *testing.T) {
var (
req = require.New(t)
ctx = context.Background()
)
service := &apigw{
log: zap.NewNop(),
storer: tc.st,
reg: tc.reg,
}
rr, err := service.loadRoutes(ctx)
req.NoError(err)
service.Init(ctx, rr...)
req.NotEmpty(service.routes)
req.Len(service.routes[0].pipe.w, tc.expLen)
})
}
}
func (h mockExistingHandler) Merge(params []byte) (Handler, error) {
return h.merge(params)
}

128
pkg/apigw/auth.go Normal file
View File

@@ -0,0 +1,128 @@
package apigw
import (
"encoding/base64"
"fmt"
"net/http"
)
const (
authTypeOauth2 authType = "oauth2"
authTypeHeader authType = "header"
authTypeQuery authType = "query"
authTypeBasic authType = "basic"
authTypeJwt authType = "jwt"
authTypeNoop authType = "noop"
)
type (
AuthServicer interface {
Do(*http.Request) error
}
authServicerNoop struct{}
authServicerHeader struct {
params map[string]interface{}
}
authServicerQuery struct {
params map[string]interface{}
}
authServicerBasic struct {
user string
pass string
}
authType string
authParams struct {
Type authType `json:"type"`
Params map[string]interface{} `json:"params"`
}
)
func NewAuthHeader(p authParams) (s authServicerHeader, err error) {
s = authServicerHeader{
params: p.Params,
}
return
}
func NewAuthQuery(p authParams) (s authServicerQuery, err error) {
s = authServicerQuery{
params: p.Params,
}
return
}
func NewAuthBasic(p authParams) (s authServicerBasic, err error) {
var (
ok bool
user, pass string
)
if user, ok = p.Params["username"].(string); !ok {
err = fmt.Errorf("invalid param username")
return
}
if pass, ok = p.Params["password"].(string); !ok {
err = fmt.Errorf("invalid param password")
return
}
s = authServicerBasic{user: user, pass: pass}
return
}
func NewAuthServicer(c *http.Client, p authParams) (AuthServicer, error) {
switch p.Type {
case authTypeHeader:
return NewAuthHeader(p)
case authTypeQuery:
return NewAuthQuery(p)
case authTypeBasic:
return NewAuthBasic(p)
default:
return authServicerNoop{}, nil
}
}
func (s authServicerHeader) Do(r *http.Request) error {
for k, v := range s.params {
r.Header.Add(k, v.(string))
}
return nil
}
func (s authServicerQuery) Do(r *http.Request) error {
if len(s.params) == 0 {
return nil
}
q := r.URL.Query()
for k, v := range s.params {
q.Set(k, v.(string))
}
r.URL.RawQuery = q.Encode()
return nil
}
func (s authServicerBasic) Do(r *http.Request) error {
bs := base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", s.user, s.pass)))
r.Header.Set("Authorization", fmt.Sprintf("Basic %s", bs))
return nil
}
func (s authServicerNoop) Do(r *http.Request) error {
return nil
}

121
pkg/apigw/auth_test.go Normal file
View File

@@ -0,0 +1,121 @@
package apigw
import (
"net/http"
"testing"
"github.com/stretchr/testify/require"
)
type (
mockRoundTripper func(*http.Request) (*http.Response, error)
)
func (mrt mockRoundTripper) RoundTrip(rq *http.Request) (r *http.Response, err error) {
return mrt(rq)
}
func Test_authDo(t *testing.T) {
type (
tf struct {
name string
err string
errv string
params authParams
exp http.Header
}
)
var (
tcc = []tf{
{
name: "auth header match headers",
params: authParams{
Type: authTypeHeader,
Params: map[string]interface{}{
"Client-Id": "123455",
"Client_credentials": "pass1234",
},
},
exp: http.Header{
"Client-Id": []string{"123455"},
"Client_credentials": []string{"pass1234"},
},
},
{
name: "auth header match canonicalized headers",
params: authParams{
Type: authTypeHeader,
Params: map[string]interface{}{
"camelCaseHeader": "123455",
},
},
exp: http.Header{
"Camelcaseheader": []string{"123455"},
},
},
{
name: "auth basic match headers",
params: authParams{
Type: authTypeBasic,
Params: map[string]interface{}{
"username": "user",
"password": "pass1234",
},
},
exp: http.Header{"Authorization": []string{"Basic dXNlcjpwYXNzMTIzNA=="}},
},
{
name: "auth basic match headers fail user validation",
params: authParams{
Type: authTypeBasic,
Params: map[string]interface{}{"password": "pass1234"},
},
exp: http.Header{},
errv: "invalid param username",
},
{
name: "auth basic match headers fail pass validation",
params: authParams{
Type: authTypeBasic,
Params: map[string]interface{}{"username": "user"},
},
exp: http.Header{},
errv: "invalid param password",
},
{
name: "noop default fallback",
params: authParams{},
exp: http.Header{},
},
}
)
for _, tc := range tcc {
t.Run(tc.name, func(t *testing.T) {
var (
req = require.New(t)
c = http.DefaultClient
)
c.Transport = mockRoundTripper(func(r *http.Request) (rs *http.Response, err error) { return })
rq, _ := http.NewRequest("POST", "/foo", http.NoBody)
authServicer, err := NewAuthServicer(c, tc.params)
if tc.errv != "" {
req.EqualError(err, tc.errv)
return
}
err = authServicer.Do(rq)
if tc.err != "" {
req.EqualError(err, tc.err)
} else {
req.Equal(tc.exp, rq.Header)
}
})
}
}

18
pkg/apigw/error.go Normal file
View File

@@ -0,0 +1,18 @@
package apigw
import (
"context"
"net/http"
)
type (
defaultErrorHandler struct{}
)
func (h defaultErrorHandler) Exec(ctx context.Context, scope *scp, err error) {
// set http status code
scope.Writer().WriteHeader(http.StatusInternalServerError)
// set body
scope.Writer().Write([]byte(err.Error()))
}

View File

@@ -1,17 +1,22 @@
package apigw
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/cortezaproject/corteza-server/system/types"
"github.com/davecgh/go-spew/spew"
"net/url"
)
type (
expediterRedirection struct{}
expediterRedirection struct {
functionMeta
params struct {
HTTPStatus int `json:"status,string"`
Location string `json:"location"`
}
}
errorHandler struct {
name string
@@ -21,41 +26,91 @@ type (
}
)
func NewExpediterRedirection() expediterRedirection {
return expediterRedirection{}
func NewExpediterRedirection() (e *expediterRedirection) {
e = &expediterRedirection{}
e.Step = 3
e.Name = "expediterRedirection"
e.Label = "Redirection expediter"
e.Kind = FunctionKindExpediter
e.Args = []*functionMetaArg{
{
Type: "status",
Label: "status",
Options: map[string]interface{}{},
},
{
Type: "text",
Label: "location",
Options: map[string]interface{}{},
},
}
return
}
func (h expediterRedirection) Meta(f *types.Function) functionMeta {
return functionMeta{
Step: 3,
Name: "expediterRedirection",
Label: "Redirection expediter",
Kind: "expediter",
Weight: int(f.Weight),
Params: f.Params,
}
func (h expediterRedirection) String() string {
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
}
func (h expediterRedirection) Handler() handlerFunc {
return func(ctx context.Context, scope *scp, params map[string]interface{}, ff functionHandler) error {
scope.Writer().Header().Add(fmt.Sprintf("step_%d", ff.step), ff.name)
http.Redirect(scope.Writer(), scope.Request(), params["location"].(string), http.StatusFound)
func (h expediterRedirection) Meta() functionMeta {
return h.functionMeta
}
return nil
func (f *expediterRedirection) Merge(params []byte) (Handler, error) {
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&f.params)
return f, err
}
func (h expediterRedirection) Exec(ctx context.Context, scope *scp) error {
loc, err := url.ParseRequestURI(h.params.Location)
if err != nil {
return fmt.Errorf("could not redirect: %s", err)
}
status := h.params.HTTPStatus
if !checkStatus("redirect", status) {
return fmt.Errorf("could not redirect: wrong status %d", status)
}
http.Redirect(scope.Writer(), scope.Request(), loc.String(), status)
return nil
}
func (pp errorHandler) Exec(ctx context.Context, scope *scp, err error) {
type (
responseHelper struct {
Msg string `json:"msg"`
ErrResponse struct {
Msg string `json:"msg"`
} `json:"error"`
}
)
resp := responseHelper{
Msg: err.Error(),
ErrResponse: struct {
Msg string "json:\"msg\""
}{
Msg: err.Error(),
},
}
spew.Dump("ERR in expediter", err, resp)
// set http status code
scope.Writer().WriteHeader(http.StatusInternalServerError)
// set body
json.NewEncoder(scope.Writer()).Encode(resp)
}
func checkStatus(typ string, status int) bool {
switch typ {
case "redirect":
return status >= 300 && status <= 399
default:
return true
}
}

View File

@@ -0,0 +1,74 @@
package apigw
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/require"
)
func Test_expediterRedirection(t *testing.T) {
type (
tf struct {
name string
expr string
err string
}
)
var (
tcc = []tf{
{
name: "simple redirection",
expr: `{"status":"302", "location": "http://redire.ct/to"}`,
},
{
name: "permanent redirection",
expr: `{"status":"301", "location": "http://redire.ct/to"}`,
},
{
name: "url validation",
expr: `{"status":"301", "location": "invalid url"}`,
err: `could not redirect: parse "invalid url": invalid URI for request`,
},
{
name: "invalid redirection status",
expr: `{"status":"400", "location": "http://redire.ct/to"}`,
err: "could not redirect: wrong status 400",
},
}
)
for _, tc := range tcc {
var (
ctx = context.Background()
)
t.Run(tc.name, func(t *testing.T) {
req := require.New(t)
r, err := http.NewRequest(http.MethodGet, "/foo", http.NoBody)
req.NoError(err)
rc := httptest.NewRecorder()
scope := &scp{"request": r, "writer": rc}
h := NewExpediterRedirection()
h.Merge([]byte(tc.expr))
err = h.Exec(ctx, scope)
if tc.err != "" {
req.EqualError(err, tc.err)
return
}
req.NoError(err)
req.Equal(h.params.Location, rc.Header().Get("Location"))
req.Equal(h.params.HTTPStatus, rc.Code)
})
}
}

View File

@@ -1,75 +1,52 @@
package apigw
import (
"context"
"github.com/cortezaproject/corteza-server/system/types"
const (
FunctionKindVerifier FunctionKind = "verifier"
FunctionKindValidator FunctionKind = "validator"
FunctionKindProcesser FunctionKind = "processer"
FunctionKindExpediter FunctionKind = "expediter"
)
type (
functionMetaList []*functionMeta
FunctionKind string
Handler interface {
Handler() handlerFunc
Meta(f *types.Function) functionMeta
}
Execer
Stringer
handlerFunc func(context.Context, *scp, map[string]interface{}, functionHandler) error
Merge([]byte) (Handler, error)
Meta() functionMeta
}
functionMeta struct {
Step int `json:"step"`
Weight int `json:"-"`
Name string `json:"name"`
Label string `json:"label"`
Kind string `json:"kind"`
Params map[string]interface{} `json:"-"`
Args []*functionMetaArg `json:"params,omitempty"`
Step int `json:"step"`
Weight int `json:"-"`
Name string `json:"name"`
Label string `json:"label"`
Kind FunctionKind `json:"kind"`
Args []*functionMetaArg `json:"params,omitempty"`
}
functionMetaList []*functionMeta
functionMetaArg struct {
Label string `json:"label"`
Type string `json:"type"`
Example string `json:"example"`
Options map[string]interface{} `json:"options"`
}
functionHandler struct {
step int
weight int
name string
label string
kind string
handler handlerFunc
params map[string]interface{}
}
)
func (ff functionHandler) Exec(ctx context.Context, scope *scp, params map[string]interface{}) error {
return ff.handler(ctx, scope, params, ff)
}
func (ff *functionHandler) SetHandler(h handlerFunc) {
ff.handler = h
}
func (ff *functionHandler) Merge(ctx context.Context, p functionMeta) {
ff.step = p.Step
ff.kind = p.Kind
ff.label = p.Label
ff.name = p.Name
ff.weight = p.Weight
ff.params = p.Params
}
func (ff functionHandler) Weight() int {
// if there's gonna be more than 1000 funcs
// per step, we're doing something wrong
return ff.step*1000 + ff.weight
}
// func (ff functionHandler) Weight() int {
// // if there's gonna be more than 1000 funcs
// // per step, we're doing something wrong
// return ff.step*1000 + ff.weight
// }
func (fm functionMetaList) Filter(f func(*functionMeta) (bool, error)) (out functionMetaList, err error) {
var ok bool
out = functionMetaList{}
for i := range fm {
if ok, err = f(fm[i]); err != nil {
return

21
pkg/apigw/helpers.go Normal file
View File

@@ -0,0 +1,21 @@
package apigw
import (
"net/http"
"github.com/cortezaproject/corteza-server/pkg/options"
)
const (
devHelperResponseBody string = `Hey developer!`
)
func helperDefaultResponse(opt *options.ApigwOpt) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if opt.LogEnabled {
http.Error(w, devHelperResponseBody, http.StatusTeapot)
} else {
http.Error(w, ``, http.StatusFound)
}
}
}

View File

@@ -1,74 +0,0 @@
package apigw
import (
"context"
"net/http"
"testing"
"github.com/cortezaproject/corteza-server/pkg/expr"
"github.com/stretchr/testify/require"
)
func TestAuthenticationOriginMatcher(t *testing.T) {
type (
tf struct {
name string
origin string
exp string
req *http.Request
}
)
var (
ctx = context.Background()
tcc = []tf{
{
name: "fail on origin",
origin: "http://fail.ed",
exp: "workflow 0 step 0 execution failed: origin fail",
req: &http.Request{
Header: http.Header{
"Origin": []string{
"http://localhost",
},
},
},
},
{
name: "success on origin",
origin: "http://localhost",
exp: "",
req: &http.Request{
Header: http.Header{
"Origin": []string{
"http://localhost",
},
},
},
},
}
)
for _, tc := range tcc {
t.Run(tc.name, func(t *testing.T) {
var (
req = require.New(t)
input = &expr.Vars{}
)
input.Set("origin", tc.origin)
err := execFn(t, tc.req, authenticationOriginMatcher(ctx, input))
if tc.exp != "" {
req.EqualError(err, tc.exp)
} else {
req.NoError(err)
}
})
}
}

View File

@@ -2,13 +2,16 @@ package apigw
import (
"context"
"fmt"
"net/http"
"sort"
"github.com/cortezaproject/corteza-server/pkg/options"
"go.uber.org/zap"
)
type (
Execer interface {
Exec(context.Context, *scp, map[string]interface{}) error
Exec(context.Context, *scp) error
}
Sorter interface {
@@ -19,26 +22,35 @@ type (
Exec(context.Context, *scp, error)
}
Payload struct {
params map[string]interface{}
worker Worker
}
Worker interface {
Execer
Sorter
// Sorter
Stringer
}
workers []Payload
Stringer interface {
String() string
}
workers []Worker
pl struct {
w workers
err ErrorHandler
log *zap.Logger
}
scp map[string]interface{}
)
func NewPipeline(log *zap.Logger, w ...Worker) *pl {
return &pl{
w: w,
log: log,
err: defaultErrorHandler{},
}
}
func (s scp) Request() *http.Request {
if _, ok := s["request"]; ok {
return s["request"].(*http.Request)
@@ -55,19 +67,39 @@ func (s scp) Writer() http.ResponseWriter {
return nil
}
func (s scp) Opts() *options.ApigwOpt {
if _, ok := s["opts"]; ok {
return s["opts"].(*options.ApigwOpt)
}
return nil
}
func (s scp) Set(k string, v interface{}) {
s[k] = v
}
func (s scp) Get(k string) (v interface{}, err error) {
var ok bool
if v, ok = s[k]; !ok {
err = fmt.Errorf("could not get key on index: %s", k)
return
}
return
}
// Exec takes care of error handling and main
// functionality that takes place in worker
func (pp *pl) Exec(ctx context.Context, scope *scp) (err error) {
for _, w := range pp.w {
err = w.worker.Exec(ctx, scope, w.params)
pp.log.Debug("executing worker", zap.Any("worker", w.String()))
err = w.Exec(ctx, scope)
if err != nil {
// call the error handler
pp.err.Exec(ctx, scope, err)
pp.log.Debug("could not execute worker", zap.Error(err))
return
}
}
@@ -76,10 +108,12 @@ func (pp *pl) Exec(ctx context.Context, scope *scp) (err error) {
}
// Add registers a new worker with parameters
// fethed from store
func (pp *pl) Add(ff Worker, p map[string]interface{}) {
pp.w = append(pp.w, Payload{worker: ff, params: p})
sort.Sort(pp.w)
// fetched from store
func (pp *pl) Add(w Worker) {
pp.w = append(pp.w, w)
// sort.Sort(pp.w)
pp.log.Debug("registered worker", zap.Any("worker", w.String()))
}
// add error handler
@@ -87,8 +121,8 @@ func (pp *pl) ErrorHandler(ff ErrorHandler) {
pp.err = ff
}
func (a workers) Len() int { return len(a) }
func (a workers) Less(i, j int) bool {
return a[i].worker.Weight() < a[j].worker.Weight()
}
func (a workers) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// func (a workers) Len() int { return len(a) }
// func (a workers) Less(i, j int) bool {
// return a[i].worker.Weight() < a[j].worker.Weight()
// }
// func (a workers) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

View File

@@ -0,0 +1,69 @@
package apigw
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func NewPl() *pl {
return NewPipeline(zap.NewNop())
}
func Test_pipelineAdd(t *testing.T) {
var (
req = require.New(t)
)
p := NewPl()
p.Add(mockExecer{})
req.Len(p.w, 1)
}
func Test_pipelineExec(t *testing.T) {
var (
ctx = context.Background()
req = require.New(t)
scope = &scp{"foo": 1}
)
p := NewPl()
p.Add(mockExecer{
exec: func(c context.Context, s *scp) (err error) {
s.Set("foo", 2)
return nil
},
})
err := p.Exec(ctx, scope)
req.NoError(err)
foo, err := scope.Get("foo")
req.NoError(err)
req.Equal(2, foo)
}
func Test_pipelineExecErr(t *testing.T) {
var (
ctx = context.Background()
req = require.New(t)
scope = &scp{"foo": 1}
)
p := NewPl()
p.Add(mockExecer{
exec: func(c context.Context, s *scp) (err error) {
return fmt.Errorf("error returned")
},
})
err := p.Exec(ctx, scope)
req.Error(err, "error returned")
}

View File

@@ -1,12 +1,32 @@
package apigw
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
"time"
atypes "github.com/cortezaproject/corteza-server/automation/types"
"github.com/cortezaproject/corteza-server/pkg/expr"
"github.com/cortezaproject/corteza-server/system/types"
"go.uber.org/zap"
)
var (
hopHeaders = []string{
"Connection",
"Keep-Alive",
"Proxy-Authenticate",
"Proxy-Authorization",
"Te",
"Trailers",
"Transfer-Encoding",
"Upgrade",
}
)
type (
@@ -15,93 +35,252 @@ type (
}
processerWorkflow struct {
functionMeta
d WfExecer
params struct {
Workflow uint64 `json:"workflow"`
}
}
processerProxy struct {
functionMeta
a AuthServicer
c *http.Client
log *zap.Logger
params struct {
Location string `json:"location"`
Auth authParams `json:"auth"`
}
}
)
func NewProcesserWorkflow(wf WfExecer) processerWorkflow {
return processerWorkflow{
d: wf,
}
}
func NewProcesserWorkflow(wf WfExecer) (p *processerWorkflow) {
p = &processerWorkflow{}
func (h processerWorkflow) Meta(f *types.Function) functionMeta {
return functionMeta{
Step: 2,
Name: "processerWorkflow",
Label: "Workflow processer",
Kind: "processer",
Weight: int(f.Weight),
Params: f.Params,
Args: []*functionMetaArg{
{
Type: "workflow",
Label: "workflow",
Options: map[string]interface{}{},
},
p.d = wf
p.Step = 2
p.Name = "processerWorkflow"
p.Label = "Workflow processer"
p.Kind = FunctionKindProcesser
p.Args = []*functionMetaArg{
{
Type: "workflow",
Label: "workflow",
Options: map[string]interface{}{},
},
}
return
}
func (h processerWorkflow) Handler() handlerFunc {
return func(ctx context.Context, scope *scp, params map[string]interface{}, ff functionHandler) error {
var (
wfID int64
ok bool
err error
)
func (h processerWorkflow) String() string {
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
}
// validate workflow param
if _, ok = params["workflow"]; !ok {
return fmt.Errorf("invalid param workflow")
}
func (h processerWorkflow) Meta() functionMeta {
return h.functionMeta
}
wfID, err = expr.CastToInteger(params["workflow"])
func (f *processerWorkflow) Merge(params []byte) (Handler, error) {
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&f.params)
if err != nil {
return err
}
return f, err
}
// setup scope for workflow
vv := map[string]interface{}{
"request": scope.Request(),
}
func (h processerWorkflow) Exec(ctx context.Context, scope *scp) error {
var (
err error
)
// get the request data and put it into vars
in, err := expr.NewVars(vv)
// setup scope for workflow
vv := map[string]interface{}{
"request": scope.Request(),
}
if err != nil {
return err
}
wp := atypes.WorkflowExecParams{
Trace: false,
// todo depending on settings per-route
Async: false,
// todo depending on settings per-route
Wait: true,
Input: in,
}
out, _, err := h.d.Exec(ctx, uint64(wfID), wp)
if err != nil {
return err
}
// merge out with scope
merged, err := in.Merge(out)
if err != nil {
return err
}
mm, err := expr.CastToVars(merged)
for k, v := range mm {
scope.Set(k, v)
}
// get the request data and put it into vars
in, err := expr.NewVars(vv)
if err != nil {
return err
}
wp := atypes.WorkflowExecParams{
Trace: false,
// todo depending on settings per-route
Async: false,
// todo depending on settings per-route
Wait: true,
Input: in,
}
out, _, err := h.d.Exec(ctx, uint64(h.params.Workflow), wp)
if err != nil {
return err
}
// merge out with scope
merged, err := in.Merge(out)
if err != nil {
return err
}
mm, err := expr.CastToVars(merged)
for k, v := range mm {
scope.Set(k, v)
}
return err
}
func NewProcesserProxy(l *zap.Logger, c *http.Client) (p *processerProxy) {
p = &processerProxy{}
p.c = c
p.log = l
p.Step = 2
p.Name = "processerProxy"
p.Label = "Proxy processer"
p.Kind = FunctionKindProcesser
p.Args = []*functionMetaArg{
{
Type: "text",
Label: "location",
Options: map[string]interface{}{},
},
}
return
}
func (h processerProxy) String() string {
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
}
func (h processerProxy) Meta() functionMeta {
return h.functionMeta
}
func (f *processerProxy) Merge(params []byte) (Handler, error) {
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&f.params)
if err != nil {
return nil, err
}
// get the auth mechanism
f.a, err = NewAuthServicer(f.c, f.params.Auth)
if err != nil {
return nil, fmt.Errorf("could not load auth servicer for proxying: %s", err)
}
return f, err
}
func (h processerProxy) Exec(ctx context.Context, scope *scp) (err error) {
ctx, cancel := context.WithTimeout(ctx, scope.Opts().ProxyOutboundTimeout)
defer cancel()
req := scope.Request()
log := h.log.With(zap.String("ref", h.Name))
outreq := req.Clone(ctx)
l, err := url.ParseRequestURI(h.params.Location)
if err != nil {
return fmt.Errorf("could not parse destination location for proxying: %s", err)
}
// should we preserve query params? headers? post data?
outreq.URL = l
outreq.RequestURI = ""
outreq.Method = req.Method
outreq.Host = l.Hostname()
// use authservicer, set any additional headers
err = h.a.Do(outreq)
if err != nil {
return fmt.Errorf("errors setting auth for proxying: %s", err)
}
// merge the old query params to the new request
// do not overwrite old ones
// do it after the authServicer, since we also may add them there
mergeQueryParams(req, outreq)
if scope.Opts().ProxyEnableDebugLog {
o, _ := httputil.DumpRequestOut(outreq, false)
log.Debug("proxy outbound request", zap.Any("request", string(o)))
}
// temporary metrics before the proper functionality
startTime := time.Now()
// todo - disable / enable follow redirects, already
// added to options
resp, err := h.c.Do(outreq)
if err != nil {
return fmt.Errorf("could not proxy request: %s", err)
}
if scope.Opts().ProxyEnableDebugLog {
o, _ := httputil.DumpResponse(resp, false)
log.Debug("proxy outbound response", zap.Any("request", string(o)), zap.Duration("duration", time.Since(startTime)))
}
b, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("could not read get body on proxy request: %s", err)
}
mergeHeaders(resp.Header, scope.Writer().Header())
// add to writer
scope.Writer().Write(b)
return nil
}
func mergeHeaders(orig, dest http.Header) {
OUTER:
for name, values := range orig {
// skip headers that need to be omitted
// when proxying
for _, v := range hopHeaders {
if v == name {
continue OUTER
}
}
dest[name] = values
}
}
func mergeQueryParams(orig, dest *http.Request) {
origValues := dest.URL.Query()
for k, qp := range orig.URL.Query() {
// skip existing
if dest.URL.Query().Get(k) != "" {
continue
}
for _, v := range qp {
origValues.Add(k, v)
}
}
dest.URL.RawQuery = origValues.Encode()
}

195
pkg/apigw/processer_test.go Normal file
View File

@@ -0,0 +1,195 @@
package apigw
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func Test_processerProxy(t *testing.T) {
type (
exp struct {
Status int
Header http.Header
Body *bytes.Buffer
}
tf struct {
name string
err string
params string
exp exp
rq *http.Request
fn func(*require.Assertions) mockRoundTripper
}
)
var (
tcc = []tf{
{
name: "proxy processer with auth headers",
fn: func(req *require.Assertions) mockRoundTripper {
return func(r *http.Request) (rs *http.Response, err error) {
rs = &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("default response")),
}
return
}
},
params: `{"location": "/foo", "auth": {"type": "header", "params": {"access-token": "123", "client": "456"}}}`,
exp: exp{
Status: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/plain; charset=utf-8"}},
Body: bytes.NewBufferString("default response"),
},
},
{
name: "proxy processer with auth query params",
fn: func(req *require.Assertions) mockRoundTripper {
return func(r *http.Request) (rs *http.Response, err error) {
rs = &http.Response{}
req.Equal("access-param=123%2B456", r.URL.RawQuery)
return
}
},
params: `{"location": "/foo", "auth": {"type": "query", "params": {"access-param": "123+456"}}}`,
exp: exp{
Status: http.StatusOK,
Header: http.Header{"Content-Type": []string{"text/plain; charset=utf-8"}},
Body: bytes.NewBuffer(nil),
},
},
{
name: "proxy processer with auth headers unauthorized",
fn: func(req *require.Assertions) mockRoundTripper {
return func(r *http.Request) (rs *http.Response, err error) {
rs = &http.Response{
StatusCode: http.StatusUnauthorized,
Body: io.NopCloser(strings.NewReader("unauthorized response")),
}
return
}
},
params: `{"location": "/foo", "auth": {"type": "header", "params": {"access-token": "123", "client": "456"}}}`,
exp: exp{
Status: http.StatusUnauthorized,
Header: http.Header{"Content-Type": []string{"text/plain; charset=utf-8"}},
Body: bytes.NewBufferString("unauthorized response"),
},
},
{
name: "proxy processer params parse error",
params: `{"location": "invalid url", "auth": {"type": "header", "params": {}}}`,
err: `could not parse destination location for proxying: parse "invalid url": invalid URI for request`,
},
{
name: "proxy processer params request error",
fn: func(req *require.Assertions) mockRoundTripper {
return func(r *http.Request) (rs *http.Response, err error) {
err = fmt.Errorf("error on client.Do")
return
}
},
params: `{"location": "https://example.com", "auth": {"type": "header", "params": {}}}`,
err: `could not proxy request: Post "https://example.com": error on client.Do`,
},
{
name: "proxy processer hop headers removed",
fn: func(req *require.Assertions) mockRoundTripper {
return func(r *http.Request) (rs *http.Response, err error) {
rs = &http.Response{
Header: http.Header{
"Proxy-Authenticate": []string{`Basic realm="Access to the internal site"`},
"Content-Type": []string{"application/json; charset=utf-8"},
},
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("default response")),
}
return
}
},
params: `{"location": "https://example.com", "auth": {"type": "header", "params": {}}}`,
exp: exp{
Status: http.StatusUnauthorized,
Header: http.Header{"Content-Type": []string{"application/json; charset=utf-8"}},
Body: bytes.NewBufferString("default response"),
},
},
{
name: "proxy processer query parameters merged",
fn: func(req *require.Assertions) mockRoundTripper {
return func(r *http.Request) (rs *http.Response, err error) {
rs = &http.Response{}
req.Equal("access-param=123%2B456&addedCustomQueryParam=true", r.URL.RawQuery)
return
}
},
params: `{"location": "https://example.com", "auth": {"type": "query", "params": {"access-param": "123+456"}}}`,
exp: exp{
Status: http.StatusUnauthorized,
Header: http.Header{"Content-Type": []string{"text/plain; charset=utf-8"}},
Body: bytes.NewBuffer(nil),
},
rq: &http.Request{
Header: http.Header{},
URL: &url.URL{Path: "/foo", RawQuery: "addedCustomQueryParam=true"},
Body: http.NoBody,
Method: "POST",
},
},
}
)
for _, tc := range tcc {
t.Run(tc.name, func(t *testing.T) {
var (
ctx = context.Background()
req = require.New(t)
c = http.DefaultClient
rq = tc.rq
)
if tc.fn != nil {
c.Transport = mockRoundTripper(tc.fn(req))
}
if rq == nil {
rq, _ = http.NewRequest("POST", "/foo", strings.NewReader(`custom request body`))
}
proxy := NewProcesserProxy(zap.NewNop(), c)
proxy.Merge([]byte(tc.params))
scope := &scp{
"request": rq,
"writer": httptest.NewRecorder(),
"opts": options.Apigw(),
}
err := proxy.Exec(ctx, scope)
if tc.err != "" {
req.EqualError(err, tc.err)
} else {
req.NoError(err)
req.Equal(tc.exp.Header, scope.Writer().(*httptest.ResponseRecorder).Header())
req.Equal(tc.exp.Body, scope.Writer().(*httptest.ResponseRecorder).Body)
}
})
}
}

View File

@@ -2,11 +2,9 @@ package apigw
import (
"fmt"
"net/http"
"github.com/cortezaproject/corteza-server/automation/service"
as "github.com/cortezaproject/corteza-server/automation/service"
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/cortezaproject/corteza-server/system/types"
)
type (
@@ -25,6 +23,11 @@ func (r *registry) Add(n string, h Handler) {
r.h[n] = h
}
func (r *registry) Merge(h Handler, b []byte) (hh Handler, err error) {
hh, err = h.Merge(b)
return
}
func (r *registry) Get(identifier string) (Handler, error) {
var (
ok bool
@@ -40,8 +43,8 @@ func (r *registry) Get(identifier string) (Handler, error) {
func (r *registry) All() (list functionMetaList) {
for _, handler := range r.h {
m := handler.Meta(&types.Function{})
list = append(list, &m)
meta := handler.Meta()
list = append(list, &meta)
}
return
@@ -50,10 +53,8 @@ func (r *registry) All() (list functionMetaList) {
func (r *registry) Preload() {
r.Add("verifierQueryParam", NewVerifierQueryParam())
r.Add("verifierOrigin", NewVerifierOrigin())
r.Add("validatorHeader", NewValidatorHeader())
r.Add("expediterRedirection", NewExpediterRedirection())
r.Add("processerWorkflow", NewProcesserWorkflow(NewWorkflow()))
}
func NewWorkflow() WfExecer {
return as.Workflow(service.DefaultLogger, options.CorredorOpt{})
r.Add("processerProxy", NewProcesserProxy(service.DefaultLogger, http.DefaultClient))
}

100
pkg/apigw/registry_test.go Normal file
View File

@@ -0,0 +1,100 @@
package apigw
import (
"testing"
"github.com/stretchr/testify/require"
)
func Test_registryAddGet(t *testing.T) {
var (
req = require.New(t)
r = NewRegistry()
)
r.Add("mockHandler", mockHandler{})
h, err := r.Get("mockHandler")
req.NoError(err)
req.Len(r.h, 1)
req.IsType(mockHandler{}, h)
}
func Test_registryAddGetErr(t *testing.T) {
var (
req = require.New(t)
r = NewRegistry()
)
r.Add("mockHandler", mockHandler{})
h, err := r.Get("foo")
req.EqualError(err, "could not get element from registry: foo")
req.Len(r.h, 1)
req.Nil(h)
}
func Test_registryMerge(t *testing.T) {
type (
tf struct {
name string
err string
params string
exp string
}
)
var (
tcc = []tf{
{
name: "set params",
params: `{"foo":"bar"}`,
exp: "bar",
},
{
name: "set invalid params",
params: `{"foo1":"bar"}`,
exp: "",
},
{
name: "set invalid params err",
params: `{"foo1":"bar"`,
exp: "",
err: "unexpected EOF",
},
}
)
for _, tc := range tcc {
var (
req = require.New(t)
r = NewRegistry()
)
m, err := r.Merge(mockHandler{}, []byte(tc.params))
if tc.err != "" {
req.EqualError(err, tc.err)
} else {
req.Equal(m.(mockHandler).Foo, tc.exp)
req.NoError(err)
}
}
}
func Test_registryAll(t *testing.T) {
var (
req = require.New(t)
r = NewRegistry()
)
r.Add("mockHandler", mockHandler{})
list := r.All()
req.Len(list, 1)
req.NotEmpty(list[0].Name)
}

View File

@@ -1,8 +1,12 @@
package apigw
import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"github.com/cortezaproject/corteza-server/pkg/options"
"go.uber.org/zap"
)
type (
@@ -11,22 +15,50 @@ type (
endpoint string
method string
opts *options.ApigwOpt
log *zap.Logger
pipe *pl
}
)
func (r route) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var (
ctx = context.Background()
ctx = req.Context()
scope = scp{}
)
scope["request"] = req
scope["writer"] = w
scope.Set("request", req)
scope.Set("writer", w)
scope.Set("opts", r.opts)
if err := r.validate(req); err != nil {
r.log.Debug("error validating request on route", zap.Error(err))
r.pipe.err.Exec(ctx, &scope, fmt.Errorf("could not validate request: %s", err))
return
}
if r.opts.LogEnabled {
o, _ := httputil.DumpRequest(req, false)
r.log.Debug("incoming request", zap.Any("request", string(o)))
}
err := r.pipe.Exec(ctx, &scope)
if err != nil {
// log error
// call the error handler
r.log.Debug("calling default error handler on error")
r.pipe.err.Exec(ctx, &scope, err)
}
}
func (r route) validate(req *http.Request) (err error) {
if req.Method != r.method {
err = fmt.Errorf("invalid method %s", req.Method)
}
return
}
func (r route) String() string {
return fmt.Sprintf("%s %s", r.method, r.endpoint)
}

117
pkg/apigw/route_test.go Normal file
View File

@@ -0,0 +1,117 @@
package apigw
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"testing"
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func Test_pl(t *testing.T) {
type (
tf struct {
name string
handler Worker
method string
errHandler ErrorHandler
expStatus int
expError error
}
)
var (
tcc = []tf{
{
name: "successful exec",
handler: mockExecer{
exec: func(c context.Context, s *scp) (err error) {
s.Writer().WriteHeader(http.StatusTemporaryRedirect)
return
},
},
errHandler: mockErrorExecer{
exec: func(c context.Context, s *scp, e error) {
s.Writer().Write([]byte(e.Error()))
},
},
method: "POST",
expStatus: http.StatusTemporaryRedirect,
expError: nil,
},
{
name: "unsuccessful exec",
handler: mockExecer{
exec: func(c context.Context, s *scp) (err error) {
s.Writer().WriteHeader(http.StatusTemporaryRedirect)
return errors.New("test error")
},
},
errHandler: mockErrorExecer{
exec: func(c context.Context, s *scp, e error) {
s.Writer().WriteHeader(http.StatusInternalServerError)
s.Writer().Write([]byte(e.Error()))
},
},
method: "POST",
expStatus: http.StatusTemporaryRedirect,
expError: errors.New("test error"),
},
{
name: "request method validation fail",
handler: mockExecer{
exec: func(c context.Context, s *scp) (err error) {
s.Writer().WriteHeader(http.StatusTemporaryRedirect)
return errors.New("test error")
},
},
errHandler: mockErrorExecer{
exec: func(c context.Context, s *scp, e error) {
s.Writer().WriteHeader(http.StatusInternalServerError)
s.Writer().Write([]byte(e.Error()))
},
},
method: "GET",
expStatus: http.StatusInternalServerError,
expError: errors.New("could not validate request: invalid method POST"),
},
}
)
for _, tc := range tcc {
t.Run(tc.name, func(t *testing.T) {
var (
req = require.New(t)
rr = httptest.NewRecorder()
pipe = NewPl()
)
r, err := http.NewRequest("POST", "/foo", http.NoBody)
req.NoError(err)
pipe.Add(tc.handler)
pipe.ErrorHandler(tc.errHandler)
route := &route{
method: tc.method,
pipe: pipe,
log: zap.NewNop(),
opts: options.Apigw(),
}
route.ServeHTTP(rr, r)
expError := ""
if tc.expError != nil {
expError = tc.expError.Error()
}
req.Equal(tc.expStatus, rr.Result().StatusCode)
req.Equal(expError, rr.Body.String())
})
}
}

67
pkg/apigw/test.go Normal file
View File

@@ -0,0 +1,67 @@
package apigw
import (
"bytes"
"context"
"encoding/json"
"github.com/cortezaproject/corteza-server/system/types"
)
type (
mockExecer struct {
exec func(context.Context, *scp) (err error)
}
mockErrorExecer struct {
exec func(context.Context, *scp, error)
}
mockHandler struct {
Foo string `json:"foo"`
}
mockStorer struct {
f func(context.Context, types.ApigwFunctionFilter) (types.ApigwFunctionSet, types.ApigwFunctionFilter, error)
r func(context.Context, types.ApigwRouteFilter) (types.ApigwRouteSet, types.ApigwRouteFilter, error)
}
)
func (h mockHandler) String() string {
return "mockHandler"
}
func (h mockHandler) Exec(_ context.Context, _ *scp) error {
panic("not implemented") // TODO: Implement
}
func (h mockHandler) Merge(params []byte) (Handler, error) {
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&h)
return h, err
}
func (h mockHandler) Meta() functionMeta {
return functionMeta{
Name: "return mocked function metadata",
}
}
func (td mockStorer) SearchApigwRoutes(ctx context.Context, f types.ApigwRouteFilter) (s types.ApigwRouteSet, ff types.ApigwRouteFilter, err error) {
return td.r(ctx, f)
}
func (td mockStorer) SearchApigwFunctions(ctx context.Context, f types.ApigwFunctionFilter) (s types.ApigwFunctionSet, ff types.ApigwFunctionFilter, err error) {
return td.f(ctx, f)
}
func (me mockExecer) String() string {
return "mockExecer"
}
func (me mockExecer) Exec(ctx context.Context, s *scp) (err error) {
return me.exec(ctx, s)
}
func (me mockErrorExecer) Exec(ctx context.Context, s *scp, e error) {
me.exec(ctx, s, e)
}

87
pkg/apigw/validator.go Normal file
View File

@@ -0,0 +1,87 @@
package apigw
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/cortezaproject/corteza-server/pkg/expr"
)
type (
validatorHeader struct {
functionMeta
params struct {
Expr string `json:"expr"`
}
}
)
func NewValidatorHeader() (v *validatorHeader) {
v = &validatorHeader{}
v.Step = 3
v.Name = "validatorHeader"
v.Label = "Header validator"
v.Kind = FunctionKindValidator
v.Args = []*functionMetaArg{
{
Type: "expr",
Label: "expr",
Options: map[string]interface{}{},
},
}
return
}
func (h validatorHeader) String() string {
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
}
func (h validatorHeader) Meta() functionMeta {
return h.functionMeta
}
func (v *validatorHeader) Merge(params []byte) (Handler, error) {
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&v.params)
return v, err
}
func (h validatorHeader) Exec(ctx context.Context, scope *scp) error {
vv := map[string]interface{}{}
headers := scope.Request().Header
for k, v := range headers {
// sanitize header keys?
vv[k] = v[0]
}
// get the request data and put it into vars
out, err := expr.NewVars(vv)
if err != nil {
return err
}
pp := expr.NewParser()
tt, err := pp.Parse(h.params.Expr)
if err != nil {
return fmt.Errorf("could not parse matching expression: %s", err)
}
b, err := tt.Test(ctx, out)
if err != nil {
return fmt.Errorf("could not validate headers: %s", err)
}
if !b {
return fmt.Errorf("could not validate headers")
}
return nil
}

View File

@@ -3,58 +3,75 @@ package apigw
import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/cortezaproject/corteza-server/pkg/expr"
"github.com/stretchr/testify/require"
)
func TestContentLengthValidator(t *testing.T) {
func Test_validatorHeader(t *testing.T) {
type (
tf struct {
name string
limit int
exp string
body string
name string
expr string
err string
headers http.Header
}
)
var (
ctx = context.Background()
tcc = []tf{
{
name: "fail on content length > limit",
limit: 10,
exp: "workflow 0 step 0 execution failed: content length overriden",
body: "A message that is 31 bytes long",
name: "matching simple",
expr: `{"expr":"foo == \"bar\""}`,
headers: map[string][]string{"foo": {"bar"}},
},
{
name: "success on content length < limit",
limit: 10,
exp: "",
body: "Below 10",
name: "matching case",
expr: `{"expr":"Foo == \"bar\""}`,
headers: map[string][]string{"Foo": {"bar"}},
},
{
name: "non matching value",
expr: `{"expr":"Foo == \"bar1\""}`,
headers: map[string][]string{"Foo": {"bar"}},
err: "could not validate headers",
},
{
name: "non matching key",
expr: `{"expr":"Foo1 == \"bar\""}`,
headers: map[string][]string{"Foo": {"bar"}},
err: "could not validate headers: failed to select 'Foo1' on *expr.Vars: no such key 'Foo1'",
},
{
name: "matching header with hyphen - TODO",
expr: `{"expr":"Content-type == \"application/json\""}`,
headers: map[string][]string{"Content-type": {"application/json"}},
},
}
)
for _, tc := range tcc {
var (
ctx = context.Background()
)
t.Run(tc.name, func(t *testing.T) {
var (
req = require.New(t)
input = &expr.Vars{}
)
req := require.New(t)
input.Set("length", tc.limit)
r, err := http.NewRequest(http.MethodGet, "/foo", http.NoBody)
r.Header = tc.headers
r := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(tc.body))
req.NoError(err)
err := execFn(t, r, contentLengthValidator(ctx, input))
scope := &scp{"request": r}
if tc.exp != "" {
req.EqualError(err, tc.exp)
h := NewValidatorHeader()
h.Merge([]byte(tc.expr))
err = h.Exec(ctx, scope)
if tc.err != "" {
req.EqualError(err, tc.err)
} else {
req.NoError(err)
}

View File

@@ -1,165 +1,161 @@
package apigw
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/cortezaproject/corteza-server/pkg/expr"
"github.com/cortezaproject/corteza-server/system/types"
"github.com/davecgh/go-spew/spew"
)
type (
verifierQueryParam struct{}
verifierOrigin struct{}
verifierQueryParam struct {
functionMeta
params struct {
Expr string `json:"expr"`
}
}
verifierOrigin struct {
functionMeta
params struct {
Expr string `json:"expr"`
}
}
)
func NewVerifierOrigin() verifierOrigin {
return verifierOrigin{}
}
func NewVerifierOrigin() (v *verifierOrigin) {
v = &verifierOrigin{}
func NewVerifierQueryParam() verifierQueryParam {
return verifierQueryParam{}
}
v.Step = 0
v.Name = "verifierOrigin"
v.Label = "Origin verifier"
v.Kind = FunctionKindVerifier
func (h verifierQueryParam) Meta(f *types.Function) functionMeta {
return functionMeta{
Step: 0,
Name: "verifierQueryParam",
Label: "Query parameters verifier",
Kind: "verifier",
Weight: int(f.Weight),
Params: f.Params,
Args: []*functionMetaArg{
{
Type: "expr",
Label: "expr",
Options: map[string]interface{}{},
},
v.Args = []*functionMetaArg{
{
Type: "expr",
Label: "expr",
Options: map[string]interface{}{},
},
}
return
}
func (h verifierOrigin) Meta(f *types.Function) functionMeta {
return functionMeta{
Step: 0,
Name: "verifierOrigin",
Label: "Origin verifier",
Kind: "verifier",
Weight: int(f.Weight),
Params: f.Params,
Args: []*functionMetaArg{
{
Type: "expr",
Label: "expr",
Options: map[string]interface{}{},
},
func (h verifierOrigin) String() string {
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
}
func (h verifierOrigin) Meta() functionMeta {
return h.functionMeta
}
func (v *verifierOrigin) Merge(params []byte) (Handler, error) {
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&v.params)
return v, err
}
func (h verifierOrigin) Exec(ctx context.Context, scope *scp) error {
vv := map[string]interface{}{
"origin": scope.Request().Header.Get("Origin"),
}
// get the request data and put it into vars
out, err := expr.NewVars(vv)
if err != nil {
return err
}
// spew.Dump("OUT", out)
pp := expr.NewParser()
tt, err := pp.Parse(h.params.Expr)
if err != nil {
return fmt.Errorf("could not parse matching expression: %s", err)
}
b, err := tt.Test(ctx, out)
if err != nil {
return fmt.Errorf("could not validate origin: %s", err)
}
if !b {
return fmt.Errorf("could not validate origin")
}
return nil
}
func NewVerifierQueryParam() (v *verifierQueryParam) {
v = &verifierQueryParam{}
v.Step = 0
v.Name = "verifierQueryParam"
v.Label = "Query parameters verifier"
v.Kind = "verifier"
v.Args = []*functionMetaArg{
{
Type: "expr",
Label: "expr",
Options: map[string]interface{}{},
},
}
return
}
func (h verifierQueryParam) Handler() handlerFunc {
return func(ctx context.Context, scope *scp, params map[string]interface{}, ff functionHandler) error {
for k := range ff.params {
func (h verifierQueryParam) String() string {
return fmt.Sprintf("apigw function %s (%s)", h.Name, h.Label)
}
v, ok := params[k]
func (h verifierQueryParam) Meta() functionMeta {
return h.functionMeta
}
if !ok {
spew.Dump("not in params", k)
continue
}
func (v *verifierQueryParam) Merge(params []byte) (Handler, error) {
err := json.NewDecoder(bytes.NewBuffer(params)).Decode(&v.params)
return v, err
}
vv := map[string]interface{}{}
vals := scope.Request().URL.Query()
func (h verifierQueryParam) Exec(ctx context.Context, scope *scp) error {
vv := map[string]interface{}{}
vals := scope.Request().URL.Query()
for k, v := range vals {
vv[k] = v[0]
}
// get the request data and put it into vars
out, err := expr.NewVars(vv)
if err != nil {
// spew.Dump("ERR!", err)
return err
}
pp := expr.NewParser()
tt, err := pp.Parse(v.(string))
if err != nil {
// spew.Dump("ERR!", err)
return err
}
b, err := tt.Test(ctx, out)
if err != nil {
// spew.Dump("ERR!", err)
return err
}
spew.Dump("BBBB", b)
if !b {
return fmt.Errorf("failed on step %d, function %s", ff.step, ff.name)
}
}
// testing
scope.Request().Header.Add(fmt.Sprintf("step_%d", ff.step), ff.name)
return nil
for k, v := range vals {
vv[k] = v[0]
}
}
func (h verifierOrigin) Handler() handlerFunc {
return func(ctx context.Context, scope *scp, params map[string]interface{}, ff functionHandler) error {
for k := range ff.params {
v, ok := params[k]
// get the request data and put it into vars
out, err := expr.NewVars(vv)
if !ok {
spew.Dump("not in params", k)
continue
}
vv := map[string]interface{}{
"origin": scope.Request().Header.Get("Origin"),
}
// get the request data and put it into vars
out, err := expr.NewVars(vv)
if err != nil {
spew.Dump("ERR!", err)
return err
}
pp := expr.NewParser()
tt, err := pp.Parse(v.(string))
if err != nil {
spew.Dump("ERR!", err)
return err
}
b, err := tt.Test(ctx, out)
if err != nil {
spew.Dump("ERR!", err)
return err
}
spew.Dump("BBBB", b)
if !b {
return fmt.Errorf("failed on step %d, function %s", ff.step, ff.name)
}
}
// testing
scope.Request().Header.Add(fmt.Sprintf("step_%d", ff.step), ff.name)
return nil
if err != nil {
return err
}
pp := expr.NewParser()
tt, err := pp.Parse(h.params.Expr)
if err != nil {
return fmt.Errorf("could not parse matching expression: %s", err)
}
b, err := tt.Test(ctx, out)
if err != nil {
return fmt.Errorf("could not validate query params: %s", err)
}
if !b {
return fmt.Errorf("could not validate query params")
}
// }
// testing
scope.Request().Header.Add(fmt.Sprintf("step_%d", h.Step), h.Name)
return nil
}

141
pkg/apigw/verifier_test.go Normal file
View File

@@ -0,0 +1,141 @@
package apigw
import (
"context"
"net/http"
"testing"
"github.com/stretchr/testify/require"
)
func Test_verifierQueryParam(t *testing.T) {
type (
tf struct {
name string
expr string
err string
url string
}
)
var (
tcc = []tf{
{
name: "matching simple query parameter",
expr: `{"expr":"foo == \"bar\""}`,
url: "https://examp.le?foo=bar",
},
{
name: "matching simple query parameter - invalid expression key",
expr: `{"expr1":"foo == \"bar\""}`,
url: "https://examp.le?foo=bar",
err: "could not parse matching expression: parsing error: - 1:1 unexpected EOF while scanning extensions",
},
{
name: "matching simple query parameter - missing value",
expr: `{"expr":"foo == \"bar\""}`,
url: "https://examp.le?foo=bar1",
err: "could not validate query params",
},
{
name: "matching simple query parameter - missing value",
expr: `{"expr":"foo == \"bar-baz\""}`,
url: "https://examp.le?foo=bar-baz",
},
}
)
for _, tc := range tcc {
var (
ctx = context.Background()
)
t.Run(tc.name, func(t *testing.T) {
req := require.New(t)
r, err := http.NewRequest(http.MethodGet, tc.url, http.NoBody)
req.NoError(err)
scope := &scp{"request": r}
h := NewVerifierQueryParam()
h.Merge([]byte(tc.expr))
err = h.Exec(ctx, scope)
if tc.err != "" {
req.EqualError(err, tc.err)
} else {
req.NoError(err)
}
})
}
}
func Test_verifierOrigin(t *testing.T) {
type (
tf struct {
name string
expr string
err string
o string
}
)
var (
tcc = []tf{
{
name: "matching simple origin value",
expr: `{"expr":"origin == \"https://www.google.com\""}`,
o: "https://www.google.com",
},
{
name: "matching simple nonexistent origin value",
expr: `{"expr":"origin == \"https://www.google.com\""}`,
o: "",
err: "could not validate origin",
},
{
name: "matching simple origin value - invalid expression key",
expr: `{"expr1":"origin == \"https://www.google.com\""}`,
o: "",
err: "could not parse matching expression: parsing error: \t - 1:1 unexpected EOF while scanning extensions",
},
{
name: "matching simple origin value - invalid expression key",
expr: `{"expr1":"origin == \"https"}`,
o: "",
err: "could not parse matching expression: parsing error: \t - 1:1 unexpected EOF while scanning extensions",
},
}
)
for _, tc := range tcc {
var (
ctx = context.Background()
)
t.Run(tc.name, func(t *testing.T) {
req := require.New(t)
r, err := http.NewRequest(http.MethodGet, "/foo", http.NoBody)
r.Header.Set("Origin", tc.o)
req.NoError(err)
scope := &scp{"request": r}
h := NewVerifierOrigin()
h.Merge([]byte(tc.expr))
err = h.Exec(ctx, scope)
if tc.err != "" {
req.EqualError(err, tc.err)
} else {
req.NoError(err)
}
})
}
}

View File

@@ -7,10 +7,11 @@ package resource
//
// Definitions file that controls how this file is generated:
// - system.apigw-function.yaml
// - system.apigw-route.yaml
// - system.application.yaml
// - system.auth-client.yaml
// - system.role.yaml
// - system.route.yaml
// - system.template.yaml
// - system.user.yaml
// - system.yaml
@@ -19,6 +20,32 @@ import (
"github.com/cortezaproject/corteza-server/system/types"
)
// SystemApigwFunctionRbacReferences generates RBAC references
//
// Resources with "envoy: false" are skipped
//
// This function is auto-generated
func SystemApigwFunctionRbacReferences(apigwFunction string) (res *Ref, pp []*Ref, err error) {
if apigwFunction != "*" {
res = &Ref{ResourceType: types.ApigwFunctionResourceType, Identifiers: MakeIdentifiers(apigwFunction)}
}
return
}
// SystemApigwRouteRbacReferences generates RBAC references
//
// Resources with "envoy: false" are skipped
//
// This function is auto-generated
func SystemApigwRouteRbacReferences(apigwRoute string) (res *Ref, pp []*Ref, err error) {
if apigwRoute != "*" {
res = &Ref{ResourceType: types.ApigwRouteResourceType, Identifiers: MakeIdentifiers(apigwRoute)}
}
return
}
// SystemApplicationRbacReferences generates RBAC references
//
// Resources with "envoy: false" are skipped
@@ -58,19 +85,6 @@ func SystemRoleRbacReferences(role string) (res *Ref, pp []*Ref, err error) {
return
}
// SystemRouteRbacReferences generates RBAC references
//
// Resources with "envoy: false" are skipped
//
// This function is auto-generated
func SystemRouteRbacReferences(route string) (res *Ref, pp []*Ref, err error) {
if route != "*" {
res = &Ref{ResourceType: types.RouteResourceType, Identifiers: MakeIdentifiers(route)}
}
return
}
// SystemTemplateRbacReferences generates RBAC references
//
// Resources with "envoy: false" are skipped

View File

@@ -16,10 +16,11 @@ package resource
// - compose.page.yaml
// - compose.record.yaml
// - compose.yaml
// - system.apigw-function.yaml
// - system.apigw-route.yaml
// - system.application.yaml
// - system.auth-client.yaml
// - system.role.yaml
// - system.route.yaml
// - system.template.yaml
// - system.user.yaml
// - system.yaml
@@ -169,6 +170,26 @@ func ParseRule(res string) (string, *Ref, []*Ref, error) {
// Component resource, no path
return composeTypes.ComponentResourceType, nil, nil, nil
case systemTypes.ApigwFunctionResourceType:
if len(path) != 1 {
return "", nil, nil, fmt.Errorf("expecting 1 reference components in path, got %d", len(path))
}
ref, pp, err := SystemApigwFunctionRbacReferences(
// apigwFunction
path[0],
)
return systemTypes.ApigwFunctionResourceType, ref, pp, err
case systemTypes.ApigwRouteResourceType:
if len(path) != 1 {
return "", nil, nil, fmt.Errorf("expecting 1 reference components in path, got %d", len(path))
}
ref, pp, err := SystemApigwRouteRbacReferences(
// apigwRoute
path[0],
)
return systemTypes.ApigwRouteResourceType, ref, pp, err
case systemTypes.ApplicationResourceType:
if len(path) != 1 {
return "", nil, nil, fmt.Errorf("expecting 1 reference components in path, got %d", len(path))
@@ -199,16 +220,6 @@ func ParseRule(res string) (string, *Ref, []*Ref, error) {
)
return systemTypes.RoleResourceType, ref, pp, err
case systemTypes.RouteResourceType:
if len(path) != 1 {
return "", nil, nil, fmt.Errorf("expecting 1 reference components in path, got %d", len(path))
}
ref, pp, err := SystemRouteRbacReferences(
// route
path[0],
)
return systemTypes.RouteResourceType, ref, pp, err
case systemTypes.TemplateResourceType:
if len(path) != 1 {
return "", nil, nil, fmt.Errorf("expecting 1 reference components in path, got %d", len(path))

47
pkg/options/apigw.gen.go generated Normal file
View File

@@ -0,0 +1,47 @@
package options
// This file is auto-generated.
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
//
// Definitions file that controls how this file is generated:
// pkg/options/apigw.yaml
import (
"time"
)
type (
ApigwOpt struct {
Enabled bool `env:"APIGW_ENABLED"`
LogEnabled bool `env:"APIGW_LOG_ENABLED"`
ProxyEnableDebugLog bool `env:"APIGW_PROXY_ENABLE_DEBUG_LOG"`
ProxyFollowRedirects bool `env:"APIGW_PROXY_FOLLOW_REDIRECTS"`
ProxyOutboundTimeout time.Duration `env:"APIGW_PROXY_OUTBOUND_TIMEOUT"`
}
)
// Apigw initializes and returns a ApigwOpt with default values
func Apigw() (o *ApigwOpt) {
o = &ApigwOpt{
Enabled: true,
LogEnabled: false,
ProxyEnableDebugLog: false,
ProxyFollowRedirects: true,
ProxyOutboundTimeout: time.Second * 30,
}
fill(o)
// Function that allows access to custom logic inside the parent function.
// The custom logic in the other file should be like:
// func (o *Apigw) Defaults() {...}
func(o interface{}) {
if def, ok := o.(interface{ Defaults() }); ok {
def.Defaults()
}
}(o)
return
}

37
pkg/options/apigw.yaml Normal file
View File

@@ -0,0 +1,37 @@
docs:
title: API Gateway
imports:
- time
props:
- name: Enabled
type: bool
default: true
description: |-
Enable API Gateway
- name: logEnabled
type: bool
default: false
description: |-
Enable extra logging
- name: proxyEnableDebugLog
type: bool
default: false
description: |-
Enable full debug log on requests / responses - warning, includes sensitive data
- name: proxyFollowRedirects
type: bool
default: true
description: |-
Follow redirects on proxy requests
- name: proxyOutboundTimeout
type: time.Duration
default: time.Second * 30
description: |-
Outbound request timeout