3
0

Expand test suite, cleanup/refactor based on results

This commit is contained in:
Tomaž Jerman
2022-09-01 14:36:12 +02:00
parent 7e0d55dfa1
commit ccc1064fac
24 changed files with 2208 additions and 627 deletions

View File

@@ -6,11 +6,13 @@ import (
"math"
"github.com/cortezaproject/corteza-server/pkg/ql"
"github.com/modern-go/reflect2"
"github.com/spf13/cast"
)
type (
// aggregator performs value aggregation primarily used in the pipeline aggregate step
// aggregator performs value aggregation primarily used for the aggregate step
// @todo consider moving into a separate package
//
// The aggregator performs the requested value expressions as well as
// aggregation operations over the evaluated values.
@@ -18,11 +20,11 @@ type (
// The aggregator computes the values on the fly (for the ones it can).
aggregator struct {
// aggregates holds output aggregation values
// @todo we'll use float64 for all values, but this is not the best way to do it (probably).
// @note we'll use float64 for all values, but it might make more sense to split it up
// in the future. For now, it'll be ok.
aggregates []float64
// counts holds the number of values for each aggregate including
// multi value fields.
// counts holds the number of values for each aggregate including multi value fields.
// Counts are currently only used for average.
counts []int
@@ -30,12 +32,12 @@ type (
//
// Each index corresponds to the aggregates and counts slices
def []aggregateDef
// scanned indicates whether the aggregator has been scanned as some ops
// must be blocked after the fact
// scanned indicates whether the aggregator has been scanned since
// we need to block writes after the first scan.
scanned bool
}
// aggregateDef defines a single aggregate (output attribute when aggregating)
// aggregateDef is a wrapper to outline some aggregate value
aggregateDef struct {
outIdent string
@@ -51,10 +53,11 @@ var (
//
// @todo consider making this expandable via some registry/plugin/...
aggregateFunctionIndex = map[string]bool{
"sum": true,
"min": true,
"max": true,
"avg": true,
"count": true,
"sum": true,
"min": true,
"max": true,
"avg": true,
}
)
@@ -129,6 +132,9 @@ func (a *aggregator) Scan(s ValueSetter) (err error) {
// aggregate applies the provided value into the requested aggregate
func (a *aggregator) aggregate(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
switch attr.aggOp {
case "count":
return a.count(ctx, attr, i, v)
case "sum":
return a.sum(ctx, attr, i, v)
@@ -146,9 +152,15 @@ func (a *aggregator) aggregate(ctx context.Context, attr aggregateDef, i int, v
}
// walkValues traverses the available values for the specified attribute
func (a *aggregator) walkValues(ctx context.Context, r ValueGetter, cc map[string]uint, attr aggregateDef, run func(v any)) (err error) {
func (a *aggregator) walkValues(ctx context.Context, r ValueGetter, cc map[string]uint, attr aggregateDef, run func(v any, isNil bool)) (err error) {
var out any
if attr.inIdent == "" {
run(attr.eval.Eval(ctx, r))
out, err = attr.eval.Eval(ctx, r)
if err != nil {
return
}
run(out, reflect2.IsNil(out))
return nil
}
@@ -157,7 +169,7 @@ func (a *aggregator) walkValues(ctx context.Context, r ValueGetter, cc map[strin
if err != nil {
return err
}
run(v)
run(v, reflect2.IsNil(v))
}
return nil
@@ -165,10 +177,28 @@ func (a *aggregator) walkValues(ctx context.Context, r ValueGetter, cc map[strin
// Aggregate methods
func (a *aggregator) sum(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
err = a.walkValues(ctx, v, v.CountValues(), attr, func(v any) {
a.aggregates[i] += cast.ToFloat64(v)
func (a *aggregator) count(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
err = a.walkValues(ctx, v, v.CountValues(), attr, func(v any, isNil bool) {
if isNil {
return
}
a.aggregates[i]++
a.counts[i]++
})
if err != nil {
return
}
return
}
func (a *aggregator) sum(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
err = a.walkValues(ctx, v, v.CountValues(), attr, func(v any, isNil bool) {
if isNil {
return
}
a.aggregates[i] += cast.ToFloat64(v)
a.counts[i]++
})
if err != nil {
@@ -179,7 +209,11 @@ func (a *aggregator) sum(ctx context.Context, attr aggregateDef, i int, v ValueG
}
func (a *aggregator) min(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
err = a.walkValues(ctx, v, v.CountValues(), attr, func(v any) {
err = a.walkValues(ctx, v, v.CountValues(), attr, func(v any, isNil bool) {
if isNil {
return
}
if a.counts[i] == 0 {
a.aggregates[i] = cast.ToFloat64(v)
} else {
@@ -195,7 +229,11 @@ func (a *aggregator) min(ctx context.Context, attr aggregateDef, i int, v ValueG
}
func (a *aggregator) max(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
err = a.walkValues(ctx, v, v.CountValues(), attr, func(v any) {
err = a.walkValues(ctx, v, v.CountValues(), attr, func(v any, isNil bool) {
if isNil {
return
}
if a.counts[i] == 0 {
a.aggregates[i] = cast.ToFloat64(v)
} else {
@@ -211,7 +249,11 @@ func (a *aggregator) max(ctx context.Context, attr aggregateDef, i int, v ValueG
}
func (a *aggregator) avg(ctx context.Context, attr aggregateDef, i int, v ValueGetter) (err error) {
err = a.walkValues(ctx, v, v.CountValues(), attr, func(v any) {
err = a.walkValues(ctx, v, v.CountValues(), attr, func(v any, isNil bool) {
if isNil {
return
}
a.aggregates[i] += cast.ToFloat64(v)
a.counts[i]++
@@ -240,6 +282,8 @@ func (a *aggregator) completeAverage() {
// Utilities
// mappingToAggregateDef constructs an aggregate definition from the provided mapping
// @note this will probably change when I change this AttributeMapping thing
func mappingToAggregateDef(a AttributeMapping) (def aggregateDef, err error) {
def = aggregateDef{
outIdent: a.Identifier(),
@@ -278,7 +322,8 @@ func unpackMappingSource(a AttributeMapping) (ident string, expr *ql.ASTNode, er
return
}
expr, err = newConverterGval().parser.Parse(base)
// @note the converter is being reused so this is ok
expr, err = newConverterGval().Parse(base)
return
}

View File

@@ -16,6 +16,32 @@ func TestAggregator(t *testing.T) {
out simpleRow
}{
// Plain operations
{
name: "simple count",
rows: []simpleRow{
{"v": 1},
{"v": 5},
{"v": 35},
{"v": 11.5},
},
out: simpleRow{"count": float64(4)},
attrubutes: []simpleAttribute{
{ident: "count", expr: "count(v)"},
},
},
{
name: "simple count nulls",
rows: []simpleRow{
{"v": nil},
{"v": 5},
{"v": 35},
{"v": 11.5},
},
out: simpleRow{"count": float64(3)},
attrubutes: []simpleAttribute{
{ident: "count", expr: "count(v)"},
},
},
{
name: "simple sum",
rows: []simpleRow{
@@ -29,6 +55,19 @@ func TestAggregator(t *testing.T) {
{ident: "sum", expr: "sum(v)"},
},
},
{
name: "simple sum nulls",
rows: []simpleRow{
{"v": nil},
{"v": 5},
{"v": 35},
{"v": 11.5},
},
out: simpleRow{"sum": float64(51.5)},
attrubutes: []simpleAttribute{
{ident: "sum", expr: "sum(v)"},
},
},
{
name: "simple min",
rows: []simpleRow{
@@ -42,6 +81,19 @@ func TestAggregator(t *testing.T) {
{ident: "min", expr: "min(v)"},
},
},
{
name: "simple min nulls",
rows: []simpleRow{
{"v": nil},
{"v": 5},
{"v": 35},
{"v": 11.5},
},
out: simpleRow{"min": float64(5)},
attrubutes: []simpleAttribute{
{ident: "min", expr: "min(v)"},
},
},
{
name: "simple max",
rows: []simpleRow{
@@ -55,6 +107,19 @@ func TestAggregator(t *testing.T) {
{ident: "max", expr: "max(v)"},
},
},
{
name: "simple max nulls",
rows: []simpleRow{
{"v": 1},
{"v": 5},
{"v": nil},
{"v": 11.5},
},
out: simpleRow{"max": float64(11.5)},
attrubutes: []simpleAttribute{
{ident: "max", expr: "max(v)"},
},
},
{
name: "simple avg",
rows: []simpleRow{
@@ -68,8 +133,22 @@ func TestAggregator(t *testing.T) {
{ident: "avg", expr: "avg(v)"},
},
},
{
name: "simple avg nulls",
rows: []simpleRow{
{"v": nil},
{"v": 5},
{"v": 35},
{"v": nil},
},
out: simpleRow{"avg": float64(20)},
attrubutes: []simpleAttribute{
{ident: "avg", expr: "avg(v)"},
},
},
// With a nested expression
// @todo tests to assure nil values; omitting due to the gval issue
{
name: "nested expression",
rows: []simpleRow{

View File

@@ -130,18 +130,18 @@ type (
const (
AttributeTypeID AttributeType = "corteza::dal:attribute-type:id"
AttributeTypeRef = "corteza::dal:attribute-type:ref"
AttributeTypeTimestamp = "corteza::dal:attribute-type:timestamp"
AttributeTypeTime = "corteza::dal:attribute-type:time"
AttributeTypeDate = "corteza::dal:attribute-type:date"
AttributeTypeNumber = "corteza::dal:attribute-type:number"
AttributeTypeText = "corteza::dal:attribute-type:text"
AttributeTypeBoolean = "corteza::dal:attribute-type:boolean"
AttributeTypeEnum = "corteza::dal:attribute-type:enum"
AttributeTypeGeometry = "corteza::dal:attribute-type:geometry"
AttributeTypeJSON = "corteza::dal:attribute-type:json"
AttributeTypeBlob = "corteza::dal:attribute-type:blob"
AttributeTypeUUID = "corteza::dal:attribute-type:uuid"
AttributeTypeRef AttributeType = "corteza::dal:attribute-type:ref"
AttributeTypeTimestamp AttributeType = "corteza::dal:attribute-type:timestamp"
AttributeTypeTime AttributeType = "corteza::dal:attribute-type:time"
AttributeTypeDate AttributeType = "corteza::dal:attribute-type:date"
AttributeTypeNumber AttributeType = "corteza::dal:attribute-type:number"
AttributeTypeText AttributeType = "corteza::dal:attribute-type:text"
AttributeTypeBoolean AttributeType = "corteza::dal:attribute-type:boolean"
AttributeTypeEnum AttributeType = "corteza::dal:attribute-type:enum"
AttributeTypeGeometry AttributeType = "corteza::dal:attribute-type:geometry"
AttributeTypeJSON AttributeType = "corteza::dal:attribute-type:json"
AttributeTypeBlob AttributeType = "corteza::dal:attribute-type:blob"
AttributeTypeUUID AttributeType = "corteza::dal:attribute-type:uuid"
)
func (t TypeID) IsNullable() bool { return t.Nullable }

View File

@@ -2,8 +2,6 @@ package dal
import (
"context"
"fmt"
"strings"
"github.com/cortezaproject/corteza-server/pkg/filter"
)
@@ -18,6 +16,7 @@ type (
}
// Buffer provides a place where you can buffer the data provided by DAL
// @note Buffers are currently primarily used for testing
Buffer interface {
// Seek moves the index pointer to the specified location
// After the Seek call, a Next() call is required
@@ -34,199 +33,4 @@ type (
// Add adds a new ValueGetter to the buffer
Add(context.Context, ValueGetter) (err error)
}
Row struct {
counters map[string]uint
values valueSet
// ...
// Metadata to make it easier to work with
// @todo add when needed
}
valueSet map[string][]any
)
func (r Row) SelectGVal(ctx context.Context, k string) (interface{}, error) {
if r.values[k] == nil {
return nil, nil
}
if len(r.values[k]) == 0 {
return nil, nil
}
o := r.values[k][0]
return o, nil
}
func (r *Row) Reset() {
for k := range r.counters {
r.counters[k] = 0
}
}
func (r *Row) SetValue(name string, pos uint, v any) error {
if r.values == nil {
r.values = make(valueSet)
}
if r.counters == nil {
r.counters = make(map[string]uint)
}
// Make sure there is space for it
// @note benchmarking proves that the rest of the function introduces
// a lot of memory pressure.
// Investigate options on reworking this/reducing allocations.
if int(pos)+1 > len(r.values[name]) {
r.values[name] = append(r.values[name], make([]any, (int(pos)+1)-len(r.values[name]))...)
}
r.values[name][pos] = v
if pos >= r.counters[name] {
r.counters[name]++
}
return nil
}
// WithValue is a simple helper to construct rows with populated values
// The main use is for tests so restrain from using it in code.
func (r *Row) WithValue(name string, pos uint, v any) *Row {
err := r.SetValue(name, pos, v)
if err != nil {
panic(err)
}
return r
}
func (r *Row) CountValues() map[string]uint {
return r.counters
}
func (r *Row) GetValue(name string, pos uint) (any, error) {
if r.values == nil {
return nil, nil
}
if r.counters == nil {
return nil, nil
}
if pos >= r.counters[name] {
return nil, nil
}
return r.values[name][pos], nil
}
func (r *Row) String() string {
out := make([]string, 0, 20)
for k, vv := range r.values {
for i, v := range vv {
out = append(out, fmt.Sprintf("%s [%d] %v", k, i, v))
}
}
return strings.Join(out, " | ")
}
func (r Row) Copy() *Row {
out := &r
out.values = out.values.Copy()
return out
}
func (vv valueSet) Copy() valueSet {
out := make(valueSet)
for n, vv := range vv {
out[n] = vv
}
return out
}
func mergeRows(mapping []AttributeMapping, dst *Row, ss ...*Row) (err error) {
if len(mapping) == 0 {
return mergeRowsFull(dst, ss...)
}
return mergeRowsMapped(mapping, dst, ss...)
}
func mergeRowsFull(dst *Row, rows ...*Row) (err error) {
for _, r := range rows {
for name, vv := range r.values {
for i, values := range vv {
if dst.values == nil {
dst.values = make(valueSet)
dst.counters = make(map[string]uint)
}
if i == 0 {
dst.values[name] = make([]any, len(vv))
dst.counters[name] = 0
}
err = dst.SetValue(name, uint(i), values)
if err != nil {
return
}
}
}
}
return
}
func mergeRowsMapped(mapping []AttributeMapping, out *Row, rows ...*Row) (err error) {
for _, mp := range mapping {
name := mp.Source()
for _, r := range rows {
if r.values[name] != nil {
if out.values == nil {
out.values = make(valueSet)
out.counters = make(map[string]uint)
}
out.values[mp.Identifier()] = r.values[name]
out.counters[mp.Identifier()] = r.counters[name]
break
}
}
}
return
}
// makeRowComparator is a utility for easily making a row comparator for
// the given sort expression
func makeRowComparator(ss ...*filter.SortExpr) func(a, b *Row) bool {
return func(a, b *Row) bool {
for _, s := range ss {
cmp := compareGetters(a, b, a.counters, b.counters, s.Column)
less, skip := evalCmpResult(cmp, s)
if !skip {
return less
}
}
return false
}
}
func evalCmpResult(cmp int, s *filter.SortExpr) (less, skip bool) {
if cmp != 0 {
if s.Descending {
return cmp > 0, false
}
return cmp < 0, false
}
return false, true
}

View File

@@ -1,74 +0,0 @@
package dal
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMergeRows(t *testing.T) {
tcc := []struct {
name string
a *Row
b *Row
mapping []AttributeMapping
out *Row
}{{
name: "full merge; no mapping",
a: (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, "hi").WithValue("attr2", 1, "hello"),
b: (&Row{}).WithValue("attr3", 0, true).WithValue("attr4", 0, "ee").WithValue("attr4", 1, 25),
out: (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, "hi").WithValue("attr2", 1, "hello").WithValue("attr3", 0, true).WithValue("attr4", 0, "ee").WithValue("attr4", 1, 25),
}, {
name: "full merge; no mapping; collision",
a: (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, "hi").WithValue("attr2", 1, "hello"),
b: (&Row{}).WithValue("attr2", 0, true).WithValue("attr3", 0, "ee").WithValue("attr3", 1, 25),
out: (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, true).WithValue("attr3", 0, "ee").WithValue("attr3", 1, 25),
},
{
name: "mapped merge",
a: (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, "hi").WithValue("attr2", 1, "hello"),
b: (&Row{}).WithValue("attr3", 0, true).WithValue("attr4", 0, "ee").WithValue("attr4", 1, 25),
out: (&Row{}).WithValue("a", 0, 10).WithValue("b", 0, "hi").WithValue("b", 1, "hello").WithValue("c", 0, true).WithValue("d", 0, "ee").WithValue("d", 1, 25),
mapping: saToMapping([]simpleAttribute{{
ident: "a",
source: "attr1",
}, {
ident: "b",
source: "attr2",
}, {
ident: "c",
source: "attr3",
}, {
ident: "d",
source: "attr4",
}}...),
}, {
name: "mapped merge with conflicts",
a: (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, "hi").WithValue("attr2", 1, "hello"),
b: (&Row{}).WithValue("attr3", 0, true).WithValue("attr4", 0, "ee").WithValue("attr4", 1, 25),
out: (&Row{}).WithValue("a", 0, 10).WithValue("b", 0, true).WithValue("c", 0, "ee").WithValue("c", 1, 25),
mapping: saToMapping([]simpleAttribute{{
ident: "a",
source: "attr1",
}, {
ident: "b",
source: "attr2",
}, {
ident: "b",
source: "attr3",
}, {
ident: "c",
source: "attr4",
}}...),
}}
for _, c := range tcc {
t.Run(c.name, func(t *testing.T) {
out := &Row{}
err := mergeRows(c.mapping, out, c.a, c.b)
require.NoError(t, err)
require.Equal(t, c.out, out)
})
}
}

View File

@@ -96,7 +96,11 @@ func (xs *aggregate) next(ctx context.Context) (more bool, err error) {
}
// Check if we want to keep it
if !xs.keep(ctx, xs.scanRow) {
k, err := xs.keep(ctx, xs.scanRow)
if err != nil {
return false, err
}
if !k {
continue
}
@@ -125,6 +129,7 @@ func (xs *aggregate) More(limit uint, v ValueGetter) (err error) {
xs.groupIndex = btree.NewGeneric[*aggregateGroup](xs.compareGroupKeys)
xs.groups = make([]*aggregateGroup, 0, 128)
xs.planned = false
xs.i = 0
return
}
@@ -304,15 +309,27 @@ func (t *aggregate) compareGroupKeys(a, b *aggregateGroup) (out bool) {
}
func (s *aggregate) getGroupKey(ctx context.Context, r ValueGetter, key groupKey) (err error) {
var out any
for i, attr := range s.def.Group {
// @todo support expressions?
v, err := r.GetValue(attr.Expression(), 0)
if err != nil {
return err
expr := attr.Expression()
if expr != "" {
rnr, err := newRunnerGval(attr.Expression())
if err != nil {
return err
}
out, err = rnr.Eval(ctx, r)
if err != nil {
return err
}
} else {
out, _ = r.GetValue(attr.Identifier(), 0)
}
// @todo multi-value support?
key[i] = v
key[i] = out
}
return nil
@@ -351,9 +368,9 @@ func (s *aggregate) scanKey(g *aggregateGroup, dst *Row) (err error) {
return nil
}
func (s *aggregate) keep(ctx context.Context, r *Row) bool {
func (s *aggregate) keep(ctx context.Context, r *Row) (bool, error) {
if s.rowTester == nil {
return true
return true, nil
}
return s.rowTester.Test(ctx, r)
}

View File

@@ -16,18 +16,22 @@ func TestStepAggregate(t *testing.T) {
{ident: "txt"},
}
tcc := []struct {
name string
type (
testCase struct {
name string
group []simpleAttribute
outAttributes []simpleAttribute
sourceAttributes []simpleAttribute
group []simpleAttribute
outAttributes []simpleAttribute
sourceAttributes []simpleAttribute
in []simpleRow
out []simpleRow
in []simpleRow
out []simpleRow
f internalFilter
}{
f internalFilter
}
)
baseBehavior := []testCase{
// Basic behavior
{
name: "basic one key group",
@@ -53,6 +57,31 @@ func TestStepAggregate(t *testing.T) {
f: internalFilter{orderBy: filter.SortExprSet{{Column: "k1"}}},
},
{
name: "basic one key group rename values",
sourceAttributes: basicAttrs,
group: []simpleAttribute{{
ident: "key_one",
source: "k1",
}},
outAttributes: []simpleAttribute{{
ident: "something_something",
expr: "sum(v1)",
}},
in: []simpleRow{
{"k1": "g1", "v1": 10, "txt": "foo"},
{"k1": "g1", "v1": 20, "txt": "fas"},
{"k1": "g2", "v1": 15, "txt": "bar"},
},
out: []simpleRow{
{"key_one": "g1", "something_something": float64(30)},
{"key_one": "g2", "something_something": float64(15)},
},
f: internalFilter{orderBy: filter.SortExprSet{{Column: "key_one"}}},
},
{
name: "basic multi key group",
sourceAttributes: basicAttrs,
@@ -109,8 +138,9 @@ func TestStepAggregate(t *testing.T) {
f: internalFilter{orderBy: filter.SortExprSet{{Column: "k1"}}},
},
}
// Filtering
filtering := []testCase{
{
name: "filtering constraints single attr",
sourceAttributes: basicAttrs,
@@ -239,6 +269,38 @@ func TestStepAggregate(t *testing.T) {
expression: "v1 > 10 && v1 < 20",
},
},
{
name: "filtering expression check renamed aggregate",
sourceAttributes: basicAttrs,
group: []simpleAttribute{{
ident: "k1",
}, {
ident: "k2",
}},
outAttributes: []simpleAttribute{{
ident: "some_sum_value",
expr: "sum(v1)",
}},
in: []simpleRow{
{"k1": "a", "k2": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "k2": "a", "v1": 2, "txt": "fas"},
{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
{"k1": "a", "k2": "b", "v1": 3, "txt": "fas"},
// ---
{"k1": "b", "k2": "a", "v1": 20, "txt": "fas"},
{"k1": "b", "k2": "a", "v1": 31, "txt": "fas"},
},
out: []simpleRow{
{"k1": "a", "k2": "a", "some_sum_value": float64(12)},
},
f: internalFilter{
expression: "some_sum_value > 10 && some_sum_value < 20",
},
},
{
name: "filtering expression constant true",
sourceAttributes: basicAttrs,
@@ -305,8 +367,9 @@ func TestStepAggregate(t *testing.T) {
expression: "false",
},
},
}
// Sorting
sorting := []testCase{
{
name: "sorting single key full key asc",
sourceAttributes: basicAttrs,
@@ -333,6 +396,58 @@ func TestStepAggregate(t *testing.T) {
orderBy: filter.SortExprSet{{Column: "k1", Descending: false}},
},
},
{
name: "sorting single aggregate full asc",
sourceAttributes: basicAttrs,
group: []simpleAttribute{{
ident: "k1",
}},
outAttributes: []simpleAttribute{{
ident: "some_sum_value",
expr: "sum(v1)",
}},
in: []simpleRow{
{"k1": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "v1": 2, "txt": "fas"},
{"k1": "b", "v1": 3, "txt": "fas"},
},
out: []simpleRow{
{"k1": "b", "some_sum_value": float64(3)},
{"k1": "a", "some_sum_value": float64(12)},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "some_sum_value", Descending: false}},
},
},
{
name: "sorting single aggregate full desc",
sourceAttributes: basicAttrs,
group: []simpleAttribute{{
ident: "k1",
}},
outAttributes: []simpleAttribute{{
ident: "some_sum_value",
expr: "sum(v1)",
}},
in: []simpleRow{
{"k1": "a", "v1": 10, "txt": "foo"},
{"k1": "a", "v1": 2, "txt": "fas"},
{"k1": "b", "v1": 3, "txt": "fas"},
},
out: []simpleRow{
{"k1": "a", "some_sum_value": float64(12)},
{"k1": "b", "some_sum_value": float64(3)},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "some_sum_value", Descending: true}},
},
},
{
name: "sorting single key full key dsc",
sourceAttributes: basicAttrs,
@@ -448,34 +563,262 @@ func TestStepAggregate(t *testing.T) {
},
}
for _, tc := range tcc {
t.Run(tc.name, func(t *testing.T) {
bootstrapAggregate(t, func(ctx context.Context, t *testing.T, sa *Aggregate, b Buffer) {
for _, r := range tc.in {
require.NoError(t, b.Add(ctx, r))
}
sa.Ident = tc.name
sa.SourceAttributes = saToMapping(tc.sourceAttributes...)
sa.Group = saToMapping(tc.group...)
sa.OutAttributes = saToMapping(tc.outAttributes...)
sa.filter = tc.f
exprGroups := []testCase{
{
name: "expression as key year",
sourceAttributes: []simpleAttribute{
{ident: "dob"},
{ident: "name"},
},
group: []simpleAttribute{{
ident: "dob_y",
expr: "year(dob)",
}},
outAttributes: []simpleAttribute{{
ident: "users",
expr: "count(name)",
}},
err := sa.init(ctx)
require.NoError(t, err)
aa, err := sa.exec(ctx, b)
require.NoError(t, err)
in: []simpleRow{
{"dob": "2022-10-20T09:44:49Z", "name": "Ana"},
{"dob": "2022-10-20T09:44:49Z", "name": "John"},
{"dob": "2021-10-20T09:44:49Z", "name": "Jane"},
},
i := 0
for aa.Next(ctx) {
out := simpleRow{}
require.NoError(t, aa.Scan(out))
require.Equal(t, tc.out[i], out)
i++
}
require.NoError(t, aa.Err())
require.Equal(t, len(tc.out), i)
out: []simpleRow{
{"dob_y": 2021, "users": float64(1)},
{"dob_y": 2022, "users": float64(2)},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "dob_y", Descending: false}, {Column: "k2", Descending: true}},
},
},
{
name: "expression as key year with calc",
sourceAttributes: []simpleAttribute{
{ident: "dob"},
{ident: "name"},
},
group: []simpleAttribute{{
ident: "dob_y",
expr: "year(dob)/10",
}},
outAttributes: []simpleAttribute{{
ident: "users",
expr: "count(name)",
}},
in: []simpleRow{
{"dob": "2022-10-20T09:44:49Z", "name": "Ana"},
{"dob": "2022-10-20T09:44:49Z", "name": "John"},
{"dob": "2021-10-20T09:44:49Z", "name": "Jane"},
},
out: []simpleRow{
{"dob_y": 202.1, "users": float64(1)},
{"dob_y": 202.2, "users": float64(2)},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "dob_y", Descending: false}, {Column: "k2", Descending: true}},
},
},
{
name: "same group expression",
sourceAttributes: []simpleAttribute{
{ident: "name"},
},
group: []simpleAttribute{{
ident: "d",
// @note this will only be valid for a year; after that it will need to be changed
expr: "year(now())",
}},
outAttributes: []simpleAttribute{{
ident: "users",
expr: "count(name)",
}},
in: []simpleRow{
{"name": "Ana"},
{"name": "John"},
{"name": "Jane"},
},
out: []simpleRow{
{"d": 2022, "users": float64(3)},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "dob_y", Descending: false}, {Column: "k2", Descending: true}},
},
},
{
name: "same group constant",
sourceAttributes: []simpleAttribute{
{ident: "name"},
},
group: []simpleAttribute{{
ident: "d",
expr: "'a'",
}},
outAttributes: []simpleAttribute{{
ident: "users",
expr: "count(name)",
}},
in: []simpleRow{
{"name": "Ana"},
{"name": "John"},
{"name": "Jane"},
},
out: []simpleRow{
{"d": "a", "users": float64(3)},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "dob_y", Descending: false}, {Column: "k2", Descending: true}},
},
},
{
name: "expression as key concatenated",
sourceAttributes: []simpleAttribute{
{ident: "dob"},
{ident: "name"},
},
group: []simpleAttribute{{
ident: "dob_y",
expr: "concat(string(year(dob)), '-', string(month(dob)))",
}},
outAttributes: []simpleAttribute{{
ident: "users",
expr: "count(name)",
}},
in: []simpleRow{
{"dob": "2022-10-20T09:44:49Z", "name": "Ana"},
{"dob": "2022-10-20T09:44:49Z", "name": "John"},
{"dob": "2021-10-20T09:44:49Z", "name": "Jane"},
},
out: []simpleRow{
{"dob_y": "2021-10", "users": float64(1)},
{"dob_y": "2022-10", "users": float64(2)},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "dob_y", Descending: false}, {Column: "k2", Descending: true}},
},
},
}
nilValues := []testCase{
{
name: "nil in group key single value",
sourceAttributes: []simpleAttribute{
{ident: "thing"},
{ident: "name"},
},
group: []simpleAttribute{{
ident: "thing",
}},
outAttributes: []simpleAttribute{{
ident: "users",
expr: "count(name)",
}},
in: []simpleRow{
{"thing": "A", "name": "Ana"},
{"name": "John"},
{"name": "Jane"},
},
out: []simpleRow{
{"thing": nil, "users": float64(2)},
{"thing": "A", "users": float64(1)},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "thing", Descending: false}},
},
},
{
name: "nil in group key multiple value",
sourceAttributes: []simpleAttribute{
{ident: "thing"},
{ident: "another"},
{ident: "name"},
},
group: []simpleAttribute{{
ident: "thing",
}, {
ident: "another",
}},
outAttributes: []simpleAttribute{{
ident: "users",
expr: "count(name)",
}},
in: []simpleRow{
{"thing": "A", "another": "A", "name": "Ana"},
{"thing": "A", "name": "Ana"},
{"another": "A", "name": "Ana"},
{"name": "John"},
{"name": "Jane"},
},
out: []simpleRow{
{"thing": nil, "another": nil, "users": float64(2)},
{"thing": nil, "another": "A", "users": float64(1)},
{"thing": "A", "another": nil, "users": float64(1)},
{"thing": "A", "another": "A", "users": float64(1)},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "thing", Descending: false}, {Column: "another", Descending: false}},
},
},
}
batches := [][]testCase{
baseBehavior,
filtering,
sorting,
exprGroups,
nilValues,
}
for _, batch := range batches {
for _, tc := range batch {
t.Run(tc.name, func(t *testing.T) {
bootstrapAggregate(t, func(ctx context.Context, t *testing.T, sa *Aggregate, b Buffer) {
for _, r := range tc.in {
require.NoError(t, b.Add(ctx, r))
}
sa.Ident = tc.name
sa.SourceAttributes = saToMapping(tc.sourceAttributes...)
sa.Group = saToMapping(tc.group...)
sa.OutAttributes = saToMapping(tc.outAttributes...)
sa.filter = tc.f
err := sa.init(ctx)
require.NoError(t, err)
aa, err := sa.exec(ctx, b)
require.NoError(t, err)
i := 0
for aa.Next(ctx) {
out := simpleRow{}
require.NoError(t, aa.Scan(out))
require.Equal(t, tc.out[i], out)
i++
}
require.NoError(t, aa.Err())
require.Equal(t, len(tc.out), i)
})
})
})
}
}
}
@@ -672,6 +1015,7 @@ func TestStepAggregate_more(t *testing.T) {
i++
}
require.NoError(t, aa.Err())
require.Equal(t, len(tc.out2), i)
})
}

View File

@@ -27,7 +27,7 @@ type (
leftSource Iterator
rightSource Iterator
err error
scanRow *Row
scanRow ValueGetter
planned bool
filtered bool
@@ -48,7 +48,7 @@ type (
// @todo consider a generic slice for cases when sorting is not needed.
// This will probably save up on memory/time since we don't even need
// to pull everything.
outSorted *btree.Generic[*Row]
outSorted *btree.Generic[ValueGetter]
i int
}
)
@@ -66,7 +66,7 @@ func (xs *joinLeft) init(ctx context.Context) (err error) {
// Enabling locks does have a performance impact so you might be better off by
// constructing multiple of these but then you'll also need to complicate
// the .Next methods a bit.
xs.outSorted = btree.NewGenericOptions[*Row](makeRowComparator(xs.filter.OrderBy()...), btree.Options{NoLocks: true})
xs.outSorted = btree.NewGenericOptions[ValueGetter](makeRowComparator(xs.filter.OrderBy()...), btree.Options{NoLocks: true})
xs.joinLeftAttr = xs.def.LeftAttributes[xs.leftAttrIndex[xs.def.On.Left]]
xs.joinRightAttr = xs.def.RightAttributes[xs.rightAttrIndex[xs.def.On.Right]]
@@ -99,7 +99,7 @@ func (xs *joinLeft) More(limit uint, v ValueGetter) (err error) {
// Redo the state
// @todo adjust based on aggregation plan; reuse buffered, etc.
xs.relIndex = newRelIndex()
xs.outSorted = btree.NewGenericOptions[*Row](makeRowComparator(xs.filter.OrderBy()...), btree.Options{NoLocks: true})
xs.outSorted = btree.NewGenericOptions[ValueGetter](makeRowComparator(xs.filter.OrderBy()...), btree.Options{NoLocks: true})
xs.scanRow = nil
xs.planned = false
xs.i = 0
@@ -296,7 +296,11 @@ func (xs *joinLeft) joinRight(ctx context.Context, left *Row) (err error) {
}
// Assert if we want to keep
if !xs.keep(ctx, r) {
k, err := xs.keep(ctx, r)
if err != nil {
return err
}
if !k {
continue
}
@@ -390,9 +394,9 @@ func (xs *joinLeft) indexRightRow(r *Row) (err error) {
}
// keep checks if the row should be kept or discarded
func (xs *joinLeft) keep(ctx context.Context, r *Row) bool {
func (xs *joinLeft) keep(ctx context.Context, r *Row) (bool, error) {
if xs.rowTester == nil {
return true
return true, nil
}
return xs.rowTester.Test(ctx, r)

View File

@@ -33,21 +33,24 @@ func TestStepJoinLocal(t *testing.T) {
{ident: "f_val", t: TypeText{}},
}
tcc := []struct {
name string
type (
testCase struct {
name string
outAttributes []simpleAttribute
leftAttributes []simpleAttribute
rightAttributes []simpleAttribute
joinPred JoinPredicate
outAttributes []simpleAttribute
leftAttributes []simpleAttribute
rightAttributes []simpleAttribute
joinPred JoinPredicate
lIn []simpleRow
fIn []simpleRow
out []simpleRow
lIn []simpleRow
fIn []simpleRow
out []simpleRow
f internalFilter
}{
// Basic behavior
f internalFilter
}
)
baseBehavior := []testCase{
{
name: "basic link",
outAttributes: basicAttrs,
@@ -145,8 +148,325 @@ func TestStepJoinLocal(t *testing.T) {
fIn: []simpleRow{},
out: []simpleRow{},
},
}
sorting := []testCase{
{
name: "sorting single key full asc",
outAttributes: basicAttrs,
leftAttributes: basicLocalAttrs,
rightAttributes: basicForeignAttrs,
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
// Filtering
lIn: []simpleRow{
{"l_pk": 1, "l_val": "l1 v1"},
{"l_pk": 2, "l_val": "l2 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
out: []simpleRow{
{"l_pk": 1, "l_val": "l1 v1", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"l_pk": 2, "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "l_pk", Descending: false}},
},
},
{
name: "sorting single key full desc",
outAttributes: basicAttrs,
leftAttributes: basicLocalAttrs,
rightAttributes: basicForeignAttrs,
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "l_val": "l1 v1"},
{"l_pk": 2, "l_val": "l2 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
out: []simpleRow{
{"l_pk": 2, "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
{"l_pk": 1, "l_val": "l1 v1", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "l_pk", Descending: true}},
},
},
{
name: "sorting multiple key left first all asc",
outAttributes: append(basicAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}, simpleAttribute{ident: "right_order", t: TypeText{}}),
leftAttributes: append(basicLocalAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}),
rightAttributes: append(basicForeignAttrs, simpleAttribute{ident: "right_order", t: TypeText{}}),
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "left_order": "a", "l_val": "l1 v1"},
{"l_pk": 2, "left_order": "b", "l_val": "l2 v1"},
{"l_pk": 3, "left_order": "b", "l_val": "l3 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "right_order": "a", "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "right_order": "c", "f_fk": 2, "f_val": "f2 v1"},
{"f_pk": 3, "right_order": "b", "f_fk": 3, "f_val": "f3 v1"},
},
out: []simpleRow{
{"l_pk": 1, "left_order": "a", "right_order": "a", "l_val": "l1 v1", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"l_pk": 3, "left_order": "b", "right_order": "b", "l_val": "l3 v1", "f_pk": 3, "f_fk": 3, "f_val": "f3 v1"},
{"l_pk": 2, "left_order": "b", "right_order": "c", "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "left_order", Descending: false}, {Column: "right_order", Descending: false}},
},
},
{
name: "sorting multiple key left first all desc",
outAttributes: append(basicAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}, simpleAttribute{ident: "right_order", t: TypeText{}}),
leftAttributes: append(basicLocalAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}),
rightAttributes: append(basicForeignAttrs, simpleAttribute{ident: "right_order", t: TypeText{}}),
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "left_order": "a", "l_val": "l1 v1"},
{"l_pk": 2, "left_order": "b", "l_val": "l2 v1"},
{"l_pk": 3, "left_order": "b", "l_val": "l3 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "right_order": "a", "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "right_order": "c", "f_fk": 2, "f_val": "f2 v1"},
{"f_pk": 3, "right_order": "b", "f_fk": 3, "f_val": "f3 v1"},
},
out: []simpleRow{
{"l_pk": 2, "left_order": "b", "right_order": "c", "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
{"l_pk": 3, "left_order": "b", "right_order": "b", "l_val": "l3 v1", "f_pk": 3, "f_fk": 3, "f_val": "f3 v1"},
{"l_pk": 1, "left_order": "a", "right_order": "a", "l_val": "l1 v1", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "left_order", Descending: true}, {Column: "right_order", Descending: true}},
},
},
{
name: "sorting multiple key left first asc desc",
outAttributes: append(basicAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}, simpleAttribute{ident: "right_order", t: TypeText{}}),
leftAttributes: append(basicLocalAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}),
rightAttributes: append(basicForeignAttrs, simpleAttribute{ident: "right_order", t: TypeText{}}),
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "left_order": "a", "l_val": "l1 v1"},
{"l_pk": 2, "left_order": "b", "l_val": "l2 v1"},
{"l_pk": 3, "left_order": "b", "l_val": "l3 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "right_order": "a", "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "right_order": "c", "f_fk": 2, "f_val": "f2 v1"},
{"f_pk": 3, "right_order": "b", "f_fk": 3, "f_val": "f3 v1"},
},
out: []simpleRow{
{"l_pk": 1, "left_order": "a", "right_order": "a", "l_val": "l1 v1", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"l_pk": 2, "left_order": "b", "right_order": "c", "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
{"l_pk": 3, "left_order": "b", "right_order": "b", "l_val": "l3 v1", "f_pk": 3, "f_fk": 3, "f_val": "f3 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "left_order", Descending: false}, {Column: "right_order", Descending: true}},
},
},
{
name: "sorting multiple key left first desc asc",
outAttributes: append(basicAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}, simpleAttribute{ident: "right_order", t: TypeText{}}),
leftAttributes: append(basicLocalAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}),
rightAttributes: append(basicForeignAttrs, simpleAttribute{ident: "right_order", t: TypeText{}}),
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "left_order": "a", "l_val": "l1 v1"},
{"l_pk": 2, "left_order": "b", "l_val": "l2 v1"},
{"l_pk": 3, "left_order": "b", "l_val": "l3 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "right_order": "a", "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "right_order": "c", "f_fk": 2, "f_val": "f2 v1"},
{"f_pk": 3, "right_order": "b", "f_fk": 3, "f_val": "f3 v1"},
},
out: []simpleRow{
{"l_pk": 3, "left_order": "b", "right_order": "b", "l_val": "l3 v1", "f_pk": 3, "f_fk": 3, "f_val": "f3 v1"},
{"l_pk": 2, "left_order": "b", "right_order": "c", "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
{"l_pk": 1, "left_order": "a", "right_order": "a", "l_val": "l1 v1", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "left_order", Descending: true}, {Column: "right_order", Descending: false}},
},
},
{
name: "sorting multiple key right first all asc",
outAttributes: append(basicAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}, simpleAttribute{ident: "right_order", t: TypeText{}}),
leftAttributes: append(basicLocalAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}),
rightAttributes: append(basicForeignAttrs, simpleAttribute{ident: "right_order", t: TypeText{}}),
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "left_order": "a", "l_val": "l1 v1"},
{"l_pk": 2, "left_order": "c", "l_val": "l2 v1"},
{"l_pk": 3, "left_order": "b", "l_val": "l3 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "right_order": "a", "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "right_order": "b", "f_fk": 2, "f_val": "f2 v1"},
{"f_pk": 3, "right_order": "b", "f_fk": 3, "f_val": "f3 v1"},
},
out: []simpleRow{
{"l_pk": 1, "left_order": "a", "right_order": "a", "l_val": "l1 v1", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"l_pk": 3, "left_order": "b", "right_order": "b", "l_val": "l3 v1", "f_pk": 3, "f_fk": 3, "f_val": "f3 v1"},
{"l_pk": 2, "left_order": "c", "right_order": "b", "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "right_order", Descending: false}, {Column: "left_order", Descending: false}},
},
},
{
name: "sorting multiple key right first all desc",
outAttributes: append(basicAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}, simpleAttribute{ident: "right_order", t: TypeText{}}),
leftAttributes: append(basicLocalAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}),
rightAttributes: append(basicForeignAttrs, simpleAttribute{ident: "right_order", t: TypeText{}}),
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "left_order": "a", "l_val": "l1 v1"},
{"l_pk": 2, "left_order": "c", "l_val": "l2 v1"},
{"l_pk": 3, "left_order": "b", "l_val": "l3 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "right_order": "a", "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "right_order": "b", "f_fk": 2, "f_val": "f2 v1"},
{"f_pk": 3, "right_order": "b", "f_fk": 3, "f_val": "f3 v1"},
},
out: []simpleRow{
{"l_pk": 2, "left_order": "c", "right_order": "b", "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
{"l_pk": 3, "left_order": "b", "right_order": "b", "l_val": "l3 v1", "f_pk": 3, "f_fk": 3, "f_val": "f3 v1"},
{"l_pk": 1, "left_order": "a", "right_order": "a", "l_val": "l1 v1", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "right_order", Descending: true}, {Column: "left_order", Descending: true}},
},
},
{
name: "sorting multiple key right first asc desc",
outAttributes: append(basicAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}, simpleAttribute{ident: "right_order", t: TypeText{}}),
leftAttributes: append(basicLocalAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}),
rightAttributes: append(basicForeignAttrs, simpleAttribute{ident: "right_order", t: TypeText{}}),
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "left_order": "a", "l_val": "l1 v1"},
{"l_pk": 2, "left_order": "c", "l_val": "l2 v1"},
{"l_pk": 3, "left_order": "b", "l_val": "l3 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "right_order": "a", "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "right_order": "b", "f_fk": 2, "f_val": "f2 v1"},
{"f_pk": 3, "right_order": "b", "f_fk": 3, "f_val": "f3 v1"},
},
out: []simpleRow{
{"l_pk": 1, "left_order": "a", "right_order": "a", "l_val": "l1 v1", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"l_pk": 2, "left_order": "c", "right_order": "b", "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
{"l_pk": 3, "left_order": "b", "right_order": "b", "l_val": "l3 v1", "f_pk": 3, "f_fk": 3, "f_val": "f3 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "right_order", Descending: false}, {Column: "left_order", Descending: true}},
},
},
{
name: "sorting multiple key right first desc asc",
outAttributes: append(basicAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}, simpleAttribute{ident: "right_order", t: TypeText{}}),
leftAttributes: append(basicLocalAttrs, simpleAttribute{ident: "left_order", t: TypeText{}}),
rightAttributes: append(basicForeignAttrs, simpleAttribute{ident: "right_order", t: TypeText{}}),
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "left_order": "a", "l_val": "l1 v1"},
{"l_pk": 2, "left_order": "c", "l_val": "l2 v1"},
{"l_pk": 3, "left_order": "b", "l_val": "l3 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "right_order": "a", "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "right_order": "b", "f_fk": 2, "f_val": "f2 v1"},
{"f_pk": 3, "right_order": "b", "f_fk": 3, "f_val": "f3 v1"},
},
out: []simpleRow{
{"l_pk": 3, "left_order": "b", "right_order": "b", "l_val": "l3 v1", "f_pk": 3, "f_fk": 3, "f_val": "f3 v1"},
{"l_pk": 2, "left_order": "c", "right_order": "b", "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
{"l_pk": 1, "left_order": "a", "right_order": "a", "l_val": "l1 v1", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "right_order", Descending: true}, {Column: "left_order", Descending: false}},
},
},
{
name: "sorting nulls asc",
outAttributes: basicAttrs,
leftAttributes: basicLocalAttrs,
rightAttributes: basicForeignAttrs,
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "l_val": "l1 v1"},
{"l_pk": 2, "l_val": nil},
},
fIn: []simpleRow{
{"f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
out: []simpleRow{
{"l_pk": 2, "l_val": nil, "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
{"l_pk": 1, "l_val": "l1 v1", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "l_val", Descending: false}},
},
},
{
name: "sorting nulls desc",
outAttributes: basicAttrs,
leftAttributes: basicLocalAttrs,
rightAttributes: basicForeignAttrs,
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "l_val": nil},
{"l_pk": 2, "l_val": "l2 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
out: []simpleRow{
{"l_pk": 2, "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
{"l_pk": 1, "l_val": nil, "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "l_val", Descending: true}},
},
},
}
filtering := []testCase{
{
name: "filtering constraints single attr",
outAttributes: append(basicAttrs, simpleAttribute{ident: "l_const"}),
@@ -319,8 +639,8 @@ func TestStepJoinLocal(t *testing.T) {
expression: "l_val == 'l2 v1'",
},
},
// Paging
}
paging := []testCase{
{
name: "paging cut off first entry",
outAttributes: basicAttrs,
@@ -402,54 +722,107 @@ func TestStepJoinLocal(t *testing.T) {
},
},
}
nilValues := []testCase{
{
name: "basic nil left join value",
outAttributes: basicAttrs,
leftAttributes: basicLocalAttrs,
rightAttributes: basicForeignAttrs,
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": nil, "l_val": "l1 v1"},
{"l_pk": 2, "l_val": "l2 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
out: []simpleRow{
{"l_pk": 2, "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
},
{
name: "basic nil right join value",
outAttributes: basicAttrs,
leftAttributes: basicLocalAttrs,
rightAttributes: basicForeignAttrs,
joinPred: JoinPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "l_val": "l1 v1"},
{"l_pk": 2, "l_val": "l2 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "f_fk": nil, "f_val": "f1 v1"},
{"f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
out: []simpleRow{
{"l_pk": 2, "l_val": "l2 v1", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
},
}
batches := [][]testCase{
baseBehavior,
sorting,
filtering,
paging,
nilValues,
}
ctx := context.Background()
for _, tc := range tcc {
t.Run(tc.name, func(t *testing.T) {
l := InMemoryBuffer()
for _, r := range tc.lIn {
require.NoError(t, l.Add(ctx, r))
}
for _, batch := range batches {
for _, tc := range batch {
t.Run(tc.name, func(t *testing.T) {
l := InMemoryBuffer()
for _, r := range tc.lIn {
require.NoError(t, l.Add(ctx, r))
}
f := InMemoryBuffer()
for _, r := range tc.fIn {
require.NoError(t, f.Add(ctx, r))
}
f := InMemoryBuffer()
for _, r := range tc.fIn {
require.NoError(t, f.Add(ctx, r))
}
tc.f.orderBy = filter.SortExprSet{
{Column: "l_pk"},
{Column: "f_pk"},
}
// @todo please, move this into test cases; this was a cheat
if len(tc.f.orderBy) == 0 {
tc.f.orderBy = filter.SortExprSet{
{Column: "l_pk"},
{Column: "f_pk"},
}
}
def := Join{
Ident: "foo",
On: tc.joinPred,
OutAttributes: saToMapping(tc.outAttributes...),
LeftAttributes: saToMapping(tc.leftAttributes...),
RightAttributes: saToMapping(tc.rightAttributes...),
filter: tc.f,
def := Join{
Ident: "foo",
On: tc.joinPred,
OutAttributes: saToMapping(tc.outAttributes...),
LeftAttributes: saToMapping(tc.leftAttributes...),
RightAttributes: saToMapping(tc.rightAttributes...),
filter: tc.f,
plan: joinPlan{},
}
plan: joinPlan{},
}
err := def.init(ctx)
require.NoError(t, err)
xs, err := def.exec(ctx, l, f)
require.NoError(t, err)
err := def.init(ctx)
require.NoError(t, err)
xs, err := def.exec(ctx, l, f)
require.NoError(t, err)
i := 0
for xs.Next(ctx) {
require.NoError(t, xs.Err())
out := simpleRow{}
require.NoError(t, xs.Err())
require.NoError(t, xs.Scan(out))
i := 0
for xs.Next(ctx) {
require.NoError(t, xs.Err())
out := simpleRow{}
require.NoError(t, xs.Err())
require.NoError(t, xs.Scan(out))
require.Equal(t, tc.out[i], out)
require.Equal(t, tc.out[i], out)
i++
}
require.Equal(t, len(tc.out), i)
})
i++
}
require.Equal(t, len(tc.out), i)
})
}
}
}

View File

@@ -377,7 +377,11 @@ func (xs *linkLeft) pullEntireLeftSource(ctx context.Context) (err error) {
continue
}
if !xs.keep(ctx, l, rel) {
k, err := xs.keep(ctx, l, rel)
if err != nil {
return err
}
if !k {
continue
}
@@ -494,24 +498,28 @@ func (xs *linkLeft) sortLeftRows() (err error) {
// keep checks if the row should be kept or discarded
//
// Link's keep is a bit more complicated and it looks at the related buffer as well.
func (xs *linkLeft) keep(ctx context.Context, left *Row, buffer *relIndexBuffer) (keep bool) {
func (xs *linkLeft) keep(ctx context.Context, left *Row, buffer *relIndexBuffer) (keep bool, err error) {
// If no buffer, we won't keep -- left inner join like behavior
if buffer == nil {
return false
return false, nil
}
// No tester include all ok rows
if xs.rowTester == nil {
return true
return true, nil
}
ch := &rowLink{a: left}
for _, ch.b = range buffer.rows {
if !xs.rowTester.Test(ctx, ch) {
return false
k, err := xs.rowTester.Test(ctx, ch)
if err != nil {
return false, err
}
if !k {
return false, nil
}
}
return true
return true, nil
}
func (xs *linkLeft) collectPrimaryAttributes() (out []string) {

View File

@@ -8,7 +8,7 @@ import (
"github.com/stretchr/testify/require"
)
func TestStepLinkleft(t *testing.T) {
func TestStepLinkLeft(t *testing.T) {
crs1 := &filter.PagingCursor{}
crs1.Set("l_pk", 1, false)
crs1.Set("l_val", "l1 v1", false)
@@ -35,22 +35,25 @@ func TestStepLinkleft(t *testing.T) {
{ident: "f_val", t: TypeText{}},
}
tcc := []struct {
name string
type (
testCase struct {
name string
leftAttributes []simpleAttribute
rightAttributes []simpleAttribute
leftOutAttributes []simpleAttribute
rightOutAttributes []simpleAttribute
linkPred LinkPredicate
leftAttributes []simpleAttribute
rightAttributes []simpleAttribute
leftOutAttributes []simpleAttribute
rightOutAttributes []simpleAttribute
linkPred LinkPredicate
lIn []simpleRow
fIn []simpleRow
out []simpleRow
lIn []simpleRow
fIn []simpleRow
out []simpleRow
f internalFilter
}{
// Basic behavior
f internalFilter
}
)
baseBehavior := []testCase{
{
name: "basic link",
leftAttributes: basicLeftAttrs,
@@ -180,8 +183,9 @@ func TestStepLinkleft(t *testing.T) {
fIn: []simpleRow{},
out: []simpleRow{},
},
}
// Filtering
filtering := []testCase{
{
name: "filtering constraints single attr",
leftAttributes: basicLeftAttrs,
@@ -428,8 +432,66 @@ func TestStepLinkleft(t *testing.T) {
expression: "f_val == 'f1 v1'",
},
},
}
// Paging
sorting := []testCase{
{
name: "sorting single key full asc",
leftAttributes: basicLeftAttrs,
rightAttributes: basicRightAttrs,
leftOutAttributes: basicOutLeftAttrs,
rightOutAttributes: basicOutRightAttrs,
linkPred: LinkPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "l_val": "l1 v1"},
{"l_pk": 2, "l_val": "l2 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
out: []simpleRow{
{"l_pk": 1, "l_val": "l1 v1"},
{"$sys.ref": "right", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"l_pk": 2, "l_val": "l2 v1"},
{"$sys.ref": "right", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "l_pk", Descending: false}},
},
},
{
name: "sorting single key full desc",
leftAttributes: basicLeftAttrs,
rightAttributes: basicRightAttrs,
leftOutAttributes: basicOutLeftAttrs,
rightOutAttributes: basicOutRightAttrs,
linkPred: LinkPredicate{Left: "l_pk", Right: "f_fk"},
lIn: []simpleRow{
{"l_pk": 1, "l_val": "l1 v1"},
{"l_pk": 2, "l_val": "l2 v1"},
},
fIn: []simpleRow{
{"f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
{"f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
},
out: []simpleRow{
{"l_pk": 2, "l_val": "l2 v1"},
{"$sys.ref": "right", "f_pk": 2, "f_fk": 2, "f_val": "f2 v1"},
{"l_pk": 1, "l_val": "l1 v1"},
{"$sys.ref": "right", "f_pk": 1, "f_fk": 1, "f_val": "f1 v1"},
},
f: internalFilter{
orderBy: filter.SortExprSet{{Column: "l_pk", Descending: true}},
},
},
}
paging := []testCase{
{
name: "paging cut off first entry",
leftAttributes: basicLeftAttrs,
@@ -511,50 +573,59 @@ func TestStepLinkleft(t *testing.T) {
},
}
batches := [][]testCase{
baseBehavior,
filtering,
sorting,
paging,
}
ctx := context.Background()
for _, tc := range tcc {
t.Run(tc.name, func(t *testing.T) {
l := InMemoryBuffer()
for _, r := range tc.lIn {
require.NoError(t, l.Add(ctx, r))
}
for _, batch := range batches {
for _, tc := range batch {
t.Run(tc.name, func(t *testing.T) {
l := InMemoryBuffer()
for _, r := range tc.lIn {
require.NoError(t, l.Add(ctx, r))
}
f := InMemoryBuffer()
for _, r := range tc.fIn {
require.NoError(t, f.Add(ctx, r))
}
f := InMemoryBuffer()
for _, r := range tc.fIn {
require.NoError(t, f.Add(ctx, r))
}
def := Link{
Ident: "foo",
RelLeft: "left",
RelRight: "right",
def := Link{
Ident: "foo",
RelLeft: "left",
RelRight: "right",
On: tc.linkPred,
LeftAttributes: saToMapping(tc.leftAttributes...),
RightAttributes: saToMapping(tc.rightAttributes...),
OutLeftAttributes: saToMapping(tc.leftOutAttributes...),
OutRightAttributes: saToMapping(tc.rightOutAttributes...),
filter: tc.f,
}
On: tc.linkPred,
LeftAttributes: saToMapping(tc.leftAttributes...),
RightAttributes: saToMapping(tc.rightAttributes...),
OutLeftAttributes: saToMapping(tc.leftOutAttributes...),
OutRightAttributes: saToMapping(tc.rightOutAttributes...),
filter: tc.f,
}
err := def.init(ctx)
require.NoError(t, err)
xs, err := def.exec(ctx, l, f)
require.NoError(t, err)
err := def.init(ctx)
require.NoError(t, err)
xs, err := def.exec(ctx, l, f)
require.NoError(t, err)
i := 0
for xs.Next(ctx) {
require.NoError(t, xs.Err())
out := simpleRow{}
require.NoError(t, xs.Err())
require.NoError(t, xs.Scan(out))
i := 0
for xs.Next(ctx) {
require.NoError(t, xs.Err())
out := simpleRow{}
require.NoError(t, xs.Err())
require.NoError(t, xs.Scan(out))
require.Equal(t, tc.out[i], out)
require.Equal(t, tc.out[i], out)
i++
}
require.Equal(t, len(tc.out), i)
})
i++
}
require.Equal(t, len(tc.out), i)
})
}
}
}

View File

@@ -13,6 +13,7 @@ type (
internalFilter struct {
constraints map[string][]any
stateConstraints map[string]filter.State
metaConstraints map[string]any
expression string
expParsed *ql.ASTNode
orderBy filter.SortExprSet
@@ -23,7 +24,7 @@ type (
func (f internalFilter) Constraints() map[string][]any { return f.constraints }
func (f internalFilter) StateConstraints() map[string]filter.State { return f.stateConstraints }
func (f internalFilter) MetaConstraints() map[string]any { return nil }
func (f internalFilter) MetaConstraints() map[string]any { return f.metaConstraints }
func (f internalFilter) Expression() string { return f.expression }
func (f internalFilter) OrderBy() filter.SortExprSet { return f.orderBy }
func (f internalFilter) Limit() uint { return f.limit }
@@ -58,20 +59,30 @@ func toInternalFilter(f filter.Filter) (out internalFilter, err error) {
return
}
func FilterForExpr(n *ql.ASTNode) internalFilter {
func FilterFromExpr(n *ql.ASTNode) internalFilter {
// @todo consider adding string expr for consistency
return internalFilter{
expParsed: n,
}
}
func (a internalFilter) WithConstraints(c map[string][]any) internalFilter {
a.constraints = c
return a
// MergeFilters returns a new filter where values of a are overwritten by values of b
//
// Returns an error when the given generic filter can not be
// converted into an internalFilter.
func (a internalFilter) MergeFilters(b filter.Filter) (c internalFilter, err error) {
	// In case we got a generic filter, convert it to the internal one for easier handling
	aux, ok := b.(internalFilter)
	if !ok {
		// use the named return directly; no need to shadow err in a nested scope
		if aux, err = toInternalFilter(b); err != nil {
			return
		}
	}

	return a.mergeFilters(aux), nil
}
// mergeFilters returns a new filter based on a overwritten by values from b
func (a internalFilter) mergeFilters(b internalFilter) (c internalFilter) {
c = a
// expression

View File

@@ -193,25 +193,17 @@ func (b *inmemBuffer) Len() int {
}
func (b *inmemBuffer) Less(i, j int) bool {
var (
err error
ra ValueGetter
rb ValueGetter
rac map[string]uint
rbc map[string]uint
)
ra, rac, err = b.rowAt(i)
ra, _, err := b.rowAt(i)
if err != nil {
panic(err)
}
rb, rbc, err = b.rowAt(j)
rb, _, err := b.rowAt(j)
if err != nil {
panic(err)
}
return valueGetterCounterComparator(b.sort, ra, rb, rac, rbc)
return makeRowComparator(b.sort...)(ra, rb)
}
func (b *inmemBuffer) Swap(i, j int) {

View File

@@ -17,6 +17,8 @@ type (
ResourceType string
Resource string
Refs map[string]any
}
// Model describes the underlying data and its shape
@@ -29,6 +31,10 @@ type (
ResourceID uint64
ResourceType string
// Refs is an arbitrary map to identify a model
// @todo consider reworking this; I'm not the biggest fan
Refs map[string]any
SensitivityLevelID uint64
Attributes AttributeSet
@@ -135,6 +141,26 @@ func (mm ModelSet) FindByIdent(ident string) *Model {
return nil
}
// FindByRefs returns the first Model that matches the given refs
//
// A model matches when, for every key in refs, the model defines the same
// key with an equal value. An empty refs map matches the first model.
// Returns nil when no model matches.
func (mm ModelSet) FindByRefs(refs map[string]any) *Model {
	for _, model := range mm {
		matched := true
		for k, v := range refs {
			// the model must define the key and hold an equal value
			ref, ok := model.Refs[k]
			if !ok || ref != v {
				matched = false
				break
			}
		}
		if matched {
			return model
		}
	}

	return nil
}
// FilterByReferenced returns all of the models that reference b
func (aa ModelSet) FilterByReferenced(b *Model) (out ModelSet) {
for _, aModel := range aa {

61
pkg/dal/model_test.go Normal file
View File

@@ -0,0 +1,61 @@
package dal
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestModelFindByRefs covers ModelSet.FindByRefs lookup behavior
func TestModelFindByRefs(t *testing.T) {
	tcc := []struct {
		name string
		in   ModelSet
		// refs is the lookup constraint passed to FindByRefs
		refs map[string]any
		// found indicates whether a model is expected to be returned
		found bool
	}{
		{
			name: "one ref found",
			in: ModelSet{
				{Refs: map[string]any{"a": 1}},
			},
			refs:  map[string]any{"a": 1},
			found: true,
		},
		{
			name: "one ref not found",
			in: ModelSet{
				{Refs: map[string]any{"a": 1}},
			},
			refs:  map[string]any{"b": 1},
			found: false,
		},
		{
			name: "n refs found",
			in: ModelSet{
				{Refs: map[string]any{"a": 1, "b": 1, "c": 1}},
			},
			refs:  map[string]any{"a": 1, "b": 1},
			found: true,
		},
		{
			name: "n refs not found",
			in: ModelSet{
				{Refs: map[string]any{"a": 1, "b": 1}},
			},
			refs:  map[string]any{"a": 1, "b": 2},
			found: false,
		},
		{
			// an empty constraint set matches the first model
			name: "empty refs match first model",
			in: ModelSet{
				{Refs: map[string]any{"a": 1}},
			},
			refs:  map[string]any{},
			found: true,
		},
		{
			name:  "empty model set finds nothing",
			in:    ModelSet{},
			refs:  map[string]any{"a": 1},
			found: false,
		},
	}

	for _, tc := range tcc {
		t.Run(tc.name, func(t *testing.T) {
			m := tc.in.FindByRefs(tc.refs)
			if tc.found {
				require.NotNil(t, m)
			} else {
				require.Nil(t, m)
			}
		})
	}
}

View File

@@ -166,17 +166,19 @@ func (pp Pipeline) root() PipelineStep {
// Slice returns a new pipeline with all steps of the subtree with ident as root
func (pp Pipeline) Slice(ident string) (out Pipeline) {
var r PipelineStep
// Make a copy so we can assure the caller can go ham over the pipeline
ppc := pp.Clone()
// Find root
for _, p := range pp {
var r PipelineStep
for _, p := range ppc {
if p.Identifier() == ident {
r = p
break
}
}
return pp.slice(r)
return ppc.slice(r)
}
// slice is the recursive counterpart for the Slice method
@@ -343,6 +345,10 @@ func (p Pipeline) Clone() (out Pipeline) {
aux := *s
out = append(out, &aux)
case *Datasource:
aux := *s
out = append(out, &aux)
default:
panic("unsupported step")
}

View File

@@ -6,10 +6,14 @@ import (
type (
tester interface {
Test(ctx context.Context, params any) bool
// Test returns a boolean output based on the expression, mostly used for filters
// @todo remove the error and rely on a validator to make sure everything is valid
Test(ctx context.Context, params any) (bool, error)
}
evaluator interface {
Eval(ctx context.Context, params any) any
// Eval returns some value based on the expression, mostly used for attribute eval
// @todo remove the error and rely on a validator to make sure everything is valid
Eval(ctx context.Context, params any) (any, error)
}
)

View File

@@ -6,6 +6,7 @@ import (
"strings"
"github.com/PaesslerAG/gval"
"github.com/cortezaproject/corteza-server/pkg/gvalfnc"
"github.com/cortezaproject/corteza-server/pkg/ql"
)
@@ -22,19 +23,11 @@ type (
)
var (
refToGvalExp = map[string]*exprHandlerGval{
// keywords
"null": {
Handler: func(args ...string) string {
return fmt.Sprintf("%s == null", args[0])
},
},
"nnull": {
Handler: func(args ...string) string {
return fmt.Sprintf("%s != null", args[0])
},
},
globalGvalConverter converterGval
)
var (
refToGvalExp = map[string]*exprHandlerGval{
// operators
"not": {
Handler: func(args ...string) string {
@@ -53,6 +46,11 @@ var (
return strings.Join(args, " || ")
},
},
"xor": {
Handler: func(args ...string) string {
return fmt.Sprintf("(%s != %s)", args[0], args[1])
},
},
// - comp.
"eq": {
@@ -109,6 +107,100 @@ var (
},
},
// - strings
"concat": {
Handler: func(args ...string) string {
return fmt.Sprintf("concat(%s)", strings.Join(args, ", "))
},
},
// @todo implement; the commented versions are not good enough
// "like": {
// Handler: func(args ...string) string {
// // @todo better regex construction
// nn := strings.Replace(strings.Trim(args[1], "\""), "%", ".*", -1)
// nn = strings.Replace(nn, "_", ".[1]", -1)
// return fmt.Sprintf("%s =~ ^%s$", args[0], nn)
// },
// },
// "nlike": {
// Handler: func(args ...string) string {
// // @todo better regex construction
// nn := strings.Replace(args[1], "%", ".*", -1)
// nn = strings.Replace(nn, "_", ".[1]", -1)
// return fmt.Sprintf("!(%s =~ ^%s$)", args[0], nn)
// },
// },
// "is": {
// Handler: func(args ...string) string {
// return fmt.Sprintf("%s == %s", args[0], args[1])
// }
// }
// - filtering
"now": {
Handler: func(args ...string) string {
return "now()"
},
},
"quarter": {
Handler: func(args ...string) string {
return fmt.Sprintf("quarter(%s)", args[0])
},
},
"year": {
Handler: func(args ...string) string {
return fmt.Sprintf("year(%s)", args[0])
},
},
"month": {
Handler: func(args ...string) string {
return fmt.Sprintf("month(%s)", args[0])
},
},
"date": {
Handler: func(args ...string) string {
return fmt.Sprintf("date(%s)", args[0])
},
},
// generic stuff
"null": {
Handler: func(args ...string) string {
return fmt.Sprintf("isNil(%s)", args[0])
},
},
"nnull": {
Handler: func(args ...string) string {
return fmt.Sprintf("!isNil(%s)", args[0])
},
},
"exists": {
Handler: func(args ...string) string {
return fmt.Sprintf("!isNil(%s)", args[0])
},
},
// - typecast
"float": {
Handler: func(args ...string) string {
return fmt.Sprintf("float(%s)", args[0])
},
},
"int": {
Handler: func(args ...string) string {
return fmt.Sprintf("int(%s)", args[0])
},
},
"string": {
Handler: func(args ...string) string {
return fmt.Sprintf("string(%s)", args[0])
},
},
"group": {
Handler: func(args ...string) string {
return fmt.Sprintf("(%s)", args[0])
@@ -119,9 +211,13 @@ var (
// newConverterGval initializes a new gval exp. converter
func newConverterGval() converterGval {
return converterGval{
parser: ql.NewParser(),
if globalGvalConverter.parser == nil {
globalGvalConverter = converterGval{
parser: ql.NewParser(),
}
}
return globalGvalConverter
}
// newRunnerGval initializes a new gval exp. runner from the provided expression
@@ -139,9 +235,7 @@ func newRunnerGval(expr string) (out *runnerGval, err error) {
return nil, err
}
// @todo add all those extra functions the expr package uses?
// Potentially not ok since we allow a subset of operations (for compatability)
out.eval, err = gval.Full().NewEvaluable(expr)
out.eval, err = newGval(expr)
return
}
@@ -155,16 +249,62 @@ func newRunnerGvalParsed(n *ql.ASTNode) (out *runnerGval, err error) {
return
}
// @todo add all those extra functions the expr package uses?
// Potentially not ok since we allow a subset of operations (for compatability)
out.eval, err = gval.Full().NewEvaluable(expr)
out.eval, err = newGval(expr)
return
}
// newGval initializes a new gval evaluatable for the provided expression
//
// Only a small, deliberate subset of the expr. pkg functions is registered
// so the same expressions can (eventually) be offloaded to the DB.
//
// @note at some point, more functions will be supported; the ones which
// can't be offloaded will be performed in some exec. step.
func newGval(expr string) (gval.Evaluable, error) {
	// register just the helpers the pipeline needs; intentionally not the
	// entire expr. pkg function set since the DB must support these as well
	opts := []gval.Language{
		gval.Function("now", gvalfnc.Now),
		gval.Function("quarter", gvalfnc.Quarter),
		gval.Function("year", gvalfnc.Year),
		gval.Function("month", gvalfnc.Month),
		gval.Function("date", gvalfnc.Date),
		gval.Function("isNil", gvalfnc.IsNil),
		gval.Function("float", gvalfnc.CastFloat),
		gval.Function("int", gvalfnc.CastInt),
		gval.Function("string", gvalfnc.CastString),
		gval.Function("concat", gvalfnc.ConcatStrings),
	}

	return gval.Full(opts...).NewEvaluable(expr)
}
// Test evaluates the expression over the provided rows and reports whether
// it yielded a boolean true.
//
// A non-boolean result is treated as false; evaluation errors are returned.
func (e *runnerGval) Test(ctx context.Context, rows any) (bool, error) {
	res, err := e.eval(ctx, rows)
	if err != nil {
		return false, err
	}

	if b, ok := res.(bool); ok {
		return b, nil
	}
	return false, nil
}
// Eval evaluates the expression over the provided rows and returns the raw
// result.
//
// The result is explicitly normalized to nil whenever evaluation fails.
func (e *runnerGval) Eval(ctx context.Context, rows any) (any, error) {
	res, err := e.eval(ctx, rows)
	if err != nil {
		return nil, err
	}
	return res, nil
}
// Parse parses the QL expression into QL ASTNodes
//
// Parsing is delegated to the pkg/ql parser held by the converter.
func (c converterGval) Parse(expr string) (*ql.ASTNode, error) {
	return c.parser.Parse(expr)
}
// Convert converts the given nodes into a GVal expression
//
// The heavy lifting is done by the internal convert method.
//
// @todo add more validation so we can potentially omit exec. error checks
func (c converterGval) Convert(n *ql.ASTNode) (expr string, err error) {
	return c.convert(n)
}
@@ -193,27 +333,9 @@ func (c converterGval) convert(n *ql.ASTNode) (_ string, err error) {
}
func (c converterGval) refHandler(n *ql.ASTNode, args ...string) (out string, err error) {
if refToGvalExp[n.Ref] == nil {
r := strings.ToLower(n.Ref)
if refToGvalExp[r] == nil {
return "", fmt.Errorf("unknown ref %q", n.Ref)
}
return refToGvalExp[n.Ref].Handler(args...), nil
}
func (e *runnerGval) Test(ctx context.Context, rows any) bool {
o, err := e.eval(ctx, rows)
if err != nil {
return false
}
v, ok := o.(bool)
return ok && v
}
func (e *runnerGval) Eval(ctx context.Context, rows any) any {
o, err := e.eval(ctx, rows)
if err != nil {
return nil
}
return o
return refToGvalExp[r].Handler(args...), nil
}

201
pkg/dal/runner_gval_test.go Normal file
View File

@@ -0,0 +1,201 @@
package dal
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// TestRowEvaluatorTest covers runnerGval.Test over single- and multi-row
// inputs, asserting the boolean outcome of each expression.
func TestRowEvaluatorTest(t *testing.T) {
	tc := []struct {
		name string
		expr string
		// in maps expression-visible idents to their backing rows
		in  map[string]ValueGetter
		out bool
	}{{
		name: "single row ok",
		expr: `row.test == 10`,
		in: map[string]ValueGetter{
			"row": (&Row{}).WithValue("test", 0, 10),
		},
		out: true,
	}, {
		name: "single row nok",
		expr: `row.test == 11`,
		in: map[string]ValueGetter{
			"row": (&Row{}).WithValue("test", 0, 10),
		},
		out: false,
	}, {
		name: "two rows ok",
		expr: `local.key == foreign.ref`,
		in: map[string]ValueGetter{
			"local":   (&Row{}).WithValue("key", 0, 10),
			"foreign": (&Row{}).WithValue("ref", 0, 10),
		},
		out: true,
	}, {
		name: "two rows nok",
		expr: `local.key == foreign.ref`,
		in: map[string]ValueGetter{
			"local":   (&Row{}).WithValue("key", 0, 10),
			"foreign": (&Row{}).WithValue("ref", 0, 11),
		},
		out: false,
	}}

	ctx := context.Background()
	for _, c := range tc {
		t.Run(c.name, func(t *testing.T) {
			evl, err := newRunnerGval(c.expr)
			require.NoError(t, err)

			tt, err := evl.Test(ctx, c.in)
			require.NoError(t, err)
			require.Equal(t, c.out, tt)
		})
	}
}
// TestHandlers runs each supported gval operator/function through an
// expression and checks the evaluated result.
func TestHandlers(t *testing.T) {
	yr := time.Now().Year()
	mnt := int(time.Now().Month())
	// NOTE(review): mnt/4 does not match the usual quarter formula
	// ((mnt-1)/3 + 1); presumably it mirrors gvalfnc.Quarter's
	// implementation — confirm against that package.
	qtr := mnt / 4
	dy := time.Now().Day()

	// @todo some of the tests were omitted because they would be too flaky
	// or can not yet pass (the gval nil handlers primarily)
	tcc := []struct {
		name string
		expr string
		in   simpleRow
		out  any
	}{
		// boolean operators
		{
			name: "not",
			expr: `!true`,
			in:   simpleRow{},
			out:  false,
		},
		{
			name: "and",
			expr: `true && false`,
			in:   simpleRow{},
			out:  false,
		},
		{
			name: "or",
			expr: `false || true`,
			in:   simpleRow{},
			out:  true,
		},
		{
			name: "xor",
			expr: `'a' xor 'b'`,
			in:   simpleRow{},
			out:  true,
		},
		// comparison operators
		{
			name: "eq",
			expr: `1 == 2`,
			in:   simpleRow{},
			out:  false,
		},
		{
			name: "ne",
			expr: `3 != 2`,
			in:   simpleRow{},
			out:  true,
		},
		{
			name: "lt",
			expr: `1 < 2`,
			in:   simpleRow{},
			out:  true,
		},
		{
			name: "le",
			expr: `1 <= 2`,
			in:   simpleRow{},
			out:  true,
		},
		{
			name: "gt",
			expr: `2 > 1`,
			in:   simpleRow{},
			out:  true,
		},
		{
			name: "ge",
			expr: `1 >= 2`,
			in:   simpleRow{},
			out:  false,
		},
		// arithmetic (gval evaluates numerics as float64)
		{
			name: "add",
			expr: `1+3`,
			in:   simpleRow{},
			out:  float64(4),
		},
		{
			name: "sub",
			expr: `3-5`,
			in:   simpleRow{},
			out:  float64(-2),
		},
		{
			name: "mult",
			expr: `1*2`,
			in:   simpleRow{},
			out:  float64(2),
		},
		{
			name: "div",
			expr: `2/2`,
			in:   simpleRow{},
			out:  float64(1),
		},
		// string/date functions registered in newGval
		{
			name: "concat",
			expr: `concat('a', 'b')`,
			in:   simpleRow{},
			out:  "ab",
		},
		{
			name: "quarter",
			expr: `quarter(now())`,
			in:   simpleRow{},
			out:  qtr,
		},
		{
			name: "year",
			expr: `year(now())`,
			in:   simpleRow{},
			out:  yr,
		},
		{
			name: "month",
			expr: `month(now())`,
			in:   simpleRow{},
			out:  mnt,
		},
		{
			name: "date",
			expr: `date(now())`,
			in:   simpleRow{},
			out:  dy,
		},
	}

	for _, tc := range tcc {
		t.Run(tc.name, func(t *testing.T) {
			evl, err := newRunnerGval(tc.expr)
			require.NoError(t, err)

			out, err := evl.Eval(context.Background(), tc.in)
			require.NoError(t, err)
			require.Equal(t, tc.out, out)
		})
	}
}

View File

@@ -1,58 +0,0 @@
package dal
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
func TestRowEvaluatorTest(t *testing.T) {
tc := []struct {
name string
expr string
in map[string]ValueGetter
out bool
}{{
name: "single row ok",
expr: `row.test == 10`,
in: map[string]ValueGetter{
"row": (&Row{}).WithValue("test", 0, 10),
},
out: true,
}, {
name: "single row nok",
expr: `row.test == 11`,
in: map[string]ValueGetter{
"row": (&Row{}).WithValue("test", 0, 10),
},
out: false,
}, {
name: "two rows ok",
expr: `local.key == foreign.ref`,
in: map[string]ValueGetter{
"local": (&Row{}).WithValue("key", 0, 10),
"foreign": (&Row{}).WithValue("ref", 0, 10),
},
out: true,
}, {
name: "two rows nok",
expr: `local.key == foreign.ref`,
in: map[string]ValueGetter{
"local": (&Row{}).WithValue("key", 0, 10),
"foreign": (&Row{}).WithValue("ref", 0, 11),
},
out: false,
}}
ctx := context.Background()
for _, c := range tc {
t.Run(c.name, func(t *testing.T) {
evl, err := newRunnerGval(c.expr)
require.NoError(t, err)
require.Equal(t, c.out, evl.Test(ctx, c.in))
})
}
}

View File

@@ -56,6 +56,9 @@ type (
Delete(ctx context.Context, mf ModelRef, operations OperationSet, vv ...ValueGetter) (err error)
Truncate(ctx context.Context, mf ModelRef, operations OperationSet) (err error)
Run(ctx context.Context, pp Pipeline) (iter Iterator, err error)
Dryrun(ctx context.Context, pp Pipeline) (err error)
SearchConnectionIssues(connectionID uint64) (out []error)
SearchModelIssues(resourceID uint64) (out []error)
}
@@ -414,8 +417,8 @@ func (svc *service) Update(ctx context.Context, mf ModelRef, operations Operatio
return
}
func (svc *service) FindModel(mf ModelRef) *Model {
return svc.getModelByFilter(mf)
func (svc *service) FindModel(mr ModelRef) *Model {
return svc.getModelByRef(mr)
}
func (svc *service) Search(ctx context.Context, mf ModelRef, operations OperationSet, f filter.Filter) (iter Iterator, err error) {
@@ -597,7 +600,7 @@ func (svc *service) Truncate(ctx context.Context, mf ModelRef, operations Operat
}
func (svc *service) storeOpPrep(ctx context.Context, mf ModelRef, operations OperationSet) (model *Model, cw *ConnectionWrap, err error) {
model = svc.getModelByFilter(mf)
model = svc.getModelByRef(mf)
if model == nil {
err = errModelNotFound(mf.ResourceID)
return
@@ -928,6 +931,14 @@ func (svc *service) ReplaceModelAttribute(ctx context.Context, model *Model, old
return
}
// FindModelByRefs returns the model with all of the given refs matching
//
// @note refs are primarily used for DAL pipelines where steps can reference models
// by handles and slugs such as module and namespace.
//
// NOTE(review): unlike FindModelByResourceID, this does not fall back to the
// default connection when connectionID is 0 — confirm whether that's intended.
func (svc *service) FindModelByRefs(connectionID uint64, refs map[string]any) *Model {
	return svc.models[connectionID].FindByRefs(refs)
}
func (svc *service) FindModelByResourceID(connectionID uint64, resourceID uint64) *Model {
if connectionID == 0 {
connectionID = svc.defConnID
@@ -1050,15 +1061,17 @@ func (svc *service) registerModelToConnection(ctx context.Context, cw *Connectio
return nil, nil
}
func (svc *service) getModelByFilter(mf ModelRef) *Model {
if mf.ConnectionID == 0 {
mf.ConnectionID = svc.defConnID
func (svc *service) getModelByRef(mr ModelRef) *Model {
if mr.ConnectionID == 0 {
mr.ConnectionID = svc.defConnID
}
if mf.ResourceID > 0 {
return svc.FindModelByResourceID(mf.ConnectionID, mf.ResourceID)
if mr.Refs != nil {
return svc.FindModelByRefs(mr.ConnectionID, mr.Refs)
} else if mr.ResourceID > 0 {
return svc.FindModelByResourceID(mr.ConnectionID, mr.ResourceID)
}
return svc.FindModelByResourceIdent(mf.ConnectionID, mf.ResourceType, mf.Resource)
return svc.FindModelByResourceIdent(mr.ConnectionID, mr.ResourceType, mr.Resource)
}
func (svc *service) validateNewSensitivityLevels(levels *sensitivityLevelIndex) (err error) {

View File

@@ -1,6 +1,7 @@
package dal
import (
"context"
"fmt"
"strings"
"time"
@@ -18,6 +19,20 @@ type (
Src string
Props MapProperties
}
// Row is a generic implementation for ValueGetter and ValueSetter
//
// Primarily used within DAL pipeline execution steps, but may also be used
// outside.
Row struct {
counters map[string]uint
values valueSet
// Metadata to make it easier to work with
// @todo add when needed
}
valueSet map[string][]any
)
// Identifier returns the attribute's ident
func (sa SimpleAttr) Identifier() string { return sa.Ident }
@@ -25,42 +40,153 @@ func (sa SimpleAttr) Expression() (expression string) { return sa.Expr }
// Source returns the ident of the source the attribute maps from
func (sa SimpleAttr) Source() (ident string) { return sa.Src }
// Properties returns the attribute's mapping properties
func (sa SimpleAttr) Properties() MapProperties { return sa.Props }
// WithValue is a simple helper to construct rows with populated values
//
// @note The main use is for tests so restrain from using it in code.
func (r *Row) WithValue(name string, pos uint, v any) *Row {
	if err := r.SetValue(name, pos, v); err != nil {
		panic(err)
	}
	return r
}
// SelectGVal exposes row values to gval expressions by ident
//
// Only the first (pos 0) value is returned for multi-value attributes.
func (r Row) SelectGVal(ctx context.Context, k string) (interface{}, error) {
	return r.GetValue(k, 0)
}
// Reset clears out the row so the same instance can be reused where possible
//
// Important: Reset only clears out the counters and does not re-init/clear out
// the underlying values. Don't directly iterate over the values, but use the
// counters.
func (r *Row) Reset() {
	// Zeroed counters make the old values unreachable via GetValue
	for k := range r.counters {
		r.counters[k] = 0
	}
}
// SetValue sets the value of the given attribute at the given position
//
// Backing storage (values and counters) is lazily initialized on first use.
// The error return is reserved for future use; it is currently always nil.
//
// NOTE(review): the counter is incremented rather than set to pos+1, so
// values written at non-sequential positions (skipping indexes) will not be
// reachable via GetValue — confirm callers always write sequentially.
func (r *Row) SetValue(name string, pos uint, v any) error {
	if r.values == nil {
		r.values = make(valueSet)
	}
	if r.counters == nil {
		r.counters = make(map[string]uint)
	}

	// Make sure there is space for it
	// @note benchmarking proves that the rest of the function introduces a lot of memory pressure.
	// Investigate options on reworking this/reducing allocations.
	if int(pos)+1 > len(r.values[name]) {
		r.values[name] = append(r.values[name], make([]any, (int(pos)+1)-len(r.values[name]))...)
	}
	r.values[name][pos] = v

	if pos >= r.counters[name] {
		r.counters[name]++
	}
	return nil
}
// CountValues returns the number of values per attribute
//
// @note the returned map is the row's internal state; treat it as read-only.
func (r *Row) CountValues() map[string]uint {
	return r.counters
}
// GetValue returns the value of the given attribute at the given position
//
// Uninitialized rows and out-of-range positions yield a nil value, not an
// error; the error return is reserved for future use.
func (r *Row) GetValue(name string, pos uint) (any, error) {
	// An uninitialized row holds no values at all
	if r.values == nil || r.counters == nil {
		return nil, nil
	}

	// Positions at/after the counter are considered unset (see Reset)
	if pos >= r.counters[name] {
		return nil, nil
	}

	return r.values[name][pos], nil
}
// String returns a human-readable representation of the row's values
//
// @note output ordering follows map iteration and is not deterministic.
func (r *Row) String() string {
	var (
		b   strings.Builder
		sep string
	)

	for k, cc := range r.counters {
		for i := uint(0); i < cc; i++ {
			b.WriteString(sep)
			fmt.Fprintf(&b, "%s [%d] %v", k, i, r.values[k][i])
			sep = " | "
		}
	}

	return b.String()
}
// compareGetters compares the two ValueGetters
// @todo multi-value support?
// -1: a is less then b
// 0: a is equal to b
// 1: a is greater then b
//
// Multi value rules:
// - if a has less items then b, a is less then b (-1)
// - if a has more items then b, a is more then b (1)
// - if a and b have the same amount of items; if any of the corresponding values
// are different, that outcome is used as the result
//
// This function is used to satisfy sort's less function requirement.
func compareGetters(a, b ValueGetter, ac, bc map[string]uint, attr string) int {
va, err := a.GetValue(attr, 0)
if err != nil {
// If a has less values then b, then a is less then b
if ac[attr] < bc[attr] {
return -1
} else if ac[attr] > bc[attr] {
return 1
}
vb, err := b.GetValue(attr, 0)
if err != nil {
return 1
// If a and b have the same number of values, then we need to compare them
for i := uint(0); i < ac[attr]; i++ {
va, err := a.GetValue(attr, i)
if err != nil {
return 1
}
vb, err := b.GetValue(attr, i)
if err != nil {
return 1
}
// Continue the cmp. until we find two values that are different
cmp := compareValues(va, vb)
if cmp != 0 {
return cmp
}
}
return compareValues(va, vb)
// If any value is different from the other, the loop above would end; so
// here, we can safely say they are the same
return 0
}
// compareValues compares the two values
// @todo support for other types and slices
// @todo identify what other types we should support
// -1: a is less then b
// 0: a is equal to b
// 1: a is greater then b
//
// @note I considered using gval here but using gval proved to bring
// a bit too much overhead.
//
// @note I considered using GVal here but it introduces more overhead then
// what I've conjured here.
// @todo look into using generics or some wrapping types here
func compareValues(va, vb any) int {
// simple/edge cases
if va == vb {
return 0
}
if va == nil {
return -1
}
if vb == nil {
return 1
}
// Compare based on type
switch ca := va.(type) {
case string:
cb, err := cast.ToStringE(vb)
@@ -123,9 +249,6 @@ func compareValues(va, vb any) int {
if err != nil {
return -1
}
if xa.Equal(cb) {
return 0
}
if xa.Before(cb) {
return -1
}
@@ -133,10 +256,9 @@ func compareValues(va, vb any) int {
return 1
}
}
return -1
}
// // // // // // // // // // // // // // // // // // // // // // // // //
panic(fmt.Sprintf("unsupported type for values %v, %v", va, vb))
}
// constraintsToExpression converts the given constraints map to a ql parsable expression
func constraintsToExpression(cc map[string][]any) string {
@@ -247,19 +369,90 @@ func prepareGenericRowTester(f internalFilter) (_ tester, err error) {
return newRunnerGval(expr)
}
func valueGetterCounterComparator(ss filter.SortExprSet, a, b ValueGetter, ca, cb map[string]uint) bool {
// @todo we can probably remove the branching here and write a bool alg. expr.
for _, s := range ss {
cmp := compareGetters(a, b, ca, cb, s.Column)
// makeRowComparator returns a ValueGetter comparator for the given sort expr
func makeRowComparator(ss ...*filter.SortExpr) func(a, b ValueGetter) bool {
return func(a, b ValueGetter) bool {
for _, s := range ss {
cmp := compareGetters(a, b, a.CountValues(), b.CountValues(), s.Column)
if cmp != 0 {
if s.Descending {
return cmp > 0
less, skip := evalCmpResult(cmp, s)
if !skip {
return less
}
return cmp < 0
}
return false
}
}
// evalCmpResult maps a comparison result to sort.Less semantics for the
// given sort expression
//
// skip indicates the two values compared equal and the next sort
// expression should be consulted.
func evalCmpResult(cmp int, s *filter.SortExpr) (less, skip bool) {
	if cmp != 0 {
		if s.Descending {
			return cmp > 0, false
		}
		return cmp < 0, false
	}
	return false, true
}
// mergeRows merges all of the provided rows into the destination row
//
// If AttributeMapping is provided, that is taken into account, else
// everything is merged together with the last value winning.
func mergeRows(mapping []AttributeMapping, dst *Row, rows ...*Row) (err error) {
	if len(mapping) > 0 {
		return mergeRowsMapped(mapping, dst, rows...)
	}
	return mergeRowsFull(dst, rows...)
}
// mergeRowsFull merges all of the provided rows into the destination row
// The last provided value takes priority.
func mergeRowsFull(dst *Row, rows ...*Row) (err error) {
	for _, r := range rows {
		for name, vv := range r.values {
			for i, values := range vv {
				// Lazy init so a merge of all-empty rows leaves dst's maps nil
				if dst.values == nil {
					dst.values = make(valueSet)
					dst.counters = make(map[string]uint)
				}

				// First value for this attr. from this row: re-init the slot so
				// the later row fully replaces the attribute (last row wins)
				if i == 0 {
					dst.values[name] = make([]any, len(vv))
					dst.counters[name] = 0
				}

				err = dst.SetValue(name, uint(i), values)
				if err != nil {
					return
				}
			}
		}
	}
	return
}
// mergeRowsMapped merges all of the provided rows into the destination row using the provided mapping
// The last provided value takes priority.
func mergeRowsMapped(mapping []AttributeMapping, out *Row, rows ...*Row) (err error) {
	for _, mp := range mapping {
		name := mp.Source()

		// For each mapped source, the first row providing it is used;
		// identifier collisions across mapping entries resolve to the later entry
		for _, r := range rows {
			if r.values[name] != nil {
				if out.values == nil {
					out.values = make(valueSet)
					out.counters = make(map[string]uint)
				}

				// @note value slices are aliased, not copied; mutating the source
				// row afterwards would be visible in the merged row
				out.values[mp.Identifier()] = r.values[name]
				out.counters[mp.Identifier()] = r.counters[name]
				break
			}
		}
	}
	return
}

View File

@@ -0,0 +1,42 @@
package dal
import (
"testing"
"github.com/cortezaproject/corteza-server/pkg/filter"
)
// goos: linux
// goarch: amd64
// pkg: github.com/cortezaproject/corteza-server/pkg/dal
// cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
// BenchmarkRowComparator-12 161826 7015 ns/op 5120 B/op 40 allocs/op
// PASS
// BenchmarkRowComparator measures a 4-column comparator over 10 distinct
// row pairs per iteration (recorded results in the comment above).
func BenchmarkRowComparator(b *testing.B) {
	cmp := makeRowComparator(
		&filter.SortExpr{Column: "a", Descending: false},
		&filter.SortExpr{Column: "b", Descending: true},
		&filter.SortExpr{Column: "c", Descending: true},
		&filter.SortExpr{Column: "d", Descending: false},
	)

	r1 := simpleRow{"a": 10, "b": "aa", "c": 33, "d": 500}
	r2 := simpleRow{"a": 50, "b": "aaaaa", "c": 31, "d": 10}
	r3 := simpleRow{"a": 31, "b": "a", "c": 11, "d": 1000}
	r4 := simpleRow{"a": 42, "b": "", "c": 0, "d": 300}
	r5 := simpleRow{"a": 22, "b": "aaaaaaaaaa", "c": -25, "d": 21}

	// Exclude comparator/fixture setup from the measurement
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		cmp(r1, r2)
		cmp(r1, r3)
		cmp(r1, r4)
		cmp(r1, r5)
		cmp(r2, r3)
		cmp(r2, r4)
		cmp(r2, r5)
		cmp(r3, r4)
		cmp(r3, r5)
		cmp(r4, r5)
	}
}

View File

@@ -33,6 +33,16 @@ func TestCompareValues(t *testing.T) {
a: 10,
b: 9,
out: 1,
}, {
name: "two ints; a nil",
a: nil,
b: 10,
out: -1,
}, {
name: "two ints; b nil",
a: 10,
b: nil,
out: 1,
},
{
@@ -50,6 +60,16 @@ func TestCompareValues(t *testing.T) {
a: "aa",
b: "a",
out: 1,
}, {
name: "two strings; a nil",
a: nil,
b: "aa",
out: -1,
}, {
name: "two strings; b nil",
a: "aa",
b: nil,
out: 1,
},
{
@@ -67,6 +87,16 @@ func TestCompareValues(t *testing.T) {
a: uint(10),
b: uint(9),
out: 1,
}, {
name: "two uints; a nil",
a: nil,
b: uint(10),
out: -1,
}, {
name: "two uints; b nil",
a: uint(10),
b: nil,
out: 1,
},
{
@@ -84,6 +114,16 @@ func TestCompareValues(t *testing.T) {
a: float64(10),
b: float64(9),
out: 1,
}, {
name: "two floats; a nil",
a: nil,
b: float64(10),
out: -1,
}, {
name: "two floats; b nil",
a: float64(10),
b: nil,
out: 1,
},
{
@@ -101,6 +141,23 @@ func TestCompareValues(t *testing.T) {
a: m,
b: n,
out: 1,
}, {
name: "two times; a nil",
a: nil,
b: n,
out: -1,
}, {
name: "two times; b nil",
a: n,
b: nil,
out: 1,
},
{
name: "two nils",
a: nil,
b: nil,
out: 0,
},
}
@@ -119,24 +176,62 @@ func TestCompareGetters(t *testing.T) {
attr string
out int
}{{
name: "eq",
name: "single eq",
a: simpleRow{"a": 10},
b: simpleRow{"a": 10},
attr: "a",
out: 0,
}, {
name: "lt",
name: "single lt",
a: simpleRow{"a": 9},
b: simpleRow{"a": 10},
attr: "a",
out: -1,
}, {
name: "gt",
name: "single gt",
a: simpleRow{"a": 10},
b: simpleRow{"a": 9},
attr: "a",
out: 1,
}}
},
{
name: "multi eq both empty",
a: &Row{},
b: &Row{},
attr: "a",
out: 0,
}, {
name: "multi eq same values",
a: (&Row{}).WithValue("a", 0, 1),
b: (&Row{}).WithValue("a", 0, 1),
attr: "a",
out: 0,
}, {
name: "multi lt a less items",
a: (&Row{}),
b: (&Row{}).WithValue("a", 0, 1),
attr: "a",
out: -1,
}, {
name: "multi lt a item less",
a: (&Row{}).WithValue("a", 0, 0),
b: (&Row{}).WithValue("a", 0, 1),
attr: "a",
out: -1,
}, {
name: "multi gt a more items",
a: (&Row{}).WithValue("a", 0, 1),
b: (&Row{}),
attr: "a",
out: 1,
}, {
name: "multi gt a item more",
a: (&Row{}).WithValue("a", 0, 1),
b: (&Row{}).WithValue("a", 0, 0),
attr: "a",
out: 1,
}}
for _, c := range tcc {
t.Run(c.name, func(t *testing.T) {
@@ -234,3 +329,205 @@ func TestStateConstraintsToExpression(t *testing.T) {
})
}
}
// TestMergeRows covers both merge strategies: full (no mapping; attributes
// unioned, last row wins on collision) and mapped (source attrs renamed via
// the mapping, later mapping entries winning identifier collisions).
func TestMergeRows(t *testing.T) {
	tcc := []struct {
		name    string
		a       *Row
		b       *Row
		mapping []AttributeMapping
		out     *Row
	}{{
		name: "full merge; no mapping",
		a:    (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, "hi").WithValue("attr2", 1, "hello"),
		b:    (&Row{}).WithValue("attr3", 0, true).WithValue("attr4", 0, "ee").WithValue("attr4", 1, 25),
		out:  (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, "hi").WithValue("attr2", 1, "hello").WithValue("attr3", 0, true).WithValue("attr4", 0, "ee").WithValue("attr4", 1, 25),
	}, {
		// b's single attr2 value fully replaces a's two attr2 values
		name: "full merge; no mapping; collision",
		a:    (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, "hi").WithValue("attr2", 1, "hello"),
		b:    (&Row{}).WithValue("attr2", 0, true).WithValue("attr3", 0, "ee").WithValue("attr3", 1, 25),
		out:  (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, true).WithValue("attr3", 0, "ee").WithValue("attr3", 1, 25),
	},
		{
			name: "mapped merge",
			a:    (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, "hi").WithValue("attr2", 1, "hello"),
			b:    (&Row{}).WithValue("attr3", 0, true).WithValue("attr4", 0, "ee").WithValue("attr4", 1, 25),
			out:  (&Row{}).WithValue("a", 0, 10).WithValue("b", 0, "hi").WithValue("b", 1, "hello").WithValue("c", 0, true).WithValue("d", 0, "ee").WithValue("d", 1, 25),
			mapping: saToMapping([]simpleAttribute{{
				ident:  "a",
				source: "attr1",
			}, {
				ident:  "b",
				source: "attr2",
			}, {
				ident:  "c",
				source: "attr3",
			}, {
				ident:  "d",
				source: "attr4",
			}}...),
		}, {
			// two mapping entries target "b"; the later one (attr3) wins
			name: "mapped merge with conflicts",
			a:    (&Row{}).WithValue("attr1", 0, 10).WithValue("attr2", 0, "hi").WithValue("attr2", 1, "hello"),
			b:    (&Row{}).WithValue("attr3", 0, true).WithValue("attr4", 0, "ee").WithValue("attr4", 1, 25),
			out:  (&Row{}).WithValue("a", 0, 10).WithValue("b", 0, true).WithValue("c", 0, "ee").WithValue("c", 1, 25),
			mapping: saToMapping([]simpleAttribute{{
				ident:  "a",
				source: "attr1",
			}, {
				ident:  "b",
				source: "attr2",
			}, {
				ident:  "b",
				source: "attr3",
			}, {
				ident:  "c",
				source: "attr4",
			}}...),
		}}

	for _, c := range tcc {
		t.Run(c.name, func(t *testing.T) {
			out := &Row{}
			err := mergeRows(c.mapping, out, c.a, c.b)
			require.NoError(t, err)
			require.Equal(t, c.out, out)
		})
	}
}
// TestRowComparator checks makeRowComparator's less semantics over single-
// and two-column sorts, in both ascending and descending direction.
func TestRowComparator(t *testing.T) {
	// @todo add some more extreme cases
	tcc := []struct {
		name string
		a    ValueGetter
		b    ValueGetter
		ss   filter.SortExprSet
		less bool
	}{
		// Simple one col cases
		{
			name: "single column simple asc less",
			a:    simpleRow{"a": 10},
			b:    simpleRow{"a": 20},
			ss:   filter.SortExprSet{{Column: "a", Descending: false}},
			less: true,
		},
		{
			name: "single column simple asc more",
			a:    simpleRow{"a": 20},
			b:    simpleRow{"a": 10},
			ss:   filter.SortExprSet{{Column: "a", Descending: false}},
			less: false,
		},
		{
			// equal is not less (strict weak ordering)
			name: "single column simple asc equal",
			a:    simpleRow{"a": 10},
			b:    simpleRow{"a": 10},
			ss:   filter.SortExprSet{{Column: "a", Descending: false}},
			less: false,
		},
		{
			name: "single column simple desc less",
			a:    simpleRow{"a": 20},
			b:    simpleRow{"a": 10},
			ss:   filter.SortExprSet{{Column: "a", Descending: true}},
			less: true,
		},
		{
			name: "single column simple desc more",
			a:    simpleRow{"a": 10},
			b:    simpleRow{"a": 20},
			ss:   filter.SortExprSet{{Column: "a", Descending: true}},
			less: false,
		},
		{
			name: "single column simple desc equal",
			a:    simpleRow{"a": 10},
			b:    simpleRow{"a": 10},
			ss:   filter.SortExprSet{{Column: "a", Descending: true}},
			less: false,
		},

		// basic 2 col cases
		{
			name: "two column asc less first priority",
			a:    simpleRow{"a": 10, "b": 100},
			b:    simpleRow{"a": 20, "b": 1},
			ss:   filter.SortExprSet{{Column: "a", Descending: false}, {Column: "b", Descending: false}},
			less: true,
		},
		{
			// first column ties; second column decides
			name: "two column asc less first equal",
			a:    simpleRow{"a": 10, "b": 1},
			b:    simpleRow{"a": 10, "b": 2},
			ss:   filter.SortExprSet{{Column: "a", Descending: false}, {Column: "b", Descending: false}},
			less: true,
		},
		{
			name: "two column asc equal",
			a:    simpleRow{"a": 10, "b": 1},
			b:    simpleRow{"a": 10, "b": 1},
			ss:   filter.SortExprSet{{Column: "a", Descending: false}, {Column: "b", Descending: false}},
			less: false,
		},
		{
			name: "two column desc less first priority",
			a:    simpleRow{"a": 20, "b": 1},
			b:    simpleRow{"a": 10, "b": 100},
			ss:   filter.SortExprSet{{Column: "a", Descending: true}, {Column: "b", Descending: true}},
			less: true,
		},
		{
			name: "two column desc less first equal",
			a:    simpleRow{"a": 10, "b": 2},
			b:    simpleRow{"a": 10, "b": 1},
			ss:   filter.SortExprSet{{Column: "a", Descending: true}, {Column: "b", Descending: true}},
			less: true,
		},
		{
			name: "two column desc equal",
			a:    simpleRow{"a": 10, "b": 1},
			b:    simpleRow{"a": 10, "b": 1},
			ss:   filter.SortExprSet{{Column: "a", Descending: true}, {Column: "b", Descending: true}},
			less: false,
		},
	}

	for _, c := range tcc {
		t.Run(c.name, func(t *testing.T) {
			less := makeRowComparator(c.ss...)(c.a, c.b)
			require.Equal(t, c.less, less)
		})
	}
}
// TestRowResetting verifies that Reset clears the counters (making old
// values unreachable) while allowing the row instance to be reused.
func TestRowResetting(t *testing.T) {
	r := &Row{}

	// gv reads a value, failing the test on error
	gv := func(ident string, pos uint) any {
		v, err := r.GetValue(ident, pos)
		require.NoError(t, err)
		return v
	}
	// sv writes a value; error return checked (was silently ignored before)
	sv := func(ident string, pos uint, v any) {
		require.NoError(t, r.SetValue(ident, pos, v))
	}

	sv("a", 0, 1)
	sv("a", 0, 2)
	sv("a", 1, 1)

	require.Equal(t, 2, gv("a", 0))
	require.Equal(t, 1, gv("a", 1))
	require.Equal(t, uint(2), r.counters["a"])

	r.Reset()
	require.Equal(t, uint(0), r.counters["a"])

	sv("a", 0, 3)
	sv("b", 0, 4)

	require.Equal(t, 3, gv("a", 0))
	require.Equal(t, uint(1), r.counters["a"])
	require.Equal(t, 4, gv("b", 0))
	require.Equal(t, uint(1), r.counters["b"])
}