diff --git a/compose/commands/ng_importer.go b/compose/commands/ng_importer.go index 14887b9fc..ff1f5f10d 100644 --- a/compose/commands/ng_importer.go +++ b/compose/commands/ng_importer.go @@ -29,12 +29,14 @@ func NGImporter() *cobra.Command { Run: func(cmd *cobra.Command, args []string) { var ( - ctx = auth.SetSuperUserContext(cli.Context()) - nsFlag = cmd.Flags().Lookup("namespace").Value.String() - srcFlag = cmd.Flags().Lookup("src").Value.String() - metaFlag = cmd.Flags().Lookup("meta").Value.String() - ns *types.Namespace - err error + ctx = auth.SetSuperUserContext(cli.Context()) + nsFlag = cmd.Flags().Lookup("namespace").Value.String() + srcFlag = cmd.Flags().Lookup("src").Value.String() + metaFlag = cmd.Flags().Lookup("meta").Value.String() + toTsFlag = cmd.Flags().Lookup("to-timestamp").Value.String() + fixupFlag = cmd.Flags().Lookup("fixup").Changed + ns *types.Namespace + err error iss []ngt.ImportSource ) @@ -160,7 +162,12 @@ func NGImporter() *cobra.Command { } } - err = ngi.Import(ctx, out, ns) + c := &ngt.Config{ + ToTimestamp: toTsFlag, + RefFixup: fixupFlag, + } + + err = ngi.Import(ctx, out, ns, c) if err != nil { panic(err) } @@ -171,6 +178,8 @@ func NGImporter() *cobra.Command { cmd.Flags().String("namespace", "", "Import into namespace (by ID or string)") cmd.Flags().String("src", "", "Directory with import files") cmd.Flags().String("meta", "", "Directory with import meta files") + cmd.Flags().String("to-timestamp", "", "Process records upto this timestamp") + cmd.Flags().BoolP("fixup", "", false, "Fixup legacy IDs") return cmd } diff --git a/pkg/ngimporter/main.go b/pkg/ngimporter/main.go index dfe8ce19a..77f31c60b 100644 --- a/pkg/ngimporter/main.go +++ b/pkg/ngimporter/main.go @@ -11,6 +11,7 @@ import ( "github.com/cortezaproject/corteza-server/compose/repository" cct "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/pkg/ngimporter/types" + "github.com/cortezaproject/corteza-server/pkg/rh" "github.com/schollz/progressbar/v2" ) @@ -32,12 +33,13 @@ type ( // * build graph from ImportNodes based on the provided ImportSource nodes // * remove cycles from the given graph // * import data based on node dependencies -func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) error { +func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace, cfg *types.Config) error { // contains warnings raised by the pre process steps var preProcW []string imp := &Importer{} db := repository.DB(ctx) modRepo := repository.Module(ctx, db) + recRepo := repository.Record(ctx, db) var err error // import users @@ -50,7 +52,7 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er } // maps sourceUserID to CortezaID - var uMap map[string]uint64 + uMap := make(map[string]uint64) if usrSrc != nil { um, mgu, err := importUsers(ctx, usrSrc, ns) if err != nil { @@ -88,11 +90,20 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er for _, nIs := range nIss { // preload module mod, err := findModuleByHandle(modRepo, ns.ID, nIs.Name) + if mod != nil { + types.ModulesGlobal = append(types.ModulesGlobal, mod) + } if err != nil { preProcW = append(preProcW, err.Error()+" "+nIs.Name) continue } + mod, err = assureLegacyFields(modRepo, mod, cfg) + if err != nil { + // this is a fatal error, we shouldn't continue if this fails + return err + } + // define headers r := csv.NewReader(nIs.Source) var header []string @@ -154,6 +165,9 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er Namespace: ns, Lock: &sync.Mutex{}, } + if mm != nil { + types.ModulesGlobal = append(types.ModulesGlobal, mm) + } nn = imp.AddNode(nn) n.LinkAdd(nn) @@ -167,26 +181,77 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er log.Printf("[warning] %s\n", w) } - imp.RemoveCycles() - - // take note of leaf nodes that can be imported right away - for _, n := range imp.nodes { - if !n.HasChildren() { - imp.Leafs = append(imp.Leafs, n) + if cfg.RefFixup { + err = imp.AssureLegacyID(ctx, cfg) + if err != nil { + log.Println("[importer] failed") + return err + } + } else { + // populate with existing users + uMod, err := findModuleByHandle(modRepo, ns.ID, "user") + if err != nil { + return err + } + rr, _, err := recRepo.Find(uMod, cct.RecordFilter{ + ModuleID: uMod.ID, + Deleted: rh.FilterStateInclusive, + NamespaceID: ns.ID, + Query: "sys_legacy_ref_id IS NOT NULL", + PageFilter: rh.PageFilter{ + Page: 1, + PerPage: 0, + }, + }) + if err != nil { + return err } - } - log.Printf("[importer] prepared\n") - log.Printf("[importer] node count: %d\n", len(imp.nodes)) - log.Printf("[importer] leaf count: %d\n", len(imp.Leafs)) + rvs, err := recRepo.LoadValues(uMod.Fields.Names(), rr.IDs()) + if err != nil { + return err + } - log.Println("[importer] started") - err = imp.Import(ctx, uMap) - if err != nil { - log.Println("[importer] failed") - return err + err = rr.Walk(func(r *cct.Record) error { + r.Values = rvs.FilterByRecordID(r.ID) + return nil + }) + if err != nil { + return err + } + + rr.Walk(func(r *cct.Record) error { + vr := r.Values.Get("sys_legacy_ref_id", 0) + vu := r.Values.Get("UserID", 0) + u, err := strconv.ParseUint(vu.Value, 10, 64) + if err != nil { + return err + } + uMap[vr.Value] = u + return nil + }) + + imp.RemoveCycles() + + // take note of leaf nodes that can be imported right away + for _, n := range imp.nodes { + if !n.HasChildren() { + imp.Leafs = append(imp.Leafs, n) + } + } + + log.Printf("[importer] prepared\n") + log.Printf("[importer] node count: %d\n", len(imp.nodes)) + log.Printf("[importer] leaf count: %d\n", len(imp.Leafs)) + + log.Println("[importer] started") + err = imp.Import(ctx, uMap) + if err != nil { + log.Println("[importer] failed") + return err + } + log.Println("[importer] finished") } - log.Println("[importer] finished") return nil } @@ -226,6 +291,29 @@ func (imp *Importer) RemoveCycles() { } } +func (m *Importer) AssureLegacyID(ctx context.Context, cfg *types.Config) error { + db := repository.DB(ctx) + repoRecord := repository.Record(ctx, db) + bar := progressbar.New(len(m.nodes)) + + return db.Transaction(func() (err error) { + // since this is a ott ment to be ran after the data is already there, there is no + // need to worry about references. + for _, n := range m.nodes { + ts := "" + if cfg != nil { + ts = cfg.ToTimestamp + } + err := n.AssureLegacyID(repoRecord, ts) + if err != nil { + return err + } + bar.Add(1) + } + return nil + }) +} + // Import runs the import over each ImportNode in the given graph func (m *Importer) Import(ctx context.Context, users map[string]uint64) error { db := repository.DB(ctx) @@ -234,16 +322,12 @@ func (m *Importer) Import(ctx context.Context, users map[string]uint64) error { return db.Transaction(func() (err error) { for len(m.Leafs) > 0 { - var wg sync.WaitGroup ch := make(chan types.PostProc, len(m.Leafs)) for _, n := range m.Leafs { - wg.Add(1) - go n.Import(repoRecord, users, &wg, ch, bar) + n.Import(repoRecord.With(ctx, db), users, ch, bar) } - wg.Wait() - var nl []*types.ImportNode for len(ch) > 0 { pp := <-ch @@ -302,3 +386,36 @@ func findModuleByHandle(repo repository.ModuleRepository, namespaceID uint64, ha return mod, nil } + +func assureLegacyFields(repo repository.ModuleRepository, mod *cct.Module, cfg *types.Config) (*cct.Module, error) { + dirty := false + + // make a copy of the original fields, so we don't mess with it + ff := make(cct.ModuleFieldSet, 0) + mod.Fields.Walk(func(f *cct.ModuleField) error { + ff = append(ff, f) + return nil + }) + + // assure the legacy id reference + f := mod.Fields.FindByName(types.LegacyRefIDField) + if f == nil { + dirty = true + ff = append(ff, &cct.ModuleField{ + ModuleID: mod.ID, + Kind: "String", + Name: types.LegacyRefIDField, + }) + } + + if dirty { + // we are simply adding the given field, there is no harm in skipping the records checking + err := repo.UpdateFields(mod.ID, ff, true) + if err != nil { + return nil, err + } + mod.Fields = ff + } + + return mod, nil +} diff --git a/pkg/ngimporter/types/general.go b/pkg/ngimporter/types/general.go index 1e013ab9a..43a723b07 100644 --- a/pkg/ngimporter/types/general.go +++ b/pkg/ngimporter/types/general.go @@ -3,6 +3,8 @@ package types import ( "time" "unicode/utf8" + + "github.com/cortezaproject/corteza-server/compose/types" ) const ( @@ -21,11 +23,15 @@ const ( MetaMapExt = ".map.json" MetaJoinExt = ".join.json" MetaValueExt = ".value.json" + + LegacyRefIDField = "sys_legacy_ref_id" ) var ( // ExprLang contains gval language that should be used for any expression evaluation ExprLang = GLang() + + ModulesGlobal = make(types.ModuleSet, 0) ) type ( diff --git a/pkg/ngimporter/types/import_node.go b/pkg/ngimporter/types/import_node.go index 561580f71..d3575bf56 100644 --- a/pkg/ngimporter/types/import_node.go +++ b/pkg/ngimporter/types/import_node.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "log" + "strconv" "strings" "sync" "time" @@ -14,6 +15,7 @@ import ( "github.com/cortezaproject/corteza-server/compose/repository" cv "github.com/cortezaproject/corteza-server/compose/service/values" "github.com/cortezaproject/corteza-server/compose/types" + "github.com/cortezaproject/corteza-server/pkg/rh" "github.com/schollz/progressbar/v2" ) @@ -95,8 +97,7 @@ func (n *ImportNode) addMap(key string, m Map) { // * source import, // * reference correction. // For details refer to the README. -func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[string]uint64, wg *sync.WaitGroup, ch chan PostProc, bar *progressbar.ProgressBar) { - defer wg.Done() +func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[string]uint64, ch chan PostProc, bar *progressbar.ProgressBar) { defer bar.Add(1) var err error @@ -154,6 +155,146 @@ func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[st } } +func (n *ImportNode) fetchRemoteRef(ref, refMod string, repo repository.RecordRepository) (string, error) { + refModU, err := strconv.ParseUint(refMod, 10, 64) + if err != nil { + return "", err + } + + fl := types.RecordFilter{ + ModuleID: refModU, + NamespaceID: n.Namespace.ID, + Deleted: rh.FilterStateInclusive, + Query: fmt.Sprintf("%s='%s'", LegacyRefIDField, ref), + PageFilter: rh.PageFilter{ + Page: 1, + PerPage: 1, + }, + } + + var refModM *types.Module + if ModulesGlobal != nil { + refModM = ModulesGlobal.FindByID(refModU) + } + if refModM != nil { + rr, _, err := repo.Find(refModM, fl) + if err != nil { + return "", err + } + if len(rr) < 1 { + return "", errors.New(fmt.Sprintf("[error] referenced record %s not found on node %s for module %s", ref, n.Name, refModM.Name)) + } + return strconv.FormatUint(rr[0].ID, 10), nil + } + + return "", nil +} + +func (n *ImportNode) AssureLegacyID(repoRecord repository.RecordRepository, toTimestamp string) error { + limit := uint(10000) + pager := func(page uint) (types.RecordSet, *types.RecordFilter, error) { + // fetch all records, ordered by the ID for this module before the specified timestamp (if provided) + f := types.RecordFilter{ + Sort: "id ASC", + Deleted: rh.FilterStateInclusive, + ModuleID: n.Module.ID, + NamespaceID: n.Namespace.ID, + PageFilter: rh.PageFilter{ + Page: page, + PerPage: limit, + }, + } + if toTimestamp != "" { + f.Query = fmt.Sprintf("createdAt <= '%s'", toTimestamp) + } + + rr, ff, err := repoRecord.Find(n.Module, f) + rvs, err := repoRecord.LoadValues(n.Module.Fields.Names(), rr.IDs()) + if err != nil { + return nil, nil, err + } + + err = rr.Walk(func(r *types.Record) error { + r.Values = rvs.FilterByRecordID(r.ID) + return nil + }) + if err != nil { + return nil, nil, err + } + return rr, &ff, nil + } + + // loop through the csv entries and provide the legacy ref id field value + i := uint(0) + page := uint(1) + var rr types.RecordSet + var f *types.RecordFilter + var err error + + for { + // <= because i is 0-based (array indexes) + if f == nil || i >= f.Page*f.PerPage { + rr, f, err = pager(page) + if err != nil { + return err + } + page++ + } + + // this only happenes when there is no source for the module; ie. some imported source + // references a module that was not there initially. + // such cases can be skipped. + if n.Reader == nil { + return nil + } + + record, err := n.Reader.Read() + if err == io.EOF { + break + } + + if err != nil { + return err + } + + // since the importer skips these, these should also be ignored here + if record[0] == "" { + continue + } + + if i >= uint(f.Count) { + return errors.New(fmt.Sprintf("[error] the number of csv entries exceeded record count: %d for node: %s", f.Count, n.Name)) + } + r := rr[i-((f.Page-1)*f.PerPage)] + rvs := r.Values + rv := rvs.FilterByName(LegacyRefIDField) + if rv == nil { + rvs = append(rvs, &types.RecordValue{ + RecordID: r.ID, + Name: LegacyRefIDField, + Place: 0, + Value: record[0], + Updated: true, + }) + + err := repoRecord.UpdateValues(r.ID, rvs) + if err != nil { + return err + } + } + + i++ + } + + // final sanity checks + // - check that the counters match up + if f.Count != i { + return errors.New(fmt.Sprintf("[error] the number of records and csv entries don't match; records: %d, csv: %d, node: %s", f.Count, i, n.Name)) + } + + return nil +} + // determines if node is Satisfied and can be imported // it is Satisfied, when all of it's dependencies have been imported ie. no // more child refs @@ -344,19 +485,26 @@ func (n *ImportNode) correctRecordRefs(repo repository.RecordRepository) error { return errors.New("moduleField.record.invalidRefFormat") } + fetch := false + // in case of a missing ref, make sure to remove the reference. // otherwise this will cause internal errors when trying to resolve CortezaID. - if mod, ok := n.idMap[ref]; ok { - if vv, ok := mod[val]; ok { - v.Value = vv - v.Updated = true - } else { - v.Value = "" + if mod, ok := n.idMap[ref]; !ok { + fetch = true + } else if vv, ok := mod[val]; !ok { + fetch = true + } else { + v.Value = vv + v.Updated = true + } + + if fetch { + val, err := n.fetchRemoteRef(val, ref, repo) + if err != nil { continue } - } else { - v.Value = "" - continue + v.Value = val + v.Updated = true } } } @@ -409,6 +557,14 @@ func (n *ImportNode) importNodeSource(users map[string]uint64, repo repository.R recordValues := types.RecordValueSet{} + // assure a valid legacy reference + recordValues = append(recordValues, &types.RecordValue{ + Name: LegacyRefIDField, + Value: record[0], + Place: 0, + Updated: true, + }) + // convert the given row into a { field: value } map; this will be used // for expression evaluation row := map[string]string{} @@ -506,13 +662,28 @@ func (n *ImportNode) importNodeSource(users map[string]uint64, repo repository.R return nil, errors.New("moduleField.record.invalidRefFormat") } - if mod, ok := n.idMap[ref]; ok && val != "" { - if v, ok := mod[val]; ok && v != "" { - val = v - } else { + fetch := false + + if val == "" { + continue + } + + if mod, ok := n.idMap[ref]; !ok { + fetch = true + } else if v, ok := mod[val]; !ok || v == "" { + fetch = true + } else { + val = v + } + + if fetch { + val, err = n.fetchRemoteRef(val, ref, repo) + if err != nil { continue } - } else { + } + + if val == "" { continue } } diff --git a/pkg/ngimporter/types/import_source.go b/pkg/ngimporter/types/import_source.go index fa3c4ddc4..4ee482cf2 100644 --- a/pkg/ngimporter/types/import_source.go +++ b/pkg/ngimporter/types/import_source.go @@ -48,4 +48,11 @@ type ( // a specified value used by Corteza. ValueMap map[string]map[string]string } + + // Config helps us define different global configuration options that are used + // during the import process. + Config struct { + ToTimestamp string + RefFixup bool + } )