3
0

Improve API GW route mounting for easier remounting

This commit is contained in:
Tomaž Jerman 2021-10-01 18:40:20 +02:00 committed by Denis Arh
parent 28148d30c6
commit 0d429fa641
9 changed files with 182 additions and 104 deletions

View File

@ -2,6 +2,7 @@ package app
import (
"context"
"net/http"
"github.com/cortezaproject/corteza-server/auth/settings"
"github.com/cortezaproject/corteza-server/pkg/options"
@ -34,6 +35,10 @@ type (
Watch(ctx context.Context)
}
apigwServicer interface {
http.Handler
}
CortezaApp struct {
Opt *options.Options
lvl int
@ -56,7 +61,8 @@ type (
GrpcServer grpcServer
WsServer wsServer
AuthService authServicer
AuthService authServicer
ApigwService apigwServicer
systemEntitiesInitialized bool
}

View File

@ -16,6 +16,7 @@ import (
fdrService "github.com/cortezaproject/corteza-server/federation/service"
fedService "github.com/cortezaproject/corteza-server/federation/service"
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/apigw"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/pkg/corredor"
"github.com/cortezaproject/corteza-server/pkg/eventbus"
@ -351,11 +352,18 @@ func (app *CortezaApp) InitServices(ctx context.Context) (err error) {
rbac.SetGlobal(ac)
}
// Initialize resource translation stuff
locale.Global().BindStore(app.Store)
if err = locale.Global().ReloadResourceTranslations(ctx); err != nil {
return err
}
// Initialize API GW bits
apigw.Setup(options.Apigw(), app.Log, app.Store)
if err = apigw.Service().Reload(ctx); err != nil {
return err
}
if app.Opt.Messagebus.Enabled {
// initialize all the queue handlers
messagebus.Service().Init(ctx, app.Store)
@ -482,6 +490,8 @@ func (app *CortezaApp) Activate(ctx context.Context) (err error) {
return fmt.Errorf("failed to init auth service: %w", err)
}
app.ApigwService = apigw.Service()
updateFederationSettings(app.Opt.Federation, sysService.CurrentSettings)
updateAuthSettings(app.AuthService, sysService.CurrentSettings)
sysService.DefaultSettings.Register("auth.", func(ctx context.Context, current interface{}, set types.SettingValueSet) {

View File

@ -10,12 +10,10 @@ import (
automationRest "github.com/cortezaproject/corteza-server/automation/rest"
composeRest "github.com/cortezaproject/corteza-server/compose/rest"
"github.com/cortezaproject/corteza-server/compose/service"
"github.com/cortezaproject/corteza-server/docs"
federationRest "github.com/cortezaproject/corteza-server/federation/rest"
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/api/server"
"github.com/cortezaproject/corteza-server/pkg/apigw"
"github.com/cortezaproject/corteza-server/pkg/logger"
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/cortezaproject/corteza-server/pkg/webapp"
@ -106,12 +104,6 @@ func (app *CortezaApp) mountHttpRoutes(r chi.Router) {
r.Route("/federation", federationRest.MountRoutes)
}
// API Gateway
{
apigw.Setup(options.Apigw(), service.DefaultLogger, service.DefaultStore)
r.Route("/", apigw.Service().Router)
}
var fullpathDocs = options.CleanBase(ho.BaseUrl, ho.ApiBaseUrl, "docs")
app.Log.Info(
"API docs enabled",

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"math"
"net/http"
"github.com/cortezaproject/corteza-server/pkg/apigw/filter"
"github.com/cortezaproject/corteza-server/pkg/apigw/filter/proxy"
@ -29,9 +30,8 @@ type (
log *zap.Logger
reg *registry.Registry
routes []*route
router chi.Router
mx *chi.Mux
storer storer
reload chan bool
}
)
@ -66,18 +66,21 @@ func New(opts *options.ApigwOpt, logger *zap.Logger, storer storer) *apigw {
log: logger,
storer: storer,
reg: reg,
reload: make(chan bool),
}
}
func (s *apigw) Reload(ctx context.Context) {
go func() {
s.reload <- true
}()
// ServeHTTP forwards the given HTTP request to the underlying chi mux which
// then handles the heavy lifting
//
// When reloading routes, make sure to replace the original mux
func (s *apigw) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.mx.ServeHTTP(w, r)
}
// ReloadHandler is a wrapper for route reloading logic, primarily needed for testing
func (s *apigw) ReloadHandler(ctx context.Context) {
// Reload reloads routes and their filters
//
// The procedure constructs a new chi mux
func (s *apigw) Reload(ctx context.Context) (err error) {
routes, err := s.loadRoutes(ctx)
if err != nil {
@ -89,81 +92,14 @@ func (s *apigw) ReloadHandler(ctx context.Context) {
s.Init(ctx, routes...)
for _, route := range s.routes {
s.router.Handle(route.endpoint, route)
}
}
func (s *apigw) loadRoutes(ctx context.Context) (rr []*route, err error) {
routes, _, err := s.storer.SearchApigwRoutes(ctx, st.ApigwRouteFilter{
Enabled: true,
Deleted: f.StateExcluded,
})
if err != nil {
return
// Rebuild the mux
s.mx = chi.NewMux()
s.mx.HandleFunc("/", helperDefaultResponse(s.opts))
for _, r := range s.routes {
s.mx.Handle(r.endpoint, r)
}
for _, r := range routes {
route := &route{
ID: r.ID,
endpoint: r.Endpoint,
method: r.Method,
meta: routeMeta{
debug: r.Meta.Debug,
async: r.Meta.Async,
},
}
rr = append(rr, route)
}
return
}
func (s *apigw) loadFilters(ctx context.Context, route uint64) (ff []*st.ApigwFilter, err error) {
ff, _, err = s.storer.SearchApigwFilters(ctx, st.ApigwFilterFilter{
RouteID: route,
Deleted: f.StateExcluded,
})
return
}
func (s *apigw) Router(r chi.Router) {
var (
ctx = context.Background()
)
s.router = r
s.router.HandleFunc("/", helperDefaultResponse(s.opts))
routes, err := s.loadRoutes(ctx)
if err != nil {
s.log.Error("could not load routes", zap.Error(err))
return
}
s.Init(ctx, routes...)
for _, route := range s.routes {
s.router.Handle(route.endpoint, route)
}
go func() {
for {
select {
case <-s.reload:
s.ReloadHandler(ctx)
case <-ctx.Done():
s.log.Debug("shutting down API Gateway service")
return
}
}
}()
return nil
}
// Init all the routes
@ -292,3 +228,40 @@ func (s *apigw) ProxyAuthDef() (list []*proxy.ProxyAuthDefinition) {
list = proxy.ProxyAuthDef()
return
}
func (s *apigw) loadRoutes(ctx context.Context) (rr []*route, err error) {
routes, _, err := s.storer.SearchApigwRoutes(ctx, st.ApigwRouteFilter{
Enabled: true,
Deleted: f.StateExcluded,
})
if err != nil {
return
}
for _, r := range routes {
route := &route{
ID: r.ID,
endpoint: r.Endpoint,
method: r.Method,
meta: routeMeta{
debug: r.Meta.Debug,
async: r.Meta.Async,
},
}
rr = append(rr, route)
}
return
}
func (s *apigw) loadFilters(ctx context.Context, route uint64) (ff []*st.ApigwFilter, err error) {
ff, _, err = s.storer.SearchApigwFilters(ctx, st.ApigwFilterFilter{
RouteID: route,
Deleted: f.StateExcluded,
Enabled: true,
})
return
}

View File

@ -9,5 +9,10 @@ import (
func (s Store) convertApigwRouteFilter(f types.ApigwRouteFilter) (query squirrel.SelectBuilder, err error) {
query = s.apigwRoutesSelectBuilder()
query = filter.StateCondition(query, "ar.deleted_at", f.Deleted)
if f.Enabled {
query = query.Where(squirrel.Eq{"ar.enabled": f.Enabled})
}
return
}

View File

@ -88,7 +88,9 @@ func (svc *apigwRoute) Create(ctx context.Context, new *types.ApigwRoute) (q *ty
// send the signal to reload all routes
if new.Enabled {
apigw.Service().Reload(ctx)
if err = apigw.Service().Reload(ctx); err != nil {
return err
}
}
return nil
@ -129,8 +131,10 @@ func (svc *apigwRoute) Update(ctx context.Context, upd *types.ApigwRoute) (q *ty
q = upd
// send the signal to reload all route
if qq.Enabled != upd.Enabled {
apigw.Service().Reload(ctx)
if qq.Enabled != upd.Enabled || qq.Enabled && upd.Enabled {
if err = apigw.Service().Reload(ctx); err != nil {
return err
}
}
return nil
@ -169,7 +173,9 @@ func (svc *apigwRoute) DeleteByID(ctx context.Context, ID uint64) (err error) {
// send the signal to reload all queues
if q.Enabled {
apigw.Service().Reload(ctx)
if err = apigw.Service().Reload(ctx); err != nil {
return err
}
}
return nil
@ -208,7 +214,9 @@ func (svc *apigwRoute) UndeleteByID(ctx context.Context, ID uint64) (err error)
// send the signal to reload all queues
if q.Enabled {
apigw.Service().Reload(ctx)
if err = apigw.Service().Reload(ctx); err != nil {
return err
}
}
return nil

View File

@ -26,6 +26,7 @@ import (
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/cortezaproject/corteza-server/store"
"github.com/cortezaproject/corteza-server/store/sqlite3"
"github.com/cortezaproject/corteza-server/system/rest"
"github.com/cortezaproject/corteza-server/system/service"
sysTypes "github.com/cortezaproject/corteza-server/system/types"
"github.com/cortezaproject/corteza-server/tests/helpers"
@ -58,8 +59,8 @@ func init() {
}
func InitTestApp() {
ctx := cli.Context()
if testApp == nil {
ctx := cli.Context()
testApp = helpers.NewIntegrationTestApp(ctx, func(app *app.CortezaApp) (err error) {
service.DefaultStore, err = sqlite3.ConnectInMemory(ctx)
@ -77,9 +78,17 @@ func InitTestApp() {
r.Use(server.BaseMiddleware(false, logger.Default())...)
helpers.BindAuthMiddleware(r)
// setup API GW routes
// Sys routes for route management tests
rest.MountRoutes(r)
// API gw routes
apigw.Setup(options.Apigw(), service.DefaultLogger, service.DefaultStore)
r.Route("/", apigw.Service().Router)
err := apigw.Service().Reload(ctx)
if err != nil {
panic(err)
}
r.Handle("/*", apigw.Service())
}
}
@ -133,7 +142,7 @@ func (h helper) apiInit() *apitest.APITest {
func setupScenario(t *testing.T) (context.Context, helper, store.Storer) {
ctx, h, s := setup(t)
loadScenario(ctx, s, t, h)
reloadRoutes(ctx)
apigw.Service().Reload(ctx)
return ctx, h, s
}
@ -152,10 +161,6 @@ func setup(t *testing.T) (context.Context, helper, store.Storer) {
return ctx, h, s
}
func reloadRoutes(ctx context.Context) {
apigw.Service().ReloadHandler(ctx)
}
// Unwraps error before it passes it to the tester
func (h helper) noError(err error) {
for errors.Unwrap(err) != nil {

View File

@ -0,0 +1,65 @@
package apigw
import (
"fmt"
"net/http"
"testing"
"github.com/cortezaproject/corteza-server/store"
"github.com/cortezaproject/corteza-server/tests/helpers"
)
func Test_route_reload(t *testing.T) {
var (
ctx, h, s = setupScenario(t)
)
r, err := store.LookupApigwRouteByEndpoint(ctx, s, "/test")
h.a.NoError(err)
h.apiInit().
Get("/test").
Header("Accept", "application/json").
Expect(t).
Status(http.StatusOK).
Body("60").
End()
helpers.AllowMe(h, r.RbacResource(), "update")
h.apiInit().
Put(fmt.Sprintf("/apigw/route/%d", r.ID)).
Header("Accept", "application/json").
FormData("endpoint", "/test").
FormData("method", "GET").
FormData("enabled", "false").
Expect(t).
Status(http.StatusOK).
Assert(helpers.AssertNoErrors).
End()
h.apiInit().
Get("/test").
Header("Accept", "application/json").
Expect(t).
Status(http.StatusNotFound).
End()
h.apiInit().
Put(fmt.Sprintf("/apigw/route/%d", r.ID)).
Header("Accept", "application/json").
FormData("endpoint", "/test").
FormData("method", "GET").
FormData("enabled", "true").
Expect(t).
Status(http.StatusOK).
Assert(helpers.AssertNoErrors).
End()
h.apiInit().
Get("/test").
Header("Accept", "application/json").
Expect(t).
Status(http.StatusOK).
Body("60").
End()
}

View File

@ -0,0 +1,14 @@
apigateway:
- endpoint: /test
method: GET
enabled: true
filters:
- ref: "payload"
kind: "processer"
params:
jsfunc: |
const x = 10;
const y = 20;
const z = x + y;
return x + y + z