Updated nodes sync filtering
This commit is contained in:
parent
f987586b3e
commit
bc571195cd
@ -10,6 +10,7 @@ import (
|
||||
cs "github.com/cortezaproject/corteza-server/compose/service"
|
||||
ct "github.com/cortezaproject/corteza-server/compose/types"
|
||||
"github.com/cortezaproject/corteza-server/federation/types"
|
||||
"github.com/cortezaproject/corteza-server/pkg/filter"
|
||||
"github.com/cortezaproject/corteza-server/system/service"
|
||||
ss "github.com/cortezaproject/corteza-server/system/service"
|
||||
st "github.com/cortezaproject/corteza-server/system/types"
|
||||
@ -43,7 +44,6 @@ func NewSync(s *Syncer, m *Mapper, sm SharedModuleService, cs cs.RecordService,
|
||||
// TODO - check if any of the newly missing fields are actually being used so a safe update
|
||||
// is possible
|
||||
func (s *Sync) CanUpdateSharedModule(ctx context.Context, new *types.SharedModule, existing *types.SharedModule) (bool, error) {
|
||||
|
||||
// check for mapped fields
|
||||
fstr, err := json.Marshal(new.Fields)
|
||||
f2str, err := json.Marshal(existing.Fields)
|
||||
@ -60,7 +60,7 @@ func (s *Sync) CanUpdateSharedModule(ctx context.Context, new *types.SharedModul
|
||||
}
|
||||
|
||||
// ProcessPayload passes the payload to the syncer lib
|
||||
func (s *Sync) ProcessPayload(ctx context.Context, payload []byte, out chan Url, url types.SyncerURI, processer Processer) (int, error) {
|
||||
func (s *Sync) ProcessPayload(ctx context.Context, payload []byte, out chan Url, url types.SyncerURI, processer Processer) (ProcesserResponse, error) {
|
||||
return s.syncer.Process(ctx, payload, out, url, processer)
|
||||
}
|
||||
|
||||
@ -141,6 +141,36 @@ func (s *Sync) PrepareModuleMappings(ctx context.Context, mappings *types.Module
|
||||
return s.mapper.Prepare((*mappings).FieldMapping), nil
|
||||
}
|
||||
|
||||
func (s *Sync) GetLastStructureSyncStatus(ctx context.Context, nodeID, externalFederationModuleID uint64) (syncStatus string, err error) {
|
||||
var list types.NodeSyncSet
|
||||
|
||||
list, _, err = DefaultNodeSync.Search(ctx, types.NodeSyncFilter{
|
||||
NodeID: nodeID,
|
||||
ModuleID: externalFederationModuleID,
|
||||
SyncType: types.NodeSyncTypeStructure,
|
||||
Sorting: filter.Sorting{
|
||||
Sort: filter.SortExprSet{
|
||||
&filter.SortExpr{Column: "time_action", Descending: true},
|
||||
},
|
||||
},
|
||||
Paging: filter.Paging{Limit: 1},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
syncStatus = types.NodeSyncStatusSuccess
|
||||
return
|
||||
}
|
||||
|
||||
if len(list) == 0 {
|
||||
syncStatus = types.NodeSyncStatusSuccess
|
||||
return
|
||||
}
|
||||
|
||||
syncStatus = list[0].SyncStatus
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Sync) GetLastSyncTime(ctx context.Context, nodeID uint64, syncType string) (*time.Time, error) {
|
||||
ns, err := DefaultNodeSync.LookupLastSuccessfulSync(ctx, nodeID, syncType)
|
||||
|
||||
|
||||
@ -12,6 +12,10 @@ func (s Store) convertFederationNodesSyncFilter(f types.NodeSyncFilter) (query s
|
||||
query = query.Where("fdns.rel_node = ?", f.NodeID)
|
||||
}
|
||||
|
||||
if f.ModuleID > 0 {
|
||||
query = query.Where("fdns.rel_module = ?", f.ModuleID)
|
||||
}
|
||||
|
||||
if f.SyncStatus != "" {
|
||||
query = query.Where("fdns.sync_status = ?", f.SyncStatus)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user