3
0

Rework compose record import/export

- Tweak importing to avoid record service
- Rework record export to use Envoy
This commit is contained in:
Tomaž Jerman 2021-02-24 15:56:48 +01:00
parent b98241460c
commit 398c9449cc
8 changed files with 392 additions and 934 deletions

View File

@ -1,122 +0,0 @@
package encoder
import (
syst "github.com/cortezaproject/corteza-server/system/types"
)
type (
	// multiple enumerates how multi-value record fields are encoded.
	// NOTE(review): no values of this type are visible in this chunk — confirm usage.
	multiple uint

	// field describes a single output column/property of an encoded record.
	field struct {
		name string
		// encodeAllMulti requests encoding of all values of a
		// multi-value field, not just the first one.
		encodeAllMulti bool
	}

	// FlatWriter is the minimal sink for tabular (CSV-like) output.
	FlatWriter interface {
		Write([]string) error
		Flush()
	}

	// StructuredEncoder is the minimal sink for structured (JSON-like) output.
	StructuredEncoder interface {
		Encode(interface{}) error
	}

	// userFinder resolves a user ID to a system user; used to render
	// human-readable owner/creator/updater columns.
	userFinder func(ID uint64) (*syst.User, error)

	// flatWriter encodes records as rows of strings (CSV and similar).
	flatWriter struct {
		w  FlatWriter
		ff []field
		u  userFinder
		tz string // timezone name used for extra *_date / *_time columns
	}

	// structuredEncoder encodes records as maps (JSON and similar).
	structuredEncoder struct {
		w  StructuredEncoder
		ff []field
		u  userFinder
		tz string // timezone name used for extra *_date / *_time fields
	}
)
// Field constructs a plain (single-value) output field descriptor.
func Field(name string) field {
	var f field
	f.name = name
	return f
}
// MakeFields converts a list of field names into field descriptors.
func MakeFields(nn ...string) []field {
	ff := make([]field, 0, len(nn))
	for _, n := range nn {
		ff = append(ff, field{name: n})
	}
	return ff
}
// preprocessHeader expands the header for timezone-aware exports.
//
// Timestamps are always emitted in UTC; only when a non-UTC timezone is
// requested do we append derived <name>_date and <name>_time columns
// right after each system timestamp column. With no (or UTC) timezone
// the header is returned unchanged.
func preprocessHeader(hh []field, tz string) []field {
	if tz == "" || tz == "UTC" {
		return hh
	}

	// We need to prepare additional header fields for exporting
	nhh := make([]field, 0, len(hh))
	for _, f := range hh {
		switch f.name {
		// Go switch cases do not fall through; the original's explicit
		// `break` statements were redundant and have been removed.
		case "createdAt",
			"updatedAt",
			"deletedAt":
			nhh = append(nhh, f, field{name: f.name + "_date"}, field{name: f.name + "_time"})
		default:
			nhh = append(nhh, f)
		}
	}
	return nhh
}
// MultiValueField constructs a field descriptor that requests encoding
// of every value of a multi-value field, not just the first one.
func MultiValueField(name string) field {
	f := field{name: name}
	f.encodeAllMulti = true
	return f
}
// NewFlatWriter returns a flat (tabular) record encoder writing to w.
// When header is true, the column-header row is written immediately.
func NewFlatWriter(w FlatWriter, header bool, u userFinder, tz string, ff ...field) *flatWriter {
	enc := &flatWriter{
		w:  w,
		ff: preprocessHeader(ff, tz),
		u:  u,
		tz: tz,
	}
	if !header {
		return enc
	}
	enc.writeHeader()
	return enc
}
// Flush forwards the flush to the underlying writer (e.g. csv.Writer).
func (enc flatWriter) Flush() {
	enc.w.Flush()
}
// writeHeader emits a single row containing the configured column names.
func (enc flatWriter) writeHeader() {
	ss := make([]string, 0, len(enc.ff))
	for _, f := range enc.ff {
		ss = append(ss, f.name)
	}
	// header write errors are intentionally ignored, same as data rows
	_ = enc.w.Write(ss)
}
// NewStructuredEncoder returns a structured (map-based) record encoder
// writing to w (e.g. a json.Encoder).
//
// Parameters mirror NewFlatWriter, minus the header flag — structured
// output is self-describing.
func NewStructuredEncoder(w StructuredEncoder, u userFinder, tz string, ff ...field) *structuredEncoder {
	return &structuredEncoder{
		w: w,
		// No need for additional timezone headers, since the output is structured
		ff: ff,
		u:  u,
		tz: tz,
	}
}
// Flush is a no-op; the underlying structured encoder writes eagerly.
func (enc structuredEncoder) Flush() {
	// noop
}

View File

@ -1,55 +0,0 @@
package encoder
import (
"io"
"github.com/360EntSecGroup-Skylar/excelize/v2"
)
type (
	// excelizeEncoder encodes records into an XLSX workbook via excelize;
	// the workbook is buffered in memory and written to w on Flush.
	excelizeEncoder struct {
		row int            // 1-based index of the current worksheet row
		f   *excelize.File // in-memory workbook
		w   io.Writer      // final destination, written on Flush
		ff  []field
		u   userFinder
		tz  string // timezone name used for extra *_date / *_time columns
	}
)
// NewExcelizeEncoder returns an XLSX record encoder; rows are buffered in
// an in-memory workbook and serialized to w when Flush is called.
// When header is true, the column-header row is written immediately.
func NewExcelizeEncoder(w io.Writer, header bool, u userFinder, tz string, ff ...field) *excelizeEncoder {
	e := &excelizeEncoder{
		f:  excelize.NewFile(),
		w:  w,
		ff: preprocessHeader(ff, tz),
		u:  u,
		tz: tz,
	}
	if !header {
		return e
	}
	e.writeHeader()
	return e
}
// Flush serializes the buffered workbook to the destination writer.
// NOTE(review): the write error is discarded — callers cannot detect a
// failed export; confirm this is acceptable.
func (enc *excelizeEncoder) Flush() {
	_ = enc.f.Write(enc.w)
}
// Returns current row + column to alphanumeric cell name
// (e.g. col=2 on row 3 yields "B3"). The conversion error is ignored;
// out-of-range coordinates would produce an empty cell name.
func (enc excelizeEncoder) pos(col int) string {
	cn, _ := excelize.CoordinatesToCellName(col, enc.row)
	return cn
}
// sheet returns the name of the single worksheet all cells are written to
// ("Sheet1" is the default sheet excelize.NewFile creates).
func (enc excelizeEncoder) sheet() string {
	return "Sheet1"
}
// writeHeader advances to a fresh row and fills it with column names.
func (enc *excelizeEncoder) writeHeader() {
	enc.row++
	for i, f := range enc.ff {
		// cell write errors are intentionally ignored, same as data cells
		_ = enc.f.SetCellStr(enc.sheet(), enc.pos(i+1), f.name)
	}
}

View File

@ -1,316 +0,0 @@
package encoder
import (
"fmt"
"strconv"
"time"
"github.com/cortezaproject/corteza-server/compose/types"
)
type (
parsedTime struct {
field string
value string
}
)
// Time formatter
//
// Takes ptr to time.Time so we can conver both cases (value + ptr)
// The function also generates additional fields with included timezone.
func fmtTime(field string, tp *time.Time, tz string) (pt []*parsedTime, err error) {
if tp == nil {
return pt, nil
}
pt = append(pt, &parsedTime{
field: field,
value: tp.UTC().Format(time.RFC3339),
})
if tz == "" || tz == "UTC" {
return
}
loc, err := time.LoadLocation(tz)
if err != nil {
return pt, err
}
tt := tp.In(loc)
pt = append(pt,
&parsedTime{
field: field + "_date",
value: tt.Format("2006-01-02"),
}, &parsedTime{
field: field + "_time",
value: tt.Format("15:04:05"),
})
return
}
// fmtUint64 renders an unsigned ID as a base-10 string.
func fmtUint64(u uint64) string {
	return string(strconv.AppendUint(nil, u, 10))
}
// fmtSysUser renders a user ID as the most human-friendly label available
// (name, handle, email, then username), falling back to the numeric ID
// when there is no finder, the ID is zero, or the user has no textual
// identity. Lookup errors are returned as-is.
func fmtSysUser(u uint64, finder userFinder) (string, error) {
	// u is unsigned, so the original `u <= 0` was just an equality test
	if u == 0 || finder == nil {
		return fmtUint64(u), nil
	}

	su, err := finder(u)
	if err != nil {
		return "", err
	}

	// first non-empty identity field wins
	switch {
	case su.Name != "":
		return su.Name, nil
	case su.Handle != "":
		return su.Handle, nil
	case su.Email != "":
		return su.Email, nil
	case su.Username != "":
		return su.Username, nil
	}

	return fmt.Sprintf("%d", su.ID), nil
}
// Record encodes a single record as one flat row, in the order of the
// configured fields (enc.ff), and writes it to the underlying writer.
//
// System fields (IDs, timestamps, user references) are rendered by the
// fmt* helpers; any other field name is looked up among the record's
// values (only the first value is used — multi-value support is pending).
func (enc flatWriter) Record(r *types.Record) (err error) {
	var out = make([]string, len(enc.ff))

	// procTime writes rendered timestamp columns starting at column base;
	// with a non-UTC timezone fmtTime returns up to three entries (value,
	// _date, _time) that land in the consecutive header columns prepared
	// by preprocessHeader.
	procTime := func(d []string, pts []*parsedTime, base int) {
		for i, p := range pts {
			d[base+i] = p.value
		}
	}

	for f, field := range enc.ff {
		switch field.name {
		case "recordID", "ID":
			out[f] = fmtUint64(r.ID)
		case "moduleID":
			out[f] = fmtUint64(r.ModuleID)
		case "namespaceID":
			out[f] = fmtUint64(r.NamespaceID)
		case "ownedBy":
			out[f], err = fmtSysUser(r.OwnedBy, enc.u)
			if err != nil {
				return err
			}
		case "createdBy":
			out[f], err = fmtSysUser(r.CreatedBy, enc.u)
			if err != nil {
				return err
			}
		case "createdAt":
			// CreatedAt is a value (never nil) — pass its address
			tt, err := fmtTime("createdAt", &r.CreatedAt, enc.tz)
			if err != nil {
				return err
			}
			procTime(out, tt, f)
		case "updatedBy":
			out[f], err = fmtSysUser(r.UpdatedBy, enc.u)
			if err != nil {
				return err
			}
		case "updatedAt":
			// UpdatedAt is a pointer; fmtTime handles nil (columns stay empty)
			tt, err := fmtTime("updatedAt", r.UpdatedAt, enc.tz)
			if err != nil {
				return err
			}
			procTime(out, tt, f)
		case "deletedBy":
			out[f], err = fmtSysUser(r.DeletedBy, enc.u)
			if err != nil {
				return err
			}
		case "deletedAt":
			// DeletedAt is a pointer; fmtTime handles nil (columns stay empty)
			tt, err := fmtTime("deletedAt", r.DeletedAt, enc.tz)
			if err != nil {
				return err
			}
			procTime(out, tt, f)
		default:
			vv := r.Values.FilterByName(field.name)
			// @todo support for field.encodeAllMulti
			if len(vv) > 0 {
				out[f] = vv[0].Value
			}
		}
	}

	// NOTE(review): flushing after every record defeats the writer's
	// buffering — confirm this is intentional before changing.
	defer enc.w.Flush()
	return enc.w.Write(out)
}
// Record encodes a single record into a map and hands it to the
// underlying structured encoder (e.g. json.Encoder).
//
// All uint64 IDs are encoded as strings to avoid precision loss in
// consumers; multi-value fields become string slices, absent fields are
// omitted, and nil timestamps are encoded as explicit nulls.
func (enc structuredEncoder) Record(r *types.Record) (err error) {
	var (
		// Exporter can choose fields so we need this buffer
		// to hold just what we need
		out = make(map[string]interface{})
		vv  types.RecordValueSet
		c   int
	)

	// procTime copies each rendered timestamp entry (value plus optional
	// _date/_time variants) into the output map under its own key
	procTime := func(d map[string]interface{}, pts []*parsedTime) {
		for _, p := range pts {
			d[p.field] = p.value
		}
	}

	for _, f := range enc.ff {
		switch f.name {
		case "recordID", "ID":
			// encode all numbers as string (to prevent overflow of uint64 values)
			out[f.name] = fmtUint64(r.ID)
		case "moduleID":
			// encode all numbers as string (to prevent overflow of uint64 values)
			out[f.name] = fmtUint64(r.ModuleID)
		case "namespaceID":
			// encode all numbers as string (to prevent overflow of uint64 values)
			out[f.name] = fmtUint64(r.NamespaceID)
		case "ownedBy":
			out[f.name], err = fmtSysUser(r.OwnedBy, enc.u)
			if err != nil {
				return err
			}
		case "createdBy":
			out[f.name], err = fmtSysUser(r.CreatedBy, enc.u)
			if err != nil {
				return err
			}
		case "createdAt":
			// CreatedAt is a value (never nil) — pass its address
			tt, err := fmtTime("createdAt", &r.CreatedAt, enc.tz)
			if err != nil {
				return err
			}
			procTime(out, tt)
		case "updatedBy":
			out[f.name], err = fmtSysUser(r.UpdatedBy, enc.u)
			if err != nil {
				return err
			}
		case "updatedAt":
			// explicit null for never-updated records
			if r.UpdatedAt == nil {
				out[f.name] = nil
			} else {
				tt, err := fmtTime("updatedAt", r.UpdatedAt, enc.tz)
				if err != nil {
					return err
				}
				procTime(out, tt)
			}
		case "deletedBy":
			out[f.name], err = fmtSysUser(r.DeletedBy, enc.u)
			if err != nil {
				return err
			}
		case "deletedAt":
			// explicit null for non-deleted records
			if r.DeletedAt == nil {
				out[f.name] = nil
			} else {
				tt, err := fmtTime("deletedAt", r.DeletedAt, enc.tz)
				if err != nil {
					return err
				}
				procTime(out, tt)
			}
		default:
			vv = r.Values.FilterByName(f.name)
			c = len(vv)
			// fields without values are omitted from the output entirely
			if c == 0 {
				break
			}

			if c == 1 {
				out[f.name] = vv[0].Value
			} else {
				// multi-value fields encode as a slice of raw string values
				multi := make([]string, c)
				for n := range vv {
					multi[n] = vv[n].Value
				}
				out[f.name] = multi
			}
		}
	}

	return enc.w.Encode(out)
}
// Record encodes a single record as the next worksheet row.
//
// Cell write errors are intentionally ignored (consistent with the
// header writer); only user-lookup and time-formatting errors abort.
func (enc *excelizeEncoder) Record(r *types.Record) (err error) {
	enc.row++

	var u string

	// procTime writes rendered timestamp cells starting at column base;
	// with a non-UTC timezone up to three consecutive cells are filled
	procTime := func(pts []*parsedTime, base int) {
		for i, p := range pts {
			_ = enc.f.SetCellStr(enc.sheet(), enc.pos(base+i), p.value)
		}
	}

	for p, f := range enc.ff {
		// excelize columns are 1-based
		p++
		switch f.name {
		case "recordID", "ID":
			_ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), fmtUint64(r.ID))
		case "moduleID":
			_ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), fmtUint64(r.ModuleID))
		case "namespaceID":
			_ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), fmtUint64(r.NamespaceID))
		case "ownedBy":
			u, err = fmtSysUser(r.OwnedBy, enc.u)
			if err != nil {
				return err
			}
			_ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), u)
		case "createdBy":
			u, err = fmtSysUser(r.CreatedBy, enc.u)
			if err != nil {
				return err
			}
			_ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), u)
		case "createdAt":
			// CreatedAt is a value (never nil) — pass its address
			tt, err := fmtTime("createdAt", &r.CreatedAt, enc.tz)
			if err != nil {
				return err
			}
			procTime(tt, p)
		case "updatedBy":
			u, err = fmtSysUser(r.UpdatedBy, enc.u)
			if err != nil {
				return err
			}
			_ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), u)
		case "updatedAt":
			// UpdatedAt is a pointer; fmtTime handles nil (cells stay empty)
			tt, err := fmtTime("updatedAt", r.UpdatedAt, enc.tz)
			if err != nil {
				return err
			}
			procTime(tt, p)
		case "deletedBy":
			u, err = fmtSysUser(r.DeletedBy, enc.u)
			if err != nil {
				return err
			}
			_ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), u)
		case "deletedAt":
			// DeletedAt is a pointer; fmtTime handles nil (cells stay empty)
			tt, err := fmtTime("deletedAt", r.DeletedAt, enc.tz)
			if err != nil {
				return err
			}
			procTime(tt, p)
		default:
			// only the first value of a field is written
			vv := r.Values.FilterByName(f.name)
			if len(vv) > 0 {
				_ = enc.f.SetCellStr(enc.sheet(), enc.pos(p), vv[0].Value)
			}
		}
	}

	return nil
}

View File

@ -1,102 +0,0 @@
package encoder
import (
"bytes"
"encoding/csv"
"encoding/json"
"testing"
"time"
"github.com/cortezaproject/corteza-server/compose/types"
"github.com/stretchr/testify/require"
)
// Test_RecordEncoding drives the flat (CSV) and structured (JSON) record
// encoders over the same fixtures and compares the raw serialized output.
//
// Removed the unused, empty `type args struct{}` declaration (dead code).
func Test_RecordEncoding(t *testing.T) {
	tests := []struct {
		name         string
		ff           []field
		rr           []*types.Record
		flatResult   string
		structResult string
	}{
		{
			name: "covering the basics",
			ff:   MakeFields("recordID", "ownedBy", "createdAt", "deletedAt", "some-foo-field", "foo", "fff"),
			rr: []*types.Record{
				&types.Record{
					ID:        12345,
					OwnedBy:   12345,
					CreatedAt: time.Unix(1504976400, 0),
				},
				&types.Record{
					ID:        54321,
					OwnedBy:   12345,
					CreatedAt: time.Unix(12345, 0),
					Values: []*types.RecordValue{
						{
							Name:  "foo",
							Value: "bar",
						},
						{
							Name:  "fff",
							Value: "1",
						},
						{
							Name:  "fff",
							Value: "2",
						},
					},
				},
			},
			flatResult: `recordID,ownedBy,createdAt,deletedAt,some-foo-field,foo,fff` + "\n" +
				`12345,12345,2017-09-09T17:00:00Z,,,,` + "\n" +
				`54321,12345,1970-01-01T03:25:45Z,,,bar,1` + "\n",
			structResult: `{"createdAt":"2017-09-09T17:00:00Z","deletedAt":null,"ownedBy":"12345","recordID":"12345"}` + "\n" +
				`{"createdAt":"1970-01-01T03:25:45Z","deletedAt":null,"fff":["1","2"],"foo":"bar","ownedBy":"12345","recordID":"54321"}` + "\n",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name+" (csv)", func(t *testing.T) {
			buf := bytes.NewBuffer([]byte{})
			csvWriter := csv.NewWriter(buf)
			fenc := NewFlatWriter(csvWriter, true, nil, "UTC", tt.ff...)

			for _, r := range tt.rr {
				if err := fenc.Record(r); err != nil {
					t.Errorf("unexpected error = %v,", err)
				}
			}

			csvWriter.Flush()

			require.True(t,
				buf.String() == tt.flatResult,
				"Unexpected result: \n%s\n%s",
				buf.String(),
				tt.flatResult)
		})
		t.Run(tt.name+" (json)", func(t *testing.T) {
			buf := bytes.NewBuffer([]byte{})
			jsonEnc := json.NewEncoder(buf)
			senc := NewStructuredEncoder(jsonEnc, nil, "UTC", tt.ff...)

			for _, r := range tt.rr {
				if err := senc.Record(r); err != nil {
					t.Errorf("unexpected error = %v,", err)
				}
			}

			require.True(t,
				buf.String() == tt.structResult,
				"Unexpected result: \n%s\n%s",
				buf.String(),
				tt.structResult)
		})
	}
}

View File

@ -2,24 +2,28 @@ package rest
import (
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/cortezaproject/corteza-server/compose/encoder"
"github.com/cortezaproject/corteza-server/compose/rest/request"
"github.com/cortezaproject/corteza-server/compose/service"
"github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/api"
"github.com/cortezaproject/corteza-server/pkg/corredor"
"github.com/cortezaproject/corteza-server/pkg/envoy"
"github.com/cortezaproject/corteza-server/pkg/envoy/csv"
ejson "github.com/cortezaproject/corteza-server/pkg/envoy/json"
"github.com/cortezaproject/corteza-server/pkg/envoy/resource"
estore "github.com/cortezaproject/corteza-server/pkg/envoy/store"
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/cortezaproject/corteza-server/pkg/payload"
"github.com/cortezaproject/corteza-server/store"
systemService "github.com/cortezaproject/corteza-server/system/service"
systemTypes "github.com/cortezaproject/corteza-server/system/types"
)
type (
@ -316,8 +320,63 @@ func (ctrl *Record) ImportRun(ctx context.Context, r *request.RecordImportRun) (
ses.OnError = r.OnError
// Errors are presented in the session
ctrl.record.Import(ctx, ses)
return ses, nil
err = func() (err error) {
if ses.Progress.StartedAt != nil {
return fmt.Errorf("unable to start import: import session already active")
}
sa := time.Now()
ses.Progress.StartedAt = &sa
// Prepare additional metadata
tpl := resource.NewComposeRecordTemplate(
strconv.FormatUint(ses.ModuleID, 10),
strconv.FormatUint(ses.NamespaceID, 10),
ses.Name,
resource.MapToMappingTplSet(ses.Fields),
)
// Shape the data
ses.Resources = append(ses.Resources, tpl)
rt := resource.ComposeRecordShaper()
ses.Resources, err = resource.Shape(ses.Resources, rt)
// Build
cfg := &estore.EncoderConfig{
// For now the identifier is ignored, so this will never occur
OnExisting: resource.Skip,
Defer: func() {
ses.Progress.Completed++
},
}
if ses.OnError == service.IMPORT_ON_ERROR_SKIP {
cfg.DeferNok = func(err error) error {
ses.Progress.Failed++
ses.Progress.FailReason = err.Error()
return nil
}
}
se := estore.NewStoreEncoder(service.DefaultStore, cfg)
bld := envoy.NewBuilder(se)
g, err := bld.Build(ctx, ses.Resources...)
if err != nil {
return err
}
// Encode
err = envoy.Encode(ctx, g, se)
now := time.Now()
ses.Progress.FinishedAt = &now
if err != nil {
ses.Progress.FailReason = err.Error()
return err
}
return
}()
return ses, ctrl.record.RecordImport(ctx, err)
}
func (ctrl *Record) ImportProgress(ctx context.Context, r *request.RecordImportProgress) (interface{}, error) {
@ -331,31 +390,23 @@ func (ctrl *Record) ImportProgress(ctx context.Context, r *request.RecordImportP
}
func (ctrl *Record) Export(ctx context.Context, r *request.RecordExport) (interface{}, error) {
type (
// ad-hoc interface for our encoder
Encoder interface {
service.Encoder
Flush()
}
)
var (
err error
// Record encoder
recordEncoder Encoder
filename = fmt.Sprintf("; filename=%s.%s", r.Filename, r.Ext)
f = types.RecordFilter{
rf = &types.RecordFilter{
Query: r.Filter,
NamespaceID: r.NamespaceID,
ModuleID: r.ModuleID,
Query: r.Filter,
}
f = estore.NewDecodeFilter().
ComposeRecord(rf)
contentType string
)
// Access control.
// Access control
if _, err = ctrl.module.FindByID(ctx, r.NamespaceID, r.ModuleID); err != nil {
return nil, err
}
@ -365,47 +416,37 @@ func (ctrl *Record) Export(ctx context.Context, r *request.RecordExport) (interf
}
return func(w http.ResponseWriter, req *http.Request) {
ff := encoder.MakeFields(r.Fields...)
if len(ff) == 0 {
if len(r.Fields) == 0 {
http.Error(w, "no record value fields provided", http.StatusBadRequest)
}
// Custom user getter function for the underlying encoders.
//
// not the most optimal solution; we have no other means to do a proper preload of users
// @todo preload users
users := map[uint64]*systemTypes.User{}
uf := func(ID uint64) (*systemTypes.User, error) {
var err error
if _, exists := users[ID]; exists {
// nonexistent users are also cached!
return users[ID], nil
}
// @todo this "communication" between system and compose
// services is ad-hoc solution
users[ID], err = ctrl.userFinder.FindByID(ctx, ID)
if err != nil {
return nil, err
}
return users[ID], nil
fx := make(map[string]bool)
for _, f := range r.Fields {
fx[f] = true
}
sd := estore.Decoder()
nn, err := sd.Decode(ctx, service.DefaultStore, f)
if err != nil {
http.Error(w, fmt.Sprintf("failed to fetch records: %s", err.Error()), http.StatusBadRequest)
}
var encoder envoy.PrepareEncodeStreammer
switch strings.ToLower(r.Ext) {
case "json", "jsonl", "ldjson", "ndjson":
contentType = "application/jsonl"
recordEncoder = encoder.NewStructuredEncoder(json.NewEncoder(w), uf, r.Timezone, ff...)
encoder = ejson.NewBulkRecordEncoder(&ejson.EncoderConfig{
Fields: fx,
Timezone: r.Timezone,
})
case "csv":
contentType = "text/csv"
recordEncoder = encoder.NewFlatWriter(csv.NewWriter(w), true, uf, r.Timezone, ff...)
case "xlsx":
contentType = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
recordEncoder = encoder.NewExcelizeEncoder(w, true, uf, r.Timezone, ff...)
encoder = csv.NewBulkRecordEncoder(&csv.EncoderConfig{
Fields: fx,
Timezone: r.Timezone,
})
default:
http.Error(w, "unsupported format ("+r.Ext+")", http.StatusBadRequest)
@ -415,13 +456,28 @@ func (ctrl *Record) Export(ctx context.Context, r *request.RecordExport) (interf
w.Header().Add("Content-Type", contentType)
w.Header().Add("Content-Disposition", "attachment"+filename)
if err = ctrl.record.Export(ctx, f, recordEncoder); err != nil {
bld := envoy.NewBuilder(encoder)
g, err := bld.Build(ctx, nn...)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
recordEncoder.Flush()
}, nil
err = envoy.Encode(ctx, g, encoder)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
ss := encoder.Stream()
// Find only the stream we are interested in
for _, s := range ss {
if s.Resource == resource.COMPOSE_RECORD_RESOURCE_TYPE {
io.Copy(w, s.Source)
}
}
err = ctrl.record.RecordExport(ctx, *rf)
}, err
}
func (ctrl Record) Exec(ctx context.Context, r *request.RecordExec) (interface{}, error) {

View File

@ -13,9 +13,7 @@ import (
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/pkg/corredor"
"github.com/cortezaproject/corteza-server/pkg/envoy"
"github.com/cortezaproject/corteza-server/pkg/envoy/resource"
estore "github.com/cortezaproject/corteza-server/pkg/envoy/store"
"github.com/cortezaproject/corteza-server/pkg/errors"
"github.com/cortezaproject/corteza-server/pkg/eventbus"
"github.com/cortezaproject/corteza-server/pkg/label"
@ -79,8 +77,8 @@ type (
Report(ctx context.Context, namespaceID, moduleID uint64, metrics, dimensions, filter string) (interface{}, error)
Find(ctx context.Context, filter types.RecordFilter) (set types.RecordSet, f types.RecordFilter, err error)
Export(context.Context, types.RecordFilter, Encoder) error
Import(context.Context, *recordImportSession) error
RecordExport(context.Context, types.RecordFilter) error
RecordImport(context.Context, error) error
Create(ctx context.Context, record *types.Record) (*types.Record, error)
Update(ctx context.Context, record *types.Record) (*types.Record, error)
@ -97,10 +95,6 @@ type (
EventEmitting(enable bool)
}
Encoder interface {
Record(*types.Record) error
}
recordImportSession struct {
Name string `json:"-"`
SessionID uint64 `json:"sessionID,string"`
@ -311,96 +305,13 @@ func (svc record) Find(ctx context.Context, filter types.RecordFilter) (set type
return set, f, svc.recordAction(ctx, aProps, RecordActionSearch, err)
}
func (svc record) Import(ctx context.Context, ses *recordImportSession) (err error) {
var (
aProps = &recordActionProps{}
)
err = func() (err error) {
if ses.Progress.StartedAt != nil {
return fmt.Errorf("unable to start import: import session already active")
}
sa := time.Now()
ses.Progress.StartedAt = &sa
// Prepare additional metadata
tpl := resource.NewComposeRecordTemplate(
strconv.FormatUint(ses.ModuleID, 10),
strconv.FormatUint(ses.NamespaceID, 10),
ses.Name,
resource.MapToMappingTplSet(ses.Fields),
)
// Shape the data
ses.Resources = append(ses.Resources, tpl)
rt := resource.ComposeRecordShaper()
ses.Resources, err = resource.Shape(ses.Resources, rt)
// Build
cfg := &estore.EncoderConfig{
// For now the identifier is ignored, so this will never occur
OnExisting: resource.Skip,
Defer: func() {
ses.Progress.Completed++
},
}
if ses.OnError == IMPORT_ON_ERROR_SKIP {
cfg.DeferNok = func(err error) error {
ses.Progress.Failed++
ses.Progress.FailReason = err.Error()
return nil
}
}
se := estore.NewStoreEncoder(svc.store, cfg)
bld := envoy.NewBuilder(se)
g, err := bld.Build(ctx, ses.Resources...)
if err != nil {
return err
}
// Encode
err = envoy.Encode(ctx, g, se)
ses.Progress.FinishedAt = now()
if err != nil {
ses.Progress.FailReason = err.Error()
ses.Progress.Failed++
return err
}
return
}()
return svc.recordAction(ctx, aProps, RecordActionImport, err)
func (svc record) RecordImport(ctx context.Context, err error) error {
return svc.recordAction(ctx, &recordActionProps{}, RecordActionImport, err)
}
// Export returns all records
//
// @todo better value handling
func (svc record) Export(ctx context.Context, f types.RecordFilter, enc Encoder) (err error) {
var (
aProps = &recordActionProps{filter: &f}
m *types.Module
set types.RecordSet
)
err = func() (err error) {
m, err = loadModule(ctx, svc.store, f.ModuleID)
if err != nil {
return err
}
set, _, err = store.SearchComposeRecords(ctx, svc.store, m, f)
if err != nil {
return err
}
return set.Walk(enc.Record)
}()
return svc.recordAction(ctx, aProps, RecordActionExport, err)
// RecordExport records that the export has occurred
func (svc record) RecordExport(ctx context.Context, f types.RecordFilter) (err error) {
return svc.recordAction(ctx, &recordActionProps{filter: &f}, RecordActionExport, err)
}
// Bulk handles provided set of bulk record operations.
@ -541,7 +452,7 @@ func (svc record) create(ctx context.Context, new *types.Record) (rec *types.Rec
return nil, RecordErrNotAllowedToCreate()
}
if err = svc.generalValueSetValidation(m, new.Values); err != nil {
if err = RecordValueSanitazion(m, new.Values); err != nil {
return
}
@ -550,12 +461,10 @@ func (svc record) create(ctx context.Context, new *types.Record) (rec *types.Rec
)
if svc.optEmitEvents {
// Handle input payload
if rve = svc.procCreate(ctx, svc.store, invokerID, m, new); !rve.IsValid() {
return nil, RecordErrValueInput().Wrap(rve)
}
new.Values = svc.formatter.Run(m, new.Values)
if err = svc.eventbus.WaitFor(ctx, event.RecordBeforeCreate(new, nil, m, ns, rve)); err != nil {
return
} else if !rve.IsValid() {
@ -563,8 +472,7 @@ func (svc record) create(ctx context.Context, new *types.Record) (rec *types.Rec
}
}
// Assign defaults (only on missing values)
new.Values = svc.setDefaultValues(m, new.Values)
new.Values = RecordValueDefaults(m, new.Values)
// Handle payload from automation scripts
if rve = svc.procCreate(ctx, svc.store, invokerID, m, new); !rve.IsValid() {
@ -594,6 +502,166 @@ func (svc record) create(ctx context.Context, new *types.Record) (rec *types.Rec
return
}
// RecordValueSanitazion does basic field and format validation
//
// Received values must fit the data model: on unknown fields
// or multi/single value mismatch we return an error
//
// Record value errors is intentionally NOT used here; if input fails here
// we can assume that form builder (or whatever it was that assembled the record values)
// was misconfigured and will most likely failed to properly parse the
// record value errors payload too
//
// NOTE(review): the misspelled name ("Sanitazion") is preserved because
// it is exported and referenced by callers.
func RecordValueSanitazion(m *types.Module, vv types.RecordValueSet) (err error) {
	var (
		aProps = &recordActionProps{}

		// References are uint64 IDs rendered as decimal strings without a
		// leading zero. Fixed from `^[1-9](\d+)$`, which required at least
		// two digits and so wrongly rejected single-digit IDs.
		numeric = regexp.MustCompile(`^[1-9]\d*$`)
	)

	err = vv.Walk(func(v *types.RecordValue) error {
		var field = m.Fields.FindByName(v.Name)
		if field == nil {
			return RecordErrFieldNotFound(aProps.setField(v.Name))
		}

		if field.IsRef() {
			// an empty reference is allowed (unset)
			if v.Value == "" {
				return nil
			}
			if !numeric.MatchString(v.Value) {
				return RecordErrInvalidReferenceFormat(aProps.setField(v.Name).setValue(v.Value))
			}
		}

		return nil
	})
	if err != nil {
		return
	}

	// Make sure there are no multi values in a non-multi value fields
	err = m.Fields.Walk(func(field *types.ModuleField) error {
		if !field.Multi && len(vv.FilterByName(field.Name)) > 1 {
			return RecordErrInvalidValueStructure(aProps.setField(field.Name))
		}
		return nil
	})
	if err != nil {
		return
	}

	return
}
// RecordUpdateOwner resolves the record owner: an explicitly set owner is
// kept; otherwise the previous version's owner (if any) is inherited;
// otherwise the invoking user becomes the owner.
func RecordUpdateOwner(invokerID uint64, r, old *types.Record) *types.Record {
	if r.OwnedBy != 0 {
		// owner explicitly set in the payload; keep it
		return r
	}

	if old != nil && old.OwnedBy > 0 {
		// Owner not set/sent in the payload;
		// fall back to the previous owner
		r.OwnedBy = old.OwnedBy
		return r
	}

	// If no owner is set, make the current user
	// the owner of the record
	r.OwnedBy = invokerID
	return r
}
// RecordValueMerger merges incoming values (vv) with the stored ones (old)
// and collects per-value permission violations.
//
// Returns the merged value set together with a (possibly empty) error set;
// callers are expected to check rve.IsValid() before proceeding. A nil old
// set (record creation) skips the merge and only runs the permission scan.
func RecordValueMerger(ctx context.Context, ac recordValueAccessController, m *types.Module, vv, old types.RecordValueSet) (types.RecordValueSet, *types.RecordValueErrorSet) {
	if old != nil {
		// Value merge process does not know anything about permissions so
		// in case when new values are missing but do exist in the old set and their update/read is denied
		// we need to copy them to ensure value merge process them correctly
		for _, f := range m.Fields {
			if len(vv.FilterByName(f.Name)) == 0 && !ac.CanUpdateRecordValue(ctx, m.Fields.FindByName(f.Name)) {
				// copy all fields from old to new
				vv = append(vv, old.FilterByName(f.Name).GetClean()...)
			}
		}

		// Merge new (updated) values with old ones
		// This way we get list of updated, stale and deleted values
		// that we can selectively update in the repository
		vv = old.Merge(vv)
	}

	rve := &types.RecordValueErrorSet{}
	// flag every changed value the invoker is not allowed to update
	_ = vv.Walk(func(v *types.RecordValue) error {
		if v.IsUpdated() && !ac.CanUpdateRecordValue(ctx, m.Fields.FindByName(v.Name)) {
			rve.Push(types.RecordValueError{Kind: "updateDenied", Meta: map[string]interface{}{"field": v.Name, "value": v.Value}})
		}
		return nil
	})

	return vv, rve
}
// RecordPreparer runs the full value-processing pipeline on a record:
// sanitize -> evaluate field expressions -> validate -> strip deleted
// values -> format. The order is significant; each stage consumes the
// previous stage's output.
//
// Returns nil when everything is valid, otherwise the first non-empty
// record value error set (surfaced to the API consumer by the caller).
func RecordPreparer(ctx context.Context, s store.Storer, ss recordValuesSanitizer, vv recordValuesValidator, ff recordValuesFormatter, m *types.Module, new *types.Record) *types.RecordValueErrorSet {
	// Before values are processed further and
	// sent to automation scripts (if any)
	// we need to make sure it does not get un-sanitized data
	new.Values = ss.Run(m, new.Values)

	rve := &types.RecordValueErrorSet{}
	values.Expression(ctx, m, new, nil, rve)
	if !rve.IsValid() {
		return rve
	}

	// Run validation of the updated records
	rve = vv.Run(ctx, s, m, new)
	if !rve.IsValid() {
		return rve
	}

	// Cleanup the values
	new.Values = new.Values.GetClean()

	// Formatting
	new.Values = ff.Run(m, new.Values)

	return nil
}
// RecordValueDefaults fills in a field's default values for every
// (field, place) slot the incoming value set does not already occupy.
func RecordValueDefaults(m *types.Module, vv types.RecordValueSet) (out types.RecordValueSet) {
	out = vv

	for _, f := range m.Fields {
		if f.DefaultValue == nil {
			continue
		}

		for i, dv := range f.DefaultValue {
			place := uint(i)
			// Default values on field are (might be) without field name and place
			if out.Has(f.Name, place) {
				continue
			}
			out = append(out, &types.RecordValue{
				Name:  f.Name,
				Value: dv.Value,
				Place: place,
			})
		}
	}

	return
}
// Raw update function that is responsible for value validation, event dispatching
// and update.
func (svc record) update(ctx context.Context, upd *types.Record) (rec *types.Record, err error) {
@ -628,7 +696,7 @@ func (svc record) update(ctx context.Context, upd *types.Record) (rec *types.Rec
return nil, RecordErrStaleData()
}
if err = svc.generalValueSetValidation(m, upd.Values); err != nil {
if err = RecordValueSanitazion(m, upd.Values); err != nil {
return
}
@ -642,17 +710,6 @@ func (svc record) update(ctx context.Context, upd *types.Record) (rec *types.Rec
return nil, RecordErrValueInput().Wrap(rve)
}
// Before we pass values to record-before-update handling events
// values needs do be cleaned up
//
// Value merge inside procUpdate sets delete flag we need
// when changes are applied but we do not want deleted values
// to be sent to handler
upd.Values = upd.Values.GetClean()
// Before we pass values to automation scripts, they should be formatted
upd.Values = svc.formatter.Run(m, upd.Values)
// Scripts can (besides simple error value) return complex record value error set
// that is passed back to the UI or any other API consumer
//
@ -720,15 +777,9 @@ func (svc record) Create(ctx context.Context, new *types.Record) (rec *types.Rec
// of the creation procedure and after results are back from the automation scripts
//
// Both these points introduce external data that need to be checked fully in the same manner
func (svc record) procCreate(ctx context.Context, s store.Storer, invokerID uint64, m *types.Module, new *types.Record) *types.RecordValueErrorSet {
// Mark all values as updated (new)
func (svc record) procCreate(ctx context.Context, s store.Storer, invokerID uint64, m *types.Module, new *types.Record) (rve *types.RecordValueErrorSet) {
new.Values.SetUpdatedFlag(true)
// Before values are processed further and
// sent to automation scripts (if any)
// we need to make sure it does not get un-sanitized data
new.Values = svc.sanitizer.Run(m, new.Values)
// Reset values to new record
// to make sure nobody slips in something we do not want
new.ID = nextID()
@ -739,33 +790,13 @@ func (svc record) procCreate(ctx context.Context, s store.Storer, invokerID uint
new.DeletedAt = nil
new.DeletedBy = 0
if new.OwnedBy == 0 {
// If od owner is not set, make current user
// the owner of the record
new.OwnedBy = invokerID
}
rve := &types.RecordValueErrorSet{}
_ = new.Values.Walk(func(v *types.RecordValue) error {
if v.IsUpdated() && !svc.ac.CanUpdateRecordValue(ctx, m.Fields.FindByName(v.Name)) {
rve.Push(types.RecordValueError{Kind: "updateDenied", Meta: map[string]interface{}{"field": v.Name, "value": v.Value}})
}
return nil
})
new = RecordUpdateOwner(invokerID, new, nil)
new.Values, rve = RecordValueMerger(ctx, svc.ac, m, new.Values, nil)
if !rve.IsValid() {
return rve
}
values.Expression(ctx, m, new, nil, rve)
if !rve.IsValid() {
return rve
}
// Run validation of the updated records
return svc.validator.Run(ctx, s, m, new)
rve = RecordPreparer(ctx, svc.store, svc.sanitizer, svc.validator, svc.formatter, m, new)
return rve
}
func (svc record) Update(ctx context.Context, upd *types.Record) (rec *types.Record, err error) {
@ -789,17 +820,10 @@ func (svc record) Update(ctx context.Context, upd *types.Record) (rec *types.Rec
// of the update procedure and after results are back from the automation scripts
//
// Both these points introduce external data that need to be checked fully in the same manner
func (svc record) procUpdate(ctx context.Context, s store.Storer, invokerID uint64, m *types.Module, upd *types.Record, old *types.Record) *types.RecordValueErrorSet {
func (svc record) procUpdate(ctx context.Context, s store.Storer, invokerID uint64, m *types.Module, upd *types.Record, old *types.Record) (rve *types.RecordValueErrorSet) {
// Mark all values as updated (new)
upd.Values.SetUpdatedFlag(true)
// First sanitization
//
// Before values are merged with existing data and
// sent to automation scripts (if any)
// we need to make sure it does not get sanitized data
upd.Values = svc.sanitizer.Run(m, upd.Values)
// Copy values to updated record
// to make sure nobody slips in something we do not want
upd.CreatedAt = old.CreatedAt
@ -809,55 +833,13 @@ func (svc record) procUpdate(ctx context.Context, s store.Storer, invokerID uint
upd.DeletedAt = old.DeletedAt
upd.DeletedBy = old.DeletedBy
if upd.OwnedBy == 0 {
if old.OwnedBy > 0 {
// Owner not set/send in the payload
//
// Fallback to old owner (if set)
upd.OwnedBy = old.OwnedBy
} else {
// If od owner is not set, make current user
// the owner of the record
upd.OwnedBy = invokerID
}
}
// Value merge process does not know anything about permissions so
// in case when new values are missing but do exist in the old set and their update/read is denied
// we need to copy them to ensure value merge process them correctly
for _, f := range m.Fields {
if len(upd.Values.FilterByName(f.Name)) == 0 && !svc.ac.CanUpdateRecordValue(ctx, m.Fields.FindByName(f.Name)) {
// copy all fields from old to new
upd.Values = append(upd.Values, old.Values.FilterByName(f.Name).GetClean()...)
}
}
// Merge new (updated) values with old ones
// This way we get list of updated, stale and deleted values
// that we can selectively update in the repository
upd.Values = old.Values.Merge(upd.Values)
rve := &types.RecordValueErrorSet{}
_ = upd.Values.Walk(func(v *types.RecordValue) error {
if v.IsUpdated() && !svc.ac.CanUpdateRecordValue(ctx, m.Fields.FindByName(v.Name)) {
rve.Push(types.RecordValueError{Kind: "updateDenied", Meta: map[string]interface{}{"field": v.Name, "value": v.Value}})
}
return nil
})
upd = RecordUpdateOwner(invokerID, upd, old)
upd.Values, rve = RecordValueMerger(ctx, svc.ac, m, upd.Values, old.Values)
if !rve.IsValid() {
return rve
}
values.Expression(ctx, m, upd, old, rve)
if !rve.IsValid() {
return rve
}
// Run validation of the updated records
return svc.validator.Run(ctx, s, m, upd)
rve = RecordPreparer(ctx, svc.store, svc.sanitizer, svc.validator, svc.formatter, m, upd)
return rve
}
func (svc record) recordInfoUpdate(ctx context.Context, r *types.Record) {
@ -1262,7 +1244,7 @@ func (svc record) Iterator(ctx context.Context, f types.RecordFilter, fn eventbu
recordableAction = RecordActionIteratorClone
// Assign defaults (only on missing values)
rec.Values = svc.setDefaultValues(m, rec.Values)
rec.Values = RecordValueDefaults(m, rec.Values)
// Handle payload from automation scripts
if rve := svc.procCreate(ctx, svc.store, invokerID, m, rec); !rve.IsValid() {
@ -1311,83 +1293,6 @@ func (svc record) Iterator(ctx context.Context, f types.RecordFilter, fn eventbu
}
// setDefaultValues fills missing record values from the module field
// defaults; values already present in the set are left untouched.
func (svc record) setDefaultValues(m *types.Module, vv types.RecordValueSet) (out types.RecordValueSet) {
	out = vv

	for _, fld := range m.Fields {
		if fld.DefaultValue == nil {
			continue
		}

		for place, def := range fld.DefaultValue {
			// Defaults defined on the field may lack name & place,
			// so both are set explicitly for each empty slot.
			if out.Has(fld.Name, uint(place)) {
				continue
			}

			out = append(out, &types.RecordValue{
				Name:  fld.Name,
				Value: def.Value,
				Place: uint(place),
			})
		}
	}

	return
}
// Does basic field and format validation
//
// Received values must fit the data model: on unknown fields
// or multi/single value mismatch we return an error
//
// Record value errors is intentionally NOT used here; if input fails here
// we can assume that form builder (or whatever it was that assembled the record values)
// was misconfigured and will most likely failed to properly parse the
// record value errors payload too
func (svc record) generalValueSetValidation(m *types.Module, vv types.RecordValueSet) (err error) {
	var (
		aProps = &recordActionProps{}

		// Positive integer without leading zeros;
		// `\d*` (not `\d+`) so single-digit references are accepted too
		numeric = regexp.MustCompile(`^[1-9]\d*$`)
	)

	// Make sure every value maps to a known field and that
	// reference values (if set) look like valid IDs
	err = vv.Walk(func(v *types.RecordValue) error {
		var field = m.Fields.FindByName(v.Name)
		if field == nil {
			return RecordErrFieldNotFound(aProps.setField(v.Name))
		}

		if field.IsRef() {
			if v.Value == "" {
				// empty references are allowed; nothing to validate
				return nil
			}

			if !numeric.MatchString(v.Value) {
				return RecordErrInvalidReferenceFormat(aProps.setField(v.Name).setValue(v.Value))
			}
		}

		return nil
	})

	if err != nil {
		return
	}

	// Make sure there are no multi values in a non-multi value fields
	err = m.Fields.Walk(func(field *types.ModuleField) error {
		if !field.Multi && len(vv.FilterByName(field.Name)) > 1 {
			return RecordErrInvalidValueStructure(aProps.setField(field.Name))
		}

		return nil
	})

	if err != nil {
		return
	}

	return
}
// checks record-value-read access permissions for all module fields and removes unreadable fields from all records
func trimUnreadableRecordFields(ctx context.Context, ac recordValueAccessController, m *types.Module, rr ...*types.Record) {
var (

View File

@ -4,20 +4,21 @@ import (
"bytes"
"context"
"fmt"
"github.com/cortezaproject/corteza-server/compose/service"
"github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/id"
"github.com/cortezaproject/corteza-server/store"
"github.com/cortezaproject/corteza-server/tests/helpers"
"github.com/steinfletcher/apitest"
"github.com/steinfletcher/apitest-jsonpath"
"github.com/stretchr/testify/require"
"io/ioutil"
"mime/multipart"
"net/http"
"net/url"
"testing"
"time"
"github.com/cortezaproject/corteza-server/compose/service"
"github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/pkg/id"
"github.com/cortezaproject/corteza-server/store"
"github.com/cortezaproject/corteza-server/tests/helpers"
"github.com/steinfletcher/apitest"
jsonpath "github.com/steinfletcher/apitest-jsonpath"
"github.com/stretchr/testify/require"
)
func (h helper) clearRecords() {
@ -315,8 +316,10 @@ func TestRecordExport(t *testing.T) {
h.clearRecords()
module := h.repoMakeRecordModuleWithFields("record export module")
expected := "id,name\n"
for i := 0; i < 10; i++ {
h.makeRecord(module, &types.RecordValue{Name: "name", Value: fmt.Sprintf("d%d", i), Place: uint(i)})
r := h.makeRecord(module, &types.RecordValue{Name: "name", Value: fmt.Sprintf("d%d", i), Place: uint(i)})
expected += fmt.Sprintf("%d,d%d\n", r.ID, i)
}
// we'll not use standard asserts (AssertNoErrors) here,
@ -330,7 +333,7 @@ func TestRecordExport(t *testing.T) {
b, err := ioutil.ReadAll(r.Response.Body)
h.noError(err)
h.a.Equal("name\nd0\nd1\nd2\nd3\nd4\nd5\nd6\nd7\nd8\nd9\n", string(b))
h.a.Equal(expected, string(b))
}
func (h helper) apiInitRecordImport(api *apitest.APITest, url, f string, file []byte) *apitest.Response {
@ -408,6 +411,7 @@ func TestRecordImportInit_invalidFileFormat(t *testing.T) {
func TestRecordImportRun(t *testing.T) {
h := newHelper(t)
h.clearRecords()
h.allow(types.ModuleRBACResource.AppendWildcard(), "record.create")
module := h.repoMakeRecordModuleWithFields("record import run module")
tests := []struct {
@ -449,6 +453,74 @@ func TestRecordImportRun_sessionNotFound(t *testing.T) {
End()
}
// TestRecordImportRunForbidden verifies that running an import session
// fails with a permission error when record.create is denied on the module.
func TestRecordImportRunForbidden(t *testing.T) {
	h := newHelper(t)
	h.clearRecords()
	h.deny(types.ModuleRBACResource.AppendWildcard(), "record.create")

	module := h.repoMakeRecordModuleWithFields("record import run module")

	tests := []struct {
		Name    string
		Content string
	}{
		{
			Name:    "f1.csv",
			Content: "fname,femail\nv1,v2\n",
		},
	}

	for _, test := range tests {
		// Name each subtest after the case, not the parent test
		// (t.Run(t.Name(), ...) would produce duplicated, useless names)
		t.Run(test.Name, func(t *testing.T) {
			url := fmt.Sprintf("/namespace/%d/module/%d/record/import", module.NamespaceID, module.ID)
			rsp := &rImportSession{}
			api := h.apiInit()

			r := h.apiInitRecordImport(api, url, test.Name, []byte(test.Content)).End()
			r.JSON(rsp)

			h.apiRunRecordImport(api, fmt.Sprintf("%s/%s", url, rsp.Response.SessionID), `{"fields":{"fname":"name","femail":"email"},"onError":"fail"}`).
				Assert(helpers.AssertErrorP("not allowed to create records for module")).
				End()
		})
	}
}
// TestRecordImportRunForbidden_field verifies that an import fails with
// value errors when record.value.update is denied on one of the mapped fields.
func TestRecordImportRunForbidden_field(t *testing.T) {
	h := newHelper(t)
	h.clearRecords()
	h.allow(types.ModuleRBACResource.AppendWildcard(), "record.create")

	module := h.repoMakeRecordModuleWithFields("record import run module")
	f := module.Fields.FindByName("name")
	h.deny(types.ModuleFieldRBACResource.AppendID(f.ID), "record.value.update")

	tests := []struct {
		Name    string
		Content string
	}{
		{
			Name:    "f1.csv",
			Content: "fname,femail\nv1,v2\n",
		},
	}

	for _, test := range tests {
		// Name each subtest after the case, not the parent test
		// (t.Run(t.Name(), ...) would produce duplicated, useless names)
		t.Run(test.Name, func(t *testing.T) {
			url := fmt.Sprintf("/namespace/%d/module/%d/record/import", module.NamespaceID, module.ID)
			rsp := &rImportSession{}
			api := h.apiInit()

			r := h.apiInitRecordImport(api, url, test.Name, []byte(test.Content)).End()
			r.JSON(rsp)

			h.apiRunRecordImport(api, fmt.Sprintf("%s/%s", url, rsp.Response.SessionID), `{"fields":{"fname":"name","femail":"email"},"onError":"fail"}`).
				Assert(helpers.AssertErrorP("1 issue(s) found")).
				End()
		})
	}
}
func TestRecordImportImportProgress(t *testing.T) {
h := newHelper(t)
h.clearRecords()

View File

@ -151,3 +151,23 @@ func AssertBody(expected string) assertFn {
return nil
}
}
// AssertErrorP asserts that the response carries an error whose message
// contains the expected substring (partial match).
func AssertErrorP(expectedError string) assertFn {
	return func(rsp *http.Response, _ *http.Request) (err error) {
		var body StdErrorResponse
		if err = DecodeBody(rsp, &body); err != nil {
			return err
		}

		msg := body.Error.Message
		if msg == "" {
			return errors.Errorf("No error, expecting error with: %v", expectedError)
		}

		if !strings.Contains(msg, expectedError) {
			return errors.Errorf("Expecting error with %v, got: %v", expectedError, msg)
		}

		return nil
	}
}