From 9872d92c955e1a9d7a6d18cd808bbfb8f9778d80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toma=C5=BE=20Jerman?= Date: Fri, 23 Jul 2021 16:23:53 +0200 Subject: [PATCH] Add support for paging & slight refactor --- compose/service/record_datasource.go | 2 + pkg/filter/pagination.go | 17 + pkg/report/datasource.go | 6 +- .../{datasource_filtering.go => filtering.go} | 32 ++ pkg/report/frame.go | 79 +-- pkg/report/model.go | 472 ++++++------------ pkg/report/model_graph.go | 130 +++++ pkg/report/step_join.go | 38 +- pkg/report/util.go | 23 - store/rdbms/compose_record_datasource.go | 335 +++++++++---- tests/reporter/grouping_test.go | 70 +++ tests/reporter/joining_test.go | 325 ++++++++++++ tests/reporter/loading_test.go | 123 +++++ 13 files changed, 1149 insertions(+), 503 deletions(-) rename pkg/report/{datasource_filtering.go => filtering.go} (66%) create mode 100644 pkg/report/model_graph.go diff --git a/compose/service/record_datasource.go b/compose/service/record_datasource.go index 52ae2d301..4e34cba16 100644 --- a/compose/service/record_datasource.go +++ b/compose/service/record_datasource.go @@ -75,6 +75,8 @@ func (svc record) Datasource(ctx context.Context, ld *report.LoadStepDefinition) c = report.MakeColumnOfKind("Record") c.Name = "id" c.Label = "Record ID" + c.Primary = true + c.Unique = true cols = append(cols, c) for _, f := range mod.Fields { diff --git a/pkg/filter/pagination.go b/pkg/filter/pagination.go index a510495f6..2836c7022 100644 --- a/pkg/filter/pagination.go +++ b/pkg/filter/pagination.go @@ -71,6 +71,23 @@ func NewPaging(limit uint, cursor string) (p Paging, err error) { return } +func (p *Paging) Clone() *Paging { + if p == nil { + return nil + } + + return &Paging{ + Limit: p.Limit, + PageCursor: p.PageCursor, + NextPage: p.NextPage, + PrevPage: p.PrevPage, + IncPageNavigation: p.IncPageNavigation, + IncTotal: p.IncTotal, + PageNavigation: p.PageNavigation, + Total: p.Total, + } +} + func (p *PagingCursor) Walk(fn func(string, interface{}, bool)) { for i, key := range p.keys { fn(key, p.values[i], p.desc[i]) diff --git a/pkg/report/datasource.go b/pkg/report/datasource.go index 9faf73af5..714cf4183 100644 --- a/pkg/report/datasource.go +++ b/pkg/report/datasource.go @@ -7,12 +7,11 @@ import ( type ( // DatasourceProvider provides access to system datasources, such as ComposeRecords DatasourceProvider interface { - // Datasource initializes and returns the Datasource the reporter can use + // Datasource initializes and returns the Datasource we can use Datasource(context.Context, *LoadStepDefinition) (Datasource, error) } - // Loader returns the next Frame from the Datasource - // @todo better memory reuse + // Loader returns the next Frame from the Datasource; returns nil, nil if no more Loader func(cap int) ([]*Frame, error) // Closer closes the Datasource Closer func() @@ -20,6 +19,7 @@ type ( DatasourceSet []Datasource Datasource interface { Name() string + // Closer return argument may be omitted for some datasources Load(context.Context, ...*FrameDefinition) (Loader, Closer, error) Describe() FrameDescriptionSet } diff --git a/pkg/report/datasource_filtering.go b/pkg/report/filtering.go similarity index 66% rename from pkg/report/datasource_filtering.go rename to pkg/report/filtering.go index 1ecd69956..5ff29bb9b 100644 --- a/pkg/report/datasource_filtering.go +++ b/pkg/report/filtering.go @@ -60,3 +60,35 @@ func (base *RowDefinition) MergeOr(merge *RowDefinition) *RowDefinition { return rr } + +func (base *RowDefinition) Clone() (out *RowDefinition) { + if base == nil { + return + } + out = &RowDefinition{} + + if base.Cells != nil { + out.Cells = make(map[string]*CellDefinition) + for k, v := range base.Cells { + out.Cells[k] = &CellDefinition{ + Op: v.Op, + Value: v.Value, + } + } + } + + if base.And != nil { + out.And = make([]*RowDefinition, len(base.And)) + for i, def := range base.And { + out.And[i] = def.Clone() + } + } + if base.Or != nil { + out.Or = make([]*RowDefinition, len(base.Or)) + for i, def := range base.Or { + out.Or[i] = def.Clone() + } + } + + return +} diff --git a/pkg/report/frame.go b/pkg/report/frame.go index 1c4d409b3..1baf9311c 100644 --- a/pkg/report/frame.go +++ b/pkg/report/frame.go @@ -37,9 +37,12 @@ type ( frameCellCaster func(in interface{}) (expr.TypedValue, error) FrameColumnSet []*FrameColumn FrameColumn struct { - Name string `json:"name"` - Label string `json:"label"` - Kind string `json:"kind"` + Name string `json:"name"` + Label string `json:"label"` + Kind string `json:"kind"` + Primary bool `json:"primary"` + Unique bool `json:"unique"` + Caster frameCellCaster `json:"-"` } @@ -65,10 +68,6 @@ type ( } ) -const ( - columnWildcard = "*" -) - func MakeColumnOfKind(k string) *FrameColumn { return &FrameColumn{ Kind: k, @@ -150,32 +149,6 @@ func (b CellDefinition) OpToCmp() string { } } -// Slice in place -func (f *Frame) Slice(startIndex, size int) (a, b *Frame) { - a = &Frame{ - Name: f.Name, - Source: f.Source, - Ref: f.Ref, - RefValue: f.RefValue, - RelColumn: f.RelColumn, - - Columns: f.Columns, - } - b = &Frame{ - Name: f.Name, - Source: f.Source, - Ref: f.Ref, - RefValue: f.RefValue, - RelColumn: f.RelColumn, - - Columns: f.Columns, - } - - a.Rows = f.Rows[startIndex:size] - b.Rows = f.Rows[size:] - return a, b -} - // With guard element func (f *Frame) WalkRowsG(cb func(i int, r FrameRow) error) (err error) { err = f.WalkRows(cb) @@ -285,6 +258,33 @@ func (f *Frame) String() string { return out } +func (f *Frame) CollectCursorValues(r FrameRow, cc ...*filter.SortExpr) *filter.PagingCursor { + // @todo pk and unique things; how should we do it? + + cursor := &filter.PagingCursor{LThen: filter.SortExprSet(cc).Reversed()} + + for _, c := range cc { + // the check for existence should be performed way in advanced so we won't bother here + cursor.Set(c.Column, r[f.Columns.Find(c.Column)].Get(), c.Descending) + } + + return cursor +} + +func (cc FrameColumnSet) Clone() (out FrameColumnSet) { + out = make(FrameColumnSet, len(cc)) + for i, c := range cc { + out[i] = &FrameColumn{ + Name: c.Name, + Label: c.Label, + Kind: c.Kind, + Caster: c.Caster, + } + } + + return +} + func (cc FrameColumnSet) Find(name string) int { for i, c := range cc { if c.Name == name { @@ -341,6 +341,19 @@ func (r FrameRow) ToVars(cc FrameColumnSet) (vv *expr.Vars, err error) { return } +func (f *FrameDefinition) Clone() (out *FrameDefinition) { + return &FrameDefinition{ + Name: f.Name, + Source: f.Source, + Ref: f.Ref, + + Rows: f.Rows.Clone(), + Columns: f.Columns.Clone(), + Paging: f.Paging.Clone(), + Sorting: f.Sorting.Clone(), + } +} + func (dd FrameDefinitionSet) Find(name string) *FrameDefinition { for _, d := range dd { if d.Name == name { diff --git a/pkg/report/model.go b/pkg/report/model.go index 312530f37..39d2ca6fb 100644 --- a/pkg/report/model.go +++ b/pkg/report/model.go @@ -6,24 +6,25 @@ import ( "fmt" "github.com/cortezaproject/corteza-server/pkg/filter" - "github.com/spf13/cast" ) type ( model struct { - steps []Step + ran bool + steps []step datasources DatasourceSet } + // M is the model interface that should be used when trying to model the datasource M interface { - Add(...Step) M + Add(...step) M Run(context.Context) error Load(context.Context, ...*FrameDefinition) ([]*Frame, error) Describe(source string) (FrameDescriptionSet, error) } - StepSet []Step - Step interface { + stepSet []step + step interface { Name() string Source() []string Run(context.Context, ...Datasource) (Datasource, error) @@ -38,18 +39,15 @@ type ( Group *GroupStepDefinition `json:"group,omitempty"` // @todo Transform } - - modelGraphNode struct { - step Step - ds Datasource - - pp []*modelGraphNode - cc []*modelGraphNode - } ) +// Model initializes the model based on the provided sources and step definitions. +// +// Additional steps may be added after the model is constructed. +// Call `M.Run(context.Context)` to allow the model to be used for requesting data. +// Additional steps may not be added after the `M.Run(context.Context)` was called func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*StepDefinition) (M, error) { - steps := make([]Step, 0, len(dd)) + steps := make([]step, 0, len(dd)) ss := make(DatasourceSet, 0, len(steps)*2) err := func() error { @@ -62,7 +60,7 @@ func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*St s, ok := sources[d.Load.Source] if !ok { - return fmt.Errorf("unresolved data source: %s", d.Load.Source) + return fmt.Errorf("unresolved datasource: %s", d.Load.Source) } ds, err := s.Datasource(ctx, d.Load) if err != nil { @@ -80,7 +78,7 @@ func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*St // @todo Transform default: - return errors.New("malformed step definition") + return errors.New("malformed step definition: unsupported step kind") } } return nil @@ -96,348 +94,180 @@ func Model(ctx context.Context, sources map[string]DatasourceProvider, dd ...*St }, nil } -func (m *model) Add(ss ...Step) M { +// Add adds additional steps to the model +func (m *model) Add(ss ...step) M { m.steps = append(m.steps, ss...) return m } +// Run bakes the model configuration and makes the requested data available func (m *model) Run(ctx context.Context) (err error) { - // initial validation - err = m.validateModel() - if err != nil { - return fmt.Errorf("failed to validate the model: %w", err) - } + const errPfx = "failed to run the model" + defer func() { + m.ran = true + }() + + // initial validation + err = func() (err error) { + if m.ran { + return errors.New("model already ran") + } + + if len(m.steps)+len(m.datasources) == 0 { + return errors.New("no model steps defined") + } + + for _, s := range m.steps { + err = s.Validate() + if err != nil { + return err + } + } - // nothing left to do - if len(m.steps) == 0 { return nil + }() + if err != nil { + return fmt.Errorf("%s: failed to validate the model: %w", errPfx, err) } // construct the step graph - gg, err := m.buildStepGraph(m.steps, m.datasources) - if err != nil { - return err + // + // If there are no steps, there is nothing to reduce + if len(m.steps) == 0 { + return nil } - - m.datasources = nil - for _, n := range gg { - aux, err := m.reduceGraph(ctx, n) + err = func() (err error) { + gg, err := m.buildStepGraph(m.steps, m.datasources) if err != nil { return err } - m.datasources = append(m.datasources, aux) + + m.datasources = nil + for _, n := range gg { + aux, err := m.reduceGraph(ctx, n) + if err != nil { + return err + } + m.datasources = append(m.datasources, aux) + } + return nil + }() + if err != nil { + return fmt.Errorf("%s: %w", errPfx, err) } return nil } +// Describe returns the descriptions for the requested model datasources +// +// The Run method must be called before the description can be provided. func (m *model) Describe(source string) (out FrameDescriptionSet, err error) { - ds := m.datasources.Find(source) - if ds == nil { - return nil, fmt.Errorf("model does not contain the datasource: %s", source) + var ds Datasource + + err = func() error { + if !m.ran { + return fmt.Errorf("model was not yet ran") + } + + ds := m.datasources.Find(source) + if ds == nil { + return fmt.Errorf("model does not contain the datasource: %s", source) + } + + return nil + }() + if err != nil { + return nil, fmt.Errorf("unable to describe the model source: %w", err) } return ds.Describe(), nil } -func (m *model) Load(ctx context.Context, dd ...*FrameDefinition) ([]*Frame, error) { - var err error - - for _, d := range dd { - err = m.applyPaging(d, d.Paging, d.Sorting) - if err != nil { - return nil, err - } - } - - // @todo variable root def - def := dd[0] - - ds := m.datasources.Find(def.Source) - if ds == nil { - return nil, fmt.Errorf("unresolved source: %s", def.Source) - } - - l, c, err := ds.Load(ctx, dd...) - if err != nil { - return nil, err - } - defer c() - - i := 0 - if def.Paging != nil && def.Paging.Limit > 0 { - i = int(def.Paging.Limit) - } else { - i = -1 - } - - ff, err := l(i + 1) - if err != nil { - return nil, err - } - - dds := FrameDefinitionSet(dd) - for _, f := range ff { - def = dds.FindBySourceRef(f.Source, f.Ref) - if def == nil { - return nil, fmt.Errorf("unable to find frame definition for frame: src-%s, ref-%s", f.Source, f.Ref) - } - - // ff[i], err = m.calculatePaging(f, def.Paging, def.Sorting) - // if err != nil { - // return nil, err - // } - } - - return ff, err -} - -func (m *model) calculatePaging(f *Frame, p *filter.Paging, ss filter.SortExprSet) (*Frame, error) { - if p == nil { - p = &filter.Paging{} - } - +// Load returns the Frames based on the provided FrameDefinitions +// +// The Run method must be called before the frames can be provided. +func (m *model) Load(ctx context.Context, dd ...*FrameDefinition) (ff []*Frame, err error) { var ( - hasPrev = p.PageCursor != nil - hasNext = f.Size() > int(p.Limit) - out = &filter.Paging{} + def *FrameDefinition + ds Datasource ) - out.Limit = p.Limit + // request validation + err = func() error { + // - all frame definitions must define the same datasource; call Load multiple times if + // you need to access multiple datasources + for i, d := range dd { + if i == 0 { + continue + } + if d.Source != dd[i-1].Source { + return fmt.Errorf("frame definition source missmatch: expected %s, got %s", dd[i-1].Source, d.Source) + } + } - if hasNext { - f, _ = f.Slice(0, f.Size()-1) - out.NextPage = m.calculatePageCursor(f.LastRow(), f.Columns, ss) - } + def = dd[0] - if hasPrev { - out.PrevPage = m.calculatePageCursor(f.FirstRow(), f.Columns, ss) - } + ds = m.datasources.Find(def.Source) + if ds == nil { + return fmt.Errorf("unresolved datasource: %s", def.Source) + } - f.Paging = out - f.Sorting = &filter.Sorting{ - Sort: ss, - } - - return f, nil -} - -func (m *model) calculatePageCursor(r FrameRow, cc FrameColumnSet, ss filter.SortExprSet) *filter.PagingCursor { - out := &filter.PagingCursor{LThen: ss.Reversed()} - - for _, s := range ss { - ci := cc.Find(s.Column) - out.Set(s.Column, r[ci].Get(), s.Descending) - } - - return out -} - -func (m *model) applyPaging(def *FrameDefinition, p *filter.Paging, ss filter.SortExprSet) (err error) { - if p == nil { return nil - } - - ss, err = p.PageCursor.Sort(ss) + }() if err != nil { - return err + return nil, fmt.Errorf("unable to load frames: invalid request: %w", err) } - // @todo somesort of a primary key to avoid edgecases - sort := ss.Clone() - if p.PageCursor != nil && p.PageCursor.ROrder { - sort.Reverse() - } - def.Sorting = sort + // apply any frame definition defaults + aux := make([]*FrameDefinition, len(dd)) + for i, d := range dd { + aux[i] = d.Clone() + + // assure paging is always provided so we can ignore nil checks + if aux[i].Paging == nil { + aux[i].Paging = &filter.Paging{ + Limit: defaultPageSize, + } + } + + // assure sorting is always provided so we can ignore nil checks + if aux[i].Sorting == nil { + aux[i].Sorting = filter.SortExprSet{} + } + } + dd = aux + + // assure paging is always provided so we can ignore nil checks + if def.Paging == nil { + def.Paging = &filter.Paging{ + Limit: defaultPageSize, + } + } + + // assure sorting is always provided so we can ignore nil checks + if def.Sorting == nil { + def.Sorting = filter.SortExprSet{} + } + + // load the data + err = func() error { + l, c, err := ds.Load(ctx, dd...) + if err != nil { + return err + } + defer c() + + ff, err = l(int(def.Paging.Limit)) + if err != nil { + return err + } - // convert cursor to rows def - if p.PageCursor == nil { return nil + }() + if err != nil { + return nil, fmt.Errorf("unable to load frames: %w", err) } - rd := &RowDefinition{ - Cells: make(map[string]*CellDefinition), - } - kk := p.PageCursor.Keys() - vv := p.PageCursor.Values() - for i, k := range kk { - v, err := cast.ToStringE(vv[i]) - if err != nil { - return err - } - - lt := p.PageCursor.Desc()[i] - if p.PageCursor.IsROrder() { - lt = !lt - } - op := "" - if lt { - op = "lt" - } else { - op = "gt" - } - - rd.Cells[k] = &CellDefinition{ - Op: op, - Value: fmt.Sprintf("'%s'", v), - } - } - def.Rows = rd.MergeAnd(def.Rows) - - return nil -} - -func (m *model) validateModel() error { - if len(m.steps)+len(m.datasources) == 0 { - return errors.New("no model steps defined") - } - - var err error - for _, s := range m.steps { - err = s.Validate() - if err != nil { - return err - } - } - - return nil -} - -func (m *model) buildStepGraph(ss StepSet, dd DatasourceSet) ([]*modelGraphNode, error) { - mp := make(map[string]*modelGraphNode) - - for _, s := range ss { - s := s - - // make sure that the step is in the graph - n, ok := mp[s.Name()] - if !ok { - n = &modelGraphNode{ - step: s, - } - mp[s.Name()] = n - } else { - n.step = s - } - - // make sure the child step is in there - for _, src := range s.Source() { - c, ok := mp[src] - if !ok { - c = &modelGraphNode{ - // will be added later - step: nil, - pp: []*modelGraphNode{n}, - - ds: dd.Find(src), - } - mp[src] = c - } - n.cc = append(n.cc, c) - } - } - - // return all of the root nodes - out := make([]*modelGraphNode, 0, len(ss)) - for _, n := range mp { - if len(n.pp) == 0 { - out = append(out, n) - } - } - - return out, nil -} - -func (m *model) reduceGraph(ctx context.Context, n *modelGraphNode) (out Datasource, err error) { - auxO := make([]Datasource, len(n.cc)) - if len(n.cc) > 0 { - for i, c := range n.cc { - out, err = m.reduceGraph(ctx, c) - if err != nil { - return nil, err - } - auxO[i] = out - } - } - - bail := func() (out Datasource, err error) { - if n.step == nil { - if n.ds != nil { - return n.ds, nil - } - - return out, nil - } - - aux, err := n.step.Run(ctx, auxO...) - if err != nil { - return nil, err - } - - return aux, nil - } - - if n.step == nil { - return bail() - } - - // check if this one can reduce the existing datasources - // - // for now, only "simple branches are supported" - var o Datasource - if len(auxO) > 1 { - return bail() - } else if len(auxO) > 0 { - // use the only available output - o = auxO[0] - } else { - // use own datasource (in case of leaves nodes) - o = n.ds - } - - if n.step.Def().Group != nil { - gds, ok := o.(GroupableDatasource) - if !ok { - return bail() - } - - ok, err = gds.Group(n.step.Def().Group.GroupDefinition, n.step.Name()) - if err != nil { - return nil, err - } else if !ok { - return bail() - } - - out = gds - // we've covered this step with the child step; ignore it - return out, nil - } - - return bail() -} - -// @todo cleanup the bellow two? - -func (sd *StepDefinition) source() string { - switch { - case sd.Load != nil: - return sd.Load.Source - case sd.Group != nil: - return sd.Group.Source - // @todo Transform - default: - return "" - } -} - -func (sd *StepDefinition) name() string { - switch { - case sd.Load != nil: - return sd.Load.Name - case sd.Group != nil: - return sd.Group.Name - // @todo Transform - default: - return "" - } + return ff, nil } diff --git a/pkg/report/model_graph.go b/pkg/report/model_graph.go new file mode 100644 index 000000000..1ee83366b --- /dev/null +++ b/pkg/report/model_graph.go @@ -0,0 +1,130 @@ +package report + +import "context" + +type ( + modelGraphNode struct { + step step + ds Datasource + + pp []*modelGraphNode + cc []*modelGraphNode + } +) + +// Internally, the model uses a stepGraph to resolve the dependencies between the steps +// allowing us to perform some preprocessing, such as size reduction and shape validation +// +// @todo most of this might need to be done at runtime, not buildtime +func (m *model) buildStepGraph(ss stepSet, dd DatasourceSet) ([]*modelGraphNode, error) { + mp := make(map[string]*modelGraphNode) + + for _, s := range ss { + s := s + + // make sure that the step is in the graph + n, ok := mp[s.Name()] + if !ok { + n = &modelGraphNode{ + step: s, + } + mp[s.Name()] = n + } else { + n.step = s + } + + // make sure the child step is in there + for _, src := range s.Source() { + c, ok := mp[src] + if !ok { + c = &modelGraphNode{ + // will be added later + step: nil, + pp: []*modelGraphNode{n}, + + ds: dd.Find(src), + } + mp[src] = c + } + n.cc = append(n.cc, c) + } + } + + // return all of the root nodes + out := make([]*modelGraphNode, 0, len(ss)) + for _, n := range mp { + if len(n.pp) == 0 { + out = append(out, n) + } + } + + return out, nil +} + +func (m *model) reduceGraph(ctx context.Context, n *modelGraphNode) (out Datasource, err error) { + auxO := make([]Datasource, len(n.cc)) + if len(n.cc) > 0 { + for i, c := range n.cc { + out, err = m.reduceGraph(ctx, c) + if err != nil { + return nil, err + } + auxO[i] = out + } + } + + bail := func() (out Datasource, err error) { + if n.step == nil { + if n.ds != nil { + return n.ds, nil + } + + return out, nil + } + + aux, err := n.step.Run(ctx, auxO...) + if err != nil { + return nil, err + } + + return aux, nil + } + + if n.step == nil { + return bail() + } + + // check if this one can reduce the existing datasources + // + // for now, only "simple branches are supported" + var o Datasource + if len(auxO) > 1 { + return bail() + } else if len(auxO) > 0 { + // use the only available output + o = auxO[0] + } else { + // use own datasource (in case of leaves nodes) + o = n.ds + } + + if n.step.Def().Group != nil { + gds, ok := o.(GroupableDatasource) + if !ok { + return bail() + } + + ok, err = gds.Group(n.step.Def().Group.GroupDefinition, n.step.Name()) + if err != nil { + return nil, err + } else if !ok { + return bail() + } + + out = gds + // we've covered this step with the child step; ignore it + return out, nil + } + + return bail() +} diff --git a/pkg/report/step_join.go b/pkg/report/step_join.go index fe3898a6c..8ea73c7cc 100644 --- a/pkg/report/step_join.go +++ b/pkg/report/step_join.go @@ -139,11 +139,10 @@ func (d *joinedDataset) Load(ctx context.Context, dd ...*FrameDefinition) (Loade if foreignDef == nil { return nil, fmt.Errorf("definition for foreign datasource not found: %s", d.def.ForeignSource) } - if localDef.Paging == nil { - localDef.Paging = &filter.Paging{} - } - if foreignDef.Paging == nil { - foreignDef.Paging = &filter.Paging{} + + // - page cursor on foreign datasource is not allowed + if foreignDef.Paging.PageCursor != nil { + return nil, fmt.Errorf("definition for foreign datasource may not define a page cursor") } // - key columns @@ -166,6 +165,11 @@ func (d *joinedDataset) Load(ctx context.Context, dd ...*FrameDefinition) (Loade return nil, err } + // @todo support this + if useSubSort && localDef.Paging.PageCursor != nil { + return nil, fmt.Errorf("paging not supported when the foreign datasource defines base sort") + } + if foreignDS != "" { if foreignDS != d.foreign.Name() { return nil, fmt.Errorf("foreign sort datasource not part of the join: %s", foreignDS) @@ -206,10 +210,8 @@ func (d *joinedDataset) Load(ctx context.Context, dd ...*FrameDefinition) (Loade // - prepare loader, closer mainLoader, mainCloser, err = prtDS.Partition(ctx, partitionSize, d.def.ForeignColumn, foreignDef) } else { - mainPageCap = defaultPageSize - if localDef.Paging != nil && localDef.Paging.Limit > 0 { - mainPageCap = localDef.Paging.Limit - } + mainPageCap = localDef.Paging.Limit + // nothing special needed mainLoader, mainCloser, err = d.base.Load(ctx, localDef) } @@ -381,10 +383,8 @@ func (d *joinedDataset) Load(ctx context.Context, dd ...*FrameDefinition) (Loade } // - determine partition size - partitionSize := defaultPartitionSize - if foreignDef.Paging != nil && foreignDef.Paging.Limit > 0 { - partitionSize = foreignDef.Paging.Limit - } + // +1 for paging reasons + partitionSize := foreignDef.Paging.Limit + 1 // - prepare key pre-filter foreignDef.Rows = d.keySliceToFilter(d.def.ForeignColumn, keys).MergeAnd(foreignDef.Rows) @@ -404,9 +404,21 @@ func (d *joinedDataset) Load(ctx context.Context, dd ...*FrameDefinition) (Loade } for i := range subFrames { + // meta subFrames[i].Name = foreignDef.Name subFrames[i].Source = foreignDef.Source subFrames[i].Ref = foreignDef.Ref + + // paging + if uint(len(subFrames[i].Rows)) >= partitionSize { + subFrames[i].Rows = subFrames[i].Rows[0 : partitionSize-1] + + if subFrames[i].Paging == nil { + subFrames[i].Paging = &filter.Paging{} + } + subFrames[i].Paging.NextPage = subFrames[i].CollectCursorValues(subFrames[i].LastRow(), foreignDef.Sorting...) + subFrames[i].Paging.NextPage.LThen = foreignDef.Sorting.Reversed() + } } } if err != nil { diff --git a/pkg/report/util.go b/pkg/report/util.go index e787487fd..020406127 100644 --- a/pkg/report/util.go +++ b/pkg/report/util.go @@ -2,7 +2,6 @@ package report import ( "reflect" - "strings" ) func isNil(i interface{}) bool { @@ -15,25 +14,3 @@ func isNil(i interface{}) bool { } return false } - -func dimensionOf(k string) string { - pp := strings.Split(k, ".") - if len(pp) < 2 { - return "" - } - - return pp[0] -} - -func columnOf(k string) string { - if k == columnWildcard { - return k - } - - pp := strings.Split(k, ".") - if len(pp) < 2 { - return "" - } - - return strings.Join(pp[1:], ".") -} diff --git a/store/rdbms/compose_record_datasource.go b/store/rdbms/compose_record_datasource.go index 5468eb98e..961c40f77 100644 --- a/store/rdbms/compose_record_datasource.go +++ b/store/rdbms/compose_record_datasource.go @@ -9,9 +9,11 @@ import ( "github.com/Masterminds/squirrel" "github.com/cortezaproject/corteza-server/compose/types" + "github.com/cortezaproject/corteza-server/pkg/filter" "github.com/cortezaproject/corteza-server/pkg/ql" "github.com/cortezaproject/corteza-server/pkg/report" "github.com/cortezaproject/corteza-server/pkg/slice" + "github.com/cortezaproject/corteza-server/store/rdbms/builders" "github.com/jmoiron/sqlx" ) @@ -30,8 +32,9 @@ type ( baseFilter *report.RowDefinition cols report.FrameColumnSet - qBuilder squirrel.SelectBuilder + q squirrel.SelectBuilder nestLevel int + nestLabel string levelColumns map[string]string } ) @@ -48,22 +51,29 @@ var ( // supportedGroupingFunctions = ... ) +// ComposeRecordDatasourceBuilder initializes and returns a datasource builder for compose record resource +// +// @todo try to make the resulting query as flat as possible func ComposeRecordDatasourceBuilder(s *Store, module *types.Module, ld *report.LoadStepDefinition) (report.Datasource, error) { var err error r := &recordDatasource{ - name: ld.Name, - module: module, - store: s, - cols: ld.Columns, + name: ld.Name, + module: module, + store: s, + cols: ld.Columns, + + // levelColumns help us keep track of what columns are currently available levelColumns: make(map[string]string), } - r.qBuilder, err = r.baseQuery(ld.Rows) - + r.q, err = r.baseQuery(ld.Rows) return r, err } +// Name returns the name we should use when referencing this datasource +// +// The name is determined from the user-specified name, or implied from the context. func (r *recordDatasource) Name() string { if r.name != "" { return r.name @@ -79,12 +89,31 @@ func (r *recordDatasource) Name() string { return r.module.Handle } -// @todo add Transform -// @todo try to make Group and Transform use the base query +func (r *recordDatasource) Describe() report.FrameDescriptionSet { + return report.FrameDescriptionSet{ + &report.FrameDescription{ + Source: r.Name(), + Columns: r.cols, + }, + } +} +func (r *recordDatasource) Load(ctx context.Context, dd ...*report.FrameDefinition) (l report.Loader, c report.Closer, err error) { + def := dd[0] + + q, err := r.preloadQuery(def) + if err != nil { + return nil, nil, err + } + + return r.load(ctx, def, q) +} + +// Group instructs the datasource to provide grouped and aggregated output func (r *recordDatasource) Group(d report.GroupDefinition, name string) (bool, error) { defer func() { r.nestLevel++ + r.nestLabel = "group" r.name = name }() @@ -94,13 +123,12 @@ func (r *recordDatasource) Group(d report.GroupDefinition, name string) (bool, e ok = false ) - cls := r.levelColumns + auxLevelColumns := r.levelColumns r.levelColumns = make(map[string]string) - - gCols := make(report.FrameColumnSet, 0, 10) + groupCols := make(report.FrameColumnSet, 0, 10) for _, g := range d.Keys { - auxKind, ok = cls[g.Column] + auxKind, ok = auxLevelColumns[g.Column] if !ok { return false, fmt.Errorf("column %s does not exist on level %d", g.Column, r.nestLevel) } @@ -114,7 +142,7 @@ func (r *recordDatasource) Group(d report.GroupDefinition, name string) (bool, e if c.Label == "" { c.Label = c.Name } - gCols = append(gCols, c) + groupCols = append(groupCols, c) r.levelColumns[g.Name] = auxKind q = q.Column(fmt.Sprintf("%s as `%s`", g.Column, g.Name)). @@ -130,7 +158,7 @@ func (r *recordDatasource) Group(d report.GroupDefinition, name string) (bool, e return false, fmt.Errorf("the aggregation function is required when the column is omitted") } } else { - auxKind, ok = cls[c.Column] + auxKind, ok = auxLevelColumns[c.Column] if !ok { return false, fmt.Errorf("column %s does not exist on level %d", c.Column, r.nestLevel) } @@ -156,46 +184,28 @@ func (r *recordDatasource) Group(d report.GroupDefinition, name string) (bool, e if col.Label == "" { col.Label = col.Name } - gCols = append(gCols, col) + groupCols = append(groupCols, col) r.levelColumns[c.Name] = auxKind q = q. - Column(fmt.Sprintf("%s as `%s`", qParam, c.Name)) + Column(squirrel.Alias(squirrel.Expr(qParam), c.Name)) } if d.Rows != nil { // @todo validate groupping functions - hh, err := r.rowFilterToString("", gCols, d.Rows) + hh, err := r.rowFilterToString("", groupCols, d.Rows) if err != nil { return false, err } q = q.Having(hh) } - r.cols = gCols - r.qBuilder = q.FromSelect(r.qBuilder, fmt.Sprintf("l%d", r.nestLevel)) + r.cols = groupCols + r.q = q.FromSelect(r.q, fmt.Sprintf("l%d", r.nestLevel)) return true, nil } -func (r *recordDatasource) Describe() report.FrameDescriptionSet { - return report.FrameDescriptionSet{ - &report.FrameDescription{ - Source: r.Name(), - Columns: r.cols, - }, - } -} - -func (r *recordDatasource) Load(ctx context.Context, dd ...*report.FrameDefinition) (l report.Loader, c report.Closer, err error) { - def := dd[0] - - q, err := r.preloadQuery(def) - if err != nil { - return nil, nil, err - } - - return r.load(ctx, def, q) -} +// @todo add Transform func (r *recordDatasource) Partition(ctx context.Context, partitionSize uint, partitionCol string, dd ...*report.FrameDefinition) (l report.Loader, c report.Closer, err error) { def := dd[0] @@ -205,29 +215,31 @@ func (r *recordDatasource) Partition(ctx context.Context, partitionSize uint, pa return nil, nil, err } + var ss []string + if len(def.Sorting) > 0 { + ss, err = r.sortExpr(def.Sorting) + if err != nil { + return nil, nil, err + } + } + // the partitioning wrap // @todo move this to the DB driver package? // @todo squash the query a bit? try to move most of this to the base query to remove // one sub-select - prt := squirrel.Select(fmt.Sprintf("*, row_number() over(partition by %s order by %s) as pp_rank", partitionCol, partitionCol)). + prt := squirrel.Select(fmt.Sprintf("*, row_number() over(partition by %s order by %s) as pp_rank", partitionCol, strings.Join(ss, ","))). FromSelect(q, "partition_base") - // @odo make it better, please... - ss, err := r.sortExpr(def) - if err != nil { - return nil, nil, err - } - + // the sort is already defined when partitioning so it's unneeded here q = squirrel.Select("*"). FromSelect(prt, "partition_wrap"). - Where(fmt.Sprintf("pp_rank <= %d", partitionSize)). - OrderBy(ss...) + Where(fmt.Sprintf("pp_rank <= %d", partitionSize)) return r.load(ctx, def, q) } func (r *recordDatasource) preloadQuery(def *report.FrameDefinition) (squirrel.SelectBuilder, error) { - q := r.qBuilder + q := r.q // assure columns // - undefined columns = all columns @@ -248,56 +260,59 @@ func (r *recordDatasource) preloadQuery(def *report.FrameDefinition) (squirrel.S // when filtering/sorting, wrap the base query in a sub-select, so we don't need to // worry about exact column names. - // - // @todo flatten the query if def.Rows != nil || def.Sorting != nil { - wrap := squirrel.Select("*").FromSelect(q, "w_base") + q = squirrel.Select("*").FromSelect(q, "w_base") + } - // additional filtering - if def.Rows != nil { - f, err := r.rowFilterToString("", r.cols, def.Rows) - if err != nil { - return q, err - } - wrap = wrap.Where(f) + // - filtering + if def.Rows != nil { + f, err := r.rowFilterToString("", r.cols, def.Rows) + if err != nil { + return q, err } - - // additional sorting - if len(def.Sorting) > 0 { - ss, err := r.sortExpr(def) - if err != nil { - return q, err - } - - wrap = wrap.OrderBy(ss...) - } - - q = wrap + q = q.Where(f) } return q, nil } -func (r *recordDatasource) sortExpr(def *report.FrameDefinition) ([]string, error) { - ss := make([]string, len(def.Sorting)) - for i, c := range def.Sorting { - ci := r.cols.Find(c.Column) - if ci == -1 { - return nil, fmt.Errorf("sort column not resolved: %s", c.Column) - } +func (r *recordDatasource) load(ctx context.Context, def *report.FrameDefinition, q squirrel.SelectBuilder) (l report.Loader, c report.Closer, err error) { + sort := def.Sorting - _, _, typeCast, err := r.store.config.CastModuleFieldToColumnType(r.cols[ci], c.Column) - if err != nil { - return nil, err + // - paging related stuff + if def.Paging.PageCursor != nil { + // Page cursor exists so we need to validate it against used sort + // To cover the case when paging cursor is set but sorting is empty, we collect the sorting instructions + // from the cursor. + // This (extracted sorting info) is then returned as part of response + if def.Sorting, err = def.Paging.PageCursor.Sort(def.Sorting); err != nil { + return nil, nil, err } - - ss[i] = r.store.config.SqlSortHandler(fmt.Sprintf(typeCast, c.Column), c.Descending) } - return ss, nil -} + // Cloned sorting instructions for the actual sorting + // Original must be kept for cursor creation + sort = def.Sorting.Clone() + + // When cursor for a previous page is used it's marked as reversed + // This tells us to flip the descending flag on all used sort keys + if def.Paging.PageCursor != nil && def.Paging.PageCursor.ROrder { + sort.Reverse() + } + + if def.Paging.PageCursor != nil { + q = q.Where(builders.CursorCondition(def.Paging.PageCursor, nil)) + } + + if len(sort) > 0 { + ss, err := r.sortExpr(sort) + if err != nil { + return nil, nil, err + } + + q = q.OrderBy(ss...) + } -func (r *recordDatasource) load(ctx context.Context, def *report.FrameDefinition, q squirrel.SelectBuilder) (l report.Loader, c report.Closer, err error) { r.rows, err = r.store.Query(ctx, q) if err != nil { return nil, nil, fmt.Errorf("cannot execute query: %w", err) @@ -312,30 +327,32 @@ func (r *recordDatasource) load(ctx context.Context, def *report.FrameDefinition checkCap := cap > 0 - // fetch & convert the data + // Fetch & convert the data. + // Go 1 over the requested cap to be able to determine if there are + // any additional pages i := 0 - // @todo make it in place f.Columns = def.Columns - f.Rows = make(report.FrameRowSet, 0, cap) + f.Rows = make(report.FrameRowSet, 0, cap+1) for r.rows.Next() { i++ - err = r.Cast(r.rows, f) + err = r.cast(r.rows, f) if err != nil { return nil, err } - if checkCap && i >= cap { + // If the count goes over the capacity, then we have a next page + if checkCap && i > cap { out := []*report.Frame{f} f = &report.Frame{} i = 0 - return out, nil + return r.calculatePaging(out, def.Sorting, uint(cap), def.Paging.PageCursor), nil } } if i > 0 { - return []*report.Frame{f}, nil + return r.calculatePaging([]*report.Frame{f}, def.Sorting, uint(cap), def.Paging.PageCursor), nil } return nil, nil }, func() { @@ -346,31 +363,61 @@ func (r *recordDatasource) load(ctx context.Context, def *report.FrameDefinition }, nil } -// @todo handle those rv_ prefixes; for now omitted +// baseQuery prepares the initial SQL that will be used for data access +// +// The query includes all of the requested columns in the required types to avid the need to type cast. func (r *recordDatasource) baseQuery(f *report.RowDefinition) (sqb squirrel.SelectBuilder, err error) { var ( joinTpl = "compose_record_value AS %s ON (%s.record_id = crd.id AND %s.name = '%s' AND %s.deleted_at IS NULL)" - - report = r.store.composeRecordsSelectBuilder(). - Where("crd.deleted_at IS NULL"). - Where("crd.module_id = ?", r.module.ID) ) - // Prepare all of the mod columns - // @todo make this as small as possible! - for _, f := range r.module.Fields { - report = report.LeftJoin(strings.ReplaceAll(joinTpl, "%s", f.Name)). - Column(f.Name + ".value as " + f.Name) - - r.levelColumns[f.Name] = f.Kind + // - the initial set of available columns + // + // @todo at what level should the requested columns be validated? + r.nestLevel = 0 + r.nestLabel = "base" + for _, c := range r.cols { + r.levelColumns[c.Name] = c.Kind } + // - base query + sqb = r.store.SelectBuilder("compose_record AS crd"). + Where("crd.deleted_at IS NULL"). + Where("crd.module_id = ?", r.module.ID). + Where("crd.rel_namespace = ?", r.module.NamespaceID) + + // - based on the definition, preload the columns + var ( + col string + is bool + isJoined = make(map[string]bool) + ) + for _, c := range r.cols { + if isJoined[c.Name] { + continue + } + isJoined[c.Name] = true + + // native record columns don't need any extra handling + if col, _, is = isRealRecordCol(c.Name); is { + sqb = sqb.Column(squirrel.Alias(squirrel.Expr(col), c.Name)) + continue + } + + // non-native record columns need to have their type casted before use + _, _, tcp, _ := r.store.config.CastModuleFieldToColumnType(c, c.Name) + sqb = sqb.LeftJoin(strings.ReplaceAll(joinTpl, "%s", c.Name)). + Column(squirrel.Alias(squirrel.Expr(fmt.Sprintf(tcp, c.Name+".value")), c.Name)) + } + + // - any initial filtering we may need to do + // + // @todo better support functions and their validation. if f != nil { - // @todo functions and function validation parser := ql.NewParser() parser.OnIdent = func(i ql.Ident) (ql.Ident, error) { if _, ok := r.levelColumns[i.Value]; !ok { - return i, fmt.Errorf("column %s does not exist on level %d", i.Value, r.nestLevel) + return i, fmt.Errorf("column %s does not exist on level %d (%s)", i.Value, r.nestLevel, r.nestLabel) } return i, nil @@ -380,17 +427,66 @@ func (r *recordDatasource) baseQuery(f *report.RowDefinition) (sqb squirrel.Sele if err != nil { return sqb, err } + astq, err := parser.ParseExpression(fl) if err != nil { return sqb, err } - report = report.Where(astq.String()) + + sqb = sqb.Where(astq.String()) } - return report, nil + return sqb, nil } -func (b *recordDatasource) Cast(row sqlx.ColScanner, out *report.Frame) error { +func (b *recordDatasource) calculatePaging(out []*report.Frame, sorting filter.SortExprSet, limit uint, cursor *filter.PagingCursor) []*report.Frame { + for _, o := range out { + var ( + hasPrev = cursor != nil + hasNext bool + ignoreLimit = limit == 0 + reversedOrder = cursor != nil && cursor.ROrder + ) + + hasNext = uint(len(o.Rows)) > limit + if !ignoreLimit && uint(len(o.Rows)) > limit { + o.Rows = o.Rows[:limit] + } + + if reversedOrder { + // Fetched set needs to be reversed because we've forced a descending order to get the previous page + for i, j := 0, len(o.Rows)-1; i < j; i, j = i+1, j-1 { + o.Rows[i], o.Rows[j] = o.Rows[j], o.Rows[i] + } + + // when in reverse-order rules on what cursor to return change + hasPrev, hasNext = hasNext, hasPrev + } + + if ignoreLimit { + return out + } + + if hasPrev { + o.Paging = &filter.Paging{} + o.Paging.PrevPage = o.CollectCursorValues(o.FirstRow(), sorting...) + o.Paging.PrevPage.ROrder = true + o.Paging.PrevPage.LThen = !sorting.Reversed() + } + + if hasNext { + if o.Paging == nil { + o.Paging = &filter.Paging{} + } + o.Paging.NextPage = o.CollectCursorValues(o.LastRow(), sorting...) + o.Paging.NextPage.LThen = sorting.Reversed() + } + } + + return out +} + +func (b *recordDatasource) cast(row sqlx.ColScanner, out *report.Frame) error { var err error aux := make(map[string]interface{}) if err = sqlx.MapScan(row, aux); err != nil { @@ -537,3 +633,22 @@ func isNil(i interface{}) bool { } return false } + +func (r *recordDatasource) sortExpr(sorting filter.SortExprSet) ([]string, error) { + ss := make([]string, len(sorting)) + for i, c := range sorting { + ci := r.cols.Find(c.Column) + if ci == -1 { + return nil, fmt.Errorf("sort column not resolved: %s", c.Column) + } + + _, _, typeCast, err := r.store.config.CastModuleFieldToColumnType(r.cols[ci], c.Column) + if err != nil { + return nil, err + } + + ss[i] = r.store.config.SqlSortHandler(fmt.Sprintf(typeCast, c.Column), c.Descending) + } + + return ss, nil +} diff --git a/tests/reporter/grouping_test.go b/tests/reporter/grouping_test.go index 280cae7cd..d6db16b29 100644 --- a/tests/reporter/grouping_test.go +++ b/tests/reporter/grouping_test.go @@ -112,4 +112,74 @@ func TestReporterGrouping(t *testing.T) { return nil }) }) + + t.Run("paging", func(t *testing.T) { + fd.Sorting = filter.SortExprSet{ + &filter.SortExpr{Column: "by_name", Descending: false}, + } + fd.Paging = &filter.Paging{ + Limit: 4, + } + + // ^ going up ^ + rr, err := model.Load(ctx, fd) + h.a.NoError(err) + h.a.Len(rr, 1) + r := rr[0] + h.a.NotNil(r.Paging) + h.a.NotNil(r.Paging.NextPage) + h.a.Nil(r.Paging.PrevPage) + h.a.Equal(4, r.Size()) + + req := []string{ + "Engel, 3, 179", + "Manu, 1, 61", + "Maria, 3, 183", + "Sascha, 1, 38", + } + r.WalkRows(func(i int, r report.FrameRow) error { + h.a.Equal(req[i], r.String()) + return nil + }) + + fd.Paging.PageCursor = r.Paging.NextPage + rr, err = model.Load(ctx, fd) + h.a.NoError(err) + h.a.Len(rr, 1) + r = rr[0] + h.a.NotNil(r.Paging) + h.a.Nil(r.Paging.NextPage) + h.a.NotNil(r.Paging.PrevPage) + h.a.Equal(2, r.Size()) + req = []string{ + "Sigi, 1, 67", + "Ulli, 3, 122", + } + r.WalkRows(func(i int, r report.FrameRow) error { + h.a.Equal(req[i], r.String()) + return nil + }) + + // v going down v + fd.Paging.PageCursor = r.Paging.PrevPage + rr, err = model.Load(ctx, fd) + h.a.NoError(err) + h.a.Len(rr, 1) + r = rr[0] + h.a.NotNil(r.Paging) + h.a.NotNil(r.Paging.NextPage) + h.a.Nil(r.Paging.PrevPage) + h.a.Equal(4, r.Size()) + + req = []string{ + "Engel, 3, 179", + "Manu, 1, 61", + "Maria, 3, 183", + "Sascha, 1, 38", + } + r.WalkRows(func(i int, r report.FrameRow) error { + h.a.Equal(req[i], r.String()) + return nil + }) + }) } diff --git a/tests/reporter/joining_test.go b/tests/reporter/joining_test.go index 9e58c5aa5..e84ee5c51 100644 --- a/tests/reporter/joining_test.go +++ b/tests/reporter/joining_test.go @@ -493,6 +493,331 @@ func TestReporterJoining(t *testing.T) { return nil }) }) + + t.Run("paging", func(t *testing.T) { + rp.Frames[0].Paging = &filter.Paging{ + Limit: 5, + } + rp.Frames[0].Sorting = filter.SortExprSet{ + &filter.SortExpr{Column: "join_key", Descending: false}, + } + + rp.Frames[1].Paging = &filter.Paging{ + Limit: 2, + } + rp.Frames[1].Sorting = filter.SortExprSet{ + &filter.SortExpr{Column: "name", Descending: false}, + } + + // ^ going up ^ + + // // // PAGE 1 + rr, err := model.Load(ctx, rp.Frames...) + h.a.NoError(err) + h.a.Len(rr, 4) + local := rr[0] + ix := indexJoinedResult(rr) + _ = ix + + // local + h.a.Equal(5, local.Size()) + h.a.NotNil(local.Paging) + h.a.NotNil(local.Paging.NextPage) + h.a.Nil(local.Paging.PrevPage) + req := []string{ + ", Engel_Kempf, Engel, Kempf", + ", Engel_Kiefer, Engel, Kiefer", + ", Engel_Loritz, Engel, Loritz", + ", Manu_Specht, Manu, Specht", + ", Maria_Krüger, Maria, Krüger", + } + local.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Manu_Specht + f := ix["Manu_Specht"] + h.a.NotNil(f) + h.a.NotNil(f.Paging) + h.a.NotNil(f.Paging.NextPage) + h.a.Nil(f.Paging.PrevPage) + req = []string{ + ", Manu_Specht, u10 j1, d, 45, 56", + ", Manu_Specht, u10 j2, c, 83, 70", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Engel_Kiefer + f = ix["Engel_Kiefer"] + h.a.NotNil(f) + h.a.NotNil(f.Paging) + h.a.NotNil(f.Paging.NextPage) + h.a.Nil(f.Paging.PrevPage) + req = []string{ + ", Engel_Kiefer, u12 j1, a, 42, 69", + ", Engel_Kiefer, u12 j10, c, 79, 25", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Engel_Loritz + f = ix["Engel_Loritz"] + h.a.NotNil(f) + h.a.NotNil(f.Paging) + h.a.NotNil(f.Paging.NextPage) + h.a.Nil(f.Paging.PrevPage) + req = []string{ + ", Engel_Loritz, u3 j1, a, 10, 1", + ", Engel_Loritz, u3 j2, a, 0, 0", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // // // PAGE 2 + rp.Frames[0].Paging.PageCursor = local.Paging.NextPage + rr, err = model.Load(ctx, rp.Frames...) + h.a.NoError(err) + + h.a.Len(rr, 4) + local = rr[0] + ix = indexJoinedResult(rr) + _ = ix + + // local + h.a.Equal(5, local.Size()) + h.a.NotNil(local.Paging) + h.a.NotNil(local.Paging.NextPage) + h.a.NotNil(local.Paging.PrevPage) + req = []string{ + ", Maria_Königsmann, Maria, Königsmann", + ", Maria_Spannagel, Maria, Spannagel", + ", Sascha_Jans, Sascha, Jans", + ", Sigi_Goldschmidt, Sigi, Goldschmidt", + ", Ulli_Böhler, Ulli, Böhler", + ", Ulli_Förstner, Ulli, Förstner", + } + local.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Maria_Königsmann + f = ix["Maria_Königsmann"] + h.a.NotNil(f) + h.a.NotNil(f.Paging) + h.a.NotNil(f.Paging.NextPage) + h.a.Nil(f.Paging.PrevPage) + req = []string{ + ", Maria_Königsmann, u1 j1, a, 10, 2", + ", Maria_Königsmann, u1 j2, b, 20, 5", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Engel_Kiefer + f = ix["Ulli_Böhler"] + h.a.NotNil(f) + h.a.Nil(f.Paging) + req = []string{ + ", Ulli_Böhler, u5 j1, a, 1, 2", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Sigi_Goldschmidt + f = ix["Sigi_Goldschmidt"] + h.a.NotNil(f) + h.a.NotNil(f.Paging) + h.a.NotNil(f.Paging.NextPage) + h.a.Nil(f.Paging.PrevPage) + req = []string{ + ", Sigi_Goldschmidt, u7 j1, d, 10, 29", + ", Sigi_Goldschmidt, u7 j2, a, 10, 21", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // // // PAGE 3 + rp.Frames[0].Paging.PageCursor = local.Paging.NextPage + rr, err = model.Load(ctx, rp.Frames...) + h.a.NoError(err) + + h.a.Len(rr, 1) + local = rr[0] + ix = indexJoinedResult(rr) + _ = ix + + // local + h.a.Equal(2, local.Size()) + h.a.NotNil(local.Paging) + h.a.Nil(local.Paging.NextPage) + h.a.NotNil(local.Paging.PrevPage) + req = []string{ + ", Ulli_Förstner, Ulli, Förstner", + ", Ulli_Haupt, Ulli, Haupt", + } + local.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // v going down v + + // // // PAGE 2 + rp.Frames[0].Paging.PageCursor = local.Paging.PrevPage + rr, err = model.Load(ctx, rp.Frames...) + h.a.NoError(err) + + h.a.Len(rr, 4) + local = rr[0] + ix = indexJoinedResult(rr) + _ = ix + + // local + h.a.Equal(5, local.Size()) + h.a.NotNil(local.Paging) + h.a.NotNil(local.Paging.NextPage) + h.a.NotNil(local.Paging.PrevPage) + req = []string{ + ", Maria_Königsmann, Maria, Königsmann", + ", Maria_Spannagel, Maria, Spannagel", + ", Sascha_Jans, Sascha, Jans", + ", Sigi_Goldschmidt, Sigi, Goldschmidt", + ", Ulli_Böhler, Ulli, Böhler", + ", Ulli_Förstner, Ulli, Förstner", + } + local.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Maria_Königsmann + f = ix["Maria_Königsmann"] + h.a.NotNil(f) + h.a.NotNil(f.Paging) + h.a.NotNil(f.Paging.NextPage) + h.a.Nil(f.Paging.PrevPage) + req = []string{ + ", Maria_Königsmann, u1 j1, a, 10, 2", + ", Maria_Königsmann, u1 j2, b, 20, 5", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Engel_Kiefer + f = ix["Ulli_Böhler"] + h.a.NotNil(f) + h.a.Nil(f.Paging) + req = []string{ + ", Ulli_Böhler, u5 j1, a, 1, 2", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Sigi_Goldschmidt + f = ix["Sigi_Goldschmidt"] + h.a.NotNil(f) + h.a.NotNil(f.Paging) + h.a.NotNil(f.Paging.NextPage) + h.a.Nil(f.Paging.PrevPage) + req = []string{ + ", Sigi_Goldschmidt, u7 j1, d, 10, 29", + ", Sigi_Goldschmidt, u7 j2, a, 10, 21", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // // // PAGE 1 + rp.Frames[0].Paging.PageCursor = local.Paging.PrevPage + rr, err = model.Load(ctx, rp.Frames...) + h.a.NoError(err) + h.a.Len(rr, 4) + local = rr[0] + ix = indexJoinedResult(rr) + _ = ix + + // local + h.a.Equal(5, local.Size()) + h.a.NotNil(local.Paging) + h.a.NotNil(local.Paging.NextPage) + h.a.Nil(local.Paging.PrevPage) + req = []string{ + ", Engel_Kempf, Engel, Kempf", + ", Engel_Kiefer, Engel, Kiefer", + ", Engel_Loritz, Engel, Loritz", + ", Manu_Specht, Manu, Specht", + ", Maria_Krüger, Maria, Krüger", + } + local.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Manu_Specht + f = ix["Manu_Specht"] + h.a.NotNil(f) + h.a.NotNil(f.Paging) + h.a.NotNil(f.Paging.NextPage) + h.a.Nil(f.Paging.PrevPage) + req = []string{ + ", Manu_Specht, u10 j1, d, 45, 56", + ", Manu_Specht, u10 j2, c, 83, 70", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Engel_Kiefer + f = ix["Engel_Kiefer"] + h.a.NotNil(f) + h.a.NotNil(f.Paging) + h.a.NotNil(f.Paging.NextPage) + h.a.Nil(f.Paging.PrevPage) + req = []string{ + ", Engel_Kiefer, u12 j1, a, 42, 69", + ", Engel_Kiefer, u12 j10, c, 79, 25", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // Engel_Loritz + f = ix["Engel_Loritz"] + h.a.NotNil(f) + h.a.NotNil(f.Paging) + h.a.NotNil(f.Paging.NextPage) + h.a.Nil(f.Paging.PrevPage) + req = []string{ + ", Engel_Loritz, u3 j1, a, 10, 1", + ", Engel_Loritz, u3 j2, a, 0, 0", + } + f.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + }) } func indexJoinedResult(ff []*report.Frame) map[string]*report.Frame { diff --git a/tests/reporter/loading_test.go b/tests/reporter/loading_test.go index 440be747b..5ece694c1 100644 --- a/tests/reporter/loading_test.go +++ b/tests/reporter/loading_test.go @@ -124,6 +124,129 @@ func TestReporterLoading(t *testing.T) { return nil }) }) + + t.Run("paging", func(t *testing.T) { + fd.Paging = &filter.Paging{ + Limit: 5, + } + fd.Sorting = filter.SortExprSet{ + &filter.SortExpr{Column: "join_key", Descending: false}, + } + fd.Columns = report.FrameColumnSet{ + &report.FrameColumn{Name: "id", Kind: "Record"}, + &report.FrameColumn{Name: "join_key", Kind: "String"}, + &report.FrameColumn{Name: "first_name", Kind: "String"}, + } + + // ^ going up ^ + rr, err := model.Load(ctx, fd) + h.a.NoError(err) + h.a.Len(rr, 1) + r := rr[0] + h.a.NotNil(r.Paging) + h.a.NotNil(r.Paging.NextPage) + h.a.Nil(r.Paging.PrevPage) + h.a.Equal(5, r.Size()) + + // omit the ID's because they are generated + req := []string{ + ", Engel_Kempf, Engel", + ", Engel_Kiefer, Engel", + ", Engel_Loritz, Engel", + ", Manu_Specht, Manu", + ", Maria_Krüger, Maria", + } + r.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + fd.Paging.PageCursor = r.Paging.NextPage + rr, err = model.Load(ctx, fd) + h.a.NoError(err) + h.a.Len(rr, 1) + r = rr[0] + h.a.NotNil(r.Paging) + h.a.NotNil(r.Paging.NextPage) + h.a.NotNil(r.Paging.PrevPage) + h.a.Equal(5, r.Size()) + + req = []string{ + ", Maria_Königsmann, Maria", + ", Maria_Spannagel, Maria", + ", Sascha_Jans, Sascha", + ", Sigi_Goldschmidt, Sigi", + ", Ulli_Böhler, Ulli", + } + r.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + fd.Paging.PageCursor = r.Paging.NextPage + rr, err = model.Load(ctx, fd) + h.a.NoError(err) + h.a.Len(rr, 1) + r = rr[0] + h.a.NotNil(r.Paging) + h.a.Nil(r.Paging.NextPage) + h.a.NotNil(r.Paging.PrevPage) + h.a.Equal(2, r.Size()) + + req = []string{ + ", Ulli_Förstner, Ulli", + ", Ulli_Haupt, Ulli", + } + r.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + // v going down v + fd.Paging.PageCursor = r.Paging.PrevPage + rr, err = model.Load(ctx, fd) + h.a.NoError(err) + h.a.Len(rr, 1) + r = rr[0] + h.a.NotNil(r.Paging) + h.a.NotNil(r.Paging.NextPage) + h.a.NotNil(r.Paging.PrevPage) + h.a.Equal(5, r.Size()) + + req = []string{ + ", Maria_Königsmann, Maria", + ", Maria_Spannagel, Maria", + ", Sascha_Jans, Sascha", + ", Sigi_Goldschmidt, Sigi", + ", Ulli_Böhler, Ulli", + } + r.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + + fd.Paging.PageCursor = r.Paging.PrevPage + rr, err = model.Load(ctx, fd) + h.a.NoError(err) + h.a.Len(rr, 1) + r = rr[0] + h.a.NotNil(r.Paging) + h.a.NotNil(r.Paging.NextPage) + h.a.Nil(r.Paging.PrevPage) + h.a.Equal(5, r.Size()) + + req = []string{ + ", Engel_Kempf, Engel", + ", Engel_Kiefer, Engel", + ", Engel_Loritz, Engel", + ", Manu_Specht, Manu", + ", Maria_Krüger, Maria", + } + r.WalkRows(func(i int, r report.FrameRow) error { + h.a.Contains(r.String(), req[i]) + return nil + }) + }) } func prepare(t *testing.T, report string) (context.Context, helper, store.Storer, *auxReport) {