diff --git a/compose/app.go b/compose/app.go index 8c954d532..69945ce34 100644 --- a/compose/app.go +++ b/compose/app.go @@ -102,7 +102,7 @@ func (app *App) RegisterCliCommands(p *cobra.Command) { p.AddCommand( commands.Importer(), commands.Exporter(), - commands.Migrator(), + commands.ImporterNG(), // temp command, will be removed in 2020.6 automation.ScriptExporter(SERVICE), ) diff --git a/compose/commands/migrator.go b/compose/commands/migrator.go index 4c0a8c214..ec4e85965 100644 --- a/compose/commands/migrator.go +++ b/compose/commands/migrator.go @@ -19,14 +19,14 @@ import ( "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/pkg/cli" - mgg "github.com/cortezaproject/corteza-server/pkg/migrate" - mgt "github.com/cortezaproject/corteza-server/pkg/migrate/types" + ngi "github.com/cortezaproject/corteza-server/pkg/ngImporter" + ngt "github.com/cortezaproject/corteza-server/pkg/ngImporter/types" ) -func Migrator() *cobra.Command { +func ImporterNG() *cobra.Command { cmd := &cobra.Command{ - Use: "migrate", - Short: "Migrate", + Use: "ng-import", + Short: "Importer next-gen", Run: func(cmd *cobra.Command, args []string) { var ( @@ -37,7 +37,7 @@ func Migrator() *cobra.Command { ns *types.Namespace err error - mg []mgt.Migrateable + mg []ngt.ImportSource ) svcNs := service.DefaultNamespace.With(ctx) @@ -70,12 +70,12 @@ func Migrator() *cobra.Command { ext := filepath.Ext(info.Name()) name := info.Name()[0 : len(info.Name())-len(ext)] - mm := migrateableSource(mg, name) + mm := importSource(mg, name) mm.Name = name mm.Path = path mm.Source = file - mg = migrateableAdd(mg, mm) + mg = addImportSource(mg, mm) } return nil }) @@ -92,11 +92,11 @@ func Migrator() *cobra.Command { ext := filepath.Ext(info.Name()) // @todo improve this!! name := info.Name()[0 : len(info.Name())-len(ext)-4] - mm := migrateableSource(mg, name) + mm := importSource(mg, name) mm.Name = name - mm.Map = file + mm.DataMap = file - mg = migrateableAdd(mg, mm) + mg = addImportSource(mg, mm) } else if strings.HasSuffix(info.Name(), ".join.json") { file, err := os.Open(path) if err != nil { @@ -106,11 +106,11 @@ func Migrator() *cobra.Command { ext := filepath.Ext(info.Name()) // @todo improve this!! name := info.Name()[0 : len(info.Name())-len(ext)-5] - mm := migrateableSource(mg, name) + mm := importSource(mg, name) mm.Name = name - mm.Join = file + mm.SourceJoin = file - mg = migrateableAdd(mg, mm) + mg = addImportSource(mg, mm) } else if strings.HasSuffix(info.Name(), ".value.json") { file, err := os.Open(path) if err != nil { @@ -120,7 +120,7 @@ func Migrator() *cobra.Command { ext := filepath.Ext(info.Name()) // @todo improve this!! name := info.Name()[0 : len(info.Name())-len(ext)-6] - mm := migrateableSource(mg, name) + mm := importSource(mg, name) mm.Name = name var vmp map[string]map[string]string @@ -131,7 +131,7 @@ func Migrator() *cobra.Command { } mm.ValueMap = vmp - mg = migrateableAdd(mg, mm) + mg = addImportSource(mg, mm) } return nil }) @@ -143,7 +143,7 @@ func Migrator() *cobra.Command { // clean up migrateables hasW := false - out := make([]mgt.Migrateable, 0) + out := make([]ngt.ImportSource, 0) for _, m := range mg { if m.Source != nil { out = append(out, m) @@ -158,12 +158,14 @@ func Migrator() *cobra.Command { fmt.Print("warnings detected; continue [y/N]? 
") _, err := fmt.Scanln(&rsp) + rsp = strings.ToLower(rsp) + if err != nil || rsp != "y" && rsp != "yes" { log.Fatal("migration aborted due to warnings") } } - err = mgg.Migrate(out, ns, ctx) + err = ngi.Import(out, ns, ctx) if err != nil { panic(err) } @@ -172,24 +174,25 @@ func Migrator() *cobra.Command { } cmd.Flags().String("namespace", "", "Import into namespace (by ID or string)") - cmd.Flags().String("src", "", "Directory with migration files") - cmd.Flags().String("meta", "", "Directory with meta files") + cmd.Flags().String("src", "", "Directory with import files") + cmd.Flags().String("meta", "", "Directory with import meta files") return cmd } -// small helper functions for migrateable node management -func migrateableSource(mg []mgt.Migrateable, name string) mgt.Migrateable { +// small helper functions for import source node management + +func importSource(mg []ngt.ImportSource, name string) ngt.ImportSource { for _, m := range mg { if m.Name == name { return m } } - return mgt.Migrateable{} + return ngt.ImportSource{} } -func migrateableAdd(mg []mgt.Migrateable, mm mgt.Migrateable) []mgt.Migrateable { +func addImportSource(mg []ngt.ImportSource, mm ngt.ImportSource) []ngt.ImportSource { for i, m := range mg { if m.Name == mm.Name { mg[i] = mm diff --git a/pkg/migrate/README.adoc b/pkg/migrate/README.adoc deleted file mode 100644 index 9d91673af..000000000 --- a/pkg/migrate/README.adoc +++ /dev/null @@ -1,191 +0,0 @@ -= Data Migration System - -This package implements a graph-based data migration system. - -== Algorithm - -* read & prepare files in a provided directory. -A file's name represents a module handle, so make sure that they match up, -* import system users and take note of there references. -System users will be needed with each migration, so they should be done in step 1, -* initialize the graph based on the provided files and their corresponding modules. -The system can automatically determine dependencies based on their fields, -* remove any cycle from the graph, by splicing one of the cycle nodes. -A spliced node is only able to import it's data, but it can not manage it's dependencies, as they might not yet be known. -Dependencies are updated when it's parent node is being processed, -* determine "leaf" nodes; the nodes with no dependencies. -These can be imported immediately. -* import each leaf node with satisfied dependencies. -Since leaf nodes have no more dependencies, they can be imported in parallel (@todo), -* when the node is finished importing, provide it's mapping information to each one of it's parents, as they will need it. -Also update the list of leaf nodes with parent nodes that have satisfied dependencies. - -== Migration Mapping - -A big part of this system is the support for migration maps; ie. what field from the original source should map into what module and under what field. - -==== -Currently only simple conditions, such as `type=specialType` are supported. 
-==== - -=== Algorithm -* unmarshal the given `.map.json` -* for each entry of the given source: -** determine the used map based on the provided `where` field & the rows content -** based on the provided `map` entries, update/create buffers -* flush data - -=== Example - -.source.map.json -[source,json] ----- -[ - { - "where": "type=type1", - - "map": [ - { - "from": "id", - "to": "splice_1.original" - }, - { - "from": "id", - "to": "splice_2.original" - }, - { - "from": "id", - "to": "splice.id" - }, - - { - "from": "field1", - "to": "splice.customName" - }, - { - "from": "field2", - "to": "splice_1.customName" - }, - { - "from": "field3", - "to": "splice_2.customName" - } - ] - } -] ----- - -== Joining Migration Sources - -An important feature is the system's ability to construct a migration map from multiple migration sources. -For example; we want to populate a `User` module, that includes data from `User.csv` and `SysUser.csv`. - -=== Algorithrm - -* unmarshal the given `.join.json` -* for each migration node that defines a `.join.json`: -** determine all "joined" migration nodes that will be used in this join operation, -** create `{ field: { id: [ value, ... ] } }` object for each base migration node, based on joined nodes, -** when processing the migration node, respect the above mentioned object and include the specified data. - - -=== Example - -.source.join.json - -`.join.json` files define how multiple migration nodes should join into a single module. - -The below example instructs, that the current module should be constructed from it self and `subMod`; based on the `SubModRef` and `subMod.Id` relation. -When creating a `.map.json` file, values from the join operation are available under the specified alias (`...->alias`). - -[source,json] ----- -{ - "SubModRef->smod": "subMod.Id" -} ----- - -.source.map.json -[source,json] ----- -[ - { - "map": [ - { - "from": "Id", - "to": "baseMod.Id" - }, - - { - "from": "baseField1", - "to": "baseMod.baseField1" - }, - - { - "from": "smod.field1", - "to": "baseMod.SubModField1" - } - ] - } -] ----- - -It is also possible to define a join operation on multiple fields at the same time -- useful in cases where a unique PK is not available and must be constructed. -The following example uses `CreatedDate` and `CreatedById` fields as an index. - -[source,json] ----- -{ - "[CreatedDate,CreatedById]->smod": "subMod.[CreatedDate,CreatedById]" -} ----- - -== Value Mapping - -The system allows us to map a specific value from the provided `.csv` file into a value used by the system. -For example; we can map `In Progress` into `in_progress`. -The mapping also supports a default value, by using the `*` wildcard. - -=== Algorithrm - -* unmarshal the given `.value.json` -* before applying a value for the given field, attempt to map the value -** if mapping is successful, use the mapped value, -** else if default value exists, use the default value, -** else use the original value. - -=== Example - -.source.values.json - -The following value mapping maps `sys_status` field's values; the left one into the right one, with a default of `"new"` (`"*": "new"`). - -[source,json] ----- -{ - "sys_status": { - "In Progress": "in_progress", - "Send to QA": "qa_pending", - "Submit Job": "qa_approved", - "*": "new" - } -} ----- - -The system also provides support for arbitrary mathematical expressions. -If you wish to perform an expression, prefix the mapped value with `=EVL=`; for example `=EVL=numFmt(cell, \"%.0f\")`. 
- -Variables: -* current cell -- `cell`, -* current row -- `row`; access cell by it's name -- eg. `row.FirstName + row.LastName`. - -The following example will remove the decimal point from every `sys_rating` in the given source. - -[source,json] ----- -{ - "sys_rating": { - "*": "=EVL=numFmt(cell, \"%.0f\")" - } -} ----- diff --git a/pkg/migrate/main.go b/pkg/migrate/main.go deleted file mode 100644 index 45d57b1ee..000000000 --- a/pkg/migrate/main.go +++ /dev/null @@ -1,400 +0,0 @@ -package migrate - -import ( - "bytes" - "context" - "encoding/csv" - "fmt" - "io" - "strconv" - "sync" - "time" - - "github.com/cortezaproject/corteza-server/compose/repository" - "github.com/cortezaproject/corteza-server/compose/service" - cct "github.com/cortezaproject/corteza-server/compose/types" - "github.com/cortezaproject/corteza-server/pkg/migrate/types" - sysRepo "github.com/cortezaproject/corteza-server/system/repository" - sysTypes "github.com/cortezaproject/corteza-server/system/types" - "github.com/davecgh/go-spew/spew" - "github.com/schollz/progressbar/v2" -) - -var ( - userModHandle = "User" -) - -type ( - Migrator struct { - // a set of nodes included in the migration - nodes []*types.Node - - // list of leaf nodes, that we might be able to migrate - Leafs []*types.Node - } -) - -func Migrate(mg []types.Migrateable, ns *cct.Namespace, ctx context.Context) error { - var preProcW []string - - mig := &Migrator{} - svcMod := service.DefaultModule.With(ctx) - - var err error - - // 1. migrate all the users, so we can reference then accross the entire system - var mgUsr *types.Migrateable - for _, m := range mg { - if m.Name == userModHandle { - mgUsr = &m - break - } - } - - var uMap map[string]uint64 - if mgUsr != nil { - um, mgu, err := migrateUsers(mgUsr, ns, ctx) - if err != nil { - return err - } - - uMap = um - found := false - for i, m := range mg { - if m.Name == mgu.Name { - mg[i] = *mgu - found = true - break - } - } - - if !found { - mg = append(mg, *mgu) - } - } - - // Handle source joins - mg, err = sourceJoin(mg) - if err != nil { - return err - } - - // 2. prepare and link migration nodes - for _, mgR := range mg { - ss, err := splitStream(mgR) - if err != nil { - return err - } - - for _, m := range ss { - // 2.1 load module - mod, err := svcMod.FindByHandle(ns.ID, m.Name) - if err != nil { - preProcW = append(preProcW, err.Error()+" "+m.Name) - continue - } - - // 2.2 get header fields - r := csv.NewReader(m.Source) - var header []string - if m.Header != nil { - header = *m.Header - } else { - header, err = r.Read() - if err == io.EOF { - break - } - if err != nil { - return err - } - } - - // 2.3 create migration node - n := &types.Node{ - Name: m.Name, - Module: mod, - Namespace: ns, - Reader: r, - Header: header, - Lock: &sync.Mutex{}, - FieldMap: m.FieldMap, - ValueMap: m.ValueMap, - } - n = mig.AddNode(n) - - // 2.4 prepare additional migration nodes, to provide dep. 
constraints - for _, f := range mod.Fields { - if f.Kind == "Record" { - refMod := f.Options["moduleID"] - if refMod == nil { - preProcW = append(preProcW, "moduleField.record.missingRef"+" "+m.Name+" "+f.Name) - continue - } - - modID, ok := refMod.(string) - if !ok { - preProcW = append(preProcW, "moduleField.record.invalidRefFormat"+" "+m.Name+" "+f.Name) - continue - } - - vv, err := strconv.ParseUint(modID, 10, 64) - if err != nil { - preProcW = append(preProcW, err.Error()) - continue - } - - mm, err := svcMod.FindByID(ns.ID, vv) - if err != nil { - preProcW = append(preProcW, err.Error()+" "+m.Name+" "+f.Name+" "+modID) - continue - } - - nn := &types.Node{ - Name: mm.Handle, - Module: mm, - Namespace: ns, - Lock: &sync.Mutex{}, - } - - nn = mig.AddNode(nn) - n.LinkAdd(nn) - } - } - } - } - - spew.Dump("PRE-PROC WARNINGS", preProcW) - - mig.MakeAcyclic() - - for _, n := range mig.nodes { - // keep track of leaf nodes for later importing - if !n.HasChildren() { - mig.Leafs = append(mig.Leafs, n) - } - } - - fmt.Printf("migration.prepared\n") - fmt.Printf("no. of nodes %d\n", len(mig.nodes)) - fmt.Printf("no. of entry points %d\n", len(mig.Leafs)) - - err = mig.Migrate(ctx, uMap) - if err != nil { - return err - } - - return nil -} - -// if function resolves an existing node, it will merge with the provided node -// and return the new reference -func (m *Migrator) AddNode(n *types.Node) *types.Node { - var fn *types.Node - for _, nn := range m.nodes { - if nn.Compare(n) { - fn = nn - break - } - } - - if fn == nil { - m.nodes = append(m.nodes, n) - - return n - } - - fn.Merge(n) - return fn -} - -// it converts the graph from a cyclic (unsafe) graph to an acyclic (safe) graph -// that can be processed with a single algorithm -func (m *Migrator) MakeAcyclic() { - // splices the node from the cycle and thus preventing the cycle - splice := func(n *types.Node, from *types.Node) { - spl := n.Splice(from) - m.AddNode(spl) - } - - for _, n := range m.nodes { - if !n.Visited { - n.Traverse(splice) - } - } -} - -// processess migration nodes and migrates the data from the provided source files -func (m *Migrator) Migrate(ctx context.Context, users map[string]uint64) error { - fmt.Println("(•_•)") - fmt.Println("(-•_•)>⌐■-■") - fmt.Println("(⌐■_■)") - fmt.Print("\n\n\n") - - db := repository.DB(ctx) - repoRecord := repository.Record(ctx, db) - - bar := progressbar.New(len(m.nodes)) - - return db.Transaction(func() (err error) { - for len(m.Leafs) > 0 { - var wg sync.WaitGroup - - ch := make(chan types.PostProc, len(m.Leafs)) - for _, n := range m.Leafs { - wg.Add(1) - - // migrate & update leaf nodes - go n.Migrate(repoRecord, users, &wg, ch, bar) - } - - wg.Wait() - - var nl []*types.Node - for len(ch) > 0 { - pp := <-ch - if pp.Err != nil { - spew.Dump("ERR", pp.Err, pp.Node.Stringify()) - return pp.Err - } - - if pp.Leafs != nil { - for _, n := range pp.Leafs { - for _, l := range nl { - if n.Compare(l) { - goto skip - } - } - if n.Satisfied() { - nl = append(nl, n) - } - - skip: - } - - } - } - m.Leafs = nl - } - - fmt.Print("\n\n\n") - fmt.Println("(⌐■_■)") - fmt.Println("(-•_•)>⌐■-■") - fmt.Println("(•_•)") - - return nil - }) -} - -// migrates provided users -// this should be a pre-requisite to any further migration, as user information is required -func migrateUsers(mg *types.Migrateable, ns *cct.Namespace, ctx context.Context) (map[string]uint64, *types.Migrateable, error) { - db := repository.DB(ctx) - repoUser := sysRepo.User(ctx, db) - // this provides a map between SF ID -> 
CortezaID - mapping := make(map[string]uint64) - - // create a new buffer for user object, so we don't loose our data - var bb bytes.Buffer - ww := csv.NewWriter(&bb) - defer ww.Flush() - - // get fields - r := csv.NewReader(mg.Source) - header, err := r.Read() - if err != nil { - return nil, nil, err - } - ww.Write(header) - - // create users - for { - looper: - record, err := r.Read() - if err == io.EOF { - break - } - - if err != nil { - return nil, nil, err - } - - ww.Write(record) - - u := &sysTypes.User{} - for i, h := range header { - val := record[i] - - // when creating users we only care about a handfull of values. - // the rest are included in the module - switch h { - case "Username": - u.Username = record[i] - break - - case "Email": - u.Email = record[i] - break - - case "FirstName": - u.Name = record[i] - break - - case "LastName": - u.Name = u.Name + " " + record[i] - break - - case "Alias": - u.Handle = record[i] - break - - case "CreatedDate": - if val != "" { - u.CreatedAt, err = time.Parse(types.SfDateTime, val) - if err != nil { - return nil, nil, err - } - } - break - - case "LastModifiedDate": - if val != "" { - tt, err := time.Parse(types.SfDateTime, val) - u.UpdatedAt = &tt - if err != nil { - return nil, nil, err - } - } - break - - // ignore deleted values, as SF provides minimal info about those - case "IsDeleted": - if val == "1" { - goto looper - } - } - } - - // this allows us to reuse existing users - uu, err := repoUser.FindByEmail(u.Email) - if err == nil { - u = uu - } else { - u, err = repoUser.Create(u) - if err != nil { - return nil, nil, err - } - } - - mapping[record[0]] = u.ID - } - - uu := &types.Migrateable{ - Name: mg.Name, - Header: mg.Header, - Map: mg.Map, - Path: mg.Path, - Source: &bb, - } - - return mapping, uu, nil -} diff --git a/pkg/migrate/types/migrateable.go b/pkg/migrate/types/migrateable.go deleted file mode 100644 index 0b02a9bf1..000000000 --- a/pkg/migrate/types/migrateable.go +++ /dev/null @@ -1,37 +0,0 @@ -package types - -import ( - "io" -) - -var ( - SfDateTime = "2006-01-02 15:04:05" -) - -type ( - JoinedNodeEntry map[string]string - JoinedNodeRecords []JoinedNodeEntry - - Migrateable struct { - Name string - Path string - - Header *[]string - - Source io.Reader - // map is used for stream splitting - Map io.Reader - - // join is used for source joining - Join io.Reader - Joins []*JoinedNode - // alias.ID: [value] - FieldMap map[string]JoinedNodeRecords - // helps us determine what value field to use for linking - AliasMap map[string][]string - - // value is used for field value mapping - // field: value from: value to - ValueMap map[string]map[string]string - } -) diff --git a/pkg/ngImporter/README.adoc b/pkg/ngImporter/README.adoc new file mode 100644 index 000000000..a04beaf65 --- /dev/null +++ b/pkg/ngImporter/README.adoc @@ -0,0 +1,163 @@ += Corteza Next Gen Import System + +==== +The system currently supports only system users & compose records. +==== + +Corteza NG import defines a powerful system, that can be used for importing data with arbitrary complex structure. + +=== Algorithm: +* create `ImportSource` nodes based on the provided source files, +* import system users, +* handle **source join operations**; see below, +* handle **data mapping operations**; see below, +* construct `ImportNode` graph based on the provided `ImportSource` nodes, +* remove cycles from the generated `ImportNode` graph; see below, +* import data based on the constructed graph -- allows structured import. 
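To make the steps above concrete, the sketch below (an editor's illustration under stated assumptions, not part of this change) drives the importer directly, roughly the way the `ng-import` command does. The file names are hypothetical, the namespace is assumed to be resolved beforehand (the CLI looks it up by ID or handle), and the usual Corteza service/DB context is assumed to be initialized.

[source,go]
----
package main

import (
	"context"
	"log"
	"os"

	cct "github.com/cortezaproject/corteza-server/compose/types"
	ngi "github.com/cortezaproject/corteza-server/pkg/ngImporter"
	ngt "github.com/cortezaproject/corteza-server/pkg/ngImporter/types"
)

func main() {
	ctx := context.Background()

	// hypothetical files; the ng-import command collects these by walking
	// the --src and --meta directories
	src, err := os.Open("Contact.csv")
	if err != nil {
		log.Fatal(err)
	}
	dataMap, err := os.Open("Contact.map.json")
	if err != nil {
		log.Fatal(err)
	}

	iss := []ngt.ImportSource{
		{
			Name:    "Contact", // must match a module handle in the target namespace
			Path:    "Contact.csv",
			Source:  src,
			DataMap: dataMap,
		},
	}

	// placeholder; in a real run this is loaded via the namespace service
	ns := &cct.Namespace{ID: 1}

	if err := ngi.Import(iss, ns, ctx); err != nil {
		log.Fatal(err)
	}
}
----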
+ +== Data Mapping + +The system also allows us to define how the imported data is mapped. +For example, we can map data from the `Contact.csv` file into the `Contact` and `Friend` modules. +It also allows us to combine multiple sources into a single source, with the help of above mentioned **Source Join Operation**. + +=== Algorithm +* parse the given `.map.json` +* for each entry of the given source: +** determine the used map based on the provided `where` field & the rows content +** based on the provided `map` entries, create new `ImportSource` nodes + +=== Example + +.source.map.json +[source,json] +---- +[ + { + "where": "type==\"type1\"", + + "map": [ + { + "from": "f1", + "to": "mod1.field" + }, + { + "from": "f2", + "to": "mod2.anotherField" + }, + { + "from": "f3", + "to": "mod3.lastField" + } + ] + } +] +---- + +== Source Join Operation + +The import system allows us to define relations between multiple import sources in order to construct a single import source; such as creating a `Client` record based on `User` and `Contact` records. + +=== Algorighm: +* parse the given `.join.json` file, +* for each import source, that defines a `.join.json` file: +** determine all join nodes that will be used in this operation, +** load all records for each joined node and create appropriate indexes for quicker lookup. +This indexed data is used later with the actual import. + +=== Example + +`.join.json` files define how multiple migration nodes should join into a single module. +The below example instructs, that the current source should be joined with `subMod` based on the `SubModRef == Id` relation, identified by the alias `smod`. +When the data, the values from this operation are available under the specified alias; for example `smod.SomeField`. + +.source.join.json +[source,json] +---- +{ + "SubModRef->smod": "subMod.Id" +} +---- + +.source.map.json +[source,json] +---- +[ + { + "map": [ + { + "from": "Id", + "to": "baseMod.Id" + }, + + { + "from": "baseField1", + "to": "baseMod.baseField1" + }, + + { + "from": "smod.field1", + "to": "baseMod.SubModField1" + } + ] + } +] +---- + +It is also possible to define a join operation on multiple fields at the same time -- useful in cases where we must construct a PK. +The following example uses `CreatedDate` and `CreatedById` fields as an index. + +[source,json] +---- +{ + "[CreatedDate,CreatedById]->smod": "subMod.[CreatedDate,CreatedById]" +} +---- + +== Value Mapping + +The system also allows us to map specific field values into another value for that field. +This can be useful for mapping values from the import source into our internal data modal values. +For example; we can map `In Progress` into `in_progress`. +The system also supports a default value, by using the `*` wildcard. + +=== Algorithrm + +* parse the given `.value.json` +* before applying a value for the given field, attempt to map the value +** if mapping is successful, use the mapped value, +** else if default value exists, use the default value, +** else use the original value. + +=== Example + +.source.values.json +[source,json] +---- +{ + "sys_status": { + "In Progress": "in_progress", + "Send to QA": "qa_pending", + "Submit Job": "qa_approved", + "*": "new" + } +} +---- + +The system also provides support for mathematical expressions. +If you wish to perform an expression, prefix the mapped value with `=EVL=`; for example `=EVL=numFmt(cell, \"%.0f\")`. + +Expression context: +* `cell` -- current cell, +* `row` -- current row, provided as a `{field: value}` object. 
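As an aside, here is a minimal Go sketch (an editor's illustration, not part of this change) of how such a mapped value could be resolved, mirroring the importer's value-mapping step: a plain entry replaces the cell, while an `=EVL=`-prefixed entry is evaluated with the `GLang()` language and the context described above. The `resolveMappedValue` helper is hypothetical, and `numFmt` is assumed to be registered by `GLang()`.

[source,go]
----
package main

import (
	"context"
	"fmt"
	"strings"

	ngt "github.com/cortezaproject/corteza-server/pkg/ngImporter/types"
)

// resolveMappedValue applies a single value-map entry to a cell
func resolveMappedValue(mapped, cell string, row map[string]string) (string, error) {
	if !strings.HasPrefix(mapped, ngt.EvalPrefix) {
		// plain mapping: use the mapped value if present, else keep the original
		if mapped != "" {
			return mapped, nil
		}
		return cell, nil
	}

	// expression mapping: strip the prefix and evaluate with cell & row in scope
	ev, err := ngt.GLang().NewEvaluable(strings.TrimPrefix(mapped, ngt.EvalPrefix))
	if err != nil {
		return "", err
	}
	return ev.EvalString(context.Background(), map[string]interface{}{
		"cell": cell,
		"row":  row,
	})
}

func main() {
	out, err := resolveMappedValue(`=EVL=numFmt(cell, "%.0f")`, "4.7", map[string]string{"sys_rating": "4.7"})
	fmt.Println(out, err)
}
----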
+ +The following example will remove the decimal point from every `sys_rating` in the given source. + +[source,json] +---- +{ + "sys_rating": { + "*": "=EVL=numFmt(cell, \"%.0f\")" + } +} +---- diff --git a/pkg/ngImporter/importUsers.go b/pkg/ngImporter/importUsers.go new file mode 100644 index 000000000..c41f67f33 --- /dev/null +++ b/pkg/ngImporter/importUsers.go @@ -0,0 +1,128 @@ +package migrate + +import ( + "bytes" + "context" + "encoding/csv" + "io" + "time" + + "github.com/cortezaproject/corteza-server/compose/repository" + cct "github.com/cortezaproject/corteza-server/compose/types" + "github.com/cortezaproject/corteza-server/pkg/ngImporter/types" + sysRepo "github.com/cortezaproject/corteza-server/system/repository" + sysTypes "github.com/cortezaproject/corteza-server/system/types" +) + +// imports system users based on the provided source +func importUsers(ctx context.Context, is *types.ImportSource, ns *cct.Namespace) (map[string]uint64, *types.ImportSource, error) { + db := repository.DB(ctx) + repoUser := sysRepo.User(ctx, db) + // this provides a map between importSourceID -> CortezaID + mapping := make(map[string]uint64) + + // create a new buffer for user object, so we don't loose our data + var bb bytes.Buffer + ww := csv.NewWriter(&bb) + defer ww.Flush() + + // get fields + r := csv.NewReader(is.Source) + header, err := r.Read() + if err != nil { + return nil, nil, err + } + ww.Write(header) + + // create users + for { + looper: + record, err := r.Read() + if err == io.EOF { + break + } + + if err != nil { + return nil, nil, err + } + + ww.Write(record) + + u := &sysTypes.User{} + for i, h := range header { + val := record[i] + + // when creating users we only care about a handfull of values. + // the rest are included in the module + switch h { + case "Username": + u.Username = record[i] + break + + case "Email": + u.Email = record[i] + break + + case "FirstName": + u.Name = record[i] + break + + case "LastName": + u.Name = u.Name + " " + record[i] + break + + case "Alias": + u.Handle = record[i] + break + + case "CreatedDate": + if val != "" { + u.CreatedAt, err = time.Parse(types.SfDateTimeLayout, val) + if err != nil { + return nil, nil, err + } + } + break + + case "LastModifiedDate": + if val != "" { + tt, err := time.Parse(types.SfDateTimeLayout, val) + u.UpdatedAt = &tt + if err != nil { + return nil, nil, err + } + } + break + + // ignore deleted values, as SF provides minimal info about those + case "IsDeleted": + if val == "1" { + goto looper + } + } + } + + // this allows us to reuse existing users + uu, err := repoUser.FindByEmail(u.Email) + if err == nil { + u = uu + } else { + u, err = repoUser.Create(u) + if err != nil { + return nil, nil, err + } + } + + mapping[record[0]] = u.ID + } + + nis := &types.ImportSource{ + Name: is.Name, + Header: is.Header, + DataMap: is.DataMap, + Path: is.Path, + Source: &bb, + } + + return mapping, nis, nil +} diff --git a/pkg/migrate/join.go b/pkg/ngImporter/joinData.go similarity index 54% rename from pkg/migrate/join.go rename to pkg/ngImporter/joinData.go index 9a75a1b6a..f20e0e53b 100644 --- a/pkg/migrate/join.go +++ b/pkg/ngImporter/joinData.go @@ -9,31 +9,33 @@ import ( "regexp" "strings" - "github.com/cortezaproject/corteza-server/pkg/migrate/types" + "github.com/cortezaproject/corteza-server/pkg/ngImporter/types" ) type ( // mapLink helps us keep track between base nodes, joined nodes and the fields used // for creating the link mapLink struct { - jn *types.JoinedNode + jn *types.JoinNode // field from the base 
node used in the op. - baseField []string + baseFields []string // alias to use for the base field; allows us to use the same field multiple times baseFieldAlias string // field from the joined node to use in the opp. - joinField []string + joinFields []string } // temporary node for the join op. node struct { - mg *types.Migrateable - // temporary migration node mapper based on aliases - mapper map[string]mapLink + is *types.ImportSource + // maps { alias: mapLink } + mapper map[string]mapLink + // maps { alisa: [field] } aliasMap map[string][]string } - exprEval struct { + // defines params that should be used in the given join opp. + joinEval struct { baseFields []string baseFieldAlias string joinModule string @@ -41,55 +43,54 @@ type ( } ) -// Creates JoinNodes for each Migrateable node included in a source join process -// See readme for more -func sourceJoin(mm []types.Migrateable) ([]types.Migrateable, error) { +// it defines join context for each ImportSource that defines the join operation. +// it returns a new set of ImportSource nodes, excluding the joined ones. +// algorighem outline: +// * determine all nodes that define the join operation (base node) +// * take note of all nodes, that will be used in join operations (join node) +// * load data for each join node +// * index each row based on the specified alias and it's specified ID field +func joinData(iss []types.ImportSource) ([]types.ImportSource, error) { var rr []*node - joinedNodes := make(map[string]*types.JoinedNode) + joinedNodes := make(map[string]*types.JoinNode) - // Algorithm outline: - // 1. determine all migration nodes that will be used as joined nodes - // 2. load entries for each join node - // 3. construct new output migration nodes - - // 1. determination - for _, mg := range mm { + // determine base & join nodes + for _, mg := range iss { // this helps us avoid pesky pointer issues :) ww := mg - nd := &node{mg: &ww} + nd := &node{is: &ww} rr = append(rr, nd) - if mg.Join == nil { + if mg.SourceJoin == nil { continue } - // defer this, so we can do simple nil checks + // defer this initialization, so we can do simple nil checks nd.mapper = make(map[string]mapLink) nd.aliasMap = make(map[string][]string) - // join definition map defines how two sources are joined + // joinDef describes how nodes should be joined var joinDef map[string]string - src, _ := ioutil.ReadAll(mg.Join) + src, _ := ioutil.ReadAll(mg.SourceJoin) err := json.Unmarshal(src, &joinDef) if err != nil { return nil, err } - // find all joined nodes for the given base node + // determine join nodes for the given base node for base, condition := range joinDef { expr := splitExpr(base, condition) if _, ok := nd.aliasMap[expr.baseFieldAlias]; ok { - return nil, errors.New("alias.used " + nd.mg.Name + " " + expr.baseFieldAlias) + return nil, errors.New("alias.duplicated " + nd.is.Name + " " + expr.baseFieldAlias) } nd.aliasMap[expr.baseFieldAlias] = expr.baseFields - // register migration node as join node - for _, m := range mm { + for _, m := range iss { if m.Name == expr.joinModule { if _, ok := joinedNodes[expr.joinModule]; !ok { ww := m - joinedNodes[expr.joinModule] = &types.JoinedNode{ + joinedNodes[expr.joinModule] = &types.JoinNode{ Mg: &ww, Name: ww.Name, } @@ -99,9 +100,9 @@ func sourceJoin(mm []types.Migrateable) ([]types.Migrateable, error) { jn := joinedNodes[expr.joinModule] nd.mapper[expr.baseFieldAlias] = mapLink{ jn: jn, - baseField: expr.baseFields, + baseFields: expr.baseFields, baseFieldAlias: expr.baseFieldAlias, - 
joinField: expr.joinFields, + joinFields: expr.joinFields, } break } @@ -109,9 +110,9 @@ func sourceJoin(mm []types.Migrateable) ([]types.Migrateable, error) { } } - // 2. load entries + // load join node's data for _, jn := range joinedNodes { - jn.Entries = make([]map[string]string, 0) + jn.Records = make([]map[string]string, 0) reader := csv.NewReader(jn.Mg.Source) // header @@ -131,68 +132,68 @@ func sourceJoin(mm []types.Migrateable) ([]types.Migrateable, error) { } row := make(map[string]string) - jn.Entries = append(jn.Entries, row) + jn.Records = append(jn.Records, row) for i, c := range record { row[header[i]] = c } } } - // 3. output - out := make([]types.Migrateable, 0) + // generate new SourceNodes & build join node indexes + out := make([]types.ImportSource, 0) for _, nd := range rr { - fnd := false + found := false for _, jn := range joinedNodes { - if nd.mg.Name == jn.Name { - fnd = true + if nd.is.Name == jn.Name { + found = true break } } // skip joined nodes - if fnd { + if found { continue } - o := nd.mg + oIs := nd.is // skip nodes with no mappings if nd.mapper == nil { - out = append(out, *o) + out = append(out, *oIs) continue } - o.AliasMap = nd.aliasMap + oIs.AliasMap = nd.aliasMap - // create `alias.ID`: []entry mappings, that will be used when migrating + // create `alias.ID`: []entry mappings, that will be used when importing for alias, link := range nd.mapper { - if o.FieldMap == nil { - o.FieldMap = make(map[string]types.JoinedNodeRecords) + if oIs.FieldMap == nil { + oIs.FieldMap = make(map[string]types.JoinNodeRecords) } - for _, e := range link.jn.Entries { + for _, e := range link.jn.Records { jj := []string{} - for _, jf := range link.joinField { + for _, jf := range link.joinFields { jj = append(jj, e[jf]) } - kk := alias + "." + strings.Join(jj[:], ".") - if _, ok := o.FieldMap[kk]; !ok { - o.FieldMap[kk] = make(types.JoinedNodeRecords, 0) + index := alias + "." 
+ strings.Join(jj[:], ".") + if _, ok := oIs.FieldMap[index]; !ok { + oIs.FieldMap[index] = make(types.JoinNodeRecords, 0) } - o.FieldMap[kk] = append(o.FieldMap[kk], e) + oIs.FieldMap[index] = append(oIs.FieldMap[index], e) } } - out = append(out, *o) + out = append(out, *oIs) } return out, nil } // helper to split the join expression -func splitExpr(base, joined string) exprEval { - rr := exprEval{} +func splitExpr(base, joined string) joinEval { + rr := joinEval{} // original node rx := regexp.MustCompile(`\[?(?P[\w,]+)\]?->(?P\w+)`) diff --git a/pkg/ngImporter/main.go b/pkg/ngImporter/main.go new file mode 100644 index 000000000..a0d862f0c --- /dev/null +++ b/pkg/ngImporter/main.go @@ -0,0 +1,281 @@ +package migrate + +import ( + "context" + "encoding/csv" + "fmt" + "io" + "strconv" + "sync" + + "github.com/cortezaproject/corteza-server/compose/repository" + "github.com/cortezaproject/corteza-server/compose/service" + cct "github.com/cortezaproject/corteza-server/compose/types" + "github.com/cortezaproject/corteza-server/pkg/ngImporter/types" + "github.com/davecgh/go-spew/spew" + "github.com/schollz/progressbar/v2" +) + +type ( + // Importer contains the context of the entire importing operation + Importer struct { + // a set of import nodes that define a graph + nodes []*types.ImportNode + // a set of leaf import nodes, that can be imported in the next cycle + Leafs []*types.ImportNode + } +) + +// Import initializes the import process for the given set of ImportSource nodes +// algorithm outline: +// * import all users used within the given import sources +// * handle source join operations +// * handle data mapping operations +// * build graph from ImportNodes based on the provided ImportSource nodes +// * remove cycles from the given graph +// * import data based on node dependencies +func Import(iss []types.ImportSource, ns *cct.Namespace, ctx context.Context) error { + // contains warnings raised by the pre process steps + var preProcW []string + imp := &Importer{} + svcMod := service.DefaultModule.With(ctx) + var err error + + // import users + var usrSrc *types.ImportSource + for _, m := range iss { + if m.Name == types.UserModHandle { + usrSrc = &m + break + } + } + + // maps sourceUserID to CortezaID + var uMap map[string]uint64 + if usrSrc != nil { + um, mgu, err := importUsers(ctx, usrSrc, ns) + if err != nil { + return err + } + uMap = um + + // replace the old source node with the new one (updated data stream) + found := false + for i, m := range iss { + if m.Name == mgu.Name { + iss[i] = *mgu + found = true + break + } + } + + if !found { + iss = append(iss, *mgu) + } + } + + iss, err = joinData(iss) + if err != nil { + return err + } + + // data mapping & graph construction + for _, is := range iss { + nIss, err := mapData(is) + if err != nil { + return err + } + + for _, nIs := range nIss { + // preload module + mod, err := svcMod.FindByHandle(ns.ID, nIs.Name) + if err != nil { + preProcW = append(preProcW, err.Error()+" "+nIs.Name) + continue + } + + // define headers + r := csv.NewReader(nIs.Source) + var header []string + if nIs.Header != nil { + header = *nIs.Header + } else { + header, err = r.Read() + if err == io.EOF { + break + } + if err != nil { + return err + } + } + + // create node & push to graph + n := &types.ImportNode{ + Name: nIs.Name, + Module: mod, + Namespace: ns, + Reader: r, + Header: header, + Lock: &sync.Mutex{}, + FieldMap: nIs.FieldMap, + ValueMap: nIs.ValueMap, + } + n = imp.AddNode(n) + + // prepare additional import nodes based on it's 
record fields + for _, f := range mod.Fields { + if f.Kind == "Record" { + refMod := f.Options["moduleID"] + if refMod == nil { + preProcW = append(preProcW, "moduleField.record.missingRef"+" "+nIs.Name+" "+f.Name) + continue + } + + modID, ok := refMod.(string) + if !ok { + preProcW = append(preProcW, "moduleField.record.invalidRefFormat"+" "+nIs.Name+" "+f.Name) + continue + } + + vv, err := strconv.ParseUint(modID, 10, 64) + if err != nil { + preProcW = append(preProcW, err.Error()) + continue + } + + mm, err := svcMod.FindByID(ns.ID, vv) + if err != nil { + preProcW = append(preProcW, err.Error()+" "+nIs.Name+" "+f.Name+" "+modID) + continue + } + + nn := &types.ImportNode{ + Name: mm.Handle, + Module: mm, + Namespace: ns, + Lock: &sync.Mutex{}, + } + + nn = imp.AddNode(nn) + n.LinkAdd(nn) + } + } + } + } + + spew.Dump("PRE-PROC WARNINGS", preProcW) + + imp.RemoveCycles() + + // take note of leaf nodes that can be imported right away + for _, n := range imp.nodes { + if !n.HasChildren() { + imp.Leafs = append(imp.Leafs, n) + } + } + + fmt.Printf("import.prepared\n") + fmt.Printf("no. of nodes %d\n", len(imp.nodes)) + fmt.Printf("no. of entry points %d\n", len(imp.Leafs)) + + err = imp.Import(ctx, uMap) + if err != nil { + return err + } + + return nil +} + +// AddNode attempts to add the given node into the graph. If the node can already be +// identified, the two nodes are merged. +func (m *Importer) AddNode(n *types.ImportNode) *types.ImportNode { + var fn *types.ImportNode + for _, nn := range m.nodes { + if nn.CompareTo(n) { + fn = nn + break + } + } + + if fn == nil { + m.nodes = append(m.nodes, n) + return n + } + + fn.Merge(n) + return fn +} + +// RemoveCycles removes all cycles in the given graph, by restructuring/recreating the nodes +// and their dependencies. 
+func (m *Importer) RemoveCycles() { + splice := func(n *types.ImportNode, from *types.ImportNode) { + spl := n.Splice(from) + m.AddNode(spl) + } + + for _, n := range m.nodes { + if !n.Visited { + n.SeekCycles(splice) + } + } +} + +// Import runs the import over each ImportNode in the given graph +func (m *Importer) Import(ctx context.Context, users map[string]uint64) error { + fmt.Println("(•_•)") + fmt.Println("(-•_•)>⌐■-■") + fmt.Println("(⌐■_■)") + fmt.Print("\n\n\n") + + db := repository.DB(ctx) + repoRecord := repository.Record(ctx, db) + bar := progressbar.New(len(m.nodes)) + + return db.Transaction(func() (err error) { + for len(m.Leafs) > 0 { + var wg sync.WaitGroup + + ch := make(chan types.PostProc, len(m.Leafs)) + for _, n := range m.Leafs { + wg.Add(1) + go n.Import(repoRecord, users, &wg, ch, bar) + } + + wg.Wait() + + var nl []*types.ImportNode + for len(ch) > 0 { + pp := <-ch + if pp.Err != nil { + spew.Dump("ERR", pp.Err, pp.Node.Stringify()) + return pp.Err + } + + // update the set of available leaf nodes + if pp.Leafs != nil { + for _, n := range pp.Leafs { + for _, l := range nl { + if n.CompareTo(l) { + goto skip + } + } + if n.Satisfied() { + nl = append(nl, n) + } + + skip: + } + } + } + m.Leafs = nl + } + + fmt.Print("\n\n\n") + fmt.Println("(⌐■_■)") + fmt.Println("(-•_•)>⌐■-■") + fmt.Println("(•_•)") + + return nil + }) +} diff --git a/pkg/migrate/stream.go b/pkg/ngImporter/mapData.go similarity index 62% rename from pkg/migrate/stream.go rename to pkg/ngImporter/mapData.go index e09fbfbc6..61f39f807 100644 --- a/pkg/migrate/stream.go +++ b/pkg/ngImporter/mapData.go @@ -10,11 +10,12 @@ import ( "io/ioutil" "strings" - "github.com/cortezaproject/corteza-server/pkg/migrate/types" + "github.com/cortezaproject/corteza-server/pkg/ngImporter/types" ) type ( - SplitBuffer struct { + // MapBuffer helps us keep track of new SourceNodes, defined by the map operation. + MapBuffer struct { buffer *bytes.Buffer name string row []string @@ -27,26 +28,31 @@ type ( } ) -// this function splits the stream of the given migrateable node. -// See readme for more info -func splitStream(m types.Migrateable) ([]types.Migrateable, error) { - var rr []types.Migrateable - if m.Map == nil { - rr = append(rr, m) +// maps data from the original ImportSource node into new ImportSource nodes +// based on the provided DataMap. +// Algorithm outline: +// * parse data map +// * for each record in the original import source, based on the map, create new +// MapBuffers +// * create new import sources based on map buffers +func mapData(is types.ImportSource) ([]types.ImportSource, error) { + var rr []types.ImportSource + if is.DataMap == nil { + rr = append(rr, is) return rr, nil } // unpack the map // @todo provide a better structure!! 
- var streamMap []map[string]interface{} - src, _ := ioutil.ReadAll(m.Map) - err := json.Unmarshal(src, &streamMap) + var dataMap []map[string]interface{} + src, _ := ioutil.ReadAll(is.DataMap) + err := json.Unmarshal(src, &dataMap) if err != nil { return nil, err } // get header fields - r := csv.NewReader(m.Source) + r := csv.NewReader(is.Source) header, err := r.Read() if err == io.EOF { return rr, nil @@ -56,16 +62,15 @@ func splitStream(m types.Migrateable) ([]types.Migrateable, error) { return nil, err } - // maps header field -> field index for a nicer lookup + // maps { header field: field index } for a nicer lookup hMap := make(map[string]int) for i, h := range header { hMap[h] = i } - bufs := make(map[string]*SplitBuffer) - - // splitting magic + bufs := make(map[string]*MapBuffer) + // data mapping for { record, err := r.Read() if err == io.EOF { @@ -76,35 +81,35 @@ func splitStream(m types.Migrateable) ([]types.Migrateable, error) { return nil, err } - // on next row, old stream's headers are finished + // on next row, currently acquired headers are marked as final for _, b := range bufs { b.hasHeader = true } - // find first applicable map, that can be used for the given row. - // default maps should not inclide a where field - for _, strmp := range streamMap { + // find applicable maps, that can be used for the given row. + // the system allows composition, so all applicable maps are used. + for _, strmp := range dataMap { if checkWhere(strmp["where"], record, hMap) { maps, ok := strmp["map"].([]interface{}) if !ok { - return nil, errors.New("streamMap.invalidMap " + m.Name) + return nil, errors.New("dataMap.invalidMap " + is.Name) } - // populate splitted streams + // handle current record and it's values for _, mp := range maps { mm, ok := mp.(map[string]interface{}) if !ok { - return nil, errors.New("streamMap.map.invalidEntry " + m.Name) + return nil, errors.New("dataMap.map.invalidEntry " + is.Name) } from, ok := mm["from"].(string) if !ok { - return nil, errors.New("streamMap.map.entry.invalidFrom " + m.Name) + return nil, errors.New("dataMap.map.entry.invalidFrom " + is.Name) } to, ok := mm["to"].(string) if !ok { - return nil, errors.New("streamMap.map.invalidTo " + m.Name) + return nil, errors.New("dataMap.map.invalidTo " + is.Name) } vv := strings.Split(to, ".") @@ -115,7 +120,7 @@ func splitStream(m types.Migrateable) ([]types.Migrateable, error) { var bb bytes.Buffer ww := csv.NewWriter(&bb) defer ww.Flush() - bufs[nm] = &SplitBuffer{ + bufs[nm] = &MapBuffer{ buffer: &bb, writer: ww, name: nm, @@ -125,12 +130,12 @@ func splitStream(m types.Migrateable) ([]types.Migrateable, error) { val := record[hMap[from]] - // handle joins + // handle data join if strings.Contains(from, ".") { // construct a `alias.joinOnID` value, so we can perform a simple map lookup pts := strings.Split(from, ".") baseFieldAlias := pts[0] - originalOn := m.AliasMap[baseFieldAlias] + originalOn := is.AliasMap[baseFieldAlias] joinField := pts[1] oo := []string{} @@ -160,15 +165,15 @@ func splitStream(m types.Migrateable) ([]types.Migrateable, error) { } } - // make migrateable nodes from the generated streams + // construct output import source nodes for _, v := range bufs { - rr = append(rr, types.Migrateable{ + rr = append(rr, types.ImportSource{ Name: v.name, Source: v.buffer, Header: &v.header, - FieldMap: m.FieldMap, - AliasMap: m.AliasMap, - ValueMap: m.ValueMap, + FieldMap: is.FieldMap, + AliasMap: is.AliasMap, + ValueMap: is.ValueMap, }) } @@ -187,7 +192,7 @@ func checkWhere(where 
interface{}, row []string, hMap map[string]int) bool { return true } - ev, err := types.Exprs().NewEvaluable(ww) + ev, err := types.GLang().NewEvaluable(ww) if err != nil { panic(err) } diff --git a/pkg/ngImporter/types/ImportSource.go b/pkg/ngImporter/types/ImportSource.go new file mode 100644 index 000000000..fa3c4ddc4 --- /dev/null +++ b/pkg/ngImporter/types/ImportSource.go @@ -0,0 +1,51 @@ +package types + +import ( + "io" +) + +type ( + // JoinNodeEntry represents a { field: value } map for the given record in the + // join node. + JoinNodeEntry map[string]string + // JoinNodeRecords represents a set of records that should be used together when + // accessing the specified alias. + JoinNodeRecords []JoinNodeEntry + + // JoinNode represents an ImportSource that will be used in combination with another, + // to create a record based on multiple sources + JoinNode struct { + Mg *ImportSource + Name string + // Records represents a set of records, available in the given import source. + // [{ field: value }] + Records []map[string]string + } + + // ImportSource helps us perform some pre-proc operations before the actual import, + // such as data mapping and source joining. + ImportSource struct { + Name string + Path string + + Header *[]string + Source io.Reader + + // DataMap allows us to specify what values from the original source should + // map into what fields of what module + DataMap io.Reader + + // SourceJoin allows us to specify what import sources should be joined + // when mapping values. + SourceJoin io.Reader + // FieldMap stores records from the joined import source. + // Records are indexed by {alias: [record]} + FieldMap map[string]JoinNodeRecords + // AliasMap helps us determine what fields are stored under the given alias. + AliasMap map[string][]string + + // Value Map allows us to map specific values from the given import source into + // a specified value used by Corteza. 
+ ValueMap map[string]map[string]string + } +) diff --git a/pkg/migrate/types/eval.go b/pkg/ngImporter/types/eval.go similarity index 79% rename from pkg/migrate/types/eval.go rename to pkg/ngImporter/types/eval.go index dd691ebf4..df74c83ba 100644 --- a/pkg/migrate/types/eval.go +++ b/pkg/ngImporter/types/eval.go @@ -8,8 +8,8 @@ import ( "github.com/PaesslerAG/gval" ) -// generates a simple gval language to be used within the migration -func Exprs() gval.Language { +// Glang generates a gval language, that can be used for expression evaluation +func GLang() gval.Language { return gval.NewLanguage( gval.JSON(), gval.Arithmetic(), @@ -30,12 +30,12 @@ func Exprs() gval.Language { // diff between two dates in seconds gval.Function("dateDiff", func(d1, d2 string) (float64, error) { - t1, err := time.Parse(SfDateTime, d1) + t1, err := time.Parse(SfDateTimeLayout, d1) if err != nil { return 0, err } - t2, err := time.Parse(SfDateTime, d2) + t2, err := time.Parse(SfDateTimeLayout, d2) if err != nil { return 0, err } diff --git a/pkg/ngImporter/types/general.go b/pkg/ngImporter/types/general.go new file mode 100644 index 000000000..5bfa60797 --- /dev/null +++ b/pkg/ngImporter/types/general.go @@ -0,0 +1,43 @@ +package types + +import "unicode/utf8" + +const ( + // SfDateTimeLayout represents the date-time template used by sales force + SfDateTimeLayout = "2006-01-02 15:04:05" + // DateOnlyLayout represents our internal date only date-time fields + DateOnlyLayout = "2006-01-02" + // TimeOnlyLayout represents our internal time only date-time fields + TimeOnlyLayout = "15:04:05Z" + + // EvalPrefix defines the prefix used by formulas, defined by value mapping + EvalPrefix = "=EVL=" + + UserModHandle = "User" +) + +var ( + // ExprLang contains gval language that should be used for any expression evaluation + ExprLang = GLang() +) + +type ( + // PostProc is used withing channels, used by the import process. + PostProc struct { + // Leafs defines a set of leaf nodes that can be imported next. + Leafs []*ImportNode + // ... + Err error + // Node contains the current node; usefull for debugging + Node *ImportNode + } +) + +// helper function for removing invalid UTF runes from the given string. +// SalesForce   with a special character, that is not supported in our char set +func fixUtf(r rune) rune { + if r == utf8.RuneError { + return -1 + } + return r +} diff --git a/pkg/migrate/types/node.go b/pkg/ngImporter/types/importNode.go similarity index 52% rename from pkg/migrate/types/node.go rename to pkg/ngImporter/types/importNode.go index 469598104..504fb6cb1 100644 --- a/pkg/migrate/types/node.go +++ b/pkg/ngImporter/types/importNode.go @@ -9,7 +9,6 @@ import ( "strings" "sync" "time" - "unicode/utf8" "github.com/cortezaproject/corteza-server/compose/repository" "github.com/cortezaproject/corteza-server/compose/types" @@ -17,89 +16,84 @@ import ( ) type ( - JoinedNode struct { - Mg *Migrateable - Name string - // id: field: value - Entries []map[string]string - } - - // graph node - Node struct { - // unique node name + // ImportNode helps us perform the actual import. + // Multiple ImportNodes define a graph, which helps us with dependency resolution + // and determination of proper import order + ImportNode struct { + // Name is used for unique node identification. It should match the target resource name. 
Name string - // keep note of parent refs, so we don't need to inverse it ;) - Parents []*Node - // keep note of Children, as they are our dependencies - Children []*Node - // mapping from migrated IDs to Corteza IDs - mapping map[string]Map - // determines if node is in current path; used for loop detection + // Parents represents the node's parents and the nodes that depend on this node. + Parents []*ImportNode + // Children represents the node's children and this node's dependencies. + Children []*ImportNode + + // used for idMap between import source's IDs into CortezaIDs + idMap map[string]Map + + // determines if node is in current path; used for cycle detection inPath bool - // determines if node was spliced; used to break the loop - spliced bool - original *Node - spl *Node - - // records are applicable in the case of spliced nodes + // determines if this node was spliced from the original path in order to break the cycle + isSpliced bool + // points to the original node (from spliced) + original *ImportNode + // points to the spliced node (from original) + spliced *ImportNode + // defines the records that were created by the spliced node. + // They are later used to insert missing dependencies. records []*types.Record - // some refs + // some refs... Module *types.Module Namespace *types.Namespace Reader *csv.Reader - // meta + // some meta... Header []string Visited bool + Lock *sync.Mutex - Lock *sync.Mutex + // FieldMap stores records from the joined import source. + // Records are indexed by {alias: [record]} + FieldMap map[string]JoinNodeRecords - // field: recordID: [value] - FieldMap map[string]JoinedNodeRecords - - // field: value from: value to + // Value Map allows us to map specific values from the given import source into + // a specified value used by Corteza. ValueMap map[string]map[string]string } - // map between migrated ID and Corteza ID + // Map maps between import sourceID -> CortezaID Map map[string]string - - PostProc struct { - Leafs []*Node - Err error - Node *Node - } ) -const ( - evalPrefix = "=EVL=" -) - -// helper, to determine if the two nodes are equal -func (n *Node) Compare(to *Node) bool { - return n.Name == to.Name && n.spliced == to.spliced +// CompareTo compares the two nodes. It uses the name and it's variant +func (n *ImportNode) CompareTo(to *ImportNode) bool { + return n.Name == to.Name && n.isSpliced == to.isSpliced } -// helper to stringify the node -func (n *Node) Stringify() string { - return fmt.Sprintf("NODE > n: %s; spliced: %t; inPath: %t;", n.Name, n.spliced, n.inPath) +// Stringify stringifies the given node; usefull for debugging +func (n *ImportNode) Stringify() string { + return fmt.Sprintf("NODE > n: %s; spliced: %t", n.Name, n.isSpliced) } -// adds a new map to the given node -func (n *Node) addMap(key string, m Map) { +// adds a new ID map to the given node's existing ID map +func (n *ImportNode) addMap(key string, m Map) { n.Lock.Lock() - if n.mapping == nil { - n.mapping = map[string]Map{} + defer n.Lock.Unlock() + + if n.idMap == nil { + n.idMap = map[string]Map{} } - n.mapping[key] = m - n.Lock.Unlock() + n.idMap[key] = m } -// does the actual data migration for the given node -func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]uint64, wg *sync.WaitGroup, ch chan PostProc, bar *progressbar.ProgressBar) { +// Import performs the actual data import. +// The algoritem defines two steps: +// * source import, +// * reference correction. +// For details refer to the README. 
+func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[string]uint64, wg *sync.WaitGroup, ch chan PostProc, bar *progressbar.ProgressBar) {
 	defer wg.Done()
 	defer bar.Add(1)
@@ -107,12 +101,12 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
 	mapping := make(Map)

 	if n.Reader != nil {
-		// if records exist (from spliced node); correct refs
-		if !n.spliced && n.records != nil && len(n.records) > 0 {
+		// when importing a node that defined a spliced node, we should only correct its refs
+		if !n.isSpliced && n.records != nil && len(n.records) > 0 {
 			// we can just reuse the mapping object, since it will remain the same
-			mapping = n.mapping[fmt.Sprint(n.Module.ID)]
+			mapping = n.idMap[fmt.Sprint(n.Module.ID)]

-			err := updateRefs(n, repoRecord)
+			err := n.correctRecordRefs(repoRecord)
 			if err != nil {
 				ch <- PostProc{
 					Leafs: nil,
@@ -122,7 +116,9 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
 				return
 			}
 		} else {
-			mapping, err = importNodeSource(n, users, repoRecord)
+			// when importing a spliced node or a node that did not define a spliced node, we should
+			// import its data
+			mapping, err = n.importNodeSource(users, repoRecord)
 			if err != nil {
 				ch <- PostProc{
 					Leafs: nil,
@@ -134,9 +130,9 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
 		}
 	}

-	var rtr []*Node
+	var rtr []*ImportNode

-	var pps []*Node
+	var pps []*ImportNode
 	for _, pp := range n.Parents {
 		pps = append(pps, pp)
 	}
@@ -159,16 +155,16 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
 // determines if node is Satisfied and can be imported
 // it is Satisfied, when all of it's dependencies have been imported ie. no
 // more child refs
-func (n *Node) Satisfied() bool {
+func (n *ImportNode) Satisfied() bool {
 	return !n.HasChildren()
 }

-func (n *Node) HasChildren() bool {
+func (n *ImportNode) HasChildren() bool {
 	return n.Children != nil && len(n.Children) > 0
 }

 // partially Merge the two nodes
-func (n *Node) Merge(nn *Node) {
+func (n *ImportNode) Merge(nn *ImportNode) {
 	if nn.Module != nil {
 		n.Module = nn.Module
 	}
@@ -187,13 +183,13 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
 }

 // link the two nodes
-func (n *Node) LinkAdd(to *Node) {
+func (n *ImportNode) LinkAdd(to *ImportNode) {
 	n.addChild(to)
 	to.addParent(n)
 }

 // remove the link between the two nodes
-func (n *Node) LinkRemove(from *Node) {
+func (n *ImportNode) LinkRemove(from *ImportNode) {
 	n.Lock.Lock()
 	n.Children = n.removeIfPresent(from, n.Children)
 	from.Parents = from.removeIfPresent(n, from.Parents)
@@ -201,21 +197,21 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
 }

 // adds a parent node to the given node
-func (n *Node) addParent(add *Node) {
+func (n *ImportNode) addParent(add *ImportNode) {
 	n.Parents = n.addIfMissing(add, n.Parents)
 }

 // adds a child node to the given node
-func (n *Node) addChild(add *Node) {
+func (n *ImportNode) addChild(add *ImportNode) {
 	n.Children = n.addIfMissing(add, n.Children)
 }

 // adds a node, if it doesn't yet exist
-func (n *Node) addIfMissing(add *Node, list []*Node) []*Node {
-	var fn *Node
+func (n *ImportNode) addIfMissing(add *ImportNode, list []*ImportNode) []*ImportNode {
+	var fn *ImportNode
 	for _, nn := range list {
-		if add.Compare(nn) {
+		if add.CompareTo(nn) {
 			fn = nn
 		}
 	}
@@ -228,9 +224,9 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
 }

 // removes the node, if it exists
-func (n *Node) removeIfPresent(rem *Node, list []*Node) []*Node {
+func (n *ImportNode) removeIfPresent(rem *ImportNode, list []*ImportNode) []*ImportNode {
 	for i, nn := range list {
-		if rem.Compare(nn) {
+		if rem.CompareTo(nn) {
 			// https://stackoverflow.com/a/37335777
 			list[len(list)-1], list[i] = list[i], list[len(list)-1]
 			return list[:len(list)-1]
@@ -241,11 +237,11 @@ func (n *Node) removeIfPresent(rem *Node, list []*Node) []*Node {
 }

 // traverses the graph and notifies us of any cycles
-func (n *Node) Traverse(cycle func(n *Node, to *Node)) {
+func (n *ImportNode) SeekCycles(cycle func(n *ImportNode, to *ImportNode)) {
 	n.inPath = true
 	n.Visited = true

-	var cc []*Node
+	var cc []*ImportNode
 	for _, nn := range n.Children {
 		cc = append(cc, nn)
 	}
@@ -257,14 +253,14 @@ func (n *Node) Traverse(cycle func(n *Node, to *Node)) {
 		if nn.inPath {
 			cycle(n, nn)
 		} else {
-			nn.Traverse(cycle)
+			nn.SeekCycles(cycle)
 		}
 	}

 	n.inPath = false
 }

-func (n *Node) DFS() {
+func (n *ImportNode) DFS() {
 	n.inPath = true

 	for _, nn := range n.Children {
@@ -277,14 +273,14 @@ func (n *Node) Traverse(cycle func(n *Node, to *Node)) {
 }

 // clones the given node
-func (n *Node) clone() *Node {
-	return &Node{
+func (n *ImportNode) clone() *ImportNode {
+	return &ImportNode{
 		Name:      n.Name,
 		Parents:   n.Parents,
 		Children:  n.Children,
-		mapping:   n.mapping,
+		idMap:     n.idMap,
 		inPath:    n.inPath,
-		spliced:   n.spliced,
+		isSpliced: n.isSpliced,
 		original:  n.original,
 		records:   n.records,
 		Visited:   n.Visited,
@@ -296,18 +292,18 @@ func (n *Node) Traverse(cycle func(n *Node, to *Node)) {
 }

 // splices the node from the original graph and removes the cycle
-func (n *Node) Splice(from *Node) *Node {
-	splicedN := from.spl
+func (n *ImportNode) Splice(from *ImportNode) *ImportNode {
+	splicedN := from.spliced

 	if splicedN == nil {
 		splicedN = from.clone()
-		splicedN.spliced = true
+		splicedN.isSpliced = true
 		splicedN.Parents = nil
 		splicedN.Children = nil
 		splicedN.inPath = false
 		splicedN.original = from

-		from.spl = splicedN
+		from.spliced = splicedN
 		from.LinkAdd(splicedN)
 	}
@@ -318,7 +314,8 @@ func (n *Node) Splice(from *Node) *Node {
 	return splicedN
 }

-func sysField(f string) bool {
+// helper to determine if this is a system field
+func isSysField(f string) bool {
 	switch f {
 	case "OwnerId",
 		"IsDeleted",
@@ -331,8 +328,8 @@ func sysField(f string) bool {
 	return false
 }

-func updateRefs(n *Node, repo repository.RecordRepository) error {
-	// correct references
+// updates the given node's record values that depend on another record
+func (n *ImportNode) correctRecordRefs(repo repository.RecordRepository) error {
 	for _, r := range n.records {
 		for _, v := range r.Values {
 			var f *types.ModuleField
@@ -356,7 +353,9 @@ func updateRefs(n *Node, repo repository.RecordRepository) error {
 					return errors.New("moduleField.record.invalidRefFormat")
 				}

-				if mod, ok := n.mapping[ref]; ok {
+				// in case of a missing ref, make sure to remove the reference.
+				// otherwise this will cause internal errors when trying to resolve CortezaID.
+				if mod, ok := n.idMap[ref]; ok {
 					if vv, ok := mod[val]; ok {
 						v.Value = vv
 					} else {
@@ -370,7 +369,7 @@ func updateRefs(n *Node, repo repository.RecordRepository) error {
 			}
 		}

-		// update values
+		// update values; skip empty values
 		nv := types.RecordValueSet{}
 		for _, v := range r.Values {
 			if v.Value != "" {
@@ -387,18 +386,10 @@ func updateRefs(n *Node, repo repository.RecordRepository) error {
 	return nil
 }

-func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRepository) (Map, error) {
+// imports the given node's source
+func (n *ImportNode) importNodeSource(users map[string]uint64, repo repository.RecordRepository) (Map, error) {
 	mapping := make(Map)

-	fixUtf := func(r rune) rune {
-		if r == utf8.RuneError {
-			return -1
-		}
-		return r
-	}
-
-	lng := Exprs()
-
 	for {
 	looper:
 		record, err := n.Reader.Read()
@@ -416,18 +407,22 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
 			CreatedAt: time.Now(),
 		}

-		vals := types.RecordValueSet{}
+		recordValues := types.RecordValueSet{}

+		// convert the given row into a { field: value } map; this will be used
+		// for expression evaluation
 		row := map[string]string{}
 		for i, h := range n.Header {
 			row[h] = record[i]
 		}

 		for i, h := range n.Header {
+			// will contain string values for the given field
 			var values []string
 			val := record[i]

-			if sysField(h) {
+			// system values should be kept on the record's root level
+			if isSysField(h) {
 				switch h {
 				case "OwnerId":
 					rr.OwnedBy = users[val]
@@ -442,7 +437,7 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe

 				case "CreatedDate":
 					if val != "" {
-						rr.CreatedAt, err = time.Parse(SfDateTime, val)
+						rr.CreatedAt, err = time.Parse(SfDateTimeLayout, val)
 						if err != nil {
 							return nil, err
 						}
@@ -459,7 +454,7 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe

 				case "LastModifiedDate":
 					if val != "" {
-						tt, err := time.Parse(SfDateTime, val)
+						tt, err := time.Parse(SfDateTimeLayout, val)
 						rr.UpdatedAt = &tt
 						if err != nil {
 							return nil, err
@@ -468,6 +463,7 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
 					break
 				}
 			} else {
+				// other user-defined values should be kept inside `values`
 				joined := ""
 				if strings.Contains(h, ":") {
 					pts := strings.Split(h, ":")
@@ -475,6 +471,7 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
 					joined = pts[1]
 				}

+				// find corresponding field
 				var f *types.ModuleField
 				for _, ff := range n.Module.Fields {
 					if ff.Name == h {
@@ -487,29 +484,29 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
 					continue
 				}

-				// simple hack to fully support multiple values from a joined node
-				vvs := make([]string, 0)
-
-				// check if joinable
+				// temp set of raw values that should be processed further.
+				// this gives us support for multi-value fields when joining sources
+				rawValues := make([]string, 0)
 				if joined != "" {
 					tmp := n.FieldMap[val]
 					for _, e := range tmp {
-						vvs = append(vvs, e[joined])
+						rawValues = append(rawValues, e[joined])
 					}
 				} else {
-					vvs = []string{val}
+					rawValues = []string{val}
 				}

-				for _, val := range vvs {
+				for _, val := range rawValues {
+					// handle references. Spliced nodes should not perform this step, since
+					// they can't rely on any dependency. This is corrected with `correctRecordRefs`
 					if f.Options["moduleID"] != nil {
-						// spliced nodes should NOT manage their references
-						if !n.spliced {
+						if !n.isSpliced {
 							ref, ok := f.Options["moduleID"].(string)
 							if !ok {
 								return nil, errors.New("moduleField.record.invalidRefFormat")
 							}

-							if mod, ok := n.mapping[ref]; ok && val != "" {
+							if mod, ok := n.idMap[ref]; ok && val != "" {
 								if v, ok := mod[val]; ok && v != "" {
 									val = v
 								} else {
@@ -521,6 +518,7 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
 						}
 						values = append(values, val)
 					} else if f.Kind == "User" {
+						// handle user references
 						if u, ok := users[val]; ok {
 							val = fmt.Sprint(u)
 						} else {
@@ -528,58 +526,30 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
 						}
 						values = append(values, val)
 					} else {
+						// generic value handling
 						val = strings.Map(fixUtf, val)
-
 						if val == "" {
 							continue
 						}

-						// assure date time formatting
 						if f.Kind == "DateTime" {
-							pvl, err := time.Parse(SfDateTime, val)
+							val, err = assureDateFormat(val, f.Options)
 							if err != nil {
 								return nil, err
 							}
-
-							if f.Options.Bool("onlyDate") {
-								val = pvl.Format("2006-01-02")
-							} else if f.Options.Bool("onlyTime") {
-								val = pvl.Format("15:04:05Z")
-							} else {
-								val = pvl.Format(time.RFC3339)
-							}
 						}
 						values = append(values, val)
 					}
 				}

+				// value post-proc & record value creation
 				for i, v := range values {
-					if fmp, ok := n.ValueMap[h]; ok {
-						nvl := ""
-						if mpv, ok := fmp[v]; ok {
-							nvl = mpv
-						} else if mpv, ok := fmp["*"]; ok {
-							nvl = mpv
-						}
-
-						if nvl != "" && strings.HasPrefix(nvl, evalPrefix) {
-							opp := nvl[len(evalPrefix):len(nvl)]
-							ev, err := lng.NewEvaluable(opp)
-							if err != nil {
-								return nil, err
-							}
-
-							v, err = ev.EvalString(context.Background(), map[string]interface{}{"cell": v, "row": row})
-							if err != nil {
-								return nil, err
-							}
-						} else if nvl != "" {
-							v = nvl
-						}
+					v, err = n.mapValue(h, v, row)
+					if err != nil {
+						return nil, err
 					}
-
-					vals = append(vals, &types.RecordValue{
+					recordValues = append(recordValues, &types.RecordValue{
 						Name:  h,
 						Value: v,
 						Place: uint(i),
@@ -595,22 +565,68 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
 		}

 		// update record values with recordID
-		for _, v := range vals {
+		for _, v := range recordValues {
 			v.RecordID = r.ID
 		}

-		err = repo.UpdateValues(r.ID, vals)
+		err = repo.UpdateValues(r.ID, recordValues)
 		if err != nil {
 			return nil, err
 		}

 		// spliced nodes should preserve their records for later ref processing
-		if n.spliced {
-			rr.Values = vals
+		if n.isSpliced {
+			rr.Values = recordValues
 			n.original.records = append(n.original.records, rr)
 		}

+		// update mapping map
 		mapping[record[0]] = fmt.Sprint(rr.ID)
 	}

 	return mapping, nil
 }
+
+func (n *ImportNode) mapValue(field, val string, row map[string]string) (string, error) {
+	if fmp, ok := n.ValueMap[field]; ok {
+		nvl := ""
+		if mpv, ok := fmp[val]; ok {
+			nvl = mpv
+		} else if mpv, ok := fmp["*"]; ok {
+			nvl = mpv
+		}
+
+		// expression evaluation
+		if nvl != "" && strings.HasPrefix(nvl, EvalPrefix) {
+			opp := nvl[len(EvalPrefix):len(nvl)]
+			ev, err := ExprLang.NewEvaluable(opp)
+			if err != nil {
+				return "", err
+			}
+
+			val, err = ev.EvalString(context.Background(), map[string]interface{}{"cell": val, "row": row})
+			if err != nil {
+				return "", err
+			}
+		} else if nvl != "" {
+			val = nvl
+		}
+	}
+	return val, nil
+}
+
+// helper to assure correct date time formatting
+func assureDateFormat(val string, opt types.ModuleFieldOptions) (string, error) {
+	pvl, err := time.Parse(SfDateTimeLayout, val)
+	if err != nil {
+		return "", err
+	}
+
+	if opt.Bool("onlyDate") {
+		val = pvl.Format(DateOnlyLayout)
+	} else if opt.Bool("onlyTime") {
+		val = pvl.Format(TimeOnlyLayout)
+	} else {
+		val = pvl.Format(time.RFC3339)
+	}
+	return val, nil
+}