diff --git a/compose/dalutils/records.go b/compose/dalutils/records.go index 3662e9013..f3eaeed81 100644 --- a/compose/dalutils/records.go +++ b/compose/dalutils/records.go @@ -2,8 +2,6 @@ package dalutils import ( "context" - "math" - "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/pkg/dal" "github.com/cortezaproject/corteza-server/pkg/filter" @@ -118,20 +116,8 @@ func drainIterator(ctx context.Context, iter dal.Iterator, mod *types.Module, f // close iterator after we've drained it defer iter.Close() - const ( - // minimum amount of records we need to re-fetch - minRefetch = 10 - - // refetch 20% more records that we missed - refetchFactor = 1.2 - ) - var ( - // counter for false checks - checked uint - fetched uint - ok bool - r *types.Record + r *types.Record ) // Get the requested number of record @@ -141,86 +127,36 @@ func drainIterator(ctx context.Context, iter dal.Iterator, mod *types.Module, f set = make(types.RecordSet, 0, 1000) } - for f.Limit == 0 || uint(len(set)) < f.Limit { - // reset counters every drain - checked = 0 - fetched = 0 - - // drain whatever we fetched - for iter.Next(ctx) { - fetched++ - if err = iter.Err(); err != nil { - return - } - - r = prepareRecordTarget(mod) - if err = iter.Scan(r); err != nil { - return - } - - // check fetched record - if f.Check != nil { - if ok, err = f.Check(r); err != nil { - return - } else if !ok { - continue - } - } - - checked++ - set = append(set, r) - } - - // if an error occurred inside Next(), - // we need to stop draining - if err = iter.Err(); err != nil { - return - } - - if fetched == 0 || f.Limit == 0 || (0 < f.Limit && fetched < f.Limit) { - // do not re-fetch if: - // 1) nothing was fetch in the previous run - // 2) there was no limit (everything was fetched) - // 3) there are less fetched items then value of limit - break - } - - // Fetch more records - if checked > 0 { - howMuchMore := checked - if howMuchMore < minRefetch { - howMuchMore = minRefetch - } - - howMuchMore = uint(math.Floor(float64(howMuchMore) * refetchFactor)) - - // request more items - if err = iter.More(howMuchMore, r); err != nil { - return - } - } - } - // Make out filter outFilter = f pp := f.Paging.Clone() - if len(set) > 0 && f.PrevPage != nil { - pp.PrevPage, err = iter.BackCursor(set[0]) - if err != nil { + err = dal.IteratorPaging(ctx, iter, pp, f.Sorting, func(i dal.Iterator) (out dal.ValueGetter, ok bool) { + r = prepareRecordTarget(mod) + if err = i.Scan(r); err != nil { return } - } - if len(set) > 0 { - pp.NextPage, err = iter.ForwardCursor(set[len(set)-1]) - if err != nil { - return + // check fetched record + if f.Check != nil { + if ok, err = f.Check(r); err != nil { + return nil, false + } else if !ok { + return nil, ok + } } + + if f.Limit == 0 || uint(len(set)) < f.Limit { + set = append(set, r) + } + + return r, true + }) + if err != nil { + return } outFilter.Paging = *pp - outFilter.Total = uint(len(set)) return } diff --git a/pkg/dal/iterator.go b/pkg/dal/iterator.go index 72ae5957b..9c34d5279 100644 --- a/pkg/dal/iterator.go +++ b/pkg/dal/iterator.go @@ -3,6 +3,7 @@ package dal import ( "context" "encoding/json" + "fmt" "github.com/cortezaproject/corteza-server/pkg/filter" "io" ) @@ -63,3 +64,123 @@ func IteratorEncodeJSON(ctx context.Context, w io.Writer, iter Iterator, initTar return } + +// IteratorPaging helper function for record paging cursor and total +func IteratorPaging(ctx context.Context, iter Iterator, pp *filter.Paging, ss filter.Sorting, fn func(i Iterator) (ValueGetter, bool)) (err error) { + if pp == nil { + return + } + + if pp.PageCursor != nil { + if pp.IncPageNavigation || pp.IncTotal { + return fmt.Errorf("not allowed to fetch page navigation or total item count with page cursor") + } + } + + pp.Total = 0 + pp.PrevPage = nil + pp.NextPage = nil + pp.PageNavigation = []*filter.Page{} + + const howMuchMore = 1000 + + var ( + counter uint + total uint + fetchMore bool + + cur *filter.PagingCursor + page = filter.Page{ + Page: 1, + Count: 0, + Cursor: nil, + } + ) + + for iter.Next(ctx) { + if err = iter.Err(); err != nil { + return + } + + r, ok := fn(iter) + if !ok { + continue + } + + counter++ + total++ + page.Count++ + + if pp.PrevPage == nil { + pp.PrevPage, err = iter.BackCursor(r) + if err != nil { + return + } + } + + // cursor for each page + cur, err = iter.ForwardCursor(r) + if err != nil { + return + } + + if total%pp.Limit == 0 { + pp.PageNavigation = append(pp.PageNavigation, &filter.Page{ + Page: page.Page, + Count: page.Count, + Cursor: page.Cursor, + }) + + // prep next page + page = filter.Page{ + Page: uint(len(pp.PageNavigation) + 1), + Count: 0, + Cursor: cur, + } + + // fetch more records + fetchMore = true + + if pp.NextPage == nil { + pp.NextPage = cur + } + } + + if (len(pp.PageNavigation) == 1 && fetchMore) || counter == howMuchMore { + counter = 0 + fetchMore = false + + // request more items + if err = iter.More(howMuchMore, r); err != nil { + return + } + } + } + + // push the last page to page navigation + if page.Count > 0 { + pp.PageNavigation = append(pp.PageNavigation, &filter.Page{ + Page: page.Page, + Count: page.Count, + Cursor: page.Cursor, + }) + } + + if pp.PageCursor == nil { + pp.PrevPage = nil + } + + if pp.NextPage != nil && len(pp.PageNavigation) == 1 { + pp.NextPage = nil + } + + if pp.IncTotal { + pp.Total = total + } + + if !pp.IncPageNavigation { + pp.PageNavigation = nil + } + + return +} diff --git a/store/adapters/rdbms/drivers/types.go b/store/adapters/rdbms/drivers/types.go index 5ffd16f58..1a519a725 100644 --- a/store/adapters/rdbms/drivers/types.go +++ b/store/adapters/rdbms/drivers/types.go @@ -121,7 +121,7 @@ func (t *TypeTimestamp) Decode(raw any) (any, bool, error) { } if dec.Valid { - return dec.Time.Format(TimestampLayout(t.Timezone, t.Precision)), dec.Valid, nil + return dec.Time.UTC().Format(TimestampLayout(t.Timezone, t.Precision)), dec.Valid, nil } return nil, false, nil @@ -300,7 +300,7 @@ func TimestampLayout(tz bool, precision int) string { } func TimeLayout(tz bool, precision int) string { - var layout = "15:04:05" + var layout = "15:04:05.00000" if precision > 0 { layout += "." + strings.Repeat("9", precision) } diff --git a/tests/compose/record_test.go b/tests/compose/record_test.go index 6672544f6..986ffac3a 100644 --- a/tests/compose/record_test.go +++ b/tests/compose/record_test.go @@ -160,8 +160,6 @@ func (h helper) makeRecord(module *types.Module, rvs ...*types.RecordValue) *typ } rec.SetModule(module) - rec.SetModule(module) - h.noError(dalutils.ComposeRecordCreate(context.Background(), defDal, module, rec)) return rec @@ -211,6 +209,72 @@ func TestRecordList(t *testing.T) { End() } +func TestRecordListWithPaginationAndSorting(t *testing.T) { + h := newHelper(t) + h.clearRecords() + + module := h.repoMakeRecordModuleWithFields("record testing module") + helpers.AllowMe(h, module.RbacResource(), "records.search") + + var aux = struct { + Response struct { + Filter struct { + NextPage *string + PrevPage *string + PageNavigation []struct { + Page int + Items int + Cursor *string + } + } + } + }{} + + for i := 0; i < 7; i++ { + h.makeRecord(module, &types.RecordValue{Name: "name", Value: fmt.Sprintf("%d", i+1)}) + } + + // 1st page + h.apiInit(). + Get(fmt.Sprintf("/namespace/%d/module/%d/record/", module.NamespaceID, module.ID)). + Query("incTotal", "true"). + Query("incPageNavigation", "true"). + Query("limit", "2"). + Query("sort", "createdAt DESC"). + Header("Accept", "application/json"). + Expect(t). + Status(http.StatusOK). + Assert(helpers.AssertNoErrors). + Assert(jsonpath.Equal(`$.response.set[0].values[0].value`, "7")). + Assert(jsonpath.Equal(`$.response.set[1].values[0].value`, "6")). + Assert(jsonpath.Equal(`$.response.filter.total`, float64(7))). + Assert(jsonpath.Present(`$.response.filter.pageNavigation`)). + Assert(jsonpath.Len(`$.response.filter.pageNavigation`, 4)). + End(). + JSON(&aux) + + h.a.Len(aux.Response.Filter.PageNavigation, 4) + h.a.NotNil(aux.Response.Filter.PageNavigation[1].Cursor) + + // 2nd page + h.apiInit(). + Get(fmt.Sprintf("/namespace/%d/module/%d/record/", module.NamespaceID, module.ID)). + Query("incTotal", "false"). + Query("incPageNavigation", "false"). + Query("limit", "2"). + Query("pageCursor", *aux.Response.Filter.PageNavigation[1].Cursor). + Query("sort", "createdAt DESC"). + Header("Accept", "application/json"). + Expect(t). + Status(http.StatusOK). + Assert(helpers.AssertNoErrors). + Assert(jsonpath.Equal(`$.response.set[0].values[0].value`, "5")). + Assert(jsonpath.Equal(`$.response.set[1].values[0].value`, "4")). + Assert(jsonpath.NotPresent(`$.response.filter.total`)). + Assert(jsonpath.NotPresent(`$.response.filter.pageNavigation`)). + End() +} + func TestRecordListForbiddenRecords(t *testing.T) { h := newHelper(t) h.clearRecords()