3
0

POC exporting record datasources

This commit is contained in:
Tomaž Jerman 2023-02-23 11:17:42 +01:00
parent 25b880e049
commit a8b60c6525
11 changed files with 315 additions and 39 deletions

View File

@ -107,6 +107,12 @@ func (d StoreDecoder) decode(ctx context.Context, s store.Storer, dl dal.FullSer
if err != nil {
return nil, err
}
// @todo consider changing this.
// Currently it's required because the .decode may return some
// nested nodes as well.
// Consider a flag or a new function.
aux = envoyx.NodesForResourceType(ref.ResourceType, aux...)
if len(aux) == 0 {
return nil, fmt.Errorf("invalid reference %v", ref)
}
@ -125,21 +131,31 @@ func (d StoreDecoder) decode(ctx context.Context, s store.Storer, dl dal.FullSer
var aux envoyx.NodeSet
for i, wf := range wrappedFilters {
switch wf.rt {
{{- range .resources -}}
{{ range .resources -}}
{{- if .envoy.omit}}{{continue}}{{ end -}}
case types.{{.expIdent}}ResourceType:
aux, err = d.decode{{.expIdent}}(ctx, s, dl, d.make{{.expIdent}}Filter(scopedNodes[i], refNodes[i], wf.f))
if err != nil {
return
}
for _, a := range aux {
a.Identifiers = a.Identifiers.Merge(wf.f.Identifiers)
a.References = envoyx.MergeRefs(a.References, refRefs[i])
}
out = append(out, aux...)
case types.{{.expIdent}}ResourceType:
aux, err = d.decode{{.expIdent}}(ctx, s, dl, d.make{{.expIdent}}Filter(scopedNodes[i], refNodes[i], wf.f))
if err != nil {
return
}
for _, a := range aux {
a.Identifiers = a.Identifiers.Merge(wf.f.Identifiers)
a.References = envoyx.MergeRefs(a.References, refRefs[i])
}
out = append(out, aux...)
{{ end -}}
{{ end }}
default:
aux, err = d.extendDecoder(ctx, s, dl, wf.rt, refNodes[i], wf.f)
if err!= nil {
return
}
for _, a := range aux {
a.Identifiers = a.Identifiers.Merge(wf.f.Identifiers)
a.References = envoyx.MergeRefs(a.References, refRefs[i])
}
out = append(out, aux...)
}
}

View File

@ -0,0 +1,95 @@
package envoy
import (
"context"
"encoding/csv"
"fmt"
"io"
"github.com/cortezaproject/corteza/server/pkg/envoyx"
)
type (
CsvEncoder struct{}
)
// Encode writes the given nodes to the writer provided via the encode
// params, in CSV format.
//
// Only ComposeRecordDatasourceAuxType resources are handled; any other
// resource type is silently ignored (nothing is written for it).
func (e CsvEncoder) Encode(ctx context.Context, p envoyx.EncodeParams, rt string, nodes envoyx.NodeSet, tt envoyx.Traverser) (err error) {
	w, err := e.getWriter(p)
	if err != nil {
		return
	}

	cw := csv.NewWriter(w)

	switch rt {
	case ComposeRecordDatasourceAuxType:
		_, err = e.encodeRecordDatasources(ctx, cw, p, nodes, tt)
		if err != nil {
			return
		}
	}

	cw.Flush()
	// csv.Writer buffers output and Flush does not return an error;
	// the deferred write error must be fetched explicitly or it is lost.
	return cw.Error()
}
// encodeRecordDatasources encodes the record datasource of every given
// node into the CSV writer, stopping at the first error.
func (e CsvEncoder) encodeRecordDatasources(ctx context.Context, writer *csv.Writer, p envoyx.EncodeParams, nodes envoyx.NodeSet, tt envoyx.Traverser) (out any, err error) {
	for i := range nodes {
		if _, err = e.encodeRecordDatasource(ctx, writer, p, nodes[i], tt); err != nil {
			return nil, err
		}
	}
	return nil, nil
}
// encodeRecordDatasource streams every row of the node's record datasource
// into the given CSV writer.
//
// The header is taken from the keys of the first row and sorted so that
// repeated exports produce an identical column order (Go map iteration
// order is random, so the original unordered range was nondeterministic).
//
// NOTE(review): the out map is reused between iterations and never cleared,
// so this assumes every row produced by the datasource carries the same key
// set — stale values would leak into later rows otherwise; confirm against
// the datasource implementations.
func (e CsvEncoder) encodeRecordDatasource(ctx context.Context, writer *csv.Writer, p envoyx.EncodeParams, node *envoyx.Node, tt envoyx.Traverser) (_ any, err error) {
	rds := node.Datasource.(*RecordDatasource)

	out := make(map[string]string)
	header := make([]string, 0, 4)
	row := make([]string, 0, 4)

	var more bool
	for {
		_, more, err = rds.Next(ctx, out)
		if err != nil || !more {
			return
		}

		// Lazily build (and emit) the header from the first row's keys
		if len(header) == 0 {
			for k := range out {
				header = append(header, k)
			}
			// Deterministic column order across runs
			sort.Strings(header)

			err = writer.Write(header)
			if err != nil {
				return
			}
		}

		// Reuse the row's backing array instead of reallocating per row
		row = row[:0]
		for _, h := range header {
			row = append(row, out[h])
		}

		err = writer.Write(row)
		if err != nil {
			return
		}
	}
}
// getWriter pulls the output writer from the encode params.
//
// It returns an error when the "writer" param is missing or does not
// implement io.Writer.
func (e CsvEncoder) getWriter(p envoyx.EncodeParams) (io.Writer, error) {
	if aux, ok := p.Params["writer"]; ok {
		if w, ok := aux.(io.Writer); ok {
			return w, nil
		}
	}

	return nil, fmt.Errorf("csv encoder expects a writer conforming to io.Writer interface")
}

View File

@ -1,6 +1,9 @@
package envoy
import (
"context"
"github.com/cortezaproject/corteza/server/pkg/dal"
"github.com/cortezaproject/corteza/server/pkg/envoyx"
"github.com/spf13/cast"
)
@ -19,6 +22,16 @@ type (
// @todo we might need to flush these to the disc in case a huge dataset is passed in
refToID map[string]uint64
}
// iteratorProvider is a wrapper around the dal.Iterator to conform to the
// envoy.Provider interface
iteratorProvider struct {
iter dal.Iterator
rowCache auxRecord
}
auxRecord map[string]string
)
func (rd *RecordDatasource) SetProvider(s envoyx.Provider) bool {
@ -30,12 +43,12 @@ func (rd *RecordDatasource) SetProvider(s envoyx.Provider) bool {
return true
}
func (rd *RecordDatasource) Next(out map[string]string) (ident string, more bool, err error) {
func (rd *RecordDatasource) Next(ctx context.Context, out map[string]string) (ident string, more bool, err error) {
if rd.rowCache == nil {
rd.rowCache = make(map[string]string)
}
more, err = rd.provider.Next(rd.rowCache)
more, err = rd.provider.Next(ctx, rd.rowCache)
if err != nil || !more {
return
}
@ -47,11 +60,18 @@ func (rd *RecordDatasource) Next(out map[string]string) (ident string, more bool
return
}
func (rd *RecordDatasource) Reset() (err error) {
return rd.provider.Reset()
func (rd *RecordDatasource) Reset(ctx context.Context) (err error) {
return rd.provider.Reset(ctx)
}
func (rd *RecordDatasource) applyMapping(in, out map[string]string) {
if len(rd.mapping.Mapping.m) == 0 {
for k, v := range in {
out[k] = v
}
return
}
for _, m := range rd.mapping.Mapping.m {
if m.Skip {
continue
@ -71,3 +91,39 @@ func (rd *RecordDatasource) ResolveRef(ref any) (out uint64, err error) {
out = rd.refToID[r]
return
}
// SetValue stores the given value under name, converted to its string
// representation. The position argument is ignored; rows are flat.
func (ar auxRecord) SetValue(name string, pos uint, value any) error {
	ar[name] = cast.ToString(value)
	return nil
}
// Next advances the wrapped DAL iterator and copies the next row's values
// into out as strings.
//
// It returns more=false — along with the iterator's error, if any — once
// the iterator is exhausted or fails.
//
// NOTE(review): rowCache is reused and never cleared between calls, so a
// value from a previous row may persist when a later row omits that key —
// confirm dal.Iterator.Scan semantics before relying on sparse rows.
func (ip *iteratorProvider) Next(ctx context.Context, out map[string]string) (more bool, err error) {
	// Lazily allocate the scratch row the iterator scans into
	if ip.rowCache == nil {
		ip.rowCache = make(auxRecord)
	}
	if !ip.iter.Next(ctx) {
		// Exhausted (or failed); surface the iterator's error, if any
		return false, ip.iter.Err()
	}
	err = ip.iter.Scan(ip.rowCache)
	if err != nil {
		return
	}
	// Copy the scanned row into the caller-provided map
	for k, v := range ip.rowCache {
		out[k] = v
	}
	return true, nil
}
// Reset is a no-op: the provider cannot rewind the underlying DAL iterator.
//
// @todo consider omitting these from the interface since they're not always needed
func (ip *iteratorProvider) Reset(ctx context.Context) error {
	return nil
}
// Ident returns an empty identifier; the iterator provider is anonymous.
//
// @todo consider omitting these from the interface since they're not always needed
func (ip *iteratorProvider) Ident() string {
	return ""
}

View File

@ -104,6 +104,12 @@ func (d StoreDecoder) decode(ctx context.Context, s store.Storer, dl dal.FullSer
if err != nil {
return nil, err
}
// @todo consider changing this.
// Currently it's required because the .decode may return some
// nested nodes as well.
// Consider a flag or a new function.
aux = envoyx.NodesForResourceType(ref.ResourceType, aux...)
if len(aux) == 0 {
return nil, fmt.Errorf("invalid reference %v", ref)
}
@ -177,6 +183,16 @@ func (d StoreDecoder) decode(ctx context.Context, s store.Storer, dl dal.FullSer
}
out = append(out, aux...)
default:
aux, err = d.extendDecoder(ctx, s, dl, wf.rt, refNodes[i], wf.f)
if err != nil {
return
}
for _, a := range aux {
a.Identifiers = a.Identifiers.Merge(wf.f.Identifiers)
a.References = envoyx.MergeRefs(a.References, refRefs[i])
}
out = append(out, aux...)
}
}

View File

@ -2,10 +2,13 @@ package envoy
import (
"context"
"fmt"
"github.com/cortezaproject/corteza/server/compose/dalutils"
"github.com/cortezaproject/corteza/server/compose/types"
"github.com/cortezaproject/corteza/server/pkg/dal"
"github.com/cortezaproject/corteza/server/pkg/envoyx"
"github.com/cortezaproject/corteza/server/pkg/filter"
"github.com/cortezaproject/corteza/server/store"
)
@ -87,6 +90,7 @@ func (d StoreDecoder) extendedModuleDecoder(ctx context.Context, s store.Storer,
var ff types.ModuleFieldSet
for _, b := range base {
mod := b.Resource.(*types.Module)
ff, _, err = store.SearchComposeModuleFields(ctx, s, types.ModuleFieldFilter{ModuleID: []uint64{b.Resource.GetID()}})
if err != nil {
return
@ -104,6 +108,8 @@ func (d StoreDecoder) extendedModuleDecoder(ctx context.Context, s store.Storer,
}),
Scope: b.Scope,
})
mod.Fields = append(mod.Fields, f)
}
}
@ -115,3 +121,87 @@ func (d StoreDecoder) decodeChartRefs(c *types.Chart) (refs map[string]envoyx.Re
// @todo
return
}
// extendDecoder dispatches decoding for resource types not covered by the
// generated store decoder.
//
// Unknown resource types yield an empty node set with no error.
func (d StoreDecoder) extendDecoder(ctx context.Context, s store.Storer, dl dal.FullService, rt string, refs map[string]*envoyx.Node, rf envoyx.ResourceFilter) (out envoyx.NodeSet, err error) {
	if rt == ComposeRecordDatasourceAuxType {
		return d.decodeRecordDatasource(ctx, s, dl, refs, rf)
	}
	return nil, nil
}
// decodeRecordDatasource builds a record-datasource node for the module
// referenced by the given refs.
//
// refs must contain resolved "NamespaceID" and "ModuleID" nodes; the
// resulting node wraps a DAL record iterator so the module's records can
// later be streamed out by an encoder. rf.Limit caps the iterated records.
func (d StoreDecoder) decodeRecordDatasource(ctx context.Context, s store.Storer, dl dal.FullService, refs map[string]*envoyx.Node, rf envoyx.ResourceFilter) (out envoyx.NodeSet, err error) {
	var (
		ok bool

		module     *types.Module
		moduleNode *envoyx.Node

		namespace     *types.Namespace
		namespaceNode *envoyx.Node
	)

	// Get the refs
	namespaceNode, ok = refs["NamespaceID"]
	if !ok {
		err = fmt.Errorf("namespace ref not found")
		return
	}
	// Checked assertion: a mis-typed ref yields an error instead of a panic
	namespace, ok = namespaceNode.Resource.(*types.Namespace)
	if !ok {
		err = fmt.Errorf("namespace ref resolved to an unexpected resource type %T", namespaceNode.Resource)
		return
	}

	moduleNode, ok = refs["ModuleID"]
	if !ok {
		err = fmt.Errorf("module ref not found")
		return
	}
	module, ok = moduleNode.Resource.(*types.Module)
	if !ok {
		err = fmt.Errorf("module ref resolved to an unexpected resource type %T", moduleNode.Resource)
		return
	}

	// Get the iterator over the module's records
	iter, _, err := dalutils.ComposeRecordsIterator(ctx, dl, module, types.RecordFilter{
		ModuleID:    module.ID,
		NamespaceID: namespace.ID,
		Paging: filter.Paging{
			Limit: rf.Limit,
		},
	})
	if err != nil {
		return
	}

	ou := &RecordDatasource{
		provider: &iteratorProvider{iter: iter},
		refToID:  make(map[string]uint64),
	}

	rr := map[string]envoyx.Ref{
		"NamespaceID": namespaceNode.ToRef(),
		"ModuleID":    moduleNode.ToRef(),
	}

	// @todo add refs based on module fields
	// for _, f := range module.Fields {
	// 	if f.Kind != "Record" {
	// 		continue
	// 	}
	// 	rr[fmt.Sprintf("%s.module", f.Name)] = r
	// 	rr[fmt.Sprintf("%s.datasource", f.Name)] = envoyx.Ref{
	// 		ResourceType: ComposeRecordDatasourceAuxType,
	// 		Identifiers:  r.Identifiers,
	// 		Scope:        r.Scope,
	// 		Optional:     true,
	// 	}
	// }

	out = append(out, &envoyx.Node{
		Datasource: ou,

		ResourceType: ComposeRecordDatasourceAuxType,
		Identifiers:  envoyx.MakeIdentifiers(module.ID, module.Handle),
		References:   rr,
		Scope:        moduleNode.Scope,
	})
	return
}

View File

@ -90,12 +90,13 @@ func (e StoreEncoder) encodeModuleExtend(ctx context.Context, p envoyx.EncodePar
ns := nsNode.Resource.(*types.Namespace)
// @todo get connection and things from there
model, err := service.ModuleToModel(ns, mod, "compose_record")
models, err := service.ModulesToModelSet(dl, ns, mod)
if err != nil {
return err
}
return dl.ReplaceModel(ctx, model)
// @note there is only one model so this is ok
return dl.ReplaceModel(ctx, models[0])
}
func (e StoreEncoder) encodeModuleExtendSubResources(ctx context.Context, p envoyx.EncodeParams, s store.Storer, n *envoyx.Node, tree envoyx.Traverser) (err error) {

View File

@ -31,7 +31,7 @@ func (e StoreEncoder) prepareRecordDatasource(ctx context.Context, p envoyx.Enco
if err != nil {
return
}
err = ds.Reset()
err = ds.Reset(ctx)
if err != nil {
return
}
@ -51,7 +51,7 @@ func (e StoreEncoder) prepareRecords(ctx context.Context, p envoyx.EncodeParams,
ds.refToID = make(map[string]uint64)
for {
ident, more, err = ds.Next(aux)
ident, more, err = ds.Next(ctx, aux)
if err != nil || !more {
return
}
@ -149,15 +149,14 @@ func (e StoreEncoder) encodeRecordDatasource(ctx context.Context, p envoyx.Encod
// - second pass makes the getters
getters := make(map[string]*recordGetter)
for k := range modIndex {
aux := makeRecordGetter(dl, tree, n, modIndex[k], dsIndex[k])
getters[k] = aux
getters[k] = makeRecordGetter(dl, tree, n, modIndex[k], dsIndex[k])
}
// Iterate and encode
//
// @todo utilize batching
for {
ident, more, err = ds.Next(auxRec)
ident, more, err = ds.Next(ctx, auxRec)
if err != nil || !more {
return
}

View File

@ -1154,7 +1154,7 @@ func DalModelReplace(ctx context.Context, dmm dalModelManager, ns *types.Namespa
models dal.ModelSet
)
models, err = modulesToModelSet(dmm, ns, modules...)
models, err = ModulesToModelSet(dmm, ns, modules...)
if err != nil {
return
}
@ -1170,11 +1170,11 @@ func DalModelReplace(ctx context.Context, dmm dalModelManager, ns *types.Namespa
}
func dalAttributeReplace(ctx context.Context, dmm dalModelManager, ns *types.Namespace, old, new *types.Module, hasRecords bool) (err error) {
oldModel, err := modulesToModelSet(dmm, ns, old)
oldModel, err := ModulesToModelSet(dmm, ns, old)
if err != nil {
return
}
newModel, err := modulesToModelSet(dmm, ns, new)
newModel, err := ModulesToModelSet(dmm, ns, new)
if err != nil {
return
}
@ -1202,12 +1202,12 @@ func DalModelRemove(ctx context.Context, dmm dalModelManager, mm ...*types.Modul
return
}
// modulesToModelSet takes a modules for a namespace and converts all of them
// ModulesToModelSet takes a modules for a namespace and converts all of them
// into a model set for the DAL
//
// Ident partition placeholders are replaced here as well alongside
// with the revision models where revisions are enabled
func modulesToModelSet(dmm dalModelManager, ns *types.Namespace, mm ...*types.Module) (out dal.ModelSet, err error) {
func ModulesToModelSet(dmm dalModelManager, ns *types.Namespace, mm ...*types.Module) (out dal.ModelSet, err error) {
var (
conn *dal.ConnectionWrap
model *dal.Model

View File

@ -1,6 +1,7 @@
package csv
import (
"context"
"encoding/csv"
"io"
"io/ioutil"
@ -57,7 +58,7 @@ func Decoder(r io.Reader, ident string) (out *decoder, err error) {
}
r, err = out.flushTemp(r)
defer out.Reset()
defer out.Reset(nil)
if err != nil {
return
}
@ -103,14 +104,14 @@ func (d *decoder) Fields() []string {
}
// Reset resets the decoder to the start
func (d *decoder) Reset() error {
func (d *decoder) Reset(_ context.Context) error {
_, err := d.src.Seek(0, 0)
d.skipHead = true
return err
}
// Next returns the field: value mapping for the next row
func (d *decoder) Next(out map[string]string) (more bool, err error) {
func (d *decoder) Next(_ context.Context, out map[string]string) (more bool, err error) {
if d.skipHead {
_, err = d.reader.Read()
if err != nil {

View File

@ -35,28 +35,28 @@ func TestDecoder(t *testing.T) {
aux := make(map[string]string)
var more bool
more, err = dc.Next(aux)
more, err = dc.Next(nil, aux)
req.NoError(err)
req.True(more)
req.Equal("r1f1", aux["f1"])
req.Equal("r1f2", aux["f2"])
req.Equal("r1f3", aux["f3"])
more, err = dc.Next(aux)
more, err = dc.Next(nil, aux)
req.NoError(err)
req.True(more)
req.Equal("r2f1", aux["f1"])
req.Equal("r2f2", aux["f2"])
req.Equal("r2f3", aux["f3"])
more, err = dc.Next(aux)
more, err = dc.Next(nil, aux)
req.NoError(err)
req.True(more)
req.Equal("r3f1", aux["f1"])
req.Equal("r3f2", aux["f2"])
req.Equal("r3f3", aux["f3"])
more, err = dc.Next(aux)
more, err = dc.Next(nil, aux)
req.NoError(err)
req.False(more)
})

View File

@ -1,15 +1,17 @@
package envoyx
import "context"
type (
Provider interface {
Next(out map[string]string) (more bool, err error)
Reset() error
Next(ctx context.Context, out map[string]string) (more bool, err error)
Reset(ctx context.Context) error
Ident() string
}
Datasource interface {
Next(out map[string]string) (ident string, more bool, err error)
Reset() error
Next(ctx context.Context, out map[string]string) (ident string, more bool, err error)
Reset(ctx context.Context) error
SetProvider(Provider) bool
}
)