3
0

Paralelize the migration system

This commit is contained in:
Tomaž Jerman
2020-02-28 22:02:54 +01:00
parent 18ad7dea98
commit b781151aa9
2 changed files with 46 additions and 23 deletions

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"strconv"
"sync"
"time"
"github.com/cortezaproject/corteza-server/compose/repository"
@@ -186,37 +187,40 @@ func (m *Migrator) Migrate(ctx context.Context, users map[string]uint64) error {
repoRecord := repository.Record(ctx, db)
for len(m.Leafs) > 0 {
for i := len(m.Leafs) - 1; i >= 0; i-- {
n := m.Leafs[i]
var wg sync.WaitGroup
ch := make(chan types.PostProc, len(m.Leafs))
for _, n := range m.Leafs {
wg.Add(1)
// migrate & update leaf nodes
add, err := n.Migrate(repoRecord, users)
if err != nil {
return err
go n.Migrate(repoRecord, users, &wg, ch)
}
wg.Wait()
// var add []*types.Node
for len(ch) > 0 {
pp := <-ch
if pp.Err != nil {
return pp.Err
}
// this will maintain order
copy(m.Leafs[i:], m.Leafs[i+1:])
m.Leafs = m.Leafs[:len(m.Leafs)-1]
// update leaf nodes (entry points)
// take care to not duplicate the given nodes.
// That would be a bit not optimal :)
for _, a := range add {
for _, n := range m.Leafs {
if a.Compare(n) {
var nl []*types.Node
for _, n := range pp.Leafs {
for _, l := range nl {
if n.Compare(l) {
goto skip
}
}
// only include satisfied nodes, as only they can be processed
if a.Satisfied() {
m.Leafs = append(m.Leafs, a)
if n.Satisfied() {
nl = append(nl, n)
}
skip:
}
m.Leafs = nl
}
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/cortezaproject/corteza-server/compose/repository"
@@ -44,6 +45,11 @@ type (
// map between migrated ID and Corteza ID
Map map[string]string
PostProc struct {
Leafs []*Node
Err error
}
)
// helper, to determine if the two nodes are equal
@@ -66,7 +72,9 @@ func (n *Node) addMap(key string, m Map) {
}
// does the actual data migration for the given node
func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]uint64) ([]*Node, error) {
func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]uint64, wg *sync.WaitGroup, ch chan PostProc) {
defer wg.Done()
fmt.Printf("node.migrate > %s\n", n.Stringify())
var err error
@@ -80,14 +88,22 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
err := updateRefs(n, repoRecord)
if err != nil {
return nil, err
ch <- PostProc{
Leafs: nil,
Err: err,
}
return
}
fmt.Printf("node.refs.update.done\n")
} else {
fmt.Printf("node.migrate.source\n")
mapping, err = importNodeSource(n, users, repoRecord)
if err != nil {
return nil, err
ch <- PostProc{
Leafs: nil,
Err: err,
}
return
}
fmt.Printf("node.migrate.source.done\n")
}
@@ -104,7 +120,10 @@ func (n *Node) Migrate(repoRecord repository.RecordRepository, users map[string]
p.LinkRemove(n)
}
return rtr, nil
ch <- PostProc{
Leafs: rtr,
Err: nil,
}
}
// determines if node is Satisfied and can be imported