From 0db26041c044a0b2854952fffb54853277ffbadc Mon Sep 17 00:00:00 2001 From: Peter Grlica Date: Thu, 10 Dec 2020 18:45:52 +0100 Subject: [PATCH] Added data sync update functionality --- Makefile | 5 +- federation/service/exposed_module.go | 4 +- federation/service/module_mapping.go | 4 +- federation/service/node.go | 2 +- federation/service/processer_data.go | 60 ++++++++++++++++++++--- federation/service/processer_data_test.go | 39 ++++++++++++++- federation/service/service.go | 8 +-- federation/service/sync.go | 14 ++++++ 8 files changed, 118 insertions(+), 18 deletions(-) diff --git a/Makefile b/Makefile index ac6f6af97..3c3d73322 100644 --- a/Makefile +++ b/Makefile @@ -34,12 +34,13 @@ COVER_FLAGS ?= -covermode=$(COVER_MODE) -coverprofile=$(COVER_PROFILE) COVER_PKGS_messaging = ./messaging/... COVER_PKGS_system = ./system/... COVER_PKGS_compose = ./compose/... +COVER_PKGS_federation = ./federation/... COVER_PKGS_pkg = ./pkg/... -COVER_PKGS_all = $(COVER_PKGS_pkg),$(COVER_PKGS_messaging),$(COVER_PKGS_system),$(COVER_PKGS_compose) +COVER_PKGS_all = $(COVER_PKGS_pkg),$(COVER_PKGS_messaging),$(COVER_PKGS_system),$(COVER_PKGS_compose),$(COVER_PKGS_federation) COVER_PKGS_integration = $(COVER_PKGS_all) TEST_SUITE_pkg = ./pkg/... -TEST_SUITE_services = ./compose/... ./messaging/... ./system/... +TEST_SUITE_services = ./compose/... ./messaging/... ./system/... ./federation/... TEST_SUITE_unit = $(TEST_SUITE_pkg) $(TEST_SUITE_services) TEST_SUITE_integration = ./tests/... TEST_SUITE_store = ./store/tests/... diff --git a/federation/service/exposed_module.go b/federation/service/exposed_module.go index 0944c4b22..6f87cc1b5 100644 --- a/federation/service/exposed_module.go +++ b/federation/service/exposed_module.go @@ -123,7 +123,7 @@ func (svc exposedModule) Update(ctx context.Context, updated *types.ExposedModul updated.UpdatedBy = auth.GetIdentityFromContext(ctx).Identity() // set labels - AddFederationLabel(m, node.BaseURL) + AddFederationLabel(m, "federation", node.BaseURL) if _, err := svc.module.With(ctx).Update(m); err != nil { return err @@ -267,7 +267,7 @@ func (svc exposedModule) Create(ctx context.Context, new *types.ExposedModule) ( } // set labels - AddFederationLabel(m, node.BaseURL) + AddFederationLabel(m, "federation", node.BaseURL) if _, err := svc.module.With(ctx).Update(m); err != nil { return err diff --git a/federation/service/module_mapping.go b/federation/service/module_mapping.go index 383f100f2..a94a6c5fe 100644 --- a/federation/service/module_mapping.go +++ b/federation/service/module_mapping.go @@ -141,7 +141,7 @@ func (svc moduleMapping) Create(ctx context.Context, new *types.ModuleMapping) ( } // set labels - AddFederationLabel(m, "") + AddFederationLabel(m, "federation", "") if _, err := svc.module.With(ctx).Update(m); err != nil { return err @@ -185,7 +185,7 @@ func (svc moduleMapping) Update(ctx context.Context, updated *types.ModuleMappin } // set labels - AddFederationLabel(m, "") + AddFederationLabel(m, "federation", "") if _, err := svc.module.With(ctx).Update(m); err != nil { return err diff --git a/federation/service/node.go b/federation/service/node.go index 8bbfce28d..c4c7e9c6c 100644 --- a/federation/service/node.go +++ b/federation/service/node.go @@ -475,7 +475,7 @@ func (svc node) fetchFederatedUser(ctx context.Context, n *types.Node) (*sysType Handle: uHandle, } - AddFederationLabel(user, n.BaseURL) + AddFederationLabel(user, "federation", n.BaseURL) // Create a user to service this node r, err := service.DefaultRole.With(ctx).FindByName("federation") diff --git a/federation/service/processer_data.go b/federation/service/processer_data.go index 67ba67e10..c6302100a 100644 --- a/federation/service/processer_data.go +++ b/federation/service/processer_data.go @@ -2,6 +2,7 @@ package service import ( "context" + "fmt" ct "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/federation/types" @@ -32,6 +33,11 @@ type ( // uses the decode package to decode the whole set, depending on // the filtering that was used (limit) func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (ProcesserResponse, error) { + var ( + rec *ct.Record + err error + ) + processed := 0 o, err := decoder.DecodeFederationRecordSync([]byte(payload)) @@ -52,15 +58,40 @@ func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (Processer for _, er := range o { dp.SyncService.mapper.Merge(&er.Values, dp.ModuleMappingValues, dp.ModuleMappings) - rec := &ct.Record{ - ModuleID: dp.ComposeModuleID, - NamespaceID: dp.ComposeNamespaceID, - Values: *dp.ModuleMappingValues, + if er.UpdatedAt != nil { + + rec, err = dp.findRecordByFederationID(ctx, er.ID, dp.ComposeModuleID, dp.ComposeNamespaceID) + + if err != nil { + // could not find existing record + continue + } + + if rec != nil { + rec.Values = *dp.ModuleMappingValues + } } - AddFederationLabel(rec, dp.NodeBaseURL) + // if the record was updated on origin, but we somehow do not have it + // create it anyway + if rec == nil { - _, err := dp.SyncService.CreateRecord(ctx, rec) + rec = &ct.Record{ + ModuleID: dp.ComposeModuleID, + NamespaceID: dp.ComposeNamespaceID, + Values: *dp.ModuleMappingValues, + } + + AddFederationLabel(rec, "federation", dp.NodeBaseURL) + AddFederationLabel(rec, "federation_extrecord", fmt.Sprintf("%d", er.ID)) + + } + + if rec.ID != 0 { + _, err = dp.SyncService.UpdateRecord(ctx, rec) + } else { + _, err = dp.SyncService.CreateRecord(ctx, rec) + } if err != nil { continue @@ -73,3 +104,20 @@ func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (Processer Processed: processed, }, nil } + +// findRecordByFederationID finds any already existing records via +// federation label +func (dp *dataProcesser) findRecordByFederationID(ctx context.Context, recordID, moduleID, namespaceID uint64) (r *ct.Record, err error) { + filter := ct.RecordFilter{ + NamespaceID: namespaceID, + ModuleID: moduleID, + Labels: map[string]string{"federation_extrecord": fmt.Sprintf("%d", recordID)}} + + if s, err := dp.SyncService.FindRecords(ctx, filter); err == nil { + if len(s) == 1 { + r = s[0] + } + } + + return +} diff --git a/federation/service/processer_data_test.go b/federation/service/processer_data_test.go index ddac8583a..d712c1cc3 100644 --- a/federation/service/processer_data_test.go +++ b/federation/service/processer_data_test.go @@ -25,6 +25,9 @@ type ( testRecordServicePersistSuccess struct { cs.RecordService } + testRecordServiceUpdateSuccess struct { + cs.RecordService + } testRecordServicePersistError struct { cs.RecordService } @@ -50,7 +53,7 @@ func TestProcesserData_persist(t *testing.T) { }{ { "successful persist on valid mapping", - `{"response": {"set": [{"recordID":"1","values":[{"name":"Facebook","value":"foobar"}]}]}}`, + `{"response": {"set": [{"recordID":"1","values":[{"name":"Facebook","value":"foobar"}],"createdAt":"2020-12-05T10:10:10Z"}]}}`, `[{"origin":{"kind":"String","name":"Description","label":"Description","isMulti":false},"destination":{"kind":"String","name":"Name","label":"Description","isMulti":false}},{"origin":{"kind":"Url","name":"Facebook","label":"Facebook","isMulti":false},"destination":{"kind":"Url","name":"Fb","label":"Facebook","isMulti":false}}]`, 1, "", @@ -63,6 +66,21 @@ func TestProcesserData_persist(t *testing.T) { &testUserService{}, &testRoleService{}), }, + { + "successful update on valid mapping and existing federated record", + `{"response": {"set": [{"recordID":"1","values":[{"name":"Facebook","value":"foobar"}],"createdAt":"2020-12-05T10:10:10Z", "updatedAt":"2020-12-06T10:10:10Z"}]}}`, + `[{"origin":{"kind":"String","name":"Description","label":"Description","isMulti":false},"destination":{"kind":"String","name":"Name","label":"Description","isMulti":false}},{"origin":{"kind":"Url","name":"Facebook","label":"Facebook","isMulti":false},"destination":{"kind":"Url","name":"Fb","label":"Facebook","isMulti":false}}]`, + 1, + "", + &ct.RecordValueSet{&ct.RecordValue{Name: "Fb", Value: ""}}, + NewSync( + &Syncer{}, + &Mapper{}, + &testSharedModuleService{}, + &testRecordServiceUpdateSuccess{}, + &testUserService{}, + &testRoleService{}), + }, { "persist error on valid mapping", `{"response": {"set": [{"recordID":"1","values":[{"name":"Facebook","value":"foobar"}]}]}}`, @@ -161,14 +179,33 @@ func TestProcesserData_persist(t *testing.T) { } } +// create success func (s testRecordServicePersistSuccess) Create(record *ct.Record) (*ct.Record, error) { return nil, nil } +func (s testRecordServicePersistSuccess) Find(filter ct.RecordFilter) (ct.RecordSet, ct.RecordFilter, error) { + return ct.RecordSet{}, ct.RecordFilter{}, nil +} + func (s testRecordServicePersistSuccess) With(_ context.Context) cs.RecordService { return &testRecordServicePersistSuccess{} } +// update success +func (s testRecordServiceUpdateSuccess) Update(record *ct.Record) (*ct.Record, error) { + return nil, nil +} + +func (s testRecordServiceUpdateSuccess) Find(filter ct.RecordFilter) (ct.RecordSet, ct.RecordFilter, error) { + return ct.RecordSet{&ct.Record{ID: 2}}, ct.RecordFilter{}, nil +} + +func (s testRecordServiceUpdateSuccess) With(_ context.Context) cs.RecordService { + return &testRecordServiceUpdateSuccess{} +} + +// create error func (s testRecordServicePersistError) Create(record *ct.Record) (*ct.Record, error) { return nil, errors.New("mocked error") } diff --git a/federation/service/service.go b/federation/service/service.go index 67734e015..dee1bb564 100644 --- a/federation/service/service.go +++ b/federation/service/service.go @@ -162,15 +162,15 @@ func Watchers(ctx context.Context) { go syncStructure.Watch( ctx, - time.Second*DefaultOptions.StructureMonitorInterval, + DefaultOptions.StructureMonitorInterval, DefaultOptions.StructurePageSize) go syncData.Watch( ctx, - time.Second*DefaultOptions.DataMonitorInterval, + DefaultOptions.DataMonitorInterval, DefaultOptions.DataPageSize) } -func AddFederationLabel(entity label.LabeledResource, value string) { - entity.SetLabel("federation", value) +func AddFederationLabel(entity label.LabeledResource, key string, value string) { + entity.SetLabel(key, value) } diff --git a/federation/service/sync.go b/federation/service/sync.go index 4840552a1..b1a9777ad 100644 --- a/federation/service/sync.go +++ b/federation/service/sync.go @@ -80,6 +80,17 @@ func (s *Sync) CreateRecord(ctx context.Context, rec *ct.Record) (*ct.Record, er return s.composeRecordService.With(ctx).Create(rec) } +// UpdateRecord wraps the compose Record service Update +func (s *Sync) UpdateRecord(ctx context.Context, rec *ct.Record) (*ct.Record, error) { + return s.composeRecordService.With(ctx).Update(rec) +} + +// FindRecord find the record via federation label +func (s *Sync) FindRecords(ctx context.Context, filter ct.RecordFilter) (set ct.RecordSet, err error) { + set, _, err = s.composeRecordService.With(ctx).Find(filter) + return +} + // LookupSharedModule find the shared module if exists func (s *Sync) LookupSharedModule(ctx context.Context, new *types.SharedModule) (*types.SharedModule, error) { var sm *types.SharedModule @@ -101,14 +112,17 @@ func (s *Sync) LookupSharedModule(ctx context.Context, new *types.SharedModule) return sm, nil } +// UpdateSharedModule wraps the federation SharedModule service Update func (s *Sync) UpdateSharedModule(ctx context.Context, updated *types.SharedModule) (*types.SharedModule, error) { return s.sharedModuleService.Update(ctx, updated) } +// CreateSharedModule wraps the federation SharedModule service Create func (s *Sync) CreateSharedModule(ctx context.Context, new *types.SharedModule) (*types.SharedModule, error) { return s.sharedModuleService.Create(ctx, new) } +// GetPairedNodes finds successfuly paired nodes func (s *Sync) GetPairedNodes(ctx context.Context) (types.NodeSet, error) { set, _, err := DefaultNode.Search(ctx, types.NodeFilter{Status: types.NodeStatusPaired})