Merge branch 'feature-ott-legacy-ref-fix' into develop

Tomaž Jerman 2020-08-05 11:47:20 +02:00
commit 4d887ba486
5 changed files with 356 additions and 46 deletions

View File

@@ -29,12 +29,14 @@ func NGImporter() *cobra.Command {
Run: func(cmd *cobra.Command, args []string) {
var (
ctx = auth.SetSuperUserContext(cli.Context())
nsFlag = cmd.Flags().Lookup("namespace").Value.String()
srcFlag = cmd.Flags().Lookup("src").Value.String()
metaFlag = cmd.Flags().Lookup("meta").Value.String()
ns *types.Namespace
err error
ctx = auth.SetSuperUserContext(cli.Context())
nsFlag = cmd.Flags().Lookup("namespace").Value.String()
srcFlag = cmd.Flags().Lookup("src").Value.String()
metaFlag = cmd.Flags().Lookup("meta").Value.String()
toTsFlag = cmd.Flags().Lookup("to-timestamp").Value.String()
fixupFlag = cmd.Flags().Lookup("fixup").Changed
ns *types.Namespace
err error
iss []ngt.ImportSource
)
@@ -160,7 +162,12 @@ func NGImporter() *cobra.Command {
}
}
err = ngi.Import(ctx, out, ns)
c := &ngt.Config{
ToTimestamp: toTsFlag,
RefFixup: fixupFlag,
}
err = ngi.Import(ctx, out, ns, c)
if err != nil {
panic(err)
}
@@ -171,6 +178,8 @@ func NGImporter() *cobra.Command {
cmd.Flags().String("namespace", "", "Import into namespace (by ID or string)")
cmd.Flags().String("src", "", "Directory with import files")
cmd.Flags().String("meta", "", "Directory with import meta files")
cmd.Flags().String("to-timestamp", "", "Process records upto this timestamp")
cmd.Flags().BoolP("fixup", "", false, "Fixup legacy IDs")
return cmd
}
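
Note that the new to-timestamp flag is read as a plain string and later interpolated into a record filter query ("createdAt <= '...'"), so nothing in this command validates it. Below is a minimal sketch of how the value could be checked up front, assuming RFC 3339 input; the validateToTimestamp helper is hypothetical and not part of this commit.

package cli

import (
    "fmt"
    "time"
)

// validateToTimestamp is a hypothetical helper (not part of this commit): it
// rejects malformed --to-timestamp values before they reach the importer,
// where the string is interpolated into a "createdAt <= '...'" record query.
func validateToTimestamp(v string) (string, error) {
    if v == "" {
        // empty means no upper bound; every record is processed
        return "", nil
    }
    // assuming RFC 3339 input; adjust the layout to whatever the record
    // query parser actually accepts
    if _, err := time.Parse(time.RFC3339, v); err != nil {
        return "", fmt.Errorf("invalid --to-timestamp value %q: %w", v, err)
    }
    return v, nil
}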

View File

@@ -11,6 +11,7 @@ import (
"github.com/cortezaproject/corteza-server/compose/repository"
cct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/ngimporter/types"
"github.com/cortezaproject/corteza-server/pkg/rh"
"github.com/schollz/progressbar/v2"
)
@@ -32,12 +33,13 @@ type (
// * build graph from ImportNodes based on the provided ImportSource nodes
// * remove cycles from the given graph
// * import data based on node dependencies
func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) error {
func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace, cfg *types.Config) error {
// contains warnings raised by the pre process steps
var preProcW []string
imp := &Importer{}
db := repository.DB(ctx)
modRepo := repository.Module(ctx, db)
recRepo := repository.Record(ctx, db)
var err error
// import users
@@ -50,7 +52,7 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er
}
// maps sourceUserID to CortezaID
var uMap map[string]uint64
uMap := make(map[string]uint64)
if usrSrc != nil {
um, mgu, err := importUsers(ctx, usrSrc, ns)
if err != nil {
@@ -88,11 +90,20 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er
for _, nIs := range nIss {
// preload module
mod, err := findModuleByHandle(modRepo, ns.ID, nIs.Name)
if mod != nil {
types.ModulesGlobal = append(types.ModulesGlobal, mod)
}
if err != nil {
preProcW = append(preProcW, err.Error()+" "+nIs.Name)
continue
}
mod, err = assureLegacyFields(modRepo, mod, cfg)
if err != nil {
// this is a fatal error, we shouldn't continue if this fails
return err
}
// define headers
r := csv.NewReader(nIs.Source)
var header []string
@@ -154,6 +165,9 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er
Namespace: ns,
Lock: &sync.Mutex{},
}
if mm != nil {
types.ModulesGlobal = append(types.ModulesGlobal, mm)
}
nn = imp.AddNode(nn)
n.LinkAdd(nn)
@@ -167,26 +181,77 @@ func Import(ctx context.Context, iss []types.ImportSource, ns *cct.Namespace) er
log.Printf("[warning] %s\n", w)
}
imp.RemoveCycles()
// take note of leaf nodes that can be imported right away
for _, n := range imp.nodes {
if !n.HasChildren() {
imp.Leafs = append(imp.Leafs, n)
if cfg.RefFixup {
err = imp.AssureLegacyID(ctx, cfg)
if err != nil {
log.Println("[importer] failed")
return err
}
} else {
// populate with existing users
uMod, err := findModuleByHandle(modRepo, ns.ID, "user")
if err != nil {
return err
}
rr, _, err := recRepo.Find(uMod, cct.RecordFilter{
ModuleID: uMod.ID,
Deleted: rh.FilterStateInclusive,
NamespaceID: ns.ID,
Query: "sys_legacy_ref_id IS NOT NULL",
PageFilter: rh.PageFilter{
Page: 1,
PerPage: 0,
},
})
if err != nil {
return err
}
}
log.Printf("[importer] prepared\n")
log.Printf("[importer] node count: %d\n", len(imp.nodes))
log.Printf("[importer] leaf count: %d\n", len(imp.Leafs))
rvs, err := recRepo.LoadValues(uMod.Fields.Names(), rr.IDs())
if err != nil {
return err
}
log.Println("[importer] started")
err = imp.Import(ctx, uMap)
if err != nil {
log.Println("[importer] failed")
return err
err = rr.Walk(func(r *cct.Record) error {
r.Values = rvs.FilterByRecordID(r.ID)
return nil
})
if err != nil {
return err
}
rr.Walk(func(r *cct.Record) error {
vr := r.Values.Get("sys_legacy_ref_id", 0)
vu := r.Values.Get("UserID", 0)
u, err := strconv.ParseUint(vu.Value, 10, 64)
if err != nil {
return err
}
uMap[vr.Value] = u
return nil
})
imp.RemoveCycles()
// take note of leaf nodes that can be imported right away
for _, n := range imp.nodes {
if !n.HasChildren() {
imp.Leafs = append(imp.Leafs, n)
}
}
log.Printf("[importer] prepared\n")
log.Printf("[importer] node count: %d\n", len(imp.nodes))
log.Printf("[importer] leaf count: %d\n", len(imp.Leafs))
log.Println("[importer] started")
err = imp.Import(ctx, uMap)
if err != nil {
log.Println("[importer] failed")
return err
}
log.Println("[importer] finished")
}
log.Println("[importer] finished")
return nil
}
@@ -226,6 +291,29 @@ func (imp *Importer) RemoveCycles() {
}
}
func (m *Importer) AssureLegacyID(ctx context.Context, cfg *types.Config) error {
db := repository.DB(ctx)
repoRecord := repository.Record(ctx, db)
bar := progressbar.New(len(m.nodes))
return db.Transaction(func() (err error) {
// since this is an ott meant to be run after the data is already there, there is no
// need to worry about references.
for _, n := range m.nodes {
ts := ""
if cfg != nil {
ts = cfg.ToTimestamp
}
err := n.AssureLegacyID(repoRecord, ts)
if err != nil {
return err
}
bar.Add(1)
}
return nil
})
}
// Import runs the import over each ImportNode in the given graph
func (m *Importer) Import(ctx context.Context, users map[string]uint64) error {
db := repository.DB(ctx)
@@ -234,16 +322,12 @@ func (m *Importer) Import(ctx context.Context, users map[string]uint64) error {
return db.Transaction(func() (err error) {
for len(m.Leafs) > 0 {
var wg sync.WaitGroup
ch := make(chan types.PostProc, len(m.Leafs))
for _, n := range m.Leafs {
wg.Add(1)
go n.Import(repoRecord, users, &wg, ch, bar)
n.Import(repoRecord.With(ctx, db), users, ch, bar)
}
wg.Wait()
var nl []*types.ImportNode
for len(ch) > 0 {
pp := <-ch
@@ -302,3 +386,36 @@ func findModuleByHandle(repo repository.ModuleRepository, namespaceID uint64, ha
return mod, nil
}
func assureLegacyFields(repo repository.ModuleRepository, mod *cct.Module, cfg *types.Config) (*cct.Module, error) {
dirty := false
// make a copy of the original fields, so we don't mess with the original set
ff := make(cct.ModuleFieldSet, 0)
mod.Fields.Walk(func(f *cct.ModuleField) error {
ff = append(ff, f)
return nil
})
// assure the legacy id reference
f := mod.Fields.FindByName(types.LegacyRefIDField)
if f == nil {
dirty = true
ff = append(ff, &cct.ModuleField{
ModuleID: mod.ID,
Kind: "String",
Name: types.LegacyRefIDField,
})
}
if dirty {
// we are simply adding the given field, so there is no harm in skipping the record check
err := repo.UpdateFields(mod.ID, ff, true)
if err != nil {
return nil, err
}
mod.Fields = ff
}
return mod, nil
}
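
On the non-fixup path, Import now seeds the legacy-user map from records of the "user" module that already carry a sys_legacy_ref_id value, mapping each legacy reference to the Corteza user ID stored in the record's UserID value. Below is a standalone sketch of that mapping step, under the assumption that the compose/types API matches what the diff uses (RecordSet.Walk, RecordValueSet.Get); the seedUserMap function itself is illustrative and not part of this commit.

package example

import (
    "strconv"

    cct "github.com/cortezaproject/corteza-server/compose/types"
)

// seedUserMap is an illustrative sketch (not part of this commit): given the
// already-imported "user" records, it rebuilds the sourceUserID -> CortezaID
// map that the importer otherwise fills in while importing users.
func seedUserMap(rr cct.RecordSet) (map[string]uint64, error) {
    uMap := make(map[string]uint64)
    err := rr.Walk(func(r *cct.Record) error {
        legacy := r.Values.Get("sys_legacy_ref_id", 0)
        user := r.Values.Get("UserID", 0)
        if legacy == nil || user == nil {
            // records missing either value cannot be mapped; skip them
            return nil
        }
        id, err := strconv.ParseUint(user.Value, 10, 64)
        if err != nil {
            return err
        }
        uMap[legacy.Value] = id
        return nil
    })
    return uMap, err
}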

View File

@@ -3,6 +3,8 @@ package types
import (
"time"
"unicode/utf8"
"github.com/cortezaproject/corteza-server/compose/types"
)
const (
@@ -21,11 +23,15 @@ const (
MetaMapExt = ".map.json"
MetaJoinExt = ".join.json"
MetaValueExt = ".value.json"
LegacyRefIDField = "sys_legacy_ref_id"
)
var (
// ExprLang contains gval language that should be used for any expression evaluation
ExprLang = GLang()
ModulesGlobal = make(types.ModuleSet, 0)
)
type (

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"log"
"strconv"
"strings"
"sync"
"time"
@@ -14,6 +15,7 @@ import (
"github.com/cortezaproject/corteza-server/compose/repository"
cv "github.com/cortezaproject/corteza-server/compose/service/values"
"github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/rh"
"github.com/schollz/progressbar/v2"
)
@@ -95,8 +97,7 @@ func (n *ImportNode) addMap(key string, m Map) {
// * source import,
// * reference correction.
// For details refer to the README.
func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[string]uint64, wg *sync.WaitGroup, ch chan PostProc, bar *progressbar.ProgressBar) {
defer wg.Done()
func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[string]uint64, ch chan PostProc, bar *progressbar.ProgressBar) {
defer bar.Add(1)
var err error
@@ -154,6 +155,146 @@ func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[st
}
}
func (n *ImportNode) fetchRemoteRef(ref, refMod string, repo repository.RecordRepository) (string, error) {
refModU, err := strconv.ParseUint(refMod, 10, 64)
if err != nil {
return "", err
}
fl := types.RecordFilter{
ModuleID: refModU,
NamespaceID: n.Namespace.ID,
Deleted: rh.FilterStateInclusive,
Query: fmt.Sprintf("%s='%s'", LegacyRefIDField, ref),
PageFilter: rh.PageFilter{
Page: 1,
PerPage: 1,
},
}
var refModM *types.Module
if ModulesGlobal != nil {
refModM = ModulesGlobal.FindByID(refModU)
}
if refModM != nil {
rr, _, err := repo.Find(refModM, fl)
if err != nil {
return "", err
}
if len(rr) < 1 {
return "", errors.New(fmt.Sprintf("[error] referenced record %s not found on node %s for module %s", ref, n.Name, refModM.Name))
}
return strconv.FormatUint(rr[0].ID, 10), nil
}
return "", nil
}
func (n *ImportNode) AssureLegacyID(repoRecord repository.RecordRepository, toTimestamp string) error {
limit := uint(10000)
pager := func(page uint) (types.RecordSet, *types.RecordFilter, error) {
// fetch all records for this module, ordered by ID and created up to the specified timestamp (if provided)
f := types.RecordFilter{
Sort: "id ASC",
Deleted: rh.FilterStateInclusive,
ModuleID: n.Module.ID,
NamespaceID: n.Namespace.ID,
PageFilter: rh.PageFilter{
Page: page,
PerPage: limit,
},
}
if toTimestamp != "" {
f.Query = fmt.Sprintf("createdAt <= '%s'", toTimestamp)
}
rr, ff, err := repoRecord.Find(n.Module, f)
rvs, err := repoRecord.LoadValues(n.Module.Fields.Names(), rr.IDs())
if err != nil {
return nil, nil, err
}
err = rr.Walk(func(r *types.Record) error {
r.Values = rvs.FilterByRecordID(r.ID)
return nil
})
if err != nil {
return nil, nil, err
}
return rr, &ff, nil
}
// loop through the csv entries and provide the legacy ref id field value
i := uint(0)
page := uint(1)
var rr types.RecordSet
var f *types.RecordFilter
var err error
for {
// <= because i is 0-based (array indexes)
if f == nil || i >= f.Page*f.PerPage {
rr, f, err = pager(page)
if err != nil {
return err
}
page++
}
// this only happens when there is no source for the module, i.e. some imported source
// references a module that was not there initially.
// such cases can be skipped.
if n.Reader == nil {
return nil
}
record, err := n.Reader.Read()
if err == io.EOF {
break
}
if err != nil {
return err
}
// since the importer skips these rows, they should also be ignored here
if record[0] == "" {
continue
}
if i >= uint(f.Count) {
return errors.New(fmt.Sprintf("[error] the number of csv entries exceeded record count: %d for node: %s", f.Count, n.Name))
}
r := rr[i-((f.Page-1)*f.PerPage)]
rvs := r.Values
rv := rvs.FilterByName(LegacyRefIDField)
if rv == nil {
rvs = append(rvs, &types.RecordValue{
RecordID: r.ID,
Name: LegacyRefIDField,
Place: 0,
Value: record[0],
Updated: true,
})
err := repoRecord.UpdateValues(r.ID, rvs)
if err != nil {
return err
}
}
i++
}
// final sanity checks
// - check that the counters match up
if f.Count != i {
return errors.New(fmt.Sprintf("[error] the number of records and csv entries don't match; records: %d, csv: %d, node: %s", f.Count, i, n.Name))
}
return nil
}
// determines if node is Satisfied and can be imported
// it is Satisfied when all of its dependencies have been imported, i.e. no
// more child refs
@@ -344,19 +485,26 @@ func (n *ImportNode) correctRecordRefs(repo repository.RecordRepository) error {
return errors.New("moduleField.record.invalidRefFormat")
}
fetch := false
// in case of a missing ref, make sure to remove the reference.
// otherwise this will cause internal errors when trying to resolve CortezaID.
if mod, ok := n.idMap[ref]; ok {
if vv, ok := mod[val]; ok {
v.Value = vv
v.Updated = true
} else {
v.Value = ""
if mod, ok := n.idMap[ref]; !ok {
fetch = true
} else if vv, ok := mod[val]; !ok {
fetch = true
} else {
v.Value = vv
v.Updated = true
}
if fetch {
val, err := n.fetchRemoteRef(val, ref, repo)
if err != nil {
continue
}
} else {
v.Value = ""
continue
v.Value = val
v.Updated = true
}
}
}
@@ -409,6 +557,14 @@ func (n *ImportNode) importNodeSource(users map[string]uint64, repo repository.R
recordValues := types.RecordValueSet{}
// assure a valid legacy reference
recordValues = append(recordValues, &types.RecordValue{
Name: LegacyRefIDField,
Value: record[0],
Place: 0,
Updated: true,
})
// convert the given row into a { field: value } map; this will be used
// for expression evaluation
row := map[string]string{}
@@ -506,13 +662,28 @@ func (n *ImportNode) importNodeSource(users map[string]uint64, repo repository.R
return nil, errors.New("moduleField.record.invalidRefFormat")
}
if mod, ok := n.idMap[ref]; ok && val != "" {
if v, ok := mod[val]; ok && v != "" {
val = v
} else {
fetch := false
if val == "" {
continue
}
if mod, ok := n.idMap[ref]; !ok {
fetch = true
} else if v, ok := mod[val]; !ok || v == "" {
fetch = true
} else {
val = v
}
if fetch {
val, err = n.fetchRemoteRef(val, ref, repo)
if err != nil {
continue
}
} else {
}
if val == "" {
continue
}
}
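
The reference-correction changes in this file all follow the same lookup order: first the in-memory idMap built during the current run, then a remote lookup by sys_legacy_ref_id (fetchRemoteRef), and finally, if nothing is found, the value is blanked so it can never resolve to an invalid CortezaID. A compact sketch of that decision order, with hypothetical names and not part of this commit:

package example

// resolveRef is an illustrative sketch (not part of this commit) of the lookup
// order used by correctRecordRefs and importNodeSource when rewriting a legacy
// record reference into a CortezaID:
//  1. the idMap built while importing this run's sources,
//  2. a remote lookup by the sys_legacy_ref_id value (fetchRemoteRef in the diff),
//  3. otherwise the reference is dropped to avoid an invalid CortezaID.
func resolveRef(
    idMap map[string]map[string]string,
    refMod, legacyID string,
    remote func(legacyID, refMod string) (string, error),
) string {
    if mod, ok := idMap[refMod]; ok {
        if id, ok := mod[legacyID]; ok && id != "" {
            return id
        }
    }
    if id, err := remote(legacyID, refMod); err == nil && id != "" {
        return id
    }
    // nothing found: blank the reference rather than keeping a dangling legacy ID
    return ""
}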

View File

@@ -48,4 +48,11 @@ type (
// a specified value used by Corteza.
ValueMap map[string]map[string]string
}
// Config helps us define different global configuration options that are used
// during the import process.
Config struct {
ToTimestamp string
RefFixup bool
}
)
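
Config is the only new exported type in the types package and is what ties the pieces above together: the command fills it from the CLI flags and passes it to the importer's Import function, which either runs the one-time legacy-ID fixup (RefFixup) or the regular import, and hands ToTimestamp down to the per-node record pager. A minimal caller sketch follows, assuming the package name ngimporter and the import paths shown in the diff; the timestamp value and arguments are placeholders, not part of this commit.

package example

import (
    "context"

    cct "github.com/cortezaproject/corteza-server/compose/types"
    ngimporter "github.com/cortezaproject/corteza-server/pkg/ngimporter"
    ngt "github.com/cortezaproject/corteza-server/pkg/ngimporter/types"
)

// runLegacyRefFixup is an illustrative sketch (not part of this commit): it
// invokes the importer the same way the updated command does, but with the
// fixup path enabled.
func runLegacyRefFixup(ctx context.Context, iss []ngt.ImportSource, ns *cct.Namespace) error {
    cfg := &ngt.Config{
        // example value; the accepted layout depends on the record query parser,
        // since the string ends up in a "createdAt <= '...'" filter
        ToTimestamp: "2020-08-01T00:00:00Z",
        // back-fill sys_legacy_ref_id on records imported before this change
        RefFixup: true,
    }
    return ngimporter.Import(ctx, iss, ns, cfg)
}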