From e50472ed8b27348388a4faafd6283b072a22d73a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toma=C5=BE=20Jerman?= Date: Tue, 4 Aug 2020 13:14:09 +0200 Subject: [PATCH 1/3] Allow the importer to use legacy ids for future references --- pkg/ngimporter/main.go | 46 +++++++++++++- pkg/ngimporter/types/general.go | 2 + pkg/ngimporter/types/import_node.go | 95 ++++++++++++++++++++++++----- 3 files changed, 128 insertions(+), 15 deletions(-) diff --git a/pkg/ngimporter/main.go b/pkg/ngimporter/main.go index dfe8ce19a..d019a7fe1 100644 --- a/pkg/ngimporter/main.go +++ b/pkg/ngimporter/main.go @@ -38,6 +38,7 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er imp := &Importer{} db := repository.DB(ctx) modRepo := repository.Module(ctx, db) + recRepo := repository.Record(ctx, db) var err error // import users @@ -50,7 +51,7 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er } // maps sourceUserID to CortezaID - var uMap map[string]uint64 + uMap := make(map[string]uint64) if usrSrc != nil { um, mgu, err := importUsers(ctx, usrSrc, ns) if err != nil { @@ -73,6 +74,49 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er } } + // populate with existing users + uMod, err := findModuleByHandle(modRepo, ns.ID, "user") + if err != nil { + return err + } + rr, _, err := recRepo.Find(uMod, cct.RecordFilter{ + ModuleID: uMod.ID, + Deleted: rh.FilterStateInclusive, + NamespaceID: ns.ID, + Query: "sys_legacy_ref_id IS NOT NULL", + PageFilter: rh.PageFilter{ + Page: 1, + PerPage: 0, + }, + }) + if err != nil { + return err + } + + rvs, err := recRepo.LoadValues(uMod.Fields.Names(), rr.IDs()) + if err != nil { + return err + } + + err = rr.Walk(func(r *cct.Record) error { + r.Values = rvs.FilterByRecordID(r.ID) + return nil + }) + if err != nil { + return err + } + + rr.Walk(func(r *cct.Record) error { + vr := r.Values.Get("sys_legacy_ref_id", 0) + vu := r.Values.Get("UserID", 0) + u, err := strconv.ParseUint(vu.Value, 10, 64) + if err != nil { + return err + } + uMap[vr.Value] = u + return nil + }) + iss, err = joinData(iss) if err != nil { return err diff --git a/pkg/ngimporter/types/general.go b/pkg/ngimporter/types/general.go index 1e013ab9a..8f5100a70 100644 --- a/pkg/ngimporter/types/general.go +++ b/pkg/ngimporter/types/general.go @@ -21,6 +21,8 @@ const ( MetaMapExt = ".map.json" MetaJoinExt = ".join.json" MetaValueExt = ".value.json" + + LegacyRefIDField = "sys_legacy_ref_id" ) var ( diff --git a/pkg/ngimporter/types/import_node.go b/pkg/ngimporter/types/import_node.go index 561580f71..ca44a2e3c 100644 --- a/pkg/ngimporter/types/import_node.go +++ b/pkg/ngimporter/types/import_node.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "log" + "strconv" "strings" "sync" "time" @@ -14,6 +15,7 @@ import ( "github.com/cortezaproject/corteza-server/compose/repository" cv "github.com/cortezaproject/corteza-server/compose/service/values" "github.com/cortezaproject/corteza-server/compose/types" + "github.com/cortezaproject/corteza-server/pkg/rh" "github.com/schollz/progressbar/v2" ) @@ -154,6 +156,41 @@ func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[st } } +func (n *ImportNode) fetchRemoteRef(ref, refMod string, repo repository.RecordRepository) (string, error) { + refModU, err := strconv.ParseUint(refMod, 10, 64) + if err != nil { + return "", err + } + + fl := types.RecordFilter{ + ModuleID: refModU, + NamespaceID: n.Namespace.ID, + Deleted: rh.FilterStateInclusive, + Query: fmt.Sprintf("%s='%s'", LegacyRefIDField, ref), + PageFilter: rh.PageFilter{ + Page: 1, + PerPage: 1, + }, + } + + var refModM *types.Module + if ModulesGlobal != nil { + refModM = ModulesGlobal.FindByID(refModU) + } + if refModM != nil { + rr, _, err := repo.Find(refModM, fl) + if err != nil { + return "", err + } + if len(rr) < 1 { + return "", errors.New(fmt.Sprintf("[error] referenced record %s not found on node %s for module %s", ref, n.Name, refModM.Name)) + } + return strconv.FormatUint(rr[0].ID, 10), nil + } + + return "", nil +} + // determines if node is Satisfied and can be imported // it is Satisfied, when all of it's dependencies have been imported ie. no // more child refs @@ -344,19 +381,26 @@ func (n *ImportNode) correctRecordRefs(repo repository.RecordRepository) error { return errors.New("moduleField.record.invalidRefFormat") } + fetch := false + // in case of a missing ref, make sure to remove the reference. // otherwise this will cause internal errors when trying to resolve CortezaID. - if mod, ok := n.idMap[ref]; ok { - if vv, ok := mod[val]; ok { - v.Value = vv - v.Updated = true - } else { - v.Value = "" + if mod, ok := n.idMap[ref]; !ok { + fetch = true + } else if vv, ok := mod[val]; !ok { + fetch = true + } else { + v.Value = vv + v.Updated = true + } + + if fetch { + val, err := n.fetchRemoteRef(val, ref, repo) + if err != nil { continue } - } else { - v.Value = "" - continue + v.Value = val + v.Updated = true } } } @@ -409,6 +453,14 @@ func (n *ImportNode) importNodeSource(users map[string]uint64, repo repository.R recordValues := types.RecordValueSet{} + // assure a valid legacy reference + recordValues = append(recordValues, &types.RecordValue{ + Name: LegacyRefIDField, + Value: record[0], + Place: 0, + Updated: true, + }) + // convert the given row into a { field: value } map; this will be used // for expression evaluation row := map[string]string{} @@ -506,13 +558,28 @@ func (n *ImportNode) importNodeSource(users map[string]uint64, repo repository.R return nil, errors.New("moduleField.record.invalidRefFormat") } - if mod, ok := n.idMap[ref]; ok && val != "" { - if v, ok := mod[val]; ok && v != "" { - val = v - } else { + fetch := false + + if val == "" { + continue + } + + if mod, ok := n.idMap[ref]; !ok { + fetch = true + } else if v, ok := mod[val]; !ok || v == "" { + fetch = true + } else { + val = v + } + + if fetch { + val, err = n.fetchRemoteRef(val, ref, repo) + if err != nil { continue } - } else { + } + + if val == "" { continue } } From 06f25a87f3e6bb701c61a66b4e1e67ba8261e409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toma=C5=BE=20Jerman?= Date: Tue, 4 Aug 2020 13:15:13 +0200 Subject: [PATCH 2/3] TMP OneTimeTool to add legacy ids to imported records --- compose/commands/ng_importer.go | 23 +++-- pkg/ngimporter/main.go | 117 +++++++++++++++++++++----- pkg/ngimporter/types/general.go | 4 + pkg/ngimporter/types/import_node.go | 108 +++++++++++++++++++++++- pkg/ngimporter/types/import_source.go | 7 ++ 5 files changed, 228 insertions(+), 31 deletions(-) diff --git a/compose/commands/ng_importer.go b/compose/commands/ng_importer.go index 14887b9fc..ff1f5f10d 100644 --- a/compose/commands/ng_importer.go +++ b/compose/commands/ng_importer.go @@ -29,12 +29,14 @@ func NGImporter() *cobra.Command { Run: func(cmd *cobra.Command, args []string) { var ( - ctx = auth.SetSuperUserContext(cli.Context()) - nsFlag = cmd.Flags().Lookup("namespace").Value.String() - srcFlag = cmd.Flags().Lookup("src").Value.String() - metaFlag = cmd.Flags().Lookup("meta").Value.String() - ns *types.Namespace - err error + ctx = auth.SetSuperUserContext(cli.Context()) + nsFlag = cmd.Flags().Lookup("namespace").Value.String() + srcFlag = cmd.Flags().Lookup("src").Value.String() + metaFlag = cmd.Flags().Lookup("meta").Value.String() + toTsFlag = cmd.Flags().Lookup("to-timestamp").Value.String() + fixupFlag = cmd.Flags().Lookup("fixup").Changed + ns *types.Namespace + err error iss []ngt.ImportSource ) @@ -160,7 +162,12 @@ func NGImporter() *cobra.Command { } } - err = ngi.Import(ctx, out, ns) + c := &ngt.Config{ + ToTimestamp: toTsFlag, + RefFixup: fixupFlag, + } + + err = ngi.Import(ctx, out, ns, c) if err != nil { panic(err) } @@ -171,6 +178,8 @@ func NGImporter() *cobra.Command { cmd.Flags().String("namespace", "", "Import into namespace (by ID or string)") cmd.Flags().String("src", "", "Directory with import files") cmd.Flags().String("meta", "", "Directory with import meta files") + cmd.Flags().String("to-timestamp", "", "Process records upto this timestamp") + cmd.Flags().BoolP("fixup", "", false, "Fixup legacy IDs") return cmd } diff --git a/pkg/ngimporter/main.go b/pkg/ngimporter/main.go index d019a7fe1..aa027543f 100644 --- a/pkg/ngimporter/main.go +++ b/pkg/ngimporter/main.go @@ -11,6 +11,7 @@ import ( "github.com/cortezaproject/corteza-server/compose/repository" cct "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/pkg/ngimporter/types" + "github.com/cortezaproject/corteza-server/pkg/rh" "github.com/schollz/progressbar/v2" ) @@ -32,7 +33,7 @@ type ( // * build graph from ImportNodes based on the provided ImportSource nodes // * remove cycles from the given graph // * import data based on node dependencies -func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) error { +func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace, cfg *types.Config) error { // contains warnings raised by the pre process steps var preProcW []string imp := &Importer{} @@ -132,11 +133,20 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er for _, nIs := range nIss { // preload module mod, err := findModuleByHandle(modRepo, ns.ID, nIs.Name) + if mod != nil { + types.ModulesGlobal = append(types.ModulesGlobal, mod) + } if err != nil { preProcW = append(preProcW, err.Error()+" "+nIs.Name) continue } + mod, err = assureLegacyFields(modRepo, mod, cfg) + if err != nil { + // this is a fatal error, we shouldn't continue if this fails + return err + } + // define headers r := csv.NewReader(nIs.Source) var header []string @@ -198,6 +208,9 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er Namespace: ns, Lock: &sync.Mutex{}, } + if mm != nil { + types.ModulesGlobal = append(types.ModulesGlobal, mm) + } nn = imp.AddNode(nn) n.LinkAdd(nn) @@ -211,26 +224,34 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er log.Printf("[warning] %s\n", w) } - imp.RemoveCycles() - - // take note of leaf nodes that can be imported right away - for _, n := range imp.nodes { - if !n.HasChildren() { - imp.Leafs = append(imp.Leafs, n) + if cfg.RefFixup { + err = imp.AssureLegacyID(ctx, cfg) + if err != nil { + log.Println("[importer] failed") + return err } - } + } else { + imp.RemoveCycles() - log.Printf("[importer] prepared\n") - log.Printf("[importer] node count: %d\n", len(imp.nodes)) - log.Printf("[importer] leaf count: %d\n", len(imp.Leafs)) + // take note of leaf nodes that can be imported right away + for _, n := range imp.nodes { + if !n.HasChildren() { + imp.Leafs = append(imp.Leafs, n) + } + } - log.Println("[importer] started") - err = imp.Import(ctx, uMap) - if err != nil { - log.Println("[importer] failed") - return err + log.Printf("[importer] prepared\n") + log.Printf("[importer] node count: %d\n", len(imp.nodes)) + log.Printf("[importer] leaf count: %d\n", len(imp.Leafs)) + + log.Println("[importer] started") + err = imp.Import(ctx, uMap) + if err != nil { + log.Println("[importer] failed") + return err + } + log.Println("[importer] finished") } - log.Println("[importer] finished") return nil } @@ -270,6 +291,29 @@ func (imp *Importer) RemoveCycles() { } } +func (m *Importer) AssureLegacyID(ctx context.Context, cfg *types.Config) error { + db := repository.DB(ctx) + repoRecord := repository.Record(ctx, db) + bar := progressbar.New(len(m.nodes)) + + return db.Transaction(func() (err error) { + // since this is a ott ment to be ran after the data is already there, there is no + // need to worry about references. + for _, n := range m.nodes { + ts := "" + if cfg != nil { + ts = cfg.ToTimestamp + } + err := n.AssureLegacyID(repoRecord, ts) + if err != nil { + return err + } + bar.Add(1) + } + return nil + }) +} + // Import runs the import over each ImportNode in the given graph func (m *Importer) Import(ctx context.Context, users map[string]uint64) error { db := repository.DB(ctx) @@ -278,16 +322,12 @@ func (m *Importer) Import(ctx context.Context, users map[string]uint64) error { return db.Transaction(func() (err error) { for len(m.Leafs) > 0 { - var wg sync.WaitGroup ch := make(chan types.PostProc, len(m.Leafs)) for _, n := range m.Leafs { - wg.Add(1) - go n.Import(repoRecord, users, &wg, ch, bar) + n.Import(repoRecord.With(ctx, db), users, ch, bar) } - wg.Wait() - var nl []*types.ImportNode for len(ch) > 0 { pp := <-ch @@ -346,3 +386,36 @@ func findModuleByHandle(repo repository.ModuleRepository, namespaceID uint64, ha return mod, nil } + +func assureLegacyFields(repo repository.ModuleRepository, mod *cct.Module, cfg *types.Config) (*cct.Module, error) { + dirty := false + + // make a copy of the original fields, so we don't mess with it + ff := make(cct.ModuleFieldSet, 0) + mod.Fields.Walk(func(f *cct.ModuleField) error { + ff = append(ff, f) + return nil + }) + + // assure the legacy id reference + f := mod.Fields.FindByName(types.LegacyRefIDField) + if f == nil { + dirty = true + ff = append(ff, &cct.ModuleField{ + ModuleID: mod.ID, + Kind: "String", + Name: types.LegacyRefIDField, + }) + } + + if dirty { + // we are simply adding the given field, there is no harm in skipping the records checking + err := repo.UpdateFields(mod.ID, ff, true) + if err != nil { + return nil, err + } + mod.Fields = ff + } + + return mod, nil +} diff --git a/pkg/ngimporter/types/general.go b/pkg/ngimporter/types/general.go index 8f5100a70..43a723b07 100644 --- a/pkg/ngimporter/types/general.go +++ b/pkg/ngimporter/types/general.go @@ -3,6 +3,8 @@ package types import ( "time" "unicode/utf8" + + "github.com/cortezaproject/corteza-server/compose/types" ) const ( @@ -28,6 +30,8 @@ const ( var ( // ExprLang contains gval language that should be used for any expression evaluation ExprLang = GLang() + + ModulesGlobal = make(types.ModuleSet, 0) ) type ( diff --git a/pkg/ngimporter/types/import_node.go b/pkg/ngimporter/types/import_node.go index ca44a2e3c..d3575bf56 100644 --- a/pkg/ngimporter/types/import_node.go +++ b/pkg/ngimporter/types/import_node.go @@ -97,8 +97,7 @@ func (n *ImportNode) addMap(key string, m Map) { // * source import, // * reference correction. // For details refer to the README. -func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[string]uint64, wg *sync.WaitGroup, ch chan PostProc, bar *progressbar.ProgressBar) { - defer wg.Done() +func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[string]uint64, ch chan PostProc, bar *progressbar.ProgressBar) { defer bar.Add(1) var err error @@ -191,6 +190,111 @@ func (n *ImportNode) fetchRemoteRef(ref, refMod string, repo repository.RecordRe return "", nil } +func (n *ImportNode) AssureLegacyID(repoRecord repository.RecordRepository, toTimestamp string) error { + limit := uint(10000) + pager := func(page uint) (types.RecordSet, *types.RecordFilter, error) { + // fetch all records, ordered by the ID for this module before the specified timestamp (if provided) + f := types.RecordFilter{ + Sort: "id ASC", + Deleted: rh.FilterStateInclusive, + ModuleID: n.Module.ID, + NamespaceID: n.Namespace.ID, + PageFilter: rh.PageFilter{ + Page: page, + PerPage: limit, + }, + } + if toTimestamp != "" { + f.Query = fmt.Sprintf("createdAt <= '%s'", toTimestamp) + } + + rr, ff, err := repoRecord.Find(n.Module, f) + rvs, err := repoRecord.LoadValues(n.Module.Fields.Names(), rr.IDs()) + if err != nil { + return nil, nil, err + } + + err = rr.Walk(func(r *types.Record) error { + r.Values = rvs.FilterByRecordID(r.ID) + return nil + }) + if err != nil { + return nil, nil, err + } + return rr, &ff, nil + } + + // loop through the csv entries and provide the legacy ref id field value + i := uint(0) + page := uint(1) + var rr types.RecordSet + var f *types.RecordFilter + var err error + + for { + // <= because i is 0-based (array indexes) + if f == nil || i >= f.Page*f.PerPage { + rr, f, err = pager(page) + if err != nil { + return err + } + page++ + } + + // this only happenes when there is no source for the module; ie. some imported source + // references a module that was not there initially. + // such cases can be skipped. + if n.Reader == nil { + return nil + } + + record, err := n.Reader.Read() + if err == io.EOF { + break + } + + if err != nil { + return err + } + + // since the importer skips these, these should also be ignored here + if record[0] == "" { + continue + } + + if i >= uint(f.Count) { + return errors.New(fmt.Sprintf("[error] the number of csv entries exceeded record count: %d for node: %s", f.Count, n.Name)) + } + r := rr[i-((f.Page-1)*f.PerPage)] + rvs := r.Values + rv := rvs.FilterByName(LegacyRefIDField) + if rv == nil { + rvs = append(rvs, &types.RecordValue{ + RecordID: r.ID, + Name: LegacyRefIDField, + Place: 0, + Value: record[0], + Updated: true, + }) + + err := repoRecord.UpdateValues(r.ID, rvs) + if err != nil { + return err + } + } + + i++ + } + + // final sanity checks + // - check that the counters match up + if f.Count != i { + return errors.New(fmt.Sprintf("[error] the number of records and csv entries don't match; records: %d, csv: %d, node: %s", f.Count, i, n.Name)) + } + + return nil +} + // determines if node is Satisfied and can be imported // it is Satisfied, when all of it's dependencies have been imported ie. no // more child refs diff --git a/pkg/ngimporter/types/import_source.go b/pkg/ngimporter/types/import_source.go index fa3c4ddc4..4ee482cf2 100644 --- a/pkg/ngimporter/types/import_source.go +++ b/pkg/ngimporter/types/import_source.go @@ -48,4 +48,11 @@ type ( // a specified value used by Corteza. ValueMap map[string]map[string]string } + + // Config helps us define different global configuration options that are used + // during the import process. + Config struct { + ToTimestamp string + RefFixup bool + } ) From 88583e2b7328334770c615a1c648592ead63a417 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toma=C5=BE=20Jerman?= Date: Wed, 5 Aug 2020 11:23:55 +0200 Subject: [PATCH 3/3] Reorder user preparation logic --- pkg/ngimporter/main.go | 86 +++++++++++++++++++++--------------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/pkg/ngimporter/main.go b/pkg/ngimporter/main.go index aa027543f..77f31c60b 100644 --- a/pkg/ngimporter/main.go +++ b/pkg/ngimporter/main.go @@ -75,49 +75,6 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace, cf } } - // populate with existing users - uMod, err := findModuleByHandle(modRepo, ns.ID, "user") - if err != nil { - return err - } - rr, _, err := recRepo.Find(uMod, cct.RecordFilter{ - ModuleID: uMod.ID, - Deleted: rh.FilterStateInclusive, - NamespaceID: ns.ID, - Query: "sys_legacy_ref_id IS NOT NULL", - PageFilter: rh.PageFilter{ - Page: 1, - PerPage: 0, - }, - }) - if err != nil { - return err - } - - rvs, err := recRepo.LoadValues(uMod.Fields.Names(), rr.IDs()) - if err != nil { - return err - } - - err = rr.Walk(func(r *cct.Record) error { - r.Values = rvs.FilterByRecordID(r.ID) - return nil - }) - if err != nil { - return err - } - - rr.Walk(func(r *cct.Record) error { - vr := r.Values.Get("sys_legacy_ref_id", 0) - vu := r.Values.Get("UserID", 0) - u, err := strconv.ParseUint(vu.Value, 10, 64) - if err != nil { - return err - } - uMap[vr.Value] = u - return nil - }) - iss, err = joinData(iss) if err != nil { return err @@ -231,6 +188,49 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace, cf return err } } else { + // populate with existing users + uMod, err := findModuleByHandle(modRepo, ns.ID, "user") + if err != nil { + return err + } + rr, _, err := recRepo.Find(uMod, cct.RecordFilter{ + ModuleID: uMod.ID, + Deleted: rh.FilterStateInclusive, + NamespaceID: ns.ID, + Query: "sys_legacy_ref_id IS NOT NULL", + PageFilter: rh.PageFilter{ + Page: 1, + PerPage: 0, + }, + }) + if err != nil { + return err + } + + rvs, err := recRepo.LoadValues(uMod.Fields.Names(), rr.IDs()) + if err != nil { + return err + } + + err = rr.Walk(func(r *cct.Record) error { + r.Values = rvs.FilterByRecordID(r.ID) + return nil + }) + if err != nil { + return err + } + + rr.Walk(func(r *cct.Record) error { + vr := r.Values.Get("sys_legacy_ref_id", 0) + vu := r.Values.Get("UserID", 0) + u, err := strconv.ParseUint(vu.Value, 10, 64) + if err != nil { + return err + } + uMap[vr.Value] = u + return nil + }) + imp.RemoveCycles() // take note of leaf nodes that can be imported right away