3
0

Refactored, updated tests

This commit is contained in:
Peter Grlica 2022-03-18 16:39:02 +01:00
parent 2617cb8e5b
commit e4dc6dc5f5
12 changed files with 221 additions and 210 deletions

View File

@ -3,6 +3,7 @@ package ctx
import (
"context"
"github.com/cortezaproject/corteza-server/pkg/apigw/profiler"
"github.com/cortezaproject/corteza-server/pkg/apigw/types"
)
@ -29,12 +30,12 @@ func ProfilerToContext(ctx context.Context, h interface{}) context.Context {
return context.WithValue(ctx, ContextKeyProfiler, h)
}
func ProfilerFromContext(ctx context.Context) (h interface{}) {
func ProfilerFromContext(ctx context.Context) (h *profiler.Hit) {
hh := ctx.Value(ContextKeyProfiler)
if hh == nil {
return nil
}
return hh
return hh.(*profiler.Hit)
}

View File

@ -9,7 +9,6 @@ import (
"time"
agctx "github.com/cortezaproject/corteza-server/pkg/apigw/ctx"
prf "github.com/cortezaproject/corteza-server/pkg/apigw/profiler"
"github.com/cortezaproject/corteza-server/pkg/apigw/types"
pe "github.com/cortezaproject/corteza-server/pkg/errors"
"github.com/cortezaproject/corteza-server/pkg/expr"
@ -261,7 +260,7 @@ func (pr *profiler) Handler() types.HandlerFunc {
scope = agctx.ScopeFromContext(ctx)
)
if pr.opts.ProfilerEnabled && pr.opts.ProfilerGlobal {
if pr.opts.ProfilerGlobal {
// profiler global overrides any profiling prefilter
// the hit is registered on lower level
return
@ -276,8 +275,8 @@ func (pr *profiler) Handler() types.HandlerFunc {
return
}
hit.(*prf.Hit).Ts = &n
hit.(*prf.Hit).R = scope.Request()
hit.Ts = &n
hit.R = scope.Request()
r = r.WithContext(agctx.ProfilerToContext(r.Context(), hit))

View File

@ -5,7 +5,10 @@ import (
"net/http/httptest"
"testing"
agctx "github.com/cortezaproject/corteza-server/pkg/apigw/ctx"
prf "github.com/cortezaproject/corteza-server/pkg/apigw/profiler"
"github.com/cortezaproject/corteza-server/pkg/apigw/types"
h "github.com/cortezaproject/corteza-server/pkg/http"
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/stretchr/testify/require"
)
@ -149,6 +152,64 @@ func Test_queryParamHandle(t *testing.T) {
}
}
func Test_profilerHandle_profilerGlobal(t *testing.T) {
type (
tfp struct {
name string
opts *options.ApigwOpt
r *http.Request
exp *h.Request
}
)
var (
rr = httptest.NewRequest(http.MethodGet, "/foo", http.NoBody)
tcc = []tfp{
{
name: "skip profiler hit on profiler global = true",
opts: &options.ApigwOpt{ProfilerGlobal: true},
r: rr,
exp: nil,
},
{
name: "add profiler hit on profiler global = false",
opts: &options.ApigwOpt{ProfilerGlobal: false},
r: rr,
exp: createRequest(rr),
},
}
)
for _, tc := range tcc {
var (
req = require.New(t)
ph = NewProfiler(tc.opts)
hfn = ph.Handler()
hr = createRequest(tc.r)
hit = &prf.Hit{}
)
req.Nil(hit.R)
tc.r = tc.r.WithContext(agctx.ScopeToContext(tc.r.Context(), &types.Scp{"request": hr}))
tc.r = tc.r.WithContext(agctx.ProfilerToContext(tc.r.Context(), hit))
err := hfn(httptest.NewRecorder(), tc.r)
req.NoError(err)
scoped := agctx.ProfilerFromContext(tc.r.Context())
req.Equal(tc.exp, scoped.R)
}
}
func createRequest(r *http.Request) (hr *h.Request) {
hr, _ = h.NewRequest(r)
return
}
func testMerge(h types.Handler, tc tf) func(t *testing.T) {
return func(t *testing.T) {
var (

57
pkg/apigw/profiler/hit.go Normal file
View File

@ -0,0 +1,57 @@
package profiler
import (
"encoding/base64"
"fmt"
"time"
h "github.com/cortezaproject/corteza-server/pkg/http"
)
type (
Hit struct {
ID string
Status int
Route uint64
R *h.Request
Ts *time.Time
Tf *time.Time
D *time.Duration
}
Hits map[string][]*Hit
)
func (h *Hit) generateID() {
h.ID = base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf("%s_%d", h.R.URL.Path, h.Ts.UnixNano())))
}
func (s Hits) Filter(fn func(k string, v *Hit) bool) Hits {
ss := Hits{}
for k, v := range s {
for _, vv := range v {
if !fn(k, vv) {
continue
}
ss[k] = append(ss[k], vv)
}
}
return ss
}
func (h Hits) Len() int {
return h.Len()
}
func (h Hits) Less(i, j int) bool {
return false
}
func (h Hits) Swap(i, j int) {
return
}

View File

@ -1,8 +1,6 @@
package profiler
import (
"encoding/base64"
"fmt"
"net/http"
"time"
@ -10,31 +8,9 @@ import (
)
type (
Hits map[string][]*Hit
Profiler struct {
l Hits
}
Hit struct {
ID string
Status int
Route uint64
R *h.Request
Ts *time.Time
Tf *time.Time
D *time.Duration
}
CtxHit []*Stage
Stage struct {
Name string
Ts *time.Time
Tf *time.Time
}
)
func New() *Profiler {
@ -90,35 +66,3 @@ func (p *Profiler) Hits(s Sort) Hits {
func (p *Profiler) id(r *h.Request) string {
return r.URL.Path
}
func (h *Hit) generateID() {
h.ID = base64.URLEncoding.EncodeToString([]byte(fmt.Sprintf("%s_%d", h.R.URL.Path, h.Ts.UnixNano())))
}
func (s Hits) Filter(fn func(k string, v *Hit) bool) Hits {
ss := Hits{}
for k, v := range s {
for _, vv := range v {
if !fn(k, vv) {
continue
}
ss[k] = append(ss[k], vv)
}
}
return ss
}
func (h Hits) Len() int {
return h.Len()
}
func (h Hits) Less(i, j int) bool {
return false
}
func (h Hits) Swap(i, j int) {
return
}

View File

@ -1,157 +1,113 @@
package profiler
// import (
// "net/http/httptest"
// "strings"
// "testing"
// "time"
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
// "github.com/cortezaproject/corteza-server/pkg/filter"
// h "github.com/cortezaproject/corteza-server/pkg/http"
// "github.com/cortezaproject/corteza-server/system/types"
// "github.com/davecgh/go-spew/spew"
// )
h "github.com/cortezaproject/corteza-server/pkg/http"
"github.com/stretchr/testify/require"
)
// func Test_Profiler(t *testing.T) {
// // var (
// // p = Profiler{
// // l: make(map[string]*Hit),
// // }
// // req = require.New(t)
// // )
const (
day = time.Hour * 24
)
// // // in goes the h.Request
// // rr, err := http.NewRequest("POST", "/foo", strings.NewReader(`foo`))
func Test_ApigwProfiler_newHit(t *testing.T) {
var (
p = New()
req = require.New(t)
// // req.NoError(err)
rr, err = h.NewRequest(httptest.NewRequest("POST", "/foo", strings.NewReader(`foo`)))
hit = p.Hit(rr)
)
// // hh, err := h.NewRequest(rr)
// // req.NoError(err)
req.NoError(err)
req.Equal(hit.Status, http.StatusOK)
req.NotNil(hit.Ts)
req.NotNil(hit.ID)
}
// // // need to create an internal profiling struct to hold the request?
// // p.Push(hh)
func Test_ApigwProfiler_push(t *testing.T) {
var (
p = New()
req = require.New(t)
rr, err = h.NewRequest(httptest.NewRequest("POST", "/foo", strings.NewReader(`foo`)))
// // spew.Dump(p.l)
hit = p.Hit(rr)
id = p.Push(hit)
)
// t.Fail()
// }
_, found := p.l[id]
// func Test_Profiler2(t *testing.T) {
// // types:
// // + list of hits, aggregated by endpoint (ie /parse/js)
// // - list of hits for a specific endpoint
// // - list of hits for a specific registered route
req.NoError(err)
req.NotEmpty(id)
req.Len(p.l, 1)
req.True(found)
}
// now := time.Now()
// then := time.Date(2022, time.March, 1, 1, 1, 1, 0, time.UTC)
// later := time.Date(2022, time.March, 1, 1, 1, 1, 30, time.UTC)
func Test_ApigwProfiler_filterPath(t *testing.T) {
var (
pp = New()
req = require.New(t)
// pp := New()
// hr, _ := h.NewRequest(httptest.NewRequest("POST", "/foo", strings.NewReader(`foo`)))
// hr2, _ := h.NewRequest(httptest.NewRequest("GET", "/sometotherpath", strings.NewReader(`foo`)))
// pp.Push(&Hit{R: hr, Ts: &then})
// pp.Push(&Hit{R: hr, Ts: &then})
// pp.Push(&Hit{R: hr, Ts: &now})
// pp.Push(&Hit{R: hr, Ts: &now})
// pp.Push(&Hit{R: hr2, Ts: &now})
// pp.Push(&Hit{R: hr2, Ts: &later})
// pp.Push(&Hit{R: hr2, Ts: &later})
// pp.Push(&Hit{R: hr2, Ts: &later})
// pp.Push(&Hit{R: hr2, Ts: &later})
// pp.Push(&Hit{R: hr2, Ts: &later})
// pp.Push(&Hit{R: hr2, Ts: &later})
// pp.Push(&Hit{R: hr2, Ts: &later})
now = time.Date(2022, time.March, 1, 1, 1, 1, 0, time.UTC)
then = now.Add(-1 * day)
later = now.Add(day)
// var err error
// f := types.ApigwProfilerFilter{}
// if f.Sorting, err = filter.NewSorting("count DESC"); err != nil {
// spew.Dump(err)
// }
hr, _ = h.NewRequest(httptest.NewRequest("POST", "/foo", strings.NewReader(`foo`)))
hr2, _ = h.NewRequest(httptest.NewRequest("GET", "/bar", strings.NewReader(`foo`)))
hr3, _ = h.NewRequest(httptest.NewRequest("GET", "/baz", strings.NewReader(`foo`)))
)
// list := pp.Dump(Sort{})
pp.Push(&Hit{R: hr, Ts: &then})
pp.Push(&Hit{R: hr, Ts: &then})
pp.Push(&Hit{R: hr2, Ts: &now})
pp.Push(&Hit{R: hr3, Ts: &later})
// // list of aggregations
// // - keep showing, just refresh
list := pp.Hits(Sort{
Path: hr3.RequestURI,
})
// // list := pp.Dump(Sort{Before: &later, Size: 3})
// // spew.Dump(list)
_, found := list[hr3.RequestURI]
// // list = list.Filter(func(k string, v *Hit) bool {
// // if v.R.URL.Path == "/sometotherpath" {
// // return true
// // }
req.True(found)
req.Len(list[hr3.RequestURI], 1)
}
// // return false
// // })
func Test_ApigwProfiler_filterHit(t *testing.T) {
var (
pp = New()
req = require.New(t)
// // spew.Dump("LIST", list)
now = time.Date(2022, time.March, 1, 1, 1, 1, 0, time.UTC)
then = now.Add(-1 * day)
later = now.Add(day)
// // for p, v := range list {
// // for _, vv := range v {
// // spew.Dump(fmt.Sprintf("Path: %s, S: %s, F: %s", p, vv.Ts, vv.Ts))
// // }
// // }
hr, _ = h.NewRequest(httptest.NewRequest("POST", "/foo", strings.NewReader(`foo`)))
hr2, _ = h.NewRequest(httptest.NewRequest("GET", "/bar", strings.NewReader(`foo`)))
hr3, _ = h.NewRequest(httptest.NewRequest("GET", "/baz", strings.NewReader(`foo`)))
)
// var (
// r = make(types.ApigwProfilerAggregationSet, 0)
// tsum, tmin, tmax time.Duration
// ssum, smin, smax int64
// i uint64 = 1
// )
pp.Push(&Hit{R: hr, Ts: &then})
pp.Push(&Hit{R: hr, Ts: &now})
pp.Push(&Hit{R: hr2, Ts: &now})
pp.Push(&Hit{R: hr2, Ts: &later})
pp.Push(&Hit{R: hr2, Ts: &later})
// for p, v := range list {
// tmin, tmax, tsum = time.Hour, 0, 0
// smin, smax, ssum = 0, 0, 0
h := pp.Hit(hr3)
h.Ts = &later
// i = 0
id := pp.Push(h)
// for _, vv := range v {
// var (
// d = vv.Tf.Sub(*vv.Ts)
// s = vv.R.ContentLength
// )
list := pp.Hits(Sort{
Hit: h.ID,
})
// if d < tmin {
// tmin = d
// }
_, found := list[id]
// if d > tmax {
// tmax = d
// }
// if s < smin {
// smin = s
// }
// if s > smax {
// smax = s
// }
// tsum += d
// ssum += s
// i++
// }
// spew.Dump("TSUM", tsum.Seconds())
// r = append(r, &types.ApigwProfilerAggregation{
// Path: p,
// Count: i,
// Tmin: tmin,
// Tmax: tmax,
// Tavg: time.Duration(int64(tsum.Seconds()/float64(i))) * time.Second,
// Smin: smin,
// Smax: smax,
// Savg: float64(ssum) / float64(i),
// })
// }
// spew.Dump(r)
// SortAggregation(&r, &f)
// spew.Dump(r)
// t.Fail()
// }
req.True(found)
req.Len(list[id], 1)
}

View File

@ -2,9 +2,7 @@ package profiler
type (
Sort struct {
Hit string
Path string
Size uint64
Before string
Hit string
Path string
}
)

View File

@ -79,7 +79,7 @@ func (r route) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
if hit = actx.ProfilerFromContext(req.Context()).(*profiler.Hit); hit != nil && hit.R != nil {
if hit = actx.ProfilerFromContext(req.Context()); hit != nil && hit.R != nil {
// updated hit from a possible prefilter
// we need to push route ID even if the profiler is disabled
hit.Route = r.ID

View File

@ -127,7 +127,6 @@ func (s *apigw) Reload(ctx context.Context) (err error) {
var (
defaultMethodResponse = helperMethodNotAllowed(s.opts, s.pr)
defaultResponse = helperDefaultResponse(s.opts, s.pr)
// profiler = helperProfiler(s.opts, s.pr)
)
s.mx.NotFound(defaultResponse)

View File

@ -1690,7 +1690,7 @@ endpoints:
path:
- { type: uint64, name: queueID, required: true, title: Queue ID }
- title: API Gateway routes
- title: Integration gateway routes
path: "/apigw/route"
entrypoint: apigwRoute
authentication: []
@ -1751,7 +1751,7 @@ endpoints:
path: "/{routeID}/undelete"
parameters: { path: [ { name: routeID, type: uint64, required: true, title: "Route ID" } ] }
- title: API Gateway filters
- title: Integration gateway filters
path: "/apigw/filter"
entrypoint: apigwFilter
authentication: []
@ -1822,7 +1822,7 @@ endpoints:
title: Proxy auth definitions
path: "/proxy_auth/def"
- title: API Gateway profiler
- title: Integration gateway profiler
path: "/apigw/profiler"
entrypoint: apigwProfiler
authentication: []

View File

@ -17,7 +17,7 @@ import (
var (
sortAggFields = []string{"path", "count", "size_min", "size_max", "size_avg", "time_min", "time_max", "time_avg"}
sortRouteFields = []string{"time_start", "time_finish", "time_duration"}
sortRouteFields = []string{"time_start", "time_finish", "time_duration", "content_length", "http_status_code"}
)
const (
@ -56,9 +56,8 @@ func (svc *apigwProfiler) Hits(ctx context.Context, filter types.ApigwProfilerFi
}
var sorting = profiler.Sort{
Hit: filter.Hit,
Path: filter.Path,
Before: filter.Before,
Hit: filter.Hit,
Path: filter.Path,
}
var (
@ -130,8 +129,7 @@ func (svc *apigwProfiler) HitsAggregated(ctx context.Context, filter types.Apigw
var (
list = apigw.Service().Profiler().Hits(profiler.Sort{
Path: filter.Path,
Before: filter.Before,
Path: filter.Path,
})
tsum, tmin, tmax time.Duration

View File

@ -207,10 +207,8 @@ type (
Enabled bool `kv:"-" json:"enabled"`
} `kv:"federation" json:"federation"`
// Federation settings
// Integration gateway settings
Apigw struct {
// This only holds the value of APIGW_PROFILER_ENABLED for now
//
ProfilerEnabled bool `kv:"-" json:"profilerEnabled"`
ProfilerGlobal bool `kv:"-" json:"profilerGlobal"`
} `kv:"apigw" json:"apigw"`