From 17e4f6ff75f4404303dbff8e69cdb82ab56b4ea2 Mon Sep 17 00:00:00 2001
From: Peter Grlica
Date: Wed, 28 Oct 2020 17:33:26 +0100
Subject: [PATCH] Added sync service, data and structure sync commands

---
 federation/commands/sync.go           |  98 ++++++++++
 federation/commands/sync_data.go      | 252 ++++++++++++++++++++++++++
 federation/commands/sync_structure.go | 206 +++++++++++++++++++++
 federation/service/syncer.go          | 111 ++++++++++++
 federation/types/syncer_uri.go        | 113 ++++++++++++
 federation/types/syncer_uri_test.go   |  93 ++++++++++
 6 files changed, 873 insertions(+)
 create mode 100644 federation/commands/sync.go
 create mode 100644 federation/commands/sync_data.go
 create mode 100644 federation/commands/sync_structure.go
 create mode 100644 federation/service/syncer.go
 create mode 100644 federation/types/syncer_uri.go
 create mode 100644 federation/types/syncer_uri_test.go

diff --git a/federation/commands/sync.go b/federation/commands/sync.go
new file mode 100644
index 000000000..63066b618
--- /dev/null
+++ b/federation/commands/sync.go
@@ -0,0 +1,98 @@
+package commands
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/cortezaproject/corteza-server/federation/service"
+	"github.com/cortezaproject/corteza-server/federation/types"
+	"github.com/cortezaproject/corteza-server/pkg/cli"
+
+	"github.com/spf13/cobra"
+)
+
+const (
+	// httpTimeout is the timeout meant for requests to the remote node.
+	httpTimeout time.Duration = 10 * time.Second
+
+	// baseURL is the base URL of the remote node (hard-coded for now).
+	baseURL string = "http://localhost:8084"
+)
+
+type (
+	// serviceInitializer is what the sync commands need from the app:
+	// a way to initialize the services before a command runs.
+	serviceInitializer interface {
+		InitServices(ctx context.Context) error
+	}
+)
+
+// Sync returns the "sync" command together with its "structure" and
+// "data" subcommands.
+func Sync(app serviceInitializer) *cobra.Command {
+	ctx := cli.Context()
+
+	// Sync commands.
+	cmd := &cobra.Command{
+		Use:   "sync",
+		Short: "Sync commands",
+	}
+
+	// Sync structure.
+	syncStructureCmd := &cobra.Command{
+		Use:   "structure",
+		Short: "Sync structure",
+
+		PreRunE: commandPreRunInitService(ctx, app),
+		Run:     commandSyncStructure(ctx),
+	}
+
+	// Sync data.
+	syncDataCmd := &cobra.Command{
+		Use:   "data",
+		Short: "Sync data",
+
+		PreRunE: commandPreRunInitService(ctx, app),
+		Run:     commandSyncData(ctx),
+	}
+
+	cmd.AddCommand(
+		syncStructureCmd,
+		syncDataCmd,
+	)
+
+	return cmd
+}
+
+// queueUrl logs the url and pushes it to the urls channel. The push blocks
+// until the channel accepts the value, so this is meant to run in its own
+// goroutine.
+func queueUrl(url *types.SyncerURI, urls chan types.SyncerURI, handler *service.Syncer) {
+	// SyncerURI.String never returns a non-nil error
+	s, _ := url.String()
+
+	service.DefaultLogger.Info(fmt.Sprintf("Adding %s to queue", s))
+	handler.Queue(*url, urls)
+}
+
+// getLastSyncTime returns the time of the last successful sync of the given
+// type for the node, or nil if no such sync could be looked up.
+func getLastSyncTime(ctx context.Context, nodeID uint64, syncType string) *time.Time {
+	// best effort: a failed lookup is handled like "never synced before"
+	ns, _ := service.DefaultNodeSync.LookupLastSuccessfulSync(ctx, nodeID, syncType)
+
+	if ns != nil {
+		return &ns.TimeOfAction
+	}
+
+	return nil
+}
+
+// commandPreRunInitService returns a PreRunE hook that initializes the
+// app services.
+func commandPreRunInitService(ctx context.Context, app serviceInitializer) func(*cobra.Command, []string) error {
+	return func(_ *cobra.Command, _ []string) error {
+		return app.InitServices(ctx)
+	}
+}
diff --git a/federation/commands/sync_data.go b/federation/commands/sync_data.go
new file mode 100644
index 000000000..261328bf8
--- /dev/null
+++ b/federation/commands/sync_data.go
@@ -0,0 +1,252 @@
+package commands
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"time"
+
+	ct "github.com/cortezaproject/corteza-server/compose/types"
+	"github.com/cortezaproject/corteza-server/federation/service"
+	"github.com/cortezaproject/corteza-server/federation/types"
+	"github.com/cortezaproject/corteza-server/pkg/decoder"
+	"github.com/spf13/cobra"
+)
+
+var (
+	// dataSyncer is the syncer used by the data sync command
+	dataSyncer *service.Syncer
+)
+
+// commandSyncData returns the Run function of the "sync data" command.
+// It periodically fetches the exposed records of the (for now hard-coded)
+// node and module and persists them as compose records.
+func commandSyncData(ctx context.Context) func(*cobra.Command, []string) {
+	// from
+	// - module id (Account) = 167296235227578369
+	// - namespace id = 167296235059806209
+
+	// to
+	// - module id (Account) = 167296265594339329
+	// - namespace id = 167296264570929153
+
+	return func(_ *cobra.Command, _ []string) {
+		// todo
+		// - get all nodes and loop
+		// - get all shared modules that have a field mapping
+		// - for each shared module, the add method needs shared module info as payload
+
+		// todo - get node by id
+		const (
+			nodeID    = 276342359342989444
+			moduleID  = 376342359342989444
+			limit     = 5
+			syncDelay = 10
+		)
+
+		node, err := service.DefaultNode.FindByID(ctx, nodeID)
+
+		if err != nil {
+			service.DefaultLogger.Info("could not find any nodes to sync")
+			return
+		}
+
+		ticker := time.NewTicker(time.Second * syncDelay)
+		defer ticker.Stop()
+
+		basePath := fmt.Sprintf("/federation/nodes/%d/modules/%d/records/", node.SharedNodeID, moduleID)
+
+		dataSyncer = service.NewSyncer()
+
+		// todo - get auth from the node
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+
+		// get the last sync per-node
+		lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeData)
+
+		ex := ""
+		if lastSync != nil {
+			ex = fmt.Sprintf(" from last sync on (%s)", lastSync.Format(time.RFC3339))
+		}
+
+		service.DefaultLogger.Info(fmt.Sprintf("Starting data sync%s", ex))
+
+		url := types.SyncerURI{
+			BaseURL:  baseURL,
+			Path:     basePath,
+			Limit:    limit,
+			LastSync: lastSync,
+		}
+
+		go queueUrl(&url, urls, dataSyncer)
+
+		for {
+			select {
+			case <-ctx.Done():
+				// do any cleanup here
+				service.DefaultLogger.Info(fmt.Sprintf("Stopping sync [processed: %d, persisted: %d]", countProcess, countPersist))
+				return
+			case <-ticker.C:
+				// data sync keeps its own sync times, apart from the
+				// structure sync
+				lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeData)
+
+				url := types.SyncerURI{
+					BaseURL:  baseURL,
+					Path:     basePath,
+					Limit:    limit,
+					LastSync: lastSync,
+				}
+
+				if lastSync == nil {
+					service.DefaultLogger.Info("Start fetching records from beginning of time, since lastSync == nil")
+				} else {
+					service.DefaultLogger.Info(fmt.Sprintf("Start fetching records, start with time: %s", lastSync.Format(time.RFC3339)))
+				}
+
+				go queueUrl(&url, urls, dataSyncer)
+			case url := <-urls:
+				select {
+				case <-ctx.Done():
+					// do any cleanup here
+					service.DefaultLogger.Info(fmt.Sprintf("Stopping sync [processed: %d, persisted: %d]", countProcess, countPersist))
+					return
+				default:
+				}
+
+				s, err := url.String()
+
+				if err != nil {
+					service.DefaultLogger.Error(fmt.Sprintf("could not build sync url: %s", err))
+					continue
+				}
+
+				responseBody, err := dataSyncer.Fetch(ctx, s)
+
+				if err != nil {
+					service.DefaultLogger.Error(fmt.Sprintf("could not fetch %s: %s", s, err))
+					continue
+				}
+
+				// The payload is handled right away. Passing it on through
+				// a channel that only this loop reads from would block the
+				// loop for good as soon as that channel is full.
+				body, err := ioutil.ReadAll(responseBody)
+
+				if err != nil {
+					service.DefaultLogger.Error(fmt.Sprintf("could not read payload of %s: %s", s, err))
+					continue
+				}
+
+				u := types.SyncerURI{
+					BaseURL: baseURL,
+					Path:    basePath,
+					Limit:   limit,
+				}
+
+				// method needs:
+				// - compose module ID (from module_mapping)
+				// - namespace ID (from module mapping)
+
+				err = dataSyncer.Process(ctx, body, node.ID, urls, u, dataSyncer, persistExposedRecordData)
+
+				if err != nil {
+					service.DefaultLogger.Error(fmt.Sprintf("error on handling payload: %s", err))
+					continue
+				}
+
+				// add to db - nodes_sync
+				ns := &types.NodeSync{
+					NodeID:       node.ID,
+					SyncStatus:   types.NodeSyncStatusSuccess,
+					SyncType:     types.NodeSyncTypeData,
+					TimeOfAction: time.Now().UTC(),
+				}
+
+				if _, err := service.DefaultNodeSync.Create(ctx, ns); err != nil {
+					service.DefaultLogger.Warn(fmt.Sprintf("could not update sync status: %s", err))
+				}
+			}
+		}
+	}
+}
+
+// persistExposedRecordData gets the payload from syncer and
+// uses the decode package to decode the whole set, depending on
+// the filtering that was used (limit). Every decoded record is
+// stored as a compose record; records that can not be stored are
+// logged and skipped.
+func persistExposedRecordData(ctx context.Context, payload []byte, nodeID uint64, dataSyncer *service.Syncer) error {
+	countProcess = countProcess + 1
+
+	o, err := decoder.DecodeFederationRecordSync(payload)
+
+	if err != nil {
+		return err
+	}
+
+	if len(o) == 0 {
+		return nil
+	}
+
+	service.DefaultLogger.Info(fmt.Sprintf("Adding %d objects", len(o)))
+
+	for _, er := range o {
+		// create a new compose record
+		// get the compose module for this record
+		// fill in the values
+
+		// tmp
+		vals := []*ct.RecordValue{
+			&ct.RecordValue{
+				Name:  "AccountName",
+				Value: "Accout name is required",
+			},
+		}
+
+		for _, v := range er.Values {
+			if v.Name == "AccountName" {
+				vv := &ct.RecordValue{
+					Name:  "Name",
+					Value: v.Value,
+				}
+				vals = append(vals, vv)
+			}
+
+			if v.Name == "Facebook" {
+				vv := &ct.RecordValue{
+					Name:  "Facebook",
+					Value: v.Value,
+				}
+				vals = append(vals, vv)
+			}
+
+			if v.Name == "Phone" {
+				vv := &ct.RecordValue{
+					Name:  "Phone",
+					Value: v.Value,
+				}
+				vals = append(vals, vv)
+			}
+		}
+
+		// - module id (Account) = 167296265594339329
+		// - namespace id = 167296264570929153
+
+		rec := &ct.Record{
+			ModuleID:    167296265594339329,
+			NamespaceID: 167296264570929153,
+			Values:      vals,
+		}
+
+		if _, err := dataSyncer.CService.With(ctx).Create(rec); err != nil {
+			service.DefaultLogger.Error(fmt.Sprintf("could not create record: %s", err))
+			continue
+		}
+
+		countPersist = countPersist + 1
+	}
+
+	return nil
+}
diff --git a/federation/commands/sync_structure.go b/federation/commands/sync_structure.go
new file mode 100644
index 000000000..0015074d8
--- /dev/null
+++ b/federation/commands/sync_structure.go
@@ -0,0 +1,206 @@
+package commands
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"time"
+
+	"github.com/cortezaproject/corteza-server/federation/service"
+	"github.com/cortezaproject/corteza-server/federation/types"
+	"github.com/cortezaproject/corteza-server/pkg/decoder"
+	"github.com/spf13/cobra"
+)
+
+var (
+	// urls is the queue of URLs that still have to be fetched
+	urls = make(chan types.SyncerURI, 1)
+
+	// countProcess counts the handled payloads, countPersist the
+	// objects that were stored
+	countProcess = 0
+	countPersist = 0
+
+	// structureSyncer is the syncer used by the structure sync command
+	structureSyncer *service.Syncer
+)
+
+// commandSyncStructure returns the Run function of the "sync structure"
+// command. It periodically fetches the exposed modules of the (for now
+// hard-coded) node and persists them as shared modules.
+func commandSyncStructure(ctx context.Context) func(*cobra.Command, []string) {
+	return func(_ *cobra.Command, _ []string) {
+		// todo - get node by id
+		const (
+			nodeID    = 276342359342989444
+			limit     = 5
+			syncDelay = 10
+		)
+
+		node, err := service.DefaultNode.FindByID(ctx, nodeID)
+
+		if err != nil {
+			service.DefaultLogger.Info("could not find any nodes to sync")
+			return
+		}
+
+		ticker := time.NewTicker(time.Second * syncDelay)
+		defer ticker.Stop()
+
+		basePath := fmt.Sprintf("/federation/nodes/%d/modules/exposed/", node.SharedNodeID)
+
+		structureSyncer = service.NewSyncer()
+
+		// todo - get auth from the node
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+
+		// get the last sync per-node
+		lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeStructure)
+
+		ex := ""
+		if lastSync != nil {
+			ex = fmt.Sprintf(" from last sync on (%s)", lastSync.Format(time.RFC3339))
+		}
+
+		service.DefaultLogger.Info(fmt.Sprintf("Starting structure sync%s", ex))
+
+		url := types.SyncerURI{
+			BaseURL:  baseURL,
+			Path:     basePath,
+			Limit:    limit,
+			LastSync: lastSync,
+		}
+
+		go queueUrl(&url, urls, structureSyncer)
+
+		for {
+			select {
+			case <-ctx.Done():
+				// do any cleanup here
+				service.DefaultLogger.Info(fmt.Sprintf("Stopping sync [processed: %d, persisted: %d]", countProcess, countPersist))
+				return
+			case <-ticker.C:
+				lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeStructure)
+
+				url := types.SyncerURI{
+					BaseURL:  baseURL,
+					Path:     basePath,
+					Limit:    limit,
+					LastSync: lastSync,
+				}
+
+				if lastSync == nil {
+					service.DefaultLogger.Info("Start fetching modules from beginning of time, since lastSync == nil")
+				} else {
+					service.DefaultLogger.Info(fmt.Sprintf("Start fetching modules, start with time: %s", lastSync.Format(time.RFC3339)))
+				}
+
+				go queueUrl(&url, urls, structureSyncer)
+			case url := <-urls:
+				select {
+				case <-ctx.Done():
+					// do any cleanup here
+					service.DefaultLogger.Info(fmt.Sprintf("Stopping sync [processed: %d, persisted: %d]", countProcess, countPersist))
+					return
+				default:
+				}
+
+				s, err := url.String()
+
+				if err != nil {
+					service.DefaultLogger.Error(fmt.Sprintf("could not build sync url: %s", err))
+					continue
+				}
+
+				responseBody, err := structureSyncer.Fetch(ctx, s)
+
+				if err != nil {
+					service.DefaultLogger.Error(fmt.Sprintf("could not fetch %s: %s", s, err))
+					continue
+				}
+
+				// The payload is handled right away. Passing it on through
+				// a channel that only this loop reads from would block the
+				// loop for good as soon as that channel is full.
+				body, err := ioutil.ReadAll(responseBody)
+
+				if err != nil {
+					service.DefaultLogger.Error(fmt.Sprintf("could not read payload of %s: %s", s, err))
+					continue
+				}
+
+				u := types.SyncerURI{
+					BaseURL: baseURL,
+					Path:    basePath,
+					Limit:   limit,
+				}
+
+				err = structureSyncer.Process(ctx, body, node.ID, urls, u, structureSyncer, persistExposedModuleStructure)
+
+				if err != nil {
+					service.DefaultLogger.Error(fmt.Sprintf("error on handling payload: %s", err))
+					continue
+				}
+
+				// add to db - nodes_sync
+				ns := &types.NodeSync{
+					NodeID:       node.ID,
+					SyncStatus:   types.NodeSyncStatusSuccess,
+					SyncType:     types.NodeSyncTypeStructure,
+					TimeOfAction: time.Now().UTC(),
+				}
+
+				if _, err := service.DefaultNodeSync.Create(ctx, ns); err != nil {
+					service.DefaultLogger.Warn(fmt.Sprintf("could not update sync status: %s", err))
+				}
+			}
+		}
+	}
+}
+
+// persistExposedModuleStructure gets the payload from syncer and
+// uses the decode package to decode the whole set, depending on
+// the filtering that was used (limit). Every decoded module is
+// stored as a shared module; modules that can not be stored are
+// logged and skipped.
+func persistExposedModuleStructure(ctx context.Context, payload []byte, nodeID uint64, structureSyncer *service.Syncer) error {
+	countProcess = countProcess + 1
+
+	now := time.Now()
+	o, err := decoder.DecodeFederationModuleSync(payload)
+
+	if err != nil {
+		return err
+	}
+
+	if len(o) == 0 {
+		return nil
+	}
+
+	service.DefaultLogger.Info(fmt.Sprintf("Adding %d objects", len(o)))
+
+	for i, em := range o {
+		n := &types.SharedModule{
+			NodeID:                     nodeID,
+			ExternalFederationModuleID: em.ID,
+			Fields:                     em.Fields,
+			Handle:                     fmt.Sprintf("Handle %d %d", i, now.Unix()),
+			Name:                       fmt.Sprintf("Name %d %d", i, now.Unix()),
+		}
+
+		n, err := structureSyncer.Service.Create(ctx, n)
+
+		// n must not be touched before the error is checked
+		if err != nil {
+			service.DefaultLogger.Error(fmt.Sprintf("could not create shared module: %s", err))
+			continue
+		}
+
+		service.DefaultLogger.Info(fmt.Sprintf("Added shared module: %d", n.ID))
+
+		countPersist = countPersist + 1
+	}
+
+	return nil
+}
diff --git a/federation/service/syncer.go b/federation/service/syncer.go
new file mode 100644
index 000000000..f005741d4
--- /dev/null
+++ b/federation/service/syncer.go
@@ -0,0 +1,111 @@
+package service
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"time"
+
+	"github.com/cortezaproject/corteza-server/compose/service"
+	"github.com/cortezaproject/corteza-server/federation/types"
+)
+
+type (
+	// Syncer fetches payloads from a remote node and hands them to the
+	// function that persists them.
+	Syncer struct {
+		Service  SharedModuleService
+		CService service.RecordService
+	}
+
+	// AuxResponseSet holds the part of a response that Process looks at:
+	// the filter, which carries the paging cursors.
+	AuxResponseSet struct {
+		Response struct {
+			Filter struct {
+				NodeID          string
+				ComposeModuleID string
+				Query           string
+				Limit           int
+				NextPage        string
+				PrevPage        string
+			} `json:"filter"`
+		} `json:"response"`
+	}
+)
+
+// Queue sends the url to the out channel and blocks until it is accepted.
+func (h *Syncer) Queue(url types.SyncerURI, out chan types.SyncerURI) {
+	out <- url
+}
+
+// Fetch does a GET request for the url and returns the response body.
+//
+// The body is read into memory and closed before Fetch returns, so there
+// is nothing left for the caller to close. The request is bound to ctx and
+// every status other than 200 is returned as an error.
+func (h *Syncer) Fetch(ctx context.Context, url string) (io.Reader, error) {
+	client := http.Client{
+		Timeout: 3 * time.Second,
+	}
+
+	req, err := http.NewRequest(http.MethodGet, url, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := client.Do(req.WithContext(ctx))
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, errors.New("unexpected response status: " + resp.Status)
+	}
+
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	return bytes.NewReader(body), nil
+}
+
+// Process decodes the paging info of the payload and passes the payload
+// on to fn, whose error it returns.
+//
+// todo - the next page is not queued yet (see the commented out code
+// below)
+func (h *Syncer) Process(ctx context.Context, payload []byte, nodeID uint64, out chan types.SyncerURI, url types.SyncerURI, handler *Syncer, fn func(ctx context.Context, payload []byte, nodeID uint64, handler *Syncer) error) error {
+	aux := AuxResponseSet{}
+	err := json.Unmarshal(payload, &aux)
+
+	if err != nil {
+		return err
+	}
+
+	if aux.Response.Filter.NextPage != "" {
+		url.NextPage = aux.Response.Filter.NextPage
+
+		// out <- url
+
+		// sync data
+		// out <- fmt.Sprintf("%s/federation/exposed/modules/196342359342989002/records/?limit=%d&pageCursor=%s", "http://localhost:8084", aux.Response.Filter.Limit, aux.Response.Filter.NextPage)
+	}
+
+	return fn(ctx, payload, nodeID, handler)
+}
+
+// NewSyncer returns a Syncer that uses the default shared module and
+// compose record services.
+func NewSyncer() *Syncer {
+	return &Syncer{
+		Service:  DefaultSharedModule,
+		CService: service.DefaultRecord,
+	}
+}
diff --git a/federation/types/syncer_uri.go b/federation/types/syncer_uri.go
new file mode 100644
index 000000000..4dc9778bf
--- /dev/null
+++ b/federation/types/syncer_uri.go
@@ -0,0 +1,113 @@
+package types
+
+import (
+	"fmt"
+	"net/url"
+	"strconv"
+	"strings"
+	"time"
+)
+
+// SyncerURI serves as a default struct for handling url queues
+// for fetcher
+type (
+	SyncerURI struct {
+		Limit    int
+		LastSync *time.Time
+		// todo
+		// LastSync *LastSyncTime
+		Path     string
+		BaseURL  string
+		NextPage string
+		LastPage string
+	}
+)
+
+// String does the transformation to string, does not
+// url encode page cursor, since it needs to be base64
+// encoded. LastSync is sent as unix time (seconds). The
+// returned error is always nil.
+func (s *SyncerURI) String() (string, error) {
+	var query []string
+
+	// using query instead of url.Values
+	// because pageCursor should not be url encoded
+	query = append(query, fmt.Sprintf("limit=%d", s.Limit))
+
+	if s.NextPage != "" {
+		query = append(query, fmt.Sprintf("pageCursor=%s", s.NextPage))
+	} else if s.LastPage != "" {
+		query = append(query, fmt.Sprintf("pageCursor=%s", s.LastPage))
+	}
+
+	if s.LastSync != nil {
+		query = append(query, fmt.Sprintf("lastSync=%d", s.LastSync.Unix()))
+	}
+
+	return fmt.Sprintf("%s%s?%s", s.BaseURL, s.Path, strings.Join(query, "&")), nil
+}
+
+// Parse fills the struct from the uri: path, limit, base url (when
+// the uri has a host) and last sync. The page cursor is not parsed.
+func (s *SyncerURI) Parse(uri string) error {
+	u, err := url.Parse(uri)
+
+	if err != nil {
+		return err
+	}
+
+	q := u.Query()
+
+	var limit int
+	l := q.Get("limit")
+
+	if l != "" {
+		if limit, err = strconv.Atoi(l); err != nil {
+			return err
+		}
+	}
+
+	s.Path = u.Path
+	s.Limit = limit
+
+	if u.Host != "" {
+		s.BaseURL = fmt.Sprintf("%s://%s", u.Scheme, u.Host)
+	}
+
+	ls := q.Get("lastSync")
+
+	if ls != "" {
+		parsed, err := parseLastSync(ls)
+
+		if err != nil {
+			return err
+		}
+
+		s.LastSync = parsed
+	}
+
+	return nil
+}
+
+// parseLastSync reads the last sync time from a unix timestamp
+// (seconds), an RFC3339 time or a plain date (2006-01-02), tried
+// in that order.
+func parseLastSync(lastSync string) (*time.Time, error) {
+	if i, err := strconv.ParseInt(lastSync, 10, 64); err == nil {
+		t := time.Unix(i, 0)
+		return &t, nil
+	}
+
+	// try different format if above fails
+	if t, err := time.Parse(time.RFC3339, lastSync); err == nil {
+		return &t, nil
+	}
+
+	t, err := time.Parse("2006-01-02", lastSync)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return &t, nil
+}
diff --git a/federation/types/syncer_uri_test.go b/federation/types/syncer_uri_test.go
new file mode 100644
index 000000000..1e74a7000
--- /dev/null
+++ b/federation/types/syncer_uri_test.go
@@ -0,0 +1,93 @@
+package types
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSyncerURIString(t *testing.T) {
+	now, err := time.Parse("2006-01-02 15:04:05", "2020-10-23 11:11:11")
+	require.NoError(t, err)
+
+	tests := []struct {
+		name   string
+		url    *SyncerURI
+		expect string
+	}{
+		{
+			name:   "next page cursor",
+			url:    &SyncerURI{BaseURL: "https://example.url", NextPage: "NEXT_PAGE_CURSOR=="},
+			expect: "https://example.url?limit=0&pageCursor=NEXT_PAGE_CURSOR==",
+		},
+		{
+			name:   "last page cursor",
+			url:    &SyncerURI{BaseURL: "https://example.url", LastPage: "LAST_PAGE_CURSOR=="},
+			expect: "https://example.url?limit=0&pageCursor=LAST_PAGE_CURSOR==",
+		},
+		{
+			name:   "limit results",
+			url:    &SyncerURI{BaseURL: "https://example.url", Limit: 666},
+			expect: "https://example.url?limit=666",
+		},
+		{
+			name:   "base path",
+			url:    &SyncerURI{Path: "/relative/path"},
+			expect: "/relative/path?limit=0",
+		},
+		{
+			// last sync is sent as unix time (seconds)
+			name:   "last sync",
+			url:    &SyncerURI{Path: "/relative/path", LastSync: &now},
+			expect: "/relative/path?limit=0&lastSync=1603451471",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// bind the assertions to the subtest, so that a failure is
+			// reported for the right case
+			req := require.New(t)
+
+			r, err := tt.url.String()
+
+			req.NoError(err)
+			req.Equal(tt.expect, r)
+		})
+	}
+}
+
+func TestSyncerURIParse(t *testing.T) {
+	// TODO - add a parse last sync date
+	// now, _ := time.Parse("2006-01-02 15:04:05", "2020-10-23 11:11:11")
+
+	tests := []struct {
+		name   string
+		url    string
+		expect *SyncerURI
+	}{
+		{
+			name:   "parse limit",
+			url:    "https://example.url?limit=11",
+			expect: &SyncerURI{BaseURL: "https://example.url", Limit: 11},
+		},
+		{
+			name:   "parse path",
+			url:    "/path/to/endpoint/",
+			expect: &SyncerURI{Path: "/path/to/endpoint/"},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			req := require.New(t)
+
+			s := &SyncerURI{}
+			err := s.Parse(tt.url)
+
+			req.NoError(err)
+			req.Equal(tt.expect, s)
+		})
+	}
+}