3
0

Added sync service, data and structure sync commands

This commit is contained in:
Peter Grlica 2020-10-28 17:33:26 +01:00
parent f7f69895a8
commit 17e4f6ff75
6 changed files with 814 additions and 0 deletions

View File

@ -0,0 +1,82 @@
package commands
import (
"context"
"fmt"
"time"
"github.com/cortezaproject/corteza-server/federation/service"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/cli"
"github.com/spf13/cobra"
)
const (
	// httpTimeout is the default timeout for sync HTTP calls.
	// NOTE: the original value was a bare 10, which as a time.Duration is
	// 10 *nanoseconds* — almost certainly not the intent; fixed to 10s.
	// (Syncer.Fetch currently hard-codes its own 3s timeout — consider
	// wiring this constant through.)
	httpTimeout time.Duration = 10 * time.Second
	// baseURL is the (temporary, hard-coded) origin of the remote
	// federation node that is being synced from.
	baseURL string = "http://localhost:8084"
)
type (
	// serviceInitializer is implemented by the application so the sync
	// subcommands can lazily initialize services from a PreRunE hook
	// (see commandPreRunInitService).
	serviceInitializer interface {
		InitServices(ctx context.Context) error
	}
)
// Sync builds the `sync` command tree with the `structure` and `data`
// subcommands. Services are initialized lazily via the shared PreRunE hook.
func Sync(app serviceInitializer) *cobra.Command {
	var (
		ctx    = cli.Context()
		preRun = commandPreRunInitService(ctx, app)

		root = &cobra.Command{
			Use:   "sync",
			Short: "Sync commands",
		}
	)

	root.AddCommand(
		&cobra.Command{
			Use:     "structure",
			Short:   "Sync structure",
			PreRunE: preRun,
			Run:     commandSyncStructure(ctx),
		},
		&cobra.Command{
			Use:     "data",
			Short:   "Sync data",
			PreRunE: preRun,
			Run:     commandSyncData(ctx),
		},
	)

	return root
}
// queueUrl logs the given syncer URI and enqueues it onto the urls channel
// through the syncer's Queue method (which blocks until the channel accepts).
func queueUrl(url *types.SyncerURI, urls chan types.SyncerURI, handler *service.Syncer) {
	// SyncerURI.String currently never returns a non-nil error
	formatted, _ := url.String()
	service.DefaultLogger.Info(fmt.Sprintf("Adding %s to queue", formatted))
	handler.Queue(*url, urls)
}
// getLastSyncTime returns the time of the last successful sync of the given
// type for a node, or nil when there is none.
func getLastSyncTime(ctx context.Context, nodeID uint64, syncType string) *time.Time {
	// best-effort: a lookup error is treated the same as "never synced"
	if ns, _ := service.DefaultNodeSync.LookupLastSuccessfulSync(ctx, nodeID, syncType); ns != nil {
		return &ns.TimeOfAction
	}
	return nil
}
// commandPreRunInitService returns a cobra PreRunE hook that initializes
// application services before a sync subcommand runs.
func commandPreRunInitService(ctx context.Context, app serviceInitializer) func(*cobra.Command, []string) error {
	return func(*cobra.Command, []string) error {
		return app.InitServices(ctx)
	}
}

View File

@ -0,0 +1,246 @@
package commands
import (
"context"
"fmt"
"io/ioutil"
"time"
ct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/federation/service"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/decoder"
"github.com/davecgh/go-spew/spew"
"github.com/spf13/cobra"
)
var (
	// dataSyncer drives fetching/processing for the data sync command;
	// assigned in commandSyncData.
	dataSyncer *service.Syncer
)
// commandSyncData returns the cobra run handler that continuously
// synchronizes federated record data for a (currently hard-coded) node.
//
// It queues sync URLs on the shared urls channel, fetches payloads and
// persists decoded records via persistExposedRecordData, recording each
// successful pass in nodes_sync.
func commandSyncData(ctx context.Context) func(*cobra.Command, []string) {
	// from
	// - module id (Account) = 167296235227578369
	// - namespace id = 167296235059806209
	// to
	// - module id (Account) = 167296265594339329
	// - namespace id = 167296264570929153
	return func(_ *cobra.Command, _ []string) {
		// todo
		// - get all nodes and loop
		// - get all shared modules that have a field mapping
		// - for each shared module, the add method needs shared module info as payload
		// todo - get node by id
		const (
			nodeID    = 276342359342989444
			moduleID  = 376342359342989444
			limit     = 5
			syncDelay = 10 // seconds between periodic re-queues
		)

		node, err := service.DefaultNode.FindByID(ctx, nodeID)
		if err != nil {
			service.DefaultLogger.Info("could not find any nodes to sync")
			return
		}

		ticker := time.NewTicker(time.Second * syncDelay)
		basePath := fmt.Sprintf("/federation/nodes/%d/modules/%d/records/", node.SharedNodeID, moduleID)
		dataSyncer = service.NewSyncer()

		// todo - get auth from the node
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		// get the last successful *data* sync for this node
		lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeData)

		ex := ""
		if lastSync != nil {
			ex = fmt.Sprintf(" from last sync on (%s)", lastSync.Format(time.RFC3339))
		}
		service.DefaultLogger.Info(fmt.Sprintf("Starting data sync%s", ex))

		url := types.SyncerURI{
			BaseURL:  baseURL,
			Path:     basePath,
			Limit:    limit,
			LastSync: lastSync,
		}
		go queueUrl(&url, urls, dataSyncer)

		for {
			select {
			case <-ctx.Done():
				// do any cleanup here
				service.DefaultLogger.Info(fmt.Sprintf("Stopping sync [processed: %d, persisted: %d]", countProcess, countPersist))
				return
			case <-ticker.C:
				// periodic re-queue; the original looked up (and below,
				// recorded) NodeSyncTypeStructure in this *data* command,
				// which made the lastSync lookup above never match
				lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeData)
				url := types.SyncerURI{
					BaseURL:  baseURL,
					Path:     basePath,
					Limit:    limit,
					LastSync: lastSync,
				}
				if lastSync == nil {
					service.DefaultLogger.Info("Start fetching records from beginning of time, since lastSync == nil")
				} else {
					service.DefaultLogger.Info(fmt.Sprintf("Start fetching records, start with time: %s", lastSync.Format(time.RFC3339)))
				}
				go queueUrl(&url, urls, dataSyncer)
			case url := <-urls:
				// bail out early if we were cancelled while waiting
				select {
				case <-ctx.Done():
					// do any cleanup here
					service.DefaultLogger.Info(fmt.Sprintf("Stopping sync [processed: %d, persisted: %d]", countProcess, countPersist))
					return
				default:
				}

				s, err := url.String()
				if err != nil {
					continue
				}
				responseBody, err := dataSyncer.Fetch(ctx, s)
				if err != nil {
					continue
				}
				payloads <- responseBody
			case p := <-payloads:
				body, err := ioutil.ReadAll(p)
				if err != nil {
					continue
				}
				u := types.SyncerURI{
					BaseURL: baseURL,
					Path:    basePath,
					Limit:   limit,
				}
				// method needs:
				// - compose module ID (from module_mapping)
				// - namespace ID (from module mapping)
				err = dataSyncer.Process(ctx, body, node.ID, urls, u, dataSyncer, persistExposedRecordData)
				if err != nil {
					service.DefaultLogger.Error(fmt.Sprintf("error on handling payload: %s", err))
				} else {
					// record the successful pass in nodes_sync; `ns` avoids
					// shadowing the builtin `new` as the original did
					ns := &types.NodeSync{
						NodeID:       node.ID,
						SyncStatus:   types.NodeSyncStatusSuccess,
						SyncType:     types.NodeSyncTypeData,
						TimeOfAction: time.Now().UTC(),
					}
					if _, err := service.DefaultNodeSync.Create(ctx, ns); err != nil {
						service.DefaultLogger.Warn(fmt.Sprintf("could not update sync status: %s", err))
					}
				}
			}
		}
	}
}
// persistExposedRecordData gets the payload from syncer and
// uses the decode package to decode the whole set, depending on
// the filtering that was used (limit).
//
// For every decoded federation record it maps a hard-coded set of exposed
// fields onto compose record values and creates the record in a hard-coded
// destination module/namespace (should come from module_mapping — see TODO).
func persistExposedRecordData(ctx context.Context, payload []byte, nodeID uint64, dataSyncer *service.Syncer) error {
	countProcess++

	// payload is already []byte; the original re-converted it needlessly
	o, err := decoder.DecodeFederationRecordSync(payload)
	if err != nil {
		return err
	}

	if len(o) == 0 {
		return nil
	}

	service.DefaultLogger.Info(fmt.Sprintf("Adding %d objects", len(o)))

	for _, er := range o {
		// tmp: placeholder for the required AccountName field
		// (typo "Accout" fixed)
		vals := []*ct.RecordValue{{
			Name:  "AccountName",
			Value: "Account name is required",
		}}

		// map exposed federation values onto compose record values;
		// AccountName is renamed to Name, the rest copy over as-is
		for _, v := range er.Values {
			switch v.Name {
			case "AccountName":
				vals = append(vals, &ct.RecordValue{Name: "Name", Value: v.Value})
			case "Facebook", "Phone":
				vals = append(vals, &ct.RecordValue{Name: v.Name, Value: v.Value})
			}
		}

		// todo - resolve from module mapping:
		// - module id (Account) = 167296265594339329
		// - namespace id = 167296264570929153
		rec := &ct.Record{
			ModuleID:    167296265594339329,
			NamespaceID: 167296264570929153,
			Values:      vals,
		}

		rc, err := dataSyncer.CService.With(ctx).Create(rec)
		// debug output; NOTE(review): the Create error is only dumped, never
		// handled, and countPersist is never incremented here — confirm intent
		spew.Dump(rc, err)
	}

	return nil
}

View File

@ -0,0 +1,197 @@
package commands
import (
"context"
"fmt"
"io"
"io/ioutil"
"time"
"github.com/cortezaproject/corteza-server/federation/service"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/decoder"
"github.com/spf13/cobra"
)
var (
	// urls carries sync target URIs from the producers (initial queue,
	// ticker re-queues) to the fetch loop; shared by structure and data sync
	urls = make(chan types.SyncerURI, 1)
	// payloads carries fetched (still unread) response bodies to processing
	payloads = make(chan io.Reader, 1)
	// countProcess / countPersist are best-effort counters reported on
	// shutdown; not guarded for concurrent access
	countProcess = 0
	countPersist = 0
	// structureSyncer drives the structure sync command; assigned in
	// commandSyncStructure
	structureSyncer *service.Syncer
)
// commandSyncStructure returns the cobra run handler that continuously
// synchronizes federated module structure for a (currently hard-coded) node.
//
// It queues sync URLs on the shared urls channel, fetches payloads and
// persists decoded shared modules via persistExposedModuleStructure,
// recording each successful pass in nodes_sync.
func commandSyncStructure(ctx context.Context) func(*cobra.Command, []string) {
	return func(_ *cobra.Command, _ []string) {
		// todo - get node by id
		const (
			nodeID    = 276342359342989444
			limit     = 5
			syncDelay = 10 // seconds between periodic re-queues
		)

		node, err := service.DefaultNode.FindByID(ctx, nodeID)
		if err != nil {
			service.DefaultLogger.Info("could not find any nodes to sync")
			return
		}

		ticker := time.NewTicker(time.Second * syncDelay)
		basePath := fmt.Sprintf("/federation/nodes/%d/modules/exposed/", node.SharedNodeID)
		structureSyncer = service.NewSyncer()

		// todo - get auth from the node
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		// get the last successful structure sync for this node
		lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeStructure)

		ex := ""
		if lastSync != nil {
			ex = fmt.Sprintf(" from last sync on (%s)", lastSync.Format(time.RFC3339))
		}
		service.DefaultLogger.Info(fmt.Sprintf("Starting structure sync%s", ex))

		url := types.SyncerURI{
			BaseURL:  baseURL,
			Path:     basePath,
			Limit:    limit,
			LastSync: lastSync,
		}
		go queueUrl(&url, urls, structureSyncer)

		for {
			select {
			case <-ctx.Done():
				// do any cleanup here
				service.DefaultLogger.Info(fmt.Sprintf("Stopping sync [processed: %d, persisted: %d]", countProcess, countPersist))
				return
			case <-ticker.C:
				// periodic re-queue from the last successful sync point
				lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeStructure)
				url := types.SyncerURI{
					BaseURL:  baseURL,
					Path:     basePath,
					Limit:    limit,
					LastSync: lastSync,
				}
				if lastSync == nil {
					service.DefaultLogger.Info("Start fetching modules from beginning of time, since lastSync == nil")
				} else {
					service.DefaultLogger.Info(fmt.Sprintf("Start fetching modules, start with time: %s", lastSync.Format(time.RFC3339)))
				}
				go queueUrl(&url, urls, structureSyncer)
			case url := <-urls:
				// bail out early if we were cancelled while waiting
				select {
				case <-ctx.Done():
					// do any cleanup here
					service.DefaultLogger.Info(fmt.Sprintf("Stopping sync [processed: %d, persisted: %d]", countProcess, countPersist))
					return
				default:
				}

				s, err := url.String()
				if err != nil {
					continue
				}
				responseBody, err := structureSyncer.Fetch(ctx, s)
				if err != nil {
					continue
				}
				payloads <- responseBody
			case p := <-payloads:
				body, err := ioutil.ReadAll(p)
				if err != nil {
					continue
				}
				u := types.SyncerURI{
					BaseURL: baseURL,
					Path:    basePath,
					Limit:   limit,
				}
				err = structureSyncer.Process(ctx, body, node.ID, urls, u, structureSyncer, persistExposedModuleStructure)
				if err != nil {
					service.DefaultLogger.Error(fmt.Sprintf("error on handling payload: %s", err))
				} else {
					// record the successful pass in nodes_sync; `ns` avoids
					// shadowing the builtin `new` as the original did
					ns := &types.NodeSync{
						NodeID:       node.ID,
						SyncStatus:   types.NodeSyncStatusSuccess,
						SyncType:     types.NodeSyncTypeStructure,
						TimeOfAction: time.Now().UTC(),
					}
					if _, err := service.DefaultNodeSync.Create(ctx, ns); err != nil {
						service.DefaultLogger.Warn(fmt.Sprintf("could not update sync status: %s", err))
					}
				}
			}
		}
	}
}
// persistExposedModuleStructure gets the payload from syncer and
// uses the decode package to decode the whole set, depending on
// the filtering that was used (limit).
//
// Each decoded exposed module is persisted as a SharedModule with a
// generated (placeholder) handle and name.
func persistExposedModuleStructure(ctx context.Context, payload []byte, nodeID uint64, structureSyncer *service.Syncer) error {
	countProcess++

	now := time.Now()

	// payload is already []byte; the original re-converted it needlessly
	o, err := decoder.DecodeFederationModuleSync(payload)
	if err != nil {
		return err
	}

	if len(o) == 0 {
		return nil
	}

	service.DefaultLogger.Info(fmt.Sprintf("Adding %d objects", len(o)))

	for i, em := range o {
		mod := &types.SharedModule{
			NodeID:                     nodeID,
			ExternalFederationModuleID: em.ID,
			Fields:                     em.Fields,
			Handle:                     fmt.Sprintf("Handle %d %d", i, now.Unix()),
			Name:                       fmt.Sprintf("Name %d %d", i, now.Unix()),
		}

		mod, err := structureSyncer.Service.Create(ctx, mod)
		if err != nil {
			service.DefaultLogger.Error(fmt.Sprintf("could not create shared module: %s", err))
			continue
		}

		// log only after the error check; the original read mod.ID before
		// checking err, which could dereference a nil result on failure
		service.DefaultLogger.Info(fmt.Sprintf("Added shared module: %d", mod.ID))

		countPersist++
	}

	return nil
}

View File

@ -0,0 +1,86 @@
package service
import (
"context"
"encoding/json"
"errors"
"io"
"net/http"
"time"
"github.com/cortezaproject/corteza-server/compose/service"
"github.com/cortezaproject/corteza-server/federation/types"
)
type (
	// Syncer coordinates fetching remote federation payloads and handing
	// them off to persistence services.
	Syncer struct {
		// Service persists shared module structure
		Service SharedModuleService
		// CService creates compose records from synced data
		CService service.RecordService
	}

	// AuxResponseSet mirrors only the response filter portion of a
	// federation API payload; used by Process to read pagination cursors
	// without decoding the full set.
	AuxResponseSet struct {
		Response struct {
			Filter struct {
				NodeID          string
				ComposeModuleID string
				Query           string
				Limit           int
				NextPage        string
				PrevPage        string
			} `json:"filter"`
		} `json:"response"`
	}
)
// Queue pushes the given URI onto the out channel; it blocks until a
// receiver is ready (or buffer space is available).
func (h *Syncer) Queue(u types.SyncerURI, out chan types.SyncerURI) {
	out <- u
}
// Fetch GETs the given url with a short timeout and returns the response
// body reader on HTTP 200. The returned reader is an io.ReadCloser under
// the hood — the caller is responsible for closing it.
func (h *Syncer) Fetch(ctx context.Context, url string) (io.Reader, error) {
	client := http.Client{
		Timeout: 3 * time.Second,
	}

	// bind the request to ctx so cancellation aborts in-flight calls;
	// the original ignored ctx entirely
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}

	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}

	if resp.StatusCode != http.StatusOK {
		// close the body on the error path so the connection can be
		// reused; the original leaked it and always reported "404"
		resp.Body.Close()
		return nil, errors.New("unexpected response status: " + resp.Status)
	}

	return resp.Body, nil
}
// Process unmarshals a fetched payload just enough to read the response
// filter (pagination cursors) and then delegates full decoding and
// persistence of the payload to fn.
//
// NOTE(review): when a NextPage cursor is present it is copied onto the
// local url value, but the `out <- url` send below is commented out, so
// follow-up pages are never queued from here; url is passed by value, so
// the assignment has no external effect. Confirm whether pagination is
// intentionally disabled before re-enabling the send (a same-goroutine
// send into a full channel could deadlock the caller's select loop).
func (h *Syncer) Process(ctx context.Context, payload []byte, nodeID uint64, out chan types.SyncerURI, url types.SyncerURI, handler *Syncer, fn func(ctx context.Context, payload []byte, nodeID uint64, handler *Syncer) error) error {
	aux := AuxResponseSet{}
	err := json.Unmarshal(payload, &aux)
	if err != nil {
		return err
	}
	if aux.Response.Filter.NextPage != "" {
		url.NextPage = aux.Response.Filter.NextPage
		// out <- url
		// sync data
		// out <- fmt.Sprintf("%s/federation/exposed/modules/196342359342989002/records/?limit=%d&pageCursor=%s", "http://localhost:8084", aux.Response.Filter.Limit, aux.Response.Filter.NextPage)
	}
	return fn(ctx, payload, nodeID, handler)
}
// NewSyncer constructs a Syncer wired to the default shared module
// service and the default compose record service.
func NewSyncer() *Syncer {
	s := &Syncer{}
	s.Service = DefaultSharedModule
	s.CService = service.DefaultRecord
	return s
}

View File

@ -0,0 +1,111 @@
package types
import (
"fmt"
"net/url"
"strconv"
"strings"
"time"
"github.com/davecgh/go-spew/spew"
)
// SyncerURI serves as a default struct for handling url queues
// for fetcher
type (
	SyncerURI struct {
		Limit    int
		LastSync *time.Time
		// todo
		// LastSync *LastSyncTime
		Path     string
		BaseURL  string
		NextPage string
		LastPage string
	}
)

// String renders the URI as "<base><path>?<query>". The page cursor is
// deliberately NOT url-encoded (it must remain base64), and lastSync is
// emitted as a unix timestamp. The error result is always nil; it is kept
// for caller compatibility.
func (s *SyncerURI) String() (string, error) {
	// limit is always present; cursor and lastSync are optional,
	// and NextPage wins over LastPage
	query := []string{fmt.Sprintf("limit=%d", s.Limit)}

	switch {
	case s.NextPage != "":
		query = append(query, "pageCursor="+s.NextPage)
	case s.LastPage != "":
		query = append(query, "pageCursor="+s.LastPage)
	}

	if s.LastSync != nil {
		query = append(query, fmt.Sprintf("lastSync=%d", s.LastSync.Unix()))
	}

	return s.BaseURL + s.Path + "?" + strings.Join(query, "&"), nil
}
// Parse fills the receiver from a raw URI string: path, limit, base URL
// (only when a host is present) and the optional lastSync timestamp.
func (s *SyncerURI) Parse(uri string) error {
	u, err := url.Parse(uri)
	if err != nil {
		return err
	}

	q := u.Query()

	// limit defaults to 0 when the parameter is absent
	limit := 0
	if raw := q.Get("limit"); raw != "" {
		if limit, err = strconv.Atoi(raw); err != nil {
			return err
		}
	}

	s.Path = u.Path
	s.Limit = limit

	// relative URIs leave BaseURL untouched
	if u.Host != "" {
		s.BaseURL = fmt.Sprintf("%s://%s", u.Scheme, u.Host)
	}

	if raw := q.Get("lastSync"); raw != "" {
		parsed, err := parseLastSync(raw)
		if err != nil {
			return err
		}
		s.LastSync = parsed
	}

	return nil
}
// parseLastSync parses a lastSync query value, trying (in this order) a
// unix timestamp, an RFC3339 timestamp, and finally a plain YYYY-MM-DD
// date; it returns the first successful parse.
//
// NOTE(review): the spew.Dump calls below look like leftover debug output
// and should be removed once confirmed unneeded (removing them may also
// orphan this file's spew import).
func parseLastSync(lastSync string) (*time.Time, error) {
	spew.Dump("parse last sync")
	// unix timestamp first — this is what SyncerURI.String emits
	if i, err := strconv.ParseInt(lastSync, 10, 64); err == nil {
		spew.Dump("returning")
		t := time.Unix(i, 0)
		return &t, nil
	}
	// try different format if above fails
	spew.Dump("TRY HERE")
	if t, err := time.Parse(time.RFC3339, lastSync); err == nil {
		return &t, nil
	}
	t, err := time.Parse("2006-01-02", lastSync)
	if err != nil {
		return nil, err
	}
	return &t, nil
}

View File

@ -0,0 +1,92 @@
package types
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
// TestSyncerURIString verifies that SyncerURI.String assembles the
// expected query string for cursors, limits and lastSync.
func TestSyncerURIString(t *testing.T) {
	var (
		req = require.New(t)
	)

	now, _ := time.Parse("2006-01-02 15:04:05", "2020-10-23 11:11:11")

	tests := []struct {
		name   string
		url    *SyncerURI
		expect string
	}{
		{
			name:   "next page cursor",
			url:    &SyncerURI{BaseURL: "https://example.url", NextPage: "NEXT_PAGE_CURSOR=="},
			expect: "https://example.url?limit=0&pageCursor=NEXT_PAGE_CURSOR==",
		},
		{
			name:   "last page cursor",
			url:    &SyncerURI{BaseURL: "https://example.url", LastPage: "LAST_PAGE_CURSOR=="},
			expect: "https://example.url?limit=0&pageCursor=LAST_PAGE_CURSOR==",
		},
		{
			name:   "limit results",
			url:    &SyncerURI{BaseURL: "https://example.url", Limit: 666},
			expect: "https://example.url?limit=666",
		},
		{
			name:   "base path",
			url:    &SyncerURI{Path: "/relative/path"},
			expect: "/relative/path?limit=0",
		},
		{
			name: "last sync",
			url:  &SyncerURI{Path: "/relative/path", LastSync: &now},
			// String() serializes lastSync as a unix timestamp
			// ("lastSync=%d"), not RFC3339 — the original expectation of
			// "...lastSync=2020-10-23T11:11:11Z" could never match.
			// 2020-10-23T11:11:11Z == 1603451471
			expect: "/relative/path?limit=0&lastSync=1603451471",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			r, _ := tt.url.String()
			req.Equal(tt.expect, r)
		})
	}
}
// TestSyncerURIParse verifies that SyncerURI.Parse extracts the limit,
// path and base URL from raw URI strings.
func TestSyncerURIParse(t *testing.T) {
	req := require.New(t)

	// TODO - add a parse last sync date case
	// now, _ := time.Parse("2006-01-02 15:04:05", "2020-10-23 11:11:11")
	cases := []struct {
		name   string
		url    string
		expect *SyncerURI
	}{
		{"parse limit", "https://example.url?limit=11", &SyncerURI{BaseURL: "https://example.url", Limit: 11}},
		{"parse path", "/path/to/endpoint/", &SyncerURI{Path: "/path/to/endpoint/"}},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			parsed := &SyncerURI{}
			err := parsed.Parse(tc.url)
			req.NoError(err)
			req.Equal(tc.expect, parsed)
		})
	}
}