diff --git a/federation/commands/sync_data.go b/federation/commands/sync_data.go index e513ea1fa..2151c51d8 100644 --- a/federation/commands/sync_data.go +++ b/federation/commands/sync_data.go @@ -6,6 +6,7 @@ import ( cs "github.com/cortezaproject/corteza-server/compose/service" "github.com/cortezaproject/corteza-server/federation/service" + ss "github.com/cortezaproject/corteza-server/system/service" "github.com/spf13/cobra" ) @@ -15,7 +16,9 @@ func commandSyncData(ctx context.Context) func(*cobra.Command, []string) { &service.Syncer{}, &service.Mapper{}, service.DefaultSharedModule, - cs.DefaultRecord) + cs.DefaultRecord, + ss.DefaultUser, + ss.DefaultRole) syncData := service.WorkerData(syncService, service.DefaultLogger) syncData.Watch(ctx, time.Second*30, 50) diff --git a/federation/commands/sync_structure.go b/federation/commands/sync_structure.go index 20c5e69ca..2cd309663 100644 --- a/federation/commands/sync_structure.go +++ b/federation/commands/sync_structure.go @@ -6,6 +6,7 @@ import ( cs "github.com/cortezaproject/corteza-server/compose/service" "github.com/cortezaproject/corteza-server/federation/service" + ss "github.com/cortezaproject/corteza-server/system/service" "github.com/spf13/cobra" ) @@ -15,9 +16,11 @@ func commandSyncStructure(ctx context.Context) func(*cobra.Command, []string) { &service.Syncer{}, &service.Mapper{}, service.DefaultSharedModule, - cs.DefaultRecord) + cs.DefaultRecord, + ss.DefaultUser, + ss.DefaultRole) syncStructure := service.WorkerStructure(syncService, service.DefaultLogger) - syncStructure.Watch(ctx, time.Second*10, 10) + syncStructure.Watch(ctx, time.Second*service.DefaultOptions.StructureMonitorInterval, service.DefaultOptions.StructurePageSize) } } diff --git a/federation/rest/sync_data.go b/federation/rest/sync_data.go index 2181708bc..b0b6d0b3e 100644 --- a/federation/rest/sync_data.go +++ b/federation/rest/sync_data.go @@ -3,6 +3,7 @@ package rest import ( "context" "fmt" + "strings" "time" cs "github.com/cortezaproject/corteza-server/compose/service" @@ -11,6 +12,8 @@ import ( "github.com/cortezaproject/corteza-server/federation/service" "github.com/cortezaproject/corteza-server/federation/types" "github.com/cortezaproject/corteza-server/pkg/filter" + ss "github.com/cortezaproject/corteza-server/system/service" + st "github.com/cortezaproject/corteza-server/system/types" ) type ( @@ -106,11 +109,12 @@ func (ctrl SyncData) ReadExposedAll(ctx context.Context, r *request.SyncDataRead func (ctrl SyncData) ReadExposed(ctx context.Context, r *request.SyncDataReadExposed) (interface{}, error) { var ( - err error - em *types.ExposedModule + err error + em *types.ExposedModule + users st.UserSet ) - if _, err := service.DefaultNode.FindBySharedNodeID(ctx, r.NodeID); err != nil { + if _, err = service.DefaultNode.FindBySharedNodeID(ctx, r.NodeID); err != nil { return nil, err } @@ -118,9 +122,18 @@ func (ctrl SyncData) ReadExposed(ctx context.Context, r *request.SyncDataReadExp return nil, err } + if users, _, err = ss.DefaultUser.With(ctx).Find(st.UserFilter{}); err != nil { + return nil, err + } + + users, _ = users.Filter(func(u *st.User) (bool, error) { + return strings.Contains(u.Handle, "federation_"), nil + }) + f := ct.RecordFilter{ ModuleID: em.ComposeModuleID, Query: buildLastSyncQuery(r.LastSync), + Check: ignoreFederated(users), } if f.Paging, err = filter.NewPaging(r.Limit, r.PageCursor); err != nil { @@ -138,13 +151,7 @@ func (ctrl SyncData) ReadExposed(ctx context.Context, r *request.SyncDataReadExp } // do the actual field filtering - err = list.Walk(func(r *ct.Record) error { - r.Values, err = r.Values.Filter(func(rv *ct.RecordValue) (bool, error) { - return em.Fields.HasField(rv.Name) - }) - - return err - }) + err = list.Walk(filterExposedFields(em)) if err != nil { return nil, err @@ -172,3 +179,33 @@ func buildLastSyncQuery(ts uint64) string { t.UTC().Format(time.RFC3339), t.UTC().Format(time.RFC3339)) } + +// ignoreFederated matches the created user id with any +// of the generated federated users and omits it +func ignoreFederated(users st.UserSet) func(r *ct.Record) (bool, error) { + return func(r *ct.Record) (bool, error) { + var keep = true + users.Walk(func(u *st.User) error { + if u.ID == r.CreatedBy { + keep = false + return nil + } + return nil + }) + + return keep, nil + } +} + +// filterExposedFields omits the fields that are not exposed as defined +// in the exposed module definition in store +func filterExposedFields(em *types.ExposedModule) func(r *ct.Record) error { + return func(r *ct.Record) error { + var err error + r.Values, err = r.Values.Filter(func(rv *ct.RecordValue) (bool, error) { + return em.Fields.HasField(rv.Name) + }) + + return err + } +} diff --git a/federation/service/exposed_module.go b/federation/service/exposed_module.go index 4a3034991..3da63c191 100644 --- a/federation/service/exposed_module.go +++ b/federation/service/exposed_module.go @@ -8,6 +8,7 @@ import ( ct "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/federation/types" "github.com/cortezaproject/corteza-server/pkg/actionlog" + "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/store" ) @@ -110,6 +111,7 @@ func (svc exposedModule) Update(ctx context.Context, updated *types.ExposedModul updated.UpdatedAt = now() updated.CreatedAt = old.CreatedAt + updated.UpdatedBy = auth.GetIdentityFromContext(ctx).Identity() // set labels AddFederationLabel(m, node.BaseURL) @@ -205,12 +207,6 @@ func (svc exposedModule) Create(ctx context.Context, new *types.ExposedModule) ( aProps = &exposedModuleActionProps{create: new} ) - // check if compose module actually exists - // TODO - how do we handle namespace? - // if _, err := svc.compose.With(ctx).FindByID(r.NamespaceID, new.ComposeModuleID); err == nil { - // return nil, ExposedModuleErrComposeModuleNotFound() - // } - err := store.Tx(ctx, svc.store, func(ctx context.Context, s store.Storer) (err error) { // TODO // if !svc.ac.CanCreateFederationExposedModule(ctx, ns) { @@ -235,13 +231,12 @@ func (svc exposedModule) Create(ctx context.Context, new *types.ExposedModule) ( // Check for node - compose.Module combo if err = svc.uniqueCheck(ctx, new); err != nil { - return ExposedModuleErrNotUnique() + return err } new.ID = nextID() new.CreatedAt = *now() - new.UpdatedAt = nil - new.DeletedAt = nil + new.CreatedBy = auth.GetIdentityFromContext(ctx).Identity() // check if Fields can be unmarshaled to the fields structure if new.Fields != nil { @@ -273,7 +268,9 @@ func (svc exposedModule) uniqueCheck(ctx context.Context, m *types.ExposedModule ComposeNamespaceID: m.ComposeNamespaceID, } - if set, _, err := store.SearchFederationExposedModules(ctx, svc.store, f); len(set) > 0 && err == nil { + set, _, err := store.SearchFederationExposedModules(ctx, svc.store, f) + + if len(set) > 0 && err == nil { return ExposedModuleErrNotUnique() } else if err != nil { return err diff --git a/federation/service/service.go b/federation/service/service.go index 14f56876a..5c5299659 100644 --- a/federation/service/service.go +++ b/federation/service/service.go @@ -16,6 +16,7 @@ import ( "github.com/cortezaproject/corteza-server/pkg/options" "github.com/cortezaproject/corteza-server/store" "github.com/cortezaproject/corteza-server/system/service" + ss "github.com/cortezaproject/corteza-server/system/service" "github.com/cortezaproject/corteza-server/system/types" "go.uber.org/zap" ) @@ -137,11 +138,17 @@ func Initialize(ctx context.Context, log *zap.Logger, s store.Storer, c Config) } func Watchers(ctx context.Context) { + if !DefaultOptions.Enabled { + return + } + syncService := NewSync( &Syncer{}, &Mapper{}, DefaultSharedModule, - cs.DefaultRecord) + cs.DefaultRecord, + ss.DefaultUser, + ss.DefaultRole) syncStructure := WorkerStructure(syncService, DefaultLogger) syncData := WorkerData(syncService, service.DefaultLogger) diff --git a/federation/service/shared_module.go b/federation/service/shared_module.go index c0e94fceb..9dade14b3 100644 --- a/federation/service/shared_module.go +++ b/federation/service/shared_module.go @@ -6,6 +6,7 @@ import ( composeService "github.com/cortezaproject/corteza-server/compose/service" "github.com/cortezaproject/corteza-server/federation/types" "github.com/cortezaproject/corteza-server/pkg/actionlog" + "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/store" ) @@ -68,8 +69,7 @@ func (svc sharedModule) Create(ctx context.Context, new *types.SharedModule) (*t new.ID = nextID() new.CreatedAt = *now() - new.UpdatedAt = nil - new.DeletedAt = nil + new.CreatedBy = auth.GetIdentityFromContext(ctx).Identity() // check if Fields can be unmarshaled to the fields structure if new.Fields != nil { @@ -94,6 +94,7 @@ func (svc sharedModule) Update(ctx context.Context, updated *types.SharedModule) err := store.Tx(ctx, svc.store, func(ctx context.Context, s store.Storer) (err error) { updated.UpdatedAt = now() + updated.UpdatedBy = auth.GetIdentityFromContext(ctx).Identity() aProps.setModule(updated) diff --git a/federation/service/sync.go b/federation/service/sync.go index a9d92f56b..9fa4f4712 100644 --- a/federation/service/sync.go +++ b/federation/service/sync.go @@ -3,6 +3,7 @@ package service import ( "context" "encoding/json" + "fmt" "io" "time" @@ -10,6 +11,8 @@ import ( ct "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/federation/types" "github.com/cortezaproject/corteza-server/system/service" + ss "github.com/cortezaproject/corteza-server/system/service" + st "github.com/cortezaproject/corteza-server/system/types" ) type ( @@ -18,15 +21,19 @@ type ( mapper *Mapper sharedModuleService SharedModuleService composeRecordService cs.RecordService + systemUserService ss.UserService + systemRoleService ss.RoleService } ) -func NewSync(s *Syncer, m *Mapper, sm SharedModuleService, cs cs.RecordService) *Sync { +func NewSync(s *Syncer, m *Mapper, sm SharedModuleService, cs cs.RecordService, us ss.UserService, rs ss.RoleService) *Sync { return &Sync{ syncer: s, mapper: m, sharedModuleService: sm, composeRecordService: cs, + systemUserService: us, + systemRoleService: rs, } } @@ -143,3 +150,28 @@ func (s *Sync) GetLastSyncTime(ctx context.Context, nodeID uint64, syncType stri return &ns.TimeOfAction, nil } + +// LoadUserWithRoles gets the federation user, that was +// created at node pairing process +func (s *Sync) LoadUserWithRoles(ctx context.Context, nodeID uint64) (*st.User, error) { + var ( + u *st.User + err error + ) + + // get the federated user, associated for this node + if u, err = s.systemUserService.With(ctx).FindByHandle(fmt.Sprintf("federation_%d", nodeID)); err != nil { + return nil, err + } + + // attach the roles + rr, _, err := s.systemRoleService.With(ctx).Find(st.RoleFilter{MemberID: u.ID}) + + if err != nil { + return nil, err + } + + u.SetRoles(rr.IDs()) + + return u, nil +} diff --git a/federation/service/sync_worker_data.go b/federation/service/sync_worker_data.go index 601128791..56c8621c3 100644 --- a/federation/service/sync_worker_data.go +++ b/federation/service/sync_worker_data.go @@ -8,7 +8,10 @@ import ( ct "github.com/cortezaproject/corteza-server/compose/types" "github.com/cortezaproject/corteza-server/federation/types" + "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/pkg/decoder" + st "github.com/cortezaproject/corteza-server/system/types" + "github.com/davecgh/go-spew/spew" "go.uber.org/zap" ) @@ -29,6 +32,8 @@ type ( ModuleMappings *types.ModuleFieldMappingSet ModuleMappingValues *ct.RecordValueSet SyncService *Sync + Node *types.Node + User *st.User } ) @@ -61,6 +66,16 @@ func (w *syncWorkerData) PrepareForNodes(ctx context.Context, urls chan Url) { // get all shared modules and their module mappings for _, n := range nodes { + // get the user, associated for this node + u, err := w.syncService.LoadUserWithRoles(ctx, n.ID) + + if err != nil { + w.logger.Info("could not preload federation user, skipping", zap.Uint64("nodeID", n.ID), zap.Error(err)) + continue + } + + spew.Dump("user", u, u.Roles()) + set, err := w.syncService.GetSharedModules(ctx, n.ID) if err != nil { @@ -91,7 +106,7 @@ func (w *syncWorkerData) PrepareForNodes(ctx context.Context, urls chan Url) { // get the last sync per-node lastSync, _ := w.syncService.GetLastSyncTime(ctx, n.ID, types.NodeSyncTypeData) - basePath := fmt.Sprintf("/federation/nodes/%d/modules/%d/records/", n.SharedNodeID, sm.ExternalFederationModuleID) + basePath := fmt.Sprintf("/nodes/%d/modules/%d/records/", n.SharedNodeID, sm.ExternalFederationModuleID) z := []zap.Field{ zap.Uint64("nodeID", n.ID), @@ -122,6 +137,8 @@ func (w *syncWorkerData) PrepareForNodes(ctx context.Context, urls chan Url) { ModuleMappingValues: &mappingValues, NodeBaseURL: n.BaseURL, SyncService: w.syncService, + User: u, + Node: n, } go w.queueUrl(&url, urls, processer) @@ -173,6 +190,9 @@ func (w *syncWorkerData) Watch(ctx context.Context, delay time.Duration, limit i continue } + // use the authToken from node pairing + ctx = context.WithValue(ctx, FederationUserToken, url.Meta.(*dataProcesser).Node.AuthToken) + responseBody, err := w.syncService.FetchUrl(ctx, s) if err != nil { @@ -194,7 +214,7 @@ func (w *syncWorkerData) Watch(ctx context.Context, delay time.Duration, limit i continue } - basePath := fmt.Sprintf("/federation/nodes/%d/modules/%d/records/", p.Meta.(*dataProcesser).NodeID, p.Meta.(*dataProcesser).ID) + basePath := fmt.Sprintf("/nodes/%d/modules/%d/records/", p.Meta.(*dataProcesser).NodeID, p.Meta.(*dataProcesser).ID) u := types.SyncerURI{ BaseURL: p.Meta.(*dataProcesser).NodeBaseURL, @@ -242,10 +262,11 @@ func (w *syncWorkerData) Watch(ctx context.Context, delay time.Duration, limit i // uses the decode package to decode the whole set, depending on // the filtering that was used (limit) func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (int, error) { - processed := 0 o, err := decoder.DecodeFederationRecordSync([]byte(payload)) + spew.Dump("got records", o) + if err != nil { return processed, err } @@ -254,6 +275,9 @@ func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (int, erro return processed, nil } + // get the user that is tied to this node + ctx = auth.SetIdentityToContext(ctx, dp.User) + for _, er := range o { dp.SyncService.mapper.Merge(&er.Values, dp.ModuleMappingValues, dp.ModuleMappings) @@ -268,6 +292,7 @@ func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (int, erro _, err := dp.SyncService.CreateRecord(ctx, rec) if err != nil { + spew.Dump("Error saving record", err, rec) continue } diff --git a/federation/service/sync_worker_structure.go b/federation/service/sync_worker_structure.go index cc95f49c7..9059160e1 100644 --- a/federation/service/sync_worker_structure.go +++ b/federation/service/sync_worker_structure.go @@ -7,7 +7,9 @@ import ( "time" "github.com/cortezaproject/corteza-server/federation/types" + "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/pkg/decoder" + st "github.com/cortezaproject/corteza-server/system/types" "github.com/davecgh/go-spew/spew" "go.uber.org/zap" ) @@ -25,6 +27,8 @@ type ( SharedNodeID uint64 NodeBaseURL string SyncService *Sync + Node *types.Node + User *st.User } ) @@ -59,9 +63,20 @@ func (w *syncWorkerStructure) PrepareForNodes(ctx context.Context, urls chan Url // get all shared modules and their module mappings for _, n := range nodes { + + // get the user, associated for this node + u, err := w.syncService.LoadUserWithRoles(ctx, n.ID) + + if err != nil { + w.logger.Info("could not preload federation user, skipping", zap.Uint64("nodeID", n.ID), zap.Error(err)) + continue + } + + spew.Dump("user", u, u.Roles()) + // get the last sync per-node lastSync, _ := w.syncService.GetLastSyncTime(ctx, n.ID, types.NodeSyncTypeStructure) - basePath := fmt.Sprintf("/federation/nodes/%d/modules/exposed/", n.SharedNodeID) + basePath := fmt.Sprintf("/nodes/%d/modules/exposed/", n.SharedNodeID) z := []zap.Field{ zap.Uint64("nodeID", n.ID), @@ -88,6 +103,8 @@ func (w *syncWorkerStructure) PrepareForNodes(ctx context.Context, urls chan Url SharedNodeID: n.SharedNodeID, NodeBaseURL: n.BaseURL, SyncService: w.syncService, + User: u, + Node: n, } go w.queueUrl(&url, urls, processer) @@ -138,6 +155,9 @@ func (w *syncWorkerStructure) Watch(ctx context.Context, delay time.Duration, li continue } + // use the authToken from node pairing + ctx = context.WithValue(ctx, FederationUserToken, url.Meta.(*structureProcesser).Node.AuthToken) + responseBody, err := w.syncService.FetchUrl(ctx, s) if err != nil { @@ -160,7 +180,7 @@ func (w *syncWorkerStructure) Watch(ctx context.Context, delay time.Duration, li continue } - basePath := fmt.Sprintf("/federation/nodes/%d/modules/exposed/", p.Meta.(*structureProcesser).SharedNodeID) + basePath := fmt.Sprintf("/nodes/%d/modules/exposed/", p.Meta.(*structureProcesser).SharedNodeID) u := types.SyncerURI{ BaseURL: p.Meta.(*structureProcesser).NodeBaseURL, @@ -206,7 +226,6 @@ func (w *syncWorkerStructure) Watch(ctx context.Context, delay time.Duration, li // the filtering that was used (limit) func (dp *structureProcesser) Process(ctx context.Context, payload []byte) (int, error) { processed := 0 - o, err := decoder.DecodeFederationModuleSync([]byte(payload)) if err != nil { @@ -217,6 +236,9 @@ func (dp *structureProcesser) Process(ctx context.Context, payload []byte) (int, return processed, nil } + // get the user that is tied to this node + ctx = auth.SetIdentityToContext(ctx, dp.User) + for _, em := range o { new := &types.SharedModule{ NodeID: dp.NodeID, diff --git a/federation/service/syncer.go b/federation/service/syncer.go index aa08cac49..057965e0a 100644 --- a/federation/service/syncer.go +++ b/federation/service/syncer.go @@ -6,6 +6,7 @@ import ( "errors" "io" "net/http" + "net/http/httputil" "time" "github.com/cortezaproject/corteza-server/federation/types" @@ -38,6 +39,8 @@ type ( } ) +const FederationUserToken string = "authToken" + func (h *Syncer) Queue(url Url, out chan Url) { out <- url } @@ -53,6 +56,13 @@ func (h *Syncer) Fetch(ctx context.Context, url string) (io.Reader, error) { return nil, err } + if authToken := ctx.Value(FederationUserToken); authToken != nil { + req.Header.Add("Authorization", `Bearer `+authToken.(string)) + } + + dr, _ := httputil.DumpRequest(req, true) + spew.Dump("> using headers", string(dr)) + resp, err := client.Do(req) if err != nil { spew.Dump("ERR", err) diff --git a/federation/types/exposed_module.go b/federation/types/exposed_module.go index bfc7df71e..b3923fbaa 100644 --- a/federation/types/exposed_module.go +++ b/federation/types/exposed_module.go @@ -17,8 +17,11 @@ type ( Fields ModuleFieldSet `json:"fields"` CreatedAt time.Time `json:"createdAt,omitempty"` + CreatedBy uint64 `json:"createdBy,string" ` UpdatedAt *time.Time `json:"updatedAt,omitempty"` + UpdatedBy uint64 `json:"updatedBy,string,omitempty" ` DeletedAt *time.Time `json:"deletedAt,omitempty"` + DeletedBy uint64 `json:"deletedBy,string,omitempty" ` } ExposedModuleFilter struct { diff --git a/federation/types/shared_module.go b/federation/types/shared_module.go index e0a26b541..bc96c7aa6 100644 --- a/federation/types/shared_module.go +++ b/federation/types/shared_module.go @@ -16,8 +16,11 @@ type ( Fields ModuleFieldSet `json:"fields"` CreatedAt time.Time `json:"createdAt,omitempty"` + CreatedBy uint64 `json:"createdBy,string" ` UpdatedAt *time.Time `json:"updatedAt,omitempty"` + UpdatedBy uint64 `json:"updatedBy,string,omitempty" ` DeletedAt *time.Time `json:"deletedAt,omitempty"` + DeletedBy uint64 `json:"deletedBy,string,omitempty" ` } SharedModuleFilter struct { diff --git a/store/federation_exposed_modules.yaml b/store/federation_exposed_modules.yaml index 0d790577a..c95828cb7 100644 --- a/store/federation_exposed_modules.yaml +++ b/store/federation_exposed_modules.yaml @@ -12,9 +12,12 @@ fields: - { field: ComposeModuleID } - { field: ComposeNamespaceID } - { field: Fields, type: "json.Text" } - - { field: CreatedAt, sortable: true } - - { field: UpdatedAt, sortable: true } - - { field: DeletedAt, sortable: true } + - { field: CreatedBy } + - { field: UpdatedBy } + - { field: DeletedBy } + - { field: CreatedAt } + - { field: UpdatedAt } + - { field: DeletedAt } lookups: - fields: [ID] diff --git a/store/federation_shared_modules.yaml b/store/federation_shared_modules.yaml index 662380f4c..4bd5c2ee5 100644 --- a/store/federation_shared_modules.yaml +++ b/store/federation_shared_modules.yaml @@ -11,9 +11,12 @@ fields: - { field: Name } - { field: ExternalFederationModuleID } - { field: Fields, type: "json.Text" } - - { field: CreatedAt, sortable: true } - - { field: UpdatedAt, sortable: true } - - { field: DeletedAt, sortable: true } + - { field: CreatedBy } + - { field: UpdatedBy } + - { field: DeletedBy } + - { field: CreatedAt } + - { field: UpdatedAt } + - { field: DeletedAt } lookups: - fields: [ID] diff --git a/store/rdbms/federation_exposed_modules.gen.go b/store/rdbms/federation_exposed_modules.gen.go index ba75c2dc5..0bd404721 100644 --- a/store/rdbms/federation_exposed_modules.gen.go +++ b/store/rdbms/federation_exposed_modules.gen.go @@ -30,44 +30,50 @@ func (s Store) SearchFederationExposedModules(ctx context.Context, f types.Expos set []*types.ExposedModule q squirrel.SelectBuilder ) - q, err = s.convertFederationExposedModuleFilter(f) - if err != nil { - return nil, f, err - } - - // Cleanup anything we've accidentally received... - f.PrevPage, f.NextPage = nil, nil - - // When cursor for a previous page is used it's marked as reversed - // This tells us to flip the descending flag on all used sort keys - reversedCursor := f.PageCursor != nil && f.PageCursor.Reverse - - // Sorting and paging are both enabled in definition yaml file - // {search: {enableSorting:true, enablePaging:true}} - curSort := f.Sort.Clone() - - // If paging with reverse cursor, change the sorting - // direction for all columns we're sorting by - if reversedCursor { - curSort.Reverse() - } return set, f, func() error { - set, err = s.fetchFullPageOfFederationExposedModules(ctx, q, curSort, f.PageCursor, f.Limit, f.Check) - + q, err = s.convertFederationExposedModuleFilter(f) if err != nil { return err } - if f.Limit > 0 && len(set) > 0 { - if f.PageCursor != nil && (!f.PageCursor.Reverse || uint(len(set)) == f.Limit) { - f.PrevPage = s.collectFederationExposedModuleCursorValues(set[0], curSort.Columns()...) - f.PrevPage.Reverse = true - } + // Paging enabled + // {search: {enablePaging:true}} + // Cleanup unwanted cursors (only relevant is f.PageCursor, next&prev are reset and returned) + f.PrevPage, f.NextPage = nil, nil - // Less items fetched then requested by page-limit - // not very likely there's another page - f.NextPage = s.collectFederationExposedModuleCursorValues(set[len(set)-1], curSort.Columns()...) + if f.PageCursor != nil { + // Page cursor exists so we need to validate it against used sort + if f.Sort, err = f.PageCursor.Sort(f.Sort); err != nil { + return err + } + } + + if len(f.Sort) == 0 { + f.Sort = filter.SortExprSet{} + } + + // Make sure results are always sorted at least by primary keys + if f.Sort.Get("id") == nil { + f.Sort = append(f.Sort, &filter.SortExpr{Column: "id"}) + } + + sort := f.Sort.Clone() + + // When cursor for a previous page is used it's marked as reversed + // This tells us to flip the descending flag on all used sort keys + if f.PageCursor != nil && f.PageCursor.Reverse { + sort.Reverse() + } + + // Apply sorting expr from filter to query + if q, err = setOrderBy(q, sort, s.sortableFederationExposedModuleColumns()); err != nil { + return err + } + + set, f.PrevPage, f.NextPage, err = s.fetchFullPageOfFederationExposedModules(ctx, q, sort.Columns(), sort.Reversed(), f.PageCursor, f.Limit, f.Check) + if err != nil { + return err } f.PageCursor = nil @@ -79,58 +85,62 @@ func (s Store) SearchFederationExposedModules(ctx context.Context, f types.Expos // // Function applies: // - cursor conditions (where ...) -// - sorting rules (order by ...) // - limit // // Main responsibility of this function is to perform additional sequential queries in case when not enough results -// are collected due to failed check on a specific row (by check fn). Function then moves cursor to the last item fetched +// are collected due to failed check on a specific row (by check fn). +// +// Function then moves cursor to the last item fetched func (s Store) fetchFullPageOfFederationExposedModules( ctx context.Context, q squirrel.SelectBuilder, - sort filter.SortExprSet, + sortColumns []string, + sortDesc bool, cursor *filter.PagingCursor, limit uint, check func(*types.ExposedModule) (bool, error), -) ([]*types.ExposedModule, error) { +) (set []*types.ExposedModule, prev, next *filter.PagingCursor, err error) { var ( - set = make([]*types.ExposedModule, 0, DefaultSliceCapacity) - aux []*types.ExposedModule - last *types.ExposedModule + aux []*types.ExposedModule // When cursor for a previous page is used it's marked as reversed // This tells us to flip the descending flag on all used sort keys - reversedCursor = cursor != nil && cursor.Reverse + reversedOrder = cursor != nil && cursor.Reverse // copy of the select builder tryQuery squirrel.SelectBuilder fetched uint - err error ) - // Make sure we always end our sort by primary keys - if sort.Get("id") == nil { - sort = append(sort, &filter.SortExpr{Column: "id"}) - } + set = make([]*types.ExposedModule, 0, DefaultSliceCapacity) - // Apply sorting expr from filter to query - if q, err = setOrderBy(q, sort, s.sortableFederationExposedModuleColumns()); err != nil { - return nil, err + if cursor != nil { + cursor.Reverse = sortDesc } for try := 0; try < MaxRefetches; try++ { tryQuery = setCursorCond(q, cursor) if limit > 0 { - tryQuery = tryQuery.Limit(uint64(limit)) + tryQuery = tryQuery.Limit(uint64(limit + 1)) } - if aux, fetched, last, err = s.QueryFederationExposedModules(ctx, tryQuery, check); err != nil { - return nil, err + if aux, err = s.QueryFederationExposedModules(ctx, tryQuery, check); err != nil { + return nil, nil, nil, err } - if limit > 0 && uint(len(aux)) >= limit { + fetched = uint(len(aux)) + if cursor != nil && prev == nil && fetched > 0 { + // Cursor for previous page is calculated only when cursor is used (so, not on first page) + prev = s.collectFederationExposedModuleCursorValues(aux[0], sortColumns...) + } + + // Point cursor to the last fetched element + if fetched > limit && limit > 0 { + next = s.collectFederationExposedModuleCursorValues(aux[limit-1], sortColumns...) + // we should use only as much as requested - set = append(set, aux[0:limit]...) + set = append(set, aux[:limit]...) break } else { set = append(set, aux...) @@ -138,7 +148,7 @@ func (s Store) fetchFullPageOfFederationExposedModules( // if limit is not set or we've already collected enough items // we can break the loop right away - if limit == 0 || fetched == 0 || fetched < limit { + if limit == 0 || fetched == 0 || fetched <= limit { break } @@ -149,23 +159,23 @@ func (s Store) fetchFullPageOfFederationExposedModules( } // @todo improve strategy for collecting next page with lower limit - - // Point cursor to the last fetched element - if cursor = s.collectFederationExposedModuleCursorValues(last, sort.Columns()...); cursor == nil { - break - } } - if reversedCursor { - // Cursor for previous page was used - // Fetched set needs to be reverseCursor because we've forced a descending order to - // get the previous page + if reversedOrder { + // Fetched set needs to be reversed because we've forced a descending order to get the previous page for i, j := 0, len(set)-1; i < j; i, j = i+1, j-1 { set[i], set[j] = set[j], set[i] } + + // and flip prev/next cursors too + prev, next = next, prev } - return set, nil + if prev != nil { + prev.Reverse = true + } + + return set, prev, next, nil } // QueryFederationExposedModules queries the database, converts and checks each row and @@ -177,39 +187,35 @@ func (s Store) QueryFederationExposedModules( ctx context.Context, q squirrel.Sqlizer, check func(*types.ExposedModule) (bool, error), -) ([]*types.ExposedModule, uint, *types.ExposedModule, error) { +) ([]*types.ExposedModule, error) { var ( set = make([]*types.ExposedModule, 0, DefaultSliceCapacity) res *types.ExposedModule // Query rows with rows, err = s.Query(ctx, q) - - fetched uint ) if err != nil { - return nil, 0, nil, err + return nil, err } defer rows.Close() for rows.Next() { - fetched++ if err = rows.Err(); err == nil { res, err = s.internalFederationExposedModuleRowScanner(rows) } if err != nil { - return nil, 0, nil, err + return nil, err } - // If check function is set, call it and act accordingly + // check fn set, call it and see if it passed the test + // if not, skip the item if check != nil { if chk, err := check(res); err != nil { - return nil, 0, nil, err + return nil, err } else if !chk { - // did not pass the check - // go with the next row continue } } @@ -217,7 +223,7 @@ func (s Store) QueryFederationExposedModules( set = append(set, res) } - return set, fetched, res, rows.Err() + return set, rows.Err() } // LookupFederationExposedModuleByID searches for federation module by ID @@ -383,6 +389,9 @@ func (s Store) internalFederationExposedModuleRowScanner(row rowScanner) (res *t &res.ComposeModuleID, &res.ComposeNamespaceID, &res.Fields, + &res.CreatedBy, + &res.UpdatedBy, + &res.DeletedBy, &res.CreatedAt, &res.UpdatedAt, &res.DeletedAt, @@ -432,25 +441,23 @@ func (Store) federationExposedModuleColumns(aa ...string) []string { alias + "rel_compose_module", alias + "rel_compose_namespace", alias + "fields", + alias + "created_by", + alias + "updated_by", + alias + "deleted_by", alias + "created_at", alias + "updated_at", alias + "deleted_at", } } -// {true true true true true} +// {true true false true true true} // sortableFederationExposedModuleColumns returns all FederationExposedModule columns flagged as sortable // // With optional string arg, all columns are returned aliased func (Store) sortableFederationExposedModuleColumns() map[string]string { return map[string]string{ - "id": "id", "created_at": "created_at", - "createdat": "created_at", - "updated_at": "updated_at", - "updatedat": "updated_at", - "deleted_at": "deleted_at", - "deletedat": "deleted_at", + "id": "id", } } @@ -467,6 +474,9 @@ func (s Store) internalFederationExposedModuleEncoder(res *types.ExposedModule) "rel_compose_module": res.ComposeModuleID, "rel_compose_namespace": res.ComposeNamespaceID, "fields": res.Fields, + "created_by": res.CreatedBy, + "updated_by": res.UpdatedBy, + "deleted_by": res.DeletedBy, "created_at": res.CreatedAt, "updated_at": res.UpdatedAt, "deleted_at": res.DeletedAt, @@ -499,14 +509,6 @@ func (s Store) collectFederationExposedModuleCursorValues(res *types.ExposedModu cursor.Set(c, res.ID, false) pkId = true - case "created_at": - cursor.Set(c, res.CreatedAt, false) - - case "updated_at": - cursor.Set(c, res.UpdatedAt, false) - - case "deleted_at": - cursor.Set(c, res.DeletedAt, false) } } diff --git a/store/rdbms/federation_module_mappings.gen.go b/store/rdbms/federation_module_mappings.gen.go index 9b0806e25..f7282e9c1 100644 --- a/store/rdbms/federation_module_mappings.gen.go +++ b/store/rdbms/federation_module_mappings.gen.go @@ -30,44 +30,56 @@ func (s Store) SearchFederationModuleMappings(ctx context.Context, f types.Modul set []*types.ModuleMapping q squirrel.SelectBuilder ) - q, err = s.convertFederationModuleMappingFilter(f) - if err != nil { - return nil, f, err - } - - // Cleanup anything we've accidentally received... - f.PrevPage, f.NextPage = nil, nil - - // When cursor for a previous page is used it's marked as reversed - // This tells us to flip the descending flag on all used sort keys - reversedCursor := f.PageCursor != nil && f.PageCursor.Reverse - - // Sorting and paging are both enabled in definition yaml file - // {search: {enableSorting:true, enablePaging:true}} - curSort := f.Sort.Clone() - - // If paging with reverse cursor, change the sorting - // direction for all columns we're sorting by - if reversedCursor { - curSort.Reverse() - } return set, f, func() error { - set, err = s.fetchFullPageOfFederationModuleMappings(ctx, q, curSort, f.PageCursor, f.Limit, f.Check) - + q, err = s.convertFederationModuleMappingFilter(f) if err != nil { return err } - if f.Limit > 0 && len(set) > 0 { - if f.PageCursor != nil && (!f.PageCursor.Reverse || uint(len(set)) == f.Limit) { - f.PrevPage = s.collectFederationModuleMappingCursorValues(set[0], curSort.Columns()...) - f.PrevPage.Reverse = true - } + // Paging enabled + // {search: {enablePaging:true}} + // Cleanup unwanted cursors (only relevant is f.PageCursor, next&prev are reset and returned) + f.PrevPage, f.NextPage = nil, nil - // Less items fetched then requested by page-limit - // not very likely there's another page - f.NextPage = s.collectFederationModuleMappingCursorValues(set[len(set)-1], curSort.Columns()...) + if f.PageCursor != nil { + // Page cursor exists so we need to validate it against used sort + if f.Sort, err = f.PageCursor.Sort(f.Sort); err != nil { + return err + } + } + + if len(f.Sort) == 0 { + f.Sort = filter.SortExprSet{} + } + + // Make sure results are always sorted at least by primary keys + if f.Sort.Get("rel_federation_module") == nil { + f.Sort = append(f.Sort, &filter.SortExpr{Column: "rel_federation_module"}) + } + if f.Sort.Get("rel_compose_module") == nil { + f.Sort = append(f.Sort, &filter.SortExpr{Column: "rel_compose_module"}) + } + if f.Sort.Get("rel_compose_namespace") == nil { + f.Sort = append(f.Sort, &filter.SortExpr{Column: "rel_compose_namespace"}) + } + + sort := f.Sort.Clone() + + // When cursor for a previous page is used it's marked as reversed + // This tells us to flip the descending flag on all used sort keys + if f.PageCursor != nil && f.PageCursor.Reverse { + sort.Reverse() + } + + // Apply sorting expr from filter to query + if q, err = setOrderBy(q, sort, s.sortableFederationModuleMappingColumns()); err != nil { + return err + } + + set, f.PrevPage, f.NextPage, err = s.fetchFullPageOfFederationModuleMappings(ctx, q, sort.Columns(), sort.Reversed(), f.PageCursor, f.Limit, f.Check) + if err != nil { + return err } f.PageCursor = nil @@ -79,66 +91,62 @@ func (s Store) SearchFederationModuleMappings(ctx context.Context, f types.Modul // // Function applies: // - cursor conditions (where ...) -// - sorting rules (order by ...) // - limit // // Main responsibility of this function is to perform additional sequential queries in case when not enough results -// are collected due to failed check on a specific row (by check fn). Function then moves cursor to the last item fetched +// are collected due to failed check on a specific row (by check fn). +// +// Function then moves cursor to the last item fetched func (s Store) fetchFullPageOfFederationModuleMappings( ctx context.Context, q squirrel.SelectBuilder, - sort filter.SortExprSet, + sortColumns []string, + sortDesc bool, cursor *filter.PagingCursor, limit uint, check func(*types.ModuleMapping) (bool, error), -) ([]*types.ModuleMapping, error) { +) (set []*types.ModuleMapping, prev, next *filter.PagingCursor, err error) { var ( - set = make([]*types.ModuleMapping, 0, DefaultSliceCapacity) - aux []*types.ModuleMapping - last *types.ModuleMapping + aux []*types.ModuleMapping // When cursor for a previous page is used it's marked as reversed // This tells us to flip the descending flag on all used sort keys - reversedCursor = cursor != nil && cursor.Reverse + reversedOrder = cursor != nil && cursor.Reverse // copy of the select builder tryQuery squirrel.SelectBuilder fetched uint - err error ) - // Make sure we always end our sort by primary keys - if sort.Get("rel_federation_module") == nil { - sort = append(sort, &filter.SortExpr{Column: "rel_federation_module"}) - } + set = make([]*types.ModuleMapping, 0, DefaultSliceCapacity) - if sort.Get("rel_compose_module") == nil { - sort = append(sort, &filter.SortExpr{Column: "rel_compose_module"}) - } - - if sort.Get("rel_compose_namespace") == nil { - sort = append(sort, &filter.SortExpr{Column: "rel_compose_namespace"}) - } - - // Apply sorting expr from filter to query - if q, err = setOrderBy(q, sort, s.sortableFederationModuleMappingColumns()); err != nil { - return nil, err + if cursor != nil { + cursor.Reverse = sortDesc } for try := 0; try < MaxRefetches; try++ { tryQuery = setCursorCond(q, cursor) if limit > 0 { - tryQuery = tryQuery.Limit(uint64(limit)) + tryQuery = tryQuery.Limit(uint64(limit + 1)) } - if aux, fetched, last, err = s.QueryFederationModuleMappings(ctx, tryQuery, check); err != nil { - return nil, err + if aux, err = s.QueryFederationModuleMappings(ctx, tryQuery, check); err != nil { + return nil, nil, nil, err } - if limit > 0 && uint(len(aux)) >= limit { + fetched = uint(len(aux)) + if cursor != nil && prev == nil && fetched > 0 { + // Cursor for previous page is calculated only when cursor is used (so, not on first page) + prev = s.collectFederationModuleMappingCursorValues(aux[0], sortColumns...) + } + + // Point cursor to the last fetched element + if fetched > limit && limit > 0 { + next = s.collectFederationModuleMappingCursorValues(aux[limit-1], sortColumns...) + // we should use only as much as requested - set = append(set, aux[0:limit]...) + set = append(set, aux[:limit]...) break } else { set = append(set, aux...) @@ -146,7 +154,7 @@ func (s Store) fetchFullPageOfFederationModuleMappings( // if limit is not set or we've already collected enough items // we can break the loop right away - if limit == 0 || fetched == 0 || fetched < limit { + if limit == 0 || fetched == 0 || fetched <= limit { break } @@ -157,23 +165,23 @@ func (s Store) fetchFullPageOfFederationModuleMappings( } // @todo improve strategy for collecting next page with lower limit - - // Point cursor to the last fetched element - if cursor = s.collectFederationModuleMappingCursorValues(last, sort.Columns()...); cursor == nil { - break - } } - if reversedCursor { - // Cursor for previous page was used - // Fetched set needs to be reverseCursor because we've forced a descending order to - // get the previous page + if reversedOrder { + // Fetched set needs to be reversed because we've forced a descending order to get the previous page for i, j := 0, len(set)-1; i < j; i, j = i+1, j-1 { set[i], set[j] = set[j], set[i] } + + // and flip prev/next cursors too + prev, next = next, prev } - return set, nil + if prev != nil { + prev.Reverse = true + } + + return set, prev, next, nil } // QueryFederationModuleMappings queries the database, converts and checks each row and @@ -185,39 +193,35 @@ func (s Store) QueryFederationModuleMappings( ctx context.Context, q squirrel.Sqlizer, check func(*types.ModuleMapping) (bool, error), -) ([]*types.ModuleMapping, uint, *types.ModuleMapping, error) { +) ([]*types.ModuleMapping, error) { var ( set = make([]*types.ModuleMapping, 0, DefaultSliceCapacity) res *types.ModuleMapping // Query rows with rows, err = s.Query(ctx, q) - - fetched uint ) if err != nil { - return nil, 0, nil, err + return nil, err } defer rows.Close() for rows.Next() { - fetched++ if err = rows.Err(); err == nil { res, err = s.internalFederationModuleMappingRowScanner(rows) } if err != nil { - return nil, 0, nil, err + return nil, err } - // If check function is set, call it and act accordingly + // check fn set, call it and see if it passed the test + // if not, skip the item if check != nil { if chk, err := check(res); err != nil { - return nil, 0, nil, err + return nil, err } else if !chk { - // did not pass the check - // go with the next row continue } } @@ -225,7 +229,7 @@ func (s Store) QueryFederationModuleMappings( set = append(set, res) } - return set, fetched, res, rows.Err() + return set, rows.Err() } // LookupFederationModuleMappingByFederationModuleIDComposeModuleIDComposeNamespaceID searches for module mapping by federation module id and compose module id @@ -449,7 +453,7 @@ func (Store) federationModuleMappingColumns(aa ...string) []string { } } -// {true true true true true} +// {true true false true true true} // sortableFederationModuleMappingColumns returns all FederationModuleMapping columns flagged as sortable // diff --git a/store/rdbms/federation_nodes.gen.go b/store/rdbms/federation_nodes.gen.go index 83816ae66..7b1794690 100644 --- a/store/rdbms/federation_nodes.gen.go +++ b/store/rdbms/federation_nodes.gen.go @@ -29,15 +29,15 @@ func (s Store) SearchFederationNodes(ctx context.Context, f types.NodeFilter) (t set []*types.Node q squirrel.SelectBuilder ) - q, err = s.convertFederationNodeFilter(f) - if err != nil { - return nil, f, err - } return set, f, func() error { - set, _, _, err = s.QueryFederationNodes(ctx, q, f.Check) - return err + q, err = s.convertFederationNodeFilter(f) + if err != nil { + return err + } + set, err = s.QueryFederationNodes(ctx, q, f.Check) + return err }() } @@ -50,39 +50,35 @@ func (s Store) QueryFederationNodes( ctx context.Context, q squirrel.Sqlizer, check func(*types.Node) (bool, error), -) ([]*types.Node, uint, *types.Node, error) { +) ([]*types.Node, error) { var ( set = make([]*types.Node, 0, DefaultSliceCapacity) res *types.Node // Query rows with rows, err = s.Query(ctx, q) - - fetched uint ) if err != nil { - return nil, 0, nil, err + return nil, err } defer rows.Close() for rows.Next() { - fetched++ if err = rows.Err(); err == nil { res, err = s.internalFederationNodeRowScanner(rows) } if err != nil { - return nil, 0, nil, err + return nil, err } - // If check function is set, call it and act accordingly + // check fn set, call it and see if it passed the test + // if not, skip the item if check != nil { if chk, err := check(res); err != nil { - return nil, 0, nil, err + return nil, err } else if !chk { - // did not pass the check - // go with the next row continue } } @@ -90,7 +86,7 @@ func (s Store) QueryFederationNodes( set = append(set, res) } - return set, fetched, res, rows.Err() + return set, rows.Err() } // LookupFederationNodeByID searches for federation node by ID @@ -332,7 +328,7 @@ func (Store) federationNodeColumns(aa ...string) []string { } } -// {true true false false true} +// {true true false false false true} // internalFederationNodeEncoder encodes fields from types.Node to store.Payload (map) // diff --git a/store/rdbms/federation_nodes_sync.gen.go b/store/rdbms/federation_nodes_sync.gen.go index a3b0109cf..66f314758 100644 --- a/store/rdbms/federation_nodes_sync.gen.go +++ b/store/rdbms/federation_nodes_sync.gen.go @@ -30,44 +30,50 @@ func (s Store) SearchFederationNodesSyncs(ctx context.Context, f types.NodeSyncF set []*types.NodeSync q squirrel.SelectBuilder ) - q, err = s.convertFederationNodesSyncFilter(f) - if err != nil { - return nil, f, err - } - - // Cleanup anything we've accidentally received... - f.PrevPage, f.NextPage = nil, nil - - // When cursor for a previous page is used it's marked as reversed - // This tells us to flip the descending flag on all used sort keys - reversedCursor := f.PageCursor != nil && f.PageCursor.Reverse - - // Sorting and paging are both enabled in definition yaml file - // {search: {enableSorting:true, enablePaging:true}} - curSort := f.Sort.Clone() - - // If paging with reverse cursor, change the sorting - // direction for all columns we're sorting by - if reversedCursor { - curSort.Reverse() - } return set, f, func() error { - set, err = s.fetchFullPageOfFederationNodesSyncs(ctx, q, curSort, f.PageCursor, f.Limit, f.Check) - + q, err = s.convertFederationNodesSyncFilter(f) if err != nil { return err } - if f.Limit > 0 && len(set) > 0 { - if f.PageCursor != nil && (!f.PageCursor.Reverse || uint(len(set)) == f.Limit) { - f.PrevPage = s.collectFederationNodesSyncCursorValues(set[0], curSort.Columns()...) - f.PrevPage.Reverse = true - } + // Paging enabled + // {search: {enablePaging:true}} + // Cleanup unwanted cursors (only relevant is f.PageCursor, next&prev are reset and returned) + f.PrevPage, f.NextPage = nil, nil - // Less items fetched then requested by page-limit - // not very likely there's another page - f.NextPage = s.collectFederationNodesSyncCursorValues(set[len(set)-1], curSort.Columns()...) + if f.PageCursor != nil { + // Page cursor exists so we need to validate it against used sort + if f.Sort, err = f.PageCursor.Sort(f.Sort); err != nil { + return err + } + } + + if len(f.Sort) == 0 { + f.Sort = filter.SortExprSet{} + } + + // Make sure results are always sorted at least by primary keys + if f.Sort.Get("rel_node") == nil { + f.Sort = append(f.Sort, &filter.SortExpr{Column: "rel_node"}) + } + + sort := f.Sort.Clone() + + // When cursor for a previous page is used it's marked as reversed + // This tells us to flip the descending flag on all used sort keys + if f.PageCursor != nil && f.PageCursor.Reverse { + sort.Reverse() + } + + // Apply sorting expr from filter to query + if q, err = setOrderBy(q, sort, s.sortableFederationNodesSyncColumns()); err != nil { + return err + } + + set, f.PrevPage, f.NextPage, err = s.fetchFullPageOfFederationNodesSyncs(ctx, q, sort.Columns(), sort.Reversed(), f.PageCursor, f.Limit, f.Check) + if err != nil { + return err } f.PageCursor = nil @@ -79,58 +85,62 @@ func (s Store) SearchFederationNodesSyncs(ctx context.Context, f types.NodeSyncF // // Function applies: // - cursor conditions (where ...) -// - sorting rules (order by ...) // - limit // // Main responsibility of this function is to perform additional sequential queries in case when not enough results -// are collected due to failed check on a specific row (by check fn). Function then moves cursor to the last item fetched +// are collected due to failed check on a specific row (by check fn). +// +// Function then moves cursor to the last item fetched func (s Store) fetchFullPageOfFederationNodesSyncs( ctx context.Context, q squirrel.SelectBuilder, - sort filter.SortExprSet, + sortColumns []string, + sortDesc bool, cursor *filter.PagingCursor, limit uint, check func(*types.NodeSync) (bool, error), -) ([]*types.NodeSync, error) { +) (set []*types.NodeSync, prev, next *filter.PagingCursor, err error) { var ( - set = make([]*types.NodeSync, 0, DefaultSliceCapacity) - aux []*types.NodeSync - last *types.NodeSync + aux []*types.NodeSync // When cursor for a previous page is used it's marked as reversed // This tells us to flip the descending flag on all used sort keys - reversedCursor = cursor != nil && cursor.Reverse + reversedOrder = cursor != nil && cursor.Reverse // copy of the select builder tryQuery squirrel.SelectBuilder fetched uint - err error ) - // Make sure we always end our sort by primary keys - if sort.Get("rel_node") == nil { - sort = append(sort, &filter.SortExpr{Column: "rel_node"}) - } + set = make([]*types.NodeSync, 0, DefaultSliceCapacity) - // Apply sorting expr from filter to query - if q, err = setOrderBy(q, sort, s.sortableFederationNodesSyncColumns()); err != nil { - return nil, err + if cursor != nil { + cursor.Reverse = sortDesc } for try := 0; try < MaxRefetches; try++ { tryQuery = setCursorCond(q, cursor) if limit > 0 { - tryQuery = tryQuery.Limit(uint64(limit)) + tryQuery = tryQuery.Limit(uint64(limit + 1)) } - if aux, fetched, last, err = s.QueryFederationNodesSyncs(ctx, tryQuery, check); err != nil { - return nil, err + if aux, err = s.QueryFederationNodesSyncs(ctx, tryQuery, check); err != nil { + return nil, nil, nil, err } - if limit > 0 && uint(len(aux)) >= limit { + fetched = uint(len(aux)) + if cursor != nil && prev == nil && fetched > 0 { + // Cursor for previous page is calculated only when cursor is used (so, not on first page) + prev = s.collectFederationNodesSyncCursorValues(aux[0], sortColumns...) + } + + // Point cursor to the last fetched element + if fetched > limit && limit > 0 { + next = s.collectFederationNodesSyncCursorValues(aux[limit-1], sortColumns...) + // we should use only as much as requested - set = append(set, aux[0:limit]...) + set = append(set, aux[:limit]...) break } else { set = append(set, aux...) @@ -138,7 +148,7 @@ func (s Store) fetchFullPageOfFederationNodesSyncs( // if limit is not set or we've already collected enough items // we can break the loop right away - if limit == 0 || fetched == 0 || fetched < limit { + if limit == 0 || fetched == 0 || fetched <= limit { break } @@ -149,23 +159,23 @@ func (s Store) fetchFullPageOfFederationNodesSyncs( } // @todo improve strategy for collecting next page with lower limit - - // Point cursor to the last fetched element - if cursor = s.collectFederationNodesSyncCursorValues(last, sort.Columns()...); cursor == nil { - break - } } - if reversedCursor { - // Cursor for previous page was used - // Fetched set needs to be reverseCursor because we've forced a descending order to - // get the previous page + if reversedOrder { + // Fetched set needs to be reversed because we've forced a descending order to get the previous page for i, j := 0, len(set)-1; i < j; i, j = i+1, j-1 { set[i], set[j] = set[j], set[i] } + + // and flip prev/next cursors too + prev, next = next, prev } - return set, nil + if prev != nil { + prev.Reverse = true + } + + return set, prev, next, nil } // QueryFederationNodesSyncs queries the database, converts and checks each row and @@ -177,39 +187,35 @@ func (s Store) QueryFederationNodesSyncs( ctx context.Context, q squirrel.Sqlizer, check func(*types.NodeSync) (bool, error), -) ([]*types.NodeSync, uint, *types.NodeSync, error) { +) ([]*types.NodeSync, error) { var ( set = make([]*types.NodeSync, 0, DefaultSliceCapacity) res *types.NodeSync // Query rows with rows, err = s.Query(ctx, q) - - fetched uint ) if err != nil { - return nil, 0, nil, err + return nil, err } defer rows.Close() for rows.Next() { - fetched++ if err = rows.Err(); err == nil { res, err = s.internalFederationNodesSyncRowScanner(rows) } if err != nil { - return nil, 0, nil, err + return nil, err } - // If check function is set, call it and act accordingly + // check fn set, call it and see if it passed the test + // if not, skip the item if check != nil { if chk, err := check(res); err != nil { - return nil, 0, nil, err + return nil, err } else if !chk { - // did not pass the check - // go with the next row continue } } @@ -217,7 +223,7 @@ func (s Store) QueryFederationNodesSyncs( set = append(set, res) } - return set, fetched, res, rows.Err() + return set, rows.Err() } // LookupFederationNodesSyncByNodeID searches for sync activity by node ID @@ -437,7 +443,7 @@ func (Store) federationNodesSyncColumns(aa ...string) []string { } } -// {true true true true true} +// {true true false true true true} // sortableFederationNodesSyncColumns returns all FederationNodesSync columns flagged as sortable // diff --git a/store/rdbms/federation_shared_modules.gen.go b/store/rdbms/federation_shared_modules.gen.go index 673636205..c4ef750b7 100644 --- a/store/rdbms/federation_shared_modules.gen.go +++ b/store/rdbms/federation_shared_modules.gen.go @@ -30,44 +30,50 @@ func (s Store) SearchFederationSharedModules(ctx context.Context, f types.Shared set []*types.SharedModule q squirrel.SelectBuilder ) - q, err = s.convertFederationSharedModuleFilter(f) - if err != nil { - return nil, f, err - } - - // Cleanup anything we've accidentally received... - f.PrevPage, f.NextPage = nil, nil - - // When cursor for a previous page is used it's marked as reversed - // This tells us to flip the descending flag on all used sort keys - reversedCursor := f.PageCursor != nil && f.PageCursor.Reverse - - // Sorting and paging are both enabled in definition yaml file - // {search: {enableSorting:true, enablePaging:true}} - curSort := f.Sort.Clone() - - // If paging with reverse cursor, change the sorting - // direction for all columns we're sorting by - if reversedCursor { - curSort.Reverse() - } return set, f, func() error { - set, err = s.fetchFullPageOfFederationSharedModules(ctx, q, curSort, f.PageCursor, f.Limit, f.Check) - + q, err = s.convertFederationSharedModuleFilter(f) if err != nil { return err } - if f.Limit > 0 && len(set) > 0 { - if f.PageCursor != nil && (!f.PageCursor.Reverse || uint(len(set)) == f.Limit) { - f.PrevPage = s.collectFederationSharedModuleCursorValues(set[0], curSort.Columns()...) - f.PrevPage.Reverse = true - } + // Paging enabled + // {search: {enablePaging:true}} + // Cleanup unwanted cursors (only relevant is f.PageCursor, next&prev are reset and returned) + f.PrevPage, f.NextPage = nil, nil - // Less items fetched then requested by page-limit - // not very likely there's another page - f.NextPage = s.collectFederationSharedModuleCursorValues(set[len(set)-1], curSort.Columns()...) + if f.PageCursor != nil { + // Page cursor exists so we need to validate it against used sort + if f.Sort, err = f.PageCursor.Sort(f.Sort); err != nil { + return err + } + } + + if len(f.Sort) == 0 { + f.Sort = filter.SortExprSet{} + } + + // Make sure results are always sorted at least by primary keys + if f.Sort.Get("id") == nil { + f.Sort = append(f.Sort, &filter.SortExpr{Column: "id"}) + } + + sort := f.Sort.Clone() + + // When cursor for a previous page is used it's marked as reversed + // This tells us to flip the descending flag on all used sort keys + if f.PageCursor != nil && f.PageCursor.Reverse { + sort.Reverse() + } + + // Apply sorting expr from filter to query + if q, err = setOrderBy(q, sort, s.sortableFederationSharedModuleColumns()); err != nil { + return err + } + + set, f.PrevPage, f.NextPage, err = s.fetchFullPageOfFederationSharedModules(ctx, q, sort.Columns(), sort.Reversed(), f.PageCursor, f.Limit, f.Check) + if err != nil { + return err } f.PageCursor = nil @@ -79,58 +85,62 @@ func (s Store) SearchFederationSharedModules(ctx context.Context, f types.Shared // // Function applies: // - cursor conditions (where ...) -// - sorting rules (order by ...) // - limit // // Main responsibility of this function is to perform additional sequential queries in case when not enough results -// are collected due to failed check on a specific row (by check fn). Function then moves cursor to the last item fetched +// are collected due to failed check on a specific row (by check fn). +// +// Function then moves cursor to the last item fetched func (s Store) fetchFullPageOfFederationSharedModules( ctx context.Context, q squirrel.SelectBuilder, - sort filter.SortExprSet, + sortColumns []string, + sortDesc bool, cursor *filter.PagingCursor, limit uint, check func(*types.SharedModule) (bool, error), -) ([]*types.SharedModule, error) { +) (set []*types.SharedModule, prev, next *filter.PagingCursor, err error) { var ( - set = make([]*types.SharedModule, 0, DefaultSliceCapacity) - aux []*types.SharedModule - last *types.SharedModule + aux []*types.SharedModule // When cursor for a previous page is used it's marked as reversed // This tells us to flip the descending flag on all used sort keys - reversedCursor = cursor != nil && cursor.Reverse + reversedOrder = cursor != nil && cursor.Reverse // copy of the select builder tryQuery squirrel.SelectBuilder fetched uint - err error ) - // Make sure we always end our sort by primary keys - if sort.Get("id") == nil { - sort = append(sort, &filter.SortExpr{Column: "id"}) - } + set = make([]*types.SharedModule, 0, DefaultSliceCapacity) - // Apply sorting expr from filter to query - if q, err = setOrderBy(q, sort, s.sortableFederationSharedModuleColumns()); err != nil { - return nil, err + if cursor != nil { + cursor.Reverse = sortDesc } for try := 0; try < MaxRefetches; try++ { tryQuery = setCursorCond(q, cursor) if limit > 0 { - tryQuery = tryQuery.Limit(uint64(limit)) + tryQuery = tryQuery.Limit(uint64(limit + 1)) } - if aux, fetched, last, err = s.QueryFederationSharedModules(ctx, tryQuery, check); err != nil { - return nil, err + if aux, err = s.QueryFederationSharedModules(ctx, tryQuery, check); err != nil { + return nil, nil, nil, err } - if limit > 0 && uint(len(aux)) >= limit { + fetched = uint(len(aux)) + if cursor != nil && prev == nil && fetched > 0 { + // Cursor for previous page is calculated only when cursor is used (so, not on first page) + prev = s.collectFederationSharedModuleCursorValues(aux[0], sortColumns...) + } + + // Point cursor to the last fetched element + if fetched > limit && limit > 0 { + next = s.collectFederationSharedModuleCursorValues(aux[limit-1], sortColumns...) + // we should use only as much as requested - set = append(set, aux[0:limit]...) + set = append(set, aux[:limit]...) break } else { set = append(set, aux...) @@ -138,7 +148,7 @@ func (s Store) fetchFullPageOfFederationSharedModules( // if limit is not set or we've already collected enough items // we can break the loop right away - if limit == 0 || fetched == 0 || fetched < limit { + if limit == 0 || fetched == 0 || fetched <= limit { break } @@ -149,23 +159,23 @@ func (s Store) fetchFullPageOfFederationSharedModules( } // @todo improve strategy for collecting next page with lower limit - - // Point cursor to the last fetched element - if cursor = s.collectFederationSharedModuleCursorValues(last, sort.Columns()...); cursor == nil { - break - } } - if reversedCursor { - // Cursor for previous page was used - // Fetched set needs to be reverseCursor because we've forced a descending order to - // get the previous page + if reversedOrder { + // Fetched set needs to be reversed because we've forced a descending order to get the previous page for i, j := 0, len(set)-1; i < j; i, j = i+1, j-1 { set[i], set[j] = set[j], set[i] } + + // and flip prev/next cursors too + prev, next = next, prev } - return set, nil + if prev != nil { + prev.Reverse = true + } + + return set, prev, next, nil } // QueryFederationSharedModules queries the database, converts and checks each row and @@ -177,39 +187,35 @@ func (s Store) QueryFederationSharedModules( ctx context.Context, q squirrel.Sqlizer, check func(*types.SharedModule) (bool, error), -) ([]*types.SharedModule, uint, *types.SharedModule, error) { +) ([]*types.SharedModule, error) { var ( set = make([]*types.SharedModule, 0, DefaultSliceCapacity) res *types.SharedModule // Query rows with rows, err = s.Query(ctx, q) - - fetched uint ) if err != nil { - return nil, 0, nil, err + return nil, err } defer rows.Close() for rows.Next() { - fetched++ if err = rows.Err(); err == nil { res, err = s.internalFederationSharedModuleRowScanner(rows) } if err != nil { - return nil, 0, nil, err + return nil, err } - // If check function is set, call it and act accordingly + // check fn set, call it and see if it passed the test + // if not, skip the item if check != nil { if chk, err := check(res); err != nil { - return nil, 0, nil, err + return nil, err } else if !chk { - // did not pass the check - // go with the next row continue } } @@ -217,7 +223,7 @@ func (s Store) QueryFederationSharedModules( set = append(set, res) } - return set, fetched, res, rows.Err() + return set, rows.Err() } // LookupFederationSharedModuleByID searches for shared federation module by ID @@ -382,6 +388,9 @@ func (s Store) internalFederationSharedModuleRowScanner(row rowScanner) (res *ty &res.Name, &res.ExternalFederationModuleID, &res.Fields, + &res.CreatedBy, + &res.UpdatedBy, + &res.DeletedBy, &res.CreatedAt, &res.UpdatedAt, &res.DeletedAt, @@ -430,25 +439,23 @@ func (Store) federationSharedModuleColumns(aa ...string) []string { alias + "name", alias + "xref_module", alias + "fields", + alias + "created_by", + alias + "updated_by", + alias + "deleted_by", alias + "created_at", alias + "updated_at", alias + "deleted_at", } } -// {true true true true true} +// {true true false true true true} // sortableFederationSharedModuleColumns returns all FederationSharedModule columns flagged as sortable // // With optional string arg, all columns are returned aliased func (Store) sortableFederationSharedModuleColumns() map[string]string { return map[string]string{ - "id": "id", "created_at": "created_at", - "createdat": "created_at", - "updated_at": "updated_at", - "updatedat": "updated_at", - "deleted_at": "deleted_at", - "deletedat": "deleted_at", + "id": "id", } } @@ -464,6 +471,9 @@ func (s Store) internalFederationSharedModuleEncoder(res *types.SharedModule) st "name": res.Name, "xref_module": res.ExternalFederationModuleID, "fields": res.Fields, + "created_by": res.CreatedBy, + "updated_by": res.UpdatedBy, + "deleted_by": res.DeletedBy, "created_at": res.CreatedAt, "updated_at": res.UpdatedAt, "deleted_at": res.DeletedAt, @@ -496,14 +506,6 @@ func (s Store) collectFederationSharedModuleCursorValues(res *types.SharedModule cursor.Set(c, res.ID, false) pkId = true - case "created_at": - cursor.Set(c, res.CreatedAt, false) - - case "updated_at": - cursor.Set(c, res.UpdatedAt, false) - - case "deleted_at": - cursor.Set(c, res.DeletedAt, false) } } diff --git a/store/rdbms/rdbms_schema.go b/store/rdbms/rdbms_schema.go index bcd25149a..92d28becc 100644 --- a/store/rdbms/rdbms_schema.go +++ b/store/rdbms/rdbms_schema.go @@ -503,6 +503,7 @@ func (Schema) FederationModuleShared() *Table { ColumnDef("xref_module", ColumnTypeIdentifier), ColumnDef("fields", ColumnTypeText), CUDTimestamps, + CUDUsers, ) } @@ -516,6 +517,7 @@ func (Schema) FederationModuleExposed() *Table { ColumnDef("rel_compose_namespace", ColumnTypeIdentifier), ColumnDef("fields", ColumnTypeText), CUDTimestamps, + CUDUsers, AddIndex("unique_node_compose_module", IColumn("rel_node", "rel_compose_module", "rel_compose_namespace")), )