3
0

Refactored CLI sync commands; added sync service, mapper, and processer; refactored channels

This commit is contained in:
Peter Grlica
2020-10-30 12:33:30 +01:00
parent 104ae577cb
commit b4a9c9d36a
18 changed files with 463 additions and 289 deletions

View File

@@ -13,10 +13,23 @@ import (
)
const (
limit = 100
httpTimeout time.Duration = 10
baseURL string = "http://localhost:8084"
)
var (
sync *service.Sync
mapper *service.Mapper
surls = make(chan service.Surl, 1)
spayloads = make(chan service.Spayload, 1)
countProcess = 0
countPersist = 0
)
type (
serviceInitializer interface {
InitServices(ctx context.Context) error
@@ -58,21 +71,17 @@ func Sync(app serviceInitializer) *cobra.Command {
return cmd
}
func queueUrl(url *types.SyncerURI, urls chan types.SyncerURI, handler *service.Syncer) {
func queueUrl(url *types.SyncerURI, urls chan service.Surl, meta service.Processer) {
s, _ := url.String()
service.DefaultLogger.Info(fmt.Sprintf("Adding %s to queue", s))
handler.Queue(*url, urls)
}
func getLastSyncTime(ctx context.Context, nodeID uint64, syncType string) *time.Time {
ns, _ := service.DefaultNodeSync.LookupLastSuccessfulSync(ctx, nodeID, syncType)
if ns != nil {
return &ns.TimeOfAction
t := service.Surl{
Url: *url,
Meta: meta,
}
return nil
sync.QueueUrl(t, urls)
}
func commandPreRunInitService(ctx context.Context, app serviceInitializer) func(*cobra.Command, []string) error {

View File

@@ -6,77 +6,45 @@ import (
"io/ioutil"
"time"
cs "github.com/cortezaproject/corteza-server/compose/service"
ct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/federation/service"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/decoder"
"github.com/davecgh/go-spew/spew"
"github.com/spf13/cobra"
)
var (
dataSyncer *service.Syncer
type (
dataProcesser struct {
ID uint64
NodeID uint64
ComposeModuleID uint64
ComposeNamespaceID uint64
ModuleMappingValues *ct.RecordValueSet
}
)
func commandSyncData(ctx context.Context) func(*cobra.Command, []string) {
// from
// - module id (Account) = 167296235227578369
// - namespace id = 167296235059806209
// to
// - module id (Account) = 167296265594339329
// - namespace id = 167296264570929153
const syncDelay = 10
return func(_ *cobra.Command, _ []string) {
// todo
// - get all nodes and loop
// - get all shared modules that have a field mapping
// - for each shared module, the add method needs shared module info as payload
// todo - get node by id
const (
nodeID = 276342359342989444
moduleID = 376342359342989444
limit = 5
syncDelay = 10
)
node, err := service.DefaultNode.FindByID(ctx, nodeID)
if err != nil {
service.DefaultLogger.Info("could not find any nodes to sync")
return
}
ticker := time.NewTicker(time.Second * syncDelay)
basePath := fmt.Sprintf("/federation/nodes/%d/modules/%d/records/", node.SharedNodeID, moduleID)
dataSyncer = service.NewSyncer()
// todo - get auth from the node
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// get the last sync per-node
lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeData)
ticker := time.NewTicker(time.Second * syncDelay)
ex := ""
if lastSync != nil {
ex = fmt.Sprintf(" from last sync on (%s)", lastSync.Format(time.RFC3339))
}
mapper = &service.Mapper{}
service.DefaultLogger.Info(fmt.Sprintf("Starting structure sync%s", ex))
sync = service.NewSync(
&service.Syncer{},
&service.Mapper{},
service.DefaultSharedModule,
cs.DefaultRecord)
url := types.SyncerURI{
BaseURL: baseURL,
Path: basePath,
Limit: limit,
LastSync: lastSync,
}
go queueUrl(&url, urls, dataSyncer)
queueDataForNodes(ctx, sync)
for {
select {
@@ -85,23 +53,9 @@ func commandSyncData(ctx context.Context) func(*cobra.Command, []string) {
service.DefaultLogger.Info(fmt.Sprintf("Stopping sync [processed: %d, persisted: %d]", countProcess, countPersist))
return
case <-ticker.C:
lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeStructure)
url := types.SyncerURI{
BaseURL: baseURL,
Path: basePath,
Limit: limit,
LastSync: lastSync,
}
if lastSync == nil {
service.DefaultLogger.Info(fmt.Sprintf("Start fetching modules from beginning of time, since lastSync == nil"))
} else {
service.DefaultLogger.Info(fmt.Sprintf("Start fetching modules, start with time: %s", lastSync.Format(time.RFC3339)))
}
go queueUrl(&url, urls, dataSyncer)
case url := <-urls:
// do the whole process again
queueDataForNodes(ctx, sync)
case url := <-surls:
select {
case <-ctx.Done():
// do any cleanup here
@@ -110,57 +64,65 @@ func commandSyncData(ctx context.Context) func(*cobra.Command, []string) {
default:
}
s, err := url.String()
s, err := url.Url.String()
if err != nil {
continue
}
responseBody, err := dataSyncer.Fetch(ctx, s)
responseBody, err := sync.FetchUrl(ctx, s)
if err != nil {
continue
}
payloads <- responseBody
case p := <-payloads:
body, err := ioutil.ReadAll(p)
spayload := service.Spayload{
Payload: responseBody,
Meta: url.Meta,
}
spayloads <- spayload
// payloads <- responseBody
case p := <-spayloads:
body, err := ioutil.ReadAll(p.Payload)
// handle error
if err != nil {
continue
}
basePath := fmt.Sprintf("/federation/nodes/%d/modules/%d/records/", p.Meta.(*dataProcesser).NodeID, p.Meta.(*dataProcesser).ID)
u := types.SyncerURI{
BaseURL: baseURL,
Path: basePath,
Limit: limit,
}
// method needs:
// - compose module ID (from module_mapping)
// - namespace ID (from module mapping)
err = dataSyncer.Process(ctx, body, node.ID, urls, u, dataSyncer, persistExposedRecordData)
err = sync.ProcessPayload(ctx, body, surls, u, p.Meta.(*dataProcesser))
if err != nil {
// handle error
service.DefaultLogger.Error(fmt.Sprintf("error on handling payload: %s", err))
service.DefaultLogger.Info(fmt.Sprintf("error on handling payload: %s", err))
} else {
n := time.Now().UTC()
n, err := service.DefaultNode.FindBySharedNodeID(ctx, p.Meta.(*dataProcesser).NodeID)
if err != nil {
service.DefaultLogger.Info(fmt.Sprintf("could not update sync status: %s", err))
}
// add to db - nodes_sync
new := &types.NodeSync{
NodeID: node.ID,
NodeID: (*n).ID,
SyncStatus: types.NodeSyncStatusSuccess,
SyncType: types.NodeSyncTypeStructure,
TimeOfAction: n,
SyncType: types.NodeSyncTypeData,
TimeOfAction: time.Now().UTC(),
}
new, err := service.DefaultNodeSync.Create(ctx, new)
new, err = service.DefaultNodeSync.Create(ctx, new)
if err != nil {
service.DefaultLogger.Warn(fmt.Sprintf("could not update sync status: %s", err))
service.DefaultLogger.Info(fmt.Sprintf("could not update sync status: %s", err))
}
}
}
@@ -168,13 +130,75 @@ func commandSyncData(ctx context.Context) func(*cobra.Command, []string) {
}
}
// persistExposedRecordData gets the payload from syncer and
func queueDataForNodes(ctx context.Context, sync *service.Sync) {
nodes, err := sync.GetPairedNodes(ctx)
if err != nil {
return
}
// get all shared modules and their module mappings
for _, n := range nodes {
set, err := sync.GetSharedModules(ctx, n.ID)
if err != nil {
service.DefaultLogger.Warn(fmt.Sprintf("could not get shared modules for node: %d, skipping", n.ID))
continue
}
// go through set and prepare module mappings for it
for _, sm := range set {
mappings, _ := sync.GetModuleMappings(ctx, sm.ID)
if mappings == nil {
service.DefaultLogger.Info(fmt.Sprintf("could not prepare module mappings for shared module: %d, skipping", sm.ID))
continue
}
mappingValues, err := sync.PrepareModuleMappings(ctx, mappings)
if err != nil || mappingValues == nil {
service.DefaultLogger.Info(fmt.Sprintf("could not prepare module mappings for shared module: %d, skipping", sm.ID))
continue
}
// get the last sync per-node
lastSync, _ := sync.GetLastSyncTime(ctx, n.ID, types.NodeSyncTypeData)
basePath := fmt.Sprintf("/federation/nodes/%d/modules/%d/records/", n.SharedNodeID, sm.ExternalFederationModuleID)
ex := ""
if lastSync != nil {
ex = fmt.Sprintf(" from last sync on (%s)", lastSync.Format(time.RFC3339))
}
service.DefaultLogger.Info(fmt.Sprintf("starting structure sync%s for node %d, module %d", ex, n.ID, sm.ID))
url := types.SyncerURI{
BaseURL: baseURL,
Path: basePath,
Limit: limit,
LastSync: lastSync,
}
processer := &dataProcesser{
ID: sm.ExternalFederationModuleID,
NodeID: n.SharedNodeID,
ComposeModuleID: mappings.ComposeModuleID,
ComposeNamespaceID: mappings.ComposeNamespaceID,
ModuleMappingValues: &mappingValues,
}
go queueUrl(&url, surls, processer)
}
}
}
// Process gets the payload from syncer and
// uses the decode package to decode the whole set, depending on
// the filtering that was used (limit)
func persistExposedRecordData(ctx context.Context, payload []byte, nodeID uint64, dataSyncer *service.Syncer) error {
func (dp *dataProcesser) Process(ctx context.Context, payload []byte) error {
countProcess = countProcess + 1
// now := time.Now()
o, err := decoder.DecodeFederationRecordSync([]byte(payload))
if err != nil {
@@ -188,58 +212,15 @@ func persistExposedRecordData(ctx context.Context, payload []byte, nodeID uint64
service.DefaultLogger.Info(fmt.Sprintf("Adding %d objects", len(o)))
for _, er := range o {
// spew.Dump(o)
// create a new compose record
// get the compose module for this record
// fill in the values
// tmp
vals := []*ct.RecordValue{
&ct.RecordValue{
Name: "AccountName",
Value: "Accout name is required",
},
}
for _, v := range er.Values {
if v.Name == "AccountName" {
vv := &ct.RecordValue{
Name: "Name",
Value: v.Value,
}
vals = append(vals, vv)
}
if v.Name == "Facebook" {
vv := &ct.RecordValue{
Name: "Facebook",
Value: v.Value,
}
vals = append(vals, vv)
}
if v.Name == "Phone" {
vv := &ct.RecordValue{
Name: "Phone",
Value: v.Value,
}
vals = append(vals, vv)
}
}
// - module id (Account) = 167296265594339329
// - namespace id = 167296264570929153
mapper.Merge(&er.Values, dp.ModuleMappingValues)
rec := &ct.Record{
ModuleID: 167296265594339329,
NamespaceID: 167296264570929153,
Values: vals,
ModuleID: dp.ComposeModuleID,
NamespaceID: dp.ComposeNamespaceID,
Values: *dp.ModuleMappingValues,
}
rc, err := dataSyncer.CService.With(ctx).Create(rec)
spew.Dump(rc, err)
sync.CreateRecord(ctx, rec)
}
return nil

View File

@@ -3,69 +3,41 @@ package commands
import (
"context"
"fmt"
"io"
"io/ioutil"
"time"
cs "github.com/cortezaproject/corteza-server/compose/service"
"github.com/cortezaproject/corteza-server/federation/service"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/decoder"
"github.com/spf13/cobra"
)
var (
urls = make(chan types.SyncerURI, 1)
payloads = make(chan io.Reader, 1)
countProcess = 0
countPersist = 0
structureSyncer *service.Syncer
type (
structureProcesser struct {
NodeID uint64
}
)
func commandSyncStructure(ctx context.Context) func(*cobra.Command, []string) {
const syncDelay = 10
return func(_ *cobra.Command, _ []string) {
// todo - get node by id
const (
nodeID = 276342359342989444
limit = 5
syncDelay = 10
)
node, err := service.DefaultNode.FindByID(ctx, nodeID)
if err != nil {
service.DefaultLogger.Info("could not find any nodes to sync")
return
}
ticker := time.NewTicker(time.Second * syncDelay)
basePath := fmt.Sprintf("/federation/nodes/%d/modules/exposed/", node.SharedNodeID)
structureSyncer = service.NewSyncer()
// todo - get auth from the node
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// get the last sync per-node
lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeStructure)
ticker := time.NewTicker(time.Second * syncDelay)
ex := ""
if lastSync != nil {
ex = fmt.Sprintf(" from last sync on (%s)", lastSync.Format(time.RFC3339))
}
sync = service.NewSync(
&service.Syncer{},
&service.Mapper{},
service.DefaultSharedModule,
cs.DefaultRecord)
service.DefaultLogger.Info(fmt.Sprintf("Starting structure sync%s", ex))
url := types.SyncerURI{
BaseURL: baseURL,
Path: basePath,
Limit: limit,
LastSync: lastSync,
}
go queueUrl(&url, urls, structureSyncer)
queueStructureForNodes(ctx, sync)
for {
select {
@@ -74,23 +46,9 @@ func commandSyncStructure(ctx context.Context) func(*cobra.Command, []string) {
service.DefaultLogger.Info(fmt.Sprintf("Stopping sync [processed: %d, persisted: %d]", countProcess, countPersist))
return
case <-ticker.C:
lastSync := getLastSyncTime(ctx, nodeID, types.NodeSyncTypeStructure)
url := types.SyncerURI{
BaseURL: baseURL,
Path: basePath,
Limit: limit,
LastSync: lastSync,
}
if lastSync == nil {
service.DefaultLogger.Info(fmt.Sprintf("Start fetching modules from beginning of time, since lastSync == nil"))
} else {
service.DefaultLogger.Info(fmt.Sprintf("Start fetching modules, start with time: %s", lastSync.Format(time.RFC3339)))
}
go queueUrl(&url, urls, structureSyncer)
case url := <-urls:
// do the whole process again
queueStructureForNodes(ctx, sync)
case url := <-surls:
select {
case <-ctx.Done():
// do any cleanup here
@@ -99,53 +57,64 @@ func commandSyncStructure(ctx context.Context) func(*cobra.Command, []string) {
default:
}
s, err := url.String()
s, err := url.Url.String()
if err != nil {
continue
}
responseBody, err := structureSyncer.Fetch(ctx, s)
responseBody, err := sync.FetchUrl(ctx, s)
if err != nil {
continue
}
payloads <- responseBody
case p := <-payloads:
body, err := ioutil.ReadAll(p)
spayload := service.Spayload{
Payload: responseBody,
Meta: url.Meta,
}
spayloads <- spayload
case p := <-spayloads:
body, err := ioutil.ReadAll(p.Payload)
// handle error
if err != nil {
continue
}
basePath := fmt.Sprintf("/federation/nodes/%d/modules/exposed/", p.Meta.(*structureProcesser).NodeID)
u := types.SyncerURI{
BaseURL: baseURL,
Path: basePath,
Limit: limit,
}
err = structureSyncer.Process(ctx, body, node.ID, urls, u, structureSyncer, persistExposedModuleStructure)
err = sync.ProcessPayload(ctx, body, surls, u, p.Meta.(*structureProcesser))
if err != nil {
// handle error
service.DefaultLogger.Error(fmt.Sprintf("error on handling payload: %s", err))
service.DefaultLogger.Info(fmt.Sprintf("error on handling payload: %s", err))
} else {
n := time.Now().UTC()
n, err := service.DefaultNode.FindBySharedNodeID(ctx, p.Meta.(*structureProcesser).NodeID)
if err != nil {
service.DefaultLogger.Info(fmt.Sprintf("could not update sync status: %s", err))
}
// add to db - nodes_sync
new := &types.NodeSync{
NodeID: node.ID,
NodeID: (*n).ID,
SyncStatus: types.NodeSyncStatusSuccess,
SyncType: types.NodeSyncTypeStructure,
TimeOfAction: n,
TimeOfAction: time.Now().UTC(),
}
new, err := service.DefaultNodeSync.Create(ctx, new)
new, err = service.DefaultNodeSync.Create(ctx, new)
if err != nil {
service.DefaultLogger.Warn(fmt.Sprintf("could not update sync status: %s", err))
service.DefaultLogger.Info(fmt.Sprintf("could not update sync status: %s", err))
}
}
}
@@ -153,10 +122,45 @@ func commandSyncStructure(ctx context.Context) func(*cobra.Command, []string) {
}
}
// persistExposedModuleStructure gets the payload from syncer and
func queueStructureForNodes(ctx context.Context, sync *service.Sync) {
nodes, err := sync.GetPairedNodes(ctx)
if err != nil {
return
}
// get all shared modules and their module mappings
for _, n := range nodes {
// get the last sync per-node
lastSync, _ := sync.GetLastSyncTime(ctx, n.ID, types.NodeSyncTypeStructure)
basePath := fmt.Sprintf("/federation/nodes/%d/modules/exposed/", n.SharedNodeID)
ex := ""
if lastSync != nil {
ex = fmt.Sprintf(" from last sync on (%s)", lastSync.Format(time.RFC3339))
}
service.DefaultLogger.Info(fmt.Sprintf("starting structure sync%s for node %d", ex, n.ID))
url := types.SyncerURI{
BaseURL: baseURL,
Path: basePath,
Limit: limit,
LastSync: lastSync,
}
processer := &structureProcesser{
NodeID: n.SharedNodeID,
}
go queueUrl(&url, surls, processer)
}
}
// Process gets the payload from syncer and
// uses the decode package to decode the whole set, depending on
// the filtering that was used (limit)
func persistExposedModuleStructure(ctx context.Context, payload []byte, nodeID uint64, structureSyncer *service.Syncer) error {
func (dp *structureProcesser) Process(ctx context.Context, payload []byte) error {
countProcess = countProcess + 1
now := time.Now()
@@ -174,14 +178,14 @@ func persistExposedModuleStructure(ctx context.Context, payload []byte, nodeID u
for i, em := range o {
n := &types.SharedModule{
NodeID: nodeID,
NodeID: dp.NodeID,
ExternalFederationModuleID: em.ID,
Fields: em.Fields,
Handle: fmt.Sprintf("Handle %d %d", i, now.Unix()),
Name: fmt.Sprintf("Name %d %d", i, now.Unix()),
}
n, err := structureSyncer.Service.Create(ctx, n)
n, err := sync.CreateSharedModule(ctx, n)
service.DefaultLogger.Info(fmt.Sprintf("Added shared module: %d", n.ID))

View File

@@ -35,11 +35,11 @@ func (ctrl ManageStructure) CreateExposed(ctx context.Context, r *request.Manage
return nil, errors.New("TODO - http 400 bad request - use compose namespace id in request")
}
return (service.ExposedModule()).Create(ctx, mod)
return (service.DefaultExposedModule).Create(ctx, mod)
}
func (ctrl ManageStructure) ReadExposed(ctx context.Context, r *request.ManageStructureReadExposed) (interface{}, error) {
return (service.ExposedModule()).FindByID(ctx, r.GetNodeID(), r.GetModuleID())
return (service.DefaultExposedModule).FindByID(ctx, r.GetNodeID(), r.GetModuleID())
}
func (ctrl ManageStructure) UpdateExposed(ctx context.Context, r *request.ManageStructureUpdateExposed) (interface{}, error) {
@@ -52,15 +52,15 @@ func (ctrl ManageStructure) UpdateExposed(ctx context.Context, r *request.Manage
Fields: r.Fields,
}
)
return (service.ExposedModule()).Update(ctx, em)
return (service.DefaultExposedModule).Update(ctx, em)
}
func (ctrl ManageStructure) RemoveExposed(ctx context.Context, r *request.ManageStructureRemoveExposed) (interface{}, error) {
return (service.ExposedModule()).DeleteByID(ctx, r.NodeID, r.ModuleID)
return (service.DefaultExposedModule).DeleteByID(ctx, r.NodeID, r.ModuleID)
}
func (ctrl ManageStructure) ReadShared(ctx context.Context, r *request.ManageStructureReadShared) (interface{}, error) {
return (service.SharedModule()).FindByID(ctx, r.GetNodeID(), r.GetModuleID())
return (service.DefaultSharedModule).FindByID(ctx, r.GetNodeID(), r.GetModuleID())
}
func (ctrl ManageStructure) CreateMappings(ctx context.Context, r *request.ManageStructureCreateMappings) (interface{}, error) {
@@ -71,11 +71,11 @@ func (ctrl ManageStructure) CreateMappings(ctx context.Context, r *request.Manag
FieldMapping: r.Fields,
}
return (service.ModuleMapping()).Create(ctx, mm)
return (service.DefaultModuleMapping).Create(ctx, mm)
}
func (ctrl ManageStructure) ReadMappings(ctx context.Context, r *request.ManageStructureReadMappings) (interface{}, error) {
return (service.ModuleMapping()).FindByID(ctx, r.ModuleID)
return (service.DefaultModuleMapping).FindByID(ctx, r.ModuleID)
}
func (ctrl ManageStructure) ListAll(ctx context.Context, r *request.ManageStructureListAll) (interface{}, error) {
@@ -86,12 +86,12 @@ func (ctrl ManageStructure) ListAll(ctx context.Context, r *request.ManageStruct
switch true {
case r.Exposed:
list, _, err = (service.ExposedModule()).Find(ctx, types.ExposedModuleFilter{
list, _, err = (service.DefaultExposedModule).Find(ctx, types.ExposedModuleFilter{
NodeID: r.NodeID,
})
break
case r.Shared:
list, _, err = (service.SharedModule()).Find(ctx, types.SharedModuleFilter{
list, _, err = (service.DefaultSharedModule).Find(ctx, types.SharedModuleFilter{
NodeID: r.NodeID,
})
break

View File

@@ -106,23 +106,22 @@ func (ctrl SyncData) ReadExposedAll(ctx context.Context, r *request.SyncDataRead
func (ctrl SyncData) ReadExposed(ctx context.Context, r *request.SyncDataReadExposed) (interface{}, error) {
var (
err error
node *types.Node
f = ct.RecordFilter{}
em *types.ExposedModule
err error
em *types.ExposedModule
)
if node, err = service.DefaultNode.FindBySharedNodeID(ctx, r.NodeID); err != nil {
if _, err := service.DefaultNode.FindBySharedNodeID(ctx, r.NodeID); err != nil {
return nil, err
}
// use the fetched node
if em, err = (service.ExposedModule()).FindByID(ctx, node.ID, r.ModuleID); err != nil {
if em, err = service.DefaultExposedModule.FindByID(ctx, r.NodeID, r.ModuleID); err != nil {
return nil, err
}
f.ModuleID = em.ComposeModuleID
f.Query = r.Query
f := ct.RecordFilter{
ModuleID: em.ComposeModuleID,
Query: buildLastSyncQuery(r.LastSync),
}
if f.Paging, err = filter.NewPaging(r.Limit, r.PageCursor); err != nil {
return nil, err
@@ -132,8 +131,6 @@ func (ctrl SyncData) ReadExposed(ctx context.Context, r *request.SyncDataReadExp
return nil, err
}
f.Query = buildLastSyncQuery(r.LastSync)
list, f, err := (cs.Record()).Find(f)
if err != nil {

View File

@@ -0,0 +1,42 @@
package service
import (
ct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/federation/types"
)
type (
Mapper struct{}
)
// Merge copies the values from originating structure
// do the destination
//
// mostly, there will be less mapped fields on the destination
// side, so start looping from here
func (m *Mapper) Merge(in *ct.RecordValueSet, out *ct.RecordValueSet) {
for _, destVal := range *out {
for _, origVal := range *in {
if destVal.Name == origVal.Name {
destVal.Value = origVal.Value
break
}
}
}
return
}
// Prepare creates a set of Records to be used later
// when the fields will be mapped via Merge()
func (m *Mapper) Prepare(mappings types.ModuleFieldMappingSet) (out ct.RecordValueSet) {
for _, mm := range mappings {
rv := &ct.RecordValue{
Name: mm.Destination.Name,
Value: "",
}
out = append(out, rv)
}
return
}

View File

@@ -11,9 +11,9 @@ import (
type (
moduleMapping struct {
ctx context.Context
store store.Storer
compose composeService.ModuleService
module composeService.ModuleService
namespace composeService.NamespaceService
actionlog actionlog.Recorder
}
@@ -30,10 +30,10 @@ type (
func ModuleMapping() ModuleMappingService {
return &moduleMapping{
ctx: context.Background(),
store: DefaultStore,
compose: composeService.Module(),
actionlog: DefaultActionlog,
module: composeService.DefaultModule,
namespace: composeService.DefaultNamespace,
}
}
@@ -103,18 +103,20 @@ func (svc moduleMapping) Create(ctx context.Context, new *types.ModuleMapping) (
aProps = &moduleMappingActionProps{created: new}
)
// check if compose module actually exists
// TODO - how do we handle namespace?
// if _, err := svc.compose.With(ctx).FindByID(r.NamespaceID, new.ComposeModuleID); err == nil {
// return nil, ExposedModuleErrComposeModuleNotFound()
// }
err := store.Tx(ctx, svc.store, func(ctx context.Context, s store.Storer) (err error) {
// TODO
// if !svc.ac.CanCreateFederationExposedModule(ctx, ns) {
// return ExposedModuleErrNotAllowedToCreate()
// }
if _, err := svc.namespace.With(ctx).FindByID(new.ComposeNamespaceID); err != nil {
return ModuleMappingErrComposeNamespaceNotFound()
}
if _, err := svc.module.With(ctx).FindByID(new.ComposeNamespaceID, new.ComposeModuleID); err != nil {
return ModuleMappingErrComposeModuleNotFound()
}
// Check for federation module - compose.Module combo
if err = svc.uniqueCheck(ctx, new); err != nil {
return err

View File

@@ -549,6 +549,36 @@ func ModuleMappingErrComposeModuleNotFound(props ...*moduleMappingActionProps) *
}
// ModuleMappingErrComposeNamespaceNotFound returns "federation:module_mapping.composeNamespaceNotFound" audit event as actionlog.Warning
//
//
// This function is auto-generated.
//
func ModuleMappingErrComposeNamespaceNotFound(props ...*moduleMappingActionProps) *moduleMappingError {
var e = &moduleMappingError{
timestamp: time.Now(),
resource: "federation:module_mapping",
error: "composeNamespaceNotFound",
action: "error",
message: "compose namespace not found",
log: "compose namespace not found",
severity: actionlog.Warning,
props: func() *moduleMappingActionProps {
if len(props) > 0 {
return props[0]
}
return nil
}(),
}
if len(props) > 0 {
e.props = props[0]
}
return e
}
// ModuleMappingErrFederationModuleNotFound returns "federation:module_mapping.federationModuleNotFound" audit event as actionlog.Warning
//
//

View File

@@ -53,6 +53,10 @@ errors:
message: "compose module not found"
severity: "warning"
- error: composeNamespaceNotFound
message: "compose namespace not found"
severity: "warning"
- error: federationModuleNotFound
message: "federation module not found"
severity: "warning"

View File

@@ -0,0 +1,9 @@
package service
import "context"
type (
Processer interface {
Process(ctx context.Context, payload []byte) error
}
)

View File

@@ -44,6 +44,7 @@ var (
DefaultNodeSync NodeSyncService
DefaultExposedModule ExposedModuleService
DefaultSharedModule SharedModuleService
DefaultModuleMapping ModuleMappingService
// wrapper around time.Now() that will aid service testing
now = func() *time.Time {
@@ -123,6 +124,7 @@ func Initialize(ctx context.Context, log *zap.Logger, s store.Storer, c Config)
DefaultNodeSync = NodeSync()
DefaultExposedModule = ExposedModule()
DefaultSharedModule = SharedModule()
DefaultModuleMapping = ModuleMapping()
return
}

View File

@@ -0,0 +1,92 @@
package service
import (
"context"
"io"
"time"
cs "github.com/cortezaproject/corteza-server/compose/service"
ct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/federation/types"
)
type (
Sync struct {
syncer *Syncer
mapper *Mapper
sharedModuleService SharedModuleService
composeRecordService cs.RecordService
}
)
func NewSync(s *Syncer, m *Mapper, sm SharedModuleService, cs cs.RecordService) *Sync {
return &Sync{
syncer: s,
mapper: m,
sharedModuleService: sm,
composeRecordService: cs,
}
}
func (s *Sync) ProcessPayload(ctx context.Context, payload []byte, out chan Surl, url types.SyncerURI, processer Processer) error {
return s.syncer.Process(ctx, payload, out, url, processer)
}
func (s *Sync) QueueUrl(url Surl, out chan Surl) {
s.syncer.Queue(url, out)
}
func (s *Sync) FetchUrl(ctx context.Context, url string) (io.Reader, error) {
return s.syncer.Fetch(ctx, url)
}
func (s *Sync) CreateRecord(ctx context.Context, rec *ct.Record) (*ct.Record, error) {
return s.composeRecordService.With(ctx).Create(rec)
}
func (s *Sync) CreateSharedModule(ctx context.Context, new *types.SharedModule) (*types.SharedModule, error) {
return s.sharedModuleService.Create(ctx, new)
}
func (s *Sync) GetPairedNodes(ctx context.Context) (types.NodeSet, error) {
set, _, err := DefaultNode.Search(ctx, types.NodeFilter{Status: types.NodeStatusPaired})
if err != nil {
return nil, err
}
return set, nil
}
func (s *Sync) GetSharedModules(ctx context.Context, nodeID uint64) (types.SharedModuleSet, error) {
set, _, err := DefaultSharedModule.Find(ctx, types.SharedModuleFilter{NodeID: nodeID})
if err != nil {
return nil, err
}
set, _ = set.Filter(func(sm *types.SharedModule) (bool, error) {
return len(sm.Fields) > 0, nil
})
return set, nil
}
func (s *Sync) GetModuleMappings(ctx context.Context, moduleID uint64) (out *types.ModuleMapping, err error) {
out, err = DefaultModuleMapping.FindByID(ctx, moduleID)
return
}
func (s *Sync) PrepareModuleMappings(ctx context.Context, mappings *types.ModuleMapping) (ct.RecordValueSet, error) {
return s.mapper.Prepare((*mappings).FieldMapping), nil
}
func (s *Sync) GetLastSyncTime(ctx context.Context, nodeID uint64, syncType string) (*time.Time, error) {
ns, err := DefaultNodeSync.LookupLastSuccessfulSync(ctx, nodeID, syncType)
if err != nil || ns == nil {
return nil, err
}
return &ns.TimeOfAction, nil
}

View File

@@ -8,14 +8,19 @@ import (
"net/http"
"time"
"github.com/cortezaproject/corteza-server/compose/service"
"github.com/cortezaproject/corteza-server/federation/types"
)
type (
Syncer struct {
Service SharedModuleService
CService service.RecordService
Syncer struct{}
Surl struct {
Url types.SyncerURI
Meta Processer
}
Spayload struct {
Payload io.Reader
Meta Processer
}
AuxResponseSet struct {
@@ -32,7 +37,7 @@ type (
}
)
func (h *Syncer) Queue(url types.SyncerURI, out chan types.SyncerURI) {
func (h *Syncer) Queue(url Surl, out chan Surl) {
out <- url
}
@@ -58,9 +63,8 @@ func (h *Syncer) Fetch(ctx context.Context, url string) (io.Reader, error) {
return resp.Body, nil
}
func (h *Syncer) Process(ctx context.Context, payload []byte, nodeID uint64, out chan types.SyncerURI, url types.SyncerURI, handler *Syncer, fn func(ctx context.Context, payload []byte, nodeID uint64, handler *Syncer) error) error {
aux := AuxResponseSet{}
err := json.Unmarshal(payload, &aux)
func (h *Syncer) Process(ctx context.Context, payload []byte, out chan Surl, url types.SyncerURI, processer Processer) error {
aux, err := h.ParseHeader(ctx, payload)
if err != nil {
return err
@@ -69,18 +73,16 @@ func (h *Syncer) Process(ctx context.Context, payload []byte, nodeID uint64, out
if aux.Response.Filter.NextPage != "" {
url.NextPage = aux.Response.Filter.NextPage
// out <- url
// sync data
// out <- fmt.Sprintf("%s/federation/exposed/modules/196342359342989002/records/?limit=%d&pageCursor=%s", "http://localhost:8084", aux.Response.Filter.Limit, aux.Response.Filter.NextPage)
out <- Surl{
Url: url,
Meta: processer,
}
}
return fn(ctx, payload, nodeID, handler)
return processer.Process(ctx, payload)
}
func NewSyncer() *Syncer {
return &Syncer{
Service: DefaultSharedModule,
CService: service.DefaultRecord,
}
func (h *Syncer) ParseHeader(ctx context.Context, payload []byte) (aux AuxResponseSet, err error) {
err = json.Unmarshal(payload, &aux)
return
}

View File

@@ -11,7 +11,7 @@ import (
type (
ExposedRecord struct {
ID uint64 `json:"recordID,string"`
Values []*types.RecordValue `json:"values"`
Values types.RecordValueSet `json:"values"`
CreatedAt time.Time `json:"createdAt,omitempty"`
UpdatedAt *time.Time `json:"updatedAt,omitempty"`

View File

@@ -32,7 +32,7 @@ rdbms:
table: federation_nodes_sync
customFilterConverter: true
mapFields:
NodeID: { column: node_id }
NodeID: { column: rel_node }
SyncType: { column: sync_type }
SyncStatus: { column: sync_status }
TimeOfAction: { column: time_action }

View File

@@ -107,8 +107,8 @@ func (s Store) fetchFullPageOfFederationNodesSyncs(
)
// Make sure we always end our sort by primary keys
if sort.Get("node_id") == nil {
sort = append(sort, &filter.SortExpr{Column: "node_id"})
if sort.Get("rel_node") == nil {
sort = append(sort, &filter.SortExpr{Column: "rel_node"})
}
// Apply sorting expr from filter to query
@@ -223,7 +223,7 @@ func (s Store) QueryFederationNodesSyncs(
// It returns sync activity
func (s Store) LookupFederationNodesSyncByNodeID(ctx context.Context, node_id uint64) (*types.NodeSync, error) {
return s.execLookupFederationNodesSync(ctx, squirrel.Eq{
s.preprocessColumn("fdns.node_id", ""): store.PreprocessValue(node_id, ""),
s.preprocessColumn("fdns.rel_node", ""): store.PreprocessValue(node_id, ""),
})
}
@@ -232,7 +232,7 @@ func (s Store) LookupFederationNodesSyncByNodeID(ctx context.Context, node_id ui
// It returns sync activity
func (s Store) LookupFederationNodesSyncByNodeIDSyncTypeSyncStatus(ctx context.Context, node_id uint64, sync_type string, sync_status string) (*types.NodeSync, error) {
return s.execLookupFederationNodesSync(ctx, squirrel.Eq{
s.preprocessColumn("fdns.node_id", ""): store.PreprocessValue(node_id, ""),
s.preprocessColumn("fdns.rel_node", ""): store.PreprocessValue(node_id, ""),
s.preprocessColumn("fdns.sync_type", ""): store.PreprocessValue(sync_type, ""),
s.preprocessColumn("fdns.sync_status", ""): store.PreprocessValue(sync_status, ""),
})
@@ -271,9 +271,9 @@ func (s Store) partialFederationNodesSyncUpdate(ctx context.Context, onlyColumns
err = s.execUpdateFederationNodesSyncs(
ctx,
squirrel.Eq{
s.preprocessColumn("fdns.node_id", ""): store.PreprocessValue(res.NodeID, ""),
s.preprocessColumn("fdns.rel_node", ""): store.PreprocessValue(res.NodeID, ""),
},
s.internalFederationNodesSyncEncoder(res).Skip("node_id").Only(onlyColumns...))
s.internalFederationNodesSyncEncoder(res).Skip("rel_node").Only(onlyColumns...))
if err != nil {
return s.config.ErrorHandler(err)
}
@@ -304,7 +304,7 @@ func (s Store) DeleteFederationNodesSync(ctx context.Context, rr ...*types.NodeS
for _, res := range rr {
err = s.execDeleteFederationNodesSyncs(ctx, squirrel.Eq{
s.preprocessColumn("fdns.node_id", ""): store.PreprocessValue(res.NodeID, ""),
s.preprocessColumn("fdns.rel_node", ""): store.PreprocessValue(res.NodeID, ""),
})
if err != nil {
return s.config.ErrorHandler(err)
@@ -317,7 +317,7 @@ func (s Store) DeleteFederationNodesSync(ctx context.Context, rr ...*types.NodeS
// DeleteFederationNodesSyncByNodeID Deletes row from the federation_nodes_sync table
func (s Store) DeleteFederationNodesSyncByNodeID(ctx context.Context, nodeID uint64) error {
return s.execDeleteFederationNodesSyncs(ctx, squirrel.Eq{
s.preprocessColumn("fdns.node_id", ""): store.PreprocessValue(nodeID, ""),
s.preprocessColumn("fdns.rel_node", ""): store.PreprocessValue(nodeID, ""),
})
}
@@ -362,7 +362,7 @@ func (s Store) execUpsertFederationNodesSyncs(ctx context.Context, set store.Pay
s.config,
s.federationNodesSyncTable(),
set,
"node_id",
"rel_node",
)
if err != nil {
@@ -428,7 +428,7 @@ func (Store) federationNodesSyncColumns(aa ...string) []string {
}
return []string{
alias + "node_id",
alias + "rel_node",
alias + "sync_type",
alias + "sync_status",
alias + "time_action",
@@ -442,7 +442,7 @@ func (Store) federationNodesSyncColumns(aa ...string) []string {
// With optional string arg, all columns are returned aliased
func (Store) sortableFederationNodesSyncColumns() []string {
return []string{
"node_id",
"rel_node",
"time_action",
}
}
@@ -453,7 +453,7 @@ func (Store) sortableFederationNodesSyncColumns() []string {
// func when rdbms.customEncoder=true
func (s Store) internalFederationNodesSyncEncoder(res *types.NodeSync) store.Payload {
return store.Payload{
"node_id": res.NodeID,
"rel_node": res.NodeID,
"sync_type": res.SyncType,
"sync_status": res.SyncStatus,
"time_action": res.TimeOfAction,
@@ -477,15 +477,15 @@ func (s Store) collectFederationNodesSyncCursorValues(res *types.NodeSync, cc ..
// All known primary key columns
pkNode_id bool
pkRel_node bool
collect = func(cc ...string) {
for _, c := range cc {
switch c {
case "node_id":
case "rel_node":
cursor.Set(c, res.NodeID, false)
pkNode_id = true
pkRel_node = true
case "time_action":
cursor.Set(c, res.TimeOfAction, false)
@@ -495,8 +495,8 @@ func (s Store) collectFederationNodesSyncCursorValues(res *types.NodeSync, cc ..
)
collect(cc...)
if !hasUnique || !(pkNode_id && true) {
collect("node_id")
if !hasUnique || !(pkRel_node && true) {
collect("rel_node")
}
return cursor

View File

@@ -13,11 +13,11 @@ func (s Store) convertFederationNodesSyncFilter(f types.NodeSyncFilter) (query s
}
if f.SyncStatus != "" {
query = query.Where("fdns.rel_compose_module = ?", f.SyncStatus)
query = query.Where("fdns.sync_status = ?", f.SyncStatus)
}
if f.SyncType != "" {
query = query.Where("fdns.rel_compose_namespace = ?", f.SyncType)
query = query.Where("fdns.sync_type = ?", f.SyncType)
}
return

View File

@@ -546,7 +546,7 @@ func (Schema) FederationNodes() *Table {
func (Schema) FederationNodesSync() *Table {
return TableDef("federation_nodes_sync",
ColumnDef("node_id", ColumnTypeIdentifier),
ColumnDef("rel_node", ColumnTypeIdentifier),
ColumnDef("sync_type", ColumnTypeText),
ColumnDef("sync_status", ColumnTypeText),
ColumnDef("time_action", ColumnTypeTimestamp),