3
0

Implement pipeline aggregate execution step

This commit is contained in:
Tomaž Jerman
2022-08-22 11:46:41 +02:00
parent f80a0b8107
commit 5b89af4e01
5 changed files with 1627 additions and 0 deletions

313
pkg/dal/aggregator.go Normal file
View File

@@ -0,0 +1,313 @@
package dal
import (
"context"
"fmt"
"math"
"github.com/cortezaproject/corteza-server/pkg/ql"
"github.com/spf13/cast"
)
type (
	// aggregator performs value aggregation, primarily used by the pipeline
	// aggregate step.
	//
	// The aggregator evaluates the requested value expressions as well as
	// the aggregation operations over the evaluated values.
	//
	// The aggregator computes the values on the fly (for the ones it can);
	// partial aggregates (average) are finalized on the first Scan.
	aggregator struct {
		// aggregates holds output aggregation values
		// @todo we'll use float64 for all values, but this is not the best way to do it (probably).
		aggregates []float64

		// counts holds the number of values for each aggregate including
		// multi value fields.
		// Counts are currently only used for average.
		counts []int

		// def provides a list of aggregates.
		//
		// Each index corresponds to the aggregates and counts slices
		def []aggregateDef

		// scanned indicates whether the aggregator has been scanned as some ops
		// must be blocked after the fact (further Aggregate calls would
		// invalidate the finalized partial aggregates)
		scanned bool
	}

	// aggregateDef defines a single aggregate (output attribute when aggregating)
	aggregateDef struct {
		// outIdent is the identifier the aggregated value is scanned under
		outIdent string
		// aggOp is the aggregation operation (sum, min, max, avg)
		aggOp string
		// inIdent is the source attribute ident; empty when eval is used instead
		inIdent string
		// eval computes the input value from an expression when inIdent is empty
		eval evaluator
	}
)
var (
	// aggregateFunctionIndex specifies all of the registered aggregate functions
	//
	// A function missing from this index is rejected when unpacking the
	// mapping's expression.
	//
	// @todo consider making this expandable via some registry/plugin/...
	aggregateFunctionIndex = map[string]bool{
		"sum": true,
		"min": true,
		"max": true,
		"avg": true,
	}
)
// Aggregator initializes a new aggregator for the given set of mappings
//
// The aggregator is not routine safe; consider defining multiple aggregators
// and then combining them together.
func Aggregator(attributes ...AttributeMapping) (_ *aggregator, err error) {
	// Convert attribute mappings into an internal structure
	// @todo consider dropping this internal structure; not sure how much I like it
	// but I think having something in between is best.
	defs := make([]aggregateDef, len(attributes))
	for ix := range attributes {
		if defs[ix], err = mappingToAggregateDef(attributes[ix]); err != nil {
			return nil, err
		}
	}

	// One aggregate/count slot per output attribute
	return &aggregator{
		def:        defs,
		aggregates: make([]float64, len(attributes)),
		counts:     make([]int, len(attributes)),
	}, nil
}
// Aggregate aggregates the given value
//
// It folds the row's values into every defined aggregate. Once the
// aggregator was scanned, further aggregation is rejected since the
// finalized partial aggregates would no longer be valid.
func (a *aggregator) Aggregate(ctx context.Context, v ValueGetter) (err error) {
	if a.scanned {
		// If we attempt to add data to the aggregator, the previous data might
		// no longer be valid.
		return fmt.Errorf("cannot call Aggregate on an already scanned aggregator")
	}

	// @todo consider throwing all of the aggregates into a routine.
	// I think (for now) performance gains will be negligible.
	for ix := range a.def {
		if err = a.aggregate(ctx, a.def[ix], ix, v); err != nil {
			return err
		}
	}
	return nil
}
// Scan scans the aggregated values into the setter
//
// The first Scan finalizes partial aggregates (average); after that the
// aggregator refuses further Aggregate calls.
func (a *aggregator) Scan(s ValueSetter) (err error) {
	// On first scan, complete partial aggregates
	if !a.scanned {
		a.completePartials()
		a.scanned = true
	}

	// Set the values
	for ix, d := range a.def {
		// @note each aggregated value can be at most one so no need for multi-value
		// support here.
		if err = s.SetValue(d.outIdent, 0, a.aggregates[ix]); err != nil {
			return err
		}
	}
	return nil
}
// aggregate applies the provided value into the requested aggregate
//
// Dispatches to the concrete aggregation op; unknown ops raise an error.
func (a *aggregator) aggregate(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
	switch attr.aggOp {
	case "sum":
		err = a.sum(ctx, attr, i, v)
	case "min":
		err = a.min(ctx, attr, i, v)
	case "max":
		err = a.max(ctx, attr, i, v)
	case "avg":
		err = a.avg(ctx, attr, i, v)
	default:
		err = fmt.Errorf("unsupported aggregate function: %s", attr.aggOp)
	}
	return
}
// walkValues traverses the available values for the specified attribute
//
// When the aggregate has no source ident, the single value produced by the
// attribute's expression is passed to run. Otherwise, every value of the
// (potentially multi-value) source attribute is passed in order.
func (a *aggregator) walkValues(ctx context.Context, r ValueGetter, cc map[string]uint, attr aggregateDef, run func(v any)) (err error) {
	if attr.inIdent == "" {
		run(attr.eval.Eval(ctx, r))
		return nil
	}

	total := cc[attr.inIdent]
	for ix := uint(0); ix < total; ix++ {
		var v any
		if v, err = r.GetValue(attr.inIdent, ix); err != nil {
			return err
		}
		run(v)
	}
	return nil
}
// Aggregate methods

// sum accumulates the running total for the i-th aggregate
func (a *aggregator) sum(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
	return a.walkValues(ctx, v, v.CountValues(), attr, func(val any) {
		a.aggregates[i] += cast.ToFloat64(val)
		a.counts[i]++
	})
}
// min keeps the smallest seen value for the i-th aggregate
func (a *aggregator) min(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
	return a.walkValues(ctx, v, v.CountValues(), attr, func(val any) {
		f := cast.ToFloat64(val)
		// The first value initializes the minimum; comparing against the
		// zero value would skew the result.
		if a.counts[i] > 0 {
			f = math.Min(a.aggregates[i], f)
		}
		a.aggregates[i] = f
		a.counts[i]++
	})
}
// max keeps the largest seen value for the i-th aggregate
func (a *aggregator) max(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
	return a.walkValues(ctx, v, v.CountValues(), attr, func(val any) {
		f := cast.ToFloat64(val)
		// The first value initializes the maximum; comparing against the
		// zero value would skew the result.
		if a.counts[i] > 0 {
			f = math.Max(a.aggregates[i], f)
		}
		a.aggregates[i] = f
		a.counts[i]++
	})
}
// avg accumulates the sum and value count for the i-th aggregate
//
// The division (finalization) happens in completeAverage on the first Scan.
func (a *aggregator) avg(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
	return a.walkValues(ctx, v, v.CountValues(), attr, func(val any) {
		a.aggregates[i] += cast.ToFloat64(val)
		a.counts[i]++
	})
}
// completePartials finalizes aggregates that are computed in multiple stages.
//
// Currently only the average needs a finalization pass (sum / count);
// invoked once, on the first Scan.
func (a *aggregator) completePartials() {
	a.completeAverage()
}
// completeAverage finalizes every "avg" aggregate by dividing the
// accumulated sum by the number of aggregated values.
//
// Aggregates without any values are skipped (their output stays at the zero
// value) to avoid division by zero.
func (a *aggregator) completeAverage() {
	for i, attr := range a.def {
		if attr.aggOp != "avg" {
			continue
		}

		// continue, not return: a zero-count aggregate must not prevent
		// the remaining avg aggregates from being finalized
		if a.counts[i] == 0 {
			continue
		}

		a.aggregates[i] /= float64(a.counts[i])
	}
}
// Utilities

// mappingToAggregateDef converts an AttributeMapping into an aggregateDef
//
// Mappings backed by a plain source ident only carry the ident; expression
// backed mappings are unpacked into the aggregation op. and a compiled
// evaluator for the op's argument.
func mappingToAggregateDef(a AttributeMapping) (def aggregateDef, err error) {
	def = aggregateDef{
		outIdent: a.Identifier(),
	}

	inIdent, expr, err := unpackMappingSource(a)
	if err != nil {
		return
	}

	// Take it from the source
	if inIdent != "" {
		def.inIdent = inIdent
		return
	}

	// Take it from the expression
	// - agg. op.
	def.aggOp, expr, err = unpackExpressionNode(expr)
	if err != nil {
		return
	}

	// - make evaluator for the aggregate function's argument
	def.eval, err = newRunnerGvalParsed(expr)
	if err != nil {
		return
	}

	return
}
// unpackMappingSource determines where a mapping's values come from
//
// When the mapping defines no expression, values come straight from the
// attribute identified by the mapping (ident is returned). Otherwise the
// expression is parsed and the resulting AST is returned.
func unpackMappingSource(a AttributeMapping) (ident string, expr *ql.ASTNode, err error) {
	base := a.Expression()
	if base == "" {
		ident = a.Identifier()
		return
	}

	expr, err = newConverterGval().parser.Parse(base)
	return
}
// unpackExpressionNode splits an expression AST into the aggregation op.
// (the root function reference) and the op's single argument expression.
//
// @todo check for supported aggregators
func unpackExpressionNode(n *ql.ASTNode) (aggOp string, expr *ql.ASTNode, err error) {
	// The root must reference one of the registered aggregate functions
	aggOp = n.Ref
	if !aggregateFunctionIndex[aggOp] {
		return aggOp, nil, fmt.Errorf("root expression must be an aggregate function")
	}

	// Aggregate functions are unary
	if len(n.Args) != 1 {
		return aggOp, nil, fmt.Errorf("impossible state: aggregate function must have exactly one argument")
	}

	return aggOp, n.Args[0], nil
}
// reset clears all aggregation state so the aggregator can be reused.
//
// Benchmarking utility only; don't use it in production code.
func (a *aggregator) reset() {
	for ix := range a.aggregates {
		a.aggregates[ix] = 0
		a.counts[ix] = 0
	}
	a.scanned = false
}

124
pkg/dal/aggregator_test.go Normal file
View File

@@ -0,0 +1,124 @@
package dal
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
// TestAggregator covers value aggregation over a set of rows for every
// registered aggregate function, with and without a nested value expression.
//
// Note: the misspelled field name "attrubutes" was corrected to "attributes"
// (local anonymous struct; no external callers).
func TestAggregator(t *testing.T) {
	tcc := []struct {
		name       string
		rows       []simpleRow
		attributes []simpleAttribute
		out        simpleRow
	}{
		// Plain operations
		{
			name: "simple sum",
			rows: []simpleRow{
				{"v": 1},
				{"v": 5},
				{"v": 35},
				{"v": 11.5},
			},
			out: simpleRow{"sum": float64(52.5)},
			attributes: []simpleAttribute{
				{ident: "sum", expr: "sum(v)"},
			},
		},
		{
			name: "simple min",
			rows: []simpleRow{
				{"v": 1},
				{"v": 5},
				{"v": 35},
				{"v": 11.5},
			},
			out: simpleRow{"min": float64(1)},
			attributes: []simpleAttribute{
				{ident: "min", expr: "min(v)"},
			},
		},
		{
			name: "simple max",
			rows: []simpleRow{
				{"v": 1},
				{"v": 5},
				{"v": 35},
				{"v": 11.5},
			},
			out: simpleRow{"max": float64(35)},
			attributes: []simpleAttribute{
				{ident: "max", expr: "max(v)"},
			},
		},
		{
			name: "simple avg",
			rows: []simpleRow{
				{"v": 1},
				{"v": 5},
				{"v": 35},
				{"v": 11.5},
			},
			out: simpleRow{"avg": float64(13.125)},
			attributes: []simpleAttribute{
				{ident: "avg", expr: "avg(v)"},
			},
		},

		// With a nested expression
		{
			name: "nested expression",
			rows: []simpleRow{
				{"v": 1},
				{"v": 5},
				{"v": 35},
				{"v": 11.5},
			},
			out: simpleRow{"sum": float64(60.5)},
			attributes: []simpleAttribute{
				{ident: "sum", expr: "sum(v + 2)"},
			},
		},
	}

	ctx := context.Background()
	for _, tc := range tcc {
		t.Run(tc.name, func(t *testing.T) {
			agg, err := Aggregator(saToMapping(tc.attributes...)...)
			require.NoError(t, err)

			// Feed all rows into the aggregator
			for _, r := range tc.rows {
				require.NoError(t, agg.Aggregate(ctx, r))
			}

			// Scan out and compare with expectations
			out := make(simpleRow)
			err = agg.Scan(out)
			require.NoError(t, err)

			require.Equal(t, tc.out, out)
		})
	}
}
// TestAggregatorInit assures that Aggregator rejects invalid mappings:
// unregistered aggregate functions and unparsable expressions.
func TestAggregatorInit(t *testing.T) {
	t.Run("non supported agg. op.", func(t *testing.T) {
		// "div" is not in aggregateFunctionIndex
		_, err := Aggregator(
			simpleAttribute{
				expr: "div(v)",
			},
		)
		require.Error(t, err)
	})

	t.Run("invalid expression", func(t *testing.T) {
		// "q we" fails to parse as an expression argument
		_, err := Aggregator(
			simpleAttribute{
				expr: "sum(q we)",
			},
		)
		require.Error(t, err)
	})
}

433
pkg/dal/exec_aggregate.go Normal file
View File

@@ -0,0 +1,433 @@
package dal
import (
"context"
"fmt"
"sort"
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/tidwall/btree"
)
type (
	// aggregate is the pipeline's aggregate step execution state
	aggregate struct {
		// def is the step definition (group keys, out attributes, filter, plan)
		def Aggregate
		// filter is the step-local filter state
		filter internalFilter

		// source is the upstream iterator the step drains
		source Iterator
		// err holds the last iteration error (exposed via Err)
		err error
		// scanRow is the row prepared by next and copied out by Scan
		scanRow *row
		// planned indicates whether applyPlan already ran
		planned bool
		// rowTester implements post-aggregation filtering
		rowTester tester

		// Keep track of the registered groupIndex so we can easier implement the iterator.
		// Simple index tracking won't be enough, because btree is self-balancing -- things change
		//
		// @todo worth considering using the inmembuffer and fork it around...
		groupIndex *btree.Generic[*aggregateGroup]
		// groups preserves registration order; i is the iteration position
		groups []*aggregateGroup
		i      int
	}
)
// init initializes the execution step's state
//
// Prepares the reusable scan row, the group index/order bookkeeping, and the
// row tester built from the step's filter; finally applies the decided
// aggregation plan (which may drain the entire source).
func (xs *aggregate) init(ctx context.Context) (err error) {
	// Initialize state variables
	xs.scanRow = xs.initScanRow()
	xs.groupIndex = btree.NewGeneric[*aggregateGroup](xs.compareGroupKeys)
	xs.groups = make([]*aggregateGroup, 0, 128)

	xs.rowTester, err = prepareGenericRowTester(xs.def.Filter)
	if err != nil {
		return
	}

	// Apply the decided aggregation plan
	return xs.applyPlan(ctx)
}
// // // // // // // // // // // // // // // // // // // // // // // // //
// Iterator methods

// Next advances the iterator to the next scannable row
func (xs *aggregate) Next(ctx context.Context) (more bool) {
	// Assure aggregation plan was taken into account
	// (no-op when already applied or when a previous error is pending)
	if xs.err = xs.applyPlan(ctx); xs.err != nil {
		return false
	}

	// Go next...
	more, xs.err = xs.next(ctx)
	return more
}
// next prepares the next scannable row
//
// The method is adjusted based on the defined aggregation plan.
// The method performs appropriate filtering/sorting (if needed).
//
// Returns more=false with a nil error when the groups are exhausted.
func (xs *aggregate) next(ctx context.Context) (more bool, err error) {
	var g *aggregateGroup
	for {
		// Make sure it's cleared out and ready for fresh data
		xs.scanRow.Reset()

		// Make sure we have data
		err = xs.prepareNext(ctx)
		if err != nil {
			return false, err
		}

		// Next group; nil group with nil error means we're done
		g, err = xs.nextGroup(ctx)
		if err != nil || g == nil {
			return false, err
		}

		// Make sure the key is in there
		// @todo potential optimization where we don't scan entire composited keys
		err = xs.scanKey(g, xs.scanRow)
		if err != nil {
			return
		}

		// Scan the aggregated values into the row
		err = g.agg.Scan(xs.scanRow)
		if err != nil {
			return
		}

		// Check if we want to keep it; rejected rows are skipped and the
		// loop continues with the next group
		if !xs.keep(ctx, xs.scanRow) {
			continue
		}

		break
	}

	return true, nil
}
// More re-initializes the step to continue iterating after the given row
//
// Rebuilds the paging cursor and row tester from the provided row and resets
// the buffered group state; a follow-up Next re-applies the aggregation plan.
func (xs *aggregate) More(limit uint, v ValueGetter) (err error) {
	// Redo the cursor
	xs.def.Filter.cursor, err = filter.PagingCursorFrom(xs.def.Filter.OrderBy(), v, xs.collectPrimaryAttributes()...)
	if err != nil {
		return
	}

	// Redo the row tester
	xs.rowTester, err = prepareGenericRowTester(xs.def.Filter)
	if err != nil {
		return
	}

	// Redo the state
	// @todo adjust based on aggregation plan; reuse buffered, etc.
	xs.scanRow.Reset()
	xs.groupIndex = btree.NewGeneric[*aggregateGroup](xs.compareGroupKeys)
	xs.groups = make([]*aggregateGroup, 0, 128)
	xs.planned = false

	return
}
// Err returns the last error raised while iterating (if any)
func (s *aggregate) Err() error { return s.err }
// Scan copies the prepared scan row's values into the destination setter
func (s *aggregate) Scan(dst ValueSetter) (err error) {
	if s.i < 0 {
		return fmt.Errorf("@todo err not initialized; next first")
	}

	// Copy every value (including multi-value fields) into the destination
	for ident, count := range s.scanRow.CountValues() {
		for ix := uint(0); ix < count; ix++ {
			// omitting err here since it won't happen
			val, _ := s.scanRow.GetValue(ident, ix)
			if err = dst.SetValue(ident, ix, val); err != nil {
				return err
			}
		}
	}

	return nil
}
// Close releases the underlying source (if any)
func (s *aggregate) Close() error {
	if s.source == nil {
		return nil
	}
	return s.source.Close()
}
// BackCursor constructs a backwards-pointing paging cursor from the given row
func (s *aggregate) BackCursor(v ValueGetter) (*filter.PagingCursor, error) {
	cur, err := filter.PagingCursorFrom(s.def.Filter.OrderBy(), v, s.collectPrimaryAttributes()...)
	if err != nil {
		return nil, err
	}

	// Mark the cursor as reversed
	cur.ROrder = true
	cur.LThen = s.def.Filter.OrderBy().Reversed()

	return cur, nil
}
// ForwardCursor constructs a forward-pointing paging cursor from the given row
func (s *aggregate) ForwardCursor(v ValueGetter) (*filter.PagingCursor, error) {
	cur, err := filter.PagingCursorFrom(s.def.Filter.OrderBy(), v, s.collectPrimaryAttributes()...)
	if err != nil {
		return nil, err
	}
	return cur, nil
}
// // // // // // // // // // // // // // // // // // // // // // // // //
// Utilities

// applyPlan runs additional initialization code based on the defined aggregation plan
//
// It is a no-op after the first successful invocation or once an error is pending.
func (xs *aggregate) applyPlan(ctx context.Context) (err error) {
	if xs.planned || xs.err != nil {
		return xs.err
	}
	xs.planned = true

	// Without partial scanning the entire source must be buffered up front
	if !xs.def.plan.partialScan {
		return xs.pullEntireSource(ctx)
	}
	return nil
}
// prepareNext assures data is available before the next group is returned
func (xs *aggregate) prepareNext(ctx context.Context) (err error) {
	// Fully-buffered plans already pulled everything; nothing to do
	if !xs.def.plan.partialScan {
		return nil
	}

	// Pull next chunk from source since it's not entirely buffered
	return xs.pullNextChunk(ctx)
}
// pullEntireSource pulls the entire source into aggregators
//
// The entire source should be pulled when underlying datasources can't provide
// more appropriately ordered data due to costs.
//
// Every source row is keyed by its group values and folded into that group's
// aggregator; groups are sorted afterwards per the step's order-by.
func (xs *aggregate) pullEntireSource(ctx context.Context) (err error) {
	if xs.source == nil {
		return fmt.Errorf("unable to pull data: no source defined")
	}

	// @todo consider pre-populating the hashmaps
	// The row is reused (Reset) for every pull to limit allocations
	r := &row{
		counters: make(map[string]uint),
		values:   make(valueSet),
	}

	// Drain the source
	for xs.source.Next(ctx) {
		err = xs.source.Scan(r)
		if err != nil {
			return
		}

		// Get the key for this row
		// @todo try to reuse key; probably a much simpler thing could work
		k := make(groupKey, len(xs.def.Group))
		err = xs.getGroupKey(ctx, r, k)
		if err != nil {
			return
		}

		// Add the row to the group
		err = xs.addToGroup(ctx, k, r)
		if err != nil {
			return
		}

		r.Reset()
	}
	xs.err = xs.source.Err()
	if xs.err != nil {
		return xs.err
	}

	xs.sortGroups()
	return
}
// pullNextChunk pulls the next chunk into the aggregators
//
// Source should be pulled in chunks when the underlying datasource can provide
// more appropriately ordered data.
//
// Not implemented yet; partialScan plans currently always error out here.
func (xs *aggregate) pullNextChunk(ctx context.Context) (err error) {
	return fmt.Errorf("not implemented")
}
// addToGroup folds the given row into the group identified by key,
// registering a fresh group (with its own aggregator) when none exists yet
func (s *aggregate) addToGroup(ctx context.Context, key groupKey, r ValueGetter) (err error) {
	// Lazily initialize the group index
	if s.groupIndex == nil {
		s.groupIndex = btree.NewGeneric[*aggregateGroup](s.compareGroupKeys)
	}

	// Try to get the existing one; if it doesn't exist, push a new one
	// @todo this causes a bit of a memory pressure; investigate
	g, found := s.groupIndex.Get(&aggregateGroup{key: key})
	if !found {
		if g, err = s.wrapGroup(ctx, key); err != nil {
			return err
		}
		s.groupIndex.Set(g)
		s.groups = append(s.groups, g)
	}

	// if it exists/was added, add it to the aggregator
	return g.agg.Aggregate(ctx, r)
}
// Group key comparator
//
// compareGroupKeys reports whether group a sorts strictly before group b,
// comparing composite keys lexicographically: the first non-equal element
// decides the order.
//
// Note: OR-folding per-element comparisons (the previous approach) is not a
// valid strict ordering for composite keys — both a<b and b<a could hold
// (e.g. keys (1,2) and (2,1)) — which can corrupt btree ordering and lookups.
//
// @todo can this be moved to the struct's Less method?
func (t *aggregate) compareGroupKeys(a, b *aggregateGroup) bool {
	for i := range a.key {
		switch c := compareValues(a.key[i], b.key[i]); {
		case c < 0:
			return true
		case c > 0:
			return false
		}
	}

	// All elements equal
	return false
}
// getGroupKey populates key with the row's group-defining values
//
// NOTE(review): the lookup uses attr.Expression() as the value ident while
// scanKey writes keys back under attr.Identifier() — presumably Expression()
// resolves to the plain ident for non-expression mappings; confirm against
// the AttributeMapping implementations.
func (s *aggregate) getGroupKey(ctx context.Context, r ValueGetter, key groupKey) (err error) {
	for i, attr := range s.def.Group {
		// @todo support expressions?
		v, err := r.GetValue(attr.Expression(), 0)
		if err != nil {
			return err
		}

		// @todo multi-value support?
		key[i] = v
	}
	return nil
}
// wrapGroup constructs an aggregateGroup for the given key backed by a
// fresh aggregator over the step's output attributes
func (s *aggregate) wrapGroup(ctx context.Context, key groupKey) (g *aggregateGroup, err error) {
	var agg *aggregator
	if agg, err = Aggregator(s.def.OutAttributes...); err != nil {
		return nil, err
	}

	return &aggregateGroup{
		key: key,
		agg: agg,
	}, nil
}
// nextGroup returns the next buffered group, or nil when exhausted
func (xs *aggregate) nextGroup(ctx context.Context) (_ *aggregateGroup, err error) {
	if xs.i < len(xs.groups) {
		g := xs.groups[xs.i]
		xs.i++
		return g, nil
	}

	// No more groups
	return nil, nil
}
// scanKey writes the group's key values into the destination row, one value
// per group attribute, under the attribute's identifier
func (s *aggregate) scanKey(g *aggregateGroup, dst *row) (err error) {
	for i, attr := range s.def.Group {
		// @todo multi value support?
		dst.SetValue(attr.Identifier(), 0, g.key[i])
		// omitting err; internal row won't raise them
	}
	return nil
}
// keep checks the aggregated row against the step's (post-aggregation) filter
func (s *aggregate) keep(ctx context.Context, r *row) bool {
	if s.rowTester != nil {
		return s.rowTester.Test(ctx, r)
	}

	// No tester defined; keep everything
	return true
}
// collectPrimaryAttributes returns the identifiers of the group attributes
//
// Each group key is a PK candidate; all together form a composite key
func (s *aggregate) collectPrimaryAttributes() []string {
	out := make([]string, 0, len(s.def.Group))
	for _, m := range s.def.Group {
		out = append(out, m.Identifier())
	}
	return out
}
// sortGroups stable-sorts the buffered groups per the step's order-by
//
// Each ordering column is resolved against the group keys first, falling back
// to the output (aggregated) attributes.
//
// Note: the previous version resolved the group-key index twice per
// comparison; the lookup is now done once for both compared groups.
func (s *aggregate) sortGroups() {
	sort.SliceStable(s.groups, func(i, j int) bool {
		ga, gb := s.groups[i], s.groups[j]

		for _, o := range s.def.Filter.OrderBy() {
			var va, vb any

			// Resolve the column once for both groups: group keys take
			// precedence, output attributes are the fallback
			if x := inKeys(s.def.Group, o.Column); x > -1 {
				va, vb = ga.key[x], gb.key[x]
			} else {
				x := inKeys(s.def.OutAttributes, o.Column)
				va, vb = ga.agg.aggregates[x], gb.agg.aggregates[x]
			}

			cmp := compareValues(va, vb)
			if cmp != 0 {
				if o.Descending {
					return cmp > 0
				}
				return cmp < 0
			}
		}

		// All ordering columns equal; preserve insertion order (stable sort)
		return false
	})
}
// inKeys returns the index of the mapping with the given identifier or -1
// when no mapping matches
func inKeys(kk []AttributeMapping, ident string) int {
	for ix := range kk {
		if kk[ix].Identifier() == ident {
			return ix
		}
	}
	return -1
}
// initScanRow constructs the reusable row next scans groups into
//
// The row's value slots are pre-populated with all known attributes
// (group keys and output attributes) to limit per-scan allocations.
func (xs *aggregate) initScanRow() (out *row) {
	// base
	out = &row{
		counters: make(map[string]uint),
		values:   make(valueSet),
	}

	// pre-populate with known attrs
	for _, attr := range append(xs.def.Group, xs.def.OutAttributes...) {
		out.values[attr.Identifier()] = make([]any, 0, 2)
		out.counters[attr.Identifier()] = 0
	}

	return
}

View File

@@ -0,0 +1,83 @@
package dal
import (
"context"
"math/rand"
"testing"
"github.com/stretchr/testify/require"
)
// benchmarkExecAggregate measures step initialization (which, for the
// fully-buffered plan, drains and aggregates the whole n-row source)
// over a single-key sum aggregation.
func benchmarkExecAggregate(b *testing.B, n int) {
	b.StopTimer()
	ctx := context.Background()

	group := []simpleAttribute{{ident: "g", expr: "g"}}
	outAttributes := []simpleAttribute{{ident: "sum", expr: "sum(s1_v)"}}
	sourceAttributes := []simpleAttribute{
		{ident: "k"},
		{ident: "g"},
		{ident: "s1_v"},
	}
	ggs := []string{"a", "b", "c", "d", "e", "f", "g"}

	// Inmem buffer for example; n rows with random groups/values
	buff := InMemoryBuffer()
	for i := 0; i < n; i++ {
		require.NoError(b, buff.Add(ctx, simpleRow{"k": i + 1, "g": ggs[rand.Intn(len(ggs))], "s1_v": rand.Intn(200)}))
	}

	b.ResetTimer()
	// note: the loop variable shadows the row-count parameter n; only b.N is used here
	for n := 0; n < b.N; n++ {
		def := Aggregate{
			Ident:            "agg",
			Group:            saToMapping(group...),
			OutAttributes:    saToMapping(outAttributes...),
			SourceAttributes: saToMapping(sourceAttributes...),
		}

		// Only Initialize (the aggregation work) is timed
		b.StartTimer()
		_, err := def.Initialize(ctx, buff)
		require.NoError(b, err)
		b.StopTimer()
		// Rewind the buffer so the next iteration re-reads the same rows
		buff.Seek(ctx, 0)
	}
}
// goos: linux
// goarch: amd64
// pkg: github.com/cortezaproject/corteza-server/pkg/dal
// cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
// BenchmarkExecAggregate_20000-12 63 17902131 ns/op 1423343 B/op 80432 allocs/op
// BenchmarkExecAggregate_40000-12 33 36179481 ns/op 2783435 B/op 160432 allocs/op
// BenchmarkExecAggregate_60000-12 21 53681403 ns/op 4143471 B/op 240433 allocs/op
// BenchmarkExecAggregate_80000-12 15 72234005 ns/op 5503370 B/op 320432 allocs/op
// BenchmarkExecAggregate_100000-12 12 92145195 ns/op 6863674 B/op 400434 allocs/op
// BenchmarkExecAggregate_120000-12 10 109842507 ns/op 8223711 B/op 480433 allocs/op
// BenchmarkExecAggregate_140000-12 8 129250726 ns/op 9583616 B/op 560433 allocs/op
// BenchmarkExecAggregate_160000-12 7 144896012 ns/op 10943776 B/op 640434 allocs/op
// BenchmarkExecAggregate_180000-12 7 162705594 ns/op 12303718 B/op 720434 allocs/op
// BenchmarkExecAggregate_200000-12 6 181970479 ns/op 13663848 B/op 800434 allocs/op
// BenchmarkExecAggregate_220000-12 5 204168375 ns/op 15024054 B/op 880435 allocs/op
// BenchmarkExecAggregate_240000-12 5 218573247 ns/op 16383470 B/op 960432 allocs/op
// BenchmarkExecAggregate_260000-12 5 238612939 ns/op 17744016 B/op 1040435 allocs/op
// BenchmarkExecAggregate_280000-12 4 261442973 ns/op 19103558 B/op 1120433 allocs/op
// BenchmarkExecAggregate_300000-12 4 342276668 ns/op 20464244 B/op 1200436 allocs/op
// Size-parameterized entry points for benchmarkExecAggregate (results above).
func BenchmarkExecAggregate_20000(b *testing.B)  { benchmarkExecAggregate(b, 20000) }
func BenchmarkExecAggregate_40000(b *testing.B)  { benchmarkExecAggregate(b, 40000) }
func BenchmarkExecAggregate_60000(b *testing.B)  { benchmarkExecAggregate(b, 60000) }
func BenchmarkExecAggregate_80000(b *testing.B)  { benchmarkExecAggregate(b, 80000) }
func BenchmarkExecAggregate_100000(b *testing.B) { benchmarkExecAggregate(b, 100000) }
func BenchmarkExecAggregate_120000(b *testing.B) { benchmarkExecAggregate(b, 120000) }
func BenchmarkExecAggregate_140000(b *testing.B) { benchmarkExecAggregate(b, 140000) }
func BenchmarkExecAggregate_160000(b *testing.B) { benchmarkExecAggregate(b, 160000) }
func BenchmarkExecAggregate_180000(b *testing.B) { benchmarkExecAggregate(b, 180000) }
func BenchmarkExecAggregate_200000(b *testing.B) { benchmarkExecAggregate(b, 200000) }
func BenchmarkExecAggregate_220000(b *testing.B) { benchmarkExecAggregate(b, 220000) }
func BenchmarkExecAggregate_240000(b *testing.B) { benchmarkExecAggregate(b, 240000) }
func BenchmarkExecAggregate_260000(b *testing.B) { benchmarkExecAggregate(b, 260000) }
func BenchmarkExecAggregate_280000(b *testing.B) { benchmarkExecAggregate(b, 280000) }
func BenchmarkExecAggregate_300000(b *testing.B) { benchmarkExecAggregate(b, 300000) }

View File

@@ -0,0 +1,674 @@
package dal
import (
"context"
"testing"
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/stretchr/testify/require"
)
// TestStepAggregate runs the aggregate execution step end to end over an
// in-memory buffer: grouping (single/multi key), value expressions,
// post-aggregation filtering (constraints and expressions), and sorting.
func TestStepAggregate(t *testing.T) {
	basicAttrs := []simpleAttribute{
		{ident: "k1"},
		{ident: "k2"},
		{ident: "v1"},
		{ident: "txt"},
	}

	tcc := []struct {
		name             string
		group            []simpleAttribute
		outAttributes    []simpleAttribute
		sourceAttributes []simpleAttribute
		in               []simpleRow
		out              []simpleRow
		f                internalFilter
	}{
		// Basic behavior
		{
			name:             "basic one key group",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "g1", "v1": 10, "txt": "foo"},
				{"k1": "g1", "v1": 20, "txt": "fas"},
				{"k1": "g2", "v1": 15, "txt": "bar"},
			},
			out: []simpleRow{
				{"k1": "g1", "v1": float64(30)},
				{"k1": "g2", "v1": float64(15)},
			},
			f: internalFilter{orderBy: filter.SortExprSet{{Column: "k1"}}},
		},
		{
			name:             "basic multi key group",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}, {
				ident: "k2",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				{"k1": "b", "k2": "a", "v1": 20, "txt": "fas"},
				{"k1": "b", "k2": "a", "v1": 31, "txt": "fas"},
			},
			out: []simpleRow{
				{"k1": "a", "k2": "a", "v1": float64(12)},
				{"k1": "a", "k2": "b", "v1": float64(6)},
				{"k1": "b", "k2": "a", "v1": float64(51)},
			},
			f: internalFilter{orderBy: filter.SortExprSet{{Column: "k1"}, {Column: "k2"}}},
		},
		{
			name:             "basic expr in value aggregation",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(add(v1, 2))",
			}},
			in: []simpleRow{
				{"k1": "g1", "v1": 10, "txt": "foo"},
				{"k1": "g1", "v1": 20, "txt": "fas"},
				{"k1": "g2", "v1": 15, "txt": "bar"},
			},
			out: []simpleRow{
				{"k1": "g1", "v1": float64(34)},
				{"k1": "g2", "v1": float64(17)},
			},
			f: internalFilter{orderBy: filter.SortExprSet{{Column: "k1"}}},
		},

		// Filtering
		{
			name:             "filtering constraints single attr",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}, {
				ident: "k2",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			},
			},
			in: []simpleRow{
				{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				{"k1": "b", "k2": "a", "v1": 20, "txt": "fas"},
				{"k1": "b", "k2": "a", "v1": 31, "txt": "fas"},
			},
			out: []simpleRow{
				{"k1": "a", "k2": "a", "v1": float64(12)},
				{"k1": "a", "k2": "b", "v1": float64(6)},
			},
			f: internalFilter{
				constraints: map[string][]any{"k1": {"a"}},
			},
		},
		{
			name:             "filtering constraints multiple attrs",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}, {
				ident: "k2",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				// ---
				{"k1": "b", "k2": "a", "v1": 20, "txt": "fas"},
				{"k1": "b", "k2": "a", "v1": 31, "txt": "fas"},
			},
			out: []simpleRow{
				{"k1": "a", "k2": "b", "v1": float64(6)},
			},
			f: internalFilter{
				constraints: map[string][]any{"k1": {"a"}, "k2": {"b"}},
			},
		},
		{
			name:             "filtering constraints single attr multiple options",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}, {
				ident: "k2",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "k2": "b", "v1": 2, "txt": "fas"},
				{"k1": "b", "k2": "a", "v1": 3, "txt": "fas"},
				{"k1": "c", "k2": "a", "v1": 3, "txt": "fas"},
			},
			out: []simpleRow{
				{"k1": "a", "k2": "a", "v1": float64(10)},
				{"k1": "a", "k2": "b", "v1": float64(2)},
				{"k1": "b", "k2": "a", "v1": float64(3)},
			},
			f: internalFilter{
				orderBy:     filter.SortExprSet{{Column: "k1"}, {Column: "k2"}},
				constraints: map[string][]any{"k1": {"a", "b"}},
			},
		},
		{
			name:             "filtering expression simple expression",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}, {
				ident: "k2",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				// ---
				{"k1": "b", "k2": "a", "v1": 20, "txt": "fas"},
				{"k1": "b", "k2": "a", "v1": 31, "txt": "fas"},
			},
			out: []simpleRow{
				{"k1": "a", "k2": "a", "v1": float64(12)},
			},
			f: internalFilter{
				expression: "v1 > 10 && v1 < 20",
			},
		},
		{
			name:             "filtering expression constant true",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}, {
				ident: "k2",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				// ---
				{"k1": "b", "k2": "a", "v1": 20, "txt": "fas"},
				{"k1": "b", "k2": "a", "v1": 31, "txt": "fas"},
			},
			out: []simpleRow{
				{"k1": "a", "k2": "a", "v1": float64(12)},
				{"k1": "a", "k2": "b", "v1": float64(6)},
				{"k1": "b", "k2": "a", "v1": float64(51)},
			},
			f: internalFilter{
				expression: "true",
				orderBy:    filter.SortExprSet{{Column: "k1"}, {Column: "k2"}},
			},
		},
		{
			name:             "filtering expression constant false",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}, {
				ident: "k2",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				// ---
				{"k1": "b", "k2": "a", "v1": 20, "txt": "fas"},
				{"k1": "b", "k2": "a", "v1": 31, "txt": "fas"},
			},
			out: []simpleRow{},
			f: internalFilter{
				expression: "false",
			},
		},

		// Sorting
		{
			name:             "sorting single key full key asc",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "v1": 2, "txt": "fas"},
				{"k1": "b", "v1": 3, "txt": "fas"},
			},
			out: []simpleRow{
				{"k1": "a", "v1": float64(12)},
				{"k1": "b", "v1": float64(3)},
			},
			f: internalFilter{
				orderBy: filter.SortExprSet{{Column: "k1", Descending: false}},
			},
		},
		{
			name:             "sorting single key full key dsc",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "v1": 2, "txt": "fas"},
				{"k1": "b", "v1": 3, "txt": "fas"},
			},
			out: []simpleRow{
				{"k1": "b", "v1": float64(3)},
				{"k1": "a", "v1": float64(12)},
			},
			f: internalFilter{
				orderBy: filter.SortExprSet{{Column: "k1", Descending: true}},
			},
		},
		{
			name:             "sorting multiple key full key asc",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}, {
				ident: "k2",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "k2": "b", "v1": 2, "txt": "fas"},
				{"k1": "b", "k2": "c", "v1": 3, "txt": "fas"},
			},
			out: []simpleRow{
				{"k1": "a", "k2": "a", "v1": float64(10)},
				{"k1": "a", "k2": "b", "v1": float64(2)},
				{"k1": "b", "k2": "c", "v1": float64(3)},
			},
			f: internalFilter{
				orderBy: filter.SortExprSet{{Column: "k1", Descending: false}, {Column: "k2", Descending: false}},
			},
		},
		{
			name:             "sorting multiple key full key dsc",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}, {
				ident: "k2",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "k2": "b", "v1": 2, "txt": "fas"},
				{"k1": "b", "k2": "c", "v1": 3, "txt": "fas"},
			},
			out: []simpleRow{
				{"k1": "b", "k2": "c", "v1": float64(3)},
				{"k1": "a", "k2": "b", "v1": float64(2)},
				{"k1": "a", "k2": "a", "v1": float64(10)},
			},
			f: internalFilter{
				orderBy: filter.SortExprSet{{Column: "k1", Descending: true}, {Column: "k2", Descending: true}},
			},
		},
		{
			name:             "sorting multiple key full key mixed",
			sourceAttributes: basicAttrs,
			group: []simpleAttribute{{
				ident: "k1",
			}, {
				ident: "k2",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			}},
			in: []simpleRow{
				{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "k2": "b", "v1": 2, "txt": "fas"},
				{"k1": "b", "k2": "c", "v1": 3, "txt": "fas"},
			},
			out: []simpleRow{
				{"k1": "a", "k2": "b", "v1": float64(2)},
				{"k1": "a", "k2": "a", "v1": float64(10)},
				{"k1": "b", "k2": "c", "v1": float64(3)},
			},
			f: internalFilter{
				orderBy: filter.SortExprSet{{Column: "k1", Descending: false}, {Column: "k2", Descending: true}},
			},
		},
	}

	for _, tc := range tcc {
		t.Run(tc.name, func(t *testing.T) {
			bootstrapAggregate(t, func(ctx context.Context, t *testing.T, sa *Aggregate, b Buffer) {
				// Populate the source buffer
				for _, r := range tc.in {
					require.NoError(t, b.Add(ctx, r))
				}

				sa.Ident = tc.name
				sa.SourceAttributes = saToMapping(tc.sourceAttributes...)
				sa.Group = saToMapping(tc.group...)
				sa.OutAttributes = saToMapping(tc.outAttributes...)
				sa.Filter = tc.f

				aa, err := sa.Initialize(ctx, b)
				require.NoError(t, err)

				// Drain the step and compare each produced row in order
				i := 0
				for aa.Next(ctx) {
					out := simpleRow{}
					require.NoError(t, aa.Scan(out))

					require.Equal(t, tc.out[i], out)

					i++
				}
				require.NoError(t, aa.Err())
				require.Equal(t, len(tc.out), i)
			})
		})
	}
}
// TestStepAggregate_cursorCollect_forward checks that ForwardCursor builds a
// paging cursor from the group-key (primary key candidate) columns only.
func TestStepAggregate_cursorCollect_forward(t *testing.T) {
	tcc := []struct {
		name          string
		ss            filter.SortExprSet
		in            simpleRow
		group         []simpleAttribute
		outAttributes []simpleAttribute
		out           func() *filter.PagingCursor
		err           bool
	}{
		{
			name: "simple",
			in:   simpleRow{"pk1": 1, "f1": "v1"},
			group: []simpleAttribute{{
				ident: "pk1",
			}},
			outAttributes: []simpleAttribute{{
				ident: "f1",
			}},
			out: func() *filter.PagingCursor {
				pc := &filter.PagingCursor{}
				pc.Set("pk1", 1, false)
				return pc
			},
		},
	}

	for _, c := range tcc {
		t.Run(c.name, func(t *testing.T) {
			def := Aggregate{
				Filter: internalFilter{
					orderBy: c.ss,
				},
				Group:         saToMapping(c.group...),
				OutAttributes: saToMapping(c.outAttributes...),
			}

			out, err := (&aggregate{def: def}).ForwardCursor(c.in)
			require.NoError(t, err)

			require.Equal(t, c.out(), out)
		})
	}
}
// TestStepAggregate_cursorCollect_back checks that BackCursor builds a
// reversed (ROrder) paging cursor from the group-key columns only.
func TestStepAggregate_cursorCollect_back(t *testing.T) {
	tcc := []struct {
		name          string
		ss            filter.SortExprSet
		in            simpleRow
		group         []simpleAttribute
		outAttributes []simpleAttribute
		out           func() *filter.PagingCursor
		err           bool
	}{
		{
			name: "simple",
			in:   simpleRow{"pk1": 1, "f1": "v1"},
			group: []simpleAttribute{{
				ident: "pk1",
			}},
			outAttributes: []simpleAttribute{{
				ident: "f1",
			}},
			out: func() *filter.PagingCursor {
				pc := &filter.PagingCursor{}
				pc.Set("pk1", 1, false)
				// back cursor points in the reversed order
				pc.ROrder = true
				return pc
			},
		},
	}

	for _, c := range tcc {
		t.Run(c.name, func(t *testing.T) {
			def := Aggregate{
				Filter: internalFilter{
					orderBy: c.ss,
				},
				Group:         saToMapping(c.group...),
				OutAttributes: saToMapping(c.outAttributes...),
			}

			out, err := (&aggregate{def: def}).BackCursor(c.in)
			require.NoError(t, err)

			require.Equal(t, c.out(), out)
		})
	}
}
// TestStepAggregate_more verifies pagination continuation: after scanning the
// first aggregated row, More re-initializes the step with a cursor built from
// that row and iteration resumes with the remaining groups.
func TestStepAggregate_more(t *testing.T) {
	basicAttrs := []simpleAttribute{
		{ident: "k1"},
		{ident: "k2"},
		{ident: "v1"},
		{ident: "txt"},
	}

	tcc := []struct {
		name             string
		in               []simpleRow
		group            []simpleAttribute
		outAttributes    []simpleAttribute
		sourceAttributes []simpleAttribute
		def              *Aggregate
		out1             []simpleRow
		out2             []simpleRow
	}{
		{
			name:             "multiple keys",
			sourceAttributes: basicAttrs,
			in: []simpleRow{
				{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
				{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
				// ---
				{"k1": "b", "k2": "a", "v1": 20, "txt": "fas"},
				{"k1": "b", "k2": "a", "v1": 31, "txt": "fas"},
			},
			// out1 is read before More; out2 after
			out1: []simpleRow{
				{"k1": "a", "k2": "a", "v1": float64(12)},
			},
			out2: []simpleRow{
				{"k1": "a", "k2": "b", "v1": float64(6)},
				{"k1": "b", "k2": "a", "v1": float64(51)},
			},
			def: &Aggregate{},
			group: []simpleAttribute{{
				ident: "k1",
			}, {
				ident: "k2",
			}},
			outAttributes: []simpleAttribute{{
				ident: "v1",
				expr:  "sum(v1)",
			},
			},
		},
	}

	ctx := context.Background()
	for _, tc := range tcc {
		t.Run(tc.name, func(t *testing.T) {
			buff := InMemoryBuffer()
			for _, r := range tc.in {
				require.NoError(t, buff.Add(ctx, r))
			}

			d := tc.def
			d.Group = saToMapping(tc.group...)
			d.OutAttributes = saToMapping(tc.outAttributes...)
			d.SourceAttributes = saToMapping(tc.sourceAttributes...)

			// Order by all group keys so the continuation point is deterministic
			for _, k := range tc.group {
				d.Filter.orderBy = append(d.Filter.orderBy, &filter.SortExpr{Column: k.ident})
			}

			aa, err := d.Initialize(ctx, buff)
			require.NoError(t, err)

			// First row before pagination
			require.True(t, aa.Next(ctx))
			out := simpleRow{}
			require.NoError(t, aa.Err())
			require.NoError(t, aa.Scan(out))
			require.Equal(t, tc.out1[0], out)

			// Rewind the source and continue past the scanned row
			buff.Seek(ctx, 0)
			require.NoError(t, aa.More(0, out))

			i := 0
			for aa.Next(ctx) {
				out := simpleRow{}
				require.NoError(t, aa.Err())
				require.NoError(t, aa.Scan(out))
				require.Equal(t, tc.out2[i], out)
				i++
			}
			require.Equal(t, len(tc.out2), i)
		})
	}
}