3
0

Added bidirectional support, created fields on modules, user check on sync.

This commit is contained in:
Peter Grlica 2020-11-19 08:15:40 +01:00
parent 39eb2c14d6
commit c1d4350a07
20 changed files with 554 additions and 393 deletions

View File

@ -6,6 +6,7 @@ import (
cs "github.com/cortezaproject/corteza-server/compose/service"
"github.com/cortezaproject/corteza-server/federation/service"
ss "github.com/cortezaproject/corteza-server/system/service"
"github.com/spf13/cobra"
)
@ -15,7 +16,9 @@ func commandSyncData(ctx context.Context) func(*cobra.Command, []string) {
&service.Syncer{},
&service.Mapper{},
service.DefaultSharedModule,
cs.DefaultRecord)
cs.DefaultRecord,
ss.DefaultUser,
ss.DefaultRole)
syncData := service.WorkerData(syncService, service.DefaultLogger)
syncData.Watch(ctx, time.Second*30, 50)

View File

@ -6,6 +6,7 @@ import (
cs "github.com/cortezaproject/corteza-server/compose/service"
"github.com/cortezaproject/corteza-server/federation/service"
ss "github.com/cortezaproject/corteza-server/system/service"
"github.com/spf13/cobra"
)
@ -15,9 +16,11 @@ func commandSyncStructure(ctx context.Context) func(*cobra.Command, []string) {
&service.Syncer{},
&service.Mapper{},
service.DefaultSharedModule,
cs.DefaultRecord)
cs.DefaultRecord,
ss.DefaultUser,
ss.DefaultRole)
syncStructure := service.WorkerStructure(syncService, service.DefaultLogger)
syncStructure.Watch(ctx, time.Second*10, 10)
syncStructure.Watch(ctx, time.Second*service.DefaultOptions.StructureMonitorInterval, service.DefaultOptions.StructurePageSize)
}
}

View File

@ -3,6 +3,7 @@ package rest
import (
"context"
"fmt"
"strings"
"time"
cs "github.com/cortezaproject/corteza-server/compose/service"
@ -11,6 +12,8 @@ import (
"github.com/cortezaproject/corteza-server/federation/service"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/filter"
ss "github.com/cortezaproject/corteza-server/system/service"
st "github.com/cortezaproject/corteza-server/system/types"
)
type (
@ -106,11 +109,12 @@ func (ctrl SyncData) ReadExposedAll(ctx context.Context, r *request.SyncDataRead
func (ctrl SyncData) ReadExposed(ctx context.Context, r *request.SyncDataReadExposed) (interface{}, error) {
var (
err error
em *types.ExposedModule
err error
em *types.ExposedModule
users st.UserSet
)
if _, err := service.DefaultNode.FindBySharedNodeID(ctx, r.NodeID); err != nil {
if _, err = service.DefaultNode.FindBySharedNodeID(ctx, r.NodeID); err != nil {
return nil, err
}
@ -118,9 +122,18 @@ func (ctrl SyncData) ReadExposed(ctx context.Context, r *request.SyncDataReadExp
return nil, err
}
if users, _, err = ss.DefaultUser.With(ctx).Find(st.UserFilter{}); err != nil {
return nil, err
}
users, _ = users.Filter(func(u *st.User) (bool, error) {
return strings.Contains(u.Handle, "federation_"), nil
})
f := ct.RecordFilter{
ModuleID: em.ComposeModuleID,
Query: buildLastSyncQuery(r.LastSync),
Check: ignoreFederated(users),
}
if f.Paging, err = filter.NewPaging(r.Limit, r.PageCursor); err != nil {
@ -138,13 +151,7 @@ func (ctrl SyncData) ReadExposed(ctx context.Context, r *request.SyncDataReadExp
}
// do the actual field filtering
err = list.Walk(func(r *ct.Record) error {
r.Values, err = r.Values.Filter(func(rv *ct.RecordValue) (bool, error) {
return em.Fields.HasField(rv.Name)
})
return err
})
err = list.Walk(filterExposedFields(em))
if err != nil {
return nil, err
@ -172,3 +179,33 @@ func buildLastSyncQuery(ts uint64) string {
t.UTC().Format(time.RFC3339),
t.UTC().Format(time.RFC3339))
}
// ignoreFederated matches the created user id with any
// of the generated federated users and omits it
func ignoreFederated(users st.UserSet) func(r *ct.Record) (bool, error) {
return func(r *ct.Record) (bool, error) {
var keep = true
users.Walk(func(u *st.User) error {
if u.ID == r.CreatedBy {
keep = false
return nil
}
return nil
})
return keep, nil
}
}
// filterExposedFields omits the fields that are not exposed as defined
// in the exposed module definition in store
func filterExposedFields(em *types.ExposedModule) func(r *ct.Record) error {
return func(r *ct.Record) error {
var err error
r.Values, err = r.Values.Filter(func(rv *ct.RecordValue) (bool, error) {
return em.Fields.HasField(rv.Name)
})
return err
}
}

View File

@ -8,6 +8,7 @@ import (
ct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/store"
)
@ -110,6 +111,7 @@ func (svc exposedModule) Update(ctx context.Context, updated *types.ExposedModul
updated.UpdatedAt = now()
updated.CreatedAt = old.CreatedAt
updated.UpdatedBy = auth.GetIdentityFromContext(ctx).Identity()
// set labels
AddFederationLabel(m, node.BaseURL)
@ -205,12 +207,6 @@ func (svc exposedModule) Create(ctx context.Context, new *types.ExposedModule) (
aProps = &exposedModuleActionProps{create: new}
)
// check if compose module actually exists
// TODO - how do we handle namespace?
// if _, err := svc.compose.With(ctx).FindByID(r.NamespaceID, new.ComposeModuleID); err == nil {
// return nil, ExposedModuleErrComposeModuleNotFound()
// }
err := store.Tx(ctx, svc.store, func(ctx context.Context, s store.Storer) (err error) {
// TODO
// if !svc.ac.CanCreateFederationExposedModule(ctx, ns) {
@ -235,13 +231,12 @@ func (svc exposedModule) Create(ctx context.Context, new *types.ExposedModule) (
// Check for node - compose.Module combo
if err = svc.uniqueCheck(ctx, new); err != nil {
return ExposedModuleErrNotUnique()
return err
}
new.ID = nextID()
new.CreatedAt = *now()
new.UpdatedAt = nil
new.DeletedAt = nil
new.CreatedBy = auth.GetIdentityFromContext(ctx).Identity()
// check if Fields can be unmarshaled to the fields structure
if new.Fields != nil {
@ -273,7 +268,9 @@ func (svc exposedModule) uniqueCheck(ctx context.Context, m *types.ExposedModule
ComposeNamespaceID: m.ComposeNamespaceID,
}
if set, _, err := store.SearchFederationExposedModules(ctx, svc.store, f); len(set) > 0 && err == nil {
set, _, err := store.SearchFederationExposedModules(ctx, svc.store, f)
if len(set) > 0 && err == nil {
return ExposedModuleErrNotUnique()
} else if err != nil {
return err

View File

@ -16,6 +16,7 @@ import (
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/cortezaproject/corteza-server/store"
"github.com/cortezaproject/corteza-server/system/service"
ss "github.com/cortezaproject/corteza-server/system/service"
"github.com/cortezaproject/corteza-server/system/types"
"go.uber.org/zap"
)
@ -137,11 +138,17 @@ func Initialize(ctx context.Context, log *zap.Logger, s store.Storer, c Config)
}
func Watchers(ctx context.Context) {
if !DefaultOptions.Enabled {
return
}
syncService := NewSync(
&Syncer{},
&Mapper{},
DefaultSharedModule,
cs.DefaultRecord)
cs.DefaultRecord,
ss.DefaultUser,
ss.DefaultRole)
syncStructure := WorkerStructure(syncService, DefaultLogger)
syncData := WorkerData(syncService, service.DefaultLogger)

View File

@ -6,6 +6,7 @@ import (
composeService "github.com/cortezaproject/corteza-server/compose/service"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/store"
)
@ -68,8 +69,7 @@ func (svc sharedModule) Create(ctx context.Context, new *types.SharedModule) (*t
new.ID = nextID()
new.CreatedAt = *now()
new.UpdatedAt = nil
new.DeletedAt = nil
new.CreatedBy = auth.GetIdentityFromContext(ctx).Identity()
// check if Fields can be unmarshaled to the fields structure
if new.Fields != nil {
@ -94,6 +94,7 @@ func (svc sharedModule) Update(ctx context.Context, updated *types.SharedModule)
err := store.Tx(ctx, svc.store, func(ctx context.Context, s store.Storer) (err error) {
updated.UpdatedAt = now()
updated.UpdatedBy = auth.GetIdentityFromContext(ctx).Identity()
aProps.setModule(updated)

View File

@ -3,6 +3,7 @@ package service
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
@ -10,6 +11,8 @@ import (
ct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/system/service"
ss "github.com/cortezaproject/corteza-server/system/service"
st "github.com/cortezaproject/corteza-server/system/types"
)
type (
@ -18,15 +21,19 @@ type (
mapper *Mapper
sharedModuleService SharedModuleService
composeRecordService cs.RecordService
systemUserService ss.UserService
systemRoleService ss.RoleService
}
)
func NewSync(s *Syncer, m *Mapper, sm SharedModuleService, cs cs.RecordService) *Sync {
func NewSync(s *Syncer, m *Mapper, sm SharedModuleService, cs cs.RecordService, us ss.UserService, rs ss.RoleService) *Sync {
return &Sync{
syncer: s,
mapper: m,
sharedModuleService: sm,
composeRecordService: cs,
systemUserService: us,
systemRoleService: rs,
}
}
@ -143,3 +150,28 @@ func (s *Sync) GetLastSyncTime(ctx context.Context, nodeID uint64, syncType stri
return &ns.TimeOfAction, nil
}
// LoadUserWithRoles gets the federation user, that was
// created at node pairing process
func (s *Sync) LoadUserWithRoles(ctx context.Context, nodeID uint64) (*st.User, error) {
var (
u *st.User
err error
)
// get the federated user, associated for this node
if u, err = s.systemUserService.With(ctx).FindByHandle(fmt.Sprintf("federation_%d", nodeID)); err != nil {
return nil, err
}
// attach the roles
rr, _, err := s.systemRoleService.With(ctx).Find(st.RoleFilter{MemberID: u.ID})
if err != nil {
return nil, err
}
u.SetRoles(rr.IDs())
return u, nil
}

View File

@ -8,7 +8,10 @@ import (
ct "github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/pkg/decoder"
st "github.com/cortezaproject/corteza-server/system/types"
"github.com/davecgh/go-spew/spew"
"go.uber.org/zap"
)
@ -29,6 +32,8 @@ type (
ModuleMappings *types.ModuleFieldMappingSet
ModuleMappingValues *ct.RecordValueSet
SyncService *Sync
Node *types.Node
User *st.User
}
)
@ -61,6 +66,16 @@ func (w *syncWorkerData) PrepareForNodes(ctx context.Context, urls chan Url) {
// get all shared modules and their module mappings
for _, n := range nodes {
// get the user, associated for this node
u, err := w.syncService.LoadUserWithRoles(ctx, n.ID)
if err != nil {
w.logger.Info("could not preload federation user, skipping", zap.Uint64("nodeID", n.ID), zap.Error(err))
continue
}
spew.Dump("user", u, u.Roles())
set, err := w.syncService.GetSharedModules(ctx, n.ID)
if err != nil {
@ -91,7 +106,7 @@ func (w *syncWorkerData) PrepareForNodes(ctx context.Context, urls chan Url) {
// get the last sync per-node
lastSync, _ := w.syncService.GetLastSyncTime(ctx, n.ID, types.NodeSyncTypeData)
basePath := fmt.Sprintf("/federation/nodes/%d/modules/%d/records/", n.SharedNodeID, sm.ExternalFederationModuleID)
basePath := fmt.Sprintf("/nodes/%d/modules/%d/records/", n.SharedNodeID, sm.ExternalFederationModuleID)
z := []zap.Field{
zap.Uint64("nodeID", n.ID),
@ -122,6 +137,8 @@ func (w *syncWorkerData) PrepareForNodes(ctx context.Context, urls chan Url) {
ModuleMappingValues: &mappingValues,
NodeBaseURL: n.BaseURL,
SyncService: w.syncService,
User: u,
Node: n,
}
go w.queueUrl(&url, urls, processer)
@ -173,6 +190,9 @@ func (w *syncWorkerData) Watch(ctx context.Context, delay time.Duration, limit i
continue
}
// use the authToken from node pairing
ctx = context.WithValue(ctx, FederationUserToken, url.Meta.(*dataProcesser).Node.AuthToken)
responseBody, err := w.syncService.FetchUrl(ctx, s)
if err != nil {
@ -194,7 +214,7 @@ func (w *syncWorkerData) Watch(ctx context.Context, delay time.Duration, limit i
continue
}
basePath := fmt.Sprintf("/federation/nodes/%d/modules/%d/records/", p.Meta.(*dataProcesser).NodeID, p.Meta.(*dataProcesser).ID)
basePath := fmt.Sprintf("/nodes/%d/modules/%d/records/", p.Meta.(*dataProcesser).NodeID, p.Meta.(*dataProcesser).ID)
u := types.SyncerURI{
BaseURL: p.Meta.(*dataProcesser).NodeBaseURL,
@ -242,10 +262,11 @@ func (w *syncWorkerData) Watch(ctx context.Context, delay time.Duration, limit i
// uses the decode package to decode the whole set, depending on
// the filtering that was used (limit)
func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (int, error) {
processed := 0
o, err := decoder.DecodeFederationRecordSync([]byte(payload))
spew.Dump("got records", o)
if err != nil {
return processed, err
}
@ -254,6 +275,9 @@ func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (int, erro
return processed, nil
}
// get the user that is tied to this node
ctx = auth.SetIdentityToContext(ctx, dp.User)
for _, er := range o {
dp.SyncService.mapper.Merge(&er.Values, dp.ModuleMappingValues, dp.ModuleMappings)
@ -268,6 +292,7 @@ func (dp *dataProcesser) Process(ctx context.Context, payload []byte) (int, erro
_, err := dp.SyncService.CreateRecord(ctx, rec)
if err != nil {
spew.Dump("Error saving record", err, rec)
continue
}

View File

@ -7,7 +7,9 @@ import (
"time"
"github.com/cortezaproject/corteza-server/federation/types"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/pkg/decoder"
st "github.com/cortezaproject/corteza-server/system/types"
"github.com/davecgh/go-spew/spew"
"go.uber.org/zap"
)
@ -25,6 +27,8 @@ type (
SharedNodeID uint64
NodeBaseURL string
SyncService *Sync
Node *types.Node
User *st.User
}
)
@ -59,9 +63,20 @@ func (w *syncWorkerStructure) PrepareForNodes(ctx context.Context, urls chan Url
// get all shared modules and their module mappings
for _, n := range nodes {
// get the user, associated for this node
u, err := w.syncService.LoadUserWithRoles(ctx, n.ID)
if err != nil {
w.logger.Info("could not preload federation user, skipping", zap.Uint64("nodeID", n.ID), zap.Error(err))
continue
}
spew.Dump("user", u, u.Roles())
// get the last sync per-node
lastSync, _ := w.syncService.GetLastSyncTime(ctx, n.ID, types.NodeSyncTypeStructure)
basePath := fmt.Sprintf("/federation/nodes/%d/modules/exposed/", n.SharedNodeID)
basePath := fmt.Sprintf("/nodes/%d/modules/exposed/", n.SharedNodeID)
z := []zap.Field{
zap.Uint64("nodeID", n.ID),
@ -88,6 +103,8 @@ func (w *syncWorkerStructure) PrepareForNodes(ctx context.Context, urls chan Url
SharedNodeID: n.SharedNodeID,
NodeBaseURL: n.BaseURL,
SyncService: w.syncService,
User: u,
Node: n,
}
go w.queueUrl(&url, urls, processer)
@ -138,6 +155,9 @@ func (w *syncWorkerStructure) Watch(ctx context.Context, delay time.Duration, li
continue
}
// use the authToken from node pairing
ctx = context.WithValue(ctx, FederationUserToken, url.Meta.(*structureProcesser).Node.AuthToken)
responseBody, err := w.syncService.FetchUrl(ctx, s)
if err != nil {
@ -160,7 +180,7 @@ func (w *syncWorkerStructure) Watch(ctx context.Context, delay time.Duration, li
continue
}
basePath := fmt.Sprintf("/federation/nodes/%d/modules/exposed/", p.Meta.(*structureProcesser).SharedNodeID)
basePath := fmt.Sprintf("/nodes/%d/modules/exposed/", p.Meta.(*structureProcesser).SharedNodeID)
u := types.SyncerURI{
BaseURL: p.Meta.(*structureProcesser).NodeBaseURL,
@ -206,7 +226,6 @@ func (w *syncWorkerStructure) Watch(ctx context.Context, delay time.Duration, li
// the filtering that was used (limit)
func (dp *structureProcesser) Process(ctx context.Context, payload []byte) (int, error) {
processed := 0
o, err := decoder.DecodeFederationModuleSync([]byte(payload))
if err != nil {
@ -217,6 +236,9 @@ func (dp *structureProcesser) Process(ctx context.Context, payload []byte) (int,
return processed, nil
}
// get the user that is tied to this node
ctx = auth.SetIdentityToContext(ctx, dp.User)
for _, em := range o {
new := &types.SharedModule{
NodeID: dp.NodeID,

View File

@ -6,6 +6,7 @@ import (
"errors"
"io"
"net/http"
"net/http/httputil"
"time"
"github.com/cortezaproject/corteza-server/federation/types"
@ -38,6 +39,8 @@ type (
}
)
const FederationUserToken string = "authToken"
func (h *Syncer) Queue(url Url, out chan Url) {
out <- url
}
@ -53,6 +56,13 @@ func (h *Syncer) Fetch(ctx context.Context, url string) (io.Reader, error) {
return nil, err
}
if authToken := ctx.Value(FederationUserToken); authToken != nil {
req.Header.Add("Authorization", `Bearer `+authToken.(string))
}
dr, _ := httputil.DumpRequest(req, true)
spew.Dump("> using headers", string(dr))
resp, err := client.Do(req)
if err != nil {
spew.Dump("ERR", err)

View File

@ -17,8 +17,11 @@ type (
Fields ModuleFieldSet `json:"fields"`
CreatedAt time.Time `json:"createdAt,omitempty"`
CreatedBy uint64 `json:"createdBy,string" `
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
UpdatedBy uint64 `json:"updatedBy,string,omitempty" `
DeletedAt *time.Time `json:"deletedAt,omitempty"`
DeletedBy uint64 `json:"deletedBy,string,omitempty" `
}
ExposedModuleFilter struct {

View File

@ -16,8 +16,11 @@ type (
Fields ModuleFieldSet `json:"fields"`
CreatedAt time.Time `json:"createdAt,omitempty"`
CreatedBy uint64 `json:"createdBy,string" `
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
UpdatedBy uint64 `json:"updatedBy,string,omitempty" `
DeletedAt *time.Time `json:"deletedAt,omitempty"`
DeletedBy uint64 `json:"deletedBy,string,omitempty" `
}
SharedModuleFilter struct {

View File

@ -12,9 +12,12 @@ fields:
- { field: ComposeModuleID }
- { field: ComposeNamespaceID }
- { field: Fields, type: "json.Text" }
- { field: CreatedAt, sortable: true }
- { field: UpdatedAt, sortable: true }
- { field: DeletedAt, sortable: true }
- { field: CreatedBy }
- { field: UpdatedBy }
- { field: DeletedBy }
- { field: CreatedAt }
- { field: UpdatedAt }
- { field: DeletedAt }
lookups:
- fields: [ID]

View File

@ -11,9 +11,12 @@ fields:
- { field: Name }
- { field: ExternalFederationModuleID }
- { field: Fields, type: "json.Text" }
- { field: CreatedAt, sortable: true }
- { field: UpdatedAt, sortable: true }
- { field: DeletedAt, sortable: true }
- { field: CreatedBy }
- { field: UpdatedBy }
- { field: DeletedBy }
- { field: CreatedAt }
- { field: UpdatedAt }
- { field: DeletedAt }
lookups:
- fields: [ID]

View File

@ -30,44 +30,50 @@ func (s Store) SearchFederationExposedModules(ctx context.Context, f types.Expos
set []*types.ExposedModule
q squirrel.SelectBuilder
)
q, err = s.convertFederationExposedModuleFilter(f)
if err != nil {
return nil, f, err
}
// Cleanup anything we've accidentally received...
f.PrevPage, f.NextPage = nil, nil
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
reversedCursor := f.PageCursor != nil && f.PageCursor.Reverse
// Sorting and paging are both enabled in definition yaml file
// {search: {enableSorting:true, enablePaging:true}}
curSort := f.Sort.Clone()
// If paging with reverse cursor, change the sorting
// direction for all columns we're sorting by
if reversedCursor {
curSort.Reverse()
}
return set, f, func() error {
set, err = s.fetchFullPageOfFederationExposedModules(ctx, q, curSort, f.PageCursor, f.Limit, f.Check)
q, err = s.convertFederationExposedModuleFilter(f)
if err != nil {
return err
}
if f.Limit > 0 && len(set) > 0 {
if f.PageCursor != nil && (!f.PageCursor.Reverse || uint(len(set)) == f.Limit) {
f.PrevPage = s.collectFederationExposedModuleCursorValues(set[0], curSort.Columns()...)
f.PrevPage.Reverse = true
}
// Paging enabled
// {search: {enablePaging:true}}
// Cleanup unwanted cursors (only relevant is f.PageCursor, next&prev are reset and returned)
f.PrevPage, f.NextPage = nil, nil
// Less items fetched then requested by page-limit
// not very likely there's another page
f.NextPage = s.collectFederationExposedModuleCursorValues(set[len(set)-1], curSort.Columns()...)
if f.PageCursor != nil {
// Page cursor exists so we need to validate it against used sort
if f.Sort, err = f.PageCursor.Sort(f.Sort); err != nil {
return err
}
}
if len(f.Sort) == 0 {
f.Sort = filter.SortExprSet{}
}
// Make sure results are always sorted at least by primary keys
if f.Sort.Get("id") == nil {
f.Sort = append(f.Sort, &filter.SortExpr{Column: "id"})
}
sort := f.Sort.Clone()
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
if f.PageCursor != nil && f.PageCursor.Reverse {
sort.Reverse()
}
// Apply sorting expr from filter to query
if q, err = setOrderBy(q, sort, s.sortableFederationExposedModuleColumns()); err != nil {
return err
}
set, f.PrevPage, f.NextPage, err = s.fetchFullPageOfFederationExposedModules(ctx, q, sort.Columns(), sort.Reversed(), f.PageCursor, f.Limit, f.Check)
if err != nil {
return err
}
f.PageCursor = nil
@ -79,58 +85,62 @@ func (s Store) SearchFederationExposedModules(ctx context.Context, f types.Expos
//
// Function applies:
// - cursor conditions (where ...)
// - sorting rules (order by ...)
// - limit
//
// Main responsibility of this function is to perform additional sequential queries in case when not enough results
// are collected due to failed check on a specific row (by check fn). Function then moves cursor to the last item fetched
// are collected due to failed check on a specific row (by check fn).
//
// Function then moves cursor to the last item fetched
func (s Store) fetchFullPageOfFederationExposedModules(
ctx context.Context,
q squirrel.SelectBuilder,
sort filter.SortExprSet,
sortColumns []string,
sortDesc bool,
cursor *filter.PagingCursor,
limit uint,
check func(*types.ExposedModule) (bool, error),
) ([]*types.ExposedModule, error) {
) (set []*types.ExposedModule, prev, next *filter.PagingCursor, err error) {
var (
set = make([]*types.ExposedModule, 0, DefaultSliceCapacity)
aux []*types.ExposedModule
last *types.ExposedModule
aux []*types.ExposedModule
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
reversedCursor = cursor != nil && cursor.Reverse
reversedOrder = cursor != nil && cursor.Reverse
// copy of the select builder
tryQuery squirrel.SelectBuilder
fetched uint
err error
)
// Make sure we always end our sort by primary keys
if sort.Get("id") == nil {
sort = append(sort, &filter.SortExpr{Column: "id"})
}
set = make([]*types.ExposedModule, 0, DefaultSliceCapacity)
// Apply sorting expr from filter to query
if q, err = setOrderBy(q, sort, s.sortableFederationExposedModuleColumns()); err != nil {
return nil, err
if cursor != nil {
cursor.Reverse = sortDesc
}
for try := 0; try < MaxRefetches; try++ {
tryQuery = setCursorCond(q, cursor)
if limit > 0 {
tryQuery = tryQuery.Limit(uint64(limit))
tryQuery = tryQuery.Limit(uint64(limit + 1))
}
if aux, fetched, last, err = s.QueryFederationExposedModules(ctx, tryQuery, check); err != nil {
return nil, err
if aux, err = s.QueryFederationExposedModules(ctx, tryQuery, check); err != nil {
return nil, nil, nil, err
}
if limit > 0 && uint(len(aux)) >= limit {
fetched = uint(len(aux))
if cursor != nil && prev == nil && fetched > 0 {
// Cursor for previous page is calculated only when cursor is used (so, not on first page)
prev = s.collectFederationExposedModuleCursorValues(aux[0], sortColumns...)
}
// Point cursor to the last fetched element
if fetched > limit && limit > 0 {
next = s.collectFederationExposedModuleCursorValues(aux[limit-1], sortColumns...)
// we should use only as much as requested
set = append(set, aux[0:limit]...)
set = append(set, aux[:limit]...)
break
} else {
set = append(set, aux...)
@ -138,7 +148,7 @@ func (s Store) fetchFullPageOfFederationExposedModules(
// if limit is not set or we've already collected enough items
// we can break the loop right away
if limit == 0 || fetched == 0 || fetched < limit {
if limit == 0 || fetched == 0 || fetched <= limit {
break
}
@ -149,23 +159,23 @@ func (s Store) fetchFullPageOfFederationExposedModules(
}
// @todo improve strategy for collecting next page with lower limit
// Point cursor to the last fetched element
if cursor = s.collectFederationExposedModuleCursorValues(last, sort.Columns()...); cursor == nil {
break
}
}
if reversedCursor {
// Cursor for previous page was used
// Fetched set needs to be reverseCursor because we've forced a descending order to
// get the previous page
if reversedOrder {
// Fetched set needs to be reversed because we've forced a descending order to get the previous page
for i, j := 0, len(set)-1; i < j; i, j = i+1, j-1 {
set[i], set[j] = set[j], set[i]
}
// and flip prev/next cursors too
prev, next = next, prev
}
return set, nil
if prev != nil {
prev.Reverse = true
}
return set, prev, next, nil
}
// QueryFederationExposedModules queries the database, converts and checks each row and
@ -177,39 +187,35 @@ func (s Store) QueryFederationExposedModules(
ctx context.Context,
q squirrel.Sqlizer,
check func(*types.ExposedModule) (bool, error),
) ([]*types.ExposedModule, uint, *types.ExposedModule, error) {
) ([]*types.ExposedModule, error) {
var (
set = make([]*types.ExposedModule, 0, DefaultSliceCapacity)
res *types.ExposedModule
// Query rows with
rows, err = s.Query(ctx, q)
fetched uint
)
if err != nil {
return nil, 0, nil, err
return nil, err
}
defer rows.Close()
for rows.Next() {
fetched++
if err = rows.Err(); err == nil {
res, err = s.internalFederationExposedModuleRowScanner(rows)
}
if err != nil {
return nil, 0, nil, err
return nil, err
}
// If check function is set, call it and act accordingly
// check fn set, call it and see if it passed the test
// if not, skip the item
if check != nil {
if chk, err := check(res); err != nil {
return nil, 0, nil, err
return nil, err
} else if !chk {
// did not pass the check
// go with the next row
continue
}
}
@ -217,7 +223,7 @@ func (s Store) QueryFederationExposedModules(
set = append(set, res)
}
return set, fetched, res, rows.Err()
return set, rows.Err()
}
// LookupFederationExposedModuleByID searches for federation module by ID
@ -383,6 +389,9 @@ func (s Store) internalFederationExposedModuleRowScanner(row rowScanner) (res *t
&res.ComposeModuleID,
&res.ComposeNamespaceID,
&res.Fields,
&res.CreatedBy,
&res.UpdatedBy,
&res.DeletedBy,
&res.CreatedAt,
&res.UpdatedAt,
&res.DeletedAt,
@ -432,25 +441,23 @@ func (Store) federationExposedModuleColumns(aa ...string) []string {
alias + "rel_compose_module",
alias + "rel_compose_namespace",
alias + "fields",
alias + "created_by",
alias + "updated_by",
alias + "deleted_by",
alias + "created_at",
alias + "updated_at",
alias + "deleted_at",
}
}
// {true true true true true}
// {true true false true true true}
// sortableFederationExposedModuleColumns returns all FederationExposedModule columns flagged as sortable
//
// With optional string arg, all columns are returned aliased
func (Store) sortableFederationExposedModuleColumns() map[string]string {
return map[string]string{
"id": "id", "created_at": "created_at",
"createdat": "created_at",
"updated_at": "updated_at",
"updatedat": "updated_at",
"deleted_at": "deleted_at",
"deletedat": "deleted_at",
"id": "id",
}
}
@ -467,6 +474,9 @@ func (s Store) internalFederationExposedModuleEncoder(res *types.ExposedModule)
"rel_compose_module": res.ComposeModuleID,
"rel_compose_namespace": res.ComposeNamespaceID,
"fields": res.Fields,
"created_by": res.CreatedBy,
"updated_by": res.UpdatedBy,
"deleted_by": res.DeletedBy,
"created_at": res.CreatedAt,
"updated_at": res.UpdatedAt,
"deleted_at": res.DeletedAt,
@ -499,14 +509,6 @@ func (s Store) collectFederationExposedModuleCursorValues(res *types.ExposedModu
cursor.Set(c, res.ID, false)
pkId = true
case "created_at":
cursor.Set(c, res.CreatedAt, false)
case "updated_at":
cursor.Set(c, res.UpdatedAt, false)
case "deleted_at":
cursor.Set(c, res.DeletedAt, false)
}
}

View File

@ -30,44 +30,56 @@ func (s Store) SearchFederationModuleMappings(ctx context.Context, f types.Modul
set []*types.ModuleMapping
q squirrel.SelectBuilder
)
q, err = s.convertFederationModuleMappingFilter(f)
if err != nil {
return nil, f, err
}
// Cleanup anything we've accidentally received...
f.PrevPage, f.NextPage = nil, nil
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
reversedCursor := f.PageCursor != nil && f.PageCursor.Reverse
// Sorting and paging are both enabled in definition yaml file
// {search: {enableSorting:true, enablePaging:true}}
curSort := f.Sort.Clone()
// If paging with reverse cursor, change the sorting
// direction for all columns we're sorting by
if reversedCursor {
curSort.Reverse()
}
return set, f, func() error {
set, err = s.fetchFullPageOfFederationModuleMappings(ctx, q, curSort, f.PageCursor, f.Limit, f.Check)
q, err = s.convertFederationModuleMappingFilter(f)
if err != nil {
return err
}
if f.Limit > 0 && len(set) > 0 {
if f.PageCursor != nil && (!f.PageCursor.Reverse || uint(len(set)) == f.Limit) {
f.PrevPage = s.collectFederationModuleMappingCursorValues(set[0], curSort.Columns()...)
f.PrevPage.Reverse = true
}
// Paging enabled
// {search: {enablePaging:true}}
// Cleanup unwanted cursors (only relevant is f.PageCursor, next&prev are reset and returned)
f.PrevPage, f.NextPage = nil, nil
// Less items fetched then requested by page-limit
// not very likely there's another page
f.NextPage = s.collectFederationModuleMappingCursorValues(set[len(set)-1], curSort.Columns()...)
if f.PageCursor != nil {
// Page cursor exists so we need to validate it against used sort
if f.Sort, err = f.PageCursor.Sort(f.Sort); err != nil {
return err
}
}
if len(f.Sort) == 0 {
f.Sort = filter.SortExprSet{}
}
// Make sure results are always sorted at least by primary keys
if f.Sort.Get("rel_federation_module") == nil {
f.Sort = append(f.Sort, &filter.SortExpr{Column: "rel_federation_module"})
}
if f.Sort.Get("rel_compose_module") == nil {
f.Sort = append(f.Sort, &filter.SortExpr{Column: "rel_compose_module"})
}
if f.Sort.Get("rel_compose_namespace") == nil {
f.Sort = append(f.Sort, &filter.SortExpr{Column: "rel_compose_namespace"})
}
sort := f.Sort.Clone()
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
if f.PageCursor != nil && f.PageCursor.Reverse {
sort.Reverse()
}
// Apply sorting expr from filter to query
if q, err = setOrderBy(q, sort, s.sortableFederationModuleMappingColumns()); err != nil {
return err
}
set, f.PrevPage, f.NextPage, err = s.fetchFullPageOfFederationModuleMappings(ctx, q, sort.Columns(), sort.Reversed(), f.PageCursor, f.Limit, f.Check)
if err != nil {
return err
}
f.PageCursor = nil
@ -79,66 +91,62 @@ func (s Store) SearchFederationModuleMappings(ctx context.Context, f types.Modul
//
// Function applies:
// - cursor conditions (where ...)
// - sorting rules (order by ...)
// - limit
//
// Main responsibility of this function is to perform additional sequential queries in case when not enough results
// are collected due to failed check on a specific row (by check fn). Function then moves cursor to the last item fetched
// are collected due to failed check on a specific row (by check fn).
//
// Function then moves cursor to the last item fetched
func (s Store) fetchFullPageOfFederationModuleMappings(
ctx context.Context,
q squirrel.SelectBuilder,
sort filter.SortExprSet,
sortColumns []string,
sortDesc bool,
cursor *filter.PagingCursor,
limit uint,
check func(*types.ModuleMapping) (bool, error),
) ([]*types.ModuleMapping, error) {
) (set []*types.ModuleMapping, prev, next *filter.PagingCursor, err error) {
var (
set = make([]*types.ModuleMapping, 0, DefaultSliceCapacity)
aux []*types.ModuleMapping
last *types.ModuleMapping
aux []*types.ModuleMapping
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
reversedCursor = cursor != nil && cursor.Reverse
reversedOrder = cursor != nil && cursor.Reverse
// copy of the select builder
tryQuery squirrel.SelectBuilder
fetched uint
err error
)
// Make sure we always end our sort by primary keys
if sort.Get("rel_federation_module") == nil {
sort = append(sort, &filter.SortExpr{Column: "rel_federation_module"})
}
set = make([]*types.ModuleMapping, 0, DefaultSliceCapacity)
if sort.Get("rel_compose_module") == nil {
sort = append(sort, &filter.SortExpr{Column: "rel_compose_module"})
}
if sort.Get("rel_compose_namespace") == nil {
sort = append(sort, &filter.SortExpr{Column: "rel_compose_namespace"})
}
// Apply sorting expr from filter to query
if q, err = setOrderBy(q, sort, s.sortableFederationModuleMappingColumns()); err != nil {
return nil, err
if cursor != nil {
cursor.Reverse = sortDesc
}
for try := 0; try < MaxRefetches; try++ {
tryQuery = setCursorCond(q, cursor)
if limit > 0 {
tryQuery = tryQuery.Limit(uint64(limit))
tryQuery = tryQuery.Limit(uint64(limit + 1))
}
if aux, fetched, last, err = s.QueryFederationModuleMappings(ctx, tryQuery, check); err != nil {
return nil, err
if aux, err = s.QueryFederationModuleMappings(ctx, tryQuery, check); err != nil {
return nil, nil, nil, err
}
if limit > 0 && uint(len(aux)) >= limit {
fetched = uint(len(aux))
if cursor != nil && prev == nil && fetched > 0 {
// Cursor for previous page is calculated only when cursor is used (so, not on first page)
prev = s.collectFederationModuleMappingCursorValues(aux[0], sortColumns...)
}
// Point cursor to the last fetched element
if fetched > limit && limit > 0 {
next = s.collectFederationModuleMappingCursorValues(aux[limit-1], sortColumns...)
// we should use only as much as requested
set = append(set, aux[0:limit]...)
set = append(set, aux[:limit]...)
break
} else {
set = append(set, aux...)
@ -146,7 +154,7 @@ func (s Store) fetchFullPageOfFederationModuleMappings(
// if limit is not set or we've already collected enough items
// we can break the loop right away
if limit == 0 || fetched == 0 || fetched < limit {
if limit == 0 || fetched == 0 || fetched <= limit {
break
}
@ -157,23 +165,23 @@ func (s Store) fetchFullPageOfFederationModuleMappings(
}
// @todo improve strategy for collecting next page with lower limit
// Point cursor to the last fetched element
if cursor = s.collectFederationModuleMappingCursorValues(last, sort.Columns()...); cursor == nil {
break
}
}
if reversedCursor {
// Cursor for previous page was used
// Fetched set needs to be reverseCursor because we've forced a descending order to
// get the previous page
if reversedOrder {
// Fetched set needs to be reversed because we've forced a descending order to get the previous page
for i, j := 0, len(set)-1; i < j; i, j = i+1, j-1 {
set[i], set[j] = set[j], set[i]
}
// and flip prev/next cursors too
prev, next = next, prev
}
return set, nil
if prev != nil {
prev.Reverse = true
}
return set, prev, next, nil
}
// QueryFederationModuleMappings queries the database, converts and checks each row and
@ -185,39 +193,35 @@ func (s Store) QueryFederationModuleMappings(
ctx context.Context,
q squirrel.Sqlizer,
check func(*types.ModuleMapping) (bool, error),
) ([]*types.ModuleMapping, uint, *types.ModuleMapping, error) {
) ([]*types.ModuleMapping, error) {
var (
set = make([]*types.ModuleMapping, 0, DefaultSliceCapacity)
res *types.ModuleMapping
// Query rows with
rows, err = s.Query(ctx, q)
fetched uint
)
if err != nil {
return nil, 0, nil, err
return nil, err
}
defer rows.Close()
for rows.Next() {
fetched++
if err = rows.Err(); err == nil {
res, err = s.internalFederationModuleMappingRowScanner(rows)
}
if err != nil {
return nil, 0, nil, err
return nil, err
}
// If check function is set, call it and act accordingly
// check fn set, call it and see if it passed the test
// if not, skip the item
if check != nil {
if chk, err := check(res); err != nil {
return nil, 0, nil, err
return nil, err
} else if !chk {
// did not pass the check
// go with the next row
continue
}
}
@ -225,7 +229,7 @@ func (s Store) QueryFederationModuleMappings(
set = append(set, res)
}
return set, fetched, res, rows.Err()
return set, rows.Err()
}
// LookupFederationModuleMappingByFederationModuleIDComposeModuleIDComposeNamespaceID searches for module mapping by federation module id and compose module id
@ -449,7 +453,7 @@ func (Store) federationModuleMappingColumns(aa ...string) []string {
}
}
// {true true true true true}
// {true true false true true true}
// sortableFederationModuleMappingColumns returns all FederationModuleMapping columns flagged as sortable
//

View File

@ -29,15 +29,15 @@ func (s Store) SearchFederationNodes(ctx context.Context, f types.NodeFilter) (t
set []*types.Node
q squirrel.SelectBuilder
)
q, err = s.convertFederationNodeFilter(f)
if err != nil {
return nil, f, err
}
return set, f, func() error {
set, _, _, err = s.QueryFederationNodes(ctx, q, f.Check)
return err
q, err = s.convertFederationNodeFilter(f)
if err != nil {
return err
}
set, err = s.QueryFederationNodes(ctx, q, f.Check)
return err
}()
}
@ -50,39 +50,35 @@ func (s Store) QueryFederationNodes(
ctx context.Context,
q squirrel.Sqlizer,
check func(*types.Node) (bool, error),
) ([]*types.Node, uint, *types.Node, error) {
) ([]*types.Node, error) {
var (
set = make([]*types.Node, 0, DefaultSliceCapacity)
res *types.Node
// Query rows with
rows, err = s.Query(ctx, q)
fetched uint
)
if err != nil {
return nil, 0, nil, err
return nil, err
}
defer rows.Close()
for rows.Next() {
fetched++
if err = rows.Err(); err == nil {
res, err = s.internalFederationNodeRowScanner(rows)
}
if err != nil {
return nil, 0, nil, err
return nil, err
}
// If check function is set, call it and act accordingly
// check fn set, call it and see if it passed the test
// if not, skip the item
if check != nil {
if chk, err := check(res); err != nil {
return nil, 0, nil, err
return nil, err
} else if !chk {
// did not pass the check
// go with the next row
continue
}
}
@ -90,7 +86,7 @@ func (s Store) QueryFederationNodes(
set = append(set, res)
}
return set, fetched, res, rows.Err()
return set, rows.Err()
}
// LookupFederationNodeByID searches for federation node by ID
@ -332,7 +328,7 @@ func (Store) federationNodeColumns(aa ...string) []string {
}
}
// {true true false false true}
// {true true false false false true}
// internalFederationNodeEncoder encodes fields from types.Node to store.Payload (map)
//

View File

@ -30,44 +30,50 @@ func (s Store) SearchFederationNodesSyncs(ctx context.Context, f types.NodeSyncF
set []*types.NodeSync
q squirrel.SelectBuilder
)
q, err = s.convertFederationNodesSyncFilter(f)
if err != nil {
return nil, f, err
}
// Cleanup anything we've accidentally received...
f.PrevPage, f.NextPage = nil, nil
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
reversedCursor := f.PageCursor != nil && f.PageCursor.Reverse
// Sorting and paging are both enabled in definition yaml file
// {search: {enableSorting:true, enablePaging:true}}
curSort := f.Sort.Clone()
// If paging with reverse cursor, change the sorting
// direction for all columns we're sorting by
if reversedCursor {
curSort.Reverse()
}
return set, f, func() error {
set, err = s.fetchFullPageOfFederationNodesSyncs(ctx, q, curSort, f.PageCursor, f.Limit, f.Check)
q, err = s.convertFederationNodesSyncFilter(f)
if err != nil {
return err
}
if f.Limit > 0 && len(set) > 0 {
if f.PageCursor != nil && (!f.PageCursor.Reverse || uint(len(set)) == f.Limit) {
f.PrevPage = s.collectFederationNodesSyncCursorValues(set[0], curSort.Columns()...)
f.PrevPage.Reverse = true
}
// Paging enabled
// {search: {enablePaging:true}}
// Cleanup unwanted cursors (only relevant is f.PageCursor, next&prev are reset and returned)
f.PrevPage, f.NextPage = nil, nil
// Less items fetched then requested by page-limit
// not very likely there's another page
f.NextPage = s.collectFederationNodesSyncCursorValues(set[len(set)-1], curSort.Columns()...)
if f.PageCursor != nil {
// Page cursor exists so we need to validate it against used sort
if f.Sort, err = f.PageCursor.Sort(f.Sort); err != nil {
return err
}
}
if len(f.Sort) == 0 {
f.Sort = filter.SortExprSet{}
}
// Make sure results are always sorted at least by primary keys
if f.Sort.Get("rel_node") == nil {
f.Sort = append(f.Sort, &filter.SortExpr{Column: "rel_node"})
}
sort := f.Sort.Clone()
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
if f.PageCursor != nil && f.PageCursor.Reverse {
sort.Reverse()
}
// Apply sorting expr from filter to query
if q, err = setOrderBy(q, sort, s.sortableFederationNodesSyncColumns()); err != nil {
return err
}
set, f.PrevPage, f.NextPage, err = s.fetchFullPageOfFederationNodesSyncs(ctx, q, sort.Columns(), sort.Reversed(), f.PageCursor, f.Limit, f.Check)
if err != nil {
return err
}
f.PageCursor = nil
@ -79,58 +85,62 @@ func (s Store) SearchFederationNodesSyncs(ctx context.Context, f types.NodeSyncF
//
// Function applies:
// - cursor conditions (where ...)
// - sorting rules (order by ...)
// - limit
//
// Main responsibility of this function is to perform additional sequential queries in case when not enough results
// are collected due to failed check on a specific row (by check fn). Function then moves cursor to the last item fetched
// are collected due to failed check on a specific row (by check fn).
//
// Function then moves cursor to the last item fetched
func (s Store) fetchFullPageOfFederationNodesSyncs(
ctx context.Context,
q squirrel.SelectBuilder,
sort filter.SortExprSet,
sortColumns []string,
sortDesc bool,
cursor *filter.PagingCursor,
limit uint,
check func(*types.NodeSync) (bool, error),
) ([]*types.NodeSync, error) {
) (set []*types.NodeSync, prev, next *filter.PagingCursor, err error) {
var (
set = make([]*types.NodeSync, 0, DefaultSliceCapacity)
aux []*types.NodeSync
last *types.NodeSync
aux []*types.NodeSync
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
reversedCursor = cursor != nil && cursor.Reverse
reversedOrder = cursor != nil && cursor.Reverse
// copy of the select builder
tryQuery squirrel.SelectBuilder
fetched uint
err error
)
// Make sure we always end our sort by primary keys
if sort.Get("rel_node") == nil {
sort = append(sort, &filter.SortExpr{Column: "rel_node"})
}
set = make([]*types.NodeSync, 0, DefaultSliceCapacity)
// Apply sorting expr from filter to query
if q, err = setOrderBy(q, sort, s.sortableFederationNodesSyncColumns()); err != nil {
return nil, err
if cursor != nil {
cursor.Reverse = sortDesc
}
for try := 0; try < MaxRefetches; try++ {
tryQuery = setCursorCond(q, cursor)
if limit > 0 {
tryQuery = tryQuery.Limit(uint64(limit))
tryQuery = tryQuery.Limit(uint64(limit + 1))
}
if aux, fetched, last, err = s.QueryFederationNodesSyncs(ctx, tryQuery, check); err != nil {
return nil, err
if aux, err = s.QueryFederationNodesSyncs(ctx, tryQuery, check); err != nil {
return nil, nil, nil, err
}
if limit > 0 && uint(len(aux)) >= limit {
fetched = uint(len(aux))
if cursor != nil && prev == nil && fetched > 0 {
// Cursor for previous page is calculated only when cursor is used (so, not on first page)
prev = s.collectFederationNodesSyncCursorValues(aux[0], sortColumns...)
}
// Point cursor to the last fetched element
if fetched > limit && limit > 0 {
next = s.collectFederationNodesSyncCursorValues(aux[limit-1], sortColumns...)
// we should use only as much as requested
set = append(set, aux[0:limit]...)
set = append(set, aux[:limit]...)
break
} else {
set = append(set, aux...)
@ -138,7 +148,7 @@ func (s Store) fetchFullPageOfFederationNodesSyncs(
// if limit is not set or we've already collected enough items
// we can break the loop right away
if limit == 0 || fetched == 0 || fetched < limit {
if limit == 0 || fetched == 0 || fetched <= limit {
break
}
@ -149,23 +159,23 @@ func (s Store) fetchFullPageOfFederationNodesSyncs(
}
// @todo improve strategy for collecting next page with lower limit
// Point cursor to the last fetched element
if cursor = s.collectFederationNodesSyncCursorValues(last, sort.Columns()...); cursor == nil {
break
}
}
if reversedCursor {
// Cursor for previous page was used
// Fetched set needs to be reverseCursor because we've forced a descending order to
// get the previous page
if reversedOrder {
// Fetched set needs to be reversed because we've forced a descending order to get the previous page
for i, j := 0, len(set)-1; i < j; i, j = i+1, j-1 {
set[i], set[j] = set[j], set[i]
}
// and flip prev/next cursors too
prev, next = next, prev
}
return set, nil
if prev != nil {
prev.Reverse = true
}
return set, prev, next, nil
}
// QueryFederationNodesSyncs queries the database, converts and checks each row and
@ -177,39 +187,35 @@ func (s Store) QueryFederationNodesSyncs(
ctx context.Context,
q squirrel.Sqlizer,
check func(*types.NodeSync) (bool, error),
) ([]*types.NodeSync, uint, *types.NodeSync, error) {
) ([]*types.NodeSync, error) {
var (
set = make([]*types.NodeSync, 0, DefaultSliceCapacity)
res *types.NodeSync
// Query rows with
rows, err = s.Query(ctx, q)
fetched uint
)
if err != nil {
return nil, 0, nil, err
return nil, err
}
defer rows.Close()
for rows.Next() {
fetched++
if err = rows.Err(); err == nil {
res, err = s.internalFederationNodesSyncRowScanner(rows)
}
if err != nil {
return nil, 0, nil, err
return nil, err
}
// If check function is set, call it and act accordingly
// check fn set, call it and see if it passed the test
// if not, skip the item
if check != nil {
if chk, err := check(res); err != nil {
return nil, 0, nil, err
return nil, err
} else if !chk {
// did not pass the check
// go with the next row
continue
}
}
@ -217,7 +223,7 @@ func (s Store) QueryFederationNodesSyncs(
set = append(set, res)
}
return set, fetched, res, rows.Err()
return set, rows.Err()
}
// LookupFederationNodesSyncByNodeID searches for sync activity by node ID
@ -437,7 +443,7 @@ func (Store) federationNodesSyncColumns(aa ...string) []string {
}
}
// {true true true true true}
// {true true false true true true}
// sortableFederationNodesSyncColumns returns all FederationNodesSync columns flagged as sortable
//

View File

@ -30,44 +30,50 @@ func (s Store) SearchFederationSharedModules(ctx context.Context, f types.Shared
set []*types.SharedModule
q squirrel.SelectBuilder
)
q, err = s.convertFederationSharedModuleFilter(f)
if err != nil {
return nil, f, err
}
// Cleanup anything we've accidentally received...
f.PrevPage, f.NextPage = nil, nil
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
reversedCursor := f.PageCursor != nil && f.PageCursor.Reverse
// Sorting and paging are both enabled in definition yaml file
// {search: {enableSorting:true, enablePaging:true}}
curSort := f.Sort.Clone()
// If paging with reverse cursor, change the sorting
// direction for all columns we're sorting by
if reversedCursor {
curSort.Reverse()
}
return set, f, func() error {
set, err = s.fetchFullPageOfFederationSharedModules(ctx, q, curSort, f.PageCursor, f.Limit, f.Check)
q, err = s.convertFederationSharedModuleFilter(f)
if err != nil {
return err
}
if f.Limit > 0 && len(set) > 0 {
if f.PageCursor != nil && (!f.PageCursor.Reverse || uint(len(set)) == f.Limit) {
f.PrevPage = s.collectFederationSharedModuleCursorValues(set[0], curSort.Columns()...)
f.PrevPage.Reverse = true
}
// Paging enabled
// {search: {enablePaging:true}}
// Cleanup unwanted cursors (only relevant is f.PageCursor, next&prev are reset and returned)
f.PrevPage, f.NextPage = nil, nil
// Less items fetched then requested by page-limit
// not very likely there's another page
f.NextPage = s.collectFederationSharedModuleCursorValues(set[len(set)-1], curSort.Columns()...)
if f.PageCursor != nil {
// Page cursor exists so we need to validate it against used sort
if f.Sort, err = f.PageCursor.Sort(f.Sort); err != nil {
return err
}
}
if len(f.Sort) == 0 {
f.Sort = filter.SortExprSet{}
}
// Make sure results are always sorted at least by primary keys
if f.Sort.Get("id") == nil {
f.Sort = append(f.Sort, &filter.SortExpr{Column: "id"})
}
sort := f.Sort.Clone()
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
if f.PageCursor != nil && f.PageCursor.Reverse {
sort.Reverse()
}
// Apply sorting expr from filter to query
if q, err = setOrderBy(q, sort, s.sortableFederationSharedModuleColumns()); err != nil {
return err
}
set, f.PrevPage, f.NextPage, err = s.fetchFullPageOfFederationSharedModules(ctx, q, sort.Columns(), sort.Reversed(), f.PageCursor, f.Limit, f.Check)
if err != nil {
return err
}
f.PageCursor = nil
@ -79,58 +85,62 @@ func (s Store) SearchFederationSharedModules(ctx context.Context, f types.Shared
//
// Function applies:
// - cursor conditions (where ...)
// - sorting rules (order by ...)
// - limit
//
// Main responsibility of this function is to perform additional sequential queries in case when not enough results
// are collected due to failed check on a specific row (by check fn). Function then moves cursor to the last item fetched
// are collected due to failed check on a specific row (by check fn).
//
// Function then moves cursor to the last item fetched
func (s Store) fetchFullPageOfFederationSharedModules(
ctx context.Context,
q squirrel.SelectBuilder,
sort filter.SortExprSet,
sortColumns []string,
sortDesc bool,
cursor *filter.PagingCursor,
limit uint,
check func(*types.SharedModule) (bool, error),
) ([]*types.SharedModule, error) {
) (set []*types.SharedModule, prev, next *filter.PagingCursor, err error) {
var (
set = make([]*types.SharedModule, 0, DefaultSliceCapacity)
aux []*types.SharedModule
last *types.SharedModule
aux []*types.SharedModule
// When cursor for a previous page is used it's marked as reversed
// This tells us to flip the descending flag on all used sort keys
reversedCursor = cursor != nil && cursor.Reverse
reversedOrder = cursor != nil && cursor.Reverse
// copy of the select builder
tryQuery squirrel.SelectBuilder
fetched uint
err error
)
// Make sure we always end our sort by primary keys
if sort.Get("id") == nil {
sort = append(sort, &filter.SortExpr{Column: "id"})
}
set = make([]*types.SharedModule, 0, DefaultSliceCapacity)
// Apply sorting expr from filter to query
if q, err = setOrderBy(q, sort, s.sortableFederationSharedModuleColumns()); err != nil {
return nil, err
if cursor != nil {
cursor.Reverse = sortDesc
}
for try := 0; try < MaxRefetches; try++ {
tryQuery = setCursorCond(q, cursor)
if limit > 0 {
tryQuery = tryQuery.Limit(uint64(limit))
tryQuery = tryQuery.Limit(uint64(limit + 1))
}
if aux, fetched, last, err = s.QueryFederationSharedModules(ctx, tryQuery, check); err != nil {
return nil, err
if aux, err = s.QueryFederationSharedModules(ctx, tryQuery, check); err != nil {
return nil, nil, nil, err
}
if limit > 0 && uint(len(aux)) >= limit {
fetched = uint(len(aux))
if cursor != nil && prev == nil && fetched > 0 {
// Cursor for previous page is calculated only when cursor is used (so, not on first page)
prev = s.collectFederationSharedModuleCursorValues(aux[0], sortColumns...)
}
// Point cursor to the last fetched element
if fetched > limit && limit > 0 {
next = s.collectFederationSharedModuleCursorValues(aux[limit-1], sortColumns...)
// we should use only as much as requested
set = append(set, aux[0:limit]...)
set = append(set, aux[:limit]...)
break
} else {
set = append(set, aux...)
@ -138,7 +148,7 @@ func (s Store) fetchFullPageOfFederationSharedModules(
// if limit is not set or we've already collected enough items
// we can break the loop right away
if limit == 0 || fetched == 0 || fetched < limit {
if limit == 0 || fetched == 0 || fetched <= limit {
break
}
@ -149,23 +159,23 @@ func (s Store) fetchFullPageOfFederationSharedModules(
}
// @todo improve strategy for collecting next page with lower limit
// Point cursor to the last fetched element
if cursor = s.collectFederationSharedModuleCursorValues(last, sort.Columns()...); cursor == nil {
break
}
}
if reversedCursor {
// Cursor for previous page was used
// Fetched set needs to be reverseCursor because we've forced a descending order to
// get the previous page
if reversedOrder {
// Fetched set needs to be reversed because we've forced a descending order to get the previous page
for i, j := 0, len(set)-1; i < j; i, j = i+1, j-1 {
set[i], set[j] = set[j], set[i]
}
// and flip prev/next cursors too
prev, next = next, prev
}
return set, nil
if prev != nil {
prev.Reverse = true
}
return set, prev, next, nil
}
// QueryFederationSharedModules queries the database, converts and checks each row and
@ -177,39 +187,35 @@ func (s Store) QueryFederationSharedModules(
ctx context.Context,
q squirrel.Sqlizer,
check func(*types.SharedModule) (bool, error),
) ([]*types.SharedModule, uint, *types.SharedModule, error) {
) ([]*types.SharedModule, error) {
var (
set = make([]*types.SharedModule, 0, DefaultSliceCapacity)
res *types.SharedModule
// Query rows with
rows, err = s.Query(ctx, q)
fetched uint
)
if err != nil {
return nil, 0, nil, err
return nil, err
}
defer rows.Close()
for rows.Next() {
fetched++
if err = rows.Err(); err == nil {
res, err = s.internalFederationSharedModuleRowScanner(rows)
}
if err != nil {
return nil, 0, nil, err
return nil, err
}
// If check function is set, call it and act accordingly
// check fn set, call it and see if it passed the test
// if not, skip the item
if check != nil {
if chk, err := check(res); err != nil {
return nil, 0, nil, err
return nil, err
} else if !chk {
// did not pass the check
// go with the next row
continue
}
}
@ -217,7 +223,7 @@ func (s Store) QueryFederationSharedModules(
set = append(set, res)
}
return set, fetched, res, rows.Err()
return set, rows.Err()
}
// LookupFederationSharedModuleByID searches for shared federation module by ID
@ -382,6 +388,9 @@ func (s Store) internalFederationSharedModuleRowScanner(row rowScanner) (res *ty
&res.Name,
&res.ExternalFederationModuleID,
&res.Fields,
&res.CreatedBy,
&res.UpdatedBy,
&res.DeletedBy,
&res.CreatedAt,
&res.UpdatedAt,
&res.DeletedAt,
@ -430,25 +439,23 @@ func (Store) federationSharedModuleColumns(aa ...string) []string {
alias + "name",
alias + "xref_module",
alias + "fields",
alias + "created_by",
alias + "updated_by",
alias + "deleted_by",
alias + "created_at",
alias + "updated_at",
alias + "deleted_at",
}
}
// {true true true true true}
// {true true false true true true}
// sortableFederationSharedModuleColumns returns all FederationSharedModule columns flagged as sortable
//
// With optional string arg, all columns are returned aliased
func (Store) sortableFederationSharedModuleColumns() map[string]string {
return map[string]string{
"id": "id", "created_at": "created_at",
"createdat": "created_at",
"updated_at": "updated_at",
"updatedat": "updated_at",
"deleted_at": "deleted_at",
"deletedat": "deleted_at",
"id": "id",
}
}
@ -464,6 +471,9 @@ func (s Store) internalFederationSharedModuleEncoder(res *types.SharedModule) st
"name": res.Name,
"xref_module": res.ExternalFederationModuleID,
"fields": res.Fields,
"created_by": res.CreatedBy,
"updated_by": res.UpdatedBy,
"deleted_by": res.DeletedBy,
"created_at": res.CreatedAt,
"updated_at": res.UpdatedAt,
"deleted_at": res.DeletedAt,
@ -496,14 +506,6 @@ func (s Store) collectFederationSharedModuleCursorValues(res *types.SharedModule
cursor.Set(c, res.ID, false)
pkId = true
case "created_at":
cursor.Set(c, res.CreatedAt, false)
case "updated_at":
cursor.Set(c, res.UpdatedAt, false)
case "deleted_at":
cursor.Set(c, res.DeletedAt, false)
}
}

View File

@ -503,6 +503,7 @@ func (Schema) FederationModuleShared() *Table {
ColumnDef("xref_module", ColumnTypeIdentifier),
ColumnDef("fields", ColumnTypeText),
CUDTimestamps,
CUDUsers,
)
}
@ -516,6 +517,7 @@ func (Schema) FederationModuleExposed() *Table {
ColumnDef("rel_compose_namespace", ColumnTypeIdentifier),
ColumnDef("fields", ColumnTypeText),
CUDTimestamps,
CUDUsers,
AddIndex("unique_node_compose_module", IColumn("rel_node", "rel_compose_module", "rel_compose_namespace")),
)