
Refactor system's naming convention

Tomaž Jerman 2020-03-25 13:37:43 +01:00
parent 9fada4be65
commit ecf85b340d
14 changed files with 964 additions and 901 deletions

View File

@ -102,7 +102,7 @@ func (app *App) RegisterCliCommands(p *cobra.Command) {
p.AddCommand(
commands.Importer(),
commands.Exporter(),
commands.Migrator(),
commands.ImporterNG(),
// temp command, will be removed in 2020.6
automation.ScriptExporter(SERVICE),
)

View File

@ -19,14 +19,14 @@ import (
"github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/pkg/cli"
mgg "github.com/cortezaproject/corteza-server/pkg/migrate"
mgt "github.com/cortezaproject/corteza-server/pkg/migrate/types"
ngi "github.com/cortezaproject/corteza-server/pkg/ngImporter"
ngt "github.com/cortezaproject/corteza-server/pkg/ngImporter/types"
)
func Migrator() *cobra.Command {
func ImporterNG() *cobra.Command {
cmd := &cobra.Command{
Use: "migrate",
Short: "Migrate",
Use: "ng-import",
Short: "Importer next-gen",
Run: func(cmd *cobra.Command, args []string) {
var (
@ -37,7 +37,7 @@ func Migrator() *cobra.Command {
ns *types.Namespace
err error
mg []mgt.Migrateable
mg []ngt.ImportSource
)
svcNs := service.DefaultNamespace.With(ctx)
@ -70,12 +70,12 @@ func Migrator() *cobra.Command {
ext := filepath.Ext(info.Name())
name := info.Name()[0 : len(info.Name())-len(ext)]
mm := migrateableSource(mg, name)
mm := importSource(mg, name)
mm.Name = name
mm.Path = path
mm.Source = file
mg = migrateableAdd(mg, mm)
mg = addImportSource(mg, mm)
}
return nil
})
@ -92,11 +92,11 @@ func Migrator() *cobra.Command {
ext := filepath.Ext(info.Name())
// @todo improve this!!
name := info.Name()[0 : len(info.Name())-len(ext)-4]
mm := migrateableSource(mg, name)
mm := importSource(mg, name)
mm.Name = name
mm.Map = file
mm.DataMap = file
mg = migrateableAdd(mg, mm)
mg = addImportSource(mg, mm)
} else if strings.HasSuffix(info.Name(), ".join.json") {
file, err := os.Open(path)
if err != nil {
@ -106,11 +106,11 @@ func Migrator() *cobra.Command {
ext := filepath.Ext(info.Name())
// @todo improve this!!
name := info.Name()[0 : len(info.Name())-len(ext)-5]
mm := migrateableSource(mg, name)
mm := importSource(mg, name)
mm.Name = name
mm.Join = file
mm.SourceJoin = file
mg = migrateableAdd(mg, mm)
mg = addImportSource(mg, mm)
} else if strings.HasSuffix(info.Name(), ".value.json") {
file, err := os.Open(path)
if err != nil {
@ -120,7 +120,7 @@ func Migrator() *cobra.Command {
ext := filepath.Ext(info.Name())
// @todo improve this!!
name := info.Name()[0 : len(info.Name())-len(ext)-6]
mm := migrateableSource(mg, name)
mm := importSource(mg, name)
mm.Name = name
var vmp map[string]map[string]string
@ -131,7 +131,7 @@ func Migrator() *cobra.Command {
}
mm.ValueMap = vmp
mg = migrateableAdd(mg, mm)
mg = addImportSource(mg, mm)
}
return nil
})
@ -143,7 +143,7 @@ func Migrator() *cobra.Command {
// clean up migrateables
hasW := false
out := make([]mgt.Migrateable, 0)
out := make([]ngt.ImportSource, 0)
for _, m := range mg {
if m.Source != nil {
out = append(out, m)
@ -158,12 +158,14 @@ func Migrator() *cobra.Command {
fmt.Print("warnings detected; continue [y/N]? ")
_, err := fmt.Scanln(&rsp)
rsp = strings.ToLower(rsp)
if err != nil || rsp != "y" && rsp != "yes" {
log.Fatal("migration aborted due to warnings")
}
}
err = mgg.Migrate(out, ns, ctx)
err = ngi.Import(out, ns, ctx)
if err != nil {
panic(err)
}
@ -172,24 +174,25 @@ func Migrator() *cobra.Command {
}
cmd.Flags().String("namespace", "", "Import into namespace (by ID or string)")
cmd.Flags().String("src", "", "Directory with migration files")
cmd.Flags().String("meta", "", "Directory with meta files")
cmd.Flags().String("src", "", "Directory with import files")
cmd.Flags().String("meta", "", "Directory with import meta files")
return cmd
}
// small helper functions for migrateable node management
func migrateableSource(mg []mgt.Migrateable, name string) mgt.Migrateable {
// small helper functions for import source node management
func importSource(mg []ngt.ImportSource, name string) ngt.ImportSource {
for _, m := range mg {
if m.Name == name {
return m
}
}
return mgt.Migrateable{}
return ngt.ImportSource{}
}
func migrateableAdd(mg []mgt.Migrateable, mm mgt.Migrateable) []mgt.Migrateable {
func addImportSource(mg []ngt.ImportSource, mm ngt.ImportSource) []ngt.ImportSource {
for i, m := range mg {
if m.Name == mm.Name {
mg[i] = mm

View File

@ -1,191 +0,0 @@
= Data Migration System
This package implements a graph-based data migration system.
== Algorithm
* read & prepare files in a provided directory.
A file's name represents a module handle, so make sure that they match up,
import system users and take note of their references.
System users are referenced throughout the migration, so they are imported first,
* initialize the graph based on the provided files and their corresponding modules.
The system can automatically determine dependencies based on their fields,
* remove any cycle from the graph, by splicing one of the cycle nodes.
A spliced node is only able to import its data, but it cannot manage its dependencies, as they might not yet be known.
Dependencies are updated when its parent node is being processed,
* determine "leaf" nodes; the nodes with no dependencies.
These can be imported immediately.
* import each leaf node with satisfied dependencies.
Since leaf nodes have no more dependencies, they can be imported in parallel (@todo),
when the node is finished importing, provide its mapping information to each one of its parents, as they will need it.
Also update the list of leaf nodes with parent nodes that have satisfied dependencies.
== Migration Mapping
A big part of this system is the support for migration maps; i.e. which field from the original source should map into which module and under which field.
====
Currently only simple conditions, such as `type=specialType` are supported.
====
=== Algorithm
* unmarshal the given `.map.json`
* for each entry of the given source:
** determine the used map based on the provided `where` field & the row's content
** based on the provided `map` entries, update/create buffers
* flush data
=== Example
.source.map.json
[source,json]
----
[
{
"where": "type=type1",
"map": [
{
"from": "id",
"to": "splice_1.original"
},
{
"from": "id",
"to": "splice_2.original"
},
{
"from": "id",
"to": "splice.id"
},
{
"from": "field1",
"to": "splice.customName"
},
{
"from": "field2",
"to": "splice_1.customName"
},
{
"from": "field3",
"to": "splice_2.customName"
}
]
}
]
----
== Joining Migration Sources
An important feature is the system's ability to construct a migration map from multiple migration sources.
For example; we want to populate a `User` module that includes data from `User.csv` and `SysUser.csv`.
=== Algorithm
* unmarshal the given `.join.json`
* for each migration node that defines a `.join.json`:
** determine all "joined" migration nodes that will be used in this join operation,
** create `{ field: { id: [ value, ... ] } }` object for each base migration node, based on joined nodes,
** when processing the migration node, respect the above mentioned object and include the specified data.
=== Example
.source.join.json
`.join.json` files define how multiple migration nodes should join into a single module.
The below example instructs that the current module should be constructed from itself and `subMod`, based on the `SubModRef` and `subMod.Id` relation.
When creating a `.map.json` file, values from the join operation are available under the specified alias (`...->alias`).
[source,json]
----
{
"SubModRef->smod": "subMod.Id"
}
----
.source.map.json
[source,json]
----
[
{
"map": [
{
"from": "Id",
"to": "baseMod.Id"
},
{
"from": "baseField1",
"to": "baseMod.baseField1"
},
{
"from": "smod.field1",
"to": "baseMod.SubModField1"
}
]
}
]
----
It is also possible to define a join operation on multiple fields at the same time -- useful in cases where a unique PK is not available and must be constructed.
The following example uses `CreatedDate` and `CreatedById` fields as an index.
[source,json]
----
{
"[CreatedDate,CreatedById]->smod": "subMod.[CreatedDate,CreatedById]"
}
----
== Value Mapping
The system allows us to map a specific value from the provided `.csv` file into a value used by the system.
For example; we can map `In Progress` into `in_progress`.
The mapping also supports a default value, by using the `*` wildcard.
=== Algorithm
* unmarshal the given `.value.json`
* before applying a value for the given field, attempt to map the value
** if mapping is successful, use the mapped value,
** else if default value exists, use the default value,
** else use the original value.
=== Example
.source.values.json
The following value mapping maps `sys_status` field's values; the left one into the right one, with a default of `"new"` (`"*": "new"`).
[source,json]
----
{
"sys_status": {
"In Progress": "in_progress",
"Send to QA": "qa_pending",
"Submit Job": "qa_approved",
"*": "new"
}
}
----
The system also provides support for arbitrary mathematical expressions.
If you wish to perform an expression, prefix the mapped value with `=EVL=`; for example `=EVL=numFmt(cell, "%.0f")`.
Variables:
* current cell -- `cell`,
* current row -- `row`; access a cell by its name -- e.g. `row.FirstName + row.LastName`.
The following example will remove the decimal point from every `sys_rating` in the given source.
[source,json]
----
{
"sys_rating": {
"*": "=EVL=numFmt(cell, \"%.0f\")"
}
}
----

View File

@ -1,400 +0,0 @@
package migrate
import (
"bytes"
"context"
"encoding/csv"
"fmt"
"io"
"strconv"
"sync"
"time"
"github.com/cortezaproject/corteza-server/compose/repository"
"github.com/cortezaproject/corteza-server/compose/service"
cct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/migrate/types"
sysRepo "github.com/cortezaproject/corteza-server/system/repository"
sysTypes "github.com/cortezaproject/corteza-server/system/types"
"github.com/davecgh/go-spew/spew"
"github.com/schollz/progressbar/v2"
)
var (
userModHandle = "User"
)
type (
Migrator struct {
// a set of nodes included in the migration
nodes []*types.Node
// list of leaf nodes, that we might be able to migrate
Leafs []*types.Node
}
)
func Migrate(mg []types.Migrateable, ns *cct.Namespace, ctx context.Context) error {
var preProcW []string
mig := &Migrator{}
svcMod := service.DefaultModule.With(ctx)
var err error
// 1. migrate all the users, so we can reference them across the entire system
var mgUsr *types.Migrateable
for _, m := range mg {
if m.Name == userModHandle {
mgUsr = &m
break
}
}
var uMap map[string]uint64
if mgUsr != nil {
um, mgu, err := migrateUsers(mgUsr, ns, ctx)
if err != nil {
return err
}
uMap = um
found := false
for i, m := range mg {
if m.Name == mgu.Name {
mg[i] = *mgu
found = true
break
}
}
if !found {
mg = append(mg, *mgu)
}
}
// Handle source joins
mg, err = sourceJoin(mg)
if err != nil {
return err
}
// 2. prepare and link migration nodes
for _, mgR := range mg {
ss, err := splitStream(mgR)
if err != nil {
return err
}
for _, m := range ss {
// 2.1 load module
mod, err := svcMod.FindByHandle(ns.ID, m.Name)
if err != nil {
preProcW = append(preProcW, err.Error()+" "+m.Name)
continue
}
// 2.2 get header fields
r := csv.NewReader(m.Source)
var header []string
if m.Header != nil {
header = *m.Header
} else {
header, err = r.Read()
if err == io.EOF {
break
}
if err != nil {
return err
}
}
// 2.3 create migration node
n := &types.Node{
Name: m.Name,
Module: mod,
Namespace: ns,
Reader: r,
Header: header,
Lock: &sync.Mutex{},
FieldMap: m.FieldMap,
ValueMap: m.ValueMap,
}
n = mig.AddNode(n)
// 2.4 prepare additional migration nodes, to provide dep. constraints
for _, f := range mod.Fields {
if f.Kind == "Record" {
refMod := f.Options["moduleID"]
if refMod == nil {
preProcW = append(preProcW, "moduleField.record.missingRef"+" "+m.Name+" "+f.Name)
continue
}
modID, ok := refMod.(string)
if !ok {
preProcW = append(preProcW, "moduleField.record.invalidRefFormat"+" "+m.Name+" "+f.Name)
continue
}
vv, err := strconv.ParseUint(modID, 10, 64)
if err != nil {
preProcW = append(preProcW, err.Error())
continue
}
mm, err := svcMod.FindByID(ns.ID, vv)
if err != nil {
preProcW = append(preProcW, err.Error()+" "+m.Name+" "+f.Name+" "+modID)
continue
}
nn := &types.Node{
Name: mm.Handle,
Module: mm,
Namespace: ns,
Lock: &sync.Mutex{},
}
nn = mig.AddNode(nn)
n.LinkAdd(nn)
}
}
}
}
spew.Dump("PRE-PROC WARNINGS", preProcW)
mig.MakeAcyclic()
for _, n := range mig.nodes {
// keep track of leaf nodes for later importing
if !n.HasChildren() {
mig.Leafs = append(mig.Leafs, n)
}
}
fmt.Printf("migration.prepared\n")
fmt.Printf("no. of nodes %d\n", len(mig.nodes))
fmt.Printf("no. of entry points %d\n", len(mig.Leafs))
err = mig.Migrate(ctx, uMap)
if err != nil {
return err
}
return nil
}
// if function resolves an existing node, it will merge with the provided node
// and return the new reference
func (m *Migrator) AddNode(n *types.Node) *types.Node {
var fn *types.Node
for _, nn := range m.nodes {
if nn.Compare(n) {
fn = nn
break
}
}
if fn == nil {
m.nodes = append(m.nodes, n)
return n
}
fn.Merge(n)
return fn
}
// it converts the graph from a cyclic (unsafe) graph to an acyclic (safe) graph
// that can be processed with a single algorithm
func (m *Migrator) MakeAcyclic() {
// splices the node from the cycle and thus preventing the cycle
splice := func(n *types.Node, from *types.Node) {
spl := n.Splice(from)
m.AddNode(spl)
}
for _, n := range m.nodes {
if !n.Visited {
n.Traverse(splice)
}
}
}
// processes migration nodes and migrates the data from the provided source files
func (m *Migrator) Migrate(ctx context.Context, users map[string]uint64) error {
fmt.Println("(•_•)")
fmt.Println("(-•_•)>⌐■-■")
fmt.Println("(⌐■_■)")
fmt.Print("\n\n\n")
db := repository.DB(ctx)
repoRecord := repository.Record(ctx, db)
bar := progressbar.New(len(m.nodes))
return db.Transaction(func() (err error) {
for len(m.Leafs) > 0 {
var wg sync.WaitGroup
ch := make(chan types.PostProc, len(m.Leafs))
for _, n := range m.Leafs {
wg.Add(1)
// migrate & update leaf nodes
go n.Migrate(repoRecord, users, &wg, ch, bar)
}
wg.Wait()
var nl []*types.Node
for len(ch) > 0 {
pp := <-ch
if pp.Err != nil {
spew.Dump("ERR", pp.Err, pp.Node.Stringify())
return pp.Err
}
if pp.Leafs != nil {
for _, n := range pp.Leafs {
for _, l := range nl {
if n.Compare(l) {
goto skip
}
}
if n.Satisfied() {
nl = append(nl, n)
}
skip:
}
}
}
m.Leafs = nl
}
fmt.Print("\n\n\n")
fmt.Println("(⌐■_■)")
fmt.Println("(-•_•)>⌐■-■")
fmt.Println("(•_•)")
return nil
})
}
// migrates provided users
// this should be a prerequisite to any further migration, as user information is required
func migrateUsers(mg *types.Migrateable, ns *cct.Namespace, ctx context.Context) (map[string]uint64, *types.Migrateable, error) {
db := repository.DB(ctx)
repoUser := sysRepo.User(ctx, db)
// this provides a map between SF ID -> CortezaID
mapping := make(map[string]uint64)
// create a new buffer for user object, so we don't lose our data
var bb bytes.Buffer
ww := csv.NewWriter(&bb)
defer ww.Flush()
// get fields
r := csv.NewReader(mg.Source)
header, err := r.Read()
if err != nil {
return nil, nil, err
}
ww.Write(header)
// create users
for {
looper:
record, err := r.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, nil, err
}
ww.Write(record)
u := &sysTypes.User{}
for i, h := range header {
val := record[i]
// when creating users we only care about a handful of values.
// the rest are included in the module
switch h {
case "Username":
u.Username = record[i]
break
case "Email":
u.Email = record[i]
break
case "FirstName":
u.Name = record[i]
break
case "LastName":
u.Name = u.Name + " " + record[i]
break
case "Alias":
u.Handle = record[i]
break
case "CreatedDate":
if val != "" {
u.CreatedAt, err = time.Parse(types.SfDateTime, val)
if err != nil {
return nil, nil, err
}
}
break
case "LastModifiedDate":
if val != "" {
tt, err := time.Parse(types.SfDateTime, val)
u.UpdatedAt = &tt
if err != nil {
return nil, nil, err
}
}
break
// ignore deleted values, as SF provides minimal info about those
case "IsDeleted":
if val == "1" {
goto looper
}
}
}
// this allows us to reuse existing users
uu, err := repoUser.FindByEmail(u.Email)
if err == nil {
u = uu
} else {
u, err = repoUser.Create(u)
if err != nil {
return nil, nil, err
}
}
mapping[record[0]] = u.ID
}
uu := &types.Migrateable{
Name: mg.Name,
Header: mg.Header,
Map: mg.Map,
Path: mg.Path,
Source: &bb,
}
return mapping, uu, nil
}

View File

@ -1,37 +0,0 @@
package types
import (
"io"
)
var (
SfDateTime = "2006-01-02 15:04:05"
)
type (
JoinedNodeEntry map[string]string
JoinedNodeRecords []JoinedNodeEntry
Migrateable struct {
Name string
Path string
Header *[]string
Source io.Reader
// map is used for stream splitting
Map io.Reader
// join is used for source joining
Join io.Reader
Joins []*JoinedNode
// alias.ID: [value]
FieldMap map[string]JoinedNodeRecords
// helps us determine what value field to use for linking
AliasMap map[string][]string
// value is used for field value mapping
// field: value from: value to
ValueMap map[string]map[string]string
}
)

pkg/ngImporter/README.adoc Normal file
View File

@ -0,0 +1,163 @@
= Corteza Next Gen Import System
====
The system currently supports only system users & compose records.
====
Corteza NG import defines a powerful system that can be used for importing data with an arbitrarily complex structure.
=== Algorithm
* create `ImportSource` nodes based on the provided source files,
* import system users,
* handle **source join operations**; see below,
* handle **data mapping operations**; see below,
* construct `ImportNode` graph based on the provided `ImportSource` nodes,
* remove cycles from the generated `ImportNode` graph; see below,
* import data based on the constructed graph -- allows structured import.
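For orientation, below is a minimal sketch of how the importer could be driven from Go, based on the `ImportSource` type and the `Import` entry point added by this package; the file name, namespace resolution, and error handling are placeholder assumptions, not the actual CLI wiring (see the `ng-import` command for that).
[source,go]
----
package main

import (
	"context"
	"log"
	"os"

	cct "github.com/cortezaproject/corteza-server/compose/types"
	ngi "github.com/cortezaproject/corteza-server/pkg/ngImporter"
	ngt "github.com/cortezaproject/corteza-server/pkg/ngImporter/types"
)

func main() {
	ctx := context.Background()

	// Contact.csv is a placeholder; the file name (without extension) must match
	// the target module handle
	src, err := os.Open("Contact.csv")
	if err != nil {
		log.Fatal(err)
	}

	iss := []ngt.ImportSource{{
		Name:   "Contact",
		Source: src,
		// DataMap, SourceJoin and ValueMap are optional readers, fed from the
		// matching .map.json, .join.json and .value.json files
	}}

	// the target compose namespace; resolving it (e.g. through the compose
	// namespace service) is omitted in this sketch
	var ns *cct.Namespace
	if ns == nil {
		log.Fatal("namespace not resolved")
	}

	if err := ngi.Import(iss, ns, ctx); err != nil {
		log.Fatal(err)
	}
}
----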
== Data Mapping
The system also allows us to define how the imported data is mapped.
For example, we can map data from the `Contact.csv` file into the `Contact` and `Friend` modules.
It also allows us to combine multiple sources into a single source, with the help of the above-mentioned **Source Join Operation**.
=== Algorithm
* parse the given `.map.json`
* for each entry of the given source:
** determine the used map based on the provided `where` field & the row's content
** based on the provided `map` entries, create new `ImportSource` nodes
=== Example
.source.map.json
[source,json]
----
[
{
"where": "type==\"type1\"",
"map": [
{
"from": "f1",
"to": "mod1.field"
},
{
"from": "f2",
"to": "mod2.anotherField"
},
{
"from": "f3",
"to": "mod3.lastField"
}
]
}
]
----
== Source Join Operation
The import system allows us to define relations between multiple import sources in order to construct a single import source; such as creating a `Client` record based on `User` and `Contact` records.
=== Algorithm
* parse the given `.join.json` file,
* for each import source, that defines a `.join.json` file:
** determine all join nodes that will be used in this operation,
** load all records for each joined node and create appropriate indexes for quicker lookup.
This indexed data is used later with the actual import.
=== Example
`.join.json` files define how multiple import sources should join into a single module.
The below example instructs that the current source should be joined with `subMod` based on the `SubModRef == Id` relation, identified by the alias `smod`.
When mapping the data, the values from this operation are available under the specified alias; for example `smod.SomeField`.
.source.join.json
[source,json]
----
{
"SubModRef->smod": "subMod.Id"
}
----
.source.map.json
[source,json]
----
[
{
"map": [
{
"from": "Id",
"to": "baseMod.Id"
},
{
"from": "baseField1",
"to": "baseMod.baseField1"
},
{
"from": "smod.field1",
"to": "baseMod.SubModField1"
}
]
}
]
----
It is also possible to define a join operation on multiple fields at the same time -- useful in cases where we must construct a PK.
The following example uses `CreatedDate` and `CreatedById` fields as an index.
[source,json]
----
{
"[CreatedDate,CreatedById]->smod": "subMod.[CreatedDate,CreatedById]"
}
----
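For reference, the join implementation added in this commit indexes each joined record under a key of the form `alias.value[.value...]`, built from the configured join fields. A small sketch of that key assembly (field names and values are made up):
[source,go]
----
package main

import (
	"fmt"
	"strings"
)

func main() {
	// one record from the joined source (made-up values)
	record := map[string]string{
		"CreatedDate": "2020-03-25 13:37:43",
		"CreatedById": "005A0000001",
	}

	// alias and join fields, as configured by the .join.json example above
	alias := "smod"
	joinFields := []string{"CreatedDate", "CreatedById"}

	vv := make([]string, 0, len(joinFields))
	for _, f := range joinFields {
		vv = append(vv, record[f])
	}

	// records are indexed under "<alias>.<value>[.<value>...]"
	index := alias + "." + strings.Join(vv, ".")
	fmt.Println(index) // smod.2020-03-25 13:37:43.005A0000001
}
----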
== Value Mapping
The system also allows us to map a specific field value into another value for that field.
This can be useful for mapping values from the import source into our internal data model values.
For example; we can map `In Progress` into `in_progress`.
The system also supports a default value, by using the `*` wildcard.
=== Algorithm
* parse the given `.value.json`
* before applying a value for the given field, attempt to map the value
** if mapping is successful, use the mapped value,
** else if default value exists, use the default value,
** else use the original value.
=== Example
.source.values.json
[source,json]
----
{
"sys_status": {
"In Progress": "in_progress",
"Send to QA": "qa_pending",
"Submit Job": "qa_approved",
"*": "new"
}
}
----
The system also provides support for mathematical expressions.
If you wish to perform an expression, prefix the mapped value with `=EVL=`; for example `=EVL=numFmt(cell, "%.0f")`.
Expression context:
* `cell` -- current cell,
* `row` -- current row, provided as a `{field: value}` object.
The following example will remove the decimal point from every `sys_rating` in the given source.
[source,json]
----
{
"sys_rating": {
"*": "=EVL=numFmt(cell, \"%.0f\")"
}
}
----
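These expressions are evaluated with the gval library (`github.com/PaesslerAG/gval`), extended with helpers such as `numFmt`. A minimal, self-contained sketch of how an `=EVL=` mapped value could be evaluated follows; the `numFmt` helper below is an approximation for illustration, not the importer's actual implementation.
[source,go]
----
package main

import (
	"context"
	"fmt"
	"strconv"
	"strings"

	"github.com/PaesslerAG/gval"
)

const evalPrefix = "=EVL="

func main() {
	// approximation of the numFmt helper; the importer registers its own set of
	// helpers in its gval language
	lang := gval.NewLanguage(
		gval.Full(),
		gval.Function("numFmt", func(cell, format string) (string, error) {
			f, err := strconv.ParseFloat(cell, 64)
			if err != nil {
				return "", err
			}
			return fmt.Sprintf(format, f), nil
		}),
	)

	mapped := `=EVL=numFmt(cell, "%.0f")`
	if !strings.HasPrefix(mapped, evalPrefix) {
		return // plain mapped value, used as-is
	}

	eval, err := lang.NewEvaluable(mapped[len(evalPrefix):])
	if err != nil {
		panic(err)
	}

	// cell and row mirror the variables documented above
	out, err := eval.EvalString(context.Background(), map[string]interface{}{
		"cell": "4.7",
		"row":  map[string]string{"sys_rating": "4.7"},
	})
	if err != nil {
		panic(err)
	}

	fmt.Println(out) // 5
}
----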

View File

@ -0,0 +1,128 @@
package migrate
import (
"bytes"
"context"
"encoding/csv"
"io"
"time"
"github.com/cortezaproject/corteza-server/compose/repository"
cct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/ngImporter/types"
sysRepo "github.com/cortezaproject/corteza-server/system/repository"
sysTypes "github.com/cortezaproject/corteza-server/system/types"
)
// imports system users based on the provided source
func importUsers(ctx context.Context, is *types.ImportSource, ns *cct.Namespace) (map[string]uint64, *types.ImportSource, error) {
db := repository.DB(ctx)
repoUser := sysRepo.User(ctx, db)
// this provides a map between importSourceID -> CortezaID
mapping := make(map[string]uint64)
// create a new buffer for user object, so we don't lose our data
var bb bytes.Buffer
ww := csv.NewWriter(&bb)
defer ww.Flush()
// get fields
r := csv.NewReader(is.Source)
header, err := r.Read()
if err != nil {
return nil, nil, err
}
ww.Write(header)
// create users
for {
looper:
record, err := r.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, nil, err
}
ww.Write(record)
u := &sysTypes.User{}
for i, h := range header {
val := record[i]
// when creating users we only care about a handful of values.
// the rest are included in the module
switch h {
case "Username":
u.Username = record[i]
break
case "Email":
u.Email = record[i]
break
case "FirstName":
u.Name = record[i]
break
case "LastName":
u.Name = u.Name + " " + record[i]
break
case "Alias":
u.Handle = record[i]
break
case "CreatedDate":
if val != "" {
u.CreatedAt, err = time.Parse(types.SfDateTimeLayout, val)
if err != nil {
return nil, nil, err
}
}
break
case "LastModifiedDate":
if val != "" {
tt, err := time.Parse(types.SfDateTimeLayout, val)
u.UpdatedAt = &tt
if err != nil {
return nil, nil, err
}
}
break
// ignore deleted values, as SF provides minimal info about those
case "IsDeleted":
if val == "1" {
goto looper
}
}
}
// this allows us to reuse existing users
uu, err := repoUser.FindByEmail(u.Email)
if err == nil {
u = uu
} else {
u, err = repoUser.Create(u)
if err != nil {
return nil, nil, err
}
}
mapping[record[0]] = u.ID
}
nis := &types.ImportSource{
Name: is.Name,
Header: is.Header,
DataMap: is.DataMap,
Path: is.Path,
Source: &bb,
}
return mapping, nis, nil
}

View File

@ -9,31 +9,33 @@ import (
"regexp"
"strings"
"github.com/cortezaproject/corteza-server/pkg/migrate/types"
"github.com/cortezaproject/corteza-server/pkg/ngImporter/types"
)
type (
// mapLink helps us keep track between base nodes, joined nodes and the fields used
// for creating the link
mapLink struct {
jn *types.JoinedNode
jn *types.JoinNode
// field from the base node used in the op.
baseField []string
baseFields []string
// alias to use for the base field; allows us to use the same field multiple times
baseFieldAlias string
// field from the joined node to use in the opp.
joinField []string
joinFields []string
}
// temporary node for the join op.
node struct {
mg *types.Migrateable
// temporary migration node mapper based on aliases
mapper map[string]mapLink
is *types.ImportSource
// maps { alias: mapLink }
mapper map[string]mapLink
// maps { alias: [field] }
aliasMap map[string][]string
}
exprEval struct {
// defines params that should be used in the given join opp.
joinEval struct {
baseFields []string
baseFieldAlias string
joinModule string
@ -41,55 +43,54 @@ type (
}
)
// Creates JoinNodes for each Migrateable node included in a source join process
// See readme for more
func sourceJoin(mm []types.Migrateable) ([]types.Migrateable, error) {
// joinData defines the join context for each ImportSource that defines a join operation.
// It returns a new set of ImportSource nodes, excluding the joined ones.
// Algorithm outline:
// * determine all nodes that define the join operation (base node)
// * take note of all nodes that will be used in join operations (join node)
// * load data for each join node
// * index each row based on the specified alias and its ID field
func joinData(iss []types.ImportSource) ([]types.ImportSource, error) {
var rr []*node
joinedNodes := make(map[string]*types.JoinedNode)
joinedNodes := make(map[string]*types.JoinNode)
// Algorithm outline:
// 1. determine all migration nodes that will be used as joined nodes
// 2. load entries for each join node
// 3. construct new output migration nodes
// 1. determination
for _, mg := range mm {
// determine base & join nodes
for _, mg := range iss {
// this helps us avoid pesky pointer issues :)
ww := mg
nd := &node{mg: &ww}
nd := &node{is: &ww}
rr = append(rr, nd)
if mg.Join == nil {
if mg.SourceJoin == nil {
continue
}
// defer this, so we can do simple nil checks
// defer this initialization, so we can do simple nil checks
nd.mapper = make(map[string]mapLink)
nd.aliasMap = make(map[string][]string)
// join definition map defines how two sources are joined
// joinDef describes how nodes should be joined
var joinDef map[string]string
src, _ := ioutil.ReadAll(mg.Join)
src, _ := ioutil.ReadAll(mg.SourceJoin)
err := json.Unmarshal(src, &joinDef)
if err != nil {
return nil, err
}
// find all joined nodes for the given base node
// determine join nodes for the given base node
for base, condition := range joinDef {
expr := splitExpr(base, condition)
if _, ok := nd.aliasMap[expr.baseFieldAlias]; ok {
return nil, errors.New("alias.used " + nd.mg.Name + " " + expr.baseFieldAlias)
return nil, errors.New("alias.duplicated " + nd.is.Name + " " + expr.baseFieldAlias)
}
nd.aliasMap[expr.baseFieldAlias] = expr.baseFields
// register migration node as join node
for _, m := range mm {
for _, m := range iss {
if m.Name == expr.joinModule {
if _, ok := joinedNodes[expr.joinModule]; !ok {
ww := m
joinedNodes[expr.joinModule] = &types.JoinedNode{
joinedNodes[expr.joinModule] = &types.JoinNode{
Mg: &ww,
Name: ww.Name,
}
@ -99,9 +100,9 @@ func sourceJoin(mm []types.Migrateable) ([]types.Migrateable, error) {
jn := joinedNodes[expr.joinModule]
nd.mapper[expr.baseFieldAlias] = mapLink{
jn: jn,
baseField: expr.baseFields,
baseFields: expr.baseFields,
baseFieldAlias: expr.baseFieldAlias,
joinField: expr.joinFields,
joinFields: expr.joinFields,
}
break
}
@ -109,9 +110,9 @@ func sourceJoin(mm []types.Migrateable) ([]types.Migrateable, error) {
}
}
// 2. load entries
// load join node's data
for _, jn := range joinedNodes {
jn.Entries = make([]map[string]string, 0)
jn.Records = make([]map[string]string, 0)
reader := csv.NewReader(jn.Mg.Source)
// header
@ -131,68 +132,68 @@ func sourceJoin(mm []types.Migrateable) ([]types.Migrateable, error) {
}
row := make(map[string]string)
jn.Entries = append(jn.Entries, row)
jn.Records = append(jn.Records, row)
for i, c := range record {
row[header[i]] = c
}
}
}
// 3. output
out := make([]types.Migrateable, 0)
// generate new SourceNodes & build join node indexes
out := make([]types.ImportSource, 0)
for _, nd := range rr {
fnd := false
found := false
for _, jn := range joinedNodes {
if nd.mg.Name == jn.Name {
fnd = true
if nd.is.Name == jn.Name {
found = true
break
}
}
// skip joined nodes
if fnd {
if found {
continue
}
o := nd.mg
oIs := nd.is
// skip nodes with no mappings
if nd.mapper == nil {
out = append(out, *o)
out = append(out, *oIs)
continue
}
o.AliasMap = nd.aliasMap
oIs.AliasMap = nd.aliasMap
// create `alias.ID`: []entry mappings, that will be used when migrating
// create `alias.ID`: []entry mappings, that will be used when importing
for alias, link := range nd.mapper {
if o.FieldMap == nil {
o.FieldMap = make(map[string]types.JoinedNodeRecords)
if oIs.FieldMap == nil {
oIs.FieldMap = make(map[string]types.JoinNodeRecords)
}
for _, e := range link.jn.Entries {
for _, e := range link.jn.Records {
jj := []string{}
for _, jf := range link.joinField {
for _, jf := range link.joinFields {
jj = append(jj, e[jf])
}
kk := alias + "." + strings.Join(jj[:], ".")
if _, ok := o.FieldMap[kk]; !ok {
o.FieldMap[kk] = make(types.JoinedNodeRecords, 0)
index := alias + "." + strings.Join(jj[:], ".")
if _, ok := oIs.FieldMap[index]; !ok {
oIs.FieldMap[index] = make(types.JoinNodeRecords, 0)
}
o.FieldMap[kk] = append(o.FieldMap[kk], e)
oIs.FieldMap[index] = append(oIs.FieldMap[index], e)
}
}
out = append(out, *o)
out = append(out, *oIs)
}
return out, nil
}
// helper to split the join expression
func splitExpr(base, joined string) exprEval {
rr := exprEval{}
func splitExpr(base, joined string) joinEval {
rr := joinEval{}
// original node
rx := regexp.MustCompile(`\[?(?P<bf>[\w,]+)\]?->(?P<bfa>\w+)`)

pkg/ngImporter/main.go Normal file
View File

@ -0,0 +1,281 @@
package migrate
import (
"context"
"encoding/csv"
"fmt"
"io"
"strconv"
"sync"
"github.com/cortezaproject/corteza-server/compose/repository"
"github.com/cortezaproject/corteza-server/compose/service"
cct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/ngImporter/types"
"github.com/davecgh/go-spew/spew"
"github.com/schollz/progressbar/v2"
)
type (
// Importer contains the context of the entire importing operation
Importer struct {
// a set of import nodes that define a graph
nodes []*types.ImportNode
// a set of leaf import nodes, that can be imported in the next cycle
Leafs []*types.ImportNode
}
)
// Import initializes the import process for the given set of ImportSource nodes
// algorithm outline:
// * import all users used within the given import sources
// * handle source join operations
// * handle data mapping operations
// * build graph from ImportNodes based on the provided ImportSource nodes
// * remove cycles from the given graph
// * import data based on node dependencies
func Import(iss []types.ImportSource, ns *cct.Namespace, ctx context.Context) error {
// contains warnings raised by the pre process steps
var preProcW []string
imp := &Importer{}
svcMod := service.DefaultModule.With(ctx)
var err error
// import users
var usrSrc *types.ImportSource
for _, m := range iss {
if m.Name == types.UserModHandle {
usrSrc = &m
break
}
}
// maps sourceUserID to CortezaID
var uMap map[string]uint64
if usrSrc != nil {
um, mgu, err := importUsers(ctx, usrSrc, ns)
if err != nil {
return err
}
uMap = um
// replace the old source node with the new one (updated data stream)
found := false
for i, m := range iss {
if m.Name == mgu.Name {
iss[i] = *mgu
found = true
break
}
}
if !found {
iss = append(iss, *mgu)
}
}
iss, err = joinData(iss)
if err != nil {
return err
}
// data mapping & graph construction
for _, is := range iss {
nIss, err := mapData(is)
if err != nil {
return err
}
for _, nIs := range nIss {
// preload module
mod, err := svcMod.FindByHandle(ns.ID, nIs.Name)
if err != nil {
preProcW = append(preProcW, err.Error()+" "+nIs.Name)
continue
}
// define headers
r := csv.NewReader(nIs.Source)
var header []string
if nIs.Header != nil {
header = *nIs.Header
} else {
header, err = r.Read()
if err == io.EOF {
break
}
if err != nil {
return err
}
}
// create node & push to graph
n := &types.ImportNode{
Name: nIs.Name,
Module: mod,
Namespace: ns,
Reader: r,
Header: header,
Lock: &sync.Mutex{},
FieldMap: nIs.FieldMap,
ValueMap: nIs.ValueMap,
}
n = imp.AddNode(n)
// prepare additional import nodes based on its record fields
for _, f := range mod.Fields {
if f.Kind == "Record" {
refMod := f.Options["moduleID"]
if refMod == nil {
preProcW = append(preProcW, "moduleField.record.missingRef"+" "+nIs.Name+" "+f.Name)
continue
}
modID, ok := refMod.(string)
if !ok {
preProcW = append(preProcW, "moduleField.record.invalidRefFormat"+" "+nIs.Name+" "+f.Name)
continue
}
vv, err := strconv.ParseUint(modID, 10, 64)
if err != nil {
preProcW = append(preProcW, err.Error())
continue
}
mm, err := svcMod.FindByID(ns.ID, vv)
if err != nil {
preProcW = append(preProcW, err.Error()+" "+nIs.Name+" "+f.Name+" "+modID)
continue
}
nn := &types.ImportNode{
Name: mm.Handle,
Module: mm,
Namespace: ns,
Lock: &sync.Mutex{},
}
nn = imp.AddNode(nn)
n.LinkAdd(nn)
}
}
}
}
spew.Dump("PRE-PROC WARNINGS", preProcW)
imp.RemoveCycles()
// take note of leaf nodes that can be imported right away
for _, n := range imp.nodes {
if !n.HasChildren() {
imp.Leafs = append(imp.Leafs, n)
}
}
fmt.Printf("import.prepared\n")
fmt.Printf("no. of nodes %d\n", len(imp.nodes))
fmt.Printf("no. of entry points %d\n", len(imp.Leafs))
err = imp.Import(ctx, uMap)
if err != nil {
return err
}
return nil
}
// AddNode attempts to add the given node into the graph. If the node can already be
// identified, the two nodes are merged.
func (m *Importer) AddNode(n *types.ImportNode) *types.ImportNode {
var fn *types.ImportNode
for _, nn := range m.nodes {
if nn.CompareTo(n) {
fn = nn
break
}
}
if fn == nil {
m.nodes = append(m.nodes, n)
return n
}
fn.Merge(n)
return fn
}
// RemoveCycles removes all cycles in the given graph, by restructuring/recreating the nodes
// and their dependencies.
func (m *Importer) RemoveCycles() {
splice := func(n *types.ImportNode, from *types.ImportNode) {
spl := n.Splice(from)
m.AddNode(spl)
}
for _, n := range m.nodes {
if !n.Visited {
n.SeekCycles(splice)
}
}
}
// Import runs the import over each ImportNode in the given graph
func (m *Importer) Import(ctx context.Context, users map[string]uint64) error {
fmt.Println("(•_•)")
fmt.Println("(-•_•)>⌐■-■")
fmt.Println("(⌐■_■)")
fmt.Print("\n\n\n")
db := repository.DB(ctx)
repoRecord := repository.Record(ctx, db)
bar := progressbar.New(len(m.nodes))
return db.Transaction(func() (err error) {
for len(m.Leafs) > 0 {
var wg sync.WaitGroup
ch := make(chan types.PostProc, len(m.Leafs))
for _, n := range m.Leafs {
wg.Add(1)
go n.Import(repoRecord, users, &wg, ch, bar)
}
wg.Wait()
var nl []*types.ImportNode
for len(ch) > 0 {
pp := <-ch
if pp.Err != nil {
spew.Dump("ERR", pp.Err, pp.Node.Stringify())
return pp.Err
}
// update the set of available leaf nodes
if pp.Leafs != nil {
for _, n := range pp.Leafs {
for _, l := range nl {
if n.CompareTo(l) {
goto skip
}
}
if n.Satisfied() {
nl = append(nl, n)
}
skip:
}
}
}
m.Leafs = nl
}
fmt.Print("\n\n\n")
fmt.Println("(⌐■_■)")
fmt.Println("(-•_•)>⌐■-■")
fmt.Println("(•_•)")
return nil
})
}

View File

@ -10,11 +10,12 @@ import (
"io/ioutil"
"strings"
"github.com/cortezaproject/corteza-server/pkg/migrate/types"
"github.com/cortezaproject/corteza-server/pkg/ngImporter/types"
)
type (
SplitBuffer struct {
// MapBuffer helps us keep track of new SourceNodes, defined by the map operation.
MapBuffer struct {
buffer *bytes.Buffer
name string
row []string
@ -27,26 +28,31 @@ type (
}
)
// this function splits the stream of the given migrateable node.
// See readme for more info
func splitStream(m types.Migrateable) ([]types.Migrateable, error) {
var rr []types.Migrateable
if m.Map == nil {
rr = append(rr, m)
// maps data from the original ImportSource node into new ImportSource nodes
// based on the provided DataMap.
// Algorithm outline:
// * parse data map
// * for each record in the original import source, based on the map, create new
// MapBuffers
// * create new import sources based on map buffers
func mapData(is types.ImportSource) ([]types.ImportSource, error) {
var rr []types.ImportSource
if is.DataMap == nil {
rr = append(rr, is)
return rr, nil
}
// unpack the map
// @todo provide a better structure!!
var streamMap []map[string]interface{}
src, _ := ioutil.ReadAll(m.Map)
err := json.Unmarshal(src, &streamMap)
var dataMap []map[string]interface{}
src, _ := ioutil.ReadAll(is.DataMap)
err := json.Unmarshal(src, &dataMap)
if err != nil {
return nil, err
}
// get header fields
r := csv.NewReader(m.Source)
r := csv.NewReader(is.Source)
header, err := r.Read()
if err == io.EOF {
return rr, nil
@ -56,16 +62,15 @@ func splitStream(m types.Migrateable) ([]types.Migrateable, error) {
return nil, err
}
// maps header field -> field index for a nicer lookup
// maps { header field: field index } for a nicer lookup
hMap := make(map[string]int)
for i, h := range header {
hMap[h] = i
}
bufs := make(map[string]*SplitBuffer)
// splitting magic
bufs := make(map[string]*MapBuffer)
// data mapping
for {
record, err := r.Read()
if err == io.EOF {
@ -76,35 +81,35 @@ func splitStream(m types.Migrateable) ([]types.Migrateable, error) {
return nil, err
}
// on next row, old stream's headers are finished
// on next row, currently acquired headers are marked as final
for _, b := range bufs {
b.hasHeader = true
}
// find first applicable map that can be used for the given row.
// default maps should not include a where field
for _, strmp := range streamMap {
// find applicable maps that can be used for the given row.
// the system allows composition, so all applicable maps are used.
for _, strmp := range dataMap {
if checkWhere(strmp["where"], record, hMap) {
maps, ok := strmp["map"].([]interface{})
if !ok {
return nil, errors.New("streamMap.invalidMap " + m.Name)
return nil, errors.New("dataMap.invalidMap " + is.Name)
}
// populate split streams
// handle current record and it's values
for _, mp := range maps {
mm, ok := mp.(map[string]interface{})
if !ok {
return nil, errors.New("streamMap.map.invalidEntry " + m.Name)
return nil, errors.New("dataMap.map.invalidEntry " + is.Name)
}
from, ok := mm["from"].(string)
if !ok {
return nil, errors.New("streamMap.map.entry.invalidFrom " + m.Name)
return nil, errors.New("dataMap.map.entry.invalidFrom " + is.Name)
}
to, ok := mm["to"].(string)
if !ok {
return nil, errors.New("streamMap.map.invalidTo " + m.Name)
return nil, errors.New("dataMap.map.invalidTo " + is.Name)
}
vv := strings.Split(to, ".")
@ -115,7 +120,7 @@ func splitStream(m types.Migrateable) ([]types.Migrateable, error) {
var bb bytes.Buffer
ww := csv.NewWriter(&bb)
defer ww.Flush()
bufs[nm] = &SplitBuffer{
bufs[nm] = &MapBuffer{
buffer: &bb,
writer: ww,
name: nm,
@ -125,12 +130,12 @@ func splitStream(m types.Migrateable) ([]types.Migrateable, error) {
val := record[hMap[from]]
// handle joins
// handle data join
if strings.Contains(from, ".") {
// construct an `alias.joinOnID` value, so we can perform a simple map lookup
pts := strings.Split(from, ".")
baseFieldAlias := pts[0]
originalOn := m.AliasMap[baseFieldAlias]
originalOn := is.AliasMap[baseFieldAlias]
joinField := pts[1]
oo := []string{}
@ -160,15 +165,15 @@ func splitStream(m types.Migrateable) ([]types.Migrateable, error) {
}
}
// make migrateable nodes from the generated streams
// construct output import source nodes
for _, v := range bufs {
rr = append(rr, types.Migrateable{
rr = append(rr, types.ImportSource{
Name: v.name,
Source: v.buffer,
Header: &v.header,
FieldMap: m.FieldMap,
AliasMap: m.AliasMap,
ValueMap: m.ValueMap,
FieldMap: is.FieldMap,
AliasMap: is.AliasMap,
ValueMap: is.ValueMap,
})
}
@ -187,7 +192,7 @@ func checkWhere(where interface{}, row []string, hMap map[string]int) bool {
return true
}
ev, err := types.Exprs().NewEvaluable(ww)
ev, err := types.GLang().NewEvaluable(ww)
if err != nil {
panic(err)
}

View File

@ -0,0 +1,51 @@
package types
import (
"io"
)
type (
// JoinNodeEntry represents a { field: value } map for the given record in the
// join node.
JoinNodeEntry map[string]string
// JoinNodeRecords represents a set of records that should be used together when
// accessing the specified alias.
JoinNodeRecords []JoinNodeEntry
// JoinNode represents an ImportSource that will be used in combination with another,
// to create a record based on multiple sources
JoinNode struct {
Mg *ImportSource
Name string
// Records represents a set of records, available in the given import source.
// [{ field: value }]
Records []map[string]string
}
// ImportSource helps us perform some pre-proc operations before the actual import,
// such as data mapping and source joining.
ImportSource struct {
Name string
Path string
Header *[]string
Source io.Reader
// DataMap allows us to specify what values from the original source should
// map into what fields of what module
DataMap io.Reader
// SourceJoin allows us to specify what import sources should be joined
// when mapping values.
SourceJoin io.Reader
// FieldMap stores records from the joined import source.
// Records are indexed by {alias: [record]}
FieldMap map[string]JoinNodeRecords
// AliasMap helps us determine what fields are stored under the given alias.
AliasMap map[string][]string
// Value Map allows us to map specific values from the given import source into
// a specified value used by Corteza.
ValueMap map[string]map[string]string
}
)

View File

@ -8,8 +8,8 @@ import (
"github.com/PaesslerAG/gval"
)
// generates a simple gval language to be used within the migration
func Exprs() gval.Language {
// GLang generates a gval language that can be used for expression evaluation
func GLang() gval.Language {
return gval.NewLanguage(
gval.JSON(),
gval.Arithmetic(),
@ -30,12 +30,12 @@ func Exprs() gval.Language {
// diff between two dates in seconds
gval.Function("dateDiff", func(d1, d2 string) (float64, error) {
t1, err := time.Parse(SfDateTime, d1)
t1, err := time.Parse(SfDateTimeLayout, d1)
if err != nil {
return 0, err
}
t2, err := time.Parse(SfDateTime, d2)
t2, err := time.Parse(SfDateTimeLayout, d2)
if err != nil {
return 0, err
}

View File

@ -0,0 +1,43 @@
package types
import "unicode/utf8"
const (
// SfDateTimeLayout represents the date-time layout used by Salesforce
SfDateTimeLayout = "2006-01-02 15:04:05"
// DateOnlyLayout represents the layout for our internal date-only date-time fields
DateOnlyLayout = "2006-01-02"
// TimeOnlyLayout represents the layout for our internal time-only date-time fields
TimeOnlyLayout = "15:04:05Z"
// EvalPrefix defines the prefix used by expressions defined in value maps
EvalPrefix = "=EVL="
UserModHandle = "User"
)
var (
// ExprLang contains gval language that should be used for any expression evaluation
ExprLang = GLang()
)
type (
// PostProc is used within the channels used by the import process.
PostProc struct {
// Leafs defines a set of leaf nodes that can be imported next.
Leafs []*ImportNode
// Err contains any error raised while importing the node
Err error
// Node contains the current node; useful for debugging
Node *ImportNode
}
)
// helper function for removing invalid UTF-8 runes from the given string.
// Salesforce replaces &nbsp; with a special character that is not supported in our character set
func fixUtf(r rune) rune {
if r == utf8.RuneError {
return -1
}
return r
}

View File

@ -9,7 +9,6 @@ import (
"strings"
"sync"
"time"
"unicode/utf8"
"github.com/cortezaproject/corteza-server/compose/repository"
"github.com/cortezaproject/corteza-server/compose/types"
@ -17,89 +16,84 @@ import (
)
type (
JoinedNode struct {
Mg *Migrateable
Name string
// id: field: value
Entries []map[string]string
}
// graph node
Node struct {
// unique node name
// ImportNode helps us perform the actual import.
// Multiple ImportNodes define a graph, which helps us with dependency resolution
// and determination of proper import order
ImportNode struct {
// Name is used for unique node identification. It should match the target resource name.
Name string
// keep note of parent refs, so we don't need to inverse it ;)
Parents []*Node
// keep note of Children, as they are our dependencies
Children []*Node
// mapping from migrated IDs to Corteza IDs
mapping map[string]Map
// determines if node is in current path; used for loop detection
// Parents represents the node's parents and the nodes that depend on this node.
Parents []*ImportNode
// Children represents the node's children and this node's dependencies.
Children []*ImportNode
// idMap maps the import source's IDs to Corteza IDs
idMap map[string]Map
// determines if node is in current path; used for cycle detection
inPath bool
// determines if node was spliced; used to break the loop
spliced bool
original *Node
spl *Node
// records are applicable in the case of spliced nodes
// determines if this node was spliced from the original path in order to break the cycle
isSpliced bool
// points to the original node (from spliced)
original *ImportNode
// points to the spliced node (from original)
spliced *ImportNode
// defines the records that were created by the spliced node.
// They are later used to insert missing dependencies.
records []*types.Record
// some refs
// some refs...
Module *types.Module
Namespace *types.Namespace
Reader *csv.Reader
// meta
// some meta...
Header []string
Visited bool
Lock *sync.Mutex
Lock *sync.Mutex
// FieldMap stores records from the joined import source.
// Records are indexed by {alias: [record]}
FieldMap map[string]JoinNodeRecords
// field: recordID: [value]
FieldMap map[string]JoinedNodeRecords
// field: value from: value to
// Value Map allows us to map specific values from the given import source into
// a specified value used by Corteza.
ValueMap map[string]map[string]string
}
// map between migrated ID and Corteza ID
// Map maps between import sourceID -> CortezaID
Map map[string]string
PostProc struct {
Leafs []*Node
Err error
Node *Node
}
)
const (
evalPrefix = "=EVL="
)
// helper, to determine if the two nodes are equal
func (n *Node) Compare(to *Node) bool {
return n.Name == to.Name && n.spliced == to.spliced
// CompareTo compares the two nodes. It uses the name and its variant
func (n *ImportNode) CompareTo(to *ImportNode) bool {
return n.Name == to.Name && n.isSpliced == to.isSpliced
}
// helper to stringify the node
func (n *Node) Stringify() string {
return fmt.Sprintf("NODE > n: %s; spliced: %t; inPath: %t;", n.Name, n.spliced, n.inPath)
// Stringify stringifies the given node; useful for debugging
func (n *ImportNode) Stringify() string {
return fmt.Sprintf("NODE > n: %s; spliced: %t", n.Name, n.isSpliced)
}
// adds a new map to the given node
func (n *Node) addMap(key string, m Map) {
// adds a new ID map to the given node's existing ID map
func (n *ImportNode) addMap(key string, m Map) {
n.Lock.Lock()
if n.mapping == nil {
n.mapping = map[string]Map{}
defer n.Lock.Unlock()
if n.idMap == nil {
n.idMap = map[string]Map{}
}
n.mapping[key] = m
n.Lock.Unlock()
n.idMap[key] = m
}
// does the actual data migration for the given node
func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]uint64, wg *sync.WaitGroup, ch chan PostProc, bar *progressbar.ProgressBar) {
// Import performs the actual data import.
// The algorithm defines two steps:
// * source import,
// * reference correction.
// For details refer to the README.
func (n *ImportNode) Import(repoRecord repository.RecordRepository, users map[string]uint64, wg *sync.WaitGroup, ch chan PostProc, bar *progressbar.ProgressBar) {
defer wg.Done()
defer bar.Add(1)
@ -107,12 +101,12 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
mapping := make(Map)
if n.Reader != nil {
// if records exist (from spliced node); correct refs
if !n.spliced && n.records != nil && len(n.records) > 0 {
// when importing a node that defined a spliced node, we should only correct its refs
if !n.isSpliced && n.records != nil && len(n.records) > 0 {
// we can just reuse the mapping object, since it will remain the same
mapping = n.mapping[fmt.Sprint(n.Module.ID)]
mapping = n.idMap[fmt.Sprint(n.Module.ID)]
err := updateRefs(n, repoRecord)
err := n.correctRecordRefs(repoRecord)
if err != nil {
ch <- PostProc{
Leafs: nil,
@ -122,7 +116,9 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
return
}
} else {
mapping, err = importNodeSource(n, users, repoRecord)
// when importing a spliced node or a node that did not define a spliced node, we should
// import its data
mapping, err = n.importNodeSource(users, repoRecord)
if err != nil {
ch <- PostProc{
Leafs: nil,
@ -134,9 +130,9 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
}
}
var rtr []*Node
var rtr []*ImportNode
var pps []*Node
var pps []*ImportNode
for _, pp := range n.Parents {
pps = append(pps, pp)
}
@ -159,16 +155,16 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
// determines if node is Satisfied and can be imported
// it is Satisfied when all of its dependencies have been imported, i.e. no
// more child refs
func (n *Node) Satisfied() bool {
func (n *ImportNode) Satisfied() bool {
return !n.HasChildren()
}
func (n *Node) HasChildren() bool {
func (n *ImportNode) HasChildren() bool {
return n.Children != nil && len(n.Children) > 0
}
// partially Merge the two nodes
func (n *Node) Merge(nn *Node) {
func (n *ImportNode) Merge(nn *ImportNode) {
if nn.Module != nil {
n.Module = nn.Module
}
@ -187,13 +183,13 @@ func (n *Node) Merge(nn *Node) {
}
// link the two nodes
func (n *Node) LinkAdd(to *Node) {
func (n *ImportNode) LinkAdd(to *ImportNode) {
n.addChild(to)
to.addParent(n)
}
// remove the link between the two nodes
func (n *Node) LinkRemove(from *Node) {
func (n *ImportNode) LinkRemove(from *ImportNode) {
n.Lock.Lock()
n.Children = n.removeIfPresent(from, n.Children)
from.Parents = from.removeIfPresent(n, from.Parents)
@ -201,21 +197,21 @@ func (n *Node) LinkRemove(from *Node) {
}
// adds a parent node to the given node
func (n *Node) addParent(add *Node) {
func (n *ImportNode) addParent(add *ImportNode) {
n.Parents = n.addIfMissing(add, n.Parents)
}
// adds a child node to the given node
func (n *Node) addChild(add *Node) {
func (n *ImportNode) addChild(add *ImportNode) {
n.Children = n.addIfMissing(add, n.Children)
}
// adds a node, if it doesn't yet exist
func (n *Node) addIfMissing(add *Node, list []*Node) []*Node {
var fn *Node
func (n *ImportNode) addIfMissing(add *ImportNode, list []*ImportNode) []*ImportNode {
var fn *ImportNode
for _, nn := range list {
if add.Compare(nn) {
if add.CompareTo(nn) {
fn = nn
}
}
@ -228,9 +224,9 @@ func (n *Node) addIfMissing(add *Node, list []*Node) []*Node {
}
// removes the node, if it exists
func (n *Node) removeIfPresent(rem *Node, list []*Node) []*Node {
func (n *ImportNode) removeIfPresent(rem *ImportNode, list []*ImportNode) []*ImportNode {
for i, nn := range list {
if rem.Compare(nn) {
if rem.CompareTo(nn) {
// https://stackoverflow.com/a/37335777
list[len(list)-1], list[i] = list[i], list[len(list)-1]
return list[:len(list)-1]
@ -241,11 +237,11 @@ func (n *Node) removeIfPresent(rem *Node, list []*Node) []*Node {
}
// traverses the graph and notifies us of any cycles
func (n *Node) Traverse(cycle func(n *Node, to *Node)) {
func (n *ImportNode) SeekCycles(cycle func(n *ImportNode, to *ImportNode)) {
n.inPath = true
n.Visited = true
var cc []*Node
var cc []*ImportNode
for _, nn := range n.Children {
cc = append(cc, nn)
}
@ -257,14 +253,14 @@ func (n *Node) Traverse(cycle func(n *Node, to *Node)) {
if nn.inPath {
cycle(n, nn)
} else {
nn.Traverse(cycle)
nn.SeekCycles(cycle)
}
}
n.inPath = false
}
func (n *Node) DFS() {
func (n *ImportNode) DFS() {
n.inPath = true
for _, nn := range n.Children {
@ -277,14 +273,14 @@ func (n *Node) DFS() {
}
// clones the given node
func (n *Node) clone() *Node {
return &Node{
func (n *ImportNode) clone() *ImportNode {
return &ImportNode{
Name: n.Name,
Parents: n.Parents,
Children: n.Children,
mapping: n.mapping,
idMap: n.idMap,
inPath: n.inPath,
spliced: n.spliced,
isSpliced: n.isSpliced,
original: n.original,
records: n.records,
Visited: n.Visited,
@ -296,18 +292,18 @@ func (n *Node) clone() *Node {
}
// splices the node from the original graph and removes the cycle
func (n *Node) Splice(from *Node) *Node {
splicedN := from.spl
func (n *ImportNode) Splice(from *ImportNode) *ImportNode {
splicedN := from.spliced
if splicedN == nil {
splicedN = from.clone()
splicedN.spliced = true
splicedN.isSpliced = true
splicedN.Parents = nil
splicedN.Children = nil
splicedN.inPath = false
splicedN.original = from
from.spl = splicedN
from.spliced = splicedN
from.LinkAdd(splicedN)
}
@ -318,7 +314,8 @@ func (n *Node) Splice(from *Node) *Node {
return splicedN
}
func sysField(f string) bool {
// helper to determine if this is a system field
func isSysField(f string) bool {
switch f {
case "OwnerId",
"IsDeleted",
@ -331,8 +328,8 @@ func sysField(f string) bool {
return false
}
func updateRefs(n *Node, repo repository.RecordRepository) error {
// correct references
// updates the given node's record values that depend on another record
func (n *ImportNode) correctRecordRefs(repo repository.RecordRepository) error {
for _, r := range n.records {
for _, v := range r.Values {
var f *types.ModuleField
@ -356,7 +353,9 @@ func updateRefs(n *Node, repo repository.RecordRepository) error {
return errors.New("moduleField.record.invalidRefFormat")
}
if mod, ok := n.mapping[ref]; ok {
// in case of a missing ref, make sure to remove the reference.
// otherwise this will cause internal errors when trying to resolve CortezaID.
if mod, ok := n.idMap[ref]; ok {
if vv, ok := mod[val]; ok {
v.Value = vv
} else {
@ -370,7 +369,7 @@ func updateRefs(n *Node, repo repository.RecordRepository) error {
}
}
// update values
// update values; skip empty values
nv := types.RecordValueSet{}
for _, v := range r.Values {
if v.Value != "" {
@ -387,18 +386,10 @@ func updateRefs(n *Node, repo repository.RecordRepository) error {
return nil
}
func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRepository) (Map, error) {
// imports the given node's source
func (n *ImportNode) importNodeSource(users map[string]uint64, repo repository.RecordRepository) (Map, error) {
mapping := make(Map)
fixUtf := func(r rune) rune {
if r == utf8.RuneError {
return -1
}
return r
}
lng := Exprs()
for {
looper:
record, err := n.Reader.Read()
@ -416,18 +407,22 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
CreatedAt: time.Now(),
}
vals := types.RecordValueSet{}
recordValues := types.RecordValueSet{}
// convert the given row into a { field: value } map; this will be used
// for expression evaluation
row := map[string]string{}
for i, h := range n.Header {
row[h] = record[i]
}
for i, h := range n.Header {
// will contain string values for the given field
var values []string
val := record[i]
if sysField(h) {
// system values should be kept on the record's root level
if isSysField(h) {
switch h {
case "OwnerId":
rr.OwnedBy = users[val]
@ -442,7 +437,7 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
case "CreatedDate":
if val != "" {
rr.CreatedAt, err = time.Parse(SfDateTime, val)
rr.CreatedAt, err = time.Parse(SfDateTimeLayout, val)
if err != nil {
return nil, err
}
@ -459,7 +454,7 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
case "LastModifiedDate":
if val != "" {
tt, err := time.Parse(SfDateTime, val)
tt, err := time.Parse(SfDateTimeLayout, val)
rr.UpdatedAt = &tt
if err != nil {
return nil, err
@ -468,6 +463,7 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
break
}
} else {
// other user defined values should be kept inside `values`
joined := ""
if strings.Contains(h, ":") {
pts := strings.Split(h, ":")
@ -475,6 +471,7 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
joined = pts[1]
}
// find corresponding field
var f *types.ModuleField
for _, ff := range n.Module.Fields {
if ff.Name == h {
@ -487,29 +484,29 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
continue
}
// simple hack to fully support multiple values from a joined node
vvs := make([]string, 0)
// check if joinable
// temp set of raw values that should be processed further.
// this gives us support for multi-value fields when joining sources
rawValues := make([]string, 0)
if joined != "" {
tmp := n.FieldMap[val]
for _, e := range tmp {
vvs = append(vvs, e[joined])
rawValues = append(rawValues, e[joined])
}
} else {
vvs = []string{val}
rawValues = []string{val}
}
for _, val := range vvs {
for _, val := range rawValues {
// handle references. Spliced nodes should not perform this step, since
// they can't rely on any dependency. This is corrected with `correctRecordRefs`
if f.Options["moduleID"] != nil {
// spliced nodes should NOT manage their references
if !n.spliced {
if !n.isSpliced {
ref, ok := f.Options["moduleID"].(string)
if !ok {
return nil, errors.New("moduleField.record.invalidRefFormat")
}
if mod, ok := n.mapping[ref]; ok && val != "" {
if mod, ok := n.idMap[ref]; ok && val != "" {
if v, ok := mod[val]; ok && v != "" {
val = v
} else {
@ -521,6 +518,7 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
}
values = append(values, val)
} else if f.Kind == "User" {
// handle user references
if u, ok := users[val]; ok {
val = fmt.Sprint(u)
} else {
@ -528,58 +526,30 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
}
values = append(values, val)
} else {
// generic value handling
val = strings.Map(fixUtf, val)
if val == "" {
continue
}
// assure date time formatting
if f.Kind == "DateTime" {
pvl, err := time.Parse(SfDateTime, val)
val, err = assureDateFormat(val, f.Options)
if err != nil {
return nil, err
}
if f.Options.Bool("onlyDate") {
val = pvl.Format("2006-01-02")
} else if f.Options.Bool("onlyTime") {
val = pvl.Format("15:04:05Z")
} else {
val = pvl.Format(time.RFC3339)
}
}
values = append(values, val)
}
}
// value post-proc & record value creation
for i, v := range values {
if fmp, ok := n.ValueMap[h]; ok {
nvl := ""
if mpv, ok := fmp[v]; ok {
nvl = mpv
} else if mpv, ok := fmp["*"]; ok {
nvl = mpv
}
if nvl != "" && strings.HasPrefix(nvl, evalPrefix) {
opp := nvl[len(evalPrefix):len(nvl)]
ev, err := lng.NewEvaluable(opp)
if err != nil {
return nil, err
}
v, err = ev.EvalString(context.Background(), map[string]interface{}{"cell": v, "row": row})
if err != nil {
return nil, err
}
} else if nvl != "" {
v = nvl
}
v, err = n.mapValue(h, v, row)
if err != nil {
return nil, err
}
vals = append(vals, &types.RecordValue{
recordValues = append(recordValues, &types.RecordValue{
Name: h,
Value: v,
Place: uint(i),
@ -595,22 +565,68 @@ func importNodeSource(n *Node, users map[string]uint64, repo repository.RecordRe
}
// update record values with recordID
for _, v := range vals {
for _, v := range recordValues {
v.RecordID = r.ID
}
err = repo.UpdateValues(r.ID, vals)
err = repo.UpdateValues(r.ID, recordValues)
if err != nil {
return nil, err
}
// spliced nodes should preserve their records for later ref processing
if n.spliced {
rr.Values = vals
if n.isSpliced {
rr.Values = recordValues
n.original.records = append(n.original.records, rr)
}
// update mapping map
mapping[record[0]] = fmt.Sprint(rr.ID)
}
return mapping, nil
}
func (n *ImportNode) mapValue(field, val string, row map[string]string) (string, error) {
if fmp, ok := n.ValueMap[field]; ok {
nvl := ""
if mpv, ok := fmp[val]; ok {
nvl = mpv
} else if mpv, ok := fmp["*"]; ok {
nvl = mpv
}
// expression evaluation
if nvl != "" && strings.HasPrefix(nvl, EvalPrefix) {
opp := nvl[len(EvalPrefix):len(nvl)]
ev, err := ExprLang.NewEvaluable(opp)
if err != nil {
return "", err
}
val, err = ev.EvalString(context.Background(), map[string]interface{}{"cell": val, "row": row})
if err != nil {
return "", err
}
} else if nvl != "" {
val = nvl
}
}
return val, nil
}
// helper to ensure correct date-time formatting
func assureDateFormat(val string, opt types.ModuleFieldOptions) (string, error) {
pvl, err := time.Parse(SfDateTimeLayout, val)
if err != nil {
return "", err
}
if opt.Bool("onlyDate") {
val = pvl.Format(DateOnlyLayout)
} else if opt.Bool("onlyTime") {
val = pvl.Format(TimeOnlyLayout)
} else {
val = pvl.Format(time.RFC3339)
}
return val, nil
}