Properly implement model cache management on DAL RDMBS
This commit is contained in:
parent
b5267bcbf9
commit
ff69698027
@ -63,10 +63,10 @@ type (
|
||||
// CreateModel adds support for the given models to the underlying database
|
||||
//
|
||||
// The operation returns an error if any of the models already exists.
|
||||
CreateModel(context.Context, *Model, ...*Model) error
|
||||
CreateModel(context.Context, ...*Model) error
|
||||
|
||||
// DeleteModel removes support for the given model from the underlying database
|
||||
DeleteModel(context.Context, *Model, ...*Model) error
|
||||
DeleteModel(context.Context, ...*Model) error
|
||||
|
||||
// UpdateModel requests for metadata changes to the existing model
|
||||
//
|
||||
|
||||
@ -553,12 +553,19 @@ func (svc *service) ReplaceModel(ctx context.Context, model *Model) (err error)
|
||||
|
||||
// Add to connection
|
||||
connectionIssues := svc.hasConnectionIssues(model.ConnectionID)
|
||||
modelIssues := svc.hasModelIssues(model.ResourceID)
|
||||
if connectionIssues {
|
||||
log.Warn("not adding to connection due to connection issues")
|
||||
log.Warn(
|
||||
"not adding to connection due to connection issues",
|
||||
zap.Any("issues", svc.SearchConnectionIssues(model.ConnectionID)),
|
||||
)
|
||||
}
|
||||
|
||||
modelIssues := svc.hasModelIssues(model.ResourceID)
|
||||
if modelIssues {
|
||||
log.Warn("not adding to connection due to model issues")
|
||||
log.Warn(
|
||||
"not adding to connection due to model issues",
|
||||
zap.Any("issues", svc.SearchModelIssues(model.ResourceID)),
|
||||
)
|
||||
}
|
||||
|
||||
connection := svc.GetConnectionByID(model.ConnectionID)
|
||||
@ -868,7 +875,7 @@ func (svc *service) registerModelToConnection(ctx context.Context, cw *Connectio
|
||||
|
||||
available, err := cw.connection.Models(ctx)
|
||||
if err != nil {
|
||||
issues.addModelIssue(model.ResourceID, err)
|
||||
issues.addConnectionIssue(model.ConnectionID, err)
|
||||
return issues, nil
|
||||
}
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ package dal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal"
|
||||
@ -48,23 +49,17 @@ func Connection(db sqlx.ExtContext, dialect drivers.Dialect, cc ...dal.Operation
|
||||
//
|
||||
// This allows us to have same resource or ident on different res-types
|
||||
// For example: module's model for revisions has same resouce and ident but different type
|
||||
func (c *connection) model(m *dal.Model) *model {
|
||||
key := m.ResourceType + "|" + m.Resource + "|" + m.Ident
|
||||
if key == "" {
|
||||
panic("can not add model without a key (combo of resource type, resource and ident)")
|
||||
}
|
||||
|
||||
func (c *connection) withModel(m *dal.Model, fn func(m *model) error) error {
|
||||
var (
|
||||
key = cacheKey(m)
|
||||
)
|
||||
c.mux.RLock()
|
||||
if c.models[key] == nil {
|
||||
c.mux.RUnlock()
|
||||
c.mux.Lock()
|
||||
c.models[key] = Model(m, c.db, c.dialect)
|
||||
defer c.mux.Unlock()
|
||||
return c.models[key]
|
||||
defer c.mux.RUnlock()
|
||||
if cached, ok := c.models[cacheKey(m)]; ok {
|
||||
return fn(cached)
|
||||
}
|
||||
|
||||
defer c.mux.RUnlock()
|
||||
return c.models[key]
|
||||
return fmt.Errorf("model %q (%d) not loaded", key, m.ResourceID)
|
||||
}
|
||||
|
||||
func (c *connection) Operations() dal.OperationSet {
|
||||
@ -75,56 +70,118 @@ func (c *connection) Can(operations ...dal.Operation) bool {
|
||||
return c.operations.IsSuperset(operations...)
|
||||
}
|
||||
|
||||
func (c *connection) Create(ctx context.Context, m *dal.Model, rr ...dal.ValueGetter) error {
|
||||
return c.model(m).Create(ctx, rr...)
|
||||
func (c *connection) Create(ctx context.Context, m *dal.Model, rr ...dal.ValueGetter) (err error) {
|
||||
return c.withModel(m, func(m *model) error {
|
||||
return m.Create(ctx, rr...)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *connection) Update(ctx context.Context, m *dal.Model, r dal.ValueGetter) error {
|
||||
return c.model(m).Update(ctx, r)
|
||||
func (c *connection) Update(ctx context.Context, m *dal.Model, r dal.ValueGetter) (err error) {
|
||||
return c.withModel(m, func(m *model) error {
|
||||
return m.Update(ctx, r)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *connection) Lookup(ctx context.Context, m *dal.Model, pkv dal.ValueGetter, r dal.ValueSetter) error {
|
||||
return c.model(m).Lookup(ctx, pkv, r)
|
||||
func (c *connection) Lookup(ctx context.Context, m *dal.Model, pkv dal.ValueGetter, r dal.ValueSetter) (err error) {
|
||||
return c.withModel(m, func(m *model) error {
|
||||
return m.Lookup(ctx, pkv, r)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *connection) Search(ctx context.Context, m *dal.Model, f filter.Filter) (dal.Iterator, error) {
|
||||
return c.model(m).Search(f)
|
||||
func (c *connection) Search(ctx context.Context, m *dal.Model, f filter.Filter) (i dal.Iterator, _ error) {
|
||||
return i, c.withModel(m, func(m *model) (err error) {
|
||||
i, err = m.Search(f)
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
func (c *connection) Delete(ctx context.Context, m *dal.Model, pkv dal.ValueGetter) error {
|
||||
return c.model(m).Delete(ctx, pkv)
|
||||
func (c *connection) Delete(ctx context.Context, m *dal.Model, pkv dal.ValueGetter) (err error) {
|
||||
return c.withModel(m, func(m *model) error {
|
||||
return m.Delete(ctx, pkv)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *connection) Truncate(ctx context.Context, m *dal.Model) error {
|
||||
return c.model(m).Truncate(ctx)
|
||||
func (c *connection) Truncate(ctx context.Context, m *dal.Model) (err error) {
|
||||
return c.withModel(m, func(m *model) error {
|
||||
return m.Truncate(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *connection) Models(ctx context.Context) (dal.ModelSet, error) {
|
||||
//TODO implement me
|
||||
// not raising not-supported error
|
||||
// because we do not want to break
|
||||
// DAL service model adding procedure
|
||||
return nil, nil
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *connection) CreateModel(ctx context.Context, model *dal.Model, model2 ...*dal.Model) error {
|
||||
//TODO implement me
|
||||
// CreateModel checks/creates db tables in the database and catches the processed model
|
||||
//
|
||||
// @todo DDL operations
|
||||
func (c *connection) CreateModel(ctx context.Context, mm ...*dal.Model) error {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
for _, m := range mm {
|
||||
// @todo check if table exists and if the structure matches the model
|
||||
// @todo check if table's columns are compatible with model's attributes
|
||||
// @todo if table does not exist, create it
|
||||
|
||||
// cache the model
|
||||
c.models[cacheKey(m)] = Model(m, c.db, c.dialect)
|
||||
}
|
||||
|
||||
return nil
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *connection) DeleteModel(ctx context.Context, model *dal.Model, model2 ...*dal.Model) error {
|
||||
//TODO implement me
|
||||
// DeleteModel removes db tables from the database and removes the processed model from cache
|
||||
//
|
||||
// @todo DDL operations
|
||||
// @todo some tables should not be removed (like compose_record on primary connection)
|
||||
func (c *connection) DeleteModel(ctx context.Context, mm ...*dal.Model) error {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
for _, m := range mm {
|
||||
// @todo check if table exists and if it can be removed
|
||||
|
||||
// remove from cache
|
||||
delete(c.models, cacheKey(m))
|
||||
}
|
||||
|
||||
return nil
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
// UpdateModel alters db tables from the database and refreshes the processed model in the cache
|
||||
//
|
||||
// @todo DDL operations
|
||||
// @todo some tables should not be removed (like compose_record on primary connection)
|
||||
func (c *connection) UpdateModel(ctx context.Context, old *dal.Model, new *dal.Model) error {
|
||||
//TODO implement me
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
|
||||
// remove from cache
|
||||
delete(c.models, cacheKey(old))
|
||||
|
||||
// @todo check if column exists and if it can be removed
|
||||
|
||||
// update the cache
|
||||
c.models[cacheKey(new)] = Model(new, c.db, c.dialect)
|
||||
return nil
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
// UpdateModelAttribute alters column on a db table and runs data transformations
|
||||
func (c *connection) UpdateModelAttribute(ctx context.Context, sch *dal.Model, old, new *dal.Attribute, trans ...dal.TransformationFunction) error {
|
||||
//TODO implement me
|
||||
// not raising not-supported error
|
||||
// because we do not want to break
|
||||
// DAL service model adding procedure
|
||||
|
||||
// @todo implement model column altering
|
||||
return nil
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func cacheKey(m *dal.Model) (key string) {
|
||||
key = m.ResourceType + "|" + m.Resource + "|" + m.Ident
|
||||
if key == "" {
|
||||
panic("can not add model without a key (combo of resource type, resource and ident)")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user