Added data sync update functionality
This commit is contained in:
parent
cede129ede
commit
0db26041c0
5
Makefile
5
Makefile
@ -34,12 +34,13 @@ COVER_FLAGS ?= -covermode=$(COVER_MODE) -coverprofile=$(COVER_PROFILE)
|
||||
COVER_PKGS_messaging = ./messaging/...
|
||||
COVER_PKGS_system = ./system/...
|
||||
COVER_PKGS_compose = ./compose/...
|
||||
COVER_PKGS_federation = ./federation/...
|
||||
COVER_PKGS_pkg = ./pkg/...
|
||||
COVER_PKGS_all = $(COVER_PKGS_pkg),$(COVER_PKGS_messaging),$(COVER_PKGS_system),$(COVER_PKGS_compose)
|
||||
COVER_PKGS_all = $(COVER_PKGS_pkg),$(COVER_PKGS_messaging),$(COVER_PKGS_system),$(COVER_PKGS_compose),$(COVER_PKGS_federation)
|
||||
COVER_PKGS_integration = $(COVER_PKGS_all)
|
||||
|
||||
TEST_SUITE_pkg = ./pkg/...
|
||||
TEST_SUITE_services = ./compose/... ./messaging/... ./system/...
|
||||
TEST_SUITE_services = ./compose/... ./messaging/... ./system/... ./federation/...
|
||||
TEST_SUITE_unit = $(TEST_SUITE_pkg) $(TEST_SUITE_services)
|
||||
TEST_SUITE_integration = ./tests/...
|
||||
TEST_SUITE_store = ./store/tests/...
|
||||
|
||||
@ -123,7 +123,7 @@ func (svc exposedModule) Update(ctx context.Context, updated *types.ExposedModul
|
||||
updated.UpdatedBy = auth.GetIdentityFromContext(ctx).Identity()
|
||||
|
||||
// set labels
|
||||
AddFederationLabel(m, node.BaseURL)
|
||||
AddFederationLabel(m, "federation", node.BaseURL)
|
||||
|
||||
if _, err := svc.module.With(ctx).Update(m); err != nil {
|
||||
return err
|
||||
@ -267,7 +267,7 @@ func (svc exposedModule) Create(ctx context.Context, new *types.ExposedModule) (
|
||||
}
|
||||
|
||||
// set labels
|
||||
AddFederationLabel(m, node.BaseURL)
|
||||
AddFederationLabel(m, "federation", node.BaseURL)
|
||||
|
||||
if _, err := svc.module.With(ctx).Update(m); err != nil {
|
||||
return err
|
||||
|
||||
@ -141,7 +141,7 @@ func (svc moduleMapping) Create(ctx context.Context, new *types.ModuleMapping) (
|
||||
}
|
||||
|
||||
// set labels
|
||||
AddFederationLabel(m, "")
|
||||
AddFederationLabel(m, "federation", "")
|
||||
|
||||
if _, err := svc.module.With(ctx).Update(m); err != nil {
|
||||
return err
|
||||
@ -185,7 +185,7 @@ func (svc moduleMapping) Update(ctx context.Context, updated *types.ModuleMappin
|
||||
}
|
||||
|
||||
// set labels
|
||||
AddFederationLabel(m, "")
|
||||
AddFederationLabel(m, "federation", "")
|
||||
|
||||
if _, err := svc.module.With(ctx).Update(m); err != nil {
|
||||
return err
|
||||
|
||||
@ -475,7 +475,7 @@ func (svc node) fetchFederatedUser(ctx context.Context, n *types.Node) (*sysType
|
||||
Handle: uHandle,
|
||||
}
|
||||
|
||||
AddFederationLabel(user, n.BaseURL)
|
||||
AddFederationLabel(user, "federation", n.BaseURL)
|
||||
|
||||
// Create a user to service this node
|
||||
r, err := service.DefaultRole.With(ctx).FindByName("federation")
|
||||
|
||||
@ -2,6 +2,7 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
ct "github.com/cortezaproject/corteza-server/compose/types"
|
||||
"github.com/cortezaproject/corteza-server/federation/types"
|
||||
@ -32,6 +33,11 @@ type (
|
||||
// uses the decode package to decode the whole set, depending on
|
||||
// the filtering that was used (limit)
|
||||
func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (ProcesserResponse, error) {
|
||||
var (
|
||||
rec *ct.Record
|
||||
err error
|
||||
)
|
||||
|
||||
processed := 0
|
||||
o, err := decoder.DecodeFederationRecordSync([]byte(payload))
|
||||
|
||||
@ -52,15 +58,40 @@ func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (Processer
|
||||
for _, er := range o {
|
||||
dp.SyncService.mapper.Merge(&er.Values, dp.ModuleMappingValues, dp.ModuleMappings)
|
||||
|
||||
rec := &ct.Record{
|
||||
ModuleID: dp.ComposeModuleID,
|
||||
NamespaceID: dp.ComposeNamespaceID,
|
||||
Values: *dp.ModuleMappingValues,
|
||||
if er.UpdatedAt != nil {
|
||||
|
||||
rec, err = dp.findRecordByFederationID(ctx, er.ID, dp.ComposeModuleID, dp.ComposeNamespaceID)
|
||||
|
||||
if err != nil {
|
||||
// could not find existing record
|
||||
continue
|
||||
}
|
||||
|
||||
if rec != nil {
|
||||
rec.Values = *dp.ModuleMappingValues
|
||||
}
|
||||
}
|
||||
|
||||
AddFederationLabel(rec, dp.NodeBaseURL)
|
||||
// if the record was updated on origin, but we somehow do not have it
|
||||
// create it anyway
|
||||
if rec == nil {
|
||||
|
||||
_, err := dp.SyncService.CreateRecord(ctx, rec)
|
||||
rec = &ct.Record{
|
||||
ModuleID: dp.ComposeModuleID,
|
||||
NamespaceID: dp.ComposeNamespaceID,
|
||||
Values: *dp.ModuleMappingValues,
|
||||
}
|
||||
|
||||
AddFederationLabel(rec, "federation", dp.NodeBaseURL)
|
||||
AddFederationLabel(rec, "federation_extrecord", fmt.Sprintf("%d", er.ID))
|
||||
|
||||
}
|
||||
|
||||
if rec.ID != 0 {
|
||||
_, err = dp.SyncService.UpdateRecord(ctx, rec)
|
||||
} else {
|
||||
_, err = dp.SyncService.CreateRecord(ctx, rec)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
continue
|
||||
@ -73,3 +104,20 @@ func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (Processer
|
||||
Processed: processed,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// findRecordByFederationID finds any already existing records via
|
||||
// federation label
|
||||
func (dp *dataProcesser) findRecordByFederationID(ctx context.Context, recordID, moduleID, namespaceID uint64) (r *ct.Record, err error) {
|
||||
filter := ct.RecordFilter{
|
||||
NamespaceID: namespaceID,
|
||||
ModuleID: moduleID,
|
||||
Labels: map[string]string{"federation_extrecord": fmt.Sprintf("%d", recordID)}}
|
||||
|
||||
if s, err := dp.SyncService.FindRecords(ctx, filter); err == nil {
|
||||
if len(s) == 1 {
|
||||
r = s[0]
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -25,6 +25,9 @@ type (
|
||||
testRecordServicePersistSuccess struct {
|
||||
cs.RecordService
|
||||
}
|
||||
testRecordServiceUpdateSuccess struct {
|
||||
cs.RecordService
|
||||
}
|
||||
testRecordServicePersistError struct {
|
||||
cs.RecordService
|
||||
}
|
||||
@ -50,7 +53,7 @@ func TestProcesserData_persist(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
"successful persist on valid mapping",
|
||||
`{"response": {"set": [{"recordID":"1","values":[{"name":"Facebook","value":"foobar"}]}]}}`,
|
||||
`{"response": {"set": [{"recordID":"1","values":[{"name":"Facebook","value":"foobar"}],"createdAt":"2020-12-05T10:10:10Z"}]}}`,
|
||||
`[{"origin":{"kind":"String","name":"Description","label":"Description","isMulti":false},"destination":{"kind":"String","name":"Name","label":"Description","isMulti":false}},{"origin":{"kind":"Url","name":"Facebook","label":"Facebook","isMulti":false},"destination":{"kind":"Url","name":"Fb","label":"Facebook","isMulti":false}}]`,
|
||||
1,
|
||||
"",
|
||||
@ -63,6 +66,21 @@ func TestProcesserData_persist(t *testing.T) {
|
||||
&testUserService{},
|
||||
&testRoleService{}),
|
||||
},
|
||||
{
|
||||
"successful update on valid mapping and existing federated record",
|
||||
`{"response": {"set": [{"recordID":"1","values":[{"name":"Facebook","value":"foobar"}],"createdAt":"2020-12-05T10:10:10Z", "updatedAt":"2020-12-06T10:10:10Z"}]}}`,
|
||||
`[{"origin":{"kind":"String","name":"Description","label":"Description","isMulti":false},"destination":{"kind":"String","name":"Name","label":"Description","isMulti":false}},{"origin":{"kind":"Url","name":"Facebook","label":"Facebook","isMulti":false},"destination":{"kind":"Url","name":"Fb","label":"Facebook","isMulti":false}}]`,
|
||||
1,
|
||||
"",
|
||||
&ct.RecordValueSet{&ct.RecordValue{Name: "Fb", Value: ""}},
|
||||
NewSync(
|
||||
&Syncer{},
|
||||
&Mapper{},
|
||||
&testSharedModuleService{},
|
||||
&testRecordServiceUpdateSuccess{},
|
||||
&testUserService{},
|
||||
&testRoleService{}),
|
||||
},
|
||||
{
|
||||
"persist error on valid mapping",
|
||||
`{"response": {"set": [{"recordID":"1","values":[{"name":"Facebook","value":"foobar"}]}]}}`,
|
||||
@ -161,14 +179,33 @@ func TestProcesserData_persist(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// create success
|
||||
func (s testRecordServicePersistSuccess) Create(record *ct.Record) (*ct.Record, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s testRecordServicePersistSuccess) Find(filter ct.RecordFilter) (ct.RecordSet, ct.RecordFilter, error) {
|
||||
return ct.RecordSet{}, ct.RecordFilter{}, nil
|
||||
}
|
||||
|
||||
func (s testRecordServicePersistSuccess) With(_ context.Context) cs.RecordService {
|
||||
return &testRecordServicePersistSuccess{}
|
||||
}
|
||||
|
||||
// update success
|
||||
func (s testRecordServiceUpdateSuccess) Update(record *ct.Record) (*ct.Record, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s testRecordServiceUpdateSuccess) Find(filter ct.RecordFilter) (ct.RecordSet, ct.RecordFilter, error) {
|
||||
return ct.RecordSet{&ct.Record{ID: 2}}, ct.RecordFilter{}, nil
|
||||
}
|
||||
|
||||
func (s testRecordServiceUpdateSuccess) With(_ context.Context) cs.RecordService {
|
||||
return &testRecordServiceUpdateSuccess{}
|
||||
}
|
||||
|
||||
// create error
|
||||
func (s testRecordServicePersistError) Create(record *ct.Record) (*ct.Record, error) {
|
||||
return nil, errors.New("mocked error")
|
||||
}
|
||||
|
||||
@ -162,15 +162,15 @@ func Watchers(ctx context.Context) {
|
||||
|
||||
go syncStructure.Watch(
|
||||
ctx,
|
||||
time.Second*DefaultOptions.StructureMonitorInterval,
|
||||
DefaultOptions.StructureMonitorInterval,
|
||||
DefaultOptions.StructurePageSize)
|
||||
|
||||
go syncData.Watch(
|
||||
ctx,
|
||||
time.Second*DefaultOptions.DataMonitorInterval,
|
||||
DefaultOptions.DataMonitorInterval,
|
||||
DefaultOptions.DataPageSize)
|
||||
}
|
||||
|
||||
func AddFederationLabel(entity label.LabeledResource, value string) {
|
||||
entity.SetLabel("federation", value)
|
||||
func AddFederationLabel(entity label.LabeledResource, key string, value string) {
|
||||
entity.SetLabel(key, value)
|
||||
}
|
||||
|
||||
@ -80,6 +80,17 @@ func (s *Sync) CreateRecord(ctx context.Context, rec *ct.Record) (*ct.Record, er
|
||||
return s.composeRecordService.With(ctx).Create(rec)
|
||||
}
|
||||
|
||||
// UpdateRecord wraps the compose Record service Update
|
||||
func (s *Sync) UpdateRecord(ctx context.Context, rec *ct.Record) (*ct.Record, error) {
|
||||
return s.composeRecordService.With(ctx).Update(rec)
|
||||
}
|
||||
|
||||
// FindRecord find the record via federation label
|
||||
func (s *Sync) FindRecords(ctx context.Context, filter ct.RecordFilter) (set ct.RecordSet, err error) {
|
||||
set, _, err = s.composeRecordService.With(ctx).Find(filter)
|
||||
return
|
||||
}
|
||||
|
||||
// LookupSharedModule find the shared module if exists
|
||||
func (s *Sync) LookupSharedModule(ctx context.Context, new *types.SharedModule) (*types.SharedModule, error) {
|
||||
var sm *types.SharedModule
|
||||
@ -101,14 +112,17 @@ func (s *Sync) LookupSharedModule(ctx context.Context, new *types.SharedModule)
|
||||
return sm, nil
|
||||
}
|
||||
|
||||
// UpdateSharedModule wraps the federation SharedModule service Update
|
||||
func (s *Sync) UpdateSharedModule(ctx context.Context, updated *types.SharedModule) (*types.SharedModule, error) {
|
||||
return s.sharedModuleService.Update(ctx, updated)
|
||||
}
|
||||
|
||||
// CreateSharedModule wraps the federation SharedModule service Create
|
||||
func (s *Sync) CreateSharedModule(ctx context.Context, new *types.SharedModule) (*types.SharedModule, error) {
|
||||
return s.sharedModuleService.Create(ctx, new)
|
||||
}
|
||||
|
||||
// GetPairedNodes finds successfuly paired nodes
|
||||
func (s *Sync) GetPairedNodes(ctx context.Context) (types.NodeSet, error) {
|
||||
set, _, err := DefaultNode.Search(ctx, types.NodeFilter{Status: types.NodeStatusPaired})
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user