diff --git a/server/codegen/assets/templates/gocode/envoy/store_decode.go.tpl b/server/codegen/assets/templates/gocode/envoy/store_decode.go.tpl index 9241eeccd..225091414 100644 --- a/server/codegen/assets/templates/gocode/envoy/store_decode.go.tpl +++ b/server/codegen/assets/templates/gocode/envoy/store_decode.go.tpl @@ -107,6 +107,12 @@ func (d StoreDecoder) decode(ctx context.Context, s store.Storer, dl dal.FullSer if err != nil { return nil, err } + + // @todo consider changing this. + // Currently it's required because the .decode may return some + // nested nodes as well. + // Consider a flag or a new function. + aux = envoyx.NodesForResourceType(ref.ResourceType, aux...) if len(aux) == 0 { return nil, fmt.Errorf("invalid reference %v", ref) } @@ -125,21 +131,31 @@ func (d StoreDecoder) decode(ctx context.Context, s store.Storer, dl dal.FullSer var aux envoyx.NodeSet for i, wf := range wrappedFilters { switch wf.rt { -{{- range .resources -}} +{{ range .resources -}} {{- if .envoy.omit}}{{continue}}{{ end -}} - case types.{{.expIdent}}ResourceType: - aux, err = d.decode{{.expIdent}}(ctx, s, dl, d.make{{.expIdent}}Filter(scopedNodes[i], refNodes[i], wf.f)) - if err != nil { - return - } - for _, a := range aux { - a.Identifiers = a.Identifiers.Merge(wf.f.Identifiers) - a.References = envoyx.MergeRefs(a.References, refRefs[i]) - } - out = append(out, aux...) + case types.{{.expIdent}}ResourceType: + aux, err = d.decode{{.expIdent}}(ctx, s, dl, d.make{{.expIdent}}Filter(scopedNodes[i], refNodes[i], wf.f)) + if err != nil { + return + } + for _, a := range aux { + a.Identifiers = a.Identifiers.Merge(wf.f.Identifiers) + a.References = envoyx.MergeRefs(a.References, refRefs[i]) + } + out = append(out, aux...) -{{ end -}} +{{ end }} + default: + aux, err = d.extendDecoder(ctx, s, dl, wf.rt, refNodes[i], wf.f) + if err!= nil { + return + } + for _, a := range aux { + a.Identifiers = a.Identifiers.Merge(wf.f.Identifiers) + a.References = envoyx.MergeRefs(a.References, refRefs[i]) + } + out = append(out, aux...) } } diff --git a/server/compose/envoy/csv_encode.go b/server/compose/envoy/csv_encode.go new file mode 100644 index 000000000..3ee9b5e10 --- /dev/null +++ b/server/compose/envoy/csv_encode.go @@ -0,0 +1,95 @@ +package envoy + +import ( + "context" + "encoding/csv" + "fmt" + "io" + + "github.com/cortezaproject/corteza/server/pkg/envoyx" +) + +type ( + CsvEncoder struct{} +) + +func (e CsvEncoder) Encode(ctx context.Context, p envoyx.EncodeParams, rt string, nodes envoyx.NodeSet, tt envoyx.Traverser) (err error) { + w, err := e.getWriter(p) + if err != nil { + return + } + + cw := csv.NewWriter(w) + + switch rt { + case ComposeRecordDatasourceAuxType: + _, err = e.encodeRecordDatasources(ctx, cw, p, nodes, tt) + if err != nil { + return + } + } + + cw.Flush() + return +} + +func (e CsvEncoder) encodeRecordDatasources(ctx context.Context, writer *csv.Writer, p envoyx.EncodeParams, nodes envoyx.NodeSet, tt envoyx.Traverser) (out any, err error) { + for _, n := range nodes { + _, err = e.encodeRecordDatasource(ctx, writer, p, n, tt) + if err != nil { + return + } + } + + return +} + +func (e CsvEncoder) encodeRecordDatasource(ctx context.Context, writer *csv.Writer, p envoyx.EncodeParams, node *envoyx.Node, tt envoyx.Traverser) (_ any, err error) { + rds := node.Datasource.(*RecordDatasource) + + out := make(map[string]string) + header := make([]string, 0, 4) + + row := make([]string, 0, 4) + var more bool + for { + _, more, err = rds.Next(ctx, out) + if err != nil || !more { + return + } + + if len(header) == 0 { + for k := range out { + header = append(header, k) + } + err = writer.Write(header) + if err != nil { + return + } + } + + for _, h := range header { + row = append(row, out[h]) + } + + err = writer.Write(row) + if err != nil { + return + } + + row = nil + } +} + +func (e CsvEncoder) getWriter(p envoyx.EncodeParams) (out io.Writer, err error) { + aux, ok := p.Params["writer"] + if ok { + out, ok = aux.(io.Writer) + if ok { + return + } + } + + err = fmt.Errorf("csv encoder expects a writer conforming to io.Writer interface") + return +} diff --git a/server/compose/envoy/record_datasource.go b/server/compose/envoy/record_datasource.go index 835f1bf2e..3602887cb 100644 --- a/server/compose/envoy/record_datasource.go +++ b/server/compose/envoy/record_datasource.go @@ -1,6 +1,9 @@ package envoy import ( + "context" + + "github.com/cortezaproject/corteza/server/pkg/dal" "github.com/cortezaproject/corteza/server/pkg/envoyx" "github.com/spf13/cast" ) @@ -19,6 +22,16 @@ type ( // @todo we might need to flush these to the disc in case a huge dataset is passed in refToID map[string]uint64 } + + // iteratorProvider is a wrapper around the dal.Iterator to conform to the + // envoy.Provider interface + iteratorProvider struct { + iter dal.Iterator + + rowCache auxRecord + } + + auxRecord map[string]string ) func (rd *RecordDatasource) SetProvider(s envoyx.Provider) bool { @@ -30,12 +43,12 @@ func (rd *RecordDatasource) SetProvider(s envoyx.Provider) bool { return true } -func (rd *RecordDatasource) Next(out map[string]string) (ident string, more bool, err error) { +func (rd *RecordDatasource) Next(ctx context.Context, out map[string]string) (ident string, more bool, err error) { if rd.rowCache == nil { rd.rowCache = make(map[string]string) } - more, err = rd.provider.Next(rd.rowCache) + more, err = rd.provider.Next(ctx, rd.rowCache) if err != nil || !more { return } @@ -47,11 +60,18 @@ func (rd *RecordDatasource) Next(out map[string]string) (ident string, more bool return } -func (rd *RecordDatasource) Reset() (err error) { - return rd.provider.Reset() +func (rd *RecordDatasource) Reset(ctx context.Context) (err error) { + return rd.provider.Reset(ctx) } func (rd *RecordDatasource) applyMapping(in, out map[string]string) { + if len(rd.mapping.Mapping.m) == 0 { + for k, v := range in { + out[k] = v + } + return + } + for _, m := range rd.mapping.Mapping.m { if m.Skip { continue @@ -71,3 +91,39 @@ func (rd *RecordDatasource) ResolveRef(ref any) (out uint64, err error) { out = rd.refToID[r] return } + +func (ar auxRecord) SetValue(name string, pos uint, value any) (err error) { + ar[name] = cast.ToString(value) + return +} + +func (ip *iteratorProvider) Next(ctx context.Context, out map[string]string) (more bool, err error) { + if ip.rowCache == nil { + ip.rowCache = make(auxRecord) + } + + if !ip.iter.Next(ctx) { + return false, ip.iter.Err() + } + + err = ip.iter.Scan(ip.rowCache) + if err != nil { + return + } + + for k, v := range ip.rowCache { + out[k] = v + } + + return true, nil +} + +// @todo consider omitting these from the interface since they're not always needed +func (ip *iteratorProvider) Reset(ctx context.Context) (err error) { + return +} + +// @todo consider omitting these from the interface since they're not always needed +func (ip *iteratorProvider) Ident() (out string) { + return +} diff --git a/server/compose/envoy/store_decode.gen.go b/server/compose/envoy/store_decode.gen.go index 3492650a8..07b248d61 100644 --- a/server/compose/envoy/store_decode.gen.go +++ b/server/compose/envoy/store_decode.gen.go @@ -104,6 +104,12 @@ func (d StoreDecoder) decode(ctx context.Context, s store.Storer, dl dal.FullSer if err != nil { return nil, err } + + // @todo consider changing this. + // Currently it's required because the .decode may return some + // nested nodes as well. + // Consider a flag or a new function. + aux = envoyx.NodesForResourceType(ref.ResourceType, aux...) if len(aux) == 0 { return nil, fmt.Errorf("invalid reference %v", ref) } @@ -177,6 +183,16 @@ func (d StoreDecoder) decode(ctx context.Context, s store.Storer, dl dal.FullSer } out = append(out, aux...) + default: + aux, err = d.extendDecoder(ctx, s, dl, wf.rt, refNodes[i], wf.f) + if err != nil { + return + } + for _, a := range aux { + a.Identifiers = a.Identifiers.Merge(wf.f.Identifiers) + a.References = envoyx.MergeRefs(a.References, refRefs[i]) + } + out = append(out, aux...) } } diff --git a/server/compose/envoy/store_decode.go b/server/compose/envoy/store_decode.go index 0e45aaedc..ceac3c873 100644 --- a/server/compose/envoy/store_decode.go +++ b/server/compose/envoy/store_decode.go @@ -2,10 +2,13 @@ package envoy import ( "context" + "fmt" + "github.com/cortezaproject/corteza/server/compose/dalutils" "github.com/cortezaproject/corteza/server/compose/types" "github.com/cortezaproject/corteza/server/pkg/dal" "github.com/cortezaproject/corteza/server/pkg/envoyx" + "github.com/cortezaproject/corteza/server/pkg/filter" "github.com/cortezaproject/corteza/server/store" ) @@ -87,6 +90,7 @@ func (d StoreDecoder) extendedModuleDecoder(ctx context.Context, s store.Storer, var ff types.ModuleFieldSet for _, b := range base { + mod := b.Resource.(*types.Module) ff, _, err = store.SearchComposeModuleFields(ctx, s, types.ModuleFieldFilter{ModuleID: []uint64{b.Resource.GetID()}}) if err != nil { return @@ -104,6 +108,8 @@ func (d StoreDecoder) extendedModuleDecoder(ctx context.Context, s store.Storer, }), Scope: b.Scope, }) + + mod.Fields = append(mod.Fields, f) } } @@ -115,3 +121,87 @@ func (d StoreDecoder) decodeChartRefs(c *types.Chart) (refs map[string]envoyx.Re // @todo return } + +func (d StoreDecoder) extendDecoder(ctx context.Context, s store.Storer, dl dal.FullService, rt string, refs map[string]*envoyx.Node, rf envoyx.ResourceFilter) (out envoyx.NodeSet, err error) { + switch rt { + case ComposeRecordDatasourceAuxType: + return d.decodeRecordDatasource(ctx, s, dl, refs, rf) + } + + return +} + +func (d StoreDecoder) decodeRecordDatasource(ctx context.Context, s store.Storer, dl dal.FullService, refs map[string]*envoyx.Node, rf envoyx.ResourceFilter) (out envoyx.NodeSet, err error) { + var ( + ok bool + + module *types.Module + moduleNode *envoyx.Node + namespace *types.Namespace + namespaceNode *envoyx.Node + ) + + // Get the refs + namespaceNode, ok = refs["NamespaceID"] + if !ok { + err = fmt.Errorf("namespace ref not found") + return + } + namespace = namespaceNode.Resource.(*types.Namespace) + + moduleNode, ok = refs["ModuleID"] + if !ok { + err = fmt.Errorf("module ref not found") + return + } + module = moduleNode.Resource.(*types.Module) + + // Get the iterator + iter, _, err := dalutils.ComposeRecordsIterator(ctx, dl, module, types.RecordFilter{ + ModuleID: module.ID, + NamespaceID: namespace.ID, + Paging: filter.Paging{ + Limit: rf.Limit, + }, + }) + + if err != nil { + return + } + + ou := &RecordDatasource{ + provider: &iteratorProvider{iter: iter}, + refToID: make(map[string]uint64), + } + + rr := map[string]envoyx.Ref{ + "NamespaceID": namespaceNode.ToRef(), + "ModuleID": moduleNode.ToRef(), + } + + // @todo add refs based on module fields + // for _, f := range module.Fields { + // if f.Kind != "Record" { + // continue + // } + + // rr[fmt.Sprintf("%s.module", f.Name)] = r + // rr[fmt.Sprintf("%s.datasource", f.Name)] = envoyx.Ref{ + // ResourceType: ComposeRecordDatasourceAuxType, + // Identifiers: r.Identifiers, + // Scope: r.Scope, + // Optional: true, + // } + // } + + out = append(out, &envoyx.Node{ + Datasource: ou, + ResourceType: ComposeRecordDatasourceAuxType, + + Identifiers: envoyx.MakeIdentifiers(module.ID, module.Handle), + References: rr, + Scope: moduleNode.Scope, + }) + + return +} diff --git a/server/compose/envoy/store_encode.go b/server/compose/envoy/store_encode.go index 8eb4d5f09..0e1059a9b 100644 --- a/server/compose/envoy/store_encode.go +++ b/server/compose/envoy/store_encode.go @@ -90,12 +90,13 @@ func (e StoreEncoder) encodeModuleExtend(ctx context.Context, p envoyx.EncodePar ns := nsNode.Resource.(*types.Namespace) // @todo get connection and things from there - model, err := service.ModuleToModel(ns, mod, "compose_record") + models, err := service.ModulesToModelSet(dl, ns, mod) if err != nil { return err } - return dl.ReplaceModel(ctx, model) + // @note there is only one model so this is ok + return dl.ReplaceModel(ctx, models[0]) } func (e StoreEncoder) encodeModuleExtendSubResources(ctx context.Context, p envoyx.EncodeParams, s store.Storer, n *envoyx.Node, tree envoyx.Traverser) (err error) { diff --git a/server/compose/envoy/store_encode_datasource.go b/server/compose/envoy/store_encode_datasource.go index f2636bee7..b91b257af 100644 --- a/server/compose/envoy/store_encode_datasource.go +++ b/server/compose/envoy/store_encode_datasource.go @@ -31,7 +31,7 @@ func (e StoreEncoder) prepareRecordDatasource(ctx context.Context, p envoyx.Enco if err != nil { return } - err = ds.Reset() + err = ds.Reset(ctx) if err != nil { return } @@ -51,7 +51,7 @@ func (e StoreEncoder) prepareRecords(ctx context.Context, p envoyx.EncodeParams, ds.refToID = make(map[string]uint64) for { - ident, more, err = ds.Next(aux) + ident, more, err = ds.Next(ctx, aux) if err != nil || !more { return } @@ -149,15 +149,14 @@ func (e StoreEncoder) encodeRecordDatasource(ctx context.Context, p envoyx.Encod // - second pass makes the getters getters := make(map[string]*recordGetter) for k := range modIndex { - aux := makeRecordGetter(dl, tree, n, modIndex[k], dsIndex[k]) - getters[k] = aux + getters[k] = makeRecordGetter(dl, tree, n, modIndex[k], dsIndex[k]) } // Iterate and encode // // @todo utilize batching for { - ident, more, err = ds.Next(auxRec) + ident, more, err = ds.Next(ctx, auxRec) if err != nil || !more { return } diff --git a/server/compose/service/module.go b/server/compose/service/module.go index 8963daaeb..5348667f6 100644 --- a/server/compose/service/module.go +++ b/server/compose/service/module.go @@ -1154,7 +1154,7 @@ func DalModelReplace(ctx context.Context, dmm dalModelManager, ns *types.Namespa models dal.ModelSet ) - models, err = modulesToModelSet(dmm, ns, modules...) + models, err = ModulesToModelSet(dmm, ns, modules...) if err != nil { return } @@ -1170,11 +1170,11 @@ func DalModelReplace(ctx context.Context, dmm dalModelManager, ns *types.Namespa } func dalAttributeReplace(ctx context.Context, dmm dalModelManager, ns *types.Namespace, old, new *types.Module, hasRecords bool) (err error) { - oldModel, err := modulesToModelSet(dmm, ns, old) + oldModel, err := ModulesToModelSet(dmm, ns, old) if err != nil { return } - newModel, err := modulesToModelSet(dmm, ns, new) + newModel, err := ModulesToModelSet(dmm, ns, new) if err != nil { return } @@ -1202,12 +1202,12 @@ func DalModelRemove(ctx context.Context, dmm dalModelManager, mm ...*types.Modul return } -// modulesToModelSet takes a modules for a namespace and converts all of them +// ModulesToModelSet takes a modules for a namespace and converts all of them // into a model set for the DAL // // Ident partition placeholders are replaced here as well alongside // with the revision models where revisions are enabled -func modulesToModelSet(dmm dalModelManager, ns *types.Namespace, mm ...*types.Module) (out dal.ModelSet, err error) { +func ModulesToModelSet(dmm dalModelManager, ns *types.Namespace, mm ...*types.Module) (out dal.ModelSet, err error) { var ( conn *dal.ConnectionWrap model *dal.Model diff --git a/server/pkg/envoyx/csv/decoder.go b/server/pkg/envoyx/csv/decoder.go index 0622b7597..fa0b207d0 100644 --- a/server/pkg/envoyx/csv/decoder.go +++ b/server/pkg/envoyx/csv/decoder.go @@ -1,6 +1,7 @@ package csv import ( + "context" "encoding/csv" "io" "io/ioutil" @@ -57,7 +58,7 @@ func Decoder(r io.Reader, ident string) (out *decoder, err error) { } r, err = out.flushTemp(r) - defer out.Reset() + defer out.Reset(nil) if err != nil { return } @@ -103,14 +104,14 @@ func (d *decoder) Fields() []string { } // Reset resets the decoder to the start -func (d *decoder) Reset() error { +func (d *decoder) Reset(_ context.Context) error { _, err := d.src.Seek(0, 0) d.skipHead = true return err } // Next returns the field: value mapping for the next row -func (d *decoder) Next(out map[string]string) (more bool, err error) { +func (d *decoder) Next(_ context.Context, out map[string]string) (more bool, err error) { if d.skipHead { _, err = d.reader.Read() if err != nil { diff --git a/server/pkg/envoyx/csv/decoder_test.go b/server/pkg/envoyx/csv/decoder_test.go index 9f5a53a4f..e3e4faaa4 100644 --- a/server/pkg/envoyx/csv/decoder_test.go +++ b/server/pkg/envoyx/csv/decoder_test.go @@ -35,28 +35,28 @@ func TestDecoder(t *testing.T) { aux := make(map[string]string) var more bool - more, err = dc.Next(aux) + more, err = dc.Next(nil, aux) req.NoError(err) req.True(more) req.Equal("r1f1", aux["f1"]) req.Equal("r1f2", aux["f2"]) req.Equal("r1f3", aux["f3"]) - more, err = dc.Next(aux) + more, err = dc.Next(nil, aux) req.NoError(err) req.True(more) req.Equal("r2f1", aux["f1"]) req.Equal("r2f2", aux["f2"]) req.Equal("r2f3", aux["f3"]) - more, err = dc.Next(aux) + more, err = dc.Next(nil, aux) req.NoError(err) req.True(more) req.Equal("r3f1", aux["f1"]) req.Equal("r3f2", aux["f2"]) req.Equal("r3f3", aux["f3"]) - more, err = dc.Next(aux) + more, err = dc.Next(nil, aux) req.NoError(err) req.False(more) }) diff --git a/server/pkg/envoyx/datasource.go b/server/pkg/envoyx/datasource.go index 70d255420..2d3a274e6 100644 --- a/server/pkg/envoyx/datasource.go +++ b/server/pkg/envoyx/datasource.go @@ -1,15 +1,17 @@ package envoyx +import "context" + type ( Provider interface { - Next(out map[string]string) (more bool, err error) - Reset() error + Next(ctx context.Context, out map[string]string) (more bool, err error) + Reset(ctx context.Context) error Ident() string } Datasource interface { - Next(out map[string]string) (ident string, more bool, err error) - Reset() error + Next(ctx context.Context, out map[string]string) (ident string, more bool, err error) + Reset(ctx context.Context) error SetProvider(Provider) bool } )