3
0

Improve aggregate's multi-value support to match the legacy version

This commit is contained in:
Tomaž Jerman
2022-09-23 13:57:54 +02:00
parent a3831aaf87
commit 7e2fec6da1
4 changed files with 336 additions and 59 deletions

View File

@@ -24,7 +24,7 @@ type (
aggregateDefs []aggregateAttr
rowTester tester
keyMaker keyMaker
keyWalker keyWalker
// Keep track of the registered groupIndex so we can easier implement the iterator.
// Simple index tracking won't be enough, because btree is self-balancing -- things change
@@ -37,7 +37,7 @@ type (
ctr int
}
keyMaker func(context.Context, ValueGetter) (groupKey, error)
keyWalker func(context.Context, ValueGetter, func(context.Context, groupKey, ValueGetter) error) error
)
// init initializes the execution step's state
@@ -56,7 +56,8 @@ func (xs *aggregate) init(ctx context.Context) (err error) {
for _, a := range xs.groupDefs {
kk = append(kk, a.expr)
}
xs.keyMaker, err = aggregateGroupKeyMaker(kk...)
xs.keyWalker, err = aggregateGroupKeyWalker(kk...)
if err != nil {
return
}
@@ -257,7 +258,6 @@ func (xs *aggregate) pullEntireSource(ctx context.Context) (err error) {
}
// Drain the source
var k groupKey
for xs.source.Next(ctx) {
err = xs.source.Scan(r)
if err != nil {
@@ -267,13 +267,7 @@ func (xs *aggregate) pullEntireSource(ctx context.Context) (err error) {
// Get the key for this row
// @todo we probably can reuse the key or at least cache keys and avoid re-computation.
// My fairly hacky attempt boosted performance by ~20%
k, err = xs.keyMaker(ctx, r)
if err != nil {
return
}
// Add the row to the group
err = xs.addToGroup(ctx, k, r)
err = xs.keyWalker(ctx, r, xs.addToGroup)
if err != nil {
return
}
@@ -455,31 +449,141 @@ func (xs *aggregate) initScanRow() (out *Row) {
return
}
// aggregateGroupKeyMaker returns a function that computes a group key for the given row
func aggregateGroupKeyMaker(kk ...*ql.ASTNode) (out keyMaker, err error) {
runners := make([]*runnerGval, len(kk))
// Initialize evaluators for every key definition
// aggregateGroupKeyWalker prepares a function which runs the provided function over all group keys
//
// This is required to handle multi-value attributes when used in group keys.
//
// Algorithm TL;DR
//
// We're going backwards in the slice of idents we need to handle (going the other way around)
// should also work but I chose to do it like so.
//
// For every ident, keep track of the number of elements and the index of the last unhandled ident.
//
// For every iteration, collect all of the values pointed to by the unused index.
// After the key is processed, increment the counter for the current ident.
// If the counter exceeds the number of elements, reset the counter for the current ident and move
// the ident pointer backwards (repeat if that ident's counter also exceeds the limit).
//
// When we find an indent which still has some items to process (counter doesn't exceed limit),
// reset the ident pointer to the end of the slice and repeat the whole thing.
func aggregateGroupKeyWalker(kk ...*ql.ASTNode) (out keyWalker, err error) {
// @todo option to copy constants and idents
runners, err := makeExprRunners(kk...)
if err != nil {
return
}
// We'll sort the idents to keep the output consistent; the order doesn't matter
// but it will simplify testing.
idents, hasConstants := keysFromExpr(kk...)
sort.Strings(idents)
out = func(ctx context.Context, vg ValueGetter, run func(context.Context, groupKey, ValueGetter) error) error {
// Edgecase for when all of the expressions return constant values
if len(idents) == 0 {
// This should be impossible but better safe then sorry
if !hasConstants {
return nil
}
k, err := makeGroupKey(ctx, runners, vg)
if err != nil {
return err
}
return run(ctx, k, vg)
}
// For every value combination of idents used in agg. key expressions
// construct a key and run the runner.
limits := vg.CountValues()
ptr := len(idents) - 1
counts := make(map[string]uint, len(limits))
for k := range limits {
counts[k] = 0
}
handle := func() (err error) {
aux := make(map[string]any, len(limits))
for k, i := range counts {
aux[k], err = vg.GetValue(k, i)
if err != nil {
return
}
}
kk, err := makeGroupKey(ctx, runners, aux)
if err != nil {
return err
}
return run(ctx, kk, vg)
}
outer:
for ptr >= 0 {
handle()
counts[idents[ptr]]++
for {
// There are still values to process so we can skip the rest
if counts[idents[ptr]] < limits[idents[ptr]] {
continue outer
}
// Reset the counter for the current ptr since we'll move back.
// The outer loop will have this reset all of the counters after the current ptr.
counts[idents[ptr]] = 0
ptr--
if ptr < 0 {
break outer
}
counts[idents[ptr]]++
if counts[idents[ptr]] >= limits[idents[ptr]] {
continue
}
// We need to reset to the end so the next ident gets all of the values
// that appear after it.
ptr = len(idents) - 1
break
}
}
return nil
}
return
}
func makeExprRunners(kk ...*ql.ASTNode) (out []*runnerGval, err error) {
out = make([]*runnerGval, len(kk))
for i, k := range kk {
runners[i], err = newRunnerGvalParsed(k)
out[i], err = newRunnerGvalParsed(k)
if err != nil {
return
}
}
out = func(ctx context.Context, vg ValueGetter) (gk groupKey, err error) {
gk = make(groupKey, len(runners))
for i, r := range runners {
v, err := r.Eval(ctx, vg)
if err != nil {
return nil, err
}
gk[i] = v
}
return gk, nil
}
return
}
func makeGroupKey(ctx context.Context, runners []*runnerGval, vals any) (gk groupKey, err error) {
gk = make(groupKey, len(runners))
for i, r := range runners {
v, err := r.Eval(ctx, vals)
if err != nil {
return nil, err
}
gk[i] = v
}
return gk, nil
}

View File

@@ -5,6 +5,7 @@ import (
"testing"
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/cortezaproject/corteza-server/pkg/ql"
"github.com/stretchr/testify/require"
)
@@ -24,8 +25,9 @@ func TestStepAggregate(t *testing.T) {
outAttributes []simpleAttribute
sourceAttributes []simpleAttribute
in []simpleRow
out []simpleRow
inSimple []simpleRow
inComplex []*Row
out []simpleRow
f internalFilter
}
@@ -43,7 +45,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "g1", "v1": 10, "txt": "foo"},
{"k1": "g1", "v1": 20, "txt": "fas"},
{"k1": "g2", "v1": 15, "txt": "bar"},
@@ -68,7 +70,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "g1", "v1": 10, "txt": "foo"},
{"k1": "g1", "v1": 20, "txt": "fas"},
{"k1": "g2", "v1": 15, "txt": "bar"},
@@ -94,7 +96,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
@@ -124,7 +126,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(add(v1, 2))",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "g1", "v1": 10, "txt": "foo"},
{"k1": "g1", "v1": 20, "txt": "fas"},
{"k1": "g2", "v1": 15, "txt": "bar"},
@@ -154,7 +156,7 @@ func TestStepAggregate(t *testing.T) {
},
},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
@@ -186,7 +188,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
@@ -218,7 +220,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "b", "v1": 2, "txt": "fas"},
{"k1": "b", "k2": "a", "v1": 3, "txt": "fas"},
@@ -249,7 +251,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
@@ -281,7 +283,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
@@ -313,7 +315,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
@@ -349,7 +351,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
@@ -380,7 +382,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "v1": 2, "txt": "fas"},
{"k1": "b", "v1": 3, "txt": "fas"},
@@ -406,7 +408,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "v1": 2, "txt": "fas"},
{"k1": "b", "v1": 3, "txt": "fas"},
@@ -432,7 +434,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "v1": 2, "txt": "fas"},
{"k1": "b", "v1": 3, "txt": "fas"},
@@ -458,7 +460,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "v1": 2, "txt": "fas"},
{"k1": "b", "v1": 3, "txt": "fas"},
@@ -486,7 +488,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "b", "v1": 2, "txt": "fas"},
{"k1": "b", "k2": "c", "v1": 3, "txt": "fas"},
@@ -515,7 +517,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "b", "v1": 2, "txt": "fas"},
{"k1": "b", "k2": "c", "v1": 3, "txt": "fas"},
@@ -544,7 +546,7 @@ func TestStepAggregate(t *testing.T) {
expr: "sum(v1)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "b", "v1": 2, "txt": "fas"},
{"k1": "b", "k2": "c", "v1": 3, "txt": "fas"},
@@ -578,7 +580,7 @@ func TestStepAggregate(t *testing.T) {
expr: "count(name)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"dob": "2022-10-20T09:44:49Z", "name": "Ana"},
{"dob": "2022-10-20T09:44:49Z", "name": "John"},
{"dob": "2021-10-20T09:44:49Z", "name": "Jane"},
@@ -608,7 +610,7 @@ func TestStepAggregate(t *testing.T) {
expr: "count(name)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"dob": "2022-10-20T09:44:49Z", "name": "Ana"},
{"dob": "2022-10-20T09:44:49Z", "name": "John"},
{"dob": "2021-10-20T09:44:49Z", "name": "Jane"},
@@ -638,7 +640,7 @@ func TestStepAggregate(t *testing.T) {
expr: "count(name)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"name": "Ana"},
{"name": "John"},
{"name": "Jane"},
@@ -666,7 +668,7 @@ func TestStepAggregate(t *testing.T) {
expr: "count(name)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"name": "Ana"},
{"name": "John"},
{"name": "Jane"},
@@ -695,7 +697,7 @@ func TestStepAggregate(t *testing.T) {
expr: "count(name)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"dob": "2022-10-20T09:44:49Z", "name": "Ana"},
{"dob": "2022-10-20T09:44:49Z", "name": "John"},
{"dob": "2021-10-20T09:44:49Z", "name": "Jane"},
@@ -727,7 +729,7 @@ func TestStepAggregate(t *testing.T) {
expr: "count(name)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"thing": "A", "name": "Ana"},
{"name": "John"},
{"name": "Jane"},
@@ -759,7 +761,7 @@ func TestStepAggregate(t *testing.T) {
expr: "count(name)",
}},
in: []simpleRow{
inSimple: []simpleRow{
{"thing": "A", "another": "A", "name": "Ana"},
{"thing": "A", "name": "Ana"},
{"another": "A", "name": "Ana"},
@@ -780,20 +782,76 @@ func TestStepAggregate(t *testing.T) {
},
}
multiValues := []testCase{
{
name: "multi value val",
sourceAttributes: basicAttrs,
group: []simpleAttribute{{
ident: "k1",
}},
outAttributes: []simpleAttribute{{
ident: "v1",
expr: "sum(v1)",
}},
inComplex: []*Row{
(&Row{}).WithValue("k1", 0, "g1").WithValue("v1", 0, 10).WithValue("v1", 1, 10),
(&Row{}).WithValue("k1", 0, "g2").WithValue("v1", 0, 10),
},
out: []simpleRow{
{"k1": "g1", "v1": float64(20)},
{"k1": "g2", "v1": float64(10)},
},
f: internalFilter{orderBy: filter.SortExprSet{{Column: "k1"}}},
},
{
name: "multi value group",
sourceAttributes: basicAttrs,
group: []simpleAttribute{{
ident: "k1",
}},
outAttributes: []simpleAttribute{{
ident: "v1",
expr: "sum(v1)",
}},
inComplex: []*Row{
(&Row{}).WithValue("k1", 0, "g1").WithValue("k1", 1, "g2").WithValue("v1", 0, 10),
(&Row{}).WithValue("k1", 0, "g2").WithValue("v1", 0, 10),
},
out: []simpleRow{
{"k1": "g1", "v1": float64(10)},
{"k1": "g2", "v1": float64(20)},
},
f: internalFilter{orderBy: filter.SortExprSet{{Column: "k1"}}},
},
}
batches := [][]testCase{
baseBehavior,
filtering,
sorting,
exprGroups,
nilValues,
multiValues,
}
for _, batch := range batches {
for _, tc := range batch {
t.Run(tc.name, func(t *testing.T) {
bootstrapAggregate(t, func(ctx context.Context, t *testing.T, sa *Aggregate, b Buffer) {
for _, r := range tc.in {
require.NoError(t, b.Add(ctx, r))
if len(tc.inComplex) > 0 {
for _, r := range tc.inComplex {
require.NoError(t, b.Add(ctx, r))
}
} else {
for _, r := range tc.inSimple {
require.NoError(t, b.Add(ctx, r))
}
}
sa.Ident = tc.name
sa.SourceAttributes = saToMapping(tc.sourceAttributes...)
@@ -1343,3 +1401,78 @@ func TestStepAggregate_paging(t *testing.T) {
})
}
}
func TestAggregate_groupKeyWalker(t *testing.T) {
tcc := []struct {
name string
defs []*ql.ASTNode
in ValueGetter
out []groupKey
}{
{
name: "all constants",
defs: []*ql.ASTNode{
{Value: ql.MakeValueOf("Number", 10)},
{Value: ql.MakeValueOf("Number", 20)},
},
in: (&Row{}).WithValue("k1", 0, "a"),
out: []groupKey{{float64(10), float64(20)}},
},
{
name: "multiple values 1 2 1",
defs: []*ql.ASTNode{
{Symbol: "k1"},
{Symbol: "k2"},
{Symbol: "k3"},
},
in: (&Row{}).WithValue("k1", 0, "k1 1").
WithValue("k2", 0, "k2 1").
WithValue("k2", 1, "k2 2").
WithValue("k3", 0, "k3 1"),
out: []groupKey{
{"k1 1", "k2 1", "k3 1"},
{"k1 1", "k2 2", "k3 1"},
},
},
{
name: "single value with constant",
defs: []*ql.ASTNode{
{Symbol: "k1"},
{Value: ql.MakeValueOf("Number", 10)},
},
in: (&Row{}).WithValue("k1", 0, "k1 1"),
out: []groupKey{
{"k1 1", float64(10)},
},
},
{
name: "multi value with constant",
defs: []*ql.ASTNode{
{Symbol: "k2"},
{Value: ql.MakeValueOf("Number", 10)},
},
in: (&Row{}).WithValue("k2", 0, "k2 1").
WithValue("k2", 1, "k2 2"),
out: []groupKey{
{"k2 1", float64(10)},
{"k2 2", float64(10)},
},
},
}
ctx := context.Background()
for _, tc := range tcc {
t.Run(tc.name, func(t *testing.T) {
w, err := aggregateGroupKeyWalker(tc.defs...)
require.NoError(t, err)
i := 0
w(ctx, tc.in, func(ctx context.Context, k groupKey, vg ValueGetter) error {
require.Equal(t, tc.out[i], k)
i++
return nil
})
require.Equal(t, len(tc.out), i)
})
}
}

View File

@@ -468,3 +468,29 @@ func indexAttrsInto(dst map[string]bool, aa ...AttributeMapping) {
dst[a.Identifier()] = true
}
}
// keysFromExpr returns all of the identifiers used in agg. group expressions
//
// The hasConstants return argument is true if any of the expressions returns a
// constant value, such as year(now()) or 42
func keysFromExpr(nn ...*ql.ASTNode) (out []string, hasConstants bool) {
out = make([]string, 0, (len(nn)+1)*2)
auxOut := make(map[string]bool, (len(nn)+1)*2)
for _, n := range nn {
symbols := n.CollectSymbols()
if len(symbols) == 0 {
hasConstants = true
}
for _, s := range symbols {
auxOut[s] = true
}
}
for k := range auxOut {
out = append(out, k)
}
return
}

View File

@@ -118,6 +118,20 @@ func (t *typedValue) MarshalJSON() ([]byte, error) {
return json.Marshal(aux)
}
func (n *ASTNode) CollectSymbols() (out []string) {
out = make([]string, 0, 8)
n.Traverse(func(a *ASTNode) (bool, *ASTNode, error) {
if a.Symbol != "" {
out = append(out, a.Symbol)
}
return true, a, nil
})
return
}
// Traverse traverses the AST down to leaf nodes.
//
// If fnc. returns false, the traversal of the current branch ends.