diff --git a/pkg/dal/exec_aggregate.go b/pkg/dal/exec_aggregate.go index e8e1a96d1..73feaf161 100644 --- a/pkg/dal/exec_aggregate.go +++ b/pkg/dal/exec_aggregate.go @@ -24,7 +24,7 @@ type ( aggregateDefs []aggregateAttr rowTester tester - keyMaker keyMaker + keyWalker keyWalker // Keep track of the registered groupIndex so we can easier implement the iterator. // Simple index tracking won't be enough, because btree is self-balancing -- things change @@ -37,7 +37,7 @@ type ( ctr int } - keyMaker func(context.Context, ValueGetter) (groupKey, error) + keyWalker func(context.Context, ValueGetter, func(context.Context, groupKey, ValueGetter) error) error ) // init initializes the execution step's state @@ -56,7 +56,8 @@ func (xs *aggregate) init(ctx context.Context) (err error) { for _, a := range xs.groupDefs { kk = append(kk, a.expr) } - xs.keyMaker, err = aggregateGroupKeyMaker(kk...) + + xs.keyWalker, err = aggregateGroupKeyWalker(kk...) if err != nil { return } @@ -257,7 +258,6 @@ func (xs *aggregate) pullEntireSource(ctx context.Context) (err error) { } // Drain the source - var k groupKey for xs.source.Next(ctx) { err = xs.source.Scan(r) if err != nil { @@ -267,13 +267,7 @@ func (xs *aggregate) pullEntireSource(ctx context.Context) (err error) { // Get the key for this row // @todo we probably can reuse the key or at least cache keys and avoid re-computation. // My fairly hacky attempt boosted performance by ~20% - k, err = xs.keyMaker(ctx, r) - if err != nil { - return - } - - // Add the row to the group - err = xs.addToGroup(ctx, k, r) + err = xs.keyWalker(ctx, r, xs.addToGroup) if err != nil { return } @@ -455,31 +449,141 @@ func (xs *aggregate) initScanRow() (out *Row) { return } -// aggregateGroupKeyMaker returns a function that computes a group key for the given row -func aggregateGroupKeyMaker(kk ...*ql.ASTNode) (out keyMaker, err error) { - runners := make([]*runnerGval, len(kk)) - - // Initialize evaluators for every key definition +// aggregateGroupKeyWalker prepares a function which runs the provided function over all group keys +// +// This is required to handle multi-value attributes when used in group keys. +// +// Algorithm TL;DR +// +// We're going backwards in the slice of idents we need to handle (going the other way around) +// should also work but I chose to do it like so. +// +// For every ident, keep track of the number of elements and the index of the last unhandled ident. +// +// For every iteration, collect all of the values pointed to by the unused index. +// After the key is processed, increment the counter for the current ident. +// If the counter exceeds the number of elements, reset the counter for the current ident and move +// the ident pointer backwards (repeat if that ident's counter also exceeds the limit). +// +// When we find an indent which still has some items to process (counter doesn't exceed limit), +// reset the ident pointer to the end of the slice and repeat the whole thing. +func aggregateGroupKeyWalker(kk ...*ql.ASTNode) (out keyWalker, err error) { // @todo option to copy constants and idents + runners, err := makeExprRunners(kk...) + if err != nil { + return + } + + // We'll sort the idents to keep the output consistent; the order doesn't matter + // but it will simplify testing. + idents, hasConstants := keysFromExpr(kk...) + sort.Strings(idents) + + out = func(ctx context.Context, vg ValueGetter, run func(context.Context, groupKey, ValueGetter) error) error { + // Edgecase for when all of the expressions return constant values + if len(idents) == 0 { + // This should be impossible but better safe then sorry + if !hasConstants { + return nil + } + + k, err := makeGroupKey(ctx, runners, vg) + if err != nil { + return err + } + + return run(ctx, k, vg) + } + + // For every value combination of idents used in agg. key expressions + // construct a key and run the runner. + + limits := vg.CountValues() + ptr := len(idents) - 1 + counts := make(map[string]uint, len(limits)) + for k := range limits { + counts[k] = 0 + } + + handle := func() (err error) { + aux := make(map[string]any, len(limits)) + + for k, i := range counts { + aux[k], err = vg.GetValue(k, i) + if err != nil { + return + } + } + + kk, err := makeGroupKey(ctx, runners, aux) + if err != nil { + return err + } + + return run(ctx, kk, vg) + } + + outer: + for ptr >= 0 { + handle() + + counts[idents[ptr]]++ + for { + // There are still values to process so we can skip the rest + if counts[idents[ptr]] < limits[idents[ptr]] { + continue outer + } + + // Reset the counter for the current ptr since we'll move back. + // The outer loop will have this reset all of the counters after the current ptr. + counts[idents[ptr]] = 0 + + ptr-- + if ptr < 0 { + break outer + } + + counts[idents[ptr]]++ + + if counts[idents[ptr]] >= limits[idents[ptr]] { + continue + } + + // We need to reset to the end so the next ident gets all of the values + // that appear after it. + ptr = len(idents) - 1 + break + } + } + + return nil + } + + return +} + +func makeExprRunners(kk ...*ql.ASTNode) (out []*runnerGval, err error) { + out = make([]*runnerGval, len(kk)) + for i, k := range kk { - runners[i], err = newRunnerGvalParsed(k) + out[i], err = newRunnerGvalParsed(k) if err != nil { return } } - out = func(ctx context.Context, vg ValueGetter) (gk groupKey, err error) { - gk = make(groupKey, len(runners)) - for i, r := range runners { - v, err := r.Eval(ctx, vg) - if err != nil { - return nil, err - } - - gk[i] = v - } - return gk, nil - } - return } + +func makeGroupKey(ctx context.Context, runners []*runnerGval, vals any) (gk groupKey, err error) { + gk = make(groupKey, len(runners)) + for i, r := range runners { + v, err := r.Eval(ctx, vals) + if err != nil { + return nil, err + } + + gk[i] = v + } + return gk, nil +} diff --git a/pkg/dal/exec_aggregate_test.go b/pkg/dal/exec_aggregate_test.go index 3020cbf2e..a215c706b 100644 --- a/pkg/dal/exec_aggregate_test.go +++ b/pkg/dal/exec_aggregate_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/cortezaproject/corteza-server/pkg/filter" + "github.com/cortezaproject/corteza-server/pkg/ql" "github.com/stretchr/testify/require" ) @@ -24,8 +25,9 @@ func TestStepAggregate(t *testing.T) { outAttributes []simpleAttribute sourceAttributes []simpleAttribute - in []simpleRow - out []simpleRow + inSimple []simpleRow + inComplex []*Row + out []simpleRow f internalFilter } @@ -43,7 +45,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "g1", "v1": 10, "txt": "foo"}, {"k1": "g1", "v1": 20, "txt": "fas"}, {"k1": "g2", "v1": 15, "txt": "bar"}, @@ -68,7 +70,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "g1", "v1": 10, "txt": "foo"}, {"k1": "g1", "v1": 20, "txt": "fas"}, {"k1": "g2", "v1": 15, "txt": "bar"}, @@ -94,7 +96,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "k2": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "k2": "a", "v1": 2, "txt": "fas"}, {"k1": "a", "k2": "b", "v1": 3, "txt": "fas"}, @@ -124,7 +126,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(add(v1, 2))", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "g1", "v1": 10, "txt": "foo"}, {"k1": "g1", "v1": 20, "txt": "fas"}, {"k1": "g2", "v1": 15, "txt": "bar"}, @@ -154,7 +156,7 @@ func TestStepAggregate(t *testing.T) { }, }, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "k2": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "k2": "a", "v1": 2, "txt": "fas"}, {"k1": "a", "k2": "b", "v1": 3, "txt": "fas"}, @@ -186,7 +188,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "k2": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "k2": "a", "v1": 2, "txt": "fas"}, {"k1": "a", "k2": "b", "v1": 3, "txt": "fas"}, @@ -218,7 +220,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "k2": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "k2": "b", "v1": 2, "txt": "fas"}, {"k1": "b", "k2": "a", "v1": 3, "txt": "fas"}, @@ -249,7 +251,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "k2": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "k2": "a", "v1": 2, "txt": "fas"}, {"k1": "a", "k2": "b", "v1": 3, "txt": "fas"}, @@ -281,7 +283,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "k2": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "k2": "a", "v1": 2, "txt": "fas"}, {"k1": "a", "k2": "b", "v1": 3, "txt": "fas"}, @@ -313,7 +315,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "k2": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "k2": "a", "v1": 2, "txt": "fas"}, {"k1": "a", "k2": "b", "v1": 3, "txt": "fas"}, @@ -349,7 +351,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "k2": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "k2": "a", "v1": 2, "txt": "fas"}, {"k1": "a", "k2": "b", "v1": 3, "txt": "fas"}, @@ -380,7 +382,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "v1": 2, "txt": "fas"}, {"k1": "b", "v1": 3, "txt": "fas"}, @@ -406,7 +408,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "v1": 2, "txt": "fas"}, {"k1": "b", "v1": 3, "txt": "fas"}, @@ -432,7 +434,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "v1": 2, "txt": "fas"}, {"k1": "b", "v1": 3, "txt": "fas"}, @@ -458,7 +460,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "v1": 2, "txt": "fas"}, {"k1": "b", "v1": 3, "txt": "fas"}, @@ -486,7 +488,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "k2": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "k2": "b", "v1": 2, "txt": "fas"}, {"k1": "b", "k2": "c", "v1": 3, "txt": "fas"}, @@ -515,7 +517,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "k2": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "k2": "b", "v1": 2, "txt": "fas"}, {"k1": "b", "k2": "c", "v1": 3, "txt": "fas"}, @@ -544,7 +546,7 @@ func TestStepAggregate(t *testing.T) { expr: "sum(v1)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"k1": "a", "k2": "a", "v1": 10, "txt": "foo"}, {"k1": "a", "k2": "b", "v1": 2, "txt": "fas"}, {"k1": "b", "k2": "c", "v1": 3, "txt": "fas"}, @@ -578,7 +580,7 @@ func TestStepAggregate(t *testing.T) { expr: "count(name)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"dob": "2022-10-20T09:44:49Z", "name": "Ana"}, {"dob": "2022-10-20T09:44:49Z", "name": "John"}, {"dob": "2021-10-20T09:44:49Z", "name": "Jane"}, @@ -608,7 +610,7 @@ func TestStepAggregate(t *testing.T) { expr: "count(name)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"dob": "2022-10-20T09:44:49Z", "name": "Ana"}, {"dob": "2022-10-20T09:44:49Z", "name": "John"}, {"dob": "2021-10-20T09:44:49Z", "name": "Jane"}, @@ -638,7 +640,7 @@ func TestStepAggregate(t *testing.T) { expr: "count(name)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"name": "Ana"}, {"name": "John"}, {"name": "Jane"}, @@ -666,7 +668,7 @@ func TestStepAggregate(t *testing.T) { expr: "count(name)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"name": "Ana"}, {"name": "John"}, {"name": "Jane"}, @@ -695,7 +697,7 @@ func TestStepAggregate(t *testing.T) { expr: "count(name)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"dob": "2022-10-20T09:44:49Z", "name": "Ana"}, {"dob": "2022-10-20T09:44:49Z", "name": "John"}, {"dob": "2021-10-20T09:44:49Z", "name": "Jane"}, @@ -727,7 +729,7 @@ func TestStepAggregate(t *testing.T) { expr: "count(name)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"thing": "A", "name": "Ana"}, {"name": "John"}, {"name": "Jane"}, @@ -759,7 +761,7 @@ func TestStepAggregate(t *testing.T) { expr: "count(name)", }}, - in: []simpleRow{ + inSimple: []simpleRow{ {"thing": "A", "another": "A", "name": "Ana"}, {"thing": "A", "name": "Ana"}, {"another": "A", "name": "Ana"}, @@ -780,20 +782,76 @@ func TestStepAggregate(t *testing.T) { }, } + multiValues := []testCase{ + { + name: "multi value val", + sourceAttributes: basicAttrs, + group: []simpleAttribute{{ + ident: "k1", + }}, + outAttributes: []simpleAttribute{{ + ident: "v1", + expr: "sum(v1)", + }}, + + inComplex: []*Row{ + (&Row{}).WithValue("k1", 0, "g1").WithValue("v1", 0, 10).WithValue("v1", 1, 10), + (&Row{}).WithValue("k1", 0, "g2").WithValue("v1", 0, 10), + }, + + out: []simpleRow{ + {"k1": "g1", "v1": float64(20)}, + {"k1": "g2", "v1": float64(10)}, + }, + + f: internalFilter{orderBy: filter.SortExprSet{{Column: "k1"}}}, + }, + { + name: "multi value group", + sourceAttributes: basicAttrs, + group: []simpleAttribute{{ + ident: "k1", + }}, + outAttributes: []simpleAttribute{{ + ident: "v1", + expr: "sum(v1)", + }}, + + inComplex: []*Row{ + (&Row{}).WithValue("k1", 0, "g1").WithValue("k1", 1, "g2").WithValue("v1", 0, 10), + (&Row{}).WithValue("k1", 0, "g2").WithValue("v1", 0, 10), + }, + + out: []simpleRow{ + {"k1": "g1", "v1": float64(10)}, + {"k1": "g2", "v1": float64(20)}, + }, + + f: internalFilter{orderBy: filter.SortExprSet{{Column: "k1"}}}, + }, + } + batches := [][]testCase{ baseBehavior, filtering, sorting, exprGroups, nilValues, + multiValues, } for _, batch := range batches { for _, tc := range batch { t.Run(tc.name, func(t *testing.T) { bootstrapAggregate(t, func(ctx context.Context, t *testing.T, sa *Aggregate, b Buffer) { - for _, r := range tc.in { - require.NoError(t, b.Add(ctx, r)) + if len(tc.inComplex) > 0 { + for _, r := range tc.inComplex { + require.NoError(t, b.Add(ctx, r)) + } + } else { + for _, r := range tc.inSimple { + require.NoError(t, b.Add(ctx, r)) + } } sa.Ident = tc.name sa.SourceAttributes = saToMapping(tc.sourceAttributes...) @@ -1343,3 +1401,78 @@ func TestStepAggregate_paging(t *testing.T) { }) } } + +func TestAggregate_groupKeyWalker(t *testing.T) { + tcc := []struct { + name string + defs []*ql.ASTNode + in ValueGetter + out []groupKey + }{ + { + name: "all constants", + defs: []*ql.ASTNode{ + {Value: ql.MakeValueOf("Number", 10)}, + {Value: ql.MakeValueOf("Number", 20)}, + }, + in: (&Row{}).WithValue("k1", 0, "a"), + out: []groupKey{{float64(10), float64(20)}}, + }, + { + name: "multiple values 1 2 1", + defs: []*ql.ASTNode{ + {Symbol: "k1"}, + {Symbol: "k2"}, + {Symbol: "k3"}, + }, + in: (&Row{}).WithValue("k1", 0, "k1 1"). + WithValue("k2", 0, "k2 1"). + WithValue("k2", 1, "k2 2"). + WithValue("k3", 0, "k3 1"), + out: []groupKey{ + {"k1 1", "k2 1", "k3 1"}, + {"k1 1", "k2 2", "k3 1"}, + }, + }, + { + name: "single value with constant", + defs: []*ql.ASTNode{ + {Symbol: "k1"}, + {Value: ql.MakeValueOf("Number", 10)}, + }, + in: (&Row{}).WithValue("k1", 0, "k1 1"), + out: []groupKey{ + {"k1 1", float64(10)}, + }, + }, + { + name: "multi value with constant", + defs: []*ql.ASTNode{ + {Symbol: "k2"}, + {Value: ql.MakeValueOf("Number", 10)}, + }, + in: (&Row{}).WithValue("k2", 0, "k2 1"). + WithValue("k2", 1, "k2 2"), + out: []groupKey{ + {"k2 1", float64(10)}, + {"k2 2", float64(10)}, + }, + }, + } + + ctx := context.Background() + for _, tc := range tcc { + t.Run(tc.name, func(t *testing.T) { + w, err := aggregateGroupKeyWalker(tc.defs...) + require.NoError(t, err) + + i := 0 + w(ctx, tc.in, func(ctx context.Context, k groupKey, vg ValueGetter) error { + require.Equal(t, tc.out[i], k) + i++ + return nil + }) + require.Equal(t, len(tc.out), i) + }) + } +} diff --git a/pkg/dal/utils.go b/pkg/dal/utils.go index 7ec071af0..af04f5c5d 100644 --- a/pkg/dal/utils.go +++ b/pkg/dal/utils.go @@ -468,3 +468,29 @@ func indexAttrsInto(dst map[string]bool, aa ...AttributeMapping) { dst[a.Identifier()] = true } } + +// keysFromExpr returns all of the identifiers used in agg. group expressions +// +// The hasConstants return argument is true if any of the expressions returns a +// constant value, such as year(now()) or 42 +func keysFromExpr(nn ...*ql.ASTNode) (out []string, hasConstants bool) { + out = make([]string, 0, (len(nn)+1)*2) + auxOut := make(map[string]bool, (len(nn)+1)*2) + + for _, n := range nn { + symbols := n.CollectSymbols() + if len(symbols) == 0 { + hasConstants = true + } + + for _, s := range symbols { + auxOut[s] = true + } + } + + for k := range auxOut { + out = append(out, k) + } + + return +} diff --git a/pkg/ql/ast_nodes.go b/pkg/ql/ast_nodes.go index af67c19a1..9a7d57f42 100644 --- a/pkg/ql/ast_nodes.go +++ b/pkg/ql/ast_nodes.go @@ -118,6 +118,20 @@ func (t *typedValue) MarshalJSON() ([]byte, error) { return json.Marshal(aux) } +func (n *ASTNode) CollectSymbols() (out []string) { + out = make([]string, 0, 8) + + n.Traverse(func(a *ASTNode) (bool, *ASTNode, error) { + if a.Symbol != "" { + out = append(out, a.Symbol) + } + + return true, a, nil + }) + + return +} + // Traverse traverses the AST down to leaf nodes. // // If fnc. returns false, the traversal of the current branch ends.