3
0

TMP OneTimeTool to add legacy ids to imported records

This commit is contained in:
Tomaž Jerman 2020-08-04 13:15:13 +02:00
parent e50472ed8b
commit 06f25a87f3
5 changed files with 228 additions and 31 deletions

View File

@ -29,12 +29,14 @@ func NGImporter() *cobra.Command {
Run: func(cmd *cobra.Command, args []string) {
var (
ctx = auth.SetSuperUserContext(cli.Context())
nsFlag = cmd.Flags().Lookup("namespace").Value.String()
srcFlag = cmd.Flags().Lookup("src").Value.String()
metaFlag = cmd.Flags().Lookup("meta").Value.String()
ns *types.Namespace
err error
ctx = auth.SetSuperUserContext(cli.Context())
nsFlag = cmd.Flags().Lookup("namespace").Value.String()
srcFlag = cmd.Flags().Lookup("src").Value.String()
metaFlag = cmd.Flags().Lookup("meta").Value.String()
toTsFlag = cmd.Flags().Lookup("to-timestamp").Value.String()
fixupFlag = cmd.Flags().Lookup("fixup").Changed
ns *types.Namespace
err error
iss []ngt.ImportSource
)
@ -160,7 +162,12 @@ func NGImporter() *cobra.Command {
}
}
err = ngi.Import(ctx, out, ns)
c := &ngt.Config{
ToTimestamp: toTsFlag,
RefFixup: fixupFlag,
}
err = ngi.Import(ctx, out, ns, c)
if err != nil {
panic(err)
}
@ -171,6 +178,8 @@ func NGImporter() *cobra.Command {
cmd.Flags().String("namespace", "", "Import into namespace (by ID or string)")
cmd.Flags().String("src", "", "Directory with import files")
cmd.Flags().String("meta", "", "Directory with import meta files")
cmd.Flags().String("to-timestamp", "", "Process records upto this timestamp")
cmd.Flags().BoolP("fixup", "", false, "Fixup legacy IDs")
return cmd
}

View File

@ -11,6 +11,7 @@ import (
"github.com/cortezaproject/corteza-server/compose/repository"
cct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/ngimporter/types"
"github.com/cortezaproject/corteza-server/pkg/rh"
"github.com/schollz/progressbar/v2"
)
@ -32,7 +33,7 @@ type (
// * build graph from ImportNodes based on the provided ImportSource nodes
// * remove cycles from the given graph
// * import data based on node dependencies
func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) error {
func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace, cfg *types.Config) error {
// contains warnings raised by the pre process steps
var preProcW []string
imp := &Importer{}
@ -132,11 +133,20 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er
for _, nIs := range nIss {
// preload module
mod, err := findModuleByHandle(modRepo, ns.ID, nIs.Name)
if mod != nil {
types.ModulesGlobal = append(types.ModulesGlobal, mod)
}
if err != nil {
preProcW = append(preProcW, err.Error()+" "+nIs.Name)
continue
}
mod, err = assureLegacyFields(modRepo, mod, cfg)
if err != nil {
// this is a fatal error, we shouldn't continue if this fails
return err
}
// define headers
r := csv.NewReader(nIs.Source)
var header []string
@ -198,6 +208,9 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er
Namespace: ns,
Lock: &sync.Mutex{},
}
if mm != nil {
types.ModulesGlobal = append(types.ModulesGlobal, mm)
}
nn = imp.AddNode(nn)
n.LinkAdd(nn)
@ -211,26 +224,34 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er
log.Printf("[warning] %s\n", w)
}
imp.RemoveCycles()
// take note of leaf nodes that can be imported right away
for _, n := range imp.nodes {
if !n.HasChildren() {
imp.Leafs = append(imp.Leafs, n)
if cfg.RefFixup {
err = imp.AssureLegacyID(ctx, cfg)
if err != nil {
log.Println("[importer] failed")
return err
}
}
} else {
imp.RemoveCycles()
log.Printf("[importer] prepared\n")
log.Printf("[importer] node count: %d\n", len(imp.nodes))
log.Printf("[importer] leaf count: %d\n", len(imp.Leafs))
// take note of leaf nodes that can be imported right away
for _, n := range imp.nodes {
if !n.HasChildren() {
imp.Leafs = append(imp.Leafs, n)
}
}
log.Println("[importer] started")
err = imp.Import(ctx, uMap)
if err != nil {
log.Println("[importer] failed")
return err
log.Printf("[importer] prepared\n")
log.Printf("[importer] node count: %d\n", len(imp.nodes))
log.Printf("[importer] leaf count: %d\n", len(imp.Leafs))
log.Println("[importer] started")
err = imp.Import(ctx, uMap)
if err != nil {
log.Println("[importer] failed")
return err
}
log.Println("[importer] finished")
}
log.Println("[importer] finished")
return nil
}
@ -270,6 +291,29 @@ func (imp *Importer) RemoveCycles() {
}
}
// AssureLegacyID back-fills the legacy ID reference field on already
// imported records, for every node in the import graph, inside a single
// DB transaction.
//
// This is a one-time tool (OTT) meant to be run after the data is already
// imported, so there is no need to worry about node references here.
func (imp *Importer) AssureLegacyID(ctx context.Context, cfg *types.Config) error {
	db := repository.DB(ctx)
	repoRecord := repository.Record(ctx, db)

	bar := progressbar.New(len(imp.nodes))

	return db.Transaction(func() (err error) {
		// only process records created up to this timestamp (when provided);
		// hoisted out of the loop since cfg does not change per node
		ts := ""
		if cfg != nil {
			ts = cfg.ToTimestamp
		}

		for _, n := range imp.nodes {
			if err = n.AssureLegacyID(repoRecord, ts); err != nil {
				return err
			}
			bar.Add(1)
		}
		return nil
	})
}
// Import runs the import over each ImportNode in the given graph
func (m *Importer) Import(ctx context.Context, users map[string]uint64) error {
db := repository.DB(ctx)
@ -278,16 +322,12 @@ func (m *Importer) Import(ctx context.Context, users map[string]uint64) error {
return db.Transaction(func() (err error) {
for len(m.Leafs) > 0 {
var wg sync.WaitGroup
ch := make(chan types.PostProc, len(m.Leafs))
for _, n := range m.Leafs {
wg.Add(1)
go n.Import(repoRecord, users, &wg, ch, bar)
n.Import(repoRecord.With(ctx, db), users, ch, bar)
}
wg.Wait()
var nl []*types.ImportNode
for len(ch) > 0 {
pp := <-ch
@ -346,3 +386,36 @@ func findModuleByHandle(repo repository.ModuleRepository, namespaceID uint64, ha
return mod, nil
}
// assureLegacyFields makes sure the given module defines the legacy ID
// reference field (types.LegacyRefIDField), creating and persisting it
// when missing.
//
// Returns the (possibly updated) module; a nil module is passed through
// untouched. The cfg parameter is currently unused but kept for interface
// consistency with the rest of the fixup helpers.
func assureLegacyFields(repo repository.ModuleRepository, mod *cct.Module, cfg *types.Config) (*cct.Module, error) {
	if mod == nil {
		// nothing to assure on a missing module; let the caller decide
		return nil, nil
	}

	// the legacy id reference field already exists; nothing to do
	if f := mod.Fields.FindByName(types.LegacyRefIDField); f != nil {
		return mod, nil
	}

	// make a copy of the original fields, so we don't mess with the module
	// before the update is persisted
	ff := make(cct.ModuleFieldSet, 0)
	mod.Fields.Walk(func(f *cct.ModuleField) error {
		ff = append(ff, f)
		return nil
	})

	// assure the legacy id reference
	ff = append(ff, &cct.ModuleField{
		ModuleID: mod.ID,
		Kind:     "String",
		Name:     types.LegacyRefIDField,
	})

	// we are simply adding the given field, there is no harm in skipping the records checking
	if err := repo.UpdateFields(mod.ID, ff, true); err != nil {
		return nil, err
	}
	mod.Fields = ff

	return mod, nil
}

View File

@ -3,6 +3,8 @@ package types
import (
"time"
"unicode/utf8"
"github.com/cortezaproject/corteza-server/compose/types"
)
const (
@ -28,6 +30,8 @@ const (
var (
// ExprLang contains gval language that should be used for any expression evaluation
ExprLang = GLang()
ModulesGlobal = make(types.ModuleSet, 0)
)
type (

View File

@ -97,8 +97,7 @@ func (n *ImportNode) addMap(key string, m Map) {
// * source import,
// * reference correction.
// For details refer to the README.
func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[string]uint64, wg *sync.WaitGroup, ch chan PostProc, bar *progressbar.ProgressBar) {
defer wg.Done()
func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[string]uint64, ch chan PostProc, bar *progressbar.ProgressBar) {
defer bar.Add(1)
var err error
@ -191,6 +190,111 @@ func (n *ImportNode) fetchRemoteRef(ref, refMod string, repo repository.RecordRe
return "", nil
}
// AssureLegacyID re-reads the node's CSV source and writes each row's legacy
// identifier (the first CSV column) into the LegacyRefIDField value of the
// matching, already imported record.
//
// Records are paged through in ID order; when toTimestamp is given, only
// records created at or before that timestamp are considered. The CSV rows
// are assumed to be in the same order as the stored records — TODO confirm
// this ordering invariant against the importer that created them.
func (n *ImportNode) AssureLegacyID(repoRecord repository.RecordRepository, toTimestamp string) error {
	// this only happens when there is no source for the module; ie. some imported
	// source references a module that was not there initially.
	// such cases can be skipped (checked up front so we don't issue a useless query).
	if n.Reader == nil {
		return nil
	}

	const limit = uint(10000)

	pager := func(page uint) (types.RecordSet, *types.RecordFilter, error) {
		// fetch records for this module, ordered by ID, created before the
		// specified timestamp (if provided)
		f := types.RecordFilter{
			Sort:        "id ASC",
			Deleted:     rh.FilterStateInclusive,
			ModuleID:    n.Module.ID,
			NamespaceID: n.Namespace.ID,
			PageFilter: rh.PageFilter{
				Page:    page,
				PerPage: limit,
			},
		}
		if toTimestamp != "" {
			f.Query = fmt.Sprintf("createdAt <= '%s'", toTimestamp)
		}

		rr, ff, err := repoRecord.Find(n.Module, f)
		if err != nil {
			// previously this error was silently overwritten by the
			// LoadValues call below and never checked
			return nil, nil, err
		}

		rvs, err := repoRecord.LoadValues(n.Module.Fields.Names(), rr.IDs())
		if err != nil {
			return nil, nil, err
		}

		// attach loaded values to their records
		err = rr.Walk(func(r *types.Record) error {
			r.Values = rvs.FilterByRecordID(r.ID)
			return nil
		})
		if err != nil {
			return nil, nil, err
		}
		return rr, &ff, nil
	}

	// loop through the csv entries and provide the legacy ref id field value
	var (
		i    = uint(0)
		page = uint(1)
		rr   types.RecordSet
		f    *types.RecordFilter
		err  error
	)

	for {
		// fetch the next page once the current one is exhausted
		// (>= because i is 0-based, like array indexes)
		if f == nil || i >= f.Page*f.PerPage {
			rr, f, err = pager(page)
			if err != nil {
				return err
			}
			page++
		}

		record, err := n.Reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		// since the importer skips these, these should also be ignored here
		if record[0] == "" {
			continue
		}

		if i >= uint(f.Count) {
			return fmt.Errorf("[error] the number of csv entries exceeded record count: %d for node: %s", f.Count, n.Name)
		}

		// index into the current page
		r := rr[i-((f.Page-1)*f.PerPage)]
		rvs := r.Values
		rv := rvs.FilterByName(LegacyRefIDField)
		if rv == nil {
			// the record is missing its legacy id value; add & persist it
			rvs = append(rvs, &types.RecordValue{
				RecordID: r.ID,
				Name:     LegacyRefIDField,
				Place:    0,
				Value:    record[0],
				Updated:  true,
			})
			if err := repoRecord.UpdateValues(r.ID, rvs); err != nil {
				return err
			}
		}
		i++
	}

	// final sanity check — the counters must match up
	if f.Count != i {
		return fmt.Errorf("[error] the number of records and csv entries don't match; records: %d, csv: %d, node: %s", f.Count, i, n.Name)
	}
	return nil
}
// determines if node is Satisfied and can be imported
// it is Satisfied, when all of it's dependencies have been imported ie. no
// more child refs

View File

@ -48,4 +48,11 @@ type (
// a specified value used by Corteza.
ValueMap map[string]map[string]string
}
// Config helps us define different global configuration options that are used
// during the import process.
Config struct {
// ToTimestamp limits processing to records created up to this timestamp (empty = no limit)
ToTimestamp string
// RefFixup, when set, runs the legacy-ID fixup pass instead of a regular import
RefFixup bool
}
)