Base DAL service integration into Compose services
This commit is contained in:
parent
81dda274eb
commit
383b07d1d7
@ -20,6 +20,8 @@ import (
|
||||
"github.com/cortezaproject/corteza-server/pkg/apigw"
|
||||
"github.com/cortezaproject/corteza-server/pkg/auth"
|
||||
"github.com/cortezaproject/corteza-server/pkg/corredor"
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal"
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal/capabilities"
|
||||
"github.com/cortezaproject/corteza-server/pkg/eventbus"
|
||||
"github.com/cortezaproject/corteza-server/pkg/healthcheck"
|
||||
"github.com/cortezaproject/corteza-server/pkg/http"
|
||||
@ -53,6 +55,10 @@ const (
|
||||
bootLevelProvisioned
|
||||
bootLevelServicesInitialized
|
||||
bootLevelActivated
|
||||
|
||||
defaultComposeRecordTable = "compose_record"
|
||||
defaultComposeRecordValueCol = "values"
|
||||
defaultPartitionFormat = "compose_record_{{namespace}}_{{module}}"
|
||||
)
|
||||
|
||||
// Setup configures all required services
|
||||
@ -266,6 +272,26 @@ func (app *CortezaApp) InitServices(ctx context.Context) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
{
|
||||
// Init DAL and prepare default connection
|
||||
if _, err = dal.InitGlobalService(
|
||||
ctx,
|
||||
app.Log.Named("DAL"),
|
||||
app.Opt.Environment.IsDevelopment(),
|
||||
|
||||
// DB_DSN is the default connection with full capabilities
|
||||
app.Opt.DB.DSN,
|
||||
dal.ConnectionDefaults{
|
||||
ModelIdent: defaultComposeRecordTable,
|
||||
AttributeIdent: defaultComposeRecordValueCol,
|
||||
|
||||
PartitionFormat: defaultPartitionFormat,
|
||||
},
|
||||
capabilities.FullCapabilities()...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := app.Provision(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -28,6 +28,8 @@ type (
|
||||
eventbus eventDispatcher
|
||||
store store.Storer
|
||||
locale ResourceTranslationsManagerService
|
||||
|
||||
dal dalDDL
|
||||
}
|
||||
|
||||
moduleAccessController interface {
|
||||
@ -50,6 +52,9 @@ type (
|
||||
Create(ctx context.Context, module *types.Module) (*types.Module, error)
|
||||
Update(ctx context.Context, module *types.Module) (*types.Module, error)
|
||||
DeleteByID(ctx context.Context, namespaceID, moduleID uint64) error
|
||||
|
||||
// @note probably temporary just so tests are easier
|
||||
ReloadDALModels(ctx context.Context) error
|
||||
}
|
||||
|
||||
moduleUpdateHandler func(ctx context.Context, ns *types.Namespace, c *types.Module) (moduleChanges, error)
|
||||
@ -77,14 +82,17 @@ var (
|
||||
})
|
||||
)
|
||||
|
||||
func Module() *module {
|
||||
return &module{
|
||||
func Module(ctx context.Context, dal dalDDL) (*module, error) {
|
||||
svc := &module{
|
||||
ac: DefaultAccessControl,
|
||||
eventbus: eventbus.Service(),
|
||||
actionlog: DefaultActionlog,
|
||||
store: DefaultStore,
|
||||
locale: DefaultResourceTranslation,
|
||||
dal: dal,
|
||||
}
|
||||
|
||||
return svc, svc.ReloadDALModels(ctx)
|
||||
}
|
||||
|
||||
func (svc module) Find(ctx context.Context, filter types.ModuleFilter) (set types.ModuleSet, f types.ModuleFilter, err error) {
|
||||
@ -321,6 +329,10 @@ func (svc module) Create(ctx context.Context, new *types.Module) (*types.Module,
|
||||
return
|
||||
}
|
||||
|
||||
if err = svc.addModuleToDAL(ctx, ns, new); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_ = svc.eventbus.WaitFor(ctx, event.ModuleAfterCreate(new, nil, ns))
|
||||
return nil
|
||||
})
|
||||
|
||||
355
compose/service/module_dal.go
Normal file
355
compose/service/module_dal.go
Normal file
@ -0,0 +1,355 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/compose/types"
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal"
|
||||
"github.com/cortezaproject/corteza-server/pkg/handle"
|
||||
systemTypes "github.com/cortezaproject/corteza-server/system/types"
|
||||
)
|
||||
|
||||
type (
|
||||
dalDDL interface {
|
||||
ConnectionDefaults(ctx context.Context, connectionID uint64) (dft dal.ConnectionDefaults, err error)
|
||||
|
||||
ReloadModel(ctx context.Context, models ...*dal.Model) (err error)
|
||||
AddModel(ctx context.Context, models ...*dal.Model) (err error)
|
||||
RemoveModel(ctx context.Context, models ...*dal.Model) (err error)
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
// https://www.rfc-editor.org/errata/eid1690
|
||||
emailLength = 254
|
||||
|
||||
// Generally the upper most limit
|
||||
urlLength = 2048
|
||||
|
||||
sysID = "ID"
|
||||
sysNamespaceID = "namespaceID"
|
||||
sysModuleID = "moduleID"
|
||||
sysCreatedAt = "createdAt"
|
||||
sysCreatedBy = "createdBy"
|
||||
sysUpdatedAt = "updatedAt"
|
||||
sysUpdatedBy = "updatedBy"
|
||||
sysDeletedAt = "deletedAt"
|
||||
sysDeletedBy = "deletedBy"
|
||||
sysOwnedBy = "ownedBy"
|
||||
|
||||
colSysID = "id"
|
||||
colSysNamespaceID = "rel_namespace"
|
||||
colSysModuleID = "module_id"
|
||||
colSysCreatedAt = "created_at"
|
||||
colSysCreatedBy = "created_by"
|
||||
colSysUpdatedAt = "updated_at"
|
||||
colSysUpdatedBy = "updated_by"
|
||||
colSysDeletedAt = "deleted_at"
|
||||
colSysDeletedBy = "deleted_by"
|
||||
colSysOwnedBy = "owned_by"
|
||||
)
|
||||
|
||||
// ReloadDALModels reconstructs the DAL's data model based on the store.Storer
|
||||
//
|
||||
// Directly using store so we don't spam the action log
|
||||
func (svc *module) ReloadDALModels(ctx context.Context) (err error) {
|
||||
var (
|
||||
namespaces types.NamespaceSet
|
||||
modules types.ModuleSet
|
||||
fields types.ModuleFieldSet
|
||||
models dal.ModelSet
|
||||
)
|
||||
|
||||
namespaces, _, err = svc.store.SearchComposeNamespaces(ctx, types.NamespaceFilter{})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var model *dal.Model
|
||||
for _, ns := range namespaces {
|
||||
modules, _, err = svc.store.SearchComposeModules(ctx, types.ModuleFilter{
|
||||
NamespaceID: ns.ID,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, mod := range modules {
|
||||
fields, _, err = svc.store.SearchComposeModuleFields(ctx, types.ModuleFieldFilter{
|
||||
ModuleID: []uint64{mod.ID},
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
mod.Fields = append(mod.Fields, fields...)
|
||||
|
||||
model, err = svc.moduleToModel(ctx, ns, mod)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
models = append(models, model)
|
||||
}
|
||||
}
|
||||
|
||||
return svc.dal.ReloadModel(ctx, models...)
|
||||
}
|
||||
|
||||
func (svc *module) moduleToModel(ctx context.Context, ns *types.Namespace, mod *types.Module) (*dal.Model, error) {
|
||||
ccfg, err := svc.dal.ConnectionDefaults(ctx, mod.DALConfig.ConnectionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
getCodec := moduleFieldCodecBuilder(mod.DALConfig.Partitioned, ccfg)
|
||||
|
||||
// Metadata
|
||||
out := &dal.Model{
|
||||
ConnectionID: mod.DALConfig.ConnectionID,
|
||||
Ident: svc.formatPartitionIdent(ns, mod, ccfg),
|
||||
|
||||
Attributes: make(dal.AttributeSet, len(mod.Fields)),
|
||||
|
||||
ResourceID: mod.ID,
|
||||
ResourceType: types.ModuleResourceType,
|
||||
Resource: mod.RbacResource(),
|
||||
}
|
||||
|
||||
// Handle user-defined fields
|
||||
for i, f := range mod.Fields {
|
||||
out.Attributes[i], err = svc.moduleFieldToAttribute(getCodec, ns, mod, f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Handle system fields; either default or user defined
|
||||
if !mod.DALConfig.Partitioned {
|
||||
// When not partitioned the default system fields should be defined along side the `values` column
|
||||
out.Attributes = append(out.Attributes, svc.moduleModelDefaultSysAttributes(getCodec)...)
|
||||
} else {
|
||||
// When partitioned, we use store codec defined on the module
|
||||
out.Attributes = append(out.Attributes, svc.moduleModelSysAttributes(mod, getCodec)...)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (svc *module) moduleModelSysAttributes(mod *types.Module, getCodec func(f *types.ModuleField) dal.Codec) (out dal.AttributeSet) {
|
||||
if mod.SystemFields.ID != nil {
|
||||
out = append(out, dal.PrimaryAttribute(sysID, getCodec(&types.ModuleField{Name: sysID, Encoding: *mod.SystemFields.ID})))
|
||||
}
|
||||
|
||||
if mod.SystemFields.ModuleID != nil {
|
||||
out = append(out, dal.FullAttribute(sysModuleID, &dal.TypeID{}, getCodec(&types.ModuleField{Name: sysModuleID, Encoding: *mod.SystemFields.ModuleID})))
|
||||
}
|
||||
if mod.SystemFields.NamespaceID != nil {
|
||||
out = append(out, dal.FullAttribute(sysNamespaceID, &dal.TypeID{}, getCodec(&types.ModuleField{Name: sysNamespaceID, Encoding: *mod.SystemFields.NamespaceID})))
|
||||
}
|
||||
|
||||
if mod.SystemFields.OwnedBy != nil {
|
||||
out = append(out, dal.FullAttribute(sysOwnedBy, &dal.TypeID{}, getCodec(&types.ModuleField{Name: sysOwnedBy, Encoding: *mod.SystemFields.OwnedBy})))
|
||||
}
|
||||
|
||||
if mod.SystemFields.CreatedAt != nil {
|
||||
out = append(out, dal.FullAttribute(sysCreatedAt, &dal.TypeTimestamp{}, getCodec(&types.ModuleField{Name: sysCreatedAt, Encoding: *mod.SystemFields.CreatedAt})))
|
||||
}
|
||||
if mod.SystemFields.CreatedBy != nil {
|
||||
out = append(out, dal.FullAttribute(sysCreatedBy, &dal.TypeID{}, getCodec(&types.ModuleField{Name: sysCreatedBy, Encoding: *mod.SystemFields.CreatedBy})))
|
||||
}
|
||||
|
||||
if mod.SystemFields.UpdatedAt != nil {
|
||||
out = append(out, dal.FullAttribute(sysUpdatedAt, &dal.TypeTimestamp{}, getCodec(&types.ModuleField{Name: sysUpdatedAt, Encoding: *mod.SystemFields.UpdatedAt})))
|
||||
}
|
||||
if mod.SystemFields.UpdatedBy != nil {
|
||||
out = append(out, dal.FullAttribute(sysUpdatedBy, &dal.TypeID{}, getCodec(&types.ModuleField{Name: sysUpdatedBy, Encoding: *mod.SystemFields.UpdatedBy})))
|
||||
}
|
||||
|
||||
if mod.SystemFields.DeletedAt != nil {
|
||||
out = append(out, dal.FullAttribute(sysDeletedAt, &dal.TypeTimestamp{}, getCodec(&types.ModuleField{Name: sysDeletedAt, Encoding: *mod.SystemFields.DeletedAt})))
|
||||
}
|
||||
if mod.SystemFields.DeletedBy != nil {
|
||||
out = append(out, dal.FullAttribute(sysDeletedBy, &dal.TypeID{}, getCodec(&types.ModuleField{Name: sysDeletedBy, Encoding: *mod.SystemFields.DeletedBy})))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (svc *module) moduleModelDefaultSysAttributes(getCodec func(f *types.ModuleField) dal.Codec) dal.AttributeSet {
|
||||
return dal.AttributeSet{
|
||||
dal.PrimaryAttribute(sysID, &dal.CodecAlias{Ident: colSysID}),
|
||||
|
||||
dal.FullAttribute(sysModuleID, &dal.TypeID{}, &dal.CodecAlias{Ident: colSysModuleID}),
|
||||
dal.FullAttribute(sysNamespaceID, &dal.TypeID{}, &dal.CodecAlias{Ident: colSysNamespaceID}),
|
||||
|
||||
dal.FullAttribute(sysOwnedBy, &dal.TypeID{}, &dal.CodecAlias{Ident: colSysOwnedBy}),
|
||||
|
||||
dal.FullAttribute(sysCreatedAt, &dal.TypeTimestamp{}, &dal.CodecAlias{Ident: colSysCreatedAt}),
|
||||
dal.FullAttribute(sysCreatedBy, &dal.TypeID{}, &dal.CodecAlias{Ident: colSysCreatedBy}),
|
||||
|
||||
dal.FullAttribute(sysUpdatedAt, &dal.TypeTimestamp{}, &dal.CodecAlias{Ident: colSysUpdatedAt}),
|
||||
dal.FullAttribute(sysUpdatedBy, &dal.TypeID{}, &dal.CodecAlias{Ident: colSysUpdatedBy}),
|
||||
|
||||
dal.FullAttribute(sysDeletedAt, &dal.TypeTimestamp{}, &dal.CodecAlias{Ident: colSysDeletedAt}),
|
||||
dal.FullAttribute(sysDeletedBy, &dal.TypeID{}, &dal.CodecAlias{Ident: colSysDeletedBy}),
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *module) moduleFieldToAttribute(getCodec func(f *types.ModuleField) dal.Codec, ns *types.Namespace, mod *types.Module, f *types.ModuleField) (out *dal.Attribute, err error) {
|
||||
kind := f.Kind
|
||||
if kind == "" {
|
||||
kind = "String"
|
||||
}
|
||||
|
||||
switch strings.ToLower(kind) {
|
||||
case "bool":
|
||||
at := &dal.TypeBoolean{}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
case "datetime":
|
||||
switch {
|
||||
case f.IsDateOnly():
|
||||
at := &dal.TypeDate{}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
case f.IsTimeOnly():
|
||||
at := &dal.TypeTime{}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
default:
|
||||
at := &dal.TypeTimestamp{}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
}
|
||||
case "email":
|
||||
at := &dal.TypeText{Length: emailLength}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
case "file":
|
||||
at := &dal.TypeRef{
|
||||
RefModel: &dal.Model{Resource: "corteza::system:attachment"},
|
||||
RefAttribute: &dal.Attribute{Ident: "id"},
|
||||
}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
case "number":
|
||||
at := &dal.TypeNumber{
|
||||
Precision: f.Options.Precision(),
|
||||
}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
case "record":
|
||||
at := &dal.TypeRef{
|
||||
RefModel: &dal.Model{
|
||||
ResourceID: f.Options.UInt64("moduleID"),
|
||||
ResourceType: types.ModuleResourceType,
|
||||
},
|
||||
RefAttribute: &dal.Attribute{
|
||||
Ident: "id",
|
||||
},
|
||||
}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
case "select":
|
||||
at := &dal.TypeEnum{
|
||||
Values: f.SelectOptions(),
|
||||
}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
case "string":
|
||||
at := &dal.TypeText{
|
||||
Length: 0,
|
||||
}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
case "url":
|
||||
at := &dal.TypeText{
|
||||
Length: urlLength,
|
||||
}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
case "user":
|
||||
at := &dal.TypeRef{
|
||||
RefModel: &dal.Model{
|
||||
ResourceType: systemTypes.UserResourceType,
|
||||
},
|
||||
RefAttribute: &dal.Attribute{
|
||||
Ident: "id",
|
||||
},
|
||||
}
|
||||
out = dal.FullAttribute(f.Name, at, getCodec(f))
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid field %s: kind %s not supported", f.Name, f.Kind)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (svc *module) formatPartitionIdent(ns *types.Namespace, mod *types.Module, cfg dal.ConnectionDefaults) string {
|
||||
if !mod.DALConfig.Partitioned {
|
||||
return cfg.ModelIdent
|
||||
}
|
||||
|
||||
pfmt := mod.DALConfig.PartitionFormat
|
||||
if pfmt == "" {
|
||||
pfmt = cfg.PartitionFormat
|
||||
}
|
||||
if pfmt == "" {
|
||||
// @todo put in config or something
|
||||
pfmt = "compose_record_{{namespace}}_{{module}}"
|
||||
}
|
||||
|
||||
// @note we must not use name here since it is translatable
|
||||
mh, _ := handle.Cast(nil, mod.Handle, strconv.FormatUint(mod.ID, 10))
|
||||
nsh, _ := handle.Cast(nil, ns.Slug, strconv.FormatUint(ns.ID, 10))
|
||||
rpl := strings.NewReplacer(
|
||||
"{{module}}", mh,
|
||||
"{{namespace}}", nsh,
|
||||
)
|
||||
|
||||
return rpl.Replace(pfmt)
|
||||
}
|
||||
|
||||
func moduleFieldCodecBuilder(partitioned bool, cfg dal.ConnectionDefaults) func(f *types.ModuleField) dal.Codec {
|
||||
return func(f *types.ModuleField) dal.Codec {
|
||||
return moduleFieldCodec(f, partitioned, cfg)
|
||||
}
|
||||
}
|
||||
|
||||
func moduleFieldCodec(f *types.ModuleField, partitioned bool, cfg dal.ConnectionDefaults) (strat dal.Codec) {
|
||||
if partitioned {
|
||||
strat = &dal.CodecPlain{}
|
||||
} else {
|
||||
ident := cfg.AttributeIdent
|
||||
if ident == "" {
|
||||
// @todo put in configs or something
|
||||
ident = "values"
|
||||
}
|
||||
|
||||
strat = &dal.CodecRecordValueSetJSON{
|
||||
Ident: ident,
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case f.Encoding.EncodingStrategyAlias != nil:
|
||||
strat = &dal.CodecAlias{
|
||||
Ident: f.Encoding.EncodingStrategyAlias.Ident,
|
||||
}
|
||||
case f.Encoding.EncodingStrategyJSON != nil:
|
||||
strat = &dal.CodecRecordValueSetJSON{
|
||||
Ident: f.Encoding.EncodingStrategyJSON.Ident,
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// // // // // // // // // // // // // // // // // // // // // // // // //
|
||||
// Utilities
|
||||
|
||||
func (svc *module) addModuleToDAL(ctx context.Context, ns *types.Namespace, mod *types.Module) (err error) {
|
||||
// Update DAL
|
||||
model, err := svc.moduleToModel(ctx, ns, mod)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if err = svc.dal.AddModel(ctx, model); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
@ -15,6 +15,8 @@ import (
|
||||
"github.com/cortezaproject/corteza-server/pkg/actionlog"
|
||||
"github.com/cortezaproject/corteza-server/pkg/auth"
|
||||
"github.com/cortezaproject/corteza-server/pkg/corredor"
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal"
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal/capabilities"
|
||||
"github.com/cortezaproject/corteza-server/pkg/envoy/resource"
|
||||
"github.com/cortezaproject/corteza-server/pkg/errors"
|
||||
"github.com/cortezaproject/corteza-server/pkg/eventbus"
|
||||
@ -31,6 +33,8 @@ const (
|
||||
|
||||
type (
|
||||
record struct {
|
||||
dal dalDML
|
||||
|
||||
actionlog actionlog.Recorder
|
||||
|
||||
ac recordAccessController
|
||||
@ -146,13 +150,14 @@ type (
|
||||
ErrorIndex map[string]int
|
||||
)
|
||||
|
||||
func Record() RecordService {
|
||||
func Record(dal dalDML) RecordService {
|
||||
svc := &record{
|
||||
actionlog: DefaultActionlog,
|
||||
ac: DefaultAccessControl,
|
||||
eventbus: eventbus.Service(),
|
||||
optEmitEvents: true,
|
||||
store: DefaultStore,
|
||||
dal: dal,
|
||||
|
||||
formatter: values.Formatter(),
|
||||
sanitizer: values.Sanitizer(),
|
||||
@ -252,7 +257,9 @@ func (svc record) lookup(ctx context.Context, namespaceID, moduleID uint64, look
|
||||
func (svc record) FindByID(ctx context.Context, namespaceID, moduleID, recordID uint64) (r *types.Record, err error) {
|
||||
return svc.lookup(ctx, namespaceID, moduleID, func(m *types.Module, props *recordActionProps) (*types.Record, error) {
|
||||
props.record.ID = recordID
|
||||
return store.LookupComposeRecordByID(ctx, svc.store, m, recordID)
|
||||
|
||||
out := svc.prepareRecordTarget(m)
|
||||
return out, svc.dal.Lookup(ctx, m.ModelFilter(), capabilities.LookupCapabilities(m.DALConfig.Capabilities...), dal.PKValues{"id": recordID}, out)
|
||||
})
|
||||
}
|
||||
|
||||
@ -318,7 +325,18 @@ func (svc record) Find(ctx context.Context, filter types.RecordFilter) (set type
|
||||
}
|
||||
}
|
||||
|
||||
set, f, err = store.SearchComposeRecords(ctx, svc.store, m, filter)
|
||||
dalFilter := filter.ToFilter()
|
||||
if m.DALConfig.Partitioned {
|
||||
dalFilter = filter.ToConstraintedFilter(m.DALConfig.Constraints)
|
||||
}
|
||||
|
||||
var iter dal.Iterator
|
||||
iter, err = svc.dal.Search(ctx, m.ModelFilter(), svc.recSearchCapabilities(m, filter), dalFilter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
set, f, err = svc.drainIterator(ctx, iter, filter, m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -518,9 +536,10 @@ func (svc record) create(ctx context.Context, new *types.Record) (rec *types.Rec
|
||||
return nil, RecordErrValueInput().Wrap(rve)
|
||||
}
|
||||
|
||||
err = store.Tx(ctx, svc.store, func(ctx context.Context, s store.Storer) error {
|
||||
return store.CreateComposeRecord(ctx, s, m, new)
|
||||
})
|
||||
err = svc.dal.Create(ctx, m.ModelFilter(), svc.recCreateCapabilities(m), svc.recToGetter(new)...)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
106
compose/service/record_dal.go
Normal file
106
compose/service/record_dal.go
Normal file
@ -0,0 +1,106 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/compose/types"
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal"
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal/capabilities"
|
||||
"github.com/cortezaproject/corteza-server/pkg/filter"
|
||||
)
|
||||
|
||||
type (
|
||||
dalDML interface {
|
||||
Create(ctx context.Context, m dal.ModelFilter, capabilities capabilities.Set, vv ...dal.ValueGetter) error
|
||||
Search(ctx context.Context, m dal.ModelFilter, capabilities capabilities.Set, f filter.Filter) (dal.Iterator, error)
|
||||
Lookup(ctx context.Context, m dal.ModelFilter, capabilities capabilities.Set, lookup dal.ValueGetter, dst dal.ValueSetter) (err error)
|
||||
}
|
||||
)
|
||||
|
||||
func (svc *record) drainIterator(ctx context.Context, iter dal.Iterator, f types.RecordFilter, module *types.Module) (set types.RecordSet, outFilter types.RecordFilter, err error) {
|
||||
set = make(types.RecordSet, 0, f.Limit)
|
||||
|
||||
i := 0
|
||||
for iter.Next(ctx) {
|
||||
auxr := svc.prepareRecordTarget(module)
|
||||
if err = iter.Scan(auxr); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
set = append(set, auxr)
|
||||
|
||||
i++
|
||||
}
|
||||
err = iter.Err()
|
||||
|
||||
outFilter = f
|
||||
pp := f.Paging.Clone()
|
||||
|
||||
if len(set) > 0 && f.PrevPage != nil {
|
||||
pp.PrevPage, err = iter.BackCursor(set[0])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if len(set) > 0 {
|
||||
pp.NextPage, err = iter.ForwardCursor(set[len(set)-1])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
outFilter.Paging = *pp
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (svc *record) prepareRecordTarget(module *types.Module) *types.Record {
|
||||
// so we can avoid some code later involving (non)partitioned modules :seenoevil:
|
||||
return &types.Record{
|
||||
ModuleID: module.ID,
|
||||
NamespaceID: module.NamespaceID,
|
||||
Values: make(types.RecordValueSet, 0, len(module.Fields)),
|
||||
}
|
||||
}
|
||||
|
||||
func (svc *record) recToGetter(rr ...*types.Record) (out []dal.ValueGetter) {
|
||||
out = make([]dal.ValueGetter, len(rr))
|
||||
|
||||
for i := range rr {
|
||||
out[i] = rr[i]
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// recCreateCapabilities utility helps construct required creation capabilities
|
||||
func (svc *record) recCreateCapabilities(m *types.Module) (out capabilities.Set) {
|
||||
return capabilities.CreateCapabilities(m.DALConfig.Capabilities...)
|
||||
}
|
||||
|
||||
// recFilterCapabilities utility helps construct required filter capabilities based on the provided record filter
|
||||
func (svc *record) recFilterCapabilities(f types.RecordFilter) (out capabilities.Set) {
|
||||
if f.PageCursor != nil {
|
||||
out = append(out, capabilities.Paging)
|
||||
}
|
||||
|
||||
if f.IncPageNavigation {
|
||||
out = append(out, capabilities.Paging)
|
||||
}
|
||||
|
||||
if f.IncTotal {
|
||||
out = append(out, capabilities.Stats)
|
||||
}
|
||||
|
||||
if f.Sort != nil {
|
||||
out = append(out, capabilities.Sorting)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (svc *record) recSearchCapabilities(m *types.Module, f types.RecordFilter) (out capabilities.Set) {
|
||||
return capabilities.SearchCapabilities(m.DALConfig.Capabilities...).
|
||||
Union(svc.recFilterCapabilities(f))
|
||||
}
|
||||
@ -3,10 +3,12 @@ package service
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/cortezaproject/corteza-server/pkg/discovery"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal"
|
||||
"github.com/cortezaproject/corteza-server/pkg/discovery"
|
||||
|
||||
automationService "github.com/cortezaproject/corteza-server/automation/service"
|
||||
"github.com/cortezaproject/corteza-server/compose/automation"
|
||||
"github.com/cortezaproject/corteza-server/compose/types"
|
||||
@ -34,7 +36,7 @@ type (
|
||||
|
||||
Config struct {
|
||||
ActionLog options.ActionLogOpt
|
||||
Discovery options.DiscoveryOpt
|
||||
Discovery options.DiscoveryOpt
|
||||
Storage options.ObjectStoreOpt
|
||||
UserFinder userFinder
|
||||
}
|
||||
@ -167,10 +169,12 @@ func Initialize(ctx context.Context, log *zap.Logger, s store.Storer, c Config)
|
||||
}
|
||||
|
||||
DefaultNamespace = Namespace()
|
||||
DefaultModule = Module()
|
||||
if DefaultModule, err = Module(ctx, dal.Service()); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
DefaultImportSession = ImportSession()
|
||||
DefaultRecord = Record()
|
||||
DefaultRecord = Record(dal.Service())
|
||||
DefaultPage = Page()
|
||||
DefaultChart = Chart()
|
||||
DefaultNotification = Notification(c.UserFinder)
|
||||
|
||||
@ -1,39 +0,0 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal/capabilities"
|
||||
)
|
||||
|
||||
type (
|
||||
DAL struct {
|
||||
ID uint64
|
||||
Handle string
|
||||
DSN string
|
||||
Location string
|
||||
|
||||
// ...
|
||||
|
||||
// @todo IMO having it like so (instead of in a struct) allows for more
|
||||
// flexibility with less data
|
||||
Enforced capabilities.Set
|
||||
Supported capabilities.Set
|
||||
Unsupported capabilities.Set
|
||||
Enabled capabilities.Set
|
||||
}
|
||||
)
|
||||
|
||||
// @note this conforms to the crs.crsDefiner interface
|
||||
|
||||
func (crs DAL) ComposeRecordStoreID() uint64 {
|
||||
return crs.ID
|
||||
}
|
||||
|
||||
func (crs DAL) StoreDSN() string {
|
||||
return crs.DSN
|
||||
}
|
||||
|
||||
func (crs DAL) Capabilities() capabilities.Set {
|
||||
return append(crs.Enforced, crs.Supported...)
|
||||
}
|
||||
|
||||
// ---
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
discovery "github.com/cortezaproject/corteza-server/discovery/types"
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal"
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal/capabilities"
|
||||
"github.com/cortezaproject/corteza-server/pkg/filter"
|
||||
"github.com/cortezaproject/corteza-server/pkg/locale"
|
||||
@ -14,21 +15,25 @@ import (
|
||||
)
|
||||
|
||||
type (
|
||||
DalDef struct {
|
||||
ComposeRecordStoreID uint64
|
||||
Capabilities capabilities.Set
|
||||
DALConfig struct {
|
||||
ConnectionID uint64 `json:"connectionID,string"`
|
||||
Capabilities capabilities.Set `json:"capabilities"`
|
||||
|
||||
Partitioned bool
|
||||
PartitionFormat string
|
||||
Constraints map[string][]any `json:"constraints"`
|
||||
|
||||
Partitioned bool `json:"partitioned"`
|
||||
PartitionFormat string `json:"partitionFormat"`
|
||||
}
|
||||
|
||||
Module struct {
|
||||
ID uint64 `json:"moduleID,string"`
|
||||
Handle string `json:"handle"`
|
||||
Meta types.JSONText `json:"meta"`
|
||||
Fields ModuleFieldSet `json:"fields"`
|
||||
|
||||
Store DalDef
|
||||
DALConfig DALConfig `json:"DALConfig"`
|
||||
|
||||
Fields ModuleFieldSet `json:"fields"`
|
||||
SystemFields SystemFieldSet `json:"systemFields"`
|
||||
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
|
||||
@ -44,6 +49,24 @@ type (
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
SystemFieldSet struct {
|
||||
ID *EncodingStrategy
|
||||
|
||||
ModuleID *EncodingStrategy
|
||||
NamespaceID *EncodingStrategy
|
||||
|
||||
OwnedBy *EncodingStrategy
|
||||
|
||||
CreatedAt *EncodingStrategy
|
||||
CreatedBy *EncodingStrategy
|
||||
|
||||
UpdatedAt *EncodingStrategy
|
||||
UpdatedBy *EncodingStrategy
|
||||
|
||||
DeletedAt *EncodingStrategy
|
||||
DeletedBy *EncodingStrategy
|
||||
}
|
||||
|
||||
ModuleMeta struct {
|
||||
Discovery discovery.ModuleMeta `json:"discovery"`
|
||||
}
|
||||
@ -88,6 +111,18 @@ func (m *Module) encodeTranslations() (out locale.ResourceTranslationSet) {
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Module) ModelFilter() dal.ModelFilter {
|
||||
return dal.ModelFilter{
|
||||
ConnectionID: m.DALConfig.ConnectionID,
|
||||
|
||||
ResourceID: m.ID,
|
||||
|
||||
ResourceType: ModuleResourceType,
|
||||
// @todo will use this for now but should probably change
|
||||
Resource: m.RbacResource(),
|
||||
}
|
||||
}
|
||||
|
||||
// FindByHandle finds module by it's handle
|
||||
func (set ModuleSet) FindByHandle(handle string) *Module {
|
||||
for i := range set {
|
||||
|
||||
@ -59,7 +59,6 @@ type (
|
||||
|
||||
EncodingStrategyJSON struct {
|
||||
Ident string `json:"ident"`
|
||||
Path []any `json:"path"`
|
||||
}
|
||||
|
||||
ModuleFieldFilter struct {
|
||||
|
||||
@ -76,6 +76,7 @@ type (
|
||||
|
||||
// wrapping struct for recordFilter that
|
||||
recordFilter struct {
|
||||
constraints map[string][]any
|
||||
RecordFilter
|
||||
}
|
||||
)
|
||||
@ -84,30 +85,41 @@ const (
|
||||
OperationTypeCreate OperationType = "create"
|
||||
OperationTypeUpdate OperationType = "update"
|
||||
OperationTypeDelete OperationType = "delete"
|
||||
|
||||
recordFieldID = "id"
|
||||
recordFieldModuleID = "moduleId"
|
||||
recordFieldNamespaceID = "namespaceId"
|
||||
)
|
||||
|
||||
// ToFilter wraps RecordFilter with struct that
|
||||
// imlements filter.Filter interface
|
||||
func (f RecordFilter) ToFilter() filter.Filter {
|
||||
return &recordFilter{f}
|
||||
}
|
||||
|
||||
func (f recordFilter) Constraints() map[string][]any {
|
||||
c := make(map[string][]any)
|
||||
|
||||
for _, id := range f.LabeledIDs {
|
||||
c["id"] = append(c["id"], id)
|
||||
c[recordFieldID] = append(c[recordFieldID], id)
|
||||
}
|
||||
|
||||
if f.ModuleID > 0 {
|
||||
c["moduleId"] = []any{f.ModuleID}
|
||||
c[recordFieldModuleID] = []any{f.ModuleID}
|
||||
}
|
||||
|
||||
if f.ModuleID > 0 {
|
||||
c["namespaceId"] = []any{f.ModuleID}
|
||||
if f.NamespaceID > 0 {
|
||||
c[recordFieldNamespaceID] = []any{f.NamespaceID}
|
||||
}
|
||||
|
||||
return c
|
||||
return f.ToConstraintedFilter(c)
|
||||
}
|
||||
|
||||
func (f RecordFilter) ToConstraintedFilter(c map[string][]any) filter.Filter {
|
||||
return &recordFilter{
|
||||
RecordFilter: f,
|
||||
constraints: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (f recordFilter) Constraints() map[string][]any {
|
||||
return f.constraints
|
||||
}
|
||||
|
||||
func (f recordFilter) Expression() string { return f.Query }
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"github.com/cortezaproject/corteza-server/federation/service"
|
||||
"github.com/cortezaproject/corteza-server/federation/types"
|
||||
"github.com/cortezaproject/corteza-server/pkg/auth"
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal"
|
||||
"github.com/cortezaproject/corteza-server/pkg/errors"
|
||||
"github.com/cortezaproject/corteza-server/pkg/federation"
|
||||
"github.com/cortezaproject/corteza-server/pkg/filter"
|
||||
@ -94,7 +95,8 @@ func (ctrl SyncData) ReadExposedAll(ctx context.Context, r *request.SyncDataRead
|
||||
}
|
||||
|
||||
// todo - handle error properly
|
||||
if list, _, err := (cs.Record()).Find(ctx, rf); err != nil || len(list) == 0 {
|
||||
// @todo !!!
|
||||
if list, _, err := (cs.Record(dal.Service())).Find(ctx, rf); err != nil || len(list) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -221,7 +223,8 @@ func (ctrl SyncData) readExposed(ctx context.Context, r *request.SyncDataReadExp
|
||||
return nil, err
|
||||
}
|
||||
|
||||
list, f, err := (cs.Record()).Find(ctx, f)
|
||||
// @todo !!!
|
||||
list, f, err := (cs.Record(dal.Service())).Find(ctx, f)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
40
system/types/connection.go
Normal file
40
system/types/connection.go
Normal file
@ -0,0 +1,40 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/dal/capabilities"
|
||||
)
|
||||
|
||||
type (
|
||||
Connection struct {
|
||||
ID uint64 `json:"id,string"`
|
||||
Handle string `json:"handle"`
|
||||
|
||||
DSN string `json:"dsn"`
|
||||
Location string `json:"location"`
|
||||
Ownership string `json:"ownership"`
|
||||
Sensitive bool `json:"sensitive"`
|
||||
|
||||
Config ConnectionConfig `json:"config"`
|
||||
Capabilities ConnectionCapabilities `json:"capabilities"`
|
||||
|
||||
CreatedAt time.Time `json:"createdAt,omitempty"`
|
||||
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
|
||||
DeletedAt *time.Time `json:"deletedAt,omitempty"`
|
||||
}
|
||||
|
||||
ConnectionCapabilities struct {
|
||||
Enforced capabilities.Set `json:"enforced"`
|
||||
Supported capabilities.Set `json:"supported"`
|
||||
Unsupported capabilities.Set `json:"unsupported"`
|
||||
Enabled capabilities.Set `json:"enabled"`
|
||||
}
|
||||
|
||||
ConnectionConfig struct {
|
||||
DefaultModelIdent string `json:"defaultModelIdent"`
|
||||
DefaultAttributeIdent string `json:"defaultAttributeIdent"`
|
||||
|
||||
DefaultPartitionFormat string `json:"defaultPartitionFormat"`
|
||||
}
|
||||
)
|
||||
Loading…
x
Reference in New Issue
Block a user