diff --git a/pkg/dal/diff.go b/pkg/dal/diff.go index d9817380b..814332756 100644 --- a/pkg/dal/diff.go +++ b/pkg/dal/diff.go @@ -5,8 +5,10 @@ type ( // ModelDiff defines one identified missmatch between two models ModelDiff struct { - Type modelDiffType + Type modelDiffType + // Original will be nil when a new attribute is being added Original *Attribute + // Asserted will be nil wen an existing attribute is being removed Asserted *Attribute } @@ -14,8 +16,9 @@ type ( ) const ( - AttributeMissing modelDiffType = "attributeMissing" - AttributeTypeMissmatch modelDiffType = "typeMissmatch" + AttributeMissing modelDiffType = "attributeMissing" + AttributeTypeMissmatch modelDiffType = "typeMissmatch" + AttributeSensitivityMissmatch modelDiffType = "sensitivityMissmatch" ) // Diff calculates the diff between models a and b where a is used as base @@ -34,6 +37,21 @@ func (a *Model) Diff(b *Model) (out ModelDiffSet) { } } + aIndex := make(map[string]struct { + found bool + attr *Attribute + }) + for _, _attr := range a.Attributes { + attr := _attr + aIndex[attr.Ident] = struct { + found bool + attr *Attribute + }{ + attr: attr, + } + } + + // Deleted and update ones for _, _attrA := range a.Attributes { attrA := _attrA @@ -55,6 +73,32 @@ func (a *Model) Diff(b *Model) (out ModelDiffSet) { Asserted: attrBAux.attr, }) } + + // Other stuff + // @todo improve; for now it'll do + if attrA.SensitivityLevel != attrBAux.attr.SensitivityLevel { + out = append(out, &ModelDiff{ + Type: AttributeSensitivityMissmatch, + Original: attrA, + Asserted: attrBAux.attr, + }) + } + } + + // New + for _, _attrB := range b.Attributes { + attrB := _attrB + + // Missmatches + _, ok := aIndex[attrB.Ident] + if !ok { + out = append(out, &ModelDiff{ + Type: AttributeMissing, + Original: nil, + Asserted: attrB, + }) + continue + } } return diff --git a/pkg/dal/driver.go b/pkg/dal/driver.go index b19ec4d72..ab2b55c4d 100644 --- a/pkg/dal/driver.go +++ b/pkg/dal/driver.go @@ -79,7 +79,7 @@ type ( // Specific operations require data transformations (type change). // Some basic ops. should be implemented on DB driver level, but greater controll can be // achieved via the trans functions. - UpdateModelAttribute(ctx context.Context, sch *Model, old Attribute, new Attribute, trans ...TransformationFunction) error + UpdateModelAttribute(ctx context.Context, sch *Model, old, new *Attribute, trans ...TransformationFunction) error } ConnectionCloser interface { diff --git a/pkg/dal/errors.go b/pkg/dal/errors.go index a53d4d891..a60f759da 100644 --- a/pkg/dal/errors.go +++ b/pkg/dal/errors.go @@ -1,44 +1,153 @@ package dal import ( - "github.com/cortezaproject/corteza-server/pkg/errors" - "github.com/cortezaproject/corteza-server/pkg/locale" + "fmt" ) -func errModelHigherSensitivity(model, connection string) error { - return errors.New( - errors.KindSensitiveData, +// Generic errors - "model sensitivity surpasses connection sensitivity", - - errors.Meta("type", "invalid sensitivity"), - - // Translation namespace & key - errors.Meta(locale.ErrorMetaNamespace{}, "internal"), - errors.Meta(locale.ErrorMetaKey{}, "dal.sensitivity.model-exceeds-connection"), - errors.Meta("model", model), - errors.Meta("connection", connection), - - errors.StackSkip(1), - errors.StackTrimAtFn("http.HandlerFunc.ServeHTTP"), - ) +func errModelNotFound(modelID uint64) error { + return fmt.Errorf("model %d does not exist", modelID) } -func errAttributeHigherSensitivity(model, attribute string) error { - return errors.New( - errors.KindSensitiveData, - - "attribute sensitivity surpasses model sensitivity", - - errors.Meta("type", "invalid sensitivity"), - - // Translation namespace & key - errors.Meta(locale.ErrorMetaNamespace{}, "internal"), - errors.Meta(locale.ErrorMetaKey{}, "dal.sensitivity.attribute-exceeds-model"), - errors.Meta("model", model), - errors.Meta("attribute", attribute), - - errors.StackSkip(1), - errors.StackTrimAtFn("http.HandlerFunc.ServeHTTP"), - ) +func errConnectionNotFound(connectionID uint64) error { + return fmt.Errorf("connection %d does not exist", connectionID) } + +// Connection errors +// - create +func errConnectionCreateMissingSensitivityLevel(connectionID, sensitivityLevelID uint64) error { + return fmt.Errorf("cannot create connection %d: sensitivity level does not exist %d", connectionID, sensitivityLevelID) +} +func errConnectionCreateConnectionFailed(connectionID uint64, err error) error { + return fmt.Errorf("cannot create connection %d: connection failed: %v", connectionID, err) +} +func errConnectionDeleteNotFound(connectionID uint64) error { + return fmt.Errorf("cannot delete connection %d: connection does not exist", connectionID) +} +func errConnectionDeleteCloserFailed(connectionID uint64, err error) error { + return fmt.Errorf("cannot delete connection %d: connection's driver failed to close: %v", connectionID, err) +} + +// - update +func errConnectionUpdateNotFound(connectionID uint64) error { + return fmt.Errorf("cannot update connection %d: connection does not exist", connectionID) +} +func errConnectionUpdateMissingSensitivityLevel(connectionID, sensitivityLevelID uint64) error { + return fmt.Errorf("cannot update connection %d: sensitivity level %d does not exist", connectionID, sensitivityLevelID) +} + +// Model errors +// - create +func errModelCreateProblematicConnection(connectionID, modelID uint64) error { + return fmt.Errorf("cannot create model %d on connection %d: connection has issues", modelID, connectionID) +} +func errModelCreateMissingConnection(connectionID, modelID uint64) error { + return fmt.Errorf("cannot create model %d on connection %d: connection does not exist", modelID, connectionID) +} +func errModelCreateDuplicate(connectionID, modelID uint64) error { + return fmt.Errorf("cannot create model %d on connection %d: model already exists", modelID, connectionID) +} +func errModelCreateMissingSensitivityLevel(connectionID, modelID, sensitivityLevelID uint64) error { + return fmt.Errorf("cannot create model %d on connection %d: sensitivity level %d does not exist", modelID, connectionID, sensitivityLevelID) +} +func errModelCreateGreaterSensitivityLevel(connectionID, modelID, modelSensitivityLevelID, connSensitivityLevelID uint64) error { + return fmt.Errorf("cannot create model %d on connection %d: sensitivity level %d exceeds connection supported sensitivity level %d", modelID, connectionID, modelSensitivityLevelID, connSensitivityLevelID) +} +func errModelCreateMissingAttributeSensitivityLevel(connectionID, modelID, sensitivityLevelID uint64) error { + return fmt.Errorf("cannot create model %d on connection %d: attribute sensitivity level %d does not exist", modelID, connectionID, sensitivityLevelID) +} +func errModelCreateGreaterAttributeSensitivityLevel(connectionID, modelID, attrSensitivityLevelID, modelSensitivityLevelID uint64) error { + return fmt.Errorf("cannot create model %d on connection %d: attribute sensitivity level %d exceeds model supported sensitivity level %d", modelID, connectionID, attrSensitivityLevelID, modelSensitivityLevelID) +} +func errModelCreateConnectionModelUnsupported(connectionID, modelID uint64) error { + return fmt.Errorf("cannot create model %d on connection %d: model already exists for connection but is not compatible with provided definition", modelID, connectionID) +} + +// - update +func errModelUpdateProblematicConnection(connectionID, modelID uint64) error { + return fmt.Errorf("cannot update model %d on connection %d: connection has issues", modelID, connectionID) +} +func errModelUpdateMissingConnection(connectionID, modelID uint64) error { + return fmt.Errorf("cannot update model %d on connection %d: connection does not exist", modelID, connectionID) +} +func errModelUpdateConnectionModelUnsupported(connectionID, modelID uint64) error { + return fmt.Errorf("cannot update model %d on connection %d: model already exists for connection but is not compatible with provided definition", modelID, connectionID) +} +func errModelUpdateMissingOldModel(connectionID, modelID uint64) error { + return fmt.Errorf("cannot update model %d on connection %d: model does not exist", modelID, connectionID) +} +func errModelUpdateDuplicate(connectionID, modelID uint64) error { + return fmt.Errorf("cannot update model %d on connection %d: model already exists", modelID, connectionID) +} +func errModelUpdateConnectionMissmatch(connectionID, modelID uint64) error { + return fmt.Errorf("cannot update model %d on connection %d: cannot change model connection", modelID, connectionID) +} +func errModelUpdateMissingSensitivityLevel(connectionID, modelID, sensitivityLevelID uint64) error { + return fmt.Errorf("cannot update model %d on connection %d: sensitivity level %d does not exist", modelID, connectionID, sensitivityLevelID) +} +func errModelUpdateGreaterSensitivityLevel(connectionID, modelID, modelSensitivityLevelID, connSensitivityLevelID uint64) error { + return fmt.Errorf("cannot update model %d on connection %d: sensitivity level %d exceeds connection supported sensitivity level %d", modelID, connectionID, modelSensitivityLevelID, connSensitivityLevelID) +} + +// Attribute errors +// - Update +func errAttributeUpdateProblematicConnection(connectionID, modelID uint64) error { + return fmt.Errorf("cannot update attribute for model %d on connection %d: connection has issues", modelID, connectionID) +} +func errAttributeUpdateMissingModel(connectionID, modelID uint64) error { + return fmt.Errorf("cannot update attribute for model %d on connection %d: model does not exist", modelID, connectionID) +} +func errAttributeUpdateMissingSensitivityLevel(connectionID, modelID, sensitivityLevelID uint64) error { + return fmt.Errorf("cannot update attribute for model %d on connection %d: sensitivity level %d does not exist", modelID, connectionID, sensitivityLevelID) +} +func errAttributeUpdateGreaterSensitivityLevel(connectionID, modelID, attrSensitivityLevelID, modelSensitivityLevelID uint64) error { + return fmt.Errorf("cannot update attribute for model %d on connection %d: sensitivity level %d exceeds model supported sensitivity level %d", modelID, connectionID, attrSensitivityLevelID, modelSensitivityLevelID) +} + +// Record errors + +func errRecordOpProblematicConnection(connectionID uint64) error { + return fmt.Errorf("cannot perform record operation: connection %d has issues", connectionID) +} +func errRecordOpProblematicModel(modelID uint64) error { + return fmt.Errorf("cannot perform record operation: model %d has issues", modelID) +} + +// func errModelHigherSensitivity(model, connection string) error { +// return errors.New( +// errors.KindSensitiveData, + +// "model sensitivity surpasses connection sensitivity", + +// errors.Meta("type", "invalid sensitivity"), + +// // Translation namespace & key +// errors.Meta(locale.ErrorMetaNamespace{}, "internal"), +// errors.Meta(locale.ErrorMetaKey{}, "dal.sensitivity.model-exceeds-connection"), +// errors.Meta("model", model), +// errors.Meta("connection", connection), + +// errors.StackSkip(1), +// errors.StackTrimAtFn("http.HandlerFunc.ServeHTTP"), +// ) +// } + +// func errAttributeHigherSensitivity(model, attribute string) error { +// return errors.New( +// errors.KindSensitiveData, + +// "attribute sensitivity surpasses model sensitivity", + +// errors.Meta("type", "invalid sensitivity"), + +// // Translation namespace & key +// errors.Meta(locale.ErrorMetaNamespace{}, "internal"), +// errors.Meta(locale.ErrorMetaKey{}, "dal.sensitivity.attribute-exceeds-model"), +// errors.Meta("model", model), +// errors.Meta("attribute", attribute), + +// errors.StackSkip(1), +// errors.StackTrimAtFn("http.HandlerFunc.ServeHTTP"), +// ) +// } diff --git a/pkg/dal/issues.go b/pkg/dal/issues.go new file mode 100644 index 000000000..100b753a5 --- /dev/null +++ b/pkg/dal/issues.go @@ -0,0 +1,134 @@ +package dal + +type ( + issue struct { + kind issueKind + err error + } + issueSet []issue + + issueHelper struct { + // these two will be used to help clear out unneeded errors + connections []uint64 + models []uint64 + + connectionIssues dalIssueIndex + modelIssues dalIssueIndex + } + + issueKind string + dalIssueIndex map[uint64]issueSet +) + +const ( + connectionIssue issueKind = "connection" + modelIssue issueKind = "model" +) + +func newIssueHelper() *issueHelper { + return &issueHelper{ + connectionIssues: make(dalIssueIndex), + modelIssues: make(dalIssueIndex), + } +} + +func makeIssue(kind issueKind, err error) issue { + return issue{ + kind: kind, + err: err, + } +} + +func (svc *service) SearchConnectionIssues(connectionID uint64) (out []error) { + for _, issue := range svc.connectionIssues[connectionID] { + out = append(out, issue.err) + } + + return +} + +func (svc *service) SearchModelIssues(connectionID, resourceID uint64) (out []error) { + // @todo index by connection as well + // if _, ok := svc.modelIssues[connectionID]; !ok { + // return + // } + + for _, issue := range svc.modelIssues[resourceID] { + out = append(out, issue.err) + } + + return +} + +func (svc *service) hasConnectionIssues(connectionID uint64) bool { + return len(svc.SearchConnectionIssues(connectionID)) > 0 +} + +func (svc *service) hasModelIssues(connectionID, modelID uint64) bool { + return len(svc.SearchModelIssues(connectionID, modelID)) > 0 +} + +func (svc *service) updateIssues(issues *issueHelper) { + for _, connectionID := range issues.connections { + delete(svc.connectionIssues, connectionID) + } + for connectionID, issues := range issues.connectionIssues { + svc.connectionIssues[connectionID] = issues + } + + for _, modelID := range issues.models { + delete(svc.modelIssues, modelID) + } + for modelID, issues := range issues.modelIssues { + svc.modelIssues[modelID] = issues + } +} + +func (svc *service) clearModelIssues() { + svc.modelIssues = make(dalIssueIndex) +} + +func (rd *issueHelper) addConnection(connectionID uint64) *issueHelper { + rd.connections = append(rd.connections, connectionID) + return rd +} + +func (rd *issueHelper) addModel(modelID uint64) *issueHelper { + rd.models = append(rd.models, modelID) + return rd +} + +func (rd *issueHelper) addConnectionIssue(connectionID uint64, err error) { + rd.connectionIssues[connectionID] = append(rd.connectionIssues[connectionID], makeIssue(connectionIssue, err)) +} + +func (rd *issueHelper) addModelIssue(connectionID, resourceID uint64, err error) { + rd.modelIssues[resourceID] = append(rd.modelIssues[resourceID], makeIssue(modelIssue, err)) +} + +func (a *issueHelper) mergeWith(b *issueHelper) { + if b == nil { + return + } + + for connectionID, issues := range b.connectionIssues { + a.connectionIssues[connectionID] = append(a.connectionIssues[connectionID], issues...) + } + + for modelID, issues := range b.modelIssues { + a.modelIssues[modelID] = append(a.modelIssues[modelID], issues...) + } +} + +// Op check utils + +func (svc *service) canOpRecord(connectionID, modelID uint64) (err error) { + if svc.hasConnectionIssues(connectionID) { + return errRecordOpProblematicConnection(connectionID) + } + if svc.hasModelIssues(connectionID, modelID) { + return errRecordOpProblematicModel(modelID) + } + + return nil +} diff --git a/pkg/dal/model.go b/pkg/dal/model.go index 829665cbc..3ed802ada 100644 --- a/pkg/dal/model.go +++ b/pkg/dal/model.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/PaesslerAG/gval" + "github.com/cortezaproject/corteza-server/pkg/dal/capabilities" "github.com/cortezaproject/corteza-server/pkg/handle" "github.com/modern-go/reflect2" ) @@ -45,6 +46,8 @@ type ( SensitivityLevel uint64 Attributes AttributeSet + + Capabilities capabilities.Set } ModelSet []*Model @@ -110,20 +113,27 @@ func (a *Attribute) WithMultiValue() *Attribute { return a } -// FindByResource returns the model that matches the resource -func (mm ModelSet) FindByResource(resType string, resource string) *Model { +func (mm ModelSet) FindByResourceID(resourceID uint64) *Model { for _, m := range mm { - if m.ResourceType == resType && m.Resource == resource { + if m.ResourceID == resourceID { return m } } - return nil } -func (mm ModelSet) FindByID(id uint64) *Model { +func (mm ModelSet) FindByResourceIdent(resourceType, resourceIdent string) *Model { for _, m := range mm { - if m.ResourceID == id { + if m.ResourceType == resourceType && m.Resource == resourceIdent { + return m + } + } + return nil +} + +func (mm ModelSet) FindByIdent(ident string) *Model { + for _, m := range mm { + if m.Ident == ident { return m } } @@ -151,6 +161,17 @@ func (aa ModelSet) FilterByReferenced(b *Model) (out ModelSet) { return } +func (m Model) ToFilter() ModelFilter { + return ModelFilter{ + ConnectionID: m.ConnectionID, + + ResourceID: m.ResourceID, + + ResourceType: m.ResourceType, + Resource: m.Resource, + } +} + // HasAttribute returns true when the model includes the specified attribute func (m Model) HasAttribute(ident string) bool { return m.Attributes.FindByIdent(ident) != nil diff --git a/pkg/dal/sensitivity_level.go b/pkg/dal/sensitivity_level.go index 1bc70cd51..2fb44a2a0 100644 --- a/pkg/dal/sensitivity_level.go +++ b/pkg/dal/sensitivity_level.go @@ -4,6 +4,7 @@ type ( SensitivityLevel struct { Handle string ID uint64 + Level int } SensitivityLevelSet []SensitivityLevel @@ -15,11 +16,31 @@ type ( } ) +func SensitivityLevelIndex(levels ...SensitivityLevel) *sensitivityLevelIndex { + out := &sensitivityLevelIndex{ + set: make(SensitivityLevelSet, len(levels)), + byHandle: make(map[string]int), + byID: make(map[uint64]int), + } + + for i, l := range levels { + out.set[i] = l + out.byHandle[l.Handle] = i + out.byID[l.ID] = i + } + + return out +} + func (sli sensitivityLevelIndex) includes(l uint64) (ok bool) { if l == 0 { return true } + if sli.byID == nil { + return false + } + _, ok = sli.byID[l] return } diff --git a/pkg/dal/service.go b/pkg/dal/service.go index 38af8d458..0bb1a9b6f 100644 --- a/pkg/dal/service.go +++ b/pkg/dal/service.go @@ -34,7 +34,6 @@ type ( service struct { connections map[uint64]*connectionWrap - primary *connectionWrap primaryConnectionID uint64 // Indexed by corresponding storeID @@ -44,6 +43,9 @@ type ( inDev bool sensitivityLevels sensitivityLevelIndex + + connectionIssues dalIssueIndex + modelIssues dalIssueIndex } ) @@ -56,38 +58,52 @@ var ( ) // InitGlobalService initializes a fresh DAL where the given primary connection -func InitGlobalService(ctx context.Context, log *zap.Logger, inDev bool, connectionID uint64, cp ConnectionParams, cm ConnectionMeta, capabilities ...capabilities.Capability) (*service, error) { +func InitGlobalService(ctx context.Context, log *zap.Logger, inDev bool, connectionID uint64, cp ConnectionParams, cm ConnectionMeta, capabilities ...capabilities.Capability) (_ *service, err error) { + log.Debug("initializing DAL service with primary connection", zap.Any("connection params", cp)) + + // To help prevent awkward issues due to globally shared resources + if gSvc != nil { + panic("cannot initialize global DAL service: already initialized") + } + if gSvc == nil { - log.Debug("initializing DAL service with primary connection", zap.Any("connection params", cp)) - - gSvc = &service{ - connections: make(map[uint64]*connectionWrap), - models: make(map[uint64]ModelSet), - primary: nil, - primaryConnectionID: connectionID, - - logger: log, - inDev: inDev, - } - - var err error - cw := &connectionWrap{ - meta: cm, - sensitivityLevel: cm.SensitivityLevel, - label: cm.Label, - connectionID: connectionID, - } - cw.connection, err = connect(ctx, log, inDev, cp, capabilities...) - if err != nil { - return nil, err - } - - gSvc.primary = cw + gSvc, err = New(ctx, log, inDev, connectionID, cp, cm, capabilities...) + return gSvc, err } return gSvc, nil } +func New(ctx context.Context, log *zap.Logger, inDev bool, connectionID uint64, cp ConnectionParams, cm ConnectionMeta, capabilities ...capabilities.Capability) (*service, error) { + svc := &service{ + connections: make(map[uint64]*connectionWrap), + models: make(map[uint64]ModelSet), + primaryConnectionID: connectionID, + + logger: log, + inDev: inDev, + + connectionIssues: make(dalIssueIndex), + modelIssues: make(dalIssueIndex), + } + + var err error + cw := &connectionWrap{ + meta: cm, + sensitivityLevel: cm.SensitivityLevel, + label: cm.Label, + connectionID: connectionID, + } + cw.connection, err = connect(ctx, log, inDev, cp, capabilities...) + if err != nil { + return nil, err + } + + svc.connections[connectionID] = cw + + return svc, nil +} + // Service returns the global initialized DAL service // // If InitGlobalService has not yet been called the function will panic @@ -110,7 +126,7 @@ func (svc *service) Drivers() (drivers []Driver) { return } -func (svc *service) ReloadSensitivityLevels(levels SensitivityLevelSet) (err error) { +func (svc *service) ReloadSensitivityLevels(levels ...SensitivityLevel) (err error) { svc.logger.Debug("reloading sensitivity levels", zap.Any("sensitivity levels", levels)) newLevelIndex := svc.newSensitivityLevelIndex(levels) @@ -122,6 +138,54 @@ func (svc *service) ReloadSensitivityLevels(levels SensitivityLevelSet) (err err // Replace old ones svc.sensitivityLevels = newLevelIndex + svc.logger.Debug("reloaded sensitivity levels") + return +} + +func (svc *service) CreateSensitivityLevel(levels ...SensitivityLevel) (err error) { + svc.logger.Debug("creating sensitivity levels", zap.Any("sensitivity levels", levels)) + newIndex := svc.newAddedSensitivityLevelIndex(svc.sensitivityLevels, levels...) + + // Validate state after sensitivity level change + if err = svc.validateNewSensitivityLevels(newIndex); err != nil { + return + } + + // Replace old ones + svc.sensitivityLevels = newIndex + svc.logger.Debug("created sensitivity levels") + return +} + +func (svc *service) UpdateSensitivityLevel(levels ...SensitivityLevel) (err error) { + svc.logger.Debug("updating sensitivity levels", zap.Any("sensitivity levels", levels)) + newIndex := svc.newRemovedSensitivityLevelIndex(svc.sensitivityLevels, levels...) + newIndex = svc.newAddedSensitivityLevelIndex(newIndex, levels...) + + // Validate state after sensitivity level change + if err = svc.validateNewSensitivityLevels(newIndex); err != nil { + return + } + + // Replace old ones + svc.sensitivityLevels = newIndex + svc.logger.Debug("updated sensitivity levels") + return +} + +func (svc *service) DeleteSensitivityLevel(levels ...SensitivityLevel) (err error) { + svc.logger.Debug("deleting sensitivity levels", zap.Any("sensitivity levels", levels)) + newIndex := svc.newRemovedSensitivityLevelIndex(svc.sensitivityLevels, levels...) + + // Validate state after sensitivity level change + if err = svc.validateNewSensitivityLevels(newIndex); err != nil { + return + } + + // Replace old ones + svc.sensitivityLevels = newIndex + svc.logger.Debug("deleted sensitivity levels") + return } @@ -130,42 +194,54 @@ func (svc *service) ReloadSensitivityLevels(levels SensitivityLevelSet) (err err // // // // // // // // // // // // // // // // // // // // // // // // // // Connection management -// AddConnection adds a new connection to the DAL -func (svc *service) AddConnection(ctx context.Context, connectionID uint64, cp ConnectionParams, cm ConnectionMeta, capabilities ...capabilities.Capability) (err error) { - svc.logger.Debug("adding new connection", zap.Uint64("connectionID", connectionID), zap.Any("connection params", cp)) +// CreateConnection adds a new connection to the DAL +func (svc *service) CreateConnection(ctx context.Context, connectionID uint64, cp ConnectionParams, cm ConnectionMeta, capabilities ...capabilities.Capability) (err error) { + svc.logger.Debug("creating connection", zap.Uint64("connectionID", connectionID), zap.Any("connection params", cp)) + var ( + issues = newIssueHelper().addConnection(connectionID) + ) + defer svc.updateIssues(issues) + + // sensitivity levels + if !svc.sensitivityLevels.includes(cm.SensitivityLevel) { + issues.addConnectionIssue(connectionID, errConnectionCreateMissingSensitivityLevel(connectionID, cm.SensitivityLevel)) + } + + // Prepare connection bits cw := &connectionWrap{ connectionID: connectionID, meta: cm, sensitivityLevel: cm.SensitivityLevel, label: cm.Label, } + if cw.connection, err = connect(ctx, svc.logger, svc.inDev, cp, capabilities...); err != nil { + issues.addConnectionIssue(connectionID, errConnectionCreateConnectionFailed(connectionID, err)) + } - cw.connection, err = connect(ctx, svc.logger, svc.inDev, cp, capabilities...) - if err != nil { - return - } - if connectionID == DefaultConnectionID || connectionID == svc.primaryConnectionID { - svc.primary = cw - } else { - svc.connections[connectionID] = cw - } - return + svc.addConnection(cw) + + svc.logger.Debug("created connection") + return nil } -// RemoveConnection removes the given connection from the DAL -func (svc *service) RemoveConnection(ctx context.Context, connectionID uint64) (err error) { - svc.logger.Debug("removing connection", zap.Uint64("connectionID", connectionID)) +// DeleteConnection removes the given connection from the DAL +func (svc *service) DeleteConnection(ctx context.Context, connectionID uint64) (err error) { + svc.logger.Debug("deleting connection", zap.Uint64("connectionID", connectionID)) - c, _, err := svc.getConnection(ctx, connectionID) - if err != nil { - return fmt.Errorf("can not remove connection %d: %w", connectionID, err) + var ( + issues = newIssueHelper().addConnection(connectionID) + ) + + c := svc.getConnectionByID(connectionID) + if c == nil { + return errConnectionDeleteNotFound(connectionID) } // Potential cleanups if cc, ok := c.connection.(ConnectionCloser); ok { - if err = cc.Close(ctx); err != nil { - return err + if err := cc.Close(ctx); err != nil { + svc.logger.Error(errConnectionDeleteCloserFailed(c.connectionID, err).Error()) } } @@ -173,27 +249,92 @@ func (svc *service) RemoveConnection(ctx context.Context, connectionID uint64) ( // // @todo this is temporary until a proper update function is prepared. // The primary connection must not be removable! - if connectionID == DefaultConnectionID || connectionID == svc.primary.connectionID { - svc.primary = nil - } else { - delete(svc.connections, connectionID) - } + svc.removeConnection(connectionID) + + // Only if successful should we cleanup the issue registry + svc.updateIssues(issues) + + svc.logger.Debug("deleted connection") return nil } // UpdateConnection updates the given connection -// -// @todo make this better; for now remove + add func (svc *service) UpdateConnection(ctx context.Context, connectionID uint64, cp ConnectionParams, cm ConnectionMeta, capabilities ...capabilities.Capability) (err error) { svc.logger.Debug("updating connection", zap.Uint64("connectionID", connectionID)) - if err = svc.RemoveConnection(ctx, connectionID); err != nil { - return - } - // @todo check sensitivity level against modules + var ( + issues = newIssueHelper().addConnection(connectionID) + oldConn *connectionWrap + ) + defer svc.updateIssues(issues) - return svc.AddConnection(ctx, connectionID, cp, cm, capabilities...) + // Validation + { + // Check if connection exists + oldConn = svc.getConnectionByID(connectionID) + if oldConn == nil { + issues.addConnectionIssue(connectionID, errConnectionUpdateNotFound(connectionID)) + } + + // sensitivity levels + if !svc.sensitivityLevels.includes(cm.SensitivityLevel) { + issues.addConnectionIssue(connectionID, errConnectionUpdateMissingSensitivityLevel(connectionID, cm.SensitivityLevel)) + } + + // Check already registered models and their capabilities + // + // Defer the return till the end so we can get a nicer report of what all is wrong + errored := false + for _, model := range svc.models[connectionID] { + // - capabilities + if !model.Capabilities.IsSubset(capabilities...) { + issues.addConnectionIssue(connectionID, fmt.Errorf("cannot update connection %d: new connection does not support existing models", connectionID)) + errored = errored || true + } + // - sensitivity levels + if !svc.sensitivityLevels.isSubset(model.SensitivityLevel, cm.SensitivityLevel) { + issues.addConnectionIssue(connectionID, fmt.Errorf("cannot update connection %d: new connection sensitivity level does not support model %d", connectionID, model.ResourceID)) + errored = errored || true + } + } + + // Don't update if meta bits are not ok + if errored { + return + } + } + + // close old connection + { + if cc, ok := oldConn.connection.(ConnectionCloser); ok { + if err = cc.Close(ctx); err != nil { + issues.addConnectionIssue(connectionID, err) + return nil + } + } + svc.removeConnection(connectionID) + } + + // open new connection + { + newConnection, err := connect(ctx, svc.logger, svc.inDev, cp, capabilities...) + if err != nil { + issues.addConnectionIssue(connectionID, err) + } + + svc.addConnection(&connectionWrap{ + meta: cm, + sensitivityLevel: cm.SensitivityLevel, + label: cm.Label, + connectionID: connectionID, + connection: newConnection, + }) + } + + svc.logger.Debug("updated connection") + + return nil } // // // // // // // // // // // // // // // // // // // // // // // // // @@ -202,26 +343,46 @@ func (svc *service) UpdateConnection(ctx context.Context, connectionID uint64, c // DML func (svc *service) Create(ctx context.Context, mf ModelFilter, capabilities capabilities.Set, rr ...ValueGetter) (err error) { + if err = svc.canOpRecord(mf.ConnectionID, mf.ResourceID); err != nil { + return wrapError("cannot create record", err) + } + model, cw, err := svc.storeOpPrep(ctx, mf, capabilities) if err != nil { - return + return wrapError("cannot create record", err) } return cw.connection.Create(ctx, model, rr...) } -func (svc *service) Update(ctx context.Context, mf ModelFilter, capabilities capabilities.Set, r ValueGetter) (err error) { - model, cw, err := svc.storeOpPrep(ctx, mf, capabilities) - if err != nil { - return +func (svc *service) Update(ctx context.Context, mf ModelFilter, capabilities capabilities.Set, rr ...ValueGetter) (err error) { + if err = svc.canOpRecord(mf.ConnectionID, mf.ResourceID); err != nil { + return wrapError("cannot update record", err) } - return cw.connection.Update(ctx, model, r) + model, cw, err := svc.storeOpPrep(ctx, mf, capabilities) + if err != nil { + return wrapError("cannot update record", err) + } + + for _, r := range rr { + if err = cw.connection.Update(ctx, model, r); err != nil { + return wrapError("cannot update record", err) + } + } + + return } func (svc *service) Search(ctx context.Context, mf ModelFilter, capabilities capabilities.Set, f filter.Filter) (iter Iterator, err error) { + if err = svc.canOpRecord(mf.ConnectionID, mf.ResourceID); err != nil { + err = wrapError("cannot search record", err) + return + } + model, cw, err := svc.storeOpPrep(ctx, mf, capabilities) if err != nil { + err = wrapError("cannot search record", err) return } @@ -229,25 +390,43 @@ func (svc *service) Search(ctx context.Context, mf ModelFilter, capabilities cap } func (svc *service) Lookup(ctx context.Context, mf ModelFilter, capabilities capabilities.Set, lookup ValueGetter, dst ValueSetter) (err error) { + if err = svc.canOpRecord(mf.ConnectionID, mf.ResourceID); err != nil { + return wrapError("cannot lookup record", err) + } + model, cw, err := svc.storeOpPrep(ctx, mf, capabilities) if err != nil { - return + return wrapError("cannot lookup record", err) } return cw.connection.Lookup(ctx, model, lookup, dst) } -func (svc *service) Delete(ctx context.Context, mf ModelFilter, capabilities capabilities.Set, pkv ValueGetter) (err error) { - model, cw, err := svc.storeOpPrep(ctx, mf, capabilities) - if err != nil { - return +func (svc *service) Delete(ctx context.Context, mf ModelFilter, capabilities capabilities.Set, vv ...ValueGetter) (err error) { + if err = svc.canOpRecord(mf.ConnectionID, mf.ResourceID); err != nil { + return wrapError("cannot delete record", err) } - return cw.connection.Delete(ctx, model, pkv) -} -func (svc *service) Truncate(ctx context.Context, mf ModelFilter, capabilities capabilities.Set) (err error) { model, cw, err := svc.storeOpPrep(ctx, mf, capabilities) if err != nil { - return + return wrapError("cannot delete record", err) + } + + for _, v := range vv { + if err = cw.connection.Delete(ctx, model, v); err != nil { + return wrapError("cannot delete record", err) + } + } + return +} + +func (svc *service) Truncate(ctx context.Context, mf ModelFilter, capabilities capabilities.Set) (err error) { + if err = svc.canOpRecord(mf.ConnectionID, mf.ResourceID); err != nil { + return wrapError("cannot truncate record", err) + } + + model, cw, err := svc.storeOpPrep(ctx, mf, capabilities) + if err != nil { + return wrapError("cannot truncate record", err) } return cw.connection.Truncate(ctx, model) @@ -256,11 +435,11 @@ func (svc *service) Truncate(ctx context.Context, mf ModelFilter, capabilities c func (svc *service) storeOpPrep(ctx context.Context, mf ModelFilter, capabilities capabilities.Set) (model *Model, cw *connectionWrap, err error) { model = svc.getModelByFilter(mf) if model == nil { - err = fmt.Errorf("model not found") + err = errModelNotFound(mf.ResourceID) return } - cw, _, err = svc.getConnection(ctx, model.ConnectionID, capabilities...) + cw, _, err = svc.getConnection(model.ConnectionID, capabilities...) if err != nil { return } @@ -275,22 +454,352 @@ func (svc *service) storeOpPrep(ctx context.Context, mf ModelFilter, capabilitie // ReloadModel unregister old models and register the new ones func (svc *service) ReloadModel(ctx context.Context, models ...*Model) (err error) { - svc.logger.Debug("reloading models") + svc.logger.Debug("reloading models", zap.Int("count", len(models))) // Clear up the old ones // @todo profile if manually removing nested pointers makes it faster svc.models = make(map[uint64]ModelSet) - return svc.AddModel(ctx, models...) + svc.clearModelIssues() + + err = svc.CreateModel(ctx, models...) + if err != nil { + return + } + + svc.logger.Debug("reloaded models") + + return +} + +func (svc *service) SearchModels(ctx context.Context) (out ModelSet, err error) { + out = make(ModelSet, 0, 100) + for _, models := range svc.models { + out = append(out, models...) + } + return +} + +// AddModel adds support for a new model +func (svc *service) CreateModel(ctx context.Context, models ...*Model) (err error) { + svc.logger.Debug("creating models", zap.Int("count", len(models))) + + var ( + issues = newIssueHelper() + auxIssues = newIssueHelper() + ) + defer svc.updateIssues(issues) + + // Validate models + for _, model := range models { + svc.logger.Debug("validating model", zap.Uint64("ID", model.ResourceID)) + issues.addModel(model.ResourceID) + + // Assure the connection has no issues + if svc.hasConnectionIssues(model.ConnectionID) { + issues.addModelIssue(model.ConnectionID, model.ResourceID, errModelCreateProblematicConnection(model.ConnectionID, model.ResourceID)) + } + + // Check the connection exists + conn := svc.getConnectionByID(model.ConnectionID) + if conn == nil { + issues.addModelIssue(model.ConnectionID, model.ResourceID, errModelCreateMissingConnection(model.ConnectionID, model.ResourceID)) + } + + // Check if model for the given resource already exists + existing := svc.FindModelByResourceID(model.ConnectionID, model.ResourceID) + if existing != nil { + issues.addModelIssue(model.ConnectionID, model.ResourceID, errModelCreateDuplicate(model.ConnectionID, model.ResourceID)) + } + + // Check sensitivity levels + // - model + if !svc.sensitivityLevels.includes(model.SensitivityLevel) { + issues.addModelIssue(model.ConnectionID, model.ResourceID, errModelCreateMissingSensitivityLevel(model.ConnectionID, model.ResourceID, model.SensitivityLevel)) + } else { + // Only check if it is present + if !svc.sensitivityLevels.isSubset(model.SensitivityLevel, conn.sensitivityLevel) { + issues.addModelIssue(model.ConnectionID, model.ResourceID, errModelCreateGreaterSensitivityLevel(model.ConnectionID, model.ResourceID, model.SensitivityLevel, conn.sensitivityLevel)) + } + } + // - attributes + for _, attr := range model.Attributes { + if !svc.sensitivityLevels.includes(attr.SensitivityLevel) { + issues.addModelIssue(model.ConnectionID, model.ResourceID, errModelCreateMissingAttributeSensitivityLevel(model.ConnectionID, model.ResourceID, attr.SensitivityLevel)) + } else { + if !svc.sensitivityLevels.isSubset(attr.SensitivityLevel, model.SensitivityLevel) { + issues.addModelIssue(model.ConnectionID, model.ResourceID, errModelCreateGreaterAttributeSensitivityLevel(model.ConnectionID, model.ResourceID, attr.SensitivityLevel, model.SensitivityLevel)) + } + } + } + + svc.logger.Debug("validated model") + } + + // Add models to corresponding connections + for connection, models := range svc.modelByConnection(models) { + for _, model := range models { + svc.logger.Debug("adding model", zap.Uint64("model", model.ResourceID)) + + connectionIssues := svc.hasConnectionIssues(model.ConnectionID) + modelIssues := svc.hasModelIssues(model.ConnectionID, model.ResourceID) + + if !modelIssues && !connectionIssues { + svc.logger.Debug("adding model to connection", zap.Uint64("connection", model.ConnectionID), zap.Uint64("model", model.ResourceID)) + + // Add model to connection + auxIssues, err = svc.registerModelToConnection(ctx, connection, model) + issues.mergeWith(auxIssues) + if err != nil { + return + } + } else { + if connectionIssues { + svc.logger.Warn("not adding to connection due to connection issues", zap.Uint64("connection", model.ConnectionID)) + } + if modelIssues { + svc.logger.Warn("not adding to connection due to model issues", zap.Uint64("model", model.ResourceID)) + } + } + + // Add model to internal registry + svc.models[model.ConnectionID] = append(svc.models[model.ConnectionID], model) + } + } + + svc.logger.Debug("created models") + + return +} + +// DeleteModel removes support for the model and deletes it from the connection +func (svc *service) DeleteModel(ctx context.Context, models ...*Model) (err error) { + svc.logger.Debug("deleting models", zap.Int("count", len(models))) + + var ( + issues = newIssueHelper() + ) + defer svc.updateIssues(issues) + + // validation + skip := make(map[uint64]bool) + for _, model := range models { + issues.addModel(model.ResourceID) + + // Validate existence + old := svc.FindModelByResourceIdent(model.ConnectionID, model.ResourceType, model.Resource) + if old == nil { + skip[model.ResourceID] = true + continue + } + + // Validate no leftover references + // @todo we can probably expand on this quitea bit + // for _, registered := range svc.models { + // refs := registered.FilterByReferenced(model) + // if len(refs) > 0 { + // return fmt.Errorf("cannot remove model %s: referenced by other models", model.Resource) + // } + // } + } + + // Work + for _, model := range models { + svc.logger.Debug("deleting model", zap.Uint64("model", model.ResourceID)) + + if skip[model.ResourceID] { + svc.logger.Debug("model does not exist; skipping") + continue + } + + oldModels := svc.models[model.ConnectionID] + svc.models[model.ConnectionID] = make(ModelSet, 0, len(oldModels)) + for _, o := range oldModels { + if o.Resource == model.Resource { + continue + } + + svc.models[model.ConnectionID] = append(svc.models[model.ConnectionID], o) + } + + // @todo should the underlying store be notified about this? + // how should this be handled; a straight up delete doesn't sound sane to me + // anymore + + svc.logger.Debug("deleted model") + } + + return nil +} + +func (svc *service) UpdateModel(ctx context.Context, old *Model, new *Model) (err error) { + svc.logger.Debug("updating model", zap.Uint64("model", old.ResourceID)) + + var ( + conn *connectionWrap + + issues = newIssueHelper().addModel(old.ResourceID) + ) + defer svc.updateIssues(issues) + + // Validation + { + // Assure the connection has no issues + if svc.hasConnectionIssues(old.ConnectionID) { + issues.addModelIssue(old.ConnectionID, old.ResourceID, errModelUpdateProblematicConnection(old.ConnectionID, old.ResourceID)) + } + // Check the connection exists + conn = svc.getConnectionByID(old.ConnectionID) + if conn == nil { + issues.addModelIssue(old.ConnectionID, old.ResourceID, errModelUpdateMissingConnection(old.ConnectionID, old.ResourceID)) + } + + // Check if old one exists + if tmp := svc.FindModelByResourceID(old.ConnectionID, old.ResourceID); tmp == nil { + issues.addModelIssue(old.ConnectionID, old.ResourceID, errModelUpdateMissingOldModel(old.ConnectionID, old.ResourceID)) + } + + // Check if the new one can fit in + // - if ident changed, check if it's duplicated + if old.Ident != new.Ident { + if tmp := svc.FindModelByIdent(new.ConnectionID, new.Ident); tmp == nil { + issues.addModelIssue(old.ConnectionID, old.ResourceID, errModelUpdateDuplicate(new.ConnectionID, new.ResourceID)) + } + } + // - assure same connection + // @todo some migration between different connections + if old.ConnectionID != new.ConnectionID { + issues.addModelIssue(new.ConnectionID, new.ResourceID, errModelUpdateConnectionMissmatch(old.ConnectionID, old.ResourceID)) + } + + // Sensitivity levels + // - model + if !svc.sensitivityLevels.includes(new.SensitivityLevel) { + issues.addModelIssue(new.ConnectionID, new.ResourceID, errModelUpdateMissingSensitivityLevel(new.ConnectionID, new.ResourceID, new.SensitivityLevel)) + } else { + if !svc.sensitivityLevels.isSubset(new.SensitivityLevel, conn.sensitivityLevel) { + issues.addModelIssue(new.ConnectionID, new.ResourceID, errModelUpdateGreaterSensitivityLevel(new.ConnectionID, new.ResourceID, new.SensitivityLevel, conn.sensitivityLevel)) + } + } + + // @note attribute check should be done in update model attribute so it's omitted here + } + + // Update connection + connectionIssues := svc.hasConnectionIssues(new.ConnectionID) + modelIssues := svc.hasModelIssues(new.ConnectionID, new.ResourceID) + + if !modelIssues && !connectionIssues { + svc.logger.Debug("updating connection's model", zap.Uint64("connection", new.ConnectionID), zap.Uint64("model", new.ResourceID)) + + err = conn.connection.UpdateModel(ctx, old, new) + if err != nil { + issues.addModelIssue(new.ConnectionID, new.ResourceID, err) + } + } else { + if connectionIssues { + svc.logger.Warn("not updating connection's model due to connection issues", zap.Uint64("connection", new.ConnectionID)) + } + if modelIssues { + svc.logger.Warn("not updating connection's model due to model issues", zap.Uint64("model", new.ResourceID)) + } + } + + // Update registry + ok := false + for i, model := range svc.models[old.ConnectionID] { + if model.ResourceID == old.ResourceID { + svc.models[old.ConnectionID][i] = new + ok = true + break + } + } + if !ok { + svc.models[old.ConnectionID] = append(svc.models[old.ConnectionID], new) + } + + svc.logger.Debug("updated model") + + return +} + +func (svc *service) UpdateModelAttribute(ctx context.Context, model *Model, old, new *Attribute, trans ...TransformationFunction) (err error) { + svc.logger.Debug("updating model attribute", zap.Uint64("model", model.ResourceID)) + + var ( + conn *connectionWrap + issues = newIssueHelper().addModel(model.ResourceID) + ) + defer svc.updateIssues(issues) + + // Validation + { + // Connection issues + if svc.hasConnectionIssues(model.ConnectionID) { + issues.addModelIssue(model.ConnectionID, model.ResourceID, errAttributeUpdateProblematicConnection(model.ConnectionID, model.ResourceID)) + } + + // Check if it exists + model := svc.FindModelByResourceID(model.ConnectionID, model.ResourceID) + if model == nil { + issues.addModelIssue(model.ConnectionID, model.ResourceID, errAttributeUpdateMissingModel(model.ConnectionID, model.ResourceID)) + } + + if !svc.sensitivityLevels.includes(new.SensitivityLevel) { + issues.addModelIssue(model.ConnectionID, model.ResourceID, errAttributeUpdateMissingSensitivityLevel(model.ConnectionID, model.ResourceID, new.SensitivityLevel)) + } else { + if !svc.sensitivityLevels.isSubset(new.SensitivityLevel, model.SensitivityLevel) { + issues.addModelIssue(model.ConnectionID, model.ResourceID, errAttributeUpdateGreaterSensitivityLevel(model.ConnectionID, model.ResourceID, new.SensitivityLevel, model.SensitivityLevel)) + } + } + + conn = svc.getConnectionByID(model.ConnectionID) + } + + // Update attribute + // Update connection + connectionIssues := svc.hasConnectionIssues(model.ConnectionID) + modelIssues := svc.hasModelIssues(model.ConnectionID, model.ResourceID) + + if !modelIssues && !connectionIssues { + svc.logger.Debug("updating model attribute", zap.Uint64("connection", model.ConnectionID), zap.Uint64("model", model.ResourceID)) + + err = conn.connection.UpdateModelAttribute(ctx, model, old, new, trans...) + if err != nil { + issues.addModelIssue(model.ConnectionID, model.ResourceID, err) + } + } else { + if connectionIssues { + svc.logger.Warn("not updating model attribute due to connection issues", zap.Uint64("connection", model.ConnectionID)) + } + if modelIssues { + svc.logger.Warn("not updating model attribute due to model issues", zap.Uint64("model", model.ResourceID)) + } + } + + // Update registry + ok := false + model = svc.FindModelByResourceID(model.ConnectionID, model.ResourceID) + for i, attribute := range model.Attributes { + if attribute.Ident == old.Ident { + model.Attributes[i] = new + ok = true + break + } + } + if !ok { + model.Attributes = append(model.Attributes, new) + } + + svc.logger.Debug("updated model attribute") + + return } func (svc *service) ModelIdentFormatter(connectionID uint64) (f *IdentFormatter, err error) { - c := svc.connections[connectionID] - if connectionID == 0 || connectionID == svc.primary.connectionID { - c = svc.primary - } + c := svc.getConnectionByID(connectionID) if c == nil { - err = fmt.Errorf("connection %d does not exist", connectionID) + err = errConnectionNotFound(connectionID) return } @@ -310,105 +819,33 @@ func (svc *service) ModelIdentFormatter(connectionID uint64) (f *IdentFormatter, return } -// AddModel adds support for a new model -func (svc *service) AddModel(ctx context.Context, models ...*Model) (err error) { - svc.logger.Debug("adding model", zap.Int("count", len(models))) - - var ( - cw *connectionWrap - ) - - for connectionID, models := range svc.modelByConnection(models) { - cw, _, err = svc.getConnection(ctx, connectionID) - if err != nil { - return err - } - - err = svc.registerModel(ctx, cw, connectionID, models) - if err != nil { - return - } - } - - return +func (svc *service) FindModelByResourceID(connectionID uint64, resourceID uint64) *Model { + return svc.models[connectionID].FindByResourceID(resourceID) } -// RemoveModel removes support for the given model -func (svc *service) RemoveModel(ctx context.Context, models ...*Model) (err error) { - svc.logger.Debug("removing models", zap.Int("count", len(models))) - - // validation - for _, model := range models { - svc.logger.Debug("removing model", zap.String("resource type", model.ResourceType), zap.String("resource model", model.Resource)) - - // Validate existence - old := svc.GetModelByResource(model.ConnectionID, model.ResourceType, model.Resource) - if old == nil { - return fmt.Errorf("cannot remove model %s: model not found", model.Resource) - } - - // Validate no leftover references - // @todo we can probably expand on this quitea bit - for _, registered := range svc.models { - refs := registered.FilterByReferenced(model) - if len(refs) > 0 { - return fmt.Errorf("cannot remove model %s: referenced by other models", model.Resource) - } - } - } - - // Work - for _, model := range models { - oldModels := svc.models[model.ConnectionID] - svc.models[model.ConnectionID] = make(ModelSet, 0, len(oldModels)) - for _, o := range oldModels { - if o.Resource == model.Resource { - continue - } - - svc.models[model.ConnectionID] = append(svc.models[model.ConnectionID], o) - } - - // @todo should the underlying store be notified about this? - } - - return nil +func (svc *service) FindModelByResourceIdent(connectionID uint64, resourceType, resourceIdent string) *Model { + return svc.models[connectionID].FindByResourceIdent(resourceType, resourceIdent) } -// DeleteModel removes support for the model and deletes it from the connection -// -// @todo do we really want this? -func (svc *service) DeleteModel(ctx context.Context, models ...*Model) (err error) { - panic("implement DeleteModel") -} - -func (svc *service) UpdateModel(ctx context.Context, old *Model, new *Model) error { - panic("implement UpdateModel") -} - -func (svc *service) UpdateModelAttribute(ctx context.Context, sch *Model, old Attribute, new Attribute, trans ...TransformationFunction) error { - panic("implement UpdateModelAttribute") +func (svc *service) FindModelByIdent(connectionID uint64, ident string) *Model { + return svc.models[connectionID].FindByIdent(ident) } // // // // // // // // // // // // // // // // // // // // // // // // // // Utilities -func (svc *service) GetModelByID(connectionID uint64, id uint64) *Model { - return svc.models[connectionID].FindByID(id) +func (svc *service) getConnectionByID(connectionID uint64) (cw *connectionWrap) { + if connectionID == DefaultConnectionID || connectionID == svc.primaryConnectionID { + return svc.connections[svc.primaryConnectionID] + } + + return svc.connections[connectionID] } -func (svc *service) GetModelByResource(connectionID uint64, resType string, resource string) *Model { - return svc.models[connectionID].FindByResource(resType, resource) -} - -func (svc *service) getConnection(ctx context.Context, connectionID uint64, cc ...capabilities.Capability) (cw *connectionWrap, can capabilities.Set, err error) { +func (svc *service) getConnection(connectionID uint64, cc ...capabilities.Capability) (cw *connectionWrap, can capabilities.Set, err error) { err = func() error { // get the requested connection - if connectionID == DefaultConnectionID || connectionID == svc.primary.connectionID { - cw = svc.primary - } else { - cw = svc.connections[connectionID] - } + cw = svc.getConnectionByID(connectionID) if cw == nil { return fmt.Errorf("connection %d does not exist", connectionID) } @@ -430,80 +867,95 @@ func (svc *service) getConnection(ctx context.Context, connectionID uint64, cc . } // modelByConnection maps the given models by their CRS -func (svc *service) modelByConnection(models ModelSet) (out map[uint64]ModelSet) { - out = make(map[uint64]ModelSet) +func (svc *service) modelByConnection(models ModelSet) (out map[*connectionWrap]ModelSet) { + out = make(map[*connectionWrap]ModelSet) for _, model := range models { - out[model.ConnectionID] = append(out[model.ConnectionID], model) + c := svc.getConnectionByID(model.ConnectionID) + out[c] = append(out[c], model) } return } -func (svc *service) registerModel(ctx context.Context, cw *connectionWrap, connectionID uint64, models ModelSet) (err error) { - for _, model := range models { - svc.logger.Debug("adding model for connection", zap.Uint64("connectionID", connectionID), zap.String("resource type", model.ResourceType), zap.String("resource model", model.Resource), zap.String("model ident", model.Ident)) +func (svc *service) registerModelToConnection(ctx context.Context, cw *connectionWrap, model *Model) (issues *issueHelper, err error) { + issues = newIssueHelper() - existing := svc.GetModelByResource(connectionID, model.ResourceType, model.Resource) - if existing != nil { - return fmt.Errorf("cannot add model %s to store %d: model already exists", model.Resource, connectionID) - } - - err = svc.registerModelToConnection(ctx, cw, model) - if err != nil { - return - } - - svc.models[connectionID] = append(svc.models[connectionID], model) - } - - return -} - -func (svc *service) registerModelToConnection(ctx context.Context, cw *connectionWrap, model *Model) (err error) { available, err := cw.connection.Models(ctx) if err != nil { - return err + issues.addModelIssue(model.ConnectionID, model.ResourceID, err) + return issues, nil } // Check if already in there - if existing := available.FindByResource(model.ResourceType, model.Resource); existing != nil { + if existing := available.FindByResourceIdent(model.ResourceType, model.Resource); existing != nil { // Assert validity diff := existing.Diff(model) if len(diff) > 0 { - return fmt.Errorf("cannot add model %d: model already exists for connection %d: models not compatible: %v", existing.ResourceID, cw.connectionID, diff) + issues.addModelIssue(model.ConnectionID, model.ResourceID, errModelCreateConnectionModelUnsupported(model.ConnectionID, model.ResourceID)) + return issues, nil } - return nil - } - - // Validate model against connection - { - if !svc.sensitivityLevels.isSubset(model.SensitivityLevel, cw.sensitivityLevel) { - return errModelHigherSensitivity(model.Label, cw.label) - } - - for _, attr := range model.Attributes { - if !svc.sensitivityLevels.isSubset(attr.SensitivityLevel, model.SensitivityLevel) { - return errAttributeHigherSensitivity(model.Label, attr.Label) - } - } + return } // Try to add to store err = cw.connection.CreateModel(ctx, model) if err != nil { - return + issues.addModelIssue(model.ConnectionID, model.ResourceID, err) + return issues, nil } - return nil + return nil, nil } func (svc *service) getModelByFilter(mf ModelFilter) *Model { if mf.ResourceID > 0 { - return svc.GetModelByID(mf.ConnectionID, mf.ResourceID) + return svc.FindModelByResourceID(mf.ConnectionID, mf.ResourceID) } - return svc.GetModelByResource(mf.ConnectionID, mf.ResourceType, mf.Resource) + return svc.FindModelByResourceIdent(mf.ConnectionID, mf.ResourceType, mf.Resource) +} + +func (svc *service) newAddedSensitivityLevelIndex(sli sensitivityLevelIndex, add ...SensitivityLevel) (out sensitivityLevelIndex) { + newLevels := make(SensitivityLevelSet, 0, len(sli.set)+len(add)) + + var ( + i = 0 + j = 0 + ) + + for i < len(sli.set) { + for j < len(add) { + if sli.set[i].Level <= add[j].Level { + newLevels = append(newLevels, sli.set[i]) + i++ + } + if sli.set[i].Level > add[j].Level { + newLevels = append(newLevels, add[j]) + j++ + } + } + } + + if j < len(add)-1 { + newLevels = append(newLevels, add[j:]...) + } + + return svc.newSensitivityLevelIndex(newLevels) +} + +func (svc *service) newRemovedSensitivityLevelIndex(sli sensitivityLevelIndex, remove ...SensitivityLevel) (out sensitivityLevelIndex) { + newLevels := make(SensitivityLevelSet, 0, len(sli.set)+len(remove)) + + removeSet := SensitivityLevelSet(remove) + + for _, l := range sli.set { + if !removeSet.includes(l.ID) { + newLevels = append(newLevels, l) + } + } + + return svc.newSensitivityLevelIndex(newLevels) } func (svc *service) newSensitivityLevelIndex(levels SensitivityLevelSet) (out sensitivityLevelIndex) { @@ -565,3 +1017,19 @@ func (svc *service) validateNewSensitivityLevels(levels sensitivityLevelIndex) ( } return } + +func (svc *service) removeConnection(connectionID uint64) { + if connectionID == DefaultConnectionID || connectionID == svc.primaryConnectionID { + connectionID = svc.primaryConnectionID + } + + delete(svc.connections, connectionID) +} + +func (svc *service) addConnection(cw *connectionWrap) { + svc.connections[cw.connectionID] = cw +} + +func wrapError(pfx string, err error) error { + return fmt.Errorf("%s: %v", pfx, err) +} diff --git a/store/adapters/rdbms/dal/connection.go b/store/adapters/rdbms/dal/connection.go index 4e95af08a..c8ac8df21 100644 --- a/store/adapters/rdbms/dal/connection.go +++ b/store/adapters/rdbms/dal/connection.go @@ -108,15 +108,18 @@ func (c *connection) CreateModel(ctx context.Context, model *dal.Model, model2 . func (c *connection) DeleteModel(ctx context.Context, model *dal.Model, model2 ...*dal.Model) error { //TODO implement me + return nil panic("implement me") } func (c *connection) UpdateModel(ctx context.Context, old *dal.Model, new *dal.Model) error { //TODO implement me + return nil panic("implement me") } -func (c *connection) UpdateModelAttribute(ctx context.Context, sch *dal.Model, old dal.Attribute, new dal.Attribute, trans ...dal.TransformationFunction) error { +func (c *connection) UpdateModelAttribute(ctx context.Context, sch *dal.Model, old, new *dal.Attribute, trans ...dal.TransformationFunction) error { //TODO implement me + return nil panic("implement me") } diff --git a/store/adapters/rdbms/drivers/postgres/dal.go b/store/adapters/rdbms/drivers/postgres/dal.go index 4dfe38113..2066aeb4e 100644 --- a/store/adapters/rdbms/drivers/postgres/dal.go +++ b/store/adapters/rdbms/drivers/postgres/dal.go @@ -29,5 +29,5 @@ func dalConnector(ctx context.Context, dsn string, cc ...capabilities.Capability return } - return rdbmsdal.Connection(db, Dialect()), nil + return rdbmsdal.Connection(db, Dialect(), cc...), nil } diff --git a/store/adapters/rdbms/drivers/sqlite/dal.go b/store/adapters/rdbms/drivers/sqlite/dal.go index 0615dcbf6..3c608e872 100644 --- a/store/adapters/rdbms/drivers/sqlite/dal.go +++ b/store/adapters/rdbms/drivers/sqlite/dal.go @@ -29,5 +29,5 @@ func dalConnector(ctx context.Context, dsn string, cc ...capabilities.Capability return } - return rdbmsdal.Connection(db, Dialect()), nil + return rdbmsdal.Connection(db, Dialect(), cc...), nil }