3
0

Improve thread syncing

This commit is contained in:
Tomaž Jerman 2020-03-01 20:51:04 +01:00
parent 37a669125e
commit 8b82eaee1d
2 changed files with 34 additions and 27 deletions

View File

@ -186,43 +186,44 @@ func (m *Migrator) Migrate(ctx context.Context, users map[string]uint64) error {
db := repository.DB(ctx)
repoRecord := repository.Record(ctx, db)
for len(m.Leafs) > 0 {
var wg sync.WaitGroup
for len(m.Leafs) > 0 {
var wg sync.WaitGroup
ch := make(chan types.PostProc, len(m.Leafs))
for _, n := range m.Leafs {
wg.Add(1)
ch := make(chan types.PostProc, len(m.Leafs))
for _, n := range m.Leafs {
wg.Add(1)
// migrate & update leaf nodes
go n.Migrate(repoRecord, users, &wg, ch)
}
wg.Wait()
// var add []*types.Node
for len(ch) > 0 {
pp := <-ch
if pp.Err != nil {
return pp.Err
// migrate & update leaf nodes
go n.Migrate(repoRecord, users, &wg, ch)
}
wg.Wait()
var nl []*types.Node
for _, n := range pp.Leafs {
for _, l := range nl {
if n.Compare(l) {
goto skip
for len(ch) > 0 {
pp := <-ch
if pp.Err != nil {
return pp.Err
}
if pp.Leafs != nil {
for _, n := range pp.Leafs {
for _, l := range nl {
if n.Compare(l) {
goto skip
}
}
if n.Satisfied() {
nl = append(nl, n)
}
skip:
}
}
if n.Satisfied() {
nl = append(nl, n)
}
skip:
}
}
m.Leafs = nl
}
}
return nil
}

View File

@ -41,6 +41,8 @@ type (
// meta
Header []string
Visited bool
Lock *sync.Mutex
}
// map between migrated ID and Corteza ID
@ -64,11 +66,13 @@ func (n *Node) Stringify() string {
// adds a new map to the given node
func (n *Node) addMap(key string, m Map) {
n.Lock.Lock()
if n.mapping == nil {
n.mapping = map[string]Map{}
}
n.mapping[key] = m
n.Lock.Unlock()
}
// does the actual data migration for the given node
@ -158,8 +162,10 @@ func (n *Node) LinkAdd(to *Node) {
// remove the link between the two nodes
func (n *Node) LinkRemove(from *Node) {
n.Lock.Lock()
n.Children = n.removeIfPresent(from, n.Children)
from.Parents = from.removeIfPresent(n, from.Parents)
n.Lock.Unlock()
}
// adds a parent node to the given node