From 8b82eaee1df30ef1a3ad07cc16897cf8b288f213 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toma=C5=BE=20Jerman?= Date: Sun, 1 Mar 2020 20:51:04 +0100 Subject: [PATCH] Improve thread syncing --- pkg/migrate/main.go | 55 ++++++++++++++++++++------------------- pkg/migrate/types/node.go | 6 +++++ 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/pkg/migrate/main.go b/pkg/migrate/main.go index 3a0991047..d1fa1ba35 100644 --- a/pkg/migrate/main.go +++ b/pkg/migrate/main.go @@ -186,43 +186,44 @@ func (m *Migrator) Migrate(ctx context.Context, users map[string]uint64) error { db := repository.DB(ctx) repoRecord := repository.Record(ctx, db) - for len(m.Leafs) > 0 { - var wg sync.WaitGroup + for len(m.Leafs) > 0 { + var wg sync.WaitGroup - ch := make(chan types.PostProc, len(m.Leafs)) - for _, n := range m.Leafs { - wg.Add(1) + ch := make(chan types.PostProc, len(m.Leafs)) + for _, n := range m.Leafs { + wg.Add(1) - // migrate & update leaf nodes - go n.Migrate(repoRecord, users, &wg, ch) - } - - wg.Wait() - - // var add []*types.Node - for len(ch) > 0 { - pp := <-ch - if pp.Err != nil { - return pp.Err + // migrate & update leaf nodes + go n.Migrate(repoRecord, users, &wg, ch) } + wg.Wait() + var nl []*types.Node - for _, n := range pp.Leafs { - for _, l := range nl { - if n.Compare(l) { - goto skip + for len(ch) > 0 { + pp := <-ch + if pp.Err != nil { + return pp.Err + } + + if pp.Leafs != nil { + for _, n := range pp.Leafs { + for _, l := range nl { + if n.Compare(l) { + goto skip + } + } + if n.Satisfied() { + nl = append(nl, n) + } + + skip: } - } - if n.Satisfied() { - nl = append(nl, n) - } - skip: + } } - m.Leafs = nl } - } return nil } diff --git a/pkg/migrate/types/node.go b/pkg/migrate/types/node.go index aec2a41b3..6dd04264e 100644 --- a/pkg/migrate/types/node.go +++ b/pkg/migrate/types/node.go @@ -41,6 +41,8 @@ type ( // meta Header []string Visited bool + + Lock *sync.Mutex } // map between migrated ID and Corteza ID @@ -64,11 +66,13 @@ func (n *Node) Stringify() string { // adds a new map to the given node func (n *Node) addMap(key string, m Map) { + n.Lock.Lock() if n.mapping == nil { n.mapping = map[string]Map{} } n.mapping[key] = m + n.Lock.Unlock() } // does the actual data migration for the given node @@ -158,8 +162,10 @@ func (n *Node) LinkAdd(to *Node) { // remove the link between the two nodes func (n *Node) LinkRemove(from *Node) { + n.Lock.Lock() n.Children = n.removeIfPresent(from, n.Children) from.Parents = from.removeIfPresent(n, from.Parents) + n.Lock.Unlock() } // adds a parent node to the given node