Properly handle join cases where local key is not unique
This commit is contained in:
@@ -361,17 +361,30 @@ func (d *joinedDataset) sortFrameBuffers(localLoader, foreignLoader *frameLoadCt
|
||||
// Each foreign frame is on the same index as the corresponding local row.
|
||||
// This simplifies later algorithms and removes the need for additional
|
||||
// mapping structures.
|
||||
//
|
||||
// We need to use a slice of indexes as we don't pull duplicated data; we need
|
||||
// to do this here.
|
||||
bucketsN := make(map[string][]int)
|
||||
//
|
||||
// use maxI for an easier determination of the maximum index.
|
||||
maxI := -1
|
||||
for _, l := range localBuffer {
|
||||
l.WalkRows(func(i int, r FrameRow) error {
|
||||
buckets[cast.ToString(r[localLoader.keyColIndex].Get())] = i
|
||||
k := cast.ToString(r[localLoader.keyColIndex].Get())
|
||||
bucketsN[k] = append(bucketsN[k], i)
|
||||
if i > maxI {
|
||||
maxI = i
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
//
|
||||
// Update foreign frame order based on reordered buckets
|
||||
aux := make([]*Frame, len(foreignBuffer))
|
||||
aux := make([]*Frame, maxI+1)
|
||||
for _, f := range foreignBuffer {
|
||||
aux[buckets[f.RefValue]] = f
|
||||
for _, i := range bucketsN[f.RefValue] {
|
||||
aux[i] = f
|
||||
}
|
||||
}
|
||||
|
||||
return localBuffer, aux
|
||||
|
||||
65
tests/reporter/join_dup_keys_test.go
Normal file
65
tests/reporter/join_dup_keys_test.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package reporter
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/report"
|
||||
)
|
||||
|
||||
func Test_join_dup_keys(t *testing.T) {
|
||||
var (
|
||||
ctx, h, s = setup(t)
|
||||
m, _, dd = loadScenarioOwnDM(ctx, s, t, h)
|
||||
ff []*report.Frame
|
||||
|
||||
local *report.Frame
|
||||
def = dd[0]
|
||||
)
|
||||
|
||||
// // // PAGE 1
|
||||
ff = loadNoErr(ctx, h, m, def)
|
||||
h.a.Len(ff, 3)
|
||||
local = ff[0]
|
||||
|
||||
// local
|
||||
h.a.Equal(2, local.Size())
|
||||
h.a.NotNil(local.Paging)
|
||||
h.a.NotNil(local.Paging.NextPage)
|
||||
checkRows(h, local,
|
||||
"a, 11",
|
||||
"a, 12")
|
||||
|
||||
h.a.Equal("a", ff[1].RefValue)
|
||||
h.a.Equal("a", ff[2].RefValue)
|
||||
|
||||
// // // PAGE 2
|
||||
def.Paging.PageCursor = local.Paging.NextPage
|
||||
ff = loadNoErr(ctx, h, m, def)
|
||||
h.a.Len(ff, 3)
|
||||
local = ff[0]
|
||||
|
||||
// local
|
||||
h.a.Equal(2, local.Size())
|
||||
h.a.NotNil(local.Paging)
|
||||
h.a.NotNil(local.Paging.NextPage)
|
||||
checkRows(h, local,
|
||||
"a, 13",
|
||||
"b, 14")
|
||||
|
||||
h.a.Equal("a", ff[1].RefValue)
|
||||
h.a.Equal("b", ff[2].RefValue)
|
||||
|
||||
// // // PAGE 1
|
||||
def.Paging.PageCursor = local.Paging.NextPage
|
||||
ff = loadNoErr(ctx, h, m, def)
|
||||
h.a.Len(ff, 2)
|
||||
local = ff[0]
|
||||
|
||||
// local
|
||||
h.a.Equal(1, local.Size())
|
||||
h.a.Nil(local.Paging)
|
||||
checkRows(h, local,
|
||||
"c, 15")
|
||||
|
||||
h.a.Equal("c", ff[1].RefValue)
|
||||
}
|
||||
6
tests/reporter/testdata/join_dup_keys/data_model/aa.csv
vendored
Normal file
6
tests/reporter/testdata/join_dup_keys/data_model/aa.csv
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
id,pk,value
|
||||
1,a,11
|
||||
2,a,12
|
||||
3,a,13
|
||||
4,b,14
|
||||
5,c,15
|
||||
|
6
tests/reporter/testdata/join_dup_keys/data_model/bb.csv
vendored
Normal file
6
tests/reporter/testdata/join_dup_keys/data_model/bb.csv
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
id,fk,value
|
||||
1,a,11
|
||||
2,a,12
|
||||
3,b,13
|
||||
4,b,14
|
||||
5,c,15
|
||||
|
42
tests/reporter/testdata/join_dup_keys/data_model/datamodel.yaml
vendored
Normal file
42
tests/reporter/testdata/join_dup_keys/data_model/datamodel.yaml
vendored
Normal file
@@ -0,0 +1,42 @@
|
||||
namespaces:
|
||||
ns:
|
||||
name: ns name
|
||||
|
||||
modules:
|
||||
aa:
|
||||
records:
|
||||
source: aa.csv
|
||||
key: id
|
||||
mapping:
|
||||
id: /
|
||||
pk:
|
||||
field: pk
|
||||
value:
|
||||
field: value
|
||||
|
||||
fields:
|
||||
pk:
|
||||
label: pk label
|
||||
kind: String
|
||||
value:
|
||||
label: value label
|
||||
kind: Number
|
||||
|
||||
bb:
|
||||
records:
|
||||
source: bb.csv
|
||||
key: id
|
||||
mapping:
|
||||
id: /
|
||||
fk:
|
||||
field: fk
|
||||
value:
|
||||
field: value
|
||||
|
||||
fields:
|
||||
fk:
|
||||
label: fk label
|
||||
kind: String
|
||||
value:
|
||||
label: value label
|
||||
kind: Number
|
||||
38
tests/reporter/testdata/join_dup_keys/report.json
vendored
Normal file
38
tests/reporter/testdata/join_dup_keys/report.json
vendored
Normal file
@@ -0,0 +1,38 @@
|
||||
{
|
||||
"handle": "testing_report",
|
||||
"sources": [
|
||||
{ "step": { "load": {
|
||||
"name": "aa",
|
||||
"source": "composeRecords",
|
||||
"definition": {
|
||||
"module": "aa",
|
||||
"namespace": "ns"
|
||||
}
|
||||
}}},
|
||||
{ "step": { "load": {
|
||||
"name": "bb",
|
||||
"source": "composeRecords",
|
||||
"definition": {
|
||||
"module": "bb",
|
||||
"namespace": "ns"
|
||||
}
|
||||
}}},
|
||||
|
||||
{ "step": { "join": {
|
||||
"name": "joined",
|
||||
|
||||
"localSource": "aa",
|
||||
"localColumn": "pk",
|
||||
"foreignSource": "bb",
|
||||
"foreignColumn": "fk"
|
||||
}}}
|
||||
],
|
||||
|
||||
"frames": [{
|
||||
"name": "result",
|
||||
"source": "joined",
|
||||
"paging": {
|
||||
"limit": 2
|
||||
}
|
||||
}]
|
||||
}
|
||||
Reference in New Issue
Block a user