
Add support for paging & slight refactor

This commit is contained in:
Tomaž Jerman 2021-07-23 16:23:53 +02:00
parent 67a79602f9
commit 9872d92c95
13 changed files with 1149 additions and 503 deletions

View File

@ -75,6 +75,8 @@ func (svc record) Datasource(ctx context.Context, ld *report.LoadStepDefinition)
c = report.MakeColumnOfKind("Record")
c.Name = "id"
c.Label = "Record ID"
c.Primary = true
c.Unique = true
cols = append(cols, c)
for _, f := range mod.Fields {

View File

@ -71,6 +71,23 @@ func NewPaging(limit uint, cursor string) (p Paging, err error) {
return
}
func (p *Paging) Clone() *Paging {
if p == nil {
return nil
}
return &Paging{
Limit: p.Limit,
PageCursor: p.PageCursor,
NextPage: p.NextPage,
PrevPage: p.PrevPage,
IncPageNavigation: p.IncPageNavigation,
IncTotal: p.IncTotal,
PageNavigation: p.PageNavigation,
Total: p.Total,
}
}
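A minimal usage sketch (variable names are illustrative): Clone is nil-safe and detaches per-request paging state, so defaults can be applied without mutating the caller's definition.
var missing *filter.Paging
_ = missing.Clone() // nil receiver simply yields nil
orig := &filter.Paging{Limit: 10}
cpy := orig.Clone()
cpy.Limit = 50 // adjust the copy...
_ = orig.Limit // ...the original still reports 10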
func (p *PagingCursor) Walk(fn func(string, interface{}, bool)) {
for i, key := range p.keys {
fn(key, p.values[i], p.desc[i])

View File

@ -7,12 +7,11 @@ import (
type (
// DatasourceProvider provides access to system datasources, such as ComposeRecords
DatasourceProvider interface {
// Datasource initializes and returns the Datasource the reporter can use
// Datasource initializes and returns the Datasource we can use
Datasource(context.Context, *LoadStepDefinition) (Datasource, error)
}
// Loader returns the next Frame from the Datasource
// @todo better memory reuse
// Loader returns the next batch of Frames from the Datasource; it returns nil, nil when there is nothing left to load
Loader func(cap int) ([]*Frame, error)
// Closer closes the Datasource
Closer func()
@ -20,6 +19,7 @@ type (
DatasourceSet []Datasource
Datasource interface {
Name() string
// The Closer return argument may be omitted (nil) for some datasources
Load(context.Context, ...*FrameDefinition) (Loader, Closer, error)
Describe() FrameDescriptionSet
}
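A hedged consumption sketch of the Loader/Closer contract above; the drain helper, its arguments, and the cap of 100 are illustrative only.
func drain(ctx context.Context, ds report.Datasource, def *report.FrameDefinition) error {
	loader, closer, err := ds.Load(ctx, def)
	if err != nil {
		return err
	}
	if closer != nil {
		// the Closer may be omitted (nil) for some datasources
		defer closer()
	}
	for {
		ff, err := loader(100) // cap: upper bound on rows fetched per call
		if err != nil {
			return err
		}
		if ff == nil {
			return nil // nil, nil signals there is nothing left to load
		}
		// ... consume ff ...
	}
}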

View File

@ -60,3 +60,35 @@ func (base *RowDefinition) MergeOr(merge *RowDefinition) *RowDefinition {
return rr
}
func (base *RowDefinition) Clone() (out *RowDefinition) {
if base == nil {
return
}
out = &RowDefinition{}
if base.Cells != nil {
out.Cells = make(map[string]*CellDefinition)
for k, v := range base.Cells {
out.Cells[k] = &CellDefinition{
Op: v.Op,
Value: v.Value,
}
}
}
if base.And != nil {
out.And = make([]*RowDefinition, len(base.And))
for i, def := range base.And {
out.And[i] = def.Clone()
}
}
if base.Or != nil {
out.Or = make([]*RowDefinition, len(base.Or))
for i, def := range base.Or {
out.Or[i] = def.Clone()
}
}
return
}
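A short illustrative sketch of how Clone pairs with the merge helpers: a generated keyset pre-filter is ANDed onto an existing definition while the original stays reusable (the column name and value are made up).
pre := &report.RowDefinition{
	Cells: map[string]*report.CellDefinition{
		"first_name": {Op: "gt", Value: "'Maria'"},
	},
}
def.Rows = pre.MergeAnd(def.Rows.Clone())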

View File

@ -37,9 +37,12 @@ type (
frameCellCaster func(in interface{}) (expr.TypedValue, error)
FrameColumnSet []*FrameColumn
FrameColumn struct {
Name string `json:"name"`
Label string `json:"label"`
Kind string `json:"kind"`
Name string `json:"name"`
Label string `json:"label"`
Kind string `json:"kind"`
Primary bool `json:"primary"`
Unique bool `json:"unique"`
Caster frameCellCaster `json:"-"`
}
@ -65,10 +68,6 @@ type (
}
)
const (
columnWildcard = "*"
)
func MakeColumnOfKind(k string) *FrameColumn {
return &FrameColumn{
Kind: k,
@ -150,32 +149,6 @@ func (b CellDefinition) OpToCmp() string {
}
}
// Slice in place
func (f *Frame) Slice(startIndex, size int) (a, b *Frame) {
a = &Frame{
Name: f.Name,
Source: f.Source,
Ref: f.Ref,
RefValue: f.RefValue,
RelColumn: f.RelColumn,
Columns: f.Columns,
}
b = &Frame{
Name: f.Name,
Source: f.Source,
Ref: f.Ref,
RefValue: f.RefValue,
RelColumn: f.RelColumn,
Columns: f.Columns,
}
a.Rows = f.Rows[startIndex:size]
b.Rows = f.Rows[size:]
return a, b
}
// With guard element
func (f *Frame) WalkRowsG(cb func(i int, r FrameRow) error) (err error) {
err = f.WalkRows(cb)
@ -285,6 +258,33 @@ func (f *Frame) String() string {
return out
}
func (f *Frame) CollectCursorValues(r FrameRow, cc ...*filter.SortExpr) *filter.PagingCursor {
// @todo pk and unique things; how should we do it?
cursor := &filter.PagingCursor{LThen: filter.SortExprSet(cc).Reversed()}
for _, c := range cc {
// the check for column existence should be performed well in advance, so we don't repeat it here
cursor.Set(c.Column, r[f.Columns.Find(c.Column)].Get(), c.Descending)
}
return cursor
}
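A minimal sketch mirroring how the datasources publish cursors from this helper; def stands for an assumed *FrameDefinition with a non-empty Sorting.
if f.Size() > 0 {
	f.Paging = &filter.Paging{
		NextPage: f.CollectCursorValues(f.LastRow(), def.Sorting...),
	}
	f.Paging.NextPage.LThen = def.Sorting.Reversed()
}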
func (cc FrameColumnSet) Clone() (out FrameColumnSet) {
out = make(FrameColumnSet, len(cc))
for i, c := range cc {
out[i] = &FrameColumn{
Name: c.Name,
Label: c.Label,
Kind: c.Kind,
Caster: c.Caster,
}
}
return
}
func (cc FrameColumnSet) Find(name string) int {
for i, c := range cc {
if c.Name == name {
@ -341,6 +341,19 @@ func (r FrameRow) ToVars(cc FrameColumnSet) (vv *expr.Vars, err error) {
return
}
func (f *FrameDefinition) Clone() (out *FrameDefinition) {
return &FrameDefinition{
Name: f.Name,
Source: f.Source,
Ref: f.Ref,
Rows: f.Rows.Clone(),
Columns: f.Columns.Clone(),
Paging: f.Paging.Clone(),
Sorting: f.Sorting.Clone(),
}
}
func (dd FrameDefinitionSet) Find(name string) *FrameDefinition {
for _, d := range dd {
if d.Name == name {

View File

@ -6,24 +6,25 @@ import (
"fmt"
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/spf13/cast"
)
type (
model struct {
steps []Step
ran bool
steps []step
datasources DatasourceSet
}
// M is the model interface that should be used when trying to model the datasource
M interface {
Add(...Step) M
Add(...step) M
Run(context.Context) error
Load(context.Context, ...*FrameDefinition) ([]*Frame, error)
Describe(source string) (FrameDescriptionSet, error)
}
StepSet []Step
Step interface {
stepSet []step
step interface {
Name() string
Source() []string
Run(context.Context, ...Datasource) (Datasource, error)
@ -38,18 +39,15 @@ type (
Group *GroupStepDefinition `json:"group,omitempty"`
// @todo Transform
}
modelGraphNode struct {
step Step
ds Datasource
pp []*modelGraphNode
cc []*modelGraphNode
}
)
// Model initializes the model based on the provided sources and step definitions.
//
// Additional steps may be added after the model is constructed.
// Call `M.Run(context.Context)` to allow the model to be used for requesting data.
// Additional steps may not be added after `M.Run(context.Context)` has been called
func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*StepDefinition) (M, error) {
steps := make([]Step, 0, len(dd))
steps := make([]step, 0, len(dd))
ss := make(DatasourceSet, 0, len(steps)*2)
err := func() error {
@ -62,7 +60,7 @@ func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*St
s, ok := sources[d.Load.Source]
if !ok {
return fmt.Errorf("unresolved data source: %s", d.Load.Source)
return fmt.Errorf("unresolved datasource: %s", d.Load.Source)
}
ds, err := s.Datasource(ctx, d.Load)
if err != nil {
@ -80,7 +78,7 @@ func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*St
// @todo Transform
default:
return errors.New("malformed step definition")
return errors.New("malformed step definition: unsupported step kind")
}
}
return nil
@ -96,348 +94,180 @@ func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*St
}, nil
}
func (m *model) Add(ss ...Step) M {
// Add adds additional steps to the model
func (m *model) Add(ss ...step) M {
m.steps = append(m.steps, ss...)
return m
}
// Run bakes the model configuration and makes the requested data available
func (m *model) Run(ctx context.Context) (err error) {
// initial validation
err = m.validateModel()
if err != nil {
return fmt.Errorf("failed to validate the model: %w", err)
}
const errPfx = "failed to run the model"
defer func() {
m.ran = true
}()
// initial validation
err = func() (err error) {
if m.ran {
return errors.New("model already ran")
}
if len(m.steps)+len(m.datasources) == 0 {
return errors.New("no model steps defined")
}
for _, s := range m.steps {
err = s.Validate()
if err != nil {
return err
}
}
// nothing left to do
if len(m.steps) == 0 {
return nil
}()
if err != nil {
return fmt.Errorf("%s: failed to validate the model: %w", errPfx, err)
}
// construct the step graph
gg, err := m.buildStepGraph(m.steps, m.datasources)
if err != nil {
return err
//
// If there are no steps, there is nothing to reduce
if len(m.steps) == 0 {
return nil
}
m.datasources = nil
for _, n := range gg {
aux, err := m.reduceGraph(ctx, n)
err = func() (err error) {
gg, err := m.buildStepGraph(m.steps, m.datasources)
if err != nil {
return err
}
m.datasources = append(m.datasources, aux)
m.datasources = nil
for _, n := range gg {
aux, err := m.reduceGraph(ctx, n)
if err != nil {
return err
}
m.datasources = append(m.datasources, aux)
}
return nil
}()
if err != nil {
return fmt.Errorf("%s: %w", errPfx, err)
}
return nil
}
// Describe returns the descriptions for the requested model datasources
//
// The Run method must be called before the description can be provided.
func (m *model) Describe(source string) (out FrameDescriptionSet, err error) {
ds := m.datasources.Find(source)
if ds == nil {
return nil, fmt.Errorf("model does not contain the datasource: %s", source)
var ds Datasource
err = func() error {
if !m.ran {
return fmt.Errorf("model was not yet ran")
}
ds := m.datasources.Find(source)
if ds == nil {
return fmt.Errorf("model does not contain the datasource: %s", source)
}
return nil
}()
if err != nil {
return nil, fmt.Errorf("unable to describe the model source: %w", err)
}
return ds.Describe(), nil
}
func (m *model) Load(ctx context.Context, dd ...*FrameDefinition) ([]*Frame, error) {
var err error
for _, d := range dd {
err = m.applyPaging(d, d.Paging, d.Sorting)
if err != nil {
return nil, err
}
}
// @todo variable root def
def := dd[0]
ds := m.datasources.Find(def.Source)
if ds == nil {
return nil, fmt.Errorf("unresolved source: %s", def.Source)
}
l, c, err := ds.Load(ctx, dd...)
if err != nil {
return nil, err
}
defer c()
i := 0
if def.Paging != nil && def.Paging.Limit > 0 {
i = int(def.Paging.Limit)
} else {
i = -1
}
ff, err := l(i + 1)
if err != nil {
return nil, err
}
dds := FrameDefinitionSet(dd)
for _, f := range ff {
def = dds.FindBySourceRef(f.Source, f.Ref)
if def == nil {
return nil, fmt.Errorf("unable to find frame definition for frame: src-%s, ref-%s", f.Source, f.Ref)
}
// ff[i], err = m.calculatePaging(f, def.Paging, def.Sorting)
// if err != nil {
// return nil, err
// }
}
return ff, err
}
func (m *model) calculatePaging(f *Frame, p *filter.Paging, ss filter.SortExprSet) (*Frame, error) {
if p == nil {
p = &filter.Paging{}
}
// Load returns the Frames based on the provided FrameDefinitions
//
// The Run method must be called before the frames can be provided.
func (m *model) Load(ctx context.Context, dd ...*FrameDefinition) (ff []*Frame, err error) {
var (
hasPrev = p.PageCursor != nil
hasNext = f.Size() > int(p.Limit)
out = &filter.Paging{}
def *FrameDefinition
ds Datasource
)
out.Limit = p.Limit
// request validation
err = func() error {
// - all frame definitions must define the same datasource; call Load multiple times if
// you need to access multiple datasources
for i, d := range dd {
if i == 0 {
continue
}
if d.Source != dd[i-1].Source {
return fmt.Errorf("frame definition source missmatch: expected %s, got %s", dd[i-1].Source, d.Source)
}
}
if hasNext {
f, _ = f.Slice(0, f.Size()-1)
out.NextPage = m.calculatePageCursor(f.LastRow(), f.Columns, ss)
}
def = dd[0]
if hasPrev {
out.PrevPage = m.calculatePageCursor(f.FirstRow(), f.Columns, ss)
}
ds = m.datasources.Find(def.Source)
if ds == nil {
return fmt.Errorf("unresolved datasource: %s", def.Source)
}
f.Paging = out
f.Sorting = &filter.Sorting{
Sort: ss,
}
return f, nil
}
func (m *model) calculatePageCursor(r FrameRow, cc FrameColumnSet, ss filter.SortExprSet) *filter.PagingCursor {
out := &filter.PagingCursor{LThen: ss.Reversed()}
for _, s := range ss {
ci := cc.Find(s.Column)
out.Set(s.Column, r[ci].Get(), s.Descending)
}
return out
}
func (m *model) applyPaging(def *FrameDefinition, p *filter.Paging, ss filter.SortExprSet) (err error) {
if p == nil {
return nil
}
ss, err = p.PageCursor.Sort(ss)
}()
if err != nil {
return err
return nil, fmt.Errorf("unable to load frames: invalid request: %w", err)
}
// @todo use some sort of primary key to avoid edge cases
sort := ss.Clone()
if p.PageCursor != nil && p.PageCursor.ROrder {
sort.Reverse()
}
def.Sorting = sort
// apply any frame definition defaults
aux := make([]*FrameDefinition, len(dd))
for i, d := range dd {
aux[i] = d.Clone()
// assure paging is always provided so we can ignore nil checks
if aux[i].Paging == nil {
aux[i].Paging = &filter.Paging{
Limit: defaultPageSize,
}
}
// assure sorting is always provided so we can ignore nil checks
if aux[i].Sorting == nil {
aux[i].Sorting = filter.SortExprSet{}
}
}
dd = aux
// assure paging is always provided so we can ignore nil checks
if def.Paging == nil {
def.Paging = &filter.Paging{
Limit: defaultPageSize,
}
}
// assure sorting is always provided so we can ignore nil checks
if def.Sorting == nil {
def.Sorting = filter.SortExprSet{}
}
// load the data
err = func() error {
l, c, err := ds.Load(ctx, dd...)
if err != nil {
return err
}
defer c()
ff, err = l(int(def.Paging.Limit))
if err != nil {
return err
}
// convert cursor to rows def
if p.PageCursor == nil {
return nil
}()
if err != nil {
return nil, fmt.Errorf("unable to load frames: %w", err)
}
rd := &RowDefinition{
Cells: make(map[string]*CellDefinition),
}
kk := p.PageCursor.Keys()
vv := p.PageCursor.Values()
for i, k := range kk {
v, err := cast.ToStringE(vv[i])
if err != nil {
return err
}
lt := p.PageCursor.Desc()[i]
if p.PageCursor.IsROrder() {
lt = !lt
}
op := ""
if lt {
op = "lt"
} else {
op = "gt"
}
rd.Cells[k] = &CellDefinition{
Op: op,
Value: fmt.Sprintf("'%s'", v),
}
}
def.Rows = rd.MergeAnd(def.Rows)
return nil
}
func (m *model) validateModel() error {
if len(m.steps)+len(m.datasources) == 0 {
return errors.New("no model steps defined")
}
var err error
for _, s := range m.steps {
err = s.Validate()
if err != nil {
return err
}
}
return nil
}
func (m *model) buildStepGraph(ss StepSet, dd DatasourceSet) ([]*modelGraphNode, error) {
mp := make(map[string]*modelGraphNode)
for _, s := range ss {
s := s
// make sure that the step is in the graph
n, ok := mp[s.Name()]
if !ok {
n = &modelGraphNode{
step: s,
}
mp[s.Name()] = n
} else {
n.step = s
}
// make sure the child step is in there
for _, src := range s.Source() {
c, ok := mp[src]
if !ok {
c = &modelGraphNode{
// will be added later
step: nil,
pp: []*modelGraphNode{n},
ds: dd.Find(src),
}
mp[src] = c
}
n.cc = append(n.cc, c)
}
}
// return all of the root nodes
out := make([]*modelGraphNode, 0, len(ss))
for _, n := range mp {
if len(n.pp) == 0 {
out = append(out, n)
}
}
return out, nil
}
func (m *model) reduceGraph(ctx context.Context, n *modelGraphNode) (out Datasource, err error) {
auxO := make([]Datasource, len(n.cc))
if len(n.cc) > 0 {
for i, c := range n.cc {
out, err = m.reduceGraph(ctx, c)
if err != nil {
return nil, err
}
auxO[i] = out
}
}
bail := func() (out Datasource, err error) {
if n.step == nil {
if n.ds != nil {
return n.ds, nil
}
return out, nil
}
aux, err := n.step.Run(ctx, auxO...)
if err != nil {
return nil, err
}
return aux, nil
}
if n.step == nil {
return bail()
}
// check if this one can reduce the existing datasources
//
// for now, only "simple branches are supported"
var o Datasource
if len(auxO) > 1 {
return bail()
} else if len(auxO) > 0 {
// use the only available output
o = auxO[0]
} else {
// use own datasource (in case of leaf nodes)
o = n.ds
}
if n.step.Def().Group != nil {
gds, ok := o.(GroupableDatasource)
if !ok {
return bail()
}
ok, err = gds.Group(n.step.Def().Group.GroupDefinition, n.step.Name())
if err != nil {
return nil, err
} else if !ok {
return bail()
}
out = gds
// we've covered this step with the child step; ignore it
return out, nil
}
return bail()
}
// @todo clean up the below two?
func (sd *StepDefinition) source() string {
switch {
case sd.Load != nil:
return sd.Load.Source
case sd.Group != nil:
return sd.Group.Source
// @todo Transform
default:
return ""
}
}
func (sd *StepDefinition) name() string {
switch {
case sd.Load != nil:
return sd.Load.Name
case sd.Group != nil:
return sd.Group.Name
// @todo Transform
default:
return ""
}
return ff, nil
}
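A hedged end-to-end sketch of the paging flow introduced by this commit; the step definitions, the "users" source, and the "first_name" column are illustrative, not part of the package.
func pageThrough(ctx context.Context, sources map[string]report.DatasourceProvider, steps ...*report.StepDefinition) error {
	m, err := report.Model(ctx, sources, steps...)
	if err != nil {
		return err
	}
	if err = m.Run(ctx); err != nil {
		return err
	}
	def := &report.FrameDefinition{
		Name:    "base",
		Source:  "users",
		Sorting: filter.SortExprSet{&filter.SortExpr{Column: "first_name"}},
		Paging:  &filter.Paging{Limit: 5},
	}
	for {
		ff, err := m.Load(ctx, def)
		if err != nil {
			return err
		}
		// ... consume ff ...
		if len(ff) == 0 || ff[0].Paging == nil || ff[0].Paging.NextPage == nil {
			return nil
		}
		// feed the next-page cursor back in to fetch the following page
		def.Paging.PageCursor = ff[0].Paging.NextPage
	}
}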

130
pkg/report/model_graph.go Normal file
View File

@ -0,0 +1,130 @@
package report
import "context"
type (
modelGraphNode struct {
step step
ds Datasource
pp []*modelGraphNode
cc []*modelGraphNode
}
)
// Internally, the model uses a stepGraph to resolve the dependencies between the steps
// allowing us to perform some preprocessing, such as size reduction and shape validation
//
// @todo most of this might need to be done at runtime, not buildtime
func (m *model) buildStepGraph(ss stepSet, dd DatasourceSet) ([]*modelGraphNode, error) {
mp := make(map[string]*modelGraphNode)
for _, s := range ss {
s := s
// make sure that the step is in the graph
n, ok := mp[s.Name()]
if !ok {
n = &modelGraphNode{
step: s,
}
mp[s.Name()] = n
} else {
n.step = s
}
// make sure the child step is in there
for _, src := range s.Source() {
c, ok := mp[src]
if !ok {
c = &modelGraphNode{
// will be added later
step: nil,
pp: []*modelGraphNode{n},
ds: dd.Find(src),
}
mp[src] = c
}
n.cc = append(n.cc, c)
}
}
// return all of the root nodes
out := make([]*modelGraphNode, 0, len(ss))
for _, n := range mp {
if len(n.pp) == 0 {
out = append(out, n)
}
}
return out, nil
}
func (m *model) reduceGraph(ctx context.Context, n *modelGraphNode) (out Datasource, err error) {
auxO := make([]Datasource, len(n.cc))
if len(n.cc) > 0 {
for i, c := range n.cc {
out, err = m.reduceGraph(ctx, c)
if err != nil {
return nil, err
}
auxO[i] = out
}
}
bail := func() (out Datasource, err error) {
if n.step == nil {
if n.ds != nil {
return n.ds, nil
}
return out, nil
}
aux, err := n.step.Run(ctx, auxO...)
if err != nil {
return nil, err
}
return aux, nil
}
if n.step == nil {
return bail()
}
// check if this one can reduce the existing datasources
//
// for now, only "simple branches are supported"
var o Datasource
if len(auxO) > 1 {
return bail()
} else if len(auxO) > 0 {
// use the only available output
o = auxO[0]
} else {
// use own datasource (in case of leaf nodes)
o = n.ds
}
if n.step.Def().Group != nil {
gds, ok := o.(GroupableDatasource)
if !ok {
return bail()
}
ok, err = gds.Group(n.step.Def().Group.GroupDefinition, n.step.Name())
if err != nil {
return nil, err
} else if !ok {
return bail()
}
out = gds
// we've covered this step with the child step; ignore it
return out, nil
}
return bail()
}

View File

@ -139,11 +139,10 @@ func (d *joinedDataset) Load(ctx context.Context, dd ...*FrameDefinition) (Loade
if foreignDef == nil {
return nil, fmt.Errorf("definition for foreign datasource not found: %s", d.def.ForeignSource)
}
if localDef.Paging == nil {
localDef.Paging = &filter.Paging{}
}
if foreignDef.Paging == nil {
foreignDef.Paging = &filter.Paging{}
// - page cursor on foreign datasource is not allowed
if foreignDef.Paging.PageCursor != nil {
return nil, fmt.Errorf("definition for foreign datasource may not define a page cursor")
}
// - key columns
@ -166,6 +165,11 @@ func (d *joinedDataset) Load(ctx context.Context, dd ...*FrameDefinition) (Loade
return nil, err
}
// @todo support this
if useSubSort && localDef.Paging.PageCursor != nil {
return nil, fmt.Errorf("paging not supported when the foreign datasource defines base sort")
}
if foreignDS != "" {
if foreignDS != d.foreign.Name() {
return nil, fmt.Errorf("foreign sort datasource not part of the join: %s", foreignDS)
@ -206,10 +210,8 @@ func (d *joinedDataset) Load(ctx context.Context, dd ...*FrameDefinition) (Loade
// - prepare loader, closer
mainLoader, mainCloser, err = prtDS.Partition(ctx, partitionSize, d.def.ForeignColumn, foreignDef)
} else {
mainPageCap = defaultPageSize
if localDef.Paging != nil && localDef.Paging.Limit > 0 {
mainPageCap = localDef.Paging.Limit
}
mainPageCap = localDef.Paging.Limit
// nothing special needed
mainLoader, mainCloser, err = d.base.Load(ctx, localDef)
}
@ -381,10 +383,8 @@ func (d *joinedDataset) Load(ctx context.Context, dd ...*FrameDefinition) (Loade
}
// - determine partition size
partitionSize := defaultPartitionSize
if foreignDef.Paging != nil && foreignDef.Paging.Limit > 0 {
partitionSize = foreignDef.Paging.Limit
}
// fetch one row more than the requested limit so we can tell whether a next page exists
partitionSize := foreignDef.Paging.Limit + 1
// - prepare key pre-filter
foreignDef.Rows = d.keySliceToFilter(d.def.ForeignColumn, keys).MergeAnd(foreignDef.Rows)
@ -404,9 +404,21 @@ func (d *joinedDataset) Load(ctx context.Context, dd ...*FrameDefinition) (Loade
}
for i := range subFrames {
// meta
subFrames[i].Name = foreignDef.Name
subFrames[i].Source = foreignDef.Source
subFrames[i].Ref = foreignDef.Ref
// paging: the extra row signals a next page; trim it and publish a next-page cursor
if uint(len(subFrames[i].Rows)) >= partitionSize {
subFrames[i].Rows = subFrames[i].Rows[0 : partitionSize-1]
if subFrames[i].Paging == nil {
subFrames[i].Paging = &filter.Paging{}
}
subFrames[i].Paging.NextPage = subFrames[i].CollectCursorValues(subFrames[i].LastRow(), foreignDef.Sorting...)
subFrames[i].Paging.NextPage.LThen = foreignDef.Sorting.Reversed()
}
}
}
if err != nil {

View File

@ -2,7 +2,6 @@ package report
import (
"reflect"
"strings"
)
func isNil(i interface{}) bool {
@ -15,25 +14,3 @@ func isNil(i interface{}) bool {
}
return false
}
func dimensionOf(k string) string {
pp := strings.Split(k, ".")
if len(pp) < 2 {
return ""
}
return pp[0]
}
func columnOf(k string) string {
if k == columnWildcard {
return k
}
pp := strings.Split(k, ".")
if len(pp) < 2 {
return ""
}
return strings.Join(pp[1:], ".")
}

View File

@ -9,9 +9,11 @@ import (
"github.com/Masterminds/squirrel"
"github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/cortezaproject/corteza-server/pkg/ql"
"github.com/cortezaproject/corteza-server/pkg/report"
"github.com/cortezaproject/corteza-server/pkg/slice"
"github.com/cortezaproject/corteza-server/store/rdbms/builders"
"github.com/jmoiron/sqlx"
)
@ -30,8 +32,9 @@ type (
baseFilter *report.RowDefinition
cols report.FrameColumnSet
qBuilder squirrel.SelectBuilder
q squirrel.SelectBuilder
nestLevel int
nestLabel string
levelColumns map[string]string
}
)
@ -48,22 +51,29 @@ var (
// supportedGroupingFunctions = ...
)
// ComposeRecordDatasourceBuilder initializes and returns a datasource builder for compose record resource
//
// @todo try to make the resulting query as flat as possible
func ComposeRecordDatasourceBuilder(s *Store, module *types.Module, ld *report.LoadStepDefinition) (report.Datasource, error) {
var err error
r := &recordDatasource{
name: ld.Name,
module: module,
store: s,
cols: ld.Columns,
name: ld.Name,
module: module,
store: s,
cols: ld.Columns,
// levelColumns help us keep track of what columns are currently available
levelColumns: make(map[string]string),
}
r.qBuilder, err = r.baseQuery(ld.Rows)
r.q, err = r.baseQuery(ld.Rows)
return r, err
}
// Name returns the name we should use when referencing this datasource
//
// The name is determined from the user-specified name, or implied from the context.
func (r *recordDatasource) Name() string {
if r.name != "" {
return r.name
@ -79,12 +89,31 @@ func (r *recordDatasource) Name() string {
return r.module.Handle
}
// @todo add Transform
// @todo try to make Group and Transform use the base query
func (r *recordDatasource) Describe() report.FrameDescriptionSet {
return report.FrameDescriptionSet{
&report.FrameDescription{
Source: r.Name(),
Columns: r.cols,
},
}
}
func (r *recordDatasource) Load(ctx context.Context, dd ...*report.FrameDefinition) (l report.Loader, c report.Closer, err error) {
def := dd[0]
q, err := r.preloadQuery(def)
if err != nil {
return nil, nil, err
}
return r.load(ctx, def, q)
}
// Group instructs the datasource to provide grouped and aggregated output
func (r *recordDatasource) Group(d report.GroupDefinition, name string) (bool, error) {
defer func() {
r.nestLevel++
r.nestLabel = "group"
r.name = name
}()
@ -94,13 +123,12 @@ func (r *recordDatasource) Group(d report.GroupDefinition, name string) (bool, e
ok = false
)
cls := r.levelColumns
auxLevelColumns := r.levelColumns
r.levelColumns = make(map[string]string)
gCols := make(report.FrameColumnSet, 0, 10)
groupCols := make(report.FrameColumnSet, 0, 10)
for _, g := range d.Keys {
auxKind, ok = cls[g.Column]
auxKind, ok = auxLevelColumns[g.Column]
if !ok {
return false, fmt.Errorf("column %s does not exist on level %d", g.Column, r.nestLevel)
}
@ -114,7 +142,7 @@ func (r *recordDatasource) Group(d report.GroupDefinition, name string) (bool, e
if c.Label == "" {
c.Label = c.Name
}
gCols = append(gCols, c)
groupCols = append(groupCols, c)
r.levelColumns[g.Name] = auxKind
q = q.Column(fmt.Sprintf("%s as `%s`", g.Column, g.Name)).
@ -130,7 +158,7 @@ func (r *recordDatasource) Group(d report.GroupDefinition, name string) (bool, e
return false, fmt.Errorf("the aggregation function is required when the column is omitted")
}
} else {
auxKind, ok = cls[c.Column]
auxKind, ok = auxLevelColumns[c.Column]
if !ok {
return false, fmt.Errorf("column %s does not exist on level %d", c.Column, r.nestLevel)
}
@ -156,46 +184,28 @@ func (r *recordDatasource) Group(d report.GroupDefinition, name string) (bool, e
if col.Label == "" {
col.Label = col.Name
}
gCols = append(gCols, col)
groupCols = append(groupCols, col)
r.levelColumns[c.Name] = auxKind
q = q.
Column(fmt.Sprintf("%s as `%s`", qParam, c.Name))
Column(squirrel.Alias(squirrel.Expr(qParam), c.Name))
}
if d.Rows != nil {
// @todo validate grouping functions
hh, err := r.rowFilterToString("", gCols, d.Rows)
hh, err := r.rowFilterToString("", groupCols, d.Rows)
if err != nil {
return false, err
}
q = q.Having(hh)
}
r.cols = gCols
r.qBuilder = q.FromSelect(r.qBuilder, fmt.Sprintf("l%d", r.nestLevel))
r.cols = groupCols
r.q = q.FromSelect(r.q, fmt.Sprintf("l%d", r.nestLevel))
return true, nil
}
func (r *recordDatasource) Describe() report.FrameDescriptionSet {
return report.FrameDescriptionSet{
&report.FrameDescription{
Source: r.Name(),
Columns: r.cols,
},
}
}
func (r *recordDatasource) Load(ctx context.Context, dd ...*report.FrameDefinition) (l report.Loader, c report.Closer, err error) {
def := dd[0]
q, err := r.preloadQuery(def)
if err != nil {
return nil, nil, err
}
return r.load(ctx, def, q)
}
// @todo add Transform
func (r *recordDatasource) Partition(ctx context.Context, partitionSize uint, partitionCol string, dd ...*report.FrameDefinition) (l report.Loader, c report.Closer, err error) {
def := dd[0]
@ -205,29 +215,31 @@ func (r *recordDatasource) Partition(ctx context.Context, partitionSize uint, pa
return nil, nil, err
}
var ss []string
if len(def.Sorting) > 0 {
ss, err = r.sortExpr(def.Sorting)
if err != nil {
return nil, nil, err
}
}
// the partitioning wrap
// @todo move this to the DB driver package?
// @todo squash the query a bit? try to move most of this to the base query to remove
// one sub-select
prt := squirrel.Select(fmt.Sprintf("*, row_number() over(partition by %s order by %s) as pp_rank", partitionCol, partitionCol)).
prt := squirrel.Select(fmt.Sprintf("*, row_number() over(partition by %s order by %s) as pp_rank", partitionCol, strings.Join(ss, ","))).
FromSelect(q, "partition_base")
// @todo make it better, please...
ss, err := r.sortExpr(def)
if err != nil {
return nil, nil, err
}
// the sort is already defined when partitioning so it's unneeded here
q = squirrel.Select("*").
FromSelect(prt, "partition_wrap").
Where(fmt.Sprintf("pp_rank <= %d", partitionSize)).
OrderBy(ss...)
Where(fmt.Sprintf("pp_rank <= %d", partitionSize))
return r.load(ctx, def, q)
}
func (r *recordDatasource) preloadQuery(def *report.FrameDefinition) (squirrel.SelectBuilder, error) {
q := r.qBuilder
q := r.q
// assure columns
// - undefined columns = all columns
@ -248,56 +260,59 @@ func (r *recordDatasource) preloadQuery(def *report.FrameDefinition) (squirrel.S
// when filtering/sorting, wrap the base query in a sub-select, so we don't need to
// worry about exact column names.
//
// @todo flatten the query
if def.Rows != nil || def.Sorting != nil {
wrap := squirrel.Select("*").FromSelect(q, "w_base")
q = squirrel.Select("*").FromSelect(q, "w_base")
}
// additional filtering
if def.Rows != nil {
f, err := r.rowFilterToString("", r.cols, def.Rows)
if err != nil {
return q, err
}
wrap = wrap.Where(f)
// - filtering
if def.Rows != nil {
f, err := r.rowFilterToString("", r.cols, def.Rows)
if err != nil {
return q, err
}
// additional sorting
if len(def.Sorting) > 0 {
ss, err := r.sortExpr(def)
if err != nil {
return q, err
}
wrap = wrap.OrderBy(ss...)
}
q = wrap
q = q.Where(f)
}
return q, nil
}
func (r *recordDatasource) sortExpr(def *report.FrameDefinition) ([]string, error) {
ss := make([]string, len(def.Sorting))
for i, c := range def.Sorting {
ci := r.cols.Find(c.Column)
if ci == -1 {
return nil, fmt.Errorf("sort column not resolved: %s", c.Column)
}
func (r *recordDatasource) load(ctx context.Context, def *report.FrameDefinition, q squirrel.SelectBuilder) (l report.Loader, c report.Closer, err error) {
sort := def.Sorting
_, _, typeCast, err := r.store.config.CastModuleFieldToColumnType(r.cols[ci], c.Column)
if err != nil {
return nil, err
// - paging related stuff
if def.Paging.PageCursor != nil {
// A page cursor is set, so it must be validated against the requested sort.
// When a cursor is provided but the sort is empty, the sorting instructions
// are recovered from the cursor itself; the recovered sort is then returned
// as part of the response.
if def.Sorting, err = def.Paging.PageCursor.Sort(def.Sorting); err != nil {
return nil, nil, err
}
ss[i] = r.store.config.SqlSortHandler(fmt.Sprintf(typeCast, c.Column), c.Descending)
}
return ss, nil
}
// Clone the sorting instructions for the actual sort;
// the original must be kept intact for cursor creation
sort = def.Sorting.Clone()
// When a cursor for a previous page is used, it is marked as reversed.
// This tells us to flip the descending flag on all used sort keys.
if def.Paging.PageCursor != nil && def.Paging.PageCursor.ROrder {
sort.Reverse()
}
if def.Paging.PageCursor != nil {
q = q.Where(builders.CursorCondition(def.Paging.PageCursor, nil))
}
if len(sort) > 0 {
ss, err := r.sortExpr(sort)
if err != nil {
return nil, nil, err
}
q = q.OrderBy(ss...)
}
func (r *recordDatasource) load(ctx context.Context, def *report.FrameDefinition, q squirrel.SelectBuilder) (l report.Loader, c report.Closer, err error) {
r.rows, err = r.store.Query(ctx, q)
if err != nil {
return nil, nil, fmt.Errorf("cannot execute query: %w", err)
@ -312,30 +327,32 @@ func (r *recordDatasource) load(ctx context.Context, def *report.FrameDefinition
checkCap := cap > 0
// fetch & convert the data
// Fetch & convert the data.
// Go one row over the requested cap so we can determine whether there
// are any additional pages.
i := 0
// @todo make it in place
f.Columns = def.Columns
f.Rows = make(report.FrameRowSet, 0, cap)
f.Rows = make(report.FrameRowSet, 0, cap+1)
for r.rows.Next() {
i++
err = r.Cast(r.rows, f)
err = r.cast(r.rows, f)
if err != nil {
return nil, err
}
if checkCap && i >= cap {
// If the count goes over the capacity, then we have a next page
if checkCap && i > cap {
out := []*report.Frame{f}
f = &report.Frame{}
i = 0
return out, nil
return r.calculatePaging(out, def.Sorting, uint(cap), def.Paging.PageCursor), nil
}
}
if i > 0 {
return []*report.Frame{f}, nil
return r.calculatePaging([]*report.Frame{f}, def.Sorting, uint(cap), def.Paging.PageCursor), nil
}
return nil, nil
}, func() {
@ -346,31 +363,61 @@ func (r *recordDatasource) load(ctx context.Context, def *report.FrameDefinition
}, nil
}
// @todo handle those rv_ prefixes; for now omitted
// baseQuery prepares the initial SQL that will be used for data access
//
// The query includes all of the requested columns in the required types to avoid the need for later type casting.
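//
// Roughly, for a module with a "first_name" field, the produced query has this
// shape (illustrative only; the exact casts and quoting depend on the driver):
//
//	SELECT ..., CAST(first_name.value AS ...) AS first_name
//	FROM compose_record AS crd
//	LEFT JOIN compose_record_value AS first_name
//	       ON (first_name.record_id = crd.id AND first_name.name = 'first_name' AND first_name.deleted_at IS NULL)
//	WHERE crd.deleted_at IS NULL AND crd.module_id = ? AND crd.rel_namespace = ?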
func (r *recordDatasource) baseQuery(f *report.RowDefinition) (sqb squirrel.SelectBuilder, err error) {
var (
joinTpl = "compose_record_value AS %s ON (%s.record_id = crd.id AND %s.name = '%s' AND %s.deleted_at IS NULL)"
report = r.store.composeRecordsSelectBuilder().
Where("crd.deleted_at IS NULL").
Where("crd.module_id = ?", r.module.ID)
)
// Prepare all of the mod columns
// @todo make this as small as possible!
for _, f := range r.module.Fields {
report = report.LeftJoin(strings.ReplaceAll(joinTpl, "%s", f.Name)).
Column(f.Name + ".value as " + f.Name)
r.levelColumns[f.Name] = f.Kind
// - the initial set of available columns
//
// @todo at what level should the requested columns be validated?
r.nestLevel = 0
r.nestLabel = "base"
for _, c := range r.cols {
r.levelColumns[c.Name] = c.Kind
}
// - base query
sqb = r.store.SelectBuilder("compose_record AS crd").
Where("crd.deleted_at IS NULL").
Where("crd.module_id = ?", r.module.ID).
Where("crd.rel_namespace = ?", r.module.NamespaceID)
// - based on the definition, preload the columns
var (
col string
is bool
isJoined = make(map[string]bool)
)
for _, c := range r.cols {
if isJoined[c.Name] {
continue
}
isJoined[c.Name] = true
// native record columns don't need any extra handling
if col, _, is = isRealRecordCol(c.Name); is {
sqb = sqb.Column(squirrel.Alias(squirrel.Expr(col), c.Name))
continue
}
// non-native record columns need to have their type cast before use
_, _, tcp, _ := r.store.config.CastModuleFieldToColumnType(c, c.Name)
sqb = sqb.LeftJoin(strings.ReplaceAll(joinTpl, "%s", c.Name)).
Column(squirrel.Alias(squirrel.Expr(fmt.Sprintf(tcp, c.Name+".value")), c.Name))
}
// - any initial filtering we may need to do
//
// @todo better support functions and their validation.
if f != nil {
// @todo functions and function validation
parser := ql.NewParser()
parser.OnIdent = func(i ql.Ident) (ql.Ident, error) {
if _, ok := r.levelColumns[i.Value]; !ok {
return i, fmt.Errorf("column %s does not exist on level %d", i.Value, r.nestLevel)
return i, fmt.Errorf("column %s does not exist on level %d (%s)", i.Value, r.nestLevel, r.nestLabel)
}
return i, nil
@ -380,17 +427,66 @@ func (r *recordDatasource) baseQuery(f *report.RowDefinition) (sqb squirrel.Sele
if err != nil {
return sqb, err
}
astq, err := parser.ParseExpression(fl)
if err != nil {
return sqb, err
}
report = report.Where(astq.String())
sqb = sqb.Where(astq.String())
}
return report, nil
return sqb, nil
}
func (b *recordDatasource) Cast(row sqlx.ColScanner, out *report.Frame) error {
func (b *recordDatasource) calculatePaging(out []*report.Frame, sorting filter.SortExprSet, limit uint, cursor *filter.PagingCursor) []*report.Frame {
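// The loader fetched up to limit+1 rows per frame:
//   - more rows than the limit   => a next page exists (the extra row is trimmed)
//   - a page cursor was supplied => a previous page exists
// For a previous-page cursor (ROrder), rows were loaded in reverse, so their
// order is restored and the prev/next roles are swapped.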
for _, o := range out {
var (
hasPrev = cursor != nil
hasNext bool
ignoreLimit = limit == 0
reversedOrder = cursor != nil && cursor.ROrder
)
hasNext = uint(len(o.Rows)) > limit
if !ignoreLimit && uint(len(o.Rows)) > limit {
o.Rows = o.Rows[:limit]
}
if reversedOrder {
// Fetched set needs to be reversed because we've forced a descending order to get the previous page
for i, j := 0, len(o.Rows)-1; i < j; i, j = i+1, j-1 {
o.Rows[i], o.Rows[j] = o.Rows[j], o.Rows[i]
}
// when in reverse order, the rules on which cursor to return change
hasPrev, hasNext = hasNext, hasPrev
}
if ignoreLimit {
return out
}
if hasPrev {
o.Paging = &filter.Paging{}
o.Paging.PrevPage = o.CollectCursorValues(o.FirstRow(), sorting...)
o.Paging.PrevPage.ROrder = true
o.Paging.PrevPage.LThen = !sorting.Reversed()
}
if hasNext {
if o.Paging == nil {
o.Paging = &filter.Paging{}
}
o.Paging.NextPage = o.CollectCursorValues(o.LastRow(), sorting...)
o.Paging.NextPage.LThen = sorting.Reversed()
}
}
return out
}
func (b *recordDatasource) cast(row sqlx.ColScanner, out *report.Frame) error {
var err error
aux := make(map[string]interface{})
if err = sqlx.MapScan(row, aux); err != nil {
@ -537,3 +633,22 @@ func isNil(i interface{}) bool {
}
return false
}
func (r *recordDatasource) sortExpr(sorting filter.SortExprSet) ([]string, error) {
ss := make([]string, len(sorting))
for i, c := range sorting {
ci := r.cols.Find(c.Column)
if ci == -1 {
return nil, fmt.Errorf("sort column not resolved: %s", c.Column)
}
_, _, typeCast, err := r.store.config.CastModuleFieldToColumnType(r.cols[ci], c.Column)
if err != nil {
return nil, err
}
ss[i] = r.store.config.SqlSortHandler(fmt.Sprintf(typeCast, c.Column), c.Descending)
}
return ss, nil
}

View File

@ -112,4 +112,74 @@ func TestReporterGrouping(t *testing.T) {
return nil
})
})
t.Run("paging", func(t *testing.T) {
fd.Sorting = filter.SortExprSet{
&filter.SortExpr{Column: "by_name", Descending: false},
}
fd.Paging = &filter.Paging{
Limit: 4,
}
// ^ going up ^
rr, err := model.Load(ctx, fd)
h.a.NoError(err)
h.a.Len(rr, 1)
r := rr[0]
h.a.NotNil(r.Paging)
h.a.NotNil(r.Paging.NextPage)
h.a.Nil(r.Paging.PrevPage)
h.a.Equal(4, r.Size())
req := []string{
"Engel, 3, 179",
"Manu, 1, 61",
"Maria, 3, 183",
"Sascha, 1, 38",
}
r.WalkRows(func(i int, r report.FrameRow) error {
h.a.Equal(req[i], r.String())
return nil
})
fd.Paging.PageCursor = r.Paging.NextPage
rr, err = model.Load(ctx, fd)
h.a.NoError(err)
h.a.Len(rr, 1)
r = rr[0]
h.a.NotNil(r.Paging)
h.a.Nil(r.Paging.NextPage)
h.a.NotNil(r.Paging.PrevPage)
h.a.Equal(2, r.Size())
req = []string{
"Sigi, 1, 67",
"Ulli, 3, 122",
}
r.WalkRows(func(i int, r report.FrameRow) error {
h.a.Equal(req[i], r.String())
return nil
})
// v going down v
fd.Paging.PageCursor = r.Paging.PrevPage
rr, err = model.Load(ctx, fd)
h.a.NoError(err)
h.a.Len(rr, 1)
r = rr[0]
h.a.NotNil(r.Paging)
h.a.NotNil(r.Paging.NextPage)
h.a.Nil(r.Paging.PrevPage)
h.a.Equal(4, r.Size())
req = []string{
"Engel, 3, 179",
"Manu, 1, 61",
"Maria, 3, 183",
"Sascha, 1, 38",
}
r.WalkRows(func(i int, r report.FrameRow) error {
h.a.Equal(req[i], r.String())
return nil
})
})
}

View File

@ -493,6 +493,331 @@ func TestReporterJoining(t *testing.T) {
return nil
})
})
t.Run("paging", func(t *testing.T) {
rp.Frames[0].Paging = &filter.Paging{
Limit: 5,
}
rp.Frames[0].Sorting = filter.SortExprSet{
&filter.SortExpr{Column: "join_key", Descending: false},
}
rp.Frames[1].Paging = &filter.Paging{
Limit: 2,
}
rp.Frames[1].Sorting = filter.SortExprSet{
&filter.SortExpr{Column: "name", Descending: false},
}
// ^ going up ^
// // // PAGE 1
rr, err := model.Load(ctx, rp.Frames...)
h.a.NoError(err)
h.a.Len(rr, 4)
local := rr[0]
ix := indexJoinedResult(rr)
_ = ix
// local
h.a.Equal(5, local.Size())
h.a.NotNil(local.Paging)
h.a.NotNil(local.Paging.NextPage)
h.a.Nil(local.Paging.PrevPage)
req := []string{
", Engel_Kempf, Engel, Kempf",
", Engel_Kiefer, Engel, Kiefer",
", Engel_Loritz, Engel, Loritz",
", Manu_Specht, Manu, Specht",
", Maria_Krüger, Maria, Krüger",
}
local.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Manu_Specht
f := ix["Manu_Specht"]
h.a.NotNil(f)
h.a.NotNil(f.Paging)
h.a.NotNil(f.Paging.NextPage)
h.a.Nil(f.Paging.PrevPage)
req = []string{
", Manu_Specht, u10 j1, d, 45, 56",
", Manu_Specht, u10 j2, c, 83, 70",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Engel_Kiefer
f = ix["Engel_Kiefer"]
h.a.NotNil(f)
h.a.NotNil(f.Paging)
h.a.NotNil(f.Paging.NextPage)
h.a.Nil(f.Paging.PrevPage)
req = []string{
", Engel_Kiefer, u12 j1, a, 42, 69",
", Engel_Kiefer, u12 j10, c, 79, 25",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Engel_Loritz
f = ix["Engel_Loritz"]
h.a.NotNil(f)
h.a.NotNil(f.Paging)
h.a.NotNil(f.Paging.NextPage)
h.a.Nil(f.Paging.PrevPage)
req = []string{
", Engel_Loritz, u3 j1, a, 10, 1",
", Engel_Loritz, u3 j2, a, 0, 0",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// // // PAGE 2
rp.Frames[0].Paging.PageCursor = local.Paging.NextPage
rr, err = model.Load(ctx, rp.Frames...)
h.a.NoError(err)
h.a.Len(rr, 4)
local = rr[0]
ix = indexJoinedResult(rr)
_ = ix
// local
h.a.Equal(5, local.Size())
h.a.NotNil(local.Paging)
h.a.NotNil(local.Paging.NextPage)
h.a.NotNil(local.Paging.PrevPage)
req = []string{
", Maria_Königsmann, Maria, Königsmann",
", Maria_Spannagel, Maria, Spannagel",
", Sascha_Jans, Sascha, Jans",
", Sigi_Goldschmidt, Sigi, Goldschmidt",
", Ulli_Böhler, Ulli, Böhler",
", Ulli_Förstner, Ulli, Förstner",
}
local.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Maria_Königsmann
f = ix["Maria_Königsmann"]
h.a.NotNil(f)
h.a.NotNil(f.Paging)
h.a.NotNil(f.Paging.NextPage)
h.a.Nil(f.Paging.PrevPage)
req = []string{
", Maria_Königsmann, u1 j1, a, 10, 2",
", Maria_Königsmann, u1 j2, b, 20, 5",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Ulli_Böhler
f = ix["Ulli_Böhler"]
h.a.NotNil(f)
h.a.Nil(f.Paging)
req = []string{
", Ulli_Böhler, u5 j1, a, 1, 2",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Sigi_Goldschmidt
f = ix["Sigi_Goldschmidt"]
h.a.NotNil(f)
h.a.NotNil(f.Paging)
h.a.NotNil(f.Paging.NextPage)
h.a.Nil(f.Paging.PrevPage)
req = []string{
", Sigi_Goldschmidt, u7 j1, d, 10, 29",
", Sigi_Goldschmidt, u7 j2, a, 10, 21",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// // // PAGE 3
rp.Frames[0].Paging.PageCursor = local.Paging.NextPage
rr, err = model.Load(ctx, rp.Frames...)
h.a.NoError(err)
h.a.Len(rr, 1)
local = rr[0]
ix = indexJoinedResult(rr)
_ = ix
// local
h.a.Equal(2, local.Size())
h.a.NotNil(local.Paging)
h.a.Nil(local.Paging.NextPage)
h.a.NotNil(local.Paging.PrevPage)
req = []string{
", Ulli_Förstner, Ulli, Förstner",
", Ulli_Haupt, Ulli, Haupt",
}
local.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// v going down v
// // // PAGE 2
rp.Frames[0].Paging.PageCursor = local.Paging.PrevPage
rr, err = model.Load(ctx, rp.Frames...)
h.a.NoError(err)
h.a.Len(rr, 4)
local = rr[0]
ix = indexJoinedResult(rr)
_ = ix
// local
h.a.Equal(5, local.Size())
h.a.NotNil(local.Paging)
h.a.NotNil(local.Paging.NextPage)
h.a.NotNil(local.Paging.PrevPage)
req = []string{
", Maria_Königsmann, Maria, Königsmann",
", Maria_Spannagel, Maria, Spannagel",
", Sascha_Jans, Sascha, Jans",
", Sigi_Goldschmidt, Sigi, Goldschmidt",
", Ulli_Böhler, Ulli, Böhler",
", Ulli_Förstner, Ulli, Förstner",
}
local.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Maria_Königsmann
f = ix["Maria_Königsmann"]
h.a.NotNil(f)
h.a.NotNil(f.Paging)
h.a.NotNil(f.Paging.NextPage)
h.a.Nil(f.Paging.PrevPage)
req = []string{
", Maria_Königsmann, u1 j1, a, 10, 2",
", Maria_Königsmann, u1 j2, b, 20, 5",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Ulli_Böhler
f = ix["Ulli_Böhler"]
h.a.NotNil(f)
h.a.Nil(f.Paging)
req = []string{
", Ulli_Böhler, u5 j1, a, 1, 2",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Sigi_Goldschmidt
f = ix["Sigi_Goldschmidt"]
h.a.NotNil(f)
h.a.NotNil(f.Paging)
h.a.NotNil(f.Paging.NextPage)
h.a.Nil(f.Paging.PrevPage)
req = []string{
", Sigi_Goldschmidt, u7 j1, d, 10, 29",
", Sigi_Goldschmidt, u7 j2, a, 10, 21",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// // // PAGE 1
rp.Frames[0].Paging.PageCursor = local.Paging.PrevPage
rr, err = model.Load(ctx, rp.Frames...)
h.a.NoError(err)
h.a.Len(rr, 4)
local = rr[0]
ix = indexJoinedResult(rr)
_ = ix
// local
h.a.Equal(5, local.Size())
h.a.NotNil(local.Paging)
h.a.NotNil(local.Paging.NextPage)
h.a.Nil(local.Paging.PrevPage)
req = []string{
", Engel_Kempf, Engel, Kempf",
", Engel_Kiefer, Engel, Kiefer",
", Engel_Loritz, Engel, Loritz",
", Manu_Specht, Manu, Specht",
", Maria_Krüger, Maria, Krüger",
}
local.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Manu_Specht
f = ix["Manu_Specht"]
h.a.NotNil(f)
h.a.NotNil(f.Paging)
h.a.NotNil(f.Paging.NextPage)
h.a.Nil(f.Paging.PrevPage)
req = []string{
", Manu_Specht, u10 j1, d, 45, 56",
", Manu_Specht, u10 j2, c, 83, 70",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Engel_Kiefer
f = ix["Engel_Kiefer"]
h.a.NotNil(f)
h.a.NotNil(f.Paging)
h.a.NotNil(f.Paging.NextPage)
h.a.Nil(f.Paging.PrevPage)
req = []string{
", Engel_Kiefer, u12 j1, a, 42, 69",
", Engel_Kiefer, u12 j10, c, 79, 25",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// Engel_Loritz
f = ix["Engel_Loritz"]
h.a.NotNil(f)
h.a.NotNil(f.Paging)
h.a.NotNil(f.Paging.NextPage)
h.a.Nil(f.Paging.PrevPage)
req = []string{
", Engel_Loritz, u3 j1, a, 10, 1",
", Engel_Loritz, u3 j2, a, 0, 0",
}
f.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
})
}
func indexJoinedResult(ff []*report.Frame) map[string]*report.Frame {

View File

@ -124,6 +124,129 @@ func TestReporterLoading(t *testing.T) {
return nil
})
})
t.Run("paging", func(t *testing.T) {
fd.Paging = &filter.Paging{
Limit: 5,
}
fd.Sorting = filter.SortExprSet{
&filter.SortExpr{Column: "join_key", Descending: false},
}
fd.Columns = report.FrameColumnSet{
&report.FrameColumn{Name: "id", Kind: "Record"},
&report.FrameColumn{Name: "join_key", Kind: "String"},
&report.FrameColumn{Name: "first_name", Kind: "String"},
}
// ^ going up ^
rr, err := model.Load(ctx, fd)
h.a.NoError(err)
h.a.Len(rr, 1)
r := rr[0]
h.a.NotNil(r.Paging)
h.a.NotNil(r.Paging.NextPage)
h.a.Nil(r.Paging.PrevPage)
h.a.Equal(5, r.Size())
// omit the IDs because they are generated
req := []string{
", Engel_Kempf, Engel",
", Engel_Kiefer, Engel",
", Engel_Loritz, Engel",
", Manu_Specht, Manu",
", Maria_Krüger, Maria",
}
r.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
fd.Paging.PageCursor = r.Paging.NextPage
rr, err = model.Load(ctx, fd)
h.a.NoError(err)
h.a.Len(rr, 1)
r = rr[0]
h.a.NotNil(r.Paging)
h.a.NotNil(r.Paging.NextPage)
h.a.NotNil(r.Paging.PrevPage)
h.a.Equal(5, r.Size())
req = []string{
", Maria_Königsmann, Maria",
", Maria_Spannagel, Maria",
", Sascha_Jans, Sascha",
", Sigi_Goldschmidt, Sigi",
", Ulli_Böhler, Ulli",
}
r.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
fd.Paging.PageCursor = r.Paging.NextPage
rr, err = model.Load(ctx, fd)
h.a.NoError(err)
h.a.Len(rr, 1)
r = rr[0]
h.a.NotNil(r.Paging)
h.a.Nil(r.Paging.NextPage)
h.a.NotNil(r.Paging.PrevPage)
h.a.Equal(2, r.Size())
req = []string{
", Ulli_Förstner, Ulli",
", Ulli_Haupt, Ulli",
}
r.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
// v going down v
fd.Paging.PageCursor = r.Paging.PrevPage
rr, err = model.Load(ctx, fd)
h.a.NoError(err)
h.a.Len(rr, 1)
r = rr[0]
h.a.NotNil(r.Paging)
h.a.NotNil(r.Paging.NextPage)
h.a.NotNil(r.Paging.PrevPage)
h.a.Equal(5, r.Size())
req = []string{
", Maria_Königsmann, Maria",
", Maria_Spannagel, Maria",
", Sascha_Jans, Sascha",
", Sigi_Goldschmidt, Sigi",
", Ulli_Böhler, Ulli",
}
r.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
fd.Paging.PageCursor = r.Paging.PrevPage
rr, err = model.Load(ctx, fd)
h.a.NoError(err)
h.a.Len(rr, 1)
r = rr[0]
h.a.NotNil(r.Paging)
h.a.NotNil(r.Paging.NextPage)
h.a.Nil(r.Paging.PrevPage)
h.a.Equal(5, r.Size())
req = []string{
", Engel_Kempf, Engel",
", Engel_Kiefer, Engel",
", Engel_Loritz, Engel",
", Manu_Specht, Manu",
", Maria_Krüger, Maria",
}
r.WalkRows(func(i int, r report.FrameRow) error {
h.a.Contains(r.String(), req[i])
return nil
})
})
}
func prepare(t *testing.T, report string) (context.Context, helper, store.Storer, *auxReport) {