From bc571195cdbaab47f772b7dcf68d03cb11595b01 Mon Sep 17 00:00:00 2001 From: Peter Grlica Date: Tue, 24 Nov 2020 11:38:10 +0100 Subject: [PATCH] Updated nodes sync filtering --- federation/service/sync.go | 34 ++++++++++++++++++++++++++-- store/rdbms/federation_nodes_sync.go | 4 ++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/federation/service/sync.go b/federation/service/sync.go index 9fa4f4712..30613887b 100644 --- a/federation/service/sync.go +++ b/federation/service/sync.go @@ -10,6 +10,7 @@ import ( cs "github.com/cortezaproject/corteza-server/compose/service" ct "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/federation/types" + "github.com/cortezaproject/corteza-server/pkg/filter" "github.com/cortezaproject/corteza-server/system/service" ss "github.com/cortezaproject/corteza-server/system/service" st "github.com/cortezaproject/corteza-server/system/types" @@ -43,7 +44,6 @@ func NewSync(s *Syncer, m *Mapper, sm SharedModuleService, cs cs.RecordService, // TODO - check if any of the newly missing fields are actually being used so a safe update // is possible func (s *Sync) CanUpdateSharedModule(ctx context.Context, new *types.SharedModule, existing *types.SharedModule) (bool, error) { - // check for mapped fields fstr, err := json.Marshal(new.Fields) f2str, err := json.Marshal(existing.Fields) @@ -60,7 +60,7 @@ func (s *Sync) CanUpdateSharedModule(ctx context.Context, new *types.SharedModul } // ProcessPayload passes the payload to the syncer lib -func (s *Sync) ProcessPayload(ctx context.Context, payload []byte, out chan Url, url types.SyncerURI, processer Processer) (int, error) { +func (s *Sync) ProcessPayload(ctx context.Context, payload []byte, out chan Url, url types.SyncerURI, processer Processer) (ProcesserResponse, error) { return s.syncer.Process(ctx, payload, out, url, processer) } @@ -141,6 +141,36 @@ func (s *Sync) PrepareModuleMappings(ctx context.Context, mappings *types.Module return s.mapper.Prepare((*mappings).FieldMapping), nil } +func (s *Sync) GetLastStructureSyncStatus(ctx context.Context, nodeID, externalFederationModuleID uint64) (syncStatus string, err error) { + var list types.NodeSyncSet + + list, _, err = DefaultNodeSync.Search(ctx, types.NodeSyncFilter{ + NodeID: nodeID, + ModuleID: externalFederationModuleID, + SyncType: types.NodeSyncTypeStructure, + Sorting: filter.Sorting{ + Sort: filter.SortExprSet{ + &filter.SortExpr{Column: "time_action", Descending: true}, + }, + }, + Paging: filter.Paging{Limit: 1}, + }) + + if err != nil { + syncStatus = types.NodeSyncStatusSuccess + return + } + + if len(list) == 0 { + syncStatus = types.NodeSyncStatusSuccess + return + } + + syncStatus = list[0].SyncStatus + + return +} + func (s *Sync) GetLastSyncTime(ctx context.Context, nodeID uint64, syncType string) (*time.Time, error) { ns, err := DefaultNodeSync.LookupLastSuccessfulSync(ctx, nodeID, syncType) diff --git a/store/rdbms/federation_nodes_sync.go b/store/rdbms/federation_nodes_sync.go index 2d25445ec..7f01cefcf 100644 --- a/store/rdbms/federation_nodes_sync.go +++ b/store/rdbms/federation_nodes_sync.go @@ -12,6 +12,10 @@ func (s Store) convertFederationNodesSyncFilter(f types.NodeSyncFilter) (query s query = query.Where("fdns.rel_node = ?", f.NodeID) } + if f.ModuleID > 0 { + query = query.Where("fdns.rel_module = ?", f.ModuleID) + } + if f.SyncStatus != "" { query = query.Where("fdns.sync_status = ?", f.SyncStatus) }