From ef72ffa3739bcd7693c64d8652e453bb765d73c6 Mon Sep 17 00:00:00 2001 From: Vivek Patel Date: Wed, 28 Sep 2022 12:24:43 +0530 Subject: [PATCH] Fix total and page navigation for records Introduce helper func(IteratorPaging) to count total and create page navigation for records and also fixes timezone and milliseconds fir dateTime field value for query. --- compose/dalutils/records.go | 104 +++++----------------- pkg/dal/iterator.go | 121 ++++++++++++++++++++++++++ store/adapters/rdbms/drivers/types.go | 4 +- tests/compose/record_test.go | 68 ++++++++++++++- 4 files changed, 209 insertions(+), 88 deletions(-) diff --git a/compose/dalutils/records.go b/compose/dalutils/records.go index 3662e9013..f3eaeed81 100644 --- a/compose/dalutils/records.go +++ b/compose/dalutils/records.go @@ -2,8 +2,6 @@ package dalutils import ( "context" - "math" - "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/pkg/dal" "github.com/cortezaproject/corteza-server/pkg/filter" @@ -118,20 +116,8 @@ func drainIterator(ctx context.Context, iter dal.Iterator, mod *types.Module, f // close iterator after we've drained it defer iter.Close() - const ( - // minimum amount of records we need to re-fetch - minRefetch = 10 - - // refetch 20% more records that we missed - refetchFactor = 1.2 - ) - var ( - // counter for false checks - checked uint - fetched uint - ok bool - r *types.Record + r *types.Record ) // Get the requested number of record @@ -141,86 +127,36 @@ func drainIterator(ctx context.Context, iter dal.Iterator, mod *types.Module, f set = make(types.RecordSet, 0, 1000) } - for f.Limit == 0 || uint(len(set)) < f.Limit { - // reset counters every drain - checked = 0 - fetched = 0 - - // drain whatever we fetched - for iter.Next(ctx) { - fetched++ - if err = iter.Err(); err != nil { - return - } - - r = prepareRecordTarget(mod) - if err = iter.Scan(r); err != nil { - return - } - - // check fetched record - if f.Check != nil { - if ok, err = f.Check(r); err != nil { - return - } else if !ok { - continue - } - } - - checked++ - set = append(set, r) - } - - // if an error occurred inside Next(), - // we need to stop draining - if err = iter.Err(); err != nil { - return - } - - if fetched == 0 || f.Limit == 0 || (0 < f.Limit && fetched < f.Limit) { - // do not re-fetch if: - // 1) nothing was fetch in the previous run - // 2) there was no limit (everything was fetched) - // 3) there are less fetched items then value of limit - break - } - - // Fetch more records - if checked > 0 { - howMuchMore := checked - if howMuchMore < minRefetch { - howMuchMore = minRefetch - } - - howMuchMore = uint(math.Floor(float64(howMuchMore) * refetchFactor)) - - // request more items - if err = iter.More(howMuchMore, r); err != nil { - return - } - } - } - // Make out filter outFilter = f pp := f.Paging.Clone() - if len(set) > 0 && f.PrevPage != nil { - pp.PrevPage, err = iter.BackCursor(set[0]) - if err != nil { + err = dal.IteratorPaging(ctx, iter, pp, f.Sorting, func(i dal.Iterator) (out dal.ValueGetter, ok bool) { + r = prepareRecordTarget(mod) + if err = i.Scan(r); err != nil { return } - } - if len(set) > 0 { - pp.NextPage, err = iter.ForwardCursor(set[len(set)-1]) - if err != nil { - return + // check fetched record + if f.Check != nil { + if ok, err = f.Check(r); err != nil { + return nil, false + } else if !ok { + return nil, ok + } } + + if f.Limit == 0 || uint(len(set)) < f.Limit { + set = append(set, r) + } + + return r, true + }) + if err != nil { + return } outFilter.Paging = *pp - outFilter.Total = uint(len(set)) return } diff --git a/pkg/dal/iterator.go b/pkg/dal/iterator.go index 72ae5957b..9c34d5279 100644 --- a/pkg/dal/iterator.go +++ b/pkg/dal/iterator.go @@ -3,6 +3,7 @@ package dal import ( "context" "encoding/json" + "fmt" "github.com/cortezaproject/corteza-server/pkg/filter" "io" ) @@ -63,3 +64,123 @@ func IteratorEncodeJSON(ctx context.Context, w io.Writer, iter Iterator, initTar return } + +// IteratorPaging helper function for record paging cursor and total +func IteratorPaging(ctx context.Context, iter Iterator, pp *filter.Paging, ss filter.Sorting, fn func(i Iterator) (ValueGetter, bool)) (err error) { + if pp == nil { + return + } + + if pp.PageCursor != nil { + if pp.IncPageNavigation || pp.IncTotal { + return fmt.Errorf("not allowed to fetch page navigation or total item count with page cursor") + } + } + + pp.Total = 0 + pp.PrevPage = nil + pp.NextPage = nil + pp.PageNavigation = []*filter.Page{} + + const howMuchMore = 1000 + + var ( + counter uint + total uint + fetchMore bool + + cur *filter.PagingCursor + page = filter.Page{ + Page: 1, + Count: 0, + Cursor: nil, + } + ) + + for iter.Next(ctx) { + if err = iter.Err(); err != nil { + return + } + + r, ok := fn(iter) + if !ok { + continue + } + + counter++ + total++ + page.Count++ + + if pp.PrevPage == nil { + pp.PrevPage, err = iter.BackCursor(r) + if err != nil { + return + } + } + + // cursor for each page + cur, err = iter.ForwardCursor(r) + if err != nil { + return + } + + if total%pp.Limit == 0 { + pp.PageNavigation = append(pp.PageNavigation, &filter.Page{ + Page: page.Page, + Count: page.Count, + Cursor: page.Cursor, + }) + + // prep next page + page = filter.Page{ + Page: uint(len(pp.PageNavigation) + 1), + Count: 0, + Cursor: cur, + } + + // fetch more records + fetchMore = true + + if pp.NextPage == nil { + pp.NextPage = cur + } + } + + if (len(pp.PageNavigation) == 1 && fetchMore) || counter == howMuchMore { + counter = 0 + fetchMore = false + + // request more items + if err = iter.More(howMuchMore, r); err != nil { + return + } + } + } + + // push the last page to page navigation + if page.Count > 0 { + pp.PageNavigation = append(pp.PageNavigation, &filter.Page{ + Page: page.Page, + Count: page.Count, + Cursor: page.Cursor, + }) + } + + if pp.PageCursor == nil { + pp.PrevPage = nil + } + + if pp.NextPage != nil && len(pp.PageNavigation) == 1 { + pp.NextPage = nil + } + + if pp.IncTotal { + pp.Total = total + } + + if !pp.IncPageNavigation { + pp.PageNavigation = nil + } + + return +} diff --git a/store/adapters/rdbms/drivers/types.go b/store/adapters/rdbms/drivers/types.go index 5ffd16f58..1a519a725 100644 --- a/store/adapters/rdbms/drivers/types.go +++ b/store/adapters/rdbms/drivers/types.go @@ -121,7 +121,7 @@ func (t *TypeTimestamp) Decode(raw any) (any, bool, error) { } if dec.Valid { - return dec.Time.Format(TimestampLayout(t.Timezone, t.Precision)), dec.Valid, nil + return dec.Time.UTC().Format(TimestampLayout(t.Timezone, t.Precision)), dec.Valid, nil } return nil, false, nil @@ -300,7 +300,7 @@ func TimestampLayout(tz bool, precision int) string { } func TimeLayout(tz bool, precision int) string { - var layout = "15:04:05" + var layout = "15:04:05.00000" if precision > 0 { layout += "." + strings.Repeat("9", precision) } diff --git a/tests/compose/record_test.go b/tests/compose/record_test.go index 6672544f6..986ffac3a 100644 --- a/tests/compose/record_test.go +++ b/tests/compose/record_test.go @@ -160,8 +160,6 @@ func (h helper) makeRecord(module *types.Module, rvs ...*types.RecordValue) *typ } rec.SetModule(module) - rec.SetModule(module) - h.noError(dalutils.ComposeRecordCreate(context.Background(), defDal, module, rec)) return rec @@ -211,6 +209,72 @@ func TestRecordList(t *testing.T) { End() } +func TestRecordListWithPaginationAndSorting(t *testing.T) { + h := newHelper(t) + h.clearRecords() + + module := h.repoMakeRecordModuleWithFields("record testing module") + helpers.AllowMe(h, module.RbacResource(), "records.search") + + var aux = struct { + Response struct { + Filter struct { + NextPage *string + PrevPage *string + PageNavigation []struct { + Page int + Items int + Cursor *string + } + } + } + }{} + + for i := 0; i < 7; i++ { + h.makeRecord(module, &types.RecordValue{Name: "name", Value: fmt.Sprintf("%d", i+1)}) + } + + // 1st page + h.apiInit(). + Get(fmt.Sprintf("/namespace/%d/module/%d/record/", module.NamespaceID, module.ID)). + Query("incTotal", "true"). + Query("incPageNavigation", "true"). + Query("limit", "2"). + Query("sort", "createdAt DESC"). + Header("Accept", "application/json"). + Expect(t). + Status(http.StatusOK). + Assert(helpers.AssertNoErrors). + Assert(jsonpath.Equal(`$.response.set[0].values[0].value`, "7")). + Assert(jsonpath.Equal(`$.response.set[1].values[0].value`, "6")). + Assert(jsonpath.Equal(`$.response.filter.total`, float64(7))). + Assert(jsonpath.Present(`$.response.filter.pageNavigation`)). + Assert(jsonpath.Len(`$.response.filter.pageNavigation`, 4)). + End(). + JSON(&aux) + + h.a.Len(aux.Response.Filter.PageNavigation, 4) + h.a.NotNil(aux.Response.Filter.PageNavigation[1].Cursor) + + // 2nd page + h.apiInit(). + Get(fmt.Sprintf("/namespace/%d/module/%d/record/", module.NamespaceID, module.ID)). + Query("incTotal", "false"). + Query("incPageNavigation", "false"). + Query("limit", "2"). + Query("pageCursor", *aux.Response.Filter.PageNavigation[1].Cursor). + Query("sort", "createdAt DESC"). + Header("Accept", "application/json"). + Expect(t). + Status(http.StatusOK). + Assert(helpers.AssertNoErrors). + Assert(jsonpath.Equal(`$.response.set[0].values[0].value`, "5")). + Assert(jsonpath.Equal(`$.response.set[1].values[0].value`, "4")). + Assert(jsonpath.NotPresent(`$.response.filter.total`)). + Assert(jsonpath.NotPresent(`$.response.filter.pageNavigation`)). + End() +} + func TestRecordListForbiddenRecords(t *testing.T) { h := newHelper(t) h.clearRecords()